// grafeo_adapters/storage/wal/log.rs
1//! WAL log file management.
2
3use super::WalRecord;
4use grafeo_common::types::{EpochId, TransactionId};
5use grafeo_common::utils::error::{Error, Result};
6use parking_lot::Mutex;
7use serde::{Deserialize, Serialize};
8use std::fs::{self, File, OpenOptions};
9use std::io::{BufReader, BufWriter, Read, Write};
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
/// Checkpoint metadata stored in a separate file.
///
/// This file is written atomically (via rename) during checkpoint and read
/// during recovery to determine which WAL files can be skipped.
///
/// Serialized with `bincode` (standard config); see
/// `WalManager::write_checkpoint_metadata` / `read_checkpoint_metadata`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointMetadata {
    /// The epoch at which the checkpoint was taken.
    pub epoch: EpochId,
    /// The sequence number of the active WAL file at the time of checkpoint.
    pub log_sequence: u64,
    /// Timestamp of the checkpoint (milliseconds since UNIX epoch; 0 if the
    /// system clock reported a time before the epoch).
    pub timestamp_ms: u64,
    /// Transaction ID at checkpoint.
    pub transaction_id: TransactionId,
}
29
/// Name of the checkpoint metadata file, stored in the WAL directory
/// alongside the `wal_<seq>.log` files.
const CHECKPOINT_METADATA_FILE: &str = "checkpoint.meta";
32
/// Durability mode for the WAL.
///
/// Controls when `WalManager::write_frame` flushes and fsyncs after each
/// record; each variant's behavior is documented below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityMode {
    /// Sync (fsync) after every commit for maximum durability.
    /// Slowest but safest. Non-commit records are only buffered.
    Sync,
    /// Batch sync - fsync periodically (e.g., every N ms or N records).
    /// Good balance of performance and durability.
    Batch {
        /// Maximum time between syncs in milliseconds.
        max_delay_ms: u64,
        /// Maximum records between syncs.
        max_records: u64,
    },
    /// Adaptive sync - background thread adjusts timing based on flush duration.
    ///
    /// Unlike `Batch` which checks thresholds inline, `Adaptive` spawns a
    /// dedicated flusher thread that maintains consistent flush cadence
    /// regardless of disk speed. Use [`AdaptiveFlusher`](super::AdaptiveFlusher)
    /// to manage the background thread.
    ///
    /// The WAL itself only buffers writes; the flusher thread handles syncing.
    Adaptive {
        /// Target interval between flushes in milliseconds.
        /// The flusher adjusts wait times to maintain this cadence.
        target_interval_ms: u64,
    },
    /// No sync - rely on OS buffer flushing.
    /// Fastest but may lose recent data on crash.
    NoSync,
}
64
65impl Default for DurabilityMode {
66    fn default() -> Self {
67        Self::Batch {
68            max_delay_ms: 100,
69            max_records: 1000,
70        }
71    }
72}
73
/// Configuration for the WAL manager.
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// Durability mode.
    pub durability: DurabilityMode,
    /// Maximum log file size before rotation (in bytes). Checked after each
    /// frame write; the file that crosses the threshold is rotated out.
    pub max_log_size: u64,
    /// Whether to enable compression.
    /// NOTE(review): this flag is never read anywhere in this file —
    /// presumably consumed by another WAL component; confirm before relying on it.
    pub compression: bool,
}
84
85impl Default for WalConfig {
86    fn default() -> Self {
87        Self {
88            durability: DurabilityMode::default(),
89            max_log_size: 64 * 1024 * 1024, // 64 MB
90            compression: false,
91        }
92    }
93}
94
/// State for a single log file.
struct LogFile {
    /// Buffered writer over the open (append-mode) file handle.
    writer: BufWriter<File>,
    /// Current size in bytes, tracked incrementally as frames are written.
    size: u64,
    /// File path (used to report the active log's location).
    path: PathBuf,
}
104
/// Manages the Write-Ahead Log with rotation, checkpointing, and durability modes.
///
/// Safe to share across threads: the active log is guarded by a mutex and
/// the counters are atomics.
pub struct WalManager {
    /// Directory for WAL files.
    dir: PathBuf,
    /// Configuration.
    config: WalConfig,
    /// Active log file (`None` between `close_active_log` and the next write).
    active_log: Mutex<Option<LogFile>>,
    /// Total number of records written across all log files.
    /// Counts only writes by this instance; starts at 0 on every open.
    total_record_count: AtomicU64,
    /// Records since last sync (for batch mode).
    records_since_sync: AtomicU64,
    /// Time of last sync (for batch mode).
    last_sync: Mutex<Instant>,
    /// Current log sequence number.
    current_sequence: AtomicU64,
    /// Latest checkpoint epoch. Set only by a checkpoint taken through this
    /// instance; not initialized from on-disk metadata at open.
    checkpoint_epoch: Mutex<Option<EpochId>>,
}
124
125impl WalManager {
126    /// Opens or creates a WAL in the given directory.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if the directory cannot be created or accessed.
131    pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
132        Self::with_config(dir, WalConfig::default())
133    }
134
135    /// Opens or creates a WAL with custom configuration.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if the directory cannot be created or accessed.
140    pub fn with_config(dir: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
141        let dir = dir.as_ref().to_path_buf();
142        fs::create_dir_all(&dir)?;
143
144        // Find the highest existing sequence number
145        let mut max_sequence = 0u64;
146        if let Ok(entries) = fs::read_dir(&dir) {
147            for entry in entries.flatten() {
148                if let Some(name) = entry.file_name().to_str()
149                    && let Some(seq_str) = name
150                        .strip_prefix("wal_")
151                        .and_then(|s| s.strip_suffix(".log"))
152                    && let Ok(seq) = seq_str.parse::<u64>()
153                {
154                    max_sequence = max_sequence.max(seq);
155                }
156            }
157        }
158
159        let manager = Self {
160            dir,
161            config,
162            active_log: Mutex::new(None),
163            total_record_count: AtomicU64::new(0),
164            records_since_sync: AtomicU64::new(0),
165            last_sync: Mutex::new(Instant::now()),
166            current_sequence: AtomicU64::new(max_sequence),
167            checkpoint_epoch: Mutex::new(None),
168        };
169
170        // Open or create the active log
171        manager.ensure_active_log()?;
172
173        Ok(manager)
174    }
175
176    /// Logs a record to the WAL.
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if the record cannot be written.
181    pub fn log(&self, record: &WalRecord) -> Result<()> {
182        let data = bincode::serde::encode_to_vec(record, bincode::config::standard())
183            .map_err(|e| Error::Serialization(e.to_string()))?;
184        let force_sync = matches!(record, WalRecord::TransactionCommit { .. });
185        self.write_frame(&data, force_sync)
186    }
187
    /// Writes a pre-serialized frame to the active WAL log.
    ///
    /// Frame format: `[length: u32 LE][data: bytes][crc32: u32 LE]`.
    /// Handles durability mode (sync/batch/adaptive/nosync) and log rotation.
    ///
    /// `force_sync` controls whether an fsync is performed in Sync durability
    /// mode. Callers typically set this to `true` for commit markers.
    pub(crate) fn write_frame(&self, data: &[u8], force_sync: bool) -> Result<()> {
        // Crash-injection hooks (`maybe_crash`) are no-ops outside crash tests.
        use grafeo_core::testing::crash::maybe_crash;

        self.ensure_active_log()?;

        let mut guard = self.active_log.lock();
        let log_file = guard
            .as_mut()
            .ok_or_else(|| Error::Internal("WAL writer not available".to_string()))?;

        maybe_crash("wal_before_write");

        // Write length prefix
        let len = data.len() as u32;
        log_file.writer.write_all(&len.to_le_bytes())?;

        // Write data
        log_file.writer.write_all(data)?;

        // Write checksum (CRC32 over the payload only, not the length prefix)
        let checksum = crc32fast::hash(data);
        log_file.writer.write_all(&checksum.to_le_bytes())?;

        maybe_crash("wal_after_write");

        // Update size tracking
        let record_size = 4 + data.len() as u64 + 4; // length + data + checksum
        log_file.size += record_size;

        self.total_record_count.fetch_add(1, Ordering::Relaxed);
        self.records_since_sync.fetch_add(1, Ordering::Relaxed);

        // Check if we need to rotate
        let needs_rotation = log_file.size >= self.config.max_log_size;

        // Handle durability mode
        match &self.config.durability {
            DurabilityMode::Sync => {
                // Only commit markers force an fsync; other records stay buffered.
                if force_sync {
                    maybe_crash("wal_before_flush");
                    log_file.writer.flush()?;
                    log_file.writer.get_ref().sync_all()?;
                    self.records_since_sync.store(0, Ordering::Relaxed);
                    *self.last_sync.lock() = Instant::now();
                }
            }
            DurabilityMode::Batch {
                max_delay_ms,
                max_records,
            } => {
                // Lock order: `last_sync` is taken while `active_log` is held;
                // keep that order consistent elsewhere to avoid deadlock.
                let records = self.records_since_sync.load(Ordering::Relaxed);
                let elapsed = self.last_sync.lock().elapsed();

                if records >= *max_records || elapsed >= Duration::from_millis(*max_delay_ms) {
                    log_file.writer.flush()?;
                    log_file.writer.get_ref().sync_all()?;
                    self.records_since_sync.store(0, Ordering::Relaxed);
                    *self.last_sync.lock() = Instant::now();
                }
            }
            DurabilityMode::Adaptive { .. } => {
                // Adaptive mode: just flush buffer, background thread handles sync
                log_file.writer.flush()?;
            }
            DurabilityMode::NoSync => {
                // Just flush buffer, no sync
                log_file.writer.flush()?;
            }
        }

        drop(guard);

        // Rotate if needed
        // NOTE(review): rotation runs after the lock is released, so two
        // concurrent writers can both observe `needs_rotation` and each call
        // `rotate()`, skipping a sequence number — looks benign, but confirm.
        if needs_rotation {
            self.rotate()?;
        }

        Ok(())
    }
