grafeo_adapters/storage/wal/
log.rs1use super::WalRecord;
4use grafeo_common::types::{EpochId, TransactionId};
5use grafeo_common::utils::error::{Error, Result};
6use parking_lot::Mutex;
7use serde::{Deserialize, Serialize};
8use std::fs::{self, File, OpenOptions};
9use std::io::{BufReader, BufWriter, Read, Write};
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
/// Snapshot of WAL state recorded when a checkpoint completes.
///
/// Persisted (bincode-encoded) as `checkpoint.meta` in the WAL directory so
/// recovery can tell which epoch / log position the last checkpoint covered.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointMetadata {
    /// Epoch that was current when the checkpoint was taken.
    pub epoch: EpochId,
    /// Active WAL log-file sequence number at checkpoint time.
    pub log_sequence: u64,
    /// Wall-clock checkpoint time, in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
    /// Transaction id supplied by the caller of `WalManager::checkpoint`.
    pub transaction_id: TransactionId,
}
29
/// File name (inside the WAL directory) of the atomically-replaced checkpoint metadata.
const CHECKPOINT_METADATA_FILE: &str = "checkpoint.meta";
32
/// Controls when buffered WAL writes are flushed and fsynced to disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityMode {
    /// Flush + fsync on frames written with `force_sync` (commit records).
    /// NOTE(review): non-commit frames are NOT fsynced in this mode — see
    /// `write_frame`; confirm that is intended for a mode named `Sync`.
    Sync,
    /// Fsync once either budget below is exhausted, whichever comes first.
    Batch {
        /// Maximum time (ms) between fsyncs.
        max_delay_ms: u64,
        /// Maximum number of unsynced records between fsyncs.
        max_records: u64,
    },
    /// Adaptive syncing around a target interval.
    /// NOTE(review): as implemented in `write_frame` this only flushes the
    /// userspace buffer and never fsyncs; `target_interval_ms` is unused
    /// there — confirm intended.
    Adaptive {
        /// Desired interval (ms) between syncs.
        target_interval_ms: u64,
    },
    /// Never fsync; only flush to the OS. Fastest, weakest durability.
    NoSync,
}
64
65impl Default for DurabilityMode {
66 fn default() -> Self {
67 Self::Batch {
68 max_delay_ms: 100,
69 max_records: 1000,
70 }
71 }
72}
73
/// Configuration for [`WalManager`].
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// When writes are flushed/fsynced to disk.
    pub durability: DurabilityMode,
    /// Log-file size (bytes) at which the active log is rotated.
    pub max_log_size: u64,
    /// Whether log payloads should be compressed.
    /// NOTE(review): nothing in this file reads this flag — confirm it is
    /// honored elsewhere.
    pub compression: bool,
}
84
85impl Default for WalConfig {
86 fn default() -> Self {
87 Self {
88 durability: DurabilityMode::default(),
89 max_log_size: 64 * 1024 * 1024, compression: false,
91 }
92 }
93}
94
/// Handle to the currently active (append-only) log file.
struct LogFile {
    /// Buffered writer over the file (opened in append mode).
    writer: BufWriter<File>,
    /// Bytes written to this file so far; drives rotation.
    size: u64,
    /// Full on-disk path of the file.
    path: PathBuf,
}
104
/// Write-ahead log manager: appends length/CRC-framed records to rotating
/// `wal_<seq>.log` files and maintains atomically-replaced checkpoint
/// metadata in the same directory.
pub struct WalManager {
    /// Directory holding the log files and `checkpoint.meta`.
    dir: PathBuf,
    /// Durability/rotation configuration.
    config: WalConfig,
    /// Currently open log file; lazily (re)created by `ensure_active_log`.
    active_log: Mutex<Option<LogFile>>,
    /// Records written since this manager was opened (in-memory only).
    total_record_count: AtomicU64,
    /// Records written since the last fsync; drives batch-mode syncing.
    records_since_sync: AtomicU64,
    /// Time of the last fsync; drives batch-mode syncing.
    last_sync: Mutex<Instant>,
    /// Sequence number of the active log file.
    current_sequence: AtomicU64,
    /// Epoch of the most recent checkpoint, if any (in-memory only; not
    /// reloaded from `checkpoint.meta` on open).
    checkpoint_epoch: Mutex<Option<EpochId>>,
}
124
impl WalManager {
    /// Opens (or creates) a WAL in `dir` with the default configuration.
    ///
    /// # Errors
    /// Returns an error if the directory or the active log file cannot be
    /// created/opened.
    pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(dir, WalConfig::default())
    }

    /// Opens (or creates) a WAL in `dir` with an explicit configuration.
    ///
    /// Scans the directory for existing `wal_<seq>.log` files and resumes at
    /// the highest sequence found, appending to that file.
    ///
    /// # Errors
    /// Returns an error if the directory or the active log file cannot be
    /// created/opened.
    pub fn with_config(dir: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
        let dir = dir.as_ref().to_path_buf();
        fs::create_dir_all(&dir)?;

        // Resume from the highest existing log sequence so restarts keep
        // appending instead of clobbering earlier segments.
        let mut max_sequence = 0u64;
        if let Ok(entries) = fs::read_dir(&dir) {
            for entry in entries.flatten() {
                if let Some(name) = entry.file_name().to_str()
                    && let Some(seq_str) = name
                        .strip_prefix("wal_")
                        .and_then(|s| s.strip_suffix(".log"))
                    && let Ok(seq) = seq_str.parse::<u64>()
                {
                    max_sequence = max_sequence.max(seq);
                }
            }
        }

        let manager = Self {
            dir,
            config,
            active_log: Mutex::new(None),
            total_record_count: AtomicU64::new(0),
            records_since_sync: AtomicU64::new(0),
            last_sync: Mutex::new(Instant::now()),
            current_sequence: AtomicU64::new(max_sequence),
            // NOTE(review): the checkpoint epoch is not reloaded from
            // `checkpoint.meta` here; after a restart `checkpoint_epoch()`
            // is None until a new checkpoint — use
            // `read_checkpoint_metadata` to recover it. Confirm intended.
            checkpoint_epoch: Mutex::new(None),
        };

        // Open the active log eagerly so the first `log()` cannot fail on
        // file creation.
        manager.ensure_active_log()?;

        Ok(manager)
    }

    /// Serializes `record` with bincode and appends it as one frame.
    ///
    /// Commit records (`WalRecord::TransactionCommit`) request an immediate
    /// sync (honored only by `DurabilityMode::Sync`).
    ///
    /// # Errors
    /// Returns `Error::Serialization` if encoding fails, or propagates I/O
    /// errors from the underlying write.
    pub fn log(&self, record: &WalRecord) -> Result<()> {
        let data = bincode::serde::encode_to_vec(record, bincode::config::standard())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let force_sync = matches!(record, WalRecord::TransactionCommit { .. });
        self.write_frame(&data, force_sync)
    }

    /// Appends one frame to the active log:
    /// `[len: u32 LE][payload][crc32(payload): u32 LE]`.
    ///
    /// Applies the configured durability policy after the write, and rotates
    /// the log afterwards if it has grown past `max_log_size`.
    ///
    /// # Errors
    /// Propagates I/O errors from writing, flushing, syncing, or rotating.
    pub(crate) fn write_frame(&self, data: &[u8], force_sync: bool) -> Result<()> {
        // Crash-injection hooks for recovery testing.
        use grafeo_core::testing::crash::maybe_crash;

        self.ensure_active_log()?;

        let mut guard = self.active_log.lock();
        let log_file = guard
            .as_mut()
            .ok_or_else(|| Error::Internal("WAL writer not available".to_string()))?;

        maybe_crash("wal_before_write");

        // Frame header: payload length as little-endian u32.
        let len = data.len() as u32;
        log_file.writer.write_all(&len.to_le_bytes())?;

        log_file.writer.write_all(data)?;

        // Trailer: CRC32 of the payload only (the length prefix is not
        // covered by the checksum).
        let checksum = crc32fast::hash(data);
        log_file.writer.write_all(&checksum.to_le_bytes())?;

        maybe_crash("wal_after_write");

        // 4-byte length prefix + payload + 4-byte checksum.
        let record_size = 4 + data.len() as u64 + 4;
        log_file.size += record_size;

        self.total_record_count.fetch_add(1, Ordering::Relaxed);
        self.records_since_sync.fetch_add(1, Ordering::Relaxed);

        let needs_rotation = log_file.size >= self.config.max_log_size;

        match &self.config.durability {
            DurabilityMode::Sync => {
                // NOTE(review): only frames written with `force_sync`
                // (commit records) are fsynced in Sync mode; other frames
                // stay in the userspace buffer — confirm intended.
                if force_sync {
                    maybe_crash("wal_before_flush");
                    log_file.writer.flush()?;
                    log_file.writer.get_ref().sync_all()?;
                    self.records_since_sync.store(0, Ordering::Relaxed);
                    *self.last_sync.lock() = Instant::now();
                }
            }
            DurabilityMode::Batch {
                max_delay_ms,
                max_records,
            } => {
                // Fsync once either the record budget or the delay budget
                // is exhausted.
                let records = self.records_since_sync.load(Ordering::Relaxed);
                let elapsed = self.last_sync.lock().elapsed();

                if records >= *max_records || elapsed >= Duration::from_millis(*max_delay_ms) {
                    log_file.writer.flush()?;
                    log_file.writer.get_ref().sync_all()?;
                    self.records_since_sync.store(0, Ordering::Relaxed);
                    *self.last_sync.lock() = Instant::now();
                }
            }
            DurabilityMode::Adaptive { .. } => {
                // NOTE(review): flushes the userspace buffer only — no
                // fsync is ever issued in Adaptive mode and
                // `target_interval_ms` is unused here. Confirm intended.
                log_file.writer.flush()?;
            }
            DurabilityMode::NoSync => {
                // Hand bytes to the OS; durability is left to the kernel.
                log_file.writer.flush()?;
            }
        }

        drop(guard);

        // `rotate` re-takes the lock, so it must run after `guard` drops.
        if needs_rotation {
            self.rotate()?;
        }

        Ok(())
    }

    /// Writes a checkpoint record to the log, then persists the checkpoint
    /// metadata and prunes obsolete log files.
    ///
    /// # Errors
    /// Propagates serialization and I/O errors.
    pub fn checkpoint(&self, current_transaction: TransactionId, epoch: EpochId) -> Result<()> {
        self.log(&WalRecord::Checkpoint {
            transaction_id: current_transaction,
        })?;
        self.complete_checkpoint(current_transaction, epoch)
    }

    /// Finishes a checkpoint: fsyncs the log, atomically rewrites
    /// `checkpoint.meta`, records the epoch in memory, and deletes log
    /// files made obsolete by the checkpoint.
    pub(crate) fn complete_checkpoint(
        &self,
        transaction_id: TransactionId,
        epoch: EpochId,
    ) -> Result<()> {
        // Everything up to and including the checkpoint record must be on
        // disk before the metadata claims the checkpoint happened.
        self.sync()?;

        let log_sequence = self.current_sequence.load(Ordering::SeqCst);

        let timestamp_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        let metadata = CheckpointMetadata {
            epoch,
            log_sequence,
            timestamp_ms,
            transaction_id,
        };

        self.write_checkpoint_metadata(&metadata)?;

        *self.checkpoint_epoch.lock() = Some(epoch);

        self.truncate_old_logs()?;

        Ok(())
    }

    /// Atomically replaces `checkpoint.meta`: write to a temp file, fsync
    /// it, then rename over the live file so a reader never sees a torn
    /// write.
    fn write_checkpoint_metadata(&self, metadata: &CheckpointMetadata) -> Result<()> {
        let metadata_path = self.dir.join(CHECKPOINT_METADATA_FILE);
        let temp_path = self.dir.join(format!("{}.tmp", CHECKPOINT_METADATA_FILE));

        let data = bincode::serde::encode_to_vec(metadata, bincode::config::standard())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        let mut file = File::create(&temp_path)?;
        file.write_all(&data)?;
        file.sync_all()?;
        drop(file);

        fs::rename(&temp_path, &metadata_path)?;

        Ok(())
    }

    /// Reads and decodes `checkpoint.meta`, or returns `Ok(None)` if the
    /// file does not exist.
    ///
    /// # Errors
    /// Returns I/O errors from opening/reading, or `Error::Serialization`
    /// if the contents cannot be decoded.
    pub fn read_checkpoint_metadata(&self) -> Result<Option<CheckpointMetadata>> {
        let metadata_path = self.dir.join(CHECKPOINT_METADATA_FILE);

        if !metadata_path.exists() {
            return Ok(None);
        }

        let file = File::open(&metadata_path)?;
        let mut reader = BufReader::new(file);
        let mut data = Vec::new();
        reader.read_to_end(&mut data)?;

        let (metadata, _): (CheckpointMetadata, _) =
            bincode::serde::decode_from_slice(&data, bincode::config::standard())
                .map_err(|e| Error::Serialization(e.to_string()))?;

        Ok(Some(metadata))
    }

    /// Switches writing to a new log file with the next sequence number.
    ///
    /// # Errors
    /// Returns an error if the new log file cannot be created/opened.
    pub fn rotate(&self) -> Result<()> {
        let new_sequence = self.current_sequence.fetch_add(1, Ordering::SeqCst) + 1;
        let new_path = self.log_path(new_sequence);

        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .append(true)
            .open(&new_path)?;

        let new_log = LogFile {
            writer: BufWriter::new(file),
            size: 0,
            path: new_path,
        };

        let mut guard = self.active_log.lock();
        if let Some(old_log) = guard.take() {
            // NOTE(review): the old writer is flushed by BufWriter's Drop,
            // which silently swallows any I/O error — a failed final flush
            // of the retiring segment goes unnoticed here.
            drop(old_log);
        }
        *guard = Some(new_log);

        Ok(())
    }

    /// Flushes buffered bytes to the OS (no fsync).
    ///
    /// # Errors
    /// Propagates I/O errors from the flush.
    pub fn flush(&self) -> Result<()> {
        let mut guard = self.active_log.lock();
        if let Some(log_file) = guard.as_mut() {
            log_file.writer.flush()?;
        }
        Ok(())
    }

    /// Flushes and fsyncs the active log, then resets the batch-sync
    /// counters (even when no log is open).
    ///
    /// # Errors
    /// Propagates I/O errors from flush or fsync.
    pub fn sync(&self) -> Result<()> {
        let mut guard = self.active_log.lock();
        if let Some(log_file) = guard.as_mut() {
            log_file.writer.flush()?;
            log_file.writer.get_ref().sync_all()?;
        }
        self.records_since_sync.store(0, Ordering::Relaxed);
        *self.last_sync.lock() = Instant::now();
        Ok(())
    }

    /// Records written through this manager since it was opened (not a
    /// persisted count).
    #[must_use]
    pub fn record_count(&self) -> u64 {
        self.total_record_count.load(Ordering::Relaxed)
    }

    /// The WAL directory.
    #[must_use]
    pub fn dir(&self) -> &Path {
        &self.dir
    }

    /// The configured durability mode.
    #[must_use]
    pub fn durability_mode(&self) -> DurabilityMode {
        self.config.durability
    }

    /// All `.log` files in the WAL directory, sorted by sequence number.
    /// Directory read errors are treated as "no files".
    ///
    /// # Errors
    /// Kept fallible for interface stability; the current implementation
    /// always returns `Ok`.
    pub fn log_files(&self) -> Result<Vec<PathBuf>> {
        let mut files = Vec::new();

        if let Ok(entries) = fs::read_dir(&self.dir) {
            for entry in entries.flatten() {
                let path = entry.path();
                if path.extension().is_some_and(|ext| ext == "log") {
                    files.push(path);
                }
            }
        }

        // Directory iteration order is arbitrary; order by sequence.
        files.sort_by(|a, b| {
            let seq_a = Self::sequence_from_path(a).unwrap_or(0);
            let seq_b = Self::sequence_from_path(b).unwrap_or(0);
            seq_a.cmp(&seq_b)
        });

        Ok(files)
    }

    /// Epoch of the most recent checkpoint taken through this instance
    /// (not restored from disk on open).
    #[must_use]
    pub fn checkpoint_epoch(&self) -> Option<EpochId> {
        *self.checkpoint_epoch.lock()
    }

    /// Total on-disk size, in bytes, of all log files plus the checkpoint
    /// metadata file. Files whose metadata cannot be read are skipped.
    #[must_use]
    pub fn size_bytes(&self) -> usize {
        let mut total = 0usize;
        if let Ok(files) = self.log_files() {
            for file in files {
                if let Ok(metadata) = fs::metadata(&file) {
                    total += metadata.len() as usize;
                }
            }
        }
        let metadata_path = self.dir.join(CHECKPOINT_METADATA_FILE);
        if let Ok(metadata) = fs::metadata(&metadata_path) {
            total += metadata.len() as usize;
        }
        total
    }

    /// Timestamp of the last checkpoint read from `checkpoint.meta`, in
    /// whole **seconds** since the Unix epoch (the stored value is
    /// milliseconds; divided by 1000 here). `None` if the metadata is
    /// missing or unreadable.
    #[must_use]
    pub fn last_checkpoint_timestamp(&self) -> Option<u64> {
        if let Ok(Some(metadata)) = self.read_checkpoint_metadata() {
            Some(metadata.timestamp_ms / 1000)
        } else {
            None
        }
    }

    /// Drops the active log handle. The BufWriter's Drop performs a final
    /// flush, but any error from it is silently discarded; call `sync`
    /// first if durability matters.
    pub fn close_active_log(&self) {
        let mut guard = self.active_log.lock();
        *guard = None;
    }

    /// Opens the log file for the current sequence if none is open,
    /// resuming `size` from the existing file length (files are opened in
    /// append mode).
    fn ensure_active_log(&self) -> Result<()> {
        let mut guard = self.active_log.lock();
        if guard.is_none() {
            let sequence = self.current_sequence.load(Ordering::Relaxed);
            let path = self.log_path(sequence);

            let file = OpenOptions::new()
                .create(true)
                .read(true)
                .append(true)
                .open(&path)?;

            let size = file.metadata()?.len();

            *guard = Some(LogFile {
                writer: BufWriter::new(file),
                size,
                path,
            });
        }
        Ok(())
    }

    /// Path for log file `sequence`: `<dir>/wal_<seq zero-padded to 8>.log`.
    fn log_path(&self, sequence: u64) -> PathBuf {
        self.dir.join(format!("wal_{:08}.log", sequence))
    }

    /// Parses the sequence number out of a `wal_<seq>.log` path, if the
    /// name matches that shape (file_stem strips the `.log` extension).
    fn sequence_from_path(path: &Path) -> Option<u64> {
        path.file_stem()
            .and_then(|s| s.to_str())
            .and_then(|s| s.strip_prefix("wal_"))
            .and_then(|s| s.parse().ok())
    }

    /// Best-effort deletion of log files made obsolete by the last
    /// checkpoint. A file is removed only if it trails the current log by
    /// more than two sequences (safety margin) and passes the checkpoint
    /// comparison below. No-op if no checkpoint has been taken.
    fn truncate_old_logs(&self) -> Result<()> {
        let Some(checkpoint) = *self.checkpoint_epoch.lock() else {
            return Ok(());
        };

        let files = self.log_files()?;
        let current_seq = self.current_sequence.load(Ordering::Relaxed);

        for file in files {
            if let Some(seq) = Self::sequence_from_path(&file) {
                if seq + 2 < current_seq {
                    // NOTE(review): this compares the checkpoint *epoch*
                    // against a log *sequence* number — different domains;
                    // confirm the invariant that makes this comparison safe.
                    if checkpoint.as_u64() > seq {
                        // Deletion failures are ignored: a leftover old
                        // segment is harmless and retried next checkpoint.
                        let _ = fs::remove_file(&file);
                    }
                }
            }
        }

        Ok(())
    }
}
586
587impl WalManager {
589 pub fn open_file(path: impl AsRef<Path>) -> Result<Self> {
595 let path = path.as_ref();
596 let dir = path.parent().unwrap_or(Path::new("."));
597 let manager = Self::open(dir)?;
598 Ok(manager)
599 }
600
601 #[must_use]
603 pub fn path(&self) -> PathBuf {
604 let guard = self.active_log.lock();
605 guard
606 .as_ref()
607 .map_or_else(|| self.log_path(0), |l| l.path.clone())
608 }
609}
610
#[cfg(test)]
mod tests {
    use super::*;
    use grafeo_common::types::NodeId;
    use tempfile::tempdir;

