// graphos_adapters/storage/wal/log.rs

1//! WAL log file management.
2
3use super::WalRecord;
4use graphos_common::types::{EpochId, TxId};
5use graphos_common::utils::error::{Error, Result};
6use parking_lot::Mutex;
7use serde::{Deserialize, Serialize};
8use std::fs::{self, File, OpenOptions};
9use std::io::{BufReader, BufWriter, Read, Write};
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
/// Checkpoint metadata stored in a separate file.
///
/// This file is written atomically (via rename) during checkpoint and read
/// during recovery to determine which WAL files can be skipped.
///
/// NOTE: serialized with bincode, which encodes fields in declaration order —
/// do not reorder, add, or remove fields without a format migration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointMetadata {
    /// The epoch at which the checkpoint was taken.
    pub epoch: EpochId,
    /// The log sequence number (`wal_<seq>.log`) that was active when the
    /// checkpoint was taken.
    pub log_sequence: u64,
    /// Timestamp of the checkpoint (milliseconds since UNIX epoch; 0 when the
    /// system clock was unavailable at checkpoint time).
    pub timestamp_ms: u64,
    /// Transaction ID at checkpoint.
    pub tx_id: TxId,
}
29
/// Name of the checkpoint metadata file (stored inside the WAL directory).
const CHECKPOINT_METADATA_FILE: &str = "checkpoint.meta";
32
/// Durability mode for the WAL.
///
/// Controls when buffered records are flushed and fsynced; see
/// `WalManager::log` for where each mode takes effect.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityMode {
    /// Sync (fsync) after every commit record for maximum durability.
    /// Slowest but safest. Non-commit records stay buffered until the next
    /// commit forces them out.
    Sync,
    /// Batch sync - fsync periodically (e.g., every N ms or N records).
    /// Good balance of performance and durability.
    Batch {
        /// Maximum time between syncs in milliseconds.
        max_delay_ms: u64,
        /// Maximum records between syncs.
        max_records: u64,
    },
    /// No sync - flush to the OS buffer only and rely on OS flushing.
    /// Fastest but may lose recent data on crash.
    NoSync,
}
51
52impl Default for DurabilityMode {
53    fn default() -> Self {
54        Self::Batch {
55            max_delay_ms: 100,
56            max_records: 1000,
57        }
58    }
59}
60
/// Configuration for the WAL manager.
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// Durability mode (when to flush/fsync).
    pub durability: DurabilityMode,
    /// Maximum log file size before rotation (in bytes).
    pub max_log_size: u64,
    /// Whether to enable compression.
    /// NOTE(review): no code in this file reads this flag — confirm it is
    /// consumed elsewhere or is a planned feature.
    pub compression: bool,
}
71
72impl Default for WalConfig {
73    fn default() -> Self {
74        Self {
75            durability: DurabilityMode::default(),
76            max_log_size: 64 * 1024 * 1024, // 64 MB
77            compression: false,
78        }
79    }
80}
81
/// State for a single open log file.
struct LogFile {
    /// Buffered writer over the file handle (opened in append mode).
    writer: BufWriter<File>,
    /// Current size in bytes, used to decide when to rotate.
    size: u64,
    /// File path (exposed via the legacy `WalManager::path` API).
    path: PathBuf,
    /// Sequence number (for log file ordering during recovery).
    #[allow(dead_code)]
    sequence: u64,
}
94
/// Manages the Write-Ahead Log with rotation, checkpointing, and durability modes.
///
/// All shared state is behind `parking_lot` mutexes or atomics, so methods
/// take `&self`. The `active_log` mutex serializes record writes.
pub struct WalManager {
    /// Directory for WAL files (`wal_<seq>.log` plus `checkpoint.meta`).
    dir: PathBuf,
    /// Configuration.
    config: WalConfig,
    /// Active log file (`None` until first use or right after rotation).
    active_log: Mutex<Option<LogFile>>,
    /// Total number of records written across all log files (this process only).
    total_record_count: AtomicU64,
    /// Records since last sync (for batch mode).
    records_since_sync: AtomicU64,
    /// Time of last sync (for batch mode).
    last_sync: Mutex<Instant>,
    /// Current log sequence number.
    current_sequence: AtomicU64,
    /// Latest checkpoint epoch (set only by `checkpoint()`; not loaded from
    /// disk on open).
    checkpoint_epoch: Mutex<Option<EpochId>>,
}
114
115impl WalManager {
    /// Opens or creates a WAL in the given directory with the default
    /// configuration (see [`WalConfig::default`]).
    ///
    /// # Errors
    ///
    /// Returns an error if the directory cannot be created or accessed.
    pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(dir, WalConfig::default())
    }