274
275    /// Writes a checkpoint marker and persists checkpoint metadata.
276    ///
277    /// The checkpoint metadata is written atomically to a separate file,
278    /// allowing recovery to skip WAL files that precede the checkpoint.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the checkpoint cannot be written.
283    pub fn checkpoint(&self, current_transaction: TransactionId, epoch: EpochId) -> Result<()> {
284        self.log(&WalRecord::Checkpoint {
285            transaction_id: current_transaction,
286        })?;
287        self.complete_checkpoint(current_transaction, epoch)
288    }
289
290    /// Completes a checkpoint after the checkpoint record has been written.
291    ///
292    /// Syncs the WAL, writes checkpoint metadata atomically, updates the
293    /// in-memory epoch, and truncates old log files.
294    pub(crate) fn complete_checkpoint(
295        &self,
296        transaction_id: TransactionId,
297        epoch: EpochId,
298    ) -> Result<()> {
299        // Force sync on checkpoint
300        self.sync()?;
301
302        // Get current log sequence
303        let log_sequence = self.current_sequence.load(Ordering::SeqCst);
304
305        // Get current timestamp
306        let timestamp_ms = SystemTime::now()
307            .duration_since(UNIX_EPOCH)
308            .map(|d| d.as_millis() as u64)
309            .unwrap_or(0);
310
311        // Create checkpoint metadata
312        let metadata = CheckpointMetadata {
313            epoch,
314            log_sequence,
315            timestamp_ms,
316            transaction_id,
317        };
318
319        // Write checkpoint metadata atomically
320        self.write_checkpoint_metadata(&metadata)?;
321
322        // Update in-memory checkpoint epoch
323        *self.checkpoint_epoch.lock() = Some(epoch);
324
325        // Optionally truncate old logs
326        self.truncate_old_logs()?;
327
328        Ok(())
329    }
330
331    /// Writes checkpoint metadata to disk atomically.
332    ///
333    /// Uses a write-to-temp-then-rename pattern for atomicity.
334    fn write_checkpoint_metadata(&self, metadata: &CheckpointMetadata) -> Result<()> {
335        let metadata_path = self.dir.join(CHECKPOINT_METADATA_FILE);
336        let temp_path = self.dir.join(format!("{}.tmp", CHECKPOINT_METADATA_FILE));
337
338        // Serialize metadata
339        let data = bincode::serde::encode_to_vec(metadata, bincode::config::standard())
340            .map_err(|e| Error::Serialization(e.to_string()))?;
341
342        // Write to temp file
343        let mut file = File::create(&temp_path)?;
344        file.write_all(&data)?;
345        file.sync_all()?;
346        drop(file);
347
348        // Atomic rename
349        fs::rename(&temp_path, &metadata_path)?;
350
351        Ok(())
352    }
353
354    /// Reads checkpoint metadata from disk.
355    ///
356    /// Returns `None` if no checkpoint metadata exists.
357    pub fn read_checkpoint_metadata(&self) -> Result<Option<CheckpointMetadata>> {
358        let metadata_path = self.dir.join(CHECKPOINT_METADATA_FILE);
359
360        if !metadata_path.exists() {
361            return Ok(None);
362        }
363
364        let file = File::open(&metadata_path)?;
365        let mut reader = BufReader::new(file);
366        let mut data = Vec::new();
367        reader.read_to_end(&mut data)?;
368
369        let (metadata, _): (CheckpointMetadata, _) =
370            bincode::serde::decode_from_slice(&data, bincode::config::standard())
371                .map_err(|e| Error::Serialization(e.to_string()))?;
372
373        Ok(Some(metadata))
374    }
375
376    /// Rotates to a new log file.
377    ///
378    /// # Errors
379    ///
380    /// Returns an error if rotation fails.
381    pub fn rotate(&self) -> Result<()> {
382        let new_sequence = self.current_sequence.fetch_add(1, Ordering::SeqCst) + 1;
383        let new_path = self.log_path(new_sequence);
384
385        let file = OpenOptions::new()
386            .create(true)
387            .read(true)
388            .append(true)
389            .open(&new_path)?;
390
391        let new_log = LogFile {
392            writer: BufWriter::new(file),
393            size: 0,
394            path: new_path,
395        };
396
397        // Replace active log
398        let mut guard = self.active_log.lock();
399        if let Some(old_log) = guard.take() {
400            // Ensure old log is flushed
401            drop(old_log);
402        }
403        *guard = Some(new_log);
404
405        Ok(())
406    }
407
408    /// Flushes the WAL buffer to disk.
409    ///
410    /// # Errors
411    ///
412    /// Returns an error if the flush fails.
413    pub fn flush(&self) -> Result<()> {
414        let mut guard = self.active_log.lock();
415        if let Some(log_file) = guard.as_mut() {
416            log_file.writer.flush()?;
417        }
418        Ok(())
419    }
420
421    /// Syncs the WAL to disk (fsync).
422    ///
423    /// # Errors
424    ///
425    /// Returns an error if the sync fails.
426    pub fn sync(&self) -> Result<()> {
427        let mut guard = self.active_log.lock();
428        if let Some(log_file) = guard.as_mut() {
429            log_file.writer.flush()?;
430            log_file.writer.get_ref().sync_all()?;
431        }
432        self.records_since_sync.store(0, Ordering::Relaxed);
433        *self.last_sync.lock() = Instant::now();
434        Ok(())
435    }
436
    /// Returns the total number of records written through this instance.
    /// Starts at 0 on every open; not persisted across restarts.
    #[must_use]
    pub fn record_count(&self) -> u64 {
        self.total_record_count.load(Ordering::Relaxed)
    }
