use anyhow::Context as _;
use tracing::{error, info};
use crate::config::{Config, DeploymentRole, SourceConfig};
use crate::cosmos::CosmosBackend;
use crate::cosmos::meta::CursorKey;
use crate::ingest::config::CycleConfig;
use crate::ingest::connector_kind::AnyConnector;
use crate::ingest::rate_limit::build_rate_limited_client;
use crate::ingest::{cycle, reconcile};
use crate::sources::SourceConnector;
/// Runtime knobs for the ingest worker loop.
///
/// `Default` yields `once: false` and `max_docs: None`.
#[derive(Clone, Debug, Default)]
pub struct WorkerOptions {
    /// When `true`, [`run_with`] returns after a single pass over every connector
    /// instead of sleeping for the poll interval and starting another pass.
    pub once: bool,
    /// Optional document limit.
    ///
    /// NOTE(review): nothing in this file reads this field; confirm where (and with what
    /// scope — per cycle or per run) it is meant to be enforced.
    pub max_docs: Option<u64>,
}
/// Run the ingest worker for a single deployment.
///
/// Slices `config` down to `deployment_name`, checks that the deployment has the
/// [`DeploymentRole::Ingest`] role, builds the source connectors and the state backend,
/// and then hands control to [`run_with`].
///
/// # Errors
///
/// Returns an error if the config cannot be sliced for `deployment_name`, if the
/// deployment is not an ingest deployment, if any connector fails to build, or if the
/// state backend cannot be created. Errors from [`run_with`] are propagated unchanged.
pub async fn run(
    config: &Config,
    deployment_name: &str,
    options: WorkerOptions,
) -> anyhow::Result<()> {
    let sliced = crate::config::slice::for_deployment(config, deployment_name)
        .with_context(|| format!("slice config for deployment '{deployment_name}'"))?;
    let dep = sliced
        .deployments
        .first()
        .expect("slice::for_deployment guarantees exactly one deployment");
    anyhow::ensure!(
        dep.role == DeploymentRole::Ingest,
        "worker only supports ingest deployments, but '{}' has role {:?}",
        deployment_name,
        dep.role,
    );
    let cycle_cfg = CycleConfig::from_config(&sliced, deployment_name);
    // Connectors are built synchronously from config alone; doing this before the async
    // backend setup reports source-config mistakes without first creating a Cosmos client.
    let connectors = build_connectors(&sliced)?;
    let cosmos = build_cosmos_backend(&sliced).await?;
    run_with(connectors, cosmos, cycle_cfg, options).await
}
/// Drive ingest rounds for `connectors` against `cosmos`.
///
/// Every round runs [`cycle::run`] once for each `(key, connector)` pair, in order, and
/// logs the outcome. On rounds whose 1-based number is a multiple of
/// `cfg.reconcile_every`, each pair additionally runs [`reconcile::run`] right after its
/// cycle. A `reconcile_every` of 0 never matches, because `u64::is_multiple_of(0)` is
/// only true for 0. Reconcile failures are logged and do not stop the worker.
///
/// Between rounds the worker sleeps for `cfg.poll_interval`. When `options.once` is set,
/// it returns after the first round instead.
///
/// NOTE(review): `options.max_docs` is not read here.
///
/// # Errors
///
/// When this function returns, it returns `Ok(())`. Cycle and reconcile results are only
/// logged, never propagated.
pub async fn run_with<C>(
    connectors: Vec<(CursorKey, C)>,
    cosmos: Box<dyn CosmosBackend>,
    cfg: CycleConfig,
    options: WorkerOptions,
) -> anyhow::Result<()>
where
    C: SourceConnector,
{
    let backend = cosmos.as_ref();
    let mut round: u64 = 0;
    loop {
        round += 1;
        // Decided once per round: all connectors reconcile on the same rounds.
        let reconcile_due = round.is_multiple_of(cfg.reconcile_every);
        for (key, connector) in connectors.iter() {
            let outcome = cycle::run(connector, backend, key, &cfg).await;
            info!(?outcome, key = %key.id(), "cycle complete");
            if !reconcile_due {
                continue;
            }
            match reconcile::run(connector, backend, key, &cfg).await {
                Ok(deleted) => info!(deleted, key = %key.id(), "reconcile complete"),
                Err(e) => error!(error = %e, key = %key.id(), "reconcile failed"),
            }
        }
        if options.once {
            return Ok(());
        }
        tokio::time::sleep(cfg.poll_interval).await;
    }
}
/// Build one `(CursorKey, AnyConnector)` pair per configured subsource.
///
/// `config` is expected to come from `slice::for_deployment`. The name of its first
/// deployment becomes `CursorKey::deployment_name` for every pair.
///
/// All connectors share one HTTP client, created by `build_rate_limited_client` with
/// `config.ingest.max_retries`. Each source's connector is built once and cloned for every
/// Jira project or Confluence space it lists. A source with no projects or spaces
/// therefore contributes no pairs.
///
/// # Errors
///
/// Returns an error if `config` has no deployment, or if any connector fails to build.
fn build_connectors(config: &Config) -> anyhow::Result<Vec<(CursorKey, AnyConnector)>> {
    // Checked rather than `expect`ed: this helper accepts any `Config`, and the
    // one-deployment guarantee only holds when the caller passes a sliced config.
    let dep = config.deployments.first().context(
        "config has no deployment; expected a config produced by slice::for_deployment",
    )?;
    let http = build_rate_limited_client(reqwest::Client::new(), config.ingest.max_retries);
    let mut out: Vec<(CursorKey, AnyConnector)> = Vec::new();
    for src_config in &config.sources {
        match src_config {
            SourceConfig::Jira(j) => {
                let connector = crate::sources::jira::JiraConnector::new(j, http.clone())
                    .with_context(|| format!("build JiraConnector for '{}'", j.name))?;
                for project in &j.projects {
                    out.push((
                        CursorKey {
                            deployment_name: dep.name.clone(),
                            source_name: j.name.clone(),
                            subsource: project.clone(),
                        },
                        AnyConnector::Jira(connector.clone()),
                    ));
                }
            }
            SourceConfig::Confluence(c) => {
                let connector =
                    crate::sources::confluence::ConfluenceConnector::new(c, http.clone())
                        .with_context(|| format!("build ConfluenceConnector for '{}'", c.name))?;
                for space in &c.spaces {
                    out.push((
                        CursorKey {
                            deployment_name: dep.name.clone(),
                            source_name: c.name.clone(),
                            subsource: space.clone(),
                        },
                        AnyConnector::Confluence(connector.clone()),
                    ));
                }
            }
        }
    }
    Ok(out)
}
/// Create the state backend selected by `config.state.backend`.
///
/// Only `StateBackend::Cosmos` is supported in ingest mode. `cosmos.account` may be either:
/// - a bare account name, expanded to `https://{account}.documents.azure.com:443/`; or
/// - a full endpoint URL (any value containing `://`), used as-is.
///
/// Surrounding whitespace is ignored.
///
/// # Errors
///
/// Returns an error if the backend is `LocalFile`, if `cosmos.account` is unset or blank,
/// or if the Cosmos client cannot be created.
async fn build_cosmos_backend(config: &Config) -> anyhow::Result<Box<dyn CosmosBackend>> {
    use crate::config::StateBackend;
    match &config.state.backend {
        StateBackend::Cosmos => {
            // A blank account would otherwise expand to "https://.documents.azure.com:443/".
            let account = config
                .cosmos
                .account
                .as_deref()
                .map(str::trim)
                .filter(|a| !a.is_empty())
                .ok_or_else(|| {
                    anyhow::anyhow!("cosmos.account is required for state.backend=cosmos")
                })?;
            let endpoint = cosmos_endpoint(account);
            let client = crate::cosmos::CosmosClient::new(&endpoint, &config.cosmos.database)
                .await
                .with_context(|| format!("create Cosmos client for {endpoint}"))?;
            Ok(Box::new(client))
        }
        StateBackend::LocalFile => {
            anyhow::bail!("local-file state backend is not supported in ingest mode")
        }
    }
}

