Skip to main content

zlayer_storage/replicator/
mod.rs

1//! `SQLite` WAL-based replication to S3
2//!
3//! Provides automatic backup and restore of `SQLite` databases to S3, using WAL
4//! (Write-Ahead Logging) for incremental replication. This enables crash-tolerant
5//! persistence and cross-node database restoration.
6//!
7//! # Features
8//!
9//! - **WAL Monitoring**: Detects database changes via WAL file modifications
10//! - **Network Tolerance**: Local write cache buffers changes during network outages
11//! - **Automatic Snapshots**: Periodic full database snapshots with configurable intervals
12//! - **S3 Backend**: Stores snapshots and WAL segments in S3 with zstd compression
13//! - **Auto-Restore**: Automatically restores from S3 on startup if local DB is missing
14//!
15//! # Architecture
16//!
17//! ```text
18//! SQLite DB (WAL mode)
19//!        |
20//!        v
21//! WAL Monitor (notify) --> Write Cache --> S3 Backend
22//!        |                     |               |
23//!        v                     v               v
24//!   Frame Detection      FIFO Queue    Upload/Download
25//! ```
26//!
27//! # Example
28//!
29//! ```rust,no_run
30//! use zlayer_storage::replicator::{SqliteReplicator, SqliteReplicatorConfig};
31//! use zlayer_storage::config::LayerStorageConfig;
32//!
33//! #[tokio::main]
34//! async fn main() -> anyhow::Result<()> {
35//!     let replicator_config = SqliteReplicatorConfig {
36//!         db_path: "/var/lib/myapp/data.db".into(),
37//!         s3_bucket: "my-bucket".to_string(),
38//!         s3_prefix: "sqlite-backups/myapp/".to_string(),
39//!         cache_dir: "/tmp/zlayer-replicator/cache".into(),
40//!         max_cache_size: 100 * 1024 * 1024, // 100MB
41//!         auto_restore: true,
42//!         snapshot_interval_secs: 3600, // 1 hour
43//!     };
44//!
45//!     let s3_config = LayerStorageConfig::new("my-bucket");
46//!     let replicator = SqliteReplicator::new(replicator_config, &s3_config).await?;
47//!
48//!     // Optionally restore from S3 on startup
49//!     replicator.restore().await?;
50//!
51//!     // Start background replication
52//!     replicator.start().await?;
53//!
54//!     // ... run your application ...
55//!
56//!     // Graceful shutdown
57//!     replicator.flush().await?;
58//!     Ok(())
59//! }
60//! ```
61
62mod cache;
63mod restore;
64mod s3_backend;
65mod wal_monitor;
66
67pub use crate::config::{LayerStorageConfig, SqliteReplicatorConfig};
68use crate::error::Result;
69use aws_sdk_s3::Client as S3Client;
70use cache::WriteCache;
71use restore::RestoreManager;
72use s3_backend::S3Backend;
73use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
74use std::sync::Arc;
75use tokio::sync::{mpsc, Mutex, RwLock};
76use tracing::{debug, error, info, warn};
77use wal_monitor::WalMonitor;
78
79pub use cache::CacheEntry;
80pub use s3_backend::ReplicationMetadata;
81pub use wal_monitor::WalEvent;
82
83/// Current replication status
84#[derive(Debug, Clone)]
85pub struct ReplicationStatus {
86    /// Whether the replicator is running
87    pub running: bool,
88    /// Number of WAL segments pending upload
89    pub pending_segments: usize,
90    /// Total bytes pending upload
91    pub pending_bytes: u64,
92    /// Last successful snapshot timestamp
93    pub last_snapshot: Option<chrono::DateTime<chrono::Utc>>,
94    /// Last successful WAL sync timestamp
95    pub last_wal_sync: Option<chrono::DateTime<chrono::Utc>>,
96    /// Number of failed upload attempts
97    pub failed_uploads: u64,
98    /// Current WAL frame count
99    pub wal_frame_count: u64,
100}
101
/// `SQLite` WAL-based replicator to S3
///
/// Monitors a `SQLite` database's WAL file and replicates changes to S3 for
/// persistence and disaster recovery.
///
/// The shared pieces of state are wrapped in `Arc` so the background tasks
/// spawned by `start` can hold their own handles to them.
pub struct SqliteReplicator {
    // Replicator settings (database path, bucket/prefix, cache location, ...).
    config: SqliteReplicatorConfig,
    // Uploads snapshots, WAL segments and metadata; shared with the restore manager.
    s3_backend: Arc<S3Backend>,
    // Local queue of WAL segments waiting to be uploaded.
    cache: Arc<WriteCache>,
    // `None` until `start` installs a monitor for the database's `-wal` file.
    wal_monitor: Arc<Mutex<Option<WalMonitor>>>,
    // Backs the public `restore` method.
    restore_manager: RestoreManager,

