mod cache;
mod restore;
mod s3_backend;
mod wal_monitor;
pub use crate::config::{LayerStorageConfig, SqliteReplicatorConfig};
use crate::error::Result;
use aws_sdk_s3::Client as S3Client;
use cache::WriteCache;
use restore::RestoreManager;
use s3_backend::S3Backend;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex, RwLock};
use tracing::{debug, error, info, warn};
use wal_monitor::WalMonitor;
pub use cache::CacheEntry;
pub use s3_backend::ReplicationMetadata;
pub use wal_monitor::WalEvent;
/// Point-in-time snapshot of the replicator's state, as returned by
/// [`SqliteReplicator::status`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReplicationStatus {
    /// Whether the background replication tasks are currently enabled.
    pub running: bool,
    /// Number of WAL segments waiting in the local write cache for upload.
    pub pending_segments: usize,
    /// Size in bytes of the segments waiting for upload, as reported by the write cache.
    pub pending_bytes: u64,
    /// When the most recent database snapshot was uploaded, if known.
    pub last_snapshot: Option<chrono::DateTime<chrono::Utc>>,
    /// When the most recent WAL segment was uploaded, if known.
    pub last_wal_sync: Option<chrono::DateTime<chrono::Utc>>,
    /// Number of failed WAL segment upload attempts since the replicator was created.
    pub failed_uploads: u64,
    /// Number of WAL frames observed (NOTE(review): not populated yet; reported as 0).
    pub wal_frame_count: u64,
}
/// Replicates a SQLite database to S3 as full snapshots plus cached WAL segments.
///
/// Build it with [`SqliteReplicator::new`] and begin background replication with
/// [`SqliteReplicator::start`]. Call [`SqliteReplicator::flush`] before shutdown
/// to upload a final snapshot and any WAL segments still in the local cache.
pub struct SqliteReplicator {
    config: SqliteReplicatorConfig,
    // The `Arc`-wrapped fields below are cloned into the spawned background tasks.
    s3_backend: Arc<S3Backend>,
    cache: Arc<WriteCache>,
    // `None` until `start` installs a monitor for the database's `-wal` file.
    wal_monitor: Arc<Mutex<Option<WalMonitor>>>,
    restore_manager: RestoreManager,
    // Loop condition checked by every background task; set by `start`, cleared by `flush`.
    running: Arc<AtomicBool>,
    last_snapshot: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
    last_wal_sync: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
    failed_uploads: Arc<AtomicU64>,
    shutdown_tx: mpsc::Sender<()>,
    // Never received from in this file. Holding it keeps the channel open;
    // a dropped receiver would make sends on `shutdown_tx` fail.
    #[allow(dead_code)]
    shutdown_rx: Arc<Mutex<mpsc::Receiver<()>>>,
}
impl SqliteReplicator {
pub async fn new(
config: SqliteReplicatorConfig,
s3_config: &LayerStorageConfig,
) -> Result<Self> {
tokio::fs::create_dir_all(&config.cache_dir).await?;
let mut aws_config_builder = aws_config::from_env();
if let Some(region) = &s3_config.region {
aws_config_builder =
aws_config_builder.region(aws_sdk_s3::config::Region::new(region.clone()));
}
let aws_config = aws_config_builder.load().await;
let s3_client_config = if let Some(endpoint) = &s3_config.endpoint_url {
aws_sdk_s3::config::Builder::from(&aws_config)
.endpoint_url(endpoint)
.force_path_style(true)
.build()
} else {
aws_sdk_s3::config::Builder::from(&aws_config).build()
};
let s3_client = S3Client::from_conf(s3_client_config);
let s3_backend = Arc::new(S3Backend::new(
s3_client,
config.s3_bucket.clone(),
config.s3_prefix.clone(),
s3_config.compression_level,
));
let cache = Arc::new(WriteCache::new(
config.cache_dir.clone(),
config.max_cache_size,
));
let restore_manager = RestoreManager::new(
config.db_path.clone(),
s3_backend.clone(),
config.cache_dir.clone(),
);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
Ok(Self {
config,
s3_backend,
cache,
wal_monitor: Arc::new(Mutex::new(None)),
restore_manager,
running: Arc::new(AtomicBool::new(false)),
last_snapshot: Arc::new(RwLock::new(None)),
last_wal_sync: Arc::new(RwLock::new(None)),
failed_uploads: Arc::new(AtomicU64::new(0)),
shutdown_tx,
shutdown_rx: Arc::new(Mutex::new(shutdown_rx)),
})
}
pub async fn start(&self) -> Result<()> {
if self.running.load(Ordering::SeqCst) {
return Ok(());
}
info!(
"Starting SQLite replicator for {}",
self.config.db_path.display()
);
if !self.config.db_path.exists() {
if self.config.auto_restore {
info!("Database not found, attempting auto-restore from S3");
match self.restore().await {
Ok(true) => info!("Database restored from S3"),
Ok(false) => info!("No backup found in S3, starting fresh"),
Err(e) => warn!("Auto-restore failed: {}", e),
}
} else {
debug!("Database not found and auto_restore is disabled");
}
}
self.running.store(true, Ordering::SeqCst);
let wal_path = self.wal_path();
let wal_monitor = WalMonitor::new(wal_path.clone())?;
*self.wal_monitor.lock().await = Some(wal_monitor);
self.spawn_wal_monitor_task();
self.spawn_upload_worker();
self.spawn_snapshot_task();
info!("SQLite replicator started");
Ok(())
}
pub async fn flush(&self) -> Result<()> {
info!("Flushing SQLite replicator");
self.running.store(false, Ordering::SeqCst);
let _ = self.shutdown_tx.send(()).await;
if self.config.db_path.exists() {
self.create_snapshot().await?;
}
while let Some(entry) = self.cache.pop_oldest().await? {
match self.s3_backend.upload_wal_segment(&entry).await {
Ok(()) => {
debug!("Flushed WAL segment {}", entry.sequence);
self.cache.remove(&entry).await?;
}
Err(e) => {
error!("Failed to flush WAL segment {}: {}", entry.sequence, e);
return Err(e);
}
}
}
info!("SQLite replicator flushed");
Ok(())
}
pub async fn restore(&self) -> Result<bool> {
self.restore_manager.restore().await
}
#[must_use]
pub fn status(&self) -> ReplicationStatus {
let cache = self.cache.clone();
let (pending_segments, pending_bytes) = cache.stats();
ReplicationStatus {
running: self.running.load(Ordering::SeqCst),
pending_segments,
pending_bytes,
last_snapshot: None, last_wal_sync: None, failed_uploads: self.failed_uploads.load(Ordering::SeqCst),
wal_frame_count: 0, }
}
fn wal_path(&self) -> std::path::PathBuf {
let mut wal_path = self.config.db_path.clone();
let filename = wal_path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string();
wal_path.set_file_name(format!("{filename}-wal"));
wal_path
}
async fn create_snapshot(&self) -> Result<()> {
info!("Creating database snapshot");
if let Some(parent) = self.config.db_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let db_bytes = tokio::fs::read(&self.config.db_path).await?;
self.s3_backend.upload_snapshot(&db_bytes).await?;
self.s3_backend.update_metadata(None).await?;
*self.last_snapshot.write().await = Some(chrono::Utc::now());
info!("Database snapshot created successfully");
Ok(())
}
fn spawn_wal_monitor_task(&self) {
let running = self.running.clone();
let wal_monitor = self.wal_monitor.clone();
let cache = self.cache.clone();
let wal_path = self.wal_path();
tokio::spawn(async move {
while running.load(Ordering::SeqCst) {
let monitor_guard = wal_monitor.lock().await;
if let Some(monitor) = monitor_guard.as_ref() {
match monitor.check_for_changes().await {
Ok(Some(event)) => {
debug!("WAL change detected: {:?}", event);
if wal_path.exists() {
match tokio::fs::read(&wal_path).await {
Ok(wal_data) => {
let sequence = event.frame_count;
if let Err(e) = cache.add(sequence, wal_data).await {
error!("Failed to cache WAL segment: {}", e);
}
}
Err(e) => {
error!("Failed to read WAL file: {}", e);
}
}
}
}
Ok(None) => {
}
Err(e) => {
error!("WAL monitor error: {}", e);
}
}
}
drop(monitor_guard);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
});
}
fn spawn_upload_worker(&self) {
let running = self.running.clone();
let cache = self.cache.clone();
let s3_backend = self.s3_backend.clone();
let failed_uploads = self.failed_uploads.clone();
let last_wal_sync = self.last_wal_sync.clone();
tokio::spawn(async move {
let mut retry_delay = tokio::time::Duration::from_secs(1);
let max_retry_delay = tokio::time::Duration::from_secs(60);
while running.load(Ordering::SeqCst) {
match cache.pop_oldest().await {
Ok(Some(entry)) => {
match s3_backend.upload_wal_segment(&entry).await {
Ok(()) => {
debug!("Uploaded WAL segment {}", entry.sequence);
if let Err(e) = cache.remove(&entry).await {
error!("Failed to remove cached entry: {}", e);
}
*last_wal_sync.write().await = Some(chrono::Utc::now());
retry_delay = tokio::time::Duration::from_secs(1);
}
Err(e) => {
warn!("Failed to upload WAL segment: {}", e);
failed_uploads.fetch_add(1, Ordering::SeqCst);
if let Err(e) = cache.add(entry.sequence, entry.data).await {
error!("Failed to re-cache entry: {}", e);
}
tokio::time::sleep(retry_delay).await;
retry_delay = std::cmp::min(retry_delay * 2, max_retry_delay);
}
}
}
Ok(None) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Err(e) => {
error!("Cache error: {}", e);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
}
});
}
fn spawn_snapshot_task(&self) {
let running = self.running.clone();
let interval = tokio::time::Duration::from_secs(self.config.snapshot_interval_secs);
let db_path = self.config.db_path.clone();
let s3_backend = self.s3_backend.clone();
let last_snapshot = self.last_snapshot.clone();
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(interval);
interval_timer.tick().await;
while running.load(Ordering::SeqCst) {
interval_timer.tick().await;
if !running.load(Ordering::SeqCst) {
break;
}
info!("Creating periodic snapshot");
if !db_path.exists() {
debug!("Database file doesn't exist, skipping snapshot");
continue;
}
match tokio::fs::read(&db_path).await {
Ok(db_bytes) => match s3_backend.upload_snapshot(&db_bytes).await {
Ok(()) => {
info!("Periodic snapshot created");
if let Err(e) = s3_backend.update_metadata(None).await {
error!("Failed to update metadata: {}", e);
}
*last_snapshot.write().await = Some(chrono::Utc::now());
}
Err(e) => {
error!("Failed to upload snapshot: {}", e);
}
},
Err(e) => {
error!("Failed to read database for snapshot: {}", e);
}
}
}
});
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The WAL file lives next to the database and is named `<db file name>-wal`.
    /// This test derives that path from a config the same way the replicator does.
    #[test]
    fn test_wal_path_derivation() {
        let config = SqliteReplicatorConfig {
            db_path: std::path::PathBuf::from("/var/lib/myapp/data.db"),
            s3_bucket: String::from("test"),
            s3_prefix: String::from("test/"),
            cache_dir: std::path::PathBuf::from("/tmp/cache"),
            max_cache_size: 1024,
            auto_restore: false,
            snapshot_interval_secs: 3600,
        };
        assert_eq!(config.s3_bucket, "test");
        assert_eq!(config.snapshot_interval_secs, 3600);

        let db_name = config
            .db_path
            .file_name()
            .unwrap()
            .to_string_lossy()
            .into_owned();
        let derived = config.db_path.with_file_name(format!("{db_name}-wal"));
        let expected = std::path::PathBuf::from("/var/lib/myapp/data.db-wal");
        assert_eq!(derived, expected);
    }
}