graphos_adapters/storage/wal/
log.rs1use super::WalRecord;
4use graphos_common::utils::error::{Error, Result};
5use parking_lot::Mutex;
6use std::fs::{File, OpenOptions};
7use std::io::{BufWriter, Write};
8use std::path::{Path, PathBuf};
9
10pub struct WalManager {
12 path: PathBuf,
14 writer: Mutex<Option<BufWriter<File>>>,
16 record_count: Mutex<u64>,
18}
19
20impl WalManager {
21 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
27 let path = path.as_ref().to_path_buf();
28
29 let file = OpenOptions::new()
30 .create(true)
31 .read(true)
32 .append(true)
33 .open(&path)?;
34
35 let writer = BufWriter::new(file);
36
37 Ok(Self {
38 path,
39 writer: Mutex::new(Some(writer)),
40 record_count: Mutex::new(0),
41 })
42 }
43
44 pub fn log(&self, record: &WalRecord) -> Result<()> {
50 let mut guard = self.writer.lock();
51 let writer = guard
52 .as_mut()
53 .ok_or_else(|| Error::Internal("WAL writer not available".to_string()))?;
54
55 let data = bincode::serde::encode_to_vec(record, bincode::config::standard())
57 .map_err(|e| Error::Serialization(e.to_string()))?;
58
59 let len = data.len() as u32;
61 writer.write_all(&len.to_le_bytes())?;
62
63 writer.write_all(&data)?;
65
66 let checksum = crc32fast::hash(&data);
68 writer.write_all(&checksum.to_le_bytes())?;
69
70 *self.record_count.lock() += 1;
71
72 Ok(())
73 }
74
75 pub fn flush(&self) -> Result<()> {
81 let mut guard = self.writer.lock();
82 if let Some(writer) = guard.as_mut() {
83 writer.flush()?;
84 }
85 Ok(())
86 }
87
88 pub fn sync(&self) -> Result<()> {
94 let mut guard = self.writer.lock();
95 if let Some(writer) = guard.as_mut() {
96 writer.flush()?;
97 writer.get_ref().sync_all()?;
98 }
99 Ok(())
100 }
101
102 #[must_use]
104 pub fn record_count(&self) -> u64 {
105 *self.record_count.lock()
106 }
107
108 #[must_use]
110 pub fn path(&self) -> &Path {
111 &self.path
112 }
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use graphos_common::types::NodeId;
    use tempfile::tempdir;

    /// Logging one record and flushing it leaves the record counter at one.
    #[test]
    fn test_wal_write() {
        let scratch = tempdir().unwrap();
        let manager = WalManager::open(scratch.path().join("test.wal")).unwrap();

        manager
            .log(&WalRecord::CreateNode {
                id: NodeId::new(1),
                labels: vec![String::from("Person")],
            })
            .unwrap();
        manager.flush().unwrap();

        assert_eq!(manager.record_count(), 1);
    }
}