oxirs_tsdb/write/
wal.rs

1//! Write-Ahead Log for crash recovery
2//!
3//! Provides durability guarantees by logging data points before they
4//! are committed to compressed storage.
5
6use crate::error::{TsdbError, TsdbResult};
7use crate::series::DataPoint;
8use chrono::{DateTime, Utc};
9use std::fs::{File, OpenOptions};
10use std::io::{BufReader, BufWriter, Write};
11use std::path::{Path, PathBuf};
12
/// Write-Ahead Log entry
///
/// One data point tagged with the series it belongs to. `WalEntry::to_bytes`
/// encodes an entry as a fixed 24-byte record in which the timestamp is stored
/// as milliseconds since the Unix epoch, so sub-millisecond precision is not
/// preserved across a serialization round trip.
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// Identifier of the series the data point belongs to
    pub series_id: u64,
    /// Timestamp of the data point (UTC)
    pub timestamp: DateTime<Utc>,
    /// Value of the data point
    pub value: f64,
}
23
24impl WalEntry {
25    /// Create new WAL entry
26    pub fn new(series_id: u64, point: DataPoint) -> Self {
27        Self {
28            series_id,
29            timestamp: point.timestamp,
30            value: point.value,
31        }
32    }
33
34    /// Serialize to binary format
35    ///
36    /// Format: series_id(8) + timestamp_ms(8) + value(8) = 24 bytes
37    pub fn to_bytes(&self) -> Vec<u8> {
38        let mut bytes = Vec::with_capacity(24);
39
40        // Series ID (8 bytes)
41        bytes.extend_from_slice(&self.series_id.to_le_bytes());
42
43        // Timestamp as milliseconds since epoch (8 bytes)
44        let ts_ms = self.timestamp.timestamp_millis();
45        bytes.extend_from_slice(&ts_ms.to_le_bytes());
46
47        // Value (8 bytes)
48        bytes.extend_from_slice(&self.value.to_le_bytes());
49
50        bytes
51    }
52
53    /// Deserialize from binary format
54    pub fn from_bytes(bytes: &[u8]) -> TsdbResult<Self> {
55        if bytes.len() != 24 {
56            return Err(TsdbError::Wal(format!(
57                "WAL entry must be 24 bytes, got {}",
58                bytes.len()
59            )));
60        }
61
62        let series_id = u64::from_le_bytes(
63            bytes[0..8]
64                .try_into()
65                .expect("slice is exactly 8 bytes as verified above"),
66        );
67        let ts_ms = i64::from_le_bytes(
68            bytes[8..16]
69                .try_into()
70                .expect("slice is exactly 8 bytes as verified above"),
71        );
72        let value = f64::from_le_bytes(
73            bytes[16..24]
74                .try_into()
75                .expect("slice is exactly 8 bytes as verified above"),
76        );
77
78        let timestamp = DateTime::from_timestamp_millis(ts_ms)
79            .ok_or_else(|| TsdbError::Wal(format!("Invalid timestamp: {}", ts_ms)))?;
80
81        Ok(Self {
82            series_id,
83            timestamp,
84            value,
85        })
86    }
87}
88
/// Write-Ahead Log for durability
///
/// Data points are appended to a single file as fixed-size 24-byte records
/// through a buffered writer, and can be read back from that file with
/// `replay`.
pub struct WriteAheadLog {
    /// Path to the WAL file; used to reopen the file for `replay` and `clear`
    path: PathBuf,
    /// Buffered writer over the WAL file handle
    writer: BufWriter<File>,
    /// Whether to fsync after each write
    sync_on_write: bool,
    /// Number of entries in the current WAL: seeded from the file length when
    /// the log is opened, increased by every append and reset by `clear`
    entry_count: u64,
}
100
101impl WriteAheadLog {
102    /// Create or open a Write-Ahead Log
103    ///
104    /// # Arguments
105    ///
106    /// * `path` - Path to WAL file
107    /// * `sync_on_write` - If true, fsync after each append
108    pub fn new<P: AsRef<Path>>(path: P, sync_on_write: bool) -> TsdbResult<Self> {
109        let path = path.as_ref().to_path_buf();
110
111        let file = OpenOptions::new().create(true).append(true).open(&path)?;
112
113        // Count existing entries
114        let entry_count = if path.exists() {
115            let reader_file = File::open(&path)?;
116            let metadata = reader_file.metadata()?;
117            metadata.len() / 24 // 24 bytes per entry
118        } else {
119            0
120        };
121
122        Ok(Self {
123            path,
124            writer: BufWriter::new(file),
125            sync_on_write,
126            entry_count,
127        })
128    }
129
130    /// Append a data point to the WAL
131    pub fn append(&mut self, series_id: u64, point: DataPoint) -> TsdbResult<()> {
132        let entry = WalEntry::new(series_id, point);
133        let bytes = entry.to_bytes();
134
135        self.writer.write_all(&bytes)?;
136
137        if self.sync_on_write {
138            self.writer.flush()?;
139            self.writer.get_ref().sync_all()?;
140        }
141
142        self.entry_count += 1;
143
144        Ok(())
145    }
146
147    /// Append multiple data points in a batch
148    pub fn append_batch(&mut self, entries: &[(u64, DataPoint)]) -> TsdbResult<()> {
149        for (series_id, point) in entries {
150            let entry = WalEntry::new(*series_id, *point);
151            let bytes = entry.to_bytes();
152            self.writer.write_all(&bytes)?;
153        }
154
155        self.writer.flush()?;
156
157        if self.sync_on_write {
158            self.writer.get_ref().sync_all()?;
159        }
160
161        self.entry_count += entries.len() as u64;
162
163        Ok(())
164    }
165
166    /// Replay all entries in the WAL
167    ///
168    /// Returns vector of (series_id, DataPoint) tuples
169    pub fn replay(&self) -> TsdbResult<Vec<(u64, DataPoint)>> {
170        let file = File::open(&self.path)?;
171        let mut reader = BufReader::new(file);
172        let mut entries = Vec::new();
173
174        let mut buffer = vec![0u8; 24];
175
176        loop {
177            match std::io::Read::read_exact(&mut reader, &mut buffer) {
178                Ok(_) => {
179                    let entry = WalEntry::from_bytes(&buffer)?;
180                    entries.push((
181                        entry.series_id,
182                        DataPoint {
183                            timestamp: entry.timestamp,
184                            value: entry.value,
185                        },
186                    ));
187                }
188                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
189                    break; // End of file
190                }
191                Err(e) => return Err(e.into()),
192            }
193        }
194
195        Ok(entries)
196    }
197
198    /// Clear the WAL after successful compaction
199    pub fn clear(&mut self) -> TsdbResult<()> {
200        // Close current writer
201        self.writer.flush()?;
202
203        // Truncate file
204        let file = OpenOptions::new()
205            .write(true)
206            .truncate(true)
207            .open(&self.path)?;
208
209        self.writer = BufWriter::new(file);
210        self.entry_count = 0;
211
212        Ok(())
213    }
214
215    /// Get number of entries in WAL
216    pub fn entry_count(&self) -> u64 {
217        self.entry_count
218    }
219
220    /// Get WAL file path
221    pub fn path(&self) -> &Path {
222        &self.path
223    }
224
225    /// Flush buffers to disk
226    pub fn flush(&mut self) -> TsdbResult<()> {
227        self.writer.flush()?;
228
229        if self.sync_on_write {
230            self.writer.get_ref().sync_all()?;
231        }
232
233        Ok(())
234    }
235}
236
237impl Drop for WriteAheadLog {
238    fn drop(&mut self) {
239        let _ = self.writer.flush();
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use std::env;

    /// Resolve `file_name` inside the system temp directory and remove any
    /// file left there by an earlier run.
    fn scratch_path(file_name: &str) -> std::path::PathBuf {
        let path = env::temp_dir().join(file_name);
        let _ = std::fs::remove_file(&path);
        path
    }

    /// Reopen the WAL at `path` (without fsync) and return what it replays.
    fn replay_from_disk(path: &std::path::Path) -> Vec<(u64, DataPoint)> {
        WriteAheadLog::new(path, false).unwrap().replay().unwrap()
    }

    #[test]
    fn test_wal_entry_serialization() {
        let original = WalEntry {
            series_id: 42,
            timestamp: Utc.with_ymd_and_hms(2024, 1, 1, 12, 0, 0).unwrap(),
            value: 22.5,
        };

        let encoded = original.to_bytes();
        assert_eq!(encoded.len(), 24);

        let decoded = WalEntry::from_bytes(&encoded).unwrap();
        assert_eq!(decoded.series_id, 42);
        assert_eq!(decoded.timestamp, original.timestamp);
        assert_eq!(decoded.value, 22.5);
    }

    #[test]
    fn test_wal_append_and_replay() {
        let wal_path = scratch_path("test_wal_append.log");

        {
            let mut wal = WriteAheadLog::new(&wal_path, false).unwrap();
            let first_ts = Utc::now();

            wal.append(
                1,
                DataPoint {
                    timestamp: first_ts,
                    value: 10.0,
                },
            )
            .unwrap();
            wal.append(
                2,
                DataPoint {
                    timestamp: first_ts + chrono::Duration::seconds(1),
                    value: 20.0,
                },
            )
            .unwrap();

            assert_eq!(wal.entry_count(), 2);
        }

        // The first handle is dropped by now, so this reads what hit the disk.
        let replayed = replay_from_disk(&wal_path);

        assert_eq!(replayed.len(), 2);
        let (first_id, first_point) = &replayed[0];
        let (second_id, second_point) = &replayed[1];
        assert_eq!(*first_id, 1);
        assert_eq!(first_point.value, 10.0);
        assert_eq!(*second_id, 2);
        assert_eq!(second_point.value, 20.0);

        let _ = std::fs::remove_file(&wal_path);
    }

    #[test]
    fn test_wal_clear() {
        let wal_path = scratch_path("test_wal_clear.log");

        {
            let mut wal = WriteAheadLog::new(&wal_path, false).unwrap();

            wal.append(
                1,
                DataPoint {
                    timestamp: Utc::now(),
                    value: 42.0,
                },
            )
            .unwrap();
            assert_eq!(wal.entry_count(), 1);

            wal.clear().unwrap();
            assert_eq!(wal.entry_count(), 0);
        }

        // A cleared log must replay nothing after being reopened.
        let replayed = replay_from_disk(&wal_path);
        assert_eq!(replayed.len(), 0);

        let _ = std::fs::remove_file(&wal_path);
    }

    #[test]
    fn test_wal_batch_append() {
        let wal_path = scratch_path("test_wal_batch.log");

        {
            let mut wal = WriteAheadLog::new(&wal_path, false).unwrap();
            let base_time = Utc::now();

            let batch: Vec<(u64, DataPoint)> = (0..100i64)
                .map(|i| {
                    let point = DataPoint {
                        timestamp: base_time + chrono::Duration::seconds(i),
                        value: i as f64,
                    };
                    (i as u64, point)
                })
                .collect();

            wal.append_batch(&batch).unwrap();
            assert_eq!(wal.entry_count(), 100);
        }

        let replayed = replay_from_disk(&wal_path);

        assert_eq!(replayed.len(), 100);
        for (expected, (series_id, point)) in (0u64..).zip(&replayed) {
            assert_eq!(*series_id, expected);
            assert_eq!(point.value, expected as f64);
        }

        let _ = std::fs::remove_file(&wal_path);
    }

    #[test]
    fn test_wal_fsync() {
        let wal_path = scratch_path("test_wal_fsync.log");

        {
            // sync_on_write = true
            let mut wal = WriteAheadLog::new(&wal_path, true).unwrap();

            wal.append(
                1,
                DataPoint {
                    timestamp: Utc::now(),
                    value: 123.456,
                },
            )
            .unwrap();
        }

        // The synced entry must be visible to a freshly opened log.
        let replayed = replay_from_disk(&wal_path);

        assert_eq!(replayed.len(), 1);
        let (series_id, point) = &replayed[0];
        assert_eq!(*series_id, 1);
        assert_eq!(point.value, 123.456);

        let _ = std::fs::remove_file(&wal_path);
    }
}