442
    /// Returns the WAL directory path (holds the log files and the
    /// checkpoint metadata file).
    #[must_use]
    pub fn dir(&self) -> &Path {
        &self.dir
    }
448
    /// Returns the configured durability mode (a cheap `Copy`).
    #[must_use]
    pub fn durability_mode(&self) -> DurabilityMode {
        self.config.durability
    }
454
455    /// Returns all WAL log file paths in sequence order.
456    pub fn log_files(&self) -> Result<Vec<PathBuf>> {
457        let mut files = Vec::new();
458
459        if let Ok(entries) = fs::read_dir(&self.dir) {
460            for entry in entries.flatten() {
461                let path = entry.path();
462                if path.extension().is_some_and(|ext| ext == "log") {
463                    files.push(path);
464                }
465            }
466        }
467
468        // Sort by sequence number
469        files.sort_by(|a, b| {
470            let seq_a = Self::sequence_from_path(a).unwrap_or(0);
471            let seq_b = Self::sequence_from_path(b).unwrap_or(0);
472            seq_a.cmp(&seq_b)
473        });
474
475        Ok(files)
476    }
477
    /// Returns the latest checkpoint epoch, if any.
    ///
    /// Only reflects checkpoints taken through this instance; it is not
    /// initialized from on-disk metadata at open.
    #[must_use]
    pub fn checkpoint_epoch(&self) -> Option<EpochId> {
        *self.checkpoint_epoch.lock()
    }
