zlayer_storage/replicator/
mod.rs1mod cache;
63mod restore;
64mod s3_backend;
65mod wal_monitor;
66
67pub use crate::config::{LayerStorageConfig, SqliteReplicatorConfig};
68use crate::error::Result;
69use aws_sdk_s3::Client as S3Client;
70use cache::WriteCache;
71use restore::RestoreManager;
72use s3_backend::S3Backend;
73use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
74use std::sync::Arc;
75use tokio::sync::{mpsc, Mutex, RwLock};
76use tracing::{debug, error, info, warn};
77use wal_monitor::WalMonitor;
78
79pub use cache::CacheEntry;
80pub use s3_backend::ReplicationMetadata;
81pub use wal_monitor::WalEvent;
82
83#[derive(Debug, Clone)]
85pub struct ReplicationStatus {
86 pub running: bool,
88 pub pending_segments: usize,
90 pub pending_bytes: u64,
92 pub last_snapshot: Option<chrono::DateTime<chrono::Utc>>,
94 pub last_wal_sync: Option<chrono::DateTime<chrono::Utc>>,
96 pub failed_uploads: u64,
98 pub wal_frame_count: u64,
100}
101
102pub struct SqliteReplicator {
107 config: SqliteReplicatorConfig,
108 s3_backend: Arc<S3Backend>,
109 cache: Arc<WriteCache>,
110 wal_monitor: Arc<Mutex<Option<WalMonitor>>>,
111 restore_manager: RestoreManager,
112
113 running: Arc<AtomicBool>,
115 last_snapshot: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
116 last_wal_sync: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
117 failed_uploads: Arc<AtomicU64>,
118
119 shutdown_tx: mpsc::Sender<()>,
121 #[allow(dead_code)]
123 shutdown_rx: Arc<Mutex<mpsc::Receiver<()>>>,
124}
125
126impl SqliteReplicator {
127 pub async fn new(
139 config: SqliteReplicatorConfig,
140 s3_config: &LayerStorageConfig,
141 ) -> Result<Self> {
142 tokio::fs::create_dir_all(&config.cache_dir).await?;
144
145 let mut aws_config_builder = aws_config::from_env();
147
148 if let Some(region) = &s3_config.region {
149 aws_config_builder =
150 aws_config_builder.region(aws_sdk_s3::config::Region::new(region.clone()));
151 }
152
153 let aws_config = aws_config_builder.load().await;
154
155 let s3_client_config = if let Some(endpoint) = &s3_config.endpoint_url {
156 aws_sdk_s3::config::Builder::from(&aws_config)
157 .endpoint_url(endpoint)
158 .force_path_style(true)
159 .build()
160 } else {
161 aws_sdk_s3::config::Builder::from(&aws_config).build()
162 };
163
164 let s3_client = S3Client::from_conf(s3_client_config);
165
166 let s3_backend = Arc::new(S3Backend::new(
168 s3_client,
169 config.s3_bucket.clone(),
170 config.s3_prefix.clone(),
171 s3_config.compression_level,
172 ));
173
174 let cache = Arc::new(WriteCache::new(
176 config.cache_dir.clone(),
177 config.max_cache_size,
178 ));
179
180 let restore_manager = RestoreManager::new(
182 config.db_path.clone(),
183 s3_backend.clone(),
184 config.cache_dir.clone(),
185 );
186
187 let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
188
189 Ok(Self {
190 config,
191 s3_backend,
192 cache,
193 wal_monitor: Arc::new(Mutex::new(None)),
194 restore_manager,
195 running: Arc::new(AtomicBool::new(false)),
196 last_snapshot: Arc::new(RwLock::new(None)),
197 last_wal_sync: Arc::new(RwLock::new(None)),
198 failed_uploads: Arc::new(AtomicU64::new(0)),
199 shutdown_tx,
200 shutdown_rx: Arc::new(Mutex::new(shutdown_rx)),
201 })
202 }
203
204 pub async fn start(&self) -> Result<()> {
216 if self.running.load(Ordering::SeqCst) {
217 return Ok(());
218 }
219
220 info!(
221 "Starting SQLite replicator for {}",
222 self.config.db_path.display()
223 );
224
225 if !self.config.db_path.exists() {
227 if self.config.auto_restore {
228 info!("Database not found, attempting auto-restore from S3");
229 match self.restore().await {
230 Ok(true) => info!("Database restored from S3"),
231 Ok(false) => info!("No backup found in S3, starting fresh"),
232 Err(e) => warn!("Auto-restore failed: {}", e),
233 }
234 } else {
235 debug!("Database not found and auto_restore is disabled");
236 }
237 }
238
239 self.running.store(true, Ordering::SeqCst);
240
241 let wal_path = self.wal_path();
243 let wal_monitor = WalMonitor::new(wal_path.clone())?;
244 *self.wal_monitor.lock().await = Some(wal_monitor);
245
246 self.spawn_wal_monitor_task();
248
249 self.spawn_upload_worker();
251
252 self.spawn_snapshot_task();
254
255 info!("SQLite replicator started");
256 Ok(())
257 }
258
259 pub async fn flush(&self) -> Result<()> {
270 info!("Flushing SQLite replicator");
271
272 self.running.store(false, Ordering::SeqCst);
274 let _ = self.shutdown_tx.send(()).await;
275
276 if self.config.db_path.exists() {
278 self.create_snapshot().await?;
279 }
280
281 while let Some(entry) = self.cache.pop_oldest().await? {
283 match self.s3_backend.upload_wal_segment(&entry).await {
284 Ok(()) => {
285 debug!("Flushed WAL segment {}", entry.sequence);
286 self.cache.remove(&entry).await?;
287 }
288 Err(e) => {
289 error!("Failed to flush WAL segment {}: {}", entry.sequence, e);
290 return Err(e);
291 }
292 }
293 }
294
295 info!("SQLite replicator flushed");
296 Ok(())
297 }
298
299 pub async fn restore(&self) -> Result<bool> {
315 self.restore_manager.restore().await
316 }
317
318 #[must_use]
320 pub fn status(&self) -> ReplicationStatus {
321 let cache = self.cache.clone();
322
323 let (pending_segments, pending_bytes) = cache.stats();
325
326 ReplicationStatus {
327 running: self.running.load(Ordering::SeqCst),
328 pending_segments,
329 pending_bytes,
330 last_snapshot: None, last_wal_sync: None, failed_uploads: self.failed_uploads.load(Ordering::SeqCst),
333 wal_frame_count: 0, }
335 }
336
337 fn wal_path(&self) -> std::path::PathBuf {
339 let mut wal_path = self.config.db_path.clone();
340 let filename = wal_path
341 .file_name()
342 .unwrap_or_default()
343 .to_string_lossy()
344 .to_string();
345 wal_path.set_file_name(format!("{filename}-wal"));
346 wal_path
347 }
348
349 async fn create_snapshot(&self) -> Result<()> {
351 info!("Creating database snapshot");
352
353 if let Some(parent) = self.config.db_path.parent() {
355 tokio::fs::create_dir_all(parent).await?;
356 }
357
358 let db_bytes = tokio::fs::read(&self.config.db_path).await?;
360
361 self.s3_backend.upload_snapshot(&db_bytes).await?;
363
364 self.s3_backend.update_metadata(None).await?;
366
367 *self.last_snapshot.write().await = Some(chrono::Utc::now());
369
370 info!("Database snapshot created successfully");
371 Ok(())
372 }
373
374 fn spawn_wal_monitor_task(&self) {
376 let running = self.running.clone();
377 let wal_monitor = self.wal_monitor.clone();
378 let cache = self.cache.clone();
379 let wal_path = self.wal_path();
380
381 tokio::spawn(async move {
382 while running.load(Ordering::SeqCst) {
383 let monitor_guard = wal_monitor.lock().await;
385 if let Some(monitor) = monitor_guard.as_ref() {
386 match monitor.check_for_changes().await {
388 Ok(Some(event)) => {
389 debug!("WAL change detected: {:?}", event);
390
391 if wal_path.exists() {
393 match tokio::fs::read(&wal_path).await {
394 Ok(wal_data) => {
395 let sequence = event.frame_count;
396 if let Err(e) = cache.add(sequence, wal_data).await {
397 error!("Failed to cache WAL segment: {}", e);
398 }
399 }
400 Err(e) => {
401 error!("Failed to read WAL file: {}", e);
402 }
403 }
404 }
405 }
406 Ok(None) => {
407 }
409 Err(e) => {
410 error!("WAL monitor error: {}", e);
411 }
412 }
413 }
414 drop(monitor_guard);
415
416 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
418 }
419 });
420 }
421
422 fn spawn_upload_worker(&self) {
424 let running = self.running.clone();
425 let cache = self.cache.clone();
426 let s3_backend = self.s3_backend.clone();
427 let failed_uploads = self.failed_uploads.clone();
428 let last_wal_sync = self.last_wal_sync.clone();
429
430 tokio::spawn(async move {
431 let mut retry_delay = tokio::time::Duration::from_secs(1);
432 let max_retry_delay = tokio::time::Duration::from_secs(60);
433
434 while running.load(Ordering::SeqCst) {
435 match cache.pop_oldest().await {
437 Ok(Some(entry)) => {
438 match s3_backend.upload_wal_segment(&entry).await {
439 Ok(()) => {
440 debug!("Uploaded WAL segment {}", entry.sequence);
441 if let Err(e) = cache.remove(&entry).await {
442 error!("Failed to remove cached entry: {}", e);
443 }
444 *last_wal_sync.write().await = Some(chrono::Utc::now());
445 retry_delay = tokio::time::Duration::from_secs(1);
446 }
447 Err(e) => {
448 warn!("Failed to upload WAL segment: {}", e);
449 failed_uploads.fetch_add(1, Ordering::SeqCst);
450
451 if let Err(e) = cache.add(entry.sequence, entry.data).await {
453 error!("Failed to re-cache entry: {}", e);
454 }
455
456 tokio::time::sleep(retry_delay).await;
458 retry_delay = std::cmp::min(retry_delay * 2, max_retry_delay);
459 }
460 }
461 }
462 Ok(None) => {
463 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
465 }
466 Err(e) => {
467 error!("Cache error: {}", e);
468 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
469 }
470 }
471 }
472 });
473 }
474
475 fn spawn_snapshot_task(&self) {
477 let running = self.running.clone();
478 let interval = tokio::time::Duration::from_secs(self.config.snapshot_interval_secs);
479 let db_path = self.config.db_path.clone();
480 let s3_backend = self.s3_backend.clone();
481 let last_snapshot = self.last_snapshot.clone();
482
483 tokio::spawn(async move {
484 let mut interval_timer = tokio::time::interval(interval);
485 interval_timer.tick().await; while running.load(Ordering::SeqCst) {
488 interval_timer.tick().await;
489
490 if !running.load(Ordering::SeqCst) {
491 break;
492 }
493
494 info!("Creating periodic snapshot");
495
496 if !db_path.exists() {
497 debug!("Database file doesn't exist, skipping snapshot");
498 continue;
499 }
500
501 match tokio::fs::read(&db_path).await {
503 Ok(db_bytes) => match s3_backend.upload_snapshot(&db_bytes).await {
504 Ok(()) => {
505 info!("Periodic snapshot created");
506 if let Err(e) = s3_backend.update_metadata(None).await {
507 error!("Failed to update metadata: {}", e);
508 }
509 *last_snapshot.write().await = Some(chrono::Utc::now());
510 }
511 Err(e) => {
512 error!("Failed to upload snapshot: {}", e);
513 }
514 },
515 Err(e) => {
516 error!("Failed to read database for snapshot: {}", e);
517 }
518 }
519 }
520 });
521 }
522}
523
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// The WAL file sits next to the database, named `<db file name>-wal`.
    #[test]
    fn test_wal_path_derivation() {
        let config = SqliteReplicatorConfig {
            db_path: PathBuf::from("/var/lib/myapp/data.db"),
            s3_bucket: "test".to_string(),
            s3_prefix: "test/".to_string(),
            cache_dir: PathBuf::from("/tmp/cache"),
            max_cache_size: 1024,
            auto_restore: false,
            snapshot_interval_secs: 3600,
        };

        assert_eq!(config.s3_bucket, "test");
        assert_eq!(config.snapshot_interval_secs, 3600);

        let db_name = config.db_path.file_name().unwrap().to_string_lossy();
        let derived = config.db_path.with_file_name(format!("{db_name}-wal"));

        assert_eq!(derived, PathBuf::from("/var/lib/myapp/data.db-wal"));
    }
}