1mod buffered;
44mod channel;
45mod policy;
46
47#[cfg(feature = "tracing")]
48mod tracing_reporter;
49
50pub use buffered::BufferedReporter;
51pub use channel::ChannelReporter;
52pub use policy::{FailurePolicy, FallibleObserver, PolicyReporter};
53
54#[cfg(feature = "tracing")]
55pub use tracing_reporter::TracingReporter;
56
57use std::io::{self, Write};
58use std::time::SystemTime;
59
60use agentkit_core::{Item, ItemKind, Part, TokenUsage, Usage};
61use agentkit_loop::{AgentEvent, LoopObserver, TurnResult};
62use serde::Serialize;
63use thiserror::Error;
64
65#[derive(Debug, Error)]
72pub enum ReportError {
73 #[error("io error: {0}")]
75 Io(#[from] io::Error),
76 #[error("serialization error: {0}")]
78 Serialize(#[from] serde_json::Error),
79 #[error("channel send failed")]
81 ChannelSend,
82}
83
84#[derive(Clone, Debug, PartialEq, Serialize)]
90pub struct EventEnvelope<'a> {
91 pub timestamp: SystemTime,
93 pub event: &'a AgentEvent,
95}
96
97#[derive(Default)]
117pub struct CompositeReporter {
118 children: Vec<Box<dyn LoopObserver>>,
119}
120
121impl CompositeReporter {
122 pub fn new() -> Self {
124 Self::default()
125 }
126
127 pub fn with_observer(mut self, observer: impl LoopObserver + 'static) -> Self {
133 self.children.push(Box::new(observer));
134 self
135 }
136
137 pub fn push(&mut self, observer: impl LoopObserver + 'static) -> &mut Self {
146 self.children.push(Box::new(observer));
147 self
148 }
149}
150
151impl LoopObserver for CompositeReporter {
152 fn handle_event(&mut self, event: AgentEvent) {
153 for child in &mut self.children {
154 child.handle_event(event.clone());
155 }
156 }
157}
158
159pub struct JsonlReporter<W> {
184 writer: W,
185 flush_each_event: bool,
186 errors: Vec<ReportError>,
187}
188
189impl<W> JsonlReporter<W>
190where
191 W: Write,
192{
193 pub fn new(writer: W) -> Self {
203 Self {
204 writer,
205 flush_each_event: true,
206 errors: Vec::new(),
207 }
208 }
209
210 pub fn with_flush_each_event(mut self, flush_each_event: bool) -> Self {
214 self.flush_each_event = flush_each_event;
215 self
216 }
217
218 pub fn writer(&self) -> &W {
222 &self.writer
223 }
224
225 pub fn writer_mut(&mut self) -> &mut W {
227 &mut self.writer
228 }
229
230 pub fn take_errors(&mut self) -> Vec<ReportError> {
234 std::mem::take(&mut self.errors)
235 }
236
237 fn record_result(&mut self, result: Result<(), ReportError>) {
238 if let Err(error) = result {
239 self.errors.push(error);
240 }
241 }
242}
243
244impl<W> LoopObserver for JsonlReporter<W>
245where
246 W: Write + Send,
247{
248 fn handle_event(&mut self, event: AgentEvent) {
249 let result = (|| -> Result<(), ReportError> {
250 let envelope = EventEnvelope {
251 timestamp: SystemTime::now(),
252 event: &event,
253 };
254 serde_json::to_writer(&mut self.writer, &envelope)?;
255 self.writer.write_all(b"\n")?;
256 if self.flush_each_event {
257 self.writer.flush()?;
258 }
259 Ok(())
260 })();
261 self.record_result(result);
262 }
263}
264
/// Accumulated token counts. All counters start at zero.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct UsageTotals {
    /// Sum of reported input tokens.
    pub input_tokens: u64,
    /// Sum of reported output tokens.
    pub output_tokens: u64,
    /// Sum of reported reasoning tokens (reports without a value add nothing).
    pub reasoning_tokens: u64,
    /// Sum of reported cached input tokens (reports without a value add nothing).
    pub cached_input_tokens: u64,
    /// Sum of reported cache-write input tokens (reports without a value add nothing).
    pub cache_write_input_tokens: u64,
}
279
/// Accumulated monetary cost.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct CostTotals {
    /// Running sum of cost amounts.
    pub amount: f64,
    /// Currency label for `amount`; `None` until one has been recorded.
    pub currency: Option<String>,
}
288
289#[derive(Clone, Debug, Default, PartialEq)]
293pub struct UsageSummary {
294 pub events_seen: usize,
296 pub usage_events_seen: usize,
299 pub turn_results_seen: usize,
301 pub totals: UsageTotals,
303 pub cost: Option<CostTotals>,
305}
306
307#[derive(Default)]
331pub struct UsageReporter {
332 summary: UsageSummary,
333}
334
335impl UsageReporter {
336 pub fn new() -> Self {
338 Self::default()
339 }
340
341 pub fn summary(&self) -> &UsageSummary {
343 &self.summary
344 }
345
346 fn absorb(&mut self, usage: &Usage) {
347 self.summary.usage_events_seen += 1;
348 if let Some(tokens) = &usage.tokens {
349 self.summary.totals.input_tokens += tokens.input_tokens;
350 self.summary.totals.output_tokens += tokens.output_tokens;
351 self.summary.totals.reasoning_tokens += tokens.reasoning_tokens.unwrap_or_default();
352 self.summary.totals.cached_input_tokens +=
353 tokens.cached_input_tokens.unwrap_or_default();
354 self.summary.totals.cache_write_input_tokens +=
355 tokens.cache_write_input_tokens.unwrap_or_default();
356 }
357 if let Some(cost) = &usage.cost {
358 let totals = self.summary.cost.get_or_insert_with(CostTotals::default);
359 totals.amount += cost.amount;
360 if totals.currency.is_none() {
361 totals.currency = Some(cost.currency.clone());
362 }
363 }
364 }
365}
366
367impl LoopObserver for UsageReporter {
368 fn handle_event(&mut self, event: AgentEvent) {
369 self.summary.events_seen += 1;
370 match event {
371 AgentEvent::UsageUpdated(usage) => self.absorb(&usage),
372 AgentEvent::TurnFinished(TurnResult {
373 usage: Some(usage), ..
374 }) => {
375 self.summary.turn_results_seen += 1;
376 self.absorb(&usage);
377 }
378 AgentEvent::TurnFinished(_) => {
379 self.summary.turn_results_seen += 1;
380 }
381 _ => {}
382 }
383 }
384}
385
386#[derive(Clone, Debug, Default, PartialEq)]
391pub struct TranscriptView {
392 pub items: Vec<Item>,
394}
395
396#[derive(Default)]
417pub struct TranscriptReporter {
418 transcript: TranscriptView,
419}
420
421impl TranscriptReporter {
422 pub fn new() -> Self {
424 Self::default()
425 }
426
427 pub fn transcript(&self) -> &TranscriptView {
429 &self.transcript
430 }
431}
432
433impl LoopObserver for TranscriptReporter {
434 fn handle_event(&mut self, event: AgentEvent) {
435 match event {
436 AgentEvent::InputAccepted { items, .. } => {
437 self.transcript.items.extend(items);
438 }
439 AgentEvent::TurnFinished(result) => {
440 self.transcript.items.extend(result.items);
441 }
442 _ => {}
443 }
444 }
445}
446
447pub struct StdoutReporter<W> {
467 writer: W,
468 show_usage: bool,
469 errors: Vec<ReportError>,
470}
471
472impl<W> StdoutReporter<W>
473where
474 W: Write,
475{
476 pub fn new(writer: W) -> Self {
486 Self {
487 writer,
488 show_usage: true,
489 errors: Vec::new(),
490 }
491 }
492
493 pub fn with_usage(mut self, show_usage: bool) -> Self {
498 self.show_usage = show_usage;
499 self
500 }
501
502 pub fn writer(&self) -> &W {
504 &self.writer
505 }
506
507 pub fn take_errors(&mut self) -> Vec<ReportError> {
511 std::mem::take(&mut self.errors)
512 }
513
514 fn record_result(&mut self, result: Result<(), ReportError>) {
515 if let Err(error) = result {
516 self.errors.push(error);
517 }
518 }
519}
520
521impl<W> LoopObserver for StdoutReporter<W>
522where
523 W: Write + Send,
524{
525 fn handle_event(&mut self, event: AgentEvent) {
526 let result = write_stdout_event(&mut self.writer, &event, self.show_usage);
527 self.record_result(result);
528 }
529}
530
531fn write_stdout_event<W>(
532 writer: &mut W,
533 event: &AgentEvent,
534 show_usage: bool,
535) -> Result<(), ReportError>
536where
537 W: Write,
538{
539 match event {
540 AgentEvent::RunStarted { session_id } => {
541 writeln!(writer, "[run] started session={session_id}")?;
542 }
543 AgentEvent::TurnStarted {
544 session_id,
545 turn_id,
546 } => {
547 writeln!(writer, "[turn] started session={session_id} turn={turn_id}")?;
548 }
549 AgentEvent::InputAccepted { items, .. } => {
550 writeln!(writer, "[input] accepted items={}", items.len())?;
551 }
552 AgentEvent::ContentDelta(delta) => {
553 writeln!(writer, "[delta] {delta:?}")?;
554 }
555 AgentEvent::ToolCallRequested(call) => {
556 writeln!(writer, "[tool] call {} {}", call.name, call.input)?;
557 }
558 AgentEvent::ToolResultReceived(result) => {
559 writeln!(
560 writer,
561 "[tool] result call_id={} is_error={}",
562 result.call_id, result.is_error
563 )?;
564 }
565 AgentEvent::ApprovalRequired(request) => {
566 writeln!(
567 writer,
568 "[approval] {} {:?}",
569 request.summary, request.reason
570 )?;
571 }
572 AgentEvent::ApprovalResolved { approved } => {
573 writeln!(writer, "[approval] resolved approved={approved}")?;
574 }
575 AgentEvent::ToolCatalogChanged(event) => {
576 writeln!(
577 writer,
578 "[tools] catalog changed source={} added={} removed={} changed={}",
579 event.source,
580 event.added.len(),
581 event.removed.len(),
582 event.changed.len()
583 )?;
584 }
585 AgentEvent::CompactionStarted {
586 turn_id, reason, ..
587 } => {
588 writeln!(
589 writer,
590 "[compaction] started turn={} reason={reason:?}",
591 turn_id
592 .as_ref()
593 .map(ToString::to_string)
594 .unwrap_or_else(|| "none".into())
595 )?;
596 }
597 AgentEvent::CompactionFinished {
598 turn_id,
599 replaced_items,
600 transcript_len,
601 ..
602 } => {
603 writeln!(
604 writer,
605 "[compaction] finished turn={} replaced_items={} transcript_len={}",
606 turn_id
607 .as_ref()
608 .map(ToString::to_string)
609 .unwrap_or_else(|| "none".into()),
610 replaced_items,
611 transcript_len
612 )?;
613 }
614 AgentEvent::UsageUpdated(usage) if show_usage => {
615 writeln!(writer, "[usage] {}", format_usage(usage))?;
616 }
617 AgentEvent::UsageUpdated(_) => {}
618 AgentEvent::Warning { message } => {
619 writeln!(writer, "[warning] {message}")?;
620 }
621 AgentEvent::RunFailed { message } => {
622 writeln!(writer, "[error] {message}")?;
623 }
624 AgentEvent::TurnFinished(result) => {
625 writeln!(
626 writer,
627 "[turn] finished reason={:?} items={}",
628 result.finish_reason,
629 result.items.len()
630 )?;
631 for item in &result.items {
632 write_item_summary(writer, item)?;
633 }
634 if show_usage && let Some(usage) = &result.usage {
635 writeln!(writer, "[usage] {}", format_usage(usage))?;
636 }
637 }
638 }
639
640 writer.flush()?;
641 Ok(())
642}
643
644fn write_item_summary<W>(writer: &mut W, item: &Item) -> Result<(), ReportError>
645where
646 W: Write,
647{
648 writeln!(writer, " [{}]", item_kind_name(item.kind))?;
649 for part in &item.parts {
650 match part {
651 Part::Text(text) => writeln!(writer, " [text] {}", text.text)?,
652 Part::Reasoning(reasoning) => {
653 if let Some(summary) = &reasoning.summary {
654 writeln!(writer, " [reasoning] {summary}")?;
655 } else {
656 writeln!(writer, " [reasoning]")?;
657 }
658 }
659 Part::ToolCall(call) => {
660 writeln!(writer, " [tool-call] {} {}", call.name, call.input)?
661 }
662 Part::ToolResult(result) => writeln!(
663 writer,
664 " [tool-result] call={} error={}",
665 result.call_id, result.is_error
666 )?,
667 Part::Structured(value) => writeln!(writer, " [structured] {}", value.value)?,
668 Part::Media(media) => writeln!(
669 writer,
670 " [media] {:?} {}",
671 media.modality, media.mime_type
672 )?,
673 Part::File(file) => writeln!(
674 writer,
675 " [file] {}",
676 file.name.as_deref().unwrap_or("<unnamed>")
677 )?,
678 Part::Custom(custom) => writeln!(writer, " [custom] {}", custom.kind)?,
679 }
680 }
681 Ok(())
682}
683
684fn item_kind_name(kind: ItemKind) -> &'static str {
685 match kind {
686 ItemKind::System => "system",
687 ItemKind::Developer => "developer",
688 ItemKind::User => "user",
689 ItemKind::Assistant => "assistant",
690 ItemKind::Tool => "tool",
691 ItemKind::Context => "context",
692 ItemKind::Notification => "notification",
693 }
694}
695
696fn format_usage(usage: &Usage) -> String {
697 match &usage.tokens {
698 Some(TokenUsage {
699 input_tokens,
700 output_tokens,
701 reasoning_tokens,
702 cached_input_tokens,
703 cache_write_input_tokens,
704 }) => format!(
705 "input={} output={} reasoning={} cached_input={} cache_write_input={}",
706 input_tokens,
707 output_tokens,
708 reasoning_tokens.unwrap_or_default(),
709 cached_input_tokens.unwrap_or_default(),
710 cache_write_input_tokens.unwrap_or_default()
711 ),
712 None => "no token usage".into(),
713 }
714}
715
#[cfg(test)]
mod tests {
    use super::*;
    use agentkit_core::{FinishReason, MetadataMap, SessionId, TextPart};
    use agentkit_loop::TurnResult;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Usage from a standalone update and from a finished turn are both summed.
    #[test]
    fn usage_reporter_accumulates_usage_events_and_turn_results() {
        let mut reporter = UsageReporter::new();

        reporter.handle_event(AgentEvent::UsageUpdated(Usage {
            tokens: Some(TokenUsage {
                input_tokens: 10,
                output_tokens: 5,
                reasoning_tokens: Some(2),
                cached_input_tokens: Some(1),
                cache_write_input_tokens: Some(7),
            }),
            cost: None,
            metadata: MetadataMap::new(),
        }));

        reporter.handle_event(AgentEvent::TurnFinished(TurnResult {
            turn_id: "turn-1".into(),
            finish_reason: FinishReason::Completed,
            items: Vec::new(),
            usage: Some(Usage {
                tokens: Some(TokenUsage {
                    input_tokens: 3,
                    output_tokens: 4,
                    reasoning_tokens: Some(1),
                    cached_input_tokens: None,
                    cache_write_input_tokens: None,
                }),
                cost: None,
                metadata: MetadataMap::new(),
            }),
            metadata: MetadataMap::new(),
        }));

        let summary = reporter.summary();
        assert_eq!(summary.events_seen, 2);
        assert_eq!(summary.usage_events_seen, 2);
        assert_eq!(summary.turn_results_seen, 1);
        assert_eq!(summary.totals.input_tokens, 13);
        assert_eq!(summary.totals.output_tokens, 9);
        assert_eq!(summary.totals.reasoning_tokens, 3);
        assert_eq!(summary.totals.cached_input_tokens, 1);
        assert_eq!(summary.totals.cache_write_input_tokens, 7);
    }

