use std::collections::HashMap;
use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use kanade_shared::kv::BUCKET_JOBS;
use kanade_shared::manifest::{DisplayField, ExplodeSpec, Manifest};
use serde::Serialize;
use sqlx::Row;
use tracing::warn;
use crate::projector::explode::validate_ident;
use super::AppState;
/// One `inventory_facts` row for a PC, as returned by [`list_for_pc`].
///
/// Built by `row_to_fact`, which decodes leniently: JSON columns that are
/// missing or fail to decode fall back to `null` / empty / `None`.
#[derive(Serialize)]
pub struct InventoryFact {
/// `job_id` column; empty string when it cannot be read.
pub job_id: String,
/// Decoded `facts_json`; JSON `null` when absent or not valid JSON.
pub facts: serde_json::Value,
/// Display hints decoded from `display_json`; empty when absent or invalid.
pub display: Vec<DisplayField>,
/// Summary hints decoded from `summary_json`; `None` when absent or invalid.
pub summary: Option<Vec<DisplayField>>,
/// `collected_at` column; `None` when it cannot be decoded.
pub collected_at: Option<DateTime<Utc>>,
/// `recorded_at` column; `None` when it cannot be decoded.
pub recorded_at: Option<DateTime<Utc>>,
}
/// Lists every inventory fact recorded for one PC, ordered by job id.
///
/// Each `inventory_facts` row matching `pc_id` is decoded leniently by
/// `row_to_fact`.
///
/// # Errors
/// A database failure is logged and reported as `500 Internal Server Error`
/// carrying the driver's error message.
pub async fn list_for_pc(
    State(state): State<AppState>,
    Path(pc_id): Path<String>,
) -> Result<Json<Vec<InventoryFact>>, (StatusCode, String)> {
    const QUERY: &str = "SELECT job_id, facts_json, display_json, summary_json,
collected_at, recorded_at
FROM inventory_facts
WHERE pc_id = ?
ORDER BY job_id";

    let fetched = sqlx::query(QUERY)
        .bind(&pc_id)
        .fetch_all(&state.pool)
        .await;

    match fetched {
        Ok(rows) => Ok(Json(rows.into_iter().map(row_to_fact).collect())),
        Err(e) => {
            warn!(error = %e, %pc_id, "inventory_facts query");
            Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
        }
    }
}
/// One PC's entry in an [`InventoryByJob`] response (built by [`list_for_job`]).
#[derive(Serialize)]
pub struct InventoryRow {
/// `pc_id` column; empty string when it cannot be read.
pub pc_id: String,
/// Decoded `facts_json`; JSON `null` when absent or not valid JSON.
pub facts: serde_json::Value,
/// `collected_at` column; `None` when it cannot be decoded.
pub collected_at: Option<DateTime<Utc>>,
}
/// Response of [`list_for_job`]: every PC's facts for one manifest, plus the
/// display/summary hints stored alongside them.
#[derive(Serialize)]
pub struct InventoryByJob {
/// Manifest (job) id taken from the request path.
pub manifest_id: String,
/// Display hints from the first row whose `display_json` decodes; empty if none does.
pub display: Vec<DisplayField>,
/// Summary hints from the first row whose `summary_json` decodes; `None` if none does.
pub summary: Option<Vec<DisplayField>>,
/// One entry per matching `inventory_facts` row, ordered by `pc_id`.
pub rows: Vec<InventoryRow>,
}
/// Lists the stored inventory facts of every PC for one job/manifest.
///
/// The `display` / `summary` hints are taken from the first row whose stored
/// JSON decodes successfully. `display` falls back to an empty list and
/// `summary` to `None`. Rows are decoded leniently: an unreadable `pc_id`
/// becomes `""`, unreadable `facts_json` becomes JSON `null`, and an
/// undecodable `collected_at` becomes `None`.
///
/// # Errors
/// A database failure is logged and reported as `500 Internal Server Error`.
pub async fn list_for_job(
    State(state): State<AppState>,
    Path(manifest_id): Path<String>,
) -> Result<Json<InventoryByJob>, (StatusCode, String)> {
    let fetched = sqlx::query(
        "SELECT pc_id, facts_json, display_json, summary_json, collected_at
FROM inventory_facts
WHERE job_id = ?
ORDER BY pc_id",
    )
    .bind(&manifest_id)
    .fetch_all(&state.pool)
    .await;
    let rows = match fetched {
        Ok(rows) => rows,
        Err(e) => {
            warn!(error = %e, %manifest_id, "inventory_facts by-job query");
            return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()));
        }
    };

    // Decodes a nullable TEXT column holding a JSON list of display fields.
    let field_list =
        |row: &sqlx::sqlite::SqliteRow, column: &str| -> Option<Vec<DisplayField>> {
            row.try_get::<Option<String>, _>(column)
                .ok()
                .flatten()
                .and_then(|json| serde_json::from_str(&json).ok())
        };
    let display = rows
        .iter()
        .find_map(|row| field_list(row, "display_json"))
        .unwrap_or_default();
    let summary = rows.iter().find_map(|row| field_list(row, "summary_json"));

    let mut entries = Vec::with_capacity(rows.len());
    for row in rows {
        let facts = row
            .try_get::<String, _>("facts_json")
            .ok()
            .and_then(|json| serde_json::from_str(&json).ok())
            .unwrap_or(serde_json::Value::Null);
        entries.push(InventoryRow {
            pc_id: row.try_get("pc_id").unwrap_or_default(),
            facts,
            collected_at: row.try_get("collected_at").ok(),
        });
    }

    Ok(Json(InventoryByJob {
        manifest_id,
        display,
        summary,
        rows: entries,
    }))
}
/// A registered manifest that declares an inventory hint, as listed by [`list_jobs`].
#[derive(Serialize)]
pub struct InventoryJob {
/// The manifest's `id`.
pub manifest_id: String,
/// The manifest's optional `description`.
pub description: Option<String>,
/// `display` fields of the manifest's inventory hint.
pub display: Vec<DisplayField>,
/// Optional `summary` fields of the inventory hint.
pub summary: Option<Vec<DisplayField>>,
/// Optional explode specs of the inventory hint (looked up per field by `search`).
pub explode: Option<Vec<ExplodeSpec>>,
}
pub async fn list_jobs(
State(state): State<AppState>,
) -> Result<Json<Vec<InventoryJob>>, (StatusCode, String)> {
let kv = state
.jetstream
.get_key_value(BUCKET_JOBS)
.await
.map_err(|e| {
(
StatusCode::SERVICE_UNAVAILABLE,
format!("get KV {BUCKET_JOBS}: {e}"),
)
})?;
let mut out = Vec::new();
let mut keys = match kv.keys().await {
Ok(k) => k,
Err(_) => return Ok(Json(out)),
};
while let Some(key) = keys.next().await {
let key = match key {
Ok(k) => k,
Err(_) => continue,
};
let entry = match kv.get(&key).await.unwrap_or(None) {
Some(b) => b,
None => continue,
};
let job: Manifest = match serde_json::from_slice(&entry) {
Ok(j) => j,
Err(_) => continue,
};
if let Some(hint) = job.inventory {
out.push(InventoryJob {
manifest_id: job.id,
description: job.description,
display: hint.display,
summary: hint.summary,
explode: hint.explode,
});
}
}
out.sort_by(|a, b| a.manifest_id.cmp(&b.manifest_id));
Ok(Json(out))
}
pub async fn search(
State(state): State<AppState>,
Path((manifest_id, field)): Path<(String, String)>,
Query(filters): Query<HashMap<String, String>>,
) -> Result<Json<Vec<serde_json::Map<String, serde_json::Value>>>, (StatusCode, String)> {
let spec = load_explode_spec(&state, &manifest_id, &field).await?;
crate::projector::explode::ensure_table_cached(&state.pool, &spec)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("ensure derived table: {e}"),
)
})?;
validate_ident(&spec.table)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("table name: {e}")))?;
for col in &spec.columns {
validate_ident(&col.field)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("column name: {e}")))?;
}
let column_csv = spec
.columns
.iter()
.map(|c| format!("\"{}\"", c.field))
.collect::<Vec<_>>()
.join(", ");
let mut sql = format!(
"SELECT pc_id, collected_at, {column_csv} FROM \"{}\"",
spec.table
);
let mut binds: Vec<String> = Vec::new();
let mut sep = " WHERE ";
for (raw_key, value) in &filters {
if raw_key == "limit" || raw_key == "offset" {
continue;
}
let (col, op) = match raw_key.split_once("__") {
Some((c, o)) => (c.to_string(), o),
None => (raw_key.clone(), "eq"),
};
if !spec.columns.iter().any(|c| c.field == col) {
return Err((
StatusCode::BAD_REQUEST,
format!("unknown column for filter: {col:?}"),
));
}
validate_ident(&col).map_err(|e| (StatusCode::BAD_REQUEST, format!("column: {e}")))?;
let escape_like = |s: &str| -> String {
s.replace('\\', "\\\\")
.replace('%', "\\%")
.replace('_', "\\_")
};
let (comparator, bound_value) = match op {
"eq" => ("=", value.clone()),
"ne" => ("<>", value.clone()),
"lt" => ("<", value.clone()),
"le" => ("<=", value.clone()),
"gt" => (">", value.clone()),
"ge" => (">=", value.clone()),
"contains" => ("LIKE_ESC", format!("%{}%", escape_like(value))),
"prefix" => ("LIKE_ESC", format!("{}%", escape_like(value))),
"suffix" => ("LIKE_ESC", format!("%{}", escape_like(value))),
other => {
return Err((
StatusCode::BAD_REQUEST,
format!("unknown filter operator {other:?}"),
));
}
};
sql.push_str(sep);
if comparator == "LIKE_ESC" {
sql.push_str(&format!("\"{col}\" LIKE ? ESCAPE '\\'"));
} else {
sql.push_str(&format!("\"{col}\" {comparator} ?"));
}
binds.push(bound_value);
sep = " AND ";
}
let limit: u32 = filters
.get("limit")
.and_then(|v| v.parse().ok())
.unwrap_or(1000)
.min(5000);
let offset: u32 = filters
.get("offset")
.and_then(|v| v.parse().ok())
.unwrap_or(0);
sql.push_str(&format!(
" ORDER BY pc_id, collected_at DESC LIMIT {limit} OFFSET {offset}"
));
let mut q = sqlx::query(&sql);
for b in &binds {
q = q.bind(b);
}
let rows = q.fetch_all(&state.pool).await.map_err(|e| {
warn!(error = %e, manifest_id, field, "explode search query");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let mut out: Vec<serde_json::Map<String, serde_json::Value>> = Vec::with_capacity(rows.len());
for r in rows {
let mut map = serde_json::Map::new();
if let Ok(pc_id) = r.try_get::<String, _>("pc_id") {
map.insert("pc_id".into(), serde_json::Value::String(pc_id));
}
if let Ok(Some(t)) = r.try_get::<Option<DateTime<Utc>>, _>("collected_at") {
map.insert(
"collected_at".into(),
serde_json::Value::String(t.to_rfc3339()),
);
}
for col in &spec.columns {
let v: serde_json::Value = match col.kind.as_deref() {
Some("integer") => r
.try_get::<Option<i64>, _>(col.field.as_str())
.ok()
.flatten()
.map(|i| serde_json::Value::Number(i.into()))
.unwrap_or(serde_json::Value::Null),
Some("real") => r
.try_get::<Option<f64>, _>(col.field.as_str())
.ok()
.flatten()
.and_then(serde_json::Number::from_f64)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
_ => r
.try_get::<Option<String>, _>(col.field.as_str())
.ok()
.flatten()
.map(serde_json::Value::String)
.unwrap_or(serde_json::Value::Null),
};
map.insert(col.field.clone(), v);
}
out.push(map);
}
Ok(Json(out))
}
/// Resolves the [`ExplodeSpec`] for `field` within the manifest `manifest_id`.
///
/// The spec is served from `state.explode_spec_cache` when present. Otherwise
/// the manifest is fetched from the jobs KV bucket, all of its explode specs
/// are cached under `manifest_id`, and the one whose `field` matches is
/// returned.
///
/// # Errors
/// - `503` if the jobs KV bucket cannot be opened.
/// - `500` if the KV read fails or the stored manifest does not parse.
/// - `404` if the manifest is missing, has no inventory hint, has no explode
///   specs, or has no spec for `field`.
async fn load_explode_spec(
    state: &AppState,
    manifest_id: &str,
    field: &str,
) -> Result<ExplodeSpec, (StatusCode, String)> {
    let cache = &state.explode_spec_cache;
    if let Some(cached) = cache.get(manifest_id, field).await {
        return Ok(cached);
    }

    let jobs = state
        .jetstream
        .get_key_value(BUCKET_JOBS)
        .await
        .map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, format!("jobs KV: {e}")))?;

    let Some(raw) = jobs
        .get(manifest_id)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
    else {
        return Err((
            StatusCode::NOT_FOUND,
            format!("manifest {manifest_id:?} not registered"),
        ));
    };

    let manifest = serde_json::from_slice::<Manifest>(&raw).map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("parse manifest: {e}"),
        )
    })?;

    let Some(hint) = manifest.inventory else {
        return Err((
            StatusCode::NOT_FOUND,
            format!("manifest {manifest_id:?} has no inventory hint"),
        ));
    };
    let Some(specs) = hint.explode else {
        return Err((
            StatusCode::NOT_FOUND,
            format!("manifest {manifest_id:?} has no explode specs"),
        ));
    };

    // Cache every spec of the manifest, not just the requested one.
    cache.insert(manifest_id.to_string(), specs.clone()).await;

    specs.into_iter().find(|s| s.field == field).ok_or_else(|| {
        (
            StatusCode::NOT_FOUND,
            format!("manifest {manifest_id:?} has no explode field {field:?}"),
        )
    })
}
/// One `inventory_history` change event, as returned by [`history_for_pc`] and
/// [`fleet_history_search`].
#[derive(Serialize)]
pub struct HistoryEventRow {
/// `id` column of `inventory_history`.
pub id: i64,
/// `pc_id` column.
pub pc_id: String,
/// `job_id` column (the manifest id).
pub job_id: String,
/// `field_path` column; this is what the `field` filter matches.
pub field_path: String,
/// Raw `identity_json` text (not decoded); `None` when NULL or unreadable.
pub identity_json: Option<String>,
/// `change_kind` column; `fleet_history_search` filters it by `added` / `removed` / `changed`.
pub change_kind: String,
/// Raw `before_json` text (not decoded); `None` when NULL or unreadable.
pub before_json: Option<String>,
/// Raw `after_json` text (not decoded); `None` when NULL or unreadable.
pub after_json: Option<String>,
/// `observed_at` column; `None` when it cannot be decoded.
pub observed_at: Option<DateTime<Utc>>,
}
/// Query parameters accepted by [`history_for_pc`].
#[derive(serde::Deserialize)]
pub struct HistoryParams {
/// Restrict to this `field_path`; ignored when empty.
pub field: Option<String>,
/// Keep only events with `observed_at >= since`.
pub since: Option<DateTime<Utc>>,
/// Maximum number of rows; defaults to 500 and is capped at 5000.
pub limit: Option<u32>,
}
/// Returns the change history of one PC for one manifest, newest first.
///
/// Query parameters (see [`HistoryParams`]):
/// - `field` restricts to one `field_path` and is ignored when empty.
/// - `since` keeps events with `observed_at >= since`.
/// - `limit` (default 500, max 5000) caps the row count.
///
/// Events are ordered by `observed_at` descending, with `id` descending as a
/// tiebreaker. This makes the set of events that survives the `limit` cut
/// deterministic even when several events share a timestamp.
///
/// # Errors
/// `500` when the query fails or a required column (`id`, `pc_id`, `job_id`,
/// `field_path`, `change_kind`) of some row cannot be decoded.
pub async fn history_for_pc(
    State(state): State<AppState>,
    Path((manifest_id, pc_id)): Path<(String, String)>,
    Query(params): Query<HistoryParams>,
) -> Result<Json<Vec<HistoryEventRow>>, (StatusCode, String)> {
    let limit = params.limit.unwrap_or(500).min(5000);
    let mut qb = sqlx::QueryBuilder::<sqlx::Sqlite>::new(
        "SELECT id, pc_id, job_id, field_path, identity_json, \
         change_kind, before_json, after_json, observed_at \
         FROM inventory_history \
         WHERE job_id = ",
    );
    qb.push_bind(manifest_id);
    qb.push(" AND pc_id = ");
    qb.push_bind(pc_id);
    if let Some(f) = params.field.filter(|s| !s.is_empty()) {
        qb.push(" AND field_path = ");
        qb.push_bind(f);
    }
    if let Some(t) = params.since {
        qb.push(" AND observed_at >= ");
        qb.push_bind(t);
    }
    // `observed_at` alone is not unique; `id` makes the ordering total.
    qb.push(" ORDER BY observed_at DESC, id DESC LIMIT ");
    qb.push_bind(limit as i64);
    let rows = qb.build().fetch_all(&state.pool).await.map_err(|e| {
        warn!(error = %e, "inventory_history per-pc query");
        (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
    })?;
    let out: Result<Vec<HistoryEventRow>, _> = rows
        .into_iter()
        .map(|r| {
            Ok::<_, sqlx::Error>(HistoryEventRow {
                id: r.try_get("id")?,
                pc_id: r.try_get("pc_id")?,
                job_id: r.try_get("job_id")?,
                field_path: r.try_get("field_path")?,
                // Nullable columns: NULL (or an undecodable value) becomes None.
                identity_json: r.try_get("identity_json").ok(),
                change_kind: r.try_get("change_kind")?,
                before_json: r.try_get("before_json").ok(),
                after_json: r.try_get("after_json").ok(),
                observed_at: r.try_get("observed_at").ok(),
            })
        })
        .collect();
    let out = out.map_err(|e| {
        warn!(error = %e, "inventory_history row decode");
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("decode history row: {e}"),
        )
    })?;
    Ok(Json(out))
}
/// Extracts `identity.<field>=<value>` query parameters as `(field, value)` pairs.
///
/// Parameters without the `identity.` prefix are ignored. Each `<field>` must
/// pass `validate_ident`, because callers splice it into a `json_extract`
/// path. Values are returned verbatim, since callers bind them as parameters.
/// The result is sorted by field name so the generated SQL does not depend on
/// `HashMap` iteration order.
///
/// # Errors
/// `400 Bad Request` naming an offending `identity.<field>` key.
fn parse_identity_filters(
    params: &HashMap<String, String>,
) -> Result<Vec<(String, String)>, (StatusCode, String)> {
    let mut pairs = params
        .iter()
        .filter_map(|(key, value)| key.strip_prefix("identity.").map(|name| (name, value)))
        .map(|(name, value)| {
            validate_ident(name)
                .map(|_| (name.to_owned(), value.to_owned()))
                .map_err(|e| (StatusCode::BAD_REQUEST, format!("identity.{name}: {e}")))
        })
        .collect::<Result<Vec<_>, _>>()?;
    // Keys of a HashMap are unique, so an unstable sort is deterministic here.
    pairs.sort_unstable_by(|a, b| a.0.cmp(&b.0));
    Ok(pairs)
}
pub async fn fleet_history_search(
State(state): State<AppState>,
Path(manifest_id): Path<String>,
Query(params): Query<HashMap<String, String>>,
) -> Result<Json<Vec<HistoryEventRow>>, (StatusCode, String)> {
let limit = params
.get("limit")
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(500)
.min(5000);
let offset = params
.get("offset")
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(0);
let kind = params.get("kind").filter(|s| !s.is_empty());
if let Some(k) = kind
&& !matches!(k.as_str(), "added" | "removed" | "changed")
{
return Err((
StatusCode::BAD_REQUEST,
format!("kind must be one of added / removed / changed (got {k:?})"),
));
}
let since: Option<DateTime<Utc>> = params
.get("since")
.filter(|s| !s.is_empty())
.map(|s| {
s.parse::<DateTime<Utc>>()
.map_err(|e| (StatusCode::BAD_REQUEST, format!("since: {e}")))
})
.transpose()?;
let until: Option<DateTime<Utc>> = params
.get("until")
.filter(|s| !s.is_empty())
.map(|s| {
s.parse::<DateTime<Utc>>()
.map_err(|e| (StatusCode::BAD_REQUEST, format!("until: {e}")))
})
.transpose()?;
let field = params.get("field").filter(|s| !s.is_empty());
let identity_filters = parse_identity_filters(¶ms)?;
let mut qb = sqlx::QueryBuilder::<sqlx::Sqlite>::new(
"SELECT id, pc_id, job_id, field_path, identity_json, \
change_kind, before_json, after_json, observed_at \
FROM inventory_history \
WHERE job_id = ",
);
qb.push_bind(&manifest_id);
if let Some(f) = field {
qb.push(" AND field_path = ");
qb.push_bind(f);
}
if let Some(k) = kind {
qb.push(" AND change_kind = ");
qb.push_bind(k);
}
if let Some(t) = since {
qb.push(" AND observed_at >= ");
qb.push_bind(t);
}
if let Some(t) = until {
qb.push(" AND observed_at < ");
qb.push_bind(t);
}
for (key, value) in &identity_filters {
qb.push(format!(" AND json_extract(identity_json, '$.{key}') = "));
qb.push_bind(value);
}
qb.push(" ORDER BY observed_at DESC LIMIT ");
qb.push_bind(limit as i64);
qb.push(" OFFSET ");
qb.push_bind(offset as i64);
let rows = qb.build().fetch_all(&state.pool).await.map_err(|e| {
warn!(error = %e, "inventory_history fleet query");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let out: Result<Vec<HistoryEventRow>, _> = rows
.into_iter()
.map(|r| {
Ok::<_, sqlx::Error>(HistoryEventRow {
id: r.try_get("id")?,
pc_id: r.try_get("pc_id")?,
job_id: r.try_get("job_id")?,
field_path: r.try_get("field_path")?,
identity_json: r.try_get("identity_json").ok(),
change_kind: r.try_get("change_kind")?,
before_json: r.try_get("before_json").ok(),
after_json: r.try_get("after_json").ok(),
observed_at: r.try_get("observed_at").ok(),
})
})
.collect();
let out = out.map_err(|e| {
warn!(error = %e, "fleet history row decode");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("decode history row: {e}"),
)
})?;
Ok(Json(out))
}
/// For each PC, returns the earliest `observed_at` among its matching history
/// events for one manifest, oldest first.
///
/// Query parameters (empty values count as absent):
/// - `limit` (default 5000, max 5000) and `offset` (default 0) page over PCs;
/// - `field`: exact `field_path` match;
/// - `since`: keep events with `observed_at >= since`;
/// - `identity.<name>=<value>`: `json_extract(identity_json, '$.<name>') = value`.
///
/// The filters, including `since`, are applied *before* aggregation. So
/// `first_seen_at` is the earliest matching event at or after `since`, not
/// necessarily the PC's overall first event. Rows are ordered by
/// `first_seen_at`, then `pc_id`, so OFFSET paging is stable when several PCs
/// share a timestamp.
///
/// # Errors
/// - `400` for an unparsable `since` or an invalid `identity.<name>` key.
/// - `500` when the query fails or a row's `pc_id` cannot be decoded.
pub async fn first_seen(
    State(state): State<AppState>,
    Path(manifest_id): Path<String>,
    Query(params): Query<HashMap<String, String>>,
) -> Result<Json<Vec<FirstSeenRow>>, (StatusCode, String)> {
    let limit = params
        .get("limit")
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(5000)
        .min(5000);
    let offset = params
        .get("offset")
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(0);
    let field = params.get("field").filter(|s| !s.is_empty());
    let since: Option<DateTime<Utc>> = params
        .get("since")
        .filter(|s| !s.is_empty())
        .map(|s| {
            s.parse::<DateTime<Utc>>()
                .map_err(|e| (StatusCode::BAD_REQUEST, format!("since: {e}")))
        })
        .transpose()?;
    let identity_filters = parse_identity_filters(&params)?;
    let mut qb = sqlx::QueryBuilder::<sqlx::Sqlite>::new(
        "SELECT pc_id, MIN(observed_at) AS first_seen_at \
         FROM inventory_history \
         WHERE job_id = ",
    );
    qb.push_bind(&manifest_id);
    if let Some(f) = field {
        qb.push(" AND field_path = ");
        qb.push_bind(f);
    }
    if let Some(t) = since {
        qb.push(" AND observed_at >= ");
        qb.push_bind(t);
    }
    // `key` was checked by `validate_ident`; the value itself is bound.
    for (key, value) in &identity_filters {
        qb.push(format!(" AND json_extract(identity_json, '$.{key}') = "));
        qb.push_bind(value);
    }
    // `pc_id` tiebreaker gives a total order (one row per pc_id after GROUP BY).
    qb.push(" GROUP BY pc_id ORDER BY first_seen_at ASC, pc_id ASC LIMIT ");
    qb.push_bind(limit as i64);
    qb.push(" OFFSET ");
    qb.push_bind(offset as i64);
    let rows = qb.build().fetch_all(&state.pool).await.map_err(|e| {
        warn!(error = %e, "inventory_history first_seen query");
        (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
    })?;
    let out: Result<Vec<FirstSeenRow>, _> = rows
        .into_iter()
        .map(|r| {
            Ok::<_, sqlx::Error>(FirstSeenRow {
                pc_id: r.try_get("pc_id")?,
                first_seen_at: r.try_get("first_seen_at").ok(),
            })
        })
        .collect();
    let out = out.map_err(|e| {
        warn!(error = %e, "first_seen row decode");
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("decode first_seen row: {e}"),
        )
    })?;
    Ok(Json(out))
}
/// One row of [`first_seen`]: a PC and the earliest `observed_at` among its
/// matching history events.
#[derive(Serialize)]
pub struct FirstSeenRow {
/// `pc_id` of the grouped history events.
pub pc_id: String,
/// `MIN(observed_at)` over the PC's matching events; `None` when it cannot be decoded.
pub first_seen_at: Option<DateTime<Utc>>,
}
/// Decodes one `inventory_facts` row into an [`InventoryFact`].
///
/// Decoding never fails:
/// - an unreadable `job_id` becomes `""`;
/// - a missing or malformed `facts_json` becomes JSON `null`;
/// - `display_json` falls back to an empty list and `summary_json` to `None`;
/// - undecodable timestamps become `None`.
fn row_to_fact(r: sqlx::sqlite::SqliteRow) -> InventoryFact {
    // Reads a nullable TEXT column holding a JSON list of display fields.
    let display_list = |column: &str| -> Option<Vec<DisplayField>> {
        match r.try_get::<Option<String>, _>(column) {
            Ok(Some(json)) => serde_json::from_str(&json).ok(),
            _ => None,
        }
    };

    let facts = match r.try_get::<String, _>("facts_json") {
        Ok(json) => serde_json::from_str(&json).unwrap_or(serde_json::Value::Null),
        Err(_) => serde_json::Value::Null,
    };

    InventoryFact {
        job_id: r.try_get("job_id").unwrap_or_default(),
        facts,
        display: display_list("display_json").unwrap_or_default(),
        summary: display_list("summary_json"),
        collected_at: r.try_get("collected_at").ok(),
        recorded_at: r.try_get("recorded_at").ok(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a query-parameter map from literal `(key, value)` pairs.
    fn params(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
            .collect()
    }

    #[test]
    fn parse_identity_filters_empty_when_no_identity_prefix() {
        let got = parse_identity_filters(&params(&[
            ("field", "apps"),
            ("kind", "added"),
            ("since", "2026-04-01"),
        ]))
        .unwrap();
        assert!(got.is_empty());
    }

    #[test]
    fn parse_identity_filters_extracts_pairs() {
        let got = parse_identity_filters(&params(&[
            ("identity.name", "Chrome"),
            ("identity.source", "appx"),
            ("field", "apps"),
        ]))
        .unwrap();
        assert_eq!(
            got,
            vec![
                ("name".to_string(), "Chrome".to_string()),
                ("source".to_string(), "appx".to_string()),
            ]
        );
    }

    #[test]
    fn parse_identity_filters_passes_values_through_verbatim() {
        // Values are bound as SQL parameters, so they are neither validated nor altered.
        let got = parse_identity_filters(&params(&[("identity.name", "O'Brien; --")])).unwrap();
        assert_eq!(got, vec![("name".to_string(), "O'Brien; --".to_string())]);
    }

    #[test]
    fn parse_identity_filters_rejects_injection_attempts() {
        let err = parse_identity_filters(&params(&[("identity.name';--", "x")])).unwrap_err();
        assert_eq!(err.0, StatusCode::BAD_REQUEST);
    }

    #[test]
    fn parse_identity_filters_rejects_empty_field_name() {
        let err = parse_identity_filters(&params(&[("identity.", "x")])).unwrap_err();
        assert_eq!(err.0, StatusCode::BAD_REQUEST);
    }
}