use chrono::Utc;
use serde_json::Value;
use crate::{
cosmos::meta::CursorKey,
cosmos::{CosmosBackend, meta},
ingest::{backfill, config::CycleConfig, window},
sources::{Companions, SourceConnector, SourceDocument},
};
/// Result of a single ingest cycle, as returned by [`run`].
#[derive(Debug)]
pub enum CycleOutcome {
/// The planned window was fetched completely and the cursor was saved at its end.
Advanced {
/// Number of documents returned across all pages of the window.
documents_written: usize,
/// End of the processed window; this is the value stored as the cursor's
/// `last_complete_minute`.
window_end: chrono::DateTime<Utc>,
},
/// Not constructed in this file; presumably produced by the `backfill` module while a
/// backfill still has work left. NOTE(review): confirm against `backfill`.
BackfillInProgress { documents_written: usize },
/// Not constructed in this file; presumably produced by the `backfill` module once a
/// backfill has reached `target`. NOTE(review): confirm against `backfill`.
BackfillCompleted {
documents_written: usize,
target: chrono::DateTime<Utc>,
},
/// The window planner returned no window, so nothing was fetched or written.
NothingToDo,
/// The cycle stopped early. In [`run`], `error` is prefixed with the failing stage:
/// `load cursor`, `fetch_window`, `upsert` or `save cursor`.
Failed { error: String },
}
pub async fn run<C>(
connector: &C,
cosmos: &dyn CosmosBackend,
key: &CursorKey,
cfg: &CycleConfig,
) -> CycleOutcome
where
C: SourceConnector,
{
let cursor = match meta::load(cosmos, &cfg.meta_container, key).await {
Ok(c) => c,
Err(e) => {
return CycleOutcome::Failed {
error: format!("load cursor: {e}"),
};
}
};
if cursor.backfill_in_progress {
return backfill::resume(connector, cosmos, key, cursor, cfg).await;
}
if cursor.last_complete_minute.is_none() {
return backfill::start(connector, cosmos, key, cfg).await;
}
let now = Utc::now();
let Some(w) =
window::plan_next_window(cursor.last_complete_minute, now, cfg.safety_lag_minutes)
else {
return CycleOutcome::NothingToDo;
};
let mut total_written = 0usize;
let mut page_token: Option<String> = None;
loop {
let page = match connector
.fetch_window(
&key.subsource,
w.start,
w.end,
cfg.batch_size,
page_token.as_deref(),
)
.await
{
Ok(p) => p,
Err(e) => {
return CycleOutcome::Failed {
error: format!("fetch_window: {e}"),
};
}
};
if !page.documents.is_empty() {
let docs: Vec<Value> = page
.documents
.iter()
.map(|d| document_envelope(d, connector.source_name()))
.collect();
if let Err(e) = cosmos
.bulk_upsert(connector.primary_container(), docs)
.await
{
return CycleOutcome::Failed {
error: format!("upsert: {e}"),
};
}
}
total_written += page.documents.len();
match page.next_page_token {
Some(t) => page_token = Some(t),
None => break,
}
}
match connector.fetch_companions(&key.subsource).await {
Ok(companions) => {
if let Err(e) = upsert_companions(cosmos, &companions, cfg).await {
tracing::warn!(
error = %e,
source = connector.source_name(),
subsource = %key.subsource,
"companion upsert failed; primary docs unaffected"
);
}
}
Err(e) => {
tracing::warn!(
error = %e,
source = connector.source_name(),
subsource = %key.subsource,
"companion fetch failed; primary docs unaffected"
);
}
}
let mut new_cursor = cursor;
new_cursor.last_complete_minute = Some(w.end);
new_cursor.documents_synced_total += total_written as u64;
new_cursor.last_sync_at = Some(Utc::now());
new_cursor.last_error = None;
if let Err(e) = meta::save(cosmos, &cfg.meta_container, key, &new_cursor).await {
return CycleOutcome::Failed {
error: format!("save cursor: {e}"),
};
}
CycleOutcome::Advanced {
documents_written: total_written,
window_end: w.end,
}
}
/// Wraps a [`SourceDocument`] in the JSON object that is written to the backend.
///
/// The object carries these envelope keys:
///
/// * `id`: `doc.id`
/// * `_partition_key`: `doc.partition_key`
/// * `source_name`: the `source_name` argument
/// * `source_link`: `doc.source_link`
/// * `updated`: `doc.updated_at` formatted as RFC 3339
///
/// Every entry of `doc.fields` is then merged in at the top level. The merge happens
/// last, so a field whose name equals one of the envelope keys replaces that key's value.
pub(crate) fn document_envelope(doc: &SourceDocument, source_name: &str) -> Value {
    let id: Value = doc.id.clone().into();
    let partition_key: Value = doc.partition_key.clone().into();
    let source: Value = source_name.to_string().into();
    let source_link: Value = doc.source_link.clone().into();
    let updated: Value = doc.updated_at.to_rfc3339().into();

    let mut envelope = serde_json::Map::new();
    envelope.insert(String::from("id"), id);
    envelope.insert(String::from("_partition_key"), partition_key);
    envelope.insert(String::from("source_name"), source);
    envelope.insert(String::from("source_link"), source_link);
    envelope.insert(String::from("updated"), updated);

    // Source-specific fields go in after the envelope keys and win on name collisions.
    envelope.extend(
        doc.fields
            .iter()
            .map(|(name, value)| (name.clone(), value.clone())),
    );

    Value::Object(envelope)
}
/// Writes companion documents to their configured containers.
///
/// The categories `sprints`, `fix_versions`, `projects` and `spaces` are processed in
/// that order. A category is skipped when it has no documents or when
/// `cfg.companion_containers` has no entry for it. Each document is wrapped with
/// [`document_envelope`], using the category name as its `source_name`.
///
/// # Errors
///
/// Returns the first `bulk_upsert` failure as `companion upsert (<category>): <error>`.
/// Categories after the failing one are not attempted.
async fn upsert_companions(
    cosmos: &dyn CosmosBackend,
    companions: &Companions,
    cfg: &CycleConfig,
) -> Result<(), String> {
    let sprints: &[SourceDocument] = &companions.sprints;
    let fix_versions: &[SourceDocument] = &companions.fix_versions;
    let projects: &[SourceDocument] = &companions.projects;
    let spaces: &[SourceDocument] = &companions.spaces;

    let categories = [
        ("sprints", sprints),
        ("fix_versions", fix_versions),
        ("projects", projects),
        ("spaces", spaces),
    ];

    for (category, docs) in categories {
        if docs.is_empty() {
            continue;
        }
        let container = match cfg.companion_containers.get(category) {
            Some(container) => container,
            // No container configured for this category: nothing to write to.
            None => continue,
        };

        let mut envelopes: Vec<Value> = Vec::with_capacity(docs.len());
        envelopes.extend(docs.iter().map(|doc| document_envelope(doc, category)));

        if let Err(e) = cosmos.bulk_upsert(container, envelopes).await {
            return Err(format!("companion upsert ({category}): {e}"));
        }
    }

    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    use crate::{
        cosmos::{InMemoryCosmos, meta::CursorKey},
        ingest::{
            config::CycleConfig,
            test_helpers::{MockConnector, make_source_doc},
        },
        sources::Companions,
    };

    /// Meta container the tests seed and read cursors from. `run` reads the cursor from
    /// `cfg.meta_container`, so this has to match the value in the configs used below.
    const META_CONTAINER: &str = "quelch-meta";

    /// Cursor key shared by every test in this module.
    fn make_key() -> CursorKey {
        CursorKey {
            source_name: "mock".into(),
            subsource: "DO".into(),
        }
    }

    /// Returns the instant `minutes` before now, floored to the minute.
    ///
    /// Tests seed cursors relative to the wall clock because `run` plans its window
    /// against `Utc::now()`.
    fn minutes_ago(minutes: i64) -> chrono::DateTime<Utc> {
        crate::ingest::window::floor_to_minute(Utc::now() - Duration::minutes(minutes))
    }

    /// Stores a cursor whose only non-default field is `last_complete_minute`.
    async fn seed_cursor(
        cosmos: &InMemoryCosmos,
        key: &CursorKey,
        last_minute: chrono::DateTime<Utc>,
    ) {
        let cursor = crate::cosmos::meta::Cursor {
            last_complete_minute: Some(last_minute),
            ..Default::default()
        };
        meta::save(cosmos, META_CONTAINER, key, &cursor)
            .await
            .unwrap();
    }

    /// A single-page window advances the cursor and counts the written documents.
    #[tokio::test]
    async fn cycle_advances_cursor_after_full_window() {
        let cosmos = InMemoryCosmos::new();
        let key = make_key();
        let past = minutes_ago(15);
        seed_cursor(&cosmos, &key, past).await;

        let connector = MockConnector::new("mock", "jira-issues");
        connector.push_window_page(
            vec![
                make_source_doc("doc-A", "DO"),
                make_source_doc("doc-B", "DO"),
            ],
            None,
        );
        let cfg = CycleConfig {
            safety_lag_minutes: 2,
            ..CycleConfig::default()
        };

        let outcome = run(&connector, &cosmos, &key, &cfg).await;
        assert!(
            matches!(
                outcome,
                CycleOutcome::Advanced {
                    documents_written: 2,
                    ..
                }
            ),
            "expected Advanced(2), got {outcome:?}"
        );

        let loaded = meta::load(&cosmos, META_CONTAINER, &key).await.unwrap();
        assert!(loaded.last_complete_minute.unwrap() > past);
        assert_eq!(loaded.documents_synced_total, 2);
        assert!(loaded.last_error.is_none());
    }

    /// Fetched documents land in the connector's primary container, enveloped with the
    /// connector's source name.
    #[tokio::test]
    async fn cycle_writes_docs_to_primary_container() {
        let cosmos = InMemoryCosmos::new();
        let key = make_key();
        seed_cursor(&cosmos, &key, minutes_ago(15)).await;

        let connector = MockConnector::new("mock", "jira-issues");
        connector.push_window_page(vec![make_source_doc("issue-1", "DO")], None);
        let cfg = CycleConfig::default();

        // Check the outcome first so a failed cycle is reported as such instead of as a
        // missing document further down.
        let outcome = run(&connector, &cosmos, &key, &cfg).await;
        assert!(
            matches!(outcome, CycleOutcome::Advanced { .. }),
            "expected Advanced, got {outcome:?}"
        );

        let doc = cosmos.get("jira-issues", "issue-1", "DO").await.unwrap();
        assert!(doc.is_some(), "expected doc in jira-issues container");
        let doc = doc.unwrap();
        assert_eq!(doc["id"], "issue-1");
        assert_eq!(doc["source_name"], "mock");
    }

    /// A fetch error on the second page fails the cycle and leaves the cursor untouched.
    #[tokio::test]
    async fn cycle_does_not_advance_cursor_on_failure() {
        let cosmos = InMemoryCosmos::new();
        let key = make_key();
        let past = minutes_ago(15);
        seed_cursor(&cosmos, &key, past).await;

        let connector = MockConnector::new("mock", "jira-issues");
        connector.push_window_page(vec![make_source_doc("doc-1", "DO")], Some("page2".into()));
        connector.push_window_error("network failure");
        let cfg = CycleConfig::default();

        let outcome = run(&connector, &cosmos, &key, &cfg).await;
        assert!(
            matches!(outcome, CycleOutcome::Failed { .. }),
            "expected Failed, got {outcome:?}"
        );

        let loaded = meta::load(&cosmos, META_CONTAINER, &key).await.unwrap();
        assert_eq!(
            loaded.last_complete_minute.unwrap(),
            past,
            "cursor should not have advanced after failure"
        );
    }

    /// A cursor that is only one minute old, with a two-minute safety lag, yields no
    /// window.
    #[tokio::test]
    async fn cycle_skips_when_no_progress_possible() {
        let cosmos = InMemoryCosmos::new();
        let key = make_key();
        seed_cursor(&cosmos, &key, minutes_ago(1)).await;

        let connector = MockConnector::new("mock", "jira-issues");
        let cfg = CycleConfig {
            safety_lag_minutes: 2,
            ..CycleConfig::default()
        };

        let outcome = run(&connector, &cosmos, &key, &cfg).await;
        assert!(
            matches!(outcome, CycleOutcome::NothingToDo),
            "expected NothingToDo, got {outcome:?}"
        );
    }

    /// Companion documents are written to their configured container even when the
    /// window itself is empty.
    #[tokio::test]
    async fn cycle_writes_companions() {
        let cosmos = InMemoryCosmos::new();
        let key = make_key();
        seed_cursor(&cosmos, &key, minutes_ago(15)).await;

        let connector = MockConnector::new("mock", "jira-issues");
        connector.push_window_page(vec![], None);
        connector.set_companions(Companions {
            sprints: vec![make_source_doc("sprint-1", "DO")],
            ..Default::default()
        });
        let cfg = CycleConfig::default();

        let outcome = run(&connector, &cosmos, &key, &cfg).await;
        assert!(
            matches!(outcome, CycleOutcome::Advanced { .. }),
            "expected Advanced, got {outcome:?}"
        );

        let sprint_doc = cosmos.get("jira-sprints", "sprint-1", "DO").await.unwrap();
        assert!(
            sprint_doc.is_some(),
            "expected sprint in jira-sprints container"
        );
    }

    /// With no stored cursor the cycle is handed to the backfill path.
    #[tokio::test]
    async fn cycle_starts_backfill_when_cursor_unset() {
        let cosmos = InMemoryCosmos::new();
        let key = make_key();
        let connector = MockConnector::new("mock", "jira-issues");
        let cfg = CycleConfig::default();

        let outcome = run(&connector, &cosmos, &key, &cfg).await;
        assert!(
            matches!(
                outcome,
                CycleOutcome::BackfillCompleted { .. } | CycleOutcome::BackfillInProgress { .. }
            ),
            "expected BackfillCompleted or BackfillInProgress, got {outcome:?}"
        );
    }

    /// The envelope exposes the fixed keys and the document's own fields.
    #[test]
    fn document_envelope_includes_required_fields() {
        let doc = make_source_doc("my-id", "my-pk");
        let env = document_envelope(&doc, "test-source");
        assert_eq!(env["id"], "my-id");
        assert_eq!(env["_partition_key"], "my-pk");
        assert_eq!(env["source_name"], "test-source");
        assert!(env.get("source_link").is_some());
        assert!(env.get("updated").is_some());
        assert_eq!(env["title"], "Doc my-id");
    }
}