124
125    /// Opens or creates a WAL with custom configuration.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if the directory cannot be created or accessed.
130    pub fn with_config(dir: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
131        let dir = dir.as_ref().to_path_buf();
132        fs::create_dir_all(&dir)?;
133
134        // Find the highest existing sequence number
135        let mut max_sequence = 0u64;
136        if let Ok(entries) = fs::read_dir(&dir) {
137            for entry in entries.flatten() {
138                if let Some(name) = entry.file_name().to_str() {
139                    if let Some(seq_str) = name
140                        .strip_prefix("wal_")
141                        .and_then(|s| s.strip_suffix(".log"))
142                    {
143                        if let Ok(seq) = seq_str.parse::<u64>() {
144                            max_sequence = max_sequence.max(seq);
145                        }
146                    }
147                }
148            }
149        }
150
151        let manager = Self {
152            dir,
153            config,
154            active_log: Mutex::new(None),
155            total_record_count: AtomicU64::new(0),
156            records_since_sync: AtomicU64::new(0),
157            last_sync: Mutex::new(Instant::now()),
158            current_sequence: AtomicU64::new(max_sequence),
159            checkpoint_epoch: Mutex::new(None),
160        };
161
162        // Open or create the active log
163        manager.ensure_active_log()?;
164
165        Ok(manager)
166    }
167
    /// Logs a record to the WAL.
    ///
    /// On-disk frame layout per record: 4-byte little-endian length, the
    /// bincode-encoded payload, then a 4-byte little-endian CRC32 of the
    /// payload. Sync behavior depends on the configured [`DurabilityMode`];
    /// rotation is triggered after the write once the active file reaches
    /// `max_log_size`.
    ///
    /// # Errors
    ///
    /// Returns an error if the record cannot be written.
    pub fn log(&self, record: &WalRecord) -> Result<()> {
        self.ensure_active_log()?;

        let mut guard = self.active_log.lock();
        let log_file = guard
            .as_mut()
            .ok_or_else(|| Error::Internal("WAL writer not available".to_string()))?;

        // Serialize the record
        let data = bincode::serde::encode_to_vec(record, bincode::config::standard())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        // Write length prefix (u32, little-endian)
        let len = data.len() as u32;
        log_file.writer.write_all(&len.to_le_bytes())?;

        // Write data
        log_file.writer.write_all(&data)?;

        // Write checksum (CRC32 of the payload only, not the length prefix)
        let checksum = crc32fast::hash(&data);
        log_file.writer.write_all(&checksum.to_le_bytes())?;

        // Update size tracking
        let record_size = 4 + data.len() as u64 + 4; // length + data + checksum
        log_file.size += record_size;

        self.total_record_count.fetch_add(1, Ordering::Relaxed);
        self.records_since_sync.fetch_add(1, Ordering::Relaxed);

        // Decide rotation now, while we still hold the lock; perform it after
        // the lock is released (rotate() re-acquires it).
        let needs_rotation = log_file.size >= self.config.max_log_size;

        // Handle durability mode
        match &self.config.durability {
            DurabilityMode::Sync => {
                // Sync only on commit records; earlier records of the
                // transaction remain buffered until the commit forces them out.
                if matches!(record, WalRecord::TxCommit { .. }) {
                    log_file.writer.flush()?;
                    log_file.writer.get_ref().sync_all()?;
                    self.records_since_sync.store(0, Ordering::Relaxed);
                    *self.last_sync.lock() = Instant::now();
                }
            }
            DurabilityMode::Batch {
                max_delay_ms,
                max_records,
            } => {
                let records = self.records_since_sync.load(Ordering::Relaxed);
                let elapsed = self.last_sync.lock().elapsed();

                if records >= *max_records || elapsed >= Duration::from_millis(*max_delay_ms) {
                    log_file.writer.flush()?;
                    log_file.writer.get_ref().sync_all()?;
                    self.records_since_sync.store(0, Ordering::Relaxed);
                    *self.last_sync.lock() = Instant::now();
                }
            }
            DurabilityMode::NoSync => {
                // Just flush to the OS buffer, no fsync
                log_file.writer.flush()?;
            }
        }

        drop(guard);

        // Rotate if needed.
        // NOTE(review): the lock is released before rotating, so a concurrent
        // `log()` may append to the old file or trigger a second rotation in
        // the window — appears benign for recovery (all files are replayed in
        // sequence order) but worth confirming.
        if needs_rotation {
            self.rotate()?;
        }

        Ok(())
    }
