Skip to main content

harn_vm/
agent_events.rs

1//! Agent event stream — the ACP-aligned observation surface for the
2//! agent loop.
3//!
4//! Every phase of the turn loop emits an `AgentEvent`. The canonical
5//! variants map 1:1 onto ACP `SessionUpdate` values; the internal
6//! variants (for example `TurnStart`, `TurnEnd`, `FeedbackInjected`) let
7//! pipelines react to loop milestones that lack a direct ACP counterpart.
8//!
9//! There are two subscription paths, both keyed on session id so two
10//! concurrent sessions never cross-talk:
11//!
12//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
13//!    like the harn-cli ACP server. Invoked synchronously by the loop.
14//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
15//! 2. **Closure subscribers** — `.harn` closures registered via the
16//!    `agent_subscribe(session_id, callback)` host builtin. These live
17//!    on the session's `SessionState.subscribers` in
18//!    `crate::agent_sessions`, because sessions are the single source
19//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::{HandoffArtifact, MutationSessionRecord};
28use crate::tool_annotations::ToolKind;
29
30/// One coalesced filesystem notification from a hostlib `fs_watch`
31/// subscription.
///
/// Derives serde with default naming, so the field names below are the
/// serialized keys. Batches of these are carried by `AgentEvent::FsWatch`.
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct FsWatchEvent {
    /// Event kind label.
    /// NOTE(review): presumably a normalized form of `raw_kind` — confirm
    /// against the hostlib watcher that builds this struct.
34    pub kind: String,
    /// Paths the notification refers to.
    /// NOTE(review): presumably absolute — TODO confirm.
35    pub paths: Vec<String>,
    /// Relative form of `paths`.
    /// NOTE(review): the base they are relative to is not visible in this
    /// file — verify at the producer.
36    pub relative_paths: Vec<String>,
    /// Kind label as originally reported.
    /// NOTE(review): assumed to be the watcher backend's own name — confirm.
37    pub raw_kind: String,
    /// Optional error text attached to the notification.
38    pub error: Option<String>,
39}
40
41/// Typed worker lifecycle events emitted by delegated/background agent
42/// execution. Bridge-facing worker updates still derive a string status
43/// from these variants, but the runtime no longer passes raw status
44/// strings around internally.
45///
46/// `WorkerSpawned`, `WorkerCompleted`, `WorkerFailed` and `WorkerCancelled`
47/// are the start and terminal states. `WorkerProgressed` fires on
48/// intermediate milestones (e.g. a retriggerable worker resuming from
49/// `awaiting_input`, or a workflow stage completing without ending the
50/// worker). `WorkerWaitingForInput` covers retriggerable workers that finish
51/// a cycle but stay alive pending the next host-supplied trigger payload.
///
/// No serde rename is applied, so each variant serializes under its Rust
/// name (the same strings `WorkerEvent::as_str` returns).
52#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
53pub enum WorkerEvent {
    /// Worker started; reported on the wire as status `"running"`.
54    WorkerSpawned,
    /// Intermediate milestone; the worker keeps going.
55    WorkerProgressed,
    /// Worker is alive but waiting for its next trigger (`"awaiting_input"`).
56    WorkerWaitingForInput,
    /// Terminal: finished successfully.
57    WorkerCompleted,
    /// Terminal: finished with a failure.
58    WorkerFailed,
    /// Terminal: cancelled before finishing.
59    WorkerCancelled,
60}
61
62impl WorkerEvent {
63    /// Wire-level status string used by bridge `worker_update` payloads
64    /// and ACP `worker_update` session updates. The four canonical
65    /// states are mirrored from harn's internal worker `status` field
66    /// (`running`/`completed`/`failed`/`cancelled`), and the two newer
67    /// lifecycle states pick names that don't collide with any existing
68    /// status string.
69    pub fn as_status(self) -> &'static str {
70        match self {
71            Self::WorkerSpawned => "running",
72            Self::WorkerProgressed => "progressed",
73            Self::WorkerWaitingForInput => "awaiting_input",
74            Self::WorkerCompleted => "completed",
75            Self::WorkerFailed => "failed",
76            Self::WorkerCancelled => "cancelled",
77        }
78    }
79
80    pub fn as_str(self) -> &'static str {
81        match self {
82            Self::WorkerSpawned => "WorkerSpawned",
83            Self::WorkerProgressed => "WorkerProgressed",
84            Self::WorkerWaitingForInput => "WorkerWaitingForInput",
85            Self::WorkerCompleted => "WorkerCompleted",
86            Self::WorkerFailed => "WorkerFailed",
87            Self::WorkerCancelled => "WorkerCancelled",
88        }
89    }
90
91    /// True for lifecycle events that mean the worker has reached a
92    /// final, non-resumable state. Retriggerable awaiting and
93    /// progressed milestones are *not* terminal — the worker keeps
94    /// running or is waiting for a trigger.
95    pub fn is_terminal(self) -> bool {
96        matches!(
97            self,
98            Self::WorkerCompleted | Self::WorkerFailed | Self::WorkerCancelled
99        )
100    }
101}
102
103/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
///
/// Serialized in snake_case: `pending`, `in_progress`, `completed`,
/// `failed`.
104#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub enum ToolCallStatus {
107    /// Dispatched by the model but not yet started.
108    Pending,
109    /// Dispatch is actively running.
110    InProgress,
111    /// Finished successfully.
112    Completed,
113    /// Finished with an error.
114    Failed,
115}
116
117/// Wire-level classification of a `ToolCallUpdate` failure. Pairs with the
118/// human-readable `error` string so clients can render each failure type
119/// distinctly (e.g. surface a "permission denied" badge, or a different
120/// retry affordance for `network` vs `tool_error`). The enum is
121/// deliberately extensible — `unknown` is the default when the runtime
122/// could not classify a failure.
///
/// The serialized (snake_case) variant names are the same strings that
/// `ToolCallErrorCategory::as_str` returns.
123#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
124#[serde(rename_all = "snake_case")]
125pub enum ToolCallErrorCategory {
126    /// Host-side validation rejected the args (missing required field,
127    /// invalid type, malformed JSON).
128    SchemaValidation,
129    /// The tool ran and returned an error result (e.g. `read_file` on a
130    /// missing path) — distinguished from a transport failure.
131    ToolError,
132    /// MCP transport / server-protocol error.
133    McpServerError,
134    /// Burin Swift host bridge returned an error during dispatch.
135    HostBridgeError,
136    /// `session/request_permission` denied by the client, or a policy
137    /// rule (static or dynamic) refused the call.
138    PermissionDenied,
139    /// The harn loop detector skipped this call because the same
140    /// (tool, args) pair repeated past the configured threshold.
141    RejectedLoop,
142    /// Streaming text candidate was detected (bare `name(` or
143    /// `<tool_call>` opener) but never resolved into a parseable call:
144    /// args parsed as malformed, the heredoc body broke, the tag closed
145    /// without a balanced expression, or the stream ended mid-call.
146    /// Used by the streaming candidate detector (harn#692) to retract a
147    /// `tool_call` candidate that turned out to be prose or syntactically
148    /// broken so clients can dismiss the in-flight chip.
149    ParseAborted,
150    /// The tool exceeded its time budget.
151    Timeout,
152    /// Transient network / rate-limited / 5xx provider failure.
153    Network,
154    /// The tool was cancelled (e.g. session aborted).
155    Cancelled,
156    /// Default when classification was not performed.
157    Unknown,
158}
159
160impl ToolCallErrorCategory {
161    pub fn as_str(self) -> &'static str {
162        match self {
163            Self::SchemaValidation => "schema_validation",
164            Self::ToolError => "tool_error",
165            Self::McpServerError => "mcp_server_error",
166            Self::HostBridgeError => "host_bridge_error",
167            Self::PermissionDenied => "permission_denied",
168            Self::RejectedLoop => "rejected_loop",
169            Self::ParseAborted => "parse_aborted",
170            Self::Timeout => "timeout",
171            Self::Network => "network",
172            Self::Cancelled => "cancelled",
173            Self::Unknown => "unknown",
174        }
175    }
176
177    /// Map an internal `ErrorCategory` (used by the VM's `VmError`
178    /// classification) onto the wire enum. The internal taxonomy is
179    /// finer-grained — several transient categories collapse onto
180    /// `Network`, and the auth/quota family becomes `HostBridgeError`
181    /// because at the tool-dispatch boundary those errors come from
182    /// the bridge transport rather than the tool itself.
183    pub fn from_internal(category: &crate::value::ErrorCategory) -> Self {
184        use crate::value::ErrorCategory as Internal;
185        match category {
186            Internal::Timeout => Self::Timeout,
187            Internal::RateLimit
188            | Internal::Overloaded
189            | Internal::ServerError
190            | Internal::TransientNetwork => Self::Network,
191            Internal::SchemaValidation => Self::SchemaValidation,
192            Internal::ToolError => Self::ToolError,
193            Internal::ToolRejected => Self::PermissionDenied,
194            Internal::Cancelled => Self::Cancelled,
195            Internal::Auth
196            | Internal::EgressBlocked
197            | Internal::NotFound
198            | Internal::CircuitOpen
199            | Internal::BudgetExceeded
200            | Internal::Generic => Self::HostBridgeError,
201        }
202    }
203}
204
205/// Where a tool actually ran. Tags `ToolCallUpdate` so clients can render
206/// "via mcp:linear" / "via host bridge" badges, attribute latency by
207/// transport, and route errors to the right surface (harn#691).
208///
209/// On the wire this serializes internally tagged (the snake_case variant
210/// name sits under a `kind` key) so the `mcp_server` case can carry the
211/// configured server name next to the tag. The ACP adapter rewrites unit
212/// variants as bare strings (`"harn_builtin"`, `"host_bridge"`,
213/// `"provider_native"`) and the `McpServer` case as
214/// `{"kind": "mcp_server", "serverName": "..."}` to match the protocol's camelCase convention.
215#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
216#[serde(tag = "kind", rename_all = "snake_case")]
217pub enum ToolExecutor {
218    /// VM-stdlib (`read_file`, `write_file`, `exec`, `http_*`, `mcp_*`)
219    /// or any Harn-side handler closure registered in `tools_val`.
220    HarnBuiltin,
221    /// Capability provided by the host through `HostBridge.builtin_call`
222    /// (Swift-side IDE bridge, BurinApp, BurinCLI host shells).
223    HostBridge,
224    /// Tool dispatched against a configured MCP server. Detected by the
225    /// `_mcp_server` tag that `mcp_list_tools` injects on every tool
226    /// dict before the agent loop sees it.
227    McpServer { server_name: String },
228    /// Provider-side server-side tool execution — currently OpenAI
229    /// Responses-API server tools (e.g. native `tool_search`). The
230    /// runtime never dispatches these locally; the model returns the
231    /// already-executed result inline.
232    ProviderNative,
233}
234
235/// Events emitted by the agent loop. The first five variants map 1:1
236/// to ACP `sessionUpdate` variants; the variants after them are harn additions.
///
/// Serialized internally tagged: the snake_case variant name is stored
/// under the `type` key next to the variant's own fields. Every variant
/// carries the `session_id` of the session it belongs to.
237#[derive(Clone, Debug, Serialize, Deserialize)]
238#[serde(tag = "type", rename_all = "snake_case")]
239pub enum AgentEvent {
    /// A piece of assistant message text.
    /// NOTE(review): presumably streamed incrementally — confirm at the
    /// emit sites.
240    AgentMessageChunk {
241        session_id: String,
242        content: String,
243    },
    /// A piece of the model's thought text; same shape as
    /// `AgentMessageChunk`.
244    AgentThoughtChunk {
245        session_id: String,
246        content: String,
247    },
    /// Announces a tool call: its id, tool name, optional `ToolKind`,
    /// status and raw input arguments.
248    ToolCall {
249        session_id: String,
250        tool_call_id: String,
251        tool_name: String,
252        kind: Option<ToolKind>,
253        status: ToolCallStatus,
254        raw_input: serde_json::Value,
255        /// Set to `Some(true)` by the streaming candidate detector
256        /// (harn#692) when this event represents a tool-call shape
257        /// detected in the model's in-flight assistant text but whose
258        /// arguments have not finished parsing yet. Clients can render a
259        /// spinner / placeholder while the model writes the body. The
260        /// detector follows up with a `ToolCallUpdate { parsing: false,
261        /// .. }` carrying either `status: pending` (promoted) or
262        /// `status: failed` with `error_category: parse_aborted`.
263        /// `None` (the default) means "this is a normal post-parse tool
264        /// call, no candidate phase was active" so the on-disk shape
265        /// stays compatible with replays recorded before this field
266        /// existed.
267        #[serde(default, skip_serializing_if = "Option::is_none")]
268        parsing: Option<bool>,
269        /// Mutation-session audit context active when the tool was
270        /// dispatched (see harn#699). Hosts use it to group every tool
271        /// emission belonging to the same write-capable session.
272        #[serde(default, skip_serializing_if = "Option::is_none")]
273        audit: Option<MutationSessionRecord>,
274    },
    /// Follow-up for a tool call, identified by `tool_call_id`. The
    /// optional fields are omitted from the serialized form when `None`.
275    ToolCallUpdate {
276        session_id: String,
277        tool_call_id: String,
278        tool_name: String,
279        status: ToolCallStatus,
280        raw_output: Option<serde_json::Value>,
281        error: Option<String>,
282        /// Wall-clock milliseconds from the parse-to-execution boundary
283        /// to the terminal `Completed`/`Failed` update. Includes the
284        /// time spent in any wrapping orchestration logic (loop checks,
285        /// post-tool hooks, microcompaction). Populated only on the
286        /// terminal update — `None` on intermediate `Pending` /
287        /// `InProgress` updates so clients can ignore the field until
288        /// it shows up.
289        #[serde(default, skip_serializing_if = "Option::is_none")]
290        duration_ms: Option<u64>,
291        /// Milliseconds spent in the actual host/builtin/MCP dispatch
292        /// call only (the inner `dispatch_tool_execution` window).
293        /// Populated only on the terminal update; `None` otherwise.
294        #[serde(default, skip_serializing_if = "Option::is_none")]
295        execution_duration_ms: Option<u64>,
296        /// Structured classification of the failure (when `status` is
297        /// `Failed`). Paired with `error` so clients can render each
298        /// category distinctly without parsing free-form strings. Always
299        /// `None` for non-Failed updates and serialized as
300        /// `errorCategory` in the ACP wire format.
301        #[serde(default, skip_serializing_if = "Option::is_none")]
302        error_category: Option<ToolCallErrorCategory>,
303        /// Where the tool actually ran. `None` only for events emitted
304        /// from sites that pre-date the dispatch decision (e.g. the
305        /// pending → in-progress transition the loop emits before the
306        /// dispatcher picks a backend).
307        #[serde(default, skip_serializing_if = "Option::is_none")]
308        executor: Option<ToolExecutor>,
309        /// Companion to `ToolCall.parsing` (harn#692). The streaming
310        /// candidate detector emits the *terminal* candidate event as a
311        /// `ToolCallUpdate` with `parsing: Some(false)` to retract the
312        /// in-flight `parsing: true` chip — either by promoting the
313        /// candidate (`status: pending`, with `raw_output: None` and
314        /// `error: None`) or aborting it (`status: failed`,
315        /// `error_category: parse_aborted`). `None` means this update is
316        /// not part of a candidate-phase transition.
317        #[serde(default, skip_serializing_if = "Option::is_none")]
318        parsing: Option<bool>,
319        /// Best-effort partial parse of the streamed tool-call arguments.
320        /// Populated by the SSE transport on `Pending` updates as the
321        /// model streams `input_json_delta` (Anthropic) or
322        /// `tool_calls[].function.arguments` deltas (OpenAI). `None` on
323        /// terminal updates and on emissions from non-streaming paths
324        /// (#693). When the partial bytes are not yet parseable as JSON
325        /// the transport falls back to `raw_input_partial`.
326        #[serde(default, skip_serializing_if = "Option::is_none")]
327        raw_input: Option<serde_json::Value>,
328        /// Raw concatenated bytes of the streamed tool-call arguments
329        /// when a permissive parse failed (#693). Mutually exclusive
330        /// with `raw_input`: clients render whichever is present.
331        #[serde(default, skip_serializing_if = "Option::is_none")]
332        raw_input_partial: Option<String>,
333        /// Mutation-session audit context for the tool call. Carries the
334        /// same payload as on the paired `ToolCall` event so a host
335        /// processing a single update doesn't have to correlate against
336        /// the prior pending event.
337        #[serde(default, skip_serializing_if = "Option::is_none")]
338        audit: Option<MutationSessionRecord>,
339    },
    /// Carries a plan as free-form JSON.
    /// NOTE(review): the plan schema is not visible in this file.
340    Plan {
341        session_id: String,
342        plan: serde_json::Value,
343    },
    /// Marks the start of loop iteration `iteration`.
344    TurnStart {
345        session_id: String,
346        iteration: usize,
347    },
    /// Marks the end of loop iteration `iteration`, with free-form
    /// `turn_info` JSON describing the turn.
348    TurnEnd {
349        session_id: String,
350        iteration: usize,
351        turn_info: serde_json::Value,
352    },
    /// Records a judge verdict for an iteration together with its
    /// reasoning, an optional next step, and the judge's duration in
    /// milliseconds. NOTE(review): the judge is defined elsewhere; the
    /// set of `verdict` strings is not visible here.
353    JudgeDecision {
354        session_id: String,
355        iteration: usize,
356        verdict: String,
357        reasoning: String,
358        next_step: Option<String>,
359        judge_duration_ms: u64,
360    },
    /// Carries a checkpoint as free-form JSON.
    /// NOTE(review): checkpoint schema not visible in this file.
361    TypedCheckpoint {
362        session_id: String,
363        checkpoint: serde_json::Value,
364    },
    /// Feedback of the given `kind` was injected into the session.
365    FeedbackInjected {
366        session_id: String,
367        kind: String,
368        content: String,
369    },
370    /// Emitted when the agent loop exhausts `max_iterations` without any
371    /// explicit break condition firing. Distinct from a natural "done" or
372    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
373    BudgetExhausted {
374        session_id: String,
375        max_iterations: usize,
376    },
377    /// Emitted when the loop breaks because consecutive text-only turns
378    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
379    /// hosts that key off agent-terminal events.
380    LoopStuck {
381        session_id: String,
382        max_nudges: usize,
383        last_iteration: usize,
384        tail_excerpt: String,
385    },
386    /// Emitted when the daemon idle-wait loop trips its watchdog because
387    /// every configured wake source returned `None` for N consecutive
388    /// attempts. Exists so a broken daemon doesn't hang the session
389    /// silently.
390    DaemonWatchdogTripped {
391        session_id: String,
392        attempts: usize,
393        elapsed_ms: u64,
394    },
395    /// Emitted when a skill is activated. Carries the match reason so
396    /// replayers can reconstruct *why* a given skill took effect at
397    /// this iteration.
398    SkillActivated {
399        session_id: String,
400        skill_name: String,
401        iteration: usize,
402        reason: String,
403    },
404    /// Emitted when a previously-active skill is deactivated because
405    /// the reassess phase no longer matches it.
406    SkillDeactivated {
407        session_id: String,
408        skill_name: String,
409        iteration: usize,
410    },
411    /// Emitted once per activation when the skill's `allowed_tools` filter
412    /// narrows the effective tool surface exposed to the model.
413    SkillScopeTools {
414        session_id: String,
415        skill_name: String,
416        allowed_tools: Vec<String>,
417    },
418    /// Emitted when a `tool_search` query is issued by the model. Carries
419    /// the raw query args, the configured strategy, and a `mode` tag
420    /// distinguishing the client-executed fallback (`"client"`) from
421    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
422    /// transcript event shape so hosts can render a search-in-progress
423    /// chip in real time — the replay path walks the transcript after
424    /// the turn, which is too late for live UX.
425    ToolSearchQuery {
426        session_id: String,
427        tool_use_id: String,
428        name: String,
429        query: serde_json::Value,
430        strategy: String,
431        mode: String,
432    },
433    /// Emitted when `tool_search` resolves — carries the list of tool
434    /// names newly promoted into the model's effective surface for the
435    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
436    ToolSearchResult {
437        session_id: String,
438        tool_use_id: String,
439        promoted: Vec<String>,
440        strategy: String,
441        mode: String,
442    },
    /// The transcript was compacted. Reports the mode and strategy used,
    /// how many messages were archived, the estimated token counts before
    /// and after, and an optional snapshot asset id.
443    TranscriptCompacted {
444        session_id: String,
445        mode: String,
446        strategy: String,
447        archived_messages: usize,
448        estimated_tokens_before: usize,
449        estimated_tokens_after: usize,
450        snapshot_asset_id: Option<String>,
451    },
    /// Carries a `HandoffArtifact` together with its artifact id.
    /// NOTE(review): the artifact is boxed, presumably to keep the enum
    /// itself small — confirm.
452    Handoff {
453        session_id: String,
454        artifact_id: String,
455        handoff: Box<HandoffArtifact>,
456    },
    /// A batch of `FsWatchEvent`s for one `fs_watch` subscription.
457    FsWatch {
458        session_id: String,
459        subscription_id: String,
460        events: Vec<FsWatchEvent>,
461    },
462    /// Lifecycle update for a delegated/background worker. Carries the
463    /// canonical typed `event` variant alongside the worker's current
464    /// `status` string and the structured `metadata` payload that
465    /// `worker_bridge_metadata` builds (task, mode, timing, child
466    /// run/snapshot paths, audit-session, etc.). The `audit` field is
467    /// the same `MutationSessionRecord` JSON serialization carried on
468    /// the bridge wire so ACP/A2A consumers don't need to re-derive it.
469    ///
470    /// One-to-one with the bridge-side `worker_update` session-update
471    /// notification: ACP and A2A adapters subscribe to this variant
472    /// and translate it into their respective wire formats. The
473    /// `session_id` is the parent agent session that owns the worker
474    /// (i.e. the session whose VM spawned the worker), so a single
475    /// host stays subscribed to the same sink for both message and
476    /// worker traffic.
477    WorkerUpdate {
478        session_id: String,
479        worker_id: String,
480        worker_name: String,
481        worker_task: String,
482        worker_mode: String,
483        event: WorkerEvent,
484        status: String,
485        metadata: serde_json::Value,
486        audit: Option<serde_json::Value>,
487    },
488    /// A human-in-the-loop primitive (`ask_user`, `request_approval`,
489    /// `dual_control`, `escalate`) has just suspended the script and is
490    /// waiting on a response. Hosts that bridge the VM onto a remote
491    /// transport (ACP, A2A) translate this into a "paused / awaiting
492    /// input" wire signal so the client knows the task isn't stuck —
493    /// it's blocked on the human side. Pair-emitted with `HitlResolved`
494    /// when the waitpoint completes/cancels/times out.
495    HitlRequested {
496        session_id: String,
497        request_id: String,
498        kind: String,
499        payload: serde_json::Value,
500    },
501    /// Companion to `HitlRequested`: the waitpoint has resolved (either
502    /// a response arrived, the deadline elapsed, or the request was
503    /// cancelled). `outcome` is one of `"answered"`, `"timeout"`,
504    /// `"cancelled"`. Hosts use this to flip task state back to
505    /// `working` after an `input-required` pause.
506    HitlResolved {
507        session_id: String,
508        request_id: String,
509        kind: String,
510        outcome: String,
511    },
512}
513
514impl AgentEvent {
515    pub fn session_id(&self) -> &str {
516        match self {
517            Self::AgentMessageChunk { session_id, .. }
518            | Self::AgentThoughtChunk { session_id, .. }
519            | Self::ToolCall { session_id, .. }
520            | Self::ToolCallUpdate { session_id, .. }
521            | Self::Plan { session_id, .. }
522            | Self::TurnStart { session_id, .. }
523            | Self::TurnEnd { session_id, .. }
524            | Self::JudgeDecision { session_id, .. }
525            | Self::TypedCheckpoint { session_id, .. }
526            | Self::FeedbackInjected { session_id, .. }
527            | Self::BudgetExhausted { session_id, .. }
528            | Self::LoopStuck { session_id, .. }
529            | Self::DaemonWatchdogTripped { session_id, .. }
530            | Self::SkillActivated { session_id, .. }
531            | Self::SkillDeactivated { session_id, .. }
532            | Self::SkillScopeTools { session_id, .. }
533            | Self::ToolSearchQuery { session_id, .. }
534            | Self::ToolSearchResult { session_id, .. }
535            | Self::TranscriptCompacted { session_id, .. }
536            | Self::Handoff { session_id, .. }
537            | Self::FsWatch { session_id, .. }
538            | Self::WorkerUpdate { session_id, .. }
539            | Self::HitlRequested { session_id, .. }
540            | Self::HitlResolved { session_id, .. } => session_id,
541        }
542    }
543}
544
545/// External consumers of the event stream (e.g. the harn-cli ACP server,
546/// which translates events into JSON-RPC notifications).
///
/// Implementors must be `Send + Sync`, which is what lets sinks be shared
/// as `Arc<dyn AgentEventSink>`.
547pub trait AgentEventSink: Send + Sync {
    /// Receive one event. The event is only borrowed, so a sink that needs
    /// to keep it must clone it. The method returns nothing, so a sink has
    /// no channel to report a failure back to the caller.
548    fn handle_event(&self, event: &AgentEvent);
549}
550
551/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
552/// `AgentEvent` with monotonic index + timestamp + frame depth so
553/// replay engines can reconstruct paused state at any event index,
554/// and scrubber UIs can bucket events by time. The envelope is the
555/// on-disk shape; the wire format for live consumers is still the
556/// raw `AgentEvent` so existing sinks don't churn.
557#[derive(Clone, Debug, Serialize, Deserialize)]
558pub struct PersistedAgentEvent {
559    /// Monotonic per-session index starting at 0. Unique within a
560    /// session; gaps never happen even under load because the sink
561    /// owns the counter under a mutex.
562    pub index: u64,
563    /// Milliseconds since the Unix epoch, captured when the sink
564    /// received the event. Not the event's emission time — that
565    /// would require threading a clock through every emit site.
    /// `JsonlEventSink` stores 0 here if the system clock reads earlier
    /// than the epoch.
566    pub emitted_at_ms: i64,
567    /// Call-stack depth at the moment of emission, when the caller
568    /// can supply it. `None` for events emitted from a context where
569    /// the VM frame stack isn't available.
    /// `JsonlEventSink::handle_event` always writes `None`.
570    pub frame_depth: Option<u32>,
571    /// The raw event, flattened so `jq '.type'` works as expected.
    /// Flattening puts the event's `type` tag and fields at the same JSON
    /// level as `index` / `emitted_at_ms` / `frame_depth`.
572    #[serde(flatten)]
573    pub event: AgentEvent,
574}
575
576/// Append-only JSONL sink for a single session's event stream (#103).
577/// One writer per session; sinks rotate to a numbered suffix when a
578/// running file crosses `ROTATE_BYTES` (100 MB today — long chat
579/// sessions rarely exceed 5 MB, so rotation almost never fires).
580pub struct JsonlEventSink {
    /// Writer plus counters, behind a mutex so `handle_event(&self)` can
    /// mutate them.
581    state: Mutex<JsonlEventSinkState>,
    /// Path of the first log file; rotated files are named from its stem
    /// and extension.
582    base_path: std::path::PathBuf,
583}
584
/// Mutable state of a `JsonlEventSink`, always accessed through the sink's
/// mutex.
585struct JsonlEventSinkState {
    /// Buffered writer for the file currently being appended to; replaced
    /// on rotation.
586    writer: std::io::BufWriter<std::fs::File>,
    /// Index that the next persisted event will receive (starts at 0).
587    index: u64,
    /// Bytes accounted to the current file; reset to 0 on rotation and
    /// compared against `JsonlEventSink::ROTATE_BYTES`.
588    bytes_written: u64,
    /// Number of rotations so far; also the numeric suffix of the current
    /// rotated file (0 means the base file is still in use).
589    rotation: u32,
590}
591
592impl JsonlEventSink {
593    /// Hard cap past which the current file rotates to a numbered
594    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
595    /// sessions don't produce unreadable multi-GB logs.
596    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
597
598    /// Open a new sink writing to `base_path`. Creates parent dirs
599    /// if missing. Overwrites an existing file so each fresh session
600    /// starts from index 0.
601    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
602        let base_path = base_path.into();
603        if let Some(parent) = base_path.parent() {
604            std::fs::create_dir_all(parent)?;
605        }
606        let file = std::fs::OpenOptions::new()
607            .create(true)
608            .truncate(true)
609            .write(true)
610            .open(&base_path)?;
611        Ok(Arc::new(Self {
612            state: Mutex::new(JsonlEventSinkState {
613                writer: std::io::BufWriter::new(file),
614                index: 0,
615                bytes_written: 0,
616                rotation: 0,
617            }),
618            base_path,
619        }))
620    }
621
622    /// Flush any buffered writes. Called on session shutdown; the
623    /// Drop impl calls this too but on early panic it may not run.
624    pub fn flush(&self) -> std::io::Result<()> {
625        use std::io::Write as _;
626        self.state
627            .lock()
628            .expect("jsonl sink mutex poisoned")
629            .writer
630            .flush()
631    }
632
633    /// Current event index — primarily for tests and the "how many
634    /// events are in this run" run-record summary.
635    pub fn event_count(&self) -> u64 {
636        self.state.lock().expect("jsonl sink mutex poisoned").index
637    }
638
639    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
640        use std::io::Write as _;
641        if state.bytes_written < Self::ROTATE_BYTES {
642            return Ok(());
643        }
644        state.writer.flush()?;
645        state.rotation += 1;
646        let suffix = format!("-{:06}", state.rotation);
647        let rotated = self.base_path.with_file_name({
648            let stem = self
649                .base_path
650                .file_stem()
651                .and_then(|s| s.to_str())
652                .unwrap_or("event_log");
653            let ext = self
654                .base_path
655                .extension()
656                .and_then(|e| e.to_str())
657                .unwrap_or("jsonl");
658            format!("{stem}{suffix}.{ext}")
659        });
660        let file = std::fs::OpenOptions::new()
661            .create(true)
662            .truncate(true)
663            .write(true)
664            .open(&rotated)?;
665        state.writer = std::io::BufWriter::new(file);
666        state.bytes_written = 0;
667        Ok(())
668    }
669}
670
671/// Event-log-backed sink for a single session's agent event stream.
672/// Uses the generalized append-only event log when one is installed for
673/// the current VM thread and falls back to `JsonlEventSink` only for
674/// older env-driven workflows.
675pub struct EventLogSink {
    /// Shared handle to the event log that records are appended to.
676    log: Arc<AnyEventLog>,
    /// Topic for this session: `observability.agent_events.<sanitized
    /// session id>`, built once in `EventLogSink::new`.
677    topic: Topic,
    /// Session id as given to `new` (unsanitized); copied into each
    /// record's payload and headers.
678    session_id: String,
679}
680
681impl EventLogSink {
682    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
683        let session_id = session_id.into();
684        let topic = Topic::new(format!(
685            "observability.agent_events.{}",
686            crate::event_log::sanitize_topic_component(&session_id)
687        ))
688        .expect("session id should sanitize to a valid topic");
689        Arc::new(Self {
690            log,
691            topic,
692            session_id,
693        })
694    }
695}
696
697impl AgentEventSink for JsonlEventSink {
698    fn handle_event(&self, event: &AgentEvent) {
699        use std::io::Write as _;
700        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
701        let index = state.index;
702        state.index += 1;
703        let emitted_at_ms = std::time::SystemTime::now()
704            .duration_since(std::time::UNIX_EPOCH)
705            .map(|d| d.as_millis() as i64)
706            .unwrap_or(0);
707        let envelope = PersistedAgentEvent {
708            index,
709            emitted_at_ms,
710            frame_depth: None,
711            event: event.clone(),
712        };
713        if let Ok(line) = serde_json::to_string(&envelope) {
714            // One line, newline-terminated — JSON Lines spec.
715            // Errors here are swallowed on purpose; a failing write
716            // must never crash the agent loop, and the run record
717            // itself is a secondary artifact.
718            let _ = state.writer.write_all(line.as_bytes());
719            let _ = state.writer.write_all(b"\n");
720            state.bytes_written += line.len() as u64 + 1;
721            let _ = self.rotate_if_needed(&mut state);
722        }
723    }
724}
725
726impl AgentEventSink for EventLogSink {
727    fn handle_event(&self, event: &AgentEvent) {
728        let event_json = match serde_json::to_value(event) {
729            Ok(value) => value,
730            Err(_) => return,
731        };
732        let event_kind = event_json
733            .get("type")
734            .and_then(|value| value.as_str())
735            .unwrap_or("agent_event")
736            .to_string();
737        let payload = serde_json::json!({
738            "index_hint": now_ms(),
739            "session_id": self.session_id,
740            "event": event_json,
741        });
742        let mut headers = std::collections::BTreeMap::new();
743        headers.insert("session_id".to_string(), self.session_id.clone());
744        let log = self.log.clone();
745        let topic = self.topic.clone();
746        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
747        if let Ok(handle) = tokio::runtime::Handle::try_current() {
748            handle.spawn(async move {
749                let _ = log.append(&topic, record).await;
750            });
751        } else {
752            let _ = futures::executor::block_on(log.append(&topic, record));
753        }
754    }
755}
756
757impl Drop for JsonlEventSink {
758    fn drop(&mut self) {
759        if let Ok(mut state) = self.state.lock() {
760            use std::io::Write as _;
761            let _ = state.writer.flush();
762        }
763    }
764}
765
/// Fan-out helper for composing multiple external sinks.
pub struct MultiSink {
    /// Attached sinks in push order; the mutex lets `push` work through `&self`.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
770
771impl MultiSink {
772    pub fn new() -> Self {
773        Self {
774            sinks: Mutex::new(Vec::new()),
775        }
776    }
777    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
778        self.sinks.lock().expect("sink mutex poisoned").push(sink);
779    }
780    pub fn len(&self) -> usize {
781        self.sinks.lock().expect("sink mutex poisoned").len()
782    }
783    pub fn is_empty(&self) -> bool {
784        self.len() == 0
785    }
786}
787
788impl Default for MultiSink {
789    fn default() -> Self {
790        Self::new()
791    }
792}
793
794impl AgentEventSink for MultiSink {
795    fn handle_event(&self, event: &AgentEvent) {
796        // Deliberate: snapshot then release the lock before invoking sink
797        // callbacks. Sinks can re-enter the event system (e.g. a host
798        // sink that logs to another AgentEvent path), so holding the
799        // mutex across the callback would risk self-deadlock. Arc clones
800        // are refcount bumps — cheap.
801        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
802        for sink in sinks {
803            sink.handle_event(event);
804        }
805    }
806}
807
/// Test-build registry entry: a sink together with the id of the thread
/// that registered it. The registry functions in this file compare
/// `owner` against the calling thread so each test thread only sees and
/// removes its own sinks.
#[cfg(test)]
#[derive(Clone)]
struct RegisteredSink {
    /// Thread that was current when the entry was created by `register_sink`.
    owner: std::thread::ThreadId,
    /// The registered sink itself.
    sink: Arc<dyn AgentEventSink>,
}

