Skip to main content

harn_vm/
agent_events.rs

1//! Agent event stream — the ACP-aligned observation surface for the
2//! agent loop.
3//!
//! Every phase of the turn loop emits an `AgentEvent`. Some variants
//! map 1:1 onto ACP `SessionUpdate` values; internal variants (for
//! example `TurnStart`, `TurnEnd`, `FeedbackInjected`) let pipelines
//! react to loop milestones that don't have a direct ACP counterpart.
8//!
9//! There are two subscription paths, both keyed on session id so two
10//! concurrent sessions never cross-talk:
11//!
12//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
13//!    like the harn-cli ACP server. Invoked synchronously by the loop.
14//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
15//! 2. **Closure subscribers** — `.harn` closures registered via the
16//!    `agent_subscribe(session_id, callback)` host builtin. These live
17//!    on the session's `SessionState.subscribers` in
18//!    `crate::agent_sessions`, because sessions are the single source
19//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::composition::{CompositionChildCall, CompositionChildResult, CompositionRunEnvelope};
27use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
28use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
29use crate::tool_annotations::ToolKind;
30
/// One coalesced filesystem notification from a hostlib `fs_watch`
/// subscription.
///
/// Delivered in batches inside `AgentEvent::FsWatch`. Every field is a
/// plain string (or list of strings); the code that fills them in lives
/// outside this file, so the per-field notes below are hedged.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FsWatchEvent {
    /// Label for the kind of change.
    /// NOTE(review): presumably a normalized label, in contrast to
    /// `raw_kind` — confirm against the producer.
    pub kind: String,
    /// Paths the notification refers to.
    /// NOTE(review): presumably absolute, given the separate
    /// `relative_paths` field — confirm.
    pub paths: Vec<String>,
    /// Relative form of the affected paths.
    /// NOTE(review): the base directory is not visible here — confirm.
    pub relative_paths: Vec<String>,
    /// NOTE(review): presumably the watcher backend's own event-kind
    /// string before normalization — confirm.
    pub raw_kind: String,
    /// Optional error text attached to the notification; `None` when no
    /// error was recorded.
    pub error: Option<String>,
}
41
/// Typed worker lifecycle events emitted by delegated/background agent
/// execution. Bridge-facing worker updates still derive a string status
/// from these variants, but the runtime no longer passes raw status
/// strings around internally.
///
/// `Spawned`/`Completed`/`Failed`/`Cancelled` are the four terminal-or-start
/// states. `Progressed` is fired on intermediate milestones (e.g. a
/// retriggerable worker resuming from `awaiting_input`, or a workflow
/// stage completing without ending the worker). `WaitingForInput` covers
/// retriggerable workers that finish a cycle but stay alive pending the
/// next host-supplied trigger payload.
///
/// No serde rename is applied, so the derived serialization uses the
/// variant identifiers as written (the same spellings `as_str` returns);
/// the lowercase wire status comes from `as_status` instead.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    /// Start state; `as_status` reports it as `"running"`.
    WorkerSpawned,
    /// Intermediate milestone; non-terminal, status `"progressed"`.
    WorkerProgressed,
    /// Retriggerable worker idle until its next trigger; non-terminal,
    /// status `"awaiting_input"`.
    WorkerWaitingForInput,
    /// Terminal: status `"completed"`.
    WorkerCompleted,
    /// Terminal: status `"failed"`.
    WorkerFailed,
    /// Terminal: status `"cancelled"`.
    WorkerCancelled,
}
62
63impl WorkerEvent {
64    /// The full set of `WorkerEvent` variants in canonical order. Mirrors
65    /// the pattern used by `ToolCallStatus::ALL` so the protocol-artifact
66    /// dumper can enumerate worker status wire values without
67    /// special-casing each lifecycle event.
68    pub const ALL: [Self; 6] = [
69        Self::WorkerSpawned,
70        Self::WorkerProgressed,
71        Self::WorkerWaitingForInput,
72        Self::WorkerCompleted,
73        Self::WorkerFailed,
74        Self::WorkerCancelled,
75    ];
76
77    /// Wire-level status string used by bridge `worker_update` payloads
78    /// and ACP `worker_update` session updates. The four canonical
79    /// states are mirrored from harn's internal worker `status` field
80    /// (`running`/`completed`/`failed`/`cancelled`), and the two newer
81    /// lifecycle states pick names that don't collide with any existing
82    /// status string.
83    pub fn as_status(self) -> &'static str {
84        match self {
85            Self::WorkerSpawned => "running",
86            Self::WorkerProgressed => "progressed",
87            Self::WorkerWaitingForInput => "awaiting_input",
88            Self::WorkerCompleted => "completed",
89            Self::WorkerFailed => "failed",
90            Self::WorkerCancelled => "cancelled",
91        }
92    }
93
94    pub fn as_str(self) -> &'static str {
95        match self {
96            Self::WorkerSpawned => "WorkerSpawned",
97            Self::WorkerProgressed => "WorkerProgressed",
98            Self::WorkerWaitingForInput => "WorkerWaitingForInput",
99            Self::WorkerCompleted => "WorkerCompleted",
100            Self::WorkerFailed => "WorkerFailed",
101            Self::WorkerCancelled => "WorkerCancelled",
102        }
103    }
104
105    /// True for lifecycle events that mean the worker has reached a
106    /// final, non-resumable state. Retriggerable awaiting and
107    /// progressed milestones are *not* terminal — the worker keeps
108    /// running or is waiting for a trigger.
109    pub fn is_terminal(self) -> bool {
110        matches!(
111            self,
112            Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
113        )
114    }
115}
116
/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
///
/// Serialized in `snake_case` (so `InProgress` becomes `in_progress`),
/// which are the same spellings `as_str` returns.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Dispatched by the model but not yet started.
    Pending,
    /// Dispatch is actively running.
    InProgress,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
}
130
131impl ToolCallStatus {
132    pub const ALL: [Self; 4] = [
133        Self::Pending,
134        Self::InProgress,
135        Self::Completed,
136        Self::Failed,
137    ];
138
139    pub fn as_str(self) -> &'static str {
140        match self {
141            Self::Pending => "pending",
142            Self::InProgress => "in_progress",
143            Self::Completed => "completed",
144            Self::Failed => "failed",
145        }
146    }
147}
148
/// Wire-level classification of a `ToolCallUpdate` failure. Pairs with the
/// human-readable `error` string so clients can render each failure type
/// distinctly (e.g. surface a "permission denied" badge, or a different
/// retry affordance for `network` vs `tool_error`). The enum is
/// deliberately extensible — `unknown` is the default when the runtime
/// could not classify a failure.
///
/// Serialized in `snake_case` (e.g. `McpServerError` becomes
/// `mcp_server_error`); `as_str` returns the same spellings.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallErrorCategory {
    /// Host-side validation rejected the args (missing required field,
    /// invalid type, malformed JSON).
    SchemaValidation,
    /// The tool ran and returned an error result (e.g. `read_file` on a
    /// missing path) — distinguished from a transport failure.
    ToolError,
    /// MCP transport / server-protocol error.
    McpServerError,
    /// Burin Swift host bridge returned an error during dispatch.
    HostBridgeError,
    /// `session/request_permission` denied by the client, or a policy
    /// rule (static or dynamic) refused the call.
    PermissionDenied,
    /// The harn loop detector skipped this call because the same
    /// (tool, args) pair repeated past the configured threshold.
    RejectedLoop,
    /// Streaming text candidate was detected (bare `name(` or
    /// `<tool_call>` opener) but never resolved into a parseable call:
    /// args parsed as malformed, the heredoc body broke, the tag closed
    /// without a balanced expression, or the stream ended mid-call.
    /// Used by the streaming candidate detector (harn#692) to retract a
    /// `tool_call` candidate that turned out to be prose or syntactically
    /// broken so clients can dismiss the in-flight chip.
    ParseAborted,
    /// The tool exceeded its time budget.
    Timeout,
    /// Transient network / rate-limited / 5xx provider failure.
    Network,
    /// The tool was cancelled (e.g. session aborted).
    Cancelled,
    /// Default when classification was not performed.
    Unknown,
}
191
192impl ToolCallErrorCategory {
193    pub const ALL: [Self; 11] = [
194        Self::SchemaValidation,
195        Self::ToolError,
196        Self::McpServerError,
197        Self::HostBridgeError,
198        Self::PermissionDenied,
199        Self::RejectedLoop,
200        Self::ParseAborted,
201        Self::Timeout,
202        Self::Network,
203        Self::Cancelled,
204        Self::Unknown,
205    ];
206
207    pub fn as_str(self) -> &'static str {
208        match self {
209            Self::SchemaValidation => "schema_validation",
210            Self::ToolError => "tool_error",
211            Self::McpServerError => "mcp_server_error",
212            Self::HostBridgeError => "host_bridge_error",
213            Self::PermissionDenied => "permission_denied",
214            Self::RejectedLoop => "rejected_loop",
215            Self::ParseAborted => "parse_aborted",
216            Self::Timeout => "timeout",
217            Self::Network => "network",
218            Self::Cancelled => "cancelled",
219            Self::Unknown => "unknown",
220        }
221    }
222
223    /// Map an internal `ErrorCategory` (used by the VM's `VmError`
224    /// classification) onto the wire enum. The internal taxonomy is
225    /// finer-grained — several transient categories collapse onto
226    /// `Network`, and the auth/quota family becomes `HostBridgeError`
227    /// because at the tool-dispatch boundary those errors come from
228    /// the bridge transport rather than the tool itself.
229    pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
230        use crate::value::ErrorCategory as Internal;
231        match category {
232            Internal::Timeout => Self::Timeout,
233            Internal::RateLimit
234            | Internal::Overloaded
235            | Internal::ServerError
236            | Internal::TransientNetwork => Self::Network,
237            Internal::SchemaValidation => Self::SchemaValidation,
238            Internal::ToolError => Self::ToolError,
239            Internal::ToolRejected => Self::PermissionDenied,
240            Internal::Cancelled => Self::Cancelled,
241            Internal::Auth
242            | Internal::EgressBlocked
243            | Internal::NotFound
244            | Internal::CircuitOpen
245            | Internal::BudgetExceeded
246            | Internal::Generic => Self::HostBridgeError,
247        }
248    }
249}
250
/// Where a tool actually ran. Tags `ToolCallUpdate` so clients can render
/// "via mcp:linear" / "via host bridge" badges, attribute latency by
/// transport, and route errors to the right surface (harn#691).
///
/// On the wire this serializes internally tagged (`#[serde(tag =
/// "kind")]`, no `content` key): the snake_case variant name is stored
/// under `kind` in the same object as the variant's own fields, so the
/// `mcp_server` case carries the configured server name next to the
/// tag. The ACP adapter rewrites unit variants as bare strings
/// (`"harn_builtin"`, `"host_bridge"`, `"provider_native"`) and the
/// `McpServer` case as `{"kind": "mcp_server", "serverName": "..."}` to
/// match the protocol's camelCase convention.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ToolExecutor {
    /// VM-stdlib (`read_file`, `write_file`, `exec`, `http_*`, `mcp_*`)
    /// or any Harn-side handler closure registered in `tools_val`.
    HarnBuiltin,
    /// Capability provided by the host through `HostBridge.builtin_call`
    /// (Swift-side IDE bridge, BurinApp, BurinCLI host shells).
    HostBridge,
    /// Tool dispatched against a configured MCP server. Detected by the
    /// `_mcp_server` tag that `mcp_list_tools` injects on every tool
    /// dict before the agent loop sees it.
    ///
    /// `rename_all` on the enum renames variants only, so the field
    /// keeps the key `server_name` in this type's own serialization.
    McpServer { server_name: String },
    /// Provider-side server-side tool execution — currently OpenAI
    /// Responses-API server tools (e.g. native `tool_search`). The
    /// runtime never dispatches these locally; the model returns the
    /// already-executed result inline.
    ProviderNative,
}
280
/// Events emitted by the agent loop. Some variants map 1:1 to ACP
/// `sessionUpdate` variants; Harn-specific lifecycle events ride on the
/// extension stream.
///
/// Serialized internally tagged: the snake_case variant name is stored
/// under the `type` key alongside the variant's fields. Every variant
/// carries a `session_id`, which `AgentEvent::session_id` exposes.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// A chunk of agent message text for the session.
    /// NOTE(review): presumably the counterpart of ACP's
    /// `agent_message_chunk` session update — confirm against the adapter.
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// A chunk of agent "thought" text for the session.
    /// NOTE(review): presumably the counterpart of ACP's
    /// `agent_thought_chunk` session update — confirm against the adapter.
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// Announces a tool call: its id, tool name, optional `ToolKind`,
    /// current status and raw input. Later `ToolCallUpdate` and
    /// `ToolCallAudit` events carry the same `tool_call_id`.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        raw_input: serde_json::Value,
        /// Set to `Some(true)` by the streaming candidate detector
        /// (harn#692) when this event represents a tool-call shape
        /// detected in the model's in-flight assistant text but whose
        /// arguments have not finished parsing yet. Clients can render a
        /// spinner / placeholder while the model writes the body. The
        /// detector follows up with a `ToolCallUpdate { parsing: false,
        /// .. }` carrying either `status: pending` (promoted) or
        /// `status: failed` with `error_category: parse_aborted`.
        /// `None` (the default) means "this is a normal post-parse tool
        /// call, no candidate phase was active" so the on-disk shape
        /// stays compatible with replays recorded before this field
        /// existed.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        /// Mutation-session audit context active when the tool was
        /// dispatched (see harn#699). Hosts use it to group every tool
        /// emission belonging to the same write-capable session.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// Status update for a tool call, identified by `tool_call_id`.
    /// Most fields are optional and omitted from the serialized form
    /// when `None`; see the per-field notes for when each is populated.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
        /// Wall-clock milliseconds from the parse-to-execution boundary
        /// to the terminal `Completed`/`Failed` update. Includes the
        /// time spent in any wrapping orchestration logic (loop checks,
        /// post-tool hooks, microcompaction). Populated only on the
        /// terminal update — `None` on intermediate `Pending` /
        /// `InProgress` updates so clients can ignore the field until
        /// it shows up.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        duration_ms: Option<u64>,
        /// Milliseconds spent in the actual host/builtin/MCP dispatch
        /// call only (the inner `dispatch_tool_execution` window).
        /// Populated only on the terminal update; `None` otherwise.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        execution_duration_ms: Option<u64>,
        /// Structured classification of the failure (when `status` is
        /// `Failed`). Paired with `error` so clients can render each
        /// category distinctly without parsing free-form strings. Always
        /// `None` for non-Failed updates and serialized as
        /// `errorCategory` in the ACP wire format.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error_category: Option<ToolCallErrorCategory>,
        /// Where the tool actually ran. `None` only for events emitted
        /// from sites that pre-date the dispatch decision (e.g. the
        /// pending → in-progress transition the loop emits before the
        /// dispatcher picks a backend).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        executor: Option<ToolExecutor>,
        /// Companion to `ToolCall.parsing` (harn#692). The streaming
        /// candidate detector emits the *terminal* candidate event as a
        /// `ToolCallUpdate` with `parsing: Some(false)` to retract the
        /// in-flight `parsing: true` chip — either by promoting the
        /// candidate (`status: pending`, `raw_output: None`,
        /// `error: None`) or aborting it (`status: failed`,
        /// `error_category: parse_aborted`). `None` means this update is
        /// not part of a candidate-phase transition.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        parsing: Option<bool>,
        /// Best-effort partial parse of the streamed tool-call arguments.
        /// Populated by the SSE transport on `Pending` updates as the
        /// model streams `input_json_delta` (Anthropic) or
        /// `tool_calls[].function.arguments` deltas (OpenAI). `None` on
        /// terminal updates and on emissions from non-streaming paths
        /// (#693). When the partial bytes are not yet parseable as JSON
        /// the transport falls back to `raw_input_partial`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input: Option<serde_json::Value>,
        /// Raw concatenated bytes of the streamed tool-call arguments
        /// when a permissive parse failed (#693). Mutually exclusive
        /// with `raw_input`: clients render whichever is present.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw_input_partial: Option<String>,
        /// Mutation-session audit context for the tool call. Carries the
        /// same payload as on the paired `ToolCall` event so a host
        /// processing a single update doesn't have to correlate against
        /// the prior pending event.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        audit: Option<MutationSessionRecord>,
    },
    /// Carries a plan for the session as free-form JSON.
    /// NOTE(review): the JSON shape is not defined in this file.
    Plan {
        session_id: String,
        plan: serde_json::Value,
    },
    /// A progress report: an optional message plus free-form JSON
    /// `entries` and `metadata`.
    /// NOTE(review): `replace` presumably tells consumers whether
    /// `entries` supersede earlier ones — confirm against the emitter.
    ProgressReported {
        session_id: String,
        message: Option<String>,
        entries: serde_json::Value,
        replace: bool,
        metadata: serde_json::Value,
    },
    /// Internal loop milestone marking the start of a turn; `iteration`
    /// identifies the turn.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// Internal loop milestone marking the end of a turn; `turn_info`
    /// is free-form JSON whose shape is not defined in this file.
    TurnEnd {
        session_id: String,
        iteration: usize,
        turn_info: serde_json::Value,
    },
    /// Emitted when a first-class agent session is explicitly closed by
    /// `agent_session_close`. This gives event-log consumers a typed
    /// terminal marker even when no final model turn runs.
    SessionClosed {
        session_id: String,
        reason: String,
        status: String,
        metadata: serde_json::Value,
    },
    /// Records a judge verdict for an iteration: the verdict and
    /// reasoning strings, an optional next step, and how long the judge
    /// took in milliseconds. `trigger` is omitted from the serialized
    /// form when `None`.
    /// NOTE(review): the set of valid `verdict` / `trigger` strings is
    /// not visible here.
    JudgeDecision {
        session_id: String,
        iteration: usize,
        verdict: String,
        reasoning: String,
        next_step: Option<String>,
        judge_duration_ms: u64,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        trigger: Option<String>,
    },
    /// Carries a checkpoint for the session as free-form JSON.
    /// NOTE(review): the JSON shape is not defined in this file.
    TypedCheckpoint {
        session_id: String,
        checkpoint: serde_json::Value,
    },
    /// Internal loop milestone: feedback of the given `kind` was
    /// injected, with its text in `content`.
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// Emitted when the agent loop exhausts `max_iterations` without any
    /// explicit break condition firing. Distinct from a natural "done" or
    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// Emitted when the loop breaks because consecutive text-only turns
    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
    /// hosts that key off agent-terminal events.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        tail_excerpt: String,
    },
    /// Emitted when the daemon idle-wait loop trips its watchdog because
    /// every configured wake source returned `None` for N consecutive
    /// attempts. Exists so a broken daemon doesn't hang the session
    /// silently.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// Emitted when a skill is activated. Carries the match reason so
    /// replayers can reconstruct *why* a given skill took effect at
    /// this iteration.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// Emitted when a previously-active skill is deactivated because
    /// the reassess phase no longer matches it.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// Emitted once per activation when the skill's `allowed_tools` filter
    /// narrows the effective tool surface exposed to the model.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
    /// Emitted when a `tool_search` query is issued by the model. Carries
    /// the raw query args, the configured strategy, and a `mode` tag
    /// distinguishing the client-executed fallback (`"client"`) from
    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
    /// transcript event shape so hosts can render a search-in-progress
    /// chip in real time — the replay path walks the transcript after
    /// the turn, which is too late for live UX.
    ToolSearchQuery {
        session_id: String,
        tool_use_id: String,
        name: String,
        query: serde_json::Value,
        strategy: String,
        mode: String,
    },
    /// Emitted when `tool_search` resolves — carries the list of tool
    /// names newly promoted into the model's effective surface for the
    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
    ToolSearchResult {
        session_id: String,
        tool_use_id: String,
        promoted: Vec<String>,
        strategy: String,
        mode: String,
    },
    /// Reports a transcript compaction: the mode and strategy used, how
    /// many messages were archived, the estimated token counts before
    /// and after, and an optional snapshot asset id.
    /// NOTE(review): valid `mode` / `strategy` strings are not visible
    /// here.
    TranscriptCompacted {
        session_id: String,
        mode: String,
        strategy: String,
        archived_messages: usize,
        estimated_tokens_before: usize,
        estimated_tokens_after: usize,
        snapshot_asset_id: Option<String>,
    },
    /// Carries a `HandoffArtifact` together with its artifact id. The
    /// artifact is boxed.
    /// NOTE(review): presumably boxed to keep this enum small — confirm.
    Handoff {
        session_id: String,
        artifact_id: String,
        handoff: Box<HandoffArtifact>,
    },
    /// A batch of `FsWatchEvent` notifications for one `fs_watch`
    /// subscription, identified by `subscription_id`.
    FsWatch {
        session_id: String,
        subscription_id: String,
        events: Vec<FsWatchEvent>,
    },
    /// Lifecycle update for a delegated/background worker. Carries the
    /// canonical typed `event` variant alongside the worker's current
    /// `status` string and the structured `metadata` payload that
    /// `worker_bridge_metadata` builds (task, mode, timing, child
    /// run/snapshot paths, audit-session, etc.). The `audit` field is
    /// the same `MutationSessionRecord` JSON serialization carried on
    /// the bridge wire so ACP/A2A consumers don't need to re-derive it.
    ///
    /// One-to-one with the bridge-side `worker_update` session-update
    /// notification: ACP and A2A adapters subscribe to this variant
    /// and translate it into their respective wire formats. The
    /// `session_id` is the parent agent session that owns the worker
    /// (i.e. the session whose VM spawned the worker), so a single
    /// host stays subscribed to the same sink for both message and
    /// worker traffic.
    WorkerUpdate {
        session_id: String,
        worker_id: String,
        worker_name: String,
        worker_task: String,
        worker_mode: String,
        event: WorkerEvent,
        status: String,
        metadata: serde_json::Value,
        audit: Option<serde_json::Value>,
    },
    /// A human-in-the-loop primitive (`ask_user`, `request_approval`,
    /// `dual_control`, `escalate`) has just suspended the script and is
    /// waiting on a response. Hosts that bridge the VM onto a remote
    /// transport (ACP, A2A) translate this into a "paused / awaiting
    /// input" wire signal so the client knows the task isn't stuck —
    /// it's blocked on the human side. Pair-emitted with `HitlResolved`
    /// when the waitpoint completes/cancels/times out.
    HitlRequested {
        session_id: String,
        request_id: String,
        kind: String,
        payload: serde_json::Value,
    },
    /// Companion to `HitlRequested`: the waitpoint has resolved (either
    /// a response arrived, the deadline elapsed, or the request was
    /// cancelled). `outcome` is one of `"answered"`, `"timeout"`,
    /// `"cancelled"`. Hosts use this to flip task state back to
    /// `working` after an `input-required` pause.
    HitlResolved {
        session_id: String,
        request_id: String,
        kind: String,
        outcome: String,
    },
    /// Emitted by the agent loop's adaptive iteration budget /
    /// `loop_control` policy when a budget extension or early stop fires.
    /// Generic enough to cover both shapes — `action` distinguishes them.
    /// Carries the iteration the decision applied to, the previous /
    /// resulting iteration limit, the policy reason string, and (for
    /// stops) the loop status.
    LoopControlDecision {
        session_id: String,
        iteration: usize,
        action: String,
        old_limit: usize,
        new_limit: usize,
        reason: String,
        status: String,
    },
    /// Emitted when `agent_loop` detects adjacent repeated tool calls with
    /// identical arguments. The warning payload avoids raw arguments by
    /// default and carries digests so hosts can correlate repeats without
    /// exposing potentially sensitive tool inputs.
    AgentLoopStallWarning {
        session_id: String,
        warning: serde_json::Value,
    },
    /// Emitted when a `tool_caller` middleware (see std/llm/tool_middleware)
    /// attaches structured audit metadata to a tool call — typically a
    /// user-facing `summary`, a `description`, an ACP-style `kind`, an MCP
    /// `hints` block, a `consent` decision, the per-layer `layers` log, or
    /// free-form `metadata` keys (A2A-style extension slot).
    ///
    /// One-to-one with the underlying tool-call: hosts can join on
    /// `tool_call_id` to render middleware-attached chips alongside the
    /// existing `ToolCall` / `ToolCallUpdate` stream. The `audit` payload
    /// is intentionally free-form JSON so middleware can carry whatever
    /// shape the harness author chooses without needing protocol-level
    /// changes per new middleware.
    ToolCallAudit {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        audit: serde_json::Value,
    },
    /// Emitted by `std/cache::with_cache` (both the generic and LLM
    /// forms) when a cached lookup returns a hit. Carries the
    /// content-addressed key, the backend that served the value, and a
    /// `metrics` block with the cost-moat receipts the persona value
    /// ledger (harn-cloud#58) and crystallization receipts read:
    /// `model_calls_avoided`, plus `tokens_saved` / `latency_saved_ms`
    /// when the cached envelope carried `usage` / `latency_ms`.
    /// NOTE(review): the `metrics` block presumably lives inside
    /// `payload`, as `CacheMiss` documents `payload.metrics` — confirm.
    CacheHit {
        session_id: String,
        key: String,
        backend: String,
        namespace: String,
        payload: serde_json::Value,
    },
    /// Paired with `CacheHit`. Emitted on the miss path when the
    /// fresh result is stored. `payload.metrics.compute_ms` carries
    /// the wall-clock cost of the underlying computation, which
    /// callers can feed back as `estimate.latency_saved_ms` on the
    /// next hit.
    CacheMiss {
        session_id: String,
        key: String,
        backend: String,
        namespace: String,
        payload: serde_json::Value,
    },
    /// A language-neutral tool-composition snippet has started. The envelope
    /// identifies the snippet and binding manifest hashes plus the side-effect
    /// ceiling requested for the whole parent run.
    CompositionStart {
        session_id: String,
        run: CompositionRunEnvelope,
    },
    /// A composition snippet is dispatching a child binding call. The child
    /// remains visible as its own operation with annotations and policy context
    /// instead of being hidden inside the parent composition blob.
    CompositionChildCall {
        session_id: String,
        call: CompositionChildCall,
    },
    /// A child binding operation emitted a status/result update.
    CompositionChildResult {
        session_id: String,
        result: CompositionChildResult,
    },
    /// A composition run finished successfully and carries stdout/stderr,
    /// artifacts, and the structured result in the terminal envelope.
    CompositionFinish {
        session_id: String,
        run: CompositionRunEnvelope,
    },
    /// A composition run failed before producing a successful terminal result.
    /// The terminal envelope carries the failure category and optional error.
    CompositionError {
        session_id: String,
        run: CompositionRunEnvelope,
    },
}
676
677impl AgentEvent {
678    pub fn session_id(&self) -> &str {
679        match self {
680            Self::AgentMessageChunk { session_id, .. }
681            | Self::AgentThoughtChunk { session_id, .. }
682            | Self::ToolCall { session_id, .. }
683            | Self::ToolCallUpdate { session_id, .. }
684            | Self::Plan { session_id, .. }
685            | Self::ProgressReported { session_id, .. }
686            | Self::TurnStart { session_id, .. }
687            | Self::TurnEnd { session_id, .. }
688            | Self::SessionClosed { session_id, .. }
689            | Self::JudgeDecision { session_id, .. }
690            | Self::TypedCheckpoint { session_id, .. }
691            | Self::FeedbackInjected { session_id, .. }
692            | Self::BudgetExhausted { session_id, .. }
693            | Self::LoopStuck { session_id, .. }
694            | Self::DaemonWatchdogTripped { session_id, .. }
695            | Self::SkillActivated { session_id, .. }
696            | Self::SkillDeactivated { session_id, .. }
697            | Self::SkillScopeTools { session_id, .. }
698            | Self::ToolSearchQuery { session_id, .. }
699            | Self::ToolSearchResult { session_id, .. }
700            | Self::TranscriptCompacted { session_id, .. }
701            | Self::Handoff { session_id, .. }
702            | Self::FsWatch { session_id, .. }
703            | Self::WorkerUpdate { session_id, .. }
704            | Self::HitlRequested { session_id, .. }
705            | Self::HitlResolved { session_id, .. }
706            | Self::LoopControlDecision { session_id, .. }
707            | Self::AgentLoopStallWarning { session_id, .. }
708            | Self::ToolCallAudit { session_id, .. }
709            | Self::CacheHit { session_id, .. }
710            | Self::CacheMiss { session_id, .. }
711            | Self::CompositionStart { session_id, .. }
712            | Self::CompositionChildCall { session_id, .. }
713            | Self::CompositionChildResult { session_id, .. }
714            | Self::CompositionFinish { session_id, .. }
715            | Self::CompositionError { session_id, .. } => session_id,
716        }
717    }
718}
719
/// External consumers of the event stream (e.g. the harn-cli ACP server,
/// which translates events into JSON-RPC notifications).
///
/// The `Send + Sync` bound is required because sinks are stored as
/// `Arc<dyn AgentEventSink>` inside a process-wide registry.
pub trait AgentEventSink: Send + Sync {
    /// Receive one event. The event is only borrowed; an implementation
    /// that needs to keep it must clone it.
    fn handle_event(&self, event: &AgentEvent);
}
725
/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
/// `AgentEvent` with monotonic index + timestamp + frame depth so
/// replay engines can reconstruct paused state at any event index,
/// and scrubber UIs can bucket events by time. The envelope is the
/// on-disk shape; the wire format for live consumers is still the
/// raw `AgentEvent` so existing sinks don't churn.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    /// Monotonic per-session index starting at 0. Unique within a
    /// session; gaps never happen even under load because the sink
    /// owns the counter under a mutex.
    pub index: u64,
    /// Milliseconds since the Unix epoch, captured when the sink
    /// received the event. Not the event's emission time — that
    /// would require threading a clock through every emit site.
    /// `JsonlEventSink` writes `0` when the system clock reads earlier
    /// than the epoch.
    pub emitted_at_ms: i64,
    /// Call-stack depth at the moment of emission, when the caller
    /// can supply it. `None` for events emitted from a context where
    /// the VM frame stack isn't available. `JsonlEventSink` currently
    /// always records `None`.
    pub frame_depth: Option<u32>,
    /// The raw event, flattened so `jq '.type'` works as expected.
    #[serde(flatten)]
    pub event: AgentEvent,
}
750
/// Append-only JSONL sink for a single session's event stream (#103).
/// One writer per session; sinks rotate to a numbered suffix when a
/// running file crosses `ROTATE_BYTES` (100 MB today — long chat
/// sessions rarely exceed 5 MB, so rotation almost never fires).
pub struct JsonlEventSink {
    /// Writer and counters, guarded together so index assignment and
    /// the write of that line happen under one lock.
    state: Mutex<JsonlEventSinkState>,
    /// Path of the first log file. Rotated file names are derived from
    /// its stem and extension.
    base_path: std::path::PathBuf,
}
759
/// Mutable half of `JsonlEventSink`, always accessed through its mutex.
struct JsonlEventSinkState {
    /// Buffered writer for the file currently being appended to.
    writer: std::io::BufWriter<std::fs::File>,
    /// Index that will be assigned to the next event.
    index: u64,
    /// Bytes counted against the current file; reset to 0 on rotation.
    bytes_written: u64,
    /// Number of rotations so far; also the numeric suffix of the
    /// current rotated file (0 means the base file is still in use).
    rotation: u32,
}
766
767impl JsonlEventSink {
768    /// Hard cap past which the current file rotates to a numbered
769    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
770    /// sessions don't produce unreadable multi-GB logs.
771    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
772
773    /// Open a new sink writing to `base_path`. Creates parent dirs
774    /// if missing. Overwrites an existing file so each fresh session
775    /// starts from index 0.
776    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
777        let base_path = base_path.into();
778        if let Some(parent) = base_path.parent() {
779            std::fs::create_dir_all(parent)?;
780        }
781        let file = std::fs::OpenOptions::new()
782            .create(true)
783            .truncate(true)
784            .write(true)
785            .open(&base_path)?;
786        Ok(Arc::new(Self {
787            state: Mutex::new(JsonlEventSinkState {
788                writer: std::io::BufWriter::new(file),
789                index: 0,
790                bytes_written: 0,
791                rotation: 0,
792            }),
793            base_path,
794        }))
795    }
796
797    /// Flush any buffered writes. Called on session shutdown; the
798    /// Drop impl calls this too but on early panic it may not run.
799    pub fn flush(&self) -> std::io::Result<()> {
800        use std::io::Write as _;
801        self.state
802            .lock()
803            .expect("jsonl sink mutex poisoned")
804            .writer
805            .flush()
806    }
807
808    /// Current event index — primarily for tests and the "how many
809    /// events are in this run" run-record summary.
810    pub fn event_count(&self) -> u64 {
811        self.state.lock().expect("jsonl sink mutex poisoned").index
812    }
813
814    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
815        use std::io::Write as _;
816        if state.bytes_written < Self::ROTATE_BYTES {
817            return Ok(());
818        }
819        state.writer.flush()?;
820        state.rotation += 1;
821        let suffix = format!("-{:06}", state.rotation);
822        let rotated = self.base_path.with_file_name({
823            let stem = self
824                .base_path
825                .file_stem()
826                .and_then(|s| s.to_str())
827                .unwrap_or("event_log");
828            let ext = self
829                .base_path
830                .extension()
831                .and_then(|e| e.to_str())
832                .unwrap_or("jsonl");
833            format!("{stem}{suffix}.{ext}")
834        });
835        let file = std::fs::OpenOptions::new()
836            .create(true)
837            .truncate(true)
838            .write(true)
839            .open(&rotated)?;
840        state.writer = std::io::BufWriter::new(file);
841        state.bytes_written = 0;
842        Ok(())
843    }
844}
845
/// Event-log-backed sink for a single session's agent event stream.
/// Uses the generalized append-only event log when one is installed for
/// the current VM thread and falls back to `JsonlEventSink` only for
/// older env-driven workflows.
pub struct EventLogSink {
    /// Shared handle to the event log that records are appended to.
    log: Arc<AnyEventLog>,
    /// Topic every record from this sink is appended under; derived
    /// from the session id when the sink is constructed.
    topic: Topic,
    /// Session id copied into each record's payload and headers.
    session_id: String,
}
855
856impl EventLogSink {
857    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
858        let session_id = session_id.into();
859        let topic = Topic::new(format!(
860            "observability.agent_events.{}",
861            crate::event_log::sanitize_topic_component(&session_id)
862        ))
863        .expect("session id should sanitize to a valid topic");
864        Arc::new(Self {
865            log,
866            topic,
867            session_id,
868        })
869    }
870}
871
872impl AgentEventSink for JsonlEventSink {
873    fn handle_event(&self, event: &AgentEvent) {
874        use std::io::Write as _;
875        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
876        let index = state.index;
877        state.index += 1;
878        let emitted_at_ms = std::time::SystemTime::now()
879            .duration_since(std::time::UNIX_EPOCH)
880            .map(|d| d.as_millis() as i64)
881            .unwrap_or(0);
882        let envelope = PersistedAgentEvent {
883            index,
884            emitted_at_ms,
885            frame_depth: None,
886            event: event.clone(),
887        };
888        if let Ok(line) = serde_json::to_string(&envelope) {
889            // One line, newline-terminated — JSON Lines spec.
890            // Errors here are swallowed on purpose; a failing write
891            // must never crash the agent loop, and the run record
892            // itself is a secondary artifact.
893            let _ = state.writer.write_all(line.as_bytes());
894            let _ = state.writer.write_all(b"\n");
895            state.bytes_written += line.len() as u64 + 1;
896            let _ = self.rotate_if_needed(&mut state);
897        }
898    }
899}
900
901impl AgentEventSink for EventLogSink {
902    fn handle_event(&self, event: &AgentEvent) {
903        let event_json = match serde_json::to_value(event) {
904            Ok(value) => value,
905            Err(_) => return,
906        };
907        let event_kind = event_json
908            .get("type")
909            .and_then(|value| value.as_str())
910            .unwrap_or("agent_event")
911            .to_string();
912        let payload = serde_json::json!({
913            "index_hint": now_ms(),
914            "session_id": self.session_id,
915            "event": event_json,
916        });
917        let mut headers = std::collections::BTreeMap::new();
918        headers.insert("session_id".to_string(), self.session_id.clone());
919        let log = self.log.clone();
920        let topic = self.topic.clone();
921        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
922        if let Ok(handle) = tokio::runtime::Handle::try_current() {
923            handle.spawn(async move {
924                let _ = log.append(&topic, record).await;
925            });
926        } else {
927            let _ = futures::executor::block_on(log.append(&topic, record));
928        }
929    }
930}
931
932impl Drop for JsonlEventSink {
933    fn drop(&mut self) {
934        if let Ok(mut state) = self.state.lock() {
935            use std::io::Write as _;
936            let _ = state.writer.flush();
937        }
938    }
939}
940
/// Fan-out helper for composing multiple external sinks.
pub struct MultiSink {
    /// Registered sinks in insertion order. The mutex allows `push`
    /// through a shared reference.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
945
946impl MultiSink {
947    pub fn new() -> Self {
948        Self {
949            sinks: Mutex::new(Vec::new()),
950        }
951    }
952    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
953        self.sinks.lock().expect("sink mutex poisoned").push(sink);
954    }
955    pub fn len(&self) -> usize {
956        self.sinks.lock().expect("sink mutex poisoned").len()
957    }
958    pub fn is_empty(&self) -> bool {
959        self.len() == 0
960    }
961}
962
impl Default for MultiSink {
    /// Same as [`MultiSink::new`]: a fan-out with no sinks attached.
    fn default() -> Self {
        Self::new()
    }
}
968
969impl AgentEventSink for MultiSink {
970    fn handle_event(&self, event: &AgentEvent) {
971        // Deliberate: snapshot then release the lock before invoking sink
972        // callbacks. Sinks can re-enter the event system (e.g. a host
973        // sink that logs to another AgentEvent path), so holding the
974        // mutex across the callback would risk self-deadlock. Arc clones
975        // are refcount bumps — cheap.
976        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
977        for sink in sinks {
978            sink.handle_event(event);
979        }
980    }
981}
982
/// Registry entry in test builds: the sink plus the id of the thread
/// that registered it. The registry functions filter on `owner`,
/// presumably so tests running on separate threads do not observe each
/// other's sinks in the shared global registry.
#[cfg(test)]
#[derive(Clone)]
struct RegisteredSink {
    owner: std::thread::ThreadId,
    sink: Arc<dyn AgentEventSink>,
}

