Skip to main content

harn_vm/
agent_events.rs

1//! Agent event stream — the ACP-aligned observation surface for the
2//! agent loop.
3//!
//! Every phase of the turn loop emits an `AgentEvent`. The canonical
//! variants map 1:1 onto ACP `SessionUpdate` values; the remaining
//! harn-specific variants (such as `TurnStart`, `TurnEnd`, and
//! `FeedbackInjected`) let pipelines react to loop milestones that
//! don't have a direct ACP counterpart.
8//!
9//! There are two subscription paths, both keyed on session id so two
10//! concurrent sessions never cross-talk:
11//!
12//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
13//!    like the harn-cli ACP server. Invoked synchronously by the loop.
14//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
15//! 2. **Closure subscribers** — `.harn` closures registered via the
16//!    `agent_subscribe(session_id, callback)` host builtin. These live
17//!    on the session's `SessionState.subscribers` in
18//!    `crate::agent_sessions`, because sessions are the single source
19//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
/// One coalesced filesystem notification from a hostlib `fs_watch`
/// subscription.
///
/// Delivered to sinks in batches via `AgentEvent::FsWatch`, whose
/// `events` field is a `Vec<FsWatchEvent>`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FsWatchEvent {
    /// Kind of change being reported.
    /// NOTE(review): presumably a normalized label derived from
    /// `raw_kind` — confirm against the hostlib watcher that builds it.
    pub kind: String,
    /// Paths the notification applies to.
    pub paths: Vec<String>,
    /// The affected paths in relative form.
    /// NOTE(review): the base directory they are relative to is not
    /// visible from this file — confirm against the producer.
    pub relative_paths: Vec<String>,
    /// NOTE(review): presumably the watcher backend's own, un-normalized
    /// event-kind label — confirm against the producer.
    pub raw_kind: String,
    /// Error text attached to this notification, if any.
    pub error: Option<String>,
}
40
/// Typed worker lifecycle events emitted by delegated/background agent
/// execution. Bridge-facing worker updates still derive a string status
/// from these variants, but the runtime no longer passes raw status
/// strings around internally.
///
/// `WorkerSpawned`/`WorkerCompleted`/`WorkerFailed`/`WorkerCancelled` are
/// the four terminal-or-start states. `WorkerProgressed` is fired on
/// intermediate milestones (e.g. a retriggerable worker resuming from
/// `awaiting_input`, or a workflow stage completing without ending the
/// worker). `WorkerWaitingForInput` covers retriggerable workers that
/// finish a cycle but stay alive pending the next host-supplied trigger
/// payload.
///
/// No `rename_all` is applied here, so serde uses the variant
/// identifiers verbatim (e.g. `"WorkerSpawned"`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    /// The worker has started; reported with status `"running"`.
    WorkerSpawned,
    /// An intermediate milestone; reported with status `"progressed"`.
    WorkerProgressed,
    /// The worker finished a cycle and is waiting for the next trigger;
    /// reported with status `"awaiting_input"`.
    WorkerWaitingForInput,
    /// Terminal: reported with status `"completed"`.
    WorkerCompleted,
    /// Terminal: reported with status `"failed"`.
    WorkerFailed,
    /// Terminal: reported with status `"cancelled"`.
    WorkerCancelled,
}
61
62impl WorkerEvent {
63    /// Wire-level status string used by bridge `worker_update` payloads
64    /// and ACP `worker_update` session updates. The four canonical
65    /// states are mirrored from harn's internal worker `status` field
66    /// (`running`/`completed`/`failed`/`cancelled`), and the two newer
67    /// lifecycle states pick names that don't collide with any existing
68    /// status string.
69    pub fn as_status(self) -> &'static str {
70        match self {
71            Self::WorkerSpawned => "running",
72            Self::WorkerProgressed => "progressed",
73            Self::WorkerWaitingForInput => "awaiting_input",
74            Self::WorkerCompleted => "completed",
75            Self::WorkerFailed => "failed",
76            Self::WorkerCancelled => "cancelled",
77        }
78    }
79
80    pub fn as_str(self) -> &'static str {
81        match self {
82            Self::WorkerSpawned => "WorkerSpawned",
83            Self::WorkerProgressed => "WorkerProgressed",
84            Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85            Self::WorkerCompleted => "WorkerCompleted",
86            Self::WorkerFailed => "WorkerFailed",
87            Self::WorkerCancelled => "WorkerCancelled",
88        }
89    }
90
91    /// True for lifecycle events that mean the worker has reached a
92    /// final, non-resumable state. Retriggerable awaiting and
93    /// progressed milestones are *not* terminal — the worker keeps
94    /// running or is waiting for a trigger.
95    pub fn is_terminal(self) -> bool {
96        matches!(
97            self,
98            Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99        )
100    }
101}
102
/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
///
/// With `rename_all = "snake_case"` the serialized names are
/// `"pending"`, `"in_progress"`, `"completed"`, and `"failed"`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Dispatched by the model but not yet started.
    Pending,
    /// Dispatch is actively running.
    InProgress,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
}
116
117impl ToolCallStatus {
118    pub const ALL: [Self; 4] = [
119        Self::Pending,
120        Self::InProgress,
121        Self::Completed,
122        Self::Failed,
123    ];
124
125    pub fn as_str(self) -> &'static str {
126        match self {
127            Self::Pending => "pending",
128            Self::InProgress => "in_progress",
129            Self::Completed => "completed",
130            Self::Failed => "failed",
131        }
132    }
133}
134
/// Wire-level classification of a `ToolCallUpdate` failure. Pairs with the
/// human-readable `error` string so clients can render each failure type
/// distinctly (e.g. surface a "permission denied" badge, or a different
/// retry affordance for `network` vs `tool_error`). The enum is
/// deliberately extensible — `unknown` is the default when the runtime
/// could not classify a failure.
///
/// With `rename_all = "snake_case"` each variant serializes as its
/// snake_case name (e.g. `McpServerError` → `"mcp_server_error"`).
///
/// NOTE(review): "default" above describes how producers are expected to
/// use `Unknown`; no `Default` impl is declared on this type here.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallErrorCategory {
    /// Host-side validation rejected the args (missing required field,
    /// invalid type, malformed JSON).
    SchemaValidation,
    /// The tool ran and returned an error result (e.g. `read_file` on a
    /// missing path) — distinguished from a transport failure.
    ToolError,
    /// MCP transport / server-protocol error.
    McpServerError,
    /// Burin Swift host bridge returned an error during dispatch.
    HostBridgeError,
    /// `session/request_permission` denied by the client, or a policy
    /// rule (static or dynamic) refused the call.
    PermissionDenied,
    /// The harn loop detector skipped this call because the same
    /// (tool, args) pair repeated past the configured threshold.
    RejectedLoop,
    /// Streaming text candidate was detected (bare `name(` or
    /// `<tool_call>` opener) but never resolved into a parseable call:
    /// args parsed as malformed, the heredoc body broke, the tag closed
    /// without a balanced expression, or the stream ended mid-call.
    /// Used by the streaming candidate detector (harn#692) to retract a
    /// `tool_call` candidate that turned out to be prose or syntactically
    /// broken so clients can dismiss the in-flight chip.
    ParseAborted,
    /// The tool exceeded its time budget.
    Timeout,
    /// Transient network / rate-limited / 5xx provider failure.
    Network,
    /// The tool was cancelled (e.g. session aborted).
    Cancelled,
    /// Default when classification was not performed.
    Unknown,
}
177
178impl ToolCallErrorCategory {
179    pub const ALL: [Self; 11] = [
180        Self::SchemaValidation,
181        Self::ToolError,
182        Self::McpServerError,
183        Self::HostBridgeError,
184        Self::PermissionDenied,
185        Self::RejectedLoop,
186        Self::ParseAborted,
187        Self::Timeout,
188        Self::Network,
189        Self::Cancelled,
190        Self::Unknown,
191    ];
192
193    pub fn as_str(self) -> &'static str {
194        match self {
195            Self::SchemaValidation => "schema_validation",
196            Self::ToolError => "tool_error",
197            Self::McpServerError => "mcp_server_error",
198            Self::HostBridgeError => "host_bridge_error",
199            Self::PermissionDenied => "permission_denied",
200            Self::RejectedLoop => "rejected_loop",
201            Self::ParseAborted => "parse_aborted",
202            Self::Timeout => "timeout",
203            Self::Network => "network",
204            Self::Cancelled => "cancelled",
205            Self::Unknown => "unknown",
206        }
207    }
208
209    /// Map an internal `ErrorCategory` (used by the VM's `VmError`
210    /// classification) onto the wire enum. The internal taxonomy is
211    /// finer-grained — several transient categories collapse onto
212    /// `Network`, and the auth/quota family becomes `HostBridgeError`
213    /// because at the tool-dispatch boundary those errors come from
214    /// the bridge transport rather than the tool itself.
215    pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
216        use crate::value::ErrorCategory as Internal;
217        match category {
218            Internal::Timeout => Self::Timeout,
219            Internal::RateLimit
220            | Internal::Overloaded
221            | Internal::ServerError
222            | Internal::TransientNetwork => Self::Network,
223            Internal::SchemaValidation => Self::SchemaValidation,
224            Internal::ToolError => Self::ToolError,
225            Internal::ToolRejected => Self::PermissionDenied,
226            Internal::Cancelled => Self::Cancelled,
227            Internal::Auth
228            | Internal::EgressBlocked
229            | Internal::NotFound
230            | Internal::CircuitOpen
231            | Internal::BudgetExceeded
232            | Internal::Generic => Self::HostBridgeError,
233        }
234    }
235}
236
/// Where a tool actually ran. Tags `ToolCallUpdate` so clients can render
/// "via mcp:linear" / "via host bridge" badges, attribute latency by
/// transport, and route errors to the right surface (harn#691).
///
/// On the wire this serializes internally tagged (`#[serde(tag = "kind")]`
/// with no `content` key): the snake_case variant name is stored under
/// `kind`, and the `mcp_server` case carries the configured server name
/// as a sibling `server_name` field. The ACP adapter rewrites
/// unit variants as bare strings (`"harn_builtin"`, `"host_bridge"`,
/// `"provider_native"`) and the `McpServer` case as
/// `{"kind": "mcp_server", "serverName": "..."}` to match the protocol's
/// camelCase convention.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ToolExecutor {
    /// VM-stdlib (`read_file`, `write_file`, `exec`, `http_*`, `mcp_*`)
    /// or any Harn-side handler closure registered in `tools_val`.
    HarnBuiltin,
    /// Capability provided by the host through `HostBridge.builtin_call`
    /// (Swift-side IDE bridge, BurinApp, BurinCLI host shells).
    HostBridge,
    /// Tool dispatched against a configured MCP server. Detected by the
    /// `_mcp_server` tag that `mcp_list_tools` injects on every tool
    /// dict before the agent loop sees it.
    McpServer { server_name: String },
    /// Provider-side server-side tool execution — currently OpenAI
    /// Responses-API server tools (e.g. native `tool_search`). The
    /// runtime never dispatches these locally; the model returns the
    /// already-executed result inline.
    ProviderNative,
}
266
/// Events emitted by the agent loop. Every variant carries the
/// `session_id` of the session it belongs to. The first five variants
/// (`AgentMessageChunk` through `Plan`) map 1:1 to ACP `sessionUpdate`
/// variants; the variants after `Plan` are harn-specific additions —
/// see each variant's own documentation for how hosts consume it.
///
/// Serialized internally tagged: the snake_case variant name is stored
/// under `type` next to the variant's fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// A chunk of the agent's message text, carried in `content`.
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// A chunk of the agent's thought text, carried in `content`.
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// Announces a tool call: its id, tool name, optional `ToolKind`,
    /// current status, and the raw JSON input. Later `ToolCallUpdate`
    /// events refer back to it through `tool_call_id`.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
        /// Set to `Some(true)` by the streaming candidate detector
        /// (harn#692) when this event represents a tool-call shape
        /// detected in the model's in-flight assistant text but whose
        /// arguments have not finished parsing yet. Clients can render a
        /// spinner / placeholder while the model writes the body. The
        /// detector follows up with a `ToolCallUpdate { parsing: false,
        /// .. }` carrying either `status: pending` (promoted) or
        /// `status: failed` with `error_category: parse_aborted`.
        /// `None` (the default) means "this is a normal post-parse tool
        /// call, no candidate phase was active" so the on-disk shape
        /// stays compatible with replays recorded before this field
        /// existed.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        /// Mutation-session audit context active when the tool was
        /// dispatched (see harn#699). Hosts use it to group every tool
        /// emission belonging to the same write-capable session.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// Status update for a tool call identified by `tool_call_id`.
    /// `raw_output` and `error` are optional; the remaining optional
    /// fields are omitted from the serialized form when `None`.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
        /// Wall-clock milliseconds from the parse-to-execution boundary
        /// to the terminal `Completed`/`Failed` update. Includes the
        /// time spent in any wrapping orchestration logic (loop checks,
        /// post-tool hooks, microcompaction). Populated only on the
        /// terminal update — `None` on intermediate `Pending` /
        /// `InProgress` updates so clients can ignore the field until
        /// it shows up.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        duration_ms: Option<u64>,
        /// Milliseconds spent in the actual host/builtin/MCP dispatch
        /// call only (the inner `dispatch_tool_execution` window).
        /// Populated only on the terminal update; `None` otherwise.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        execution_duration_ms: Option<u64>,
        /// Structured classification of the failure (when `status` is
        /// `Failed`). Paired with `error` so clients can render each
        /// category distinctly without parsing free-form strings. Always
        /// `None` for non-Failed updates and serialized as
        /// `errorCategory` in the ACP wire format.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_category: Option<ToolCallErrorCategory>,
        /// Where the tool actually ran. `None` only for events emitted
        /// from sites that pre-date the dispatch decision (e.g. the
        /// pending → in-progress transition the loop emits before the
        /// dispatcher picks a backend).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        executor: Option<ToolExecutor>,
        /// Companion to `ToolCall.parsing` (harn#692). The streaming
        /// candidate detector emits the *terminal* candidate event as a
        /// `ToolCallUpdate` with `parsing: Some(false)` to retract the
        /// in-flight `parsing: true` chip — either by promoting the
        /// candidate (`status: pending`, with `raw_output: None` and
        /// `error: None`) or aborting it (`status: failed`,
        /// `error_category: parse_aborted`). `None` means this update is
        /// not part of a candidate-phase transition.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        /// Best-effort partial parse of the streamed tool-call arguments.
        /// Populated by the SSE transport on `Pending` updates as the
        /// model streams `input_json_delta` (Anthropic) or
        /// `tool_calls[].function.arguments` deltas (OpenAI). `None` on
        /// terminal updates and on emissions from non-streaming paths
        /// (#693). When the partial bytes are not yet parseable as JSON
        /// the transport falls back to `raw_input_partial`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input: Option<serde_json::Value>,
        /// Raw concatenated bytes of the streamed tool-call arguments
        /// when a permissive parse failed (#693). Mutually exclusive
        /// with `raw_input`: clients render whichever is present.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input_partial: Option<String>,
        /// Mutation-session audit context for the tool call. Carries the
        /// same payload as on the paired `ToolCall` event so a host
        /// processing a single update doesn't have to correlate against
        /// the prior pending event.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// Plan update; the plan itself is carried as free-form JSON.
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// Loop milestone marking the start of a turn, tagged with the loop
    /// `iteration`.
    /// NOTE(review): whether `iteration` is zero- or one-based is not
    /// visible from this file — confirm at the emit site.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// Loop milestone marking the end of a turn. `turn_info` is a
    /// free-form JSON summary whose schema is defined by the emitter.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// Records a judge decision for an iteration: the `verdict`, its
    /// `reasoning`, an optional `next_step`, and how long the judge took
    /// in milliseconds.
    /// NOTE(review): the set of valid `verdict` strings is not visible
    /// here — confirm against the judge implementation.
    JudgeDecision {
        session_id: String,
        iteration: usize,
        verdict: String,
        reasoning: String,
        next_step: Option<String>,
        judge_duration_ms: u64,
    },
    /// Carries a checkpoint payload as free-form JSON.
    /// NOTE(review): the checkpoint schema is defined elsewhere.
    TypedCheckpoint {
        session_id: String,
        checkpoint: serde_json::Value,
    },
    /// Loop milestone reporting that feedback text (`content`) of the
    /// given `kind` was injected.
    /// NOTE(review): the valid `kind` values are not visible here.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// Emitted when the agent loop exhausts `max_iterations` without any
    /// explicit break condition firing. Distinct from a natural "done" or
    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// Emitted when the loop breaks because consecutive text-only turns
    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
    /// hosts that key off agent-terminal events.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// Emitted when the daemon idle-wait loop trips its watchdog because
    /// every configured wake source returned `None` for N consecutive
    /// attempts. Exists so a broken daemon doesn't hang the session
    /// silently.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// Emitted when a skill is activated. Carries the match reason so
    /// replayers can reconstruct *why* a given skill took effect at
    /// this iteration.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// Emitted when a previously-active skill is deactivated because
    /// the reassess phase no longer matches it.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// Emitted once per activation when the skill's `allowed_tools` filter
    /// narrows the effective tool surface exposed to the model.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
    /// Emitted when a `tool_search` query is issued by the model. Carries
    /// the raw query args, the configured strategy, and a `mode` tag
    /// distinguishing the client-executed fallback (`"client"`) from
    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
    /// transcript event shape so hosts can render a search-in-progress
    /// chip in real time — the replay path walks the transcript after
    /// the turn, which is too late for live UX.
    ToolSearchQuery {
        session_id: String,
        tool_use_id: String,
        name: String,
        query: serde_json::Value,
        strategy: String,
        mode: String,
    },
    /// Emitted when `tool_search` resolves — carries the list of tool
    /// names newly promoted into the model's effective surface for the
    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
    ToolSearchResult {
        session_id: String,
        tool_use_id: String,
        promoted: Vec<String>,
        strategy: String,
        mode: String,
    },
    /// Reports a transcript compaction: the `mode` and `strategy` used,
    /// how many messages were archived, the estimated token counts
    /// before and after, and an optional snapshot asset id.
    /// NOTE(review): the valid `mode` / `strategy` strings are defined
    /// by the compaction code, not here.
    TranscriptCompacted {
        session_id: String,
        mode: String,
        strategy: String,
        archived_messages: usize,
        estimated_tokens_before: usize,
        estimated_tokens_after: usize,
        snapshot_asset_id: Option<String>,
    },
    /// Carries a `HandoffArtifact` together with its `artifact_id`. The
    /// artifact is boxed — presumably to keep this enum small; confirm
    /// before unboxing.
    Handoff {
        session_id: String,
        artifact_id: String,
        handoff: Box<HandoffArtifact>,
    },
    /// A batch of filesystem notifications for the watch identified by
    /// `subscription_id`.
    FsWatch {
        session_id: String,
        subscription_id: String,
        events: Vec<FsWatchEvent>,
    },
    /// Lifecycle update for a delegated/background worker. Carries the
    /// canonical typed `event` variant alongside the worker's current
    /// `status` string and the structured `metadata` payload that
    /// `worker_bridge_metadata` builds (task, mode, timing, child
    /// run/snapshot paths, audit-session, etc.). The `audit` field is
    /// the same `MutationSessionRecord` JSON serialization carried on
    /// the bridge wire so ACP/A2A consumers don't need to re-derive it.
    ///
    /// One-to-one with the bridge-side `worker_update` session-update
    /// notification: ACP and A2A adapters subscribe to this variant
    /// and translate it into their respective wire formats. The
    /// `session_id` is the parent agent session that owns the worker
    /// (i.e. the session whose VM spawned the worker), so a single
    /// host stays subscribed to the same sink for both message and
    /// worker traffic.
    WorkerUpdate {
        session_id: String,
        worker_id: String,
        worker_name: String,
        worker_task: String,
        worker_mode: String,
        event: WorkerEvent,
        status: String,
        metadata: serde_json::Value,
        audit: Option<serde_json::Value>,
    },
    /// A human-in-the-loop primitive (`ask_user`, `request_approval`,
    /// `dual_control`, `escalate`) has just suspended the script and is
    /// waiting on a response. Hosts that bridge the VM onto a remote
    /// transport (ACP, A2A) translate this into a "paused / awaiting
    /// input" wire signal so the client knows the task isn't stuck —
    /// it's blocked on the human side. Pair-emitted with `HitlResolved`
    /// when the waitpoint completes/cancels/times out.
    HitlRequested {
        session_id: String,
        request_id: String,
        kind: String,
        payload: serde_json::Value,
    },
    /// Companion to `HitlRequested`: the waitpoint has resolved (either
    /// a response arrived, the deadline elapsed, or the request was
    /// cancelled). `outcome` is one of `"answered"`, `"timeout"`,
    /// `"cancelled"`. Hosts use this to flip task state back to
    /// `working` after an `input-required` pause.
    HitlResolved {
        session_id: String,
        request_id: String,
        kind: String,
        outcome: String,
    },
    /// Emitted by the agent loop's adaptive iteration budget /
    /// `loop_control` policy when a budget extension or early stop fires.
    /// Generic enough to cover both shapes — `action` distinguishes them.
    /// Carries the iteration the decision applied to, the previous /
    /// resulting iteration limit, the policy reason string, and (for
    /// stops) the loop status.
    LoopControlDecision {
        session_id: String,
        iteration: usize,
        action: String,
        old_limit: usize,
        new_limit: usize,
        reason: String,
        status: String,
    },
    /// Emitted when `agent_loop` detects adjacent repeated tool calls with
    /// identical arguments. The warning payload avoids raw arguments by
    /// default and carries digests so hosts can correlate repeats without
    /// exposing potentially sensitive tool inputs.
    AgentLoopStallWarning {
        session_id: String,
        warning: serde_json::Value,
    },
    /// Emitted when a `tool_caller` middleware (see std/llm/tool_middleware)
    /// attaches structured audit metadata to a tool call — typically a
    /// user-facing `summary`, a `description`, an ACP-style `kind`, an MCP
    /// `hints` block, a `consent` decision, the per-layer `layers` log, or
    /// free-form `metadata` keys (A2A-style extension slot).
    ///
    /// One-to-one with the underlying tool-call: hosts can join on
    /// `tool_call_id` to render middleware-attached chips alongside the
    /// existing `ToolCall` / `ToolCallUpdate` stream. The `audit` payload
    /// is intentionally free-form JSON so middleware can carry whatever
    /// shape the harness author chooses without needing protocol-level
    /// changes per new middleware.
    ToolCallAudit {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        audit: serde_json::Value,
    },
    /// Emitted by `std/cache::with_cache` (both the generic and LLM
    /// forms) when a cached lookup returns a hit. Carries the
    /// content-addressed key, the backend that served the value, and a
    /// `metrics` block with the cost-moat receipts the persona value
    /// ledger (harn-cloud#58) and crystallization receipts read:
    /// `model_calls_avoided`, plus `tokens_saved` / `latency_saved_ms`
    /// when the cached envelope carried `usage` / `latency_ms`.
    CacheHit {
        session_id: String,
        key: String,
        backend: String,
        namespace: String,
        payload: serde_json::Value,
    },
    /// Paired with `CacheHit`. Emitted on the miss path when the
    /// fresh result is stored. `payload.metrics.compute_ms` carries
    /// the wall-clock cost of the underlying computation, which
    /// callers can feed back as `estimate.latency_saved_ms` on the
    /// next hit.
    CacheMiss {
        session_id: String,
        key: String,
        backend: String,
        namespace: String,
        payload: serde_json::Value,
    },
}
612
613impl AgentEvent {
614    pub fn session_id(&self) -> &str {
615        match self {
616            Self::AgentMessageChunk { session_id, .. }
617            | Self::AgentThoughtChunk { session_id, .. }
618            | Self::ToolCall { session_id, .. }
619            | Self::ToolCallUpdate { session_id, .. }
620            | Self::Plan { session_id, .. }
621            | Self::TurnStart { session_id, .. }
622            | Self::TurnEnd { session_id, .. }
623            | Self::JudgeDecision { session_id, .. }
624            | Self::TypedCheckpoint { session_id, .. }
625            | Self::FeedbackInjected { session_id, .. }
626            | Self::BudgetExhausted { session_id, .. }
627            | Self::LoopStuck { session_id, .. }
628            | Self::DaemonWatchdogTripped { session_id, .. }
629            | Self::SkillActivated { session_id, .. }
630            | Self::SkillDeactivated { session_id, .. }
631            | Self::SkillScopeTools { session_id, .. }
632            | Self::ToolSearchQuery { session_id, .. }
633            | Self::ToolSearchResult { session_id, .. }
634            | Self::TranscriptCompacted { session_id, .. }
635            | Self::Handoff { session_id, .. }
636            | Self::FsWatch { session_id, .. }
637            | Self::WorkerUpdate { session_id, .. }
638            | Self::HitlRequested { session_id, .. }
639            | Self::HitlResolved { session_id, .. }
640            | Self::LoopControlDecision { session_id, .. }
641            | Self::AgentLoopStallWarning { session_id, .. }
642            | Self::ToolCallAudit { session_id, .. }
643            | Self::CacheHit { session_id, .. }
644            | Self::CacheMiss { session_id, .. } => session_id,
645        }
646    }
647}
648
/// External consumers of the event stream (e.g. the harn-cli ACP server,
/// which translates events into JSON-RPC notifications).
///
/// Implementations must be `Send + Sync`.
pub trait AgentEventSink: Send + Sync {
    /// Handle one event. The event is only borrowed, so a sink that needs
    /// to keep it must clone it (`AgentEvent` is `Clone`).
    ///
    /// The module docs describe sinks as invoked synchronously by the
    /// loop. NOTE(review): a slow handler therefore presumably delays the
    /// loop — confirm at the dispatch site.
    fn handle_event(&self, event: &AgentEvent);
}
654
/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
/// `AgentEvent` with monotonic index + timestamp + frame depth so
/// replay engines can reconstruct paused state at any event index,
/// and scrubber UIs can bucket events by time. The envelope is the
/// on-disk shape; the wire format for live consumers is still the
/// raw `AgentEvent` so existing sinks don't churn.
///
/// Because `event` is flattened and `AgentEvent` is tagged with `type`,
/// each JSON object holds the event's `type` and payload fields as
/// siblings of `index`, `emitted_at_ms`, and `frame_depth`.
/// NOTE(review): an `AgentEvent` field named like one of those three
/// envelope keys would presumably collide in the flattened object.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Monotonic per-session index starting at 0. Unique within a
    /// session; gaps never happen even under load because the sink
    /// owns the counter under a mutex.
    pub index: u64,
    /// Milliseconds since the Unix epoch, captured when the sink
    /// received the event. Not the event's emission time — that
    /// would require threading a clock through every emit site.
    pub emitted_at_ms: i64,
    /// Call-stack depth at the moment of emission, when the caller
    /// can supply it. `None` for events emitted from a context where
    /// the VM frame stack isn't available. No `skip_serializing_if` is
    /// set, so `None` is written out as `null`.
    pub frame_depth: Option<u32>,
    /// The raw event, flattened so `jq '.type'` works as expected.
    #[serde(flatten)]
    pub event: AgentEvent,
}
679
/// Append-only JSONL sink for a single session's event stream (#103).
/// One writer per session; sinks rotate to a numbered suffix when a
/// running file crosses `ROTATE_BYTES` (100 MB today — long chat
/// sessions rarely exceed 5 MB, so rotation almost never fires).
pub struct JsonlEventSink {
    /// Writer plus bookkeeping counters, guarded by a mutex so the sink
    /// can be used through a shared reference.
    state: Mutex<JsonlEventSinkState>,
    /// Path the sink was opened with; rotated file names are derived
    /// from its file stem and extension.
    base_path: std::path::PathBuf,
}
688
/// Mutable state of a `JsonlEventSink`, kept behind the sink's mutex.
struct JsonlEventSinkState {
    /// Buffered writer over the currently open log file.
    writer: std::io::BufWriter<std::fs::File>,
    /// Event counter, starting at 0; this is the value `event_count`
    /// reports.
    index: u64,
    /// Byte counter compared against `ROTATE_BYTES` to decide when to
    /// rotate.
    /// NOTE(review): presumably reset after each rotation — the code
    /// that updates it is outside this view.
    bytes_written: u64,
    /// Number of rotations performed so far; formatted as the six-digit
    /// suffix of rotated file names.
    rotation: u32,
}
695
696impl JsonlEventSink {
697    /// Hard cap past which the current file rotates to a numbered
698    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
699    /// sessions don't produce unreadable multi-GB logs.
700    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
701
702    /// Open a new sink writing to `base_path`. Creates parent dirs
703    /// if missing. Overwrites an existing file so each fresh session
704    /// starts from index 0.
705    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
706        let base_path = base_path.into();
707        if let Some(parent) = base_path.parent() {
708            std::fs::create_dir_all(parent)?;
709        }
710        let file = std::fs::OpenOptions::new()
711            .create(true)
712            .truncate(true)
713            .write(true)
714            .open(&base_path)?;
715        Ok(Arc::new(Self {
716            state: Mutex::new(JsonlEventSinkState {
717                writer: std::io::BufWriter::new(file),
718                index: 0,
719                bytes_written: 0,
720                rotation: 0,
721            }),
722            base_path,
723        }))
724    }
725
726    /// Flush any buffered writes. Called on session shutdown; the
727    /// Drop impl calls this too but on early panic it may not run.
728    pub fn flush(&self) -> std::io::Result<()> {
729        use std::io::Write as _;
730        self.state
731            .lock()
732            .expect("jsonl sink mutex poisoned")
733            .writer
734            .flush()
735    }
736
737    /// Current event index — primarily for tests and the "how many
738    /// events are in this run" run-record summary.
739    pub fn event_count(&self) -> u64 {
740        self.state.lock().expect("jsonl sink mutex poisoned").index
741    }
742
743    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
744        use std::io::Write as _;
745        if state.bytes_written < Self::ROTATE_BYTES {
746            return Ok(());
747        }
748        state.writer.flush()?;
749        state.rotation += 1;
750        let suffix = format!("-{:06}", state.rotation);
751        let rotated = self.base_path.with_file_name({
752            let stem = self
753                .base_path
754                .file_stem()
755                .and_then(|s| s.to_str())
756                .unwrap_or("event_log");
757            let ext = self
758                .base_path
759                .extension()
760                .and_then(|e| e.to_str())
761                .unwrap_or("jsonl");
762            format!("{stem}{suffix}.{ext}")
763        });
764        let file = std::fs::OpenOptions::new()
765            .create(true)
766            .truncate(true)
767            .write(true)
768            .open(&rotated)?;
769        state.writer = std::io::BufWriter::new(file);
770        state.bytes_written = 0;
771        Ok(())
772    }
773}
774
775/// Event-log-backed sink for a single session's agent event stream.
776/// Uses the generalized append-only event log when one is installed for
777/// the current VM thread and falls back to `JsonlEventSink` only for
778/// older env-driven workflows.
779pub struct EventLogSink {
780    log: Arc<AnyEventLog>,
781    topic: Topic,
782    session_id: String,
783}
784
785impl EventLogSink {
786    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
787        let session_id = session_id.into();
788        let topic = Topic::new(format!(
789            "observability.agent_events.{}",
790            crate::event_log::sanitize_topic_component(&session_id)
791        ))
792        .expect("session id should sanitize to a valid topic");
793        Arc::new(Self {
794            log,
795            topic,
796            session_id,
797        })
798    }
799}
800
801impl AgentEventSink for JsonlEventSink {
802    fn handle_event(&self, event: &AgentEvent) {
803        use std::io::Write as _;
804        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
805        let index = state.index;
806        state.index += 1;
807        let emitted_at_ms = std::time::SystemTime::now()
808            .duration_since(std::time::UNIX_EPOCH)
809            .map(|d| d.as_millis() as i64)
810            .unwrap_or(0);
811        let envelope = PersistedAgentEvent {
812            index,
813            emitted_at_ms,
814            frame_depth: None,
815            event: event.clone(),
816        };
817        if let Ok(line) = serde_json::to_string(&envelope) {
818            // One line, newline-terminated — JSON Lines spec.
819            // Errors here are swallowed on purpose; a failing write
820            // must never crash the agent loop, and the run record
821            // itself is a secondary artifact.
822            let _ = state.writer.write_all(line.as_bytes());
823            let _ = state.writer.write_all(b"\n");
824            state.bytes_written += line.len() as u64 + 1;
825            let _ = self.rotate_if_needed(&mut state);
826        }
827    }
828}
829
830impl AgentEventSink for EventLogSink {
831    fn handle_event(&self, event: &AgentEvent) {
832        let event_json = match serde_json::to_value(event) {
833            Ok(value) => value,
834            Err(_) => return,
835        };
836        let event_kind = event_json
837            .get("type")
838            .and_then(|value| value.as_str())
839            .unwrap_or("agent_event")
840            .to_string();
841        let payload = serde_json::json!({
842            "index_hint": now_ms(),
843            "session_id": self.session_id,
844            "event": event_json,
845        });
846        let mut headers = std::collections::BTreeMap::new();
847        headers.insert("session_id".to_string(), self.session_id.clone());
848        let log = self.log.clone();
849        let topic = self.topic.clone();
850        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
851        if let Ok(handle) = tokio::runtime::Handle::try_current() {
852            handle.spawn(async move {
853                let _ = log.append(&topic, record).await;
854            });
855        } else {
856            let _ = futures::executor::block_on(log.append(&topic, record));
857        }
858    }
859}
860
861impl Drop for JsonlEventSink {
862    fn drop(&mut self) {
863        if let Ok(mut state) = self.state.lock() {
864            use std::io::Write as _;
865            let _ = state.writer.flush();
866        }
867    }
868}
869
870/// Fan-out helper for composing multiple external sinks.
871pub struct MultiSink {
872    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
873}
874
875impl MultiSink {
876    pub fn new() -> Self {
877        Self {
878            sinks: Mutex::new(Vec::new()),
879        }
880    }
881    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
882        self.sinks.lock().expect("sink mutex poisoned").push(sink);
883    }
884    pub fn len(&self) -> usize {
885        self.sinks.lock().expect("sink mutex poisoned").len()
886    }
887    pub fn is_empty(&self) -> bool {
888        self.len() == 0
889    }
890}
891
892impl Default for MultiSink {
893    fn default() -> Self {
894        Self::new()
895    }
896}
897
898impl AgentEventSink for MultiSink {
899    fn handle_event(&self, event: &AgentEvent) {
900        // Deliberate: snapshot then release the lock before invoking sink
901        // callbacks. Sinks can re-enter the event system (e.g. a host
902        // sink that logs to another AgentEvent path), so holding the
903        // mutex across the callback would risk self-deadlock. Arc clones
904        // are refcount bumps — cheap.
905        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
906        for sink in sinks {
907            sink.handle_event(event);
908        }
909    }
910}
911
912#[cfg(test)]
913#[derive(Clone)]
914struct RegisteredSink {
915    owner: std::thread::ThreadId,
916    sink: Arc<dyn AgentEventSink>,
917}
918
919#[cfg(not(test))]
920type RegisteredSink = Arc<dyn AgentEventSink>;
921
922type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
923
924fn external_sinks() -> &'static ExternalSinkRegistry {
925    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
926    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
927}
928
929pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
930    let session_id = session_id.into();
931    let mut reg = external_sinks().write().expect("sink registry poisoned");
932    #[cfg(test)]
933    let sink = RegisteredSink {
934        owner: std::thread::current().id(),
935        sink,
936    };
937    reg.entry(session_id).or_default().push(sink);
938}
939
940/// Remove all external sinks registered for `session_id`. Does NOT
941/// close the session itself — subscribers and transcript survive, so a
942/// later `agent_loop` call with the same id continues the conversation.
943pub fn clear_session_sinks(session_id: &str) {
944    #[cfg(test)]
945    {
946        let owner = std::thread::current().id();
947        let mut reg = external_sinks().write().expect("sink registry poisoned");
948        if let Some(sinks) = reg.get_mut(session_id) {
949            sinks.retain(|sink| sink.owner != owner);
950            if sinks.is_empty() {
951                reg.remove(session_id);
952            }
953        }
954    }
955    #[cfg(not(test))]
956    {
957        external_sinks()
958            .write()
959            .expect("sink registry poisoned")
960            .remove(session_id);
961    }
962}
963
964pub fn reset_all_sinks() {
965    #[cfg(test)]
966    {
967        let owner = std::thread::current().id();
968        let mut reg = external_sinks().write().expect("sink registry poisoned");
969        reg.retain(|_, sinks| {
970            sinks.retain(|sink| sink.owner != owner);
971            !sinks.is_empty()
972        });
973        crate::agent_sessions::reset_session_store();
974    }
975    #[cfg(not(test))]
976    {
977        external_sinks()
978            .write()
979            .expect("sink registry poisoned")
980            .clear();
981        crate::agent_sessions::reset_session_store();
982    }
983}
984
985/// Mirror externally-registered sinks from `source_session_id` onto
986/// `target_session_id` without moving ownership. Transports such as ACP
987/// register sinks on the outer prompt session before a script runs; scripts
988/// may then open a first-class agent transcript and route `agent_loop` events
989/// through that inner id. Mirroring keeps the transport subscribed to the
990/// in-run child transcript while preserving explicit session ids.
991pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
992    if source_session_id.is_empty() || target_session_id.is_empty() {
993        return;
994    }
995    if source_session_id == target_session_id {
996        return;
997    }
998    let mut reg = external_sinks().write().expect("sink registry poisoned");
999    let Some(source_sinks) = reg.get(source_session_id).cloned() else {
1000        return;
1001    };
1002    let target = reg.entry(target_session_id.to_string()).or_default();
1003    #[cfg(test)]
1004    {
1005        for source in source_sinks {
1006            let already_present = target
1007                .iter()
1008                .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
1009            if !already_present {
1010                target.push(source);
1011            }
1012        }
1013    }
1014    #[cfg(not(test))]
1015    {
1016        for source in source_sinks {
1017            let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
1018            if !already_present {
1019                target.push(source);
1020            }
1021        }
1022    }
1023}
1024
1025/// Emit an event to external sinks registered for this session. Pipeline
1026/// closure subscribers are NOT called by this function — the agent
1027/// loop owns that path because it needs its async VM context.
1028pub fn emit_event(event: &AgentEvent) {
1029    let sinks: Vec<Arc<dyn AgentEventSink>> = {
1030        let reg = external_sinks().read().expect("sink registry poisoned");
1031        #[cfg(test)]
1032        {
1033            let owner = std::thread::current().id();
1034            reg.get(event.session_id())
1035                .map(|sinks| {
1036                    sinks
1037                        .iter()
1038                        .filter(|sink| sink.owner == owner)
1039                        .map(|sink| sink.sink.clone())
1040                        .collect()
1041                })
1042                .unwrap_or_default()
1043        }
1044        #[cfg(not(test))]
1045        {
1046            reg.get(event.session_id()).cloned().unwrap_or_default()
1047        }
1048    };
1049    for sink in sinks {
1050        sink.handle_event(event);
1051    }
1052}
1053
/// Milliseconds since the Unix epoch according to the system clock,
/// or `0` when the clock reports a time before the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
1060
1061pub fn session_external_sink_count(session_id: &str) -> usize {
1062    #[cfg(test)]
1063    {
1064        let owner = std::thread::current().id();
1065        return external_sinks()
1066            .read()
1067            .expect("sink registry poisoned")
1068            .get(session_id)
1069            .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
1070            .unwrap_or(0);
1071    }
1072    #[cfg(not(test))]
1073    {
1074        external_sinks()
1075            .read()
1076            .expect("sink registry poisoned")
1077            .get(session_id)
1078            .map(|v| v.len())
1079            .unwrap_or(0)
1080    }
1081}
1082
1083pub fn session_closure_subscriber_count(session_id: &str) -> usize {
1084    crate::agent_sessions::subscriber_count(session_id)
1085}
1086
1087#[cfg(test)]
1088mod tests {
1089    use super::*;
1090    use std::sync::atomic::{AtomicUsize, Ordering};
1091
1092    struct CountingSink(Arc<AtomicUsize>);
1093    impl AgentEventSink for CountingSink {
1094        fn handle_event(&self, _event: &AgentEvent) {
1095            self.0.fetch_add(1, Ordering::SeqCst);
1096        }
1097    }
1098
1099    #[test]
1100    fn multi_sink_fans_out_in_order() {
1101        let multi = MultiSink::new();
1102        let a = Arc::new(AtomicUsize::new(0));
1103        let b = Arc::new(AtomicUsize::new(0));
1104        multi.push(Arc::new(CountingSink(a.clone())));
1105        multi.push(Arc::new(CountingSink(b.clone())));
1106        let event = AgentEvent::TurnStart {
1107            session_id: "s1".into(),
1108            iteration: 1,
1109        };
1110        multi.handle_event(&event);
1111        assert_eq!(a.load(Ordering::SeqCst), 1);
1112        assert_eq!(b.load(Ordering::SeqCst), 1);
1113    }
1114
1115    #[test]
1116    fn session_scoped_sink_routing() {
1117        reset_all_sinks();
1118        let a = Arc::new(AtomicUsize::new(0));
1119        let b = Arc::new(AtomicUsize::new(0));
1120        register_sink("session-a", Arc::new(CountingSink(a.clone())));
1121        register_sink("session-b", Arc::new(CountingSink(b.clone())));
1122        emit_event(&AgentEvent::TurnStart {
1123            session_id: "session-a".into(),
1124            iteration: 0,
1125        });
1126        assert_eq!(a.load(Ordering::SeqCst), 1);
1127        assert_eq!(b.load(Ordering::SeqCst), 0);
1128        emit_event(&AgentEvent::TurnEnd {
1129            session_id: "session-b".into(),
1130            iteration: 0,
1131            turn_info: serde_json::json!({}),
1132        });
1133        assert_eq!(a.load(Ordering::SeqCst), 1);
1134        assert_eq!(b.load(Ordering::SeqCst), 1);
1135        clear_session_sinks("session-a");
1136        assert_eq!(session_external_sink_count("session-a"), 0);
1137        assert_eq!(session_external_sink_count("session-b"), 1);
1138        reset_all_sinks();
1139    }
1140
1141    #[test]
1142    fn newly_opened_child_session_inherits_current_external_sinks() {
1143        reset_all_sinks();
1144        let delivered = Arc::new(AtomicUsize::new(0));
1145        register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
1146        {
1147            let _guard = crate::agent_sessions::enter_current_session("outer-session");
1148            let inner = crate::agent_sessions::open_or_create(None);
1149            assert_ne!(inner, "outer-session");
1150            emit_event(&AgentEvent::TurnStart {
1151                session_id: inner,
1152                iteration: 0,
1153            });
1154        }
1155        assert_eq!(delivered.load(Ordering::SeqCst), 1);
1156        reset_all_sinks();
1157    }
1158
1159    #[test]
1160    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
1161        use std::io::{BufRead, BufReader};
1162        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
1163        std::fs::create_dir_all(&dir).unwrap();
1164        let path = dir.join("event_log.jsonl");
1165        let sink = JsonlEventSink::open(&path).unwrap();
1166        for i in 0..5 {
1167            sink.handle_event(&AgentEvent::TurnStart {
1168                session_id: "s".into(),
1169                iteration: i,
1170            });
1171        }
1172        assert_eq!(sink.event_count(), 5);
1173        sink.flush().unwrap();
1174
1175        // Read back + assert monotonic indices + non-decreasing timestamps.
1176        let file = std::fs::File::open(&path).unwrap();
1177        let mut last_idx: i64 = -1;
1178        let mut last_ts: i64 = 0;
1179        for line in BufReader::new(file).lines() {
1180            let line = line.unwrap();
1181            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
1182            let idx = val["index"].as_i64().unwrap();
1183            let ts = val["emitted_at_ms"].as_i64().unwrap();
1184            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
1185            assert!(ts >= last_ts, "timestamps must be non-decreasing");
1186            last_idx = idx;
1187            last_ts = ts;
1188            // Event payload flattened — type tag must survive.
1189            assert_eq!(val["type"], "turn_start");
1190        }
1191        assert_eq!(last_idx, 4);
1192        let _ = std::fs::remove_file(&path);
1193    }
1194
1195    #[test]
1196    fn judge_decision_round_trips_through_jsonl_sink() {
1197        use std::io::{BufRead, BufReader};
1198        let dir =
1199            std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
1200        std::fs::create_dir_all(&dir).unwrap();
1201        let path = dir.join("event_log.jsonl");
1202        let sink = JsonlEventSink::open(&path).unwrap();
1203        sink.handle_event(&AgentEvent::JudgeDecision {
1204            session_id: "s".into(),
1205            iteration: 2,
1206            verdict: "continue".into(),
1207            reasoning: "needs a concrete next step".into(),
1208            next_step: Some("run the verifier".into()),
1209            judge_duration_ms: 17,
1210        });
1211        sink.flush().unwrap();
1212
1213        let file = std::fs::File::open(&path).unwrap();
1214        let line = BufReader::new(file).lines().next().unwrap().unwrap();
1215        let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
1216        match recovered.event {
1217            AgentEvent::JudgeDecision {
1218                session_id,
1219                iteration,
1220                verdict,
1221                reasoning,
1222                next_step,
1223                judge_duration_ms,
1224            } => {
1225                assert_eq!(session_id, "s");
1226                assert_eq!(iteration, 2);
1227                assert_eq!(verdict, "continue");
1228                assert_eq!(reasoning, "needs a concrete next step");
1229                assert_eq!(next_step.as_deref(), Some("run the verifier"));
1230                assert_eq!(judge_duration_ms, 17);
1231            }
1232            other => panic!("expected JudgeDecision, got {other:?}"),
1233        }
1234        let value: serde_json::Value = serde_json::from_str(&line).unwrap();
1235        assert_eq!(value["type"], "judge_decision");
1236        let _ = std::fs::remove_file(&path);
1237        let _ = std::fs::remove_dir(&dir);
1238    }
1239
1240    #[test]
1241    fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
1242        // Terminal update with both durations populated — both fields
1243        // appear in the JSON. Snake_case keys here because this is the
1244        // canonical AgentEvent shape; the ACP adapter renames to
1245        // camelCase separately.
1246        let terminal = AgentEvent::ToolCallUpdate {
1247            session_id: "s".into(),
1248            tool_call_id: "tc-1".into(),
1249            tool_name: "read".into(),
1250            status: ToolCallStatus::Completed,
1251            raw_output: None,
1252            error: None,
1253            duration_ms: Some(42),
1254            execution_duration_ms: Some(7),
1255            error_category: None,
1256            executor: None,
1257            parsing: None,
1258
1259            raw_input: None,
1260            raw_input_partial: None,
1261            audit: None,
1262        };
1263        let value = serde_json::to_value(&terminal).unwrap();
1264        assert_eq!(value["duration_ms"], serde_json::json!(42));
1265        assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1266
1267        // In-progress update with `None` for both — both keys must be
1268        // absent (not `null`) so older ACP clients that key off
1269        // presence don't see a misleading zero.
1270        let intermediate = AgentEvent::ToolCallUpdate {
1271            session_id: "s".into(),
1272            tool_call_id: "tc-1".into(),
1273            tool_name: "read".into(),
1274            status: ToolCallStatus::InProgress,
1275            raw_output: None,
1276            error: None,
1277            duration_ms: None,
1278            execution_duration_ms: None,
1279            error_category: None,
1280            executor: None,
1281            parsing: None,
1282
1283            raw_input: None,
1284            raw_input_partial: None,
1285            audit: None,
1286        };
1287        let value = serde_json::to_value(&intermediate).unwrap();
1288        let object = value.as_object().expect("update serializes as object");
1289        assert!(
1290            !object.contains_key("duration_ms"),
1291            "duration_ms must be omitted when None: {value}"
1292        );
1293        assert!(
1294            !object.contains_key("execution_duration_ms"),
1295            "execution_duration_ms must be omitted when None: {value}"
1296        );
1297    }
1298
1299    #[test]
1300    fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1301        // Persisted event-log entries written before the fields existed
1302        // must still deserialize cleanly. The missing keys map to None.
1303        let raw = serde_json::json!({
1304            "type": "tool_call_update",
1305            "session_id": "s",
1306            "tool_call_id": "tc-1",
1307            "tool_name": "read",
1308            "status": "completed",
1309            "raw_output": null,
1310            "error": null,
1311        });
1312        let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1313        match event {
1314            AgentEvent::ToolCallUpdate {
1315                duration_ms,
1316                execution_duration_ms,
1317                ..
1318            } => {
1319                assert!(duration_ms.is_none());
1320                assert!(execution_duration_ms.is_none());
1321            }
1322            other => panic!("expected ToolCallUpdate, got {other:?}"),
1323        }
1324    }
1325
1326    #[test]
1327    fn tool_call_status_serde() {
1328        assert_eq!(
1329            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1330            "\"pending\""
1331        );
1332        assert_eq!(
1333            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1334            "\"in_progress\""
1335        );
1336        assert_eq!(
1337            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1338            "\"completed\""
1339        );
1340        assert_eq!(
1341            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1342            "\"failed\""
1343        );
1344    }
1345
1346    #[test]
1347    fn tool_call_error_category_serializes_as_snake_case() {
1348        let pairs = [
1349            (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1350            (ToolCallErrorCategory::ToolError, "tool_error"),
1351            (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1352            (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1353            (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1354            (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1355            (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1356            (ToolCallErrorCategory::Timeout, "timeout"),
1357            (ToolCallErrorCategory::Network, "network"),
1358            (ToolCallErrorCategory::Cancelled, "cancelled"),
1359            (ToolCallErrorCategory::Unknown, "unknown"),
1360        ];
1361        for (variant, wire) in pairs {
1362            let encoded = serde_json::to_string(&variant).unwrap();
1363            assert_eq!(encoded, format!("\"{wire}\""));
1364            assert_eq!(variant.as_str(), wire);
1365            // Round-trip via deserialize so wire stability is enforced
1366            // both ways.
1367            let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1368            assert_eq!(decoded, variant);
1369        }
1370    }
1371
1372    #[test]
1373    fn tool_executor_round_trips_with_adjacent_tag() {
1374        // Adjacent tagging keeps the wire shape uniform — every variant
1375        // is a JSON object with a `kind` discriminator. The ACP adapter
1376        // rewrites unit variants as bare strings; the on-disk event log
1377        // keeps the object shape so deserialize can recover the variant.
1378        for executor in [
1379            ToolExecutor::HarnBuiltin,
1380            ToolExecutor::HostBridge,
1381            ToolExecutor::McpServer {
1382                server_name: "linear".to_string(),
1383            },
1384            ToolExecutor::ProviderNative,
1385        ] {
1386            let json = serde_json::to_value(&executor).unwrap();
1387            let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1388            match &executor {
1389                ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1390                ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1391                ToolExecutor::McpServer { server_name } => {
1392                    assert_eq!(kind, "mcp_server");
1393                    assert_eq!(json["server_name"], *server_name);
1394                }
1395                ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1396            }
1397            let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1398            assert_eq!(recovered, executor);
1399        }
1400    }
1401
1402    #[test]
1403    fn tool_call_error_category_from_internal_collapses_transient_family() {
1404        use crate::value::ErrorCategory as Internal;
1405        assert_eq!(
1406            ToolCallErrorCategory::from_internal(&Internal::Timeout),
1407            ToolCallErrorCategory::Timeout
1408        );
1409        for net in [
1410            Internal::RateLimit,
1411            Internal::Overloaded,
1412            Internal::ServerError,
1413            Internal::TransientNetwork,
1414        ] {
1415            assert_eq!(
1416                ToolCallErrorCategory::from_internal(&net),
1417                ToolCallErrorCategory::Network,
1418                "{net:?} should map to Network",
1419            );
1420        }
1421        assert_eq!(
1422            ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1423            ToolCallErrorCategory::SchemaValidation
1424        );
1425        assert_eq!(
1426            ToolCallErrorCategory::from_internal(&Internal::ToolError),
1427            ToolCallErrorCategory::ToolError
1428        );
1429        assert_eq!(
1430            ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1431            ToolCallErrorCategory::PermissionDenied
1432        );
1433        assert_eq!(
1434            ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1435            ToolCallErrorCategory::Cancelled
1436        );
1437        for bridge in [
1438            Internal::Auth,
1439            Internal::EgressBlocked,
1440            Internal::NotFound,
1441            Internal::CircuitOpen,
1442            Internal::Generic,
1443        ] {
1444            assert_eq!(
1445                ToolCallErrorCategory::from_internal(&bridge),
1446                ToolCallErrorCategory::HostBridgeError,
1447                "{bridge:?} should map to HostBridgeError",
1448            );
1449        }
1450    }
1451
1452    #[test]
1453    fn tool_call_update_event_omits_error_category_when_none() {
1454        let event = AgentEvent::ToolCallUpdate {
1455            session_id: "s".into(),
1456            tool_call_id: "t".into(),
1457            tool_name: "read".into(),
1458            status: ToolCallStatus::Completed,
1459            raw_output: None,
1460            error: None,
1461            duration_ms: None,
1462            execution_duration_ms: None,
1463            error_category: None,
1464            executor: None,
1465            parsing: None,
1466
1467            raw_input: None,
1468            raw_input_partial: None,
1469            audit: None,
1470        };
1471        let v = serde_json::to_value(&event).unwrap();
1472        assert_eq!(v["type"], "tool_call_update");
1473        assert!(v.get("error_category").is_none());
1474    }
1475
1476    #[test]
1477    fn tool_call_update_event_serializes_error_category_when_set() {
1478        let event = AgentEvent::ToolCallUpdate {
1479            session_id: "s".into(),
1480            tool_call_id: "t".into(),
1481            tool_name: "read".into(),
1482            status: ToolCallStatus::Failed,
1483            raw_output: None,
1484            error: Some("missing required field".into()),
1485            duration_ms: None,
1486            execution_duration_ms: None,
1487            error_category: Some(ToolCallErrorCategory::SchemaValidation),
1488            executor: None,
1489            parsing: None,
1490
1491            raw_input: None,
1492            raw_input_partial: None,
1493            audit: None,
1494        };
1495        let v = serde_json::to_value(&event).unwrap();
1496        assert_eq!(v["error_category"], "schema_validation");
1497        assert_eq!(v["error"], "missing required field");
1498    }
1499
1500    #[test]
1501    fn tool_call_update_omits_executor_when_absent() {
1502        // `executor: None` must not appear in the serialized event so
1503        // the on-disk shape stays backward-compatible with replays
1504        // recorded before harn#691.
1505        let event = AgentEvent::ToolCallUpdate {
1506            session_id: "s".into(),
1507            tool_call_id: "tc-1".into(),
1508            tool_name: "read".into(),
1509            status: ToolCallStatus::Completed,
1510            raw_output: None,
1511            error: None,
1512            duration_ms: None,
1513            execution_duration_ms: None,
1514            error_category: None,
1515            executor: None,
1516            parsing: None,
1517
1518            raw_input: None,
1519            raw_input_partial: None,
1520            audit: None,
1521        };
1522        let json = serde_json::to_value(&event).unwrap();
1523        assert!(json.get("executor").is_none(), "got: {json}");
1524    }
1525
1526    #[test]
1527    fn worker_event_status_strings_cover_all_variants() {
1528        // Wire-level status strings flow into both bridge `worker_update`
1529        // payloads and ACP `session/update` notifications. Pinning every
1530        // variant here so a future addition can't silently land without
1531        // a docs/wire decision.
1532        assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1533        assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1534        assert_eq!(
1535            WorkerEvent::WorkerWaitingForInput.as_status(),
1536            "awaiting_input"
1537        );
1538        assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1539        assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1540        assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1541
1542        for terminal in [
1543            WorkerEvent::WorkerCompleted,
1544            WorkerEvent::WorkerFailed,
1545            WorkerEvent::WorkerCancelled,
1546        ] {
1547            assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1548        }
1549        for non_terminal in [
1550            WorkerEvent::WorkerSpawned,
1551            WorkerEvent::WorkerProgressed,
1552            WorkerEvent::WorkerWaitingForInput,
1553        ] {
1554            assert!(
1555                !non_terminal.is_terminal(),
1556                "{non_terminal:?} should not be terminal"
1557            );
1558        }
1559    }
1560
1561    #[test]
1562    fn worker_update_event_routes_through_session_keyed_sink() {
1563        // Worker lifecycle events ride the same session-keyed
1564        // `AgentEventSink` registry as message and tool events. This
1565        // is the canonical path ACP and A2A subscribe to — gate it
1566        // here so a registry-routing regression breaks loudly.
1567        reset_all_sinks();
1568        let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
1569        struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1570        impl AgentEventSink for CapturingSink {
1571            fn handle_event(&self, event: &AgentEvent) {
1572                self.0
1573                    .lock()
1574                    .expect("captured sink mutex poisoned")
1575                    .push(event.clone());
1576            }
1577        }
1578        register_sink(
1579            "worker-session-1",
1580            Arc::new(CapturingSink(captured.clone())),
1581        );
1582        emit_event(&AgentEvent::WorkerUpdate {
1583            session_id: "worker-session-1".into(),
1584            worker_id: "worker_42".into(),
1585            worker_name: "review_captain".into(),
1586            worker_task: "review pr".into(),
1587            worker_mode: "delegated_stage".into(),
1588            event: WorkerEvent::WorkerWaitingForInput,
1589            status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
1590            metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
1591            audit: None,
1592        });
1593        // Other sessions don't receive cross-talk.
1594        emit_event(&AgentEvent::WorkerUpdate {
1595            session_id: "other-session".into(),
1596            worker_id: "w2".into(),
1597            worker_name: "n2".into(),
1598            worker_task: "t2".into(),
1599            worker_mode: "delegated_stage".into(),
1600            event: WorkerEvent::WorkerCompleted,
1601            status: "completed".into(),
1602            metadata: serde_json::json!({}),
1603            audit: None,
1604        });
1605        let received = captured.lock().unwrap().clone();
1606        assert_eq!(received.len(), 1, "got: {received:?}");
1607        match &received[0] {
1608            AgentEvent::WorkerUpdate {
1609                session_id,
1610                worker_id,
1611                event,
1612                status,
1613                ..
1614            } => {
1615                assert_eq!(session_id, "worker-session-1");
1616                assert_eq!(worker_id, "worker_42");
1617                assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
1618                assert_eq!(status, "awaiting_input");
1619            }
1620            other => panic!("expected WorkerUpdate, got {other:?}"),
1621        }
1622        reset_all_sinks();
1623    }
1624
1625    #[test]
1626    fn worker_update_event_serializes_to_canonical_shape() {
1627        // Persisted event-log entries flatten the AgentEvent envelope,
1628        // so the WorkerUpdate variant must serialize with a `type` of
1629        // `worker_update` and the worker fields directly on the
1630        // top-level object (matching the `#[serde(tag = "type", ...)]`
1631        // shape the rest of AgentEvent uses).
1632        let event = AgentEvent::WorkerUpdate {
1633            session_id: "s".into(),
1634            worker_id: "w".into(),
1635            worker_name: "n".into(),
1636            worker_task: "t".into(),
1637            worker_mode: "delegated_stage".into(),
1638            event: WorkerEvent::WorkerProgressed,
1639            status: "progressed".into(),
1640            metadata: serde_json::json!({"started_at": "0193..."}),
1641            audit: Some(serde_json::json!({"run_id": "run_x"})),
1642        };
1643        let value = serde_json::to_value(&event).unwrap();
1644        assert_eq!(value["type"], "worker_update");
1645        assert_eq!(value["session_id"], "s");
1646        assert_eq!(value["worker_id"], "w");
1647        assert_eq!(value["status"], "progressed");
1648        assert_eq!(value["audit"]["run_id"], "run_x");
1649
1650        // Round-trip: the persisted event log must deserialize the
1651        // canonical shape back into the typed variant so replay
1652        // tooling can re-derive lifecycle state offline.
1653        let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1654        match recovered {
1655            AgentEvent::WorkerUpdate {
1656                event: recovered_event,
1657                ..
1658            } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1659            other => panic!("expected WorkerUpdate, got {other:?}"),
1660        }
1661    }
1662
1663    #[test]
1664    fn tool_call_update_includes_executor_when_present() {
1665        let event = AgentEvent::ToolCallUpdate {
1666            session_id: "s".into(),
1667            tool_call_id: "tc-1".into(),
1668            tool_name: "read".into(),
1669            status: ToolCallStatus::Completed,
1670            raw_output: None,
1671            error: None,
1672            duration_ms: None,
1673            execution_duration_ms: None,
1674            error_category: None,
1675            executor: Some(ToolExecutor::McpServer {
1676                server_name: "github".into(),
1677            }),
1678            parsing: None,
1679
1680            raw_input: None,
1681            raw_input_partial: None,
1682            audit: None,
1683        };
1684        let json = serde_json::to_value(&event).unwrap();
1685        assert_eq!(json["executor"]["kind"], "mcp_server");
1686        assert_eq!(json["executor"]["server_name"], "github");
1687    }
1688
1689    #[test]
1690    fn tool_call_update_omits_audit_when_absent() {
1691        let event = AgentEvent::ToolCallUpdate {
1692            session_id: "s".into(),
1693            tool_call_id: "tc-1".into(),
1694            tool_name: "read".into(),
1695            status: ToolCallStatus::Completed,
1696            raw_output: None,
1697            error: None,
1698            duration_ms: None,
1699            execution_duration_ms: None,
1700            error_category: None,
1701            executor: None,
1702            parsing: None,
1703            raw_input: None,
1704            raw_input_partial: None,
1705            audit: None,
1706        };
1707        let json = serde_json::to_value(&event).unwrap();
1708        assert!(json.get("audit").is_none(), "got: {json}");
1709    }
1710
1711    #[test]
1712    fn tool_call_update_includes_audit_when_present() {
1713        let audit = MutationSessionRecord {
1714            session_id: "session_42".into(),
1715            run_id: Some("run_42".into()),
1716            mutation_scope: "apply_workspace".into(),
1717            execution_kind: Some("worker".into()),
1718            ..Default::default()
1719        };
1720        let event = AgentEvent::ToolCallUpdate {
1721            session_id: "s".into(),
1722            tool_call_id: "tc-1".into(),
1723            tool_name: "edit_file".into(),
1724            status: ToolCallStatus::Completed,
1725            raw_output: None,
1726            error: None,
1727            duration_ms: None,
1728            execution_duration_ms: None,
1729            error_category: None,
1730            executor: Some(ToolExecutor::HostBridge),
1731            parsing: None,
1732            raw_input: None,
1733            raw_input_partial: None,
1734            audit: Some(audit),
1735        };
1736        let json = serde_json::to_value(&event).unwrap();
1737        assert_eq!(json["audit"]["session_id"], "session_42");
1738        assert_eq!(json["audit"]["run_id"], "run_42");
1739        assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1740        assert_eq!(json["audit"]["execution_kind"], "worker");
1741    }
1742
1743    #[test]
1744    fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1745        let raw = serde_json::json!({
1746            "type": "tool_call_update",
1747            "session_id": "s",
1748            "tool_call_id": "tc-1",
1749            "tool_name": "read",
1750            "status": "completed",
1751            "raw_output": null,
1752            "error": null,
1753        });
1754        let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1755        match event {
1756            AgentEvent::ToolCallUpdate { audit, .. } => {
1757                assert!(audit.is_none());
1758            }
1759            other => panic!("expected ToolCallUpdate, got {other:?}"),
1760        }
1761    }
1762
1763    #[test]
1764    fn tool_call_audit_serializes_with_free_form_audit_payload() {
1765        // Middleware-attached metadata is intentionally free-form JSON
1766        // (A2A-style `metadata` extension slot). The wire format must
1767        // preserve nested dicts + lists verbatim so hosts can read
1768        // `summary`/`consent`/`layers`/etc. without per-field schema.
1769        let audit = serde_json::json!({
1770            "summary": "Searched codebase",
1771            "kind": "search",
1772            "consent": {"decision": "approved", "decided_by": "auto"},
1773            "layers": [{"name": "with_required_reason", "status": "ok"}],
1774        });
1775        let event = AgentEvent::ToolCallAudit {
1776            session_id: "s".into(),
1777            tool_call_id: "tc-1".into(),
1778            tool_name: "search_files".into(),
1779            audit: audit.clone(),
1780        };
1781        let json = serde_json::to_value(&event).unwrap();
1782        assert_eq!(json["type"], "tool_call_audit");
1783        assert_eq!(json["session_id"], "s");
1784        assert_eq!(json["tool_call_id"], "tc-1");
1785        assert_eq!(json["tool_name"], "search_files");
1786        assert_eq!(json["audit"], audit);
1787    }
1788
1789    #[test]
1790    fn tool_call_audit_session_id_routes_correctly() {
1791        let event = AgentEvent::ToolCallAudit {
1792            session_id: "abc".into(),
1793            tool_call_id: "tc".into(),
1794            tool_name: "read".into(),
1795            audit: serde_json::Value::Null,
1796        };
1797        assert_eq!(event.session_id(), "abc");
1798    }
1799}