1use std::io::{self, Write};
2use std::time::SystemTime;
3
4use agentkit_core::{Item, ItemKind, Part, TokenUsage, Usage};
5use agentkit_loop::{AgentEvent, LoopObserver, TurnResult};
6use serde::Serialize;
7use thiserror::Error;
8
/// Errors a reporter can record while writing out an event.
#[derive(Debug, Error)]
pub enum ReportError {
    /// An I/O operation on the underlying writer returned an error.
    #[error("io error: {0}")]
    Io(#[from] io::Error),
    /// A `serde_json` operation returned an error.
    #[error("serialization error: {0}")]
    Serialize(#[from] serde_json::Error),
}
16
/// Serializable wrapper pairing a borrowed [`AgentEvent`] with a timestamp.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct EventEnvelope<'a> {
    /// Time associated with the event by whoever builds the envelope.
    pub timestamp: SystemTime,
    /// The event being wrapped; borrowed, so the envelope does not own it.
    pub event: &'a AgentEvent,
}
22
/// Observer that holds a list of boxed child [`LoopObserver`]s.
#[derive(Default)]
pub struct CompositeReporter {
    /// Child observers, stored in the order they were added.
    children: Vec<Box<dyn LoopObserver>>,
}
27
28impl CompositeReporter {
29 pub fn new() -> Self {
30 Self::default()
31 }
32
33 pub fn with_observer(mut self, observer: impl LoopObserver + 'static) -> Self {
34 self.children.push(Box::new(observer));
35 self
36 }
37
38 pub fn push(&mut self, observer: impl LoopObserver + 'static) -> &mut Self {
39 self.children.push(Box::new(observer));
40 self
41 }
42}
43
44impl LoopObserver for CompositeReporter {
45 fn handle_event(&mut self, event: AgentEvent) {
46 for child in &mut self.children {
47 child.handle_event(event.clone());
48 }
49 }
50}
51
/// Observer that writes each event to `writer` as one JSON document per line.
pub struct JsonlReporter<W> {
    /// Destination for the serialized events.
    writer: W,
    /// When true, the writer is flushed after every event.
    flush_each_event: bool,
    /// Errors recorded while handling events, until drained by `take_errors`.
    errors: Vec<ReportError>,
}
57
58impl<W> JsonlReporter<W>
59where
60 W: Write,
61{
62 pub fn new(writer: W) -> Self {
63 Self {
64 writer,
65 flush_each_event: true,
66 errors: Vec::new(),
67 }
68 }
69
70 pub fn with_flush_each_event(mut self, flush_each_event: bool) -> Self {
71 self.flush_each_event = flush_each_event;
72 self
73 }
74
75 pub fn writer(&self) -> &W {
76 &self.writer
77 }
78
79 pub fn writer_mut(&mut self) -> &mut W {
80 &mut self.writer
81 }
82
83 pub fn take_errors(&mut self) -> Vec<ReportError> {
84 std::mem::take(&mut self.errors)
85 }
86
87 fn record_result(&mut self, result: Result<(), ReportError>) {
88 if let Err(error) = result {
89 self.errors.push(error);
90 }
91 }
92}
93
94impl<W> LoopObserver for JsonlReporter<W>
95where
96 W: Write + Send,
97{
98 fn handle_event(&mut self, event: AgentEvent) {
99 let result = (|| -> Result<(), ReportError> {
100 let envelope = EventEnvelope {
101 timestamp: SystemTime::now(),
102 event: &event,
103 };
104 serde_json::to_writer(&mut self.writer, &envelope)?;
105 self.writer.write_all(b"\n")?;
106 if self.flush_each_event {
107 self.writer.flush()?;
108 }
109 Ok(())
110 })();
111 self.record_result(result);
112 }
113}
114
/// Running token counters; all fields start at zero via `Default`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct UsageTotals {
    /// Accumulated input token count.
    pub input_tokens: u64,
    /// Accumulated output token count.
    pub output_tokens: u64,
    /// Accumulated reasoning token count.
    pub reasoning_tokens: u64,
    /// Accumulated cached-input token count.
    pub cached_input_tokens: u64,
}
122
/// Running cost total together with an optional currency label.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CostTotals {
    /// Accumulated cost amount.
    pub amount: f64,
    /// Currency label for `amount`; `None` until one has been recorded.
    pub currency: Option<String>,
}
128
/// Aggregate counters and totals maintained by `UsageReporter`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct UsageSummary {
    /// Number of events handled, of any kind.
    pub events_seen: usize,
    /// Number of usage records folded into the totals.
    pub usage_events_seen: usize,
    /// Number of turn-finished events handled.
    pub turn_results_seen: usize,
    /// Accumulated token counts.
    pub totals: UsageTotals,
    /// Accumulated cost; `None` until a usage record carrying a cost is seen.
    pub cost: Option<CostTotals>,
}
137
/// Observer that accumulates usage information into a [`UsageSummary`].
#[derive(Default)]
pub struct UsageReporter {
    /// The running summary, exposed read-only through `summary()`.
    summary: UsageSummary,
}
142
143impl UsageReporter {
144 pub fn new() -> Self {
145 Self::default()
146 }
147
148 pub fn summary(&self) -> &UsageSummary {
149 &self.summary
150 }
151
152 fn absorb(&mut self, usage: &Usage) {
153 self.summary.usage_events_seen += 1;
154 if let Some(tokens) = &usage.tokens {
155 self.summary.totals.input_tokens += tokens.input_tokens;
156 self.summary.totals.output_tokens += tokens.output_tokens;
157 self.summary.totals.reasoning_tokens += tokens.reasoning_tokens.unwrap_or_default();
158 self.summary.totals.cached_input_tokens +=
159 tokens.cached_input_tokens.unwrap_or_default();
160 }
161 if let Some(cost) = &usage.cost {
162 let totals = self.summary.cost.get_or_insert_with(CostTotals::default);
163 totals.amount += cost.amount;
164 if totals.currency.is_none() {
165 totals.currency = Some(cost.currency.clone());
166 }
167 }
168 }
169}
170
171impl LoopObserver for UsageReporter {
172 fn handle_event(&mut self, event: AgentEvent) {
173 self.summary.events_seen += 1;
174 match event {
175 AgentEvent::UsageUpdated(usage) => self.absorb(&usage),
176 AgentEvent::TurnFinished(TurnResult {
177 usage: Some(usage), ..
178 }) => {
179 self.summary.turn_results_seen += 1;
180 self.absorb(&usage);
181 }
182 AgentEvent::TurnFinished(_) => {
183 self.summary.turn_results_seen += 1;
184 }
185 _ => {}
186 }
187 }
188}
189
/// Ordered collection of transcript items.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct TranscriptView {
    /// The items, in the order they were appended.
    pub items: Vec<Item>,
}
194
/// Observer that accumulates items from events into a [`TranscriptView`].
#[derive(Default)]
pub struct TranscriptReporter {
    /// The transcript built so far, exposed read-only through `transcript()`.
    transcript: TranscriptView,
}
199
200impl TranscriptReporter {
201 pub fn new() -> Self {
202 Self::default()
203 }
204
205 pub fn transcript(&self) -> &TranscriptView {
206 &self.transcript
207 }
208}
209
210impl LoopObserver for TranscriptReporter {
211 fn handle_event(&mut self, event: AgentEvent) {
212 match event {
213 AgentEvent::InputAccepted { items, .. } => {
214 self.transcript.items.extend(items);
215 }
216 AgentEvent::TurnFinished(result) => {
217 self.transcript.items.extend(result.items);
218 }
219 _ => {}
220 }
221 }
222}
223
/// Observer that writes a line-oriented, human-readable rendering of each
/// event to `writer`.
pub struct StdoutReporter<W> {
    /// Destination for the rendered text.
    writer: W,
    /// When true, `[usage]` lines are included in the output.
    show_usage: bool,
    /// Errors recorded while handling events, until drained by `take_errors`.
    errors: Vec<ReportError>,
}
229
230impl<W> StdoutReporter<W>
231where
232 W: Write,
233{
234 pub fn new(writer: W) -> Self {
235 Self {
236 writer,
237 show_usage: true,
238 errors: Vec::new(),
239 }
240 }
241
242 pub fn with_usage(mut self, show_usage: bool) -> Self {
243 self.show_usage = show_usage;
244 self
245 }
246
247 pub fn writer(&self) -> &W {
248 &self.writer
249 }
250
251 pub fn take_errors(&mut self) -> Vec<ReportError> {
252 std::mem::take(&mut self.errors)
253 }
254
255 fn record_result(&mut self, result: Result<(), ReportError>) {
256 if let Err(error) = result {
257 self.errors.push(error);
258 }
259 }
260}
261
262impl<W> LoopObserver for StdoutReporter<W>
263where
264 W: Write + Send,
265{
266 fn handle_event(&mut self, event: AgentEvent) {
267 let result = write_stdout_event(&mut self.writer, &event, self.show_usage);
268 self.record_result(result);
269 }
270}
271
272fn write_stdout_event<W>(
273 writer: &mut W,
274 event: &AgentEvent,
275 show_usage: bool,
276) -> Result<(), ReportError>
277where
278 W: Write,
279{
280 match event {
281 AgentEvent::RunStarted { session_id } => {
282 writeln!(writer, "[run] started session={session_id}")?;
283 }
284 AgentEvent::TurnStarted {
285 session_id,
286 turn_id,
287 } => {
288 writeln!(writer, "[turn] started session={session_id} turn={turn_id}")?;
289 }
290 AgentEvent::InputAccepted { items, .. } => {
291 writeln!(writer, "[input] accepted items={}", items.len())?;
292 }
293 AgentEvent::ContentDelta(delta) => {
294 writeln!(writer, "[delta] {delta:?}")?;
295 }
296 AgentEvent::ToolCallRequested(call) => {
297 writeln!(writer, "[tool] call {} {}", call.name, call.input)?;
298 }
299 AgentEvent::ApprovalRequired(request) => {
300 writeln!(
301 writer,
302 "[approval] {} {:?}",
303 request.summary, request.reason
304 )?;
305 }
306 AgentEvent::AuthRequired(request) => {
307 writeln!(writer, "[auth] required provider={}", request.provider)?;
308 }
309 AgentEvent::ApprovalResolved { approved } => {
310 writeln!(writer, "[approval] resolved approved={approved}")?;
311 }
312 AgentEvent::AuthResolved { provided } => {
313 writeln!(writer, "[auth] resolved provided={provided}")?;
314 }
315 AgentEvent::CompactionStarted {
316 turn_id, reason, ..
317 } => {
318 writeln!(
319 writer,
320 "[compaction] started turn={} reason={reason:?}",
321 turn_id
322 .as_ref()
323 .map(ToString::to_string)
324 .unwrap_or_else(|| "none".into())
325 )?;
326 }
327 AgentEvent::CompactionFinished {
328 turn_id,
329 replaced_items,
330 transcript_len,
331 ..
332 } => {
333 writeln!(
334 writer,
335 "[compaction] finished turn={} replaced_items={} transcript_len={}",
336 turn_id
337 .as_ref()
338 .map(ToString::to_string)
339 .unwrap_or_else(|| "none".into()),
340 replaced_items,
341 transcript_len
342 )?;
343 }
344 AgentEvent::UsageUpdated(usage) if show_usage => {
345 writeln!(writer, "[usage] {}", format_usage(usage))?;
346 }
347 AgentEvent::UsageUpdated(_) => {}
348 AgentEvent::Warning { message } => {
349 writeln!(writer, "[warning] {message}")?;
350 }
351 AgentEvent::RunFailed { message } => {
352 writeln!(writer, "[error] {message}")?;
353 }
354 AgentEvent::TurnFinished(result) => {
355 writeln!(
356 writer,
357 "[turn] finished reason={:?} items={}",
358 result.finish_reason,
359 result.items.len()
360 )?;
361 for item in &result.items {
362 write_item_summary(writer, item)?;
363 }
364 if show_usage && let Some(usage) = &result.usage {
365 writeln!(writer, "[usage] {}", format_usage(usage))?;
366 }
367 }
368 }
369
370 writer.flush()?;
371 Ok(())
372}
373
374fn write_item_summary<W>(writer: &mut W, item: &Item) -> Result<(), ReportError>
375where
376 W: Write,
377{
378 writeln!(writer, " [{}]", item_kind_name(item.kind))?;
379 for part in &item.parts {
380 match part {
381 Part::Text(text) => writeln!(writer, " [text] {}", text.text)?,
382 Part::Reasoning(reasoning) => {
383 if let Some(summary) = &reasoning.summary {
384 writeln!(writer, " [reasoning] {summary}")?;
385 } else {
386 writeln!(writer, " [reasoning]")?;
387 }
388 }
389 Part::ToolCall(call) => {
390 writeln!(writer, " [tool-call] {} {}", call.name, call.input)?
391 }
392 Part::ToolResult(result) => writeln!(
393 writer,
394 " [tool-result] call={} error={}",
395 result.call_id, result.is_error
396 )?,
397 Part::Structured(value) => writeln!(writer, " [structured] {}", value.value)?,
398 Part::Media(media) => writeln!(
399 writer,
400 " [media] {:?} {}",
401 media.modality, media.mime_type
402 )?,
403 Part::File(file) => writeln!(
404 writer,
405 " [file] {}",
406 file.name.as_deref().unwrap_or("<unnamed>")
407 )?,
408 Part::Custom(custom) => writeln!(writer, " [custom] {}", custom.kind)?,
409 }
410 }
411 Ok(())
412}
413
414fn item_kind_name(kind: ItemKind) -> &'static str {
415 match kind {
416 ItemKind::System => "system",
417 ItemKind::Developer => "developer",
418 ItemKind::User => "user",
419 ItemKind::Assistant => "assistant",
420 ItemKind::Tool => "tool",
421 ItemKind::Context => "context",
422 }
423}
424
425fn format_usage(usage: &Usage) -> String {
426 match &usage.tokens {
427 Some(TokenUsage {
428 input_tokens,
429 output_tokens,
430 reasoning_tokens,
431 cached_input_tokens,
432 }) => format!(
433 "input={} output={} reasoning={} cached_input={}",
434 input_tokens,
435 output_tokens,
436 reasoning_tokens.unwrap_or_default(),
437 cached_input_tokens.unwrap_or_default()
438 ),
439 None => "no token usage".into(),
440 }
441}
442
#[cfg(test)]
mod tests {
    use super::*;
    use agentkit_core::{FinishReason, MetadataMap, SessionId, TextPart};
    use agentkit_loop::TurnResult;