/// Non-test registry entry: just the shared sink handle.
#[cfg(not(test))]
type RegisteredSink = Arc<dyn AgentEventSink>;
817
/// Map from session id to the external sinks registered for that session,
/// guarded by an `RwLock`.
type ExternalSinkRegistry = RwLock<HashMap<String, Vec<RegisteredSink>>>;
819
820fn external_sinks() -> &'static ExternalSinkRegistry {
821    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
822    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
823}
824
825pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
826    let session_id = session_id.into();
827    let mut reg = external_sinks().write().expect("sink registry poisoned");
828    #[cfg(test)]
829    let sink = RegisteredSink {
830        owner: std::thread::current().id(),
831        sink,
832    };
833    reg.entry(session_id).or_default().push(sink);
834}
835
836/// Remove all external sinks registered for `session_id`. Does NOT
837/// close the session itself — subscribers and transcript survive, so a
838/// later `agent_loop` call with the same id continues the conversation.
839pub fn clear_session_sinks(session_id: &str) {
840    #[cfg(test)]
841    {
842        let owner = std::thread::current().id();
843        let mut reg = external_sinks().write().expect("sink registry poisoned");
844        if let Some(sinks) = reg.get_mut(session_id) {
845            sinks.retain(|sink| sink.owner != owner);
846            if sinks.is_empty() {
847                reg.remove(session_id);
848            }
849        }
850    }
851    #[cfg(not(test))]
852    {
853        external_sinks()
854            .write()
855            .expect("sink registry poisoned")
856            .remove(session_id);
857    }
858}
859
860pub fn reset_all_sinks() {
861    #[cfg(test)]
862    {
863        let owner = std::thread::current().id();
864        let mut reg = external_sinks().write().expect("sink registry poisoned");
865        reg.retain(|_, sinks| {
866            sinks.retain(|sink| sink.owner != owner);
867            !sinks.is_empty()
868        });
869        crate::agent_sessions::reset_session_store();
870    }
871    #[cfg(not(test))]
872    {
873        external_sinks()
874            .write()
875            .expect("sink registry poisoned")
876            .clear();
877        crate::agent_sessions::reset_session_store();
878    }
879}
880
881/// Mirror externally-registered sinks from `source_session_id` onto
882/// `target_session_id` without moving ownership. Transports such as ACP
883/// register sinks on the outer prompt session before a script runs; scripts
884/// may then open a first-class agent transcript and route `agent_loop` events
885/// through that inner id. Mirroring keeps the transport subscribed to the
886/// in-run child transcript while preserving explicit session ids.
887pub fn mirror_session_sinks(source_session_id: &str, target_session_id: &str) {
888    if source_session_id.is_empty() || target_session_id.is_empty() {
889        return;
890    }
891    if source_session_id == target_session_id {
892        return;
893    }
894    let mut reg = external_sinks().write().expect("sink registry poisoned");
895    let Some(source_sinks) = reg.get(source_session_id).cloned() else {
896        return;
897    };
898    let target = reg.entry(target_session_id.to_string()).or_default();
899    #[cfg(test)]
900    {
901        for source in source_sinks {
902            let already_present = target
903                .iter()
904                .any(|existing| Arc::ptr_eq(&existing.sink, &source.sink));
905            if !already_present {
906                target.push(source);
907            }
908        }
909    }
910    #[cfg(not(test))]
911    {
912        for source in source_sinks {
913            let already_present = target.iter().any(|existing| Arc::ptr_eq(existing, &source));
914            if !already_present {
915                target.push(source);
916            }
917        }
918    }
919}
920
921/// Emit an event to external sinks registered for this session. Pipeline
922/// closure subscribers are NOT called by this function — the agent
923/// loop owns that path because it needs its async VM context.
924pub fn emit_event(event: &AgentEvent) {
925    let sinks: Vec<Arc<dyn AgentEventSink>> = {
926        let reg = external_sinks().read().expect("sink registry poisoned");
927        #[cfg(test)]
928        {
929            let owner = std::thread::current().id();
930            reg.get(event.session_id())
931                .map(|sinks| {
932                    sinks
933                        .iter()
934                        .filter(|sink| sink.owner == owner)
935                        .map(|sink| sink.sink.clone())
936                        .collect()
937                })
938                .unwrap_or_default()
939        }
940        #[cfg(not(test))]
941        {
942            reg.get(event.session_id()).cloned().unwrap_or_default()
943        }
944    };
945    for sink in sinks {
946        sink.handle_event(event);
947    }
948}
949
/// Wall-clock time as milliseconds since the Unix epoch, or `0` when the
/// system clock reads earlier than the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
956
957pub fn session_external_sink_count(session_id: &str) -> usize {
958    #[cfg(test)]
959    {
960        let owner = std::thread::current().id();
961        return external_sinks()
962            .read()
963            .expect("sink registry poisoned")
964            .get(session_id)
965            .map(|sinks| sinks.iter().filter(|sink| sink.owner == owner).count())
966            .unwrap_or(0);
967    }
968    #[cfg(not(test))]
969    {
970        external_sinks()
971            .read()
972            .expect("sink registry poisoned")
973            .get(session_id)
974            .map(|v| v.len())
975            .unwrap_or(0)
976    }
977}
978
979pub fn session_closure_subscriber_count(session_id: &str) -> usize {
980    crate::agent_sessions::subscriber_count(session_id)
981}
982
983#[cfg(test)]
984mod tests {
985    use super::*;
986    use std::sync::atomic::{AtomicUsize, Ordering};
987
988    struct CountingSink(Arc<AtomicUsize>);
989    impl AgentEventSink for CountingSink {
990        fn handle_event(&self, _event: &AgentEvent) {
991            self.0.fetch_add(1, Ordering::SeqCst);
992        }
993    }
994
995    #[test]
996    fn multi_sink_fans_out_in_order() {
997        let multi = MultiSink::new();
998        let a = Arc::new(AtomicUsize::new(0));
999        let b = Arc::new(AtomicUsize::new(0));
1000        multi.push(Arc::new(CountingSink(a.clone())));
1001        multi.push(Arc::new(CountingSink(b.clone())));
1002        let event = AgentEvent::TurnStart {
1003            session_id: "s1".into(),
1004            iteration: 1,
1005        };
1006        multi.handle_event(&event);
1007        assert_eq!(a.load(Ordering::SeqCst), 1);
1008        assert_eq!(b.load(Ordering::SeqCst), 1);
1009    }
1010
1011    #[test]
1012    fn session_scoped_sink_routing() {
1013        reset_all_sinks();
1014        let a = Arc::new(AtomicUsize::new(0));
1015        let b = Arc::new(AtomicUsize::new(0));
1016        register_sink("session-a", Arc::new(CountingSink(a.clone())));
1017        register_sink("session-b", Arc::new(CountingSink(b.clone())));
1018        emit_event(&AgentEvent::TurnStart {
1019            session_id: "session-a".into(),
1020            iteration: 0,
1021        });
1022        assert_eq!(a.load(Ordering::SeqCst), 1);
1023        assert_eq!(b.load(Ordering::SeqCst), 0);
1024        emit_event(&AgentEvent::TurnEnd {
1025            session_id: "session-b".into(),
1026            iteration: 0,
1027            turn_info: serde_json::json!({}),
1028        });
1029        assert_eq!(a.load(Ordering::SeqCst), 1);
1030        assert_eq!(b.load(Ordering::SeqCst), 1);
1031        clear_session_sinks("session-a");
1032        assert_eq!(session_external_sink_count("session-a"), 0);
1033        assert_eq!(session_external_sink_count("session-b"), 1);
1034        reset_all_sinks();
1035    }
1036
1037    #[test]
1038    fn newly_opened_child_session_inherits_current_external_sinks() {
1039        reset_all_sinks();
1040        let delivered = Arc::new(AtomicUsize::new(0));
1041        register_sink("outer-session", Arc::new(CountingSink(delivered.clone())));
1042        {
1043            let _guard = crate::agent_sessions::enter_current_session("outer-session");
1044            let inner = crate::agent_sessions::open_or_create(None);
1045            assert_ne!(inner, "outer-session");
1046            emit_event(&AgentEvent::TurnStart {
1047                session_id: inner,
1048                iteration: 0,
1049            });
1050        }
1051        assert_eq!(delivered.load(Ordering::SeqCst), 1);
1052        reset_all_sinks();
1053    }
1054
1055    #[test]
1056    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
1057        use std::io::{BufRead, BufReader};
1058        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
1059        std::fs::create_dir_all(&dir).unwrap();
1060        let path = dir.join("event_log.jsonl");
1061        let sink = JsonlEventSink::open(&path).unwrap();
1062        for i in 0..5 {
1063            sink.handle_event(&AgentEvent::TurnStart {
1064                session_id: "s".into(),
1065                iteration: i,
1066            });
1067        }
1068        assert_eq!(sink.event_count(), 5);
1069        sink.flush().unwrap();
1070
1071        // Read back + assert monotonic indices + non-decreasing timestamps.
1072        let file = std::fs::File::open(&path).unwrap();
1073        let mut last_idx: i64 = -1;
1074        let mut last_ts: i64 = 0;
1075        for line in BufReader::new(file).lines() {
1076            let line = line.unwrap();
1077            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
1078            let idx = val["index"].as_i64().unwrap();
1079            let ts = val["emitted_at_ms"].as_i64().unwrap();
1080            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
1081            assert!(ts >= last_ts, "timestamps must be non-decreasing");
1082            last_idx = idx;
1083            last_ts = ts;
1084            // Event payload flattened — type tag must survive.
1085            assert_eq!(val["type"], "turn_start");
1086        }
1087        assert_eq!(last_idx, 4);
1088        let _ = std::fs::remove_file(&path);
1089    }
1090
1091    #[test]
1092    fn judge_decision_round_trips_through_jsonl_sink() {
1093        use std::io::{BufRead, BufReader};
1094        let dir =
1095            std::env::temp_dir().join(format!("harn-judge-event-log-{}", uuid::Uuid::now_v7()));
1096        std::fs::create_dir_all(&dir).unwrap();
1097        let path = dir.join("event_log.jsonl");
1098        let sink = JsonlEventSink::open(&path).unwrap();
1099        sink.handle_event(&AgentEvent::JudgeDecision {
1100            session_id: "s".into(),
1101            iteration: 2,
1102            verdict: "continue".into(),
1103            reasoning: "needs a concrete next step".into(),
1104            next_step: Some("run the verifier".into()),
1105            judge_duration_ms: 17,
1106        });
1107        sink.flush().unwrap();
1108
1109        let file = std::fs::File::open(&path).unwrap();
1110        let line = BufReader::new(file).lines().next().unwrap().unwrap();
1111        let recovered: PersistedAgentEvent = serde_json::from_str(&line).unwrap();
1112        match recovered.event {
1113            AgentEvent::JudgeDecision {
1114                session_id,
1115                iteration,
1116                verdict,
1117                reasoning,
1118                next_step,
1119                judge_duration_ms,
1120            } => {
1121                assert_eq!(session_id, "s");
1122                assert_eq!(iteration, 2);
1123                assert_eq!(verdict, "continue");
1124                assert_eq!(reasoning, "needs a concrete next step");
1125                assert_eq!(next_step.as_deref(), Some("run the verifier"));
1126                assert_eq!(judge_duration_ms, 17);
1127            }
1128            other => panic!("expected JudgeDecision, got {other:?}"),
1129        }
1130        let value: serde_json::Value = serde_json::from_str(&line).unwrap();
1131        assert_eq!(value["type"], "judge_decision");
1132        let _ = std::fs::remove_file(&path);
1133        let _ = std::fs::remove_dir(&dir);
1134    }
1135
1136    #[test]
1137    fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
1138        // Terminal update with both durations populated — both fields
1139        // appear in the JSON. Snake_case keys here because this is the
1140        // canonical AgentEvent shape; the ACP adapter renames to
1141        // camelCase separately.
1142        let terminal = AgentEvent::ToolCallUpdate {
1143            session_id: "s".into(),
1144            tool_call_id: "tc-1".into(),
1145            tool_name: "read".into(),
1146            status: ToolCallStatus::Completed,
1147            raw_output: None,
1148            error: None,
1149            duration_ms: Some(42),
1150            execution_duration_ms: Some(7),
1151            error_category: None,
1152            executor: None,
1153            parsing: None,
1154
1155            raw_input: None,
1156            raw_input_partial: None,
1157            audit: None,
1158        };
1159        let value = serde_json::to_value(&terminal).unwrap();
1160        assert_eq!(value["duration_ms"], serde_json::json!(42));
1161        assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
1162
1163        // In-progress update with `None` for both — both keys must be
1164        // absent (not `null`) so older ACP clients that key off
1165        // presence don't see a misleading zero.
1166        let intermediate = AgentEvent::ToolCallUpdate {
1167            session_id: "s".into(),
1168            tool_call_id: "tc-1".into(),
1169            tool_name: "read".into(),
1170            status: ToolCallStatus::InProgress,
1171            raw_output: None,
1172            error: None,
1173            duration_ms: None,
1174            execution_duration_ms: None,
1175            error_category: None,
1176            executor: None,
1177            parsing: None,
1178
1179            raw_input: None,
1180            raw_input_partial: None,
1181            audit: None,
1182        };
1183        let value = serde_json::to_value(&intermediate).unwrap();
1184        let object = value.as_object().expect("update serializes as object");
1185        assert!(
1186            !object.contains_key("duration_ms"),
1187            "duration_ms must be omitted when None: {value}"
1188        );
1189        assert!(
1190            !object.contains_key("execution_duration_ms"),
1191            "execution_duration_ms must be omitted when None: {value}"
1192        );
1193    }
1194
1195    #[test]
1196    fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
1197        // Persisted event-log entries written before the fields existed
1198        // must still deserialize cleanly. The missing keys map to None.
1199        let raw = serde_json::json!({
1200            "type": "tool_call_update",
1201            "session_id": "s",
1202            "tool_call_id": "tc-1",
1203            "tool_name": "read",
1204            "status": "completed",
1205            "raw_output": null,
1206            "error": null,
1207        });
1208        let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
1209        match event {
1210            AgentEvent::ToolCallUpdate {
1211                duration_ms,
1212                execution_duration_ms,
1213                ..
1214            } => {
1215                assert!(duration_ms.is_none());
1216                assert!(execution_duration_ms.is_none());
1217            }
1218            other => panic!("expected ToolCallUpdate, got {other:?}"),
1219        }
1220    }
1221
1222    #[test]
1223    fn tool_call_status_serde() {
1224        assert_eq!(
1225            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
1226            "\"pending\""
1227        );
1228        assert_eq!(
1229            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
1230            "\"in_progress\""
1231        );
1232        assert_eq!(
1233            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
1234            "\"completed\""
1235        );
1236        assert_eq!(
1237            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
1238            "\"failed\""
1239        );
1240    }
1241
1242    #[test]
1243    fn tool_call_error_category_serializes_as_snake_case() {
1244        let pairs = [
1245            (ToolCallErrorCategory::SchemaValidation, "schema_validation"),
1246            (ToolCallErrorCategory::ToolError, "tool_error"),
1247            (ToolCallErrorCategory::McpServerError, "mcp_server_error"),
1248            (ToolCallErrorCategory::HostBridgeError, "host_bridge_error"),
1249            (ToolCallErrorCategory::PermissionDenied, "permission_denied"),
1250            (ToolCallErrorCategory::RejectedLoop, "rejected_loop"),
1251            (ToolCallErrorCategory::ParseAborted, "parse_aborted"),
1252            (ToolCallErrorCategory::Timeout, "timeout"),
1253            (ToolCallErrorCategory::Network, "network"),
1254            (ToolCallErrorCategory::Cancelled, "cancelled"),
1255            (ToolCallErrorCategory::Unknown, "unknown"),
1256        ];
1257        for (variant, wire) in pairs {
1258            let encoded = serde_json::to_string(&variant).unwrap();
1259            assert_eq!(encoded, format!("\"{wire}\""));
1260            assert_eq!(variant.as_str(), wire);
1261            // Round-trip via deserialize so wire stability is enforced
1262            // both ways.
1263            let decoded: ToolCallErrorCategory = serde_json::from_str(&encoded).unwrap();
1264            assert_eq!(decoded, variant);
1265        }
1266    }
1267
1268    #[test]
1269    fn tool_executor_round_trips_with_adjacent_tag() {
1270        // Adjacent tagging keeps the wire shape uniform — every variant
1271        // is a JSON object with a `kind` discriminator. The ACP adapter
1272        // rewrites unit variants as bare strings; the on-disk event log
1273        // keeps the object shape so deserialize can recover the variant.
1274        for executor in [
1275            ToolExecutor::HarnBuiltin,
1276            ToolExecutor::HostBridge,
1277            ToolExecutor::McpServer {
1278                server_name: "linear".to_string(),
1279            },
1280            ToolExecutor::ProviderNative,
1281        ] {
1282            let json = serde_json::to_value(&executor).unwrap();
1283            let kind = json.get("kind").and_then(|v| v.as_str()).unwrap();
1284            match &executor {
1285                ToolExecutor::HarnBuiltin => assert_eq!(kind, "harn_builtin"),
1286                ToolExecutor::HostBridge => assert_eq!(kind, "host_bridge"),
1287                ToolExecutor::McpServer { server_name } => {
1288                    assert_eq!(kind, "mcp_server");
1289                    assert_eq!(json["server_name"], *server_name);
1290                }
1291                ToolExecutor::ProviderNative => assert_eq!(kind, "provider_native"),
1292            }
1293            let recovered: ToolExecutor = serde_json::from_value(json).unwrap();
1294            assert_eq!(recovered, executor);
1295        }
1296    }
1297
1298    #[test]
1299    fn tool_call_error_category_from_internal_collapses_transient_family() {
1300        use crate::value::ErrorCategory as Internal;
1301        assert_eq!(
1302            ToolCallErrorCategory::from_internal(&Internal::Timeout),
1303            ToolCallErrorCategory::Timeout
1304        );
1305        for net in [
1306            Internal::RateLimit,
1307            Internal::Overloaded,
1308            Internal::ServerError,
1309            Internal::TransientNetwork,
1310        ] {
1311            assert_eq!(
1312                ToolCallErrorCategory::from_internal(&net),
1313                ToolCallErrorCategory::Network,
1314                "{net:?} should map to Network",
1315            );
1316        }
1317        assert_eq!(
1318            ToolCallErrorCategory::from_internal(&Internal::SchemaValidation),
1319            ToolCallErrorCategory::SchemaValidation
1320        );
1321        assert_eq!(
1322            ToolCallErrorCategory::from_internal(&Internal::ToolError),
1323            ToolCallErrorCategory::ToolError
1324        );
1325        assert_eq!(
1326            ToolCallErrorCategory::from_internal(&Internal::ToolRejected),
1327            ToolCallErrorCategory::PermissionDenied
1328        );
1329        assert_eq!(
1330            ToolCallErrorCategory::from_internal(&Internal::Cancelled),
1331            ToolCallErrorCategory::Cancelled
1332        );
1333        for bridge in [
1334            Internal::Auth,
1335            Internal::EgressBlocked,
1336            Internal::NotFound,
1337            Internal::CircuitOpen,
1338            Internal::Generic,
1339        ] {
1340            assert_eq!(
1341                ToolCallErrorCategory::from_internal(&bridge),
1342                ToolCallErrorCategory::HostBridgeError,
1343                "{bridge:?} should map to HostBridgeError",
1344            );
1345        }
1346    }
1347
1348    #[test]
1349    fn tool_call_update_event_omits_error_category_when_none() {
1350        let event = AgentEvent::ToolCallUpdate {
1351            session_id: "s".into(),
1352            tool_call_id: "t".into(),
1353            tool_name: "read".into(),
1354            status: ToolCallStatus::Completed,
1355            raw_output: None,
1356            error: None,
1357            duration_ms: None,
1358            execution_duration_ms: None,
1359            error_category: None,
1360            executor: None,
1361            parsing: None,
1362
1363            raw_input: None,
1364            raw_input_partial: None,
1365            audit: None,
1366        };
1367        let v = serde_json::to_value(&event).unwrap();
1368        assert_eq!(v["type"], "tool_call_update");
1369        assert!(v.get("error_category").is_none());
1370    }
1371
1372    #[test]
1373    fn tool_call_update_event_serializes_error_category_when_set() {
1374        let event = AgentEvent::ToolCallUpdate {
1375            session_id: "s".into(),
1376            tool_call_id: "t".into(),
1377            tool_name: "read".into(),
1378            status: ToolCallStatus::Failed,
1379            raw_output: None,
1380            error: Some("missing required field".into()),
1381            duration_ms: None,
1382            execution_duration_ms: None,
1383            error_category: Some(ToolCallErrorCategory::SchemaValidation),
1384            executor: None,
1385            parsing: None,
1386
1387            raw_input: None,
1388            raw_input_partial: None,
1389            audit: None,
1390        };
1391        let v = serde_json::to_value(&event).unwrap();
1392        assert_eq!(v["error_category"], "schema_validation");
1393        assert_eq!(v["error"], "missing required field");
1394    }
1395
1396    #[test]
1397    fn tool_call_update_omits_executor_when_absent() {
1398        // `executor: None` must not appear in the serialized event so
1399        // the on-disk shape stays backward-compatible with replays
1400        // recorded before harn#691.
1401        let event = AgentEvent::ToolCallUpdate {
1402            session_id: "s".into(),
1403            tool_call_id: "tc-1".into(),
1404            tool_name: "read".into(),
1405            status: ToolCallStatus::Completed,
1406            raw_output: None,
1407            error: None,
1408            duration_ms: None,
1409            execution_duration_ms: None,
1410            error_category: None,
1411            executor: None,
1412            parsing: None,
1413
1414            raw_input: None,
1415            raw_input_partial: None,
1416            audit: None,
1417        };
1418        let json = serde_json::to_value(&event).unwrap();
1419        assert!(json.get("executor").is_none(), "got: {json}");
1420    }
1421
1422    #[test]
1423    fn worker_event_status_strings_cover_all_variants() {
1424        // Wire-level status strings flow into both bridge `worker_update`
1425        // payloads and ACP `session/update` notifications. Pinning every
1426        // variant here so a future addition can't silently land without
1427        // a docs/wire decision.
1428        assert_eq!(WorkerEvent::WorkerSpawned.as_status(), "running");
1429        assert_eq!(WorkerEvent::WorkerProgressed.as_status(), "progressed");
1430        assert_eq!(
1431            WorkerEvent::WorkerWaitingForInput.as_status(),
1432            "awaiting_input"
1433        );
1434        assert_eq!(WorkerEvent::WorkerCompleted.as_status(), "completed");
1435        assert_eq!(WorkerEvent::WorkerFailed.as_status(), "failed");
1436        assert_eq!(WorkerEvent::WorkerCancelled.as_status(), "cancelled");
1437
1438        for terminal in [
1439            WorkerEvent::WorkerCompleted,
1440            WorkerEvent::WorkerFailed,
1441            WorkerEvent::WorkerCancelled,
1442        ] {
1443            assert!(terminal.is_terminal(), "{terminal:?} should be terminal");
1444        }
1445        for non_terminal in [
1446            WorkerEvent::WorkerSpawned,
1447            WorkerEvent::WorkerProgressed,
1448            WorkerEvent::WorkerWaitingForInput,
1449        ] {
1450            assert!(
1451                !non_terminal.is_terminal(),
1452                "{non_terminal:?} should not be terminal"
1453            );
1454        }
1455    }
1456
1457    #[test]
1458    fn worker_update_event_routes_through_session_keyed_sink() {
1459        // Worker lifecycle events ride the same session-keyed
1460        // `AgentEventSink` registry as message and tool events. This
1461        // is the canonical path ACP and A2A subscribe to — gate it
1462        // here so a registry-routing regression breaks loudly.
1463        reset_all_sinks();
1464        let captured: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
1465        struct CapturingSink(Arc<Mutex<Vec<AgentEvent>>>);
1466        impl AgentEventSink for CapturingSink {
1467            fn handle_event(&self, event: &AgentEvent) {
1468                self.0
1469                    .lock()
1470                    .expect("captured sink mutex poisoned")
1471                    .push(event.clone());
1472            }
1473        }
1474        register_sink(
1475            "worker-session-1",
1476            Arc::new(CapturingSink(captured.clone())),
1477        );
1478        emit_event(&AgentEvent::WorkerUpdate {
1479            session_id: "worker-session-1".into(),
1480            worker_id: "worker_42".into(),
1481            worker_name: "review_captain".into(),
1482            worker_task: "review pr".into(),
1483            worker_mode: "delegated_stage".into(),
1484            event: WorkerEvent::WorkerWaitingForInput,
1485            status: WorkerEvent::WorkerWaitingForInput.as_status().to_string(),
1486            metadata: serde_json::json!({"awaiting_started_at": "0193..."}),
1487            audit: None,
1488        });
1489        // Other sessions don't receive cross-talk.
1490        emit_event(&AgentEvent::WorkerUpdate {
1491            session_id: "other-session".into(),
1492            worker_id: "w2".into(),
1493            worker_name: "n2".into(),
1494            worker_task: "t2".into(),
1495            worker_mode: "delegated_stage".into(),
1496            event: WorkerEvent::WorkerCompleted,
1497            status: "completed".into(),
1498            metadata: serde_json::json!({}),
1499            audit: None,
1500        });
1501        let received = captured.lock().unwrap().clone();
1502        assert_eq!(received.len(), 1, "got: {received:?}");
1503        match &received[0] {
1504            AgentEvent::WorkerUpdate {
1505                session_id,
1506                worker_id,
1507                event,
1508                status,
1509                ..
1510            } => {
1511                assert_eq!(session_id, "worker-session-1");
1512                assert_eq!(worker_id, "worker_42");
1513                assert_eq!(*event, WorkerEvent::WorkerWaitingForInput);
1514                assert_eq!(status, "awaiting_input");
1515            }
1516            other => panic!("expected WorkerUpdate, got {other:?}"),
1517        }
1518        reset_all_sinks();
1519    }
1520
1521    #[test]
1522    fn worker_update_event_serializes_to_canonical_shape() {
1523        // Persisted event-log entries flatten the AgentEvent envelope,
1524        // so the WorkerUpdate variant must serialize with a `type` of
1525        // `worker_update` and the worker fields directly on the
1526        // top-level object (matching the `#[serde(tag = "type", ...)]`
1527        // shape the rest of AgentEvent uses).
1528        let event = AgentEvent::WorkerUpdate {
1529            session_id: "s".into(),
1530            worker_id: "w".into(),
1531            worker_name: "n".into(),
1532            worker_task: "t".into(),
1533            worker_mode: "delegated_stage".into(),
1534            event: WorkerEvent::WorkerProgressed,
1535            status: "progressed".into(),
1536            metadata: serde_json::json!({"started_at": "0193..."}),
1537            audit: Some(serde_json::json!({"run_id": "run_x"})),
1538        };
1539        let value = serde_json::to_value(&event).unwrap();
1540        assert_eq!(value["type"], "worker_update");
1541        assert_eq!(value["session_id"], "s");
1542        assert_eq!(value["worker_id"], "w");
1543        assert_eq!(value["status"], "progressed");
1544        assert_eq!(value["audit"]["run_id"], "run_x");
1545
1546        // Round-trip: the persisted event log must deserialize the
1547        // canonical shape back into the typed variant so replay
1548        // tooling can re-derive lifecycle state offline.
1549        let recovered: AgentEvent = serde_json::from_value(value).unwrap();
1550        match recovered {
1551            AgentEvent::WorkerUpdate {
1552                event: recovered_event,
1553                ..
1554            } => assert_eq!(recovered_event, WorkerEvent::WorkerProgressed),
1555            other => panic!("expected WorkerUpdate, got {other:?}"),
1556        }
1557    }
1558
1559    #[test]
1560    fn tool_call_update_includes_executor_when_present() {
1561        let event = AgentEvent::ToolCallUpdate {
1562            session_id: "s".into(),
1563            tool_call_id: "tc-1".into(),
1564            tool_name: "read".into(),
1565            status: ToolCallStatus::Completed,
1566            raw_output: None,
1567            error: None,
1568            duration_ms: None,
1569            execution_duration_ms: None,
1570            error_category: None,
1571            executor: Some(ToolExecutor::McpServer {
1572                server_name: "github".into(),
1573            }),
1574            parsing: None,
1575
1576            raw_input: None,
1577            raw_input_partial: None,
1578            audit: None,
1579        };
1580        let json = serde_json::to_value(&event).unwrap();
1581        assert_eq!(json["executor"]["kind"], "mcp_server");
1582        assert_eq!(json["executor"]["server_name"], "github");
1583    }
1584
1585    #[test]
1586    fn tool_call_update_omits_audit_when_absent() {
1587        let event = AgentEvent::ToolCallUpdate {
1588            session_id: "s".into(),
1589            tool_call_id: "tc-1".into(),
1590            tool_name: "read".into(),
1591            status: ToolCallStatus::Completed,
1592            raw_output: None,
1593            error: None,
1594            duration_ms: None,
1595            execution_duration_ms: None,
1596            error_category: None,
1597            executor: None,
1598            parsing: None,
1599            raw_input: None,
1600            raw_input_partial: None,
1601            audit: None,
1602        };
1603        let json = serde_json::to_value(&event).unwrap();
1604        assert!(json.get("audit").is_none(), "got: {json}");
1605    }
1606
1607    #[test]
1608    fn tool_call_update_includes_audit_when_present() {
1609        let audit = MutationSessionRecord {
1610            session_id: "session_42".into(),
1611            run_id: Some("run_42".into()),
1612            mutation_scope: "apply_workspace".into(),
1613            execution_kind: Some("worker".into()),
1614            ..Default::default()
1615        };
1616        let event = AgentEvent::ToolCallUpdate {
1617            session_id: "s".into(),
1618            tool_call_id: "tc-1".into(),
1619            tool_name: "edit_file".into(),
1620            status: ToolCallStatus::Completed,
1621            raw_output: None,
1622            error: None,
1623            duration_ms: None,
1624            execution_duration_ms: None,
1625            error_category: None,
1626            executor: Some(ToolExecutor::HostBridge),
1627            parsing: None,
1628            raw_input: None,
1629            raw_input_partial: None,
1630            audit: Some(audit),
1631        };
1632        let json = serde_json::to_value(&event).unwrap();
1633        assert_eq!(json["audit"]["session_id"], "session_42");
1634        assert_eq!(json["audit"]["run_id"], "run_42");
1635        assert_eq!(json["audit"]["mutation_scope"], "apply_workspace");
1636        assert_eq!(json["audit"]["execution_kind"], "worker");
1637    }
1638
1639    #[test]
1640    fn tool_call_update_deserializes_without_audit_field_for_back_compat() {
1641        let raw = serde_json::json!({
1642            "type": "tool_call_update",
1643            "session_id": "s",
1644            "tool_call_id": "tc-1",
1645            "tool_name": "read",
1646            "status": "completed",
1647            "raw_output": null,
1648            "error": null,
1649        });
1650        let event: AgentEvent = serde_json::from_value(raw).expect("parses without audit key");
1651        match event {
1652            AgentEvent::ToolCallUpdate { audit, .. } => {
1653                assert!(audit.is_none());
1654            }
1655            other => panic!("expected ToolCallUpdate, got {other:?}"),
1656        }
1657    }
1658}