Skip to main content

kanade_shared/wire/
obs_event.rs

1//! Issue #246 — per-PC observability events.
2//!
3//! Distinct from [`super::EventStarted`] (lifecycle: "script just
4//! spawned, watch this result_id"). `ObsEvent` is the timeline
5//! data type: sign-in / sign-out, power on / off, sleep / resume,
6//! agent self-update milestones — anything an operator wants to
7//! see on a per-PC timeline. The actual log source is open-ended;
8//! today it's Windows Event Log via a scheduled PowerShell job,
9//! tomorrow it could be agent-emitted milestones or the
10//! `kanade logs collect` diagnostic bundle (#219) carrying a
11//! pointer to an Object Store blob.
12//!
13//! Schema is intentionally narrow: a stable triplet
14//! (`pc_id`, `source`, `event_record_id`) for dedup-on-replay,
15//! plus a `kind` enum-ish string for the SPA filter chip, plus a
16//! free-form JSON `payload` for the per-kind details. Open-ended
17//! by design — adding a new event source doesn't require a wire
18//! change, just a new `kind` value.
19//!
20//! NATS subject: `obs.<pc_id>` (see [`super::super::subject::obs`]).
21//! JetStream stream `OBS_EVENTS` retains ~30d so a backend that
22//! was offline catches up on reconnect.
23//!
24//! De-dup is the BACKEND's job — agent re-sends are explicitly
25//! allowed and useful (a watermark mismatch shouldn't lose data).
26//! Backend UNIQUE constraint on
27//! `(pc_id, source, event_record_id)` makes re-sends a no-op.
28
29use chrono::{DateTime, Utc};
30use serde::{Deserialize, Serialize};
31
32/// One observability event published to `obs.<pc_id>`.
33///
34/// The `kind` field is a free-form string — vocabulary lives at
35/// the consumer (backend projector decides filtering / coloring;
36/// SPA decides chip labels). Established kinds at #246 land:
37///
38/// - `logon` / `logoff` — Security log 4624 / 4634
39/// - `boot` / `shutdown` — System log 12 / 13 (kernel-general)
40/// - `unexpected_shutdown` — System log 41
41/// - `sleep` / `resume` — System log 42 / 107
42/// - `agent_started` / `agent_self_update` — agent-emitted (later)
43/// - `diagnostic` — kanade logs collect bundles (#219)
44///
45/// New kinds can be added without a wire change; the backend
46/// projector stores whatever string the agent sends and the SPA
47/// surfaces it.
48#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
49pub struct ObsEvent {
50    /// PC reporting the event. Routing key on the subject side
51    /// (`obs.<pc_id>`) and primary scope of the SPA timeline view.
52    pub pc_id: String,
53
54    /// Wall-clock instant of the event as known to the SOURCE
55    /// (e.g. Windows Event Log's `TimeCreated`), NOT the moment
56    /// the agent published it. The timeline must reflect when
57    /// things happened on the box, not when the projector heard
58    /// about them — the two can differ by minutes when the agent
59    /// is catching up from outbox after a broker outage.
60    pub at: DateTime<Utc>,
61
62    /// What kind of event this is — the SPA's filter chip and
63    /// the backend projector's coloring key. See the doc comment
64    /// on this struct for the vocabulary at landing.
65    pub kind: String,
66
67    /// Where this event came from. Format `<scheme>:<detail>`
68    /// (e.g. `winlog:System`, `winlog:Security`, `agent:internal`,
69    /// `kanade:logs_collect`). Two roles:
70    ///
71    /// - Distinguishes events from different sources that might
72    ///   share an `event_record_id` namespace.
73    /// - Lets the SPA filter "show me only winlog events" without
74    ///   needing a separate enum.
75    pub source: String,
76
77    /// Stable per-source unique identifier — e.g. EventRecordID
78    /// from the Windows Event Log. Combined with `pc_id` and
79    /// `source` it forms the dedup key, so agent re-sends (under
80    /// watermark drift, outbox replay, etc.) are harmless.
81    ///
82    /// `None` for sources that have no natural unique id (e.g.
83    /// agent-emitted milestones where the only candidate is the
84    /// `at` timestamp + kind, which the backend can synthesize
85    /// from those fields if needed).
86    ///
87    /// `#[serde(default)]` so an agent publisher that has no id
88    /// to emit can omit the field entirely; serde fills `None`
89    /// rather than refusing the message. Without this, agent
90    /// versions that always send the field can deserialize but
91    /// future ones that drop it on `null` cases would silently
92    /// land in the warn-log → projector drop path.
93    #[serde(default)]
94    pub event_record_id: Option<String>,
95
96    /// Free-form per-kind details. The wire stays narrow
97    /// (`pc_id`, `at`, `kind`, `source`, `event_record_id`) and
98    /// the per-kind shape lives here:
99    ///
100    /// - `logon`: `{ "user": "...", "logon_type": 2 }`
101    /// - `boot`: typically `null` or `{}` — the bare presence is
102    ///   the event
103    /// - `diagnostic`: `{ "bucket": "OBJECT_DIAGNOSTICS", "key":
104    ///   "..." }` — pointer to the actual log blob
105    ///
106    /// Backend projector stores this as TEXT (the JSON
107    /// representation); SPA renders it kind-aware.
108    ///
109    /// `#[serde(default)]` so a publisher emitting a bare-presence
110    /// event can omit the field entirely (serde fills
111    /// `Value::Null`) rather than being forced to write
112    /// `"payload": null` on every line.
113    #[serde(default)]
114    pub payload: serde_json::Value,
115}
116
117#[cfg(test)]
118mod tests {
119    use super::*;
120    use chrono::TimeZone;
121    use serde_json::json;
122
123    #[test]
124    fn obs_event_round_trips_through_json() {
125        let t = Utc.with_ymd_and_hms(2026, 5, 28, 10, 41, 0).unwrap();
126        let e = ObsEvent {
127            pc_id: "pc-01".into(),
128            at: t,
129            kind: "logon".into(),
130            source: "winlog:Security".into(),
131            event_record_id: Some("1234567".into()),
132            payload: json!({ "user": "yukimemi", "logon_type": 2 }),
133        };
134        let s = serde_json::to_string(&e).unwrap();
135        let back: ObsEvent = serde_json::from_str(&s).unwrap();
136        assert_eq!(back, e);
137    }
138
139    #[test]
140    fn obs_event_null_payload_is_valid() {
141        // boot / shutdown / similar bare-presence events: the
142        // `at` + `kind` is the whole signal, payload carries
143        // nothing. Make sure null serialises round-trip clean
144        // so the agent's PowerShell side can emit an explicit
145        // `null` without backend deserialise rejecting.
146        let e = ObsEvent {
147            pc_id: "pc-01".into(),
148            at: Utc.with_ymd_and_hms(2026, 5, 28, 0, 0, 0).unwrap(),
149            kind: "boot".into(),
150            source: "winlog:System".into(),
151            event_record_id: Some("99".into()),
152            payload: serde_json::Value::Null,
153        };
154        let s = serde_json::to_string(&e).unwrap();
155        let back: ObsEvent = serde_json::from_str(&s).unwrap();
156        assert_eq!(back, e);
157    }
158
159    #[test]
160    fn obs_event_missing_event_record_id_deserialises() {
161        // Agent-emitted milestones (e.g. agent_started) have no
162        // natural EventRecordID equivalent. The field is
163        // `Option<String>` annotated `#[serde(default)]`, so a
164        // publisher omitting it entirely lands as `None` rather
165        // than failing the deserialise. Gemini #247 HIGH —
166        // without the explicit default, serde requires Option
167        // fields to be present (allowed to be null, not absent).
168        let s = r#"{
169            "pc_id": "pc-01",
170            "at": "2026-05-28T10:00:00Z",
171            "kind": "agent_started",
172            "source": "agent:internal",
173            "payload": null
174        }"#;
175        let e: ObsEvent = serde_json::from_str(s).unwrap();
176        assert_eq!(e.event_record_id, None);
177        assert_eq!(e.kind, "agent_started");
178    }
179
180    #[test]
181    fn obs_event_missing_payload_defaults_to_null() {
182        // Bare-presence events (boot / shutdown / agent_started)
183        // have no per-kind details. A publisher that omits the
184        // `payload` field entirely should parse — `#[serde(default)]`
185        // on the field fills `Value::Null` so the projector's
186        // `INSERT` sees the same value as if the publisher had
187        // written `"payload": null` explicitly.
188        let s = r#"{
189            "pc_id": "pc-01",
190            "at": "2026-05-28T10:00:00Z",
191            "kind": "boot",
192            "source": "winlog:System",
193            "event_record_id": "1"
194        }"#;
195        let e: ObsEvent = serde_json::from_str(s).unwrap();
196        assert_eq!(e.payload, serde_json::Value::Null);
197    }
198}