use crate::databases::DatabaseConnection;
use crate::models::FieldMapping;
use anyhow::Result;
/// Configuration for an incremental sync pass between two databases.
#[derive(Debug, Clone)]
pub struct IncrementalConfig {
/// Name of the timestamp column used to detect changed rows (default `"updated_at"`).
pub track_field: String,
/// Interval between sync passes, in seconds. NOTE(review): not read by the
/// visible sync code — presumably consumed by an external scheduler; confirm.
pub sync_interval: u64,
/// When `true`, rows present in the target but missing from the source are deleted.
pub sync_deletions: bool,
/// Strategy for resolving conflicting changes. NOTE(review): not consulted by
/// the visible sync code — confirm where it is enforced.
pub conflict_resolution: ConflictResolution,
}
impl Default for IncrementalConfig {
fn default() -> Self {
Self {
track_field: "updated_at".to_string(),
sync_interval: 300, sync_deletions: false,
conflict_resolution: ConflictResolution::SourceWins,
}
}
}
/// How to resolve a record that changed on both sides.
/// NOTE(review): no variant is consulted by the sync logic visible in this
/// file — verify against the code that enforces it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
/// The source row overwrites the target row.
SourceWins,
/// The target row is kept as-is.
TargetWins,
/// Merge the two rows.
Merge,
/// Treat the conflict as an error.
Error,
}
/// Engine that copies rows changed since the last sync from a source
/// database connection to a target one.
pub struct IncrementalSync {
// Connection rows are read from.
source_db: Box<dyn DatabaseConnection>,
// Connection rows are written to.
target_db: Box<dyn DatabaseConnection>,
config: IncrementalConfig,
// Timestamp of the last completed sync ("YYYY-MM-DD HH:MM:SS" by convention);
// `None` means sync everything since the epoch.
last_sync_time: Option<String>,
}
impl IncrementalSync {
    /// Creates a sync engine over a source and a target connection.
    pub fn new(
        source_db: Box<dyn DatabaseConnection>,
        target_db: Box<dyn DatabaseConnection>,
        config: IncrementalConfig,
    ) -> Self {
        Self {
            source_db,
            target_db,
            config,
            last_sync_time: None,
        }
    }

    /// Records the timestamp of the last completed sync. Subsequent `sync`
    /// calls only pick up rows whose track field is newer than this value.
    pub fn set_last_sync_time(&mut self, time: &str) {
        self.last_sync_time = Some(time.to_string());
    }

    /// Runs one incremental pass: upserts rows changed since the last sync
    /// time and, when configured, deletes target rows that vanished from the
    /// source. Returns a summary of the pass.
    ///
    /// # Errors
    /// Propagates any query/execute error from either connection.
    pub async fn sync(
        &self,
        source_table: &str,
        target_table: &str,
        mappings: &[FieldMapping],
    ) -> Result<SyncResult> {
        let mut result = SyncResult::default();
        // With no recorded sync time, fall back to the epoch so every row qualifies.
        let last_sync = self
            .last_sync_time
            .as_deref()
            .unwrap_or("1970-01-01 00:00:00");
        result.updated_rows = self
            .sync_updates(source_table, target_table, mappings, last_sync)
            .await?;
        if self.config.sync_deletions {
            result.deleted_rows = self
                .sync_deletions(source_table, target_table, mappings)
                .await?;
        }
        result.sync_time = self.get_current_timestamp().await?;
        Ok(result)
    }

    /// Upserts every source row whose track field is newer than `last_sync`.
    /// Returns the number of rows inserted or updated.
    async fn sync_updates(
        &self,
        source_table: &str,
        target_table: &str,
        mappings: &[FieldMapping],
        last_sync: &str,
    ) -> Result<usize> {
        let track_field = &self.config.track_field;
        // Table/column names come from caller configuration and are interpolated
        // as identifiers; the timestamp is escaped as a SQL string literal.
        let query = format!(
            "SELECT * FROM {} WHERE {} > '{}'",
            source_table,
            track_field,
            last_sync.replace('\'', "''")
        );
        let rows = self.source_db.query(&query).await?;
        let mut updated_count = 0;
        for row in rows {
            if let serde_json::Value::Object(obj) = row {
                if self.check_record_exists(target_table, &obj, mappings).await? {
                    self.update_record(target_table, &obj, mappings).await?;
                } else {
                    self.insert_record(target_table, &obj, mappings).await?;
                }
                updated_count += 1;
            }
        }
        Ok(updated_count)
    }