483
484    /// Returns the total size of all WAL files in bytes.
485    #[must_use]
486    pub fn size_bytes(&self) -> usize {
487        let mut total = 0usize;
488        if let Ok(files) = self.log_files() {
489            for file in files {
490                if let Ok(metadata) = fs::metadata(&file) {
491                    total += metadata.len() as usize;
492                }
493            }
494        }
495        // Also include checkpoint metadata file
496        let metadata_path = self.dir.join(CHECKPOINT_METADATA_FILE);
497        if let Ok(metadata) = fs::metadata(&metadata_path) {
498            total += metadata.len() as usize;
499        }
500        total
501    }
502
503    /// Returns the timestamp of the last checkpoint (Unix epoch seconds), if any.
504    #[must_use]
505    pub fn last_checkpoint_timestamp(&self) -> Option<u64> {
506        if let Ok(Some(metadata)) = self.read_checkpoint_metadata() {
507            // Convert milliseconds to seconds
508            Some(metadata.timestamp_ms / 1000)
509        } else {
510            None
511        }
512    }
513
514    /// Closes the active log file, releasing its file handle.
515    ///
516    /// This allows the WAL directory to be safely removed on Windows,
517    /// where open file handles prevent directory deletion. A new log file
518    /// will be created automatically on the next write.
519    pub fn close_active_log(&self) {
520        let mut guard = self.active_log.lock();
521        // Dropping the LogFile closes the BufWriter and underlying File
522        *guard = None;
523    }
524
525    // === Private methods ===
526
527    fn ensure_active_log(&self) -> Result<()> {
528        let mut guard = self.active_log.lock();
529        if guard.is_none() {
530            let sequence = self.current_sequence.load(Ordering::Relaxed);
531            let path = self.log_path(sequence);
532
533            let file = OpenOptions::new()
534                .create(true)
535                .read(true)
536                .append(true)
537                .open(&path)?;
538
539            let size = file.metadata()?.len();
540
541            *guard = Some(LogFile {
542                writer: BufWriter::new(file),
543                size,
544                path,
545            });
546        }
547        Ok(())
548    }
549
550    fn log_path(&self, sequence: u64) -> PathBuf {
551        self.dir.join(format!("wal_{:08}.log", sequence))
552    }
553
554    fn sequence_from_path(path: &Path) -> Option<u64> {
555        path.file_stem()
556            .and_then(|s| s.to_str())
557            .and_then(|s| s.strip_prefix("wal_"))
558            .and_then(|s| s.parse().ok())
559    }
560
    /// Deletes old WAL files made obsolete by the latest checkpoint.
    ///
    /// No-op until a checkpoint has been taken through this instance. Always
    /// keeps the two most recent sequences before the current one; deletion
    /// failures are ignored (the file will be retried on the next checkpoint).
    fn truncate_old_logs(&self) -> Result<()> {
        let Some(checkpoint) = *self.checkpoint_epoch.lock() else {
            return Ok(());
        };

        // Keep logs that might still be needed
        // For now, keep the two most recent logs after checkpoint
        let files = self.log_files()?;
        let current_seq = self.current_sequence.load(Ordering::Relaxed);

        for file in files {
            if let Some(seq) = Self::sequence_from_path(&file) {
                // Keep the last 2 log files before current
                if seq + 2 < current_seq {
                    // Only delete if we have a checkpoint after this log
                    // NOTE(review): this compares a checkpoint *epoch* against a
                    // file *sequence* number — two independent counters. It is
                    // only conservative if epochs advance at least as fast as
                    // sequences; confirm against CheckpointMetadata.log_sequence.
                    if checkpoint.as_u64() > seq {
                        let _ = fs::remove_file(&file);
                    }
                }
            }
        }

        Ok(())
    }
