Skip to main content

harn_vm/
agent_events.rs

1//! Agent event stream — the ACP-aligned observation surface for the
2//! agent loop.
3//!
4//! Every phase of the turn loop emits an `AgentEvent`. The canonical
5//! variants map 1:1 onto ACP `SessionUpdate` values; three internal
6//! variants (`TurnStart`, `TurnEnd`, `FeedbackInjected`) let pipelines
7//! react to loop milestones that don't have a direct ACP counterpart.
8//!
9//! There are two subscription paths, both keyed on session id so two
10//! concurrent sessions never cross-talk:
11//!
12//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
13//!    like the harn-cli ACP server. Invoked synchronously by the loop.
14//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
15//! 2. **Closure subscribers** — `.harn` closures registered via the
16//!    `agent_subscribe(session_id, callback)` host builtin. These live
17//!    on the session's `SessionState.subscribers` in
18//!    `crate::agent_sessions`, because sessions are the single source
19//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
/// One coalesced filesystem notification from a hostlib `fs_watch`
/// subscription.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FsWatchEvent {
    /// Normalized event kind string (pairs with `raw_kind`, which keeps
    /// the backend's original value).
    pub kind: String,
    /// Paths reported by the watcher for this notification.
    pub paths: Vec<String>,
    // Presumably the same paths expressed relative to the watched root —
    // TODO confirm against the hostlib `fs_watch` emitter.
    pub relative_paths: Vec<String>,
    /// Backend-reported kind before any normalization.
    pub raw_kind: String,
    /// Watcher-level error message, if the subscription reported one.
    pub error: Option<String>,
}
40
41/// Typed worker lifecycle events emitted by delegated/background agent
42/// execution. Bridge-facing worker updates still derive a string status
43/// from these variants, but the runtime no longer passes raw status
44/// strings around internally.
45///
46/// `Spawned`/`Completed`/`Failed`/`Cancelled` are the four terminal-or-start
47/// states. `Progressed` is fired on intermediate milestones (e.g. a
48/// retriggerable worker resuming from `awaiting_input`, or a workflow
49/// stage completing without ending the worker). `WaitingForInput` covers
50/// retriggerable workers that finish a cycle but stay alive pending the
51/// next host-supplied trigger payload.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    /// Worker started; wire status `"running"`.
    WorkerSpawned,
    /// Intermediate milestone — the worker is still alive. Wire status
    /// `"progressed"`.
    WorkerProgressed,
    /// Retriggerable worker finished a cycle and stays alive pending the
    /// next trigger payload. Wire status `"awaiting_input"`.
    WorkerWaitingForInput,
    /// Terminal: finished successfully (`"completed"`).
    WorkerCompleted,
    /// Terminal: finished with an error (`"failed"`).
    WorkerFailed,
    /// Terminal: cancelled (`"cancelled"`).
    WorkerCancelled,
}
61
62impl WorkerEvent {
63    /// Wire-level status string used by bridge `worker_update` payloads
64    /// and ACP `worker_update` session updates. The four canonical
65    /// states are mirrored from harn's internal worker `status` field
66    /// (`running`/`completed`/`failed`/`cancelled`), and the two newer
67    /// lifecycle states pick names that don't collide with any existing
68    /// status string.
69    pub fn as_status(self) -> &'static str {
70        match self {
71            Self::WorkerSpawned => "running",
72            Self::WorkerProgressed => "progressed",
73            Self::WorkerWaitingForInput => "awaiting_input",
74            Self::WorkerCompleted => "completed",
75            Self::WorkerFailed => "failed",
76            Self::WorkerCancelled => "cancelled",
77        }
78    }
79
80    pub fn as_str(self) -> &'static str {
81        match self {
82            Self::WorkerSpawned => "WorkerSpawned",
83            Self::WorkerProgressed => "WorkerProgressed",
84            Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85            Self::WorkerCompleted => "WorkerCompleted",
86            Self::WorkerFailed => "WorkerFailed",
87            Self::WorkerCancelled => "WorkerCancelled",
88        }
89    }
90
91    /// True for lifecycle events that mean the worker has reached a
92    /// final, non-resumable state. Retriggerable awaiting and
93    /// progressed milestones are *not* terminal — the worker keeps
94    /// running or is waiting for a trigger.
95    pub fn is_terminal(self) -> bool {
96        matches!(
97            self,
98            Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99        )
100    }
101}
102
/// Status of a tool call. Mirrors ACP's `toolCallStatus`. Serializes in
/// snake_case, so the wire values are `"pending"`, `"in_progress"`,
/// `"completed"`, `"failed"`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Dispatched by the model but not yet started.
    Pending,
    /// Dispatch is actively running.
    InProgress,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
}
116
/// Wire-level classification of a `ToolCallUpdate` failure. Pairs with the
/// human-readable `error` string so clients can render each failure type
/// distinctly (e.g. surface a "permission denied" badge, or a different
/// retry affordance for `network` vs `tool_error`). The enum is
/// deliberately extensible — `unknown` is the default when the runtime
/// could not classify a failure. Variants serialize in snake_case, so the
/// wire strings are the lowercased variant names.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallErrorCategory {
    /// Host-side validation rejected the args (missing required field,
    /// invalid type, malformed JSON).
    SchemaValidation,
    /// The tool ran and returned an error result (e.g. `read_file` on a
    /// missing path) — distinguished from a transport failure.
    ToolError,
    /// MCP transport / server-protocol error.
    McpServerError,
    /// Burin Swift host bridge returned an error during dispatch.
    HostBridgeError,
    /// `session/request_permission` denied by the client, or a policy
    /// rule (static or dynamic) refused the call.
    PermissionDenied,
    /// The harn loop detector skipped this call because the same
    /// (tool, args) pair repeated past the configured threshold.
    RejectedLoop,
    /// Streaming text candidate was detected (bare `name(` or
    /// `<tool_call>` opener) but never resolved into a parseable call:
    /// args parsed as malformed, the heredoc body broke, the tag closed
    /// without a balanced expression, or the stream ended mid-call.
    /// Used by the streaming candidate detector (harn#692) to retract a
    /// `tool_call` candidate that turned out to be prose or syntactically
    /// broken so clients can dismiss the in-flight chip.
    ParseAborted,
    /// The tool exceeded its time budget.
    Timeout,
    /// Transient network / rate-limited / 5xx provider failure.
    Network,
    /// The tool was cancelled (e.g. session aborted).
    Cancelled,
    /// Default when classification was not performed.
    Unknown,
}
159
160impl ToolCallErrorCategory {
161    pub fn as_str(self) -> &'static str {
162        match self {
163            Self::SchemaValidation => "schema_validation",
164            Self::ToolError => "tool_error",
165            Self::McpServerError => "mcp_server_error",
166            Self::HostBridgeError => "host_bridge_error",
167            Self::PermissionDenied => "permission_denied",
168            Self::RejectedLoop => "rejected_loop",
169            Self::ParseAborted => "parse_aborted",
170            Self::Timeout => "timeout",
171            Self::Network => "network",
172            Self::Cancelled => "cancelled",
173            Self::Unknown => "unknown",
174        }
175    }
176
177    /// Map an internal `ErrorCategory` (used by the VM's `VmError`
178    /// classification) onto the wire enum. The internal taxonomy is
179    /// finer-grained — several transient categories collapse onto
180    /// `Network`, and the auth/quota family becomes `HostBridgeError`
181    /// because at the tool-dispatch boundary those errors come from
182    /// the bridge transport rather than the tool itself.
183    pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
184        use crate::value::ErrorCategory as Internal;
185        match category {
186            Internal::Timeout => Self::Timeout,
187            Internal::RateLimit
188            | Internal::Overloaded
189            | Internal::ServerError
190            | Internal::TransientNetwork => Self::Network,
191            Internal::SchemaValidation => Self::SchemaValidation,
192            Internal::ToolError => Self::ToolError,
193            Internal::ToolRejected => Self::PermissionDenied,
194            Internal::Cancelled => Self::Cancelled,
195            Internal::Auth
196            | Internal::EgressBlocked
197            | Internal::NotFound
198            | Internal::CircuitOpen
199            | Internal::BudgetExceeded
200            | Internal::Generic => Self::HostBridgeError,
201        }
202    }
203}
204
/// Where a tool actually ran. Tags `ToolCallUpdate` so clients can render
/// "via mcp:linear" / "via host bridge" badges, attribute latency by
/// transport, and route errors to the right surface (harn#691).
///
/// On the wire this serializes adjacently-tagged so the `mcp_server`
/// case carries the configured server name. The ACP adapter rewrites
/// unit variants as bare strings (`"harn_builtin"`, `"host_bridge"`,
/// `"provider_native"`) and the `McpServer` case as
/// `{"kind": "mcp_server", "serverName": "..."}` to match the protocol's
/// camelCase convention.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ToolExecutor {
    /// VM-stdlib (`read_file`, `write_file`, `exec`, `http_*`, `mcp_*`)
    /// or any Harn-side handler closure registered in `tools_val`.
    HarnBuiltin,
    /// Capability provided by the host through `HostBridge.builtin_call`
    /// (Swift-side IDE bridge, BurinApp, BurinCLI host shells).
    HostBridge,
    /// Tool dispatched against a configured MCP server. Detected by the
    /// `_mcp_server` tag that `mcp_list_tools` injects on every tool
    /// dict before the agent loop sees it.
    McpServer {
        /// Configured MCP server name this call was routed to.
        server_name: String,
    },
    /// Provider-side server-side tool execution — currently OpenAI
    /// Responses-API server tools (e.g. native `tool_search`). The
    /// runtime never dispatches these locally; the model returns the
    /// already-executed result inline.
    ProviderNative,
}
234
/// Events emitted by the agent loop. The first five variants map 1:1
/// to ACP `sessionUpdate` variants; the last three are harn-internal.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// Incremental chunk of assistant-visible message text.
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// Incremental chunk of the model's thought/reasoning stream.
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// Initial emission for a tool call dispatched by the model; the
    /// paired `ToolCallUpdate` events carry subsequent status changes.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
        /// Set to `Some(true)` by the streaming candidate detector
        /// (harn#692) when this event represents a tool-call shape
        /// detected in the model's in-flight assistant text but whose
        /// arguments have not finished parsing yet. Clients can render a
        /// spinner / placeholder while the model writes the body. The
        /// detector follows up with a `ToolCallUpdate { parsing: false,
        /// .. }` carrying either `status: pending` (promoted) or
        /// `status: failed` with `error_category: parse_aborted`.
        /// `None` (the default) means "this is a normal post-parse tool
        /// call, no candidate phase was active" so the on-disk shape
        /// stays compatible with replays recorded before this field
        /// existed.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        /// Mutation-session audit context active when the tool was
        /// dispatched (see harn#699). Hosts use it to group every tool
        /// emission belonging to the same write-capable session.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// Status/result update for a previously-emitted `ToolCall`,
    /// correlated by `tool_call_id`.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
        /// Wall-clock milliseconds from the parse-to-execution boundary
        /// to the terminal `Completed`/`Failed` update. Includes the
        /// time spent in any wrapping orchestration logic (loop checks,
        /// post-tool hooks, microcompaction). Populated only on the
        /// terminal update — `None` on intermediate `Pending` /
        /// `InProgress` updates so clients can ignore the field until
        /// it shows up.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        duration_ms: Option<u64>,
        /// Milliseconds spent in the actual host/builtin/MCP dispatch
        /// call only (the inner `dispatch_tool_execution` window).
        /// Populated only on the terminal update; `None` otherwise.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        execution_duration_ms: Option<u64>,
        /// Structured classification of the failure (when `status` is
        /// `Failed`). Paired with `error` so clients can render each
        /// category distinctly without parsing free-form strings. Always
        /// `None` for non-Failed updates and serialized as
        /// `errorCategory` in the ACP wire format.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_category: Option<ToolCallErrorCategory>,
        /// Where the tool actually ran. `None` only for events emitted
        /// from sites that pre-date the dispatch decision (e.g. the
        /// pending → in-progress transition the loop emits before the
        /// dispatcher picks a backend).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        executor: Option<ToolExecutor>,
        /// Companion to `ToolCall.parsing` (harn#692). The streaming
        /// candidate detector emits the *terminal* candidate event as a
        /// `ToolCallUpdate` with `parsing: Some(false)` to retract the
        /// in-flight `parsing: true` chip — either by promoting the
        /// candidate (`status: pending`, populated `raw_output: None`,
        /// `error: None`) or aborting it (`status: failed`,
        /// `error_category: parse_aborted`). `None` means this update is
        /// not part of a candidate-phase transition.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        /// Best-effort partial parse of the streamed tool-call arguments.
        /// Populated by the SSE transport on `Pending` updates as the
        /// model streams `input_json_delta` (Anthropic) or
        /// `tool_calls[].function.arguments` deltas (OpenAI). `None` on
        /// terminal updates and on emissions from non-streaming paths
        /// (#693). When the partial bytes are not yet parseable as JSON
        /// the transport falls back to `raw_input_partial`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input: Option<serde_json::Value>,
        /// Raw concatenated bytes of the streamed tool-call arguments
        /// when a permissive parse failed (#693). Mutually exclusive
        /// with `raw_input`: clients render whichever is present.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input_partial: Option<String>,
        /// Mutation-session audit context for the tool call. Carries the
        /// same payload as on the paired `ToolCall` event so a host
        /// processing a single update doesn't have to correlate against
        /// the prior pending event.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// Plan update, forwarded as opaque JSON (mirrors ACP's plan
    /// session update).
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// Harn-internal: a loop iteration is starting.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// Harn-internal: a loop iteration finished. `turn_info` is opaque
    /// JSON — presumably the loop's per-turn summary; confirm at the
    /// emit site.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// Harn-internal: feedback content of the given `kind` was injected
    /// into the conversation by the loop.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// Emitted when the agent loop exhausts `max_iterations` without any
    /// explicit break condition firing. Distinct from a natural "done" or
    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// Emitted when the loop breaks because consecutive text-only turns
    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
    /// hosts that key off agent-terminal events.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// Emitted when the daemon idle-wait loop trips its watchdog because
    /// every configured wake source returned `None` for N consecutive
    /// attempts. Exists so a broken daemon doesn't hang the session
    /// silently.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// Emitted when a skill is activated. Carries the match reason so
    /// replayers can reconstruct *why* a given skill took effect at
    /// this iteration.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// Emitted when a previously-active skill is deactivated because
    /// the reassess phase no longer matches it.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// Emitted once per activation when the skill's `allowed_tools` filter
    /// narrows the effective tool surface exposed to the model.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
    /// Emitted when a `tool_search` query is issued by the model. Carries
    /// the raw query args, the configured strategy, and a `mode` tag
    /// distinguishing the client-executed fallback (`"client"`) from
    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
    /// transcript event shape so hosts can render a search-in-progress
    /// chip in real time — the replay path walks the transcript after
    /// the turn, which is too late for live UX.
    ToolSearchQuery {
        session_id: String,
        tool_use_id: String,
        name: String,
        query: serde_json::Value,
        strategy: String,
        mode: String,
    },
    /// Emitted when `tool_search` resolves — carries the list of tool
    /// names newly promoted into the model's effective surface for the
    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
    ToolSearchResult {
        session_id: String,
        tool_use_id: String,
        promoted: Vec<String>,
        strategy: String,
        mode: String,
    },
    /// Emitted after a transcript compaction pass, with before/after
    /// token estimates and the optional asset id of the archived
    /// snapshot.
    TranscriptCompacted {
        session_id: String,
        mode: String,
        strategy: String,
        archived_messages: usize,
        estimated_tokens_before: usize,
        estimated_tokens_after: usize,
        snapshot_asset_id: Option<String>,
    },
    /// A handoff artifact was produced. Boxed to keep the enum's size
    /// down — this variant's payload is large relative to the others.
    Handoff {
        session_id: String,
        artifact_id: String,
        handoff: Box<HandoffArtifact>,
    },
    /// Coalesced filesystem notifications from a hostlib `fs_watch`
    /// subscription, correlated by `subscription_id`.
    FsWatch {
        session_id: String,
        subscription_id: String,
        events: Vec<FsWatchEvent>,
    },
    /// Lifecycle update for a delegated/background worker. Carries the
    /// canonical typed `event` variant alongside the worker's current
    /// `status` string and the structured `metadata` payload that
    /// `worker_bridge_metadata` builds (task, mode, timing, child
    /// run/snapshot paths, audit-session, etc.). The `audit` field is
    /// the same `MutationSessionRecord` JSON serialization carried on
    /// the bridge wire so ACP/A2A consumers don't need to re-derive it.
    ///
    /// One-to-one with the bridge-side `worker_update` session-update
    /// notification: ACP and A2A adapters subscribe to this variant
    /// and translate it into their respective wire formats. The
    /// `session_id` is the parent agent session that owns the worker
    /// (i.e. the session whose VM spawned the worker), so a single
    /// host stays subscribed to the same sink for both message and
    /// worker traffic.
    WorkerUpdate {
        session_id: String,
        worker_id: String,
        worker_name: String,
        worker_task: String,
        worker_mode: String,
        event: WorkerEvent,
        status: String,
        metadata: serde_json::Value,
        audit: Option<serde_json::Value>,
    },
}
477
478impl AgentEvent {
479    pub fn session_id(&self) -> &str {
480        match self {
481            Self::AgentMessageChunk { session_id, .. }
482            | Self::AgentThoughtChunk { session_id, .. }
483            | Self::ToolCall { session_id, .. }
484            | Self::ToolCallUpdate { session_id, .. }
485            | Self::Plan { session_id, .. }
486            | Self::TurnStart { session_id, .. }
487            | Self::TurnEnd { session_id, .. }
488            | Self::FeedbackInjected { session_id, .. }
489            | Self::BudgetExhausted { session_id, .. }
490            | Self::LoopStuck { session_id, .. }
491            | Self::DaemonWatchdogTripped { session_id, .. }
492            | Self::SkillActivated { session_id, .. }
493            | Self::SkillDeactivated { session_id, .. }
494            | Self::SkillScopeTools { session_id, .. }
495            | Self::ToolSearchQuery { session_id, .. }
496            | Self::ToolSearchResult { session_id, .. }
497            | Self::TranscriptCompacted { session_id, .. }
498            | Self::Handoff { session_id, .. }
499            | Self::FsWatch { session_id, .. }
500            | Self::WorkerUpdate { session_id, .. } => session_id,
501        }
502    }
503}
504
/// External consumers of the event stream (e.g. the harn-cli ACP server,
/// which translates events into JSON-RPC notifications).
pub trait AgentEventSink: Send + Sync {
    /// Deliver one event. Invoked synchronously by the agent loop, so
    /// implementations should return quickly and must not panic.
    fn handle_event(&self, event: &AgentEvent);
}
510
/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
/// `AgentEvent` with monotonic index + timestamp + frame depth so
/// replay engines can reconstruct paused state at any event index,
/// and scrubber UIs can bucket events by time. The envelope is the
/// on-disk shape; the wire format for live consumers is still the
/// raw `AgentEvent` so existing sinks don't churn.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Monotonic per-session index starting at 0. Unique within a
    /// session even under load, because the writing sink owns the
    /// counter under a mutex.
    pub index: u64,
    /// Milliseconds since the Unix epoch, captured when the sink
    /// received the event. Not the event's emission time — that
    /// would require threading a clock through every emit site.
    pub emitted_at_ms: i64,
    /// Call-stack depth at the moment of emission, when the caller
    /// can supply it. `None` for events emitted from a context where
    /// the VM frame stack isn't available.
    pub frame_depth: Option<u32>,
    /// The raw event, flattened so `jq '.type'` works as expected.
    #[serde(flatten)]
    pub event: AgentEvent,
}
535
/// Append-only JSONL sink for a single session's event stream (#103).
/// One writer per session; sinks rotate to a numbered suffix when a
/// running file crosses `ROTATE_BYTES` (100 MB today — long chat
/// sessions rarely exceed 5 MB, so rotation almost never fires).
pub struct JsonlEventSink {
    /// Writer plus counters, serialized behind a mutex so concurrent
    /// emitters never interleave lines or race the index.
    state: Mutex<JsonlEventSinkState>,
    /// Path of the initial (un-rotated) log file; rotated files derive
    /// their suffixed names from it.
    base_path: std::path::PathBuf,
}
544
/// Mutable writer state for `JsonlEventSink`, kept behind its mutex.
struct JsonlEventSinkState {
    /// Buffered writer over the currently-active log file.
    writer: std::io::BufWriter<std::fs::File>,
    /// Next event index to assign.
    index: u64,
    /// Bytes written to the current file; reset to 0 on rotation.
    bytes_written: u64,
    /// Number of rotations performed so far; used for the numbered
    /// filename suffix.
    rotation: u32,
}
551
552impl JsonlEventSink {
553    /// Hard cap past which the current file rotates to a numbered
554    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
555    /// sessions don't produce unreadable multi-GB logs.
556    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
557
558    /// Open a new sink writing to `base_path`. Creates parent dirs
559    /// if missing. Overwrites an existing file so each fresh session
560    /// starts from index 0.
561    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
562        let base_path = base_path.into();
563        if let Some(parent) = base_path.parent() {
564            std::fs::create_dir_all(parent)?;
565        }
566        let file = std::fs::OpenOptions::new()
567            .create(true)
568            .truncate(true)
569            .write(true)
570            .open(&base_path)?;
571        Ok(Arc::new(Self {
572            state: Mutex::new(JsonlEventSinkState {
573                writer: std::io::BufWriter::new(file),
574                index: 0,
575                bytes_written: 0,
576                rotation: 0,
577            }),
578            base_path,
579        }))
580    }
581
582    /// Flush any buffered writes. Called on session shutdown; the
583    /// Drop impl calls this too but on early panic it may not run.
584    pub fn flush(&self) -> std::io::Result<()> {
585        use std::io::Write as _;
586        self.state
587            .lock()
588            .expect("jsonl sink mutex poisoned")
589            .writer
590            .flush()
591    }
592
593    /// Current event index — primarily for tests and the "how many
594    /// events are in this run" run-record summary.
595    pub fn event_count(&self) -> u64 {
596        self.state.lock().expect("jsonl sink mutex poisoned").index
597    }
598
599    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
600        use std::io::Write as _;
601        if state.bytes_written < Self::ROTATE_BYTES {
602            return Ok(());
603        }
604        state.writer.flush()?;
605        state.rotation += 1;
606        let suffix = format!("-{:06}", state.rotation);
607        let rotated = self.base_path.with_file_name({
608            let stem = self
609                .base_path
610                .file_stem()
611                .and_then(|s| s.to_str())
612                .unwrap_or("event_log");
613            let ext = self
614                .base_path
615                .extension()
616                .and_then(|e| e.to_str())
617                .unwrap_or("jsonl");
618            format!("{stem}{suffix}.{ext}")
619        });
620        let file = std::fs::OpenOptions::new()
621            .create(true)
622            .truncate(true)
623            .write(true)
624            .open(&rotated)?;
625        state.writer = std::io::BufWriter::new(file);
626        state.bytes_written = 0;
627        Ok(())
628    }
629}
630
/// Event-log-backed sink for a single session's agent event stream.
/// Uses the generalized append-only event log when one is installed for
/// the current VM thread and falls back to `JsonlEventSink` only for
/// older env-driven workflows.
pub struct EventLogSink {
    /// Shared handle to the append-only event log records go to.
    log: Arc<AnyEventLog>,
    /// Per-session topic (`observability.agent_events.<sanitized id>`)
    /// that every record is appended under.
    topic: Topic,
    /// Session this sink serves; also stamped into each record's
    /// payload and headers.
    session_id: String,
}
640
641impl EventLogSink {
642    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
643        let session_id = session_id.into();
644        let topic = Topic::new(format!(
645            "observability.agent_events.{}",
646            crate::event_log::sanitize_topic_component(&session_id)
647        ))
648        .expect("session id should sanitize to a valid topic");
649        Arc::new(Self {
650            log,
651            topic,
652            session_id,
653        })
654    }
655}
656
657impl AgentEventSink for JsonlEventSink {
658    fn handle_event(&self, event: &AgentEvent) {
659        use std::io::Write as _;
660        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
661        let index = state.index;
662        state.index += 1;
663        let emitted_at_ms = std::time::SystemTime::now()
664            .duration_since(std::time::UNIX_EPOCH)
665            .map(|d| d.as_millis() as i64)
666            .unwrap_or(0);
667        let envelope = PersistedAgentEvent {
668            index,
669            emitted_at_ms,
670            frame_depth: None,
671            event: event.clone(),
672        };
673        if let Ok(line) = serde_json::to_string(&envelope) {
674            // One line, newline-terminated — JSON Lines spec.
675            // Errors here are swallowed on purpose; a failing write
676            // must never crash the agent loop, and the run record
677            // itself is a secondary artifact.
678            let _ = state.writer.write_all(line.as_bytes());
679            let _ = state.writer.write_all(b"\n");
680            state.bytes_written += line.len() as u64 + 1;
681            let _ = self.rotate_if_needed(&mut state);
682        }
683    }
684}
685
686impl AgentEventSink for EventLogSink {
687    fn handle_event(&self, event: &AgentEvent) {
688        let event_json = match serde_json::to_value(event) {
689            Ok(value) => value,
690            Err(_) => return,
691        };
692        let event_kind = event_json
693            .get("type")
694            .and_then(|value| value.as_str())
695            .unwrap_or("agent_event")
696            .to_string();
697        let payload = serde_json::json!({
698            "index_hint": now_ms(),
699            "session_id": self.session_id,
700            "event": event_json,
701        });
702        let mut headers = std::collections::BTreeMap::new();
703        headers.insert("session_id".to_string(), self.session_id.clone());
704        let log = self.log.clone();
705        let topic = self.topic.clone();
706        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
707        if let Ok(handle) = tokio::runtime::Handle::try_current() {
708            handle.spawn(async move {
709                let _ = log.append(&topic, record).await;
710            });
711        } else {
712            let _ = futures::executor::block_on(log.append(&topic, record));
713        }
714    }
715}
716
717impl Drop for JsonlEventSink {
718    fn drop(&mut self) {
719        if let Ok(mut state) = self.state.lock() {
720            use std::io::Write as _;
721            let _ = state.writer.flush();
722        }
723    }
724}
725
/// Fan-out helper for composing multiple external sinks.
pub struct MultiSink {
    /// Registered sinks; the mutex allows registration through `&self`.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
730
731impl MultiSink {
732    pub fn new() -> Self {
733        Self {
734            sinks: Mutex::new(Vec::new()),
735        }
736    }
737    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
738        self.sinks.lock().expect("sink mutex poisoned").push(sink);
739    }
740    pub fn len(&self) -> usize {
741        self.sinks.lock().expect("sink mutex poisoned").len()
742    }
743    pub fn is_empty(&self) -> bool {
744        self.len() == 0
745    }
746}
747
748impl Default for MultiSink {
749    fn default() -> Self {
750        Self::new()
751    }
752}
753
754impl AgentEventSink for MultiSink {
755    fn handle_event(&self, event: &AgentEvent) {
756        // Deliberate: snapshot then release the lock before invoking sink
757        // callbacks. Sinks can re-enter the event system (e.g. a host
758        // sink that logs to another AgentEvent path), so holding the
759        // mutex across the callback would risk self-deadlock. Arc clones
760        // are refcount bumps — cheap.
761        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
762        for sink in sinks {
763            sink.handle_event(event);
764        }
765    }
766}
767
/// Registry entry for one external sink.
///
/// Test builds tag each sink with the registering thread's id so
/// parallel tests sharing the process-global registry can scope
/// register/clear/emit to their own thread (see `emit_event`,
/// `clear_session_sinks`, and `reset_all_sinks`).
#[cfg(test)]
#[derive(Clone)]
struct RegisteredSink {
    owner: std::thread::ThreadId,
    sink: Arc<dyn AgentEventSink>,
}

