use rusqlite::Connection;
use tracing;
use crate::types::SyncChange;
/// Exports the rows of `table` as [`SyncChange`] records ordered by `id`.
///
/// When `since` is given and the table has an `updated_at` (preferred) or
/// `created_at` column, only rows whose timestamp is strictly greater than
/// `since` are returned. Both sides of that comparison have `T` replaced by a
/// space first, so `2024-01-01T10:00:00` and `2024-01-01 10:00:00` compare as
/// equal strings. If the table has no such column, `since` is ignored and
/// every row is exported.
///
/// Each record's `pk` is the row's `id` (integer, else text, else `0` when it
/// can be read as neither); `data` holds every column of the row, `id`
/// included.
///
/// # Errors
///
/// Returns `rusqlite::Error::InvalidParameterName` when `table` contains
/// characters other than ASCII alphanumerics and `_`, and propagates any
/// error raised while preparing or running the queries. A table that does not
/// exist yields `Ok` with an empty vector.
pub fn export_changes_since(
    conn: &Connection,
    table: &str,
    since: Option<&str>,
) -> Result<Vec<SyncChange>, rusqlite::Error> {
    // The table name is interpolated into SQL below, so restrict it to a
    // conservative identifier alphabet before doing anything else.
    if !table.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
        return Err(rusqlite::Error::InvalidParameterName(
            "invalid table name".to_string(),
        ));
    }
    let exists: bool = conn
        .prepare("SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1")?
        .exists(rusqlite::params![table])?;
    if !exists {
        return Ok(vec![]);
    }

    // Decide ONCE whether the timestamp filter applies, and derive both the
    // SQL text and the bound parameters from that single decision. Binding a
    // value for a statement that has no `?1` placeholder makes rusqlite fail
    // with a parameter-count error, which previously broke exports of tables
    // without a timestamp column whenever `since` was supplied.
    let ts_col = detect_timestamp_column(conn, table);
    let filter: Option<(&str, String)> = match (since, ts_col.as_deref()) {
        (Some(ts), Some(col)) => Some((col, ts.replace('T', " "))),
        _ => None,
    };
    let sql = match &filter {
        Some((col, _)) => format!(
            "SELECT id, * FROM \"{table}\" \
             WHERE REPLACE(\"{col}\",'T',' ') > ?1 ORDER BY id"
        ),
        None => format!("SELECT id, * FROM \"{table}\" ORDER BY id"),
    };

    let mut stmt = conn.prepare(&sql)?;
    let col_count = stmt.column_count();
    let col_names: Vec<String> = (0..col_count)
        .map(|i| stmt.column_name(i).unwrap_or("").to_string())
        .collect();
    let params: Vec<&dyn rusqlite::types::ToSql> = match &filter {
        Some((_, cutoff)) => vec![cutoff as &dyn rusqlite::types::ToSql],
        None => Vec::new(),
    };

    let rows = stmt.query_map(params.as_slice(), |row| {
        // Column 0 is the explicitly selected `id`; try integer first, then
        // text, and fall back to 0 when neither conversion succeeds.
        let pk: serde_json::Value = match row.get::<_, i64>(0) {
            Ok(i) => serde_json::json!(i),
            Err(_) => match row.get::<_, String>(0) {
                Ok(s) => serde_json::json!(s),
                Err(_) => serde_json::json!(0),
            },
        };
        // Skip only the leading extra `id`; the `*` expansion that follows
        // still contributes the table's own columns to `data`.
        let mut data = serde_json::Map::new();
        for (i, name) in col_names.iter().enumerate().skip(1) {
            let val: rusqlite::types::Value = row.get(i)?;
            data.insert(name.clone(), sqlite_to_json(val));
        }
        Ok(SyncChange {
            table_name: table.to_string(),
            pk,
            data: serde_json::Value::Object(data),
        })
    })?;
    rows.collect()
}
/// Picks the column used for incremental export of `table`.
///
/// Reads the column names from `PRAGMA table_info` and returns
/// `"updated_at"` if present, otherwise `"created_at"` if present, otherwise
/// `None`. Any failure to prepare or run the pragma also yields `None`; rows
/// that fail to decode are skipped.
fn detect_timestamp_column(conn: &Connection, table: &str) -> Option<String> {
    let pragma = format!("PRAGMA table_info(\"{table}\")");
    let Ok(mut stmt) = conn.prepare(&pragma) else {
        return None;
    };
    // Index 1 of each `table_info` row is read as the column name.
    let Ok(rows) = stmt.query_map([], |row| row.get::<_, String>(1)) else {
        return None;
    };
    let columns: Vec<String> = rows.flatten().collect();

    // Candidates in priority order: the first one that exists wins.
    for candidate in ["updated_at", "created_at"] {
        if columns.iter().any(|name| name == candidate) {
            return Some(candidate.to_string());
        }
    }
    None
}
fn sqlite_to_json(val: rusqlite::types::Value) -> serde_json::Value {
match val {
rusqlite::types::Value::Null => serde_json::Value::Null,
rusqlite::types::Value::Integer(i) => serde_json::json!(i),
rusqlite::types::Value::Real(f) => serde_json::json!(f),
rusqlite::types::Value::Text(s) => serde_json::json!(s),
rusqlite::types::Value::Blob(b) => {
use base64::{engine::general_purpose::STANDARD, Engine};
serde_json::json!(STANDARD.encode(&b))
}
}
}
/// Returns `true` when `name` is non-empty and consists solely of ASCII
/// letters, ASCII digits and underscores.
fn is_safe_identifier(name: &str) -> bool {
    if name.is_empty() {
        return false;
    }
    // Every byte of a multi-byte UTF-8 sequence is >= 0x80 and therefore
    // fails the ASCII test, so a byte-wise scan accepts exactly the same
    // strings as a char-wise one.
    name.bytes()
        .all(|byte| byte == b'_' || byte.is_ascii_alphanumeric())
}
/// Lists the column names of `table` as reported by `PRAGMA table_info`.
///
/// Returns an empty vector when the pragma cannot be prepared or executed;
/// rows that fail to decode are skipped.
fn local_table_columns(conn: &Connection, table: &str) -> Vec<String> {
    let pragma = format!("PRAGMA table_info(\"{table}\")");
    let mut stmt = match conn.prepare(&pragma) {
        Ok(prepared) => prepared,
        Err(_) => return Vec::new(),
    };
    // Index 1 of each `table_info` row is read as the column name.
    let Ok(rows) = stmt.query_map([], |row| row.get::<_, String>(1)) else {
        return Vec::new();
    };
    rows.flatten().collect()
}
fn local_updated_at(conn: &Connection, table: &str, pk: &serde_json::Value) -> Option<String> {
let sql = format!("SELECT updated_at FROM \"{table}\" WHERE id = ?1");
match pk {
serde_json::Value::Number(n) => {
let id = n.as_i64()?;
conn.query_row(&sql, [id], |r| r.get::<_, Option<String>>(0))
.ok()
.flatten()
}
serde_json::Value::String(s) => conn
.query_row(&sql, [s.as_str()], |r| r.get::<_, Option<String>>(0))
.ok()
.flatten(),
_ => None,
}
}
/// Outcome summary of applying a batch of sync changes.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ApplyReport {
    /// Number of changes whose `INSERT OR REPLACE` statement executed successfully.
    pub applied: usize,
    /// Number of changes that were skipped by a validation check or whose
    /// statement failed.
    pub rejected: usize,
    /// Greatest `updated_at` string (with `T` replaced by a space) among the
    /// applied changes that carried one; `None` if no applied change did.
    pub applied_max_updated_at: Option<String>,
}
/// Applies `changes` and returns how many of them were applied.
///
/// Convenience wrapper around [`apply_changes_detailed`] that discards
/// everything in the report except the `applied` count.
///
/// # Errors
///
/// Propagates any error returned by [`apply_changes_detailed`].
pub fn apply_changes(conn: &Connection, changes: &[SyncChange]) -> Result<usize, rusqlite::Error> {
    let report = apply_changes_detailed(conn, changes)?;
    Ok(report.applied)
}
/// Applies `changes` with foreign-key enforcement switched off and returns a
/// full [`ApplyReport`].
///
/// An empty `changes` slice returns a default report without touching the
/// connection. Otherwise `PRAGMA foreign_keys = OFF` is issued, the changes
/// are applied, and `PRAGMA foreign_keys = ON` is issued afterwards whether
/// or not applying succeeded.
///
/// NOTE(review): enforcement is always switched back ON rather than restored
/// to whatever it was before the call — confirm callers expect that.
///
/// # Errors
///
/// Returns an error if either pragma statement fails, or the error produced
/// while applying the changes. A failure of the closing pragma takes
/// precedence over an apply error.
pub fn apply_changes_detailed(
    conn: &Connection,
    changes: &[SyncChange],
) -> Result<ApplyReport, rusqlite::Error> {
    let mut report = ApplyReport::default();
    if changes.is_empty() {
        return Ok(report);
    }

    conn.execute("PRAGMA foreign_keys = OFF", [])?;
    let outcome = apply_changes_inner(conn, changes, &mut report);
    // Re-enable enforcement before looking at `outcome`, so an apply failure
    // cannot leave foreign keys disabled.
    conn.execute("PRAGMA foreign_keys = ON", [])?;

    outcome?;
    Ok(report)
}
/// Returns the greatest `updated_at` string found across `changes`.
///
/// Only changes whose `data` is a JSON object with a string `updated_at`
/// field take part. Each value has `T` replaced by a space before the
/// (lexicographic) comparison, and the returned string is in that normalized
/// form. Returns `None` when no change carries such a field.
pub fn max_updated_at(changes: &[SyncChange]) -> Option<String> {
    let mut newest: Option<String> = None;
    for change in changes {
        let stamp = change
            .data
            .as_object()
            .and_then(|fields| fields.get("updated_at"))
            .and_then(|value| value.as_str());
        let Some(stamp) = stamp else {
            continue;
        };
        let normalized = stamp.replace('T', " ");
        let is_newer = match newest.as_deref() {
            Some(best) => normalized.as_str() > best,
            None => true,
        };
        if is_newer {
            newest = Some(normalized);
        }
    }
    newest
}
/// Applies each change as an `INSERT OR REPLACE`, tallying outcomes in `report`.
///
/// A change is counted in `report.rejected` and skipped when:
/// - its table is not listed in `crate::types::SYNC_TABLES`;
/// - its `data` is not a JSON object;
/// - the local table reports no columns, or shares none with the payload;
/// - any shared column name fails `is_safe_identifier`;
/// - it carries an `updated_at` that is not strictly newer than the local
///   row's `updated_at` (both compared with `T` replaced by a space);
/// - the insert statement itself fails (logged at `warn`).
///
/// Payload keys that do not exist as local columns are silently dropped.
/// On success `report.applied` is incremented and
/// `report.applied_max_updated_at` is raised to the change's normalized
/// `updated_at` when that is greater than the current value.
///
/// Per-row failures never abort the batch; the function always returns
/// `Ok(())`.
fn apply_changes_inner(
    conn: &Connection,
    changes: &[SyncChange],
    report: &mut ApplyReport,
) -> Result<(), rusqlite::Error> {
    // Column lists are looked up once per table rather than once per change:
    // a batch usually holds many rows for the same table, and each lookup is
    // a `PRAGMA table_info` round trip.
    let mut columns_by_table: std::collections::HashMap<&str, Vec<String>> =
        std::collections::HashMap::new();

    for change in changes {
        // Allow-list check: the table name is interpolated into SQL below.
        if !crate::types::SYNC_TABLES.contains(&change.table_name.as_str()) {
            tracing::warn!(table = %change.table_name, "rejected: not in SYNC_TABLES");
            report.rejected += 1;
            continue;
        }
        let Some(obj) = change.data.as_object() else {
            report.rejected += 1;
            continue;
        };
        let local_cols: &Vec<String> = columns_by_table
            .entry(change.table_name.as_str())
            .or_insert_with(|| local_table_columns(conn, &change.table_name));
        if local_cols.is_empty() {
            report.rejected += 1;
            continue;
        }
        // Keep only payload keys that exist locally, so schema drift on the
        // remote side does not make the insert fail.
        let cols: Vec<&str> = obj
            .keys()
            .map(|k| k.as_str())
            .filter(|c| local_cols.iter().any(|lc| lc == c))
            .collect();
        if cols.is_empty() {
            report.rejected += 1;
            continue;
        }
        // Column names are interpolated (quoted) into SQL as well.
        if cols.iter().any(|c| !is_safe_identifier(c)) {
            tracing::warn!(table = %change.table_name, "rejected: unsafe column names");
            report.rejected += 1;
            continue;
        }

        // The incoming row wins only when it is strictly newer than the
        // local one. Rows without a comparable timestamp on either side are
        // applied unconditionally.
        let remote_updated = obj
            .get("updated_at")
            .and_then(|v| v.as_str())
            .map(|s| s.replace('T', " "));
        if let Some(remote_ts) = &remote_updated {
            if let Some(local_ts) = local_updated_at(conn, &change.table_name, &change.pk) {
                let l = local_ts.replace('T', " ");
                if remote_ts.as_str() <= l.as_str() {
                    report.rejected += 1;
                    continue;
                }
            }
        }

        let quoted: Vec<String> = cols.iter().map(|c| format!("\"{c}\"")).collect();
        let ph: Vec<String> = (1..=cols.len()).map(|i| format!("?{i}")).collect();
        let sql = format!(
            "INSERT OR REPLACE INTO \"{}\" ({}) VALUES ({})",
            change.table_name,
            quoted.join(", "),
            ph.join(", ")
        );
        // NOTE(review): every value is bound as text and JSON null becomes an
        // empty string (see `json_to_sql_string`), so NULLs are not preserved
        // across a sync — confirm this is intended.
        let vals: Vec<String> = cols
            .iter()
            .filter_map(|c| obj.get(*c).map(json_to_sql_string))
            .collect();
        let params: Vec<&dyn rusqlite::types::ToSql> = vals
            .iter()
            .map(|v| v as &dyn rusqlite::types::ToSql)
            .collect();

        match conn.execute(&sql, params.as_slice()) {
            Ok(_) => {
                report.applied += 1;
                if let Some(ts) = remote_updated {
                    match &report.applied_max_updated_at {
                        Some(cur) if cur.as_str() >= ts.as_str() => {}
                        _ => report.applied_max_updated_at = Some(ts),
                    }
                }
            }
            Err(e) => {
                tracing::warn!(
                    table = %change.table_name,
                    pk = ?change.pk,
                    error = %e,
                    "apply rejected row",
                );
                report.rejected += 1;
            }
        }
    }
    Ok(())
}
fn json_to_sql_string(v: &serde_json::Value) -> String {
match v {
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Null => String::new(),
other => other.to_string(),
}
}
#[cfg(test)]
#[path = "sync_apply_tests.rs"]
mod tests;