use std::collections::HashMap;
use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use kanade_shared::kv::BUCKET_JOBS;
use kanade_shared::manifest::{DisplayField, ExplodeSpec, InventoryHint, Manifest};
use serde::Serialize;
use sqlx::{AssertSqlSafe, Row};
use tracing::warn;
use crate::projector::explode::validate_ident;
use super::AppState;
#[derive(Serialize)]
pub struct InventoryFact {
pub job_id: String,
pub facts: serde_json::Value,
pub display: Vec<DisplayField>,
pub summary: Option<Vec<DisplayField>>,
pub collected_at: Option<DateTime<Utc>>,
pub recorded_at: Option<DateTime<Utc>>,
}
pub async fn list_for_pc(
State(state): State<AppState>,
Path(pc_id): Path<String>,
) -> Result<Json<Vec<InventoryFact>>, (StatusCode, String)> {
let rows = sqlx::query(
"SELECT job_id, facts_json, display_json, summary_json,
collected_at, recorded_at
FROM inventory_facts
WHERE pc_id = ?
ORDER BY job_id",
)
.bind(&pc_id)
.fetch_all(&state.pool)
.await
.map_err(|e| {
warn!(error = %e, %pc_id, "inventory_facts query");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let facts: Vec<InventoryFact> = rows.into_iter().map(row_to_fact).collect();
Ok(Json(facts))
}
#[derive(Serialize)]
pub struct InventoryRow {
pub pc_id: String,
pub facts: serde_json::Value,
pub collected_at: Option<DateTime<Utc>>,
pub last_logon_user: Option<String>,
pub last_logon_display_name: Option<String>,
}
#[derive(Serialize)]
pub struct InventoryByJob {
pub manifest_id: String,
pub display: Vec<DisplayField>,
pub summary: Option<Vec<DisplayField>>,
pub rows: Vec<InventoryRow>,
pub total: i64,
pub limit: i64,
pub offset: i64,
}
#[derive(serde::Deserialize)]
pub struct ByJobParams {
pub limit: Option<i64>,
pub offset: Option<i64>,
pub q: Option<String>,
}
const BY_JOB_DEFAULT_LIMIT: i64 = 100;
const BY_JOB_MAX_LIMIT: i64 = 1000;
pub async fn list_for_job(
State(state): State<AppState>,
Path(manifest_id): Path<String>,
Query(params): Query<ByJobParams>,
) -> Result<Json<InventoryByJob>, (StatusCode, String)> {
let limit = params
.limit
.unwrap_or(BY_JOB_DEFAULT_LIMIT)
.clamp(1, BY_JOB_MAX_LIMIT);
let offset = params.offset.unwrap_or(0).max(0);
let like = params.q.as_deref().filter(|q| !q.is_empty()).map(|q| {
format!(
"%{}%",
q.replace('\\', "\\\\")
.replace('%', "\\%")
.replace('_', "\\_")
)
});
let total: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM inventory_facts
WHERE job_id = ?
AND (?2 IS NULL OR pc_id LIKE ?2 ESCAPE '\\')",
)
.bind(&manifest_id)
.bind(&like)
.fetch_one(&state.pool)
.await
.map_err(|e| {
warn!(error = %e, %manifest_id, "inventory_facts by-job count");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let rows = sqlx::query(
"SELECT f.pc_id, f.facts_json, f.display_json, f.summary_json, f.collected_at,
a.last_logon_user, a.last_logon_display_name
FROM inventory_facts f
LEFT JOIN agents a ON a.pc_id = f.pc_id
WHERE f.job_id = ?
AND (?2 IS NULL OR f.pc_id LIKE ?2 ESCAPE '\\')
ORDER BY f.pc_id
LIMIT ?3 OFFSET ?4",
)
.bind(&manifest_id)
.bind(&like)
.bind(limit)
.bind(offset)
.fetch_all(&state.pool)
.await
.map_err(|e| {
warn!(error = %e, %manifest_id, "inventory_facts by-job query");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let mut display = rows
.iter()
.find_map(|r| {
r.try_get::<Option<String>, _>("display_json")
.ok()
.flatten()
.and_then(|s| serde_json::from_str::<Vec<DisplayField>>(&s).ok())
})
.unwrap_or_default();
let mut summary = rows.iter().find_map(|r| {
r.try_get::<Option<String>, _>("summary_json")
.ok()
.flatten()
.and_then(|s| serde_json::from_str::<Vec<DisplayField>>(&s).ok())
});
if display.is_empty() {
match sqlx::query(
"SELECT display_json, summary_json FROM inventory_facts
WHERE job_id = ? LIMIT 1",
)
.bind(&manifest_id)
.fetch_optional(&state.pool)
.await
{
Ok(Some(r)) => {
display = r
.try_get::<Option<String>, _>("display_json")
.ok()
.flatten()
.and_then(|s| serde_json::from_str::<Vec<DisplayField>>(&s).ok())
.unwrap_or_default();
summary = r
.try_get::<Option<String>, _>("summary_json")
.ok()
.flatten()
.and_then(|s| serde_json::from_str::<Vec<DisplayField>>(&s).ok());
}
Ok(None) => {} Err(e) => {
warn!(error = %e, %manifest_id, "inventory_facts fallback config probe failed");
}
}
}
let inv_rows: Vec<InventoryRow> = rows
.into_iter()
.map(|r| InventoryRow {
pc_id: r.try_get("pc_id").unwrap_or_default(),
facts: r
.try_get::<String, _>("facts_json")
.ok()
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or(serde_json::Value::Null),
collected_at: r.try_get("collected_at").ok(),
last_logon_user: r
.try_get::<Option<String>, _>("last_logon_user")
.ok()
.flatten(),
last_logon_display_name: r
.try_get::<Option<String>, _>("last_logon_display_name")
.ok()
.flatten(),
})
.collect();
Ok(Json(InventoryByJob {
manifest_id,
display,
summary,
rows: inv_rows,
total,
limit,
offset,
}))
}
#[derive(Serialize)]
pub struct InventoryJob {
pub manifest_id: String,
pub description: Option<String>,
pub display: Vec<DisplayField>,
pub summary: Option<Vec<DisplayField>>,
pub explode: Option<Vec<ExplodeSpec>>,
}
pub async fn list_jobs(
State(state): State<AppState>,
) -> Result<Json<Vec<InventoryJob>>, (StatusCode, String)> {
let kv = state
.jetstream
.get_key_value(BUCKET_JOBS)
.await
.map_err(|e| {
(
StatusCode::SERVICE_UNAVAILABLE,
format!("get KV {BUCKET_JOBS}: {e}"),
)
})?;
let mut out = Vec::new();
let mut keys = match kv.keys().await {
Ok(k) => k,
Err(_) => return Ok(Json(out)),
};
while let Some(key) = keys.next().await {
let key = match key {
Ok(k) => k,
Err(_) => continue,
};
let entry = match kv.get(&key).await.unwrap_or(None) {
Some(b) => b,
None => continue,
};
let job: Manifest = match serde_json::from_slice(&entry) {
Ok(j) => j,
Err(_) => continue,
};
if let Some(hint) = job.inventory {
out.push(InventoryJob {
manifest_id: job.id,
description: job.description,
display: hint.display,
summary: hint.summary,
explode: hint.explode,
});
}
}
out.sort_by(|a, b| a.manifest_id.cmp(&b.manifest_id));
Ok(Json(out))
}
pub async fn search(
State(state): State<AppState>,
Path((manifest_id, field)): Path<(String, String)>,
Query(filters): Query<HashMap<String, String>>,
) -> Result<Json<Vec<serde_json::Map<String, serde_json::Value>>>, (StatusCode, String)> {
let spec = load_explode_spec(&state, &manifest_id, &field).await?;
crate::projector::explode::ensure_table_cached(&state.pool, &spec)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("ensure derived table: {e}"),
)
})?;
validate_ident(&spec.table)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("table name: {e}")))?;
for col in &spec.columns {
validate_ident(&col.field)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("column name: {e}")))?;
}
let column_csv = spec
.columns
.iter()
.map(|c| format!("\"{}\"", c.field))
.collect::<Vec<_>>()
.join(", ");
let mut sql = format!(
"SELECT pc_id, collected_at, {column_csv} FROM \"{}\"",
spec.table
);
let mut binds: Vec<String> = Vec::new();
let mut sep = " WHERE ";
for (raw_key, value) in &filters {
if raw_key == "limit" || raw_key == "offset" {
continue;
}
let (col, op) = match raw_key.split_once("__") {
Some((c, o)) => (c.to_string(), o),
None => (raw_key.clone(), "eq"),
};
if !spec.columns.iter().any(|c| c.field == col) {
return Err((
StatusCode::BAD_REQUEST,
format!("unknown column for filter: {col:?}"),
));
}
validate_ident(&col).map_err(|e| (StatusCode::BAD_REQUEST, format!("column: {e}")))?;
let escape_like = |s: &str| -> String {
s.replace('\\', "\\\\")
.replace('%', "\\%")
.replace('_', "\\_")
};
let (comparator, bound_value) = match op {
"eq" => ("=", value.clone()),
"ne" => ("<>", value.clone()),
"lt" => ("<", value.clone()),
"le" => ("<=", value.clone()),
"gt" => (">", value.clone()),
"ge" => (">=", value.clone()),
"contains" => ("LIKE_ESC", format!("%{}%", escape_like(value))),
"prefix" => ("LIKE_ESC", format!("{}%", escape_like(value))),
"suffix" => ("LIKE_ESC", format!("%{}", escape_like(value))),
other => {
return Err((
StatusCode::BAD_REQUEST,
format!("unknown filter operator {other:?}"),
));
}
};
sql.push_str(sep);
if comparator == "LIKE_ESC" {
sql.push_str(&format!("\"{col}\" LIKE ? ESCAPE '\\'"));
} else {
sql.push_str(&format!("\"{col}\" {comparator} ?"));
}
binds.push(bound_value);
sep = " AND ";
}
let limit: u32 = filters
.get("limit")
.and_then(|v| v.parse().ok())
.unwrap_or(1000)
.min(5000);
let offset: u32 = filters
.get("offset")
.and_then(|v| v.parse().ok())
.unwrap_or(0);
sql.push_str(&format!(
" ORDER BY pc_id, collected_at DESC LIMIT {limit} OFFSET {offset}"
));
let mut q = sqlx::query(AssertSqlSafe(sql));
for b in &binds {
q = q.bind(b);
}
let rows = q.fetch_all(&state.pool).await.map_err(|e| {
warn!(error = %e, manifest_id, field, "explode search query");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let mut out: Vec<serde_json::Map<String, serde_json::Value>> = Vec::with_capacity(rows.len());
for r in rows {
let mut map = serde_json::Map::new();
if let Ok(pc_id) = r.try_get::<String, _>("pc_id") {
map.insert("pc_id".into(), serde_json::Value::String(pc_id));
}
if let Ok(Some(t)) = r.try_get::<Option<DateTime<Utc>>, _>("collected_at") {
map.insert(
"collected_at".into(),
serde_json::Value::String(t.to_rfc3339()),
);
}
for col in &spec.columns {
let v: serde_json::Value = match col.kind.as_deref() {
Some("integer") => r
.try_get::<Option<i64>, _>(col.field.as_str())
.ok()
.flatten()
.map(|i| serde_json::Value::Number(i.into()))
.unwrap_or(serde_json::Value::Null),
Some("real") => r
.try_get::<Option<f64>, _>(col.field.as_str())
.ok()
.flatten()
.and_then(serde_json::Number::from_f64)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
_ => r
.try_get::<Option<String>, _>(col.field.as_str())
.ok()
.flatten()
.map(serde_json::Value::String)
.unwrap_or(serde_json::Value::Null),
};
map.insert(col.field.clone(), v);
}
out.push(map);
}
enrich_with_account(&state.pool, &mut out).await;
Ok(Json(out))
}
const ACCOUNT_USER_KEY: &str = "@account_user";
const ACCOUNT_DISPLAY_NAME_KEY: &str = "@account_display_name";
async fn enrich_with_account(
pool: &sqlx::SqlitePool,
rows: &mut [serde_json::Map<String, serde_json::Value>],
) {
use std::collections::{BTreeSet, HashMap};
let pc_ids: Vec<String> = rows
.iter()
.filter_map(|r| match r.get("pc_id") {
Some(serde_json::Value::String(p)) => Some(p.clone()),
_ => None,
})
.collect::<BTreeSet<_>>()
.into_iter()
.collect();
let mut by_pc: HashMap<String, (Option<String>, Option<String>)> = HashMap::new();
if pc_ids.is_empty() {
apply_account(rows, &by_pc);
return;
}
for chunk in pc_ids.chunks(999) {
let mut qb = sqlx::QueryBuilder::<sqlx::Sqlite>::new(
"SELECT pc_id, last_logon_user, last_logon_display_name \
FROM agents WHERE pc_id IN (",
);
let mut sep = qb.separated(", ");
for id in chunk {
sep.push_bind(id);
}
qb.push(")");
let account_rows = match qb.build().fetch_all(pool).await {
Ok(rs) => rs,
Err(e) => {
warn!(error = %e, "inventory account enrichment query");
return;
}
};
for r in account_rows {
let Ok(pc) = r.try_get::<String, _>("pc_id") else {
continue;
};
let user = r
.try_get::<Option<String>, _>("last_logon_user")
.ok()
.flatten();
let name = r
.try_get::<Option<String>, _>("last_logon_display_name")
.ok()
.flatten();
by_pc.insert(pc, (user, name));
}
}
apply_account(rows, &by_pc);
}
fn apply_account(
rows: &mut [serde_json::Map<String, serde_json::Value>],
by_pc: &std::collections::HashMap<String, (Option<String>, Option<String>)>,
) {
let to_json = |s: Option<String>| {
s.map(serde_json::Value::String)
.unwrap_or(serde_json::Value::Null)
};
for r in rows.iter_mut() {
let (user, name) = match r.get("pc_id") {
Some(serde_json::Value::String(p)) => by_pc.get(p).cloned().unwrap_or((None, None)),
_ => (None, None),
};
r.insert(ACCOUNT_USER_KEY.into(), to_json(user));
r.insert(ACCOUNT_DISPLAY_NAME_KEY.into(), to_json(name));
}
}
struct ScalarColumn {
field: String,
numeric: bool,
}
fn scalar_columns(hint: &InventoryHint) -> Vec<ScalarColumn> {
hint.display
.iter()
.filter(|d| d.kind.as_deref() != Some("table"))
.map(|d| ScalarColumn {
field: d.field.clone(),
numeric: matches!(d.kind.as_deref(), Some("number") | Some("bytes")),
})
.collect()
}
pub async fn search_scalars(
State(state): State<AppState>,
Path(manifest_id): Path<String>,
Query(filters): Query<HashMap<String, String>>,
) -> Result<Json<Vec<serde_json::Map<String, serde_json::Value>>>, (StatusCode, String)> {
let hint = load_inventory_hint(&state, &manifest_id).await?;
let scalars = scalar_columns(&hint);
if scalars.is_empty() {
return Err((
StatusCode::NOT_FOUND,
format!("manifest {manifest_id:?} has no scalar display fields to search"),
));
}
for s in &scalars {
validate_ident(&s.field)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("display field name: {e}")))?;
}
let projection = scalars
.iter()
.map(|s| format!("'{0}', json_extract(facts_json, '$.{0}')", s.field))
.collect::<Vec<_>>()
.join(", ");
let mut qb = sqlx::QueryBuilder::<sqlx::Sqlite>::new(format!(
"SELECT pc_id, collected_at, json_object({projection}) AS projected_json \
FROM inventory_facts WHERE job_id = "
));
qb.push_bind(&manifest_id);
let escape_like = |s: &str| -> String {
s.replace('\\', "\\\\")
.replace('%', "\\%")
.replace('_', "\\_")
};
for (raw_key, value) in &filters {
if raw_key == "limit" || raw_key == "offset" {
continue;
}
let (col, op) = match raw_key.split_once("__") {
Some((c, o)) => (c.to_string(), o),
None => (raw_key.clone(), "eq"),
};
let scalar = scalars.iter().find(|s| s.field == col).ok_or((
StatusCode::BAD_REQUEST,
format!("unknown column for filter: {col:?}"),
))?;
match op {
"eq" | "ne" | "lt" | "le" | "gt" | "ge" => {
let comparator = match op {
"eq" => "=",
"ne" => "<>",
"lt" => "<",
"le" => "<=",
"gt" => ">",
"ge" => ">=",
_ => unreachable!(),
};
if scalar.numeric {
let num: f64 = value.parse().map_err(|_| {
(
StatusCode::BAD_REQUEST,
format!(
"filter on numeric column {col:?} needs a number, got {value:?}"
),
)
})?;
qb.push(format!(
" AND CAST(json_extract(facts_json, '$.{col}') AS REAL) {comparator} "
));
qb.push_bind(num);
} else {
qb.push(format!(
" AND json_extract(facts_json, '$.{col}') {comparator} "
));
qb.push_bind(value.clone());
}
}
"contains" | "prefix" | "suffix" => {
let pattern = match op {
"contains" => format!("%{}%", escape_like(value)),
"prefix" => format!("{}%", escape_like(value)),
"suffix" => format!("%{}", escape_like(value)),
_ => unreachable!(),
};
qb.push(format!(" AND json_extract(facts_json, '$.{col}') LIKE "));
qb.push_bind(pattern);
qb.push(" ESCAPE '\\'");
}
other => {
return Err((
StatusCode::BAD_REQUEST,
format!("unknown filter operator {other:?}"),
));
}
}
}
let limit: i64 = filters
.get("limit")
.and_then(|v| v.parse().ok())
.unwrap_or(1000)
.clamp(1, 5000);
let offset: i64 = filters
.get("offset")
.and_then(|v| v.parse().ok())
.unwrap_or(0)
.max(0);
qb.push(" ORDER BY pc_id LIMIT ");
qb.push_bind(limit);
qb.push(" OFFSET ");
qb.push_bind(offset);
let rows = qb.build().fetch_all(&state.pool).await.map_err(|e| {
warn!(error = %e, manifest_id, "scalar inventory search query");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let mut out: Vec<serde_json::Map<String, serde_json::Value>> = Vec::with_capacity(rows.len());
for r in rows {
let mut map = serde_json::Map::new();
if let Ok(pc_id) = r.try_get::<String, _>("pc_id") {
map.insert("pc_id".into(), serde_json::Value::String(pc_id));
}
if let Ok(Some(t)) = r.try_get::<Option<DateTime<Utc>>, _>("collected_at") {
map.insert(
"collected_at".into(),
serde_json::Value::String(t.to_rfc3339()),
);
}
let projected: serde_json::Map<String, serde_json::Value> = r
.try_get::<String, _>("projected_json")
.ok()
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_default();
for s in &scalars {
let v = projected
.get(&s.field)
.cloned()
.unwrap_or(serde_json::Value::Null);
map.insert(s.field.clone(), v);
}
out.push(map);
}
enrich_with_account(&state.pool, &mut out).await;
Ok(Json(out))
}
async fn load_inventory_hint(
state: &AppState,
manifest_id: &str,
) -> Result<InventoryHint, (StatusCode, String)> {
if let Some(m) = state.explode_spec_cache.manifest(manifest_id).await {
return m.inventory.clone().ok_or((
StatusCode::NOT_FOUND,
format!("manifest {manifest_id:?} has no inventory hint"),
));
}
let kv = state
.jetstream
.get_key_value(BUCKET_JOBS)
.await
.map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, format!("jobs KV: {e}")))?;
let entry = kv
.get(manifest_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((
StatusCode::NOT_FOUND,
format!("manifest {manifest_id:?} not registered"),
))?;
let manifest: Manifest = serde_json::from_slice(&entry).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("parse manifest: {e}"),
)
})?;
let hint = manifest.inventory.clone().ok_or((
StatusCode::NOT_FOUND,
format!("manifest {manifest_id:?} has no inventory hint"),
))?;
state.explode_spec_cache.insert_manifest(manifest).await;
Ok(hint)
}
async fn load_explode_spec(
state: &AppState,
manifest_id: &str,
field: &str,
) -> Result<ExplodeSpec, (StatusCode, String)> {
if let Some(hit) = state.explode_spec_cache.get(manifest_id, field).await {
return Ok(hit);
}
let kv = state
.jetstream
.get_key_value(BUCKET_JOBS)
.await
.map_err(|e| (StatusCode::SERVICE_UNAVAILABLE, format!("jobs KV: {e}")))?;
let entry = kv
.get(manifest_id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((
StatusCode::NOT_FOUND,
format!("manifest {manifest_id:?} not registered"),
))?;
let manifest: Manifest = serde_json::from_slice(&entry).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("parse manifest: {e}"),
)
})?;
let specs = manifest
.inventory
.as_ref()
.ok_or((
StatusCode::NOT_FOUND,
format!("manifest {manifest_id:?} has no inventory hint"),
))?
.explode
.clone()
.ok_or((
StatusCode::NOT_FOUND,
format!("manifest {manifest_id:?} has no explode specs"),
))?;
state.explode_spec_cache.insert_manifest(manifest).await;
specs.into_iter().find(|s| s.field == field).ok_or((
StatusCode::NOT_FOUND,
format!("manifest {manifest_id:?} has no explode field {field:?}"),
))
}
#[derive(Serialize)]
pub struct HistoryEventRow {
pub id: i64,
pub pc_id: String,
pub job_id: String,
pub field_path: String,
pub identity_json: Option<String>,
pub change_kind: String,
pub before_json: Option<String>,
pub after_json: Option<String>,
pub observed_at: Option<DateTime<Utc>>,
}
#[derive(serde::Deserialize)]
pub struct HistoryParams {
pub field: Option<String>,
pub since: Option<DateTime<Utc>>,
pub limit: Option<u32>,
}
pub async fn history_for_pc(
State(state): State<AppState>,
Path((manifest_id, pc_id)): Path<(String, String)>,
Query(params): Query<HistoryParams>,
) -> Result<Json<Vec<HistoryEventRow>>, (StatusCode, String)> {
let limit = params.limit.unwrap_or(500).min(5000);
let mut qb = sqlx::QueryBuilder::<sqlx::Sqlite>::new(
"SELECT id, pc_id, job_id, field_path, identity_json, \
change_kind, before_json, after_json, observed_at \
FROM inventory_history \
WHERE job_id = ",
);
qb.push_bind(manifest_id);
qb.push(" AND pc_id = ");
qb.push_bind(pc_id);
if let Some(f) = params.field.filter(|s| !s.is_empty()) {
qb.push(" AND field_path = ");
qb.push_bind(f);
}
if let Some(t) = params.since {
qb.push(" AND observed_at >= ");
qb.push_bind(t);
}
qb.push(" ORDER BY observed_at DESC LIMIT ");
qb.push_bind(limit as i64);
let rows = qb.build().fetch_all(&state.pool).await.map_err(|e| {
warn!(error = %e, "inventory_history per-pc query");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let out: Result<Vec<HistoryEventRow>, _> = rows
.into_iter()
.map(|r| {
Ok::<_, sqlx::Error>(HistoryEventRow {
id: r.try_get("id")?,
pc_id: r.try_get("pc_id")?,
job_id: r.try_get("job_id")?,
field_path: r.try_get("field_path")?,
identity_json: r.try_get("identity_json").ok(),
change_kind: r.try_get("change_kind")?,
before_json: r.try_get("before_json").ok(),
after_json: r.try_get("after_json").ok(),
observed_at: r.try_get("observed_at").ok(),
})
})
.collect();
let out = out.map_err(|e| {
warn!(error = %e, "inventory_history row decode");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("decode history row: {e}"),
)
})?;
Ok(Json(out))
}
fn parse_identity_filters(
params: &HashMap<String, String>,
) -> Result<Vec<(String, String)>, (StatusCode, String)> {
let mut out = Vec::new();
for (k, v) in params {
if let Some(field) = k.strip_prefix("identity.") {
validate_ident(field)
.map_err(|e| (StatusCode::BAD_REQUEST, format!("identity.{field}: {e}")))?;
out.push((field.to_string(), v.clone()));
}
}
out.sort_by(|a, b| a.0.cmp(&b.0));
Ok(out)
}
pub async fn fleet_history_search(
State(state): State<AppState>,
Path(manifest_id): Path<String>,
Query(params): Query<HashMap<String, String>>,
) -> Result<Json<Vec<HistoryEventRow>>, (StatusCode, String)> {
let limit = params
.get("limit")
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(500)
.min(5000);
let offset = params
.get("offset")
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(0);
let kind = params.get("kind").filter(|s| !s.is_empty());
if let Some(k) = kind
&& !matches!(k.as_str(), "added" | "removed" | "changed")
{
return Err((
StatusCode::BAD_REQUEST,
format!("kind must be one of added / removed / changed (got {k:?})"),
));
}
let since: Option<DateTime<Utc>> = params
.get("since")
.filter(|s| !s.is_empty())
.map(|s| {
s.parse::<DateTime<Utc>>()
.map_err(|e| (StatusCode::BAD_REQUEST, format!("since: {e}")))
})
.transpose()?;
let until: Option<DateTime<Utc>> = params
.get("until")
.filter(|s| !s.is_empty())
.map(|s| {
s.parse::<DateTime<Utc>>()
.map_err(|e| (StatusCode::BAD_REQUEST, format!("until: {e}")))
})
.transpose()?;
let field = params.get("field").filter(|s| !s.is_empty());
let identity_filters = parse_identity_filters(¶ms)?;
let mut qb = sqlx::QueryBuilder::<sqlx::Sqlite>::new(
"SELECT id, pc_id, job_id, field_path, identity_json, \
change_kind, before_json, after_json, observed_at \
FROM inventory_history \
WHERE job_id = ",
);
qb.push_bind(&manifest_id);
if let Some(f) = field {
qb.push(" AND field_path = ");
qb.push_bind(f);
}
if let Some(k) = kind {
qb.push(" AND change_kind = ");
qb.push_bind(k);
}
if let Some(t) = since {
qb.push(" AND observed_at >= ");
qb.push_bind(t);
}
if let Some(t) = until {
qb.push(" AND observed_at < ");
qb.push_bind(t);
}
for (key, value) in &identity_filters {
qb.push(format!(" AND json_extract(identity_json, '$.{key}') = "));
qb.push_bind(value);
}
qb.push(" ORDER BY observed_at DESC LIMIT ");
qb.push_bind(limit as i64);
qb.push(" OFFSET ");
qb.push_bind(offset as i64);
let rows = qb.build().fetch_all(&state.pool).await.map_err(|e| {
warn!(error = %e, "inventory_history fleet query");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let out: Result<Vec<HistoryEventRow>, _> = rows
.into_iter()
.map(|r| {
Ok::<_, sqlx::Error>(HistoryEventRow {
id: r.try_get("id")?,
pc_id: r.try_get("pc_id")?,
job_id: r.try_get("job_id")?,
field_path: r.try_get("field_path")?,
identity_json: r.try_get("identity_json").ok(),
change_kind: r.try_get("change_kind")?,
before_json: r.try_get("before_json").ok(),
after_json: r.try_get("after_json").ok(),
observed_at: r.try_get("observed_at").ok(),
})
})
.collect();
let out = out.map_err(|e| {
warn!(error = %e, "fleet history row decode");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("decode history row: {e}"),
)
})?;
Ok(Json(out))
}
pub async fn first_seen(
State(state): State<AppState>,
Path(manifest_id): Path<String>,
Query(params): Query<HashMap<String, String>>,
) -> Result<Json<Vec<FirstSeenRow>>, (StatusCode, String)> {
let limit = params
.get("limit")
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(5000)
.min(5000);
let offset = params
.get("offset")
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(0);
let field = params.get("field").filter(|s| !s.is_empty());
let since: Option<DateTime<Utc>> = params
.get("since")
.filter(|s| !s.is_empty())
.map(|s| {
s.parse::<DateTime<Utc>>()
.map_err(|e| (StatusCode::BAD_REQUEST, format!("since: {e}")))
})
.transpose()?;
let identity_filters = parse_identity_filters(¶ms)?;
let mut qb = sqlx::QueryBuilder::<sqlx::Sqlite>::new(
"SELECT pc_id, MIN(observed_at) AS first_seen_at \
FROM inventory_history \
WHERE job_id = ",
);
qb.push_bind(&manifest_id);
if let Some(f) = field {
qb.push(" AND field_path = ");
qb.push_bind(f);
}
if let Some(t) = since {
qb.push(" AND observed_at >= ");
qb.push_bind(t);
}
for (key, value) in &identity_filters {
qb.push(format!(" AND json_extract(identity_json, '$.{key}') = "));
qb.push_bind(value);
}
qb.push(" GROUP BY pc_id ORDER BY first_seen_at ASC LIMIT ");
qb.push_bind(limit as i64);
qb.push(" OFFSET ");
qb.push_bind(offset as i64);
let rows = qb.build().fetch_all(&state.pool).await.map_err(|e| {
warn!(error = %e, "inventory_history first_seen query");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})?;
let out: Result<Vec<FirstSeenRow>, _> = rows
.into_iter()
.map(|r| {
Ok::<_, sqlx::Error>(FirstSeenRow {
pc_id: r.try_get("pc_id")?,
first_seen_at: r.try_get("first_seen_at").ok(),
})
})
.collect();
let out = out.map_err(|e| {
warn!(error = %e, "first_seen row decode");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("decode first_seen row: {e}"),
)
})?;
Ok(Json(out))
}
#[derive(Serialize)]
pub struct FirstSeenRow {
pub pc_id: String,
pub first_seen_at: Option<DateTime<Utc>>,
}
fn row_to_fact(r: sqlx::sqlite::SqliteRow) -> InventoryFact {
let facts: serde_json::Value = r
.try_get::<String, _>("facts_json")
.ok()
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or(serde_json::Value::Null);
let display: Vec<DisplayField> = r
.try_get::<Option<String>, _>("display_json")
.ok()
.flatten()
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_default();
let summary: Option<Vec<DisplayField>> = r
.try_get::<Option<String>, _>("summary_json")
.ok()
.flatten()
.and_then(|s| serde_json::from_str(&s).ok());
InventoryFact {
job_id: r.try_get("job_id").unwrap_or_default(),
facts,
display,
summary,
collected_at: r.try_get("collected_at").ok(),
recorded_at: r.try_get("recorded_at").ok(),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn params(pairs: &[(&str, &str)]) -> HashMap<String, String> {
pairs
.iter()
.map(|(k, v)| ((*k).to_string(), (*v).to_string()))
.collect()
}
#[test]
fn parse_identity_filters_empty_when_no_identity_prefix() {
let got = parse_identity_filters(¶ms(&[
("field", "apps"),
("kind", "added"),
("since", "2026-04-01"),
]))
.unwrap();
assert!(got.is_empty());
}
#[test]
fn parse_identity_filters_extracts_pairs() {
let got = parse_identity_filters(¶ms(&[
("identity.name", "Chrome"),
("identity.source", "appx"),
("field", "apps"),
]))
.unwrap();
assert_eq!(
got,
vec![
("name".to_string(), "Chrome".to_string()),
("source".to_string(), "appx".to_string()),
]
);
}
#[test]
fn parse_identity_filters_rejects_injection_attempts() {
let err = parse_identity_filters(¶ms(&[("identity.name';--", "x")])).unwrap_err();
assert_eq!(err.0, StatusCode::BAD_REQUEST);
}
#[test]
fn parse_identity_filters_rejects_empty_field_name() {
let err = parse_identity_filters(¶ms(&[("identity.", "x")])).unwrap_err();
assert_eq!(err.0, StatusCode::BAD_REQUEST);
}
fn display(field: &str, kind: Option<&str>) -> DisplayField {
DisplayField {
field: field.to_string(),
label: field.to_string(),
kind: kind.map(str::to_string),
columns: None,
}
}
#[test]
fn scalar_columns_excludes_table_kind_and_marks_numeric() {
let hint = InventoryHint {
display: vec![
display("pc_model", None),
display("os_build", None),
display("ram_bytes", Some("bytes")),
display("cpu_count", Some("number")),
display("installed_at", Some("timestamp")),
display("apps", Some("table")),
],
summary: None,
explode: None,
history_scalars: None,
};
let cols = scalar_columns(&hint);
let names: Vec<&str> = cols.iter().map(|c| c.field.as_str()).collect();
assert_eq!(
names,
vec![
"pc_model",
"os_build",
"ram_bytes",
"cpu_count",
"installed_at"
],
"table-kind fields are dropped, order preserved"
);
let numeric: std::collections::HashMap<&str, bool> =
cols.iter().map(|c| (c.field.as_str(), c.numeric)).collect();
assert!(numeric["ram_bytes"]);
assert!(numeric["cpu_count"]);
assert!(!numeric["pc_model"]);
assert!(!numeric["os_build"]);
assert!(!numeric["installed_at"]);
}
fn row(pairs: &[(&str, serde_json::Value)]) -> serde_json::Map<String, serde_json::Value> {
pairs
.iter()
.map(|(k, v)| ((*k).to_string(), v.clone()))
.collect()
}
#[test]
fn apply_account_injects_namespaced_keys() {
use serde_json::Value;
let mut rows = vec![row(&[
("pc_id", Value::String("PC-1".into())),
("last_logon_user", Value::String("not-the-account".into())),
])];
let mut by_pc = std::collections::HashMap::new();
by_pc.insert(
"PC-1".to_string(),
(
Some("CONTOSO\\jdoe".to_string()),
Some("John Doe".to_string()),
),
);
apply_account(&mut rows, &by_pc);
let r = &rows[0];
assert_eq!(r[ACCOUNT_USER_KEY], Value::String("CONTOSO\\jdoe".into()));
assert_eq!(
r[ACCOUNT_DISPLAY_NAME_KEY],
Value::String("John Doe".into())
);
assert_eq!(
r["last_logon_user"],
Value::String("not-the-account".into())
);
}
#[test]
fn apply_account_nulls_when_pc_absent_or_account_empty() {
use serde_json::Value;
let mut rows = vec![
row(&[("pc_id", Value::String("PC-unknown".into()))]),
row(&[("pc_id", Value::String("PC-no-name".into()))]),
];
let mut by_pc = std::collections::HashMap::new();
by_pc.insert(
"PC-no-name".to_string(),
(Some("WG\\kiosk".to_string()), None),
);
apply_account(&mut rows, &by_pc);
assert_eq!(rows[0][ACCOUNT_USER_KEY], Value::Null);
assert_eq!(rows[0][ACCOUNT_DISPLAY_NAME_KEY], Value::Null);
assert_eq!(rows[1][ACCOUNT_USER_KEY], Value::String("WG\\kiosk".into()));
assert_eq!(rows[1][ACCOUNT_DISPLAY_NAME_KEY], Value::Null);
}
#[test]
fn scalar_columns_empty_when_all_fields_are_tables() {
let hint = InventoryHint {
display: vec![
display("apps", Some("table")),
display("disks", Some("table")),
],
summary: None,
explode: None,
history_scalars: None,
};
assert!(scalar_columns(&hint).is_empty());
}
}