    /// Deletes target rows whose primary key no longer exists in the source.
    /// Returns the number of rows deleted; returns 0 when no `id` mapping exists.
    async fn sync_deletions(
        &self,
        source_table: &str,
        target_table: &str,
        mappings: &[FieldMapping],
    ) -> Result<usize> {
        // Without a primary-key mapping we cannot correlate rows across databases.
        let pk_mapping = match mappings.iter().find(|m| m.source_field == "id") {
            Some(m) => m,
            None => return Ok(0),
        };
        // Bug fix: target ids must be read from the TARGET connection (the
        // original queried the source for both sides), and each side uses its
        // own column name.
        let target_ids = self
            .get_all_ids(self.target_db.as_ref(), target_table, &pk_mapping.target_field)
            .await?;
        // HashSet turns the per-id membership test from O(n·m) into O(n).
        let source_ids: std::collections::HashSet<String> = self
            .get_all_ids(self.source_db.as_ref(), source_table, &pk_mapping.source_field)
            .await?
            .into_iter()
            .collect();
        let mut deleted_count = 0;
        for id in &target_ids {
            if !source_ids.contains(id) {
                self.delete_record(target_table, id, mappings).await?;
                deleted_count += 1;
            }
        }
        Ok(deleted_count)
    }

    /// Returns `true` when a row with this record's `id` already exists in the
    /// target table. Falls back to `false` when no `id` mapping/value is present
    /// or the count column is missing from the result.
    async fn check_record_exists(
        &self,
        target_table: &str,
        data: &serde_json::Map<String, serde_json::Value>,
        mappings: &[FieldMapping],
    ) -> Result<bool> {
        if let Some(pk_mapping) = mappings.iter().find(|m| m.source_field == "id") {
            if let Some(id_value) = data.get("id") {
                let query = format!(
                    "SELECT COUNT(*) as count FROM {} WHERE {} = {}",
                    target_table,
                    pk_mapping.target_field,
                    Self::sql_literal(id_value)
                );
                let result = self.target_db.query(&query).await?;
                if let Some(serde_json::Value::Object(obj)) = result.first() {
                    if let Some(serde_json::Value::Number(count)) = obj.get("count") {
                        return Ok(count.as_i64().unwrap_or(0) > 0);
                    }
                }
            }
        }
        Ok(false)
    }

    /// Updates the target row matching this record's `id` with all mapped,
    /// non-key fields. A no-op when either the SET list or the key is missing.
    async fn update_record(
        &self,
        target_table: &str,
        data: &serde_json::Map<String, serde_json::Value>,
        mappings: &[FieldMapping],
    ) -> Result<()> {
        let mut set_clauses = vec![];
        let mut where_clause = String::new();
        for mapping in mappings {
            if let Some(value) = data.get(&mapping.source_field) {
                let literal = Self::sql_literal(value);
                if mapping.source_field == "id" {
                    where_clause = format!("{} = {}", mapping.target_field, literal);
                } else {
                    set_clauses.push(format!("{} = {}", mapping.target_field, literal));
                }
            }
        }
        if !set_clauses.is_empty() && !where_clause.is_empty() {
            let query = format!(
                "UPDATE {} SET {} WHERE {}",
                target_table,
                set_clauses.join(", "),
                where_clause
            );
            self.target_db.execute(&query).await?;
        }
        Ok(())
    }

    /// Inserts a new target row containing every mapped field present in `data`.
    /// A no-op when no mapped field is present.
    async fn insert_record(
        &self,
        target_table: &str,
        data: &serde_json::Map<String, serde_json::Value>,
        mappings: &[FieldMapping],
    ) -> Result<()> {
        let mut fields = vec![];
        let mut values = vec![];
        for mapping in mappings {
            if let Some(value) = data.get(&mapping.source_field) {
                fields.push(mapping.target_field.clone());
                values.push(Self::sql_literal(value));
            }
        }
        if !fields.is_empty() {
            let query = format!(
                "INSERT INTO {} ({}) VALUES ({})",
                target_table,
                fields.join(", "),
                values.join(", ")
            );
            self.target_db.execute(&query).await?;
        }
        Ok(())
    }

