use anyhow::Result;
use kanade_shared::manifest::ExplodeSpec;
use serde_json::Value as JsonValue;
use sqlx::{Sqlite, Transaction};
use std::collections::HashMap;
#[derive(Debug, Clone, PartialEq)]
pub struct HistoryEvent {
pub field_path: String,
pub change_kind: &'static str, pub identity_json: Option<String>,
pub before_json: Option<String>,
pub after_json: Option<String>,
}
pub async fn diff_explode_rows(
tx: &mut Transaction<'_, Sqlite>,
spec: &ExplodeSpec,
pc_id: &str,
job_id: &str,
arr: &[JsonValue],
) -> Result<Vec<HistoryEvent>> {
let select_cols: Vec<String> = spec
.columns
.iter()
.map(|c| format!("\"{}\"", c.field))
.collect();
let prior_sql = format!(
"SELECT {} FROM \"{}\" WHERE pc_id = ? AND job_id = ?",
select_cols.join(", "),
spec.table,
);
let prior_rows: Vec<sqlx::sqlite::SqliteRow> = sqlx::query(&prior_sql)
.bind(pc_id)
.bind(job_id)
.fetch_all(&mut **tx)
.await?;
let mut prior_by_key: HashMap<String, JsonValue> = HashMap::with_capacity(prior_rows.len());
for row in &prior_rows {
let obj = row_to_json(row, spec);
let key = identity_string(&obj, &spec.primary_key);
prior_by_key.insert(key, obj);
}
let mut events = Vec::new();
let mut seen_keys: std::collections::HashSet<String> =
std::collections::HashSet::with_capacity(arr.len());
for element in arr {
let key = identity_string(element, &spec.primary_key);
if !seen_keys.insert(key.clone()) {
continue;
}
let identity_json = Some(identity_json_for(element, &spec.primary_key));
match prior_by_key.get(&key) {
None => events.push(HistoryEvent {
field_path: spec.field.clone(),
change_kind: "added",
identity_json,
before_json: None,
after_json: Some(serde_json::to_string(element)?),
}),
Some(prior) if rows_differ(prior, element, spec) => events.push(HistoryEvent {
field_path: spec.field.clone(),
change_kind: "changed",
identity_json,
before_json: Some(serde_json::to_string(prior)?),
after_json: Some(serde_json::to_string(element)?),
}),
Some(_) => { }
}
}
for (key, prior) in &prior_by_key {
if seen_keys.contains(key) {
continue;
}
events.push(HistoryEvent {
field_path: spec.field.clone(),
change_kind: "removed",
identity_json: Some(identity_json_for(prior, &spec.primary_key)),
before_json: Some(serde_json::to_string(prior)?),
after_json: None,
});
}
Ok(events)
}
pub fn diff_scalars(
prior_facts_json: Option<&str>,
new_facts: &JsonValue,
scalars: &[String],
) -> Result<Vec<HistoryEvent>> {
let prior: Option<JsonValue> = prior_facts_json.map(serde_json::from_str).transpose()?;
let mut events = Vec::new();
for field in scalars {
let prior_val = prior.as_ref().and_then(|p| p.get(field));
let new_val = new_facts.get(field);
match (prior_val, new_val) {
(_, None) => {}
(None, Some(v)) => {
events.push(HistoryEvent {
field_path: field.clone(),
change_kind: "added",
identity_json: None,
before_json: None,
after_json: Some(serde_json::to_string(&serde_json::json!({ "value": v }))?),
});
}
(Some(p), Some(n)) if p != n => {
events.push(HistoryEvent {
field_path: field.clone(),
change_kind: "changed",
identity_json: None,
before_json: Some(serde_json::to_string(&serde_json::json!({ "value": p }))?),
after_json: Some(serde_json::to_string(&serde_json::json!({ "value": n }))?),
});
}
_ => {}
}
}
Ok(events)
}
pub async fn write_events(
tx: &mut Transaction<'_, Sqlite>,
pc_id: &str,
job_id: &str,
events: &[HistoryEvent],
) -> Result<()> {
if events.is_empty() {
return Ok(());
}
let mut qb = sqlx::QueryBuilder::<Sqlite>::new(
"INSERT INTO inventory_history (
pc_id, job_id, field_path, identity_json,
change_kind, before_json, after_json
) ",
);
qb.push_values(events, |mut b, ev| {
b.push_bind(pc_id)
.push_bind(job_id)
.push_bind(&ev.field_path)
.push_bind(&ev.identity_json)
.push_bind(ev.change_kind)
.push_bind(&ev.before_json)
.push_bind(&ev.after_json);
});
qb.build().execute(&mut **tx).await?;
Ok(())
}
fn identity_string(obj: &JsonValue, primary_key: &[String]) -> String {
let mut parts = Vec::with_capacity(primary_key.len());
for k in primary_key {
let v = obj.get(k).cloned().unwrap_or(JsonValue::Null);
parts.push(format!("{k}={v}"));
}
parts.join("|")
}
fn identity_json_for(obj: &JsonValue, primary_key: &[String]) -> String {
let mut map = serde_json::Map::new();
for k in primary_key {
let v = obj.get(k).cloned().unwrap_or(JsonValue::Null);
map.insert(k.clone(), v);
}
serde_json::Value::Object(map).to_string()
}
fn rows_differ(a: &JsonValue, b: &JsonValue, spec: &ExplodeSpec) -> bool {
for col in &spec.columns {
let av = a.get(&col.field);
let bv = b.get(&col.field);
if av != bv {
return true;
}
}
false
}
fn row_to_json(row: &sqlx::sqlite::SqliteRow, spec: &ExplodeSpec) -> JsonValue {
use sqlx::Row;
let mut map = serde_json::Map::new();
for col in &spec.columns {
let v: JsonValue = match col.kind.as_deref() {
Some("integer") => match row.try_get::<Option<i64>, _>(col.field.as_str()) {
Ok(Some(i)) => JsonValue::Number(i.into()),
Ok(None) => JsonValue::Null,
Err(e) => {
tracing::warn!(
error = %e,
column = %col.field,
kind = "integer",
"history diff: row decode failed (treating as NULL — diff may generate a false 'changed' event)",
);
JsonValue::Null
}
},
Some("real") => match row.try_get::<Option<f64>, _>(col.field.as_str()) {
Ok(Some(f)) => serde_json::Number::from_f64(f)
.map(JsonValue::Number)
.unwrap_or(JsonValue::Null),
Ok(None) => JsonValue::Null,
Err(e) => {
tracing::warn!(
error = %e,
column = %col.field,
kind = "real",
"history diff: row decode failed (treating as NULL)",
);
JsonValue::Null
}
},
_ => match row.try_get::<Option<String>, _>(col.field.as_str()) {
Ok(Some(s)) => JsonValue::String(s),
Ok(None) => JsonValue::Null,
Err(e) => {
tracing::warn!(
error = %e,
column = %col.field,
kind = "text",
"history diff: row decode failed (treating as NULL)",
);
JsonValue::Null
}
},
};
map.insert(col.field.clone(), v);
}
JsonValue::Object(map)
}
#[cfg(test)]
mod tests {
use super::*;
use kanade_shared::manifest::{ExplodeColumn, ExplodeSpec};
use sqlx::SqlitePool;
use sqlx::sqlite::SqlitePoolOptions;
fn sample_apps_spec() -> ExplodeSpec {
ExplodeSpec {
field: "apps".into(),
table: "inventory_sw_apps".into(),
primary_key: vec!["name".into(), "source".into()],
columns: vec![
ExplodeColumn {
field: "source".into(),
kind: Some("text".into()),
index: false,
},
ExplodeColumn {
field: "name".into(),
kind: None,
index: true,
},
ExplodeColumn {
field: "version".into(),
kind: None,
index: false,
},
],
track_history: true,
}
}
async fn fresh_pool_with_table() -> SqlitePool {
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect("sqlite::memory:")
.await
.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
crate::projector::explode::ensure_table(&pool, &sample_apps_spec())
.await
.unwrap();
pool
}
async fn seed_row(pool: &SqlitePool, name: &str, source: &str, version: &str) {
sqlx::query(
"INSERT INTO inventory_sw_apps
(pc_id, job_id, source, name, version)
VALUES ('pc-1', 'inventory-sw', ?, ?, ?)",
)
.bind(source)
.bind(name)
.bind(version)
.execute(pool)
.await
.unwrap();
}
#[tokio::test]
async fn first_ever_scan_produces_added_events_for_every_element() {
let pool = fresh_pool_with_table().await;
let spec = sample_apps_spec();
let arr = vec![
serde_json::json!({"name": "Chrome", "source": "msi", "version": "120"}),
serde_json::json!({"name": "Firefox", "source": "wow6432", "version": "122"}),
];
let mut tx = pool.begin().await.unwrap();
let events = diff_explode_rows(&mut tx, &spec, "pc-1", "inventory-sw", &arr)
.await
.unwrap();
tx.commit().await.unwrap();
assert_eq!(events.len(), 2);
assert!(events.iter().all(|e| e.change_kind == "added"));
assert!(events.iter().all(|e| e.before_json.is_none()));
assert!(events.iter().all(|e| e.after_json.is_some()));
}
#[tokio::test]
async fn stable_scan_produces_no_events() {
let pool = fresh_pool_with_table().await;
seed_row(&pool, "Chrome", "msi", "120").await;
let spec = sample_apps_spec();
let arr = vec![serde_json::json!({"name":"Chrome","source":"msi","version":"120"})];
let mut tx = pool.begin().await.unwrap();
let events = diff_explode_rows(&mut tx, &spec, "pc-1", "inventory-sw", &arr)
.await
.unwrap();
tx.commit().await.unwrap();
assert!(
events.is_empty(),
"identical scans must produce zero events"
);
}
#[tokio::test]
async fn version_change_produces_changed_event_with_before_after() {
let pool = fresh_pool_with_table().await;
seed_row(&pool, "Chrome", "msi", "120").await;
let spec = sample_apps_spec();
let arr = vec![serde_json::json!({"name":"Chrome","source":"msi","version":"121"})];
let mut tx = pool.begin().await.unwrap();
let events = diff_explode_rows(&mut tx, &spec, "pc-1", "inventory-sw", &arr)
.await
.unwrap();
tx.commit().await.unwrap();
assert_eq!(events.len(), 1);
let ev = &events[0];
assert_eq!(ev.change_kind, "changed");
assert!(
ev.before_json
.as_ref()
.unwrap()
.contains("\"version\":\"120\"")
);
assert!(
ev.after_json
.as_ref()
.unwrap()
.contains("\"version\":\"121\"")
);
let identity = ev
.identity_json
.as_ref()
.expect("explode events carry identity");
assert!(identity.contains("\"name\":\"Chrome\""));
assert!(identity.contains("\"source\":\"msi\""));
assert_eq!(ev.field_path, "apps");
}
#[tokio::test]
async fn uninstall_produces_removed_event() {
let pool = fresh_pool_with_table().await;
seed_row(&pool, "Chrome", "msi", "120").await;
seed_row(&pool, "Firefox", "wow6432", "122").await;
let spec = sample_apps_spec();
let arr = vec![serde_json::json!({"name":"Firefox","source":"wow6432","version":"122"})];
let mut tx = pool.begin().await.unwrap();
let events = diff_explode_rows(&mut tx, &spec, "pc-1", "inventory-sw", &arr)
.await
.unwrap();
tx.commit().await.unwrap();
assert_eq!(events.len(), 1);
let ev = &events[0];
assert_eq!(ev.change_kind, "removed");
assert!(ev.after_json.is_none());
assert!(
ev.before_json
.as_ref()
.unwrap()
.contains("\"name\":\"Chrome\"")
);
}
#[tokio::test]
async fn mixed_diff_emits_all_three_kinds() {
let pool = fresh_pool_with_table().await;
seed_row(&pool, "Chrome", "msi", "120").await; seed_row(&pool, "Firefox", "wow6432", "122").await; let spec = sample_apps_spec();
let arr = vec![
serde_json::json!({"name":"Chrome","source":"msi","version":"121"}), serde_json::json!({"name":"Edge","source":"appx","version":"122"}), ];
let mut tx = pool.begin().await.unwrap();
let events = diff_explode_rows(&mut tx, &spec, "pc-1", "inventory-sw", &arr)
.await
.unwrap();
tx.commit().await.unwrap();
assert_eq!(events.len(), 3);
let kinds: std::collections::HashSet<_> = events.iter().map(|e| e.change_kind).collect();
assert!(kinds.contains("added"));
assert!(kinds.contains("removed"));
assert!(kinds.contains("changed"));
}
#[tokio::test]
async fn write_events_persists_to_inventory_history() {
let pool = fresh_pool_with_table().await;
let events = vec![HistoryEvent {
field_path: "apps".into(),
change_kind: "added",
identity_json: Some(r#"{"name":"Chrome"}"#.into()),
before_json: None,
after_json: Some(r#"{"name":"Chrome","version":"120"}"#.into()),
}];
let mut tx = pool.begin().await.unwrap();
write_events(&mut tx, "pc-1", "inventory-sw", &events)
.await
.unwrap();
tx.commit().await.unwrap();
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM inventory_history")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count.0, 1);
let row: (
String,
String,
String,
String,
Option<String>,
Option<String>,
) = sqlx::query_as(
"SELECT pc_id, job_id, field_path, change_kind, before_json, after_json \
FROM inventory_history",
)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, "pc-1");
assert_eq!(row.1, "inventory-sw");
assert_eq!(row.2, "apps");
assert_eq!(row.3, "added");
assert_eq!(row.4, None);
assert!(row.5.unwrap().contains("Chrome"));
}
#[test]
fn diff_scalars_first_scan_emits_added_per_field() {
let new_facts = serde_json::json!({
"ram_bytes": 17179869184_u64,
"os_version": "10.0.22631",
"cpu_model": "Intel(R) Core(TM) i7-1165G7",
});
let events =
diff_scalars(None, &new_facts, &["ram_bytes".into(), "os_version".into()]).unwrap();
assert_eq!(events.len(), 2);
assert!(events.iter().all(|e| e.change_kind == "added"));
assert!(events.iter().all(|e| e.identity_json.is_none()));
assert!(events.iter().all(|e| e.before_json.is_none()));
assert!(events.iter().all(|e| e.after_json.is_some()));
let paths: std::collections::HashSet<_> =
events.iter().map(|e| e.field_path.as_str()).collect();
assert!(paths.contains("ram_bytes"));
assert!(paths.contains("os_version"));
}
#[test]
fn diff_scalars_identical_rescan_emits_nothing() {
let prior = r#"{"ram_bytes":17179869184,"os_version":"10.0.22631"}"#;
let new_facts =
serde_json::json!({"ram_bytes": 17179869184_u64, "os_version": "10.0.22631"});
let events = diff_scalars(
Some(prior),
&new_facts,
&["ram_bytes".into(), "os_version".into()],
)
.unwrap();
assert!(events.is_empty(), "stable rescans must produce zero events");
}
#[test]
fn diff_scalars_value_change_emits_changed_with_value_wrapper() {
let prior = r#"{"os_version":"10.0.19045"}"#;
let new_facts = serde_json::json!({"os_version": "10.0.22631"});
let events = diff_scalars(Some(prior), &new_facts, &["os_version".into()]).unwrap();
assert_eq!(events.len(), 1);
let ev = &events[0];
assert_eq!(ev.field_path, "os_version");
assert_eq!(ev.change_kind, "changed");
assert!(ev.identity_json.is_none());
assert!(
ev.before_json
.as_ref()
.unwrap()
.contains("\"value\":\"10.0.19045\"")
);
assert!(
ev.after_json
.as_ref()
.unwrap()
.contains("\"value\":\"10.0.22631\"")
);
}
#[test]
fn diff_scalars_field_missing_from_new_payload_no_event() {
let prior = r#"{"bios_version":"1.30"}"#;
let new_facts = serde_json::json!({"ram_bytes": 1024});
let events = diff_scalars(Some(prior), &new_facts, &["bios_version".into()]).unwrap();
assert!(events.is_empty());
}
#[test]
fn diff_scalars_added_when_prior_lacks_the_field() {
let prior = r#"{"ram_bytes":1024}"#;
let new_facts = serde_json::json!({"ram_bytes": 1024, "os_version": "10.0.22631"});
let events = diff_scalars(Some(prior), &new_facts, &["os_version".into()]).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].change_kind, "added");
assert_eq!(events[0].field_path, "os_version");
}
}