graphos_adapters/storage/wal/async_log.rs

//! Async WAL implementation using tokio for non-blocking I/O.

use super::{DurabilityMode, WalConfig, WalRecord};
use graphos_common::types::{EpochId, TxId};
use graphos_common::utils::error::{Error, Result};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

/// State for a single async log file.
struct AsyncLogFile {
    /// Async file handle with buffering.
    writer: BufWriter<File>,
    /// Current size in bytes.
    size: u64,
    /// File path.
    #[allow(dead_code)]
    path: PathBuf,
    /// Sequence number (for log file ordering during recovery).
    #[allow(dead_code)]
    sequence: u64,
}

/// Async Write-Ahead Log manager with non-blocking I/O.
///
/// This manager provides the same durability guarantees as the sync version
/// but uses tokio's async I/O for better throughput in async contexts.
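///
/// # Example
///
/// A minimal usage sketch, shown inside an async context and marked `ignore`
/// because the import paths are assumed (this file does not show how the
/// module is re-exported):
///
/// ```ignore
/// use graphos_adapters::storage::wal::async_log::AsyncWalManager;
/// use graphos_adapters::storage::wal::WalRecord;
/// use graphos_common::types::TxId;
///
/// let wal = AsyncWalManager::open("/tmp/graph-wal").await?;
/// wal.log(&WalRecord::TxCommit { tx_id: TxId::new(1) }).await?;
/// wal.sync().await?;
/// ```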
pub struct AsyncWalManager {
    /// Directory for WAL files.
    dir: PathBuf,
    /// Configuration.
    config: WalConfig,
    /// Active log file (async mutex for async access).
    active_log: Mutex<Option<AsyncLogFile>>,
    /// Total number of records written across all log files.
    total_record_count: AtomicU64,
    /// Records since last sync (for batch mode).
    records_since_sync: AtomicU64,
    /// Time of last sync (for batch mode).
    last_sync: Mutex<Instant>,
    /// Current log sequence number.
    current_sequence: AtomicU64,
    /// Latest checkpoint epoch.
    checkpoint_epoch: Mutex<Option<EpochId>>,
    /// Background sync task handle (for batch mode).
    background_sync_handle: Mutex<Option<JoinHandle<()>>>,
    /// Shutdown signal sender.
    shutdown_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
}

impl AsyncWalManager {
    /// Opens or creates an async WAL in the given directory.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory cannot be created or accessed.
    pub async fn open(dir: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(dir, WalConfig::default()).await
    }

    /// Opens or creates an async WAL with custom configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory cannot be created or accessed.
    pub async fn with_config(dir: impl AsRef<Path>, config: WalConfig) -> Result<Self> {
        let dir = dir.as_ref().to_path_buf();
        fs::create_dir_all(&dir).await?;

        // Find the highest existing sequence number
        let mut max_sequence = 0u64;
        let mut entries = fs::read_dir(&dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            if let Some(name) = entry.file_name().to_str() {
                if let Some(seq_str) = name
                    .strip_prefix("wal_")
                    .and_then(|s| s.strip_suffix(".log"))
                {
                    if let Ok(seq) = seq_str.parse::<u64>() {
                        max_sequence = max_sequence.max(seq);
                    }
                }
            }
        }

        let manager = Self {
            dir,
            config,
            active_log: Mutex::new(None),
            total_record_count: AtomicU64::new(0),
            records_since_sync: AtomicU64::new(0),
            last_sync: Mutex::new(Instant::now()),
            current_sequence: AtomicU64::new(max_sequence),
            checkpoint_epoch: Mutex::new(None),
            background_sync_handle: Mutex::new(None),
            shutdown_tx: Mutex::new(None),
        };

        // Open or create the active log
        manager.ensure_active_log().await?;

        Ok(manager)
    }

    /// Logs a record to the WAL asynchronously.
    ///
    /// # Errors
    ///
    /// Returns an error if the record cannot be written.
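    ///
    /// # On-disk format
    ///
    /// Each record is framed exactly as written below: a 4-byte little-endian
    /// length prefix, the bincode-encoded record bytes, then a 4-byte
    /// little-endian CRC32 checksum of those bytes.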
    pub async fn log(&self, record: &WalRecord) -> Result<()> {
        self.ensure_active_log().await?;

        let mut guard = self.active_log.lock().await;
        let log_file = guard
            .as_mut()
            .ok_or_else(|| Error::Internal("WAL writer not available".to_string()))?;

        // Serialize the record
        let data = bincode::serde::encode_to_vec(record, bincode::config::standard())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        // Write length prefix
        let len = data.len() as u32;
        log_file.writer.write_all(&len.to_le_bytes()).await?;

        // Write data
        log_file.writer.write_all(&data).await?;

        // Write checksum
        let checksum = crc32fast::hash(&data);
        log_file.writer.write_all(&checksum.to_le_bytes()).await?;

        // Update size tracking
        let record_size = 4 + data.len() as u64 + 4; // length + data + checksum
        log_file.size += record_size;

        self.total_record_count.fetch_add(1, Ordering::Relaxed);
        self.records_since_sync.fetch_add(1, Ordering::Relaxed);

        // Check if we need to rotate
        let needs_rotation = log_file.size >= self.config.max_log_size;

        // Handle durability mode
        match &self.config.durability {
            DurabilityMode::Sync => {
                // Sync on every commit record
                if matches!(record, WalRecord::TxCommit { .. }) {
                    log_file.writer.flush().await?;
                    log_file.writer.get_ref().sync_all().await?;
                    self.records_since_sync.store(0, Ordering::Relaxed);
                    *self.last_sync.lock().await = Instant::now();
                }
            }
            DurabilityMode::Batch {
                max_delay_ms,
                max_records,
            } => {
                let records = self.records_since_sync.load(Ordering::Relaxed);
                let elapsed = self.last_sync.lock().await.elapsed();

                if records >= *max_records || elapsed >= Duration::from_millis(*max_delay_ms) {
                    log_file.writer.flush().await?;
                    log_file.writer.get_ref().sync_all().await?;
                    self.records_since_sync.store(0, Ordering::Relaxed);
                    *self.last_sync.lock().await = Instant::now();
                }
            }
            DurabilityMode::NoSync => {
                // Just flush buffer, no sync
                log_file.writer.flush().await?;
            }
        }

        drop(guard);

        // Rotate if needed
        if needs_rotation {
            self.rotate().await?;
        }

        Ok(())
    }

    /// Writes a checkpoint marker, syncs the WAL, and records the checkpoint epoch.
    ///
    /// # Errors
    ///
    /// Returns an error if the checkpoint cannot be written.
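    ///
    /// # Example
    ///
    /// A minimal sketch mirroring `test_async_checkpoint` below (marked
    /// `ignore`; `wal` is an already-opened `AsyncWalManager`):
    ///
    /// ```ignore
    /// wal.log(&WalRecord::TxCommit { tx_id: TxId::new(1) }).await?;
    /// wal.checkpoint(TxId::new(1), EpochId::new(10)).await?;
    /// assert_eq!(wal.checkpoint_epoch().await, Some(EpochId::new(10)));
    /// ```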
    pub async fn checkpoint(&self, current_tx: TxId, epoch: EpochId) -> Result<()> {
        // Write checkpoint record
        self.log(&WalRecord::Checkpoint { tx_id: current_tx })
            .await?;

        // Force sync on checkpoint
        self.sync().await?;

        // Update checkpoint epoch
        *self.checkpoint_epoch.lock().await = Some(epoch);

        // Optionally truncate old logs
        self.truncate_old_logs().await?;

        Ok(())
    }

    /// Rotates to a new log file.
    ///
    /// # Errors
    ///
    /// Returns an error if rotation fails.
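    ///
    /// Rotation is also triggered automatically by [`Self::log`] once the
    /// active log file reaches the configured `max_log_size`.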
    pub async fn rotate(&self) -> Result<()> {
        let new_sequence = self.current_sequence.fetch_add(1, Ordering::SeqCst) + 1;
        let new_path = self.log_path(new_sequence);

        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .append(true)
            .open(&new_path)
            .await?;

        let new_log = AsyncLogFile {
            writer: BufWriter::new(file),
            size: 0,
            path: new_path,
            sequence: new_sequence,
        };

        // Replace the active log, flushing the old writer first so buffered
        // records are not lost (tokio's `BufWriter` does not flush on drop).
        let mut guard = self.active_log.lock().await;
        if let Some(mut old_log) = guard.take() {
            old_log.writer.flush().await?;
        }
        *guard = Some(new_log);

        Ok(())
    }

    /// Flushes the WAL buffer to disk.
    ///
    /// # Errors
    ///
    /// Returns an error if the flush fails.
    pub async fn flush(&self) -> Result<()> {
        let mut guard = self.active_log.lock().await;
        if let Some(log_file) = guard.as_mut() {
            log_file.writer.flush().await?;
        }
        Ok(())
    }

    /// Syncs the WAL to disk (fsync).
    ///
    /// # Errors
    ///
    /// Returns an error if the sync fails.
    pub async fn sync(&self) -> Result<()> {
        let mut guard = self.active_log.lock().await;
        if let Some(log_file) = guard.as_mut() {
            log_file.writer.flush().await?;
            log_file.writer.get_ref().sync_all().await?;
        }
        self.records_since_sync.store(0, Ordering::Relaxed);
        *self.last_sync.lock().await = Instant::now();
        Ok(())
    }

    /// Starts a background sync task for batch durability mode.
    ///
    /// The task wakes up periodically based on the batch configuration. The
    /// sync itself is still performed by [`Self::log`] when it checks the
    /// elapsed time, so this adds periodic wake-ups rather than independent
    /// syncing.
    ///
    /// # Returns
    ///
    /// Returns `true` if a new background task was started, `false` if batch
    /// mode is not configured or a task is already running.
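    ///
    /// # Example
    ///
    /// A minimal sketch mirroring `test_background_sync` below (marked
    /// `ignore`; assumes `wal` was opened with `DurabilityMode::Batch`):
    ///
    /// ```ignore
    /// assert!(wal.start_background_sync().await);   // starts the task
    /// assert!(!wal.start_background_sync().await);  // already running
    /// wal.stop_background_sync().await;
    /// ```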
    pub async fn start_background_sync(&self) -> bool {
        let DurabilityMode::Batch { max_delay_ms, .. } = self.config.durability else {
            return false;
        };

        let mut handle_guard = self.background_sync_handle.lock().await;
        if handle_guard.is_some() {
            return false;
        }

        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        *self.shutdown_tx.lock().await = Some(shutdown_tx);

        // The spawned task cannot borrow `self` (and we don't hold an
        // `Arc<Self>`), so it is a simple interval-based wake-up task.
        let interval = Duration::from_millis(max_delay_ms);

        let handle = tokio::spawn(async move {
            let mut ticker = tokio::time::interval(interval);
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                tokio::select! {
                    _ = ticker.tick() => {
                        // The actual sync is done by the log() method when
                        // it checks elapsed time. This task just ensures
                        // we have periodic wake-ups.
                    }
                    _ = &mut shutdown_rx => {
                        break;
                    }
                }
            }
        });

        *handle_guard = Some(handle);
        true
    }

    /// Stops the background sync task if running.
    pub async fn stop_background_sync(&self) {
        if let Some(tx) = self.shutdown_tx.lock().await.take() {
            let _ = tx.send(());
        }
        if let Some(handle) = self.background_sync_handle.lock().await.take() {
            let _ = handle.await;
        }
    }

    /// Returns the total number of records written.
    #[must_use]
    pub fn record_count(&self) -> u64 {
        self.total_record_count.load(Ordering::Relaxed)
    }

    /// Returns the WAL directory path.
    #[must_use]
    pub fn dir(&self) -> &Path {
        &self.dir
    }

    /// Returns the current durability mode.
    #[must_use]
    pub fn durability_mode(&self) -> DurabilityMode {
        self.config.durability
    }

    /// Returns all WAL log file paths in sequence order.
    pub async fn log_files(&self) -> Result<Vec<PathBuf>> {
        let mut files = Vec::new();

        let mut entries = fs::read_dir(&self.dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            if path.extension().is_some_and(|ext| ext == "log") {
                files.push(path);
            }
        }

        // Sort by sequence number
        files.sort_by(|a, b| {
            let seq_a = Self::sequence_from_path(a).unwrap_or(0);
            let seq_b = Self::sequence_from_path(b).unwrap_or(0);
            seq_a.cmp(&seq_b)
        });

        Ok(files)
    }

    /// Returns the latest checkpoint epoch, if any.
    pub async fn checkpoint_epoch(&self) -> Option<EpochId> {
        *self.checkpoint_epoch.lock().await
    }

    // === Private methods ===

    async fn ensure_active_log(&self) -> Result<()> {
        let mut guard = self.active_log.lock().await;
        if guard.is_none() {
            let sequence = self.current_sequence.load(Ordering::Relaxed);
            let path = self.log_path(sequence);

            let file = OpenOptions::new()
                .create(true)
                .read(true)
                .append(true)
                .open(&path)
                .await?;

            let size = file.metadata().await?.len();

            *guard = Some(AsyncLogFile {
                writer: BufWriter::new(file),
                size,
                path,
                sequence,
            });
        }
        Ok(())
    }

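    /// Builds the log file path for a sequence number,
    /// e.g. sequence 3 maps to `wal_00000003.log`.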
    fn log_path(&self, sequence: u64) -> PathBuf {
        self.dir.join(format!("wal_{sequence:08}.log"))
    }

    fn sequence_from_path(path: &Path) -> Option<u64> {
        path.file_stem()
            .and_then(|s| s.to_str())
            .and_then(|s| s.strip_prefix("wal_"))
            .and_then(|s| s.parse().ok())
    }

    async fn truncate_old_logs(&self) -> Result<()> {
        let checkpoint = match *self.checkpoint_epoch.lock().await {
            Some(e) => e,
            None => return Ok(()),
        };

        // Keep logs that might still be needed
        // For now, keep the two most recent logs after checkpoint
        let files = self.log_files().await?;
        let current_seq = self.current_sequence.load(Ordering::Relaxed);

        for file in files {
            if let Some(seq) = Self::sequence_from_path(&file) {
                // Keep the last 2 log files before current
                if seq + 2 < current_seq && checkpoint.as_u64() > seq {
                    let _ = fs::remove_file(&file).await;
                }
            }
        }

        Ok(())
    }
}

impl Drop for AsyncWalManager {
    fn drop(&mut self) {
        // Best-effort cleanup: dropping the shutdown sender resolves the
        // oneshot receiver, which makes the background sync task exit.
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use graphos_common::types::NodeId;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_async_wal_write() {
        let dir = tempdir().unwrap();

        let wal = AsyncWalManager::open(dir.path()).await.unwrap();

        let record = WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Person".to_string()],
        };

        wal.log(&record).await.unwrap();
        wal.flush().await.unwrap();

        assert_eq!(wal.record_count(), 1);
    }

    #[tokio::test]
    async fn test_async_wal_rotation() {
        let dir = tempdir().unwrap();

        // Small max size to force rotation
        let config = WalConfig {
            max_log_size: 100,
            ..Default::default()
        };

        let wal = AsyncWalManager::with_config(dir.path(), config)
            .await
            .unwrap();

        // Write enough records to trigger rotation
        for i in 0..10 {
            let record = WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec!["Person".to_string()],
            };
            wal.log(&record).await.unwrap();
        }

        wal.flush().await.unwrap();

        // Should have multiple log files
        let files = wal.log_files().await.unwrap();
        assert!(
            files.len() > 1,
            "Expected multiple log files after rotation"
        );
    }

    #[tokio::test]
    async fn test_async_durability_modes() {
        let dir = tempdir().unwrap();

        // Test Sync mode
        let config = WalConfig {
            durability: DurabilityMode::Sync,
            ..Default::default()
        };
        let wal = AsyncWalManager::with_config(dir.path().join("sync"), config)
            .await
            .unwrap();
        wal.log(&WalRecord::TxCommit {
            tx_id: TxId::new(1),
        })
        .await
        .unwrap();

        // Test NoSync mode
        let config = WalConfig {
            durability: DurabilityMode::NoSync,
            ..Default::default()
        };
        let wal = AsyncWalManager::with_config(dir.path().join("nosync"), config)
            .await
            .unwrap();
        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec![],
        })
        .await
        .unwrap();

        // Test Batch mode
        let config = WalConfig {
            durability: DurabilityMode::Batch {
                max_delay_ms: 10,
                max_records: 5,
            },
            ..Default::default()
        };
        let wal = AsyncWalManager::with_config(dir.path().join("batch"), config)
            .await
            .unwrap();
        for i in 0..10 {
            wal.log(&WalRecord::CreateNode {
                id: NodeId::new(i),
                labels: vec![],
            })
            .await
            .unwrap();
        }
    }

    #[tokio::test]
    async fn test_async_checkpoint() {
        let dir = tempdir().unwrap();

        let wal = AsyncWalManager::open(dir.path()).await.unwrap();

        // Write some records
        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec!["Test".to_string()],
        })
        .await
        .unwrap();

        wal.log(&WalRecord::TxCommit {
            tx_id: TxId::new(1),
        })
        .await
        .unwrap();

        // Create checkpoint
        wal.checkpoint(TxId::new(1), EpochId::new(10))
            .await
            .unwrap();

        assert_eq!(wal.checkpoint_epoch().await, Some(EpochId::new(10)));
    }

    #[tokio::test]
    async fn test_background_sync() {
        let dir = tempdir().unwrap();

        let config = WalConfig {
            durability: DurabilityMode::Batch {
                max_delay_ms: 50,
                max_records: 1000,
            },
            ..Default::default()
        };

        let wal = AsyncWalManager::with_config(dir.path(), config)
            .await
            .unwrap();

        // Should start successfully
        assert!(wal.start_background_sync().await);

        // Should not start again
        assert!(!wal.start_background_sync().await);

        // Write a record
        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(1),
            labels: vec![],
        })
        .await
        .unwrap();

        // Wait a bit for potential background sync
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Stop background sync
        wal.stop_background_sync().await;
    }
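
    // Additional sketches: these exercise only what the write path above
    // actually does, namely the `wal_{seq:08}.log` naming scheme and the
    // [len: u32 LE][bincode payload][crc32: u32 LE] record framing.

    #[test]
    fn test_sequence_from_path_roundtrip() {
        let path = Path::new("wal_00000042.log");
        assert_eq!(AsyncWalManager::sequence_from_path(path), Some(42));
    }

    #[tokio::test]
    async fn test_record_frame_layout() {
        let dir = tempdir().unwrap();
        let wal = AsyncWalManager::open(dir.path()).await.unwrap();

        wal.log(&WalRecord::CreateNode {
            id: NodeId::new(7),
            labels: vec!["Frame".to_string()],
        })
        .await
        .unwrap();
        wal.sync().await.unwrap();

        // Read the active log back and verify the frame structure.
        let files = wal.log_files().await.unwrap();
        let bytes = fs::read(&files[0]).await.unwrap();

        let len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
        assert_eq!(bytes.len(), 4 + len + 4, "expected one framed record");

        let data = &bytes[4..4 + len];
        let stored_crc = u32::from_le_bytes(bytes[4 + len..4 + len + 4].try_into().unwrap());
        assert_eq!(crc32fast::hash(data), stored_crc);
    }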
}