Skip to main content

harn_vm/
agent_events.rs

//! Agent event stream — the ACP-aligned observation surface for the
//! agent loop.
//!
//! Every phase of the turn loop emits an `AgentEvent`. The canonical
//! variants map 1:1 onto ACP `SessionUpdate` values; internal variants
//! (for example `TurnStart`, `TurnEnd`, `FeedbackInjected`) let
//! pipelines react to loop milestones that don't have a direct ACP
//! counterpart.
//!
//! There are two subscription paths, both keyed on session id so two
//! concurrent sessions never cross-talk:
//!
//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
//!    like the harn-cli ACP server. Invoked synchronously by the loop.
//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
//! 2. **Closure subscribers** — `.harn` closures registered via the
//!    `agent_subscribe(session_id, callback)` host builtin. These live
//!    on the session's `SessionState.subscribers` in
//!    `crate::agent_sessions`, because sessions are the single source
//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
30/// One coalesced filesystem notification from a hostlib `fs_watch`
31/// subscription.
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct FsWatchEvent {
34    pub kind: String,
35    pub paths: Vec<String>,
36    pub relative_paths: Vec<String>,
37    pub raw_kind: String,
38    pub error: Option<String>,
39}
40
41/// Typed worker lifecycle events emitted by delegated/background agent
42/// execution. Bridge-facing worker updates still derive a string status
43/// from these variants, but the runtime no longer passes raw status
44/// strings around internally.
45///
46/// `Spawned`/`Completed`/`Failed`/`Cancelled` are the four terminal-or-start
47/// states. `Progressed` is fired on intermediate milestones (e.g. a
48/// retriggerable worker resuming from `awaiting_input`, or a workflow
49/// stage completing without ending the worker). `WaitingForInput` covers
50/// retriggerable workers that finish a cycle but stay alive pending the
51/// next host-supplied trigger payload.
52#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
53pub enum WorkerEvent {
54    WorkerSpawned,
55    WorkerProgressed,
56    WorkerWaitingForInput,
57    WorkerCompleted,
58    WorkerFailed,
59    WorkerCancelled,
60}
61
62impl WorkerEvent {
63    /// Wire-level status string used by bridge `worker_update` payloads
64    /// and ACP `worker_update` session updates. The four canonical
65    /// states are mirrored from harn's internal worker `status` field
66    /// (`running`/`completed`/`failed`/`cancelled`), and the two newer
67    /// lifecycle states pick names that don't collide with any existing
68    /// status string.
69    pub fn as_status(self) -> &'static str {
70        match self {
71            Self::WorkerSpawned => "running",
72            Self::WorkerProgressed => "progressed",
73            Self::WorkerWaitingForInput => "awaiting_input",
74            Self::WorkerCompleted => "completed",
75            Self::WorkerFailed => "failed",
76            Self::WorkerCancelled => "cancelled",
77        }
78    }
79
80    pub fn as_str(self) -> &'static str {
81        match self {
82            Self::WorkerSpawned => "WorkerSpawned",
83            Self::WorkerProgressed => "WorkerProgressed",
84            Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85            Self::WorkerCompleted => "WorkerCompleted",
86            Self::WorkerFailed => "WorkerFailed",
87            Self::WorkerCancelled => "WorkerCancelled",
88        }
89    }
90
91    /// True for lifecycle events that mean the worker has reached a
92    /// final, non-resumable state. Retriggerable awaiting and
93    /// progressed milestones are *not* terminal — the worker keeps
94    /// running or is waiting for a trigger.
95    pub fn is_terminal(self) -> bool {
96        matches!(
97            self,
98            Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99        )
100    }
101}
102
103/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
104#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub enum ToolCallStatus {
107    /// Dispatched by the model but not yet started.
108    Pending,
109    /// Dispatch is actively running.
110    InProgress,
111    /// Finished successfully.
112    Completed,
113    /// Finished with an error.
114    Failed,
115}
116
117impl ToolCallStatus {
118    pub const ALL: [Self; 4] = [
119        Self::Pending,
120        Self::InProgress,
121        Self::Completed,
122        Self::Failed,
123    ];
124
125    pub fn as_str(self) -> &'static str {
126        match self {
127            Self::Pending => "pending",
128            Self::InProgress => "in_progress",
129            Self::Completed => "completed",
130            Self::Failed => "failed",
131        }
132    }
133}
134
135/// Wire-level classification of a `ToolCallUpdate` failure. Pairs with the
136/// human-readable `error` string so clients can render each failure type
137/// distinctly (e.g. surface a "permission denied" badge, or a different
138/// retry affordance for `network` vs `tool_error`). The enum is
139/// deliberately extensible — `unknown` is the default when the runtime
140/// could not classify a failure.
141#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
142#[serde(rename_all = "snake_case")]
143pub enum ToolCallErrorCategory {
144    /// Host-side validation rejected the args (missing required field,
145    /// invalid type, malformed JSON).
146    SchemaValidation,
147    /// The tool ran and returned an error result (e.g. `read_file` on a
148    /// missing path) — distinguished from a transport failure.
149    ToolError,
150    /// MCP transport / server-protocol error.
151    McpServerError,
152    /// Burin Swift host bridge returned an error during dispatch.
153    HostBridgeError,
154    /// `session/request_permission` denied by the client, or a policy
155    /// rule (static or dynamic) refused the call.
156    PermissionDenied,
157    /// The harn loop detector skipped this call because the same
158    /// (tool, args) pair repeated past the configured threshold.
159    RejectedLoop,
160    /// Streaming text candidate was detected (bare `name(` or
161    /// `<tool_call>` opener) but never resolved into a parseable call:
162    /// args parsed as malformed, the heredoc body broke, the tag closed
163    /// without a balanced expression, or the stream ended mid-call.
164    /// Used by the streaming candidate detector (harn#692) to retract a
165    /// `tool_call` candidate that turned out to be prose or syntactically
166    /// broken so clients can dismiss the in-flight chip.
167    ParseAborted,
168    /// The tool exceeded its time budget.
169    Timeout,
170    /// Transient network / rate-limited / 5xx provider failure.
171    Network,
172    /// The tool was cancelled (e.g. session aborted).
173    Cancelled,
174    /// Default when classification was not performed.
175    Unknown,
176}
177
178impl ToolCallErrorCategory {
179    pub const ALL: [Self; 11] = [
180        Self::SchemaValidation,
181        Self::ToolError,
182        Self::McpServerError,
183        Self::HostBridgeError,
184        Self::PermissionDenied,
185        Self::RejectedLoop,
186        Self::ParseAborted,
187        Self::Timeout,
188        Self::Network,
189        Self::Cancelled,
190        Self::Unknown,
191    ];
192
193    pub fn as_str(self) -> &'static str {
194        match self {
195            Self::SchemaValidation => "schema_validation",
196            Self::ToolError => "tool_error",
197            Self::McpServerError => "mcp_server_error",
198            Self::HostBridgeError => "host_bridge_error",
199            Self::PermissionDenied => "permission_denied",
200            Self::RejectedLoop => "rejected_loop",
201            Self::ParseAborted => "parse_aborted",
202            Self::Timeout => "timeout",
203            Self::Network => "network",
204            Self::Cancelled => "cancelled",
205            Self::Unknown => "unknown",
206        }
207    }
208
209    /// Map an internal `ErrorCategory` (used by the VM's `VmError`
210    /// classification) onto the wire enum. The internal taxonomy is
211    /// finer-grained — several transient categories collapse onto
212    /// `Network`, and the auth/quota family becomes `HostBridgeError`
213    /// because at the tool-dispatch boundary those errors come from
214    /// the bridge transport rather than the tool itself.
215    pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
216        use crate::value::ErrorCategory as Internal;
217        match category {
218            Internal::Timeout => Self::Timeout,
219            Internal::RateLimit
220            | Internal::Overloaded
221            | Internal::ServerError
222            | Internal::TransientNetwork => Self::Network,
223            Internal::SchemaValidation => Self::SchemaValidation,
224            Internal::ToolError => Self::ToolError,
225            Internal::ToolRejected => Self::PermissionDenied,
226            Internal::Cancelled => Self::Cancelled,
227            Internal::Auth
228            | Internal::EgressBlocked
229            | Internal::NotFound
230            | Internal::CircuitOpen
231            | Internal::BudgetExceeded
232            | Internal::Generic => Self::HostBridgeError,
233        }
234    }
235}
236
237/// Where a tool actually ran. Tags `ToolCallUpdate` so clients can render
238/// "via mcp:linear" / "via host bridge" badges, attribute latency by
239/// transport, and route errors to the right surface (harn#691).
240///
241/// On the wire this serializes adjacently-tagged so the `mcp_server`
242/// case carries the configured server name. The ACP adapter rewrites
243/// unit variants as bare strings (`"harn_builtin"`, `"host_bridge"`,
244/// `"provider_native"`) and the `McpServer` case as
245/// `{"kind": "mcp_server", "serverName": "..."}` to match the protocol's
246/// camelCase convention.
247#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
248#[serde(tag = "kind", rename_all = "snake_case")]
249pub enum ToolExecutor {
250    /// VM-stdlib (`read_file`, `write_file`, `exec`, `http_*`, `mcp_*`)
251    /// or any Harn-side handler closure registered in `tools_val`.
252    HarnBuiltin,
253    /// Capability provided by the host through `HostBridge.builtin_call`
254    /// (Swift-side IDE bridge, BurinApp, BurinCLI host shells).
255    HostBridge,
256    /// Tool dispatched against a configured MCP server. Detected by the
257    /// `_mcp_server` tag that `mcp_list_tools` injects on every tool
258    /// dict before the agent loop sees it.
259    McpServer { server_name: String },
260    /// Provider-side server-side tool execution — currently OpenAI
261    /// Responses-API server tools (e.g. native `tool_search`). The
262    /// runtime never dispatches these locally; the model returns the
263    /// already-executed result inline.
264    ProviderNative,
265}
266
267/// Events emitted by the agent loop. Some variants map 1:1 to ACP
268/// `sessionUpdate` variants; Harn-specific lifecycle events ride on the
269/// extension stream.
270#[derive(Clone, Debug, Serialize, Deserialize)]
271#[serde(tag = "type", rename_all = "snake_case")]
272pub enum AgentEvent {
273    AgentMessageChunk {
274        session_id: String,
275        content: String,
276    },
277    AgentThoughtChunk {
278        session_id: String,
279        content: String,
280    },
281    ToolCall {
282        session_id: String,
283        tool_call_id: String,
284        tool_name: String,
285        kind: Option<ToolKind>,
286        status: ToolCallStatus,
287        raw_input: serde_json::Value,
288        /// Set to `Some(true)` by the streaming candidate detector
289        /// (harn#692) when this event represents a tool-call shape
290        /// detected in the model's in-flight assistant text but whose
291        /// arguments have not finished parsing yet. Clients can render a
292        /// spinner / placeholder while the model writes the body. The
293        /// detector follows up with a `ToolCallUpdate { parsing: false,
294        /// .. }` carrying either `status: pending` (promoted) or
295        /// `status: failed` with `error_category: parse_aborted`.
296        /// `None` (the default) means "this is a normal post-parse tool
297        /// call, no candidate phase was active" so the on-disk shape
298        /// stays compatible with replays recorded before this field
299        /// existed.
300        #[serde(default, skip_serializing_if = "Option::is_none")]
301        parsing: Option<bool>,
302        /// Mutation-session audit context active when the tool was
303        /// dispatched (see harn#699). Hosts use it to group every tool
304        /// emission belonging to the same write-capable session.
305        #[serde(default, skip_serializing_if = "Option::is_none")]
306        audit: Option<MutationSessionRecord>,
307    },
308    ToolCallUpdate {
309        session_id: String,
310        tool_call_id: String,
311        tool_name: String,
312        status: ToolCallStatus,
313        raw_output: Option<serde_json::Value>,
314        error: Option<String>,
315        /// Wall-clock milliseconds from the parse-to-execution boundary
316        /// to the terminal `Completed`/`Failed` update. Includes the
317        /// time spent in any wrapping orchestration logic (loop checks,
318        /// post-tool hooks, microcompaction). Populated only on the
319        /// terminal update — `None` on intermediate `Pending` /
320        /// `InProgress` updates so clients can ignore the field until
321        /// it shows up.
322        #[serde(default, skip_serializing_if = "Option::is_none")]
323        duration_ms: Option<u64>,
324        /// Milliseconds spent in the actual host/builtin/MCP dispatch
325        /// call only (the inner `dispatch_tool_execution` window).
326        /// Populated only on the terminal update; `None` otherwise.
327        #[serde(default, skip_serializing_if = "Option::is_none")]
328        execution_duration_ms: Option<u64>,
329        /// Structured classification of the failure (when `status` is
330        /// `Failed`). Paired with `error` so clients can render each
331        /// category distinctly without parsing free-form strings. Always
332        /// `None` for non-Failed updates and serialized as
333        /// `errorCategory` in the ACP wire format.
334        #[serde(default, skip_serializing_if = "Option::is_none")]
335        error_category: Option<ToolCallErrorCategory>,
336        /// Where the tool actually ran. `None` only for events emitted
337        /// from sites that pre-date the dispatch decision (e.g. the
338        /// pending → in-progress transition the loop emits before the
339        /// dispatcher picks a backend).
340        #[serde(default, skip_serializing_if = "Option::is_none")]
341        executor: Option<ToolExecutor>,
342        /// Companion to `ToolCall.parsing` (harn#692). The streaming
343        /// candidate detector emits the *terminal* candidate event as a
344        /// `ToolCallUpdate` with `parsing: Some(false)` to retract the
345        /// in-flight `parsing: true` chip — either by promoting the
346        /// candidate (`status: pending`, populated `raw_output: None`,
347        /// `error: None`) or aborting it (`status: failed`,
348        /// `error_category: parse_aborted`). `None` means this update is
349        /// not part of a candidate-phase transition.
350        #[serde(default, skip_serializing_if = "Option::is_none")]
351        parsing: Option<bool>,
352        /// Best-effort partial parse of the streamed tool-call arguments.
353        /// Populated by the SSE transport on `Pending` updates as the
354        /// model streams `input_json_delta` (Anthropic) or
355        /// `tool_calls[].function.arguments` deltas (OpenAI). `None` on
356        /// terminal updates and on emissions from non-streaming paths
357        /// (#693). When the partial bytes are not yet parseable as JSON
358        /// the transport falls back to `raw_input_partial`.
359        #[serde(default, skip_serializing_if = "Option::is_none")]
360        raw_input: Option<serde_json::Value>,
361        /// Raw concatenated bytes of the streamed tool-call arguments
362        /// when a permissive parse failed (#693). Mutually exclusive
363        /// with `raw_input`: clients render whichever is present.
364        #[serde(default, skip_serializing_if = "Option::is_none")]
365        raw_input_partial: Option<String>,
366        /// Mutation-session audit context for the tool call. Carries the
367        /// same payload as on the paired `ToolCall` event so a host
368        /// processing a single update doesn't have to correlate against
369        /// the prior pending event.
370        #[serde(default, skip_serializing_if = "Option::is_none")]
371        audit: Option<MutationSessionRecord>,
372    },
373    Plan {
374        session_id: String,
375        plan: serde_json::Value,
376    },
377    TurnStart {
378        session_id: String,
379        iteration: usize,
380    },
381    TurnEnd {
382        session_id: String,
383        iteration: usize,
384        turn_info: serde_json::Value,
385    },
386    /// Emitted when a first-class agent session is explicitly closed by
387    /// `agent_session_close`. This gives event-log consumers a typed
388    /// terminal marker even when no final model turn runs.
389    SessionClosed {
390        session_id: String,
391        reason: String,
392        status: String,
393        metadata: serde_json::Value,
394    },
395    JudgeDecision {
396        session_id: String,
397        iteration: usize,
398        verdict: String,
399        reasoning: String,
400        next_step: Option<String>,
401        judge_duration_ms: u64,
402    },
403    TypedCheckpoint {
404        session_id: String,
405        checkpoint: serde_json::Value,
406    },
407    FeedbackInjected {
408        session_id: String,
409        kind: String,
410        content: String,
411    },
412    /// Emitted when the agent loop exhausts `max_iterations` without any
413    /// explicit break condition firing. Distinct from a natural "done" or
414    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
415    BudgetExhausted {
416        session_id: String,
417        max_iterations: usize,
418    },
419    /// Emitted when the loop breaks because consecutive text-only turns
420    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
421    /// hosts that key off agent-terminal events.
422    LoopStuck {
423        session_id: String,
424        max_nudges: usize,
425        last_iteration: usize,
426        tail_excerpt: String,
427    },
428    /// Emitted when the daemon idle-wait loop trips its watchdog because
429    /// every configured wake source returned `None` for N consecutive
430    /// attempts. Exists so a broken daemon doesn't hang the session
431    /// silently.
432    DaemonWatchdogTripped {
433        session_id: String,
434        attempts: usize,
435        elapsed_ms: u64,
436    },
437    /// Emitted when a skill is activated. Carries the match reason so
438    /// replayers can reconstruct *why* a given skill took effect at
439    /// this iteration.
440    SkillActivated {
441        session_id: String,
442        skill_name: String,
443        iteration: usize,
444        reason: String,
445    },
446    /// Emitted when a previously-active skill is deactivated because
447    /// the reassess phase no longer matches it.
448    SkillDeactivated {
449        session_id: String,
450        skill_name: String,
451        iteration: usize,
452    },
453    /// Emitted once per activation when the skill's `allowed_tools` filter
454    /// narrows the effective tool surface exposed to the model.
455    SkillScopeTools {
456        session_id: String,
457        skill_name: String,
458        allowed_tools: Vec<String>,
459    },
460    /// Emitted when a `tool_search` query is issued by the model. Carries
461    /// the raw query args, the configured strategy, and a `mode` tag
462    /// distinguishing the client-executed fallback (`"client"`) from
463    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
464    /// transcript event shape so hosts can render a search-in-progress
465    /// chip in real time — the replay path walks the transcript after
466    /// the turn, which is too late for live UX.
467    ToolSearchQuery {
468        session_id: String,
469        tool_use_id: String,
470        name: String,
471        query: serde_json::Value,
472        strategy: String,
473        mode: String,
474    },
475    /// Emitted when `tool_search` resolves — carries the list of tool
476    /// names newly promoted into the model's effective surface for the
477    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
478    ToolSearchResult {
479        session_id: String,
480        tool_use_id: String,
481        promoted: Vec<String>,
482        strategy: String,
483        mode: String,
484    },
485    TranscriptCompacted {
486        session_id: String,
487        mode: String,
488        strategy: String,
489        archived_messages: usize,
490        estimated_tokens_before: usize,
491        estimated_tokens_after: usize,
492        snapshot_asset_id: Option<String>,
493    },
494    Handoff {
495        session_id: String,
496        artifact_id: String,
497        handoff: Box<HandoffArtifact>,
498    },
499    FsWatch {
500        session_id: String,
501        subscription_id: String,
502        events: Vec<FsWatchEvent>,
503    },
504    /// Lifecycle update for a delegated/background worker. Carries the
505    /// canonical typed `event` variant alongside the worker's current
506    /// `status` string and the structured `metadata` payload that
507    /// `worker_bridge_metadata` builds (task, mode, timing, child
508    /// run/snapshot paths, audit-session, etc.). The `audit` field is
509    /// the same `MutationSessionRecord` JSON serialization carried on
510    /// the bridge wire so ACP/A2A consumers don't need to re-derive it.
511    ///
512    /// One-to-one with the bridge-side `worker_update` session-update
513    /// notification: ACP and A2A adapters subscribe to this variant
514    /// and translate it into their respective wire formats. The
515    /// `session_id` is the parent agent session that owns the worker
516    /// (i.e. the session whose VM spawned the worker), so a single
517    /// host stays subscribed to the same sink for both message and
518    /// worker traffic.
519    WorkerUpdate {
520        session_id: String,
521        worker_id: String,
522        worker_name: String,
523        worker_task: String,
524        worker_mode: String,
525        event: WorkerEvent,
526        status: String,
527        metadata: serde_json::Value,
528        audit: Option<serde_json::Value>,
529    },
530    /// A human-in-the-loop primitive (`ask_user`, `request_approval`,
531    /// `dual_control`, `escalate`) has just suspended the script and is
532    /// waiting on a response. Hosts that bridge the VM onto a remote
533    /// transport (ACP, A2A) translate this into a "paused / awaiting
534    /// input" wire signal so the client knows the task isn't stuck —
535    /// it's blocked on the human side. Pair-emitted with `HitlResolved`
536    /// when the waitpoint completes/cancels/times out.
537    HitlRequested {
538        session_id: String,
539        request_id: String,
540        kind: String,
541        payload: serde_json::Value,
542    },
543    /// Companion to `HitlRequested`: the waitpoint has resolved (either
544    /// a response arrived, the deadline elapsed, or the request was
545    /// cancelled). `outcome` is one of `"answered"`, `"timeout"`,
546    /// `"cancelled"`. Hosts use this to flip task state back to
547    /// `working` after an `input-required` pause.
548    HitlResolved {
549        session_id: String,
550        request_id: String,
551        kind: String,
552        outcome: String,
553    },
554    /// Emitted by the agent loop's adaptive iteration budget /
555    /// `loop_control` policy when a budget extension or early stop fires.
556    /// Generic enough to cover both shapes — `action` distinguishes them.
557    /// Carries the iteration the decision applied to, the previous /
558    /// resulting iteration limit, the policy reason string, and (for
559    /// stops) the loop status.
560    LoopControlDecision {
561        session_id: String,
562        iteration: usize,
563        action: String,
564        old_limit: usize,
565        new_limit: usize,
566        reason: String,
567        status: String,
568    },
569    /// Emitted when `agent_loop` detects adjacent repeated tool calls with
570    /// identical arguments. The warning payload avoids raw arguments by
571    /// default and carries digests so hosts can correlate repeats without
572    /// exposing potentially sensitive tool inputs.
573    AgentLoopStallWarning {
574        session_id: String,
575        warning: serde_json::Value,
576    },
577    /// Emitted when a `tool_caller` middleware (see std/llm/tool_middleware)
578    /// attaches structured audit metadata to a tool call — typically a
579    /// user-facing `summary`, a `description`, an ACP-style `kind`, an MCP
580    /// `hints` block, a `consent` decision, the per-layer `layers` log, or
581    /// free-form `metadata` keys (A2A-style extension slot).
582    ///
583    /// One-to-one with the underlying tool-call: hosts can join on
584    /// `tool_call_id` to render middleware-attached chips alongside the
585    /// existing `ToolCall` / `ToolCallUpdate` stream. The `audit` payload
586    /// is intentionally free-form JSON so middleware can carry whatever
587    /// shape the harness author chooses without needing protocol-level
588    /// changes per new middleware.
589    ToolCallAudit {
590        session_id: String,
591        tool_call_id: String,
592        tool_name: String,
593        audit: serde_json::Value,
594    },
595    /// Emitted by `std/cache::with_cache` (both the generic and LLM
596    /// forms) when a cached lookup returns a hit. Carries the
597    /// content-addressed key, the backend that served the value, and a
598    /// `metrics` block with the cost-moat receipts the persona value
599    /// ledger (harn-cloud#58) and crystallization receipts read:
600    /// `model_calls_avoided`, plus `tokens_saved` / `latency_saved_ms`
601    /// when the cached envelope carried `usage` / `latency_ms`.
602    CacheHit {
603        session_id: String,
604        key: String,
605        backend: String,
606        namespace: String,
607        payload: serde_json::Value,
608    },
609    /// Paired with `CacheHit`. Emitted on the miss path when the
610    /// fresh result is stored. `payload.metrics.compute_ms` carries
611    /// the wall-clock cost of the underlying computation, which
612    /// callers can feed back as `estimate.latency_saved_ms` on the
613    /// next hit.
614    CacheMiss {
615        session_id: String,
616        key: String,
617        backend: String,
618        namespace: String,
619        payload: serde_json::Value,
620    },
621}
622
623impl AgentEvent {
624    pub fn session_id(&self) -> &str {
625        match self {
626            Self::AgentMessageChunk { session_id, .. }
627            | Self::AgentThoughtChunk { session_id, .. }
628            | Self::ToolCall { session_id, .. }
629            | Self::ToolCallUpdate { session_id, .. }
630            | Self::Plan { session_id, .. }
631            | Self::TurnStart { session_id, .. }
632            | Self::TurnEnd { session_id, .. }
633            | Self::SessionClosed { session_id, .. }
634            | Self::JudgeDecision { session_id, .. }
635            | Self::TypedCheckpoint { session_id, .. }
636            | Self::FeedbackInjected { session_id, .. }
637            | Self::BudgetExhausted { session_id, .. }
638            | Self::LoopStuck { session_id, .. }
639            | Self::DaemonWatchdogTripped { session_id, .. }
640            | Self::SkillActivated { session_id, .. }
641            | Self::SkillDeactivated { session_id, .. }
642            | Self::SkillScopeTools { session_id, .. }
643            | Self::ToolSearchQuery { session_id, .. }
644            | Self::ToolSearchResult { session_id, .. }
645            | Self::TranscriptCompacted { session_id, .. }
646            | Self::Handoff { session_id, .. }
647            | Self::FsWatch { session_id, .. }
648            | Self::WorkerUpdate { session_id, .. }
649            | Self::HitlRequested { session_id, .. }
650            | Self::HitlResolved { session_id, .. }
651            | Self::LoopControlDecision { session_id, .. }
652            | Self::AgentLoopStallWarning { session_id, .. }
653            | Self::ToolCallAudit { session_id, .. }
654            | Self::CacheHit { session_id, .. }
655            | Self::CacheMiss { session_id, .. } => session_id,
656        }
657    }
658}
659
660/// External consumers of the event stream (e.g. the harn-cli ACP server,
661/// which translates events into JSON-RPC notifications).
662pub trait AgentEventSink: Send + Sync {
663    fn handle_event(&self, event: &AgentEvent);
664}
665
666/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
667/// `AgentEvent` with monotonic index + timestamp + frame depth so
668/// replay engines can reconstruct paused state at any event index,
669/// and scrubber UIs can bucket events by time. The envelope is the
670/// on-disk shape; the wire format for live consumers is still the
671/// raw `AgentEvent` so existing sinks don't churn.
672#[derive(Clone, Debug, Serialize, Deserialize)]
673pub struct PersistedAgentEvent {
674    /// Monotonic per-session index starting at 0. Unique within a
675    /// session; gaps never happen even under load because the sink
676    /// owns the counter under a mutex.
677    pub index: u64,
678    /// Milliseconds since the Unix epoch, captured when the sink
679    /// received the event. Not the event's emission time — that
680    /// would require threading a clock through every emit site.
681    pub emitted_at_ms: i64,
682    /// Call-stack depth at the moment of emission, when the caller
683    /// can supply it. `None` for events emitted from a context where
684    /// the VM frame stack isn't available.
685    pub frame_depth: Option<u32>,
686    /// The raw event, flattened so `jq '.type'` works as expected.
687    #[serde(flatten)]
688    pub event: AgentEvent,
689}
690
691/// Append-only JSONL sink for a single session's event stream (#103).
692/// One writer per session; sinks rotate to a numbered suffix when a
693/// running file crosses `ROTATE_BYTES` (100 MB today — long chat
694/// sessions rarely exceed 5 MB, so rotation almost never fires).
695pub struct JsonlEventSink {
696    state: Mutex<JsonlEventSinkState>,
697    base_path: std::path::PathBuf,
698}
699
/// Mutable half of `JsonlEventSink`, kept behind the sink's mutex.
struct JsonlEventSinkState {
    /// Buffered writer over the currently open log file.
    writer: std::io::BufWriter<std::fs::File>,
    /// Event counter; starts at 0 when the sink is opened.
    index: u64,
    /// Byte counter compared against `ROTATE_BYTES` to decide when to
    /// rotate.
    bytes_written: u64,
    /// Number of rotations so far; used to build the numbered file
    /// suffix.
    rotation: u32,
}
706
707impl JsonlEventSink {
708    /// Hard cap past which the current file rotates to a numbered
709    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
710    /// sessions don't produce unreadable multi-GB logs.
711    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
712
713    /// Open a new sink writing to `base_path`. Creates parent dirs
714    /// if missing. Overwrites an existing file so each fresh session
715    /// starts from index 0.
716    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
717        let base_path = base_path.into();
718        if let Some(parent) = base_path.parent() {
719            std::fs::create_dir_all(parent)?;
720        }
721        let file = std::fs::OpenOptions::new()
722            .create(true)
723            .truncate(true)
724            .write(true)
725            .open(&base_path)?;
726        Ok(Arc::new(Self {
727            state: Mutex::new(JsonlEventSinkState {
728                writer: std::io::BufWriter::new(file),
729                index: 0,
730                bytes_written: 0,
731                rotation: 0,
732            }),
733            base_path,
734        }))
735    }
736
737    /// Flush any buffered writes. Called on session shutdown; the
738    /// Drop impl calls this too but on early panic it may not run.
739    pub fn flush(&self) -> std::io::Result<()> {
740        use std::io::Write as _;
741        self.state
742            .lock()
743            .expect("jsonl sink mutex poisoned")
744            .writer
745            .flush()
746    }
747
748    /// Current event index — primarily for tests and the "how many
749    /// events are in this run" run-record summary.
750    pub fn event_count(&self) -> u64 {
751        self.state.lock().expect("jsonl sink mutex poisoned").index
752    }
753
754    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
755        use std::io::Write as _;
756        if state.bytes_written < Self::ROTATE_BYTES {
757            return Ok(());
758        }
759        state.writer.flush()?;
760        state.rotation += 1;
761        let suffix = format!("-{:06}", state.rotation);
762        let rotated = self.base_path.with_file_name({
763            let stem = self
764                .base_path
765                .file_stem()
766                .and_then(|s| s.to_str())
767                .unwrap_or("event_log");
768            let ext = self
769                .base_path
770                .extension()
771                .and_then(|e| e.to_str())
772                .unwrap_or("jsonl");
773            format!("{stem}{suffix}.{ext}")
774        });
775        let file = std::fs::OpenOptions::new()
776            .create(true)
777            .truncate(true)
778            .write(true)
779            .open(&rotated)?;
780        state.writer = std::io::BufWriter::new(file);
781        state.bytes_written = 0;
782        Ok(())
783    }
784}
785
/// Event-log-backed sink for a single session's agent event stream.
/// Uses the generalized append-only event log when one is installed for
/// the current VM thread and falls back to `JsonlEventSink` only for
/// older env-driven workflows.
pub struct EventLogSink {
    /// Shared handle to the event log every record is appended to.
    log: Arc<AnyEventLog>,
    /// Per-session topic, `observability.agent_events.<sanitized session id>`.
    topic: Topic,
    /// Unsanitized session id, copied into each record's payload and headers.
    session_id: String,
}
795
796impl EventLogSink {
797    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
798        let session_id = session_id.into();
799        let topic = Topic::new(format!(
800            "observability.agent_events.{}",
801            crate::event_log::sanitize_topic_component(&session_id)
802        ))
803        .expect("session id should sanitize to a valid topic");
804        Arc::new(Self {
805            log,
806            topic,
807            session_id,
808        })
809    }
810}
811
812impl AgentEventSink for JsonlEventSink {
813    fn handle_event(&self, event: &AgentEvent) {
814        use std::io::Write as _;
815        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
816        let index = state.index;
817        state.index += 1;
818        let emitted_at_ms = std::time::SystemTime::now()
819            .duration_since(std::time::UNIX_EPOCH)
820            .map(|d| d.as_millis() as i64)
821            .unwrap_or(0);
822        let envelope = PersistedAgentEvent {
823            index,
824            emitted_at_ms,
825            frame_depth: None,
826            event: event.clone(),
827        };
828        if let Ok(line) = serde_json::to_string(&envelope) {
829            // One line, newline-terminated — JSON Lines spec.
830            // Errors here are swallowed on purpose; a failing write
831            // must never crash the agent loop, and the run record
832            // itself is a secondary artifact.
833            let _ = state.writer.write_all(line.as_bytes());
834            let _ = state.writer.write_all(b"\n");
835            state.bytes_written += line.len() as u64 + 1;
836            let _ = self.rotate_if_needed(&mut state);
837        }
838    }
839}
840
841impl AgentEventSink for EventLogSink {
842    fn handle_event(&self, event: &AgentEvent) {
843        let event_json = match serde_json::to_value(event) {
844            Ok(value) => value,
845            Err(_) => return,
846        };
847        let event_kind = event_json
848            .get("type")
849            .and_then(|value| value.as_str())
850            .unwrap_or("agent_event")
851            .to_string();
852        let payload = serde_json::json!({
853            "index_hint": now_ms(),
854            "session_id": self.session_id,
855            "event": event_json,
856        });
857        let mut headers = std::collections::BTreeMap::new();
858        headers.insert("session_id".to_string(), self.session_id.clone());
859        let log = self.log.clone();
860        let topic = self.topic.clone();
861        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
862        if let Ok(handle) = tokio::runtime::Handle::try_current() {
863            handle.spawn(async move {
864                let _ = log.append(&topic, record).await;
865            });
866        } else {
867            let _ = futures::executor::block_on(log.append(&topic, record));
868        }
869    }
870}
871
872impl Drop for JsonlEventSink {
873    fn drop(&mut self) {
874        if let Ok(mut state) = self.state.lock() {
875            use std::io::Write as _;
876            let _ = state.writer.flush();
877        }
878    }
879}
880
/// Fan-out helper for composing multiple external sinks.
pub struct MultiSink {
    /// Child sinks in registration order; the mutex lets `push` take `&self`.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
885
886impl MultiSink {
887    pub fn new() -> Self {
888        Self {
889            sinks: Mutex::new(Vec::new()),
890        }
891    }
892    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
893        self.sinks.lock().expect("sink mutex poisoned").push(sink);
894    }
895    pub fn len(&self) -> usize {
896        self.sinks.lock().expect("sink mutex poisoned").len()
897    }
898    pub fn is_empty(&self) -> bool {
899        self.len() == 0
900    }
901}
902
903impl Default for MultiSink {
904    fn default() -> Self {
905        Self::new()
906    }
907}
908
909impl AgentEventSink for MultiSink {
910    fn handle_event(&self, event: &AgentEvent) {
911        // Deliberate: snapshot then release the lock before invoking sink
912        // callbacks. Sinks can re-enter the event system (e.g. a host
913        // sink that logs to another AgentEvent path), so holding the
914        // mutex across the callback would risk self-deadlock. Arc clones
915        // are refcount bumps — cheap.
916        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
917        for sink in sinks {
918            sink.handle_event(event);
919        }
920    }
921}
922
/// Test-build registry entry: a sink plus the thread that registered it.
/// The registry functions in this module compare `owner` against the
/// calling thread, so a test only sees and clears the sinks it
/// registered on its own thread.
#[cfg(test)]
#[derive(Clone)]
struct RegisteredSink {
    /// Thread that called `register_sink`.
    owner: std::thread::ThreadId,
    /// The sink events are delivered to.
    sink: Arc<dyn AgentEventSink>,
}