246
    /// Writes a checkpoint marker and persists checkpoint metadata.
    ///
    /// Sequence: append a `Checkpoint` record, force an fsync, persist
    /// [`CheckpointMetadata`] atomically to a separate file, record the epoch
    /// in memory, then attempt to delete log files made obsolete by the
    /// checkpoint. Recovery reads the metadata file to skip WAL files that
    /// precede the checkpoint.
    ///
    /// # Errors
    ///
    /// Returns an error if the checkpoint cannot be written.
    pub fn checkpoint(&self, current_tx: TxId, epoch: EpochId) -> Result<()> {
        // Write checkpoint record to WAL
        self.log(&WalRecord::Checkpoint { tx_id: current_tx })?;

        // Force sync so the marker is durable before the metadata claims it exists
        self.sync()?;

        // Get current log sequence (files below this can be skipped on recovery)
        let log_sequence = self.current_sequence.load(Ordering::SeqCst);

        // Get current timestamp; a clock-before-UNIX-epoch failure degrades to 0
        // rather than failing the checkpoint
        let timestamp_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        // Create checkpoint metadata
        let metadata = CheckpointMetadata {
            epoch,
            log_sequence,
            timestamp_ms,
            tx_id: current_tx,
        };

        // Write checkpoint metadata atomically (temp file + rename)
        self.write_checkpoint_metadata(&metadata)?;

        // Update in-memory checkpoint epoch
        *self.checkpoint_epoch.lock() = Some(epoch);

        // Best-effort removal of logs that precede the checkpoint
        self.truncate_old_logs()?;

        Ok(())
    }
290
291    /// Writes checkpoint metadata to disk atomically.
292    ///
293    /// Uses a write-to-temp-then-rename pattern for atomicity.
294    fn write_checkpoint_metadata(&self, metadata: &CheckpointMetadata) -> Result<()> {
295        let metadata_path = self.dir.join(CHECKPOINT_METADATA_FILE);
296        let temp_path = self.dir.join(format!("{}.tmp", CHECKPOINT_METADATA_FILE));
297
298        // Serialize metadata
299        let data = bincode::serde::encode_to_vec(metadata, bincode::config::standard())
300            .map_err(|e| Error::Serialization(e.to_string()))?;
301
302        // Write to temp file
303        let mut file = File::create(&temp_path)?;
304        file.write_all(&data)?;
305        file.sync_all()?;
306        drop(file);
307
308        // Atomic rename
309        fs::rename(&temp_path, &metadata_path)?;
310
311        Ok(())
312    }
313
314    /// Reads checkpoint metadata from disk.
315    ///
316    /// Returns `None` if no checkpoint metadata exists.
317    pub fn read_checkpoint_metadata(&self) -> Result<Option<CheckpointMetadata>> {
318        let metadata_path = self.dir.join(CHECKPOINT_METADATA_FILE);
319
320        if !metadata_path.exists() {
321            return Ok(None);
322        }
323
324        let file = File::open(&metadata_path)?;
325        let mut reader = BufReader::new(file);
326        let mut data = Vec::new();
327        reader.read_to_end(&mut data)?;
328
329        let (metadata, _): (CheckpointMetadata, _) =
330            bincode::serde::decode_from_slice(&data, bincode::config::standard())
331                .map_err(|e| Error::Serialization(e.to_string()))?;
332
333        Ok(Some(metadata))
334    }
335
336    /// Rotates to a new log file.
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if rotation fails.
341    pub fn rotate(&self) -> Result<()> {
342        let new_sequence = self.current_sequence.fetch_add(1, Ordering::SeqCst) + 1;
343        let new_path = self.log_path(new_sequence);
344
345        let file = OpenOptions::new()
346            .create(true)
347            .read(true)
348            .append(true)
349            .open(&new_path)?;
350
351        let new_log = LogFile {
352            writer: BufWriter::new(file),
353            size: 0,
354            path: new_path,
355            sequence: new_sequence,
356        };
357
358        // Replace active log
359        let mut guard = self.active_log.lock();
360        if let Some(old_log) = guard.take() {
361            // Ensure old log is flushed
362            drop(old_log);
363        }
364        *guard = Some(new_log);
365
366        Ok(())
367    }
368
369    /// Flushes the WAL buffer to disk.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if the flush fails.
374    pub fn flush(&self) -> Result<()> {
375        let mut guard = self.active_log.lock();
376        if let Some(log_file) = guard.as_mut() {
377            log_file.writer.flush()?;
378        }
379        Ok(())
380    }
381
382    /// Syncs the WAL to disk (fsync).
383    ///
384    /// # Errors
385    ///
386    /// Returns an error if the sync fails.
387    pub fn sync(&self) -> Result<()> {
388        let mut guard = self.active_log.lock();
389        if let Some(log_file) = guard.as_mut() {
390            log_file.writer.flush()?;
391            log_file.writer.get_ref().sync_all()?;
392        }
393        self.records_since_sync.store(0, Ordering::Relaxed);
394        *self.last_sync.lock() = Instant::now();
395        Ok(())
396    }
397
    /// Returns the total number of records written by this process
    /// (not persisted across restarts; resets to 0 on open).
    #[must_use]
    pub fn record_count(&self) -> u64 {
        self.total_record_count.load(Ordering::Relaxed)
    }