    /// Accepted input items and finished-turn items are appended in order.
    #[test]
    fn transcript_reporter_tracks_inputs_and_outputs() {
        let mut reporter = TranscriptReporter::new();

        reporter.handle_event(AgentEvent::InputAccepted {
            session_id: SessionId::new("session-1"),
            items: vec![Item {
                id: None,
                kind: ItemKind::User,
                parts: vec![Part::Text(TextPart {
                    text: "hello".into(),
                    metadata: MetadataMap::new(),
                })],
                metadata: MetadataMap::new(),
            }],
        });

        reporter.handle_event(AgentEvent::TurnFinished(TurnResult {
            turn_id: "turn-1".into(),
            finish_reason: FinishReason::Completed,
            items: vec![Item {
                id: None,
                kind: ItemKind::Assistant,
                parts: vec![Part::Text(TextPart {
                    text: "hi".into(),
                    metadata: MetadataMap::new(),
                })],
                metadata: MetadataMap::new(),
            }],
            usage: None,
            metadata: MetadataMap::new(),
        }));

        assert_eq!(reporter.transcript().items.len(), 2);
        assert_eq!(reporter.transcript().items[0].kind, ItemKind::User);
        assert_eq!(reporter.transcript().items[1].kind, ItemKind::Assistant);
    }