/// Non-test builds need no ownership tagging — the entry is the sink
/// itself.
#[cfg(not(test))]
type RegisteredSink = Arc<dyn AgentEventSink>;

/// Session id → external sinks registered for that session.
type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
779
780fn external_sinks() -> &'static ExternalSinkRegistry {
781    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
782    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
783}
784
785pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
786    let session_id = session_id.into();
787    let mut reg = external_sinks().write().expect("sink registry poisoned");
788    #[cfg(test)]
789    let sink = RegisteredSink {
790        owner: std::thread::current().id(),
791        sink,
792    };
793    reg.entry(session_id).or_default().push(sink);
794}
795
796/// Remove all external sinks registered for `session_id`. Does NOT
797/// close the session itself — subscribers and transcript survive, so a
798/// later `agent_loop` call with the same id continues the conversation.
799pub fn clear_session_sinks(session_id: &str) {
800    #[cfg(test)]
801    {
802        let owner = std::thread::current().id();
803        let mut reg = external_sinks().write().expect("sink registry poisoned");
804        if let Some(sinks) = reg.get_mut(session_id) {
805            sinks.retain(|sink| sink.owner != owner);
806            if sinks.is_empty() {
807                reg.remove(session_id);
808            }
809        }
810    }
811    #[cfg(not(test))]
812    {
813        external_sinks()
814            .write()
815            .expect("sink registry poisoned")
816            .remove(session_id);
817    }
818}
819
820pub fn reset_all_sinks() {
821    #[cfg(test)]
822    {
823        let owner = std::thread::current().id();
824        let mut reg = external_sinks().write().expect("sink registry poisoned");
825        reg.retain(|_, sinks| {
826            sinks.retain(|sink| sink.owner != owner);
827            !sinks.is_empty()
828        });
829        crate::agent_sessions::reset_session_store();
830    }
831    #[cfg(not(test))]
832    {
833        external_sinks()
834            .write()
835            .expect("sink registry poisoned")
836            .clear();
837        crate::agent_sessions::reset_session_store();
838    }
839}
840
841/// Emit an event to external sinks registered for this session. Pipeline
842/// closure subscribers are NOT called by this function — the agent
843/// loop owns that path because it needs its async VM context.
844pub fn emit_event(event: &AgentEvent) {
845    let sinks: Vec<Arc<dyn AgentEventSink>> = {
846        let reg = external_sinks().read().expect("sink registry poisoned");
847        #[cfg(test)]
848        {
849            let owner = std::thread::current().id();
850            reg.get(event.session_id())
851                .map(|sinks| {
852                    sinks
853                        .iter()
854                        .filter(|sink| sink.owner == owner)
855                        .map(|sink| sink.sink.clone())
856                        .collect()
857                })
858                .unwrap_or_default()
859        }
860        #[cfg(not(test))]
861        {
862            reg.get(event.session_id()).cloned().unwrap_or_default()
863        }
864    };
865    for sink in sinks {
866        sink.handle_event(event);
867    }
868}
869
/// Milliseconds since the Unix epoch, or 0 when the system clock
/// reports a time before the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
876
877pub fn session_external_sink_count(session_id: &str) -> usize {
878    #[cfg(test)]
879    {
880        let owner = std::thread::current().id();
881        return external_sinks()
882            .read()
883            .expect("sink registry poisoned")
884            .get(session_id)
885            .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
886            .unwrap_or(0);
887    }
888    #[cfg(not(test))]
889    {
890        external_sinks()
891            .read()
892            .expect("sink registry poisoned")
893            .get(session_id)
894            .map(|v| v.len())
895            .unwrap_or(0)
896    }
897}
898
/// Count of `.harn` closure subscribers for `session_id`.
///
/// Thin delegation: closure subscribers live on the session store
/// (`crate::agent_sessions`), the single source of truth for
/// session-scoped VM state — not on this module's sink registry.
pub fn session_closure_subscriber_count(session_id: &str) -> usize {
    crate::agent_sessions::subscriber_count(session_id)
}
902
903#[cfg(test)]
904mod tests {
905    use super::*;
906    use std::sync::atomic::{AtomicUsize, Ordering};
907
    /// Minimal test sink: counts how many events it receives.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }
914
915    #[test]
916    fn multi_sink_fans_out_in_order() {
917        let multi = MultiSink::new();
918        let a = Arc::new(AtomicUsize::new(0));
919        let b = Arc::new(AtomicUsize::new(0));
920        multi.push(Arc::new(CountingSink(a.clone())));
921        multi.push(Arc::new(CountingSink(b.clone())));
922        let event = AgentEvent::TurnStart {
923            session_id: "s1".into(),
924            iteration: 1,
925        };
926        multi.handle_event(&event);
927        assert_eq!(a.load(Ordering::SeqCst), 1);
928        assert_eq!(b.load(Ordering::SeqCst), 1);
929    }
930
931    #[test]
932    fn session_scoped_sink_routing() {
933        reset_all_sinks();
934        let a = Arc::new(AtomicUsize::new(0));
935        let b = Arc::new(AtomicUsize::new(0));
936        register_sink("session-a", Arc::new(CountingSink(a.clone())));
937        register_sink("session-b", Arc::new(CountingSink(b.clone())));
938        emit_event(&AgentEvent::TurnStart {
939            session_id: "session-a".into(),
940            iteration: 0,
941        });
942        assert_eq!(a.load(Ordering::SeqCst), 1);
943        assert_eq!(b.load(Ordering::SeqCst), 0);
944        emit_event(&AgentEvent::TurnEnd {
945            session_id: "session-b".into(),
946            iteration: 0,
947            turn_info: serde_json::json!({}),
948        });
949        assert_eq!(a.load(Ordering::SeqCst), 1);
950        assert_eq!(b.load(Ordering::SeqCst), 1);
951        clear_session_sinks("session-a");
952        assert_eq!(session_external_sink_count("session-a"), 0);
953        assert_eq!(session_external_sink_count("session-b"), 1);
954        reset_all_sinks();
955    }
956
    #[test]
    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
        use std::io::{BufRead, BufReader};
        // Scratch dir keyed by pid so concurrent test *processes* don't
        // collide on the same file.
        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("event_log.jsonl");
        let sink = JsonlEventSink::open(&path).unwrap();
        // Five events through the sink — each should land as one JSONL line.
        for i in 0..5 {
            sink.handle_event(&AgentEvent::TurnStart {
                session_id: "s".into(),
                iteration: i,
            });
        }
        assert_eq!(sink.event_count(), 5);
        sink.flush().unwrap();

        // Read back + assert monotonic indices + non-decreasing timestamps.
        let file = std::fs::File::open(&path).unwrap();
        let mut last_idx: i64 = -1;
        let mut last_ts: i64 = 0;
        for line in BufReader::new(file).lines() {
            let line = line.unwrap();
            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
            let idx = val["index"].as_i64().unwrap();
            let ts = val["emitted_at_ms"].as_i64().unwrap();
            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
            assert!(ts >= last_ts, "timestamps must be non-decreasing");
            last_idx = idx;
            last_ts = ts;
            // Event payload flattened — type tag must survive.
            assert_eq!(val["type"], "turn_start");
        }
        // 5 events → indices 0..=4.
        assert_eq!(last_idx, 4);
        // Best-effort cleanup; a leftover file in temp is harmless.
        let _ = std::fs::remove_file(&path);
    }
