1use std::io::{self, Write};
31use std::time::SystemTime;
32
33use agentkit_core::{Item, ItemKind, Part, TokenUsage, Usage};
34use agentkit_loop::{AgentEvent, LoopObserver, TurnResult};
35use serde::Serialize;
36use thiserror::Error;
37
38#[derive(Debug, Error)]
45pub enum ReportError {
46 #[error("io error: {0}")]
48 Io(#[from] io::Error),
49 #[error("serialization error: {0}")]
51 Serialize(#[from] serde_json::Error),
52}
53
54#[derive(Clone, Debug, PartialEq, Serialize)]
60pub struct EventEnvelope<'a> {
61 pub timestamp: SystemTime,
63 pub event: &'a AgentEvent,
65}
66
67#[derive(Default)]
87pub struct CompositeReporter {
88 children: Vec<Box<dyn LoopObserver>>,
89}
90
91impl CompositeReporter {
92 pub fn new() -> Self {
94 Self::default()
95 }
96
97 pub fn with_observer(mut self, observer: impl LoopObserver + 'static) -> Self {
103 self.children.push(Box::new(observer));
104 self
105 }
106
107 pub fn push(&mut self, observer: impl LoopObserver + 'static) -> &mut Self {
116 self.children.push(Box::new(observer));
117 self
118 }
119}
120
121impl LoopObserver for CompositeReporter {
122 fn handle_event(&mut self, event: AgentEvent) {
123 for child in &mut self.children {
124 child.handle_event(event.clone());
125 }
126 }
127}
128
129pub struct JsonlReporter<W> {
154 writer: W,
155 flush_each_event: bool,
156 errors: Vec<ReportError>,
157}
158
159impl<W> JsonlReporter<W>
160where
161 W: Write,
162{
163 pub fn new(writer: W) -> Self {
173 Self {
174 writer,
175 flush_each_event: true,
176 errors: Vec::new(),
177 }
178 }
179
180 pub fn with_flush_each_event(mut self, flush_each_event: bool) -> Self {
184 self.flush_each_event = flush_each_event;
185 self
186 }
187
188 pub fn writer(&self) -> &W {
192 &self.writer
193 }
194
195 pub fn writer_mut(&mut self) -> &mut W {
197 &mut self.writer
198 }
199
200 pub fn take_errors(&mut self) -> Vec<ReportError> {
204 std::mem::take(&mut self.errors)
205 }
206
207 fn record_result(&mut self, result: Result<(), ReportError>) {
208 if let Err(error) = result {
209 self.errors.push(error);
210 }
211 }
212}
213
214impl<W> LoopObserver for JsonlReporter<W>
215where
216 W: Write + Send,
217{
218 fn handle_event(&mut self, event: AgentEvent) {
219 let result = (|| -> Result<(), ReportError> {
220 let envelope = EventEnvelope {
221 timestamp: SystemTime::now(),
222 event: &event,
223 };
224 serde_json::to_writer(&mut self.writer, &envelope)?;
225 self.writer.write_all(b"\n")?;
226 if self.flush_each_event {
227 self.writer.flush()?;
228 }
229 Ok(())
230 })();
231 self.record_result(result);
232 }
233}
234
/// Running token counters; every field starts at zero.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct UsageTotals {
    /// Accumulated input tokens.
    pub input_tokens: u64,
    /// Accumulated output tokens.
    pub output_tokens: u64,
    /// Accumulated reasoning tokens.
    pub reasoning_tokens: u64,
    /// Accumulated cached input tokens.
    pub cached_input_tokens: u64,
    /// Accumulated cache-write input tokens.
    pub cache_write_input_tokens: u64,
}
249
/// Running monetary cost.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CostTotals {
    /// Sum of the cost amounts seen so far.
    pub amount: f64,
    /// Currency label attached to the total, if one has been recorded.
    pub currency: Option<String>,
}
258
259#[derive(Clone, Debug, Default, PartialEq)]
263pub struct UsageSummary {
264 pub events_seen: usize,
266 pub usage_events_seen: usize,
269 pub turn_results_seen: usize,
271 pub totals: UsageTotals,
273 pub cost: Option<CostTotals>,
275}
276
277#[derive(Default)]
301pub struct UsageReporter {
302 summary: UsageSummary,
303}
304
305impl UsageReporter {
306 pub fn new() -> Self {
308 Self::default()
309 }
310
311 pub fn summary(&self) -> &UsageSummary {
313 &self.summary
314 }
315
316 fn absorb(&mut self, usage: &Usage) {
317 self.summary.usage_events_seen += 1;
318 if let Some(tokens) = &usage.tokens {
319 self.summary.totals.input_tokens += tokens.input_tokens;
320 self.summary.totals.output_tokens += tokens.output_tokens;
321 self.summary.totals.reasoning_tokens += tokens.reasoning_tokens.unwrap_or_default();
322 self.summary.totals.cached_input_tokens +=
323 tokens.cached_input_tokens.unwrap_or_default();
324 self.summary.totals.cache_write_input_tokens +=
325 tokens.cache_write_input_tokens.unwrap_or_default();
326 }
327 if let Some(cost) = &usage.cost {
328 let totals = self.summary.cost.get_or_insert_with(CostTotals::default);
329 totals.amount += cost.amount;
330 if totals.currency.is_none() {
331 totals.currency = Some(cost.currency.clone());
332 }
333 }
334 }
335}
336
337impl LoopObserver for UsageReporter {
338 fn handle_event(&mut self, event: AgentEvent) {
339 self.summary.events_seen += 1;
340 match event {
341 AgentEvent::UsageUpdated(usage) => self.absorb(&usage),
342 AgentEvent::TurnFinished(TurnResult {
343 usage: Some(usage), ..
344 }) => {
345 self.summary.turn_results_seen += 1;
346 self.absorb(&usage);
347 }
348 AgentEvent::TurnFinished(_) => {
349 self.summary.turn_results_seen += 1;
350 }
351 _ => {}
352 }
353 }
354}
355
356#[derive(Clone, Debug, Default, PartialEq)]
361pub struct TranscriptView {
362 pub items: Vec<Item>,
364}
365
366#[derive(Default)]
387pub struct TranscriptReporter {
388 transcript: TranscriptView,
389}
390
391impl TranscriptReporter {
392 pub fn new() -> Self {
394 Self::default()
395 }
396
397 pub fn transcript(&self) -> &TranscriptView {
399 &self.transcript
400 }
401}
402
403impl LoopObserver for TranscriptReporter {
404 fn handle_event(&mut self, event: AgentEvent) {
405 match event {
406 AgentEvent::InputAccepted { items, .. } => {
407 self.transcript.items.extend(items);
408 }
409 AgentEvent::TurnFinished(result) => {
410 self.transcript.items.extend(result.items);
411 }
412 _ => {}
413 }
414 }
415}
416
417pub struct StdoutReporter<W> {
437 writer: W,
438 show_usage: bool,
439 errors: Vec<ReportError>,
440}
441
442impl<W> StdoutReporter<W>
443where
444 W: Write,
445{
446 pub fn new(writer: W) -> Self {
456 Self {
457 writer,
458 show_usage: true,
459 errors: Vec::new(),
460 }
461 }
462
463 pub fn with_usage(mut self, show_usage: bool) -> Self {
468 self.show_usage = show_usage;
469 self
470 }
471
472 pub fn writer(&self) -> &W {
474 &self.writer
475 }
476
477 pub fn take_errors(&mut self) -> Vec<ReportError> {
481 std::mem::take(&mut self.errors)
482 }
483
484 fn record_result(&mut self, result: Result<(), ReportError>) {
485 if let Err(error) = result {
486 self.errors.push(error);
487 }
488 }
489}
490
491impl<W> LoopObserver for StdoutReporter<W>
492where
493 W: Write + Send,
494{
495 fn handle_event(&mut self, event: AgentEvent) {
496 let result = write_stdout_event(&mut self.writer, &event, self.show_usage);
497 self.record_result(result);
498 }
499}
500
501fn write_stdout_event<W>(
502 writer: &mut W,
503 event: &AgentEvent,
504 show_usage: bool,
505) -> Result<(), ReportError>
506where
507 W: Write,
508{
509 match event {
510 AgentEvent::RunStarted { session_id } => {
511 writeln!(writer, "[run] started session={session_id}")?;
512 }
513 AgentEvent::TurnStarted {
514 session_id,
515 turn_id,
516 } => {
517 writeln!(writer, "[turn] started session={session_id} turn={turn_id}")?;
518 }
519 AgentEvent::InputAccepted { items, .. } => {
520 writeln!(writer, "[input] accepted items={}", items.len())?;
521 }
522 AgentEvent::ContentDelta(delta) => {
523 writeln!(writer, "[delta] {delta:?}")?;
524 }
525 AgentEvent::ToolCallRequested(call) => {
526 writeln!(writer, "[tool] call {} {}", call.name, call.input)?;
527 }
528 AgentEvent::ApprovalRequired(request) => {
529 writeln!(
530 writer,
531 "[approval] {} {:?}",
532 request.summary, request.reason
533 )?;
534 }
535 AgentEvent::AuthRequired(request) => {
536 writeln!(writer, "[auth] required provider={}", request.provider)?;
537 }
538 AgentEvent::ApprovalResolved { approved } => {
539 writeln!(writer, "[approval] resolved approved={approved}")?;
540 }
541 AgentEvent::AuthResolved { provided } => {
542 writeln!(writer, "[auth] resolved provided={provided}")?;
543 }
544 AgentEvent::CompactionStarted {
545 turn_id, reason, ..
546 } => {
547 writeln!(
548 writer,
549 "[compaction] started turn={} reason={reason:?}",
550 turn_id
551 .as_ref()
552 .map(ToString::to_string)
553 .unwrap_or_else(|| "none".into())
554 )?;
555 }
556 AgentEvent::CompactionFinished {
557 turn_id,
558 replaced_items,
559 transcript_len,
560 ..
561 } => {
562 writeln!(
563 writer,
564 "[compaction] finished turn={} replaced_items={} transcript_len={}",
565 turn_id
566 .as_ref()
567 .map(ToString::to_string)
568 .unwrap_or_else(|| "none".into()),
569 replaced_items,
570 transcript_len
571 )?;
572 }
573 AgentEvent::UsageUpdated(usage) if show_usage => {
574 writeln!(writer, "[usage] {}", format_usage(usage))?;
575 }
576 AgentEvent::UsageUpdated(_) => {}
577 AgentEvent::Warning { message } => {
578 writeln!(writer, "[warning] {message}")?;
579 }
580 AgentEvent::RunFailed { message } => {
581 writeln!(writer, "[error] {message}")?;
582 }
583 AgentEvent::TurnFinished(result) => {
584 writeln!(
585 writer,
586 "[turn] finished reason={:?} items={}",
587 result.finish_reason,
588 result.items.len()
589 )?;
590 for item in &result.items {
591 write_item_summary(writer, item)?;
592 }
593 if show_usage && let Some(usage) = &result.usage {
594 writeln!(writer, "[usage] {}", format_usage(usage))?;
595 }
596 }
597 }
598
599 writer.flush()?;
600 Ok(())
601}
602
603fn write_item_summary<W>(writer: &mut W, item: &Item) -> Result<(), ReportError>
604where
605 W: Write,
606{
607 writeln!(writer, " [{}]", item_kind_name(item.kind))?;
608 for part in &item.parts {
609 match part {
610 Part::Text(text) => writeln!(writer, " [text] {}", text.text)?,
611 Part::Reasoning(reasoning) => {
612 if let Some(summary) = &reasoning.summary {
613 writeln!(writer, " [reasoning] {summary}")?;
614 } else {
615 writeln!(writer, " [reasoning]")?;
616 }
617 }
618 Part::ToolCall(call) => {
619 writeln!(writer, " [tool-call] {} {}", call.name, call.input)?
620 }
621 Part::ToolResult(result) => writeln!(
622 writer,
623 " [tool-result] call={} error={}",
624 result.call_id, result.is_error
625 )?,
626 Part::Structured(value) => writeln!(writer, " [structured] {}", value.value)?,
627 Part::Media(media) => writeln!(
628 writer,
629 " [media] {:?} {}",
630 media.modality, media.mime_type
631 )?,
632 Part::File(file) => writeln!(
633 writer,
634 " [file] {}",
635 file.name.as_deref().unwrap_or("<unnamed>")
636 )?,
637 Part::Custom(custom) => writeln!(writer, " [custom] {}", custom.kind)?,
638 }
639 }
640 Ok(())
641}
642
643fn item_kind_name(kind: ItemKind) -> &'static str {
644 match kind {
645 ItemKind::System => "system",
646 ItemKind::Developer => "developer",
647 ItemKind::User => "user",
648 ItemKind::Assistant => "assistant",
649 ItemKind::Tool => "tool",
650 ItemKind::Context => "context",
651 }
652}
653
654fn format_usage(usage: &Usage) -> String {
655 match &usage.tokens {
656 Some(TokenUsage {
657 input_tokens,
658 output_tokens,
659 reasoning_tokens,
660 cached_input_tokens,
661 cache_write_input_tokens,
662 }) => format!(
663 "input={} output={} reasoning={} cached_input={} cache_write_input={}",
664 input_tokens,
665 output_tokens,
666 reasoning_tokens.unwrap_or_default(),
667 cached_input_tokens.unwrap_or_default(),
668 cache_write_input_tokens.unwrap_or_default()
669 ),
670 None => "no token usage".into(),
671 }
672}
673
#[cfg(test)]
mod tests {
    use super::*;
    use agentkit_core::{FinishReason, MetadataMap, SessionId, TextPart};
    use agentkit_loop::TurnResult;

