Skip to main content

harn_vm/
agent_events.rs

1//! Agent event stream — the ACP-aligned observation surface for the
2//! agent loop.
3//!
4//! Every phase of the turn loop emits an `AgentEvent`. The canonical
5//! variants map 1:1 onto ACP `SessionUpdate` values; three internal
6//! variants (`TurnStart`, `TurnEnd`, `FeedbackInjected`) let pipelines
7//! react to loop milestones that don't have a direct ACP counterpart.
8//!
9//! There are two subscription paths, both keyed on session id so two
10//! concurrent sessions never cross-talk:
11//!
12//! 1. **External sinks** (`AgentEventSink` trait) — Rust-side consumers
13//!    like the harn-cli ACP server. Invoked synchronously by the loop.
14//!    Stored in a global `OnceLock<RwLock<HashMap<...>>>` here.
15//! 2. **Closure subscribers** — `.harn` closures registered via the
16//!    `agent_subscribe(session_id, callback)` host builtin. These live
17//!    on the session's `SessionState.subscribers` in
18//!    `crate::agent_sessions`, because sessions are the single source
19//!    of truth for session-scoped VM state.
20
21use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
27use crate::orchestration::HandoffArtifact;
28use crate::tool_annotations::ToolKind;
29
30/// One coalesced filesystem notification from a hostlib `fs_watch`
31/// subscription.
32#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct FsWatchEvent {
34    pub kind: String,
35    pub paths: Vec<String>,
36    pub relative_paths: Vec<String>,
37    pub raw_kind: String,
38    pub error: Option<String>,
39}
40
41/// Typed worker lifecycle events emitted by delegated/background agent
42/// execution. Bridge-facing worker updates still derive a string status
43/// from these variants, but the runtime no longer passes raw status
44/// strings around internally.
45#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
46pub enum WorkerEvent {
47    WorkerSpawned,
48    WorkerCompleted,
49    WorkerFailed,
50    WorkerCancelled,
51}
52
53impl WorkerEvent {
54    pub fn as_status(self) -> &'static str {
55        match self {
56            Self::WorkerSpawned => "running",
57            Self::WorkerCompleted => "completed",
58            Self::WorkerFailed => "failed",
59            Self::WorkerCancelled => "cancelled",
60        }
61    }
62
63    pub fn as_str(self) -> &'static str {
64        match self {
65            Self::WorkerSpawned => "WorkerSpawned",
66            Self::WorkerCompleted => "WorkerCompleted",
67            Self::WorkerFailed => "WorkerFailed",
68            Self::WorkerCancelled => "WorkerCancelled",
69        }
70    }
71}
72
73/// Status of a tool call. Mirrors ACP's `toolCallStatus`.
74#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
75#[serde(rename_all = "snake_case")]
76pub enum ToolCallStatus {
77    /// Dispatched by the model but not yet started.
78    Pending,
79    /// Dispatch is actively running.
80    InProgress,
81    /// Finished successfully.
82    Completed,
83    /// Finished with an error.
84    Failed,
85}
86
87/// Events emitted by the agent loop. The first five variants map 1:1
88/// to ACP `sessionUpdate` variants; the last three are harn-internal.
89#[derive(Clone, Debug, Serialize, Deserialize)]
90#[serde(tag = "type", rename_all = "snake_case")]
91pub enum AgentEvent {
92    AgentMessageChunk {
93        session_id: String,
94        content: String,
95    },
96    AgentThoughtChunk {
97        session_id: String,
98        content: String,
99    },
100    ToolCall {
101        session_id: String,
102        tool_call_id: String,
103        tool_name: String,
104        kind: Option<ToolKind>,
105        status: ToolCallStatus,
106        raw_input: serde_json::Value,
107    },
108    ToolCallUpdate {
109        session_id: String,
110        tool_call_id: String,
111        tool_name: String,
112        status: ToolCallStatus,
113        raw_output: Option<serde_json::Value>,
114        error: Option<String>,
115        /// Wall-clock milliseconds from the parse-to-execution boundary
116        /// to the terminal `Completed`/`Failed` update. Includes the
117        /// time spent in any wrapping orchestration logic (loop checks,
118        /// post-tool hooks, microcompaction). Populated only on the
119        /// terminal update — `None` on intermediate `Pending` /
120        /// `InProgress` updates so clients can ignore the field until
121        /// it shows up.
122        #[serde(default, skip_serializing_if = "Option::is_none")]
123        duration_ms: Option<u64>,
124        /// Milliseconds spent in the actual host/builtin/MCP dispatch
125        /// call only (the inner `dispatch_tool_execution` window).
126        /// Populated only on the terminal update; `None` otherwise.
127        #[serde(default, skip_serializing_if = "Option::is_none")]
128        execution_duration_ms: Option<u64>,
129    },
130    Plan {
131        session_id: String,
132        plan: serde_json::Value,
133    },
134    TurnStart {
135        session_id: String,
136        iteration: usize,
137    },
138    TurnEnd {
139        session_id: String,
140        iteration: usize,
141        turn_info: serde_json::Value,
142    },
143    FeedbackInjected {
144        session_id: String,
145        kind: String,
146        content: String,
147    },
148    /// Emitted when the agent loop exhausts `max_iterations` without any
149    /// explicit break condition firing. Distinct from a natural "done" or
150    /// a "stuck" nudge-exhaustion: this is strictly a budget cap.
151    BudgetExhausted {
152        session_id: String,
153        max_iterations: usize,
154    },
155    /// Emitted when the loop breaks because consecutive text-only turns
156    /// hit `max_nudges`. Parity with `BudgetExhausted` / `TurnEnd` for
157    /// hosts that key off agent-terminal events.
158    LoopStuck {
159        session_id: String,
160        max_nudges: usize,
161        last_iteration: usize,
162        tail_excerpt: String,
163    },
164    /// Emitted when the daemon idle-wait loop trips its watchdog because
165    /// every configured wake source returned `None` for N consecutive
166    /// attempts. Exists so a broken daemon doesn't hang the session
167    /// silently.
168    DaemonWatchdogTripped {
169        session_id: String,
170        attempts: usize,
171        elapsed_ms: u64,
172    },
173    /// Emitted when a skill is activated. Carries the match reason so
174    /// replayers can reconstruct *why* a given skill took effect at
175    /// this iteration.
176    SkillActivated {
177        session_id: String,
178        skill_name: String,
179        iteration: usize,
180        reason: String,
181    },
182    /// Emitted when a previously-active skill is deactivated because
183    /// the reassess phase no longer matches it.
184    SkillDeactivated {
185        session_id: String,
186        skill_name: String,
187        iteration: usize,
188    },
189    /// Emitted once per activation when the skill's `allowed_tools` filter
190    /// narrows the effective tool surface exposed to the model.
191    SkillScopeTools {
192        session_id: String,
193        skill_name: String,
194        allowed_tools: Vec<String>,
195    },
196    /// Emitted when a `tool_search` query is issued by the model. Carries
197    /// the raw query args, the configured strategy, and a `mode` tag
198    /// distinguishing the client-executed fallback (`"client"`) from
199    /// provider-native paths (`"anthropic"` / `"openai"`). Mirrors the
200    /// transcript event shape so hosts can render a search-in-progress
201    /// chip in real time — the replay path walks the transcript after
202    /// the turn, which is too late for live UX.
203    ToolSearchQuery {
204        session_id: String,
205        tool_use_id: String,
206        name: String,
207        query: serde_json::Value,
208        strategy: String,
209        mode: String,
210    },
211    /// Emitted when `tool_search` resolves — carries the list of tool
212    /// names newly promoted into the model's effective surface for the
213    /// next turn. Pair-emitted with `ToolSearchQuery` on every search.
214    ToolSearchResult {
215        session_id: String,
216        tool_use_id: String,
217        promoted: Vec<String>,
218        strategy: String,
219        mode: String,
220    },
221    TranscriptCompacted {
222        session_id: String,
223        mode: String,
224        strategy: String,
225        archived_messages: usize,
226        estimated_tokens_before: usize,
227        estimated_tokens_after: usize,
228        snapshot_asset_id: Option<String>,
229    },
230    Handoff {
231        session_id: String,
232        artifact_id: String,
233        handoff: Box<HandoffArtifact>,
234    },
235    FsWatch {
236        session_id: String,
237        subscription_id: String,
238        events: Vec<FsWatchEvent>,
239    },
240}
241
242impl AgentEvent {
243    pub fn session_id(&self) -> &str {
244        match self {
245            Self::AgentMessageChunk { session_id, .. }
246            | Self::AgentThoughtChunk { session_id, .. }
247            | Self::ToolCall { session_id, .. }
248            | Self::ToolCallUpdate { session_id, .. }
249            | Self::Plan { session_id, .. }
250            | Self::TurnStart { session_id, .. }
251            | Self::TurnEnd { session_id, .. }
252            | Self::FeedbackInjected { session_id, .. }
253            | Self::BudgetExhausted { session_id, .. }
254            | Self::LoopStuck { session_id, .. }
255            | Self::DaemonWatchdogTripped { session_id, .. }
256            | Self::SkillActivated { session_id, .. }
257            | Self::SkillDeactivated { session_id, .. }
258            | Self::SkillScopeTools { session_id, .. }
259            | Self::ToolSearchQuery { session_id, .. }
260            | Self::ToolSearchResult { session_id, .. }
261            | Self::TranscriptCompacted { session_id, .. }
262            | Self::Handoff { session_id, .. }
263            | Self::FsWatch { session_id, .. } => session_id,
264        }
265    }
266}
267
268/// External consumers of the event stream (e.g. the harn-cli ACP server,
269/// which translates events into JSON-RPC notifications).
270pub trait AgentEventSink: Send + Sync {
271    fn handle_event(&self, event: &AgentEvent);
272}
273
274/// Envelope written to `event_log.jsonl` (#103). Wraps the raw
275/// `AgentEvent` with monotonic index + timestamp + frame depth so
276/// replay engines can reconstruct paused state at any event index,
277/// and scrubber UIs can bucket events by time. The envelope is the
278/// on-disk shape; the wire format for live consumers is still the
279/// raw `AgentEvent` so existing sinks don't churn.
280#[derive(Clone, Debug, Serialize, Deserialize)]
281pub struct PersistedAgentEvent {
282    /// Monotonic per-session index starting at 0. Unique within a
283    /// session; gaps never happen even under load because the sink
284    /// owns the counter under a mutex.
285    pub index: u64,
286    /// Milliseconds since the Unix epoch, captured when the sink
287    /// received the event. Not the event's emission time — that
288    /// would require threading a clock through every emit site.
289    pub emitted_at_ms: i64,
290    /// Call-stack depth at the moment of emission, when the caller
291    /// can supply it. `None` for events emitted from a context where
292    /// the VM frame stack isn't available.
293    pub frame_depth: Option<u32>,
294    /// The raw event, flattened so `jq '.type'` works as expected.
295    #[serde(flatten)]
296    pub event: AgentEvent,
297}
298
299/// Append-only JSONL sink for a single session's event stream (#103).
300/// One writer per session; sinks rotate to a numbered suffix when a
301/// running file crosses `ROTATE_BYTES` (100 MB today — long chat
302/// sessions rarely exceed 5 MB, so rotation almost never fires).
303pub struct JsonlEventSink {
304    state: Mutex<JsonlEventSinkState>,
305    base_path: std::path::PathBuf,
306}
307
308struct JsonlEventSinkState {
309    writer: std::io::BufWriter<std::fs::File>,
310    index: u64,
311    bytes_written: u64,
312    rotation: u32,
313}
314
315impl JsonlEventSink {
316    /// Hard cap past which the current file rotates to a numbered
317    /// suffix (`event_log-000001.jsonl`). Chosen so long debugging
318    /// sessions don't produce unreadable multi-GB logs.
319    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
320
321    /// Open a new sink writing to `base_path`. Creates parent dirs
322    /// if missing. Overwrites an existing file so each fresh session
323    /// starts from index 0.
324    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
325        let base_path = base_path.into();
326        if let Some(parent) = base_path.parent() {
327            std::fs::create_dir_all(parent)?;
328        }
329        let file = std::fs::OpenOptions::new()
330            .create(true)
331            .truncate(true)
332            .write(true)
333            .open(&base_path)?;
334        Ok(Arc::new(Self {
335            state: Mutex::new(JsonlEventSinkState {
336                writer: std::io::BufWriter::new(file),
337                index: 0,
338                bytes_written: 0,
339                rotation: 0,
340            }),
341            base_path,
342        }))
343    }
344
345    /// Flush any buffered writes. Called on session shutdown; the
346    /// Drop impl calls this too but on early panic it may not run.
347    pub fn flush(&self) -> std::io::Result<()> {
348        use std::io::Write as _;
349        self.state
350            .lock()
351            .expect("jsonl sink mutex poisoned")
352            .writer
353            .flush()
354    }
355
356    /// Current event index — primarily for tests and the "how many
357    /// events are in this run" run-record summary.
358    pub fn event_count(&self) -> u64 {
359        self.state.lock().expect("jsonl sink mutex poisoned").index
360    }
361
362    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
363        use std::io::Write as _;
364        if state.bytes_written < Self::ROTATE_BYTES {
365            return Ok(());
366        }
367        state.writer.flush()?;
368        state.rotation += 1;
369        let suffix = format!("-{:06}", state.rotation);
370        let rotated = self.base_path.with_file_name({
371            let stem = self
372                .base_path
373                .file_stem()
374                .and_then(|s| s.to_str())
375                .unwrap_or("event_log");
376            let ext = self
377                .base_path
378                .extension()
379                .and_then(|e| e.to_str())
380                .unwrap_or("jsonl");
381            format!("{stem}{suffix}.{ext}")
382        });
383        let file = std::fs::OpenOptions::new()
384            .create(true)
385            .truncate(true)
386            .write(true)
387            .open(&rotated)?;
388        state.writer = std::io::BufWriter::new(file);
389        state.bytes_written = 0;
390        Ok(())
391    }
392}
393
394/// Event-log-backed sink for a single session's agent event stream.
395/// Uses the generalized append-only event log when one is installed for
396/// the current VM thread and falls back to `JsonlEventSink` only for
397/// older env-driven workflows.
398pub struct EventLogSink {
399    log: Arc<AnyEventLog>,
400    topic: Topic,
401    session_id: String,
402}
403
404impl EventLogSink {
405    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
406        let session_id = session_id.into();
407        let topic = Topic::new(format!(
408            "observability.agent_events.{}",
409            crate::event_log::sanitize_topic_component(&session_id)
410        ))
411        .expect("session id should sanitize to a valid topic");
412        Arc::new(Self {
413            log,
414            topic,
415            session_id,
416        })
417    }
418}
419
420impl AgentEventSink for JsonlEventSink {
421    fn handle_event(&self, event: &AgentEvent) {
422        use std::io::Write as _;
423        let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
424        let index = state.index;
425        state.index += 1;
426        let emitted_at_ms = std::time::SystemTime::now()
427            .duration_since(std::time::UNIX_EPOCH)
428            .map(|d| d.as_millis() as i64)
429            .unwrap_or(0);
430        let envelope = PersistedAgentEvent {
431            index,
432            emitted_at_ms,
433            frame_depth: None,
434            event: event.clone(),
435        };
436        if let Ok(line) = serde_json::to_string(&envelope) {
437            // One line, newline-terminated — JSON Lines spec.
438            // Errors here are swallowed on purpose; a failing write
439            // must never crash the agent loop, and the run record
440            // itself is a secondary artifact.
441            let _ = state.writer.write_all(line.as_bytes());
442            let _ = state.writer.write_all(b"\n");
443            state.bytes_written += line.len() as u64 + 1;
444            let _ = self.rotate_if_needed(&mut state);
445        }
446    }
447}
448
449impl AgentEventSink for EventLogSink {
450    fn handle_event(&self, event: &AgentEvent) {
451        let event_json = match serde_json::to_value(event) {
452            Ok(value) => value,
453            Err(_) => return,
454        };
455        let event_kind = event_json
456            .get("type")
457            .and_then(|value| value.as_str())
458            .unwrap_or("agent_event")
459            .to_string();
460        let payload = serde_json::json!({
461            "index_hint": now_ms(),
462            "session_id": self.session_id,
463            "event": event_json,
464        });
465        let mut headers = std::collections::BTreeMap::new();
466        headers.insert("session_id".to_string(), self.session_id.clone());
467        let log = self.log.clone();
468        let topic = self.topic.clone();
469        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
470        if let Ok(handle) = tokio::runtime::Handle::try_current() {
471            handle.spawn(async move {
472                let _ = log.append(&topic, record).await;
473            });
474        } else {
475            let _ = futures::executor::block_on(log.append(&topic, record));
476        }
477    }
478}
479
480impl Drop for JsonlEventSink {
481    fn drop(&mut self) {
482        if let Ok(mut state) = self.state.lock() {
483            use std::io::Write as _;
484            let _ = state.writer.flush();
485        }
486    }
487}
488
489/// Fan-out helper for composing multiple external sinks.
490pub struct MultiSink {
491    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
492}
493
494impl MultiSink {
495    pub fn new() -> Self {
496        Self {
497            sinks: Mutex::new(Vec::new()),
498        }
499    }
500    pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
501        self.sinks.lock().expect("sink mutex poisoned").push(sink);
502    }
503    pub fn len(&self) -> usize {
504        self.sinks.lock().expect("sink mutex poisoned").len()
505    }
506    pub fn is_empty(&self) -> bool {
507        self.len() == 0
508    }
509}
510
511impl Default for MultiSink {
512    fn default() -> Self {
513        Self::new()
514    }
515}
516
517impl AgentEventSink for MultiSink {
518    fn handle_event(&self, event: &AgentEvent) {
519        // Deliberate: snapshot then release the lock before invoking sink
520        // callbacks. Sinks can re-enter the event system (e.g. a host
521        // sink that logs to another AgentEvent path), so holding the
522        // mutex across the callback would risk self-deadlock. Arc clones
523        // are refcount bumps — cheap.
524        let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
525        for sink in sinks {
526            sink.handle_event(event);
527        }
528    }
529}
530
531type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
532
533fn external_sinks() -> &'static ExternalSinkRegistry {
534    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
535    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
536}
537
538pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
539    let session_id = session_id.into();
540    let mut reg = external_sinks().write().expect("sink registry poisoned");
541    reg.entry(session_id).or_default().push(sink);
542}
543
544/// Remove all external sinks registered for `session_id`. Does NOT
545/// close the session itself — subscribers and transcript survive, so a
546/// later `agent_loop` call with the same id continues the conversation.
547pub fn clear_session_sinks(session_id: &str) {
548    external_sinks()
549        .write()
550        .expect("sink registry poisoned")
551        .remove(session_id);
552}
553
554pub fn reset_all_sinks() {
555    external_sinks()
556        .write()
557        .expect("sink registry poisoned")
558        .clear();
559    crate::agent_sessions::reset_session_store();
560}
561
562/// Emit an event to external sinks registered for this session. Pipeline
563/// closure subscribers are NOT called by this function — the agent
564/// loop owns that path because it needs its async VM context.
565pub fn emit_event(event: &AgentEvent) {
566    let sinks: Vec<Arc<dyn AgentEventSink>> = {
567        let reg = external_sinks().read().expect("sink registry poisoned");
568        reg.get(event.session_id()).cloned().unwrap_or_default()
569    };
570    for sink in sinks {
571        sink.handle_event(event);
572    }
573}
574
575fn now_ms() -> i64 {
576    std::time::SystemTime::now()
577        .duration_since(std::time::UNIX_EPOCH)
578        .map(|duration| duration.as_millis() as i64)
579        .unwrap_or(0)
580}
581
582pub fn session_external_sink_count(session_id: &str) -> usize {
583    external_sinks()
584        .read()
585        .expect("sink registry poisoned")
586        .get(session_id)
587        .map(|v| v.len())
588        .unwrap_or(0)
589}
590
591pub fn session_closure_subscriber_count(session_id: &str) -> usize {
592    crate::agent_sessions::subscriber_count(session_id)
593}
594
595#[cfg(test)]
596mod tests {
597    use super::*;
598    use std::sync::atomic::{AtomicUsize, Ordering};
599
600    struct CountingSink(Arc<AtomicUsize>);
601    impl AgentEventSink for CountingSink {
602        fn handle_event(&self, _event: &AgentEvent) {
603            self.0.fetch_add(1, Ordering::SeqCst);
604        }
605    }
606
607    #[test]
608    fn multi_sink_fans_out_in_order() {
609        let multi = MultiSink::new();
610        let a = Arc::new(AtomicUsize::new(0));
611        let b = Arc::new(AtomicUsize::new(0));
612        multi.push(Arc::new(CountingSink(a.clone())));
613        multi.push(Arc::new(CountingSink(b.clone())));
614        let event = AgentEvent::TurnStart {
615            session_id: "s1".into(),
616            iteration: 1,
617        };
618        multi.handle_event(&event);
619        assert_eq!(a.load(Ordering::SeqCst), 1);
620        assert_eq!(b.load(Ordering::SeqCst), 1);
621    }
622
623    #[test]
624    fn session_scoped_sink_routing() {
625        reset_all_sinks();
626        let a = Arc::new(AtomicUsize::new(0));
627        let b = Arc::new(AtomicUsize::new(0));
628        register_sink("session-a", Arc::new(CountingSink(a.clone())));
629        register_sink("session-b", Arc::new(CountingSink(b.clone())));
630        emit_event(&AgentEvent::TurnStart {
631            session_id: "session-a".into(),
632            iteration: 0,
633        });
634        assert_eq!(a.load(Ordering::SeqCst), 1);
635        assert_eq!(b.load(Ordering::SeqCst), 0);
636        emit_event(&AgentEvent::TurnEnd {
637            session_id: "session-b".into(),
638            iteration: 0,
639            turn_info: serde_json::json!({}),
640        });
641        assert_eq!(a.load(Ordering::SeqCst), 1);
642        assert_eq!(b.load(Ordering::SeqCst), 1);
643        clear_session_sinks("session-a");
644        assert_eq!(session_external_sink_count("session-a"), 0);
645        assert_eq!(session_external_sink_count("session-b"), 1);
646        reset_all_sinks();
647    }
648
649    #[test]
650    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
651        use std::io::{BufRead, BufReader};
652        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
653        std::fs::create_dir_all(&dir).unwrap();
654        let path = dir.join("event_log.jsonl");
655        let sink = JsonlEventSink::open(&path).unwrap();
656        for i in 0..5 {
657            sink.handle_event(&AgentEvent::TurnStart {
658                session_id: "s".into(),
659                iteration: i,
660            });
661        }
662        assert_eq!(sink.event_count(), 5);
663        sink.flush().unwrap();
664
665        // Read back + assert monotonic indices + non-decreasing timestamps.
666        let file = std::fs::File::open(&path).unwrap();
667        let mut last_idx: i64 = -1;
668        let mut last_ts: i64 = 0;
669        for line in BufReader::new(file).lines() {
670            let line = line.unwrap();
671            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
672            let idx = val["index"].as_i64().unwrap();
673            let ts = val["emitted_at_ms"].as_i64().unwrap();
674            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
675            assert!(ts >= last_ts, "timestamps must be non-decreasing");
676            last_idx = idx;
677            last_ts = ts;
678            // Event payload flattened — type tag must survive.
679            assert_eq!(val["type"], "turn_start");
680        }
681        assert_eq!(last_idx, 4);
682        let _ = std::fs::remove_file(&path);
683    }
684
685    #[test]
686    fn tool_call_update_durations_serialize_when_present_and_skip_when_absent() {
687        // Terminal update with both durations populated — both fields
688        // appear in the JSON. Snake_case keys here because this is the
689        // canonical AgentEvent shape; the ACP adapter renames to
690        // camelCase separately.
691        let terminal = AgentEvent::ToolCallUpdate {
692            session_id: "s".into(),
693            tool_call_id: "tc-1".into(),
694            tool_name: "read".into(),
695            status: ToolCallStatus::Completed,
696            raw_output: None,
697            error: None,
698            duration_ms: Some(42),
699            execution_duration_ms: Some(7),
700        };
701        let value = serde_json::to_value(&terminal).unwrap();
702        assert_eq!(value["duration_ms"], serde_json::json!(42));
703        assert_eq!(value["execution_duration_ms"], serde_json::json!(7));
704
705        // In-progress update with `None` for both — both keys must be
706        // absent (not `null`) so older ACP clients that key off
707        // presence don't see a misleading zero.
708        let intermediate = AgentEvent::ToolCallUpdate {
709            session_id: "s".into(),
710            tool_call_id: "tc-1".into(),
711            tool_name: "read".into(),
712            status: ToolCallStatus::InProgress,
713            raw_output: None,
714            error: None,
715            duration_ms: None,
716            execution_duration_ms: None,
717        };
718        let value = serde_json::to_value(&intermediate).unwrap();
719        let object = value.as_object().expect("update serializes as object");
720        assert!(
721            !object.contains_key("duration_ms"),
722            "duration_ms must be omitted when None: {value}"
723        );
724        assert!(
725            !object.contains_key("execution_duration_ms"),
726            "execution_duration_ms must be omitted when None: {value}"
727        );
728    }
729
730    #[test]
731    fn tool_call_update_deserializes_without_duration_fields_for_back_compat() {
732        // Persisted event-log entries written before the fields existed
733        // must still deserialize cleanly. The missing keys map to None.
734        let raw = serde_json::json!({
735            "type": "tool_call_update",
736            "session_id": "s",
737            "tool_call_id": "tc-1",
738            "tool_name": "read",
739            "status": "completed",
740            "raw_output": null,
741            "error": null,
742        });
743        let event: AgentEvent = serde_json::from_value(raw).expect("parses without duration keys");
744        match event {
745            AgentEvent::ToolCallUpdate {
746                duration_ms,
747                execution_duration_ms,
748                ..
749            } => {
750                assert!(duration_ms.is_none());
751                assert!(execution_duration_ms.is_none());
752            }
753            other => panic!("expected ToolCallUpdate, got {other:?}"),
754        }
755    }
756
757    #[test]
758    fn tool_call_status_serde() {
759        assert_eq!(
760            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
761            "\"pending\""
762        );
763        assert_eq!(
764            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
765            "\"in_progress\""
766        );
767        assert_eq!(
768            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
769            "\"completed\""
770        );
771        assert_eq!(
772            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
773            "\"failed\""
774        );
775    }
776}