/// Turn a configured Cosmos account into an endpoint URL.
///
/// Full URLs pass through unchanged; bare account names are expanded to the public Azure
/// endpoint.
fn cosmos_endpoint(account: &str) -> String {
    // Azure Cosmos DB account names are limited to lowercase letters, digits and hyphens,
    // so "://" means the user supplied a URL (https, or an http emulator endpoint).
    if account.contains("://") {
        account.to_owned()
    } else {
        format!("https://{account}.documents.azure.com:443/")
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cosmos::InMemoryCosmos;
    use crate::ingest::connector_kind::AnyConnector;
    use crate::ingest::test_helpers::{MockConnector, make_source_doc};

    /// Cursor key for `subsource` under the fixed "test" deployment and "test-source" source.
    fn test_key(subsource: &str) -> CursorKey {
        CursorKey {
            deployment_name: "test".into(),
            source_name: "test-source".into(),
            subsource: subsource.into(),
        }
    }

    /// Worker options that make `run_with` stop after a single pass over its connectors.
    fn single_pass() -> WorkerOptions {
        WorkerOptions {
            once: true,
            ..WorkerOptions::default()
        }
    }

    /// One connector and `once` set: the worker performs one round and returns `Ok`.
    #[tokio::test]
    async fn worker_runs_one_cycle_and_exits_when_once_set() {
        let cosmos: Box<dyn CosmosBackend> = Box::new(InMemoryCosmos::new());
        let jira_mock = MockConnector::new("test-source", "jira-issues");
        jira_mock.push_window_page(vec![make_source_doc("DO-1", "DO")], None);
        let connectors: Vec<(CursorKey, AnyConnector)> =
            vec![(test_key("DO"), AnyConnector::Mock(jira_mock))];
        // Round 1 is not a multiple of 12, so no reconcile runs in this single pass.
        let cfg = CycleConfig {
            reconcile_every: 12,
            ..CycleConfig::default()
        };
        run_with(connectors, cosmos, cfg, single_pass())
            .await
            .expect("worker should complete without error");
    }

    /// Two connectors and `once` set: both are cycled and the worker returns `Ok`.
    #[tokio::test]
    async fn worker_runs_multiple_connectors() {
        let cosmos: Box<dyn CosmosBackend> = Box::new(InMemoryCosmos::new());
        let first = MockConnector::new("source-a", "jira-issues");
        first.push_window_page(vec![make_source_doc("A-1", "A")], None);
        let second = MockConnector::new("source-b", "confluence-pages");
        second.push_window_page(vec![make_source_doc("B-1", "ENG")], None);
        let connectors: Vec<(CursorKey, AnyConnector)> = vec![
            (test_key("A"), AnyConnector::Mock(first)),
            (test_key("B"), AnyConnector::Mock(second)),
        ];
        let cfg = CycleConfig {
            reconcile_every: 100,
            ..CycleConfig::default()
        };
        run_with(connectors, cosmos, cfg, single_pass())
            .await
            .expect("worker with multiple connectors should complete");
    }
}