403
    /// Returns the WAL directory path.
    #[must_use]
    pub fn dir(&self) -> &Path {
        &self.dir
    }
409
    /// Returns the configured durability mode.
    #[must_use]
    pub fn durability_mode(&self) -> DurabilityMode {
        self.config.durability
    }
415
416    /// Returns all WAL log file paths in sequence order.
417    pub fn log_files(&self) -> Result<Vec<PathBuf>> {
418        let mut files = Vec::new();
419
420        if let Ok(entries) = fs::read_dir(&self.dir) {
421            for entry in entries.flatten() {
422                let path = entry.path();
423                if path.extension().is_some_and(|ext| ext == "log") {
424                    files.push(path);
425                }
426            }
427        }
428
429        // Sort by sequence number
430        files.sort_by(|a, b| {
431            let seq_a = Self::sequence_from_path(a).unwrap_or(0);
432            let seq_b = Self::sequence_from_path(b).unwrap_or(0);
433            seq_a.cmp(&seq_b)
434        });
435
436        Ok(files)
437    }
438
    /// Returns the latest checkpoint epoch recorded in this process, if any.
    ///
    /// NOTE(review): this is set only by `checkpoint()` and is never loaded
    /// from the on-disk metadata on open — confirm recovery uses
    /// `read_checkpoint_metadata()` rather than this accessor.
    #[must_use]
    pub fn checkpoint_epoch(&self) -> Option<EpochId> {
        *self.checkpoint_epoch.lock()
    }