/// Registry entry outside test builds: just the shared sink handle.
#[cfg(not(test))]
type RegisteredSink = Arc<dyn AgentEventSink>;

/// Map from session id to the sinks registered for that session.
type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
994
995fn external_sinks() -> &'static ExternalSinkRegistry {
996    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
997    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
998}
999
1000pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
1001    let session_id = session_id.into();
1002    let mut reg = external_sinks().write().expect("sink registry poisoned");
1003    #[cfg(test)]
1004    let sink = RegisteredSink {
1005        owner: std::thread::current().id(),
1006        sink,
1007    };
1008    reg.entry(session_id).or_default().push(sink);
1009}
1010
1011/// Remove all external sinks registered for `session_id`. Does NOT
1012/// close the session itself — subscribers and transcript survive, so a
1013/// later `agent_loop` call with the same id continues the conversation.
1014pub fn clear_session_sinks(session_id: &str) {
1015    #[cfg(test)]
1016    {
1017        let owner = std::thread::current().id();
1018        let mut reg = external_sinks().write().expect("sink registry poisoned");
1019        if let Some(sinks) = reg.get_mut(session_id) {
1020            sinks.retain(|sink| sink.owner != owner);
1021            if sinks.is_empty() {
1022                reg.remove(session_id);
1023            }
1024        }
1025    }
1026    #[cfg(not(test))]
1027    {
1028        external_sinks()
1029            .write()
1030            .expect("sink registry poisoned")
1031            .remove(session_id);
1032    }
1033}
1034
1035pub fn reset_all_sinks() {
1036    #[cfg(test)]
1037    {
1038        let owner = std::thread::current().id();
1039        let mut reg = external_sinks().write().expect("sink registry poisoned");
1040        reg.retain(|_, sinks| {
1041            sinks.retain(|sink| sink.owner != owner);
1042            !sinks.is_empty()
1043        });
1044        crate::agent_sessions::reset_session_store();
1045    }
1046    #[cfg(not(test))]
1047    {
1048        external_sinks()
1049            .write()
1050            .expect("sink registry poisoned")
1051            .clear();
1052        crate::agent_sessions::reset_session_store();
1053    }
1054}
1055
1056/// Mirror externally-registered sinks from `source_session_id` onto
1057/// `target_session_id` without moving ownership. Transports such as ACP
1058/// register sinks on the outer prompt session before a script runs; scripts
1059/// may then open a first-class agent transcript and route `agent_loop` events
1060/// through that inner id. Mirroring keeps the transport subscribed to the
1061/// in-run child transcript while preserving explicit session ids.
1062pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
1063    if source_session_id.is_empty() || target_session_id.is_empty() {
1064        return;
1065    }
1066    if source_session_id == target_session_id {
1067        return;
1068    }
1069    let mut reg = external_sinks().write().expect("sink registry poisoned");
1070    let Some(source_sinks) = reg.get(source_session_id).cloned() else {
1071        return;
1072    };
1073    let target = reg.entry(target_session_id.to_string()).or_default();
1074    #[cfg(test)]
1075    {
1076        for source in source_sinks {
1077            let already_present = target
1078                .iter()
1079                .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
1080            if !already_present {
1081                target.push(source);
1082            }
1083        }
1084    }
1085    #[cfg(not(test))]
1086    {
1087        for source in source_sinks {
1088            let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
1089            if !already_present {
1090                target.push(source);
1091            }
1092        }
1093    }
1094}
1095
1096/// Emit an event to external sinks registered for this session. Pipeline
1097/// closure subscribers are NOT called by this function — the agent
1098/// loop owns that path because it needs its async VM context.
1099pub fn emit_event(event: &AgentEvent) {
1100    let sinks: Vec<Arc<dyn AgentEventSink>> = {
1101        let reg = external_sinks().read().expect("sink registry poisoned");
1102        #[cfg(test)]
1103        {
1104            let owner = std::thread::current().id();
1105            reg.get(event.session_id())
1106                .map(|sinks| {
1107                    sinks
1108                        .iter()
1109                        .filter(|sink| sink.owner == owner)
1110                        .map(|sink| sink.sink.clone())
1111                        .collect()
1112                })
1113                .unwrap_or_default()
1114        }
1115        #[cfg(not(test))]
1116        {
1117            reg.get(event.session_id()).cloned().unwrap_or_default()
1118        }
1119    };
1120    for sink in sinks {
1121        sink.handle_event(event);
1122    }
1123}
1124
/// Wall-clock time in milliseconds since the Unix epoch, or `0` when
/// the system clock reads earlier than the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
1131
1132pub fn session_external_sink_count(session_id: &str) -> usize {
1133    #[cfg(test)]
1134    {
1135        let owner = std::thread::current().id();
1136        return external_sinks()
1137            .read()
1138            .expect("sink registry poisoned")
1139            .get(session_id)
1140            .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
1141            .unwrap_or(0);
1142    }
1143    #[cfg(not(test))]
1144    {
1145        external_sinks()
1146            .read()
1147            .expect("sink registry poisoned")
1148            .get(session_id)
1149            .map(|v| v.len())
1150            .unwrap_or(0)
1151    }
1152}
1153
/// Number of closure subscribers attached to `session_id`. Delegates to
/// `crate::agent_sessions::subscriber_count`, since closure subscribers
/// live on the session state rather than in this module's registry.
pub fn session_closure_subscriber_count(session_id: &str) -> usize {
    crate::agent_sessions::subscriber_count(session_id)
}
1157
1158#[cfg(test)]
1159mod tests {
1160    use super::*;
1161    use std::sync::atomic::{AtomicUsize, Ordering};
1162
    /// Test sink that counts how many events it has received. The
    /// counter is shared so the test can read it after handing the sink
    /// away.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        // The event itself is ignored; only the delivery is counted.
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }
1169
1170    #[test]
1171    fn multi_sink_fans_out_in_order() {
1172        let multi = MultiSink::new();
1173        let a = Arc::new(AtomicUsize::new(0));
1174        let b = Arc::new(AtomicUsize::new(0));
1175        multi.push(Arc::new(CountingSink(a.clone())));
1176        multi.push(Arc::new(CountingSink(b.clone())));
1177        let event = AgentEvent::TurnStart {
1178            session_id: "s1".into(),
1179            iteration: 1,
1180        };
1181        multi.handle_event(&event);
1182        assert_eq!(a.load(Ordering::SeqCst), 1);
1183        assert_eq!(b.load(Ordering::SeqCst), 1);
1184    }
1185
1186    #[test]
1187    fn session_scoped_sink_routing() {
1188        reset_all_sinks();
1189        let a = Arc::new(AtomicUsize::new(0));
1190        let b = Arc::new(AtomicUsize::new(0));
1191        register_sink("session-a", Arc::new(CountingSink(a.clone())));
1192        register_sink("session-b", Arc::new(CountingSink(b.clone())));
1193        emit_event(&AgentEvent::TurnStart {
1194            session_id: "session-a".into(),
1195            iteration: 0,
1196        });
1197        assert_eq!(a.load(Ordering::SeqCst), 1);
1198        assert_eq!(b.load(Ordering::SeqCst), 0);
1199        emit_event(&AgentEvent::TurnEnd {
1200            session_id: "session-b".into(),
1201            iteration: 0,
1202            turn_info: serde_json::json!({}),
1203        });
1204        assert_eq!(a.load(Ordering::SeqCst), 1);
1205        assert_eq!(b.load(Ordering::SeqCst), 1);
1206        clear_session_sinks("session-a");
1207        assert_eq!(session_external_sink_count("session-a"), 0);
1208        assert_eq!(session_external_sink_count("session-b"), 1);
1209        reset_all_sinks();
1210    }
1211
1212    #[test]
1213    fn newly_opened_child_session_inherits_current_external_sinks() {
1214        reset_all_sinks();
1215        let delivered = Arc::new(AtomicUsize::new(0));
1216        register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
1217        {
1218            let _guard = crate::agent_sessions::enter_current_session("outer-session");
1219            let inner = crate::agent_sessions::open_or_create(None);
1220            assert_ne!(inner, "outer-session");
1221            emit_event(&AgentEvent::TurnStart {
1222                session_id: inner,
1223                iteration: 0,
1224            });
1225        }
1226        assert_eq!(delivered.load(Ordering::SeqCst), 1);
1227        reset_all_sinks();
1228    }
1229
1230    #[test]
1231    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
1232        use std::io::{BufRead, BufReader};
1233        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
1234        std::fs::create_dir_all(&dir).unwrap();
1235        let path = dir.join("event_log.jsonl");
1236        let sink = JsonlEventSink::open(&path).unwrap();
1237        for i in 0..5 {
1238            sink.handle_event(&AgentEvent::TurnStart {
1239                session_id: "s".into(),
1240                iteration: i,
1241            });
1242        }
1243        assert_eq!(sink.event_count(), 5);
1244        sink.flush().unwrap();
1245
1246        // Read back + assert monotonic indices + non-decreasing timestamps.
1247        let file = std::fs::File::open(&path).unwrap();
1248        let mut last_idx: i64 = -1;
1249        let mut last_ts: i64 = 0;
1250        for line in BufReader::new(file).lines() {
1251            let line = line.unwrap();
1252            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
1253            let idx = val["index"].as_i64().unwrap();
1254            let ts = val["emitted_at_ms"].as_i64().unwrap();
1255            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
1256            assert!(ts >= last_ts, "timestamps must be non-decreasing");
1257            last_idx = idx;
1258            last_ts = ts;
1259            // Event payload flattened — type tag must survive.
1260            assert_eq!(val["type"], "turn_start");
1261        }
1262        assert_eq!(last_idx, 4);
1263        let _ = std::fs::remove_file(&path);
1264    }
1265
1266    #[test]
1267    fn judge_decision_round_trips_through_jsonl_sink() {
1268        use std::io::{BufRead, BufReader};
1269        let dir =
1270            std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
1271        std::fs::create_dir_all(&dir).unwrap();
1272        let path = dir.join("event_log.jsonl");
1273        let sink = JsonlEventSink::open(&path).unwrap();
1274        sink.handle_event(&AgentEvent::JudgeDecision {
1275            session_id: "s".into(),
1276            iteration: 2,
1277            verdict: "continue".into(),
1278            reasoning: "needs a concrete next step".into(),
1279            next_step: Some("run the verifier".into()),
1280            judge_duration_ms: 17,
1281            trigger: Some("stalled".into()),
1282        });
1283        sink.flush().unwrap();
1284
1285        let file = std::fs::File::open(&path).unwrap();
1286        let line = BufReader::new(file).lines().next().unwrap().unwrap();
1287        let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
1288        match recovered.event {
1289            AgentEvent::JudgeDecision {
1290                session_id,
1291                iteration,
1292                verdict,
1293                reasoning,
1294                next_step,
1295                judge_duration_ms,
1296                trigger,
1297            } => {
1298                assert_eq!(session_id, "s");
1299                assert_eq!(iteration, 2);
1300                assert_eq!(verdict, "continue");
1301                assert_eq!(reasoning, "needs a concrete next step");
1302                assert_eq!(next_step.as_deref(), Some("run the verifier"));
1303                assert_eq!(judge_duration_ms, 17);
1304                assert_eq!(trigger.as_deref(), Some("stalled"));
1305            }
1306            other => panic!("expected JudgeDecision, got {other:?}"),
1307        }
1308        let value: serde_json::Value = serde_json::from_str(&line).unwrap();
1309        assert_eq!(value["type"], "judge_decision");
1310        let _ = std::fs::remove_file(&path);
1311        let _ = std::fs::remove_dir(&dir);
1312    }
1313
1314    #[test]
1315    fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
1316        // Terminal update with both durations populated — both fields
1317        // appear in the JSON. Snake_case keys here because this is the
1318        // canonical AgentEvent shape; the ACP adapter renames to
1319        // camelCase separately.
1320        let terminal = AgentEvent::ToolCallUpdate {
1321            session_id: "s".into(),
1322            tool_call_id: "tc-1".into(),
1323            tool_name: "read".into(),
1324            status: ToolCallStatus::Completed,
1325            raw_output: None,
1326            error: None,
1327            duration_ms: Some(42),
1328            execution_duration_ms: Some(7),
1329            error_category: None,
1330            executor: None,
1331            parsing: None,
1332
1333            raw_input: None,
1334            raw_input_partial: None,
1335            audit: None,
1336        };
1337        let value = serde_json::to_value(&terminal).unwrap();
1338        assert_eq!(value["duration_ms"], serde_json::json!(42));
1339        assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1340
1341        // In-progress update with `None` for both — both keys must be
1342        // absent (not `null`) so older ACP clients that key off
1343        // presence don't see a misleading zero.
1344        let intermediate = AgentEvent::ToolCallUpdate {
1345            session_id: "s".into(),
1346            tool_call_id: "tc-1".into(),
1347            tool_name: "read".into(),
1348            status: ToolCallStatus::InProgress,
1349            raw_output: None,
1350            error: None,
1351            duration_ms: None,
1352            execution_duration_ms: None,
1353            error_category: None,
1354            executor: None,
1355            parsing: None,
1356
1357            raw_input: None,
1358            raw_input_partial: None,
1359            audit: None,
1360        };
1361        let value = serde_json::to_value(&intermediate).unwrap();
1362        let object = value.as_object().expect("update serializes as object");
1363        assert!(
1364            !object.contains_key("duration_ms"),
1365            "duration_ms must be omitted when None: {value}"
1366        );
1367        assert!(
1368            !object.contains_key("execution_duration_ms"),
1369            "execution_duration_ms must be omitted when None: {value}"
1370        );
1371    }
1372
1373    #[test]
1374    fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1375        // Persisted event-log entries written before the fields existed
1376        // must still deserialize cleanly. The missing keys map to None.
1377        let raw = serde_json::json!({
1378            "type": "tool_call_update",
1379            "session_id": "s",
1380            "tool_call_id": "tc-1",
1381            "tool_name": "read",
1382            "status": "completed",
1383            "raw_output": null,
1384            "error": null,
1385        });
1386        let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1387        match event {
1388            AgentEvent::ToolCallUpdate {
1389                duration_ms,
1390                execution_duration_ms,
1391                ..
1392            } => {
1393                assert!(duration_ms.is_none());
1394                assert!(execution_duration_ms.is_none());
1395            }
1396            other => panic!("expected ToolCallUpdate, got {other:?}"),
1397        }
1398    }
1399
1400    #[test]
1401    fn tool_call_status_serde() {
1402        assert_eq!(
1403            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1404            "\"pending\""
1405        );
1406        assert_eq!(
1407            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1408            "\"in_progress\""
1409        );
1410        assert_eq!(
1411            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1412            "\"completed\""
1413        );
1414        assert_eq!(
1415            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1416            "\"failed\""
1417        );
1418    }
1419
1420    #[test]
1421    fn tool_call_error_category_serializes_as_snake_case() {
1422        let pairs = [
1423            (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1424            (ToolCallErrorCategory::ToolError, "tool_error"),
1425            (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1426            (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1427            (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1428            (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1429            (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1430            (ToolCallErrorCategory::Timeout, "timeout"),
1431            (ToolCallErrorCategory::Network, "network"),
1432            (ToolCallErrorCategory::Cancelled, "cancelled"),
1433            (ToolCallErrorCategory::Unknown, "unknown"),
1434        ];
1435        for (variant, wire) in pairs {
1436            let encoded = serde_json::to_string(&variant).unwrap();
1437            assert_eq!(encoded, format!("\"{wire}\""));
1438            assert_eq!(variant.as_str(), wire);
1439            // Round-trip via deserialize so wire stability is enforced
1440            // both ways.
1441            let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1442            assert_eq!(decoded, variant);
1443        }
1444    }
1445
1446    #[test]
1447    fn tool_executor_round_trips_with_adjacent_tag() {
1448        // Adjacent tagging keeps the wire shape uniform — every variant
1449        // is a JSON object with a `kind` discriminator. The ACP adapter
1450        // rewrites unit variants as bare strings; the on-disk event log
1451        // keeps the object shape so deserialize can recover the variant.
1452        for executor in [
1453            ToolExecutor::HarnBuiltin,
1454            ToolExecutor::HostBridge,
1455            ToolExecutor::McpServer {
1456                server_name: "linear".to_string(),
1457            },
1458            ToolExecutor::ProviderNative,
1459        ] {
1460            let json = serde_json::to_value(&executor).unwrap();
1461            let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1462            match &executor {
1463                ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1464                ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1465                ToolExecutor::McpServer { server_name } => {
1466                    assert_eq!(kind, "mcp_server");
1467                    assert_eq!(json["server_name"], *server_name);
1468                }
1469                ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1470            }
1471            let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1472            assert_eq!(recovered, executor);
1473        }
1474    }
1475
1476    #[test]
1477    fn tool_call_error_category_from_internal_collapses_transient_family() {
1478        use crate::value::ErrorCategory as Internal;
1479        assert_eq!(
1480            ToolCallErrorCategory::from_internal(&Internal::Timeout),
1481            ToolCallErrorCategory::Timeout
1482        );
1483        for net in [
1484            Internal::RateLimit,
1485            Internal::Overloaded,
1486            Internal::ServerError,
1487            Internal::TransientNetwork,
1488        ] {
1489            assert_eq!(
1490                ToolCallErrorCategory::from_internal(&net),
1491                ToolCallErrorCategory::Network,
1492                "{net:?} should map to Network",
1493            );
1494        }
1495        assert_eq!(
1496            ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1497            ToolCallErrorCategory::SchemaValidation
1498        );
1499        assert_eq!(
1500            ToolCallErrorCategory::from_internal(&Internal::ToolError),
1501            ToolCallErrorCategory::ToolError
1502        );
1503        assert_eq!(
1504            ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1505            ToolCallErrorCategory::PermissionDenied
1506        );
1507        assert_eq!(
1508            ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1509            ToolCallErrorCategory::Cancelled
1510        );
1511        for bridge in [
1512            Internal::Auth,
1513            Internal::EgressBlocked,
1514            Internal::NotFound,
1515            Internal::CircuitOpen,
1516            Internal::Generic,
1517        ] {
1518            assert_eq!(
1519                ToolCallErrorCategory::from_internal(&bridge),
1520                ToolCallErrorCategory::HostBridgeError,
1521                "{bridge:?} should map to HostBridgeError",
1522            );
1523        }
1524    }
1525
1526    #[test]
1527    fn tool_call_update_event_omits_error_category_when_none() {
1528        let event = AgentEvent::ToolCallUpdate {
1529            session_id: "s".into(),
1530            tool_call_id: "t".into(),
1531            tool_name: "read".into(),
1532            status: ToolCallStatus::Completed,
1533            raw_output: None,
1534            error: None,
1535            duration_ms: None,
1536            execution_duration_ms: None,
1537            error_category: None,
1538            executor: None,
1539            parsing: None,
1540
1541            raw_input: None,
1542            raw_input_partial: None,
1543            audit: None,
1544        };
1545        let v = serde_json::to_value(&event).unwrap();
1546        assert_eq!(v["type"], "tool_call_update");
1547        assert!(v.get("error_category").is_none());
1548    }
1549
1550    #[test]
1551    fn tool_call_update_event_serializes_error_category_when_set() {
1552        let event = AgentEvent::ToolCallUpdate {
1553            session_id: "s".into(),
1554            tool_call_id: "t".into(),
1555            tool_name: "read".into(),
1556            status: ToolCallStatus::Failed,
1557            raw_output: None,
1558            error: Some("missing required field".into()),
1559            duration_ms: None,
1560            execution_duration_ms: None,
1561            error_category: Some(ToolCallErrorCategory::SchemaValidation),
1562            executor: None,
1563            parsing: None,
1564
1565            raw_input: None,
1566            raw_input_partial: None,
1567            audit: None,
1568        };
1569        let v = serde_json::to_value(&event).unwrap();
1570        assert_eq!(v["error_category"], "schema_validation");
1571        assert_eq!(v["error"], "missing required field");
1572    }
1573
1574    #[test]
1575    fn tool_call_update_omits_executor_when_absent() {
1576        // `executor: None` must not appear in the serialized event so
1577        // the on-disk shape stays backward-compatible with replays
1578        // recorded before harn#691.
1579        let event = AgentEvent::ToolCallUpdate {
1580            session_id: "s".into(),
1581            tool_call_id: "tc-1".into(),
1582            tool_name: "read".into(),
1583            status: ToolCallStatus::Completed,
1584            raw_output: None,
1585            error: None,
1586            duration_ms: None,
1587            execution_duration_ms: None,
1588            error_category: None,
1589            executor: None,
1590            parsing: None,
1591
1592            raw_input: None,
1593            raw_input_partial: None,
1594            audit: None,
1595        };
1596        let json = serde_json::to_value(&event).unwrap();
1597        assert!(json.get("executor").is_none(), "got: {json}");
1598    }
1599
1600    #[test]
1601    fn worker_event_status_strings_cover_all_variants() {
1602        // Wire-level status strings flow into both bridge `worker_update`
1603        // payloads and ACP `session/update` notifications. Pinning every
1604        // variant here so a future addition can't silently land without
1605        // a docs/wire decision.
1606        assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1607        assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1608        assert_eq!(
1609            WorkerEvent::WorkerWaitingForInput.as_status(),
1610            "awaiting_input"
1611        );
1612        assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1613        assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1614        assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1615
1616        for terminal in [
1617            WorkerEvent::WorkerCompleted,
1618            WorkerEvent::WorkerFailed,
1619            WorkerEvent::WorkerCancelled,
1620        ] {
1621            assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1622        }
1623        for non_terminal in [
1624            WorkerEvent::WorkerSpawned,
1625            WorkerEvent::WorkerProgressed,
1626            WorkerEvent::WorkerWaitingForInput,
1627        ] {
1628            assert!(
1629                !non_terminal.is_terminal(),
1630                "{non_terminal:?} should not be terminal"
1631            );
1632        }
1633
1634        // `ALL` is the iteration order downstream protocol-artifact
1635        // dumpers walk; keep it in lockstep with the variant list so a
1636        // new event doesn't slip in without a wire-status decision.
1637        let collected: Vec<&'static str> = WorkerEvent::ALL
1638            .iter()
1639            .map(|event| event.as_status())
1640            .collect();
1641        assert_eq!(
1642            collected,
1643            vec![
1644                "running",
1645                "progressed",
1646                "awaiting_input",
1647                "completed",
1648                "failed",
1649                "cancelled",
1650            ]
1651        );
1652    }
1653
1654    #[test]
1655    fn worker_update_event_routes_through_session_keyed_sink() {
1656        // Worker lifecycle events ride the same session-keyed
1657        // `AgentEventSink` registry as message and tool events. This
1658        // is the canonical path ACP and A2A subscribe to — gate it
1659        // here so a registry-routing regression breaks loudly.
1660        reset_all_sinks();
1661        let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
1662        struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1663        impl AgentEventSink for CapturingSink {
1664            fn handle_event(&self, event: &AgentEvent) {
1665                self.0
1666                    .lock()
1667                    .expect("captured sink mutex poisoned")
1668                    .push(event.clone());
1669            }
1670        }
1671        register_sink(
1672            "worker-session-1",
1673            Arc::new(CapturingSink(captured.clone())),
1674        );
1675        emit_event(&AgentEvent::WorkerUpdate {
1676            session_id: "worker-session-1".into(),
1677            worker_id: "worker_42".into(),
1678            worker_name: "review_captain".into(),
1679            worker_task: "review pr".into(),
1680            worker_mode: "delegated_stage".into(),
1681            event: WorkerEvent::WorkerWaitingForInput,
1682            status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
1683            metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
1684            audit: None,
1685        });
1686        // Other sessions don't receive cross-talk.
1687        emit_event(&AgentEvent::WorkerUpdate {
1688            session_id: "other-session".into(),
1689            worker_id: "w2".into(),
1690            worker_name: "n2".into(),
1691            worker_task: "t2".into(),
1692            worker_mode: "delegated_stage".into(),
1693            event: WorkerEvent::WorkerCompleted,
1694            status: "completed".into(),
1695            metadata: serde_json::json!({}),
1696            audit: None,
1697        });
1698        let received = captured.lock().unwrap().clone();
1699        assert_eq!(received.len(), 1, "got: {received:?}");
1700        match &received[0] {
1701            AgentEvent::WorkerUpdate {
1702                session_id,
1703                worker_id,
1704                event,
1705                status,
1706                ..
1707            } => {
1708                assert_eq!(session_id, "worker-session-1");
1709                assert_eq!(worker_id, "worker_42");
1710                assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
1711                assert_eq!(status, "awaiting_input");
1712            }
1713            other => panic!("expected WorkerUpdate, got {other:?}"),
1714        }
1715        reset_all_sinks();
1716    }
1717
1718    #[test]
1719    fn worker_update_event_serializes_to_canonical_shape() {
1720        // Persisted event-log entries flatten the AgentEvent envelope,
1721        // so the WorkerUpdate variant must serialize with a `type` of
1722        // `worker_update` and the worker fields directly on the
1723        // top-level object (matching the `#[serde(tag = "type", ...)]`
1724        // shape the rest of AgentEvent uses).
1725        let event = AgentEvent::WorkerUpdate {
1726            session_id: "s".into(),
1727            worker_id: "w".into(),
1728            worker_name: "n".into(),
1729            worker_task: "t".into(),
1730            worker_mode: "delegated_stage".into(),
1731            event: WorkerEvent::WorkerProgressed,
1732            status: "progressed".into(),
1733            metadata: serde_json::json!({"started_at": "0193..."}),
1734            audit: Some(serde_json::json!({"run_id": "run_x"})),
1735        };
1736        let value = serde_json::to_value(&event).unwrap();
1737        assert_eq!(value["type"], "worker_update");
1738        assert_eq!(value["session_id"], "s");
1739        assert_eq!(value["worker_id"], "w");
1740        assert_eq!(value["status"], "progressed");
1741        assert_eq!(value["audit"]["run_id"], "run_x");
1742
1743        // Round-trip: the persisted event log must deserialize the
1744        // canonical shape back into the typed variant so replay
1745        // tooling can re-derive lifecycle state offline.
1746        let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1747        match recovered {
1748            AgentEvent::WorkerUpdate {
1749                event: recovered_event,
1750                ..
1751            } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1752            other => panic!("expected WorkerUpdate, got {other:?}"),
1753        }
1754    }
1755
1756    #[test]
1757    fn tool_call_update_includes_executor_when_present() {
1758        let event = AgentEvent::ToolCallUpdate {
1759            session_id: "s".into(),
1760            tool_call_id: "tc-1".into(),
1761            tool_name: "read".into(),
1762            status: ToolCallStatus::Completed,
1763            raw_output: None,
1764            error: None,
1765            duration_ms: None,
1766            execution_duration_ms: None,
1767            error_category: None,
1768            executor: Some(ToolExecutor::McpServer {
1769                server_name: "github".into(),
1770            }),
1771            parsing: None,
1772
1773            raw_input: None,
1774            raw_input_partial: None,
1775            audit: None,
1776        };
1777        let json = serde_json::to_value(&event).unwrap();
1778        assert_eq!(json["executor"]["kind"], "mcp_server");
1779        assert_eq!(json["executor"]["server_name"], "github");
1780    }
1781
1782    #[test]
1783    fn tool_call_update_omits_audit_when_absent() {
1784        let event = AgentEvent::ToolCallUpdate {
1785            session_id: "s".into(),
1786            tool_call_id: "tc-1".into(),
1787            tool_name: "read".into(),
1788            status: ToolCallStatus::Completed,
1789            raw_output: None,
1790            error: None,
1791            duration_ms: None,
1792            execution_duration_ms: None,
1793            error_category: None,
1794            executor: None,
1795            parsing: None,
1796            raw_input: None,
1797            raw_input_partial: None,
1798            audit: None,
1799        };
1800        let json = serde_json::to_value(&event).unwrap();
1801        assert!(json.get("audit").is_none(), "got: {json}");
1802    }
1803
1804    #[test]
1805    fn tool_call_update_includes_audit_when_present() {
1806        let audit = MutationSessionRecord {
1807            session_id: "session_42".into(),
1808            run_id: Some("run_42".into()),
1809            mutation_scope: "apply_workspace".into(),
1810            execution_kind: Some("worker".into()),
1811            ..Default::default()
1812        };
1813        let event = AgentEvent::ToolCallUpdate {
1814            session_id: "s".into(),
1815            tool_call_id: "tc-1".into(),
1816            tool_name: "edit_file".into(),
1817            status: ToolCallStatus::Completed,
1818            raw_output: None,
1819            error: None,
1820            duration_ms: None,
1821            execution_duration_ms: None,
1822            error_category: None,
1823            executor: Some(ToolExecutor::HostBridge),
1824            parsing: None,
1825            raw_input: None,
1826            raw_input_partial: None,
1827            audit: Some(audit),
1828        };
1829        let json = serde_json::to_value(&event).unwrap();
1830        assert_eq!(json["audit"]["session_id"], "session_42");
1831        assert_eq!(json["audit"]["run_id"], "run_42");
1832        assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1833        assert_eq!(json["audit"]["execution_kind"], "worker");
1834    }
1835
1836    #[test]
1837    fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1838        let raw = serde_json::json!({
1839            "type": "tool_call_update",
1840            "session_id": "s",
1841            "tool_call_id": "tc-1",
1842            "tool_name": "read",
1843            "status": "completed",
1844            "raw_output": null,
1845            "error": null,
1846        });
1847        let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1848        match event {
1849            AgentEvent::ToolCallUpdate { audit, .. } => {
1850                assert!(audit.is_none());
1851            }
1852            other => panic!("expected ToolCallUpdate, got {other:?}"),
1853        }
1854    }
1855
1856    #[test]
1857    fn tool_call_audit_serializes_with_free_form_audit_payload() {
1858        // Middleware-attached metadata is intentionally free-form JSON
1859        // (A2A-style `metadata` extension slot). The wire format must
1860        // preserve nested dicts + lists verbatim so hosts can read
1861        // `summary`/`consent`/`layers`/etc. without per-field schema.
1862        let audit = serde_json::json!({
1863            "summary": "Searched codebase",
1864            "kind": "search",
1865            "consent": {"decision": "approved", "decided_by": "auto"},
1866            "layers": [{"name": "with_required_reason", "status": "ok"}],
1867        });
1868        let event = AgentEvent::ToolCallAudit {
1869            session_id: "s".into(),
1870            tool_call_id: "tc-1".into(),
1871            tool_name: "search_files".into(),
1872            audit: audit.clone(),
1873        };
1874        let json = serde_json::to_value(&event).unwrap();
1875        assert_eq!(json["type"], "tool_call_audit");
1876        assert_eq!(json["session_id"], "s");
1877        assert_eq!(json["tool_call_id"], "tc-1");
1878        assert_eq!(json["tool_name"], "search_files");
1879        assert_eq!(json["audit"], audit);
1880    }
1881
1882    #[test]
1883    fn tool_call_audit_session_id_routes_correctly() {
1884        let event = AgentEvent::ToolCallAudit {
1885            session_id: "abc".into(),
1886            tool_call_id: "tc".into(),
1887            tool_name: "read".into(),
1888            audit: serde_json::Value::Null,
1889        };
1890        assert_eq!(event.session_id(), "abc");
1891    }
1892}