    /// Deletes the target row whose primary key equals `id`.
    /// `id` must already be rendered as a SQL literal (see `get_all_ids`).
    async fn delete_record(
        &self,
        target_table: &str,
        id: &str,
        mappings: &[FieldMapping],
    ) -> Result<()> {
        if let Some(pk_mapping) = mappings.iter().find(|m| m.source_field == "id") {
            let query = format!(
                "DELETE FROM {} WHERE {} = {}",
                target_table,
                pk_mapping.target_field,
                id
            );
            self.target_db.execute(&query).await?;
        }
        Ok(())
    }

    /// Collects the value of `id_field` for every row in `table` on the given
    /// connection, each rendered as a SQL literal so values compare consistently
    /// across databases and can be interpolated into a DELETE directly.
    async fn get_all_ids(
        &self,
        db: &dyn DatabaseConnection,
        table: &str,
        id_field: &str,
    ) -> Result<Vec<String>> {
        let query = format!("SELECT {} FROM {}", id_field, table);
        let mut ids = vec![];
        for row in db.query(&query).await? {
            if let serde_json::Value::Object(obj) = row {
                if let Some(value) = obj.get(id_field) {
                    ids.push(Self::sql_literal(value));
                }
            }
        }
        Ok(ids)
    }

    /// Current timestamp as "YYYY-MM-DD HH:MM:SS".
    ///
    /// `datetime('now')` is SQLite syntax; when the source engine rejects it
    /// (or returns an unexpected shape) we fall back to the local wall clock
    /// instead of failing the whole sync.
    async fn get_current_timestamp(&self) -> Result<String> {
        if let Ok(result) = self.source_db.query("SELECT datetime('now') as now").await {
            if let Some(serde_json::Value::Object(obj)) = result.first() {
                if let Some(serde_json::Value::String(timestamp)) = obj.get("now") {
                    return Ok(timestamp.clone());
                }
            }
        }
        Ok(chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string())
    }

    /// Renders a JSON value as a SQL literal: strings are single-quoted with
    /// embedded quotes doubled, `null` becomes `NULL`, numbers and booleans
    /// use their plain textual form. (The original interpolated the JSON
    /// `Display` form, which double-quotes strings — invalid as a SQL string
    /// literal and injectable.)
    fn sql_literal(value: &serde_json::Value) -> String {
        match value {
            serde_json::Value::String(s) => format!("'{}'", s.replace('\'', "''")),
            serde_json::Value::Null => "NULL".to_string(),
            other => other.to_string(),
        }
    }
}
/// Summary of one completed sync pass.
#[derive(Debug, Clone, Default)]
pub struct SyncResult {
    /// Rows inserted or updated on the target.
    pub updated_rows: usize,
    /// Rows removed from the target.
    pub deleted_rows: usize,
    /// Timestamp at which the pass finished.
    pub sync_time: String,
}

impl SyncResult {
    /// Total number of rows touched (updates plus deletions).
    pub fn total_changes(&self) -> usize {
        self.deleted_rows + self.updated_rows
    }

    /// One-line human-readable summary of the pass.
    pub fn format(&self) -> String {
        let total = self.total_changes();
        format!(
            "Sync completed: {} rows updated, {} rows deleted, total: {} changes at {}",
            self.updated_rows, self.deleted_rows, total, self.sync_time
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Both tests are fully synchronous (no `.await`), so plain `#[test]` is
    // used instead of `#[tokio::test]` — no runtime needed. The unused
    // `use crate::{create_connection, DatabaseType, FieldMapping}` import was
    // removed (nothing in this module referenced it).

    /// `IncrementalConfig::default()` matches the documented defaults.
    #[test]
    fn test_incremental_sync_config() {
        let config = IncrementalConfig::default();
        assert_eq!(config.track_field, "updated_at");
        assert_eq!(config.sync_interval, 300);
        assert!(!config.sync_deletions);
        assert_eq!(config.conflict_resolution, ConflictResolution::SourceWins);
    }

    /// `SyncResult` totals and formatted summary.
    #[test]
    fn test_sync_result() {
        let result = SyncResult {
            updated_rows: 10,
            deleted_rows: 2,
            sync_time: "2024-01-01 12:00:00".to_string(),
        };
        assert_eq!(result.total_changes(), 12);
        assert!(result.format().contains("10 rows updated"));
        assert!(result.format().contains("2 rows deleted"));
    }
}