    // Runtime state
    // Set by `start`, cleared by `flush`; polled by every background task loop.
    running: Arc<AtomicBool>,
    // Updated after each successful snapshot upload.
    last_snapshot: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
    // Updated by the upload worker after each successful WAL segment upload.
    last_wal_sync: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
    // Incremented by the upload worker on every failed upload attempt.
    failed_uploads: Arc<AtomicU64>,

    // Shutdown channel
    // `flush` sends on this; the channel is created with capacity 1 in `new`.
    shutdown_tx: mpsc::Sender<()>,
    /// Receiver for shutdown signals (used by future graceful shutdown handling)
    // NOTE(review): nothing in this file receives from it yet, hence the allow.
    #[allow(dead_code)]
    shutdown_rx: Arc<Mutex<mpsc::Receiver<()>>>,
}
125
126impl SqliteReplicator {
127    /// Create a new `SQLite` replicator
128    ///
129    /// # Arguments
130    ///
131    /// * `config` - Replicator configuration
132    /// * `s3_config` - S3/Layer storage configuration for credentials and endpoint
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if the cache directory cannot be created or if S3 client
137    /// initialization fails.
138    pub async fn new(
139        config: SqliteReplicatorConfig,
140        s3_config: &LayerStorageConfig,
141    ) -> Result<Self> {
142        // Ensure cache directory exists
143        tokio::fs::create_dir_all(&config.cache_dir).await?;
144
145        // Initialize S3 client
146        let mut aws_config_builder = aws_config::from_env();
147
148        if let Some(region) = &s3_config.region {
149            aws_config_builder =
150                aws_config_builder.region(aws_sdk_s3::config::Region::new(region.clone()));
151        }
152
153        let aws_config = aws_config_builder.load().await;
154
155        let s3_client_config = if let Some(endpoint) = &s3_config.endpoint_url {
156            aws_sdk_s3::config::Builder::from(&aws_config)
157                .endpoint_url(endpoint)
158                .force_path_style(true)
159                .build()
160        } else {
161            aws_sdk_s3::config::Builder::from(&aws_config).build()
162        };
163
164        let s3_client = S3Client::from_conf(s3_client_config);
165
166        // Create S3 backend
167        let s3_backend = Arc::new(S3Backend::new(
168            s3_client,
169            config.s3_bucket.clone(),
170            config.s3_prefix.clone(),
171            s3_config.compression_level,
172        ));
173
174        // Create write cache
175        let cache = Arc::new(WriteCache::new(
176            config.cache_dir.clone(),
177            config.max_cache_size,
178        ));
179
180        // Create restore manager
181        let restore_manager = RestoreManager::new(
182            config.db_path.clone(),
183            s3_backend.clone(),
184            config.cache_dir.clone(),
185        );
186
187        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
188
189        Ok(Self {
190            config,
191            s3_backend,
192            cache,
193            wal_monitor: Arc::new(Mutex::new(None)),
194            restore_manager,
195            running: Arc::new(AtomicBool::new(false)),
196            last_snapshot: Arc::new(RwLock::new(None)),
197            last_wal_sync: Arc::new(RwLock::new(None)),
198            failed_uploads: Arc::new(AtomicU64::new(0)),
199            shutdown_tx,
200            shutdown_rx: Arc::new(Mutex::new(shutdown_rx)),
201        })
202    }
203
204    /// Start the replicator background tasks
205    ///
206    /// This spawns background tasks for:
207    /// - WAL file monitoring and change detection
208    /// - Cache upload worker (handles retries)
209    /// - Periodic snapshot creation
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the WAL monitor cannot be started (e.g., database
214    /// file doesn't exist).
215    pub async fn start(&self) -> Result<()> {
216        if self.running.load(Ordering::SeqCst) {
217            return Ok(());
218        }
219
220        info!(
221            "Starting SQLite replicator for {}",
222            self.config.db_path.display()
223        );
224
225        // Check if DB exists, optionally restore
226        if !self.config.db_path.exists() {
227            if self.config.auto_restore {
228                info!("Database not found, attempting auto-restore from S3");
229                match self.restore().await {
230                    Ok(true) => info!("Database restored from S3"),
231                    Ok(false) => info!("No backup found in S3, starting fresh"),
232                    Err(e) => warn!("Auto-restore failed: {}", e),
233                }
234            } else {
235                debug!("Database not found and auto_restore is disabled");
236            }
237        }
238
239        self.running.store(true, Ordering::SeqCst);
240
241        // Create WAL monitor
242        let wal_path = self.wal_path();
243        let wal_monitor = WalMonitor::new(wal_path.clone())?;
244        *self.wal_monitor.lock().await = Some(wal_monitor);
245
246        // Start WAL monitoring task
247        self.spawn_wal_monitor_task();
248
249        // Start cache upload worker
250        self.spawn_upload_worker();
251
252        // Start periodic snapshot task
253        self.spawn_snapshot_task();
254
255        info!("SQLite replicator started");
256        Ok(())
257    }
258
259    /// Force flush all pending changes to S3
260    ///
261    /// Call this before shutdown to ensure all changes are persisted. This will:
262    /// 1. Create a final snapshot
263    /// 2. Upload all pending cache entries
264    /// 3. Wait for uploads to complete
265    ///
266    /// # Errors
267    ///
268    /// Returns an error if the final snapshot or upload fails.
269    pub async fn flush(&self) -> Result<()> {
270        info!("Flushing SQLite replicator");
271
272        // Signal shutdown
273        self.running.store(false, Ordering::SeqCst);
274        let _ = self.shutdown_tx.send(()).await;
275
276        // Create final snapshot
277        if self.config.db_path.exists() {
278            self.create_snapshot().await?;
279        }
280
281        // Upload all pending cache entries
282        while let Some(entry) = self.cache.pop_oldest().await? {
283            match self.s3_backend.upload_wal_segment(&entry).await {
284                Ok(()) => {
285                    debug!("Flushed WAL segment {}", entry.sequence);
286                    self.cache.remove(&entry).await?;
287                }
288                Err(e) => {
289                    error!("Failed to flush WAL segment {}: {}", entry.sequence, e);
290                    return Err(e);
291                }
292            }
293        }
294
295        info!("SQLite replicator flushed");
296        Ok(())
297    }
298
299    /// Restore database from S3
300    ///
301    /// Downloads the latest snapshot and any subsequent WAL segments from S3,
302    /// then applies them to reconstruct the database.
303    ///
304    /// # Returns
305    ///
306    /// - `Ok(true)` if a backup was found and restored
307    /// - `Ok(false)` if no backup was found in S3
308    /// - `Err(_)` if restore failed
309    ///
310    /// # Errors
311    ///
312    /// Returns an error if downloading the snapshot or WAL segments fails, or
313    /// if applying them to reconstruct the database fails.
314    pub async fn restore(&self) -> Result<bool> {
315        self.restore_manager.restore().await
316    }
317
318    /// Get current replication status
319    #[must_use]
320    pub fn status(&self) -> ReplicationStatus {
321        let cache = self.cache.clone();
322
323        // Get cache stats synchronously from the last known state
324        let (pending_segments, pending_bytes) = cache.stats();
325
326        ReplicationStatus {
327            running: self.running.load(Ordering::SeqCst),
328            pending_segments,
329            pending_bytes,
330            last_snapshot: None, // Would need async to read
331            last_wal_sync: None, // Would need async to read
332            failed_uploads: self.failed_uploads.load(Ordering::SeqCst),
333            wal_frame_count: 0, // Would need async to read from monitor
334        }
335    }
336
337    /// Get the WAL file path for the database
338    fn wal_path(&self) -> std::path::PathBuf {
339        let mut wal_path = self.config.db_path.clone();
340        let filename = wal_path
341            .file_name()
342            .unwrap_or_default()
343            .to_string_lossy()
344            .to_string();
345        wal_path.set_file_name(format!("{filename}-wal"));
346        wal_path
347    }
348
349    /// Create a full database snapshot
350    async fn create_snapshot(&self) -> Result<()> {
351        info!("Creating database snapshot");
352
353        // Ensure parent directory exists
354        if let Some(parent) = self.config.db_path.parent() {
355            tokio::fs::create_dir_all(parent).await?;
356        }
357
358        // Read the entire database file
359        let db_bytes = tokio::fs::read(&self.config.db_path).await?;
360
361        // Upload to S3
362        self.s3_backend.upload_snapshot(&db_bytes).await?;
363
364        // Update metadata
365        self.s3_backend.update_metadata(None).await?;
366
367        // Update last snapshot time
368        *self.last_snapshot.write().await = Some(chrono::Utc::now());
369
370        info!("Database snapshot created successfully");
371        Ok(())
372    }
373
374    /// Spawn the WAL monitoring task
375    fn spawn_wal_monitor_task(&self) {
376        let running = self.running.clone();
377        let wal_monitor = self.wal_monitor.clone();
378        let cache = self.cache.clone();
379        let wal_path = self.wal_path();
380
381        tokio::spawn(async move {
382            while running.load(Ordering::SeqCst) {
383                // Check if WAL monitor is initialized
384                let monitor_guard = wal_monitor.lock().await;
385                if let Some(monitor) = monitor_guard.as_ref() {
386                    // Check for WAL changes
387                    match monitor.check_for_changes().await {
388                        Ok(Some(event)) => {
389                            debug!("WAL change detected: {:?}", event);
390
391                            // Read WAL file and add to cache
392                            if wal_path.exists() {
393                                match tokio::fs::read(&wal_path).await {
394                                    Ok(wal_data) => {
395                                        let sequence = event.frame_count;
396                                        if let Err(e) = cache.add(sequence, wal_data).await {
397                                            error!("Failed to cache WAL segment: {}", e);
398                                        }
399                                    }
400                                    Err(e) => {
401                                        error!("Failed to read WAL file: {}", e);
402                                    }
403                                }
404                            }
405                        }
406                        Ok(None) => {
407                            // No changes
408                        }
409                        Err(e) => {
410                            error!("WAL monitor error: {}", e);
411                        }
412                    }
413                }
414                drop(monitor_guard);
415
416                // Poll interval
417                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
418            }
419        });
420    }
421
422    /// Spawn the cache upload worker
423    fn spawn_upload_worker(&self) {
424        let running = self.running.clone();
425        let cache = self.cache.clone();
426        let s3_backend = self.s3_backend.clone();
427        let failed_uploads = self.failed_uploads.clone();
428        let last_wal_sync = self.last_wal_sync.clone();
429
430        tokio::spawn(async move {
431            let mut retry_delay = tokio::time::Duration::from_secs(1);
432            let max_retry_delay = tokio::time::Duration::from_secs(60);
433
434            while running.load(Ordering::SeqCst) {
435                // Try to upload oldest entry
436                match cache.pop_oldest().await {
437                    Ok(Some(entry)) => {
438                        match s3_backend.upload_wal_segment(&entry).await {
439                            Ok(()) => {
440                                debug!("Uploaded WAL segment {}", entry.sequence);
441                                if let Err(e) = cache.remove(&entry).await {
442                                    error!("Failed to remove cached entry: {}", e);
443                                }
444                                *last_wal_sync.write().await = Some(chrono::Utc::now());
445                                retry_delay = tokio::time::Duration::from_secs(1);
446                            }
447                            Err(e) => {
448                                warn!("Failed to upload WAL segment: {}", e);
449                                failed_uploads.fetch_add(1, Ordering::SeqCst);
450
451                                // Re-add to cache for retry
452                                if let Err(e) = cache.add(entry.sequence, entry.data).await {
453                                    error!("Failed to re-cache entry: {}", e);
454                                }
455
456                                // Exponential backoff
457                                tokio::time::sleep(retry_delay).await;
458                                retry_delay = std::cmp::min(retry_delay * 2, max_retry_delay);
459                            }
460                        }
461                    }
462                    Ok(None) => {
463                        // No entries to upload, wait a bit
464                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
465                    }
466                    Err(e) => {
467                        error!("Cache error: {}", e);
468                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
469                    }
470                }
471            }
472        });
473    }
474
475    /// Spawn the periodic snapshot task
476    fn spawn_snapshot_task(&self) {
477        let running = self.running.clone();
478        let interval = tokio::time::Duration::from_secs(self.config.snapshot_interval_secs);
479        let db_path = self.config.db_path.clone();
480        let s3_backend = self.s3_backend.clone();
481        let last_snapshot = self.last_snapshot.clone();
482
483        tokio::spawn(async move {
484            let mut interval_timer = tokio::time::interval(interval);
485            interval_timer.tick().await; // Skip first immediate tick
486
487            while running.load(Ordering::SeqCst) {
488                interval_timer.tick().await;
489
490                if !running.load(Ordering::SeqCst) {
491                    break;
492                }
493
494                info!("Creating periodic snapshot");
495
496                if !db_path.exists() {
497                    debug!("Database file doesn't exist, skipping snapshot");
498                    continue;
499                }
500
501                // Read and upload snapshot
502                match tokio::fs::read(&db_path).await {
503                    Ok(db_bytes) => match s3_backend.upload_snapshot(&db_bytes).await {
504                        Ok(()) => {
505                            info!("Periodic snapshot created");
506                            if let Err(e) = s3_backend.update_metadata(None).await {
507                                error!("Failed to update metadata: {}", e);
508                            }
509                            *last_snapshot.write().await = Some(chrono::Utc::now());
510                        }
511                        Err(e) => {
512                            error!("Failed to upload snapshot: {}", e);
513                        }
514                    },
515                    Err(e) => {
516                        error!("Failed to read database for snapshot: {}", e);
517                    }
518                }
519            }
520        });
521    }
522}
523
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Checks that appending `-wal` to the database file name yields the
    /// expected sibling path. The derivation is repeated here because a full
    /// replicator cannot be constructed in a synchronous test.
    #[test]
    fn test_wal_path_derivation() {
        let config = SqliteReplicatorConfig {
            db_path: PathBuf::from("/var/lib/myapp/data.db"),
            s3_bucket: "test".to_string(),
            s3_prefix: "test/".to_string(),
            cache_dir: PathBuf::from("/tmp/cache"),
            max_cache_size: 1024,
            auto_restore: false,
            snapshot_interval_secs: 3600,
        };

        // Sanity-check that the config holds what was put in.
        assert_eq!(config.s3_bucket, "test");
        assert_eq!(config.snapshot_interval_secs, 3600);

        // Same path logic as the replicator: file name + "-wal", same directory.
        let db_file_name = config.db_path.file_name().unwrap().to_string_lossy();
        let derived = config
            .db_path
            .with_file_name(format!("{db_file_name}-wal"));

        let expected = PathBuf::from("/var/lib/myapp/data.db-wal");
        assert_eq!(derived, expected);
    }
}