zlayer_storage/replicator/
mod.rs1mod cache;
63mod restore;
64mod s3_backend;
65mod wal_monitor;
66
67pub use crate::config::{LayerStorageConfig, SqliteReplicatorConfig};
68use crate::error::Result;
69use aws_sdk_s3::Client as S3Client;
70use cache::WriteCache;
71use restore::RestoreManager;
72use s3_backend::S3Backend;
73use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
74use std::sync::Arc;
75use tokio::sync::{mpsc, Mutex, RwLock};
76use tracing::{debug, error, info, warn};
77use wal_monitor::WalMonitor;
78
79pub use cache::CacheEntry;
80pub use s3_backend::ReplicationMetadata;
81pub use wal_monitor::WalEvent;
82
83#[derive(Debug, Clone)]
85pub struct ReplicationStatus {
86 pub running: bool,
88 pub pending_segments: usize,
90 pub pending_bytes: u64,
92 pub last_snapshot: Option<chrono::DateTime<chrono::Utc>>,
94 pub last_wal_sync: Option<chrono::DateTime<chrono::Utc>>,
96 pub failed_uploads: u64,
98 pub wal_frame_count: u64,
100}
101
102pub struct SqliteReplicator {
107 config: SqliteReplicatorConfig,
108 s3_backend: Arc<S3Backend>,
109 cache: Arc<WriteCache>,
110 wal_monitor: Arc<Mutex<Option<WalMonitor>>>,
111 restore_manager: RestoreManager,
112
113 running: Arc<AtomicBool>,
115 last_snapshot: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
116 last_wal_sync: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
117 failed_uploads: Arc<AtomicU64>,
118
119 shutdown_tx: mpsc::Sender<()>,
121 #[allow(dead_code)]
123 shutdown_rx: Arc<Mutex<mpsc::Receiver<()>>>,
124}
125
126impl SqliteReplicator {
127 pub async fn new(
139 config: SqliteReplicatorConfig,
140 s3_config: &LayerStorageConfig,
141 ) -> Result<Self> {
142 tokio::fs::create_dir_all(&config.cache_dir).await?;
144
145 let mut aws_config_builder = aws_config::from_env();
147
148 if let Some(region) = &s3_config.region {
149 aws_config_builder =
150 aws_config_builder.region(aws_sdk_s3::config::Region::new(region.clone()));
151 } else if s3_config.endpoint_url.is_some() {
152 aws_config_builder =
157 aws_config_builder.region(aws_sdk_s3::config::Region::new("us-east-1"));
158 }
159
160 let aws_config = aws_config_builder.load().await;
161
162 let s3_client_config = if let Some(endpoint) = &s3_config.endpoint_url {
163 aws_sdk_s3::config::Builder::from(&aws_config)
164 .endpoint_url(endpoint)
165 .force_path_style(true)
166 .build()
167 } else {
168 aws_sdk_s3::config::Builder::from(&aws_config).build()
169 };
170
171 let s3_client = S3Client::from_conf(s3_client_config);
172
173 let s3_backend = Arc::new(S3Backend::new(
175 s3_client,
176 config.s3_bucket.clone(),
177 config.s3_prefix.clone(),
178 s3_config.compression_level,
179 ));
180
181 let cache = Arc::new(WriteCache::new(
183 config.cache_dir.clone(),
184 config.max_cache_size,
185 ));
186
187 let restore_manager = RestoreManager::new(
189 config.db_path.clone(),
190 s3_backend.clone(),
191 config.cache_dir.clone(),
192 );
193
194 let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
195
196 Ok(Self {
197 config,
198 s3_backend,
199 cache,
200 wal_monitor: Arc::new(Mutex::new(None)),
201 restore_manager,
202 running: Arc::new(AtomicBool::new(false)),
203 last_snapshot: Arc::new(RwLock::new(None)),
204 last_wal_sync: Arc::new(RwLock::new(None)),
205 failed_uploads: Arc::new(AtomicU64::new(0)),
206 shutdown_tx,
207 shutdown_rx: Arc::new(Mutex::new(shutdown_rx)),
208 })
209 }
210
211 pub async fn start(&self) -> Result<()> {
223 if self.running.load(Ordering::SeqCst) {
224 return Ok(());
225 }
226
227 info!(
228 "Starting SQLite replicator for {}",
229 self.config.db_path.display()
230 );
231
232 if !self.config.db_path.exists() {
234 if self.config.auto_restore {
235 info!("Database not found, attempting auto-restore from S3");
236 match self.restore().await {
237 Ok(true) => info!("Database restored from S3"),
238 Ok(false) => info!("No backup found in S3, starting fresh"),
239 Err(e) => warn!("Auto-restore failed: {}", e),
240 }
241 } else {
242 debug!("Database not found and auto_restore is disabled");
243 }
244 }
245
246 self.running.store(true, Ordering::SeqCst);
247
248 let wal_path = self.wal_path();
250 let wal_monitor = WalMonitor::new(wal_path.clone())?;
251 *self.wal_monitor.lock().await = Some(wal_monitor);
252
253 self.spawn_wal_monitor_task();
255
256 self.spawn_upload_worker();
258
259 self.spawn_snapshot_task();
261
262 info!("SQLite replicator started");
263 Ok(())
264 }
265
266 pub async fn flush(&self) -> Result<()> {
277 info!("Flushing SQLite replicator");
278
279 self.running.store(false, Ordering::SeqCst);
281 let _ = self.shutdown_tx.send(()).await;
282
283 if self.config.db_path.exists() {
285 self.create_snapshot().await?;
286 }
287
288 while let Some(entry) = self.cache.pop_oldest().await? {
290 match self.s3_backend.upload_wal_segment(&entry).await {
291 Ok(()) => {
292 debug!("Flushed WAL segment {}", entry.sequence);
293 self.cache.remove(&entry).await?;
294 }
295 Err(e) => {
296 error!("Failed to flush WAL segment {}: {}", entry.sequence, e);
297 return Err(e);
298 }
299 }
300 }
301
302 info!("SQLite replicator flushed");
303 Ok(())
304 }
305
306 pub async fn restore(&self) -> Result<bool> {
322 self.restore_manager.restore().await
323 }
324
325 #[must_use]
327 pub fn status(&self) -> ReplicationStatus {
328 let cache = self.cache.clone();
329
330 let (pending_segments, pending_bytes) = cache.stats();
332
333 ReplicationStatus {
334 running: self.running.load(Ordering::SeqCst),
335 pending_segments,
336 pending_bytes,
337 last_snapshot: None, last_wal_sync: None, failed_uploads: self.failed_uploads.load(Ordering::SeqCst),
340 wal_frame_count: 0, }
342 }
343
344 fn wal_path(&self) -> std::path::PathBuf {
346 let mut wal_path = self.config.db_path.clone();
347 let filename = wal_path
348 .file_name()
349 .unwrap_or_default()
350 .to_string_lossy()
351 .to_string();
352 wal_path.set_file_name(format!("{filename}-wal"));
353 wal_path
354 }
355
356 async fn create_snapshot(&self) -> Result<()> {
358 info!("Creating database snapshot");
359
360 if let Some(parent) = self.config.db_path.parent() {
362 tokio::fs::create_dir_all(parent).await?;
363 }
364
365 let db_bytes = tokio::fs::read(&self.config.db_path).await?;
367
368 self.s3_backend.upload_snapshot(&db_bytes).await?;
370
371 self.s3_backend.update_metadata(None).await?;
373
374 *self.last_snapshot.write().await = Some(chrono::Utc::now());
376
377 info!("Database snapshot created successfully");
378 Ok(())
379 }
380
381 fn spawn_wal_monitor_task(&self) {
383 let running = self.running.clone();
384 let wal_monitor = self.wal_monitor.clone();
385 let cache = self.cache.clone();
386 let wal_path = self.wal_path();
387
388 tokio::spawn(async move {
389 while running.load(Ordering::SeqCst) {
390 let monitor_guard = wal_monitor.lock().await;
392 if let Some(monitor) = monitor_guard.as_ref() {
393 match monitor.check_for_changes().await {
395 Ok(Some(event)) => {
396 debug!("WAL change detected: {:?}", event);
397
398 if wal_path.exists() {
400 match tokio::fs::read(&wal_path).await {
401 Ok(wal_data) => {
402 let sequence = event.frame_count;
403 if let Err(e) = cache.add(sequence, wal_data).await {
404 error!("Failed to cache WAL segment: {}", e);
405 }
406 }
407 Err(e) => {
408 error!("Failed to read WAL file: {}", e);
409 }
410 }
411 }
412 }
413 Ok(None) => {
414 }
416 Err(e) => {
417 error!("WAL monitor error: {}", e);
418 }
419 }
420 }
421 drop(monitor_guard);
422
423 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
425 }
426 });
427 }
428
429 fn spawn_upload_worker(&self) {
431 let running = self.running.clone();
432 let cache = self.cache.clone();
433 let s3_backend = self.s3_backend.clone();
434 let failed_uploads = self.failed_uploads.clone();
435 let last_wal_sync = self.last_wal_sync.clone();
436
437 tokio::spawn(async move {
438 let mut retry_delay = tokio::time::Duration::from_secs(1);
439 let max_retry_delay = tokio::time::Duration::from_secs(60);
440
441 while running.load(Ordering::SeqCst) {
442 match cache.pop_oldest().await {
444 Ok(Some(entry)) => {
445 match s3_backend.upload_wal_segment(&entry).await {
446 Ok(()) => {
447 debug!("Uploaded WAL segment {}", entry.sequence);
448 if let Err(e) = cache.remove(&entry).await {
449 error!("Failed to remove cached entry: {}", e);
450 }
451 *last_wal_sync.write().await = Some(chrono::Utc::now());
452 retry_delay = tokio::time::Duration::from_secs(1);
453 }
454 Err(e) => {
455 warn!("Failed to upload WAL segment: {}", e);
456 failed_uploads.fetch_add(1, Ordering::SeqCst);
457
458 if let Err(e) = cache.add(entry.sequence, entry.data).await {
460 error!("Failed to re-cache entry: {}", e);
461 }
462
463 tokio::time::sleep(retry_delay).await;
465 retry_delay = std::cmp::min(retry_delay * 2, max_retry_delay);
466 }
467 }
468 }
469 Ok(None) => {
470 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
472 }
473 Err(e) => {
474 error!("Cache error: {}", e);
475 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
476 }
477 }
478 }
479 });
480 }
481
482 fn spawn_snapshot_task(&self) {
484 let running = self.running.clone();
485 let interval = tokio::time::Duration::from_secs(self.config.snapshot_interval_secs);
486 let db_path = self.config.db_path.clone();
487 let s3_backend = self.s3_backend.clone();
488 let last_snapshot = self.last_snapshot.clone();
489
490 tokio::spawn(async move {
491 let mut interval_timer = tokio::time::interval(interval);
492 interval_timer.tick().await; while running.load(Ordering::SeqCst) {
495 interval_timer.tick().await;
496
497 if !running.load(Ordering::SeqCst) {
498 break;
499 }
500
501 info!("Creating periodic snapshot");
502
503 if !db_path.exists() {
504 debug!("Database file doesn't exist, skipping snapshot");
505 continue;
506 }
507
508 match tokio::fs::read(&db_path).await {
510 Ok(db_bytes) => match s3_backend.upload_snapshot(&db_bytes).await {
511 Ok(()) => {
512 info!("Periodic snapshot created");
513 if let Err(e) = s3_backend.update_metadata(None).await {
514 error!("Failed to update metadata: {}", e);
515 }
516 *last_snapshot.write().await = Some(chrono::Utc::now());
517 }
518 Err(e) => {
519 error!("Failed to upload snapshot: {}", e);
520 }
521 },
522 Err(e) => {
523 error!("Failed to read database for snapshot: {}", e);
524 }
525 }
526 }
527 });
528 }
529}
530
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// The WAL file for `data.db` is expected to be its sibling `data.db-wal`.
    ///
    /// NOTE(review): this mirrors `SqliteReplicator::wal_path` rather than
    /// calling it, because building a replicator requires AWS/S3 setup.
    #[test]
    fn test_wal_path_derivation() {
        let config = SqliteReplicatorConfig {
            db_path: PathBuf::from("/var/lib/myapp/data.db"),
            s3_bucket: "test".to_string(),
            s3_prefix: "test/".to_string(),
            cache_dir: PathBuf::from("/tmp/cache"),
            max_cache_size: 1024,
            auto_restore: false,
            snapshot_interval_secs: 3600,
        };

        assert_eq!(config.s3_bucket, "test");
        assert_eq!(config.snapshot_interval_secs, 3600);

        let db_file_name = config.db_path.file_name().unwrap().to_string_lossy();
        let derived = config
            .db_path
            .with_file_name(format!("{db_file_name}-wal"));

        assert_eq!(derived, PathBuf::from("/var/lib/myapp/data.db-wal"));
    }
}