1mod buffered;
44mod channel;
45mod policy;
46
47#[cfg(feature = "tracing")]
48mod tracing_reporter;
49
50pub use buffered::BufferedReporter;
51pub use channel::ChannelReporter;
52pub use policy::{FailurePolicy, FallibleObserver, PolicyReporter};
53
54#[cfg(feature = "tracing")]
55pub use tracing_reporter::TracingReporter;
56
57use std::io::{self, Write};
58use std::time::SystemTime;
59
60use agentkit_core::{Item, ItemKind, Part, TokenUsage, Usage};
61use agentkit_loop::{AgentEvent, LoopObserver, TurnResult};
62use serde::Serialize;
63use thiserror::Error;
64
65#[derive(Debug, Error)]
72pub enum ReportError {
73 #[error("io error: {0}")]
75 Io(#[from] io::Error),
76 #[error("serialization error: {0}")]
78 Serialize(#[from] serde_json::Error),
79 #[error("channel send failed")]
81 ChannelSend,
82}
83
84#[derive(Clone, Debug, PartialEq, Serialize)]
90pub struct EventEnvelope<'a> {
91 pub timestamp: SystemTime,
93 pub event: &'a AgentEvent,
95}
96
97#[derive(Default)]
117pub struct CompositeReporter {
118 children: Vec<Box<dyn LoopObserver>>,
119}
120
121impl CompositeReporter {
122 pub fn new() -> Self {
124 Self::default()
125 }
126
127 pub fn with_observer(mut self, observer: impl LoopObserver + 'static) -> Self {
133 self.children.push(Box::new(observer));
134 self
135 }
136
137 pub fn push(&mut self, observer: impl LoopObserver + 'static) -> &mut Self {
146 self.children.push(Box::new(observer));
147 self
148 }
149}
150
151impl LoopObserver for CompositeReporter {
152 fn handle_event(&self, event: AgentEvent) {
153 for child in &self.children {
154 child.handle_event(event.clone());
155 }
156 }
157}
158
159pub struct JsonlReporter<W> {
184 writer: std::sync::Mutex<W>,
185 flush_each_event: bool,
186 errors: std::sync::Mutex<Vec<ReportError>>,
187}
188
189impl<W> JsonlReporter<W>
190where
191 W: Write,
192{
193 pub fn new(writer: W) -> Self {
195 Self {
196 writer: std::sync::Mutex::new(writer),
197 flush_each_event: true,
198 errors: std::sync::Mutex::new(Vec::new()),
199 }
200 }
201
202 pub fn with_flush_each_event(mut self, flush_each_event: bool) -> Self {
204 self.flush_each_event = flush_each_event;
205 self
206 }
207
208 pub fn take_errors(&self) -> Vec<ReportError> {
210 std::mem::take(&mut *self.errors.lock().unwrap_or_else(|e| e.into_inner()))
211 }
212
213 fn record_result(&self, result: Result<(), ReportError>) {
214 if let Err(error) = result {
215 self.errors
216 .lock()
217 .unwrap_or_else(|e| e.into_inner())
218 .push(error);
219 }
220 }
221
222 pub fn into_inner(self) -> W {
224 self.writer.into_inner().unwrap_or_else(|e| e.into_inner())
225 }
226}
227
228impl<W> LoopObserver for JsonlReporter<W>
229where
230 W: Write + Send,
231{
232 fn handle_event(&self, event: AgentEvent) {
233 let result = (|| -> Result<(), ReportError> {
234 let envelope = EventEnvelope {
235 timestamp: SystemTime::now(),
236 event: &event,
237 };
238 let mut buf = serde_json::to_vec(&envelope)?;
239 buf.push(b'\n');
240 let mut writer = self.writer.lock().unwrap_or_else(|e| e.into_inner());
241 writer.write_all(&buf)?;
242 if self.flush_each_event {
243 writer.flush()?;
244 }
245 Ok(())
246 })();
247 self.record_result(result);
248 }
249}
250
251#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
253pub struct UsageTotals {
254 pub input_tokens: u64,
256 pub output_tokens: u64,
258 pub reasoning_tokens: u64,
260 pub cached_input_tokens: u64,
262 pub cache_write_input_tokens: u64,
264}
265
266#[derive(Clone, Debug, Default, PartialEq)]
268pub struct CostTotals {
269 pub amount: f64,
271 pub currency: Option<String>,
273}
274
275#[derive(Clone, Debug, Default, PartialEq)]
279pub struct UsageSummary {
280 pub events_seen: usize,
282 pub usage_events_seen: usize,
285 pub turn_results_seen: usize,
287 pub totals: UsageTotals,
289 pub cost: Option<CostTotals>,
291}
292
293#[derive(Default)]
317pub struct UsageReporter {
318 summary: std::sync::Mutex<UsageSummary>,
319}
320
321impl UsageReporter {
322 pub fn new() -> Self {
324 Self::default()
325 }
326
327 pub fn summary(&self) -> UsageSummary {
329 self.summary
330 .lock()
331 .unwrap_or_else(|e| e.into_inner())
332 .clone()
333 }
334
335 fn absorb(summary: &mut UsageSummary, usage: &Usage) {
336 summary.usage_events_seen += 1;
337 if let Some(tokens) = &usage.tokens {
338 summary.totals.input_tokens += tokens.input_tokens;
339 summary.totals.output_tokens += tokens.output_tokens;
340 summary.totals.reasoning_tokens += tokens.reasoning_tokens.unwrap_or_default();
341 summary.totals.cached_input_tokens += tokens.cached_input_tokens.unwrap_or_default();
342 summary.totals.cache_write_input_tokens +=
343 tokens.cache_write_input_tokens.unwrap_or_default();
344 }
345 if let Some(cost) = &usage.cost {
346 let totals = summary.cost.get_or_insert_with(CostTotals::default);
347 totals.amount += cost.amount;
348 if totals.currency.is_none() {
349 totals.currency = Some(cost.currency.clone());
350 }
351 }
352 }
353}
354
355impl LoopObserver for UsageReporter {
356 fn handle_event(&self, event: AgentEvent) {
357 let mut summary = self.summary.lock().unwrap_or_else(|e| e.into_inner());
358 summary.events_seen += 1;
359 match event {
360 AgentEvent::UsageUpdated(usage) => Self::absorb(&mut summary, &usage),
361 AgentEvent::TurnFinished(TurnResult {
362 usage: Some(usage), ..
363 }) => {
364 summary.turn_results_seen += 1;
365 Self::absorb(&mut summary, &usage);
366 }
367 AgentEvent::TurnFinished(_) => {
368 summary.turn_results_seen += 1;
369 }
370 _ => {}
371 }
372 }
373}
374
375#[derive(Clone, Debug, Default, PartialEq)]
380pub struct TranscriptView {
381 pub items: Vec<Item>,
383}
384
385#[derive(Default)]
406pub struct TranscriptReporter {
407 transcript: std::sync::Mutex<TranscriptView>,
408}
409
410impl TranscriptReporter {
411 pub fn new() -> Self {
413 Self::default()
414 }
415
416 pub fn transcript(&self) -> TranscriptView {
418 self.transcript
419 .lock()
420 .unwrap_or_else(|e| e.into_inner())
421 .clone()
422 }
423}
424
425impl LoopObserver for TranscriptReporter {
426 fn handle_event(&self, event: AgentEvent) {
427 let mut transcript = self.transcript.lock().unwrap_or_else(|e| e.into_inner());
428 match event {
429 AgentEvent::InputAccepted { items, .. } => {
430 transcript.items.extend(items);
431 }
432 AgentEvent::TurnFinished(result) => {
433 transcript.items.extend(result.items);
434 }
435 _ => {}
436 }
437 }
438}
439
440pub struct StdoutReporter<W> {
460 writer: std::sync::Mutex<W>,
461 show_usage: bool,
462 errors: std::sync::Mutex<Vec<ReportError>>,
463}
464
465impl<W> StdoutReporter<W>
466where
467 W: Write,
468{
469 pub fn new(writer: W) -> Self {
471 Self {
472 writer: std::sync::Mutex::new(writer),
473 show_usage: true,
474 errors: std::sync::Mutex::new(Vec::new()),
475 }
476 }
477
478 pub fn with_usage(mut self, show_usage: bool) -> Self {
480 self.show_usage = show_usage;
481 self
482 }
483
484 pub fn take_errors(&self) -> Vec<ReportError> {
486 std::mem::take(&mut *self.errors.lock().unwrap_or_else(|e| e.into_inner()))
487 }
488
489 fn record_result(&self, result: Result<(), ReportError>) {
490 if let Err(error) = result {
491 self.errors
492 .lock()
493 .unwrap_or_else(|e| e.into_inner())
494 .push(error);
495 }
496 }
497}
498
499impl<W> LoopObserver for StdoutReporter<W>
500where
501 W: Write + Send,
502{
503 fn handle_event(&self, event: AgentEvent) {
504 let result = (|| -> Result<(), ReportError> {
505 let mut buf: Vec<u8> = Vec::new();
506 write_stdout_event(&mut buf, &event, self.show_usage)?;
507 let mut writer = self.writer.lock().unwrap_or_else(|e| e.into_inner());
508 writer.write_all(&buf)?;
509 writer.flush()?;
510 Ok(())
511 })();
512 self.record_result(result);
513 }
514}
515
516fn write_stdout_event<W>(
517 writer: &mut W,
518 event: &AgentEvent,
519 show_usage: bool,
520) -> Result<(), ReportError>
521where
522 W: Write,
523{
524 match event {
525 AgentEvent::RunStarted { session_id } => {
526 writeln!(writer, "[run] started session={session_id}")?;
527 }
528 AgentEvent::TurnStarted {
529 session_id,
530 turn_id,
531 } => {
532 writeln!(writer, "[turn] started session={session_id} turn={turn_id}")?;
533 }
534 AgentEvent::InputAccepted { items, .. } => {
535 writeln!(writer, "[input] accepted items={}", items.len())?;
536 }
537 AgentEvent::ContentDelta(delta) => {
538 writeln!(writer, "[delta] {delta:?}")?;
539 }
540 AgentEvent::ToolCallRequested(call) => {
541 writeln!(writer, "[tool] call {} {}", call.name, call.input)?;
542 }
543 AgentEvent::ToolResultReceived(result) => {
544 writeln!(
545 writer,
546 "[tool] result call_id={} is_error={}",
547 result.call_id, result.is_error
548 )?;
549 }
550 AgentEvent::ApprovalRequired(request) => {
551 writeln!(
552 writer,
553 "[approval] {} {:?}",
554 request.summary, request.reason
555 )?;
556 }
557 AgentEvent::ApprovalResolved { approved } => {
558 writeln!(writer, "[approval] resolved approved={approved}")?;
559 }
560 AgentEvent::ToolCatalogChanged(event) => {
561 writeln!(
562 writer,
563 "[tools] catalog changed source={} added={} removed={} changed={}",
564 event.source,
565 event.added.len(),
566 event.removed.len(),
567 event.changed.len()
568 )?;
569 }
570 AgentEvent::MutationStarted {
571 turn_id,
572 mutator,
573 point,
574 ..
575 } => {
576 writeln!(
577 writer,
578 "[mutation] started turn={} mutator={mutator} point={point:?}",
579 turn_id
580 .as_ref()
581 .map(ToString::to_string)
582 .unwrap_or_else(|| "none".into()),
583 )?;
584 }
585 AgentEvent::MutationFinished {
586 turn_id,
587 mutator,
588 dirty,
589 ..
590 } => {
591 writeln!(
592 writer,
593 "[mutation] finished turn={} mutator={mutator} dirty={dirty}",
594 turn_id
595 .as_ref()
596 .map(ToString::to_string)
597 .unwrap_or_else(|| "none".into()),
598 )?;
599 }
600 AgentEvent::UsageUpdated(usage) if show_usage => {
601 writeln!(writer, "[usage] {}", format_usage(usage))?;
602 }
603 AgentEvent::UsageUpdated(_) => {}
604 AgentEvent::Warning { message } => {
605 writeln!(writer, "[warning] {message}")?;
606 }
607 AgentEvent::RunFailed { message } => {
608 writeln!(writer, "[error] {message}")?;
609 }
610 AgentEvent::TurnFinished(result) => {
611 writeln!(
612 writer,
613 "[turn] finished reason={:?} items={}",
614 result.finish_reason,
615 result.items.len()
616 )?;
617 for item in &result.items {
618 write_item_summary(writer, item)?;
619 }
620 if show_usage && let Some(usage) = &result.usage {
621 writeln!(writer, "[usage] {}", format_usage(usage))?;
622 }
623 }
624 }
625
626 writer.flush()?;
627 Ok(())
628}
629
630fn write_item_summary<W>(writer: &mut W, item: &Item) -> Result<(), ReportError>
631where
632 W: Write,
633{
634 writeln!(writer, " [{}]", item_kind_name(item.kind))?;
635 for part in &item.parts {
636 match part {
637 Part::Text(text) => writeln!(writer, " [text] {}", text.text)?,
638 Part::Reasoning(reasoning) => {
639 if let Some(summary) = &reasoning.summary {
640 writeln!(writer, " [reasoning] {summary}")?;
641 } else {
642 writeln!(writer, " [reasoning]")?;
643 }
644 }
645 Part::ToolCall(call) => {
646 writeln!(writer, " [tool-call] {} {}", call.name, call.input)?
647 }
648 Part::ToolResult(result) => writeln!(
649 writer,
650 " [tool-result] call={} error={}",
651 result.call_id, result.is_error
652 )?,
653 Part::Structured(value) => writeln!(writer, " [structured] {}", value.value)?,
654 Part::Media(media) => writeln!(
655 writer,
656 " [media] {:?} {}",
657 media.modality, media.mime_type
658 )?,
659 Part::File(file) => writeln!(
660 writer,
661 " [file] {}",
662 file.name.as_deref().unwrap_or("<unnamed>")
663 )?,
664 Part::Custom(custom) => writeln!(writer, " [custom] {}", custom.kind)?,
665 }
666 }
667 Ok(())
668}
669
670fn item_kind_name(kind: ItemKind) -> &'static str {
671 match kind {
672 ItemKind::System => "system",
673 ItemKind::Developer => "developer",
674 ItemKind::User => "user",
675 ItemKind::Assistant => "assistant",
676 ItemKind::Tool => "tool",
677 ItemKind::Context => "context",
678 ItemKind::Notification => "notification",
679 }
680}
681
682fn format_usage(usage: &Usage) -> String {
683 match &usage.tokens {
684 Some(TokenUsage {
685 input_tokens,
686 output_tokens,
687 reasoning_tokens,
688 cached_input_tokens,
689 cache_write_input_tokens,
690 }) => format!(
691 "input={} output={} reasoning={} cached_input={} cache_write_input={}",
692 input_tokens,
693 output_tokens,
694 reasoning_tokens.unwrap_or_default(),
695 cached_input_tokens.unwrap_or_default(),
696 cache_write_input_tokens.unwrap_or_default()
697 ),
698 None => "no token usage".into(),
699 }
700}
701
702#[cfg(test)]
703mod tests {
704 use super::*;
705 use agentkit_core::{FinishReason, MetadataMap, SessionId, TextPart};
706 use agentkit_loop::TurnResult;
707
708 #[test]
709 fn usage_reporter_accumulates_usage_events_and_turn_results() {
710 let reporter = UsageReporter::new();
711
712 reporter.handle_event(AgentEvent::UsageUpdated(Usage {
713 tokens: Some(TokenUsage {
714 input_tokens: 10,
715 output_tokens: 5,
716 reasoning_tokens: Some(2),
717 cached_input_tokens: Some(1),
718 cache_write_input_tokens: Some(7),
719 }),
720 cost: None,
721 metadata: MetadataMap::new(),
722 }));
723
724 reporter.handle_event(AgentEvent::TurnFinished(TurnResult {
725 turn_id: "turn-1".into(),
726 finish_reason: FinishReason::Completed,
727 items: Vec::new(),
728 usage: Some(Usage {
729 tokens: Some(TokenUsage {
730 input_tokens: 3,
731 output_tokens: 4,
732 reasoning_tokens: Some(1),
733 cached_input_tokens: None,
734 cache_write_input_tokens: None,
735 }),
736 cost: None,
737 metadata: MetadataMap::new(),
738 }),
739 metadata: MetadataMap::new(),
740 }));
741
742 let summary = reporter.summary();
743 assert_eq!(summary.events_seen, 2);
744 assert_eq!(summary.usage_events_seen, 2);
745 assert_eq!(summary.turn_results_seen, 1);
746 assert_eq!(summary.totals.input_tokens, 13);
747 assert_eq!(summary.totals.output_tokens, 9);
748 assert_eq!(summary.totals.reasoning_tokens, 3);
749 assert_eq!(summary.totals.cached_input_tokens, 1);
750 assert_eq!(summary.totals.cache_write_input_tokens, 7);
751 }
752
753 #[test]
754 fn transcript_reporter_tracks_inputs_and_outputs() {
755 let reporter = TranscriptReporter::new();
756
757 reporter.handle_event(AgentEvent::InputAccepted {
758 session_id: SessionId::new("session-1"),
759 items: vec![Item {
760 id: None,
761 kind: ItemKind::User,
762 parts: vec![Part::Text(TextPart {
763 text: "hello".into(),
764 metadata: MetadataMap::new(),
765 })],
766 metadata: MetadataMap::new(),
767 usage: None,
768 finish_reason: None,
769 created_at: None,
770 }],
771 });
772
773 reporter.handle_event(AgentEvent::TurnFinished(TurnResult {
774 turn_id: "turn-1".into(),
775 finish_reason: FinishReason::Completed,
776 items: vec![Item {
777 id: None,
778 kind: ItemKind::Assistant,
779 parts: vec![Part::Text(TextPart {
780 text: "hi".into(),
781 metadata: MetadataMap::new(),
782 })],
783 metadata: MetadataMap::new(),
784 usage: None,
785 finish_reason: None,
786 created_at: None,
787 }],
788 usage: None,
789 metadata: MetadataMap::new(),
790 }));
791
792 assert_eq!(reporter.transcript().items.len(), 2);
793 assert_eq!(reporter.transcript().items[0].kind, ItemKind::User);
794 assert_eq!(reporter.transcript().items[1].kind, ItemKind::Assistant);
795 }
796
797 #[test]
798 fn jsonl_reporter_serializes_event_envelopes() {
799 let reporter = JsonlReporter::new(Vec::new());
800 reporter.handle_event(AgentEvent::RunStarted {
801 session_id: SessionId::new("session-1"),
802 });
803
804 let output = String::from_utf8(reporter.into_inner()).unwrap();
805 assert!(output.contains("\"RunStarted\""));
806 assert!(output.contains("session-1"));
807 }
808
809 fn run_started_event() -> AgentEvent {
810 AgentEvent::RunStarted {
811 session_id: SessionId::new("s1"),
812 }
813 }
814
815 #[test]
816 fn buffered_reporter_flushes_at_capacity() {
817 let reporter = BufferedReporter::new(UsageReporter::new(), 2);
818 reporter.handle_event(run_started_event());
819 assert_eq!(reporter.pending(), 1);
820 assert_eq!(reporter.inner().summary().events_seen, 0);
821
822 reporter.handle_event(run_started_event());
823 assert_eq!(reporter.pending(), 0);
824 assert_eq!(reporter.inner().summary().events_seen, 2);
825 }
826
827 #[test]
828 fn buffered_reporter_manual_flush() {
829 let reporter = BufferedReporter::new(UsageReporter::new(), 0);
830 reporter.handle_event(run_started_event());
831 reporter.handle_event(run_started_event());
832 assert_eq!(reporter.pending(), 2);
833
834 reporter.flush();
835 assert_eq!(reporter.pending(), 0);
836 assert_eq!(reporter.inner().summary().events_seen, 2);
837 }
838
839 #[test]
840 fn buffered_reporter_flushes_on_drop() {
841 let inner = {
842 let reporter = BufferedReporter::new(UsageReporter::new(), 100);
843 reporter.handle_event(run_started_event());
844 reporter.handle_event(run_started_event());
845 assert_eq!(reporter.inner().summary().events_seen, 0);
846 assert_eq!(reporter.pending(), 2);
849 reporter
850 };
851 assert_eq!(inner.inner().summary().events_seen, 0);
855 }
858
859 #[test]
860 fn channel_reporter_delivers_events() {
861 let (reporter, rx) = ChannelReporter::pair();
862 reporter.handle_event(run_started_event());
863 reporter.handle_event(run_started_event());
864
865 let events: Vec<_> = rx.try_iter().collect();
866 assert_eq!(events.len(), 2);
867 }
868
869 #[test]
870 fn channel_reporter_survives_dropped_receiver() {
871 let (reporter, rx) = ChannelReporter::pair();
872 drop(rx);
873 reporter.handle_event(run_started_event());
875 }
876
877 #[test]
878 fn channel_reporter_fallible_returns_error_on_dropped_receiver() {
879 let (reporter, rx) = ChannelReporter::pair();
880 drop(rx);
881
882 let result = reporter.try_handle_event(&run_started_event());
883 assert!(matches!(result, Err(ReportError::ChannelSend)));
884 }
885
886 #[test]
887 fn policy_reporter_ignore_swallows_errors() {
888 let (reporter, rx) = ChannelReporter::pair();
889 drop(rx);
890 let policy = PolicyReporter::new(reporter, FailurePolicy::Ignore);
891 policy.handle_event(run_started_event());
892 assert!(policy.take_errors().is_empty());
893 }
894
895 #[test]
896 fn policy_reporter_accumulate_collects_errors() {
897 let (reporter, rx) = ChannelReporter::pair();
898 drop(rx);
899 let policy = PolicyReporter::new(reporter, FailurePolicy::Accumulate);
900 policy.handle_event(run_started_event());
901 policy.handle_event(run_started_event());
902
903 let errors = policy.take_errors();
904 assert_eq!(errors.len(), 2);
905 assert!(matches!(errors[0], ReportError::ChannelSend));
906 }
907
908 #[test]
909 #[should_panic(expected = "reporter error: channel send failed")]
910 fn policy_reporter_fail_fast_panics() {
911 let (reporter, rx) = ChannelReporter::pair();
912 drop(rx);
913 let policy = PolicyReporter::new(reporter, FailurePolicy::FailFast);
914 policy.handle_event(run_started_event());
915 }
916}