graphos_adapters/storage/wal/
log.rs1use super::WalRecord;
4use graphos_common::types::{EpochId, TxId};
5use graphos_common::utils::error::{Error, Result};
6use parking_lot::Mutex;
7use serde::{Deserialize, Serialize};
8use std::fs::{self, File, OpenOptions};
9use std::io::{BufReader, BufWriter, Read, Write};
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct CheckpointMetadata {
20 pub epoch: EpochId,
22 pub log_sequence: u64,
24 pub timestamp_ms: u64,
26 pub tx_id: TxId,
28}
29
/// Name of the checkpoint metadata file inside the WAL directory.
const CHECKPOINT_METADATA_FILE: &str = "checkpoint.meta";
32
/// Controls when the WAL forces written records to stable storage.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DurabilityMode {
    /// Flush and fsync whenever a `TxCommit` record is logged.
    Sync,
    /// Group fsyncs. After each logged record, sync if either threshold has
    /// been reached since the previous sync.
    Batch {
        /// Sync once at least this many milliseconds have elapsed.
        max_delay_ms: u64,
        /// Sync once at least this many records have been written.
        max_records: u64,
    },
    /// Flush the userspace buffer after every record, but never fsync.
    NoSync,
}

impl Default for DurabilityMode {
    /// Batched syncing: at most 100 ms or 1000 records between fsyncs.
    fn default() -> Self {
        const DEFAULT_MAX_DELAY_MS: u64 = 100;
        const DEFAULT_MAX_RECORDS: u64 = 1_000;

        DurabilityMode::Batch {
            max_delay_ms: DEFAULT_MAX_DELAY_MS,
            max_records: DEFAULT_MAX_RECORDS,
        }
    }
}
60
61#[derive(Debug, Clone)]
63pub struct WalConfig {
64 pub durability: DurabilityMode,
66 pub max_log_size: u64,
68 pub compression: bool,
70}
71
72impl Default for WalConfig {
73 fn default() -> Self {
74 Self {
75 durability: DurabilityMode::default(),
76 max_log_size: 64 * 1024 * 1024, compression: false,
78 }
79 }
80}
81
/// The WAL segment that is currently receiving appends.
struct LogFile {
    /// Buffered append handle to the segment file.
    writer: BufWriter<File>,
    /// Byte length of the segment: its length when opened plus every frame
    /// appended since.
    size: u64,
    /// Location of the segment on disk.
    path: PathBuf,
    /// Sequence number encoded in the segment's file name (`wal_{:08}.log`).
    /// The manager does not read it yet, hence the lint allowance.
    #[allow(dead_code)]
    sequence: u64,
}
94
95pub struct WalManager {
97 dir: PathBuf,
99 config: WalConfig,
101 active_log: Mutex<Option<LogFile>>,
103 total_record_count: AtomicU64,
105 records_since_sync: AtomicU64,
107 last_sync: Mutex<Instant>,
109 current_sequence: AtomicU64,
111 checkpoint_epoch: Mutex<Option<EpochId>>,
113}
114
115impl WalManager {
116 pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
122 Self::with_config(dir, WalConfig::default())
123 }
124
125 pub fn with_config(dir: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
131 let dir = dir.as_ref().to_path_buf();
132 fs::create_dir_all(&dir)?;
133
134 let mut max_sequence = 0u64;
136 if let Ok(entries) = fs::read_dir(&dir) {
137 for entry in entries.flatten() {
138 if let Some(name) = entry.file_name().to_str() {
139 if let Some(seq_str) = name
140 .strip_prefix("wal_")
141 .and_then(|s| s.strip_suffix(".log"))
142 {
143 if let Ok(seq) = seq_str.parse::<u64>() {
144 max_sequence = max_sequence.max(seq);
145 }
146 }
147 }
148 }
149 }
150
151 let manager = Self {
152 dir,
153 config,
154 active_log: Mutex::new(None),
155 total_record_count: AtomicU64::new(0),
156 records_since_sync: AtomicU64::new(0),
157 last_sync: Mutex::new(Instant::now()),
158 current_sequence: AtomicU64::new(max_sequence),
159 checkpoint_epoch: Mutex::new(None),
160 };
161
162 manager.ensure_active_log()?;
164
165 Ok(manager)
166 }
167
168 pub fn log(&self, record: &WalRecord) -> Result<()> {
174 self.ensure_active_log()?;
175
176 let mut guard = self.active_log.lock();
177 let log_file = guard
178 .as_mut()
179 .ok_or_else(|| Error::Internal("WAL writer not available".to_string()))?;
180
181 let data = bincode::serde::encode_to_vec(record, bincode::config::standard())
183 .map_err(|e| Error::Serialization(e.to_string()))?;
184
185 let len = data.len() as u32;
187 log_file.writer.write_all(&len.to_le_bytes())?;
188
189 log_file.writer.write_all(&data)?;
191
192 let checksum = crc32fast::hash(&data);
194 log_file.writer.write_all(&checksum.to_le_bytes())?;
195
196 let record_size = 4 + data.len() as u64 + 4; log_file.size += record_size;
199
200 self.total_record_count.fetch_add(1, Ordering::Relaxed);
201 self.records_since_sync.fetch_add(1, Ordering::Relaxed);
202
203 let needs_rotation = log_file.size >= self.config.max_log_size;
205
206 match &self.config.durability {
208 DurabilityMode::Sync => {
209 if matches!(record, WalRecord::TxCommit { .. }) {
211 log_file.writer.flush()?;
212 log_file.writer.get_ref().sync_all()?;
213 self.records_since_sync.store(0, Ordering::Relaxed);
214 *self.last_sync.lock() = Instant::now();
215 }
216 }
217 DurabilityMode::Batch {
218 max_delay_ms,
219 max_records,
220 } => {
221 let records = self.records_since_sync.load(Ordering::Relaxed);
222 let elapsed = self.last_sync.lock().elapsed();
223
224 if records >= *max_records || elapsed >= Duration::from_millis(*max_delay_ms) {
225 log_file.writer.flush()?;
226 log_file.writer.get_ref().sync_all()?;
227 self.records_since_sync.store(0, Ordering::Relaxed);
228 *self.last_sync.lock() = Instant::now();
229 }
230 }
231 DurabilityMode::NoSync => {
232 log_file.writer.flush()?;
234 }
235 }
236
237 drop(guard);
238
239 if needs_rotation {
241 self.rotate()?;
242 }
243
244 Ok(())
245 }
246
247 pub fn checkpoint(&self, current_tx: TxId, epoch: EpochId) -> Result<()> {
256 self.log(&WalRecord::Checkpoint { tx_id: current_tx })?;
258
259 self.sync()?;
261
262 let log_sequence = self.current_sequence.load(Ordering::SeqCst);
264
265 let timestamp_ms = SystemTime::now()
267 .duration_since(UNIX_EPOCH)
268 .map(|d| d.as_millis() as u64)
269 .unwrap_or(0);
270
271 let metadata = CheckpointMetadata {
273 epoch,
274 log_sequence,
275 timestamp_ms,
276 tx_id: current_tx,
277 };
278
279 self.write_checkpoint_metadata(&metadata)?;
281
282 *self.checkpoint_epoch.lock() = Some(epoch);
284
285 self.truncate_old_logs()?;
287
288 Ok(())
289 }
290
291 fn write_checkpoint_metadata(&self, metadata: &CheckpointMetadata) -> Result<()> {
295 let metadata_path = self.dir.join(CHECKPOINT_METADATA_FILE);
296 let temp_path = self.dir.join(format!("{}.tmp", CHECKPOINT_METADATA_FILE));
297
298 let data = bincode::serde::encode_to_vec(metadata, bincode::config::standard())
300 .map_err(|e| Error::Serialization(e.to_string()))?;
301
302 let mut file = File::create(&temp_path)?;
304 file.write_all(&data)?;
305 file.sync_all()?;
306 drop(file);
307
308 fs::rename(&temp_path, &metadata_path)?;
310
311 Ok(())
312 }
313
314 pub fn read_checkpoint_metadata(&self) -> Result<Option<CheckpointMetadata>> {
318 let metadata_path = self.dir.join(CHECKPOINT_METADATA_FILE);
319
320 if !metadata_path.exists() {
321 return Ok(None);
322 }
323
324 let file = File::open(&metadata_path)?;
325 let mut reader = BufReader::new(file);
326 let mut data = Vec::new();
327 reader.read_to_end(&mut data)?;
328
329 let (metadata, _): (CheckpointMetadata, _) =
330 bincode::serde::decode_from_slice(&data, bincode::config::standard())
331 .map_err(|e| Error::Serialization(e.to_string()))?;
332
333 Ok(Some(metadata))
334 }
335
336 pub fn rotate(&self) -> Result<()> {
342 let new_sequence = self.current_sequence.fetch_add(1, Ordering::SeqCst) + 1;
343 let new_path = self.log_path(new_sequence);
344
345 let file = OpenOptions::new()
346 .create(true)
347 .read(true)
348 .append(true)
349 .open(&new_path)?;
350
351 let new_log = LogFile {
352 writer: BufWriter::new(file),
353 size: 0,
354 path: new_path,
355 sequence: new_sequence,
356 };
357
358 let mut guard = self.active_log.lock();
360 if let Some(old_log) = guard.take() {
361 drop(old_log);
363 }
364 *guard = Some(new_log);
365
366 Ok(())
367 }
368
369 pub fn flush(&self) -> Result<()> {
375 let mut guard = self.active_log.lock();
376 if let Some(log_file) = guard.as_mut() {
377 log_file.writer.flush()?;
378 }
379 Ok(())
380 }
381
382 pub fn sync(&self) -> Result<()> {
388 let mut guard = self.active_log.lock();
389 if let Some(log_file) = guard.as_mut() {
390 log_file.writer.flush()?;
391 log_file.writer.get_ref().sync_all()?;
392 }
393 self.records_since_sync.store(0, Ordering::Relaxed);
394 *self.last_sync.lock() = Instant::now();
395 Ok(())
396 }
397
398 #[must_use]
400 pub fn record_count(&self) -> u64 {
401 self.total_record_count.load(Ordering::Relaxed)
402 }
403
404 #[must_use]
406 pub fn dir(&self) -> &Path {
407 &self.dir
408 }
409
410 #[must_use]
412 pub fn durability_mode(&self) -> DurabilityMode {
413 self.config.durability
414 }
415
416 pub fn log_files(&self) -> Result<Vec<PathBuf>> {
418 let mut files = Vec::new();
419
420 if let Ok(entries) = fs::read_dir(&self.dir) {
421 for entry in entries.flatten() {
422 let path = entry.path();
423 if path.extension().is_some_and(|ext| ext == "log") {
424 files.push(path);
425 }
426 }
427 }
428
429 files.sort_by(|a, b| {
431 let seq_a = Self::sequence_from_path(a).unwrap_or(0);
432 let seq_b = Self::sequence_from_path(b).unwrap_or(0);
433 seq_a.cmp(&seq_b)
434 });
435
436 Ok(files)
437 }
438
439 #[must_use]
441 pub fn checkpoint_epoch(&self) -> Option<EpochId> {
442 *self.checkpoint_epoch.lock()
443 }
444
445 fn ensure_active_log(&self) -> Result<()> {
448 let mut guard = self.active_log.lock();
449 if guard.is_none() {
450 let sequence = self.current_sequence.load(Ordering::Relaxed);
451 let path = self.log_path(sequence);
452
453 let file = OpenOptions::new()
454 .create(true)
455 .read(true)
456 .append(true)
457 .open(&path)?;
458
459 let size = file.metadata()?.len();
460
461 *guard = Some(LogFile {
462 writer: BufWriter::new(file),
463 size,
464 path,
465 sequence,
466 });
467 }
468 Ok(())
469 }
470
471 fn log_path(&self, sequence: u64) -> PathBuf {
472 self.dir.join(format!("wal_{:08}.log", sequence))
473 }
474
475 fn sequence_from_path(path: &Path) -> Option<u64> {
476 path.file_stem()
477 .and_then(|s| s.to_str())
478 .and_then(|s| s.strip_prefix("wal_"))
479 .and_then(|s| s.parse().ok())
480 }
481
482 fn truncate_old_logs(&self) -> Result<()> {
483 let checkpoint = match *self.checkpoint_epoch.lock() {
484 Some(e) => e,
485 None => return Ok(()),
486 };
487
488 let files = self.log_files()?;
491 let current_seq = self.current_sequence.load(Ordering::Relaxed);
492
493 for file in files {
494 if let Some(seq) = Self::sequence_from_path(&file) {
495 if seq + 2 < current_seq {
497 if checkpoint.as_u64() > seq {
499 let _ = fs::remove_file(&file);
500 }
501 }
502 }
503 }
504
505 Ok(())
506 }
507}
508
509impl WalManager {
511 pub fn open_file(path: impl AsRef<Path>) -> Result<Self> {
517 let path = path.as_ref();
518 let dir = path.parent().unwrap_or(Path::new("."));
519 let manager = Self::open(dir)?;
520 Ok(manager)
521 }
522
523 #[must_use]
525 pub fn path(&self) -> PathBuf {
526 let guard = self.active_log.lock();
527 guard
528 .as_ref()
529 .map(|l| l.path.clone())
530 .unwrap_or_else(|| self.log_path(0))
531 }
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use graphos_common::types::NodeId;
    use tempfile::tempdir;

    #[test]
    fn test_wal_write() {
        let dir = tempdir().unwrap();

        let wal = WalManager::open(dir.path()).unwrap();

        let record = WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Person".to_string()],
        };

        wal.log(&record).unwrap();
        wal.flush().unwrap();

        assert_eq!(wal.record_count(), 1);

        // A frame is a 4-byte length, the payload and a 4-byte CRC.
        let written = fs::metadata(wal.path()).unwrap().len();
        assert!(written > 8, "expected a full frame on disk, found {written} bytes");
    }

    #[test]
    fn test_wal_rotation() {
        let dir = tempdir().unwrap();

        let config = WalConfig {
            max_log_size: 100,
            ..Default::default()
        };

        let wal = WalManager::with_config(dir.path(), config).unwrap();

        for i in 0..10 {
            let record = WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec!["Person".to_string()],
            };
            wal.log(&record).unwrap();
        }

        wal.flush().unwrap();

        let files = wal.log_files().unwrap();
        assert!(
            files.len() > 1,
            "Expected multiple log files after rotation"
        );

        let sequences: Vec<u64> = files
            .iter()
            .filter_map(|path| WalManager::sequence_from_path(path))
            .collect();
        assert_eq!(sequences.len(), files.len(), "every listed file is a WAL segment");
        assert!(
            sequences.windows(2).all(|pair| pair[0] < pair[1]),
            "segments must be listed in ascending sequence order"
        );
    }

    #[test]
    fn test_reopen_resumes_latest_segment() {
        let dir = tempdir().unwrap();

        let config = WalConfig {
            max_log_size: 100,
            ..Default::default()
        };

        let last_path = {
            let wal = WalManager::with_config(dir.path(), config.clone()).unwrap();
            for i in 0..10 {
                wal.log(&WalRecord::CreateNode {
                    id: NodeId::new(i),
                    labels: vec!["Person".to_string()],
                })
                .unwrap();
            }
            wal.sync().unwrap();
            wal.path()
        };

        let reopened = WalManager::with_config(dir.path(), config).unwrap();
        assert_eq!(reopened.path(), last_path);
    }

    #[test]
    fn test_durability_modes() {
        let dir = tempdir().unwrap();

        let config = WalConfig {
            durability: DurabilityMode::Sync,
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("sync"), config).unwrap();
        assert_eq!(wal.durability_mode(), DurabilityMode::Sync);
        wal.log(&WalRecord::TxCommit {
            tx_id: TxId::new(1),
        })
        .unwrap();
        assert_eq!(wal.record_count(), 1);

        let config = WalConfig {
            durability: DurabilityMode::NoSync,
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("nosync"), config).unwrap();
        assert_eq!(wal.durability_mode(), DurabilityMode::NoSync);
        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec![],
        })
        .unwrap();
        assert_eq!(wal.record_count(), 1);

        let batch = DurabilityMode::Batch {
            max_delay_ms: 10,
            max_records: 5,
        };
        let config = WalConfig {
            durability: batch,
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("batch"), config).unwrap();
        assert_eq!(wal.durability_mode(), batch);
        for i in 0..10 {
            wal.log(&WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec![],
            })
            .unwrap();
        }
        assert_eq!(wal.record_count(), 10);
    }

    #[test]
    fn test_checkpoint() {
        let dir = tempdir().unwrap();

        let wal = WalManager::open(dir.path()).unwrap();
        assert!(wal.read_checkpoint_metadata().unwrap().is_none());

        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Test".to_string()],
        })
        .unwrap();

        wal.log(&WalRecord::TxCommit {
            tx_id: TxId::new(1),
        })
        .unwrap();

        wal.checkpoint(TxId::new(1), EpochId::new(10)).unwrap();

        assert_eq!(wal.checkpoint_epoch(), Some(EpochId::new(10)));

        let metadata = wal
            .read_checkpoint_metadata()
            .unwrap()
            .expect("checkpoint metadata should be persisted");
        assert_eq!(metadata.epoch, EpochId::new(10));
        assert_eq!(metadata.log_sequence, 0);
    }

    #[test]
    fn test_open_file_uses_parent_directory() {
        let dir = tempdir().unwrap();

        let wal = WalManager::open_file(dir.path().join("wal.log")).unwrap();

        assert_eq!(wal.dir(), dir.path());
    }
}