1mod lifecycle;
4pub use lifecycle::{
5 SharedLifecycleEmitter, ToolOutputPayload, error_item_completed_event,
6 tool_invocation_completed_event, tool_output_completed_event, tool_output_item_id,
7 tool_output_payload_from_value, tool_output_started_event, tool_output_updated_event,
8 tool_started_event,
9};
10
11use crate::core::threads::{SubmissionId, ThreadRuntimeHandle};
12use crate::exec::events::{
13 CommandExecutionItem, CommandExecutionStatus, CompactionMode, CompactionTrigger, ErrorItem,
14 FileChangeItem, FileUpdateChange, HarnessEventItem, HarnessEventKind, ItemCompletedEvent,
15 ItemStartedEvent, PatchApplyStatus, PatchChangeKind, ThreadCompactBoundaryEvent,
16 ThreadCompletedEvent, ThreadCompletionSubtype, ThreadEvent, ThreadItem, ThreadItemDetails,
17 ThreadStartedEvent, TurnCompletedEvent, TurnFailedEvent, TurnStartedEvent, Usage,
18};
19use parking_lot::Mutex;
20use serde_json::Value;
21use std::sync::Arc;
22use uuid::Uuid;
23
24pub type EventSink = Arc<Mutex<Box<dyn FnMut(&ThreadEvent) + Send>>>;
26
27#[doc(hidden)]
28pub fn event_sink<F>(callback: F) -> EventSink
29where
30 F: FnMut(&ThreadEvent) + Send + 'static,
31{
32 Arc::new(Mutex::new(Box::new(callback)))
33}
34
35#[derive(Debug, Clone)]
36pub struct ActiveCommandHandle {
37 id: String,
38 command: String,
39}
40
41#[derive(Debug, Clone)]
42pub struct ActiveToolHandle {
43 id: String,
44 tool_name: String,
45 arguments: Option<Value>,
46 tool_call_id: Option<String>,
47}
48
49impl ActiveToolHandle {
50 #[must_use]
51 pub fn item_id(&self) -> &str {
52 &self.id
53 }
54}
55
56#[derive(Default)]
58pub struct ExecEventRecorder {
59 thread_id: String,
60 events: Vec<ThreadEvent>,
61 event_sink: Option<EventSink>,
62 thread_handle: Option<ThreadRuntimeHandle>,
63 active_submission_id: Option<SubmissionId>,
64 active_turn_id: Option<String>,
65 lifecycle: SharedLifecycleEmitter,
66}
67
68impl ExecEventRecorder {
69 pub fn new(
70 thread_id: impl Into<String>,
71 event_sink: Option<EventSink>,
72 thread_handle: Option<ThreadRuntimeHandle>,
73 ) -> Self {
74 let thread_id = thread_id.into();
75 let mut recorder = Self {
76 thread_id: thread_id.clone(),
77 events: Vec::new(),
78 event_sink,
79 thread_handle,
80 active_submission_id: None,
81 active_turn_id: None,
82 lifecycle: SharedLifecycleEmitter::default(),
83 };
84 recorder.record_with_context(
85 None,
86 None,
87 ThreadEvent::ThreadStarted(ThreadStartedEvent { thread_id }),
88 );
89 recorder
90 }
91
92 fn record(&mut self, event: ThreadEvent) {
93 self.record_with_context(
94 self.active_submission_id.clone(),
95 self.active_turn_id.clone(),
96 event,
97 );
98 }
99
100 fn record_with_context(
101 &mut self,
102 submission_id: Option<SubmissionId>,
103 turn_id: Option<String>,
104 event: ThreadEvent,
105 ) {
106 if let Some(sink) = &self.event_sink {
107 let mut callback = sink.lock();
108 callback(&event);
109 }
110 if let Some(handle) = &self.thread_handle {
111 handle.record_event(submission_id, turn_id, event.clone());
112 }
113 self.events.push(event);
114 }
115
116 pub fn record_thread_event(&mut self, event: ThreadEvent) {
117 self.record(event);
118 }
119
120 pub fn record_thread_events<I>(&mut self, events: I)
121 where
122 I: IntoIterator<Item = ThreadEvent>,
123 {
124 for event in events {
125 self.record(event);
126 }
127 }
128
129 fn record_pending_lifecycle_events(&mut self) {
130 for event in self.lifecycle.drain_events() {
131 self.record(event);
132 }
133 }
134
135 fn next_item_id(&mut self) -> String {
136 self.lifecycle.next_item_id()
137 }
138
139 pub fn turn_started(&mut self) {
140 if let Some(handle) = &self.thread_handle {
141 match handle.begin_turn() {
142 Ok(submission_id) => self.active_submission_id = Some(submission_id),
143 Err(_) => self.active_submission_id = None,
144 }
145 self.active_turn_id = Some(format!("turn-{}", Uuid::new_v4()));
146 }
147 self.record(ThreadEvent::TurnStarted(TurnStartedEvent::default()));
148 }
149
150 pub fn turn_completed(&mut self) {
151 self.record(ThreadEvent::TurnCompleted(TurnCompletedEvent {
152 usage: Usage::default(),
153 }));
154 self.finish_turn();
155 }
156
157 pub fn turn_failed(&mut self, message: &str) {
158 self.record(ThreadEvent::TurnFailed(TurnFailedEvent {
159 message: message.to_string(),
160 usage: None,
161 }));
162 self.finish_turn();
163 }
164
165 pub fn thread_completed(
166 &mut self,
167 session_id: &str,
168 subtype: ThreadCompletionSubtype,
169 outcome_code: &str,
170 result: Option<&str>,
171 stop_reason: Option<&str>,
172 usage: Usage,
173 total_cost_usd: Option<serde_json::Number>,
174 num_turns: usize,
175 ) {
176 self.record(ThreadEvent::ThreadCompleted(ThreadCompletedEvent {
177 thread_id: self.thread_id.clone(),
178 session_id: session_id.to_string(),
179 subtype,
180 outcome_code: outcome_code.to_string(),
181 result: result.map(str::to_string),
182 stop_reason: stop_reason.map(str::to_string),
183 usage,
184 total_cost_usd,
185 num_turns,
186 }));
187 }
188
189 pub fn compact_boundary(
190 &mut self,
191 trigger: CompactionTrigger,
192 mode: CompactionMode,
193 original_message_count: usize,
194 compacted_message_count: usize,
195 history_artifact_path: Option<&str>,
196 ) {
197 self.record(ThreadEvent::ThreadCompactBoundary(
198 ThreadCompactBoundaryEvent {
199 thread_id: self.thread_id.clone(),
200 trigger,
201 mode,
202 original_message_count,
203 compacted_message_count,
204 history_artifact_path: history_artifact_path.map(str::to_string),
205 },
206 ));
207 }
208
209 fn finish_turn(&mut self) {
210 if let Some(handle) = &self.thread_handle {
211 handle.finish_turn();
212 }
213 self.active_submission_id = None;
214 self.active_turn_id = None;
215 }
216
217 pub fn agent_message(&mut self, text: &str) {
218 self.lifecycle.emit_completed_agent_message(text);
219 self.record_pending_lifecycle_events();
220 }
221
222 pub fn agent_message_stream_update(&mut self, text: &str) -> bool {
223 if text.trim().is_empty() || !self.lifecycle.replace_assistant_text(text) {
224 return false;
225 }
226 let emitted = self.lifecycle.emit_assistant_snapshot(None);
227 self.record_pending_lifecycle_events();
228 emitted
229 }
230
231 pub fn agent_message_stream_complete(&mut self) {
232 let _ = self.lifecycle.complete_assistant_stream();
233 self.record_pending_lifecycle_events();
234 }
235
236 pub fn reasoning(&mut self, text: &str) {
237 self.lifecycle.emit_completed_reasoning(text);
238 self.record_pending_lifecycle_events();
239 }
240
241 pub fn set_reasoning_stage(&mut self, stage: &str) {
242 if !self.lifecycle.set_reasoning_stage(Some(stage.to_string())) {
243 return;
244 }
245 let _ = self.lifecycle.emit_reasoning_stage_update();
246 self.record_pending_lifecycle_events();
247 }
248
249 pub fn reasoning_stream_update(&mut self, text: &str) -> bool {
250 if text.trim().is_empty() || !self.lifecycle.replace_reasoning_text(text) {
251 return false;
252 }
253 let emitted = self.lifecycle.emit_reasoning_snapshot(None);
254 self.record_pending_lifecycle_events();
255 emitted
256 }
257
258 pub fn reasoning_stream_complete(&mut self) {
259 let _ = self.lifecycle.complete_reasoning_stream();
260 self.record_pending_lifecycle_events();
261 }
262
263 pub fn tool_started(
264 &mut self,
265 tool_name: &str,
266 arguments: Option<&Value>,
267 tool_call_id: Option<&str>,
268 ) -> ActiveToolHandle {
269 let handle = ActiveToolHandle {
270 id: self.next_item_id(),
271 tool_name: tool_name.to_string(),
272 arguments: arguments.cloned(),
273 tool_call_id: tool_call_id.map(str::to_string),
274 };
275 self.record(tool_started_event(
276 handle.id.clone(),
277 &handle.tool_name,
278 handle.arguments.as_ref(),
279 handle.tool_call_id.as_deref(),
280 ));
281 handle
282 }
283
284 pub fn tool_finished(
285 &mut self,
286 handle: &ActiveToolHandle,
287 status: crate::exec::events::ToolCallStatus,
288 exit_code: Option<i32>,
289 aggregated_output: &str,
290 spool_path: Option<&str>,
291 ) {
292 self.record(tool_invocation_completed_event(
293 handle.id.clone(),
294 &handle.tool_name,
295 handle.arguments.as_ref(),
296 handle.tool_call_id.as_deref(),
297 status.clone(),
298 ));
299 self.record(tool_output_completed_event(
300 handle.id.clone(),
301 handle.tool_call_id.as_deref(),
302 status,
303 exit_code,
304 spool_path,
305 aggregated_output,
306 ));
307 }
308
309 pub fn tool_output_started(&mut self, call_item_id: &str, tool_call_id: Option<&str>) {
310 self.record(tool_output_started_event(
311 call_item_id.to_string(),
312 tool_call_id,
313 ));
314 }
315
316 pub fn tool_output_updated(
317 &mut self,
318 call_item_id: &str,
319 tool_call_id: Option<&str>,
320 output: &str,
321 ) {
322 self.record(tool_output_updated_event(
323 call_item_id.to_string(),
324 tool_call_id,
325 output,
326 ));
327 }
328
329 pub fn tool_output_finished(
330 &mut self,
331 call_item_id: &str,
332 tool_call_id: Option<&str>,
333 status: crate::exec::events::ToolCallStatus,
334 exit_code: Option<i32>,
335 aggregated_output: &str,
336 spool_path: Option<&str>,
337 ) {
338 self.record(tool_output_completed_event(
339 call_item_id.to_string(),
340 tool_call_id,
341 status,
342 exit_code,
343 spool_path,
344 aggregated_output,
345 ));
346 }
347
348 pub fn tool_rejected(
349 &mut self,
350 tool_name: &str,
351 arguments: Option<&Value>,
352 tool_call_id: Option<&str>,
353 detail: &str,
354 ) {
355 let handle = self.tool_started(tool_name, arguments, tool_call_id);
356 let call_item_id = handle.id.clone();
357 self.record(tool_invocation_completed_event(
358 call_item_id.clone(),
359 tool_name,
360 arguments,
361 tool_call_id,
362 crate::exec::events::ToolCallStatus::Failed,
363 ));
364 self.record(tool_output_started_event(
365 call_item_id.clone(),
366 tool_call_id,
367 ));
368 self.record(tool_output_completed_event(
369 call_item_id,
370 tool_call_id,
371 crate::exec::events::ToolCallStatus::Failed,
372 None,
373 None,
374 detail,
375 ));
376 let error_item_id = self.next_item_id();
377 self.record(error_item_completed_event(
378 error_item_id,
379 detail.to_string(),
380 ));
381 }
382
383 pub fn command_started(&mut self, command: &str) -> ActiveCommandHandle {
384 let id = self.next_item_id();
385 let item = ThreadItem {
386 id: id.clone(),
387 details: ThreadItemDetails::CommandExecution(Box::new(CommandExecutionItem {
388 command: command.to_string(),
389 arguments: None,
390 aggregated_output: String::new(),
391 exit_code: None,
392 status: CommandExecutionStatus::InProgress,
393 })),
394 };
395 self.record(ThreadEvent::ItemStarted(ItemStartedEvent { item }));
396 ActiveCommandHandle {
397 id,
398 command: command.to_string(),
399 }
400 }
401
402 pub fn command_finished(
403 &mut self,
404 handle: &ActiveCommandHandle,
405 status: CommandExecutionStatus,
406 exit_code: Option<i32>,
407 aggregated_output: &str,
408 ) {
409 let item = ThreadItem {
410 id: handle.id.clone(),
411 details: ThreadItemDetails::CommandExecution(Box::new(CommandExecutionItem {
412 command: handle.command.clone(),
413 arguments: None,
414 aggregated_output: aggregated_output.to_string(),
415 exit_code,
416 status,
417 })),
418 };
419 self.record(ThreadEvent::ItemCompleted(ItemCompletedEvent { item }));
420 }
421
422 pub fn file_change_completed(&mut self, path: &str) {
423 let change = FileUpdateChange {
424 path: path.to_string(),
425 kind: PatchChangeKind::Update,
426 };
427 let item = ThreadItem {
428 id: self.next_item_id(),
429 details: ThreadItemDetails::FileChange(Box::new(FileChangeItem {
430 changes: vec![change],
431 status: PatchApplyStatus::Completed,
432 })),
433 };
434 self.record(ThreadEvent::ItemCompleted(ItemCompletedEvent { item }));
435 }
436
437 pub fn warning(&mut self, message: &str) {
438 let item = ThreadItem {
439 id: self.next_item_id(),
440 details: ThreadItemDetails::Error(ErrorItem {
441 message: message.to_string(),
442 }),
443 };
444 self.record(ThreadEvent::ItemCompleted(ItemCompletedEvent { item }));
445 }
446
447 pub fn harness_event(
448 &mut self,
449 event: HarnessEventKind,
450 message: Option<String>,
451 command: Option<String>,
452 path: Option<String>,
453 exit_code: Option<i32>,
454 ) {
455 let item = ThreadItem {
456 id: self.next_item_id(),
457 details: ThreadItemDetails::Harness(HarnessEventItem {
458 event,
459 message,
460 command,
461 path,
462 exit_code,
463 }),
464 };
465 self.record(ThreadEvent::ItemCompleted(ItemCompletedEvent { item }));
466 }
467
468 pub fn into_events(mut self) -> Vec<ThreadEvent> {
469 self.lifecycle.complete_open_items();
470 self.record_pending_lifecycle_events();
471 self.events
472 }
473}
474
475#[cfg(test)]
476mod tests {
477 use super::*;
478 use crate::core::threads::{ThreadBootstrap, ThreadManager};
479
480 fn make_recorder() -> ExecEventRecorder {
481 ExecEventRecorder::new("thread", None, None)
482 }
483
484 #[test]
485 fn streaming_events_flush_on_completion() {
486 let mut recorder = make_recorder();
487 recorder.turn_started();
488 assert!(recorder.agent_message_stream_update("partial"));
489 recorder.agent_message_stream_complete();
490 let events = recorder.into_events();
491 assert!(
492 events
493 .iter()
494 .any(|event| matches!(event, ThreadEvent::ItemCompleted(_)))
495 );
496 }
497
498 #[test]
499 fn command_events_capture_status() {
500 let mut recorder = make_recorder();
501 let handle = recorder.command_started("git status");
502 recorder.command_finished(&handle, CommandExecutionStatus::Completed, Some(0), "");
503 let events = recorder.into_events();
504 let command = events
505 .into_iter()
506 .filter_map(|event| match event {
507 ThreadEvent::ItemCompleted(event) => Some(event.item),
508 _ => None,
509 })
510 .find(|item| matches!(item.details, ThreadItemDetails::CommandExecution(_)))
511 .expect("command event should be emitted");
512
513 match command.details {
514 ThreadItemDetails::CommandExecution(details) => {
515 assert_eq!(details.command, "git status");
516 assert_eq!(details.status, CommandExecutionStatus::Completed);
517 }
518 _ => panic!("unexpected event variant"),
519 }
520 }
521
522 #[test]
523 fn rejected_tool_call_emits_failed_tool_output_item() {
524 let mut recorder = make_recorder();
525 recorder.tool_rejected("read_file", None, Some("call_1"), "Tool permission denied");
526
527 let events = recorder.into_events();
528 let tool_outputs = events
529 .iter()
530 .filter_map(|event| match event {
531 ThreadEvent::ItemCompleted(ItemCompletedEvent { item }) => match &item.details {
532 ThreadItemDetails::ToolOutput(details) => Some(details),
533 _ => None,
534 },
535 _ => None,
536 })
537 .collect::<Vec<_>>();
538
539 assert_eq!(tool_outputs.len(), 1);
540 assert_eq!(tool_outputs[0].tool_call_id.as_deref(), Some("call_1"));
541 assert_eq!(
542 tool_outputs[0].status,
543 crate::exec::events::ToolCallStatus::Failed
544 );
545 assert_eq!(tool_outputs[0].output, "Tool permission denied");
546 }
547
548 #[test]
549 fn thread_backed_recorder_reuses_submission_id_within_turn() {
550 let handle =
551 ThreadManager::new().start_thread_with_identifier("thread", ThreadBootstrap::new(None));
552 let mut recorder = ExecEventRecorder::new("thread", None, Some(handle.clone()));
553
554 recorder.turn_started();
555 recorder.agent_message("hello");
556 recorder.turn_completed();
557
558 let records = handle.replay_recent();
559 let submission_ids: std::collections::BTreeSet<String> = records
560 .iter()
561 .filter_map(|record| {
562 record
563 .submission_id
564 .as_ref()
565 .map(|id| id.as_str().to_string())
566 })
567 .collect();
568
569 assert_eq!(submission_ids.len(), 1);
570 assert!(
571 records
572 .iter()
573 .any(|record| matches!(record.event, ThreadEvent::TurnStarted(_))
574 && record.submission_id.is_some())
575 );
576 assert!(records.iter().any(
577 |record| matches!(record.event, ThreadEvent::TurnCompleted(_))
578 && record.submission_id.is_some()
579 ));
580 }
581
582 #[test]
583 fn thread_backed_recorder_keeps_full_event_history_beyond_thread_buffer() {
584 let handle = ThreadManager::with_event_buffer_capacity(2)
585 .start_thread_with_identifier("thread", ThreadBootstrap::new(None));
586 let mut recorder = ExecEventRecorder::new("thread", None, Some(handle.clone()));
587
588 recorder.turn_started();
589 recorder.agent_message("first");
590 recorder.agent_message("second");
591 recorder.turn_completed();
592
593 let full_events = recorder.into_events();
594 let buffered_events = handle.recent_events();
595
596 assert_eq!(buffered_events.len(), 2);
597 assert!(full_events.len() > buffered_events.len());
598 assert_eq!(
599 full_events
600 .iter()
601 .filter(|event| matches!(event, ThreadEvent::ItemCompleted(_)))
602 .count(),
603 2
604 );
605 }
606}