    /// Builds a usage record that carries only token counters.
    fn usage_with_tokens(tokens: TokenUsage) -> Usage {
        Usage {
            tokens: Some(tokens),
            cost: None,
            metadata: MetadataMap::new(),
        }
    }

    /// Builds an item of the given kind holding a single text part.
    fn text_item(kind: ItemKind, text: &'static str) -> Item {
        Item {
            id: None,
            kind,
            parts: vec![Part::Text(TextPart {
                text: text.into(),
                metadata: MetadataMap::new(),
            })],
            metadata: MetadataMap::new(),
        }
    }

    /// Builds a completed `turn-1` event with the given items and usage.
    fn completed_turn(items: Vec<Item>, usage: Option<Usage>) -> AgentEvent {
        AgentEvent::TurnFinished(TurnResult {
            turn_id: "turn-1".into(),
            finish_reason: FinishReason::Completed,
            items,
            usage,
            metadata: MetadataMap::new(),
        })
    }

    #[test]
    fn usage_reporter_accumulates_usage_events_and_turn_results() {
        let mut reporter = UsageReporter::new();

        let streamed = usage_with_tokens(TokenUsage {
            input_tokens: 10,
            output_tokens: 5,
            reasoning_tokens: Some(2),
            cached_input_tokens: Some(1),
            cache_write_input_tokens: Some(7),
        });
        reporter.handle_event(AgentEvent::UsageUpdated(streamed));

        let final_usage = usage_with_tokens(TokenUsage {
            input_tokens: 3,
            output_tokens: 4,
            reasoning_tokens: Some(1),
            cached_input_tokens: None,
            cache_write_input_tokens: None,
        });
        reporter.handle_event(completed_turn(Vec::new(), Some(final_usage)));

        let summary = reporter.summary();
        assert_eq!(summary.events_seen, 2);
        assert_eq!(summary.usage_events_seen, 2);
        assert_eq!(summary.turn_results_seen, 1);

        let totals = summary.totals;
        assert_eq!(totals.input_tokens, 13);
        assert_eq!(totals.output_tokens, 9);
        assert_eq!(totals.reasoning_tokens, 3);
        assert_eq!(totals.cached_input_tokens, 1);
        assert_eq!(totals.cache_write_input_tokens, 7);
    }

    #[test]
    fn transcript_reporter_tracks_inputs_and_outputs() {
        let mut reporter = TranscriptReporter::new();

        reporter.handle_event(AgentEvent::InputAccepted {
            session_id: SessionId::new("session-1"),
            items: vec![text_item(ItemKind::User, "hello")],
        });
        reporter.handle_event(completed_turn(
            vec![text_item(ItemKind::Assistant, "hi")],
            None,
        ));

        // Input comes first, the assistant reply second.
        let items = &reporter.transcript().items;
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].kind, ItemKind::User);
        assert_eq!(items[1].kind, ItemKind::Assistant);
    }

    #[test]
    fn jsonl_reporter_serializes_event_envelopes() {
        let mut reporter = JsonlReporter::new(Vec::new());
        reporter.handle_event(AgentEvent::RunStarted {
            session_id: SessionId::new("session-1"),
        });

        let output = std::str::from_utf8(reporter.writer()).unwrap();
        assert!(output.contains("\"RunStarted\""));
        assert!(output.contains("session-1"));
    }
}