graphos_adapters/storage/wal/
async_log.rs1use super::{DurabilityMode, WalConfig, WalRecord};
4use graphos_common::types::{EpochId, TxId};
5use graphos_common::utils::error::{Error, Result};
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9use tokio::fs::{self, File, OpenOptions};
10use tokio::io::{AsyncWriteExt, BufWriter};
11use tokio::sync::Mutex;
12use tokio::task::JoinHandle;
13
14struct AsyncLogFile {
16 writer: BufWriter<File>,
18 size: u64,
20 #[allow(dead_code)]
22 path: PathBuf,
23 #[allow(dead_code)]
25 sequence: u64,
26}
27
28pub struct AsyncWalManager {
33 dir: PathBuf,
35 config: WalConfig,
37 active_log: Mutex<Option<AsyncLogFile>>,
39 total_record_count: AtomicU64,
41 records_since_sync: AtomicU64,
43 last_sync: Mutex<Instant>,
45 current_sequence: AtomicU64,
47 checkpoint_epoch: Mutex<Option<EpochId>>,
49 background_sync_handle: Mutex<Option<JoinHandle<()>>>,
51 shutdown_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
53}
54
55impl AsyncWalManager {
56 pub async fn open(dir: impl AsRef<Path>) -> Result<Self> {
62 Self::with_config(dir, WalConfig::default()).await
63 }
64
65 pub async fn with_config(dir: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
71 let dir = dir.as_ref().to_path_buf();
72 fs::create_dir_all(&dir).await?;
73
74 let mut max_sequence = 0u64;
76 let mut entries = fs::read_dir(&dir).await?;
77 while let Some(entry) = entries.next_entry().await? {
78 if let Some(name) = entry.file_name().to_str() {
79 if let Some(seq_str) = name
80 .strip_prefix("wal_")
81 .and_then(|s| s.strip_suffix(".log"))
82 {
83 if let Ok(seq) = seq_str.parse::<u64>() {
84 max_sequence = max_sequence.max(seq);
85 }
86 }
87 }
88 }
89
90 let manager = Self {
91 dir,
92 config,
93 active_log: Mutex::new(None),
94 total_record_count: AtomicU64::new(0),
95 records_since_sync: AtomicU64::new(0),
96 last_sync: Mutex::new(Instant::now()),
97 current_sequence: AtomicU64::new(max_sequence),
98 checkpoint_epoch: Mutex::new(None),
99 background_sync_handle: Mutex::new(None),
100 shutdown_tx: Mutex::new(None),
101 };
102
103 manager.ensure_active_log().await?;
105
106 Ok(manager)
107 }
108
109 pub async fn log(&self, record: &WalRecord) -> Result<()> {
115 self.ensure_active_log().await?;
116
117 let mut guard = self.active_log.lock().await;
118 let log_file = guard
119 .as_mut()
120 .ok_or_else(|| Error::Internal("WAL writer not available".to_string()))?;
121
122 let data = bincode::serde::encode_to_vec(record, bincode::config::standard())
124 .map_err(|e| Error::Serialization(e.to_string()))?;
125
126 let len = data.len() as u32;
128 log_file.writer.write_all(&len.to_le_bytes()).await?;
129
130 log_file.writer.write_all(&data).await?;
132
133 let checksum = crc32fast::hash(&data);
135 log_file.writer.write_all(&checksum.to_le_bytes()).await?;
136
137 let record_size = 4 + data.len() as u64 + 4; log_file.size += record_size;
140
141 self.total_record_count.fetch_add(1, Ordering::Relaxed);
142 self.records_since_sync.fetch_add(1, Ordering::Relaxed);
143
144 let needs_rotation = log_file.size >= self.config.max_log_size;
146
147 match &self.config.durability {
149 DurabilityMode::Sync => {
150 if matches!(record, WalRecord::TxCommit { .. }) {
152 log_file.writer.flush().await?;
153 log_file.writer.get_ref().sync_all().await?;
154 self.records_since_sync.store(0, Ordering::Relaxed);
155 *self.last_sync.lock().await = Instant::now();
156 }
157 }
158 DurabilityMode::Batch {
159 max_delay_ms,
160 max_records,
161 } => {
162 let records = self.records_since_sync.load(Ordering::Relaxed);
163 let elapsed = self.last_sync.lock().await.elapsed();
164
165 if records >= *max_records || elapsed >= Duration::from_millis(*max_delay_ms) {
166 log_file.writer.flush().await?;
167 log_file.writer.get_ref().sync_all().await?;
168 self.records_since_sync.store(0, Ordering::Relaxed);
169 *self.last_sync.lock().await = Instant::now();
170 }
171 }
172 DurabilityMode::NoSync => {
173 log_file.writer.flush().await?;
175 }
176 }
177
178 drop(guard);
179
180 if needs_rotation {
182 self.rotate().await?;
183 }
184
185 Ok(())
186 }
187
188 pub async fn checkpoint(&self, current_tx: TxId, epoch: EpochId) -> Result<()> {
194 self.log(&WalRecord::Checkpoint { tx_id: current_tx })
196 .await?;
197
198 self.sync().await?;
200
201 *self.checkpoint_epoch.lock().await = Some(epoch);
203
204 self.truncate_old_logs().await?;
206
207 Ok(())
208 }
209
210 pub async fn rotate(&self) -> Result<()> {
216 let new_sequence = self.current_sequence.fetch_add(1, Ordering::SeqCst) + 1;
217 let new_path = self.log_path(new_sequence);
218
219 let file = OpenOptions::new()
220 .create(true)
221 .read(true)
222 .append(true)
223 .open(&new_path)
224 .await?;
225
226 let new_log = AsyncLogFile {
227 writer: BufWriter::new(file),
228 size: 0,
229 path: new_path,
230 sequence: new_sequence,
231 };
232
233 let mut guard = self.active_log.lock().await;
235 if let Some(old_log) = guard.take() {
236 drop(old_log);
238 }
239 *guard = Some(new_log);
240
241 Ok(())
242 }
243
244 pub async fn flush(&self) -> Result<()> {
250 let mut guard = self.active_log.lock().await;
251 if let Some(log_file) = guard.as_mut() {
252 log_file.writer.flush().await?;
253 }
254 Ok(())
255 }
256
257 pub async fn sync(&self) -> Result<()> {
263 let mut guard = self.active_log.lock().await;
264 if let Some(log_file) = guard.as_mut() {
265 log_file.writer.flush().await?;
266 log_file.writer.get_ref().sync_all().await?;
267 }
268 self.records_since_sync.store(0, Ordering::Relaxed);
269 *self.last_sync.lock().await = Instant::now();
270 Ok(())
271 }
272
273 pub async fn start_background_sync(&self) -> bool {
284 let DurabilityMode::Batch { max_delay_ms, .. } = self.config.durability else {
285 return false;
286 };
287
288 let mut handle_guard = self.background_sync_handle.lock().await;
289 if handle_guard.is_some() {
290 return false;
291 }
292
293 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
294 *self.shutdown_tx.lock().await = Some(shutdown_tx);
295
296 let interval = Duration::from_millis(max_delay_ms);
299
300 let handle = tokio::spawn(async move {
301 let mut ticker = tokio::time::interval(interval);
302 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
303
304 loop {
305 tokio::select! {
306 _ = ticker.tick() => {
307 }
311 _ = &mut shutdown_rx => {
312 break;
313 }
314 }
315 }
316 });
317
318 *handle_guard = Some(handle);
319 true
320 }
321
322 pub async fn stop_background_sync(&self) {
324 if let Some(tx) = self.shutdown_tx.lock().await.take() {
325 let _ = tx.send(());
326 }
327 if let Some(handle) = self.background_sync_handle.lock().await.take() {
328 let _ = handle.await;
329 }
330 }
331
332 #[must_use]
334 pub fn record_count(&self) -> u64 {
335 self.total_record_count.load(Ordering::Relaxed)
336 }
337
338 #[must_use]
340 pub fn dir(&self) -> &Path {
341 &self.dir
342 }
343
344 #[must_use]
346 pub fn durability_mode(&self) -> DurabilityMode {
347 self.config.durability
348 }
349
350 pub async fn log_files(&self) -> Result<Vec<PathBuf>> {
352 let mut files = Vec::new();
353
354 let mut entries = fs::read_dir(&self.dir).await?;
355 while let Some(entry) = entries.next_entry().await? {
356 let path = entry.path();
357 if path.extension().is_some_and(|ext| ext == "log") {
358 files.push(path);
359 }
360 }
361
362 files.sort_by(|a, b| {
364 let seq_a = Self::sequence_from_path(a).unwrap_or(0);
365 let seq_b = Self::sequence_from_path(b).unwrap_or(0);
366 seq_a.cmp(&seq_b)
367 });
368
369 Ok(files)
370 }
371
372 pub async fn checkpoint_epoch(&self) -> Option<EpochId> {
374 *self.checkpoint_epoch.lock().await
375 }
376
377 async fn ensure_active_log(&self) -> Result<()> {
380 let mut guard = self.active_log.lock().await;
381 if guard.is_none() {
382 let sequence = self.current_sequence.load(Ordering::Relaxed);
383 let path = self.log_path(sequence);
384
385 let file = OpenOptions::new()
386 .create(true)
387 .read(true)
388 .append(true)
389 .open(&path)
390 .await?;
391
392 let size = file.metadata().await?.len();
393
394 *guard = Some(AsyncLogFile {
395 writer: BufWriter::new(file),
396 size,
397 path,
398 sequence,
399 });
400 }
401 Ok(())
402 }
403
404 fn log_path(&self, sequence: u64) -> PathBuf {
405 self.dir.join(format!("wal_{sequence:08}.log"))
406 }
407
408 fn sequence_from_path(path: &Path) -> Option<u64> {
409 path.file_stem()
410 .and_then(|s| s.to_str())
411 .and_then(|s| s.strip_prefix("wal_"))
412 .and_then(|s| s.parse().ok())
413 }
414
415 async fn truncate_old_logs(&self) -> Result<()> {
416 let checkpoint = match *self.checkpoint_epoch.lock().await {
417 Some(e) => e,
418 None => return Ok(()),
419 };
420
421 let files = self.log_files().await?;
424 let current_seq = self.current_sequence.load(Ordering::Relaxed);
425
426 for file in files {
427 if let Some(seq) = Self::sequence_from_path(&file) {
428 if seq + 2 < current_seq && checkpoint.as_u64() > seq {
430 let _ = fs::remove_file(&file).await;
431 }
432 }
433 }
434
435 Ok(())
436 }
437}
438
439impl Drop for AsyncWalManager {
440 fn drop(&mut self) {
441 }
444}
445
#[cfg(test)]
mod tests {
    use super::*;
    use graphos_common::types::NodeId;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_async_wal_write() {
        let dir = tempdir().unwrap();

        let wal = AsyncWalManager::open(dir.path()).await.unwrap();

        let record = WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Person".to_string()],
        };

