grafeo_adapters/storage/wal/
async_log.rs1use super::{DurabilityMode, WalConfig, WalRecord};
4use grafeo_common::types::{EpochId, TransactionId};
5use grafeo_common::utils::error::{Error, Result};
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9use tokio::fs::{self, File, OpenOptions};
10use tokio::io::{AsyncWriteExt, BufWriter};
11use tokio::sync::Mutex;
12use tokio::task::JoinHandle;
13
14struct AsyncLogFile {
16 writer: BufWriter<File>,
18 size: u64,
20}
21
22pub struct AsyncWalManager {
27 dir: PathBuf,
29 config: WalConfig,
31 active_log: Mutex<Option<AsyncLogFile>>,
33 total_record_count: AtomicU64,
35 records_since_sync: AtomicU64,
37 last_sync: Mutex<Instant>,
39 current_sequence: AtomicU64,
41 checkpoint_epoch: Mutex<Option<EpochId>>,
43 background_sync_handle: Mutex<Option<JoinHandle<()>>>,
45 shutdown_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
47}
48
49impl AsyncWalManager {
50 pub async fn open(dir: impl AsRef<Path>) -> Result<Self> {
56 Self::with_config(dir, WalConfig::default()).await
57 }
58
59 pub async fn with_config(dir: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
65 let dir = dir.as_ref().to_path_buf();
66 fs::create_dir_all(&dir).await?;
67
68 let mut max_sequence = 0u64;
70 let mut entries = fs::read_dir(&dir).await?;
71 while let Some(entry) = entries.next_entry().await? {
72 if let Some(name) = entry.file_name().to_str()
73 && let Some(seq_str) = name
74 .strip_prefix("wal_")
75 .and_then(|s| s.strip_suffix(".log"))
76 && let Ok(seq) = seq_str.parse::<u64>()
77 {
78 max_sequence = max_sequence.max(seq);
79 }
80 }
81
82 let manager = Self {
83 dir,
84 config,
85 active_log: Mutex::new(None),
86 total_record_count: AtomicU64::new(0),
87 records_since_sync: AtomicU64::new(0),
88 last_sync: Mutex::new(Instant::now()),
89 current_sequence: AtomicU64::new(max_sequence),
90 checkpoint_epoch: Mutex::new(None),
91 background_sync_handle: Mutex::new(None),
92 shutdown_tx: Mutex::new(None),
93 };
94
95 manager.ensure_active_log().await?;
97
98 Ok(manager)
99 }
100
101 pub async fn log(&self, record: &WalRecord) -> Result<()> {
107 self.ensure_active_log().await?;
108
109 let mut guard = self.active_log.lock().await;
110 let log_file = guard
111 .as_mut()
112 .ok_or_else(|| Error::Internal("WAL writer not available".to_string()))?;
113
114 let data = bincode::serde::encode_to_vec(record, bincode::config::standard())
116 .map_err(|e| Error::Serialization(e.to_string()))?;
117
118 let len = data.len() as u32;
120 log_file.writer.write_all(&len.to_le_bytes()).await?;
121
122 log_file.writer.write_all(&data).await?;
124
125 let checksum = crc32fast::hash(&data);
127 log_file.writer.write_all(&checksum.to_le_bytes()).await?;
128
129 let record_size = 4 + data.len() as u64 + 4; log_file.size += record_size;
132
133 self.total_record_count.fetch_add(1, Ordering::Relaxed);
134 self.records_since_sync.fetch_add(1, Ordering::Relaxed);
135
136 let needs_rotation = log_file.size >= self.config.max_log_size;
138
139 match &self.config.durability {
141 DurabilityMode::Sync => {
142 if matches!(record, WalRecord::TransactionCommit { .. }) {
144 log_file.writer.flush().await?;
145 log_file.writer.get_ref().sync_all().await?;
146 self.records_since_sync.store(0, Ordering::Relaxed);
147 *self.last_sync.lock().await = Instant::now();
148 }
149 }
150 DurabilityMode::Batch {
151 max_delay_ms,
152 max_records,
153 } => {
154 let records = self.records_since_sync.load(Ordering::Relaxed);
155 let elapsed = self.last_sync.lock().await.elapsed();
156
157 if records >= *max_records || elapsed >= Duration::from_millis(*max_delay_ms) {
158 log_file.writer.flush().await?;
159 log_file.writer.get_ref().sync_all().await?;
160 self.records_since_sync.store(0, Ordering::Relaxed);
161 *self.last_sync.lock().await = Instant::now();
162 }
163 }
164 DurabilityMode::Adaptive { .. } => {
165 log_file.writer.flush().await?;
168 }
169 DurabilityMode::NoSync => {
170 log_file.writer.flush().await?;
172 }
173 }
174
175 drop(guard);
176
177 if needs_rotation {
179 self.rotate().await?;
180 }
181
182 Ok(())
183 }
184
185 pub async fn checkpoint(
191 &self,
192 current_transaction: TransactionId,
193 epoch: EpochId,
194 ) -> Result<()> {
195 self.log(&WalRecord::Checkpoint {
197 transaction_id: current_transaction,
198 })
199 .await?;
200
201 self.sync().await?;
203
204 *self.checkpoint_epoch.lock().await = Some(epoch);
206
207 self.truncate_old_logs().await?;
209
210 Ok(())
211 }
212
213 pub async fn rotate(&self) -> Result<()> {
219 let new_sequence = self.current_sequence.fetch_add(1, Ordering::SeqCst) + 1;
220 let new_path = self.log_path(new_sequence);
221
222 let file = OpenOptions::new()
223 .create(true)
224 .read(true)
225 .append(true)
226 .open(&new_path)
227 .await?;
228
229 let new_log = AsyncLogFile {
230 writer: BufWriter::new(file),
231 size: 0,
232 };
233
234 let mut guard = self.active_log.lock().await;
236 if let Some(old_log) = guard.take() {
237 drop(old_log);
239 }
240 *guard = Some(new_log);
241
242 Ok(())
243 }
244
245 pub async fn flush(&self) -> Result<()> {
251 let mut guard = self.active_log.lock().await;
252 if let Some(log_file) = guard.as_mut() {
253 log_file.writer.flush().await?;
254 }
255 Ok(())
256 }
257
258 pub async fn sync(&self) -> Result<()> {
264 let mut guard = self.active_log.lock().await;
265 if let Some(log_file) = guard.as_mut() {
266 log_file.writer.flush().await?;
267 log_file.writer.get_ref().sync_all().await?;
268 }
269 self.records_since_sync.store(0, Ordering::Relaxed);
270 *self.last_sync.lock().await = Instant::now();
271 Ok(())
272 }
273
274 pub async fn start_background_sync(&self) -> bool {
285 let DurabilityMode::Batch { max_delay_ms, .. } = self.config.durability else {
286 return false;
287 };
288
289 let mut handle_guard = self.background_sync_handle.lock().await;
290 if handle_guard.is_some() {
291 return false;
292 }
293
294 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
295 *self.shutdown_tx.lock().await = Some(shutdown_tx);
296
297 let interval = Duration::from_millis(max_delay_ms);
300
301 let handle = tokio::spawn(async move {
302 let mut ticker = tokio::time::interval(interval);
303 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
304
305 loop {
306 tokio::select! {
307 _ = ticker.tick() => {
308 }
312 _ = &mut shutdown_rx => {
313 break;
314 }
315 }
316 }
317 });
318
319 *handle_guard = Some(handle);
320 true
321 }
322
323 pub async fn stop_background_sync(&self) {
325 if let Some(tx) = self.shutdown_tx.lock().await.take() {
326 let _ = tx.send(());
327 }
328 if let Some(handle) = self.background_sync_handle.lock().await.take() {
329 let _ = handle.await;
330 }
331 }
332
333 #[must_use]
335 pub fn record_count(&self) -> u64 {
336 self.total_record_count.load(Ordering::Relaxed)
337 }
338
339 #[must_use]
341 pub fn dir(&self) -> &Path {
342 &self.dir
343 }
344
345 #[must_use]
347 pub fn durability_mode(&self) -> DurabilityMode {
348 self.config.durability
349 }
350
351 pub async fn log_files(&self) -> Result<Vec<PathBuf>> {
353 let mut files = Vec::new();
354
355 let mut entries = fs::read_dir(&self.dir).await?;
356 while let Some(entry) = entries.next_entry().await? {
357 let path = entry.path();
358 if path.extension().is_some_and(|ext| ext == "log") {
359 files.push(path);
360 }
361 }
362
363 files.sort_by(|a, b| {
365 let seq_a = Self::sequence_from_path(a).unwrap_or(0);
366 let seq_b = Self::sequence_from_path(b).unwrap_or(0);
367 seq_a.cmp(&seq_b)
368 });
369
370 Ok(files)
371 }
372
373 pub async fn checkpoint_epoch(&self) -> Option<EpochId> {
375 *self.checkpoint_epoch.lock().await
376 }
377
378 async fn ensure_active_log(&self) -> Result<()> {
381 let mut guard = self.active_log.lock().await;
382 if guard.is_none() {
383 let sequence = self.current_sequence.load(Ordering::Relaxed);
384 let path = self.log_path(sequence);
385
386 let file = OpenOptions::new()
387 .create(true)
388 .read(true)
389 .append(true)
390 .open(&path)
391 .await?;
392
393 let size = file.metadata().await?.len();
394
395 *guard = Some(AsyncLogFile {
396 writer: BufWriter::new(file),
397 size,
398 });
399 }
400 Ok(())
401 }
402
403 fn log_path(&self, sequence: u64) -> PathBuf {
404 self.dir.join(format!("wal_{sequence:08}.log"))
405 }
406
/// Extracts the sequence number from a segment path such as `wal_00000012.log`.
///
/// Returns `None` when the file stem is not valid UTF-8, does not start with
/// `wal_`, or the remainder does not parse as a `u64`.
fn sequence_from_path(path: &Path) -> Option<u64> {
    let stem = path.file_stem()?.to_str()?;
    let digits = stem.strip_prefix("wal_")?;
    digits.parse().ok()
}
413
414 async fn truncate_old_logs(&self) -> Result<()> {
415 let Some(checkpoint) = *self.checkpoint_epoch.lock().await else {
416 return Ok(());
417 };
418
419 let files = self.log_files().await?;
422 let current_seq = self.current_sequence.load(Ordering::Relaxed);
423
424 for file in files {
425 if let Some(seq) = Self::sequence_from_path(&file) {
426 if seq + 2 < current_seq && checkpoint.as_u64() > seq {
428 let _ = fs::remove_file(&file).await;
429 }
430 }
431 }
432
433 Ok(())
434 }
435}
436
437impl Drop for AsyncWalManager {
438 fn drop(&mut self) {
439 }
442}
443
444#[cfg(test)]
445mod tests {
446 use super::*;
447 use grafeo_common::types::NodeId;
448 use tempfile::tempdir;
449
450 #[tokio::test]
451 async fn test_async_wal_write() {
452 let dir = tempdir().unwrap();
453
454 let wal = AsyncWalManager::open(dir.path()).await.unwrap();
455
456 let record = WalRecord::CreateNode {
457 id: NodeId::new(1),
458 labels: vec!["Person".to_string()],
459 };
460
461 wal.log(&record).await.unwrap();
462 wal.flush().await.unwrap();
463
464 assert_eq!(wal.record_count(), 1);
465 }
466
467 #[tokio::test]
468 async fn test_async_wal_rotation() {
469 let dir = tempdir().unwrap();
470
471 let config = WalConfig {
473 max_log_size: 100,
474 ..Default::default()
475 };
476
477 let wal = AsyncWalManager::with_config(dir.path(), config)
478 .await
479 .unwrap();
480
481 for i in 0..10 {
483 let record = WalRecord::CreateNode {
484 id: NodeId::new(i),
485 labels: vec!["Person".to_string()],
486 };
487 wal.log(&record).await.unwrap();
488 }
489
490 wal.flush().await.unwrap();
491
492 let files = wal.log_files().await.unwrap();
494 assert!(
495 files.len() > 1,
496 "Expected multiple log files after rotation"
497 );
498 }
499
500 #[tokio::test]
501 async fn test_async_durability_modes() {
502 let dir = tempdir().unwrap();
503
504 let config = WalConfig {
506 durability: DurabilityMode::Sync,
507 ..Default::default()
508 };
509 let wal = AsyncWalManager::with_config(dir.path().join("sync"), config)
510 .await
511 .unwrap();
512 wal.log(&WalRecord::TransactionCommit {
513 transaction_id: TransactionId::new(1),
514 })
515 .await
516 .unwrap();
517
518 let config = WalConfig {
520 durability: DurabilityMode::NoSync,
521 ..Default::default()
522 };
523 let wal = AsyncWalManager::with_config(dir.path().join("nosync"), config)
524 .await
525 .unwrap();
526 wal.log(&WalRecord::CreateNode {
527 id: NodeId::new(1),
528 labels: vec![],
529 })
530 .await
531 .unwrap();
532
533 let config = WalConfig {
535 durability: DurabilityMode::Batch {
536 max_delay_ms: 10,
537 max_records: 5,
538 },
539 ..Default::default()
540 };
541 let wal = AsyncWalManager::with_config(dir.path().join("batch"), config)
542 .await
543 .unwrap();
544 for i in 0..10 {
545 wal.log(&WalRecord::CreateNode {
546 id: NodeId::new(i),
547 labels: vec![],
548 })
549 .await
550 .unwrap();
551 }
552 }
553
554 #[tokio::test]
555 async fn test_async_checkpoint() {
556 let dir = tempdir().unwrap();
557
558 let wal = AsyncWalManager::open(dir.path()).await.unwrap();
559
560 wal.log(&WalRecord::CreateNode {
562 id: NodeId::new(1),
563 labels: vec!["Test".to_string()],
564 })
565 .await
566 .unwrap();
567
568 wal.log(&WalRecord::TransactionCommit {
569 transaction_id: TransactionId::new(1),
570 })
571 .await
572 .unwrap();
573
574 wal.checkpoint(TransactionId::new(1), EpochId::new(10))
576 .await
577 .unwrap();
578
579 assert_eq!(wal.checkpoint_epoch().await, Some(EpochId::new(10)));
580 }
581
582 #[tokio::test]
583 async fn test_background_sync() {
584 let dir = tempdir().unwrap();
585
586 let config = WalConfig {
587 durability: DurabilityMode::Batch {
588 max_delay_ms: 50,
589 max_records: 1000,
590 },
591 ..Default::default()
592 };
593
594 let wal = AsyncWalManager::with_config(dir.path(), config)
595 .await
596 .unwrap();
597
598 assert!(wal.start_background_sync().await);
600
601 assert!(!wal.start_background_sync().await);
603
604 wal.log(&WalRecord::CreateNode {
606 id: NodeId::new(1),
607 labels: vec![],
608 })
609 .await
610 .unwrap();
611
612 tokio::time::sleep(Duration::from_millis(100)).await;
614
615 wal.stop_background_sync().await;
617 }
618}