/// Non-test builds store the sink directly, with no owner bookkeeping.
#[cfg(not(test))]
type RegisteredSink = Arc<dyn AgentEventSink>;

/// Map from session id to the external sinks registered for that session.
type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
934
935fn external_sinks() -> &'static ExternalSinkRegistry {
936    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
937    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
938}
939
940pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
941    let session_id = session_id.into();
942    let mut reg = external_sinks().write().expect("sink registry poisoned");
943    #[cfg(test)]
944    let sink = RegisteredSink {
945        owner: std::thread::current().id(),
946        sink,
947    };
948    reg.entry(session_id).or_default().push(sink);
949}
950
951/// Remove all external sinks registered for `session_id`. Does NOT
952/// close the session itself — subscribers and transcript survive, so a
953/// later `agent_loop` call with the same id continues the conversation.
954pub fn clear_session_sinks(session_id: &str) {
955    #[cfg(test)]
956    {
957        let owner = std::thread::current().id();
958        let mut reg = external_sinks().write().expect("sink registry poisoned");
959        if let Some(sinks) = reg.get_mut(session_id) {
960            sinks.retain(|sink| sink.owner != owner);
961            if sinks.is_empty() {
962                reg.remove(session_id);
963            }
964        }
965    }
966    #[cfg(not(test))]
967    {
968        external_sinks()
969            .write()
970            .expect("sink registry poisoned")
971            .remove(session_id);
972    }
973}
974
975pub fn reset_all_sinks() {
976    #[cfg(test)]
977    {
978        let owner = std::thread::current().id();
979        let mut reg = external_sinks().write().expect("sink registry poisoned");
980        reg.retain(|_, sinks| {
981            sinks.retain(|sink| sink.owner != owner);
982            !sinks.is_empty()
983        });
984        crate::agent_sessions::reset_session_store();
985    }
986    #[cfg(not(test))]
987    {
988        external_sinks()
989            .write()
990            .expect("sink registry poisoned")
991            .clear();
992        crate::agent_sessions::reset_session_store();
993    }
994}
995
996/// Mirror externally-registered sinks from `source_session_id` onto
997/// `target_session_id` without moving ownership. Transports such as ACP
998/// register sinks on the outer prompt session before a script runs; scripts
999/// may then open a first-class agent transcript and route `agent_loop` events
1000/// through that inner id. Mirroring keeps the transport subscribed to the
1001/// in-run child transcript while preserving explicit session ids.
1002pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
1003    if source_session_id.is_empty() || target_session_id.is_empty() {
1004        return;
1005    }
1006    if source_session_id == target_session_id {
1007        return;
1008    }
1009    let mut reg = external_sinks().write().expect("sink registry poisoned");
1010    let Some(source_sinks) = reg.get(source_session_id).cloned() else {
1011        return;
1012    };
1013    let target = reg.entry(target_session_id.to_string()).or_default();
1014    #[cfg(test)]
1015    {
1016        for source in source_sinks {
1017            let already_present = target
1018                .iter()
1019                .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
1020            if !already_present {
1021                target.push(source);
1022            }
1023        }
1024    }
1025    #[cfg(not(test))]
1026    {
1027        for source in source_sinks {
1028            let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
1029            if !already_present {
1030                target.push(source);
1031            }
1032        }
1033    }
1034}
1035
1036/// Emit an event to external sinks registered for this session. Pipeline
1037/// closure subscribers are NOT called by this function — the agent
1038/// loop owns that path because it needs its async VM context.
1039pub fn emit_event(event: &AgentEvent) {
1040    let sinks: Vec<Arc<dyn AgentEventSink>> = {
1041        let reg = external_sinks().read().expect("sink registry poisoned");
1042        #[cfg(test)]
1043        {
1044            let owner = std::thread::current().id();
1045            reg.get(event.session_id())
1046                .map(|sinks| {
1047                    sinks
1048                        .iter()
1049                        .filter(|sink| sink.owner == owner)
1050                        .map(|sink| sink.sink.clone())
1051                        .collect()
1052                })
1053                .unwrap_or_default()
1054        }
1055        #[cfg(not(test))]
1056        {
1057            reg.get(event.session_id()).cloned().unwrap_or_default()
1058        }
1059    };
1060    for sink in sinks {
1061        sink.handle_event(event);
1062    }
1063}
1064
/// Wall-clock milliseconds since the Unix epoch, or 0 when the system
/// clock reads earlier than the epoch.
fn now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
1071
1072pub fn session_external_sink_count(session_id: &str) -> usize {
1073    #[cfg(test)]
1074    {
1075        let owner = std::thread::current().id();
1076        return external_sinks()
1077            .read()
1078            .expect("sink registry poisoned")
1079            .get(session_id)
1080            .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
1081            .unwrap_or(0);
1082    }
1083    #[cfg(not(test))]
1084    {
1085        external_sinks()
1086            .read()
1087            .expect("sink registry poisoned")
1088            .get(session_id)
1089            .map(|v| v.len())
1090            .unwrap_or(0)
1091    }
1092}
1093
1094pub fn session_closure_subscriber_count(session_id: &str) -> usize {
1095    crate::agent_sessions::subscriber_count(session_id)
1096}
1097
1098#[cfg(test)]
1099mod tests {
1100    use super::*;
1101    use std::sync::atomic::{AtomicUsize, Ordering};
1102
1103    struct CountingSink(Arc<AtomicUsize>);
1104    impl AgentEventSink for CountingSink {
1105        fn handle_event(&self, _event: &AgentEvent) {
1106            self.0.fetch_add(1, Ordering::SeqCst);
1107        }
1108    }
1109
1110    #[test]
1111    fn multi_sink_fans_out_in_order() {
1112        let multi = MultiSink::new();
1113        let a = Arc::new(AtomicUsize::new(0));
1114        let b = Arc::new(AtomicUsize::new(0));
1115        multi.push(Arc::new(CountingSink(a.clone())));
1116        multi.push(Arc::new(CountingSink(b.clone())));
1117        let event = AgentEvent::TurnStart {
1118            session_id: "s1".into(),
1119            iteration: 1,
1120        };
1121        multi.handle_event(&event);
1122        assert_eq!(a.load(Ordering::SeqCst), 1);
1123        assert_eq!(b.load(Ordering::SeqCst), 1);
1124    }
1125
1126    #[test]
1127    fn session_scoped_sink_routing() {
1128        reset_all_sinks();
1129        let a = Arc::new(AtomicUsize::new(0));
1130        let b = Arc::new(AtomicUsize::new(0));
1131        register_sink("session-a", Arc::new(CountingSink(a.clone())));
1132        register_sink("session-b", Arc::new(CountingSink(b.clone())));
1133        emit_event(&AgentEvent::TurnStart {
1134            session_id: "session-a".into(),
1135            iteration: 0,
1136        });
1137        assert_eq!(a.load(Ordering::SeqCst), 1);
1138        assert_eq!(b.load(Ordering::SeqCst), 0);
1139        emit_event(&AgentEvent::TurnEnd {
1140            session_id: "session-b".into(),
1141            iteration: 0,
1142            turn_info: serde_json::json!({}),
1143        });
1144        assert_eq!(a.load(Ordering::SeqCst), 1);
1145        assert_eq!(b.load(Ordering::SeqCst), 1);
1146        clear_session_sinks("session-a");
1147        assert_eq!(session_external_sink_count("session-a"), 0);
1148        assert_eq!(session_external_sink_count("session-b"), 1);
1149        reset_all_sinks();
1150    }
1151
1152    #[test]
1153    fn newly_opened_child_session_inherits_current_external_sinks() {
1154        reset_all_sinks();
1155        let delivered = Arc::new(AtomicUsize::new(0));
1156        register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
1157        {
1158            let _guard = crate::agent_sessions::enter_current_session("outer-session");
1159            let inner = crate::agent_sessions::open_or_create(None);
1160            assert_ne!(inner, "outer-session");
1161            emit_event(&AgentEvent::TurnStart {
1162                session_id: inner,
1163                iteration: 0,
1164            });
1165        }
1166        assert_eq!(delivered.load(Ordering::SeqCst), 1);
1167        reset_all_sinks();
1168    }
1169
1170    #[test]
1171    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
1172        use std::io::{BufRead, BufReader};
1173        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
1174        std::fs::create_dir_all(&dir).unwrap();
1175        let path = dir.join("event_log.jsonl");
1176        let sink = JsonlEventSink::open(&path).unwrap();
1177        for i in 0..5 {
1178            sink.handle_event(&AgentEvent::TurnStart {
1179                session_id: "s".into(),
1180                iteration: i,
1181            });
1182        }
1183        assert_eq!(sink.event_count(), 5);
1184        sink.flush().unwrap();
1185
1186        // Read back + assert monotonic indices + non-decreasing timestamps.
1187        let file = std::fs::File::open(&path).unwrap();
1188        let mut last_idx: i64 = -1;
1189        let mut last_ts: i64 = 0;
1190        for line in BufReader::new(file).lines() {
1191            let line = line.unwrap();
1192            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
1193            let idx = val["index"].as_i64().unwrap();
1194            let ts = val["emitted_at_ms"].as_i64().unwrap();
1195            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
1196            assert!(ts >= last_ts, "timestamps must be non-decreasing");
1197            last_idx = idx;
1198            last_ts = ts;
1199            // Event payload flattened — type tag must survive.
1200            assert_eq!(val["type"], "turn_start");
1201        }
1202        assert_eq!(last_idx, 4);
1203        let _ = std::fs::remove_file(&path);
1204    }
1205
1206    #[test]
1207    fn judge_decision_round_trips_through_jsonl_sink() {
1208        use std::io::{BufRead, BufReader};
1209        let dir =
1210            std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
1211        std::fs::create_dir_all(&dir).unwrap();
1212        let path = dir.join("event_log.jsonl");
1213        let sink = JsonlEventSink::open(&path).unwrap();
1214        sink.handle_event(&AgentEvent::JudgeDecision {
1215            session_id: "s".into(),
1216            iteration: 2,
1217            verdict: "continue".into(),
1218            reasoning: "needs a concrete next step".into(),
1219            next_step: Some("run the verifier".into()),
1220            judge_duration_ms: 17,
1221        });
1222        sink.flush().unwrap();
1223
1224        let file = std::fs::File::open(&path).unwrap();
1225        let line = BufReader::new(file).lines().next().unwrap().unwrap();
1226        let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
1227        match recovered.event {
1228            AgentEvent::JudgeDecision {
1229                session_id,
1230                iteration,
1231                verdict,
1232                reasoning,
1233                next_step,
1234                judge_duration_ms,
1235            } => {
1236                assert_eq!(session_id, "s");
1237                assert_eq!(iteration, 2);
1238                assert_eq!(verdict, "continue");
1239                assert_eq!(reasoning, "needs a concrete next step");
1240                assert_eq!(next_step.as_deref(), Some("run the verifier"));
1241                assert_eq!(judge_duration_ms, 17);
1242            }
1243            other => panic!("expected JudgeDecision, got {other:?}"),
1244        }
1245        let value: serde_json::Value = serde_json::from_str(&line).unwrap();
1246        assert_eq!(value["type"], "judge_decision");
1247        let _ = std::fs::remove_file(&path);
1248        let _ = std::fs::remove_dir(&dir);
1249    }
1250
1251    #[test]
1252    fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
1253        // Terminal update with both durations populated — both fields
1254        // appear in the JSON. Snake_case keys here because this is the
1255        // canonical AgentEvent shape; the ACP adapter renames to
1256        // camelCase separately.
1257        let terminal = AgentEvent::ToolCallUpdate {
1258            session_id: "s".into(),
1259            tool_call_id: "tc-1".into(),
1260            tool_name: "read".into(),
1261            status: ToolCallStatus::Completed,
1262            raw_output: None,
1263            error: None,
1264            duration_ms: Some(42),
1265            execution_duration_ms: Some(7),
1266            error_category: None,
1267            executor: None,
1268            parsing: None,
1269
1270            raw_input: None,
1271            raw_input_partial: None,
1272            audit: None,
1273        };
1274        let value = serde_json::to_value(&terminal).unwrap();
1275        assert_eq!(value["duration_ms"], serde_json::json!(42));
1276        assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1277
1278        // In-progress update with `None` for both — both keys must be
1279        // absent (not `null`) so older ACP clients that key off
1280        // presence don't see a misleading zero.
1281        let intermediate = AgentEvent::ToolCallUpdate {
1282            session_id: "s".into(),
1283            tool_call_id: "tc-1".into(),
1284            tool_name: "read".into(),
1285            status: ToolCallStatus::InProgress,
1286            raw_output: None,
1287            error: None,
1288            duration_ms: None,
1289            execution_duration_ms: None,
1290            error_category: None,
1291            executor: None,
1292            parsing: None,
1293
1294            raw_input: None,
1295            raw_input_partial: None,
1296            audit: None,
1297        };
1298        let value = serde_json::to_value(&intermediate).unwrap();
1299        let object = value.as_object().expect("update serializes as object");
1300        assert!(
1301            !object.contains_key("duration_ms"),
1302            "duration_ms must be omitted when None: {value}"
1303        );
1304        assert!(
1305            !object.contains_key("execution_duration_ms"),
1306            "execution_duration_ms must be omitted when None: {value}"
1307        );
1308    }
1309
1310    #[test]
1311    fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1312        // Persisted event-log entries written before the fields existed
1313        // must still deserialize cleanly. The missing keys map to None.
1314        let raw = serde_json::json!({
1315            "type": "tool_call_update",
1316            "session_id": "s",
1317            "tool_call_id": "tc-1",
1318            "tool_name": "read",
1319            "status": "completed",
1320            "raw_output": null,
1321            "error": null,
1322        });
1323        let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1324        match event {
1325            AgentEvent::ToolCallUpdate {
1326                duration_ms,
1327                execution_duration_ms,
1328                ..
1329            } => {
1330                assert!(duration_ms.is_none());
1331                assert!(execution_duration_ms.is_none());
1332            }
1333            other => panic!("expected ToolCallUpdate, got {other:?}"),
1334        }
1335    }
1336
1337    #[test]
1338    fn tool_call_status_serde() {
1339        assert_eq!(
1340            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1341            "\"pending\""
1342        );
1343        assert_eq!(
1344            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1345            "\"in_progress\""
1346        );
1347        assert_eq!(
1348            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1349            "\"completed\""
1350        );
1351        assert_eq!(
1352            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1353            "\"failed\""
1354        );
1355    }
1356
1357    #[test]
1358    fn tool_call_error_category_serializes_as_snake_case() {
1359        let pairs = [
1360            (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1361            (ToolCallErrorCategory::ToolError, "tool_error"),
1362            (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1363            (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1364            (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1365            (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1366            (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1367            (ToolCallErrorCategory::Timeout, "timeout"),
1368            (ToolCallErrorCategory::Network, "network"),
1369            (ToolCallErrorCategory::Cancelled, "cancelled"),
1370            (ToolCallErrorCategory::Unknown, "unknown"),
1371        ];
1372        for (variant, wire) in pairs {
1373            let encoded = serde_json::to_string(&variant).unwrap();
1374            assert_eq!(encoded, format!("\"{wire}\""));
1375            assert_eq!(variant.as_str(), wire);
1376            // Round-trip via deserialize so wire stability is enforced
1377            // both ways.
1378            let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1379            assert_eq!(decoded, variant);
1380        }
1381    }
1382
1383    #[test]
1384    fn tool_executor_round_trips_with_adjacent_tag() {
1385        // Adjacent tagging keeps the wire shape uniform — every variant
1386        // is a JSON object with a `kind` discriminator. The ACP adapter
1387        // rewrites unit variants as bare strings; the on-disk event log
1388        // keeps the object shape so deserialize can recover the variant.
1389        for executor in [
1390            ToolExecutor::HarnBuiltin,
1391            ToolExecutor::HostBridge,
1392            ToolExecutor::McpServer {
1393                server_name: "linear".to_string(),
1394            },
1395            ToolExecutor::ProviderNative,
1396        ] {
1397            let json = serde_json::to_value(&executor).unwrap();
1398            let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1399            match &executor {
1400                ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1401                ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1402                ToolExecutor::McpServer { server_name } => {
1403                    assert_eq!(kind, "mcp_server");
1404                    assert_eq!(json["server_name"], *server_name);
1405                }
1406                ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1407            }
1408            let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1409            assert_eq!(recovered, executor);
1410        }
1411    }
1412
1413    #[test]
1414    fn tool_call_error_category_from_internal_collapses_transient_family() {
1415        use crate::value::ErrorCategory as Internal;
1416        assert_eq!(
1417            ToolCallErrorCategory::from_internal(&Internal::Timeout),
1418            ToolCallErrorCategory::Timeout
1419        );
1420        for net in [
1421            Internal::RateLimit,
1422            Internal::Overloaded,
1423            Internal::ServerError,
1424            Internal::TransientNetwork,
1425        ] {
1426            assert_eq!(
1427                ToolCallErrorCategory::from_internal(&net),
1428                ToolCallErrorCategory::Network,
1429                "{net:?} should map to Network",
1430            );
1431        }
1432        assert_eq!(
1433            ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1434            ToolCallErrorCategory::SchemaValidation
1435        );
1436        assert_eq!(
1437            ToolCallErrorCategory::from_internal(&Internal::ToolError),
1438            ToolCallErrorCategory::ToolError
1439        );
1440        assert_eq!(
1441            ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1442            ToolCallErrorCategory::PermissionDenied
1443        );
1444        assert_eq!(
1445            ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1446            ToolCallErrorCategory::Cancelled
1447        );
1448        for bridge in [
1449            Internal::Auth,
1450            Internal::EgressBlocked,
1451            Internal::NotFound,
1452            Internal::CircuitOpen,
1453            Internal::Generic,
1454        ] {
1455            assert_eq!(
1456                ToolCallErrorCategory::from_internal(&bridge),
1457                ToolCallErrorCategory::HostBridgeError,
1458                "{bridge:?} should map to HostBridgeError",
1459            );
1460        }
1461    }
1462
1463    #[test]
1464    fn tool_call_update_event_omits_error_category_when_none() {
1465        let event = AgentEvent::ToolCallUpdate {
1466            session_id: "s".into(),
1467            tool_call_id: "t".into(),
1468            tool_name: "read".into(),
1469            status: ToolCallStatus::Completed,
1470            raw_output: None,
1471            error: None,
1472            duration_ms: None,
1473            execution_duration_ms: None,
1474            error_category: None,
1475            executor: None,
1476            parsing: None,
1477
1478            raw_input: None,
1479            raw_input_partial: None,
1480            audit: None,
1481        };
1482        let v = serde_json::to_value(&event).unwrap();
1483        assert_eq!(v["type"], "tool_call_update");
1484        assert!(v.get("error_category").is_none());
1485    }
1486
1487    #[test]
1488    fn tool_call_update_event_serializes_error_category_when_set() {
1489        let event = AgentEvent::ToolCallUpdate {
1490            session_id: "s".into(),
1491            tool_call_id: "t".into(),
1492            tool_name: "read".into(),
1493            status: ToolCallStatus::Failed,
1494            raw_output: None,
1495            error: Some("missing required field".into()),
1496            duration_ms: None,
1497            execution_duration_ms: None,
1498            error_category: Some(ToolCallErrorCategory::SchemaValidation),
1499            executor: None,
1500            parsing: None,
1501
1502            raw_input: None,
1503            raw_input_partial: None,
1504            audit: None,
1505        };
1506        let v = serde_json::to_value(&event).unwrap();
1507        assert_eq!(v["error_category"], "schema_validation");
1508        assert_eq!(v["error"], "missing required field");
1509    }
1510
1511    #[test]
1512    fn tool_call_update_omits_executor_when_absent() {
1513        // `executor: None` must not appear in the serialized event so
1514        // the on-disk shape stays backward-compatible with replays
1515        // recorded before harn#691.
1516        let event = AgentEvent::ToolCallUpdate {
1517            session_id: "s".into(),
1518            tool_call_id: "tc-1".into(),
1519            tool_name: "read".into(),
1520            status: ToolCallStatus::Completed,
1521            raw_output: None,
1522            error: None,
1523            duration_ms: None,
1524            execution_duration_ms: None,
1525            error_category: None,
1526            executor: None,
1527            parsing: None,
1528
1529            raw_input: None,
1530            raw_input_partial: None,
1531            audit: None,
1532        };
1533        let json = serde_json::to_value(&event).unwrap();
1534        assert!(json.get("executor").is_none(), "got: {json}");
1535    }
1536
1537    #[test]
1538    fn worker_event_status_strings_cover_all_variants() {
1539        // Wire-level status strings flow into both bridge `worker_update`
1540        // payloads and ACP `session/update` notifications. Pinning every
1541        // variant here so a future addition can't silently land without
1542        // a docs/wire decision.
1543        assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1544        assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1545        assert_eq!(
1546            WorkerEvent::WorkerWaitingForInput.as_status(),
1547            "awaiting_input"
1548        );
1549        assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1550        assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1551        assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1552
1553        for terminal in [
1554            WorkerEvent::WorkerCompleted,
1555            WorkerEvent::WorkerFailed,
1556            WorkerEvent::WorkerCancelled,
1557        ] {
1558            assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1559        }
1560        for non_terminal in [
1561            WorkerEvent::WorkerSpawned,
1562            WorkerEvent::WorkerProgressed,
1563            WorkerEvent::WorkerWaitingForInput,
1564        ] {
1565            assert!(
1566                !non_terminal.is_terminal(),
1567                "{non_terminal:?} should not be terminal"
1568            );
1569        }
1570    }
1571
1572    #[test]
1573    fn worker_update_event_routes_through_session_keyed_sink() {
1574        // Worker lifecycle events ride the same session-keyed
1575        // `AgentEventSink` registry as message and tool events. This
1576        // is the canonical path ACP and A2A subscribe to — gate it
1577        // here so a registry-routing regression breaks loudly.
1578        reset_all_sinks();
1579        let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
1580        struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1581        impl AgentEventSink for CapturingSink {
1582            fn handle_event(&self, event: &AgentEvent) {
1583                self.0
1584                    .lock()
1585                    .expect("captured sink mutex poisoned")
1586                    .push(event.clone());
1587            }
1588        }
1589        register_sink(
1590            "worker-session-1",
1591            Arc::new(CapturingSink(captured.clone())),
1592        );
1593        emit_event(&AgentEvent::WorkerUpdate {
1594            session_id: "worker-session-1".into(),
1595            worker_id: "worker_42".into(),
1596            worker_name: "review_captain".into(),
1597            worker_task: "review pr".into(),
1598            worker_mode: "delegated_stage".into(),
1599            event: WorkerEvent::WorkerWaitingForInput,
1600            status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
1601            metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
1602            audit: None,
1603        });
1604        // Other sessions don't receive cross-talk.
1605        emit_event(&AgentEvent::WorkerUpdate {
1606            session_id: "other-session".into(),
1607            worker_id: "w2".into(),
1608            worker_name: "n2".into(),
1609            worker_task: "t2".into(),
1610            worker_mode: "delegated_stage".into(),
1611            event: WorkerEvent::WorkerCompleted,
1612            status: "completed".into(),
1613            metadata: serde_json::json!({}),
1614            audit: None,
1615        });
1616        let received = captured.lock().unwrap().clone();
1617        assert_eq!(received.len(), 1, "got: {received:?}");
1618        match &received[0] {
1619            AgentEvent::WorkerUpdate {
1620                session_id,
1621                worker_id,
1622                event,
1623                status,
1624                ..
1625            } => {
1626                assert_eq!(session_id, "worker-session-1");
1627                assert_eq!(worker_id, "worker_42");
1628                assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
1629                assert_eq!(status, "awaiting_input");
1630            }
1631            other => panic!("expected WorkerUpdate, got {other:?}"),
1632        }
1633        reset_all_sinks();
1634    }
1635
1636    #[test]
1637    fn worker_update_event_serializes_to_canonical_shape() {
1638        // Persisted event-log entries flatten the AgentEvent envelope,
1639        // so the WorkerUpdate variant must serialize with a `type` of
1640        // `worker_update` and the worker fields directly on the
1641        // top-level object (matching the `#[serde(tag = "type", ...)]`
1642        // shape the rest of AgentEvent uses).
1643        let event = AgentEvent::WorkerUpdate {
1644            session_id: "s".into(),
1645            worker_id: "w".into(),
1646            worker_name: "n".into(),
1647            worker_task: "t".into(),
1648            worker_mode: "delegated_stage".into(),
1649            event: WorkerEvent::WorkerProgressed,
1650            status: "progressed".into(),
1651            metadata: serde_json::json!({"started_at": "0193..."}),
1652            audit: Some(serde_json::json!({"run_id": "run_x"})),
1653        };
1654        let value = serde_json::to_value(&event).unwrap();
1655        assert_eq!(value["type"], "worker_update");
1656        assert_eq!(value["session_id"], "s");
1657        assert_eq!(value["worker_id"], "w");
1658        assert_eq!(value["status"], "progressed");
1659        assert_eq!(value["audit"]["run_id"], "run_x");
1660
1661        // Round-trip: the persisted event log must deserialize the
1662        // canonical shape back into the typed variant so replay
1663        // tooling can re-derive lifecycle state offline.
1664        let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1665        match recovered {
1666            AgentEvent::WorkerUpdate {
1667                event: recovered_event,
1668                ..
1669            } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1670            other => panic!("expected WorkerUpdate, got {other:?}"),
1671        }
1672    }
1673
1674    #[test]
1675    fn tool_call_update_includes_executor_when_present() {
1676        let event = AgentEvent::ToolCallUpdate {
1677            session_id: "s".into(),
1678            tool_call_id: "tc-1".into(),
1679            tool_name: "read".into(),
1680            status: ToolCallStatus::Completed,
1681            raw_output: None,
1682            error: None,
1683            duration_ms: None,
1684            execution_duration_ms: None,
1685            error_category: None,
1686            executor: Some(ToolExecutor::McpServer {
1687                server_name: "github".into(),
1688            }),
1689            parsing: None,
1690
1691            raw_input: None,
1692            raw_input_partial: None,
1693            audit: None,
1694        };
1695        let json = serde_json::to_value(&event).unwrap();
1696        assert_eq!(json["executor"]["kind"], "mcp_server");
1697        assert_eq!(json["executor"]["server_name"], "github");
1698    }
1699
1700    #[test]
1701    fn tool_call_update_omits_audit_when_absent() {
1702        let event = AgentEvent::ToolCallUpdate {
1703            session_id: "s".into(),
1704            tool_call_id: "tc-1".into(),
1705            tool_name: "read".into(),
1706            status: ToolCallStatus::Completed,
1707            raw_output: None,
1708            error: None,
1709            duration_ms: None,
1710            execution_duration_ms: None,
1711            error_category: None,
1712            executor: None,
1713            parsing: None,
1714            raw_input: None,
1715            raw_input_partial: None,
1716            audit: None,
1717        };
1718        let json = serde_json::to_value(&event).unwrap();
1719        assert!(json.get("audit").is_none(), "got: {json}");
1720    }
1721
1722    #[test]
1723    fn tool_call_update_includes_audit_when_present() {
1724        let audit = MutationSessionRecord {
1725            session_id: "session_42".into(),
1726            run_id: Some("run_42".into()),
1727            mutation_scope: "apply_workspace".into(),
1728            execution_kind: Some("worker".into()),
1729            ..Default::default()
1730        };
1731        let event = AgentEvent::ToolCallUpdate {
1732            session_id: "s".into(),
1733            tool_call_id: "tc-1".into(),
1734            tool_name: "edit_file".into(),
1735            status: ToolCallStatus::Completed,
1736            raw_output: None,
1737            error: None,
1738            duration_ms: None,
1739            execution_duration_ms: None,
1740            error_category: None,
1741            executor: Some(ToolExecutor::HostBridge),
1742            parsing: None,
1743            raw_input: None,
1744            raw_input_partial: None,
1745            audit: Some(audit),
1746        };
1747        let json = serde_json::to_value(&event).unwrap();
1748        assert_eq!(json["audit"]["session_id"], "session_42");
1749        assert_eq!(json["audit"]["run_id"], "run_42");
1750        assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1751        assert_eq!(json["audit"]["execution_kind"], "worker");
1752    }
1753
1754    #[test]
1755    fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1756        let raw = serde_json::json!({
1757            "type": "tool_call_update",
1758            "session_id": "s",
1759            "tool_call_id": "tc-1",
1760            "tool_name": "read",
1761            "status": "completed",
1762            "raw_output": null,
1763            "error": null,
1764        });
1765        let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1766        match event {
1767            AgentEvent::ToolCallUpdate { audit, .. } => {
1768                assert!(audit.is_none());
1769            }
1770            other => panic!("expected ToolCallUpdate, got {other:?}"),
1771        }
1772    }
1773
1774    #[test]
1775    fn tool_call_audit_serializes_with_free_form_audit_payload() {
1776        // Middleware-attached metadata is intentionally free-form JSON
1777        // (A2A-style `metadata` extension slot). The wire format must
1778        // preserve nested dicts + lists verbatim so hosts can read
1779        // `summary`/`consent`/`layers`/etc. without per-field schema.
1780        let audit = serde_json::json!({
1781            "summary": "Searched codebase",
1782            "kind": "search",
1783            "consent": {"decision": "approved", "decided_by": "auto"},
1784            "layers": [{"name": "with_required_reason", "status": "ok"}],
1785        });
1786        let event = AgentEvent::ToolCallAudit {
1787            session_id: "s".into(),
1788            tool_call_id: "tc-1".into(),
1789            tool_name: "search_files".into(),
1790            audit: audit.clone(),
1791        };
1792        let json = serde_json::to_value(&event).unwrap();
1793        assert_eq!(json["type"], "tool_call_audit");
1794        assert_eq!(json["session_id"], "s");
1795        assert_eq!(json["tool_call_id"], "tc-1");
1796        assert_eq!(json["tool_name"], "search_files");
1797        assert_eq!(json["audit"], audit);
1798    }
1799
1800    #[test]
1801    fn tool_call_audit_session_id_routes_correctly() {
1802        let event = AgentEvent::ToolCallAudit {
1803            session_id: "abc".into(),
1804            tool_call_id: "tc".into(),
1805            tool_name: "read".into(),
1806            audit: serde_json::Value::Null,
1807        };
1808        assert_eq!(event.session_id(), "abc");
1809    }
1810}