        wal.log(&record).await.unwrap();
        wal.flush().await.unwrap();

        assert_eq!(wal.record_count(), 1);
    }

    #[tokio::test]
    async fn test_async_wal_rotation() {
        let dir = tempdir().unwrap();

        let config = WalConfig {
            max_log_size: 100,
            ..Default::default()
        };

        let wal = AsyncWalManager::with_config(dir.path(), config)
            .await
            .unwrap();

        for i in 0..10 {
            let record = WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec!["Person".to_string()],
            };
            wal.log(&record).await.unwrap();
        }

        wal.flush().await.unwrap();

        let files = wal.log_files().await.unwrap();
        assert!(
            files.len() > 1,
            "Expected multiple log files after rotation"
        );
    }

    #[tokio::test]
    async fn test_async_durability_modes() {
        let dir = tempdir().unwrap();

        let config = WalConfig {
            durability: DurabilityMode::Sync,
            ..Default::default()
        };
        let wal = AsyncWalManager::with_config(dir.path().join("sync"), config)
            .await
            .unwrap();
        assert!(matches!(wal.durability_mode(), DurabilityMode::Sync));
        wal.log(&WalRecord::TxCommit {
            tx_id: TxId::new(1),
        })
        .await
        .unwrap();
        assert_eq!(wal.record_count(), 1);

        let config = WalConfig {
            durability: DurabilityMode::NoSync,
            ..Default::default()
        };
        let wal = AsyncWalManager::with_config(dir.path().join("nosync"), config)
            .await
            .unwrap();
        assert!(matches!(wal.durability_mode(), DurabilityMode::NoSync));
        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec![],
        })
        .await
        .unwrap();
        assert_eq!(wal.record_count(), 1);

        let config = WalConfig {
            durability: DurabilityMode::Batch {
                max_delay_ms: 10,
                max_records: 5,
            },
            ..Default::default()
        };
        let wal = AsyncWalManager::with_config(dir.path().join("batch"), config)
            .await
            .unwrap();
        assert!(matches!(
            wal.durability_mode(),
            DurabilityMode::Batch {
                max_delay_ms: 10,
                max_records: 5,
            }
        ));
        for i in 0..10 {
            wal.log(&WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec![],
            })
            .await
            .unwrap();
        }
        assert_eq!(wal.record_count(), 10);
    }

    #[tokio::test]
    async fn test_async_checkpoint() {
        let dir = tempdir().unwrap();

        let wal = AsyncWalManager::open(dir.path()).await.unwrap();
        assert_eq!(wal.checkpoint_epoch().await, None);

        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Test".to_string()],
        })
        .await
        .unwrap();

        wal.log(&WalRecord::TxCommit {
            tx_id: TxId::new(1),
        })
        .await
        .unwrap();

        wal.checkpoint(TxId::new(1), EpochId::new(10))
            .await
            .unwrap();

        assert_eq!(wal.checkpoint_epoch().await, Some(EpochId::new(10)));
        // The checkpoint itself is logged as a record.
        assert_eq!(wal.record_count(), 3);
    }

    #[tokio::test]
    async fn test_background_sync() {
        let dir = tempdir().unwrap();

        let config = WalConfig {
            durability: DurabilityMode::Batch {
                max_delay_ms: 50,
                max_records: 1000,
            },
            ..Default::default()
        };

        let wal = AsyncWalManager::with_config(dir.path(), config)
            .await
            .unwrap();

        assert!(wal.start_background_sync().await);

        // A second start while running is refused.
        assert!(!wal.start_background_sync().await);

        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec![],
        })
        .await
        .unwrap();
        assert_eq!(wal.record_count(), 1);

        tokio::time::sleep(Duration::from_millis(100)).await;

        wal.stop_background_sync().await;

        // Stopping again is a no-op, and the task can be restarted afterwards.
        wal.stop_background_sync().await;
        assert!(wal.start_background_sync().await);
        wal.stop_background_sync().await;
    }

    #[tokio::test]
    async fn test_background_sync_requires_batch_mode() {
        let dir = tempdir().unwrap();

        let config = WalConfig {
            durability: DurabilityMode::Sync,
            ..Default::default()
        };
        let wal = AsyncWalManager::with_config(dir.path(), config)
            .await
            .unwrap();

        assert!(!wal.start_background_sync().await);
        // Nothing was started, so stopping must return immediately.
        wal.stop_background_sync().await;
    }

    #[tokio::test]
    async fn test_reopen_resumes_latest_segment() {
        let dir = tempdir().unwrap();

        let config = WalConfig {
            max_log_size: 100,
            durability: DurabilityMode::NoSync,
            ..Default::default()
        };

        let segments_before = {
            let wal = AsyncWalManager::with_config(dir.path(), config)
                .await
                .unwrap();
            for i in 0..10 {
                wal.log(&WalRecord::CreateNode {
                    id: NodeId::new(i),
                    labels: vec!["Person".to_string()],
                })
                .await
                .unwrap();
            }
            wal.log_files().await.unwrap()
        };
        assert!(segments_before.len() > 1);

        // Reopening appends to the newest existing segment instead of creating one.
        let wal = AsyncWalManager::open(dir.path()).await.unwrap();
        assert_eq!(wal.record_count(), 0);
        assert_eq!(wal.log_files().await.unwrap(), segments_before);
    }
}