444
445    // === Private methods ===
446
447    fn ensure_active_log(&self) -> Result<()> {
448        let mut guard = self.active_log.lock();
449        if guard.is_none() {
450            let sequence = self.current_sequence.load(Ordering::Relaxed);
451            let path = self.log_path(sequence);
452
453            let file = OpenOptions::new()
454                .create(true)
455                .read(true)
456                .append(true)
457                .open(&path)?;
458
459            let size = file.metadata()?.len();
460
461            *guard = Some(LogFile {
462                writer: BufWriter::new(file),
463                size,
464                path,
465                sequence,
466            });
467        }
468        Ok(())
469    }
470
471    fn log_path(&self, sequence: u64) -> PathBuf {
472        self.dir.join(format!("wal_{:08}.log", sequence))
473    }
474
475    fn sequence_from_path(path: &Path) -> Option<u64> {
476        path.file_stem()
477            .and_then(|s| s.to_str())
478            .and_then(|s| s.strip_prefix("wal_"))
479            .and_then(|s| s.parse().ok())
480    }
481
482    fn truncate_old_logs(&self) -> Result<()> {
483        let checkpoint = match *self.checkpoint_epoch.lock() {
484            Some(e) => e,
485            None => return Ok(()),
486        };
487
488        // Keep logs that might still be needed
489        // For now, keep the two most recent logs after checkpoint
490        let files = self.log_files()?;
491        let current_seq = self.current_sequence.load(Ordering::Relaxed);
492
493        for file in files {
494            if let Some(seq) = Self::sequence_from_path(&file) {
495                // Keep the last 2 log files before current
496                if seq + 2 < current_seq {
497                    // Only delete if we have a checkpoint after this log
498                    if checkpoint.as_u64() > seq {
499                        let _ = fs::remove_file(&file);
500                    }
501                }
502            }
503        }
504
505        Ok(())
506    }
507}
508
509// Backward compatibility - single-file API
510impl WalManager {
    /// Opens a single WAL file (legacy API).
    ///
    /// The file-name component of `path` is ignored: the WAL is opened on the
    /// *parent directory* (or `.` when there is no parent) with the default
    /// configuration, so the active log will be a `wal_<seq>.log` file in that
    /// directory rather than `path` itself.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened.
    pub fn open_file(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref();
        let dir = path.parent().unwrap_or(Path::new("."));
        let manager = Self::open(dir)?;
        Ok(manager)
    }
522
523    /// Returns the path to the active WAL file.
524    #[must_use]
525    pub fn path(&self) -> PathBuf {
526        let guard = self.active_log.lock();
527        guard
528            .as_ref()
529            .map(|l| l.path.clone())
530            .unwrap_or_else(|| self.log_path(0))
531    }
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use graphos_common::types::NodeId;
    use tempfile::tempdir;

    /// Smoke test: a single record is accepted, flushed, and counted.
    #[test]
    fn test_wal_write() {
        let dir = tempdir().unwrap();

        let wal = WalManager::open(dir.path()).unwrap();

        let record = WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Person".to_string()],
        };

        wal.log(&record).unwrap();
        wal.flush().unwrap();

        assert_eq!(wal.record_count(), 1);
    }

    /// Rotation: a tiny `max_log_size` must yield multiple log files.
    #[test]
    fn test_wal_rotation() {
        let dir = tempdir().unwrap();

        // Small max size to force rotation
        let config = WalConfig {
            max_log_size: 100,
            ..Default::default()
        };

        let wal = WalManager::with_config(dir.path(), config).unwrap();

        // Write enough records to trigger rotation
        for i in 0..10 {
            let record = WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec!["Person".to_string()],
            };
            wal.log(&record).unwrap();
        }

        wal.flush().unwrap();

        // Should have multiple log files
        let files = wal.log_files().unwrap();
        assert!(
            files.len() > 1,
            "Expected multiple log files after rotation"
        );
    }

    /// Each durability mode accepts writes without error; this exercises the
    /// happy path only (no crash-recovery assertions).
    #[test]
    fn test_durability_modes() {
        let dir = tempdir().unwrap();

        // Test Sync mode (fsync on the commit record)
        let config = WalConfig {
            durability: DurabilityMode::Sync,
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("sync"), config).unwrap();
        wal.log(&WalRecord::TxCommit {
            tx_id: TxId::new(1),
        })
        .unwrap();

        // Test NoSync mode (buffer flush only)
        let config = WalConfig {
            durability: DurabilityMode::NoSync,
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("nosync"), config).unwrap();
        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec![],
        })
        .unwrap();

        // Test Batch mode (thresholds small enough that a sync triggers
        // within the 10 writes below)
        let config = WalConfig {
            durability: DurabilityMode::Batch {
                max_delay_ms: 10,
                max_records: 5,
            },
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("batch"), config).unwrap();
        for i in 0..10 {
            wal.log(&WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec![],
            })
            .unwrap();
        }
    }

    /// Checkpoint records the epoch in memory after persisting metadata.
    #[test]
    fn test_checkpoint() {
        let dir = tempdir().unwrap();

        let wal = WalManager::open(dir.path()).unwrap();

        // Write some records
        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Test".to_string()],
        })
        .unwrap();

        wal.log(&WalRecord::TxCommit {
            tx_id: TxId::new(1),
        })
        .unwrap();

        // Create checkpoint
        wal.checkpoint(TxId::new(1), EpochId::new(10)).unwrap();

        assert_eq!(wal.checkpoint_epoch(), Some(EpochId::new(10)));
    }
}