use anyhow::{Context, Result};
use async_nats::jetstream::{self, consumer::pull::Config as PullConfig};
use futures::StreamExt;
use kanade_shared::ExecResult;
use kanade_shared::kv::{BUCKET_SCHEDULES, STREAM_RESULTS};
use kanade_shared::manifest::{InventoryHint, Schedule};
use sqlx::SqlitePool;
use tracing::{info, warn};
/// Name under which the results projector registers its JetStream consumer.
///
/// `run` passes this both as the consumer name to `get_or_create_consumer`
/// and as the `durable_name` in the pull-consumer config.
const CONSUMER_NAME: &str = "backend_results_projector";
pub async fn run(js: jetstream::Context, pool: SqlitePool) -> Result<()> {
let stream = js
.get_stream(STREAM_RESULTS)
.await
.with_context(|| format!("get stream {STREAM_RESULTS}"))?;
let consumer = stream
.get_or_create_consumer(
CONSUMER_NAME,
PullConfig {
durable_name: Some(CONSUMER_NAME.into()),
ack_policy: jetstream::consumer::AckPolicy::Explicit,
..Default::default()
},
)
.await
.context("create results consumer")?;
info!(
stream = STREAM_RESULTS,
consumer = CONSUMER_NAME,
"results projector started"
);
let schedules_kv = js
.get_key_value(BUCKET_SCHEDULES)
.await
.with_context(|| format!("get KV {BUCKET_SCHEDULES}"))?;
let mut messages = consumer
.messages()
.await
.context("subscribe results messages")?;
while let Some(msg) = messages.next().await {
let msg = match msg {
Ok(m) => m,
Err(e) => {
warn!(error = %e, "results consumer error");
continue;
}
};
match serde_json::from_slice::<ExecResult>(&msg.payload) {
Ok(r) => {
if let Err(e) = insert_result(&pool, &r).await {
warn!(error = %e, request_id = %r.request_id, "insert result failed");
} else {
info!(request_id = %r.request_id, pc_id = %r.pc_id, exit_code = r.exit_code, "projected result");
}
if r.exit_code == 0 {
if let Err(e) = maybe_project_inventory(&pool, &schedules_kv, &r).await {
warn!(error = ?e, request_id = %r.request_id, "inventory fact projection failed");
}
}
}
Err(e) => warn!(error = %e, subject = %msg.subject, "deserialize ExecResult"),
}
if let Err(e) = msg.ack().await {
warn!(error = ?e, "ack results message");
}
}
Ok(())
}
/// Inserts one execution result into the `deployment_results` table.
///
/// The statement uses `ON CONFLICT(request_id) DO NOTHING`, so a row whose
/// `request_id` is already present is left untouched and the call still
/// succeeds.
///
/// # Errors
///
/// Returns the error from executing the statement against `pool`.
async fn insert_result(pool: &SqlitePool, r: &ExecResult) -> Result<()> {
    // Continuation lines are kept flush-left so the SQL text sent to the
    // database is exactly the same as before.
    const INSERT_SQL: &str = "INSERT INTO deployment_results (
request_id, pc_id, exit_code, stdout, stderr, started_at, finished_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(request_id) DO NOTHING";

    // Bind order follows the column list in the statement.
    let statement = sqlx::query(INSERT_SQL)
        .bind(&r.request_id)
        .bind(&r.pc_id)
        .bind(r.exit_code as i64)
        .bind(&r.stdout)
        .bind(&r.stderr)
        .bind(r.started_at)
        .bind(r.finished_at);

    statement.execute(pool).await?;
    Ok(())
}
/// Upserts an inventory fact for `r` if a stored schedule says its manifest
/// produces inventory.
///
/// Does nothing when `r` has no `manifest_id`. Otherwise the keys of
/// `schedules_kv` are scanned; each readable entry is decoded as a
/// [`Schedule`] and compared by `manifest.id`. The first schedule that matches
/// decides the outcome: if its manifest carries an inventory hint the result
/// is handed to [`upsert_inventory`], otherwise the function returns without
/// writing anything. Later keys are not examined.
///
/// Failing to list keys, an unreadable key, a missing entry or an entry that
/// does not decode as a `Schedule` are all skipped silently.
///
/// # Errors
///
/// Propagates errors from fetching a KV entry and from [`upsert_inventory`].
async fn maybe_project_inventory(
    pool: &SqlitePool,
    schedules_kv: &async_nats::jetstream::kv::Store,
    r: &ExecResult,
) -> Result<()> {
    use futures::StreamExt;

    let manifest_id = match r.manifest_id.as_deref() {
        Some(id) => id,
        None => return Ok(()),
    };

    // Being unable to list keys is treated the same as having no schedules.
    let Ok(mut keys) = schedules_kv.keys().await else {
        return Ok(());
    };

    while let Some(next_key) = keys.next().await {
        let Ok(key) = next_key else {
            continue;
        };
        let Some(raw) = schedules_kv.get(&key).await? else {
            continue;
        };
        let Ok(schedule) = serde_json::from_slice::<Schedule>(&raw) else {
            continue;
        };
        if schedule.manifest.id != manifest_id {
            continue;
        }

        // First matching schedule wins, with or without an inventory hint.
        return match schedule.manifest.inventory.as_ref() {
            Some(hint) => upsert_inventory(pool, r, manifest_id, hint).await,
            None => Ok(()),
        };
    }

    Ok(())
}
/// Writes the stdout of `r` as the inventory fact for `(pc_id, manifest_id)`.
///
/// The stdout must parse as JSON; it is stored verbatim in `facts_json`
/// (the parsed value is used only for validation). `hint.display` is
/// serialized to JSON and stored in `display_json`, and `r.finished_at` is
/// stored as `collected_at`. `manifest_id` is written to the `job_id` column.
/// An existing row for the same `(pc_id, job_id)` is overwritten and its
/// `recorded_at` is refreshed.
///
/// # Errors
///
/// Fails if stdout is not valid JSON, if `hint.display` cannot be serialized,
/// or if executing the statement fails.
async fn upsert_inventory(
    pool: &SqlitePool,
    r: &ExecResult,
    manifest_id: &str,
    hint: &InventoryHint,
) -> Result<()> {
    // Continuation lines are kept flush-left so the SQL text sent to the
    // database is exactly the same as before.
    const UPSERT_SQL: &str = "INSERT INTO inventory_facts (
pc_id, job_id, facts_json, display_json, collected_at, recorded_at
) VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(pc_id, job_id) DO UPDATE SET
facts_json = excluded.facts_json,
display_json = excluded.display_json,
collected_at = excluded.collected_at,
recorded_at = CURRENT_TIMESTAMP";

    // Parse purely as a validity check; the raw text is what gets stored.
    serde_json::from_str::<serde_json::Value>(&r.stdout)
        .with_context(|| format!("manifest '{manifest_id}' stdout was not JSON"))?;
    let display_json = serde_json::to_string(&hint.display)?;

    let statement = sqlx::query(UPSERT_SQL)
        .bind(&r.pc_id)
        .bind(manifest_id)
        .bind(&r.stdout)
        .bind(display_json)
        .bind(r.finished_at);
    statement.execute(pool).await?;

    info!(pc_id = %r.pc_id, manifest_id, "projected inventory fact");
    Ok(())
}