992
993    #[test]
994    fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
995        // Terminal update with both durations populated — both fields
996        // appear in the JSON. Snake_case keys here because this is the
997        // canonical AgentEvent shape; the ACP adapter renames to
998        // camelCase separately.
999        let terminal = AgentEvent::ToolCallUpdate {
1000            session_id: "s".into(),
1001            tool_call_id: "tc-1".into(),
1002            tool_name: "read".into(),
1003            status: ToolCallStatus::Completed,
1004            raw_output: None,
1005            error: None,
1006            duration_ms: Some(42),
1007            execution_duration_ms: Some(7),
1008            error_category: None,
1009            executor: None,
1010            parsing: None,
1011
1012            raw_input: None,
1013            raw_input_partial: None,
1014            audit: None,
1015        };
1016        let value = serde_json::to_value(&terminal).unwrap();
1017        assert_eq!(value["duration_ms"], serde_json::json!(42));
1018        assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1019
1020        // In-progress update with `None` for both — both keys must be
1021        // absent (not `null`) so older ACP clients that key off
1022        // presence don't see a misleading zero.
1023        let intermediate = AgentEvent::ToolCallUpdate {
1024            session_id: "s".into(),
1025            tool_call_id: "tc-1".into(),
1026            tool_name: "read".into(),
1027            status: ToolCallStatus::InProgress,
1028            raw_output: None,
1029            error: None,
1030            duration_ms: None,
1031            execution_duration_ms: None,
1032            error_category: None,
1033            executor: None,
1034            parsing: None,
1035
1036            raw_input: None,
1037            raw_input_partial: None,
1038            audit: None,
1039        };
1040        let value = serde_json::to_value(&intermediate).unwrap();
1041        let object = value.as_object().expect("update serializes as object");
1042        assert!(
1043            !object.contains_key("duration_ms"),
1044            "duration_ms must be omitted when None: {value}"
1045        );
1046        assert!(
1047            !object.contains_key("execution_duration_ms"),
1048            "execution_duration_ms must be omitted when None: {value}"
1049        );
1050    }
1051
1052    #[test]
1053    fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1054        // Persisted event-log entries written before the fields existed
1055        // must still deserialize cleanly. The missing keys map to None.
1056        let raw = serde_json::json!({
1057            "type": "tool_call_update",
1058            "session_id": "s",
1059            "tool_call_id": "tc-1",
1060            "tool_name": "read",
1061            "status": "completed",
1062            "raw_output": null,
1063            "error": null,
1064        });
1065        let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1066        match event {
1067            AgentEvent::ToolCallUpdate {
1068                duration_ms,
1069                execution_duration_ms,
1070                ..
1071            } => {
1072                assert!(duration_ms.is_none());
1073                assert!(execution_duration_ms.is_none());
1074            }
1075            other => panic!("expected ToolCallUpdate, got {other:?}"),
1076        }
1077    }
1078
1079    #[test]
1080    fn tool_call_status_serde() {
1081        assert_eq!(
1082            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1083            "\"pending\""
1084        );
1085        assert_eq!(
1086            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1087            "\"in_progress\""
1088        );
1089        assert_eq!(
1090            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1091            "\"completed\""
1092        );
1093        assert_eq!(
1094            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1095            "\"failed\""
1096        );
1097    }
1098
1099    #[test]
1100    fn tool_call_error_category_serializes_as_snake_case() {
1101        let pairs = [
1102            (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1103            (ToolCallErrorCategory::ToolError, "tool_error"),
1104            (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1105            (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1106            (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1107            (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1108            (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1109            (ToolCallErrorCategory::Timeout, "timeout"),
1110            (ToolCallErrorCategory::Network, "network"),
1111            (ToolCallErrorCategory::Cancelled, "cancelled"),
1112            (ToolCallErrorCategory::Unknown, "unknown"),
1113        ];
1114        for (variant, wire) in pairs {
1115            let encoded = serde_json::to_string(&variant).unwrap();
1116            assert_eq!(encoded, format!("\"{wire}\""));
1117            assert_eq!(variant.as_str(), wire);
1118            // Round-trip via deserialize so wire stability is enforced
1119            // both ways.
1120            let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1121            assert_eq!(decoded, variant);
1122        }
1123    }
1124
1125    #[test]
1126    fn tool_executor_round_trips_with_adjacent_tag() {
1127        // Adjacent tagging keeps the wire shape uniform — every variant
1128        // is a JSON object with a `kind` discriminator. The ACP adapter
1129        // rewrites unit variants as bare strings; the on-disk event log
1130        // keeps the object shape so deserialize can recover the variant.
1131        for executor in [
1132            ToolExecutor::HarnBuiltin,
1133            ToolExecutor::HostBridge,
1134            ToolExecutor::McpServer {
1135                server_name: "linear".to_string(),
1136            },
1137            ToolExecutor::ProviderNative,
1138        ] {
1139            let json = serde_json::to_value(&executor).unwrap();
1140            let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1141            match &executor {
1142                ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1143                ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1144                ToolExecutor::McpServer { server_name } => {
1145                    assert_eq!(kind, "mcp_server");
1146                    assert_eq!(json["server_name"], *server_name);
1147                }
1148                ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1149            }
1150            let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1151            assert_eq!(recovered, executor);
1152        }
1153    }
1154
1155    #[test]
1156    fn tool_call_error_category_from_internal_collapses_transient_family() {
1157        use crate::value::ErrorCategory as Internal;
1158        assert_eq!(
1159            ToolCallErrorCategory::from_internal(&Internal::Timeout),
1160            ToolCallErrorCategory::Timeout
1161        );
1162        for net in [
1163            Internal::RateLimit,
1164            Internal::Overloaded,
1165            Internal::ServerError,
1166            Internal::TransientNetwork,
1167        ] {
1168            assert_eq!(
1169                ToolCallErrorCategory::from_internal(&net),
1170                ToolCallErrorCategory::Network,
1171                "{net:?} should map to Network",
1172            );
1173        }
1174        assert_eq!(
1175            ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1176            ToolCallErrorCategory::SchemaValidation
1177        );
1178        assert_eq!(
1179            ToolCallErrorCategory::from_internal(&Internal::ToolError),
1180            ToolCallErrorCategory::ToolError
1181        );
1182        assert_eq!(
1183            ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1184            ToolCallErrorCategory::PermissionDenied
1185        );
1186        assert_eq!(
1187            ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1188            ToolCallErrorCategory::Cancelled
1189        );
1190        for bridge in [
1191            Internal::Auth,
1192            Internal::EgressBlocked,
1193            Internal::NotFound,
1194            Internal::CircuitOpen,
1195            Internal::Generic,
1196        ] {
1197            assert_eq!(
1198                ToolCallErrorCategory::from_internal(&bridge),
1199                ToolCallErrorCategory::HostBridgeError,
1200                "{bridge:?} should map to HostBridgeError",
1201            );
1202        }
1203    }
1204
1205    #[test]
1206    fn tool_call_update_event_omits_error_category_when_none() {
1207        let event = AgentEvent::ToolCallUpdate {
1208            session_id: "s".into(),
1209            tool_call_id: "t".into(),
1210            tool_name: "read".into(),
1211            status: ToolCallStatus::Completed,
1212            raw_output: None,
1213            error: None,
1214            duration_ms: None,
1215            execution_duration_ms: None,
1216            error_category: None,
1217            executor: None,
1218            parsing: None,
1219
1220            raw_input: None,
1221            raw_input_partial: None,
1222            audit: None,
1223        };
1224        let v = serde_json::to_value(&event).unwrap();
1225        assert_eq!(v["type"], "tool_call_update");
1226        assert!(v.get("error_category").is_none());
1227    }
1228
1229    #[test]
1230    fn tool_call_update_event_serializes_error_category_when_set() {
1231        let event = AgentEvent::ToolCallUpdate {
1232            session_id: "s".into(),
1233            tool_call_id: "t".into(),
1234            tool_name: "read".into(),
1235            status: ToolCallStatus::Failed,
1236            raw_output: None,
1237            error: Some("missing required field".into()),
1238            duration_ms: None,
1239            execution_duration_ms: None,
1240            error_category: Some(ToolCallErrorCategory::SchemaValidation),
1241            executor: None,
1242            parsing: None,
1243
1244            raw_input: None,
1245            raw_input_partial: None,
1246            audit: None,
1247        };
1248        let v = serde_json::to_value(&event).unwrap();
1249        assert_eq!(v["error_category"], "schema_validation");
1250        assert_eq!(v["error"], "missing required field");
1251    }
1252
1253    #[test]
1254    fn tool_call_update_omits_executor_when_absent() {
1255        // `executor: None` must not appear in the serialized event so
1256        // the on-disk shape stays backward-compatible with replays
1257        // recorded before harn#691.
1258        let event = AgentEvent::ToolCallUpdate {
1259            session_id: "s".into(),
1260            tool_call_id: "tc-1".into(),
1261            tool_name: "read".into(),
1262            status: ToolCallStatus::Completed,
1263            raw_output: None,
1264            error: None,
1265            duration_ms: None,
1266            execution_duration_ms: None,
1267            error_category: None,
1268            executor: None,
1269            parsing: None,
1270
1271            raw_input: None,
1272            raw_input_partial: None,
1273            audit: None,
1274        };
1275        let json = serde_json::to_value(&event).unwrap();
1276        assert!(json.get("executor").is_none(), "got: {json}");
1277    }
1278
1279    #[test]
1280    fn worker_event_status_strings_cover_all_variants() {
1281        // Wire-level status strings flow into both bridge `worker_update`
1282        // payloads and ACP `session/update` notifications. Pinning every
1283        // variant here so a future addition can't silently land without
1284        // a docs/wire decision.
1285        assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1286        assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1287        assert_eq!(
1288            WorkerEvent::WorkerWaitingForInput.as_status(),
1289            "awaiting_input"
1290        );
1291        assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1292        assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1293        assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1294
1295        for terminal in [
1296            WorkerEvent::WorkerCompleted,
1297            WorkerEvent::WorkerFailed,
1298            WorkerEvent::WorkerCancelled,
1299        ] {
1300            assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1301        }
1302        for non_terminal in [
1303            WorkerEvent::WorkerSpawned,
1304            WorkerEvent::WorkerProgressed,
1305            WorkerEvent::WorkerWaitingForInput,
1306        ] {
1307            assert!(
1308                !non_terminal.is_terminal(),
1309                "{non_terminal:?} should not be terminal"
1310            );
1311        }
1312    }
1313
    #[test]
    fn worker_update_event_routes_through_session_keyed_sink() {
        // Worker lifecycle events ride the same session-keyed
        // `AgentEventSink` registry as message and tool events. This
        // is the canonical path ACP and A2A subscribe to — gate it
        // here so a registry-routing regression breaks loudly.
        reset_all_sinks();
        let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
        // Local test sink that records every event it is handed.
        struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
        impl AgentEventSink for CapturingSink {
            fn handle_event(&self, event: &AgentEvent) {
                self.0
                    .lock()
                    .expect("captured sink mutex poisoned")
                    .push(event.clone());
            }
        }
        register_sink(
            "worker-session-1",
            Arc::new(CapturingSink(captured.clone())),
        );
        emit_event(&AgentEvent::WorkerUpdate {
            session_id: "worker-session-1".into(),
            worker_id: "worker_42".into(),
            worker_name: "review_captain".into(),
            worker_task: "review pr".into(),
            worker_mode: "delegated_stage".into(),
            event: WorkerEvent::WorkerWaitingForInput,
            status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
            metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
            audit: None,
        });
        // Other sessions don't receive cross-talk.
        emit_event(&AgentEvent::WorkerUpdate {
            session_id: "other-session".into(),
            worker_id: "w2".into(),
            worker_name: "n2".into(),
            worker_task: "t2".into(),
            worker_mode: "delegated_stage".into(),
            event: WorkerEvent::WorkerCompleted,
            status: "completed".into(),
            metadata: serde_json::json!({}),
            audit: None,
        });
        // Exactly the first event arrived, with its fields intact.
        let received = captured.lock().unwrap().clone();
        assert_eq!(received.len(), 1, "got: {received:?}");
        match &received[0] {
            AgentEvent::WorkerUpdate {
                session_id,
                worker_id,
                event,
                status,
                ..
            } => {
                assert_eq!(session_id, "worker-session-1");
                assert_eq!(worker_id, "worker_42");
                assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
                assert_eq!(status, "awaiting_input");
            }
            other => panic!("expected WorkerUpdate, got {other:?}"),
        }
        reset_all_sinks();
    }
1377
1378    #[test]
1379    fn worker_update_event_serializes_to_canonical_shape() {
1380        // Persisted event-log entries flatten the AgentEvent envelope,
1381        // so the WorkerUpdate variant must serialize with a `type` of
1382        // `worker_update` and the worker fields directly on the
1383        // top-level object (matching the `#[serde(tag = "type", ...)]`
1384        // shape the rest of AgentEvent uses).
1385        let event = AgentEvent::WorkerUpdate {
1386            session_id: "s".into(),
1387            worker_id: "w".into(),
1388            worker_name: "n".into(),
1389            worker_task: "t".into(),
1390            worker_mode: "delegated_stage".into(),
1391            event: WorkerEvent::WorkerProgressed,
1392            status: "progressed".into(),
1393            metadata: serde_json::json!({"started_at": "0193..."}),
1394            audit: Some(serde_json::json!({"run_id": "run_x"})),
1395        };
1396        let value = serde_json::to_value(&event).unwrap();
1397        assert_eq!(value["type"], "worker_update");
1398        assert_eq!(value["session_id"], "s");
1399        assert_eq!(value["worker_id"], "w");
1400        assert_eq!(value["status"], "progressed");
1401        assert_eq!(value["audit"]["run_id"], "run_x");
1402
1403        // Round-trip: the persisted event log must deserialize the
1404        // canonical shape back into the typed variant so replay
1405        // tooling can re-derive lifecycle state offline.
1406        let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1407        match recovered {
1408            AgentEvent::WorkerUpdate {
1409                event: recovered_event,
1410                ..
1411            } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1412            other => panic!("expected WorkerUpdate, got {other:?}"),
1413        }
1414    }
1415
1416    #[test]
1417    fn tool_call_update_includes_executor_when_present() {
1418        let event = AgentEvent::ToolCallUpdate {
1419            session_id: "s".into(),
1420            tool_call_id: "tc-1".into(),
1421            tool_name: "read".into(),
1422            status: ToolCallStatus::Completed,
1423            raw_output: None,
1424            error: None,
1425            duration_ms: None,
1426            execution_duration_ms: None,
1427            error_category: None,
1428            executor: Some(ToolExecutor::McpServer {
1429                server_name: "github".into(),
1430            }),
1431            parsing: None,
1432
1433            raw_input: None,
1434            raw_input_partial: None,
1435            audit: None,
1436        };
1437        let json = serde_json::to_value(&event).unwrap();
1438        assert_eq!(json["executor"]["kind"], "mcp_server");
1439        assert_eq!(json["executor"]["server_name"], "github");
1440    }
1441
1442    #[test]
1443    fn tool_call_update_omits_audit_when_absent() {
1444        let event = AgentEvent::ToolCallUpdate {
1445            session_id: "s".into(),
1446            tool_call_id: "tc-1".into(),
1447            tool_name: "read".into(),
1448            status: ToolCallStatus::Completed,
1449            raw_output: None,
1450            error: None,
1451            duration_ms: None,
1452            execution_duration_ms: None,
1453            error_category: None,
1454            executor: None,
1455            parsing: None,
1456            raw_input: None,
1457            raw_input_partial: None,
1458            audit: None,
1459        };
1460        let json = serde_json::to_value(&event).unwrap();
1461        assert!(json.get("audit").is_none(), "got: {json}");
1462    }
1463
1464    #[test]
1465    fn tool_call_update_includes_audit_when_present() {
1466        let audit = MutationSessionRecord {
1467            session_id: "session_42".into(),
1468            run_id: Some("run_42".into()),
1469            mutation_scope: "apply_workspace".into(),
1470            execution_kind: Some("worker".into()),
1471            ..Default::default()
1472        };
1473        let event = AgentEvent::ToolCallUpdate {
1474            session_id: "s".into(),
1475            tool_call_id: "tc-1".into(),
1476            tool_name: "edit_file".into(),
1477            status: ToolCallStatus::Completed,
1478            raw_output: None,
1479            error: None,
1480            duration_ms: None,
1481            execution_duration_ms: None,
1482            error_category: None,
1483            executor: Some(ToolExecutor::HostBridge),
1484            parsing: None,
1485            raw_input: None,
1486            raw_input_partial: None,
1487            audit: Some(audit),
1488        };
1489        let json = serde_json::to_value(&event).unwrap();
1490        assert_eq!(json["audit"]["session_id"], "session_42");
1491        assert_eq!(json["audit"]["run_id"], "run_42");
1492        assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1493        assert_eq!(json["audit"]["execution_kind"], "worker");
1494    }
1495
1496    #[test]
1497    fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1498        let raw = serde_json::json!({
1499            "type": "tool_call_update",
1500            "session_id": "s",
1501            "tool_call_id": "tc-1",
1502            "tool_name": "read",
1503            "status": "completed",
1504            "raw_output": null,
1505            "error": null,
1506        });
1507        let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1508        match event {
1509            AgentEvent::ToolCallUpdate { audit, .. } => {
1510                assert!(audit.is_none());
1511            }
1512            other => panic!("expected ToolCallUpdate, got {other:?}"),
1513        }
1514    }
1515}