Skip to main content

harn_vm/
agent_events.rs

1//! Agent event stream — the ACP-aligned observation surface for the
2//! agent loop.
3//!
//! Every phase of the turn loop emits an `AgentEvent`. The canonical
//! variants map 1:1 onto ACP `SessionUpdate` values; the harn-internal
//! variants (e.g. `TurnStart`, `TurnEnd`, `FeedbackInjected`) let pipelines
//! react to loop milestones that don't have a direct ACP counterpart.
8//!
9//! There are two subscription paths, both keyed on session id so two
10//! concurrent sessions never cross-talk:
11//!
12//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
13//!    like the harn-cli ACP server. Invoked synchronously by the loop.
14//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
15//! 2. **Closure subscribers** — `.harn` closures registered via the
16//!    `agent_subscribe(session_id, callback)` host builtin. These live
17//!    on the session's `SessionState.subscribers` in
18//!    `crate::agent_sessions`, because sessions are the single source
19//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
/// One coalesced filesystem notification from a hostlib `fs_watch`
/// subscription.
///
/// Carried in batches by `AgentEvent::FsWatch`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FsWatchEvent {
    /// Event kind label. NOTE(review): presumably a normalized
    /// counterpart of `raw_kind` — confirm against the hostlib emitter.
    pub kind: String,
    /// Paths this notification refers to.
    pub paths: Vec<String>,
    /// NOTE(review): presumably `paths` expressed relative to the
    /// watched root — confirm against the hostlib emitter.
    pub relative_paths: Vec<String>,
    /// NOTE(review): looks like the watcher backend's unnormalized
    /// kind string — confirm against the hostlib emitter.
    pub raw_kind: String,
    /// Error message attached to this notification, if any.
    pub error: Option<String>,
}
40
/// Typed worker lifecycle events emitted by delegated/background agent
/// execution. Bridge-facing worker updates still derive a string status
/// from these variants, but the runtime no longer passes raw status
/// strings around internally.
///
/// `WorkerSpawned`/`WorkerCompleted`/`WorkerFailed`/`WorkerCancelled` are
/// the four terminal-or-start states. `WorkerProgressed` is fired on
/// intermediate milestones (e.g. a retriggerable worker resuming from
/// `awaiting_input`, or a workflow stage completing without ending the
/// worker). `WorkerWaitingForInput` covers retriggerable workers that
/// finish a cycle but stay alive pending the next host-supplied trigger
/// payload.
///
/// No `rename_all` attribute is applied, so serde uses the variant names
/// verbatim (`"WorkerSpawned"`, ...).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    /// Worker has started; reported with status `"running"`.
    WorkerSpawned,
    /// Intermediate milestone; reported with status `"progressed"`.
    WorkerProgressed,
    /// Worker is alive and waiting for the next trigger; reported with
    /// status `"awaiting_input"`.
    WorkerWaitingForInput,
    /// Terminal: reported with status `"completed"`.
    WorkerCompleted,
    /// Terminal: reported with status `"failed"`.
    WorkerFailed,
    /// Terminal: reported with status `"cancelled"`.
    WorkerCancelled,
}
61
62impl WorkerEvent {
63    /// Wire-level status string used by bridge `worker_update` payloads
64    /// and ACP `worker_update` session updates. The four canonical
65    /// states are mirrored from harn's internal worker `status` field
66    /// (`running`/`completed`/`failed`/`cancelled`), and the two newer
67    /// lifecycle states pick names that don't collide with any existing
68    /// status string.
69    pub fn as_status(self) -> &'static str {
70        match self {
71            Self::WorkerSpawned => "running",
72            Self::WorkerProgressed => "progressed",
73            Self::WorkerWaitingForInput => "awaiting_input",
74            Self::WorkerCompleted => "completed",
75            Self::WorkerFailed => "failed",
76            Self::WorkerCancelled => "cancelled",
77        }
78    }
79
80    pub fn as_str(self) -> &'static str {
81        match self {
82            Self::WorkerSpawned => "WorkerSpawned",
83            Self::WorkerProgressed => "WorkerProgressed",
84            Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85            Self::WorkerCompleted => "WorkerCompleted",
86            Self::WorkerFailed => "WorkerFailed",
87            Self::WorkerCancelled => "WorkerCancelled",
88        }
89    }
90
91    /// True for lifecycle events that mean the worker has reached a
92    /// final, non-resumable state. Retriggerable awaiting and
93    /// progressed milestones are *not* terminal — the worker keeps
94    /// running or is waiting for a trigger.
95    pub fn is_terminal(self) -> bool {
96        matches!(
97            self,
98            Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99        )
100    }
101}
102
/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
///
/// Serialized in snake_case (`pending`, `in_progress`, `completed`,
/// `failed`) via `rename_all`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Dispatched by the model but not yet started.
    Pending,
    /// Dispatch is actively running.
    InProgress,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
}
116
117impl ToolCallStatus {
118    pub const ALL: [Self; 4] = [
119        Self::Pending,
120        Self::InProgress,
121        Self::Completed,
122        Self::Failed,
123    ];
124
125    pub fn as_str(self) -> &'static str {
126        match self {
127            Self::Pending => "pending",
128            Self::InProgress => "in_progress",
129            Self::Completed => "completed",
130            Self::Failed => "failed",
131        }
132    }
133}
134
/// Wire-level classification of a `ToolCallUpdate` failure. Pairs with the
/// human-readable `error` string so clients can render each failure type
/// distinctly (e.g. surface a "permission denied" badge, or a different
/// retry affordance for `network` vs `tool_error`). The enum is
/// deliberately extensible — `unknown` is the default when the runtime
/// could not classify a failure.
///
/// Serialized in snake_case via `rename_all` (e.g. `schema_validation`,
/// `mcp_server_error`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallErrorCategory {
    /// Host-side validation rejected the args (missing required field,
    /// invalid type, malformed JSON).
    SchemaValidation,
    /// The tool ran and returned an error result (e.g. `read_file` on a
    /// missing path) — distinguished from a transport failure.
    ToolError,
    /// MCP transport / server-protocol error.
    McpServerError,
    /// Burin Swift host bridge returned an error during dispatch.
    HostBridgeError,
    /// `session/request_permission` denied by the client, or a policy
    /// rule (static or dynamic) refused the call.
    PermissionDenied,
    /// The harn loop detector skipped this call because the same
    /// (tool, args) pair repeated past the configured threshold.
    RejectedLoop,
    /// Streaming text candidate was detected (bare `name(` or
    /// `<tool_call>` opener) but never resolved into a parseable call:
    /// args parsed as malformed, the heredoc body broke, the tag closed
    /// without a balanced expression, or the stream ended mid-call.
    /// Used by the streaming candidate detector (harn#692) to retract a
    /// `tool_call` candidate that turned out to be prose or syntactically
    /// broken so clients can dismiss the in-flight chip.
    ParseAborted,
    /// The tool exceeded its time budget.
    Timeout,
    /// Transient network / rate-limited / 5xx provider failure.
    Network,
    /// The tool was cancelled (e.g. session aborted).
    Cancelled,
    /// Default when classification was not performed.
    Unknown,
}
177
178impl ToolCallErrorCategory {
179    pub const ALL: [Self; 11] = [
180        Self::SchemaValidation,
181        Self::ToolError,
182        Self::McpServerError,
183        Self::HostBridgeError,
184        Self::PermissionDenied,
185        Self::RejectedLoop,
186        Self::ParseAborted,
187        Self::Timeout,
188        Self::Network,
189        Self::Cancelled,
190        Self::Unknown,
191    ];
192
193    pub fn as_str(self) -> &'static str {
194        match self {
195            Self::SchemaValidation => "schema_validation",
196            Self::ToolError => "tool_error",
197            Self::McpServerError => "mcp_server_error",
198            Self::HostBridgeError => "host_bridge_error",
199            Self::PermissionDenied => "permission_denied",
200            Self::RejectedLoop => "rejected_loop",
201            Self::ParseAborted => "parse_aborted",
202            Self::Timeout => "timeout",
203            Self::Network => "network",
204            Self::Cancelled => "cancelled",
205            Self::Unknown => "unknown",
206        }
207    }
208
209    /// Map an internal `ErrorCategory` (used by the VM's `VmError`
210    /// classification) onto the wire enum. The internal taxonomy is
211    /// finer-grained — several transient categories collapse onto
212    /// `Network`, and the auth/quota family becomes `HostBridgeError`
213    /// because at the tool-dispatch boundary those errors come from
214    /// the bridge transport rather than the tool itself.
215    pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
216        use crate::value::ErrorCategory as Internal;
217        match category {
218            Internal::Timeout => Self::Timeout,
219            Internal::RateLimit
220            | Internal::Overloaded
221            | Internal::ServerError
222            | Internal::TransientNetwork => Self::Network,
223            Internal::SchemaValidation => Self::SchemaValidation,
224            Internal::ToolError => Self::ToolError,
225            Internal::ToolRejected => Self::PermissionDenied,
226            Internal::Cancelled => Self::Cancelled,
227            Internal::Auth
228            | Internal::EgressBlocked
229            | Internal::NotFound
230            | Internal::CircuitOpen
231            | Internal::BudgetExceeded
232            | Internal::Generic => Self::HostBridgeError,
233        }
234    }
235}
236
/// Where a tool actually ran. Tags `ToolCallUpdate` so clients can render
/// "via mcp:linear" / "via host bridge" badges, attribute latency by
/// transport, and route errors to the right surface (harn#691).
///
/// On the wire this serializes internally tagged (serde `tag = "kind"`
/// with snake_case variant names): unit variants become objects such as
/// `{"kind": "harn_builtin"}`, and the `mcp_server` case carries the
/// configured server name as a sibling `server_name` field. The ACP
/// adapter rewrites unit variants as bare strings (`"harn_builtin"`,
/// `"host_bridge"`, `"provider_native"`) and the `McpServer` case as
/// `{"kind": "mcp_server", "serverName": "..."}` to match the protocol's
/// camelCase convention.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ToolExecutor {
    /// VM-stdlib (`read_file`, `write_file`, `exec`, `http_*`, `mcp_*`)
    /// or any Harn-side handler closure registered in `tools_val`.
    HarnBuiltin,
    /// Capability provided by the host through `HostBridge.builtin_call`
    /// (Swift-side IDE bridge, BurinApp, BurinCLI host shells).
    HostBridge,
    /// Tool dispatched against a configured MCP server. Detected by the
    /// `_mcp_server` tag that `mcp_list_tools` injects on every tool
    /// dict before the agent loop sees it.
    McpServer { server_name: String },
    /// Provider-side server-side tool execution — currently OpenAI
    /// Responses-API server tools (e.g. native `tool_search`). The
    /// runtime never dispatches these locally; the model returns the
    /// already-executed result inline.
    ProviderNative,
}
266
/// Events emitted by the agent loop. The leading five variants
/// (`AgentMessageChunk`, `AgentThoughtChunk`, `ToolCall`,
/// `ToolCallUpdate`, `Plan`) map 1:1 to ACP `sessionUpdate` variants;
/// the remaining variants are harn-specific loop milestones and
/// lifecycle signals (see the per-variant docs for the ones adapters
/// translate onto the wire).
///
/// Serialized internally tagged under `type` with snake_case variant
/// names. Every variant carries the `session_id` it belongs to.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// A piece of assistant message text for the session.
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// A piece of assistant "thought" text for the session.
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// Announces a tool call, identified by `tool_call_id`. Follow-up
    /// state changes arrive as `ToolCallUpdate` events with the same id.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
        /// Set to `Some(true)` by the streaming candidate detector
        /// (harn#692) when this event represents a tool-call shape
        /// detected in the model's in-flight assistant text but whose
        /// arguments have not finished parsing yet. Clients can render a
        /// spinner / placeholder while the model writes the body. The
        /// detector follows up with a `ToolCallUpdate { parsing: false,
        /// .. }` carrying either `status: pending` (promoted) or
        /// `status: failed` with `error_category: parse_aborted`.
        /// `None` (the default) means "this is a normal post-parse tool
        /// call, no candidate phase was active" so the on-disk shape
        /// stays compatible with replays recorded before this field
        /// existed.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        /// Mutation-session audit context active when the tool was
        /// dispatched (see harn#699). Hosts use it to group every tool
        /// emission belonging to the same write-capable session.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// Status/result update for a tool call previously announced with
    /// the same `tool_call_id`.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
        /// Wall-clock milliseconds from the parse-to-execution boundary
        /// to the terminal `Completed`/`Failed` update. Includes the
        /// time spent in any wrapping orchestration logic (loop checks,
        /// post-tool hooks, microcompaction). Populated only on the
        /// terminal update — `None` on intermediate `Pending` /
        /// `InProgress` updates so clients can ignore the field until
        /// it shows up.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        duration_ms: Option<u64>,
        /// Milliseconds spent in the actual host/builtin/MCP dispatch
        /// call only (the inner `dispatch_tool_execution` window).
        /// Populated only on the terminal update; `None` otherwise.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        execution_duration_ms: Option<u64>,
        /// Structured classification of the failure (when `status` is
        /// `Failed`). Paired with `error` so clients can render each
        /// category distinctly without parsing free-form strings. Always
        /// `None` for non-Failed updates and serialized as
        /// `errorCategory` in the ACP wire format.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_category: Option<ToolCallErrorCategory>,
        /// Where the tool actually ran. `None` only for events emitted
        /// from sites that pre-date the dispatch decision (e.g. the
        /// pending → in-progress transition the loop emits before the
        /// dispatcher picks a backend).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        executor: Option<ToolExecutor>,
        /// Companion to `ToolCall.parsing` (harn#692). The streaming
        /// candidate detector emits the *terminal* candidate event as a
        /// `ToolCallUpdate` with `parsing: Some(false)` to retract the
        /// in-flight `parsing: true` chip — either by promoting the
        /// candidate (`status: pending`, `raw_output: None`,
        /// `error: None`) or aborting it (`status: failed`,
        /// `error_category: parse_aborted`). `None` means this update is
        /// not part of a candidate-phase transition.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        /// Best-effort partial parse of the streamed tool-call arguments.
        /// Populated by the SSE transport on `Pending` updates as the
        /// model streams `input_json_delta` (Anthropic) or
        /// `tool_calls[].function.arguments` deltas (OpenAI). `None` on
        /// terminal updates and on emissions from non-streaming paths
        /// (#693). When the partial bytes are not yet parseable as JSON
        /// the transport falls back to `raw_input_partial`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input: Option<serde_json::Value>,
        /// Raw concatenated bytes of the streamed tool-call arguments
        /// when a permissive parse failed (#693). Mutually exclusive
        /// with `raw_input`: clients render whichever is present.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input_partial: Option<String>,
        /// Mutation-session audit context for the tool call. Carries the
        /// same payload as on the paired `ToolCall` event so a host
        /// processing a single update doesn't have to correlate against
        /// the prior pending event.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// A plan for the session, carried as free-form JSON.
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// Harn-internal milestone: loop iteration `iteration` is starting.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// Harn-internal milestone: loop iteration `iteration` has ended.
    /// `turn_info` is free-form JSON describing the turn.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// A judge's verdict for iteration `iteration`, together with its
    /// reasoning, an optional suggested next step, and how long the
    /// judge took in milliseconds.
    JudgeDecision {
        session_id: String,
        iteration: usize,
        verdict: String,
        reasoning: String,
        next_step: Option<String>,
        judge_duration_ms: u64,
    },
    /// A typed checkpoint, carried as free-form JSON.
    TypedCheckpoint {
        session_id: String,
        checkpoint: serde_json::Value,
    },
    /// Harn-internal milestone: feedback of the given `kind` was
    /// injected into the session.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// Emitted when the agent loop exhausts `max_iterations` without any
    /// explicit break condition firing. Distinct from a natural "done" or
    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// Emitted when the loop breaks because consecutive text-only turns
    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
    /// hosts that key off agent-terminal events.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// Emitted when the daemon idle-wait loop trips its watchdog because
    /// every configured wake source returned `None` for N consecutive
    /// attempts. Exists so a broken daemon doesn't hang the session
    /// silently.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// Emitted when a skill is activated. Carries the match reason so
    /// replayers can reconstruct *why* a given skill took effect at
    /// this iteration.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// Emitted when a previously-active skill is deactivated because
    /// the reassess phase no longer matches it.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// Emitted once per activation when the skill's `allowed_tools` filter
    /// narrows the effective tool surface exposed to the model.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
    /// Emitted when a `tool_search` query is issued by the model. Carries
    /// the raw query args, the configured strategy, and a `mode` tag
    /// distinguishing the client-executed fallback (`"client"`) from
    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
    /// transcript event shape so hosts can render a search-in-progress
    /// chip in real time — the replay path walks the transcript after
    /// the turn, which is too late for live UX.
    ToolSearchQuery {
        session_id: String,
        tool_use_id: String,
        name: String,
        query: serde_json::Value,
        strategy: String,
        mode: String,
    },
    /// Emitted when `tool_search` resolves — carries the list of tool
    /// names newly promoted into the model's effective surface for the
    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
    ToolSearchResult {
        session_id: String,
        tool_use_id: String,
        promoted: Vec<String>,
        strategy: String,
        mode: String,
    },
    /// Reports a transcript compaction: the mode and strategy used, how
    /// many messages were archived, the estimated token counts before
    /// and after, and the snapshot asset id when one was recorded.
    TranscriptCompacted {
        session_id: String,
        mode: String,
        strategy: String,
        archived_messages: usize,
        estimated_tokens_before: usize,
        estimated_tokens_after: usize,
        snapshot_asset_id: Option<String>,
    },
    /// Carries a handoff artifact and its id. The artifact is boxed.
    Handoff {
        session_id: String,
        artifact_id: String,
        handoff: Box<HandoffArtifact>,
    },
    /// A batch of filesystem notifications for the `fs_watch`
    /// subscription identified by `subscription_id`.
    FsWatch {
        session_id: String,
        subscription_id: String,
        events: Vec<FsWatchEvent>,
    },
    /// Lifecycle update for a delegated/background worker. Carries the
    /// canonical typed `event` variant alongside the worker's current
    /// `status` string and the structured `metadata` payload that
    /// `worker_bridge_metadata` builds (task, mode, timing, child
    /// run/snapshot paths, audit-session, etc.). The `audit` field is
    /// the same `MutationSessionRecord` JSON serialization carried on
    /// the bridge wire so ACP/A2A consumers don't need to re-derive it.
    ///
    /// One-to-one with the bridge-side `worker_update` session-update
    /// notification: ACP and A2A adapters subscribe to this variant
    /// and translate it into their respective wire formats. The
    /// `session_id` is the parent agent session that owns the worker
    /// (i.e. the session whose VM spawned the worker), so a single
    /// host stays subscribed to the same sink for both message and
    /// worker traffic.
    WorkerUpdate {
        session_id: String,
        worker_id: String,
        worker_name: String,
        worker_task: String,
        worker_mode: String,
        event: WorkerEvent,
        status: String,
        metadata: serde_json::Value,
        audit: Option<serde_json::Value>,
    },
    /// A human-in-the-loop primitive (`ask_user`, `request_approval`,
    /// `dual_control`, `escalate`) has just suspended the script and is
    /// waiting on a response. Hosts that bridge the VM onto a remote
    /// transport (ACP, A2A) translate this into a "paused / awaiting
    /// input" wire signal so the client knows the task isn't stuck —
    /// it's blocked on the human side. Pair-emitted with `HitlResolved`
    /// when the waitpoint completes/cancels/times out.
    HitlRequested {
        session_id: String,
        request_id: String,
        kind: String,
        payload: serde_json::Value,
    },
    /// Companion to `HitlRequested`: the waitpoint has resolved (either
    /// a response arrived, the deadline elapsed, or the request was
    /// cancelled). `outcome` is one of `"answered"`, `"timeout"`,
    /// `"cancelled"`. Hosts use this to flip task state back to
    /// `working` after an `input-required` pause.
    HitlResolved {
        session_id: String,
        request_id: String,
        kind: String,
        outcome: String,
    },
    /// Emitted by the agent loop's adaptive iteration budget /
    /// `loop_control` policy when a budget extension or early stop fires.
    /// Generic enough to cover both shapes — `action` distinguishes them.
    /// Carries the iteration the decision applied to, the previous /
    /// resulting iteration limit, the policy reason string, and (for
    /// stops) the loop status.
    LoopControlDecision {
        session_id: String,
        iteration: usize,
        action: String,
        old_limit: usize,
        new_limit: usize,
        reason: String,
        status: String,
    },
    /// Emitted when `agent_loop` detects adjacent repeated tool calls with
    /// identical arguments. The warning payload avoids raw arguments by
    /// default and carries digests so hosts can correlate repeats without
    /// exposing potentially sensitive tool inputs.
    AgentLoopStallWarning {
        session_id: String,
        warning: serde_json::Value,
    },
    /// Emitted when a `tool_caller` middleware (see std/llm/tool_middleware)
    /// attaches structured audit metadata to a tool call — typically a
    /// user-facing `summary`, a `description`, an ACP-style `kind`, an MCP
    /// `hints` block, a `consent` decision, the per-layer `layers` log, or
    /// free-form `metadata` keys (A2A-style extension slot).
    ///
    /// One-to-one with the underlying tool-call: hosts can join on
    /// `tool_call_id` to render middleware-attached chips alongside the
    /// existing `ToolCall` / `ToolCallUpdate` stream. The `audit` payload
    /// is intentionally free-form JSON so middleware can carry whatever
    /// shape the harness author chooses without needing protocol-level
    /// changes per new middleware.
    ToolCallAudit {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        audit: serde_json::Value,
    },
}
586
587impl AgentEvent {
588    pub fn session_id(&self) -> &str {
589        match self {
590            Self::AgentMessageChunk { session_id, .. }
591            | Self::AgentThoughtChunk { session_id, .. }
592            | Self::ToolCall { session_id, .. }
593            | Self::ToolCallUpdate { session_id, .. }
594            | Self::Plan { session_id, .. }
595            | Self::TurnStart { session_id, .. }
596            | Self::TurnEnd { session_id, .. }
597            | Self::JudgeDecision { session_id, .. }
598            | Self::TypedCheckpoint { session_id, .. }
599            | Self::FeedbackInjected { session_id, .. }
600            | Self::BudgetExhausted { session_id, .. }
601            | Self::LoopStuck { session_id, .. }
602            | Self::DaemonWatchdogTripped { session_id, .. }
603            | Self::SkillActivated { session_id, .. }
604            | Self::SkillDeactivated { session_id, .. }
605            | Self::SkillScopeTools { session_id, .. }
606            | Self::ToolSearchQuery { session_id, .. }
607            | Self::ToolSearchResult { session_id, .. }
608            | Self::TranscriptCompacted { session_id, .. }
609            | Self::Handoff { session_id, .. }
610            | Self::FsWatch { session_id, .. }
611            | Self::WorkerUpdate { session_id, .. }
612            | Self::HitlRequested { session_id, .. }
613            | Self::HitlResolved { session_id, .. }
614            | Self::LoopControlDecision { session_id, .. }
615            | Self::AgentLoopStallWarning { session_id, .. }
616            | Self::ToolCallAudit { session_id, .. } => session_id,
617        }
618    }
619}
620
/// External consumers of the event stream (e.g. the harn-cli ACP server,
/// which translates events into JSON-RPC notifications).
///
/// Implementations must be `Send + Sync`.
pub trait AgentEventSink: Send + Sync {
    /// Receive one event. The event is borrowed, and the method returns
    /// nothing, so a sink has no channel for reporting failure back to
    /// the caller.
    fn handle_event(&self, event: &AgentEvent);
}
626
/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
/// `AgentEvent` with monotonic index + timestamp + frame depth so
/// replay engines can reconstruct paused state at any event index,
/// and scrubber UIs can bucket events by time. The envelope is the
/// on-disk shape; the wire format for live consumers is still the
/// raw `AgentEvent` so existing sinks don't churn.
///
/// Because `event` is flattened, the envelope fields (`index`,
/// `emitted_at_ms`, `frame_depth`) sit beside the event's own fields
/// (including its `type` tag) in each serialized object.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Monotonic per-session index starting at 0. Unique within a
    /// session; gaps never happen even under load because the sink
    /// owns the counter under a mutex.
    pub index: u64,
    /// Milliseconds since the Unix epoch, captured when the sink
    /// received the event. Not the event's emission time — that
    /// would require threading a clock through every emit site.
    pub emitted_at_ms: i64,
    /// Call-stack depth at the moment of emission, when the caller
    /// can supply it. `None` for events emitted from a context where
    /// the VM frame stack isn't available.
    pub frame_depth: Option<u32>,
    /// The raw event, flattened so `jq '.type'` works as expected.
    #[serde(flatten)]
    pub event: AgentEvent,
}
651
/// Append-only JSONL sink for a single session's event stream (#103).
/// One writer per session; sinks rotate to a numbered suffix when a
/// running file crosses `ROTATE_BYTES` (100 MB today — long chat
/// sessions rarely exceed 5 MB, so rotation almost never fires).
pub struct JsonlEventSink {
    /// Writer plus counters, guarded by a mutex so `&self` methods can
    /// mutate them.
    state: Mutex<JsonlEventSinkState>,
    /// Path of the first log file. Rotated file names are derived from
    /// its stem and extension.
    base_path: std::path::PathBuf,
}
660
/// Mutable state of a `JsonlEventSink`, held behind the sink's mutex.
struct JsonlEventSinkState {
    /// Buffered writer over the currently active log file. Replaced
    /// with a writer over the new file on rotation.
    writer: std::io::BufWriter<std::fs::File>,
    /// Event counter reported by `event_count`; starts at 0.
    /// NOTE(review): presumably advanced once per handled event —
    /// confirm in `handle_event`.
    index: u64,
    /// Byte count compared against `ROTATE_BYTES` to decide when to
    /// rotate; reset to 0 after a rotation. NOTE(review): presumably
    /// the bytes written to the active file — confirm in `handle_event`.
    bytes_written: u64,
    /// Number used in the rotated file's `-NNNNNN` suffix; starts at 0
    /// (no rotation yet).
    rotation: u32,
}
667
668impl JsonlEventSink {
669    /// Hard cap past which the current file rotates to a numbered
670    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
671    /// sessions don't produce unreadable multi-GB logs.
672    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
673
674    /// Open a new sink writing to `base_path`. Creates parent dirs
675    /// if missing. Overwrites an existing file so each fresh session
676    /// starts from index 0.
677    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
678        let base_path = base_path.into();
679        if let Some(parent) = base_path.parent() {
680            std::fs::create_dir_all(parent)?;
681        }
682        let file = std::fs::OpenOptions::new()
683            .create(true)
684            .truncate(true)
685            .write(true)
686            .open(&base_path)?;
687        Ok(Arc::new(Self {
688            state: Mutex::new(JsonlEventSinkState {
689                writer: std::io::BufWriter::new(file),
690                index: 0,
691                bytes_written: 0,
692                rotation: 0,
693            }),
694            base_path,
695        }))
696    }
697
698    /// Flush any buffered writes. Called on session shutdown; the
699    /// Drop impl calls this too but on early panic it may not run.
700    pub fn flush(&self) -> std::io::Result<()> {
701        use std::io::Write as _;
702        self.state
703            .lock()
704            .expect("jsonl sink mutex poisoned")
705            .writer
706            .flush()
707    }
708
709    /// Current event index — primarily for tests and the "how many
710    /// events are in this run" run-record summary.
711    pub fn event_count(&self) -> u64 {
712        self.state.lock().expect("jsonl sink mutex poisoned").index
713    }
714
715    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
716        use std::io::Write as _;
717        if state.bytes_written < Self::ROTATE_BYTES {
718            return Ok(());
719        }
720        state.writer.flush()?;
721        state.rotation += 1;
722        let suffix = format!("-{:06}", state.rotation);
723        let rotated = self.base_path.with_file_name({
724            let stem = self
725                .base_path
726                .file_stem()
727                .and_then(|s| s.to_str())
728                .unwrap_or("event_log");
729            let ext = self
730                .base_path
731                .extension()
732                .and_then(|e| e.to_str())
733                .unwrap_or("jsonl");
734            format!("{stem}{suffix}.{ext}")
735        });
736        let file = std::fs::OpenOptions::new()
737            .create(true)
738            .truncate(true)
739            .write(true)
740            .open(&rotated)?;
741        state.writer = std::io::BufWriter::new(file);
742        state.bytes_written = 0;
743        Ok(())
744    }
745}
746
747/// Event-log-backed sink for a single session's agent event stream.
748/// Uses the generalized append-only event log when one is installed for
749/// the current VM thread and falls back to `JsonlEventSink` only for
750/// older env-driven workflows.
751pub struct EventLogSink {
752    log: Arc<AnyEventLog>,
753    topic: Topic,
754    session_id: String,
755}
756
757impl EventLogSink {
758    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
759        let session_id = session_id.into();
760        let topic = Topic::new(format!(
761            "observability.agent_events.{}",
762            crate::event_log::sanitize_topic_component(&session_id)
763        ))
764        .expect("session id should sanitize to a valid topic");
765        Arc::new(Self {
766            log,
767            topic,
768            session_id,
769        })
770    }
771}
772
773impl AgentEventSink for JsonlEventSink {
774    fn handle_event(&self, event: &AgentEvent) {
775        use std::io::Write as _;
776        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
777        let index = state.index;
778        state.index += 1;
779        let emitted_at_ms = std::time::SystemTime::now()
780            .duration_since(std::time::UNIX_EPOCH)
781            .map(|d| d.as_millis() as i64)
782            .unwrap_or(0);
783        let envelope = PersistedAgentEvent {
784            index,
785            emitted_at_ms,
786            frame_depth: None,
787            event: event.clone(),
788        };
789        if let Ok(line) = serde_json::to_string(&envelope) {
790            // One line, newline-terminated — JSON Lines spec.
791            // Errors here are swallowed on purpose; a failing write
792            // must never crash the agent loop, and the run record
793            // itself is a secondary artifact.
794            let _ = state.writer.write_all(line.as_bytes());
795            let _ = state.writer.write_all(b"\n");
796            state.bytes_written += line.len() as u64 + 1;
797            let _ = self.rotate_if_needed(&mut state);
798        }
799    }
800}
801
802impl AgentEventSink for EventLogSink {
803    fn handle_event(&self, event: &AgentEvent) {
804        let event_json = match serde_json::to_value(event) {
805            Ok(value) => value,
806            Err(_) => return,
807        };
808        let event_kind = event_json
809            .get("type")
810            .and_then(|value| value.as_str())
811            .unwrap_or("agent_event")
812            .to_string();
813        let payload = serde_json::json!({
814            "index_hint": now_ms(),
815            "session_id": self.session_id,
816            "event": event_json,
817        });
818        let mut headers = std::collections::BTreeMap::new();
819        headers.insert("session_id".to_string(), self.session_id.clone());
820        let log = self.log.clone();
821        let topic = self.topic.clone();
822        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
823        if let Ok(handle) = tokio::runtime::Handle::try_current() {
824            handle.spawn(async move {
825                let _ = log.append(&topic, record).await;
826            });
827        } else {
828            let _ = futures::executor::block_on(log.append(&topic, record));
829        }
830    }
831}
832
833impl Drop for JsonlEventSink {
834    fn drop(&mut self) {
835        if let Ok(mut state) = self.state.lock() {
836            use std::io::Write as _;
837            let _ = state.writer.flush();
838        }
839    }
840}
841
842/// Fan-out helper for composing multiple external sinks.
843pub struct MultiSink {
844    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
845}
846
847impl MultiSink {
848    pub fn new() -> Self {
849        Self {
850            sinks: Mutex::new(Vec::new()),
851        }
852    }
853    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
854        self.sinks.lock().expect("sink mutex poisoned").push(sink);
855    }
856    pub fn len(&self) -> usize {
857        self.sinks.lock().expect("sink mutex poisoned").len()
858    }
859    pub fn is_empty(&self) -> bool {
860        self.len() == 0
861    }
862}
863
864impl Default for MultiSink {
865    fn default() -> Self {
866        Self::new()
867    }
868}
869
870impl AgentEventSink for MultiSink {
871    fn handle_event(&self, event: &AgentEvent) {
872        // Deliberate: snapshot then release the lock before invoking sink
873        // callbacks. Sinks can re-enter the event system (e.g. a host
874        // sink that logs to another AgentEvent path), so holding the
875        // mutex across the callback would risk self-deadlock. Arc clones
876        // are refcount bumps — cheap.
877        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
878        for sink in sinks {
879            sink.handle_event(event);
880        }
881    }
882}
883
884#[cfg(test)]
885#[derive(Clone)]
886struct RegisteredSink {
887    owner: std::thread::ThreadId,
888    sink: Arc<dyn AgentEventSink>,
889}
890
891#[cfg(not(test))]
892type RegisteredSink = Arc<dyn AgentEventSink>;
893
894type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
895
896fn external_sinks() -> &'static ExternalSinkRegistry {
897    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
898    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
899}
900
901pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
902    let session_id = session_id.into();
903    let mut reg = external_sinks().write().expect("sink registry poisoned");
904    #[cfg(test)]
905    let sink = RegisteredSink {
906        owner: std::thread::current().id(),
907        sink,
908    };
909    reg.entry(session_id).or_default().push(sink);
910}
911
912/// Remove all external sinks registered for `session_id`. Does NOT
913/// close the session itself — subscribers and transcript survive, so a
914/// later `agent_loop` call with the same id continues the conversation.
915pub fn clear_session_sinks(session_id: &str) {
916    #[cfg(test)]
917    {
918        let owner = std::thread::current().id();
919        let mut reg = external_sinks().write().expect("sink registry poisoned");
920        if let Some(sinks) = reg.get_mut(session_id) {
921            sinks.retain(|sink| sink.owner != owner);
922            if sinks.is_empty() {
923                reg.remove(session_id);
924            }
925        }
926    }
927    #[cfg(not(test))]
928    {
929        external_sinks()
930            .write()
931            .expect("sink registry poisoned")
932            .remove(session_id);
933    }
934}
935
936pub fn reset_all_sinks() {
937    #[cfg(test)]
938    {
939        let owner = std::thread::current().id();
940        let mut reg = external_sinks().write().expect("sink registry poisoned");
941        reg.retain(|_, sinks| {
942            sinks.retain(|sink| sink.owner != owner);
943            !sinks.is_empty()
944        });
945        crate::agent_sessions::reset_session_store();
946    }
947    #[cfg(not(test))]
948    {
949        external_sinks()
950            .write()
951            .expect("sink registry poisoned")
952            .clear();
953        crate::agent_sessions::reset_session_store();
954    }
955}
956
957/// Mirror externally-registered sinks from `source_session_id` onto
958/// `target_session_id` without moving ownership. Transports such as ACP
959/// register sinks on the outer prompt session before a script runs; scripts
960/// may then open a first-class agent transcript and route `agent_loop` events
961/// through that inner id. Mirroring keeps the transport subscribed to the
962/// in-run child transcript while preserving explicit session ids.
963pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
964    if source_session_id.is_empty() || target_session_id.is_empty() {
965        return;
966    }
967    if source_session_id == target_session_id {
968        return;
969    }
970    let mut reg = external_sinks().write().expect("sink registry poisoned");
971    let Some(source_sinks) = reg.get(source_session_id).cloned() else {
972        return;
973    };
974    let target = reg.entry(target_session_id.to_string()).or_default();
975    #[cfg(test)]
976    {
977        for source in source_sinks {
978            let already_present = target
979                .iter()
980                .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
981            if !already_present {
982                target.push(source);
983            }
984        }
985    }
986    #[cfg(not(test))]
987    {
988        for source in source_sinks {
989            let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
990            if !already_present {
991                target.push(source);
992            }
993        }
994    }
995}
996
997/// Emit an event to external sinks registered for this session. Pipeline
998/// closure subscribers are NOT called by this function — the agent
999/// loop owns that path because it needs its async VM context.
1000pub fn emit_event(event: &AgentEvent) {
1001    let sinks: Vec<Arc<dyn AgentEventSink>> = {
1002        let reg = external_sinks().read().expect("sink registry poisoned");
1003        #[cfg(test)]
1004        {
1005            let owner = std::thread::current().id();
1006            reg.get(event.session_id())
1007                .map(|sinks| {
1008                    sinks
1009                        .iter()
1010                        .filter(|sink| sink.owner == owner)
1011                        .map(|sink| sink.sink.clone())
1012                        .collect()
1013                })
1014                .unwrap_or_default()
1015        }
1016        #[cfg(not(test))]
1017        {
1018            reg.get(event.session_id()).cloned().unwrap_or_default()
1019        }
1020    };
1021    for sink in sinks {
1022        sink.handle_event(event);
1023    }
1024}
1025
/// Wall-clock time in milliseconds since the Unix epoch, or `0` when
/// the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
1032
1033pub fn session_external_sink_count(session_id: &str) -> usize {
1034    #[cfg(test)]
1035    {
1036        let owner = std::thread::current().id();
1037        return external_sinks()
1038            .read()
1039            .expect("sink registry poisoned")
1040            .get(session_id)
1041            .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
1042            .unwrap_or(0);
1043    }
1044    #[cfg(not(test))]
1045    {
1046        external_sinks()
1047            .read()
1048            .expect("sink registry poisoned")
1049            .get(session_id)
1050            .map(|v| v.len())
1051            .unwrap_or(0)
1052    }
1053}
1054
1055pub fn session_closure_subscriber_count(session_id: &str) -> usize {
1056    crate::agent_sessions::subscriber_count(session_id)
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061    use super::*;
1062    use std::sync::atomic::{AtomicUsize, Ordering};
1063
1064    struct CountingSink(Arc<AtomicUsize>);
1065    impl AgentEventSink for CountingSink {
1066        fn handle_event(&self, _event: &AgentEvent) {
1067            self.0.fetch_add(1, Ordering::SeqCst);
1068        }
1069    }
1070
1071    #[test]
1072    fn multi_sink_fans_out_in_order() {
1073        let multi = MultiSink::new();
1074        let a = Arc::new(AtomicUsize::new(0));
1075        let b = Arc::new(AtomicUsize::new(0));
1076        multi.push(Arc::new(CountingSink(a.clone())));
1077        multi.push(Arc::new(CountingSink(b.clone())));
1078        let event = AgentEvent::TurnStart {
1079            session_id: "s1".into(),
1080            iteration: 1,
1081        };
1082        multi.handle_event(&event);
1083        assert_eq!(a.load(Ordering::SeqCst), 1);
1084        assert_eq!(b.load(Ordering::SeqCst), 1);
1085    }
1086
1087    #[test]
1088    fn session_scoped_sink_routing() {
1089        reset_all_sinks();
1090        let a = Arc::new(AtomicUsize::new(0));
1091        let b = Arc::new(AtomicUsize::new(0));
1092        register_sink("session-a", Arc::new(CountingSink(a.clone())));
1093        register_sink("session-b", Arc::new(CountingSink(b.clone())));
1094        emit_event(&AgentEvent::TurnStart {
1095            session_id: "session-a".into(),
1096            iteration: 0,
1097        });
1098        assert_eq!(a.load(Ordering::SeqCst), 1);
1099        assert_eq!(b.load(Ordering::SeqCst), 0);
1100        emit_event(&AgentEvent::TurnEnd {
1101            session_id: "session-b".into(),
1102            iteration: 0,
1103            turn_info: serde_json::json!({}),
1104        });
1105        assert_eq!(a.load(Ordering::SeqCst), 1);
1106        assert_eq!(b.load(Ordering::SeqCst), 1);
1107        clear_session_sinks("session-a");
1108        assert_eq!(session_external_sink_count("session-a"), 0);
1109        assert_eq!(session_external_sink_count("session-b"), 1);
1110        reset_all_sinks();
1111    }
1112
1113    #[test]
1114    fn newly_opened_child_session_inherits_current_external_sinks() {
1115        reset_all_sinks();
1116        let delivered = Arc::new(AtomicUsize::new(0));
1117        register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
1118        {
1119            let _guard = crate::agent_sessions::enter_current_session("outer-session");
1120            let inner = crate::agent_sessions::open_or_create(None);
1121            assert_ne!(inner, "outer-session");
1122            emit_event(&AgentEvent::TurnStart {
1123                session_id: inner,
1124                iteration: 0,
1125            });
1126        }
1127        assert_eq!(delivered.load(Ordering::SeqCst), 1);
1128        reset_all_sinks();
1129    }
1130
1131    #[test]
1132    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
1133        use std::io::{BufRead, BufReader};
1134        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
1135        std::fs::create_dir_all(&dir).unwrap();
1136        let path = dir.join("event_log.jsonl");
1137        let sink = JsonlEventSink::open(&path).unwrap();
1138        for i in 0..5 {
1139            sink.handle_event(&AgentEvent::TurnStart {
1140                session_id: "s".into(),
1141                iteration: i,
1142            });
1143        }
1144        assert_eq!(sink.event_count(), 5);
1145        sink.flush().unwrap();
1146
1147        // Read back + assert monotonic indices + non-decreasing timestamps.
1148        let file = std::fs::File::open(&path).unwrap();
1149        let mut last_idx: i64 = -1;
1150        let mut last_ts: i64 = 0;
1151        for line in BufReader::new(file).lines() {
1152            let line = line.unwrap();
1153            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
1154            let idx = val["index"].as_i64().unwrap();
1155            let ts = val["emitted_at_ms"].as_i64().unwrap();
1156            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
1157            assert!(ts >= last_ts, "timestamps must be non-decreasing");
1158            last_idx = idx;
1159            last_ts = ts;
1160            // Event payload flattened — type tag must survive.
1161            assert_eq!(val["type"], "turn_start");
1162        }
1163        assert_eq!(last_idx, 4);
1164        let _ = std::fs::remove_file(&path);
1165    }
1166
1167    #[test]
1168    fn judge_decision_round_trips_through_jsonl_sink() {
1169        use std::io::{BufRead, BufReader};
1170        let dir =
1171            std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
1172        std::fs::create_dir_all(&dir).unwrap();
1173        let path = dir.join("event_log.jsonl");
1174        let sink = JsonlEventSink::open(&path).unwrap();
1175        sink.handle_event(&AgentEvent::JudgeDecision {
1176            session_id: "s".into(),
1177            iteration: 2,
1178            verdict: "continue".into(),
1179            reasoning: "needs a concrete next step".into(),
1180            next_step: Some("run the verifier".into()),
1181            judge_duration_ms: 17,
1182        });
1183        sink.flush().unwrap();
1184
1185        let file = std::fs::File::open(&path).unwrap();
1186        let line = BufReader::new(file).lines().next().unwrap().unwrap();
1187        let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
1188        match recovered.event {
1189            AgentEvent::JudgeDecision {
1190                session_id,
1191                iteration,
1192                verdict,
1193                reasoning,
1194                next_step,
1195                judge_duration_ms,
1196            } => {
1197                assert_eq!(session_id, "s");
1198                assert_eq!(iteration, 2);
1199                assert_eq!(verdict, "continue");
1200                assert_eq!(reasoning, "needs a concrete next step");
1201                assert_eq!(next_step.as_deref(), Some("run the verifier"));
1202                assert_eq!(judge_duration_ms, 17);
1203            }
1204            other => panic!("expected JudgeDecision, got {other:?}"),
1205        }
1206        let value: serde_json::Value = serde_json::from_str(&line).unwrap();
1207        assert_eq!(value["type"], "judge_decision");
1208        let _ = std::fs::remove_file(&path);
1209        let _ = std::fs::remove_dir(&dir);
1210    }
1211
1212    #[test]
1213    fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
1214        // Terminal update with both durations populated — both fields
1215        // appear in the JSON. Snake_case keys here because this is the
1216        // canonical AgentEvent shape; the ACP adapter renames to
1217        // camelCase separately.
1218        let terminal = AgentEvent::ToolCallUpdate {
1219            session_id: "s".into(),
1220            tool_call_id: "tc-1".into(),
1221            tool_name: "read".into(),
1222            status: ToolCallStatus::Completed,
1223            raw_output: None,
1224            error: None,
1225            duration_ms: Some(42),
1226            execution_duration_ms: Some(7),
1227            error_category: None,
1228            executor: None,
1229            parsing: None,
1230
1231            raw_input: None,
1232            raw_input_partial: None,
1233            audit: None,
1234        };
1235        let value = serde_json::to_value(&terminal).unwrap();
1236        assert_eq!(value["duration_ms"], serde_json::json!(42));
1237        assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1238
1239        // In-progress update with `None` for both — both keys must be
1240        // absent (not `null`) so older ACP clients that key off
1241        // presence don't see a misleading zero.
1242        let intermediate = AgentEvent::ToolCallUpdate {
1243            session_id: "s".into(),
1244            tool_call_id: "tc-1".into(),
1245            tool_name: "read".into(),
1246            status: ToolCallStatus::InProgress,
1247            raw_output: None,
1248            error: None,
1249            duration_ms: None,
1250            execution_duration_ms: None,
1251            error_category: None,
1252            executor: None,
1253            parsing: None,
1254
1255            raw_input: None,
1256            raw_input_partial: None,
1257            audit: None,
1258        };
1259        let value = serde_json::to_value(&intermediate).unwrap();
1260        let object = value.as_object().expect("update serializes as object");
1261        assert!(
1262            !object.contains_key("duration_ms"),
1263            "duration_ms must be omitted when None: {value}"
1264        );
1265        assert!(
1266            !object.contains_key("execution_duration_ms"),
1267            "execution_duration_ms must be omitted when None: {value}"
1268        );
1269    }
1270
1271    #[test]
1272    fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1273        // Persisted event-log entries written before the fields existed
1274        // must still deserialize cleanly. The missing keys map to None.
1275        let raw = serde_json::json!({
1276            "type": "tool_call_update",
1277            "session_id": "s",
1278            "tool_call_id": "tc-1",
1279            "tool_name": "read",
1280            "status": "completed",
1281            "raw_output": null,
1282            "error": null,
1283        });
1284        let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1285        match event {
1286            AgentEvent::ToolCallUpdate {
1287                duration_ms,
1288                execution_duration_ms,
1289                ..
1290            } => {
1291                assert!(duration_ms.is_none());
1292                assert!(execution_duration_ms.is_none());
1293            }
1294            other => panic!("expected ToolCallUpdate, got {other:?}"),
1295        }
1296    }
1297
1298    #[test]
1299    fn tool_call_status_serde() {
1300        assert_eq!(
1301            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1302            "\"pending\""
1303        );
1304        assert_eq!(
1305            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1306            "\"in_progress\""
1307        );
1308        assert_eq!(
1309            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1310            "\"completed\""
1311        );
1312        assert_eq!(
1313            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1314            "\"failed\""
1315        );
1316    }
1317
1318    #[test]
1319    fn tool_call_error_category_serializes_as_snake_case() {
1320        let pairs = [
1321            (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1322            (ToolCallErrorCategory::ToolError, "tool_error"),
1323            (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1324            (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1325            (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1326            (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1327            (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1328            (ToolCallErrorCategory::Timeout, "timeout"),
1329            (ToolCallErrorCategory::Network, "network"),
1330            (ToolCallErrorCategory::Cancelled, "cancelled"),
1331            (ToolCallErrorCategory::Unknown, "unknown"),
1332        ];
1333        for (variant, wire) in pairs {
1334            let encoded = serde_json::to_string(&variant).unwrap();
1335            assert_eq!(encoded, format!("\"{wire}\""));
1336            assert_eq!(variant.as_str(), wire);
1337            // Round-trip via deserialize so wire stability is enforced
1338            // both ways.
1339            let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1340            assert_eq!(decoded, variant);
1341        }
1342    }
1343
1344    #[test]
1345    fn tool_executor_round_trips_with_adjacent_tag() {
1346        // Adjacent tagging keeps the wire shape uniform — every variant
1347        // is a JSON object with a `kind` discriminator. The ACP adapter
1348        // rewrites unit variants as bare strings; the on-disk event log
1349        // keeps the object shape so deserialize can recover the variant.
1350        for executor in [
1351            ToolExecutor::HarnBuiltin,
1352            ToolExecutor::HostBridge,
1353            ToolExecutor::McpServer {
1354                server_name: "linear".to_string(),
1355            },
1356            ToolExecutor::ProviderNative,
1357        ] {
1358            let json = serde_json::to_value(&executor).unwrap();
1359            let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1360            match &executor {
1361                ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1362                ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1363                ToolExecutor::McpServer { server_name } => {
1364                    assert_eq!(kind, "mcp_server");
1365                    assert_eq!(json["server_name"], *server_name);
1366                }
1367                ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1368            }
1369            let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1370            assert_eq!(recovered, executor);
1371        }
1372    }
1373
1374    #[test]
1375    fn tool_call_error_category_from_internal_collapses_transient_family() {
1376        use crate::value::ErrorCategory as Internal;
1377        assert_eq!(
1378            ToolCallErrorCategory::from_internal(&Internal::Timeout),
1379            ToolCallErrorCategory::Timeout
1380        );
1381        for net in [
1382            Internal::RateLimit,
1383            Internal::Overloaded,
1384            Internal::ServerError,
1385            Internal::TransientNetwork,
1386        ] {
1387            assert_eq!(
1388                ToolCallErrorCategory::from_internal(&net),
1389                ToolCallErrorCategory::Network,
1390                "{net:?} should map to Network",
1391            );
1392        }
1393        assert_eq!(
1394            ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1395            ToolCallErrorCategory::SchemaValidation
1396        );
1397        assert_eq!(
1398            ToolCallErrorCategory::from_internal(&Internal::ToolError),
1399            ToolCallErrorCategory::ToolError
1400        );
1401        assert_eq!(
1402            ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1403            ToolCallErrorCategory::PermissionDenied
1404        );
1405        assert_eq!(
1406            ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1407            ToolCallErrorCategory::Cancelled
1408        );
1409        for bridge in [
1410            Internal::Auth,
1411            Internal::EgressBlocked,
1412            Internal::NotFound,
1413            Internal::CircuitOpen,
1414            Internal::Generic,
1415        ] {
1416            assert_eq!(
1417                ToolCallErrorCategory::from_internal(&bridge),
1418                ToolCallErrorCategory::HostBridgeError,
1419                "{bridge:?} should map to HostBridgeError",
1420            );
1421        }
1422    }
1423
1424    #[test]
1425    fn tool_call_update_event_omits_error_category_when_none() {
1426        let event = AgentEvent::ToolCallUpdate {
1427            session_id: "s".into(),
1428            tool_call_id: "t".into(),
1429            tool_name: "read".into(),
1430            status: ToolCallStatus::Completed,
1431            raw_output: None,
1432            error: None,
1433            duration_ms: None,
1434            execution_duration_ms: None,
1435            error_category: None,
1436            executor: None,
1437            parsing: None,
1438
1439            raw_input: None,
1440            raw_input_partial: None,
1441            audit: None,
1442        };
1443        let v = serde_json::to_value(&event).unwrap();
1444        assert_eq!(v["type"], "tool_call_update");
1445        assert!(v.get("error_category").is_none());
1446    }
1447
1448    #[test]
1449    fn tool_call_update_event_serializes_error_category_when_set() {
1450        let event = AgentEvent::ToolCallUpdate {
1451            session_id: "s".into(),
1452            tool_call_id: "t".into(),
1453            tool_name: "read".into(),
1454            status: ToolCallStatus::Failed,
1455            raw_output: None,
1456            error: Some("missing required field".into()),
1457            duration_ms: None,
1458            execution_duration_ms: None,
1459            error_category: Some(ToolCallErrorCategory::SchemaValidation),
1460            executor: None,
1461            parsing: None,
1462
1463            raw_input: None,
1464            raw_input_partial: None,
1465            audit: None,
1466        };
1467        let v = serde_json::to_value(&event).unwrap();
1468        assert_eq!(v["error_category"], "schema_validation");
1469        assert_eq!(v["error"], "missing required field");
1470    }
1471
1472    #[test]
1473    fn tool_call_update_omits_executor_when_absent() {
1474        // `executor: None` must not appear in the serialized event so
1475        // the on-disk shape stays backward-compatible with replays
1476        // recorded before harn#691.
1477        let event = AgentEvent::ToolCallUpdate {
1478            session_id: "s".into(),
1479            tool_call_id: "tc-1".into(),
1480            tool_name: "read".into(),
1481            status: ToolCallStatus::Completed,
1482            raw_output: None,
1483            error: None,
1484            duration_ms: None,
1485            execution_duration_ms: None,
1486            error_category: None,
1487            executor: None,
1488            parsing: None,
1489
1490            raw_input: None,
1491            raw_input_partial: None,
1492            audit: None,
1493        };
1494        let json = serde_json::to_value(&event).unwrap();
1495        assert!(json.get("executor").is_none(), "got: {json}");
1496    }
1497
1498    #[test]
1499    fn worker_event_status_strings_cover_all_variants() {
1500        // Wire-level status strings flow into both bridge `worker_update`
1501        // payloads and ACP `session/update` notifications. Pinning every
1502        // variant here so a future addition can't silently land without
1503        // a docs/wire decision.
1504        assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1505        assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1506        assert_eq!(
1507            WorkerEvent::WorkerWaitingForInput.as_status(),
1508            "awaiting_input"
1509        );
1510        assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1511        assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1512        assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1513
1514        for terminal in [
1515            WorkerEvent::WorkerCompleted,
1516            WorkerEvent::WorkerFailed,
1517            WorkerEvent::WorkerCancelled,
1518        ] {
1519            assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1520        }
1521        for non_terminal in [
1522            WorkerEvent::WorkerSpawned,
1523            WorkerEvent::WorkerProgressed,
1524            WorkerEvent::WorkerWaitingForInput,
1525        ] {
1526            assert!(
1527                !non_terminal.is_terminal(),
1528                "{non_terminal:?} should not be terminal"
1529            );
1530        }
1531    }
1532
1533    #[test]
1534    fn worker_update_event_routes_through_session_keyed_sink() {
1535        // Worker lifecycle events ride the same session-keyed
1536        // `AgentEventSink` registry as message and tool events. This
1537        // is the canonical path ACP and A2A subscribe to — gate it
1538        // here so a registry-routing regression breaks loudly.
1539        reset_all_sinks();
1540        let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
1541        struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1542        impl AgentEventSink for CapturingSink {
1543            fn handle_event(&self, event: &AgentEvent) {
1544                self.0
1545                    .lock()
1546                    .expect("captured sink mutex poisoned")
1547                    .push(event.clone());
1548            }
1549        }
1550        register_sink(
1551            "worker-session-1",
1552            Arc::new(CapturingSink(captured.clone())),
1553        );
1554        emit_event(&AgentEvent::WorkerUpdate {
1555            session_id: "worker-session-1".into(),
1556            worker_id: "worker_42".into(),
1557            worker_name: "review_captain".into(),
1558            worker_task: "review pr".into(),
1559            worker_mode: "delegated_stage".into(),
1560            event: WorkerEvent::WorkerWaitingForInput,
1561            status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
1562            metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
1563            audit: None,
1564        });
1565        // Other sessions don't receive cross-talk.
1566        emit_event(&AgentEvent::WorkerUpdate {
1567            session_id: "other-session".into(),
1568            worker_id: "w2".into(),
1569            worker_name: "n2".into(),
1570            worker_task: "t2".into(),
1571            worker_mode: "delegated_stage".into(),
1572            event: WorkerEvent::WorkerCompleted,
1573            status: "completed".into(),
1574            metadata: serde_json::json!({}),
1575            audit: None,
1576        });
1577        let received = captured.lock().unwrap().clone();
1578        assert_eq!(received.len(), 1, "got: {received:?}");
1579        match &received[0] {
1580            AgentEvent::WorkerUpdate {
1581                session_id,
1582                worker_id,
1583                event,
1584                status,
1585                ..
1586            } => {
1587                assert_eq!(session_id, "worker-session-1");
1588                assert_eq!(worker_id, "worker_42");
1589                assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
1590                assert_eq!(status, "awaiting_input");
1591            }
1592            other => panic!("expected WorkerUpdate, got {other:?}"),
1593        }
1594        reset_all_sinks();
1595    }
1596
1597    #[test]
1598    fn worker_update_event_serializes_to_canonical_shape() {
1599        // Persisted event-log entries flatten the AgentEvent envelope,
1600        // so the WorkerUpdate variant must serialize with a `type` of
1601        // `worker_update` and the worker fields directly on the
1602        // top-level object (matching the `#[serde(tag = "type", ...)]`
1603        // shape the rest of AgentEvent uses).
1604        let event = AgentEvent::WorkerUpdate {
1605            session_id: "s".into(),
1606            worker_id: "w".into(),
1607            worker_name: "n".into(),
1608            worker_task: "t".into(),
1609            worker_mode: "delegated_stage".into(),
1610            event: WorkerEvent::WorkerProgressed,
1611            status: "progressed".into(),
1612            metadata: serde_json::json!({"started_at": "0193..."}),
1613            audit: Some(serde_json::json!({"run_id": "run_x"})),
1614        };
1615        let value = serde_json::to_value(&event).unwrap();
1616        assert_eq!(value["type"], "worker_update");
1617        assert_eq!(value["session_id"], "s");
1618        assert_eq!(value["worker_id"], "w");
1619        assert_eq!(value["status"], "progressed");
1620        assert_eq!(value["audit"]["run_id"], "run_x");
1621
1622        // Round-trip: the persisted event log must deserialize the
1623        // canonical shape back into the typed variant so replay
1624        // tooling can re-derive lifecycle state offline.
1625        let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1626        match recovered {
1627            AgentEvent::WorkerUpdate {
1628                event: recovered_event,
1629                ..
1630            } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1631            other => panic!("expected WorkerUpdate, got {other:?}"),
1632        }
1633    }
1634
1635    #[test]
1636    fn tool_call_update_includes_executor_when_present() {
1637        let event = AgentEvent::ToolCallUpdate {
1638            session_id: "s".into(),
1639            tool_call_id: "tc-1".into(),
1640            tool_name: "read".into(),
1641            status: ToolCallStatus::Completed,
1642            raw_output: None,
1643            error: None,
1644            duration_ms: None,
1645            execution_duration_ms: None,
1646            error_category: None,
1647            executor: Some(ToolExecutor::McpServer {
1648                server_name: "github".into(),
1649            }),
1650            parsing: None,
1651
1652            raw_input: None,
1653            raw_input_partial: None,
1654            audit: None,
1655        };
1656        let json = serde_json::to_value(&event).unwrap();
1657        assert_eq!(json["executor"]["kind"], "mcp_server");
1658        assert_eq!(json["executor"]["server_name"], "github");
1659    }
1660
1661    #[test]
1662    fn tool_call_update_omits_audit_when_absent() {
1663        let event = AgentEvent::ToolCallUpdate {
1664            session_id: "s".into(),
1665            tool_call_id: "tc-1".into(),
1666            tool_name: "read".into(),
1667            status: ToolCallStatus::Completed,
1668            raw_output: None,
1669            error: None,
1670            duration_ms: None,
1671            execution_duration_ms: None,
1672            error_category: None,
1673            executor: None,
1674            parsing: None,
1675            raw_input: None,
1676            raw_input_partial: None,
1677            audit: None,
1678        };
1679        let json = serde_json::to_value(&event).unwrap();
1680        assert!(json.get("audit").is_none(), "got: {json}");
1681    }
1682
1683    #[test]
1684    fn tool_call_update_includes_audit_when_present() {
1685        let audit = MutationSessionRecord {
1686            session_id: "session_42".into(),
1687            run_id: Some("run_42".into()),
1688            mutation_scope: "apply_workspace".into(),
1689            execution_kind: Some("worker".into()),
1690            ..Default::default()
1691        };
1692        let event = AgentEvent::ToolCallUpdate {
1693            session_id: "s".into(),
1694            tool_call_id: "tc-1".into(),
1695            tool_name: "edit_file".into(),
1696            status: ToolCallStatus::Completed,
1697            raw_output: None,
1698            error: None,
1699            duration_ms: None,
1700            execution_duration_ms: None,
1701            error_category: None,
1702            executor: Some(ToolExecutor::HostBridge),
1703            parsing: None,
1704            raw_input: None,
1705            raw_input_partial: None,
1706            audit: Some(audit),
1707        };
1708        let json = serde_json::to_value(&event).unwrap();
1709        assert_eq!(json["audit"]["session_id"], "session_42");
1710        assert_eq!(json["audit"]["run_id"], "run_42");
1711        assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1712        assert_eq!(json["audit"]["execution_kind"], "worker");
1713    }
1714
1715    #[test]
1716    fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1717        let raw = serde_json::json!({
1718            "type": "tool_call_update",
1719            "session_id": "s",
1720            "tool_call_id": "tc-1",
1721            "tool_name": "read",
1722            "status": "completed",
1723            "raw_output": null,
1724            "error": null,
1725        });
1726        let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1727        match event {
1728            AgentEvent::ToolCallUpdate { audit, .. } => {
1729                assert!(audit.is_none());
1730            }
1731            other => panic!("expected ToolCallUpdate, got {other:?}"),
1732        }
1733    }
1734
1735    #[test]
1736    fn tool_call_audit_serializes_with_free_form_audit_payload() {
1737        // Middleware-attached metadata is intentionally free-form JSON
1738        // (A2A-style `metadata` extension slot). The wire format must
1739        // preserve nested dicts + lists verbatim so hosts can read
1740        // `summary`/`consent`/`layers`/etc. without per-field schema.
1741        let audit = serde_json::json!({
1742            "summary": "Searched codebase",
1743            "kind": "search",
1744            "consent": {"decision": "approved", "decided_by": "auto"},
1745            "layers": [{"name": "with_required_reason", "status": "ok"}],
1746        });
1747        let event = AgentEvent::ToolCallAudit {
1748            session_id: "s".into(),
1749            tool_call_id: "tc-1".into(),
1750            tool_name: "search_files".into(),
1751            audit: audit.clone(),
1752        };
1753        let json = serde_json::to_value(&event).unwrap();
1754        assert_eq!(json["type"], "tool_call_audit");
1755        assert_eq!(json["session_id"], "s");
1756        assert_eq!(json["tool_call_id"], "tc-1");
1757        assert_eq!(json["tool_name"], "search_files");
1758        assert_eq!(json["audit"], audit);
1759    }
1760
1761    #[test]
1762    fn tool_call_audit_session_id_routes_correctly() {
1763        let event = AgentEvent::ToolCallAudit {
1764            session_id: "abc".into(),
1765            tool_call_id: "tc".into(),
1766            tool_name: "read".into(),
1767            audit: serde_json::Value::Null,
1768        };
1769        assert_eq!(event.session_id(), "abc");
1770    }
1771}