use std::path::Path;
use std::sync::Arc;
use bytes::Bytes;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use sqlx::{Row, SqlitePool};
use tracing::{debug, info};
use crate::error::{Error, Result};
use crate::storage::StorageBackend;
#[derive(Debug, Clone)]
pub struct QueueProgress {
pub backup_id: String,
pub vhost: String,
pub queue_name: String,
pub messages_backed_up: u64,
pub last_segment_sequence: u64,
pub target_message_count: u64,
pub completed: bool,
pub updated_at: i64,
}
pub struct QueueProgressUpdate<'a> {
pub backup_id: &'a str,
pub vhost: &'a str,
pub queue_name: &'a str,
pub messages_backed_up: u64,
pub last_segment_sequence: u64,
pub target_message_count: u64,
pub completed: bool,
}
pub struct SqliteOffsetStore {
pool: SqlitePool,
db_path: String,
}
impl SqliteOffsetStore {
pub async fn new(db_path: &Path) -> Result<Self> {
let db_path_str = db_path.to_string_lossy().to_string();
let options = SqliteConnectOptions::new()
.filename(db_path)
.create_if_missing(true)
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal);
let pool = SqlitePoolOptions::new()
.max_connections(2)
.connect_with(options)
.await
.map_err(|e| Error::Checkpoint(format!("Failed to open offset DB: {}", e)))?;
let store = Self {
pool,
db_path: db_path_str,
};
store.run_migrations().await?;
Ok(store)
}
async fn run_migrations(&self) -> Result<()> {
sqlx::query(
"CREATE TABLE IF NOT EXISTS queue_progress (
backup_id TEXT NOT NULL,
vhost TEXT NOT NULL,
queue_name TEXT NOT NULL,
messages_backed_up INTEGER NOT NULL DEFAULT 0,
last_segment_sequence INTEGER NOT NULL DEFAULT 0,
target_message_count INTEGER NOT NULL DEFAULT 0,
completed INTEGER NOT NULL DEFAULT 0,
updated_at INTEGER NOT NULL,
PRIMARY KEY (backup_id, vhost, queue_name)
)",
)
.execute(&self.pool)
.await?;
self.ensure_column(
"queue_progress",
"target_message_count",
"ALTER TABLE queue_progress ADD COLUMN target_message_count INTEGER NOT NULL DEFAULT 0",
)
.await?;
self.ensure_column(
"queue_progress",
"completed",
"ALTER TABLE queue_progress ADD COLUMN completed INTEGER NOT NULL DEFAULT 0",
)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS backup_jobs (
backup_id TEXT PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'running',
created_at INTEGER NOT NULL,
last_heartbeat INTEGER NOT NULL
)",
)
.execute(&self.pool)
.await?;
debug!("Offset store migrations complete");
Ok(())
}
async fn ensure_column(&self, table: &str, column: &str, alter_sql: &str) -> Result<()> {
let rows = sqlx::query(&format!("PRAGMA table_info({})", table))
.fetch_all(&self.pool)
.await?;
let exists = rows
.iter()
.any(|row| row.get::<String, _>("name") == column);
if !exists {
sqlx::query(alter_sql).execute(&self.pool).await?;
}
Ok(())
}
pub async fn get_progress(
&self,
backup_id: &str,
vhost: &str,
queue_name: &str,
) -> Result<Option<QueueProgress>> {
let row = sqlx::query(
"SELECT backup_id, vhost, queue_name, messages_backed_up, last_segment_sequence,
target_message_count, completed, updated_at
FROM queue_progress
WHERE backup_id = ? AND vhost = ? AND queue_name = ?",
)
.bind(backup_id)
.bind(vhost)
.bind(queue_name)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|r| QueueProgress {
backup_id: r.get("backup_id"),
vhost: r.get("vhost"),
queue_name: r.get("queue_name"),
messages_backed_up: r.get::<i64, _>("messages_backed_up") as u64,
last_segment_sequence: r.get::<i64, _>("last_segment_sequence") as u64,
target_message_count: r.get::<i64, _>("target_message_count") as u64,
completed: r.get::<i64, _>("completed") != 0,
updated_at: r.get("updated_at"),
}))
}
pub async fn set_progress(
&self,
backup_id: &str,
vhost: &str,
queue_name: &str,
messages_backed_up: u64,
last_segment_sequence: u64,
) -> Result<()> {
self.set_progress_state(QueueProgressUpdate {
backup_id,
vhost,
queue_name,
messages_backed_up,
last_segment_sequence,
target_message_count: 0,
completed: false,
})
.await
}
pub async fn set_progress_state(&self, update: QueueProgressUpdate<'_>) -> Result<()> {
let now = chrono::Utc::now().timestamp();
sqlx::query(
"INSERT INTO queue_progress (
backup_id, vhost, queue_name, messages_backed_up, last_segment_sequence,
target_message_count, completed, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (backup_id, vhost, queue_name) DO UPDATE SET
messages_backed_up = excluded.messages_backed_up,
last_segment_sequence = excluded.last_segment_sequence,
target_message_count = excluded.target_message_count,
completed = excluded.completed,
updated_at = excluded.updated_at",
)
.bind(update.backup_id)
.bind(update.vhost)
.bind(update.queue_name)
.bind(update.messages_backed_up as i64)
.bind(update.last_segment_sequence as i64)
.bind(update.target_message_count as i64)
.bind(if update.completed { 1_i64 } else { 0_i64 })
.bind(now)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn set_job_status(&self, backup_id: &str, status: &str) -> Result<()> {
let now = chrono::Utc::now().timestamp();
sqlx::query(
"INSERT INTO backup_jobs (backup_id, status, created_at, last_heartbeat)
VALUES (?, ?, ?, ?)
ON CONFLICT (backup_id) DO UPDATE SET
status = excluded.status,
last_heartbeat = excluded.last_heartbeat",
)
.bind(backup_id)
.bind(status)
.bind(now)
.bind(now)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn checkpoint(&self) -> Result<()> {
sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
.execute(&self.pool)
.await?;
debug!("SQLite WAL checkpoint complete");
Ok(())
}
pub async fn sync_to_storage(
&self,
storage: &Arc<dyn StorageBackend>,
key: &str,
) -> Result<()> {
self.checkpoint().await?;
let data = tokio::fs::read(&self.db_path)
.await
.map_err(|e| Error::Checkpoint(format!("Failed to read offset DB: {}", e)))?;
storage.put(key, Bytes::from(data)).await?;
info!("Synced offset store to {}", key);
Ok(())
}
pub async fn try_load_from_storage(
storage: &Arc<dyn StorageBackend>,
key: &str,
local_path: &Path,
) -> Result<bool> {
match storage.get(key).await {
Ok(data) => {
if let Some(parent) = local_path.parent() {
tokio::fs::create_dir_all(parent).await.map_err(|e| {
Error::Checkpoint(format!("Failed to create directory: {}", e))
})?;
}
tokio::fs::write(local_path, &data)
.await
.map_err(|e| Error::Checkpoint(format!("Failed to write offset DB: {}", e)))?;
info!("Loaded offset store from {}", key);
Ok(true)
}
Err(_) => {
debug!("No remote offset store found at {}", key);
Ok(false)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_sqlite_offset_store_basic() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test-offsets.db");
let store = SqliteOffsetStore::new(&db_path).await.unwrap();
let progress = store.get_progress("backup-1", "/", "orders").await.unwrap();
assert!(progress.is_none());
store
.set_progress("backup-1", "/", "orders", 100, 3)
.await
.unwrap();
let progress = store
.get_progress("backup-1", "/", "orders")
.await
.unwrap()
.unwrap();
assert_eq!(progress.messages_backed_up, 100);
assert_eq!(progress.last_segment_sequence, 3);
assert_eq!(progress.target_message_count, 0);
assert!(!progress.completed);
store
.set_progress_state(QueueProgressUpdate {
backup_id: "backup-1",
vhost: "/",
queue_name: "orders",
messages_backed_up: 200,
last_segment_sequence: 5,
target_message_count: 200,
completed: true,
})
.await
.unwrap();
let progress = store
.get_progress("backup-1", "/", "orders")
.await
.unwrap()
.unwrap();
assert_eq!(progress.messages_backed_up, 200);
assert_eq!(progress.last_segment_sequence, 5);
assert_eq!(progress.target_message_count, 200);
assert!(progress.completed);
}
#[tokio::test]
async fn test_sqlite_offset_store_job_status() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test-offsets.db");
let store = SqliteOffsetStore::new(&db_path).await.unwrap();
store.set_job_status("backup-1", "running").await.unwrap();
store.set_job_status("backup-1", "completed").await.unwrap();
}
#[tokio::test]
async fn test_sqlite_offset_store_checkpoint() {
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test-offsets.db");
let store = SqliteOffsetStore::new(&db_path).await.unwrap();
store
.set_progress("backup-1", "/", "queue1", 50, 1)
.await
.unwrap();
store.checkpoint().await.unwrap();
}
#[tokio::test]
async fn test_sqlite_offset_store_sync_roundtrip() {
use crate::storage::MemoryBackend;
let temp_dir = tempfile::tempdir().unwrap();
let db_path = temp_dir.path().join("test-offsets.db");
let store = SqliteOffsetStore::new(&db_path).await.unwrap();
store
.set_progress("backup-1", "/", "queue1", 42, 2)
.await
.unwrap();
let backend: Arc<dyn StorageBackend> = Arc::new(MemoryBackend::new());
store
.sync_to_storage(&backend, "state/offsets.db")
.await
.unwrap();
let new_db_path = temp_dir.path().join("loaded-offsets.db");
let loaded =
SqliteOffsetStore::try_load_from_storage(&backend, "state/offsets.db", &new_db_path)
.await
.unwrap();
assert!(loaded);
let store2 = SqliteOffsetStore::new(&new_db_path).await.unwrap();
let progress = store2
.get_progress("backup-1", "/", "queue1")
.await
.unwrap()
.unwrap();
assert_eq!(progress.messages_backed_up, 42);
assert_eq!(progress.last_segment_sequence, 2);
assert_eq!(progress.target_message_count, 0);
assert!(!progress.completed);
}
}