    /// Builds a `Usage` that carries only token counts: no cost, empty metadata.
    fn token_only_usage(
        input_tokens: u64,
        output_tokens: u64,
        reasoning_tokens: Option<u64>,
        cached_input_tokens: Option<u64>,
    ) -> Usage {
        Usage {
            tokens: Some(TokenUsage {
                input_tokens,
                output_tokens,
                reasoning_tokens,
                cached_input_tokens,
            }),
            cost: None,
            metadata: MetadataMap::new(),
        }
    }

    /// Builds an item of the given kind holding a single text part.
    fn text_item(kind: ItemKind, text: &'static str) -> Item {
        Item {
            id: None,
            kind,
            parts: vec![Part::Text(TextPart {
                text: text.into(),
                metadata: MetadataMap::new(),
            })],
            metadata: MetadataMap::new(),
        }
    }

    /// Builds a `TurnFinished` event for turn `turn-1` that completed normally.
    fn completed_turn(items: Vec<Item>, usage: Option<Usage>) -> AgentEvent {
        AgentEvent::TurnFinished(TurnResult {
            turn_id: "turn-1".into(),
            finish_reason: FinishReason::Completed,
            items,
            usage,
            metadata: MetadataMap::new(),
        })
    }

    #[test]
    fn usage_reporter_accumulates_usage_events_and_turn_results() {
        let mut reporter = UsageReporter::new();

        let standalone = token_only_usage(10, 5, Some(2), Some(1));
        reporter.handle_event(AgentEvent::UsageUpdated(standalone));

        let attached = token_only_usage(3, 4, Some(1), None);
        reporter.handle_event(completed_turn(Vec::new(), Some(attached)));

        let summary = reporter.summary();
        assert_eq!(summary.events_seen, 2);
        assert_eq!(summary.usage_events_seen, 2);
        assert_eq!(summary.turn_results_seen, 1);
        assert_eq!(
            summary.totals,
            UsageTotals {
                input_tokens: 13,
                output_tokens: 9,
                reasoning_tokens: 3,
                cached_input_tokens: 1,
            }
        );
    }

    #[test]
    fn transcript_reporter_tracks_inputs_and_outputs() {
        let mut reporter = TranscriptReporter::new();

        reporter.handle_event(AgentEvent::InputAccepted {
            session_id: SessionId::new("session-1"),
            items: vec![text_item(ItemKind::User, "hello")],
        });
        reporter.handle_event(completed_turn(
            vec![text_item(ItemKind::Assistant, "hi")],
            None,
        ));

        let items = &reporter.transcript().items;
        assert_eq!(items.len(), 2);
        let kinds: Vec<ItemKind> = items.iter().map(|item| item.kind).collect();
        assert_eq!(kinds, vec![ItemKind::User, ItemKind::Assistant]);
    }

    #[test]
    fn jsonl_reporter_serializes_event_envelopes() {
        let mut reporter = JsonlReporter::new(Vec::new());
        let session_id = SessionId::new("session-1");
        reporter.handle_event(AgentEvent::RunStarted { session_id });

        let output = std::str::from_utf8(reporter.writer()).unwrap();
        assert!(output.contains("\"RunStarted\""));
        assert!(output.contains("session-1"));
    }
}