use std::collections::{BTreeSet, HashSet};
use std::sync::{Mutex, OnceLock};
use anyhow::{Result, anyhow, bail};
use kanade_shared::manifest::{ExplodeColumn, ExplodeSpec, Manifest};
use serde_json::Value as JsonValue;
use sqlx::{Sqlite, SqlitePool, Transaction};
use tracing::{info, warn};
/// Returns the process-wide set of table names already created through
/// `ensure_table_cached`.
///
/// The set is initialised empty on first access and then shared by every
/// caller for the lifetime of the process; the `Mutex` guards concurrent use.
fn ensured_tables() -> &'static Mutex<HashSet<String>> {
    static ENSURED: OnceLock<Mutex<HashSet<String>>> = OnceLock::new();
    ENSURED.get_or_init(Mutex::default)
}
/// Checks that `ident` is a plain identifier that can be placed between
/// double quotes in generated SQL.
///
/// Accepted identifiers are 1 to 64 bytes long, start with an ASCII letter or
/// an underscore, and continue with ASCII letters, digits or underscores.
///
/// # Errors
///
/// Returns an error naming the offending identifier when the length is out of
/// range, when the first character is not a letter/underscore, or when any
/// later character is outside `[A-Za-z0-9_]`.
pub fn validate_ident(ident: &str) -> Result<()> {
    if !(1..=64).contains(&ident.len()) {
        bail!("identifier {ident:?} must be 1..=64 chars");
    }

    let mut rest = ident.chars();
    match rest.next() {
        Some(first) if first.is_ascii_alphabetic() || first == '_' => {}
        // `None` cannot occur here: the length check above rejects "".
        _ => bail!("identifier {ident:?} must start with a letter or underscore"),
    }

    if let Some(c) = rest.find(|c| !c.is_ascii_alphanumeric() && *c != '_') {
        bail!("identifier {ident:?} contains invalid character {c:?}");
    }
    Ok(())
}
/// Checks that an explode column `kind` is one this module can map to a
/// SQLite column type.
///
/// A missing kind is accepted, as are exactly `"text"`, `"integer"` and
/// `"real"` (case-sensitive).
///
/// # Errors
///
/// Returns an error quoting the rejected kind for any other value.
fn validate_kind(kind: Option<&str>) -> Result<()> {
    match kind {
        Some(other) if !matches!(other, "text" | "integer" | "real") => {
            bail!("unsupported explode column kind {other:?}; expected text|integer|real")
        }
        _ => Ok(()),
    }
}
/// Maps an explode column `kind` to the SQLite type name used in the
/// generated `CREATE TABLE` statement.
///
/// `"integer"` becomes `INTEGER` and `"real"` becomes `REAL`; everything else,
/// including a missing kind, falls back to `TEXT`.
fn sql_affinity(kind: Option<&str>) -> &'static str {
    if kind == Some("integer") {
        "INTEGER"
    } else if kind == Some("real") {
        "REAL"
    } else {
        "TEXT"
    }
}
/// Builds the `CREATE TABLE IF NOT EXISTS` statement for an explode spec.
///
/// Every derived table starts with the fixed columns `pc_id`, `job_id` and
/// `collected_at`, followed by one column per entry in `spec.columns`. The
/// primary key is `(pc_id, job_id, <spec.primary_key...>)`.
///
/// # Errors
///
/// Returns an error when:
/// - the table name, a primary-key entry or a column name is not a valid
///   identifier (see [`validate_ident`]);
/// - `spec.primary_key` is empty or names a column that is not declared;
/// - a column kind is not supported;
/// - a column name collides with one of the fixed columns, or the same column
///   is declared twice. The comparison ignores ASCII case because SQLite
///   treats column names case-insensitively, so such a spec would otherwise
///   only fail later, when the statement is executed.
pub fn create_table_sql(spec: &ExplodeSpec) -> Result<String> {
    // Columns every derived table gets regardless of the spec.
    const BUILTIN_COLUMNS: [&str; 3] = ["pc_id", "job_id", "collected_at"];

    validate_ident(&spec.table)?;
    if spec.primary_key.is_empty() {
        bail!(
            "explode spec for table {:?} needs at least one primary_key column",
            spec.table,
        );
    }
    let column_names: BTreeSet<&str> = spec.columns.iter().map(|c| c.field.as_str()).collect();
    for pk in &spec.primary_key {
        validate_ident(pk)?;
        if !column_names.contains(pk.as_str()) {
            bail!(
                "primary_key entry {pk:?} for table {:?} is not in columns",
                spec.table,
            );
        }
    }

    let mut sql = format!("CREATE TABLE IF NOT EXISTS \"{}\" (\n", spec.table);
    sql.push_str(" pc_id TEXT NOT NULL,\n");
    sql.push_str(" job_id TEXT NOT NULL,\n");
    sql.push_str(" collected_at TIMESTAMP,\n");

    // Lower-cased names seen so far, used to catch duplicate declarations.
    let mut seen: BTreeSet<String> = BTreeSet::new();
    for col in &spec.columns {
        validate_ident(&col.field)?;
        validate_kind(col.kind.as_deref())?;

        let folded = col.field.to_ascii_lowercase();
        if BUILTIN_COLUMNS.contains(&folded.as_str()) {
            bail!(
                "column {:?} for table {:?} collides with a built-in column",
                col.field,
                spec.table,
            );
        }
        if !seen.insert(folded) {
            bail!(
                "column {:?} for table {:?} is declared more than once",
                col.field,
                spec.table,
            );
        }

        sql.push_str(&format!(
            " \"{}\" {},\n",
            col.field,
            sql_affinity(col.kind.as_deref())
        ));
    }

    sql.push_str(" PRIMARY KEY (pc_id, job_id");
    for pk in &spec.primary_key {
        sql.push_str(", \"");
        sql.push_str(pk);
        sql.push('"');
    }
    sql.push_str(")\n);");
    Ok(sql)
}
/// Builds one `CREATE INDEX IF NOT EXISTS` statement for every column of
/// `spec` whose `index` flag is set.
///
/// Each index is named `idx_<table>_<column>`. The result is empty when no
/// column asks for an index.
///
/// # Errors
///
/// Returns an error when the table name or an indexed column name is not a
/// valid identifier (see [`validate_ident`]).
pub fn create_index_sqls(spec: &ExplodeSpec) -> Result<Vec<String>> {
    // The table name is spliced into both the index name and the ON clause,
    // so it must be validated here too: this function is public and cannot
    // assume `create_table_sql` was called first.
    validate_ident(&spec.table)?;

    let mut out = Vec::new();
    for col in spec.columns.iter().filter(|c| c.index) {
        validate_ident(&col.field)?;
        out.push(format!(
            "CREATE INDEX IF NOT EXISTS \"idx_{table}_{col}\" ON \"{table}\"(\"{col}\");",
            table = spec.table,
            col = col.field,
        ));
    }
    Ok(out)
}
pub async fn ensure_table(pool: &SqlitePool, spec: &ExplodeSpec) -> Result<()> {
let table_sql = create_table_sql(spec)?;
sqlx::query(&table_sql)
.execute(pool)
.await
.map_err(|e| anyhow!("create table {}: {e}", spec.table))?;
for index_sql in create_index_sqls(spec)? {
sqlx::query(&index_sql)
.execute(pool)
.await
.map_err(|e| anyhow!("create index for {}: {e}", spec.table))?;
}
Ok(())
}
/// Like [`ensure_table`], but skips the database round-trip when this process
/// has already ensured a table with the same name.
///
/// The cache is keyed by table name only and is shared across the whole
/// process, independent of which `pool` is passed in. A failed attempt is not
/// recorded, so a later call tries again.
///
/// # Errors
///
/// Propagates any error from [`ensure_table`].
///
/// # Panics
///
/// Panics if the cache mutex has been poisoned.
pub async fn ensure_table_cached(pool: &SqlitePool, spec: &ExplodeSpec) -> Result<()> {
    // The guard is a temporary of this statement, so the lock is released
    // before the `.await` below.
    let already_ensured = ensured_tables()
        .lock()
        .expect("ensured_tables mutex")
        .contains(&spec.table);
    if already_ensured {
        return Ok(());
    }

    ensure_table(pool, spec).await?;

    ensured_tables()
        .lock()
        .expect("ensured_tables mutex")
        .insert(spec.table.clone());
    Ok(())
}
/// Ensures the derived tables for every explode spec found in `manifests`.
///
/// Manifests without an inventory section, or whose inventory has no explode
/// specs, are skipped. Table creation is best-effort: a failure is logged as
/// a warning and the remaining specs are still processed, so this function
/// currently always returns `Ok(())`.
pub async fn ensure_tables_for_jobs(
    pool: &SqlitePool,
    manifests: impl IntoIterator<Item = Manifest>,
) -> Result<()> {
    for manifest in manifests {
        let explode_specs = manifest
            .inventory
            .as_ref()
            .and_then(|inventory| inventory.explode.as_ref());
        let Some(specs) = explode_specs else {
            continue;
        };

        for spec in specs {
            if let Err(e) = ensure_table_cached(pool, spec).await {
                warn!(
                    error = %e,
                    job_id = %manifest.id,
                    table = %spec.table,
                    "explode: derived table creation failed (skipped)",
                );
            } else {
                info!(
                    job_id = %manifest.id,
                    table = %spec.table,
                    "explode: derived table ready",
                );
            }
        }
    }
    Ok(())
}
pub async fn replace_rows(
pool: &SqlitePool,
spec: &ExplodeSpec,
pc_id: &str,
job_id: &str,
collected_at: Option<chrono::DateTime<chrono::Utc>>,
payload: &JsonValue,
) -> Result<usize> {
validate_ident(&spec.table)?;
for col in &spec.columns {
validate_ident(&col.field)?;
}
let Some(arr) = payload.get(&spec.field).and_then(|v| v.as_array()) else {
delete_rows(pool, &spec.table, pc_id, job_id).await?;
return Ok(0);
};
let mut tx: Transaction<'_, Sqlite> = pool.begin().await?;
if spec.track_history {
let events = super::history::diff_explode_rows(&mut tx, spec, pc_id, job_id, arr).await?;
if !events.is_empty() {
super::history::write_events(&mut tx, pc_id, job_id, &spec.field, &events).await?;
}
}
sqlx::query(&format!(
"DELETE FROM \"{}\" WHERE pc_id = ? AND job_id = ?",
spec.table
))
.bind(pc_id)
.bind(job_id)
.execute(&mut *tx)
.await
.map_err(|e| anyhow!("delete prior rows in {}: {e}", spec.table))?;
let quoted_columns: Vec<String> = spec
.columns
.iter()
.map(|c| format!("\"{}\"", c.field))
.collect();
let placeholders = std::iter::repeat_n("?", spec.columns.len() + 3)
.collect::<Vec<_>>()
.join(", ");
let insert_sql = format!(
"INSERT INTO \"{}\" (pc_id, job_id, collected_at, {}) VALUES ({})",
spec.table,
quoted_columns.join(", "),
placeholders,
);
let mut inserted = 0;
for element in arr {
let mut q = sqlx::query(&insert_sql)
.bind(pc_id)
.bind(job_id)
.bind(collected_at);
for col in &spec.columns {
q = bind_column(q, col, element);
}
match q.execute(&mut *tx).await {
Ok(_) => inserted += 1,
Err(e) => warn!(
error = %e,
table = %spec.table,
pc_id,
job_id,
"explode: skip row (insert failed; likely PK collision within payload)",
),
}
}
tx.commit().await?;
Ok(inserted)
}
/// Deletes every row of `table` belonging to `(pc_id, job_id)`.
///
/// `table` is interpolated into the statement rather than bound, so callers
/// must pass an identifier that has already been validated; `pc_id` and
/// `job_id` are bound as parameters.
///
/// # Errors
///
/// Returns an error naming the table when the statement fails.
async fn delete_rows(pool: &SqlitePool, table: &str, pc_id: &str, job_id: &str) -> Result<()> {
    let sql = format!("DELETE FROM \"{table}\" WHERE pc_id = ? AND job_id = ?");
    match sqlx::query(&sql)
        .bind(pc_id)
        .bind(job_id)
        .execute(pool)
        .await
    {
        Ok(_) => Ok(()),
        Err(e) => Err(anyhow!("delete rows in {table}: {e}")),
    }
}
/// Binds the value of `col` taken from one payload `element` as the next
/// parameter of `q`.
///
/// Binding rules, in order:
/// - a missing key (including a non-object `element`) or JSON `null` binds
///   SQL NULL;
/// - for `integer` columns, a JSON number binds as `i64` when it fits, and
///   otherwise falls back to its `f64` value instead of being silently
///   dropped to NULL (fractional values, or unsigned values above
///   `i64::MAX`);
/// - for `real` columns, a JSON number binds as `f64`;
/// - a JSON string binds as text, borrowed from `element` without copying;
/// - anything else (numbers in text columns, booleans, arrays, objects) binds
///   as its compact JSON text.
fn bind_column<'q>(
    q: sqlx::query::Query<'q, Sqlite, sqlx::sqlite::SqliteArguments<'q>>,
    col: &ExplodeColumn,
    element: &'q JsonValue,
) -> sqlx::query::Query<'q, Sqlite, sqlx::sqlite::SqliteArguments<'q>> {
    let Some(value) = element.get(&col.field) else {
        return q.bind(Option::<String>::None);
    };

    match (col.kind.as_deref(), value) {
        (_, JsonValue::Null) => q.bind(Option::<String>::None),
        (Some("integer"), JsonValue::Number(n)) => match n.as_i64() {
            Some(int) => q.bind(int),
            // `as_i64` is `None` for fractional or too-large numbers; keep
            // the value as a float rather than losing it.
            None => q.bind(n.as_f64()),
        },
        (Some("real"), JsonValue::Number(n)) => q.bind(n.as_f64()),
        // `s` lives as long as `element` ('q), so no clone is needed.
        (_, JsonValue::String(s)) => q.bind(s.as_str()),
        // `Value`'s `Display` renders numbers and booleans exactly like their
        // own `to_string`, and arrays/objects as compact JSON.
        (_, other) => q.bind(other.to_string()),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::sqlite::SqlitePoolOptions;

    /// Opens a fresh in-memory SQLite pool limited to a single connection.
    async fn memory_pool() -> SqlitePool {
        SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap()
    }

    /// Counts all rows currently stored in `inventory_sw_apps`.
    async fn apps_row_count(pool: &SqlitePool) -> i64 {
        let (rows,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM inventory_sw_apps")
            .fetch_one(pool)
            .await
            .unwrap();
        rows
    }

    /// Shorthand for building one explode column.
    fn column(field: &str, kind: Option<&str>, index: bool) -> ExplodeColumn {
        ExplodeColumn {
            field: field.into(),
            kind: kind.map(Into::into),
            index,
        }
    }

    /// Spec used by most tests: installed apps keyed by `(name, source)`,
    /// with an index on `name` only.
    fn sample_apps_spec() -> ExplodeSpec {
        ExplodeSpec {
            field: "apps".into(),
            table: "inventory_sw_apps".into(),
            primary_key: vec!["name".into(), "source".into()],
            track_history: false,
            columns: vec![
                column("source", Some("text"), false),
                column("name", None, true),
                column("version", None, false),
                column("publisher", None, false),
            ],
        }
    }

    #[test]
    fn validate_ident_accepts_normal_names() {
        let accepted = ["apps", "inventory_sw_apps", "_underscore", "abc123"];
        for ok in accepted {
            assert!(validate_ident(ok).is_ok(), "{ok} should pass");
        }
    }

    #[test]
    fn validate_ident_rejects_attacks() {
        let rejected = [
            "",
            "123leading",
            "with space",
            "drop;",
            "with-dash",
            "apps]",
            "ねこ",
        ];
        for bad in rejected {
            assert!(validate_ident(bad).is_err(), "{bad:?} should be rejected");
        }
    }

    #[test]
    fn create_table_sql_shape() {
        let sql = create_table_sql(&sample_apps_spec()).unwrap();
        assert!(sql.contains("CREATE TABLE IF NOT EXISTS \"inventory_sw_apps\""));
        assert!(sql.contains("pc_id TEXT NOT NULL"));
        assert!(sql.contains("\"name\" TEXT"));
        assert!(sql.contains("PRIMARY KEY (pc_id, job_id, \"name\", \"source\")"));
    }

    #[test]
    fn create_table_sql_rejects_unknown_primary_key() {
        let mut bad = sample_apps_spec();
        bad.primary_key = vec!["nonexistent".into()];
        let err = create_table_sql(&bad).unwrap_err().to_string();
        assert!(err.contains("nonexistent"), "{err}");
    }

    #[test]
    fn create_index_sqls_only_for_marked_columns() {
        let sqls = create_index_sqls(&sample_apps_spec()).unwrap();
        assert_eq!(sqls.len(), 1);
        assert!(sqls[0].contains("\"idx_inventory_sw_apps_name\""));
        assert!(sqls[0].contains("ON \"inventory_sw_apps\"(\"name\")"));
    }

    #[tokio::test]
    async fn ensure_table_and_replace_rows_roundtrip() {
        let pool = memory_pool().await;
        let spec = sample_apps_spec();
        ensure_table(&pool, &spec).await.unwrap();

        // First report: three distinct (name, source) pairs.
        let payload = serde_json::json!({
            "apps": [
                {"source": "wow6432", "name": "Chrome", "version": "120.0.6099.71", "publisher": "Google"},
                {"source": "x64", "name": "Chrome", "version": "120.0.6099.71", "publisher": "Google"},
                {"source": "x64", "name": "Firefox", "version": "122.0", "publisher": "Mozilla"},
            ]
        });
        let n = replace_rows(&pool, &spec, "minipc-01", "inventory-sw", None, &payload)
            .await
            .unwrap();
        assert_eq!(n, 3);
        assert_eq!(apps_row_count(&pool).await, 3);

        // Second report from the same PC replaces the first one entirely.
        let payload2 = serde_json::json!({
            "apps": [
                {"source": "x64", "name": "Edge", "version": "121.0", "publisher": "Microsoft"},
            ]
        });
        let n = replace_rows(&pool, &spec, "minipc-01", "inventory-sw", None, &payload2)
            .await
            .unwrap();
        assert_eq!(n, 1);
        assert_eq!(
            apps_row_count(&pool).await,
            1,
            "old rows replaced, not appended"
        );

        // A different PC reporting Chrome must not resurrect minipc-01's rows.
        let pc2_payload = serde_json::json!({
            "apps": [
                {"source": "x64", "name": "Chrome", "version": "99.0.4844.51", "publisher": "Google"},
            ]
        });
        replace_rows(
            &pool,
            &spec,
            "minipc-02",
            "inventory-sw",
            None,
            &pc2_payload,
        )
        .await
        .unwrap();

        let chrome_pcs: Vec<(String, String)> = sqlx::query_as(
            "SELECT pc_id, version FROM inventory_sw_apps WHERE name = ? ORDER BY pc_id",
        )
        .bind("Chrome")
        .fetch_all(&pool)
        .await
        .unwrap();
        assert_eq!(chrome_pcs.len(), 1, "minipc-01 no longer has Chrome");
        assert_eq!(chrome_pcs[0].0, "minipc-02");
        assert_eq!(chrome_pcs[0].1, "99.0.4844.51");
    }

    #[tokio::test]
    async fn replace_rows_with_missing_field_clears_pc_state() {
        let pool = memory_pool().await;
        let spec = sample_apps_spec();
        ensure_table(&pool, &spec).await.unwrap();

        let prior = serde_json::json!({
            "apps": [{"source": "x64", "name": "Chrome", "version": "120", "publisher": "Google"}]
        });
        replace_rows(&pool, &spec, "minipc", "inventory-sw", None, &prior)
            .await
            .unwrap();
        assert_eq!(apps_row_count(&pool).await, 1);

        // A payload without the exploded field wipes this PC's rows.
        let no_apps_field = serde_json::json!({ "hostname": "minipc" });
        let n = replace_rows(&pool, &spec, "minipc", "inventory-sw", None, &no_apps_field)
            .await
            .unwrap();
        assert_eq!(n, 0);
        assert_eq!(
            apps_row_count(&pool).await,
            0,
            "stale rows cleared even when field absent"
        );
    }
}