Skip to main content

graphos_adapters/storage/wal/
log.rs

1//! WAL log file management.
2
3use super::WalRecord;
4use graphos_common::utils::error::{Error, Result};
5use parking_lot::Mutex;
6use std::fs::{File, OpenOptions};
7use std::io::{BufWriter, Write};
8use std::path::{Path, PathBuf};
9
10/// Manages the Write-Ahead Log.
11pub struct WalManager {
12    /// Path to the WAL file.
13    path: PathBuf,
14    /// Writer for appending records.
15    writer: Mutex<Option<BufWriter<File>>>,
16    /// Number of records written.
17    record_count: Mutex<u64>,
18}
19
20impl WalManager {
21    /// Opens or creates a WAL file at the given path.
22    ///
23    /// # Errors
24    ///
25    /// Returns an error if the file cannot be opened or created.
26    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
27        let path = path.as_ref().to_path_buf();
28
29        let file = OpenOptions::new()
30            .create(true)
31            .read(true)
32            .append(true)
33            .open(&path)?;
34
35        let writer = BufWriter::new(file);
36
37        Ok(Self {
38            path,
39            writer: Mutex::new(Some(writer)),
40            record_count: Mutex::new(0),
41        })
42    }
43
44    /// Logs a record to the WAL.
45    ///
46    /// # Errors
47    ///
48    /// Returns an error if the record cannot be written.
49    pub fn log(&self, record: &WalRecord) -> Result<()> {
50        let mut guard = self.writer.lock();
51        let writer = guard
52            .as_mut()
53            .ok_or_else(|| Error::Internal("WAL writer not available".to_string()))?;
54
55        // Serialize the record
56        let data = bincode::serde::encode_to_vec(record, bincode::config::standard())
57            .map_err(|e| Error::Serialization(e.to_string()))?;
58
59        // Write length prefix
60        let len = data.len() as u32;
61        writer.write_all(&len.to_le_bytes())?;
62
63        // Write data
64        writer.write_all(&data)?;
65
66        // Write checksum
67        let checksum = crc32fast::hash(&data);
68        writer.write_all(&checksum.to_le_bytes())?;
69
70        *self.record_count.lock() += 1;
71
72        Ok(())
73    }
74
75    /// Flushes the WAL to disk.
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if the flush fails.
80    pub fn flush(&self) -> Result<()> {
81        let mut guard = self.writer.lock();
82        if let Some(writer) = guard.as_mut() {
83            writer.flush()?;
84        }
85        Ok(())
86    }
87
88    /// Syncs the WAL to disk (fsync).
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if the sync fails.
93    pub fn sync(&self) -> Result<()> {
94        let mut guard = self.writer.lock();
95        if let Some(writer) = guard.as_mut() {
96            writer.flush()?;
97            writer.get_ref().sync_all()?;
98        }
99        Ok(())
100    }
101
102    /// Returns the number of records written.
103    #[must_use]
104    pub fn record_count(&self) -> u64 {
105        *self.record_count.lock()
106    }
107
108    /// Returns the path to the WAL file.
109    #[must_use]
110    pub fn path(&self) -> &Path {
111        &self.path
112    }
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use graphos_common::types::NodeId;
    use tempfile::tempdir;

    /// Logging one record bumps the counter and, once flushed and synced,
    /// leaves a framed record in the file.
    #[test]
    fn test_wal_write() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.wal");

        let wal = WalManager::open(&path).unwrap();
        assert_eq!(wal.record_count(), 0);
        assert_eq!(wal.path(), path.as_path());

        let record = WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Person".to_string()],
        };

        wal.log(&record).unwrap();
        wal.flush().unwrap();
        wal.sync().unwrap();

        assert_eq!(wal.record_count(), 1);

        // A frame is a 4-byte length, a non-empty payload, and a 4-byte
        // checksum, so the flushed file must be larger than 8 bytes.
        let on_disk = std::fs::metadata(&path).unwrap().len();
        assert!(on_disk > 8, "expected a framed record on disk, found {} bytes", on_disk);
    }
}