    /// The JSONL output names the event variant and carries its payload.
    #[test]
    fn jsonl_reporter_serializes_event_envelopes() {
        let mut reporter = JsonlReporter::new(Vec::new());
        reporter.handle_event(AgentEvent::RunStarted {
            session_id: SessionId::new("session-1"),
        });

        let output = String::from_utf8(reporter.writer().clone()).unwrap();
        assert!(output.contains("\"RunStarted\""));
        assert!(output.contains("session-1"));
    }

    /// Cheap event used by the tests that only care about delivery counts.
    fn run_started_event() -> AgentEvent {
        AgentEvent::RunStarted {
            session_id: SessionId::new("s1"),
        }
    }

    /// Observer that counts delivered events through a shared counter, so the
    /// count stays readable after the observer itself has been dropped.
    struct CountingObserver(Arc<AtomicUsize>);

    impl LoopObserver for CountingObserver {
        fn handle_event(&mut self, _event: AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn buffered_reporter_flushes_at_capacity() {
        let mut reporter = BufferedReporter::new(UsageReporter::new(), 2);
        reporter.handle_event(run_started_event());
        assert_eq!(reporter.pending(), 1);
        assert_eq!(reporter.inner().summary().events_seen, 0);

        reporter.handle_event(run_started_event());
        assert_eq!(reporter.pending(), 0);
        assert_eq!(reporter.inner().summary().events_seen, 2);
    }

    #[test]
    fn buffered_reporter_manual_flush() {
        let mut reporter = BufferedReporter::new(UsageReporter::new(), 0);
        reporter.handle_event(run_started_event());
        reporter.handle_event(run_started_event());
        assert_eq!(reporter.pending(), 2);

        reporter.flush();
        assert_eq!(reporter.pending(), 0);
        assert_eq!(reporter.inner().summary().events_seen, 2);
    }

    /// Dropping a reporter with pending events must deliver them to the inner
    /// observer. The inner observer is gone once the reporter is dropped, so
    /// delivery is observed through a counter shared with the test.
    #[test]
    fn buffered_reporter_flushes_on_drop() {
        let delivered = Arc::new(AtomicUsize::new(0));
        {
            let mut reporter =
                BufferedReporter::new(CountingObserver(Arc::clone(&delivered)), 100);
            reporter.handle_event(run_started_event());
            reporter.handle_event(run_started_event());
            // Capacity is far from reached, so nothing has been forwarded yet.
            assert_eq!(reporter.pending(), 2);
            assert_eq!(delivered.load(Ordering::SeqCst), 0);
        } // `reporter` is dropped here.
        assert_eq!(delivered.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn channel_reporter_delivers_events() {
        let (mut reporter, rx) = ChannelReporter::pair();
        reporter.handle_event(run_started_event());
        reporter.handle_event(run_started_event());

        let events: Vec<_> = rx.try_iter().collect();
        assert_eq!(events.len(), 2);
    }

    #[test]
    fn channel_reporter_survives_dropped_receiver() {
        let (mut reporter, rx) = ChannelReporter::pair();
        drop(rx);
        // Passing means this call returned without panicking.
        reporter.handle_event(run_started_event());
    }

    #[test]
    fn channel_reporter_fallible_returns_error_on_dropped_receiver() {
        let (mut reporter, rx) = ChannelReporter::pair();
        drop(rx);

        let result = reporter.try_handle_event(&run_started_event());
        assert!(matches!(result, Err(ReportError::ChannelSend)));
    }

    #[test]
    fn policy_reporter_ignore_swallows_errors() {
        let (reporter, rx) = ChannelReporter::pair();
        drop(rx);
        let mut policy = PolicyReporter::new(reporter, FailurePolicy::Ignore);
        policy.handle_event(run_started_event());
        assert!(policy.take_errors().is_empty());
    }

    #[test]
    fn policy_reporter_accumulate_collects_errors() {
        let (reporter, rx) = ChannelReporter::pair();
        drop(rx);
        let mut policy = PolicyReporter::new(reporter, FailurePolicy::Accumulate);
        policy.handle_event(run_started_event());
        policy.handle_event(run_started_event());

        let errors = policy.take_errors();
        assert_eq!(errors.len(), 2);
        assert!(matches!(errors[0], ReportError::ChannelSend));
    }

    #[test]
    #[should_panic(expected = "reporter error: channel send failed")]
    fn policy_reporter_fail_fast_panics() {
        let (reporter, rx) = ChannelReporter::pair();
        drop(rx);
        let mut policy = PolicyReporter::new(reporter, FailurePolicy::FailFast);
        policy.handle_event(run_started_event());
    }
}