1mod buffered;
44mod channel;
45mod policy;
46
47#[cfg(feature = "tracing")]
48mod tracing_reporter;
49
50pub use buffered::BufferedReporter;
51pub use channel::ChannelReporter;
52pub use policy::{FailurePolicy, FallibleObserver, PolicyReporter};
53
54#[cfg(feature = "tracing")]
55pub use tracing_reporter::TracingReporter;
56
57use std::io::{self, Write};
58use std::time::SystemTime;
59
60use agentkit_core::{Item, ItemKind, Part, TokenUsage, Usage};
61use agentkit_loop::{AgentEvent, LoopObserver, TurnResult};
62use serde::Serialize;
63use thiserror::Error;
64
65#[derive(Debug, Error)]
72pub enum ReportError {
73 #[error("io error: {0}")]
75 Io(#[from] io::Error),
76 #[error("serialization error: {0}")]
78 Serialize(#[from] serde_json::Error),
79 #[error("channel send failed")]
81 ChannelSend,
82}
83
84#[derive(Clone, Debug, PartialEq, Serialize)]
90pub struct EventEnvelope<'a> {
91 pub timestamp: SystemTime,
93 pub event: &'a AgentEvent,
95}
96
97#[derive(Default)]
117pub struct CompositeReporter {
118 children: Vec<Box<dyn LoopObserver>>,
119}
120
121impl CompositeReporter {
122 pub fn new() -> Self {
124 Self::default()
125 }
126
127 pub fn with_observer(mut self, observer: impl LoopObserver + 'static) -> Self {
133 self.children.push(Box::new(observer));
134 self
135 }
136
137 pub fn push(&mut self, observer: impl LoopObserver + 'static) -> &mut Self {
146 self.children.push(Box::new(observer));
147 self
148 }
149}
150
151impl LoopObserver for CompositeReporter {
152 fn handle_event(&self, event: AgentEvent) {
153 for child in &self.children {
154 child.handle_event(event.clone());
155 }
156 }
157}
158
159pub struct JsonlReporter<W> {
184 writer: std::sync::Mutex<W>,
185 flush_each_event: bool,
186 errors: std::sync::Mutex<Vec<ReportError>>,
187}
188
189impl<W> JsonlReporter<W>
190where
191 W: Write,
192{
193 pub fn new(writer: W) -> Self {
195 Self {
196 writer: std::sync::Mutex::new(writer),
197 flush_each_event: true,
198 errors: std::sync::Mutex::new(Vec::new()),
199 }
200 }
201
202 pub fn with_flush_each_event(mut self, flush_each_event: bool) -> Self {
204 self.flush_each_event = flush_each_event;
205 self
206 }
207
208 pub fn take_errors(&self) -> Vec<ReportError> {
210 std::mem::take(&mut *self.errors.lock().unwrap_or_else(|e| e.into_inner()))
211 }
212
213 fn record_result(&self, result: Result<(), ReportError>) {
214 if let Err(error) = result {
215 self.errors.lock().unwrap_or_else(|e| e.into_inner()).push(error);
216 }
217 }
218
219 pub fn into_inner(self) -> W {
221 self.writer.into_inner().unwrap_or_else(|e| e.into_inner())
222 }
223}
224
225impl<W> LoopObserver for JsonlReporter<W>
226where
227 W: Write + Send,
228{
229 fn handle_event(&self, event: AgentEvent) {
230 let result = (|| -> Result<(), ReportError> {
231 let envelope = EventEnvelope {
232 timestamp: SystemTime::now(),
233 event: &event,
234 };
235 let mut buf = serde_json::to_vec(&envelope)?;
236 buf.push(b'\n');
237 let mut writer = self.writer.lock().unwrap_or_else(|e| e.into_inner());
238 writer.write_all(&buf)?;
239 if self.flush_each_event {
240 writer.flush()?;
241 }
242 Ok(())
243 })();
244 self.record_result(result);
245 }
246}
247
248#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
250pub struct UsageTotals {
251 pub input_tokens: u64,
253 pub output_tokens: u64,
255 pub reasoning_tokens: u64,
257 pub cached_input_tokens: u64,
259 pub cache_write_input_tokens: u64,
261}
262
263#[derive(Clone, Debug, Default, PartialEq)]
265pub struct CostTotals {
266 pub amount: f64,
268 pub currency: Option<String>,
270}
271
272#[derive(Clone, Debug, Default, PartialEq)]
276pub struct UsageSummary {
277 pub events_seen: usize,
279 pub usage_events_seen: usize,
282 pub turn_results_seen: usize,
284 pub totals: UsageTotals,
286 pub cost: Option<CostTotals>,
288}
289
290#[derive(Default)]
314pub struct UsageReporter {
315 summary: std::sync::Mutex<UsageSummary>,
316}
317
318impl UsageReporter {
319 pub fn new() -> Self {
321 Self::default()
322 }
323
324 pub fn summary(&self) -> UsageSummary {
326 self.summary.lock().unwrap_or_else(|e| e.into_inner()).clone()
327 }
328
329 fn absorb(summary: &mut UsageSummary, usage: &Usage) {
330 summary.usage_events_seen += 1;
331 if let Some(tokens) = &usage.tokens {
332 summary.totals.input_tokens += tokens.input_tokens;
333 summary.totals.output_tokens += tokens.output_tokens;
334 summary.totals.reasoning_tokens += tokens.reasoning_tokens.unwrap_or_default();
335 summary.totals.cached_input_tokens += tokens.cached_input_tokens.unwrap_or_default();
336 summary.totals.cache_write_input_tokens +=
337 tokens.cache_write_input_tokens.unwrap_or_default();
338 }
339 if let Some(cost) = &usage.cost {
340 let totals = summary.cost.get_or_insert_with(CostTotals::default);
341 totals.amount += cost.amount;
342 if totals.currency.is_none() {
343 totals.currency = Some(cost.currency.clone());
344 }
345 }
346 }
347}
348
349impl LoopObserver for UsageReporter {
350 fn handle_event(&self, event: AgentEvent) {
351 let mut summary = self.summary.lock().unwrap_or_else(|e| e.into_inner());
352 summary.events_seen += 1;
353 match event {
354 AgentEvent::UsageUpdated(usage) => Self::absorb(&mut summary, &usage),
355 AgentEvent::TurnFinished(TurnResult {
356 usage: Some(usage), ..
357 }) => {
358 summary.turn_results_seen += 1;
359 Self::absorb(&mut summary, &usage);
360 }
361 AgentEvent::TurnFinished(_) => {
362 summary.turn_results_seen += 1;
363 }
364 _ => {}
365 }
366 }
367}
368
369#[derive(Clone, Debug, Default, PartialEq)]
374pub struct TranscriptView {
375 pub items: Vec<Item>,
377}
378
379#[derive(Default)]
400pub struct TranscriptReporter {
401 transcript: std::sync::Mutex<TranscriptView>,
402}
403
404impl TranscriptReporter {
405 pub fn new() -> Self {
407 Self::default()
408 }
409
410 pub fn transcript(&self) -> TranscriptView {
412 self.transcript.lock().unwrap_or_else(|e| e.into_inner()).clone()
413 }
414}
415
416impl LoopObserver for TranscriptReporter {
417 fn handle_event(&self, event: AgentEvent) {
418 let mut transcript = self.transcript.lock().unwrap_or_else(|e| e.into_inner());
419 match event {
420 AgentEvent::InputAccepted { items, .. } => {
421 transcript.items.extend(items);
422 }
423 AgentEvent::TurnFinished(result) => {
424 transcript.items.extend(result.items);
425 }
426 _ => {}
427 }
428 }
429}
430
431pub struct StdoutReporter<W> {
451 writer: std::sync::Mutex<W>,
452 show_usage: bool,
453 errors: std::sync::Mutex<Vec<ReportError>>,
454}
455
456impl<W> StdoutReporter<W>
457where
458 W: Write,
459{
460 pub fn new(writer: W) -> Self {
462 Self {
463 writer: std::sync::Mutex::new(writer),
464 show_usage: true,
465 errors: std::sync::Mutex::new(Vec::new()),
466 }
467 }
468
469 pub fn with_usage(mut self, show_usage: bool) -> Self {
471 self.show_usage = show_usage;
472 self
473 }
474
475 pub fn take_errors(&self) -> Vec<ReportError> {
477 std::mem::take(&mut *self.errors.lock().unwrap_or_else(|e| e.into_inner()))
478 }
479
480 fn record_result(&self, result: Result<(), ReportError>) {
481 if let Err(error) = result {
482 self.errors.lock().unwrap_or_else(|e| e.into_inner()).push(error);
483 }
484 }
485}
486
487impl<W> LoopObserver for StdoutReporter<W>
488where
489 W: Write + Send,
490{
491 fn handle_event(&self, event: AgentEvent) {
492 let result = (|| -> Result<(), ReportError> {
493 let mut buf: Vec<u8> = Vec::new();
494 write_stdout_event(&mut buf, &event, self.show_usage)?;
495 let mut writer = self.writer.lock().unwrap_or_else(|e| e.into_inner());
496 writer.write_all(&buf)?;
497 writer.flush()?;
498 Ok(())
499 })();
500 self.record_result(result);
501 }
502}
503
504fn write_stdout_event<W>(
505 writer: &mut W,
506 event: &AgentEvent,
507 show_usage: bool,
508) -> Result<(), ReportError>
509where
510 W: Write,
511{
512 match event {
513 AgentEvent::RunStarted { session_id } => {
514 writeln!(writer, "[run] started session={session_id}")?;
515 }
516 AgentEvent::TurnStarted {
517 session_id,
518 turn_id,
519 } => {
520 writeln!(writer, "[turn] started session={session_id} turn={turn_id}")?;
521 }
522 AgentEvent::InputAccepted { items, .. } => {
523 writeln!(writer, "[input] accepted items={}", items.len())?;
524 }
525 AgentEvent::ContentDelta(delta) => {
526 writeln!(writer, "[delta] {delta:?}")?;
527 }
528 AgentEvent::ToolCallRequested(call) => {
529 writeln!(writer, "[tool] call {} {}", call.name, call.input)?;
530 }
531 AgentEvent::ToolResultReceived(result) => {
532 writeln!(
533 writer,
534 "[tool] result call_id={} is_error={}",
535 result.call_id, result.is_error
536 )?;
537 }
538 AgentEvent::ApprovalRequired(request) => {
539 writeln!(
540 writer,
541 "[approval] {} {:?}",
542 request.summary, request.reason
543 )?;
544 }
545 AgentEvent::ApprovalResolved { approved } => {
546 writeln!(writer, "[approval] resolved approved={approved}")?;
547 }
548 AgentEvent::ToolCatalogChanged(event) => {
549 writeln!(
550 writer,
551 "[tools] catalog changed source={} added={} removed={} changed={}",
552 event.source,
553 event.added.len(),
554 event.removed.len(),
555 event.changed.len()
556 )?;
557 }
558 AgentEvent::MutationStarted {
559 turn_id,
560 mutator,
561 point,
562 ..
563 } => {
564 writeln!(
565 writer,
566 "[mutation] started turn={} mutator={mutator} point={point:?}",
567 turn_id
568 .as_ref()
569 .map(ToString::to_string)
570 .unwrap_or_else(|| "none".into()),
571 )?;
572 }
573 AgentEvent::MutationFinished {
574 turn_id,
575 mutator,
576 dirty,
577 ..
578 } => {
579 writeln!(
580 writer,
581 "[mutation] finished turn={} mutator={mutator} dirty={dirty}",
582 turn_id
583 .as_ref()
584 .map(ToString::to_string)
585 .unwrap_or_else(|| "none".into()),
586 )?;
587 }
588 AgentEvent::UsageUpdated(usage) if show_usage => {
589 writeln!(writer, "[usage] {}", format_usage(usage))?;
590 }
591 AgentEvent::UsageUpdated(_) => {}
592 AgentEvent::Warning { message } => {
593 writeln!(writer, "[warning] {message}")?;
594 }
595 AgentEvent::RunFailed { message } => {
596 writeln!(writer, "[error] {message}")?;
597 }
598 AgentEvent::TurnFinished(result) => {
599 writeln!(
600 writer,
601 "[turn] finished reason={:?} items={}",
602 result.finish_reason,
603 result.items.len()
604 )?;
605 for item in &result.items {
606 write_item_summary(writer, item)?;
607 }
608 if show_usage && let Some(usage) = &result.usage {
609 writeln!(writer, "[usage] {}", format_usage(usage))?;
610 }
611 }
612 }
613
614 writer.flush()?;
615 Ok(())
616}
617
618fn write_item_summary<W>(writer: &mut W, item: &Item) -> Result<(), ReportError>
619where
620 W: Write,
621{
622 writeln!(writer, " [{}]", item_kind_name(item.kind))?;
623 for part in &item.parts {
624 match part {
625 Part::Text(text) => writeln!(writer, " [text] {}", text.text)?,
626 Part::Reasoning(reasoning) => {
627 if let Some(summary) = &reasoning.summary {
628 writeln!(writer, " [reasoning] {summary}")?;
629 } else {
630 writeln!(writer, " [reasoning]")?;
631 }
632 }
633 Part::ToolCall(call) => {
634 writeln!(writer, " [tool-call] {} {}", call.name, call.input)?
635 }
636 Part::ToolResult(result) => writeln!(
637 writer,
638 " [tool-result] call={} error={}",
639 result.call_id, result.is_error
640 )?,
641 Part::Structured(value) => writeln!(writer, " [structured] {}", value.value)?,
642 Part::Media(media) => writeln!(
643 writer,
644 " [media] {:?} {}",
645 media.modality, media.mime_type
646 )?,
647 Part::File(file) => writeln!(
648 writer,
649 " [file] {}",
650 file.name.as_deref().unwrap_or("<unnamed>")
651 )?,
652 Part::Custom(custom) => writeln!(writer, " [custom] {}", custom.kind)?,
653 }
654 }
655 Ok(())
656}
657
658fn item_kind_name(kind: ItemKind) -> &'static str {
659 match kind {
660 ItemKind::System => "system",
661 ItemKind::Developer => "developer",
662 ItemKind::User => "user",
663 ItemKind::Assistant => "assistant",
664 ItemKind::Tool => "tool",
665 ItemKind::Context => "context",
666 ItemKind::Notification => "notification",
667 }
668}
669
670fn format_usage(usage: &Usage) -> String {
671 match &usage.tokens {
672 Some(TokenUsage {
673 input_tokens,
674 output_tokens,
675 reasoning_tokens,
676 cached_input_tokens,
677 cache_write_input_tokens,
678 }) => format!(
679 "input={} output={} reasoning={} cached_input={} cache_write_input={}",
680 input_tokens,
681 output_tokens,
682 reasoning_tokens.unwrap_or_default(),
683 cached_input_tokens.unwrap_or_default(),
684 cache_write_input_tokens.unwrap_or_default()
685 ),
686 None => "no token usage".into(),
687 }
688}
689
690#[cfg(test)]
691mod tests {
692 use super::*;
693 use agentkit_core::{FinishReason, MetadataMap, SessionId, TextPart};
694 use agentkit_loop::TurnResult;
695
696 #[test]
697 fn usage_reporter_accumulates_usage_events_and_turn_results() {
698 let reporter = UsageReporter::new();
699
700 reporter.handle_event(AgentEvent::UsageUpdated(Usage {
701 tokens: Some(TokenUsage {
702 input_tokens: 10,
703 output_tokens: 5,
704 reasoning_tokens: Some(2),
705 cached_input_tokens: Some(1),
706 cache_write_input_tokens: Some(7),
707 }),
708 cost: None,
709 metadata: MetadataMap::new(),
710 }));
711
712 reporter.handle_event(AgentEvent::TurnFinished(TurnResult {
713 turn_id: "turn-1".into(),
714 finish_reason: FinishReason::Completed,
715 items: Vec::new(),
716 usage: Some(Usage {
717 tokens: Some(TokenUsage {
718 input_tokens: 3,
719 output_tokens: 4,
720 reasoning_tokens: Some(1),
721 cached_input_tokens: None,
722 cache_write_input_tokens: None,
723 }),
724 cost: None,
725 metadata: MetadataMap::new(),
726 }),
727 metadata: MetadataMap::new(),
728 }));
729
730 let summary = reporter.summary();
731 assert_eq!(summary.events_seen, 2);
732 assert_eq!(summary.usage_events_seen, 2);
733 assert_eq!(summary.turn_results_seen, 1);
734 assert_eq!(summary.totals.input_tokens, 13);
735 assert_eq!(summary.totals.output_tokens, 9);
736 assert_eq!(summary.totals.reasoning_tokens, 3);
737 assert_eq!(summary.totals.cached_input_tokens, 1);
738 assert_eq!(summary.totals.cache_write_input_tokens, 7);
739 }
740
741 #[test]
742 fn transcript_reporter_tracks_inputs_and_outputs() {
743 let reporter = TranscriptReporter::new();
744
745 reporter.handle_event(AgentEvent::InputAccepted {
746 session_id: SessionId::new("session-1"),
747 items: vec![Item {
748 id: None,
749 kind: ItemKind::User,
750 parts: vec![Part::Text(TextPart {
751 text: "hello".into(),
752 metadata: MetadataMap::new(),
753 })],
754 metadata: MetadataMap::new(),
755 usage: None,
756 finish_reason: None,
757 created_at: None,
758 }],
759 });
760
761 reporter.handle_event(AgentEvent::TurnFinished(TurnResult {
762 turn_id: "turn-1".into(),
763 finish_reason: FinishReason::Completed,
764 items: vec![Item {
765 id: None,
766 kind: ItemKind::Assistant,
767 parts: vec![Part::Text(TextPart {
768 text: "hi".into(),
769 metadata: MetadataMap::new(),
770 })],
771 metadata: MetadataMap::new(),
772 usage: None,
773 finish_reason: None,
774 created_at: None,
775 }],
776 usage: None,
777 metadata: MetadataMap::new(),
778 }));
779
780 assert_eq!(reporter.transcript().items.len(), 2);
781 assert_eq!(reporter.transcript().items[0].kind, ItemKind::User);
782 assert_eq!(reporter.transcript().items[1].kind, ItemKind::Assistant);
783 }
784
785 #[test]
786 fn jsonl_reporter_serializes_event_envelopes() {
787 let reporter = JsonlReporter::new(Vec::new());
788 reporter.handle_event(AgentEvent::RunStarted {
789 session_id: SessionId::new("session-1"),
790 });
791
792 let output = String::from_utf8(reporter.into_inner()).unwrap();
793 assert!(output.contains("\"RunStarted\""));
794 assert!(output.contains("session-1"));
795 }
796
797 fn run_started_event() -> AgentEvent {
798 AgentEvent::RunStarted {
799 session_id: SessionId::new("s1"),
800 }
801 }
802
803 #[test]
804 fn buffered_reporter_flushes_at_capacity() {
805 let reporter = BufferedReporter::new(UsageReporter::new(), 2);
806 reporter.handle_event(run_started_event());
807 assert_eq!(reporter.pending(), 1);
808 assert_eq!(reporter.inner().summary().events_seen, 0);
809
810 reporter.handle_event(run_started_event());
811 assert_eq!(reporter.pending(), 0);
812 assert_eq!(reporter.inner().summary().events_seen, 2);
813 }
814
815 #[test]
816 fn buffered_reporter_manual_flush() {
817 let reporter = BufferedReporter::new(UsageReporter::new(), 0);
818 reporter.handle_event(run_started_event());
819 reporter.handle_event(run_started_event());
820 assert_eq!(reporter.pending(), 2);
821
822 reporter.flush();
823 assert_eq!(reporter.pending(), 0);
824 assert_eq!(reporter.inner().summary().events_seen, 2);
825 }
826
827 #[test]
828 fn buffered_reporter_flushes_on_drop() {
829 let inner = {
830 let reporter = BufferedReporter::new(UsageReporter::new(), 100);
831 reporter.handle_event(run_started_event());
832 reporter.handle_event(run_started_event());
833 assert_eq!(reporter.inner().summary().events_seen, 0);
834 assert_eq!(reporter.pending(), 2);
837 reporter
838 };
839 assert_eq!(inner.inner().summary().events_seen, 0);
843 }
846
847 #[test]
848 fn channel_reporter_delivers_events() {
849 let (reporter, rx) = ChannelReporter::pair();
850 reporter.handle_event(run_started_event());
851 reporter.handle_event(run_started_event());
852
853 let events: Vec<_> = rx.try_iter().collect();
854 assert_eq!(events.len(), 2);
855 }
856
857 #[test]
858 fn channel_reporter_survives_dropped_receiver() {
859 let (reporter, rx) = ChannelReporter::pair();
860 drop(rx);
861 reporter.handle_event(run_started_event());
863 }
864
865 #[test]
866 fn channel_reporter_fallible_returns_error_on_dropped_receiver() {
867 let (reporter, rx) = ChannelReporter::pair();
868 drop(rx);
869
870 let result = reporter.try_handle_event(&run_started_event());
871 assert!(matches!(result, Err(ReportError::ChannelSend)));
872 }
873
874 #[test]
875 fn policy_reporter_ignore_swallows_errors() {
876 let (reporter, rx) = ChannelReporter::pair();
877 drop(rx);
878 let policy = PolicyReporter::new(reporter, FailurePolicy::Ignore);
879 policy.handle_event(run_started_event());
880 assert!(policy.take_errors().is_empty());
881 }
882
883 #[test]
884 fn policy_reporter_accumulate_collects_errors() {
885 let (reporter, rx) = ChannelReporter::pair();
886 drop(rx);
887 let policy = PolicyReporter::new(reporter, FailurePolicy::Accumulate);
888 policy.handle_event(run_started_event());
889 policy.handle_event(run_started_event());
890
891 let errors = policy.take_errors();
892 assert_eq!(errors.len(), 2);
893 assert!(matches!(errors[0], ReportError::ChannelSend));
894 }
895
896 #[test]
897 #[should_panic(expected = "reporter error: channel send failed")]
898 fn policy_reporter_fail_fast_panics() {
899 let (reporter, rx) = ChannelReporter::pair();
900 drop(rx);
901 let policy = PolicyReporter::new(reporter, FailurePolicy::FailFast);
902 policy.handle_event(run_started_event());
903 }
904}