use std::collections::HashSet;
use chrono::Utc;
use serde_json::Value;
use crate::{
cosmos::meta::CursorKey,
cosmos::{CosmosBackend, meta},
ingest::config::CycleConfig,
sources::SourceConnector,
};
/// Reconciles one subsource by soft-deleting documents the source no longer reports.
///
/// The connector is asked for every id currently present in `key.subsource`. Every document
/// stored in the connector's primary container under partition key `key.subsource` that is
/// absent from that set is re-upserted with `_deleted = true` and `_deleted_at` set to the
/// current UTC time (RFC 3339). Documents already flagged `_deleted` are skipped. An empty id
/// list from the source therefore marks every stored document deleted.
///
/// After the sweep, `last_reconciliation_at` and `last_reconciliation_deleted` are written to
/// the cursor for `key` in `cfg.meta_container`. Persisting the cursor is best-effort: a failure
/// is logged and does not turn a successful sweep into an error.
///
/// Returns the number of documents newly marked deleted by this call.
///
/// # Errors
///
/// Returns a descriptive `String` if any of these fail:
/// - listing source ids;
/// - listing stored documents;
/// - fetching a document;
/// - upserting a tombstone.
///
/// Documents tombstoned before the failure stay tombstoned, and the cursor is not updated.
pub async fn run<C>(
    connector: &C,
    cosmos: &dyn CosmosBackend,
    key: &CursorKey,
    cfg: &CycleConfig,
) -> Result<u64, String>
where
    C: SourceConnector,
{
    let container = connector.primary_container();

    let source_ids: HashSet<String> = connector
        .list_all_ids(&key.subsource)
        .await
        .map_err(|e| format!("list_all_ids: {e}"))?
        .into_iter()
        .collect();

    let rows = list_cosmos_ids(cosmos, container, &key.subsource)
        .await
        .map_err(|e| format!("list cosmos ids: {e}"))?;

    let now = Utc::now();
    // Every tombstone written in this pass shares one timestamp.
    let deleted_at = Value::String(now.to_rfc3339());
    let mut deleted = 0u64;

    for row in rows
        .iter()
        .filter(|row| !row.already_deleted && !source_ids.contains(&row.id))
    {
        let fetched = cosmos
            .get(container, &row.id, &row.partition_key)
            .await
            .map_err(|e| format!("get doc '{}': {e}", row.id))?;
        let mut doc = match fetched {
            Some(doc) => doc,
            // Gone between listing and fetching: nothing left to tombstone.
            None => continue,
        };

        match doc.as_object_mut() {
            Some(obj) => {
                obj.insert("_deleted".into(), Value::Bool(true));
                obj.insert("_deleted_at".into(), deleted_at.clone());
            }
            None => {
                // Upserting it unchanged would not mark anything, so do not count it.
                tracing::warn!(
                    id = %row.id,
                    key = %key.id(),
                    "stored document is not a JSON object; skipping tombstone"
                );
                continue;
            }
        }

        cosmos
            .upsert(container, doc)
            .await
            .map_err(|e| format!("mark deleted '{}': {e}", row.id))?;
        deleted += 1;
    }

    // NOTE(review): falling back to a default cursor means a transient load failure lets the
    // save below overwrite any other fields stored on this cursor. The fallback is kept for
    // compatibility (e.g. if `meta::load` errors when no cursor exists yet — confirm), but it
    // is no longer silent.
    let mut cursor = match meta::load(cosmos, &cfg.meta_container, key).await {
        Ok(cursor) => cursor,
        Err(e) => {
            tracing::warn!(
                error = %e,
                key = %key.id(),
                "could not load reconciliation cursor metadata; using defaults"
            );
            Default::default()
        }
    };
    cursor.last_reconciliation_at = Some(now);
    cursor.last_reconciliation_deleted = deleted;

    if let Err(e) = meta::save(cosmos, &cfg.meta_container, key, &cursor).await {
        tracing::warn!(
            error = %e,
            key = %key.id(),
            "could not persist reconciliation cursor metadata"
        );
    }

    Ok(deleted)
}
/// Minimal projection of a stored document, holding only what reconciliation needs to decide
/// whether to tombstone it and to address it for `get`/`upsert`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CosmosIdRow {
    /// Document `id`.
    id: String,
    /// Partition key value the document is stored under.
    partition_key: String,
    /// Whether the document already carries `_deleted: true`.
    already_deleted: bool,
}
async fn list_cosmos_ids(
cosmos: &dyn CosmosBackend,
container: &str,
subsource: &str,
) -> Result<Vec<CosmosIdRow>, String> {
let sql = "SELECT * FROM c WHERE c._partition_key = @pk".to_string();
let params = vec![("@pk".into(), subsource.into())];
let mut stream = cosmos
.query(container, &sql, params)
.await
.map_err(|e| format!("query: {e}"))?;
let mut out = Vec::new();
while let Some(page) = stream.next_page().await.map_err(|e| format!("page: {e}"))? {
for doc in page {
if let Some(obj) = doc.as_object() {
let id = obj
.get("id")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let pk = obj
.get("_partition_key")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let already_deleted = obj
.get("_deleted")
.and_then(Value::as_bool)
.unwrap_or(false);
out.push(CosmosIdRow {
id,
partition_key: pk,
already_deleted,
});
}
}
}
Ok(out)
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use crate::{
        cosmos::{CosmosBackend, InMemoryCosmos, meta::CursorKey},
        ingest::{config::CycleConfig, test_helpers::MockConnector},
    };

    /// Cursor key for the `DO` subsource of the mock source used by every test.
    fn make_key() -> CursorKey {
        CursorKey {
            source_name: "mock".into(),
            subsource: "DO".into(),
        }
    }

    /// Stores a live (not deleted) document `id` under partition `pk`.
    async fn insert_doc(cosmos: &InMemoryCosmos, container: &str, id: &str, pk: &str) {
        let doc = json!({
            "id": id,
            "_partition_key": pk,
            "title": format!("Doc {id}"),
        });
        cosmos.upsert(container, doc).await.unwrap();
    }

    /// Stores a document that already carries a tombstone dated 2024-01-01.
    async fn insert_deleted_doc(cosmos: &InMemoryCosmos, container: &str, id: &str, pk: &str) {
        let doc = json!({
            "id": id,
            "_partition_key": pk,
            "title": format!("Deleted {id}"),
            "_deleted": true,
            "_deleted_at": "2024-01-01T00:00:00Z",
        });
        cosmos.upsert(container, doc).await.unwrap();
    }

    /// Fetches document `id` from the `DO` partition of `container`; panics if it is missing.
    async fn fetch(cosmos: &InMemoryCosmos, container: &str, id: &str) -> Value {
        cosmos
            .get(container, id, "DO")
            .await
            .unwrap()
            .expect("document should exist")
    }

    /// True when `doc` carries the `_deleted: true` tombstone flag.
    fn is_marked_deleted(doc: &Value) -> bool {
        doc.get("_deleted") == Some(&json!(true))
    }

    #[tokio::test]
    async fn reconcile_marks_missing_docs_deleted() {
        let cosmos = InMemoryCosmos::new();
        let key = make_key();
        insert_doc(&cosmos, "jira-issues", "A", "DO").await;
        insert_doc(&cosmos, "jira-issues", "B", "DO").await;
        insert_doc(&cosmos, "jira-issues", "C", "DO").await;
        let connector = MockConnector::new("mock", "jira-issues");
        connector.set_list_ids(vec!["A".into(), "C".into()]);
        let cfg = CycleConfig::default();

        let deleted = run(&connector, &cosmos, &key, &cfg).await.unwrap();
        assert_eq!(deleted, 1, "expected 1 doc marked deleted");

        let b = fetch(&cosmos, "jira-issues", "B").await;
        assert!(is_marked_deleted(&b));
        assert!(b.get("_deleted_at").is_some());
        assert!(!is_marked_deleted(&fetch(&cosmos, "jira-issues", "A").await));
        assert!(!is_marked_deleted(&fetch(&cosmos, "jira-issues", "C").await));
    }

    #[tokio::test]
    async fn reconcile_does_not_touch_already_deleted_docs() {
        let cosmos = InMemoryCosmos::new();
        let key = make_key();
        insert_doc(&cosmos, "jira-issues", "A", "DO").await;
        insert_deleted_doc(&cosmos, "jira-issues", "B", "DO").await;
        let connector = MockConnector::new("mock", "jira-issues");
        connector.set_list_ids(vec!["A".into()]);
        let cfg = CycleConfig::default();

        let deleted = run(&connector, &cosmos, &key, &cfg).await.unwrap();
        assert_eq!(
            deleted, 0,
            "expected 0 new deletions — B was already deleted"
        );

        let b = fetch(&cosmos, "jira-issues", "B").await;
        assert!(is_marked_deleted(&b));
        assert_eq!(
            b["_deleted_at"],
            json!("2024-01-01T00:00:00Z"),
            "an existing tombstone must keep its original timestamp"
        );
        assert!(!is_marked_deleted(&fetch(&cosmos, "jira-issues", "A").await));
    }

    #[tokio::test]
    async fn reconcile_updates_cursor_state() {
        let cosmos = InMemoryCosmos::new();
        let key = make_key();
        insert_doc(&cosmos, "jira-issues", "X", "DO").await;
        insert_doc(&cosmos, "jira-issues", "Y", "DO").await;
        let connector = MockConnector::new("mock", "jira-issues");
        connector.set_list_ids(vec!["Y".into()]);
        let cfg = CycleConfig::default();

        let deleted = run(&connector, &cosmos, &key, &cfg).await.unwrap();
        assert_eq!(deleted, 1);

        // Read from the container `run` actually wrote to, not a hard-coded name.
        let cursor = meta::load(&cosmos, &cfg.meta_container, &key).await.unwrap();
        assert!(
            cursor.last_reconciliation_at.is_some(),
            "last_reconciliation_at must be set"
        );
        assert_eq!(
            cursor.last_reconciliation_deleted, 1,
            "last_reconciliation_deleted must be 1"
        );
    }

    #[tokio::test]
    async fn reconcile_empty_source_deletes_all() {
        let cosmos = InMemoryCosmos::new();
        let key = make_key();
        insert_doc(&cosmos, "jira-issues", "P", "DO").await;
        insert_doc(&cosmos, "jira-issues", "Q", "DO").await;
        let connector = MockConnector::new("mock", "jira-issues");
        connector.set_list_ids(vec![]);
        let cfg = CycleConfig::default();

        let deleted = run(&connector, &cosmos, &key, &cfg).await.unwrap();
        assert_eq!(deleted, 2);
        assert!(is_marked_deleted(&fetch(&cosmos, "jira-issues", "P").await));
        assert!(is_marked_deleted(&fetch(&cosmos, "jira-issues", "Q").await));
    }

    #[tokio::test]
    async fn reconcile_no_op_when_cosmos_empty() {
        let cosmos = InMemoryCosmos::new();
        let key = make_key();
        let connector = MockConnector::new("mock", "jira-issues");
        connector.set_list_ids(vec!["A".into()]);
        let cfg = CycleConfig::default();

        let deleted = run(&connector, &cosmos, &key, &cfg).await.unwrap();
        assert_eq!(deleted, 0);
    }

    #[tokio::test]
    async fn reconcile_is_idempotent() {
        let cosmos = InMemoryCosmos::new();
        let key = make_key();
        insert_doc(&cosmos, "jira-issues", "A", "DO").await;
        insert_doc(&cosmos, "jira-issues", "B", "DO").await;
        let connector = MockConnector::new("mock", "jira-issues");
        let cfg = CycleConfig::default();

        connector.set_list_ids(vec!["A".into()]);
        assert_eq!(run(&connector, &cosmos, &key, &cfg).await.unwrap(), 1);
        let first_stamp = fetch(&cosmos, "jira-issues", "B").await["_deleted_at"].clone();

        // Re-arm the mock in case it only hands out its id list once.
        connector.set_list_ids(vec!["A".into()]);
        assert_eq!(
            run(&connector, &cosmos, &key, &cfg).await.unwrap(),
            0,
            "a second pass must find nothing new to delete"
        );
        let second_stamp = fetch(&cosmos, "jira-issues", "B").await["_deleted_at"].clone();
        assert_eq!(first_stamp, second_stamp, "tombstone must not be rewritten");
    }
}