use std::collections::HashMap;
use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use kanade_shared::kv::BUCKET_JOBS;
use kanade_shared::manifest::{DisplayField, ExplodeSpec, Manifest};
use serde::Serialize;
use sqlx::Row;
use tracing::warn;
use crate::projector::explode::validate_ident;
use super::AppState;
/// One `inventory_facts` row in the shape returned by [`list_for_pc`].
///
/// Instances are built by `row_to_fact`, which degrades unreadable or
/// unparsable columns to `Null` / empty / `None` instead of failing.
#[derive(Serialize)]
pub struct InventoryFact {
/// Value of the `job_id` column; empty string when the column cannot be read.
pub job_id: String,
/// Parsed `facts_json` column; `Null` when it is missing or not valid JSON.
pub facts: serde_json::Value,
/// Parsed `display_json` column; empty when it is NULL or fails to parse.
pub display: Vec<DisplayField>,
/// Parsed `summary_json` column; `None` when it is NULL or fails to parse.
pub summary: Option<Vec<DisplayField>>,
/// Value of the `collected_at` column; `None` when it cannot be decoded.
pub collected_at: Option<DateTime<Utc>>,
/// Value of the `recorded_at` column; `None` when it cannot be decoded.
pub recorded_at: Option<DateTime<Utc>>,
}
pub async fn list_for_pc(
State(state): State<AppState>,
Path(pc_id): Path<String>,
) -> Result<Json<Vec<InventoryFact>>, (StatusCode, String)> {
let rows = sqlx::query(
"SELECT job_id, facts_json, display_json, summary_json,
collected_at, recorded_at
FROM inventory_facts
WHERE pc_id = ?
ORDER BY job_id",
)
.bind(&pc_id)
.fetch_all(&state.pool)
.await
.map_err(|e| {
warn!(error = %e, %pc_id, "inventory_facts query");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let facts: Vec<InventoryFact> = rows.into_iter().map(row_to_fact).collect();
Ok(Json(facts))
}
/// One PC's entry inside an [`InventoryByJob`] response (built in `list_for_job`).
#[derive(Serialize)]
pub struct InventoryRow {
/// Value of the `pc_id` column; empty string when the column cannot be read.
pub pc_id: String,
/// Parsed `facts_json` column; `Null` when it is missing or not valid JSON.
pub facts: serde_json::Value,
/// Value of the `collected_at` column; `None` when it cannot be decoded.
pub collected_at: Option<DateTime<Utc>>,
}
/// Response body of [`list_for_job`]: all PCs' facts for one manifest/job id.
#[derive(Serialize)]
pub struct InventoryByJob {
/// The manifest id taken from the request path, echoed back unchanged.
pub manifest_id: String,
/// Display fields from the first row whose `display_json` parses; empty if none does.
pub display: Vec<DisplayField>,
/// Summary fields from the first row whose `summary_json` parses; `None` if none does.
pub summary: Option<Vec<DisplayField>>,
/// One entry per matching `inventory_facts` row, in `pc_id` order.
pub rows: Vec<InventoryRow>,
}
/// Lists the stored facts of every PC for one manifest/job id, ordered by `pc_id`.
///
/// The `display` / `summary` hints of the response are taken from the first
/// row whose corresponding JSON column is present and parses; per-row columns
/// that cannot be read or parsed degrade to defaults instead of failing.
///
/// # Errors
/// Returns `500 Internal Server Error` carrying the database error text when
/// the query fails; the failure is also logged at `warn` level.
pub async fn list_for_job(
    State(state): State<AppState>,
    Path(manifest_id): Path<String>,
) -> Result<Json<InventoryByJob>, (StatusCode, String)> {
    const SQL: &str = "SELECT pc_id, facts_json, display_json, summary_json, collected_at
FROM inventory_facts
WHERE job_id = ?
ORDER BY pc_id";

    let fetched = sqlx::query(SQL)
        .bind(&manifest_id)
        .fetch_all(&state.pool)
        .await;
    let db_rows = match fetched {
        Ok(db_rows) => db_rows,
        Err(e) => {
            warn!(error = %e, %manifest_id, "inventory_facts by-job query");
            return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()));
        }
    };

    // Reads a nullable JSON text column and parses it as a display-field list;
    // any read or parse failure yields `None` so the caller can try the next row.
    let parse_fields =
        |db_row: &sqlx::sqlite::SqliteRow, column: &str| -> Option<Vec<DisplayField>> {
            let raw = db_row.try_get::<Option<String>, _>(column).ok().flatten()?;
            serde_json::from_str::<Vec<DisplayField>>(&raw).ok()
        };

    let display = db_rows
        .iter()
        .find_map(|db_row| parse_fields(db_row, "display_json"))
        .unwrap_or_default();
    let summary = db_rows
        .iter()
        .find_map(|db_row| parse_fields(db_row, "summary_json"));

    let mut inv_rows: Vec<InventoryRow> = Vec::with_capacity(db_rows.len());
    for db_row in db_rows {
        let facts = match db_row.try_get::<String, _>("facts_json") {
            Ok(raw) => serde_json::from_str(&raw).unwrap_or(serde_json::Value::Null),
            Err(_) => serde_json::Value::Null,
        };
        inv_rows.push(InventoryRow {
            pc_id: db_row.try_get("pc_id").unwrap_or_default(),
            facts,
            collected_at: db_row.try_get("collected_at").ok(),
        });
    }

    Ok(Json(InventoryByJob {
        manifest_id,
        display,
        summary,
        rows: inv_rows,
    }))
}
/// One manifest that carries an inventory hint, as listed by [`list_jobs`].
#[derive(Serialize)]
pub struct InventoryJob {
/// The manifest's `id`.
pub manifest_id: String,
/// The manifest's `description`, if it has one.
pub description: Option<String>,
/// The `display` list of the manifest's inventory hint.
pub display: Vec<DisplayField>,
/// The `summary` list of the manifest's inventory hint, if present.
pub summary: Option<Vec<DisplayField>>,
}
pub async fn list_jobs(
State(state): State<AppState>,
) -> Result<Json<Vec<InventoryJob>>, (StatusCode, String)> {
let kv = state
.jetstream
.get_key_value(BUCKET_JOBS)
.await
.map_err(|e| {
(
StatusCode::SERVICE_UNAVAILABLE,
format!("get KV {BUCKET_JOBS}: {e}"),
)
})?;
let mut out = Vec::new();
let mut keys = match kv.keys().await {
Ok(k) => k,
Err(_) => return Ok(Json(out)),
};
while let Some(key) = keys.next().await {
let key = match key {
Ok(k) => k,
Err(_) => continue,
};
let entry = match kv.get(&key).await.unwrap_or(None) {
Some(b) => b,
None => continue,
};
let job: Manifest = match serde_json::from_slice(&entry) {
Ok(j) => j,
Err(_) => continue,
};
if let Some(hint) = job.inventory {
out.push(InventoryJob {
manifest_id: job.id,
description: job.description,
display: hint.display,
summary: hint.summary,
});
}
}
out.sort_by(|a, b| a.manifest_id.cmp(&b.manifest_id));
Ok(Json(out))
}
pub async fn search(
State(state): State<AppState>,
Path((manifest_id, field)): Path<(String, String)>,
Query(filters): Query<HashMap<String, String>>,
) -> Result<Json<Vec<serde_json::Map<String, serde_json::Value>>>, (StatusCode, String)> {
let spec = load_explode_spec(&state, &manifest_id, &field).await?;
crate::projector::explode::ensure_table_cached(&state.pool, &spec)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("ensure derived table: {e}"),
)
})?;
validate_ident(&spec.table)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("table name: {e}")))?;
for col in &spec.columns {
validate_ident(&col.field)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("column name: {e}")))?;
}
let column_csv = spec
.columns
.iter()
.map(|c| format!("\"{}\"", c.field))
.collect::<Vec<_>>()
.join(", ");
let mut sql = format!(
"SELECT pc_id, collected_at, {column_csv} FROM \"{}\"",
spec.table
);
let mut binds: Vec<String> = Vec::new();
let mut sep = " WHERE ";
for (raw_key, value) in &filters {
if raw_key == "limit" || raw_key == "offset" {
continue;
}
let (col, op) = match raw_key.split_once("__") {
Some((c, o)) => (c.to_string(), o),
None => (raw_key.clone(), "eq"),
};
if !spec.columns.iter().any(|c| c.field == col) {
return Err((
StatusCode::BAD_REQUEST,
format!("unknown column for filter: {col:?}"),
));
}
validate_ident(&col).map_err(|e| (StatusCode::BAD_REQUEST, format!("column: {e}")))?;
let escape_like = |s: &str| -> String {
s.replace('\\', "\\\\")
.replace('%', "\\%")
.replace('_', "\\_")
};
let (comparator, bound_value) = match op {
"eq" => ("=", value.clone()),
"ne" => ("<>", value.clone()),
"lt" => ("<", value.clone()),
"le" => ("<=", value.clone()),
"gt" => (">", value.clone()),
"ge" => (">=", value.clone()),
"contains" => ("LIKE_ESC", format!("%{}%", escape_like(value))),
"prefix" => ("LIKE_ESC", format!("{}%", escape_like(value))),
"suffix" => ("LIKE_ESC", format!("%{}", escape_like(value))),
other => {
return Err((
StatusCode::BAD_REQUEST,
format!("unknown filter operator {other:?}"),
));
}
};
sql.push_str(sep);
if comparator == "LIKE_ESC" {
sql.push_str(&format!("\"{col}\" LIKE ? ESCAPE '\\'"));
} else {
sql.push_str(&format!("\"{col}\" {comparator} ?"));
}
binds.push(bound_value);
sep = " AND ";
}
let limit: u32 = filters
.get("limit")
.and_then(|v| v.parse().ok())
.unwrap_or(1000)
.min(5000);
let offset: u32 = filters
.get("offset")
.and_then(|v| v.parse().ok())
.unwrap_or(0);
sql.push_str(&format!(
" ORDER BY pc_id, collected_at DESC LIMIT {limit} OFFSET {offset}"
));
let mut q = sqlx::query(&sql);
for b in &binds {
q = q.bind(b);
}
let rows = q.fetch_all(&state.pool).await.map_err(|e| {
warn!(error = %e, manifest_id, field, "explode search query");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let mut out: Vec<serde_json::Map<String, serde_json::Value>> = Vec::with_capacity(rows.len());
for r in rows {
let mut map = serde_json::Map::new();
if let Ok(pc_id) = r.try_get::<String, _>("pc_id") {
map.insert("pc_id".into(), serde_json::Value::String(pc_id));
}
if let Ok(Some(t)) = r.try_get::<Option<DateTime<Utc>>, _>("collected_at") {
map.insert(
"collected_at".into(),
serde_json::Value::String(t.to_rfc3339()),
);
}
for col in &spec.columns {
let v: serde_json::Value = match col.kind.as_deref() {
Some("integer") => r
.try_get::<Option<i64>, _>(col.field.as_str())
.ok()
.flatten()
.map(|i| serde_json::Value::Number(i.into()))
.unwrap_or(serde_json::Value::Null),
Some("real") => r
.try_get::<Option<f64>, _>(col.field.as_str())
.ok()
.flatten()
.and_then(serde_json::Number::from_f64)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
_ => r
.try_get::<Option<String>, _>(col.field.as_str())
.ok()
.flatten()
.map(serde_json::Value::String)
.unwrap_or(serde_json::Value::Null),
};
map.insert(col.field.clone(), v);
}
out.push(map);
}
Ok(Json(out))
}
/// Resolves the explode spec named `field` from the manifest `manifest_id`
/// stored in the jobs KV bucket.
///
/// # Errors
/// * `503 Service Unavailable` when the KV bucket cannot be opened;
/// * `500 Internal Server Error` when the KV read fails or the stored manifest
///   does not parse;
/// * `404 Not Found` when the manifest is not registered, has no inventory
///   hint, has no explode specs, or has no spec whose `field` matches.
async fn load_explode_spec(
    state: &AppState,
    manifest_id: &str,
    field: &str,
) -> Result<ExplodeSpec, (StatusCode, String)> {
    let kv = state
        .jetstream
        .get_key_value(BUCKET_JOBS)
        .await
        .map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, format!("jobs KV: {e}")))?;

    // `ok_or_else` throughout: the 404 messages are only formatted when the
    // lookup actually fails, not on every successful request.
    let entry = kv
        .get(manifest_id)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
        .ok_or_else(|| {
            (
                StatusCode::NOT_FOUND,
                format!("manifest {manifest_id:?} not registered"),
            )
        })?;

    let manifest: Manifest = serde_json::from_slice(&entry).map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("parse manifest: {e}"),
        )
    })?;

    let hint = manifest.inventory.ok_or_else(|| {
        (
            StatusCode::NOT_FOUND,
            format!("manifest {manifest_id:?} has no inventory hint"),
        )
    })?;
    let specs = hint.explode.ok_or_else(|| {
        (
            StatusCode::NOT_FOUND,
            format!("manifest {manifest_id:?} has no explode specs"),
        )
    })?;

    specs
        .into_iter()
        .find(|s| s.field == field)
        .ok_or_else(|| {
            (
                StatusCode::NOT_FOUND,
                format!("manifest {manifest_id:?} has no explode field {field:?}"),
            )
        })
}
/// One `inventory_history` row as returned by [`history_for_pc`].
///
/// The `*_json` fields are passed through as the raw column text; this type
/// does not parse them.
#[derive(Serialize)]
pub struct HistoryEventRow {
/// Value of the `id` column.
pub id: i64,
/// Value of the `pc_id` column.
pub pc_id: String,
/// Value of the `job_id` column.
pub job_id: String,
/// Value of the `field_path` column.
pub field_path: String,
/// Value of the `identity_json` column; `None` when NULL or undecodable.
pub identity_json: Option<String>,
/// Value of the `change_kind` column.
pub change_kind: String,
/// Value of the `before_json` column; `None` when NULL or undecodable.
pub before_json: Option<String>,
/// Value of the `after_json` column; `None` when NULL or undecodable.
pub after_json: Option<String>,
/// Value of the `observed_at` column; `None` when NULL or undecodable.
pub observed_at: Option<DateTime<Utc>>,
}
/// Query-string parameters accepted by [`history_for_pc`].
#[derive(serde::Deserialize)]
pub struct HistoryParams {
/// When present and non-empty, only rows whose `field_path` equals this value are returned.
pub field: Option<String>,
/// When present, only rows with `observed_at >= since` are returned.
pub since: Option<DateTime<Utc>>,
/// Maximum number of rows; the handler defaults this to 500 and caps it at 5000.
pub limit: Option<u32>,
}
pub async fn history_for_pc(
State(state): State<AppState>,
Path((manifest_id, pc_id)): Path<(String, String)>,
Query(params): Query<HistoryParams>,
) -> Result<Json<Vec<HistoryEventRow>>, (StatusCode, String)> {
let limit = params.limit.unwrap_or(500).min(5000);
let mut qb = sqlx::QueryBuilder::<sqlx::Sqlite>::new(
"SELECT id, pc_id, job_id, field_path, identity_json, \
change_kind, before_json, after_json, observed_at \
FROM inventory_history \
WHERE job_id = ",
);
qb.push_bind(manifest_id);
qb.push(" AND pc_id = ");
qb.push_bind(pc_id);
if let Some(f) = params.field.filter(|s| !s.is_empty()) {
qb.push(" AND field_path = ");
qb.push_bind(f);
}
if let Some(t) = params.since {
qb.push(" AND observed_at >= ");
qb.push_bind(t);
}
qb.push(" ORDER BY observed_at DESC LIMIT ");
qb.push_bind(limit as i64);
let rows = qb.build().fetch_all(&state.pool).await.map_err(|e| {
warn!(error = %e, "inventory_history per-pc query");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let out: Result<Vec<HistoryEventRow>, _> = rows
.into_iter()
.map(|r| {
Ok::<_, sqlx::Error>(HistoryEventRow {
id: r.try_get("id")?,
pc_id: r.try_get("pc_id")?,
job_id: r.try_get("job_id")?,
field_path: r.try_get("field_path")?,
identity_json: r.try_get("identity_json").ok(),
change_kind: r.try_get("change_kind")?,
before_json: r.try_get("before_json").ok(),
after_json: r.try_get("after_json").ok(),
observed_at: r.try_get("observed_at").ok(),
})
})
.collect();
let out = out.map_err(|e| {
warn!(error = %e, "inventory_history row decode");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("decode history row: {e}"),
)
})?;
Ok(Json(out))
}
fn row_to_fact(r: sqlx::sqlite::SqliteRow) -> InventoryFact {
let facts: serde_json::Value = r
.try_get::<String, _>("facts_json")
.ok()
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or(serde_json::Value::Null);
let display: Vec<DisplayField> = r
.try_get::<Option<String>, _>("display_json")
.ok()
.flatten()
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_default();
let summary: Option<Vec<DisplayField>> = r
.try_get::<Option<String>, _>("summary_json")
.ok()
.flatten()
.and_then(|s| serde_json::from_str(&s).ok());
InventoryFact {
job_id: r.try_get("job_id").unwrap_or_default(),
facts,
display,
summary,
collected_at: r.try_get("collected_at").ok(),
recorded_at: r.try_get("recorded_at").ok(),
}
}