1mod buffered;
44mod channel;
45mod policy;
46
47#[cfg(feature = "tracing")]
48mod tracing_reporter;
49
50pub use buffered::BufferedReporter;
51pub use channel::ChannelReporter;
52pub use policy::{FailurePolicy, FallibleObserver, PolicyReporter};
53
54#[cfg(feature = "tracing")]
55pub use tracing_reporter::TracingReporter;
56
57use std::io::{self, Write};
58use std::time::SystemTime;
59
60use agentkit_core::{Item, ItemKind, Part, TokenUsage, Usage};
61use agentkit_loop::{AgentEvent, LoopObserver, TurnResult};
62use serde::Serialize;
63use thiserror::Error;
64
65#[derive(Debug, Error)]
72pub enum ReportError {
73 #[error("io error: {0}")]
75 Io(#[from] io::Error),
76 #[error("serialization error: {0}")]
78 Serialize(#[from] serde_json::Error),
79 #[error("channel send failed")]
81 ChannelSend,
82}
83
84#[derive(Clone, Debug, PartialEq, Serialize)]
90pub struct EventEnvelope<'a> {
91 pub timestamp: SystemTime,
93 pub event: &'a AgentEvent,
95}
96
97#[derive(Default)]
117pub struct CompositeReporter {
118 children: Vec<Box<dyn LoopObserver>>,
119}
120
121impl CompositeReporter {
122 pub fn new() -> Self {
124 Self::default()
125 }
126
127 pub fn with_observer(mut self, observer: impl LoopObserver + 'static) -> Self {
133 self.children.push(Box::new(observer));
134 self
135 }
136
137 pub fn push(&mut self, observer: impl LoopObserver + 'static) -> &mut Self {
146 self.children.push(Box::new(observer));
147 self
148 }
149}
150
151impl LoopObserver for CompositeReporter {
152 fn handle_event(&mut self, event: AgentEvent) {
153 for child in &mut self.children {
154 child.handle_event(event.clone());
155 }
156 }
157}
158
159pub struct JsonlReporter<W> {
184 writer: W,
185 flush_each_event: bool,
186 errors: Vec<ReportError>,
187}
188
189impl<W> JsonlReporter<W>
190where
191 W: Write,
192{
193 pub fn new(writer: W) -> Self {
203 Self {
204 writer,
205 flush_each_event: true,
206 errors: Vec::new(),
207 }
208 }
209
210 pub fn with_flush_each_event(mut self, flush_each_event: bool) -> Self {
214 self.flush_each_event = flush_each_event;
215 self
216 }
217
218 pub fn writer(&self) -> &W {
222 &self.writer
223 }
224
225 pub fn writer_mut(&mut self) -> &mut W {
227 &mut self.writer
228 }
229
230 pub fn take_errors(&mut self) -> Vec<ReportError> {
234 std::mem::take(&mut self.errors)
235 }
236
237 fn record_result(&mut self, result: Result<(), ReportError>) {
238 if let Err(error) = result {
239 self.errors.push(error);
240 }
241 }
242}
243
244impl<W> LoopObserver for JsonlReporter<W>
245where
246 W: Write + Send,
247{
248 fn handle_event(&mut self, event: AgentEvent) {
249 let result = (|| -> Result<(), ReportError> {
250 let envelope = EventEnvelope {
251 timestamp: SystemTime::now(),
252 event: &event,
253 };
254 serde_json::to_writer(&mut self.writer, &envelope)?;
255 self.writer.write_all(b"\n")?;
256 if self.flush_each_event {
257 self.writer.flush()?;
258 }
259 Ok(())
260 })();
261 self.record_result(result);
262 }
263}
264
/// Running sums of token counts, one field per token category.
///
/// All fields start at zero via `Default`.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub struct UsageTotals {
    /// Sum of input tokens.
    pub input_tokens: u64,
    /// Sum of output tokens.
    pub output_tokens: u64,
    /// Sum of reasoning tokens.
    pub reasoning_tokens: u64,
    /// Sum of cached input tokens.
    pub cached_input_tokens: u64,
    /// Sum of cache-write input tokens.
    pub cache_write_input_tokens: u64,
}
279
/// Accumulated monetary cost together with an optional currency label.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct CostTotals {
    /// Sum of the reported cost amounts.
    pub amount: f64,
    /// Currency label for `amount`, if one has been recorded.
    pub currency: Option<String>,
}
288
289#[derive(Clone, Debug, Default, PartialEq)]
293pub struct UsageSummary {
294 pub events_seen: usize,
296 pub usage_events_seen: usize,
299 pub turn_results_seen: usize,
301 pub totals: UsageTotals,
303 pub cost: Option<CostTotals>,
305}
306
307#[derive(Default)]
331pub struct UsageReporter {
332 summary: UsageSummary,
333}
334
335impl UsageReporter {
336 pub fn new() -> Self {
338 Self::default()
339 }
340
341 pub fn summary(&self) -> &UsageSummary {
343 &self.summary
344 }
345
346 fn absorb(&mut self, usage: &Usage) {
347 self.summary.usage_events_seen += 1;
348 if let Some(tokens) = &usage.tokens {
349 self.summary.totals.input_tokens += tokens.input_tokens;
350 self.summary.totals.output_tokens += tokens.output_tokens;
351 self.summary.totals.reasoning_tokens += tokens.reasoning_tokens.unwrap_or_default();
352 self.summary.totals.cached_input_tokens +=
353 tokens.cached_input_tokens.unwrap_or_default();
354 self.summary.totals.cache_write_input_tokens +=
355 tokens.cache_write_input_tokens.unwrap_or_default();
356 }
357 if let Some(cost) = &usage.cost {
358 let totals = self.summary.cost.get_or_insert_with(CostTotals::default);
359 totals.amount += cost.amount;
360 if totals.currency.is_none() {
361 totals.currency = Some(cost.currency.clone());
362 }
363 }
364 }
365}
366
367impl LoopObserver for UsageReporter {
368 fn handle_event(&mut self, event: AgentEvent) {
369 self.summary.events_seen += 1;
370 match event {
371 AgentEvent::UsageUpdated(usage) => self.absorb(&usage),
372 AgentEvent::TurnFinished(TurnResult {
373 usage: Some(usage), ..
374 }) => {
375 self.summary.turn_results_seen += 1;
376 self.absorb(&usage);
377 }
378 AgentEvent::TurnFinished(_) => {
379 self.summary.turn_results_seen += 1;
380 }
381 _ => {}
382 }
383 }
384}
385
386#[derive(Clone, Debug, Default, PartialEq)]
391pub struct TranscriptView {
392 pub items: Vec<Item>,
394}
395
396#[derive(Default)]
417pub struct TranscriptReporter {
418 transcript: TranscriptView,
419}
420
421impl TranscriptReporter {
422 pub fn new() -> Self {
424 Self::default()
425 }
426
427 pub fn transcript(&self) -> &TranscriptView {
429 &self.transcript
430 }
431}
432
433impl LoopObserver for TranscriptReporter {
434 fn handle_event(&mut self, event: AgentEvent) {
435 match event {
436 AgentEvent::InputAccepted { items, .. } => {
437 self.transcript.items.extend(items);
438 }
439 AgentEvent::TurnFinished(result) => {
440 self.transcript.items.extend(result.items);
441 }
442 _ => {}
443 }
444 }
445}
446
447pub struct StdoutReporter<W> {
467 writer: W,
468 show_usage: bool,
469 errors: Vec<ReportError>,
470}
471
472impl<W> StdoutReporter<W>
473where
474 W: Write,
475{
476 pub fn new(writer: W) -> Self {
486 Self {
487 writer,
488 show_usage: true,
489 errors: Vec::new(),
490 }
491 }
492
493 pub fn with_usage(mut self, show_usage: bool) -> Self {
498 self.show_usage = show_usage;
499 self
500 }
501
502 pub fn writer(&self) -> &W {
504 &self.writer
505 }
506
507 pub fn take_errors(&mut self) -> Vec<ReportError> {
511 std::mem::take(&mut self.errors)
512 }
513
514 fn record_result(&mut self, result: Result<(), ReportError>) {
515 if let Err(error) = result {
516 self.errors.push(error);
517 }
518 }
519}
520
521impl<W> LoopObserver for StdoutReporter<W>
522where
523 W: Write + Send,
524{
525 fn handle_event(&mut self, event: AgentEvent) {
526 let result = write_stdout_event(&mut self.writer, &event, self.show_usage);
527 self.record_result(result);
528 }
529}
530
531fn write_stdout_event<W>(
532 writer: &mut W,
533 event: &AgentEvent,
534 show_usage: bool,
535) -> Result<(), ReportError>
536where
537 W: Write,
538{
539 match event {
540 AgentEvent::RunStarted { session_id } => {
541 writeln!(writer, "[run] started session={session_id}")?;
542 }
543 AgentEvent::TurnStarted {
544 session_id,
545 turn_id,
546 } => {
547 writeln!(writer, "[turn] started session={session_id} turn={turn_id}")?;
548 }
549 AgentEvent::InputAccepted { items, .. } => {
550 writeln!(writer, "[input] accepted items={}", items.len())?;
551 }
552 AgentEvent::ContentDelta(delta) => {
553 writeln!(writer, "[delta] {delta:?}")?;
554 }
555 AgentEvent::ToolCallRequested(call) => {
556 writeln!(writer, "[tool] call {} {}", call.name, call.input)?;
557 }
558 AgentEvent::ApprovalRequired(request) => {
559 writeln!(
560 writer,
561 "[approval] {} {:?}",
562 request.summary, request.reason
563 )?;
564 }
565 AgentEvent::AuthRequired(request) => {
566 writeln!(writer, "[auth] required provider={}", request.provider)?;
567 }
568 AgentEvent::ApprovalResolved { approved } => {
569 writeln!(writer, "[approval] resolved approved={approved}")?;
570 }
571 AgentEvent::AuthResolved { provided } => {
572 writeln!(writer, "[auth] resolved provided={provided}")?;
573 }
574 AgentEvent::CompactionStarted {
575 turn_id, reason, ..
576 } => {
577 writeln!(
578 writer,
579 "[compaction] started turn={} reason={reason:?}",
580 turn_id
581 .as_ref()
582 .map(ToString::to_string)
583 .unwrap_or_else(|| "none".into())
584 )?;
585 }
586 AgentEvent::CompactionFinished {
587 turn_id,
588 replaced_items,
589 transcript_len,
590 ..
591 } => {
592 writeln!(
593 writer,
594 "[compaction] finished turn={} replaced_items={} transcript_len={}",
595 turn_id
596 .as_ref()
597 .map(ToString::to_string)
598 .unwrap_or_else(|| "none".into()),
599 replaced_items,
600 transcript_len
601 )?;
602 }
603 AgentEvent::UsageUpdated(usage) if show_usage => {
604 writeln!(writer, "[usage] {}", format_usage(usage))?;
605 }
606 AgentEvent::UsageUpdated(_) => {}
607 AgentEvent::Warning { message } => {
608 writeln!(writer, "[warning] {message}")?;
609 }
610 AgentEvent::RunFailed { message } => {
611 writeln!(writer, "[error] {message}")?;
612 }
613 AgentEvent::TurnFinished(result) => {
614 writeln!(
615 writer,
616 "[turn] finished reason={:?} items={}",
617 result.finish_reason,
618 result.items.len()
619 )?;
620 for item in &result.items {
621 write_item_summary(writer, item)?;
622 }
623 if show_usage && let Some(usage) = &result.usage {
624 writeln!(writer, "[usage] {}", format_usage(usage))?;
625 }
626 }
627 }
628
629 writer.flush()?;
630 Ok(())
631}
632
633fn write_item_summary<W>(writer: &mut W, item: &Item) -> Result<(), ReportError>
634where
635 W: Write,
636{
637 writeln!(writer, " [{}]", item_kind_name(item.kind))?;
638 for part in &item.parts {
639 match part {
640 Part::Text(text) => writeln!(writer, " [text] {}", text.text)?,
641 Part::Reasoning(reasoning) => {
642 if let Some(summary) = &reasoning.summary {
643 writeln!(writer, " [reasoning] {summary}")?;
644 } else {
645 writeln!(writer, " [reasoning]")?;
646 }
647 }
648 Part::ToolCall(call) => {
649 writeln!(writer, " [tool-call] {} {}", call.name, call.input)?
650 }
651 Part::ToolResult(result) => writeln!(
652 writer,
653 " [tool-result] call={} error={}",
654 result.call_id, result.is_error
655 )?,
656 Part::Structured(value) => writeln!(writer, " [structured] {}", value.value)?,
657 Part::Media(media) => writeln!(
658 writer,
659 " [media] {:?} {}",
660 media.modality, media.mime_type
661 )?,
662 Part::File(file) => writeln!(
663 writer,
664 " [file] {}",
665 file.name.as_deref().unwrap_or("<unnamed>")
666 )?,
667 Part::Custom(custom) => writeln!(writer, " [custom] {}", custom.kind)?,
668 }
669 }
670 Ok(())
671}
672
673fn item_kind_name(kind: ItemKind) -> &'static str {
674 match kind {
675 ItemKind::System => "system",
676 ItemKind::Developer => "developer",
677 ItemKind::User => "user",
678 ItemKind::Assistant => "assistant",
679 ItemKind::Tool => "tool",
680 ItemKind::Context => "context",
681 }
682}
683
684fn format_usage(usage: &Usage) -> String {
685 match &usage.tokens {
686 Some(TokenUsage {
687 input_tokens,
688 output_tokens,
689 reasoning_tokens,
690 cached_input_tokens,
691 cache_write_input_tokens,
692 }) => format!(
693 "input={} output={} reasoning={} cached_input={} cache_write_input={}",
694 input_tokens,
695 output_tokens,
696 reasoning_tokens.unwrap_or_default(),
697 cached_input_tokens.unwrap_or_default(),
698 cache_write_input_tokens.unwrap_or_default()
699 ),
700 None => "no token usage".into(),
701 }
702}
703
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;
    use agentkit_core::{FinishReason, MetadataMap, SessionId, TextPart};
    use agentkit_loop::TurnResult;

    /// Observer that only counts the events it receives.
    ///
    /// The counter is shared, so the count can still be read after the
    /// observer (or a wrapper that owns it) has been dropped.
    struct CountingObserver(Arc<AtomicUsize>);

    impl LoopObserver for CountingObserver {
        fn handle_event(&mut self, _event: AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Builds a minimal event for tests that only care about delivery.
    fn run_started_event() -> AgentEvent {
        AgentEvent::RunStarted {
            session_id: SessionId::new("s1"),
        }
    }

    #[test]
    fn usage_reporter_accumulates_usage_events_and_turn_results() {
        let mut reporter = UsageReporter::new();

        reporter.handle_event(AgentEvent::UsageUpdated(Usage {
            tokens: Some(TokenUsage {
                input_tokens: 10,
                output_tokens: 5,
                reasoning_tokens: Some(2),
                cached_input_tokens: Some(1),
                cache_write_input_tokens: Some(7),
            }),
            cost: None,
            metadata: MetadataMap::new(),
        }));

        reporter.handle_event(AgentEvent::TurnFinished(TurnResult {
            turn_id: "turn-1".into(),
            finish_reason: FinishReason::Completed,
            items: Vec::new(),
            usage: Some(Usage {
                tokens: Some(TokenUsage {
                    input_tokens: 3,
                    output_tokens: 4,
                    reasoning_tokens: Some(1),
                    cached_input_tokens: None,
                    cache_write_input_tokens: None,
                }),
                cost: None,
                metadata: MetadataMap::new(),
            }),
            metadata: MetadataMap::new(),
        }));

        let summary = reporter.summary();
        assert_eq!(summary.events_seen, 2);
        assert_eq!(summary.usage_events_seen, 2);
        assert_eq!(summary.turn_results_seen, 1);
        assert_eq!(summary.totals.input_tokens, 13);
        assert_eq!(summary.totals.output_tokens, 9);
        assert_eq!(summary.totals.reasoning_tokens, 3);
        assert_eq!(summary.totals.cached_input_tokens, 1);
        assert_eq!(summary.totals.cache_write_input_tokens, 7);
    }

    #[test]
    fn transcript_reporter_tracks_inputs_and_outputs() {
        let mut reporter = TranscriptReporter::new();

        reporter.handle_event(AgentEvent::InputAccepted {
            session_id: SessionId::new("session-1"),
            items: vec![Item {
                id: None,
                kind: ItemKind::User,
                parts: vec![Part::Text(TextPart {
                    text: "hello".into(),
                    metadata: MetadataMap::new(),
                })],
                metadata: MetadataMap::new(),
            }],
        });

        reporter.handle_event(AgentEvent::TurnFinished(TurnResult {
            turn_id: "turn-1".into(),
            finish_reason: FinishReason::Completed,
            items: vec![Item {
                id: None,
                kind: ItemKind::Assistant,
                parts: vec![Part::Text(TextPart {
                    text: "hi".into(),
                    metadata: MetadataMap::new(),
                })],
                metadata: MetadataMap::new(),
            }],
            usage: None,
            metadata: MetadataMap::new(),
        }));

        assert_eq!(reporter.transcript().items.len(), 2);
        assert_eq!(reporter.transcript().items[0].kind, ItemKind::User);
        assert_eq!(reporter.transcript().items[1].kind, ItemKind::Assistant);
    }

    #[test]
    fn jsonl_reporter_serializes_event_envelopes() {
        let mut reporter = JsonlReporter::new(Vec::new());
        reporter.handle_event(AgentEvent::RunStarted {
            session_id: SessionId::new("session-1"),
        });

        let output = String::from_utf8(reporter.writer().clone()).unwrap();
        assert!(output.contains("\"RunStarted\""));
        assert!(output.contains("session-1"));
    }

    #[test]
    fn buffered_reporter_flushes_at_capacity() {
        let mut reporter = BufferedReporter::new(UsageReporter::new(), 2);
        reporter.handle_event(run_started_event());
        assert_eq!(reporter.pending(), 1);
        assert_eq!(reporter.inner().summary().events_seen, 0);

        reporter.handle_event(run_started_event());
        assert_eq!(reporter.pending(), 0);
        assert_eq!(reporter.inner().summary().events_seen, 2);
    }

    #[test]
    fn buffered_reporter_manual_flush() {
        let mut reporter = BufferedReporter::new(UsageReporter::new(), 0);
        reporter.handle_event(run_started_event());
        reporter.handle_event(run_started_event());
        assert_eq!(reporter.pending(), 2);

        reporter.flush();
        assert_eq!(reporter.pending(), 0);
        assert_eq!(reporter.inner().summary().events_seen, 2);
    }

    #[test]
    fn buffered_reporter_flushes_on_drop() {
        // The inner observer is consumed by the drop, so delivery is
        // observed through a counter shared with the test.
        let delivered = Arc::new(AtomicUsize::new(0));
        {
            let mut reporter =
                BufferedReporter::new(CountingObserver(Arc::clone(&delivered)), 100);
            reporter.handle_event(run_started_event());
            reporter.handle_event(run_started_event());

            // Capacity is far from reached, so nothing has been forwarded.
            assert_eq!(reporter.pending(), 2);
            assert_eq!(delivered.load(Ordering::SeqCst), 0);
        }
        // The reporter went out of scope above; both buffered events must
        // have reached the inner observer.
        assert_eq!(delivered.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn channel_reporter_delivers_events() {
        let (mut reporter, rx) = ChannelReporter::pair();
        reporter.handle_event(run_started_event());
        reporter.handle_event(run_started_event());

        let events: Vec<_> = rx.try_iter().collect();
        assert_eq!(events.len(), 2);
    }

    #[test]
    fn channel_reporter_survives_dropped_receiver() {
        let (mut reporter, rx) = ChannelReporter::pair();
        drop(rx);
        // Passing means the call returned without panicking.
        reporter.handle_event(run_started_event());
    }

    #[test]
    fn channel_reporter_fallible_returns_error_on_dropped_receiver() {
        let (mut reporter, rx) = ChannelReporter::pair();
        drop(rx);

        let result = reporter.try_handle_event(&run_started_event());
        assert!(matches!(result, Err(ReportError::ChannelSend)));
    }

    #[test]
    fn policy_reporter_ignore_swallows_errors() {
        let (reporter, rx) = ChannelReporter::pair();
        drop(rx);
        let mut policy = PolicyReporter::new(reporter, FailurePolicy::Ignore);
        policy.handle_event(run_started_event());
        assert!(policy.take_errors().is_empty());
    }

    #[test]
    fn policy_reporter_accumulate_collects_errors() {
        let (reporter, rx) = ChannelReporter::pair();
        drop(rx);
        let mut policy = PolicyReporter::new(reporter, FailurePolicy::Accumulate);
        policy.handle_event(run_started_event());
        policy.handle_event(run_started_event());

        let errors = policy.take_errors();
        assert_eq!(errors.len(), 2);
        assert!(matches!(errors[0], ReportError::ChannelSend));
    }

    #[test]
    #[should_panic(expected = "reporter error: channel send failed")]
    fn policy_reporter_fail_fast_panics() {
        let (reporter, rx) = ChannelReporter::pair();
        drop(rx);
        let mut policy = PolicyReporter::new(reporter, FailurePolicy::FailFast);
        policy.handle_event(run_started_event());
    }
}