use super::coalescing::{coalesce_commands, QuoteStyle};
use super::destination_factory::{DestinationHandler, PreCommitHook};
use crate::error::{CdcError, Result};
use async_trait::async_trait;
use sqlx::{sqlite::SqliteConnectOptions, SqlitePool};
use std::collections::HashMap;
use std::path::Path;
use std::str::FromStr;
use tracing::{debug, info};
/// SQLite-backed implementation of [`DestinationHandler`].
///
/// Both fields start out as `None`, are populated by a successful `connect`,
/// and are reset to `None` by `close`.
pub struct SQLiteDestination {
    /// Connection pool for the opened database; `None` while disconnected.
    pool: Option<SqlitePool>,
    /// Database path as used for the connection, i.e. the connection string
    /// with any leading `file://` / `sqlite://` prefix removed.
    database_path: Option<String>,
}
impl SQLiteDestination {
    /// Builds a destination that is not yet connected to any database.
    ///
    /// No pool is opened here; both the pool and the recorded database path
    /// are left unset.
    pub fn new() -> Self {
        let pool = None;
        let database_path = None;
        Self {
            pool,
            database_path,
        }
    }
}
impl Default for SQLiteDestination {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl DestinationHandler for SQLiteDestination {
    /// Opens the SQLite database identified by `connection_string` and stores
    /// the resulting pool on `self`.
    ///
    /// A single leading `file://` or `sqlite://` prefix is removed (checked in
    /// that order); any other input is used as the path unchanged. When the
    /// path has a parent component whose metadata cannot be read, that
    /// directory tree is created first. The connection is opened with
    /// create-if-missing, WAL journal mode and foreign keys enabled.
    ///
    /// # Errors
    ///
    /// Returns a generic [`CdcError`] if the parent directory cannot be
    /// created, the connection options cannot be parsed, or the pool cannot
    /// connect.
    async fn connect(&mut self, connection_string: &str) -> Result<()> {
        // `file://` takes precedence over `sqlite://`; only one prefix is ever removed.
        let db_path = connection_string
            .strip_prefix("file://")
            .or_else(|| connection_string.strip_prefix("sqlite://"))
            .unwrap_or(connection_string);

        if let Some(parent_dir) = Path::new(db_path).parent() {
            // Any metadata failure (not only "not found") is treated as "missing".
            let parent_missing = tokio::fs::metadata(parent_dir).await.is_err();
            if parent_missing {
                tokio::fs::create_dir_all(parent_dir).await.map_err(|e| {
                    CdcError::generic(format!(
                        "Failed to create directory for SQLite database: {e}"
                    ))
                })?;
            }
        }

        // The scheme is re-attached so the options parser always sees a `sqlite://` URL.
        let url = format!("sqlite://{db_path}");
        let options = SqliteConnectOptions::from_str(&url)
            .map_err(|e| {
                CdcError::generic(format!(
                    "Failed to parse SQLite connection string '{db_path}': {e}"
                ))
            })?
            .create_if_missing(true)
            .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
            .foreign_keys(true);

        let pool = SqlitePool::connect_with(options).await.map_err(|e| {
            CdcError::generic(format!(
                "Failed to connect to SQLite database '{db_path}': {e}"
            ))
        })?;

        self.pool = Some(pool);
        self.database_path = Some(db_path.to_owned());
        info!("Connected to SQLite database: {}", db_path);
        Ok(())
    }

    /// No-op: the supplied mappings are dropped without being stored or read.
    // NOTE(review): presumably because SQLite has no schema namespaces to map — confirm.
    fn set_schema_mappings(&mut self, _mappings: HashMap<String, String>) {}

    /// Coalesces `commands` and hands the result, together with the optional
    /// `pre_commit_hook`, to the shared sqlx batch executor.
    ///
    /// An empty `commands` slice returns `Ok(())` immediately, before the pool
    /// is even checked.
    ///
    /// # Errors
    ///
    /// Returns a generic [`CdcError`] if no pool has been set by `connect`;
    /// otherwise returns whatever the shared batch executor returns.
    async fn execute_sql_batch_with_hook(
        &mut self,
        commands: &[String],
        pre_commit_hook: Option<PreCommitHook>,
    ) -> Result<()> {
        if commands.is_empty() {
            return Ok(());
        }

        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| CdcError::generic("SQLite pool not initialized"))?;

        // NOTE(review): `u64::MAX` looks like an "effectively unlimited" size cap — confirm
        // against `coalesce_commands`.
        let coalesced = coalesce_commands(commands, u64::MAX, QuoteStyle::DoubleQuote);

        let (before, after) = (commands.len(), coalesced.len());
        if after < before {
            debug!(
                "Coalesced {} commands into {} statements (reduction: {:.1}%)",
                before,
                after,
                (1.0 - after as f64 / before as f64) * 100.0
            );
        }

        super::common::execute_sqlx_batch_with_hook(pool, &coalesced, pre_commit_hook, "SQLite")
            .await
    }

    /// Closes the pool, if one is open, and resets the handler to its
    /// disconnected state.
    ///
    /// Always returns `Ok(())`, and the log line is emitted even when no pool
    /// was open.
    async fn close(&mut self) -> Result<()> {
        // The pool is only borrowed while closing; the fields are cleared afterwards.
        if let Some(open_pool) = self.pool.as_ref() {
            open_pool.close().await;
        }
        self.database_path = None;
        self.pool = None;
        info!("SQLite connection closed successfully");
        Ok(())
    }
}