Skip to main content

vtcode_core/core/agent/events/
mod.rs

1//! Event recording utilities for the agent runner.
2
3mod lifecycle;
4pub use lifecycle::{
5    SharedLifecycleEmitter, ToolOutputPayload, error_item_completed_event,
6    tool_invocation_completed_event, tool_output_completed_event, tool_output_item_id,
7    tool_output_payload_from_value, tool_output_started_event, tool_output_updated_event,
8    tool_started_event,
9};
10
11use crate::core::threads::{SubmissionId, ThreadRuntimeHandle};
12use crate::exec::events::{
13    CommandExecutionItem, CommandExecutionStatus, CompactionMode, CompactionTrigger, ErrorItem,
14    FileChangeItem, FileUpdateChange, HarnessEventItem, HarnessEventKind, ItemCompletedEvent,
15    ItemStartedEvent, PatchApplyStatus, PatchChangeKind, ThreadCompactBoundaryEvent,
16    ThreadCompletedEvent, ThreadCompletionSubtype, ThreadEvent, ThreadItem, ThreadItemDetails,
17    ThreadStartedEvent, TurnCompletedEvent, TurnFailedEvent, TurnStartedEvent, Usage,
18};
19use parking_lot::Mutex;
20use serde_json::Value;
21use std::sync::Arc;
22use uuid::Uuid;
23
24/// Callback type alias for streaming structured events.
25pub type EventSink = Arc<Mutex<Box<dyn FnMut(&ThreadEvent) + Send>>>;
26
27#[doc(hidden)]
28pub fn event_sink<F>(callback: F) -> EventSink
29where
30    F: FnMut(&ThreadEvent) + Send + 'static,
31{
32    Arc::new(Mutex::new(Box::new(callback)))
33}
34
/// Identifies a command execution item that has been started but not yet
/// completed.
///
/// Produced by `ExecEventRecorder::command_started` and handed back to
/// `ExecEventRecorder::command_finished` so the completion event reuses the
/// same item id and command text.
#[derive(Debug, Clone)]
pub struct ActiveCommandHandle {
    /// Item id assigned when the command was started.
    id: String,
    /// Command text captured when the command was started.
    command: String,
}
40
41#[derive(Debug, Clone)]
42pub struct ActiveToolHandle {
43    id: String,
44    tool_name: String,
45    arguments: Option<Value>,
46    tool_call_id: Option<String>,
47}
48
49impl ActiveToolHandle {
50    #[must_use]
51    pub fn item_id(&self) -> &str {
52        &self.id
53    }
54}
55
56/// Helper responsible for recording execution events and relaying them to optional sinks.
57#[derive(Default)]
58pub struct ExecEventRecorder {
59    thread_id: String,
60    events: Vec<ThreadEvent>,
61    event_sink: Option<EventSink>,
62    thread_handle: Option<ThreadRuntimeHandle>,
63    active_submission_id: Option<SubmissionId>,
64    active_turn_id: Option<String>,
65    lifecycle: SharedLifecycleEmitter,
66}
67
68impl ExecEventRecorder {
69    pub fn new(
70        thread_id: impl Into<String>,
71        event_sink: Option<EventSink>,
72        thread_handle: Option<ThreadRuntimeHandle>,
73    ) -> Self {
74        let thread_id = thread_id.into();
75        let mut recorder = Self {
76            thread_id: thread_id.clone(),
77            events: Vec::new(),
78            event_sink,
79            thread_handle,
80            active_submission_id: None,
81            active_turn_id: None,
82            lifecycle: SharedLifecycleEmitter::default(),
83        };
84        recorder.record_with_context(
85            None,
86            None,
87            ThreadEvent::ThreadStarted(ThreadStartedEvent { thread_id }),
88        );
89        recorder
90    }
91
92    fn record(&mut self, event: ThreadEvent) {
93        self.record_with_context(
94            self.active_submission_id.clone(),
95            self.active_turn_id.clone(),
96            event,
97        );
98    }
99
100    fn record_with_context(
101        &mut self,
102        submission_id: Option<SubmissionId>,
103        turn_id: Option<String>,
104        event: ThreadEvent,
105    ) {
106        if let Some(sink) = &self.event_sink {
107            let mut callback = sink.lock();
108            callback(&event);
109        }
110        if let Some(handle) = &self.thread_handle {
111            handle.record_event(submission_id, turn_id, event.clone());
112        }
113        self.events.push(event);
114    }
115
116    pub fn record_thread_event(&mut self, event: ThreadEvent) {
117        self.record(event);
118    }
119
120    pub fn record_thread_events<I>(&mut self, events: I)
121    where
122        I: IntoIterator<Item = ThreadEvent>,
123    {
124        for event in events {
125            self.record(event);
126        }
127    }
128
129    fn record_pending_lifecycle_events(&mut self) {
130        for event in self.lifecycle.drain_events() {
131            self.record(event);
132        }
133    }
134
135    fn next_item_id(&mut self) -> String {
136        self.lifecycle.next_item_id()
137    }
138
139    pub fn turn_started(&mut self) {
140        if let Some(handle) = &self.thread_handle {
141            match handle.begin_turn() {
142                Ok(submission_id) => self.active_submission_id = Some(submission_id),
143                Err(_) => self.active_submission_id = None,
144            }
145            self.active_turn_id = Some(format!("turn-{}", Uuid::new_v4()));
146        }
147        self.record(ThreadEvent::TurnStarted(TurnStartedEvent::default()));
148    }
149
150    pub fn turn_completed(&mut self) {
151        self.record(ThreadEvent::TurnCompleted(TurnCompletedEvent {
152            usage: Usage::default(),
153        }));
154        self.finish_turn();
155    }
156
157    pub fn turn_failed(&mut self, message: &str) {
158        self.record(ThreadEvent::TurnFailed(TurnFailedEvent {
159            message: message.to_string(),
160            usage: None,
161        }));
162        self.finish_turn();
163    }
164
165    pub fn thread_completed(
166        &mut self,
167        session_id: &str,
168        subtype: ThreadCompletionSubtype,
169        outcome_code: &str,
170        result: Option<&str>,
171        stop_reason: Option<&str>,
172        usage: Usage,
173        total_cost_usd: Option<serde_json::Number>,
174        num_turns: usize,
175    ) {
176        self.record(ThreadEvent::ThreadCompleted(ThreadCompletedEvent {
177            thread_id: self.thread_id.clone(),
178            session_id: session_id.to_string(),
179            subtype,
180            outcome_code: outcome_code.to_string(),
181            result: result.map(str::to_string),
182            stop_reason: stop_reason.map(str::to_string),
183            usage,
184            total_cost_usd,
185            num_turns,
186        }));
187    }
188
189    pub fn compact_boundary(
190        &mut self,
191        trigger: CompactionTrigger,
192        mode: CompactionMode,
193        original_message_count: usize,
194        compacted_message_count: usize,
195        history_artifact_path: Option<&str>,
196    ) {
197        self.record(ThreadEvent::ThreadCompactBoundary(
198            ThreadCompactBoundaryEvent {
199                thread_id: self.thread_id.clone(),
200                trigger,
201                mode,
202                original_message_count,
203                compacted_message_count,
204                history_artifact_path: history_artifact_path.map(str::to_string),
205            },
206        ));
207    }
208
209    fn finish_turn(&mut self) {
210        if let Some(handle) = &self.thread_handle {
211            handle.finish_turn();
212        }
213        self.active_submission_id = None;
214        self.active_turn_id = None;
215    }
216
217    pub fn agent_message(&mut self, text: &str) {
218        self.lifecycle.emit_completed_agent_message(text);
219        self.record_pending_lifecycle_events();
220    }
221
222    pub fn agent_message_stream_update(&mut self, text: &str) -> bool {
223        if text.trim().is_empty() || !self.lifecycle.replace_assistant_text(text) {
224            return false;
225        }
226        let emitted = self.lifecycle.emit_assistant_snapshot(None);
227        self.record_pending_lifecycle_events();
228        emitted
229    }
230
231    pub fn agent_message_stream_complete(&mut self) {
232        let _ = self.lifecycle.complete_assistant_stream();
233        self.record_pending_lifecycle_events();
234    }
235
236    pub fn reasoning(&mut self, text: &str) {
237        self.lifecycle.emit_completed_reasoning(text);
238        self.record_pending_lifecycle_events();
239    }
240
241    pub fn set_reasoning_stage(&mut self, stage: &str) {
242        if !self.lifecycle.set_reasoning_stage(Some(stage.to_string())) {
243            return;
244        }
245        let _ = self.lifecycle.emit_reasoning_stage_update();
246        self.record_pending_lifecycle_events();
247    }
248
249    pub fn reasoning_stream_update(&mut self, text: &str) -> bool {
250        if text.trim().is_empty() || !self.lifecycle.replace_reasoning_text(text) {
251            return false;
252        }
253        let emitted = self.lifecycle.emit_reasoning_snapshot(None);
254        self.record_pending_lifecycle_events();
255        emitted
256    }
257
258    pub fn reasoning_stream_complete(&mut self) {
259        let _ = self.lifecycle.complete_reasoning_stream();
260        self.record_pending_lifecycle_events();
261    }
262
263    pub fn tool_started(
264        &mut self,
265        tool_name: &str,
266        arguments: Option<&Value>,
267        tool_call_id: Option<&str>,
268    ) -> ActiveToolHandle {
269        let handle = ActiveToolHandle {
270            id: self.next_item_id(),
271            tool_name: tool_name.to_string(),
272            arguments: arguments.cloned(),
273            tool_call_id: tool_call_id.map(str::to_string),
274        };
275        self.record(tool_started_event(
276            handle.id.clone(),
277            &handle.tool_name,
278            handle.arguments.as_ref(),
279            handle.tool_call_id.as_deref(),
280        ));
281        handle
282    }
283
284    pub fn tool_finished(
285        &mut self,
286        handle: &ActiveToolHandle,
287        status: crate::exec::events::ToolCallStatus,
288        exit_code: Option<i32>,
289        aggregated_output: &str,
290        spool_path: Option<&str>,
291    ) {
292        self.record(tool_invocation_completed_event(
293            handle.id.clone(),
294            &handle.tool_name,
295            handle.arguments.as_ref(),
296            handle.tool_call_id.as_deref(),
297            status.clone(),
298        ));
299        self.record(tool_output_completed_event(
300            handle.id.clone(),
301            handle.tool_call_id.as_deref(),
302            status,
303            exit_code,
304            spool_path,
305            aggregated_output,
306        ));
307    }
308
309    pub fn tool_output_started(&mut self, call_item_id: &str, tool_call_id: Option<&str>) {
310        self.record(tool_output_started_event(
311            call_item_id.to_string(),
312            tool_call_id,
313        ));
314    }
315
316    pub fn tool_output_updated(
317        &mut self,
318        call_item_id: &str,
319        tool_call_id: Option<&str>,
320        output: &str,
321    ) {
322        self.record(tool_output_updated_event(
323            call_item_id.to_string(),
324            tool_call_id,
325            output,
326        ));
327    }
328
329    pub fn tool_output_finished(
330        &mut self,
331        call_item_id: &str,
332        tool_call_id: Option<&str>,
333        status: crate::exec::events::ToolCallStatus,
334        exit_code: Option<i32>,
335        aggregated_output: &str,
336        spool_path: Option<&str>,
337    ) {
338        self.record(tool_output_completed_event(
339            call_item_id.to_string(),
340            tool_call_id,
341            status,
342            exit_code,
343            spool_path,
344            aggregated_output,
345        ));
346    }
347
348    pub fn tool_rejected(
349        &mut self,
350        tool_name: &str,
351        arguments: Option<&Value>,
352        tool_call_id: Option<&str>,
353        detail: &str,
354    ) {
355        let handle = self.tool_started(tool_name, arguments, tool_call_id);
356        let call_item_id = handle.id.clone();
357        self.record(tool_invocation_completed_event(
358            call_item_id.clone(),
359            tool_name,
360            arguments,
361            tool_call_id,
362            crate::exec::events::ToolCallStatus::Failed,
363        ));
364        self.record(tool_output_started_event(
365            call_item_id.clone(),
366            tool_call_id,
367        ));
368        self.record(tool_output_completed_event(
369            call_item_id,
370            tool_call_id,
371            crate::exec::events::ToolCallStatus::Failed,
372            None,
373            None,
374            detail,
375        ));
376        let error_item_id = self.next_item_id();
377        self.record(error_item_completed_event(
378            error_item_id,
379            detail.to_string(),
380        ));
381    }
382
383    pub fn command_started(&mut self, command: &str) -> ActiveCommandHandle {
384        let id = self.next_item_id();
385        let item = ThreadItem {
386            id: id.clone(),
387            details: ThreadItemDetails::CommandExecution(Box::new(CommandExecutionItem {
388                command: command.to_string(),
389                arguments: None,
390                aggregated_output: String::new(),
391                exit_code: None,
392                status: CommandExecutionStatus::InProgress,
393            })),
394        };
395        self.record(ThreadEvent::ItemStarted(ItemStartedEvent { item }));
396        ActiveCommandHandle {
397            id,
398            command: command.to_string(),
399        }
400    }
401
402    pub fn command_finished(
403        &mut self,
404        handle: &ActiveCommandHandle,
405        status: CommandExecutionStatus,
406        exit_code: Option<i32>,
407        aggregated_output: &str,
408    ) {
409        let item = ThreadItem {
410            id: handle.id.clone(),
411            details: ThreadItemDetails::CommandExecution(Box::new(CommandExecutionItem {
412                command: handle.command.clone(),
413                arguments: None,
414                aggregated_output: aggregated_output.to_string(),
415                exit_code,
416                status,
417            })),
418        };
419        self.record(ThreadEvent::ItemCompleted(ItemCompletedEvent { item }));
420    }
421
422    pub fn file_change_completed(&mut self, path: &str) {
423        let change = FileUpdateChange {
424            path: path.to_string(),
425            kind: PatchChangeKind::Update,
426        };
427        let item = ThreadItem {
428            id: self.next_item_id(),
429            details: ThreadItemDetails::FileChange(Box::new(FileChangeItem {
430                changes: vec![change],
431                status: PatchApplyStatus::Completed,
432            })),
433        };
434        self.record(ThreadEvent::ItemCompleted(ItemCompletedEvent { item }));
435    }
436
437    pub fn warning(&mut self, message: &str) {
438        let item = ThreadItem {
439            id: self.next_item_id(),
440            details: ThreadItemDetails::Error(ErrorItem {
441                message: message.to_string(),
442            }),
443        };
444        self.record(ThreadEvent::ItemCompleted(ItemCompletedEvent { item }));
445    }
446
447    pub fn harness_event(
448        &mut self,
449        event: HarnessEventKind,
450        message: Option<String>,
451        command: Option<String>,
452        path: Option<String>,
453        exit_code: Option<i32>,
454    ) {
455        let item = ThreadItem {
456            id: self.next_item_id(),
457            details: ThreadItemDetails::Harness(HarnessEventItem {
458                event,
459                message,
460                command,
461                path,
462                exit_code,
463            }),
464        };
465        self.record(ThreadEvent::ItemCompleted(ItemCompletedEvent { item }));
466    }
467
468    pub fn into_events(mut self) -> Vec<ThreadEvent> {
469        self.lifecycle.complete_open_items();
470        self.record_pending_lifecycle_events();
471        self.events
472    }
473}
474
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::threads::{ThreadBootstrap, ThreadManager};

    /// Builds a recorder with neither an event sink nor a thread handle.
    fn make_recorder() -> ExecEventRecorder {
        ExecEventRecorder::new("thread", None, None)
    }

    #[test]
    fn streaming_events_flush_on_completion() {
        let mut recorder = make_recorder();
        recorder.turn_started();
        let emitted = recorder.agent_message_stream_update("partial");
        assert!(emitted);
        recorder.agent_message_stream_complete();

        let completed_items = recorder
            .into_events()
            .into_iter()
            .filter(|event| matches!(event, ThreadEvent::ItemCompleted(_)))
            .count();
        assert!(completed_items > 0);
    }

    #[test]
    fn command_events_capture_status() {
        let mut recorder = make_recorder();
        let handle = recorder.command_started("git status");
        recorder.command_finished(&handle, CommandExecutionStatus::Completed, Some(0), "");

        // Pull the first completed command-execution payload out of the stream.
        let details = recorder
            .into_events()
            .into_iter()
            .find_map(|event| match event {
                ThreadEvent::ItemCompleted(ItemCompletedEvent { item }) => match item.details {
                    ThreadItemDetails::CommandExecution(details) => Some(details),
                    _ => None,
                },
                _ => None,
            })
            .expect("command event should be emitted");

        assert_eq!(details.command, "git status");
        assert_eq!(details.status, CommandExecutionStatus::Completed);
    }

    #[test]
    fn rejected_tool_call_emits_failed_tool_output_item() {
        let mut recorder = make_recorder();
        recorder.tool_rejected("read_file", None, Some("call_1"), "Tool permission denied");

        let events = recorder.into_events();
        let mut completed_outputs = Vec::new();
        for event in &events {
            if let ThreadEvent::ItemCompleted(ItemCompletedEvent { item }) = event {
                if let ThreadItemDetails::ToolOutput(details) = &item.details {
                    completed_outputs.push(details);
                }
            }
        }

        assert_eq!(completed_outputs.len(), 1);
        let output = completed_outputs[0];
        assert_eq!(output.tool_call_id.as_deref(), Some("call_1"));
        assert_eq!(output.status, crate::exec::events::ToolCallStatus::Failed);
        assert_eq!(output.output, "Tool permission denied");
    }

    #[test]
    fn thread_backed_recorder_reuses_submission_id_within_turn() {
        use std::collections::BTreeSet;

        let handle =
            ThreadManager::new().start_thread_with_identifier("thread", ThreadBootstrap::new(None));
        let mut recorder = ExecEventRecorder::new("thread", None, Some(handle.clone()));

        recorder.turn_started();
        recorder.agent_message("hello");
        recorder.turn_completed();

        let records = handle.replay_recent();

        // Collect the distinct submission ids attached to the replayed records.
        let mut distinct_ids: BTreeSet<String> = BTreeSet::new();
        for record in records.iter() {
            if let Some(id) = record.submission_id.as_ref() {
                distinct_ids.insert(id.as_str().to_string());
            }
        }
        assert_eq!(distinct_ids.len(), 1);

        let turn_started_tagged = records.iter().any(|record| {
            record.submission_id.is_some() && matches!(record.event, ThreadEvent::TurnStarted(_))
        });
        assert!(turn_started_tagged);

        let turn_completed_tagged = records.iter().any(|record| {
            record.submission_id.is_some() && matches!(record.event, ThreadEvent::TurnCompleted(_))
        });
        assert!(turn_completed_tagged);
    }

    #[test]
    fn thread_backed_recorder_keeps_full_event_history_beyond_thread_buffer() {
        let manager = ThreadManager::with_event_buffer_capacity(2);
        let handle = manager.start_thread_with_identifier("thread", ThreadBootstrap::new(None));
        let mut recorder = ExecEventRecorder::new("thread", None, Some(handle.clone()));

        recorder.turn_started();
        recorder.agent_message("first");
        recorder.agent_message("second");
        recorder.turn_completed();

        let full_events = recorder.into_events();
        let buffered_events = handle.recent_events();

        assert_eq!(buffered_events.len(), 2);
        assert!(full_events.len() > buffered_events.len());

        let completed_items = full_events
            .iter()
            .filter(|event| matches!(event, ThreadEvent::ItemCompleted(_)))
            .count();
        assert_eq!(completed_items, 2);
    }
}