Skip to main content

harn_vm/
agent_events.rs

1//! Agent event stream — the ACP-aligned observation surface for the
2//! agent loop.
3//!
4//! Every phase of the turn loop emits an `AgentEvent`. The canonical
5//! variants map 1:1 onto ACP `SessionUpdate` values; three internal
6//! variants (`TurnStart`, `TurnEnd`, `FeedbackInjected`) let pipelines
7//! react to loop milestones that don't have a direct ACP counterpart.
8//!
9//! There are two subscription paths, both keyed on session id so two
10//! concurrent sessions never cross-talk:
11//!
12//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
13//!    like the harn-cli ACP server. Invoked synchronously by the loop.
14//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
15//! 2. **Closure subscribers** — `.harn` closures registered via the
16//!    `agent_subscribe(session_id, callback)` host builtin. These live
17//!    on the session's `SessionState.subscribers` in
18//!    `crate::agent_sessions`, because sessions are the single source
19//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
30/// One coalesced filesystem notification from a hostlib `fs_watch`
31/// subscription.
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct FsWatchEvent {
34    pub kind: String,
35    pub paths: Vec<String>,
36    pub relative_paths: Vec<String>,
37    pub raw_kind: String,
38    pub error: Option<String>,
39}
40
41/// Typed worker lifecycle events emitted by delegated/background agent
42/// execution. Bridge-facing worker updates still derive a string status
43/// from these variants, but the runtime no longer passes raw status
44/// strings around internally.
45///
46/// `Spawned`/`Completed`/`Failed`/`Cancelled` are the four terminal-or-start
47/// states. `Progressed` is fired on intermediate milestones (e.g. a
48/// retriggerable worker resuming from `awaiting_input`, or a workflow
49/// stage completing without ending the worker). `WaitingForInput` covers
50/// retriggerable workers that finish a cycle but stay alive pending the
51/// next host-supplied trigger payload.
52#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
53pub enum WorkerEvent {
54    WorkerSpawned,
55    WorkerProgressed,
56    WorkerWaitingForInput,
57    WorkerCompleted,
58    WorkerFailed,
59    WorkerCancelled,
60}
61
62impl WorkerEvent {
63    /// Wire-level status string used by bridge `worker_update` payloads
64    /// and ACP `worker_update` session updates. The four canonical
65    /// states are mirrored from harn's internal worker `status` field
66    /// (`running`/`completed`/`failed`/`cancelled`), and the two newer
67    /// lifecycle states pick names that don't collide with any existing
68    /// status string.
69    pub fn as_status(self) -> &'static str {
70        match self {
71            Self::WorkerSpawned => "running",
72            Self::WorkerProgressed => "progressed",
73            Self::WorkerWaitingForInput => "awaiting_input",
74            Self::WorkerCompleted => "completed",
75            Self::WorkerFailed => "failed",
76            Self::WorkerCancelled => "cancelled",
77        }
78    }
79
80    pub fn as_str(self) -> &'static str {
81        match self {
82            Self::WorkerSpawned => "WorkerSpawned",
83            Self::WorkerProgressed => "WorkerProgressed",
84            Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85            Self::WorkerCompleted => "WorkerCompleted",
86            Self::WorkerFailed => "WorkerFailed",
87            Self::WorkerCancelled => "WorkerCancelled",
88        }
89    }
90
91    /// True for lifecycle events that mean the worker has reached a
92    /// final, non-resumable state. Retriggerable awaiting and
93    /// progressed milestones are *not* terminal — the worker keeps
94    /// running or is waiting for a trigger.
95    pub fn is_terminal(self) -> bool {
96        matches!(
97            self,
98            Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99        )
100    }
101}
102
103/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
104#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub enum ToolCallStatus {
107    /// Dispatched by the model but not yet started.
108    Pending,
109    /// Dispatch is actively running.
110    InProgress,
111    /// Finished successfully.
112    Completed,
113    /// Finished with an error.
114    Failed,
115}
116
117/// Wire-level classification of a `ToolCallUpdate` failure. Pairs with the
118/// human-readable `error` string so clients can render each failure type
119/// distinctly (e.g. surface a "permission denied" badge, or a different
120/// retry affordance for `network` vs `tool_error`). The enum is
121/// deliberately extensible — `unknown` is the default when the runtime
122/// could not classify a failure.
123#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
124#[serde(rename_all = "snake_case")]
125pub enum ToolCallErrorCategory {
126    /// Host-side validation rejected the args (missing required field,
127    /// invalid type, malformed JSON).
128    SchemaValidation,
129    /// The tool ran and returned an error result (e.g. `read_file` on a
130    /// missing path) — distinguished from a transport failure.
131    ToolError,
132    /// MCP transport / server-protocol error.
133    McpServerError,
134    /// Burin Swift host bridge returned an error during dispatch.
135    HostBridgeError,
136    /// `session/request_permission` denied by the client, or a policy
137    /// rule (static or dynamic) refused the call.
138    PermissionDenied,
139    /// The harn loop detector skipped this call because the same
140    /// (tool, args) pair repeated past the configured threshold.
141    RejectedLoop,
142    /// Streaming text candidate was detected (bare `name(` or
143    /// `<tool_call>` opener) but never resolved into a parseable call:
144    /// args parsed as malformed, the heredoc body broke, the tag closed
145    /// without a balanced expression, or the stream ended mid-call.
146    /// Used by the streaming candidate detector (harn#692) to retract a
147    /// `tool_call` candidate that turned out to be prose or syntactically
148    /// broken so clients can dismiss the in-flight chip.
149    ParseAborted,
150    /// The tool exceeded its time budget.
151    Timeout,
152    /// Transient network / rate-limited / 5xx provider failure.
153    Network,
154    /// The tool was cancelled (e.g. session aborted).
155    Cancelled,
156    /// Default when classification was not performed.
157    Unknown,
158}
159
160impl ToolCallErrorCategory {
161    pub fn as_str(self) -> &'static str {
162        match self {
163            Self::SchemaValidation => "schema_validation",
164            Self::ToolError => "tool_error",
165            Self::McpServerError => "mcp_server_error",
166            Self::HostBridgeError => "host_bridge_error",
167            Self::PermissionDenied => "permission_denied",
168            Self::RejectedLoop => "rejected_loop",
169            Self::ParseAborted => "parse_aborted",
170            Self::Timeout => "timeout",
171            Self::Network => "network",
172            Self::Cancelled => "cancelled",
173            Self::Unknown => "unknown",
174        }
175    }
176
177    /// Map an internal `ErrorCategory` (used by the VM's `VmError`
178    /// classification) onto the wire enum. The internal taxonomy is
179    /// finer-grained — several transient categories collapse onto
180    /// `Network`, and the auth/quota family becomes `HostBridgeError`
181    /// because at the tool-dispatch boundary those errors come from
182    /// the bridge transport rather than the tool itself.
183    pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
184        use crate::value::ErrorCategory as Internal;
185        match category {
186            Internal::Timeout => Self::Timeout,
187            Internal::RateLimit
188            | Internal::Overloaded
189            | Internal::ServerError
190            | Internal::TransientNetwork => Self::Network,
191            Internal::SchemaValidation => Self::SchemaValidation,
192            Internal::ToolError => Self::ToolError,
193            Internal::ToolRejected => Self::PermissionDenied,
194            Internal::Cancelled => Self::Cancelled,
195            Internal::Auth
196            | Internal::EgressBlocked
197            | Internal::NotFound
198            | Internal::CircuitOpen
199            | Internal::BudgetExceeded
200            | Internal::Generic => Self::HostBridgeError,
201        }
202    }
203}
204
205/// Where a tool actually ran. Tags `ToolCallUpdate` so clients can render
206/// "via mcp:linear" / "via host bridge" badges, attribute latency by
207/// transport, and route errors to the right surface (harn#691).
208///
209/// On the wire this serializes adjacently-tagged so the `mcp_server`
210/// case carries the configured server name. The ACP adapter rewrites
211/// unit variants as bare strings (`"harn_builtin"`, `"host_bridge"`,
212/// `"provider_native"`) and the `McpServer` case as
213/// `{"kind": "mcp_server", "serverName": "..."}` to match the protocol's
214/// camelCase convention.
215#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
216#[serde(tag = "kind", rename_all = "snake_case")]
217pub enum ToolExecutor {
218    /// VM-stdlib (`read_file`, `write_file`, `exec`, `http_*`, `mcp_*`)
219    /// or any Harn-side handler closure registered in `tools_val`.
220    HarnBuiltin,
221    /// Capability provided by the host through `HostBridge.builtin_call`
222    /// (Swift-side IDE bridge, BurinApp, BurinCLI host shells).
223    HostBridge,
224    /// Tool dispatched against a configured MCP server. Detected by the
225    /// `_mcp_server` tag that `mcp_list_tools` injects on every tool
226    /// dict before the agent loop sees it.
227    McpServer { server_name: String },
228    /// Provider-side server-side tool execution — currently OpenAI
229    /// Responses-API server tools (e.g. native `tool_search`). The
230    /// runtime never dispatches these locally; the model returns the
231    /// already-executed result inline.
232    ProviderNative,
233}
234
235/// Events emitted by the agent loop. The first five variants map 1:1
236/// to ACP `sessionUpdate` variants; the last three are harn-internal.
237#[derive(Clone, Debug, Serialize, Deserialize)]
238#[serde(tag = "type", rename_all = "snake_case")]
239pub enum AgentEvent {
240    AgentMessageChunk {
241        session_id: String,
242        content: String,
243    },
244    AgentThoughtChunk {
245        session_id: String,
246        content: String,
247    },
248    ToolCall {
249        session_id: String,
250        tool_call_id: String,
251        tool_name: String,
252        kind: Option<ToolKind>,
253        status: ToolCallStatus,
254        raw_input: serde_json::Value,
255        /// Set to `Some(true)` by the streaming candidate detector
256        /// (harn#692) when this event represents a tool-call shape
257        /// detected in the model's in-flight assistant text but whose
258        /// arguments have not finished parsing yet. Clients can render a
259        /// spinner / placeholder while the model writes the body. The
260        /// detector follows up with a `ToolCallUpdate { parsing: false,
261        /// .. }` carrying either `status: pending` (promoted) or
262        /// `status: failed` with `error_category: parse_aborted`.
263        /// `None` (the default) means "this is a normal post-parse tool
264        /// call, no candidate phase was active" so the on-disk shape
265        /// stays compatible with replays recorded before this field
266        /// existed.
267        #[serde(default, skip_serializing_if = "Option::is_none")]
268        parsing: Option<bool>,
269        /// Mutation-session audit context active when the tool was
270        /// dispatched (see harn#699). Hosts use it to group every tool
271        /// emission belonging to the same write-capable session.
272        #[serde(default, skip_serializing_if = "Option::is_none")]
273        audit: Option<MutationSessionRecord>,
274    },
275    ToolCallUpdate {
276        session_id: String,
277        tool_call_id: String,
278        tool_name: String,
279        status: ToolCallStatus,
280        raw_output: Option<serde_json::Value>,
281        error: Option<String>,
282        /// Wall-clock milliseconds from the parse-to-execution boundary
283        /// to the terminal `Completed`/`Failed` update. Includes the
284        /// time spent in any wrapping orchestration logic (loop checks,
285        /// post-tool hooks, microcompaction). Populated only on the
286        /// terminal update — `None` on intermediate `Pending` /
287        /// `InProgress` updates so clients can ignore the field until
288        /// it shows up.
289        #[serde(default, skip_serializing_if = "Option::is_none")]
290        duration_ms: Option<u64>,
291        /// Milliseconds spent in the actual host/builtin/MCP dispatch
292        /// call only (the inner `dispatch_tool_execution` window).
293        /// Populated only on the terminal update; `None` otherwise.
294        #[serde(default, skip_serializing_if = "Option::is_none")]
295        execution_duration_ms: Option<u64>,
296        /// Structured classification of the failure (when `status` is
297        /// `Failed`). Paired with `error` so clients can render each
298        /// category distinctly without parsing free-form strings. Always
299        /// `None` for non-Failed updates and serialized as
300        /// `errorCategory` in the ACP wire format.
301        #[serde(default, skip_serializing_if = "Option::is_none")]
302        error_category: Option<ToolCallErrorCategory>,
303        /// Where the tool actually ran. `None` only for events emitted
304        /// from sites that pre-date the dispatch decision (e.g. the
305        /// pending → in-progress transition the loop emits before the
306        /// dispatcher picks a backend).
307        #[serde(default, skip_serializing_if = "Option::is_none")]
308        executor: Option<ToolExecutor>,
309        /// Companion to `ToolCall.parsing` (harn#692). The streaming
310        /// candidate detector emits the *terminal* candidate event as a
311        /// `ToolCallUpdate` with `parsing: Some(false)` to retract the
312        /// in-flight `parsing: true` chip — either by promoting the
313        /// candidate (`status: pending`, populated `raw_output: None`,
314        /// `error: None`) or aborting it (`status: failed`,
315        /// `error_category: parse_aborted`). `None` means this update is
316        /// not part of a candidate-phase transition.
317        #[serde(default, skip_serializing_if = "Option::is_none")]
318        parsing: Option<bool>,
319        /// Best-effort partial parse of the streamed tool-call arguments.
320        /// Populated by the SSE transport on `Pending` updates as the
321        /// model streams `input_json_delta` (Anthropic) or
322        /// `tool_calls[].function.arguments` deltas (OpenAI). `None` on
323        /// terminal updates and on emissions from non-streaming paths
324        /// (#693). When the partial bytes are not yet parseable as JSON
325        /// the transport falls back to `raw_input_partial`.
326        #[serde(default, skip_serializing_if = "Option::is_none")]
327        raw_input: Option<serde_json::Value>,
328        /// Raw concatenated bytes of the streamed tool-call arguments
329        /// when a permissive parse failed (#693). Mutually exclusive
330        /// with `raw_input`: clients render whichever is present.
331        #[serde(default, skip_serializing_if = "Option::is_none")]
332        raw_input_partial: Option<String>,
333        /// Mutation-session audit context for the tool call. Carries the
334        /// same payload as on the paired `ToolCall` event so a host
335        /// processing a single update doesn't have to correlate against
336        /// the prior pending event.
337        #[serde(default, skip_serializing_if = "Option::is_none")]
338        audit: Option<MutationSessionRecord>,
339    },
340    Plan {
341        session_id: String,
342        plan: serde_json::Value,
343    },
344    TurnStart {
345        session_id: String,
346        iteration: usize,
347    },
348    TurnEnd {
349        session_id: String,
350        iteration: usize,
351        turn_info: serde_json::Value,
352    },
353    JudgeDecision {
354        session_id: String,
355        iteration: usize,
356        verdict: String,
357        reasoning: String,
358        next_step: Option<String>,
359        judge_duration_ms: u64,
360    },
361    TypedCheckpoint {
362        session_id: String,
363        checkpoint: serde_json::Value,
364    },
365    FeedbackInjected {
366        session_id: String,
367        kind: String,
368        content: String,
369    },
370    /// Emitted when the agent loop exhausts `max_iterations` without any
371    /// explicit break condition firing. Distinct from a natural "done" or
372    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
373    BudgetExhausted {
374        session_id: String,
375        max_iterations: usize,
376    },
377    /// Emitted when the loop breaks because consecutive text-only turns
378    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
379    /// hosts that key off agent-terminal events.
380    LoopStuck {
381        session_id: String,
382        max_nudges: usize,
383        last_iteration: usize,
384        tail_excerpt: String,
385    },
386    /// Emitted when the daemon idle-wait loop trips its watchdog because
387    /// every configured wake source returned `None` for N consecutive
388    /// attempts. Exists so a broken daemon doesn't hang the session
389    /// silently.
390    DaemonWatchdogTripped {
391        session_id: String,
392        attempts: usize,
393        elapsed_ms: u64,
394    },
395    /// Emitted when a skill is activated. Carries the match reason so
396    /// replayers can reconstruct *why* a given skill took effect at
397    /// this iteration.
398    SkillActivated {
399        session_id: String,
400        skill_name: String,
401        iteration: usize,
402        reason: String,
403    },
404    /// Emitted when a previously-active skill is deactivated because
405    /// the reassess phase no longer matches it.
406    SkillDeactivated {
407        session_id: String,
408        skill_name: String,
409        iteration: usize,
410    },
411    /// Emitted once per activation when the skill's `allowed_tools` filter
412    /// narrows the effective tool surface exposed to the model.
413    SkillScopeTools {
414        session_id: String,
415        skill_name: String,
416        allowed_tools: Vec<String>,
417    },
418    /// Emitted when a `tool_search` query is issued by the model. Carries
419    /// the raw query args, the configured strategy, and a `mode` tag
420    /// distinguishing the client-executed fallback (`"client"`) from
421    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
422    /// transcript event shape so hosts can render a search-in-progress
423    /// chip in real time — the replay path walks the transcript after
424    /// the turn, which is too late for live UX.
425    ToolSearchQuery {
426        session_id: String,
427        tool_use_id: String,
428        name: String,
429        query: serde_json::Value,
430        strategy: String,
431        mode: String,
432    },
433    /// Emitted when `tool_search` resolves — carries the list of tool
434    /// names newly promoted into the model's effective surface for the
435    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
436    ToolSearchResult {
437        session_id: String,
438        tool_use_id: String,
439        promoted: Vec<String>,
440        strategy: String,
441        mode: String,
442    },
443    TranscriptCompacted {
444        session_id: String,
445        mode: String,
446        strategy: String,
447        archived_messages: usize,
448        estimated_tokens_before: usize,
449        estimated_tokens_after: usize,
450        snapshot_asset_id: Option<String>,
451    },
452    Handoff {
453        session_id: String,
454        artifact_id: String,
455        handoff: Box<HandoffArtifact>,
456    },
457    FsWatch {
458        session_id: String,
459        subscription_id: String,
460        events: Vec<FsWatchEvent>,
461    },
462    /// Lifecycle update for a delegated/background worker. Carries the
463    /// canonical typed `event` variant alongside the worker's current
464    /// `status` string and the structured `metadata` payload that
465    /// `worker_bridge_metadata` builds (task, mode, timing, child
466    /// run/snapshot paths, audit-session, etc.). The `audit` field is
467    /// the same `MutationSessionRecord` JSON serialization carried on
468    /// the bridge wire so ACP/A2A consumers don't need to re-derive it.
469    ///
470    /// One-to-one with the bridge-side `worker_update` session-update
471    /// notification: ACP and A2A adapters subscribe to this variant
472    /// and translate it into their respective wire formats. The
473    /// `session_id` is the parent agent session that owns the worker
474    /// (i.e. the session whose VM spawned the worker), so a single
475    /// host stays subscribed to the same sink for both message and
476    /// worker traffic.
477    WorkerUpdate {
478        session_id: String,
479        worker_id: String,
480        worker_name: String,
481        worker_task: String,
482        worker_mode: String,
483        event: WorkerEvent,
484        status: String,
485        metadata: serde_json::Value,
486        audit: Option<serde_json::Value>,
487    },
488    /// A human-in-the-loop primitive (`ask_user`, `request_approval`,
489    /// `dual_control`, `escalate`) has just suspended the script and is
490    /// waiting on a response. Hosts that bridge the VM onto a remote
491    /// transport (ACP, A2A) translate this into a "paused / awaiting
492    /// input" wire signal so the client knows the task isn't stuck —
493    /// it's blocked on the human side. Pair-emitted with `HitlResolved`
494    /// when the waitpoint completes/cancels/times out.
495    HitlRequested {
496        session_id: String,
497        request_id: String,
498        kind: String,
499        payload: serde_json::Value,
500    },
501    /// Companion to `HitlRequested`: the waitpoint has resolved (either
502    /// a response arrived, the deadline elapsed, or the request was
503    /// cancelled). `outcome` is one of `"answered"`, `"timeout"`,
504    /// `"cancelled"`. Hosts use this to flip task state back to
505    /// `working` after an `input-required` pause.
506    HitlResolved {
507        session_id: String,
508        request_id: String,
509        kind: String,
510        outcome: String,
511    },
512    /// Emitted by the agent loop's adaptive iteration budget /
513    /// `loop_control` policy when a budget extension or early stop fires.
514    /// Generic enough to cover both shapes — `action` distinguishes them.
515    /// Carries the iteration the decision applied to, the previous /
516    /// resulting iteration limit, the policy reason string, and (for
517    /// stops) the loop status.
518    LoopControlDecision {
519        session_id: String,
520        iteration: usize,
521        action: String,
522        old_limit: usize,
523        new_limit: usize,
524        reason: String,
525        status: String,
526    },
527    /// Emitted when a `tool_caller` middleware (see std/llm/tool_middleware)
528    /// attaches structured audit metadata to a tool call — typically a
529    /// user-facing `summary`, a `description`, an ACP-style `kind`, an MCP
530    /// `hints` block, a `consent` decision, the per-layer `layers` log, or
531    /// free-form `metadata` keys (A2A-style extension slot).
532    ///
533    /// One-to-one with the underlying tool-call: hosts can join on
534    /// `tool_call_id` to render middleware-attached chips alongside the
535    /// existing `ToolCall` / `ToolCallUpdate` stream. The `audit` payload
536    /// is intentionally free-form JSON so middleware can carry whatever
537    /// shape the harness author chooses without needing protocol-level
538    /// changes per new middleware.
539    ToolCallAudit {
540        session_id: String,
541        tool_call_id: String,
542        tool_name: String,
543        audit: serde_json::Value,
544    },
545}
546
547impl AgentEvent {
548    pub fn session_id(&self) -> &str {
549        match self {
550            Self::AgentMessageChunk { session_id, .. }
551            | Self::AgentThoughtChunk { session_id, .. }
552            | Self::ToolCall { session_id, .. }
553            | Self::ToolCallUpdate { session_id, .. }
554            | Self::Plan { session_id, .. }
555            | Self::TurnStart { session_id, .. }
556            | Self::TurnEnd { session_id, .. }
557            | Self::JudgeDecision { session_id, .. }
558            | Self::TypedCheckpoint { session_id, .. }
559            | Self::FeedbackInjected { session_id, .. }
560            | Self::BudgetExhausted { session_id, .. }
561            | Self::LoopStuck { session_id, .. }
562            | Self::DaemonWatchdogTripped { session_id, .. }
563            | Self::SkillActivated { session_id, .. }
564            | Self::SkillDeactivated { session_id, .. }
565            | Self::SkillScopeTools { session_id, .. }
566            | Self::ToolSearchQuery { session_id, .. }
567            | Self::ToolSearchResult { session_id, .. }
568            | Self::TranscriptCompacted { session_id, .. }
569            | Self::Handoff { session_id, .. }
570            | Self::FsWatch { session_id, .. }
571            | Self::WorkerUpdate { session_id, .. }
572            | Self::HitlRequested { session_id, .. }
573            | Self::HitlResolved { session_id, .. }
574            | Self::LoopControlDecision { session_id, .. }
575            | Self::ToolCallAudit { session_id, .. } => session_id,
576        }
577    }
578}
579
580/// External consumers of the event stream (e.g. the harn-cli ACP server,
581/// which translates events into JSON-RPC notifications).
582pub trait AgentEventSink: Send + Sync {
583    fn handle_event(&self, event: &AgentEvent);
584}
585
586/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
587/// `AgentEvent` with monotonic index + timestamp + frame depth so
588/// replay engines can reconstruct paused state at any event index,
589/// and scrubber UIs can bucket events by time. The envelope is the
590/// on-disk shape; the wire format for live consumers is still the
591/// raw `AgentEvent` so existing sinks don't churn.
592#[derive(Clone, Debug, Serialize, Deserialize)]
593pub struct PersistedAgentEvent {
594    /// Monotonic per-session index starting at 0. Unique within a
595    /// session; gaps never happen even under load because the sink
596    /// owns the counter under a mutex.
597    pub index: u64,
598    /// Milliseconds since the Unix epoch, captured when the sink
599    /// received the event. Not the event's emission time — that
600    /// would require threading a clock through every emit site.
601    pub emitted_at_ms: i64,
602    /// Call-stack depth at the moment of emission, when the caller
603    /// can supply it. `None` for events emitted from a context where
604    /// the VM frame stack isn't available.
605    pub frame_depth: Option<u32>,
606    /// The raw event, flattened so `jq '.type'` works as expected.
607    #[serde(flatten)]
608    pub event: AgentEvent,
609}
610
611/// Append-only JSONL sink for a single session's event stream (#103).
612/// One writer per session; sinks rotate to a numbered suffix when a
613/// running file crosses `ROTATE_BYTES` (100 MB today — long chat
614/// sessions rarely exceed 5 MB, so rotation almost never fires).
615pub struct JsonlEventSink {
616    state: Mutex<JsonlEventSinkState>,
617    base_path: std::path::PathBuf,
618}
619
620struct JsonlEventSinkState {
621    writer: std::io::BufWriter<std::fs::File>,
622    index: u64,
623    bytes_written: u64,
624    rotation: u32,
625}
626
627impl JsonlEventSink {
628    /// Hard cap past which the current file rotates to a numbered
629    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
630    /// sessions don't produce unreadable multi-GB logs.
631    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
632
633    /// Open a new sink writing to `base_path`. Creates parent dirs
634    /// if missing. Overwrites an existing file so each fresh session
635    /// starts from index 0.
636    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
637        let base_path = base_path.into();
638        if let Some(parent) = base_path.parent() {
639            std::fs::create_dir_all(parent)?;
640        }
641        let file = std::fs::OpenOptions::new()
642            .create(true)
643            .truncate(true)
644            .write(true)
645            .open(&base_path)?;
646        Ok(Arc::new(Self {
647            state: Mutex::new(JsonlEventSinkState {
648                writer: std::io::BufWriter::new(file),
649                index: 0,
650                bytes_written: 0,
651                rotation: 0,
652            }),
653            base_path,
654        }))
655    }
656
657    /// Flush any buffered writes. Called on session shutdown; the
658    /// Drop impl calls this too but on early panic it may not run.
659    pub fn flush(&self) -> std::io::Result<()> {
660        use std::io::Write as _;
661        self.state
662            .lock()
663            .expect("jsonl sink mutex poisoned")
664            .writer
665            .flush()
666    }
667
668    /// Current event index — primarily for tests and the "how many
669    /// events are in this run" run-record summary.
670    pub fn event_count(&self) -> u64 {
671        self.state.lock().expect("jsonl sink mutex poisoned").index
672    }
673
674    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
675        use std::io::Write as _;
676        if state.bytes_written < Self::ROTATE_BYTES {
677            return Ok(());
678        }
679        state.writer.flush()?;
680        state.rotation += 1;
681        let suffix = format!("-{:06}", state.rotation);
682        let rotated = self.base_path.with_file_name({
683            let stem = self
684                .base_path
685                .file_stem()
686                .and_then(|s| s.to_str())
687                .unwrap_or("event_log");
688            let ext = self
689                .base_path
690                .extension()
691                .and_then(|e| e.to_str())
692                .unwrap_or("jsonl");
693            format!("{stem}{suffix}.{ext}")
694        });
695        let file = std::fs::OpenOptions::new()
696            .create(true)
697            .truncate(true)
698            .write(true)
699            .open(&rotated)?;
700        state.writer = std::io::BufWriter::new(file);
701        state.bytes_written = 0;
702        Ok(())
703    }
704}
705
706/// Event-log-backed sink for a single session's agent event stream.
707/// Uses the generalized append-only event log when one is installed for
708/// the current VM thread and falls back to `JsonlEventSink` only for
709/// older env-driven workflows.
710pub struct EventLogSink {
711    log: Arc<AnyEventLog>,
712    topic: Topic,
713    session_id: String,
714}
715
716impl EventLogSink {
717    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
718        let session_id = session_id.into();
719        let topic = Topic::new(format!(
720            "observability.agent_events.{}",
721            crate::event_log::sanitize_topic_component(&session_id)
722        ))
723        .expect("session id should sanitize to a valid topic");
724        Arc::new(Self {
725            log,
726            topic,
727            session_id,
728        })
729    }
730}
731
732impl AgentEventSink for JsonlEventSink {
733    fn handle_event(&self, event: &AgentEvent) {
734        use std::io::Write as _;
735        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
736        let index = state.index;
737        state.index += 1;
738        let emitted_at_ms = std::time::SystemTime::now()
739            .duration_since(std::time::UNIX_EPOCH)
740            .map(|d| d.as_millis() as i64)
741            .unwrap_or(0);
742        let envelope = PersistedAgentEvent {
743            index,
744            emitted_at_ms,
745            frame_depth: None,
746            event: event.clone(),
747        };
748        if let Ok(line) = serde_json::to_string(&envelope) {
749            // One line, newline-terminated — JSON Lines spec.
750            // Errors here are swallowed on purpose; a failing write
751            // must never crash the agent loop, and the run record
752            // itself is a secondary artifact.
753            let _ = state.writer.write_all(line.as_bytes());
754            let _ = state.writer.write_all(b"\n");
755            state.bytes_written += line.len() as u64 + 1;
756            let _ = self.rotate_if_needed(&mut state);
757        }
758    }
759}
760
761impl AgentEventSink for EventLogSink {
762    fn handle_event(&self, event: &AgentEvent) {
763        let event_json = match serde_json::to_value(event) {
764            Ok(value) => value,
765            Err(_) => return,
766        };
767        let event_kind = event_json
768            .get("type")
769            .and_then(|value| value.as_str())
770            .unwrap_or("agent_event")
771            .to_string();
772        let payload = serde_json::json!({
773            "index_hint": now_ms(),
774            "session_id": self.session_id,
775            "event": event_json,
776        });
777        let mut headers = std::collections::BTreeMap::new();
778        headers.insert("session_id".to_string(), self.session_id.clone());
779        let log = self.log.clone();
780        let topic = self.topic.clone();
781        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
782        if let Ok(handle) = tokio::runtime::Handle::try_current() {
783            handle.spawn(async move {
784                let _ = log.append(&topic, record).await;
785            });
786        } else {
787            let _ = futures::executor::block_on(log.append(&topic, record));
788        }
789    }
790}
791
792impl Drop for JsonlEventSink {
793    fn drop(&mut self) {
794        if let Ok(mut state) = self.state.lock() {
795            use std::io::Write as _;
796            let _ = state.writer.flush();
797        }
798    }
799}
800
801/// Fan-out helper for composing multiple external sinks.
802pub struct MultiSink {
803    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
804}
805
806impl MultiSink {
807    pub fn new() -> Self {
808        Self {
809            sinks: Mutex::new(Vec::new()),
810        }
811    }
812    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
813        self.sinks.lock().expect("sink mutex poisoned").push(sink);
814    }
815    pub fn len(&self) -> usize {
816        self.sinks.lock().expect("sink mutex poisoned").len()
817    }
818    pub fn is_empty(&self) -> bool {
819        self.len() == 0
820    }
821}
822
823impl Default for MultiSink {
824    fn default() -> Self {
825        Self::new()
826    }
827}
828
829impl AgentEventSink for MultiSink {
830    fn handle_event(&self, event: &AgentEvent) {
831        // Deliberate: snapshot then release the lock before invoking sink
832        // callbacks. Sinks can re-enter the event system (e.g. a host
833        // sink that logs to another AgentEvent path), so holding the
834        // mutex across the callback would risk self-deadlock. Arc clones
835        // are refcount bumps — cheap.
836        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
837        for sink in sinks {
838            sink.handle_event(event);
839        }
840    }
841}
842
843#[cfg(test)]
844#[derive(Clone)]
845struct RegisteredSink {
846    owner: std::thread::ThreadId,
847    sink: Arc<dyn AgentEventSink>,
848}
849
850#[cfg(not(test))]
851type RegisteredSink = Arc<dyn AgentEventSink>;
852
853type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
854
855fn external_sinks() -> &'static ExternalSinkRegistry {
856    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
857    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
858}
859
860pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
861    let session_id = session_id.into();
862    let mut reg = external_sinks().write().expect("sink registry poisoned");
863    #[cfg(test)]
864    let sink = RegisteredSink {
865        owner: std::thread::current().id(),
866        sink,
867    };
868    reg.entry(session_id).or_default().push(sink);
869}
870
871/// Remove all external sinks registered for `session_id`. Does NOT
872/// close the session itself — subscribers and transcript survive, so a
873/// later `agent_loop` call with the same id continues the conversation.
874pub fn clear_session_sinks(session_id: &str) {
875    #[cfg(test)]
876    {
877        let owner = std::thread::current().id();
878        let mut reg = external_sinks().write().expect("sink registry poisoned");
879        if let Some(sinks) = reg.get_mut(session_id) {
880            sinks.retain(|sink| sink.owner != owner);
881            if sinks.is_empty() {
882                reg.remove(session_id);
883            }
884        }
885    }
886    #[cfg(not(test))]
887    {
888        external_sinks()
889            .write()
890            .expect("sink registry poisoned")
891            .remove(session_id);
892    }
893}
894
895pub fn reset_all_sinks() {
896    #[cfg(test)]
897    {
898        let owner = std::thread::current().id();
899        let mut reg = external_sinks().write().expect("sink registry poisoned");
900        reg.retain(|_, sinks| {
901            sinks.retain(|sink| sink.owner != owner);
902            !sinks.is_empty()
903        });
904        crate::agent_sessions::reset_session_store();
905    }
906    #[cfg(not(test))]
907    {
908        external_sinks()
909            .write()
910            .expect("sink registry poisoned")
911            .clear();
912        crate::agent_sessions::reset_session_store();
913    }
914}
915
916/// Mirror externally-registered sinks from `source_session_id` onto
917/// `target_session_id` without moving ownership. Transports such as ACP
918/// register sinks on the outer prompt session before a script runs; scripts
919/// may then open a first-class agent transcript and route `agent_loop` events
920/// through that inner id. Mirroring keeps the transport subscribed to the
921/// in-run child transcript while preserving explicit session ids.
922pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
923    if source_session_id.is_empty() || target_session_id.is_empty() {
924        return;
925    }
926    if source_session_id == target_session_id {
927        return;
928    }
929    let mut reg = external_sinks().write().expect("sink registry poisoned");
930    let Some(source_sinks) = reg.get(source_session_id).cloned() else {
931        return;
932    };
933    let target = reg.entry(target_session_id.to_string()).or_default();
934    #[cfg(test)]
935    {
936        for source in source_sinks {
937            let already_present = target
938                .iter()
939                .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
940            if !already_present {
941                target.push(source);
942            }
943        }
944    }
945    #[cfg(not(test))]
946    {
947        for source in source_sinks {
948            let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
949            if !already_present {
950                target.push(source);
951            }
952        }
953    }
954}
955
956/// Emit an event to external sinks registered for this session. Pipeline
957/// closure subscribers are NOT called by this function — the agent
958/// loop owns that path because it needs its async VM context.
959pub fn emit_event(event: &AgentEvent) {
960    let sinks: Vec<Arc<dyn AgentEventSink>> = {
961        let reg = external_sinks().read().expect("sink registry poisoned");
962        #[cfg(test)]
963        {
964            let owner = std::thread::current().id();
965            reg.get(event.session_id())
966                .map(|sinks| {
967                    sinks
968                        .iter()
969                        .filter(|sink| sink.owner == owner)
970                        .map(|sink| sink.sink.clone())
971                        .collect()
972                })
973                .unwrap_or_default()
974        }
975        #[cfg(not(test))]
976        {
977            reg.get(event.session_id()).cloned().unwrap_or_default()
978        }
979    };
980    for sink in sinks {
981        sink.handle_event(event);
982    }
983}
984
985fn now_ms() -> i64 {
986    std::time::SystemTime::now()
987        .duration_since(std::time::UNIX_EPOCH)
988        .map(|duration| duration.as_millis() as i64)
989        .unwrap_or(0)
990}
991
992pub fn session_external_sink_count(session_id: &str) -> usize {
993    #[cfg(test)]
994    {
995        let owner = std::thread::current().id();
996        return external_sinks()
997            .read()
998            .expect("sink registry poisoned")
999            .get(session_id)
1000            .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
1001            .unwrap_or(0);
1002    }
1003    #[cfg(not(test))]
1004    {
1005        external_sinks()
1006            .read()
1007            .expect("sink registry poisoned")
1008            .get(session_id)
1009            .map(|v| v.len())
1010            .unwrap_or(0)
1011    }
1012}
1013
1014pub fn session_closure_subscriber_count(session_id: &str) -> usize {
1015    crate::agent_sessions::subscriber_count(session_id)
1016}
1017
1018#[cfg(test)]
1019mod tests {
1020    use super::*;
1021    use std::sync::atomic::{AtomicUsize, Ordering};
1022
1023    struct CountingSink(Arc<AtomicUsize>);
1024    impl AgentEventSink for CountingSink {
1025        fn handle_event(&self, _event: &AgentEvent) {
1026            self.0.fetch_add(1, Ordering::SeqCst);
1027        }
1028    }
1029
1030    #[test]
1031    fn multi_sink_fans_out_in_order() {
1032        let multi = MultiSink::new();
1033        let a = Arc::new(AtomicUsize::new(0));
1034        let b = Arc::new(AtomicUsize::new(0));
1035        multi.push(Arc::new(CountingSink(a.clone())));
1036        multi.push(Arc::new(CountingSink(b.clone())));
1037        let event = AgentEvent::TurnStart {
1038            session_id: "s1".into(),
1039            iteration: 1,
1040        };
1041        multi.handle_event(&event);
1042        assert_eq!(a.load(Ordering::SeqCst), 1);
1043        assert_eq!(b.load(Ordering::SeqCst), 1);
1044    }
1045
1046    #[test]
1047    fn session_scoped_sink_routing() {
1048        reset_all_sinks();
1049        let a = Arc::new(AtomicUsize::new(0));
1050        let b = Arc::new(AtomicUsize::new(0));
1051        register_sink("session-a", Arc::new(CountingSink(a.clone())));
1052        register_sink("session-b", Arc::new(CountingSink(b.clone())));
1053        emit_event(&AgentEvent::TurnStart {
1054            session_id: "session-a".into(),
1055            iteration: 0,
1056        });
1057        assert_eq!(a.load(Ordering::SeqCst), 1);
1058        assert_eq!(b.load(Ordering::SeqCst), 0);
1059        emit_event(&AgentEvent::TurnEnd {
1060            session_id: "session-b".into(),
1061            iteration: 0,
1062            turn_info: serde_json::json!({}),
1063        });
1064        assert_eq!(a.load(Ordering::SeqCst), 1);
1065        assert_eq!(b.load(Ordering::SeqCst), 1);
1066        clear_session_sinks("session-a");
1067        assert_eq!(session_external_sink_count("session-a"), 0);
1068        assert_eq!(session_external_sink_count("session-b"), 1);
1069        reset_all_sinks();
1070    }
1071
1072    #[test]
1073    fn newly_opened_child_session_inherits_current_external_sinks() {
1074        reset_all_sinks();
1075        let delivered = Arc::new(AtomicUsize::new(0));
1076        register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
1077        {
1078            let _guard = crate::agent_sessions::enter_current_session("outer-session");
1079            let inner = crate::agent_sessions::open_or_create(None);
1080            assert_ne!(inner, "outer-session");
1081            emit_event(&AgentEvent::TurnStart {
1082                session_id: inner,
1083                iteration: 0,
1084            });
1085        }
1086        assert_eq!(delivered.load(Ordering::SeqCst), 1);
1087        reset_all_sinks();
1088    }
1089
1090    #[test]
1091    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
1092        use std::io::{BufRead, BufReader};
1093        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
1094        std::fs::create_dir_all(&dir).unwrap();
1095        let path = dir.join("event_log.jsonl");
1096        let sink = JsonlEventSink::open(&path).unwrap();
1097        for i in 0..5 {
1098            sink.handle_event(&AgentEvent::TurnStart {
1099                session_id: "s".into(),
1100                iteration: i,
1101            });
1102        }
1103        assert_eq!(sink.event_count(), 5);
1104        sink.flush().unwrap();
1105
1106        // Read back + assert monotonic indices + non-decreasing timestamps.
1107        let file = std::fs::File::open(&path).unwrap();
1108        let mut last_idx: i64 = -1;
1109        let mut last_ts: i64 = 0;
1110        for line in BufReader::new(file).lines() {
1111            let line = line.unwrap();
1112            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
1113            let idx = val["index"].as_i64().unwrap();
1114            let ts = val["emitted_at_ms"].as_i64().unwrap();
1115            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
1116            assert!(ts >= last_ts, "timestamps must be non-decreasing");
1117            last_idx = idx;
1118            last_ts = ts;
1119            // Event payload flattened — type tag must survive.
1120            assert_eq!(val["type"], "turn_start");
1121        }
1122        assert_eq!(last_idx, 4);
1123        let _ = std::fs::remove_file(&path);
1124    }
1125
1126    #[test]
1127    fn judge_decision_round_trips_through_jsonl_sink() {
1128        use std::io::{BufRead, BufReader};
1129        let dir =
1130            std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
1131        std::fs::create_dir_all(&dir).unwrap();
1132        let path = dir.join("event_log.jsonl");
1133        let sink = JsonlEventSink::open(&path).unwrap();
1134        sink.handle_event(&AgentEvent::JudgeDecision {
1135            session_id: "s".into(),
1136            iteration: 2,
1137            verdict: "continue".into(),
1138            reasoning: "needs a concrete next step".into(),
1139            next_step: Some("run the verifier".into()),
1140            judge_duration_ms: 17,
1141        });
1142        sink.flush().unwrap();
1143
1144        let file = std::fs::File::open(&path).unwrap();
1145        let line = BufReader::new(file).lines().next().unwrap().unwrap();
1146        let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
1147        match recovered.event {
1148            AgentEvent::JudgeDecision {
1149                session_id,
1150                iteration,
1151                verdict,
1152                reasoning,
1153                next_step,
1154                judge_duration_ms,
1155            } => {
1156                assert_eq!(session_id, "s");
1157                assert_eq!(iteration, 2);
1158                assert_eq!(verdict, "continue");
1159                assert_eq!(reasoning, "needs a concrete next step");
1160                assert_eq!(next_step.as_deref(), Some("run the verifier"));
1161                assert_eq!(judge_duration_ms, 17);
1162            }
1163            other => panic!("expected JudgeDecision, got {other:?}"),
1164        }
1165        let value: serde_json::Value = serde_json::from_str(&line).unwrap();
1166        assert_eq!(value["type"], "judge_decision");
1167        let _ = std::fs::remove_file(&path);
1168        let _ = std::fs::remove_dir(&dir);
1169    }
1170
1171    #[test]
1172    fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
1173        // Terminal update with both durations populated — both fields
1174        // appear in the JSON. Snake_case keys here because this is the
1175        // canonical AgentEvent shape; the ACP adapter renames to
1176        // camelCase separately.
1177        let terminal = AgentEvent::ToolCallUpdate {
1178            session_id: "s".into(),
1179            tool_call_id: "tc-1".into(),
1180            tool_name: "read".into(),
1181            status: ToolCallStatus::Completed,
1182            raw_output: None,
1183            error: None,
1184            duration_ms: Some(42),
1185            execution_duration_ms: Some(7),
1186            error_category: None,
1187            executor: None,
1188            parsing: None,
1189
1190            raw_input: None,
1191            raw_input_partial: None,
1192            audit: None,
1193        };
1194        let value = serde_json::to_value(&terminal).unwrap();
1195        assert_eq!(value["duration_ms"], serde_json::json!(42));
1196        assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1197
1198        // In-progress update with `None` for both — both keys must be
1199        // absent (not `null`) so older ACP clients that key off
1200        // presence don't see a misleading zero.
1201        let intermediate = AgentEvent::ToolCallUpdate {
1202            session_id: "s".into(),
1203            tool_call_id: "tc-1".into(),
1204            tool_name: "read".into(),
1205            status: ToolCallStatus::InProgress,
1206            raw_output: None,
1207            error: None,
1208            duration_ms: None,
1209            execution_duration_ms: None,
1210            error_category: None,
1211            executor: None,
1212            parsing: None,
1213
1214            raw_input: None,
1215            raw_input_partial: None,
1216            audit: None,
1217        };
1218        let value = serde_json::to_value(&intermediate).unwrap();
1219        let object = value.as_object().expect("update serializes as object");
1220        assert!(
1221            !object.contains_key("duration_ms"),
1222            "duration_ms must be omitted when None: {value}"
1223        );
1224        assert!(
1225            !object.contains_key("execution_duration_ms"),
1226            "execution_duration_ms must be omitted when None: {value}"
1227        );
1228    }
1229
1230    #[test]
1231    fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1232        // Persisted event-log entries written before the fields existed
1233        // must still deserialize cleanly. The missing keys map to None.
1234        let raw = serde_json::json!({
1235            "type": "tool_call_update",
1236            "session_id": "s",
1237            "tool_call_id": "tc-1",
1238            "tool_name": "read",
1239            "status": "completed",
1240            "raw_output": null,
1241            "error": null,
1242        });
1243        let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1244        match event {
1245            AgentEvent::ToolCallUpdate {
1246                duration_ms,
1247                execution_duration_ms,
1248                ..
1249            } => {
1250                assert!(duration_ms.is_none());
1251                assert!(execution_duration_ms.is_none());
1252            }
1253            other => panic!("expected ToolCallUpdate, got {other:?}"),
1254        }
1255    }
1256
1257    #[test]
1258    fn tool_call_status_serde() {
1259        assert_eq!(
1260            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1261            "\"pending\""
1262        );
1263        assert_eq!(
1264            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1265            "\"in_progress\""
1266        );
1267        assert_eq!(
1268            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1269            "\"completed\""
1270        );
1271        assert_eq!(
1272            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1273            "\"failed\""
1274        );
1275    }
1276
1277    #[test]
1278    fn tool_call_error_category_serializes_as_snake_case() {
1279        let pairs = [
1280            (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1281            (ToolCallErrorCategory::ToolError, "tool_error"),
1282            (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1283            (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1284            (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1285            (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1286            (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1287            (ToolCallErrorCategory::Timeout, "timeout"),
1288            (ToolCallErrorCategory::Network, "network"),
1289            (ToolCallErrorCategory::Cancelled, "cancelled"),
1290            (ToolCallErrorCategory::Unknown, "unknown"),
1291        ];
1292        for (variant, wire) in pairs {
1293            let encoded = serde_json::to_string(&variant).unwrap();
1294            assert_eq!(encoded, format!("\"{wire}\""));
1295            assert_eq!(variant.as_str(), wire);
1296            // Round-trip via deserialize so wire stability is enforced
1297            // both ways.
1298            let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1299            assert_eq!(decoded, variant);
1300        }
1301    }
1302
1303    #[test]
1304    fn tool_executor_round_trips_with_adjacent_tag() {
1305        // Adjacent tagging keeps the wire shape uniform — every variant
1306        // is a JSON object with a `kind` discriminator. The ACP adapter
1307        // rewrites unit variants as bare strings; the on-disk event log
1308        // keeps the object shape so deserialize can recover the variant.
1309        for executor in [
1310            ToolExecutor::HarnBuiltin,
1311            ToolExecutor::HostBridge,
1312            ToolExecutor::McpServer {
1313                server_name: "linear".to_string(),
1314            },
1315            ToolExecutor::ProviderNative,
1316        ] {
1317            let json = serde_json::to_value(&executor).unwrap();
1318            let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1319            match &executor {
1320                ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1321                ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1322                ToolExecutor::McpServer { server_name } => {
1323                    assert_eq!(kind, "mcp_server");
1324                    assert_eq!(json["server_name"], *server_name);
1325                }
1326                ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1327            }
1328            let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1329            assert_eq!(recovered, executor);
1330        }
1331    }
1332
1333    #[test]
1334    fn tool_call_error_category_from_internal_collapses_transient_family() {
1335        use crate::value::ErrorCategory as Internal;
1336        assert_eq!(
1337            ToolCallErrorCategory::from_internal(&Internal::Timeout),
1338            ToolCallErrorCategory::Timeout
1339        );
1340        for net in [
1341            Internal::RateLimit,
1342            Internal::Overloaded,
1343            Internal::ServerError,
1344            Internal::TransientNetwork,
1345        ] {
1346            assert_eq!(
1347                ToolCallErrorCategory::from_internal(&net),
1348                ToolCallErrorCategory::Network,
1349                "{net:?} should map to Network",
1350            );
1351        }
1352        assert_eq!(
1353            ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1354            ToolCallErrorCategory::SchemaValidation
1355        );
1356        assert_eq!(
1357            ToolCallErrorCategory::from_internal(&Internal::ToolError),
1358            ToolCallErrorCategory::ToolError
1359        );
1360        assert_eq!(
1361            ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1362            ToolCallErrorCategory::PermissionDenied
1363        );
1364        assert_eq!(
1365            ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1366            ToolCallErrorCategory::Cancelled
1367        );
1368        for bridge in [
1369            Internal::Auth,
1370            Internal::EgressBlocked,
1371            Internal::NotFound,
1372            Internal::CircuitOpen,
1373            Internal::Generic,
1374        ] {
1375            assert_eq!(
1376                ToolCallErrorCategory::from_internal(&bridge),
1377                ToolCallErrorCategory::HostBridgeError,
1378                "{bridge:?} should map to HostBridgeError",
1379            );
1380        }
1381    }
1382
1383    #[test]
1384    fn tool_call_update_event_omits_error_category_when_none() {
1385        let event = AgentEvent::ToolCallUpdate {
1386            session_id: "s".into(),
1387            tool_call_id: "t".into(),
1388            tool_name: "read".into(),
1389            status: ToolCallStatus::Completed,
1390            raw_output: None,
1391            error: None,
1392            duration_ms: None,
1393            execution_duration_ms: None,
1394            error_category: None,
1395            executor: None,
1396            parsing: None,
1397
1398            raw_input: None,
1399            raw_input_partial: None,
1400            audit: None,
1401        };
1402        let v = serde_json::to_value(&event).unwrap();
1403        assert_eq!(v["type"], "tool_call_update");
1404        assert!(v.get("error_category").is_none());
1405    }
1406
1407    #[test]
1408    fn tool_call_update_event_serializes_error_category_when_set() {
1409        let event = AgentEvent::ToolCallUpdate {
1410            session_id: "s".into(),
1411            tool_call_id: "t".into(),
1412            tool_name: "read".into(),
1413            status: ToolCallStatus::Failed,
1414            raw_output: None,
1415            error: Some("missing required field".into()),
1416            duration_ms: None,
1417            execution_duration_ms: None,
1418            error_category: Some(ToolCallErrorCategory::SchemaValidation),
1419            executor: None,
1420            parsing: None,
1421
1422            raw_input: None,
1423            raw_input_partial: None,
1424            audit: None,
1425        };
1426        let v = serde_json::to_value(&event).unwrap();
1427        assert_eq!(v["error_category"], "schema_validation");
1428        assert_eq!(v["error"], "missing required field");
1429    }
1430
1431    #[test]
1432    fn tool_call_update_omits_executor_when_absent() {
1433        // `executor: None` must not appear in the serialized event so
1434        // the on-disk shape stays backward-compatible with replays
1435        // recorded before harn#691.
1436        let event = AgentEvent::ToolCallUpdate {
1437            session_id: "s".into(),
1438            tool_call_id: "tc-1".into(),
1439            tool_name: "read".into(),
1440            status: ToolCallStatus::Completed,
1441            raw_output: None,
1442            error: None,
1443            duration_ms: None,
1444            execution_duration_ms: None,
1445            error_category: None,
1446            executor: None,
1447            parsing: None,
1448
1449            raw_input: None,
1450            raw_input_partial: None,
1451            audit: None,
1452        };
1453        let json = serde_json::to_value(&event).unwrap();
1454        assert!(json.get("executor").is_none(), "got: {json}");
1455    }
1456
1457    #[test]
1458    fn worker_event_status_strings_cover_all_variants() {
1459        // Wire-level status strings flow into both bridge `worker_update`
1460        // payloads and ACP `session/update` notifications. Pinning every
1461        // variant here so a future addition can't silently land without
1462        // a docs/wire decision.
1463        assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1464        assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1465        assert_eq!(
1466            WorkerEvent::WorkerWaitingForInput.as_status(),
1467            "awaiting_input"
1468        );
1469        assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1470        assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1471        assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1472
1473        for terminal in [
1474            WorkerEvent::WorkerCompleted,
1475            WorkerEvent::WorkerFailed,
1476            WorkerEvent::WorkerCancelled,
1477        ] {
1478            assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1479        }
1480        for non_terminal in [
1481            WorkerEvent::WorkerSpawned,
1482            WorkerEvent::WorkerProgressed,
1483            WorkerEvent::WorkerWaitingForInput,
1484        ] {
1485            assert!(
1486                !non_terminal.is_terminal(),
1487                "{non_terminal:?} should not be terminal"
1488            );
1489        }
1490    }
1491
1492    #[test]
1493    fn worker_update_event_routes_through_session_keyed_sink() {
1494        // Worker lifecycle events ride the same session-keyed
1495        // `AgentEventSink` registry as message and tool events. This
1496        // is the canonical path ACP and A2A subscribe to — gate it
1497        // here so a registry-routing regression breaks loudly.
1498        reset_all_sinks();
1499        let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
1500        struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1501        impl AgentEventSink for CapturingSink {
1502            fn handle_event(&self, event: &AgentEvent) {
1503                self.0
1504                    .lock()
1505                    .expect("captured sink mutex poisoned")
1506                    .push(event.clone());
1507            }
1508        }
1509        register_sink(
1510            "worker-session-1",
1511            Arc::new(CapturingSink(captured.clone())),
1512        );
1513        emit_event(&AgentEvent::WorkerUpdate {
1514            session_id: "worker-session-1".into(),
1515            worker_id: "worker_42".into(),
1516            worker_name: "review_captain".into(),
1517            worker_task: "review pr".into(),
1518            worker_mode: "delegated_stage".into(),
1519            event: WorkerEvent::WorkerWaitingForInput,
1520            status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
1521            metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
1522            audit: None,
1523        });
1524        // Other sessions don't receive cross-talk.
1525        emit_event(&AgentEvent::WorkerUpdate {
1526            session_id: "other-session".into(),
1527            worker_id: "w2".into(),
1528            worker_name: "n2".into(),
1529            worker_task: "t2".into(),
1530            worker_mode: "delegated_stage".into(),
1531            event: WorkerEvent::WorkerCompleted,
1532            status: "completed".into(),
1533            metadata: serde_json::json!({}),
1534            audit: None,
1535        });
1536        let received = captured.lock().unwrap().clone();
1537        assert_eq!(received.len(), 1, "got: {received:?}");
1538        match &received[0] {
1539            AgentEvent::WorkerUpdate {
1540                session_id,
1541                worker_id,
1542                event,
1543                status,
1544                ..
1545            } => {
1546                assert_eq!(session_id, "worker-session-1");
1547                assert_eq!(worker_id, "worker_42");
1548                assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
1549                assert_eq!(status, "awaiting_input");
1550            }
1551            other => panic!("expected WorkerUpdate, got {other:?}"),
1552        }
1553        reset_all_sinks();
1554    }
1555
1556    #[test]
1557    fn worker_update_event_serializes_to_canonical_shape() {
1558        // Persisted event-log entries flatten the AgentEvent envelope,
1559        // so the WorkerUpdate variant must serialize with a `type` of
1560        // `worker_update` and the worker fields directly on the
1561        // top-level object (matching the `#[serde(tag = "type", ...)]`
1562        // shape the rest of AgentEvent uses).
1563        let event = AgentEvent::WorkerUpdate {
1564            session_id: "s".into(),
1565            worker_id: "w".into(),
1566            worker_name: "n".into(),
1567            worker_task: "t".into(),
1568            worker_mode: "delegated_stage".into(),
1569            event: WorkerEvent::WorkerProgressed,
1570            status: "progressed".into(),
1571            metadata: serde_json::json!({"started_at": "0193..."}),
1572            audit: Some(serde_json::json!({"run_id": "run_x"})),
1573        };
1574        let value = serde_json::to_value(&event).unwrap();
1575        assert_eq!(value["type"], "worker_update");
1576        assert_eq!(value["session_id"], "s");
1577        assert_eq!(value["worker_id"], "w");
1578        assert_eq!(value["status"], "progressed");
1579        assert_eq!(value["audit"]["run_id"], "run_x");
1580
1581        // Round-trip: the persisted event log must deserialize the
1582        // canonical shape back into the typed variant so replay
1583        // tooling can re-derive lifecycle state offline.
1584        let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1585        match recovered {
1586            AgentEvent::WorkerUpdate {
1587                event: recovered_event,
1588                ..
1589            } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1590            other => panic!("expected WorkerUpdate, got {other:?}"),
1591        }
1592    }
1593
1594    #[test]
1595    fn tool_call_update_includes_executor_when_present() {
1596        let event = AgentEvent::ToolCallUpdate {
1597            session_id: "s".into(),
1598            tool_call_id: "tc-1".into(),
1599            tool_name: "read".into(),
1600            status: ToolCallStatus::Completed,
1601            raw_output: None,
1602            error: None,
1603            duration_ms: None,
1604            execution_duration_ms: None,
1605            error_category: None,
1606            executor: Some(ToolExecutor::McpServer {
1607                server_name: "github".into(),
1608            }),
1609            parsing: None,
1610
1611            raw_input: None,
1612            raw_input_partial: None,
1613            audit: None,
1614        };
1615        let json = serde_json::to_value(&event).unwrap();
1616        assert_eq!(json["executor"]["kind"], "mcp_server");
1617        assert_eq!(json["executor"]["server_name"], "github");
1618    }
1619
1620    #[test]
1621    fn tool_call_update_omits_audit_when_absent() {
1622        let event = AgentEvent::ToolCallUpdate {
1623            session_id: "s".into(),
1624            tool_call_id: "tc-1".into(),
1625            tool_name: "read".into(),
1626            status: ToolCallStatus::Completed,
1627            raw_output: None,
1628            error: None,
1629            duration_ms: None,
1630            execution_duration_ms: None,
1631            error_category: None,
1632            executor: None,
1633            parsing: None,
1634            raw_input: None,
1635            raw_input_partial: None,
1636            audit: None,
1637        };
1638        let json = serde_json::to_value(&event).unwrap();
1639        assert!(json.get("audit").is_none(), "got: {json}");
1640    }
1641
1642    #[test]
1643    fn tool_call_update_includes_audit_when_present() {
1644        let audit = MutationSessionRecord {
1645            session_id: "session_42".into(),
1646            run_id: Some("run_42".into()),
1647            mutation_scope: "apply_workspace".into(),
1648            execution_kind: Some("worker".into()),
1649            ..Default::default()
1650        };
1651        let event = AgentEvent::ToolCallUpdate {
1652            session_id: "s".into(),
1653            tool_call_id: "tc-1".into(),
1654            tool_name: "edit_file".into(),
1655            status: ToolCallStatus::Completed,
1656            raw_output: None,
1657            error: None,
1658            duration_ms: None,
1659            execution_duration_ms: None,
1660            error_category: None,
1661            executor: Some(ToolExecutor::HostBridge),
1662            parsing: None,
1663            raw_input: None,
1664            raw_input_partial: None,
1665            audit: Some(audit),
1666        };
1667        let json = serde_json::to_value(&event).unwrap();
1668        assert_eq!(json["audit"]["session_id"], "session_42");
1669        assert_eq!(json["audit"]["run_id"], "run_42");
1670        assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1671        assert_eq!(json["audit"]["execution_kind"], "worker");
1672    }
1673
1674    #[test]
1675    fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1676        let raw = serde_json::json!({
1677            "type": "tool_call_update",
1678            "session_id": "s",
1679            "tool_call_id": "tc-1",
1680            "tool_name": "read",
1681            "status": "completed",
1682            "raw_output": null,
1683            "error": null,
1684        });
1685        let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1686        match event {
1687            AgentEvent::ToolCallUpdate { audit, .. } => {
1688                assert!(audit.is_none());
1689            }
1690            other => panic!("expected ToolCallUpdate, got {other:?}"),
1691        }
1692    }
1693
1694    #[test]
1695    fn tool_call_audit_serializes_with_free_form_audit_payload() {
1696        // Middleware-attached metadata is intentionally free-form JSON
1697        // (A2A-style `metadata` extension slot). The wire format must
1698        // preserve nested dicts + lists verbatim so hosts can read
1699        // `summary`/`consent`/`layers`/etc. without per-field schema.
1700        let audit = serde_json::json!({
1701            "summary": "Searched codebase",
1702            "kind": "search",
1703            "consent": {"decision": "approved", "decided_by": "auto"},
1704            "layers": [{"name": "with_required_reason", "status": "ok"}],
1705        });
1706        let event = AgentEvent::ToolCallAudit {
1707            session_id: "s".into(),
1708            tool_call_id: "tc-1".into(),
1709            tool_name: "search_files".into(),
1710            audit: audit.clone(),
1711        };
1712        let json = serde_json::to_value(&event).unwrap();
1713        assert_eq!(json["type"], "tool_call_audit");
1714        assert_eq!(json["session_id"], "s");
1715        assert_eq!(json["tool_call_id"], "tc-1");
1716        assert_eq!(json["tool_name"], "search_files");
1717        assert_eq!(json["audit"], audit);
1718    }
1719
1720    #[test]
1721    fn tool_call_audit_session_id_routes_correctly() {
1722        let event = AgentEvent::ToolCallAudit {
1723            session_id: "abc".into(),
1724            tool_call_id: "tc".into(),
1725            tool_name: "read".into(),
1726            audit: serde_json::Value::Null,
1727        };
1728        assert_eq!(event.session_id(), "abc");
1729    }
1730}