Skip to main content

grafeo_adapters/storage/wal/
async_log.rs

1//! Async WAL implementation using tokio for non-blocking I/O.
2
3use super::{DurabilityMode, WalConfig, WalRecord};
4use grafeo_common::types::{EpochId, TransactionId};
5use grafeo_common::utils::error::{Error, Result};
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9use tokio::fs::{self, File, OpenOptions};
10use tokio::io::{AsyncWriteExt, BufWriter};
11use tokio::sync::Mutex;
12use tokio::task::JoinHandle;
13
/// State for a single async log file.
///
/// Pairs the buffered writer for the file with a running byte count that
/// `AsyncWalManager::log` compares against `WalConfig::max_log_size` to decide
/// when to rotate.
struct AsyncLogFile {
    /// Async file handle with buffering.
    writer: BufWriter<File>,
    /// Current size in bytes. It is advanced as soon as a record is handed to
    /// `writer`, so it also counts bytes that are still buffered.
    size: u64,
}
21
/// Async Write-Ahead Log manager with non-blocking I/O.
///
/// This manager is intended to provide the same durability guarantees as the
/// sync version while using tokio's async I/O for better throughput in async
/// contexts.
///
/// All mutable state lives behind `tokio::sync::Mutex` or atomics, which is
/// why every operation takes `&self`.
pub struct AsyncWalManager {
    /// Directory for WAL files.
    dir: PathBuf,
    /// Configuration (durability mode and maximum log size are read from it).
    config: WalConfig,
    /// Active log file (async mutex for async access). `None` until
    /// `ensure_active_log` has opened a file.
    active_log: Mutex<Option<AsyncLogFile>>,
    /// Total number of records written across all log files.
    total_record_count: AtomicU64,
    /// Records since last sync (for batch mode).
    records_since_sync: AtomicU64,
    /// Time of last sync (for batch mode).
    last_sync: Mutex<Instant>,
    /// Current log sequence number; it is the numeric part of the active
    /// `wal_<sequence>.log` file name.
    current_sequence: AtomicU64,
    /// Latest checkpoint epoch, set by `checkpoint`.
    checkpoint_epoch: Mutex<Option<EpochId>>,
    /// Background sync task handle (for batch mode).
    background_sync_handle: Mutex<Option<JoinHandle<()>>>,
    /// Shutdown signal sender paired with the background task's receiver.
    shutdown_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
}
48
49impl AsyncWalManager {
50    /// Opens or creates an async WAL in the given directory.
51    ///
52    /// # Errors
53    ///
54    /// Returns an error if the directory cannot be created or accessed.
55    pub async fn open(dir: impl AsRef<Path>) -> Result<Self> {
56        Self::with_config(dir, WalConfig::default()).await
57    }
58
59    /// Opens or creates an async WAL with custom configuration.
60    ///
61    /// # Errors
62    ///
63    /// Returns an error if the directory cannot be created or accessed.
64    pub async fn with_config(dir: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
65        let dir = dir.as_ref().to_path_buf();
66        fs::create_dir_all(&dir).await?;
67
68        // Find the highest existing sequence number
69        let mut max_sequence = 0u64;
70        let mut entries = fs::read_dir(&dir).await?;
71        while let Some(entry) = entries.next_entry().await? {
72            if let Some(name) = entry.file_name().to_str()
73                && let Some(seq_str) = name
74                    .strip_prefix("wal_")
75                    .and_then(|s| s.strip_suffix(".log"))
76                && let Ok(seq) = seq_str.parse::<u64>()
77            {
78                max_sequence = max_sequence.max(seq);
79            }
80        }
81
82        let manager = Self {
83            dir,
84            config,
85            active_log: Mutex::new(None),
86            total_record_count: AtomicU64::new(0),
87            records_since_sync: AtomicU64::new(0),
88            last_sync: Mutex::new(Instant::now()),
89            current_sequence: AtomicU64::new(max_sequence),
90            checkpoint_epoch: Mutex::new(None),
91            background_sync_handle: Mutex::new(None),
92            shutdown_tx: Mutex::new(None),
93        };
94
95        // Open or create the active log
96        manager.ensure_active_log().await?;
97
98        Ok(manager)
99    }
100
101    /// Logs a record to the WAL asynchronously.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if the record cannot be written.
106    pub async fn log(&self, record: &WalRecord) -> Result<()> {
107        self.ensure_active_log().await?;
108
109        let mut guard = self.active_log.lock().await;
110        let log_file = guard
111            .as_mut()
112            .ok_or_else(|| Error::Internal("WAL writer not available".to_string()))?;
113
114        // Serialize the record
115        let data = bincode::serde::encode_to_vec(record, bincode::config::standard())
116            .map_err(|e| Error::Serialization(e.to_string()))?;
117
118        // Write length prefix
119        let len = data.len() as u32;
120        log_file.writer.write_all(&len.to_le_bytes()).await?;
121
122        // Write data
123        log_file.writer.write_all(&data).await?;
124
125        // Write checksum
126        let checksum = crc32fast::hash(&data);
127        log_file.writer.write_all(&checksum.to_le_bytes()).await?;
128
129        // Update size tracking
130        let record_size = 4 + data.len() as u64 + 4; // length + data + checksum
131        log_file.size += record_size;
132
133        self.total_record_count.fetch_add(1, Ordering::Relaxed);
134        self.records_since_sync.fetch_add(1, Ordering::Relaxed);
135
136        // Check if we need to rotate
137        let needs_rotation = log_file.size >= self.config.max_log_size;
138
139        // Handle durability mode
140        match &self.config.durability {
141            DurabilityMode::Sync => {
142                // Sync on every commit record
143                if matches!(record, WalRecord::TransactionCommit { .. }) {
144                    log_file.writer.flush().await?;
145                    log_file.writer.get_ref().sync_all().await?;
146                    self.records_since_sync.store(0, Ordering::Relaxed);
147                    *self.last_sync.lock().await = Instant::now();
148                }
149            }
150            DurabilityMode::Batch {
151                max_delay_ms,
152                max_records,
153            } => {
154                let records = self.records_since_sync.load(Ordering::Relaxed);
155                let elapsed = self.last_sync.lock().await.elapsed();
156
157                if records >= *max_records || elapsed >= Duration::from_millis(*max_delay_ms) {
158                    log_file.writer.flush().await?;
159                    log_file.writer.get_ref().sync_all().await?;
160                    self.records_since_sync.store(0, Ordering::Relaxed);
161                    *self.last_sync.lock().await = Instant::now();
162                }
163            }
164            DurabilityMode::Adaptive { .. } => {
165                // Adaptive mode: just flush buffer, background thread handles sync
166                // Note: For async, consider using a tokio task-based flusher
167                log_file.writer.flush().await?;
168            }
169            DurabilityMode::NoSync => {
170                // Just flush buffer, no sync
171                log_file.writer.flush().await?;
172            }
173        }
174
175        drop(guard);
176
177        // Rotate if needed
178        if needs_rotation {
179            self.rotate().await?;
180        }
181
182        Ok(())
183    }
184
185    /// Writes a checkpoint marker and returns the checkpoint info.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if the checkpoint cannot be written.
190    pub async fn checkpoint(
191        &self,
192        current_transaction: TransactionId,
193        epoch: EpochId,
194    ) -> Result<()> {
195        // Write checkpoint record
196        self.log(&WalRecord::Checkpoint {
197            transaction_id: current_transaction,
198        })
199        .await?;
200
201        // Force sync on checkpoint
202        self.sync().await?;
203
204        // Update checkpoint epoch
205        *self.checkpoint_epoch.lock().await = Some(epoch);
206
207        // Optionally truncate old logs
208        self.truncate_old_logs().await?;
209
210        Ok(())
211    }
212
213    /// Rotates to a new log file.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if rotation fails.
218    pub async fn rotate(&self) -> Result<()> {
219        let new_sequence = self.current_sequence.fetch_add(1, Ordering::SeqCst) + 1;
220        let new_path = self.log_path(new_sequence);
221
222        let file = OpenOptions::new()
223            .create(true)
224            .read(true)
225            .append(true)
226            .open(&new_path)
227            .await?;
228
229        let new_log = AsyncLogFile {
230            writer: BufWriter::new(file),
231            size: 0,
232        };
233
234        // Replace active log
235        let mut guard = self.active_log.lock().await;
236        if let Some(old_log) = guard.take() {
237            // Ensure old log is flushed
238            drop(old_log);
239        }
240        *guard = Some(new_log);
241
242        Ok(())
243    }
244
245    /// Flushes the WAL buffer to disk.
246    ///
247    /// # Errors
248    ///
249    /// Returns an error if the flush fails.
250    pub async fn flush(&self) -> Result<()> {
251        let mut guard = self.active_log.lock().await;
252        if let Some(log_file) = guard.as_mut() {
253            log_file.writer.flush().await?;
254        }
255        Ok(())
256    }
257
258    /// Syncs the WAL to disk (fsync).
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if the sync fails.
263    pub async fn sync(&self) -> Result<()> {
264        let mut guard = self.active_log.lock().await;
265        if let Some(log_file) = guard.as_mut() {
266            log_file.writer.flush().await?;
267            log_file.writer.get_ref().sync_all().await?;
268        }
269        self.records_since_sync.store(0, Ordering::Relaxed);
270        *self.last_sync.lock().await = Instant::now();
271        Ok(())
272    }
273
274    /// Starts a background sync task for batch durability mode.
275    ///
276    /// The task will periodically sync the WAL based on the batch configuration.
277    /// This is useful when you want automatic syncing without waiting for
278    /// individual log calls to trigger it.
279    ///
280    /// # Returns
281    ///
282    /// Returns `true` if a new background task was started, `false` if batch
283    /// mode is not configured or a task is already running.
284    pub async fn start_background_sync(&self) -> bool {
285        let DurabilityMode::Batch { max_delay_ms, .. } = self.config.durability else {
286            return false;
287        };
288
289        let mut handle_guard = self.background_sync_handle.lock().await;
290        if handle_guard.is_some() {
291            return false;
292        }
293
294        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
295        *self.shutdown_tx.lock().await = Some(shutdown_tx);
296
297        // We need to use a weak pattern here since we can't hold Arc<Self>
298        // Instead, we'll create a simple interval-based task
299        let interval = Duration::from_millis(max_delay_ms);
300
301        let handle = tokio::spawn(async move {
302            let mut ticker = tokio::time::interval(interval);
303            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
304
305            loop {
306                tokio::select! {
307                    _ = ticker.tick() => {
308                        // The actual sync is done by the log() method when
309                        // it checks elapsed time. This task just ensures
310                        // we have periodic wake-ups.
311                    }
312                    _ = &mut shutdown_rx => {
313                        break;
314                    }
315                }
316            }
317        });
318
319        *handle_guard = Some(handle);
320        true
321    }
322
323    /// Stops the background sync task if running.
324    pub async fn stop_background_sync(&self) {
325        if let Some(tx) = self.shutdown_tx.lock().await.take() {
326            let _ = tx.send(());
327        }
328        if let Some(handle) = self.background_sync_handle.lock().await.take() {
329            let _ = handle.await;
330        }
331    }
332
333    /// Returns the total number of records written.
334    #[must_use]
335    pub fn record_count(&self) -> u64 {
336        self.total_record_count.load(Ordering::Relaxed)
337    }
338
339    /// Returns the WAL directory path.
340    #[must_use]
341    pub fn dir(&self) -> &Path {
342        &self.dir
343    }
344
345    /// Returns the current durability mode.
346    #[must_use]
347    pub fn durability_mode(&self) -> DurabilityMode {
348        self.config.durability
349    }
350
351    /// Returns all WAL log file paths in sequence order.
352    pub async fn log_files(&self) -> Result<Vec<PathBuf>> {
353        let mut files = Vec::new();
354
355        let mut entries = fs::read_dir(&self.dir).await?;
356        while let Some(entry) = entries.next_entry().await? {
357            let path = entry.path();
358            if path.extension().is_some_and(|ext| ext == "log") {
359                files.push(path);
360            }
361        }
362
363        // Sort by sequence number
364        files.sort_by(|a, b| {
365            let seq_a = Self::sequence_from_path(a).unwrap_or(0);
366            let seq_b = Self::sequence_from_path(b).unwrap_or(0);
367            seq_a.cmp(&seq_b)
368        });
369
370        Ok(files)
371    }
372
373    /// Returns the latest checkpoint epoch, if any.
374    pub async fn checkpoint_epoch(&self) -> Option<EpochId> {
375        *self.checkpoint_epoch.lock().await
376    }
377
378    // === Private methods ===
379
380    async fn ensure_active_log(&self) -> Result<()> {
381        let mut guard = self.active_log.lock().await;
382        if guard.is_none() {
383            let sequence = self.current_sequence.load(Ordering::Relaxed);
384            let path = self.log_path(sequence);
385
386            let file = OpenOptions::new()
387                .create(true)
388                .read(true)
389                .append(true)
390                .open(&path)
391                .await?;
392
393            let size = file.metadata().await?.len();
394
395            *guard = Some(AsyncLogFile {
396                writer: BufWriter::new(file),
397                size,
398            });
399        }
400        Ok(())
401    }
402
403    fn log_path(&self, sequence: u64) -> PathBuf {
404        self.dir.join(format!("wal_{sequence:08}.log"))
405    }
406
407    fn sequence_from_path(path: &Path) -> Option<u64> {
408        path.file_stem()
409            .and_then(|s| s.to_str())
410            .and_then(|s| s.strip_prefix("wal_"))
411            .and_then(|s| s.parse().ok())
412    }
413
414    async fn truncate_old_logs(&self) -> Result<()> {
415        let Some(checkpoint) = *self.checkpoint_epoch.lock().await else {
416            return Ok(());
417        };
418
419        // Keep logs that might still be needed
420        // For now, keep the two most recent logs after checkpoint
421        let files = self.log_files().await?;
422        let current_seq = self.current_sequence.load(Ordering::Relaxed);
423
424        for file in files {
425            if let Some(seq) = Self::sequence_from_path(&file) {
426                // Keep the last 2 log files before current
427                if seq + 2 < current_seq && checkpoint.as_u64() > seq {
428                    let _ = fs::remove_file(&file).await;
429                }
430            }
431        }
432
433        Ok(())
434    }
435}
436
437impl Drop for AsyncWalManager {
438    fn drop(&mut self) {
439        // Best-effort cleanup - background tasks will be cancelled
440        // when their handles are dropped
441    }
442}
443
444#[cfg(test)]
445mod tests {
446    use super::*;
447    use grafeo_common::types::NodeId;
448    use tempfile::tempdir;
449
450    #[tokio::test]
451    async fn test_async_wal_write() {
452        let dir = tempdir().unwrap();
453
454        let wal = AsyncWalManager::open(dir.path()).await.unwrap();
455
456        let record = WalRecord::CreateNode {
457            id: NodeId::new(1),
458            labels: vec!["Person".to_string()],
459        };
460
461        wal.log(&record).await.unwrap();
462        wal.flush().await.unwrap();
463
464        assert_eq!(wal.record_count(), 1);
465    }
466
467    #[tokio::test]
468    async fn test_async_wal_rotation() {
469        let dir = tempdir().unwrap();
470
471        // Small max size to force rotation
472        let config = WalConfig {
473            max_log_size: 100,
474            ..Default::default()
475        };
476
477        let wal = AsyncWalManager::with_config(dir.path(), config)
478            .await
479            .unwrap();
480
481        // Write enough records to trigger rotation
482        for i in 0..10 {
483            let record = WalRecord::CreateNode {
484                id: NodeId::new(i),
485                labels: vec!["Person".to_string()],
486            };
487            wal.log(&record).await.unwrap();
488        }
489
490        wal.flush().await.unwrap();
491
492        // Should have multiple log files
493        let files = wal.log_files().await.unwrap();
494        assert!(
495            files.len() > 1,
496            "Expected multiple log files after rotation"
497        );
498    }
499
500    #[tokio::test]
501    async fn test_async_durability_modes() {
502        let dir = tempdir().unwrap();
503
504        // Test Sync mode
505        let config = WalConfig {
506            durability: DurabilityMode::Sync,
507            ..Default::default()
508        };
509        let wal = AsyncWalManager::with_config(dir.path().join("sync"), config)
510            .await
511            .unwrap();
512        wal.log(&WalRecord::TransactionCommit {
513            transaction_id: TransactionId::new(1),
514        })
515        .await
516        .unwrap();
517
518        // Test NoSync mode
519        let config = WalConfig {
520            durability: DurabilityMode::NoSync,
521            ..Default::default()
522        };
523        let wal = AsyncWalManager::with_config(dir.path().join("nosync"), config)
524            .await
525            .unwrap();
526        wal.log(&WalRecord::CreateNode {
527            id: NodeId::new(1),
528            labels: vec![],
529        })
530        .await
531        .unwrap();
532
533        // Test Batch mode
534        let config = WalConfig {
535            durability: DurabilityMode::Batch {
536                max_delay_ms: 10,
537                max_records: 5,
538            },
539            ..Default::default()
540        };
541        let wal = AsyncWalManager::with_config(dir.path().join("batch"), config)
542            .await
543            .unwrap();
544        for i in 0..10 {
545            wal.log(&WalRecord::CreateNode {
546                id: NodeId::new(i),
547                labels: vec![],
548            })
549            .await
550            .unwrap();
551        }
552    }
553
554    #[tokio::test]
555    async fn test_async_checkpoint() {
556        let dir = tempdir().unwrap();
557
558        let wal = AsyncWalManager::open(dir.path()).await.unwrap();
559
560        // Write some records
561        wal.log(&WalRecord::CreateNode {
562            id: NodeId::new(1),
563            labels: vec!["Test".to_string()],
564        })
565        .await
566        .unwrap();
567
568        wal.log(&WalRecord::TransactionCommit {
569            transaction_id: TransactionId::new(1),
570        })
571        .await
572        .unwrap();
573
574        // Create checkpoint
575        wal.checkpoint(TransactionId::new(1), EpochId::new(10))
576            .await
577            .unwrap();
578
579        assert_eq!(wal.checkpoint_epoch().await, Some(EpochId::new(10)));
580    }
581
582    #[tokio::test]
583    async fn test_background_sync() {
584        let dir = tempdir().unwrap();
585
586        let config = WalConfig {
587            durability: DurabilityMode::Batch {
588                max_delay_ms: 50,
589                max_records: 1000,
590            },
591            ..Default::default()
592        };
593
594        let wal = AsyncWalManager::with_config(dir.path(), config)
595            .await
596            .unwrap();
597
598        // Should start successfully
599        assert!(wal.start_background_sync().await);
600
601        // Should not start again
602        assert!(!wal.start_background_sync().await);
603
604        // Write a record
605        wal.log(&WalRecord::CreateNode {
606            id: NodeId::new(1),
607            labels: vec![],
608        })
609        .await
610        .unwrap();
611
612        // Wait a bit for potential background sync
613        tokio::time::sleep(Duration::from_millis(100)).await;
614
615        // Stop background sync
616        wal.stop_background_sync().await;
617    }
618}