sync_engine/resilience/wal.rs

//! Write-Ahead Log (WAL) for L3 durability during MySQL outages.
//!
//! When MySQL is unavailable (maintenance, network issues, etc.), items
//! are written to a local SQLite database. A background task drains
//! the WAL to MySQL when connectivity is restored.
//!
//! This is NOT a tier - it's a durability buffer. Items in the WAL
//! are "in flight" to MySQL, not a permanent storage location.
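//!
//! # Example
//!
//! A minimal sketch of how the pieces are meant to fit together; `mysql`
//! (an `ArchiveStore`), `item` (a `SyncItem`), and the path shown are
//! illustrative names, not items defined in this module:
//!
//! ```ignore
//! // Reopen the local WAL; items left over from a previous run are
//! // counted and drained later by a background task.
//! let wal = WriteAheadLog::new("/var/lib/sync_engine/wal.db", 10_000).await?;
//!
//! // During a MySQL outage, buffer items locally instead of failing them.
//! wal.write(&item).await?;
//!
//! // Once MySQL is reachable again, a background task drains in batches.
//! let drained_ids = wal.drain_to(&mysql, 500).await?;
//! ```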

use crate::storage::sql::SqlStore;
use crate::storage::traits::{ArchiveStore, StorageError};
use crate::sync_item::SyncItem;
use std::path::Path;
use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};

/// WAL state for observability
#[derive(Debug, Clone, Copy)]
pub struct WalStats {
    /// Number of items currently in WAL
    pub pending_items: u64,
    /// Total items written to WAL since startup
    pub total_written: u64,
    /// Total items drained to MySQL since startup
    pub total_drained: u64,
    /// Whether drain is currently in progress
    pub draining: bool,
    /// Whether MySQL is currently reachable
    pub mysql_healthy: bool,
}

/// Write-ahead log backed by SQLite.
pub struct WriteAheadLog {
    /// Local SQLite store
    store: SqlStore,
    /// Path to SQLite file (for display)
    path: String,
    /// Items pending drain
    pending_count: AtomicU64,
    /// Total written since startup
    total_written: AtomicU64,
    /// Total drained since startup
    total_drained: AtomicU64,
    /// Whether currently draining
    draining: AtomicBool,
    /// Max items before backpressure
    max_items: u64,
    /// Max file size in bytes before backpressure (default 100MB)
    max_bytes: u64,
}

impl WriteAheadLog {
    /// Default max WAL size: 100MB
    const DEFAULT_MAX_BYTES: u64 = 100 * 1024 * 1024;

    /// Create a new WAL at the given path.
    pub async fn new(path: impl AsRef<Path>, max_items: u64) -> Result<Self, StorageError> {
        Self::with_max_bytes(path, max_items, Self::DEFAULT_MAX_BYTES).await
    }

    /// Create a new WAL with custom max size limit.
    pub async fn with_max_bytes(
        path: impl AsRef<Path>,
        max_items: u64,
        max_bytes: u64,
    ) -> Result<Self, StorageError> {
        let path_str = path.as_ref().to_string_lossy().to_string();
        let url = format!("sqlite://{}?mode=rwc", path_str);

        info!(path = %path_str, max_items, max_bytes, "Initializing write-ahead log");

        let store = SqlStore::new(&url).await?;

        // Count existing items (from previous run)
        let pending = store.count_all().await.unwrap_or(0);
        if pending > 0 {
            warn!(pending, "WAL has items from previous run, will drain");
        }

        Ok(Self {
            store,
            path: path_str,
            pending_count: AtomicU64::new(pending),
            total_written: AtomicU64::new(0),
            total_drained: AtomicU64::new(0),
            draining: AtomicBool::new(false),
            max_items,
            max_bytes,
        })
    }

    /// Write an item to the WAL.
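    ///
    /// Fails with `StorageError::Backend` once the configured item or byte
    /// limit is reached; callers can treat that as backpressure. A sketch,
    /// assuming `wal` and `item` are in scope:
    ///
    /// ```ignore
    /// if let Err(e) = wal.write(&item).await {
    ///     // WAL full or file too large - shed load or surface the error.
    ///     tracing::warn!(error = %e, "WAL rejected item");
    /// }
    /// ```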
    pub async fn write(&self, item: &SyncItem) -> Result<(), StorageError> {
        // Check item count limit
        let pending = self.pending_count.load(Ordering::Acquire);
        if pending >= self.max_items {
            return Err(StorageError::Backend(format!(
                "WAL full: {} items (max {})",
                pending, self.max_items
            )));
        }

        // Check file size limit (runs on every write where the pending count
        // is a multiple of 100, including the very first write)
        if pending % 100 == 0 {
            if let Ok(size) = self.file_size_bytes() {
                if size >= self.max_bytes {
                    return Err(StorageError::Backend(format!(
                        "WAL file too large: {} bytes (max {})",
                        size, self.max_bytes
                    )));
                }
            }
        }

        self.store.put(item).await?;
        self.pending_count.fetch_add(1, Ordering::Release);
        self.total_written.fetch_add(1, Ordering::Relaxed);

        debug!(
            id = %item.object_id,
            pending = pending + 1,
            "Item written to WAL"
        );

        Ok(())
    }

    /// Get the WAL file size in bytes.
    pub fn file_size_bytes(&self) -> std::io::Result<u64> {
        std::fs::metadata(&self.path).map(|m| m.len())
    }

    /// Run a WAL checkpoint to reclaim disk space.
    /// Call this after draining to prevent unbounded file growth.
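    ///
    /// A sketch, assuming `wal` is in scope (`drain_to` already does this
    /// automatically once the WAL empties):
    ///
    /// ```ignore
    /// if !wal.has_pending() {
    ///     wal.checkpoint().await?;
    /// }
    /// ```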
    pub async fn checkpoint(&self) -> Result<(), StorageError> {
        // Run PRAGMA wal_checkpoint(TRUNCATE) to reclaim space
        sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
            .execute(&self.store.pool())
            .await
            .map_err(|e| StorageError::Backend(format!("WAL checkpoint failed: {}", e)))?;

        debug!(path = %self.path, "WAL checkpoint completed");
        Ok(())
    }

    /// Check if WAL has items to drain.
    #[must_use]
    pub fn has_pending(&self) -> bool {
        self.pending_count.load(Ordering::Acquire) > 0
    }

    /// Get current stats.
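    ///
    /// A sketch of feeding the snapshot into logs or metrics, assuming `wal`
    /// and a `MysqlHealthChecker` named `health` are in scope:
    ///
    /// ```ignore
    /// let stats = wal.stats(health.is_healthy());
    /// tracing::info!(
    ///     pending = stats.pending_items,
    ///     draining = stats.draining,
    ///     mysql_healthy = stats.mysql_healthy,
    ///     "WAL status"
    /// );
    /// ```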
    #[must_use]
    pub fn stats(&self, mysql_healthy: bool) -> WalStats {
        WalStats {
            pending_items: self.pending_count.load(Ordering::Acquire),
            total_written: self.total_written.load(Ordering::Relaxed),
            total_drained: self.total_drained.load(Ordering::Relaxed),
            draining: self.draining.load(Ordering::Acquire),
            mysql_healthy,
        }
    }

    /// Check if WAL is under pressure (>= 80% of the max item limit).
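    ///
    /// A sketch of using this as a soft backpressure signal before the hard
    /// limit rejects writes (assumes `wal` is in scope; the pause is illustrative):
    ///
    /// ```ignore
    /// if wal.under_pressure() {
    ///     tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    /// }
    /// ```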
    #[must_use]
    pub fn under_pressure(&self) -> bool {
        let pending = self.pending_count.load(Ordering::Acquire);
        pending as f64 / self.max_items as f64 >= 0.8
    }

    /// Drain items from WAL to MySQL using batch operations.
    ///
    /// Returns the IDs of items successfully drained, or an error if MySQL is unreachable.
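    ///
    /// A sketch of a background drain loop, assuming `wal`, `mysql`
    /// (an `Arc<dyn ArchiveStore>`), and `health` (a `MysqlHealthChecker`)
    /// are shared with the task; the interval and batch size are illustrative:
    ///
    /// ```ignore
    /// let mut tick = tokio::time::interval(std::time::Duration::from_secs(5));
    /// loop {
    ///     tick.tick().await;
    ///     if wal.has_pending() && health.check(mysql.as_ref()).await {
    ///         if let Err(e) = wal.drain_to(mysql.as_ref(), 500).await {
    ///             tracing::warn!(error = %e, "WAL drain attempt failed");
    ///         }
    ///     }
    /// }
    /// ```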
    pub async fn drain_to(
        &self,
        mysql: &dyn ArchiveStore,
        batch_size: usize,
    ) -> Result<Vec<String>, StorageError> {
        if self.draining.swap(true, Ordering::AcqRel) {
            // Already draining
            return Ok(Vec::new());
        }

        let _guard = DrainGuard(&self.draining);

        let pending = self.pending_count.load(Ordering::Acquire);
        if pending == 0 {
            return Ok(Vec::new());
        }

        info!(pending, batch_size, "Starting WAL drain to MySQL");

        // Fetch a batch of items from WAL
        let items = self.store.scan_batch(batch_size).await?;
        let batch_len = items.len();

        if batch_len == 0 {
            return Ok(Vec::new());
        }

        // Collect IDs for batch delete and return
        let ids: Vec<String> = items.iter().map(|i| i.object_id.clone()).collect();

        // Write batch to MySQL
        let mut items = items;
        match mysql.put_batch(&mut items).await {
            Ok(result) => {
                if !result.verified {
                    warn!(
                        batch_id = %result.batch_id,
                        written = result.written,
                        "WAL drain batch verification failed"
                    );
                }

                // Batch delete from WAL - items are now safely in MySQL
                match self.store.delete_batch(&ids).await {
                    Ok(deleted) => {
                        debug!(deleted, "Batch deleted from WAL");
                    }
                    Err(e) => {
                        // Log but don't fail - duplicates are fine, we'll dedup on read
                        error!(error = %e, "Failed to batch delete from WAL after MySQL write");
                    }
                }

                let drained = result.written;
                self.pending_count.fetch_sub(drained as u64, Ordering::Release);
                self.total_drained.fetch_add(drained as u64, Ordering::Relaxed);

                info!(drained, remaining = pending - drained as u64, "WAL drain batch complete");

                // Checkpoint to reclaim space if WAL is now empty
                if self.pending_count.load(Ordering::Acquire) == 0 {
                    if let Err(e) = self.checkpoint().await {
                        warn!(error = %e, "Failed to checkpoint WAL after drain");
                    }
                }

                Ok(ids)
            }
            Err(e) => {
                // MySQL failed - stop draining, will retry later
                warn!(
                    error = %e,
                    batch_size = batch_len,
                    "MySQL batch write failed during drain"
                );
                Err(e)
            }
        }
    }

    /// Get the path to the WAL file.
    pub fn path(&self) -> &str {
        &self.path
    }
}

/// RAII guard to reset draining flag.
struct DrainGuard<'a>(&'a AtomicBool);

impl Drop for DrainGuard<'_> {
    fn drop(&mut self) {
        self.0.store(false, Ordering::Release);
    }
}

/// Health checker for MySQL connectivity.
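///
/// Three consecutive failures mark MySQL unhealthy; a single success resets
/// the counter. A sketch of wiring it around MySQL calls (`mysql_put` and
/// `wal` are illustrative stand-ins, not items from this module):
///
/// ```ignore
/// let health = MysqlHealthChecker::new();
/// match mysql_put(&item).await {
///     Ok(_) => health.record_success(),
///     Err(_) => health.record_failure(),
/// }
/// if !health.is_healthy() {
///     // route new writes to the WAL until a later check() succeeds
///     wal.write(&item).await?;
/// }
/// ```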
pub struct MysqlHealthChecker {
    /// Last known health state
    healthy: AtomicBool,
    /// Consecutive failure count
    failures: AtomicU64,
    /// Lock for health check (prevent thundering herd)
    checking: Mutex<()>,
}

impl MysqlHealthChecker {
    pub fn new() -> Self {
        Self {
            healthy: AtomicBool::new(true), // Assume healthy until proven otherwise
            failures: AtomicU64::new(0),
            checking: Mutex::new(()),
        }
    }

    /// Record a successful MySQL operation.
    pub fn record_success(&self) {
        self.failures.store(0, Ordering::Release);
        self.healthy.store(true, Ordering::Release);
    }

    /// Record a failed MySQL operation.
    pub fn record_failure(&self) {
        let failures = self.failures.fetch_add(1, Ordering::AcqRel) + 1;
        if failures >= 3 {
            self.healthy.store(false, Ordering::Release);
        }
    }

    /// Check if MySQL is considered healthy.
    pub fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Acquire)
    }

    /// Get consecutive failure count.
    pub fn failure_count(&self) -> u64 {
        self.failures.load(Ordering::Acquire)
    }

    /// Perform a health check (ping).
    pub async fn check(&self, mysql: &dyn ArchiveStore) -> bool {
        // Prevent multiple simultaneous checks
        let _guard = self.checking.lock().await;

        // Try a simple operation
        match mysql.get("__health_check__").await {
            Ok(_) => {
                self.record_success();
                true
            }
            Err(_) => {
                self.record_failure();
                false
            }
        }
    }
}

impl Default for MysqlHealthChecker {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync_item::SyncItem;
    use serde_json::json;
    use tempfile::tempdir;

    fn test_item(id: &str) -> SyncItem {
        SyncItem::from_json(id.to_string(), json!({"test": "data", "id": id}))
    }

    #[tokio::test]
    async fn test_wal_write_and_stats() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.db");

        let wal = WriteAheadLog::new(wal_path.as_path(), 1000).await.unwrap();

        // Initial state
        let stats = wal.stats(true);
        assert_eq!(stats.pending_items, 0);
        assert_eq!(stats.total_written, 0);
        assert!(!wal.has_pending());

        // Write some items
        for i in 0..5 {
            wal.write(&test_item(&format!("item-{}", i))).await.unwrap();
        }

        let stats = wal.stats(true);
        assert_eq!(stats.pending_items, 5);
        assert_eq!(stats.total_written, 5);
        assert!(wal.has_pending());
    }

    #[tokio::test]
    async fn test_wal_max_items_limit() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_limit.db");

        // Very small limit for testing
        let wal = WriteAheadLog::new(wal_path.as_path(), 3).await.unwrap();

        // Fill to capacity
        wal.write(&test_item("item-1")).await.unwrap();
        wal.write(&test_item("item-2")).await.unwrap();
        wal.write(&test_item("item-3")).await.unwrap();

        // This should fail - at capacity
        let result: Result<(), StorageError> = wal.write(&test_item("item-4")).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("WAL full"));
    }

    #[tokio::test]
    async fn test_wal_under_pressure() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_pressure.db");

        let wal = WriteAheadLog::new(wal_path.as_path(), 10).await.unwrap();

        // Not under pressure when empty
        assert!(!wal.under_pressure());

        // Fill to 80%
        for i in 0..8 {
            wal.write(&test_item(&format!("item-{}", i))).await.unwrap();
        }

        // Should be under pressure now (8/10 = 80%)
        assert!(wal.under_pressure());
    }

    #[tokio::test]
    async fn test_wal_persistence_across_restart() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_restart.db");

        // Write items in first instance
        {
            let wal = WriteAheadLog::new(wal_path.as_path(), 1000).await.unwrap();
            wal.write(&test_item("persist-1")).await.unwrap();
            wal.write(&test_item("persist-2")).await.unwrap();
            wal.write(&test_item("persist-3")).await.unwrap();
        }

        // Reopen and verify items persisted
        {
            let wal = WriteAheadLog::new(wal_path.as_path(), 1000).await.unwrap();
            assert_eq!(wal.stats(true).pending_items, 3);
            assert!(wal.has_pending());
        }
    }

    #[tokio::test]
    async fn test_wal_file_size_check() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_size.db");

        let wal = WriteAheadLog::new(wal_path.as_path(), 1000).await.unwrap();

        // Write some data
        wal.write(&test_item("size-test")).await.unwrap();

        // File should exist and have non-zero size
        let size = wal.file_size_bytes().unwrap();
        assert!(size > 0);
    }

    #[tokio::test]
    async fn test_wal_with_max_bytes_limit() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test_bytes.db");

        // Very small byte limit (1KB). The size check runs on writes where the
        // pending count is a multiple of 100 (including the very first write),
        // and a fresh SQLite file is typically already larger than 1KB, so the
        // limit is expected to trip early.
        let wal = WriteAheadLog::with_max_bytes(wal_path.as_path(), 1000, 1024).await.unwrap();

        // Write items until the byte limit rejects one
        for i in 0..50 {
            let result: Result<(), StorageError> = wal.write(&test_item(&format!("bytes-item-{}", i))).await;
            if result.is_err() {
                // Expected: the byte limit was enforced
                return;
            }
        }

        // If we got here, the byte limit was never hit (file stayed small) - that's OK;
        // the test still exercises the size-check code path
    }

    // ========== MysqlHealthChecker tests ==========

    #[test]
    fn test_health_checker_initial_state() {
        let checker = MysqlHealthChecker::new();
        assert!(checker.is_healthy()); // Assume healthy initially
        assert_eq!(checker.failure_count(), 0);
    }

    #[test]
    fn test_health_checker_failure_threshold() {
        let checker = MysqlHealthChecker::new();

        // First 2 failures don't mark unhealthy
        checker.record_failure();
        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 1);

        checker.record_failure();
        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 2);

        // 3rd failure marks unhealthy
        checker.record_failure();
        assert!(!checker.is_healthy());
        assert_eq!(checker.failure_count(), 3);
    }

    #[test]
    fn test_health_checker_success_resets() {
        let checker = MysqlHealthChecker::new();

        // Mark unhealthy
        checker.record_failure();
        checker.record_failure();
        checker.record_failure();
        assert!(!checker.is_healthy());

        // One success resets everything
        checker.record_success();
        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 0);
    }

    #[test]
    fn test_health_checker_partial_failures_then_success() {
        let checker = MysqlHealthChecker::new();

        // 2 failures, then success
        checker.record_failure();
        checker.record_failure();
        checker.record_success();

        // Should be healthy and reset
        assert!(checker.is_healthy());
        assert_eq!(checker.failure_count(), 0);
    }
}