Skip to main content

zlayer_storage/replicator/
mod.rs

1//! `SQLite` WAL-based replication to S3
2//!
3//! Provides automatic backup and restore of `SQLite` databases to S3, using WAL
4//! (Write-Ahead Logging) for incremental replication. This enables crash-tolerant
5//! persistence and cross-node database restoration.
6//!
7//! # Features
8//!
9//! - **WAL Monitoring**: Detects database changes via WAL file modifications
10//! - **Network Tolerance**: Local write cache buffers changes during network outages
11//! - **Automatic Snapshots**: Periodic full database snapshots with configurable intervals
12//! - **S3 Backend**: Stores snapshots and WAL segments in S3 with zstd compression
13//! - **Auto-Restore**: Automatically restores from S3 on startup if local DB is missing
14//!
15//! # Architecture
16//!
17//! ```text
18//! SQLite DB (WAL mode)
19//!        |
20//!        v
21//! WAL Monitor (notify) --> Write Cache --> S3 Backend
22//!        |                     |               |
23//!        v                     v               v
24//!   Frame Detection      FIFO Queue    Upload/Download
25//! ```
26//!
27//! # Example
28//!
29//! ```rust,no_run
30//! use zlayer_storage::replicator::{SqliteReplicator, SqliteReplicatorConfig};
31//! use zlayer_storage::config::LayerStorageConfig;
32//!
33//! #[tokio::main]
34//! async fn main() -> anyhow::Result<()> {
35//!     let replicator_config = SqliteReplicatorConfig {
36//!         db_path: "/var/lib/myapp/data.db".into(),
37//!         s3_bucket: "my-bucket".to_string(),
38//!         s3_prefix: "sqlite-backups/myapp/".to_string(),
39//!         cache_dir: "/var/lib/zlayer/sqlite-replicator/cache".into(),
40//!         max_cache_size: 100 * 1024 * 1024, // 100MB
41//!         auto_restore: true,
42//!         snapshot_interval_secs: 3600, // 1 hour
43//!     };
44//!
45//!     let s3_config = LayerStorageConfig::new("my-bucket");
46//!     let replicator = SqliteReplicator::new(replicator_config, &s3_config).await?;
47//!
48//!     // Optionally restore from S3 on startup
49//!     replicator.restore().await?;
50//!
51//!     // Start background replication
52//!     replicator.start().await?;
53//!
54//!     // ... run your application ...
55//!
56//!     // Graceful shutdown
57//!     replicator.flush().await?;
58//!     Ok(())
59//! }
60//! ```
61
62mod cache;
63mod restore;
64mod s3_backend;
65mod wal_monitor;
66
67pub use crate::config::{LayerStorageConfig, SqliteReplicatorConfig};
68use crate::error::Result;
69use aws_sdk_s3::Client as S3Client;
70use cache::WriteCache;
71use restore::RestoreManager;
72use s3_backend::S3Backend;
73use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
74use std::sync::Arc;
75use tokio::sync::{mpsc, Mutex, RwLock};
76use tracing::{debug, error, info, warn};
77use wal_monitor::WalMonitor;
78
79pub use cache::CacheEntry;
80pub use s3_backend::ReplicationMetadata;
81pub use wal_monitor::WalEvent;
82
83/// Current replication status
84#[derive(Debug, Clone)]
85pub struct ReplicationStatus {
86    /// Whether the replicator is running
87    pub running: bool,
88    /// Number of WAL segments pending upload
89    pub pending_segments: usize,
90    /// Total bytes pending upload
91    pub pending_bytes: u64,
92    /// Last successful snapshot timestamp
93    pub last_snapshot: Option<chrono::DateTime<chrono::Utc>>,
94    /// Last successful WAL sync timestamp
95    pub last_wal_sync: Option<chrono::DateTime<chrono::Utc>>,
96    /// Number of failed upload attempts
97    pub failed_uploads: u64,
98    /// Current WAL frame count
99    pub wal_frame_count: u64,
100}
101
102/// `SQLite` WAL-based replicator to S3
103///
104/// Monitors a `SQLite` database's WAL file and replicates changes to S3 for
105/// persistence and disaster recovery.
106pub struct SqliteReplicator {
107    config: SqliteReplicatorConfig,
108    s3_backend: Arc<S3Backend>,
109    cache: Arc<WriteCache>,
110    wal_monitor: Arc<Mutex<Option<WalMonitor>>>,
111    restore_manager: RestoreManager,
112
113    // Runtime state
114    running: Arc<AtomicBool>,
115    last_snapshot: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
116    last_wal_sync: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
117    failed_uploads: Arc<AtomicU64>,
118
119    // Shutdown channel
120    shutdown_tx: mpsc::Sender<()>,
121    /// Receiver for shutdown signals (used by future graceful shutdown handling)
122    #[allow(dead_code)]
123    shutdown_rx: Arc<Mutex<mpsc::Receiver<()>>>,
124}
125
126impl SqliteReplicator {
127    /// Create a new `SQLite` replicator
128    ///
129    /// # Arguments
130    ///
131    /// * `config` - Replicator configuration
132    /// * `s3_config` - S3/Layer storage configuration for credentials and endpoint
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if the cache directory cannot be created or if S3 client
137    /// initialization fails.
138    pub async fn new(
139        config: SqliteReplicatorConfig,
140        s3_config: &LayerStorageConfig,
141    ) -> Result<Self> {
142        // Ensure cache directory exists
143        tokio::fs::create_dir_all(&config.cache_dir).await?;
144
145        // Initialize S3 client
146        let mut aws_config_builder = aws_config::from_env();
147
148        if let Some(region) = &s3_config.region {
149            aws_config_builder =
150                aws_config_builder.region(aws_sdk_s3::config::Region::new(region.clone()));
151        } else if s3_config.endpoint_url.is_some() {
152            // Custom / non-AWS endpoint (MinIO, R2, Hetzner, …): region is cosmetic for
153            // path-style S3-compatible stores. Pin a static region so the SDK never consults
154            // the IMDS region provider — off-AWS that probe blocks ~5s+ per startup and was
155            // gating daemon `Type=notify` readiness.
156            aws_config_builder =
157                aws_config_builder.region(aws_sdk_s3::config::Region::new("us-east-1"));
158        }
159
160        let aws_config = aws_config_builder.load().await;
161
162        let s3_client_config = if let Some(endpoint) = &s3_config.endpoint_url {
163            aws_sdk_s3::config::Builder::from(&aws_config)
164                .endpoint_url(endpoint)
165                .force_path_style(true)
166                .build()
167        } else {
168            aws_sdk_s3::config::Builder::from(&aws_config).build()
169        };
170
171        let s3_client = S3Client::from_conf(s3_client_config);
172
173        // Create S3 backend
174        let s3_backend = Arc::new(S3Backend::new(
175            s3_client,
176            config.s3_bucket.clone(),
177            config.s3_prefix.clone(),
178            s3_config.compression_level,
179        ));
180
181        // Create write cache
182        let cache = Arc::new(WriteCache::new(
183            config.cache_dir.clone(),
184            config.max_cache_size,
185        ));
186
187        // Create restore manager
188        let restore_manager = RestoreManager::new(
189            config.db_path.clone(),
190            s3_backend.clone(),
191            config.cache_dir.clone(),
192        );
193
194        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
195
196        Ok(Self {
197            config,
198            s3_backend,
199            cache,
200            wal_monitor: Arc::new(Mutex::new(None)),
201            restore_manager,
202            running: Arc::new(AtomicBool::new(false)),
203            last_snapshot: Arc::new(RwLock::new(None)),
204            last_wal_sync: Arc::new(RwLock::new(None)),
205            failed_uploads: Arc::new(AtomicU64::new(0)),
206            shutdown_tx,
207            shutdown_rx: Arc::new(Mutex::new(shutdown_rx)),
208        })
209    }
210
211    /// Start the replicator background tasks
212    ///
213    /// This spawns background tasks for:
214    /// - WAL file monitoring and change detection
215    /// - Cache upload worker (handles retries)
216    /// - Periodic snapshot creation
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the WAL monitor cannot be started (e.g., database
221    /// file doesn't exist).
222    pub async fn start(&self) -> Result<()> {
223        if self.running.load(Ordering::SeqCst) {
224            return Ok(());
225        }
226
227        info!(
228            "Starting SQLite replicator for {}",
229            self.config.db_path.display()
230        );
231
232        // Check if DB exists, optionally restore
233        if !self.config.db_path.exists() {
234            if self.config.auto_restore {
235                info!("Database not found, attempting auto-restore from S3");
236                match self.restore().await {
237                    Ok(true) => info!("Database restored from S3"),
238                    Ok(false) => info!("No backup found in S3, starting fresh"),
239                    Err(e) => warn!("Auto-restore failed: {}", e),
240                }
241            } else {
242                debug!("Database not found and auto_restore is disabled");
243            }
244        }
245
246        self.running.store(true, Ordering::SeqCst);
247
248        // Create WAL monitor
249        let wal_path = self.wal_path();
250        let wal_monitor = WalMonitor::new(wal_path.clone())?;
251        *self.wal_monitor.lock().await = Some(wal_monitor);
252
253        // Start WAL monitoring task
254        self.spawn_wal_monitor_task();
255
256        // Start cache upload worker
257        self.spawn_upload_worker();
258
259        // Start periodic snapshot task
260        self.spawn_snapshot_task();
261
262        info!("SQLite replicator started");
263        Ok(())
264    }
265
266    /// Force flush all pending changes to S3
267    ///
268    /// Call this before shutdown to ensure all changes are persisted. This will:
269    /// 1. Create a final snapshot
270    /// 2. Upload all pending cache entries
271    /// 3. Wait for uploads to complete
272    ///
273    /// # Errors
274    ///
275    /// Returns an error if the final snapshot or upload fails.
276    pub async fn flush(&self) -> Result<()> {
277        info!("Flushing SQLite replicator");
278
279        // Signal shutdown
280        self.running.store(false, Ordering::SeqCst);
281        let _ = self.shutdown_tx.send(()).await;
282
283        // Create final snapshot
284        if self.config.db_path.exists() {
285            self.create_snapshot().await?;
286        }
287
288        // Upload all pending cache entries
289        while let Some(entry) = self.cache.pop_oldest().await? {
290            match self.s3_backend.upload_wal_segment(&entry).await {
291                Ok(()) => {
292                    debug!("Flushed WAL segment {}", entry.sequence);
293                    self.cache.remove(&entry).await?;
294                }
295                Err(e) => {
296                    error!("Failed to flush WAL segment {}: {}", entry.sequence, e);
297                    return Err(e);
298                }
299            }
300        }
301
302        info!("SQLite replicator flushed");
303        Ok(())
304    }
305
306    /// Restore database from S3
307    ///
308    /// Downloads the latest snapshot and any subsequent WAL segments from S3,
309    /// then applies them to reconstruct the database.
310    ///
311    /// # Returns
312    ///
313    /// - `Ok(true)` if a backup was found and restored
314    /// - `Ok(false)` if no backup was found in S3
315    /// - `Err(_)` if restore failed
316    ///
317    /// # Errors
318    ///
319    /// Returns an error if downloading the snapshot or WAL segments fails, or
320    /// if applying them to reconstruct the database fails.
321    pub async fn restore(&self) -> Result<bool> {
322        self.restore_manager.restore().await
323    }
324
325    /// Get current replication status
326    #[must_use]
327    pub fn status(&self) -> ReplicationStatus {
328        let cache = self.cache.clone();
329
330        // Get cache stats synchronously from the last known state
331        let (pending_segments, pending_bytes) = cache.stats();
332
333        ReplicationStatus {
334            running: self.running.load(Ordering::SeqCst),
335            pending_segments,
336            pending_bytes,
337            last_snapshot: None, // Would need async to read
338            last_wal_sync: None, // Would need async to read
339            failed_uploads: self.failed_uploads.load(Ordering::SeqCst),
340            wal_frame_count: 0, // Would need async to read from monitor
341        }
342    }
343
344    /// Get the WAL file path for the database
345    fn wal_path(&self) -> std::path::PathBuf {
346        let mut wal_path = self.config.db_path.clone();
347        let filename = wal_path
348            .file_name()
349            .unwrap_or_default()
350            .to_string_lossy()
351            .to_string();
352        wal_path.set_file_name(format!("{filename}-wal"));
353        wal_path
354    }
355
356    /// Create a full database snapshot
357    async fn create_snapshot(&self) -> Result<()> {
358        info!("Creating database snapshot");
359
360        // Ensure parent directory exists
361        if let Some(parent) = self.config.db_path.parent() {
362            tokio::fs::create_dir_all(parent).await?;
363        }
364
365        // Read the entire database file
366        let db_bytes = tokio::fs::read(&self.config.db_path).await?;
367
368        // Upload to S3
369        self.s3_backend.upload_snapshot(&db_bytes).await?;
370
371        // Update metadata
372        self.s3_backend.update_metadata(None).await?;
373
374        // Update last snapshot time
375        *self.last_snapshot.write().await = Some(chrono::Utc::now());
376
377        info!("Database snapshot created successfully");
378        Ok(())
379    }
380
381    /// Spawn the WAL monitoring task
382    fn spawn_wal_monitor_task(&self) {
383        let running = self.running.clone();
384        let wal_monitor = self.wal_monitor.clone();
385        let cache = self.cache.clone();
386        let wal_path = self.wal_path();
387
388        tokio::spawn(async move {
389            while running.load(Ordering::SeqCst) {
390                // Check if WAL monitor is initialized
391                let monitor_guard = wal_monitor.lock().await;
392                if let Some(monitor) = monitor_guard.as_ref() {
393                    // Check for WAL changes
394                    match monitor.check_for_changes().await {
395                        Ok(Some(event)) => {
396                            debug!("WAL change detected: {:?}", event);
397
398                            // Read WAL file and add to cache
399                            if wal_path.exists() {
400                                match tokio::fs::read(&wal_path).await {
401                                    Ok(wal_data) => {
402                                        let sequence = event.frame_count;
403                                        if let Err(e) = cache.add(sequence, wal_data).await {
404                                            error!("Failed to cache WAL segment: {}", e);
405                                        }
406                                    }
407                                    Err(e) => {
408                                        error!("Failed to read WAL file: {}", e);
409                                    }
410                                }
411                            }
412                        }
413                        Ok(None) => {
414                            // No changes
415                        }
416                        Err(e) => {
417                            error!("WAL monitor error: {}", e);
418                        }
419                    }
420                }
421                drop(monitor_guard);
422
423                // Poll interval
424                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
425            }
426        });
427    }
428
429    /// Spawn the cache upload worker
430    fn spawn_upload_worker(&self) {
431        let running = self.running.clone();
432        let cache = self.cache.clone();
433        let s3_backend = self.s3_backend.clone();
434        let failed_uploads = self.failed_uploads.clone();
435        let last_wal_sync = self.last_wal_sync.clone();
436
437        tokio::spawn(async move {
438            let mut retry_delay = tokio::time::Duration::from_secs(1);
439            let max_retry_delay = tokio::time::Duration::from_secs(60);
440
441            while running.load(Ordering::SeqCst) {
442                // Try to upload oldest entry
443                match cache.pop_oldest().await {
444                    Ok(Some(entry)) => {
445                        match s3_backend.upload_wal_segment(&entry).await {
446                            Ok(()) => {
447                                debug!("Uploaded WAL segment {}", entry.sequence);
448                                if let Err(e) = cache.remove(&entry).await {
449                                    error!("Failed to remove cached entry: {}", e);
450                                }
451                                *last_wal_sync.write().await = Some(chrono::Utc::now());
452                                retry_delay = tokio::time::Duration::from_secs(1);
453                            }
454                            Err(e) => {
455                                warn!("Failed to upload WAL segment: {}", e);
456                                failed_uploads.fetch_add(1, Ordering::SeqCst);
457
458                                // Re-add to cache for retry
459                                if let Err(e) = cache.add(entry.sequence, entry.data).await {
460                                    error!("Failed to re-cache entry: {}", e);
461                                }
462
463                                // Exponential backoff
464                                tokio::time::sleep(retry_delay).await;
465                                retry_delay = std::cmp::min(retry_delay * 2, max_retry_delay);
466                            }
467                        }
468                    }
469                    Ok(None) => {
470                        // No entries to upload, wait a bit
471                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
472                    }
473                    Err(e) => {
474                        error!("Cache error: {}", e);
475                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
476                    }
477                }
478            }
479        });
480    }
481
482    /// Spawn the periodic snapshot task
483    fn spawn_snapshot_task(&self) {
484        let running = self.running.clone();
485        let interval = tokio::time::Duration::from_secs(self.config.snapshot_interval_secs);
486        let db_path = self.config.db_path.clone();
487        let s3_backend = self.s3_backend.clone();
488        let last_snapshot = self.last_snapshot.clone();
489
490        tokio::spawn(async move {
491            let mut interval_timer = tokio::time::interval(interval);
492            interval_timer.tick().await; // Skip first immediate tick
493
494            while running.load(Ordering::SeqCst) {
495                interval_timer.tick().await;
496
497                if !running.load(Ordering::SeqCst) {
498                    break;
499                }
500
501                info!("Creating periodic snapshot");
502
503                if !db_path.exists() {
504                    debug!("Database file doesn't exist, skipping snapshot");
505                    continue;
506                }
507
508                // Read and upload snapshot
509                match tokio::fs::read(&db_path).await {
510                    Ok(db_bytes) => match s3_backend.upload_snapshot(&db_bytes).await {
511                        Ok(()) => {
512                            info!("Periodic snapshot created");
513                            if let Err(e) = s3_backend.update_metadata(None).await {
514                                error!("Failed to update metadata: {}", e);
515                            }
516                            *last_snapshot.write().await = Some(chrono::Utc::now());
517                        }
518                        Err(e) => {
519                            error!("Failed to upload snapshot: {}", e);
520                        }
521                    },
522                    Err(e) => {
523                        error!("Failed to read database for snapshot: {}", e);
524                    }
525                }
526            }
527        });
528    }
529}
530
#[cfg(test)]
mod tests {
    use super::*;

    /// Building a real `SqliteReplicator` needs an S3 client, so this re-derives
    /// the `<db file name>-wal` naming rule on a bare config and checks the result.
    #[test]
    fn test_wal_path_derivation() {
        let db_path = std::path::PathBuf::from("/var/lib/myapp/data.db");
        let config = SqliteReplicatorConfig {
            db_path,
            s3_bucket: "test".to_string(),
            s3_prefix: "test/".to_string(),
            cache_dir: std::path::PathBuf::from("/tmp/cache"),
            max_cache_size: 1024,
            auto_restore: false,
            snapshot_interval_secs: 3600,
        };

        // The config should carry through exactly what we put in.
        assert_eq!(config.s3_bucket, "test");
        assert_eq!(config.snapshot_interval_secs, 3600);

        // The WAL lives next to the database, with "-wal" appended to its name.
        let db_name = config.db_path.file_name().unwrap().to_string_lossy();
        let derived = config.db_path.with_file_name(format!("{db_name}-wal"));

        let expected = std::path::PathBuf::from("/var/lib/myapp/data.db-wal");
        assert_eq!(derived, expected);
    }
}