Skip to main content

harn_vm/
agent_events.rs

1//! Agent event stream — the ACP-aligned observation surface for the
2//! agent loop.
3//!
4//! Every phase of the turn loop emits an `AgentEvent`. The canonical
5//! variants map 1:1 onto ACP `SessionUpdate` values; three internal
6//! variants (`TurnStart`, `TurnEnd`, `FeedbackInjected`) let pipelines
7//! react to loop milestones that don't have a direct ACP counterpart.
8//!
9//! There are two subscription paths, both keyed on session id so two
10//! concurrent sessions never cross-talk:
11//!
12//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
13//!    like the harn-cli ACP server. Invoked synchronously by the loop.
14//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
15//! 2. **Closure subscribers** — `.harn` closures registered via the
16//!    `agent_subscribe(session_id, callback)` host builtin. These live
17//!    on the session's `SessionState.subscribers` in
18//!    `crate::agent_sessions`, because sessions are the single source
19//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
/// One coalesced filesystem notification from a hostlib `fs_watch`
/// subscription.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FsWatchEvent {
    /// Normalized event kind. The vocabulary is defined by the fs_watch
    /// hostlib, not here — see `raw_kind` for the unnormalized form.
    pub kind: String,
    /// Paths touched by this (possibly coalesced) notification.
    pub paths: Vec<String>,
    /// The same paths in relative form.
    /// NOTE(review): presumably relative to the watched root and
    /// parallel to `paths` — confirm at the fs_watch emit site.
    pub relative_paths: Vec<String>,
    /// The watcher backend's original kind string, preserved for
    /// debugging when `kind` loses detail.
    pub raw_kind: String,
    /// Populated when the watcher itself reported a failure; `None`
    /// on a successful notification.
    pub error: Option<String>,
}
40
/// Typed worker lifecycle events emitted by delegated/background agent
/// execution. Bridge-facing worker updates still derive a string status
/// from these variants (see `as_status`), but the runtime no longer
/// passes raw status strings around internally.
///
/// `Spawned`/`Completed`/`Failed`/`Cancelled` are the four terminal-or-start
/// states. `Progressed` is fired on intermediate milestones (e.g. a
/// retriggerable worker resuming from `awaiting_input`, or a workflow
/// stage completing without ending the worker). `WaitingForInput` covers
/// retriggerable workers that finish a cycle but stay alive pending the
/// next host-supplied trigger payload.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    /// Worker was created and began running.
    WorkerSpawned,
    /// Intermediate milestone; the worker is still alive and running.
    WorkerProgressed,
    /// Retriggerable worker finished a cycle and is idle until the host
    /// supplies the next trigger payload.
    WorkerWaitingForInput,
    /// Terminal: the worker finished successfully.
    WorkerCompleted,
    /// Terminal: the worker finished with an error.
    WorkerFailed,
    /// Terminal: the worker was cancelled before completing.
    WorkerCancelled,
}
61
62impl WorkerEvent {
63    /// Wire-level status string used by bridge `worker_update` payloads
64    /// and ACP `worker_update` session updates. The four canonical
65    /// states are mirrored from harn's internal worker `status` field
66    /// (`running`/`completed`/`failed`/`cancelled`), and the two newer
67    /// lifecycle states pick names that don't collide with any existing
68    /// status string.
69    pub fn as_status(self) -> &'static str {
70        match self {
71            Self::WorkerSpawned => "running",
72            Self::WorkerProgressed => "progressed",
73            Self::WorkerWaitingForInput => "awaiting_input",
74            Self::WorkerCompleted => "completed",
75            Self::WorkerFailed => "failed",
76            Self::WorkerCancelled => "cancelled",
77        }
78    }
79
80    pub fn as_str(self) -> &'static str {
81        match self {
82            Self::WorkerSpawned => "WorkerSpawned",
83            Self::WorkerProgressed => "WorkerProgressed",
84            Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85            Self::WorkerCompleted => "WorkerCompleted",
86            Self::WorkerFailed => "WorkerFailed",
87            Self::WorkerCancelled => "WorkerCancelled",
88        }
89    }
90
91    /// True for lifecycle events that mean the worker has reached a
92    /// final, non-resumable state. Retriggerable awaiting and
93    /// progressed milestones are *not* terminal — the worker keeps
94    /// running or is waiting for a trigger.
95    pub fn is_terminal(self) -> bool {
96        matches!(
97            self,
98            Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99        )
100    }
101}
102
/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
///
/// Serialized in snake_case (`pending`, `in_progress`, `completed`,
/// `failed`) on the wire.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Dispatched by the model but not yet started.
    Pending,
    /// Dispatch is actively running.
    InProgress,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
}
116
/// Wire-level classification of a `ToolCallUpdate` failure. Pairs with the
/// human-readable `error` string so clients can render each failure type
/// distinctly (e.g. surface a "permission denied" badge, or a different
/// retry affordance for `network` vs `tool_error`). The enum is
/// deliberately extensible — `unknown` is the default when the runtime
/// could not classify a failure. Wire names are the snake_case variant
/// names; `as_str` below returns the same strings.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallErrorCategory {
    /// Host-side validation rejected the args (missing required field,
    /// invalid type, malformed JSON).
    SchemaValidation,
    /// The tool ran and returned an error result (e.g. `read_file` on a
    /// missing path) — distinguished from a transport failure.
    ToolError,
    /// MCP transport / server-protocol error.
    McpServerError,
    /// Burin Swift host bridge returned an error during dispatch.
    HostBridgeError,
    /// `session/request_permission` denied by the client, or a policy
    /// rule (static or dynamic) refused the call.
    PermissionDenied,
    /// The harn loop detector skipped this call because the same
    /// (tool, args) pair repeated past the configured threshold.
    RejectedLoop,
    /// Streaming text candidate was detected (bare `name(` or
    /// `<tool_call>` opener) but never resolved into a parseable call:
    /// args parsed as malformed, the heredoc body broke, the tag closed
    /// without a balanced expression, or the stream ended mid-call.
    /// Used by the streaming candidate detector (harn#692) to retract a
    /// `tool_call` candidate that turned out to be prose or syntactically
    /// broken so clients can dismiss the in-flight chip.
    ParseAborted,
    /// The tool exceeded its time budget.
    Timeout,
    /// Transient network / rate-limited / 5xx provider failure.
    Network,
    /// The tool was cancelled (e.g. session aborted).
    Cancelled,
    /// Default when classification was not performed.
    Unknown,
}
159
160impl ToolCallErrorCategory {
161    pub fn as_str(self) -> &'static str {
162        match self {
163            Self::SchemaValidation => "schema_validation",
164            Self::ToolError => "tool_error",
165            Self::McpServerError => "mcp_server_error",
166            Self::HostBridgeError => "host_bridge_error",
167            Self::PermissionDenied => "permission_denied",
168            Self::RejectedLoop => "rejected_loop",
169            Self::ParseAborted => "parse_aborted",
170            Self::Timeout => "timeout",
171            Self::Network => "network",
172            Self::Cancelled => "cancelled",
173            Self::Unknown => "unknown",
174        }
175    }
176
177    /// Map an internal `ErrorCategory` (used by the VM's `VmError`
178    /// classification) onto the wire enum. The internal taxonomy is
179    /// finer-grained — several transient categories collapse onto
180    /// `Network`, and the auth/quota family becomes `HostBridgeError`
181    /// because at the tool-dispatch boundary those errors come from
182    /// the bridge transport rather than the tool itself.
183    pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
184        use crate::value::ErrorCategory as Internal;
185        match category {
186            Internal::Timeout => Self::Timeout,
187            Internal::RateLimit
188            | Internal::Overloaded
189            | Internal::ServerError
190            | Internal::TransientNetwork => Self::Network,
191            Internal::SchemaValidation => Self::SchemaValidation,
192            Internal::ToolError => Self::ToolError,
193            Internal::ToolRejected => Self::PermissionDenied,
194            Internal::Cancelled => Self::Cancelled,
195            Internal::Auth
196            | Internal::EgressBlocked
197            | Internal::NotFound
198            | Internal::CircuitOpen
199            | Internal::Generic => Self::HostBridgeError,
200        }
201    }
202}
203
/// Where a tool actually ran. Tags `ToolCallUpdate` so clients can render
/// "via mcp:linear" / "via host bridge" badges, attribute latency by
/// transport, and route errors to the right surface (harn#691).
///
/// On the wire this serializes adjacently-tagged (`tag = "kind"`) so the
/// `mcp_server` case carries the configured server name. The ACP adapter
/// rewrites unit variants as bare strings (`"harn_builtin"`,
/// `"host_bridge"`, `"provider_native"`) and the `McpServer` case as
/// `{"kind": "mcp_server", "serverName": "..."}` to match the protocol's
/// camelCase convention.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ToolExecutor {
    /// VM-stdlib (`read_file`, `write_file`, `exec`, `http_*`, `mcp_*`)
    /// or any Harn-side handler closure registered in `tools_val`.
    HarnBuiltin,
    /// Capability provided by the host through `HostBridge.builtin_call`
    /// (Swift-side IDE bridge, BurinApp, BurinCLI host shells).
    HostBridge,
    /// Tool dispatched against a configured MCP server. Detected by the
    /// `_mcp_server` tag that `mcp_list_tools` injects on every tool
    /// dict before the agent loop sees it.
    McpServer {
        /// Configured name of the MCP server that ran the tool.
        server_name: String,
    },
    /// Provider-side server-side tool execution — currently OpenAI
    /// Responses-API server tools (e.g. native `tool_search`). The
    /// runtime never dispatches these locally; the model returns the
    /// already-executed result inline.
    ProviderNative,
}
233
/// Events emitted by the agent loop, keyed by session id.
///
/// The canonical variants (`AgentMessageChunk`, `AgentThoughtChunk`,
/// `ToolCall`, `ToolCallUpdate`, `Plan`) map 1:1 onto ACP
/// `sessionUpdate` variants. The remaining variants — turn lifecycle,
/// budget/stuck/watchdog terminals, skill activation, tool search,
/// compaction, handoff, fs-watch and worker updates — are harn-internal
/// loop milestones without a direct ACP counterpart (the original
/// "first five / last three" note predates most of them).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// Incremental chunk of assistant-visible output text.
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// Incremental chunk of the model's thinking/reasoning stream,
    /// carried separately from the assistant message stream.
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// A tool call surfaced by the loop, with its initial status and
    /// raw input. Follow-up progress arrives as `ToolCallUpdate`s keyed
    /// by the same `tool_call_id`.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
        /// Set to `Some(true)` by the streaming candidate detector
        /// (harn#692) when this event represents a tool-call shape
        /// detected in the model's in-flight assistant text but whose
        /// arguments have not finished parsing yet. Clients can render a
        /// spinner / placeholder while the model writes the body. The
        /// detector follows up with a `ToolCallUpdate { parsing: false,
        /// .. }` carrying either `status: pending` (promoted) or
        /// `status: failed` with `error_category: parse_aborted`.
        /// `None` (the default) means "this is a normal post-parse tool
        /// call, no candidate phase was active" so the on-disk shape
        /// stays compatible with replays recorded before this field
        /// existed.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        /// Mutation-session audit context active when the tool was
        /// dispatched (see harn#699). Hosts use it to group every tool
        /// emission belonging to the same write-capable session.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// Progress or terminal update for a previously emitted `ToolCall`,
    /// matched on `tool_call_id`.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
        /// Wall-clock milliseconds from the parse-to-execution boundary
        /// to the terminal `Completed`/`Failed` update. Includes the
        /// time spent in any wrapping orchestration logic (loop checks,
        /// post-tool hooks, microcompaction). Populated only on the
        /// terminal update — `None` on intermediate `Pending` /
        /// `InProgress` updates so clients can ignore the field until
        /// it shows up.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        duration_ms: Option<u64>,
        /// Milliseconds spent in the actual host/builtin/MCP dispatch
        /// call only (the inner `dispatch_tool_execution` window).
        /// Populated only on the terminal update; `None` otherwise.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        execution_duration_ms: Option<u64>,
        /// Structured classification of the failure (when `status` is
        /// `Failed`). Paired with `error` so clients can render each
        /// category distinctly without parsing free-form strings. Always
        /// `None` for non-Failed updates and serialized as
        /// `errorCategory` in the ACP wire format.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_category: Option<ToolCallErrorCategory>,
        /// Where the tool actually ran. `None` only for events emitted
        /// from sites that pre-date the dispatch decision (e.g. the
        /// pending → in-progress transition the loop emits before the
        /// dispatcher picks a backend).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        executor: Option<ToolExecutor>,
        /// Companion to `ToolCall.parsing` (harn#692). The streaming
        /// candidate detector emits the *terminal* candidate event as a
        /// `ToolCallUpdate` with `parsing: Some(false)` to retract the
        /// in-flight `parsing: true` chip — either by promoting the
        /// candidate (`status: pending`, populated `raw_output: None`,
        /// `error: None`) or aborting it (`status: failed`,
        /// `error_category: parse_aborted`). `None` means this update is
        /// not part of a candidate-phase transition.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        /// Best-effort partial parse of the streamed tool-call arguments.
        /// Populated by the SSE transport on `Pending` updates as the
        /// model streams `input_json_delta` (Anthropic) or
        /// `tool_calls[].function.arguments` deltas (OpenAI). `None` on
        /// terminal updates and on emissions from non-streaming paths
        /// (#693). When the partial bytes are not yet parseable as JSON
        /// the transport falls back to `raw_input_partial`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input: Option<serde_json::Value>,
        /// Raw concatenated bytes of the streamed tool-call arguments
        /// when a permissive parse failed (#693). Mutually exclusive
        /// with `raw_input`: clients render whichever is present.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input_partial: Option<String>,
        /// Mutation-session audit context for the tool call. Carries the
        /// same payload as on the paired `ToolCall` event so a host
        /// processing a single update doesn't have to correlate against
        /// the prior pending event.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// Structured plan payload; mirrors ACP's plan session update.
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// Harn-internal: a loop iteration is starting.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// Harn-internal: a loop iteration finished; `turn_info` carries the
    /// per-turn summary payload.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// Harn-internal: the loop injected synthetic feedback of the given
    /// `kind` into the conversation.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// Emitted when the agent loop exhausts `max_iterations` without any
    /// explicit break condition firing. Distinct from a natural "done" or
    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// Emitted when the loop breaks because consecutive text-only turns
    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
    /// hosts that key off agent-terminal events.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// Emitted when the daemon idle-wait loop trips its watchdog because
    /// every configured wake source returned `None` for N consecutive
    /// attempts. Exists so a broken daemon doesn't hang the session
    /// silently.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// Emitted when a skill is activated. Carries the match reason so
    /// replayers can reconstruct *why* a given skill took effect at
    /// this iteration.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// Emitted when a previously-active skill is deactivated because
    /// the reassess phase no longer matches it.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// Emitted once per activation when the skill's `allowed_tools` filter
    /// narrows the effective tool surface exposed to the model.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
    /// Emitted when a `tool_search` query is issued by the model. Carries
    /// the raw query args, the configured strategy, and a `mode` tag
    /// distinguishing the client-executed fallback (`"client"`) from
    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
    /// transcript event shape so hosts can render a search-in-progress
    /// chip in real time — the replay path walks the transcript after
    /// the turn, which is too late for live UX.
    ToolSearchQuery {
        session_id: String,
        tool_use_id: String,
        name: String,
        query: serde_json::Value,
        strategy: String,
        mode: String,
    },
    /// Emitted when `tool_search` resolves — carries the list of tool
    /// names newly promoted into the model's effective surface for the
    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
    ToolSearchResult {
        session_id: String,
        tool_use_id: String,
        promoted: Vec<String>,
        strategy: String,
        mode: String,
    },
    /// Emitted after a transcript compaction pass. Carries the compaction
    /// `mode`/`strategy`, how many messages were archived, the token
    /// estimates before/after, and the snapshot asset id when one was
    /// written.
    TranscriptCompacted {
        session_id: String,
        mode: String,
        strategy: String,
        archived_messages: usize,
        estimated_tokens_before: usize,
        estimated_tokens_after: usize,
        snapshot_asset_id: Option<String>,
    },
    /// Emitted when the loop produces a handoff artifact. Boxed so the
    /// large `HandoffArtifact` payload doesn't inflate every
    /// `AgentEvent`.
    Handoff {
        session_id: String,
        artifact_id: String,
        handoff: Box<HandoffArtifact>,
    },
    /// Batch of coalesced filesystem notifications for one hostlib
    /// `fs_watch` subscription.
    FsWatch {
        session_id: String,
        subscription_id: String,
        events: Vec<FsWatchEvent>,
    },
    /// Lifecycle update for a delegated/background worker. Carries the
    /// canonical typed `event` variant alongside the worker's current
    /// `status` string and the structured `metadata` payload that
    /// `worker_bridge_metadata` builds (task, mode, timing, child
    /// run/snapshot paths, audit-session, etc.). The `audit` field is
    /// the same `MutationSessionRecord` JSON serialization carried on
    /// the bridge wire so ACP/A2A consumers don't need to re-derive it.
    ///
    /// One-to-one with the bridge-side `worker_update` session-update
    /// notification: ACP and A2A adapters subscribe to this variant
    /// and translate it into their respective wire formats. The
    /// `session_id` is the parent agent session that owns the worker
    /// (i.e. the session whose VM spawned the worker), so a single
    /// host stays subscribed to the same sink for both message and
    /// worker traffic.
    WorkerUpdate {
        session_id: String,
        worker_id: String,
        worker_name: String,
        worker_task: String,
        worker_mode: String,
        event: WorkerEvent,
        status: String,
        metadata: serde_json::Value,
        audit: Option<serde_json::Value>,
    },
}
476
477impl AgentEvent {
478    pub fn session_id(&self) -> &str {
479        match self {
480            Self::AgentMessageChunk { session_id, .. }
481            | Self::AgentThoughtChunk { session_id, .. }
482            | Self::ToolCall { session_id, .. }
483            | Self::ToolCallUpdate { session_id, .. }
484            | Self::Plan { session_id, .. }
485            | Self::TurnStart { session_id, .. }
486            | Self::TurnEnd { session_id, .. }
487            | Self::FeedbackInjected { session_id, .. }
488            | Self::BudgetExhausted { session_id, .. }
489            | Self::LoopStuck { session_id, .. }
490            | Self::DaemonWatchdogTripped { session_id, .. }
491            | Self::SkillActivated { session_id, .. }
492            | Self::SkillDeactivated { session_id, .. }
493            | Self::SkillScopeTools { session_id, .. }
494            | Self::ToolSearchQuery { session_id, .. }
495            | Self::ToolSearchResult { session_id, .. }
496            | Self::TranscriptCompacted { session_id, .. }
497            | Self::Handoff { session_id, .. }
498            | Self::FsWatch { session_id, .. }
499            | Self::WorkerUpdate { session_id, .. } => session_id,
500        }
501    }
502}
503
/// External consumers of the event stream (e.g. the harn-cli ACP server,
/// which translates events into JSON-RPC notifications). Implementations
/// must be `Send + Sync` because sinks are shared via `Arc` in the
/// global registry.
pub trait AgentEventSink: Send + Sync {
    /// Receive one event. Called synchronously by the agent loop, so
    /// implementations should return quickly and must not panic.
    fn handle_event(&self, event: &AgentEvent);
}
509
/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
/// `AgentEvent` with monotonic index + timestamp + frame depth so
/// replay engines can reconstruct paused state at any event index,
/// and scrubber UIs can bucket events by time. The envelope is the
/// on-disk shape; the wire format for live consumers is still the
/// raw `AgentEvent` so existing sinks don't churn.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Monotonic per-session index starting at 0. Unique within a
    /// session; gaps never happen even under load because the sink
    /// owns the counter under a mutex.
    pub index: u64,
    /// Milliseconds since the Unix epoch, captured when the sink
    /// received the event. Not the event's emission time — that
    /// would require threading a clock through every emit site.
    pub emitted_at_ms: i64,
    /// Call-stack depth at the moment of emission, when the caller
    /// can supply it. `None` for events emitted from a context where
    /// the VM frame stack isn't available.
    pub frame_depth: Option<u32>,
    /// The raw event, flattened so `jq '.type'` works as expected —
    /// the event's own serde `type` tag and fields land at the top
    /// level of the JSON object alongside `index`/`emitted_at_ms`.
    #[serde(flatten)]
    pub event: AgentEvent,
}
534
/// Append-only JSONL sink for a single session's event stream (#103).
/// One writer per session; sinks rotate to a numbered suffix when a
/// running file crosses `ROTATE_BYTES` (100 MB today — long chat
/// sessions rarely exceed 5 MB, so rotation almost never fires).
pub struct JsonlEventSink {
    /// Writer + counters, guarded by one mutex so index assignment and
    /// the write itself are atomic per event.
    state: Mutex<JsonlEventSinkState>,
    /// Path the sink was opened at; rotation derives the numbered
    /// sibling file names from it.
    base_path: std::path::PathBuf,
}
543
/// Mutable interior of `JsonlEventSink`, always accessed under its mutex.
struct JsonlEventSinkState {
    /// Buffered writer over the current (possibly rotated) log file.
    writer: std::io::BufWriter<std::fs::File>,
    /// Next event index to assign (equals the number of events written).
    index: u64,
    /// Bytes written to the *current* file; reset to 0 on rotation.
    bytes_written: u64,
    /// How many times this sink has rotated; used for the file suffix.
    rotation: u32,
}
550
551impl JsonlEventSink {
552    /// Hard cap past which the current file rotates to a numbered
553    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
554    /// sessions don't produce unreadable multi-GB logs.
555    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
556
557    /// Open a new sink writing to `base_path`. Creates parent dirs
558    /// if missing. Overwrites an existing file so each fresh session
559    /// starts from index 0.
560    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
561        let base_path = base_path.into();
562        if let Some(parent) = base_path.parent() {
563            std::fs::create_dir_all(parent)?;
564        }
565        let file = std::fs::OpenOptions::new()
566            .create(true)
567            .truncate(true)
568            .write(true)
569            .open(&base_path)?;
570        Ok(Arc::new(Self {
571            state: Mutex::new(JsonlEventSinkState {
572                writer: std::io::BufWriter::new(file),
573                index: 0,
574                bytes_written: 0,
575                rotation: 0,
576            }),
577            base_path,
578        }))
579    }
580
581    /// Flush any buffered writes. Called on session shutdown; the
582    /// Drop impl calls this too but on early panic it may not run.
583    pub fn flush(&self) -> std::io::Result<()> {
584        use std::io::Write as _;
585        self.state
586            .lock()
587            .expect("jsonl sink mutex poisoned")
588            .writer
589            .flush()
590    }
591
592    /// Current event index — primarily for tests and the "how many
593    /// events are in this run" run-record summary.
594    pub fn event_count(&self) -> u64 {
595        self.state.lock().expect("jsonl sink mutex poisoned").index
596    }
597
598    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
599        use std::io::Write as _;
600        if state.bytes_written < Self::ROTATE_BYTES {
601            return Ok(());
602        }
603        state.writer.flush()?;
604        state.rotation += 1;
605        let suffix = format!("-{:06}", state.rotation);
606        let rotated = self.base_path.with_file_name({
607            let stem = self
608                .base_path
609                .file_stem()
610                .and_then(|s| s.to_str())
611                .unwrap_or("event_log");
612            let ext = self
613                .base_path
614                .extension()
615                .and_then(|e| e.to_str())
616                .unwrap_or("jsonl");
617            format!("{stem}{suffix}.{ext}")
618        });
619        let file = std::fs::OpenOptions::new()
620            .create(true)
621            .truncate(true)
622            .write(true)
623            .open(&rotated)?;
624        state.writer = std::io::BufWriter::new(file);
625        state.bytes_written = 0;
626        Ok(())
627    }
628}
629
/// Event-log-backed sink for a single session's agent event stream.
/// Uses the generalized append-only event log when one is installed for
/// the current VM thread and falls back to `JsonlEventSink` only for
/// older env-driven workflows.
pub struct EventLogSink {
    /// Shared handle to the backing append-only event log.
    log: Arc<AnyEventLog>,
    /// Per-session topic (`observability.agent_events.<session>`)
    /// computed once at construction.
    topic: Topic,
    /// Session this sink belongs to; stamped into every record's
    /// payload and headers.
    session_id: String,
}
639
640impl EventLogSink {
641    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
642        let session_id = session_id.into();
643        let topic = Topic::new(format!(
644            "observability.agent_events.{}",
645            crate::event_log::sanitize_topic_component(&session_id)
646        ))
647        .expect("session id should sanitize to a valid topic");
648        Arc::new(Self {
649            log,
650            topic,
651            session_id,
652        })
653    }
654}
655
656impl AgentEventSink for JsonlEventSink {
657    fn handle_event(&self, event: &AgentEvent) {
658        use std::io::Write as _;
659        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
660        let index = state.index;
661        state.index += 1;
662        let emitted_at_ms = std::time::SystemTime::now()
663            .duration_since(std::time::UNIX_EPOCH)
664            .map(|d| d.as_millis() as i64)
665            .unwrap_or(0);
666        let envelope = PersistedAgentEvent {
667            index,
668            emitted_at_ms,
669            frame_depth: None,
670            event: event.clone(),
671        };
672        if let Ok(line) = serde_json::to_string(&envelope) {
673            // One line, newline-terminated — JSON Lines spec.
674            // Errors here are swallowed on purpose; a failing write
675            // must never crash the agent loop, and the run record
676            // itself is a secondary artifact.
677            let _ = state.writer.write_all(line.as_bytes());
678            let _ = state.writer.write_all(b"\n");
679            state.bytes_written += line.len() as u64 + 1;
680            let _ = self.rotate_if_needed(&mut state);
681        }
682    }
683}
684
685impl AgentEventSink for EventLogSink {
686    fn handle_event(&self, event: &AgentEvent) {
687        let event_json = match serde_json::to_value(event) {
688            Ok(value) => value,
689            Err(_) => return,
690        };
691        let event_kind = event_json
692            .get("type")
693            .and_then(|value| value.as_str())
694            .unwrap_or("agent_event")
695            .to_string();
696        let payload = serde_json::json!({
697            "index_hint": now_ms(),
698            "session_id": self.session_id,
699            "event": event_json,
700        });
701        let mut headers = std::collections::BTreeMap::new();
702        headers.insert("session_id".to_string(), self.session_id.clone());
703        let log = self.log.clone();
704        let topic = self.topic.clone();
705        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
706        if let Ok(handle) = tokio::runtime::Handle::try_current() {
707            handle.spawn(async move {
708                let _ = log.append(&topic, record).await;
709            });
710        } else {
711            let _ = futures::executor::block_on(log.append(&topic, record));
712        }
713    }
714}
715
716impl Drop for JsonlEventSink {
717    fn drop(&mut self) {
718        if let Ok(mut state) = self.state.lock() {
719            use std::io::Write as _;
720            let _ = state.writer.flush();
721        }
722    }
723}
724
/// Fan-out helper for composing multiple external sinks behind a single
/// `AgentEventSink`.
pub struct MultiSink {
    /// Registered downstream sinks; mutex-guarded so `push` can be
    /// called from any thread.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
729
730impl MultiSink {
731    pub fn new() -> Self {
732        Self {
733            sinks: Mutex::new(Vec::new()),
734        }
735    }
736    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
737        self.sinks.lock().expect("sink mutex poisoned").push(sink);
738    }
739    pub fn len(&self) -> usize {
740        self.sinks.lock().expect("sink mutex poisoned").len()
741    }
742    pub fn is_empty(&self) -> bool {
743        self.len() == 0
744    }
745}
746
747impl Default for MultiSink {
748    fn default() -> Self {
749        Self::new()
750    }
751}
752
753impl AgentEventSink for MultiSink {
754    fn handle_event(&self, event: &AgentEvent) {
755        // Deliberate: snapshot then release the lock before invoking sink
756        // callbacks. Sinks can re-enter the event system (e.g. a host
757        // sink that logs to another AgentEvent path), so holding the
758        // mutex across the callback would risk self-deadlock. Arc clones
759        // are refcount bumps — cheap.
760        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
761        for sink in sinks {
762            sink.handle_event(event);
763        }
764    }
765}
766
767type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
768
769fn external_sinks() -> &'static ExternalSinkRegistry {
770    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
771    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
772}
773
774pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
775    let session_id = session_id.into();
776    let mut reg = external_sinks().write().expect("sink registry poisoned");
777    reg.entry(session_id).or_default().push(sink);
778}
779
780/// Remove all external sinks registered for `session_id`. Does NOT
781/// close the session itself — subscribers and transcript survive, so a
782/// later `agent_loop` call with the same id continues the conversation.
783pub fn clear_session_sinks(session_id: &str) {
784    external_sinks()
785        .write()
786        .expect("sink registry poisoned")
787        .remove(session_id);
788}
789
790pub fn reset_all_sinks() {
791    external_sinks()
792        .write()
793        .expect("sink registry poisoned")
794        .clear();
795    crate::agent_sessions::reset_session_store();
796}
797
798/// Emit an event to external sinks registered for this session. Pipeline
799/// closure subscribers are NOT called by this function — the agent
800/// loop owns that path because it needs its async VM context.
801pub fn emit_event(event: &AgentEvent) {
802    let sinks: Vec<Arc<dyn AgentEventSink>> = {
803        let reg = external_sinks().read().expect("sink registry poisoned");
804        reg.get(event.session_id()).cloned().unwrap_or_default()
805    };
806    for sink in sinks {
807        sink.handle_event(event);
808    }
809}
810
/// Milliseconds since the Unix epoch, clamped to 0 when the system
/// clock reads earlier than the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
817
818pub fn session_external_sink_count(session_id: &str) -> usize {
819    external_sinks()
820        .read()
821        .expect("sink registry poisoned")
822        .get(session_id)
823        .map(|v| v.len())
824        .unwrap_or(0)
825}
826
/// Number of `.harn` closure subscribers attached to `session_id`.
/// Delegates to the session store, which owns session-scoped VM state
/// (the second subscription path described in the module docs).
pub fn session_closure_subscriber_count(session_id: &str) -> usize {
    crate::agent_sessions::subscriber_count(session_id)
}
830
#[cfg(test)]
mod tests {
    //! Coverage for sink fan-out, the session-keyed registry, and the
    //! serde wire shapes that the persisted event log depends on.
    //!
    //! Registry tests clean up only their OWN session ids (via
    //! `clear_session_sinks`) instead of `reset_all_sinks()`: the cargo
    //! test harness runs tests on parallel threads, and wiping the
    //! whole process-global registry from one test can race another
    //! test that is mid-register/emit on its own sessions.
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Sink that counts how many events it receives.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        multi.push(Arc::new(CountingSink(a.clone())));
        multi.push(Arc::new(CountingSink(b.clone())));
        let event = AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        };
        multi.handle_event(&event);
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn session_scoped_sink_routing() {
        // Session-scoped cleanup, not `reset_all_sinks()` — see the
        // module doc on parallel-test races.
        clear_session_sinks("session-a");
        clear_session_sinks("session-b");
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        register_sink("session-a", Arc::new(CountingSink(a.clone())));
        register_sink("session-b", Arc::new(CountingSink(b.clone())));
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);
        clear_session_sinks("session-b");
    }

    #[test]
    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
        use std::io::{BufRead, BufReader};
        // Temp path keyed by process id so concurrent test processes
        // don't share the same file.
        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("event_log.jsonl");
        let sink = JsonlEventSink::open(&path).unwrap();
        for i in 0..5 {
            sink.handle_event(&AgentEvent::TurnStart {
                session_id: "s".into(),
                iteration: i,
            });
        }
        assert_eq!(sink.event_count(), 5);
        sink.flush().unwrap();

        // Read back + assert monotonic indices + non-decreasing timestamps.
        let file = std::fs::File::open(&path).unwrap();
        let mut last_idx: i64 = -1;
        let mut last_ts: i64 = 0;
        for line in BufReader::new(file).lines() {
            let line = line.unwrap();
            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
            let idx = val["index"].as_i64().unwrap();
            let ts = val["emitted_at_ms"].as_i64().unwrap();
            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
            assert!(ts >= last_ts, "timestamps must be non-decreasing");
            last_idx = idx;
            last_ts = ts;
            // Event payload flattened — type tag must survive.
            assert_eq!(val["type"], "turn_start");
        }
        assert_eq!(last_idx, 4);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
        // Terminal update with both durations populated — both fields
        // appear in the JSON. Snake_case keys here because this is the
        // canonical AgentEvent shape; the ACP adapter renames to
        // camelCase separately.
        let terminal = AgentEvent::ToolCallUpdate {
            session_id: "s".into(),
            tool_call_id: "tc-1".into(),
            tool_name: "read".into(),
            status: ToolCallStatus::Completed,
            raw_output: None,
            error: None,
            duration_ms: Some(42),
            execution_duration_ms: Some(7),
            error_category: None,
            executor: None,
            parsing: None,

            raw_input: None,
            raw_input_partial: None,
            audit: None,
        };
        let value = serde_json::to_value(&terminal).unwrap();
        assert_eq!(value["duration_ms"], serde_json::json!(42));
        assert_eq!(value["execution_duration_ms"], serde_json::json!(7));

        // In-progress update with `None` for both — both keys must be
        // absent (not `null`) so older ACP clients that key off
        // presence don't see a misleading zero.
        let intermediate = AgentEvent::ToolCallUpdate {
            session_id: "s".into(),
            tool_call_id: "tc-1".into(),
            tool_name: "read".into(),
            status: ToolCallStatus::InProgress,
            raw_output: None,
            error: None,
            duration_ms: None,
            execution_duration_ms: None,
            error_category: None,
            executor: None,
            parsing: None,

            raw_input: None,
            raw_input_partial: None,
            audit: None,
        };
        let value = serde_json::to_value(&intermediate).unwrap();
        let object = value.as_object().expect("update serializes as object");
        assert!(
            !object.contains_key("duration_ms"),
            "duration_ms must be omitted when None: {value}"
        );
        assert!(
            !object.contains_key("execution_duration_ms"),
            "execution_duration_ms must be omitted when None: {value}"
        );
    }

    #[test]
    fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
        // Persisted event-log entries written before the fields existed
        // must still deserialize cleanly. The missing keys map to None.
        let raw = serde_json::json!({
            "type": "tool_call_update",
            "session_id": "s",
            "tool_call_id": "tc-1",
            "tool_name": "read",
            "status": "completed",
            "raw_output": null,
            "error": null,
        });
        let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
        match event {
            AgentEvent::ToolCallUpdate {
                duration_ms,
                execution_duration_ms,
                ..
            } => {
                assert!(duration_ms.is_none());
                assert!(execution_duration_ms.is_none());
            }
            other => panic!("expected ToolCallUpdate, got {other:?}"),
        }
    }

    #[test]
    fn tool_call_status_serde() {
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
            "\"pending\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
            "\"in_progress\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
            "\"completed\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
            "\"failed\""
        );
    }

    #[test]
    fn tool_call_error_category_serializes_as_snake_case() {
        let pairs = [
            (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
            (ToolCallErrorCategory::ToolError, "tool_error"),
            (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
            (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
            (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
            (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
            (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
            (ToolCallErrorCategory::Timeout, "timeout"),
            (ToolCallErrorCategory::Network, "network"),
            (ToolCallErrorCategory::Cancelled, "cancelled"),
            (ToolCallErrorCategory::Unknown, "unknown"),
        ];
        for (variant, wire) in pairs {
            let encoded = serde_json::to_string(&variant).unwrap();
            assert_eq!(encoded, format!("\"{wire}\""));
            assert_eq!(variant.as_str(), wire);
            // Round-trip via deserialize so wire stability is enforced
            // both ways.
            let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
            assert_eq!(decoded, variant);
        }
    }

    #[test]
    fn tool_executor_round_trips_with_adjacent_tag() {
        // Adjacent tagging keeps the wire shape uniform — every variant
        // is a JSON object with a `kind` discriminator. The ACP adapter
        // rewrites unit variants as bare strings; the on-disk event log
        // keeps the object shape so deserialize can recover the variant.
        for executor in [
            ToolExecutor::HarnBuiltin,
            ToolExecutor::HostBridge,
            ToolExecutor::McpServer {
                server_name: "linear".to_string(),
            },
            ToolExecutor::ProviderNative,
        ] {
            let json = serde_json::to_value(&executor).unwrap();
            let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
            match &executor {
                ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
                ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
                ToolExecutor::McpServer { server_name } => {
                    assert_eq!(kind, "mcp_server");
                    assert_eq!(json["server_name"], *server_name);
                }
                ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
            }
            let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
            assert_eq!(recovered, executor);
        }
    }

    #[test]
    fn tool_call_error_category_from_internal_collapses_transient_family() {
        use crate::value::ErrorCategory as Internal;
        assert_eq!(
            ToolCallErrorCategory::from_internal(&Internal::Timeout),
            ToolCallErrorCategory::Timeout
        );
        for net in [
            Internal::RateLimit,
            Internal::Overloaded,
            Internal::ServerError,
            Internal::TransientNetwork,
        ] {
            assert_eq!(
                ToolCallErrorCategory::from_internal(&net),
                ToolCallErrorCategory::Network,
                "{net:?} should map to Network",
            );
        }
        assert_eq!(
            ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
            ToolCallErrorCategory::SchemaValidation
        );
        assert_eq!(
            ToolCallErrorCategory::from_internal(&Internal::ToolError),
            ToolCallErrorCategory::ToolError
        );
        assert_eq!(
            ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
            ToolCallErrorCategory::PermissionDenied
        );
        assert_eq!(
            ToolCallErrorCategory::from_internal(&Internal::Cancelled),
            ToolCallErrorCategory::Cancelled
        );
        for bridge in [
            Internal::Auth,
            Internal::EgressBlocked,
            Internal::NotFound,
            Internal::CircuitOpen,
            Internal::Generic,
        ] {
            assert_eq!(
                ToolCallErrorCategory::from_internal(&bridge),
                ToolCallErrorCategory::HostBridgeError,
                "{bridge:?} should map to HostBridgeError",
            );
        }
    }

    #[test]
    fn tool_call_update_event_omits_error_category_when_none() {
        let event = AgentEvent::ToolCallUpdate {
            session_id: "s".into(),
            tool_call_id: "t".into(),
            tool_name: "read".into(),
            status: ToolCallStatus::Completed,
            raw_output: None,
            error: None,
            duration_ms: None,
            execution_duration_ms: None,
            error_category: None,
            executor: None,
            parsing: None,

            raw_input: None,
            raw_input_partial: None,
            audit: None,
        };
        let v = serde_json::to_value(&event).unwrap();
        assert_eq!(v["type"], "tool_call_update");
        assert!(v.get("error_category").is_none());
    }

    #[test]
    fn tool_call_update_event_serializes_error_category_when_set() {
        let event = AgentEvent::ToolCallUpdate {
            session_id: "s".into(),
            tool_call_id: "t".into(),
            tool_name: "read".into(),
            status: ToolCallStatus::Failed,
            raw_output: None,
            error: Some("missing required field".into()),
            duration_ms: None,
            execution_duration_ms: None,
            error_category: Some(ToolCallErrorCategory::SchemaValidation),
            executor: None,
            parsing: None,

            raw_input: None,
            raw_input_partial: None,
            audit: None,
        };
        let v = serde_json::to_value(&event).unwrap();
        assert_eq!(v["error_category"], "schema_validation");
        assert_eq!(v["error"], "missing required field");
    }

    #[test]
    fn tool_call_update_omits_executor_when_absent() {
        // `executor: None` must not appear in the serialized event so
        // the on-disk shape stays backward-compatible with replays
        // recorded before harn#691.
        let event = AgentEvent::ToolCallUpdate {
            session_id: "s".into(),
            tool_call_id: "tc-1".into(),
            tool_name: "read".into(),
            status: ToolCallStatus::Completed,
            raw_output: None,
            error: None,
            duration_ms: None,
            execution_duration_ms: None,
            error_category: None,
            executor: None,
            parsing: None,

            raw_input: None,
            raw_input_partial: None,
            audit: None,
        };
        let json = serde_json::to_value(&event).unwrap();
        assert!(json.get("executor").is_none(), "got: {json}");
    }

    #[test]
    fn worker_event_status_strings_cover_all_variants() {
        // Wire-level status strings flow into both bridge `worker_update`
        // payloads and ACP `session/update` notifications. Pinning every
        // variant here so a future addition can't silently land without
        // a docs/wire decision.
        assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
        assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
        assert_eq!(
            WorkerEvent::WorkerWaitingForInput.as_status(),
            "awaiting_input"
        );
        assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
        assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
        assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");

        for terminal in [
            WorkerEvent::WorkerCompleted,
            WorkerEvent::WorkerFailed,
            WorkerEvent::WorkerCancelled,
        ] {
            assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
        }
        for non_terminal in [
            WorkerEvent::WorkerSpawned,
            WorkerEvent::WorkerProgressed,
            WorkerEvent::WorkerWaitingForInput,
        ] {
            assert!(
                !non_terminal.is_terminal(),
                "{non_terminal:?} should not be terminal"
            );
        }
    }

    #[test]
    fn worker_update_event_routes_through_session_keyed_sink() {
        // Worker lifecycle events ride the same session-keyed
        // `AgentEventSink` registry as message and tool events. This
        // is the canonical path ACP and A2A subscribe to — gate it
        // here so a registry-routing regression breaks loudly.
        //
        // Session-scoped cleanup, not `reset_all_sinks()` — see the
        // module doc on parallel-test races.
        clear_session_sinks("worker-session-1");
        clear_session_sinks("other-session");
        let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
        struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
        impl AgentEventSink for CapturingSink {
            fn handle_event(&self, event: &AgentEvent) {
                self.0
                    .lock()
                    .expect("captured sink mutex poisoned")
                    .push(event.clone());
            }
        }
        register_sink(
            "worker-session-1",
            Arc::new(CapturingSink(captured.clone())),
        );
        emit_event(&AgentEvent::WorkerUpdate {
            session_id: "worker-session-1".into(),
            worker_id: "worker_42".into(),
            worker_name: "review_captain".into(),
            worker_task: "review pr".into(),
            worker_mode: "delegated_stage".into(),
            event: WorkerEvent::WorkerWaitingForInput,
            status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
            metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
            audit: None,
        });
        // Other sessions don't receive cross-talk.
        emit_event(&AgentEvent::WorkerUpdate {
            session_id: "other-session".into(),
            worker_id: "w2".into(),
            worker_name: "n2".into(),
            worker_task: "t2".into(),
            worker_mode: "delegated_stage".into(),
            event: WorkerEvent::WorkerCompleted,
            status: "completed".into(),
            metadata: serde_json::json!({}),
            audit: None,
        });
        let received = captured.lock().unwrap().clone();
        assert_eq!(received.len(), 1, "got: {received:?}");
        match &received[0] {
            AgentEvent::WorkerUpdate {
                session_id,
                worker_id,
                event,
                status,
                ..
            } => {
                assert_eq!(session_id, "worker-session-1");
                assert_eq!(worker_id, "worker_42");
                assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
                assert_eq!(status, "awaiting_input");
            }
            other => panic!("expected WorkerUpdate, got {other:?}"),
        }
        clear_session_sinks("worker-session-1");
        clear_session_sinks("other-session");
    }

    #[test]
    fn worker_update_event_serializes_to_canonical_shape() {
        // Persisted event-log entries flatten the AgentEvent envelope,
        // so the WorkerUpdate variant must serialize with a `type` of
        // `worker_update` and the worker fields directly on the
        // top-level object (matching the `#[serde(tag = "type", ...)]`
        // shape the rest of AgentEvent uses).
        let event = AgentEvent::WorkerUpdate {
            session_id: "s".into(),
            worker_id: "w".into(),
            worker_name: "n".into(),
            worker_task: "t".into(),
            worker_mode: "delegated_stage".into(),
            event: WorkerEvent::WorkerProgressed,
            status: "progressed".into(),
            metadata: serde_json::json!({"started_at": "0193..."}),
            audit: Some(serde_json::json!({"run_id": "run_x"})),
        };
        let value = serde_json::to_value(&event).unwrap();
        assert_eq!(value["type"], "worker_update");
        assert_eq!(value["session_id"], "s");
        assert_eq!(value["worker_id"], "w");
        assert_eq!(value["status"], "progressed");
        assert_eq!(value["audit"]["run_id"], "run_x");

        // Round-trip: the persisted event log must deserialize the
        // canonical shape back into the typed variant so replay
        // tooling can re-derive lifecycle state offline.
        let recovered: AgentEvent = serde_json::from_value(value).unwrap();
        match recovered {
            AgentEvent::WorkerUpdate {
                event: recovered_event,
                ..
            } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
            other => panic!("expected WorkerUpdate, got {other:?}"),
        }
    }

    #[test]
    fn tool_call_update_includes_executor_when_present() {
        let event = AgentEvent::ToolCallUpdate {
            session_id: "s".into(),
            tool_call_id: "tc-1".into(),
            tool_name: "read".into(),
            status: ToolCallStatus::Completed,
            raw_output: None,
            error: None,
            duration_ms: None,
            execution_duration_ms: None,
            error_category: None,
            executor: Some(ToolExecutor::McpServer {
                server_name: "github".into(),
            }),
            parsing: None,

            raw_input: None,
            raw_input_partial: None,
            audit: None,
        };
        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["executor"]["kind"], "mcp_server");
        assert_eq!(json["executor"]["server_name"], "github");
    }

    #[test]
    fn tool_call_update_omits_audit_when_absent() {
        let event = AgentEvent::ToolCallUpdate {
            session_id: "s".into(),
            tool_call_id: "tc-1".into(),
            tool_name: "read".into(),
            status: ToolCallStatus::Completed,
            raw_output: None,
            error: None,
            duration_ms: None,
            execution_duration_ms: None,
            error_category: None,
            executor: None,
            parsing: None,
            raw_input: None,
            raw_input_partial: None,
            audit: None,
        };
        let json = serde_json::to_value(&event).unwrap();
        assert!(json.get("audit").is_none(), "got: {json}");
    }

    #[test]
    fn tool_call_update_includes_audit_when_present() {
        let audit = MutationSessionRecord {
            session_id: "session_42".into(),
            run_id: Some("run_42".into()),
            mutation_scope: "apply_workspace".into(),
            execution_kind: Some("worker".into()),
            ..Default::default()
        };
        let event = AgentEvent::ToolCallUpdate {
            session_id: "s".into(),
            tool_call_id: "tc-1".into(),
            tool_name: "edit_file".into(),
            status: ToolCallStatus::Completed,
            raw_output: None,
            error: None,
            duration_ms: None,
            execution_duration_ms: None,
            error_category: None,
            executor: Some(ToolExecutor::HostBridge),
            parsing: None,
            raw_input: None,
            raw_input_partial: None,
            audit: Some(audit),
        };
        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["audit"]["session_id"], "session_42");
        assert_eq!(json["audit"]["run_id"], "run_42");
        assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
        assert_eq!(json["audit"]["execution_kind"], "worker");
    }

    #[test]
    fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
        let raw = serde_json::json!({
            "type": "tool_call_update",
            "session_id": "s",
            "tool_call_id": "tc-1",
            "tool_name": "read",
            "status": "completed",
            "raw_output": null,
            "error": null,
        });
        let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
        match event {
            AgentEvent::ToolCallUpdate { audit, .. } => {
                assert!(audit.is_none());
            }
            other => panic!("expected ToolCallUpdate, got {other:?}"),
        }
    }
}