585}
586
587// Backward compatibility - single-file API
588impl WalManager {
589    /// Opens a single WAL file (legacy API).
590    ///
591    /// # Errors
592    ///
593    /// Returns an error if the file cannot be opened.
594    pub fn open_file(path: impl AsRef<Path>) -> Result<Self> {
595        let path = path.as_ref();
596        let dir = path.parent().unwrap_or(Path::new("."));
597        let manager = Self::open(dir)?;
598        Ok(manager)
599    }
600
601    /// Returns the path to the active WAL file.
602    #[must_use]
603    pub fn path(&self) -> PathBuf {
604        let guard = self.active_log.lock();
605        guard
606            .as_ref()
607            .map_or_else(|| self.log_path(0), |l| l.path.clone())
608    }
609}
610
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::NodeId;
    use tempfile::tempdir;

    /// A single logged record is counted and survives a buffer flush.
    #[test]
    fn test_wal_write() {
        let dir = tempdir().unwrap();

        let wal = WalManager::open(dir.path()).unwrap();

        let record = WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Person".to_string()],
        };

        wal.log(&record).unwrap();
        wal.flush().unwrap();

        assert_eq!(wal.record_count(), 1);
    }

    /// A tiny `max_log_size` forces rotation, producing multiple log files.
    #[test]
    fn test_wal_rotation() {
        let dir = tempdir().unwrap();

        // Small max size to force rotation
        let config = WalConfig {
            max_log_size: 100,
            ..Default::default()
        };

        let wal = WalManager::with_config(dir.path(), config).unwrap();

        // Write enough records to trigger rotation
        for i in 0..10 {
            let record = WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec!["Person".to_string()],
            };
            wal.log(&record).unwrap();
        }

        wal.flush().unwrap();

        // Should have multiple log files
        let files = wal.log_files().unwrap();
        assert!(
            files.len() > 1,
            "Expected multiple log files after rotation"
        );
    }

    /// Smoke-tests each durability mode: writes must succeed in all of them.
    /// (Durability itself is not asserted here — only that logging works.)
    #[test]
    fn test_durability_modes() {
        let dir = tempdir().unwrap();

        // Test Sync mode
        let config = WalConfig {
            durability: DurabilityMode::Sync,
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("sync"), config).unwrap();
        wal.log(&WalRecord::TransactionCommit {
            transaction_id: TransactionId::new(1),
        })
        .unwrap();

        // Test NoSync mode
        let config = WalConfig {
            durability: DurabilityMode::NoSync,
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("nosync"), config).unwrap();
        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec![],
        })
        .unwrap();

        // Test Batch mode
        let config = WalConfig {
            durability: DurabilityMode::Batch {
                max_delay_ms: 10,
                max_records: 5,
            },
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("batch"), config).unwrap();
        for i in 0..10 {
            wal.log(&WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec![],
            })
            .unwrap();
        }

        // Test Adaptive mode (just buffer flush, no inline sync)
        let config = WalConfig {
            durability: DurabilityMode::Adaptive {
                target_interval_ms: 100,
            },
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("adaptive"), config).unwrap();
        for i in 0..10 {
            wal.log(&WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec![],
            })
            .unwrap();
        }
        // Manually sync since no flusher thread in this test
        wal.sync().unwrap();
    }

    /// Checkpointing publishes the epoch through `checkpoint_epoch()`.
    #[test]
    fn test_checkpoint() {
        let dir = tempdir().unwrap();

        let wal = WalManager::open(dir.path()).unwrap();

        // Write some records
        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Test".to_string()],
        })
        .unwrap();

        wal.log(&WalRecord::TransactionCommit {
            transaction_id: TransactionId::new(1),
        })
        .unwrap();

        // Create checkpoint
        wal.checkpoint(TransactionId::new(1), EpochId::new(10))
            .unwrap();

        assert_eq!(wal.checkpoint_epoch(), Some(EpochId::new(10)));
    }
}