    /// A single record can be appended and is reflected in `record_count`.
    #[test]
    fn test_wal_write() {
        let dir = tempdir().unwrap();

        let wal = WalManager::open(dir.path()).unwrap();

        let record = WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Person".to_string()],
        };

        wal.log(&record).unwrap();
        wal.flush().unwrap();

        assert_eq!(wal.record_count(), 1);
    }

    /// With a tiny `max_log_size`, repeated writes must produce multiple
    /// log files via rotation.
    #[test]
    fn test_wal_rotation() {
        let dir = tempdir().unwrap();

        // A 100-byte limit forces rotation after nearly every record.
        let config = WalConfig {
            max_log_size: 100,
            ..Default::default()
        };

        let wal = WalManager::with_config(dir.path(), config).unwrap();

        for i in 0..10 {
            let record = WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec!["Person".to_string()],
            };
            wal.log(&record).unwrap();
        }

        wal.flush().unwrap();

        let files = wal.log_files().unwrap();
        assert!(
            files.len() > 1,
            "Expected multiple log files after rotation"
        );
    }

    /// Smoke test: every durability mode accepts writes without error.
    #[test]
    fn test_durability_modes() {
        let dir = tempdir().unwrap();

        // Sync mode: a commit record triggers the force-sync path.
        let config = WalConfig {
            durability: DurabilityMode::Sync,
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("sync"), config).unwrap();
        wal.log(&WalRecord::TransactionCommit {
            transaction_id: TransactionId::new(1),
        })
        .unwrap();

        // NoSync mode: flush-only writes.
        let config = WalConfig {
            durability: DurabilityMode::NoSync,
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("nosync"), config).unwrap();
        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec![],
        })
        .unwrap();

        // Batch mode: 10 records against a 5-record budget exercises the
        // mid-stream fsync.
        let config = WalConfig {
            durability: DurabilityMode::Batch {
                max_delay_ms: 10,
                max_records: 5,
            },
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("batch"), config).unwrap();
        for i in 0..10 {
            wal.log(&WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec![],
            })
            .unwrap();
        }

        // Adaptive mode: writes flush only; an explicit `sync` still works.
        let config = WalConfig {
            durability: DurabilityMode::Adaptive {
                target_interval_ms: 100,
            },
            ..Default::default()
        };
        let wal = WalManager::with_config(dir.path().join("adaptive"), config).unwrap();
        for i in 0..10 {
            wal.log(&WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec![],
            })
            .unwrap();
        }
        wal.sync().unwrap();
    }

    /// End-to-end checkpoint: the epoch is recorded and queryable.
    #[test]
    fn test_checkpoint() {
        let dir = tempdir().unwrap();

        let wal = WalManager::open(dir.path()).unwrap();

        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Test".to_string()],
        })
        .unwrap();

        wal.log(&WalRecord::TransactionCommit {
            transaction_id: TransactionId::new(1),
        })
        .unwrap();

        wal.checkpoint(TransactionId::new(1), EpochId::new(10))
            .unwrap();

        assert_eq!(wal.checkpoint_epoch(), Some(EpochId::new(10)));
    }
}