kanade-backend 0.43.31

axum + SQLite projection backend for the kanade endpoint-management system. Hosts /api/* and the embedded SPA dashboard, projects JetStream streams into SQLite, drives the cron scheduler
//! Issue #246 — `/api/obs_events*` routes.
//!
//! Drives the SPA Events page (per-PC timeline) and the planned
//! fleet-wide observability dashboard. Four endpoints:
//!
//! - `GET /api/obs_events?pc_id=&from=&to=&kind=&source=&limit=`
//!   Filtered list, ordered by `at DESC`. Filters are optional;
//!   omitting them all returns the most recent events fleet-wide.
//! - `GET /api/obs_events/kinds` — distinct `kind` strings the
//!   SPA's filter chip needs to populate without a separate query.
//! - `GET /api/obs_events/sources` — distinct `source` strings for
//!   the include/exclude chips (Issue #391).
//! - `GET /api/obs_events/recent?limit=` — convenience alias for
//!   "newest N events fleet-wide" (same as `obs_events` with no
//!   `pc_id` and `limit` default 50).
//!
//! Pagination is planned as keyset (`before_id`) rather than offset
//! so a long-tail timeline view doesn't drift when new events arrive
//! between pages — matches the inventory and audit endpoints' shape.
//! It is deferred to the SPA PR: this first cut accepts no
//! `before_id` parameter and returns up to `limit` rows in a single
//! call.

use axum::Json;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::sqlite::SqliteRow;
use sqlx::{Row, SqlitePool};
use tracing::warn;

/// Default page size for `list` when the caller doesn't specify
/// `limit` (`recent` uses its own, smaller default).
/// Generous enough to render a "today on this PC" table without
/// pagination chrome; cheap enough server-side that an accidental
/// no-filter call doesn't pull millions of rows.
const DEFAULT_LIMIT: i64 = 200;
/// Hard ceiling on `limit`, enforced by both `list` and `recent`:
/// a request above it is rejected with 400 rather than clamped.
/// A misbehaving caller asking for `limit=1_000_000` would
/// otherwise exhaust SQLite's working memory on a busy fleet.
const MAX_LIMIT: i64 = 5_000;

/// Query-string parameters accepted by `GET /api/obs_events`.
///
/// Every field is optional. An absent filter places no constraint
/// on the result set; all supplied filters are AND-ed together.
#[derive(Deserialize)]
pub struct ListQuery {
    /// Exact-match filter on `pc_id`.
    pub pc_id: Option<String>,
    /// RFC3339 lower bound on `at` (inclusive).
    pub from: Option<DateTime<Utc>>,
    /// RFC3339 upper bound on `at` (exclusive).
    pub to: Option<DateTime<Utc>>,
    /// Exact-match filter on `kind` (e.g. `logon`, `boot`).
    pub kind: Option<String>,
    /// Exact-match filter on `source` (e.g. `winlog:Security`).
    pub source: Option<String>,
    /// Exact-match filter on `payload.logon_type` (Issue #366).
    /// Windows LogonType numbers: 2 interactive, 3 network,
    /// 4 batch, 5 service, 7 unlock, 10 RDP, 11 cached. Only
    /// logon/logoff events carry the field, so combining this
    /// with a non-logon `kind` filter returns nothing — which is
    /// the honest answer. Kept for URL compatibility; the SPA now
    /// sends the generic `payload_key`/`payload_value` pair
    /// (Issue #391) instead.
    pub logon_type: Option<i64>,
    /// Issue #391: comma-separated include / exclude lists for
    /// `kind` and `source`. Both compose with the single-value
    /// `kind` / `source` gates above (which stay for URL
    /// compatibility); an empty include list (after splitting)
    /// means "no constraint", same as absent.
    ///
    /// `kinds` is the include list for `kind`: a row passes when
    /// its `kind` is one of the listed values.
    pub kinds: Option<String>,
    /// Exclude list for `kind`: a row passes when its `kind` is
    /// NOT one of the listed values.
    pub kinds_ex: Option<String>,
    /// Include list for `source` (same semantics as `kinds`).
    pub sources: Option<String>,
    /// Exclude list for `source` (same semantics as `kinds_ex`).
    pub sources_ex: Option<String>,
    /// Issue #391: generic payload filter — `payload_key=user` +
    /// `payload_value=yukimemi` matches rows whose
    /// `payload.<key> == <value>`. The key is restricted to
    /// `[A-Za-z0-9_]+` (400 otherwise) so the `'$.' || ?` path
    /// concat in `list` can't be steered into other JSONPath syntax.
    /// The value is matched as text AND, when it parses as a
    /// number, numerically — so `payload_value=2` matches the
    /// JSON number 2 that the collectors emit.
    pub payload_key: Option<String>,
    /// Value half of the payload filter. The pair only constrains
    /// when both `payload_key` and `payload_value` are present;
    /// `list` ignores either one when the other is missing.
    pub payload_value: Option<String>,
    /// Maximum number of rows to return. Defaults to
    /// `DEFAULT_LIMIT`; values outside `1..=MAX_LIMIT` are rejected
    /// with 400.
    pub limit: Option<i64>,
}

/// Convert a comma-separated filter list into the JSON-array text
/// that the `json_each(?)` binds consume.
///
/// Each segment is trimmed and blank segments are discarded.
/// Returns `None` — meaning "no constraint" — when the parameter is
/// absent or when nothing survives the trimming, so an input such
/// as `","` or a trailing comma cannot turn into a filter that
/// matches no rows.
fn csv_to_json_array(csv: &Option<String>) -> Option<String> {
    let raw = csv.as_deref()?;

    let mut items: Vec<&str> = Vec::new();
    for segment in raw.split(',') {
        let segment = segment.trim();
        if !segment.is_empty() {
            items.push(segment);
        }
    }

    match items.as_slice() {
        [] => None,
        // The values are bound as data, never spliced into SQL;
        // serde_json handles quoting/escaping so the array stays
        // well-formed for any kind/source string.
        _ => serde_json::to_string(&items).ok(),
    }
}

/// One `obs_events` row as serialized into API responses.
///
/// Built by `row_to_event`; field names match the column names
/// selected by the `list` / `recent` queries.
#[derive(Serialize)]
pub struct EventRow {
    /// Row id; used as the tie-breaker in `ORDER BY at DESC, id DESC`.
    pub id: i64,
    /// Identifier of the PC the event belongs to.
    pub pc_id: String,
    /// Event timestamp — the column the `from` / `to` bounds and the
    /// result ordering apply to.
    pub at: DateTime<Utc>,
    /// Event kind (e.g. `logon`, `boot`).
    pub kind: String,
    /// Event source (e.g. `winlog:Security`).
    pub source: String,
    /// Nullable column. NOTE(review): presumably the record id from
    /// the originating event log — confirm against the projector.
    pub event_record_id: Option<String>,
    /// Parsed back into a JSON Value so the SPA receives structured
    /// data instead of a stringified JSON column.
    pub payload: serde_json::Value,
}

/// Response body shared by the `list` and `recent` handlers.
#[derive(Serialize)]
pub struct ListResponse {
    /// Matching events, ordered by `at DESC, id DESC`. Rows that
    /// failed to decode are omitted (and warn-logged) by the handlers.
    pub events: Vec<EventRow>,
}

/// `GET /api/obs_events` — filtered event list, newest first.
///
/// Every filter in [`ListQuery`] is optional and the supplied ones
/// are AND-ed together. Rows come back ordered by `at DESC, id DESC`
/// and capped at `limit` (default `DEFAULT_LIMIT`).
///
/// # Errors
///
/// - `400 Bad Request` when `limit` is outside `1..=MAX_LIMIT`, or
///   when `payload_key` (after trimming) contains anything other
///   than ASCII alphanumerics and `_`.
/// - `500 Internal Server Error` when the SQLite query fails; the
///   underlying error is warn-logged.
///
/// Rows that fail to decode are warn-logged and left out of the
/// response rather than failing the whole request.
pub async fn list(
    State(pool): State<SqlitePool>,
    Query(q): Query<ListQuery>,
) -> Result<Json<ListResponse>, StatusCode> {
    let limit = match q.limit {
        Some(n) if (1..=MAX_LIMIT).contains(&n) => n,
        Some(_) => return Err(StatusCode::BAD_REQUEST),
        None => DEFAULT_LIMIT,
    };

    // Issue #391: the payload key ends up inside a JSONPath built
    // in SQL (`'$.' || ?`). Only identifier characters are let
    // through; quotes, brackets, dots and the like are refused here
    // instead of being handed to SQLite's path parser. A missing or
    // blank key simply means "no payload filter".
    let requested_key = q
        .payload_key
        .as_deref()
        .map(str::trim)
        .filter(|k| !k.is_empty());
    if let Some(k) = requested_key {
        let is_identifier = k.bytes().all(|b| b.is_ascii_alphanumeric() || b == b'_');
        if !is_identifier {
            return Err(StatusCode::BAD_REQUEST);
        }
    }

    // The key/value pair is all-or-nothing: with only one half
    // present both are dropped. Binding a key with a NULL value
    // would make the `json_extract(...) = NULL` comparison empty
    // the entire result set for direct API callers (Gemini #394
    // medium; the SPA always sends both halves).
    let (payload_key, payload_value) = match (requested_key, q.payload_value.as_deref()) {
        (Some(k), Some(v)) => (Some(k), Some(v)),
        _ => (None, None),
    };

    // Numeric twin of the value. Collectors store numbers as JSON
    // numbers (`logon_type: 2`) and `json_extract` hands them back
    // typed, so a text bind alone never equals them. When the value
    // parses as f64 it is bound a second time as a number.
    let payload_value_num = payload_value.and_then(|v| v.parse::<f64>().ok());

    // One static SQL string; each optional filter is a
    // `?N IS NULL OR <predicate using ?N>` gate, so a NULL bind
    // switches that filter off. This stands in for a dynamically
    // assembled WHERE clause while keeping the statement a
    // `&'static str`, as the crate's lint policy requires (dynamic
    // SQL is blocked to avoid accidental injection surfaces).
    //
    // - `?6` digs `logon_type` out of the JSON-text `payload` column
    //   at query time. There is no index on that expression, which
    //   is acceptable because it composes with the indexed gates and
    //   the table is cleanup-bounded (see cleanup.rs).
    // - `?7`..`?10` (Issue #391) are JSON-array strings expanded
    //   inside SQLite with `json_each(?)`: one bind per list, no
    //   hand-built IN clause.
    // - `?11`..`?13` (Issue #391) are the generic payload gate: the
    //   validated key forms the JSONPath, compared against the text
    //   value and, when available, its numeric twin.
    let query = sqlx::query(
        "SELECT id, pc_id, at, kind, source, event_record_id, payload
         FROM obs_events
         WHERE (?1 IS NULL OR pc_id  = ?1)
           AND (?2 IS NULL OR at    >= ?2)
           AND (?3 IS NULL OR at    <  ?3)
           AND (?4 IS NULL OR kind   = ?4)
           AND (?5 IS NULL OR source = ?5)
           AND (?6 IS NULL OR json_extract(payload, '$.logon_type') = ?6)
           AND (?7 IS NULL OR kind   IN     (SELECT value FROM json_each(?7)))
           AND (?8 IS NULL OR kind   NOT IN (SELECT value FROM json_each(?8)))
           AND (?9 IS NULL OR source IN     (SELECT value FROM json_each(?9)))
           AND (?10 IS NULL OR source NOT IN (SELECT value FROM json_each(?10)))
           AND (?11 IS NULL
                OR json_extract(payload, '$.' || ?11) = ?12
                OR (?13 IS NOT NULL AND json_extract(payload, '$.' || ?11) = ?13))
         ORDER BY at DESC, id DESC
         LIMIT ?14",
    )
    .bind(q.pc_id.as_deref()) // ?1
    .bind(q.from) // ?2
    .bind(q.to) // ?3
    .bind(q.kind.as_deref()) // ?4
    .bind(q.source.as_deref()) // ?5
    .bind(q.logon_type) // ?6
    .bind(csv_to_json_array(&q.kinds)) // ?7
    .bind(csv_to_json_array(&q.kinds_ex)) // ?8
    .bind(csv_to_json_array(&q.sources)) // ?9
    .bind(csv_to_json_array(&q.sources_ex)) // ?10
    .bind(payload_key) // ?11
    .bind(payload_value) // ?12
    .bind(payload_value_num) // ?13
    .bind(limit); // ?14

    let rows = match query.fetch_all(&pool).await {
        Ok(rows) => rows,
        Err(e) => {
            warn!(error = %e, "obs_events list query");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    // Gemini #248 HIGH: schema mismatches / type errors must be
    // visible instead of yielding blank fields. The response shape
    // has no slot for per-row errors, so an undecodable row is
    // warn-logged and skipped — the API stays consistent and the
    // problem still reaches the operator through the log.
    let mut events = Vec::with_capacity(rows.len());
    for row in &rows {
        match row_to_event(row) {
            Ok(event) => events.push(event),
            Err(e) => {
                warn!(error = %e, "obs_events: drop row that failed to decode");
            }
        }
    }

    Ok(Json(ListResponse { events }))
}

/// Decode one `obs_events` row into an `EventRow`. Errors propagate
/// (vs the previous `unwrap_or_default()` shape which silently
/// returned empty strings on a column-rename / type mismatch). The
/// caller drops the row + logs; an alternative would be to 500 the
/// whole response, but a single bad row in a 200-row page
/// shouldn't take the whole timeline down.
fn row_to_event(r: &SqliteRow) -> sqlx::Result<EventRow> {
    let raw: String = r.try_get("payload")?;
    // `payload` is JSON text we stored ourselves, so a parse
    // failure means data corruption (someone hand-edited the
    // table) rather than a schema mismatch — bubble the same
    // error type out so the warn-log captures both cases.
    let payload = serde_json::from_str(&raw).map_err(|e| {
        sqlx::Error::Decode(format!("obs_events.payload not valid JSON: {e}").into())
    })?;
    Ok(EventRow {
        id: r.try_get("id")?,
        pc_id: r.try_get("pc_id")?,
        at: r.try_get("at")?,
        kind: r.try_get("kind")?,
        source: r.try_get("source")?,
        event_record_id: r.try_get("event_record_id")?,
        payload,
    })
}

/// Response body for `GET /api/obs_events/kinds`.
#[derive(Serialize)]
pub struct KindsResponse {
    /// Distinct `kind` values present in `obs_events`, in the
    /// query's `ORDER BY kind` order.
    pub kinds: Vec<String>,
}

/// `GET /api/obs_events/kinds` — the distinct `kind` values found in
/// `obs_events`, sorted by `kind`.
///
/// # Errors
///
/// `500 Internal Server Error` when the query fails; the underlying
/// error is warn-logged.
pub async fn kinds(State(pool): State<SqlitePool>) -> Result<Json<KindsResponse>, StatusCode> {
    let fetched = sqlx::query("SELECT DISTINCT kind FROM obs_events ORDER BY kind")
        .fetch_all(&pool)
        .await;
    let rows = match fetched {
        Ok(rows) => rows,
        Err(e) => {
            warn!(error = %e, "obs_events kinds query");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    // A row whose `kind` can't be decoded is skipped, not fatal —
    // the same trade-off `list` makes: the operator gets a warning
    // and the endpoint keeps answering.
    let mut kinds = Vec::with_capacity(rows.len());
    for row in &rows {
        match row.try_get::<String, _>("kind") {
            Ok(kind) => kinds.push(kind),
            Err(e) => {
                warn!(error = %e, "obs_events kinds: drop row that failed to decode kind");
            }
        }
    }

    Ok(Json(KindsResponse { kinds }))
}

/// Response body for `GET /api/obs_events/sources`.
#[derive(Serialize)]
pub struct SourcesResponse {
    /// Distinct `source` values present in `obs_events`, in the
    /// query's `ORDER BY source` order.
    pub sources: Vec<String>,
}

/// `GET /api/obs_events/sources` (Issue #391) — the distinct
/// `source` values found in `obs_events`, sorted by `source`. Feeds
/// the SPA's include/exclude chips and mirrors `kinds`.
///
/// # Errors
///
/// `500 Internal Server Error` when the query fails; the underlying
/// error is warn-logged.
pub async fn sources(State(pool): State<SqlitePool>) -> Result<Json<SourcesResponse>, StatusCode> {
    let fetched = sqlx::query("SELECT DISTINCT source FROM obs_events ORDER BY source")
        .fetch_all(&pool)
        .await;
    let rows = match fetched {
        Ok(rows) => rows,
        Err(e) => {
            warn!(error = %e, "obs_events sources query");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    // An undecodable `source` is warn-logged and skipped so the
    // remaining values are still returned.
    let mut sources = Vec::with_capacity(rows.len());
    for row in &rows {
        match row.try_get::<String, _>("source") {
            Ok(source) => sources.push(source),
            Err(e) => {
                warn!(error = %e, "obs_events sources: drop row that failed to decode source");
            }
        }
    }

    Ok(Json(SourcesResponse { sources }))
}

/// Query-string parameters accepted by `GET /api/obs_events/recent`.
#[derive(Deserialize)]
pub struct RecentQuery {
    /// Maximum number of rows to return. `recent` defaults to 50 when
    /// absent and rejects values outside `1..=MAX_LIMIT` with 400.
    pub limit: Option<i64>,
}

/// `GET /api/obs_events/recent?limit=` — the newest events across
/// the whole fleet.
///
/// Behaves like `/api/obs_events` with no filters, but with a
/// smaller default `limit` (50) that suits a dashboard
/// "latest activity" card.
///
/// # Errors
///
/// - `400 Bad Request` when `limit` is outside `1..=MAX_LIMIT`.
/// - `500 Internal Server Error` when the query fails; the
///   underlying error is warn-logged.
pub async fn recent(
    State(pool): State<SqlitePool>,
    Query(q): Query<RecentQuery>,
) -> Result<Json<ListResponse>, StatusCode> {
    let limit = match q.limit {
        Some(n) if (1..=MAX_LIMIT).contains(&n) => n,
        Some(_) => return Err(StatusCode::BAD_REQUEST),
        None => 50,
    };

    // Deliberately a sibling of `list` instead of a call into it:
    // the query is the unfiltered variant with its own default
    // `limit`, and a separate handler keeps the log labels specific
    // to "recent" — worth the few duplicated lines.
    let fetched = sqlx::query(
        "SELECT id, pc_id, at, kind, source, event_record_id, payload
         FROM obs_events
         ORDER BY at DESC, id DESC
         LIMIT ?",
    )
    .bind(limit)
    .fetch_all(&pool)
    .await;
    let rows = match fetched {
        Ok(rows) => rows,
        Err(e) => {
            warn!(error = %e, "obs_events recent query");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    // Undecodable rows are warn-logged and skipped, as in `list`.
    let mut events = Vec::with_capacity(rows.len());
    for row in &rows {
        match row_to_event(row) {
            Ok(event) => events.push(event),
            Err(e) => {
                warn!(error = %e, "obs_events recent: drop row that failed to decode");
            }
        }
    }

    Ok(Json(ListResponse { events }))
}