mod buffered;
mod channel;
mod policy;
#[cfg(feature = "tracing")]
mod tracing_reporter;
pub use buffered::BufferedReporter;
pub use channel::ChannelReporter;
pub use policy::{FailurePolicy, FallibleObserver, PolicyReporter};
#[cfg(feature = "tracing")]
pub use tracing_reporter::TracingReporter;
use std::io::{self, Write};
use std::time::SystemTime;
use agentkit_core::{Item, ItemKind, Part, TokenUsage, Usage};
use agentkit_loop::{AgentEvent, LoopObserver, TurnResult};
use serde::Serialize;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ReportError {
#[error("io error: {0}")]
Io(#[from] io::Error),
#[error("serialization error: {0}")]
Serialize(#[from] serde_json::Error),
#[error("channel send failed")]
ChannelSend,
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct EventEnvelope<'a> {
pub timestamp: SystemTime,
pub event: &'a AgentEvent,
}
#[derive(Default)]
pub struct CompositeReporter {
children: Vec<Box<dyn LoopObserver>>,
}
impl CompositeReporter {
pub fn new() -> Self {
Self::default()
}
pub fn with_observer(mut self, observer: impl LoopObserver + 'static) -> Self {
self.children.push(Box::new(observer));
self
}
pub fn push(&mut self, observer: impl LoopObserver + 'static) -> &mut Self {
self.children.push(Box::new(observer));
self
}
}
impl LoopObserver for CompositeReporter {
fn handle_event(&self, event: AgentEvent) {
for child in &self.children {
child.handle_event(event.clone());
}
}
}
pub struct JsonlReporter<W> {
writer: std::sync::Mutex<W>,
flush_each_event: bool,
errors: std::sync::Mutex<Vec<ReportError>>,
}
impl<W> JsonlReporter<W>
where
W: Write,
{
pub fn new(writer: W) -> Self {
Self {
writer: std::sync::Mutex::new(writer),
flush_each_event: true,
errors: std::sync::Mutex::new(Vec::new()),
}
}
pub fn with_flush_each_event(mut self, flush_each_event: bool) -> Self {
self.flush_each_event = flush_each_event;
self
}
pub fn take_errors(&self) -> Vec<ReportError> {
std::mem::take(&mut *self.errors.lock().unwrap_or_else(|e| e.into_inner()))
}
fn record_result(&self, result: Result<(), ReportError>) {
if let Err(error) = result {
self.errors
.lock()
.unwrap_or_else(|e| e.into_inner())
.push(error);
}
}
pub fn into_inner(self) -> W {
self.writer.into_inner().unwrap_or_else(|e| e.into_inner())
}
}
impl<W> LoopObserver for JsonlReporter<W>
where
W: Write + Send,
{
fn handle_event(&self, event: AgentEvent) {
let result = (|| -> Result<(), ReportError> {
let envelope = EventEnvelope {
timestamp: SystemTime::now(),
event: &event,
};
let mut buf = serde_json::to_vec(&envelope)?;
buf.push(b'\n');
let mut writer = self.writer.lock().unwrap_or_else(|e| e.into_inner());
writer.write_all(&buf)?;
if self.flush_each_event {
writer.flush()?;
}
Ok(())
})();
self.record_result(result);
}
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct UsageTotals {
pub input_tokens: u64,
pub output_tokens: u64,
pub reasoning_tokens: u64,
pub cached_input_tokens: u64,
pub cache_write_input_tokens: u64,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CostTotals {
pub amount: f64,
pub currency: Option<String>,
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct UsageSummary {
pub events_seen: usize,
pub usage_events_seen: usize,
pub turn_results_seen: usize,
pub totals: UsageTotals,
pub cost: Option<CostTotals>,
}
#[derive(Default)]
pub struct UsageReporter {
summary: std::sync::Mutex<UsageSummary>,
}
impl UsageReporter {
pub fn new() -> Self {
Self::default()
}
pub fn summary(&self) -> UsageSummary {
self.summary
.lock()
.unwrap_or_else(|e| e.into_inner())
.clone()
}
fn absorb(summary: &mut UsageSummary, usage: &Usage) {
summary.usage_events_seen += 1;
if let Some(tokens) = &usage.tokens {
summary.totals.input_tokens += tokens.input_tokens;
summary.totals.output_tokens += tokens.output_tokens;
summary.totals.reasoning_tokens += tokens.reasoning_tokens.unwrap_or_default();
summary.totals.cached_input_tokens += tokens.cached_input_tokens.unwrap_or_default();
summary.totals.cache_write_input_tokens +=
tokens.cache_write_input_tokens.unwrap_or_default();
}
if let Some(cost) = &usage.cost {
let totals = summary.cost.get_or_insert_with(CostTotals::default);
totals.amount += cost.amount;
if totals.currency.is_none() {
totals.currency = Some(cost.currency.clone());
}
}
}
}
impl LoopObserver for UsageReporter {
fn handle_event(&self, event: AgentEvent) {
let mut summary = self.summary.lock().unwrap_or_else(|e| e.into_inner());
summary.events_seen += 1;
match event {
AgentEvent::UsageUpdated(usage) => Self::absorb(&mut summary, &usage),
AgentEvent::TurnFinished(TurnResult {
usage: Some(usage), ..
}) => {
summary.turn_results_seen += 1;
Self::absorb(&mut summary, &usage);
}
AgentEvent::TurnFinished(_) => {
summary.turn_results_seen += 1;
}
_ => {}
}
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct TranscriptView {
pub items: Vec<Item>,
}
#[derive(Default)]
pub struct TranscriptReporter {
transcript: std::sync::Mutex<TranscriptView>,
}
impl TranscriptReporter {
pub fn new() -> Self {
Self::default()
}
pub fn transcript(&self) -> TranscriptView {
self.transcript
.lock()
.unwrap_or_else(|e| e.into_inner())
.clone()
}
}
impl LoopObserver for TranscriptReporter {
fn handle_event(&self, event: AgentEvent) {
let mut transcript = self.transcript.lock().unwrap_or_else(|e| e.into_inner());
match event {
AgentEvent::InputAccepted { items, .. } => {
transcript.items.extend(items);
}
AgentEvent::TurnFinished(result) => {
transcript.items.extend(result.items);
}
_ => {}
}
}
}
pub struct StdoutReporter<W> {
writer: std::sync::Mutex<W>,
show_usage: bool,
errors: std::sync::Mutex<Vec<ReportError>>,
}
impl<W> StdoutReporter<W>
where
W: Write,
{
pub fn new(writer: W) -> Self {
Self {
writer: std::sync::Mutex::new(writer),
show_usage: true,
errors: std::sync::Mutex::new(Vec::new()),
}
}
pub fn with_usage(mut self, show_usage: bool) -> Self {
self.show_usage = show_usage;
self
}
pub fn take_errors(&self) -> Vec<ReportError> {
std::mem::take(&mut *self.errors.lock().unwrap_or_else(|e| e.into_inner()))
}
fn record_result(&self, result: Result<(), ReportError>) {
if let Err(error) = result {
self.errors
.lock()
.unwrap_or_else(|e| e.into_inner())
.push(error);
}
}
}
impl<W> LoopObserver for StdoutReporter<W>
where
W: Write + Send,
{
fn handle_event(&self, event: AgentEvent) {
let result = (|| -> Result<(), ReportError> {
let mut buf: Vec<u8> = Vec::new();
write_stdout_event(&mut buf, &event, self.show_usage)?;
let mut writer = self.writer.lock().unwrap_or_else(|e| e.into_inner());
writer.write_all(&buf)?;
writer.flush()?;
Ok(())
})();
self.record_result(result);
}
}
fn write_stdout_event<W>(
writer: &mut W,
event: &AgentEvent,
show_usage: bool,
) -> Result<(), ReportError>
where
W: Write,
{
match event {
AgentEvent::RunStarted { session_id } => {
writeln!(writer, "[run] started session={session_id}")?;
}
AgentEvent::TurnStarted {
session_id,
turn_id,
} => {
writeln!(writer, "[turn] started session={session_id} turn={turn_id}")?;
}
AgentEvent::InputAccepted { items, .. } => {
writeln!(writer, "[input] accepted items={}", items.len())?;
}
AgentEvent::ContentDelta(delta) => {
writeln!(writer, "[delta] {delta:?}")?;
}
AgentEvent::ToolCallRequested(call) => {
writeln!(writer, "[tool] call {} {}", call.name, call.input)?;
}
AgentEvent::ToolResultReceived(result) => {
writeln!(
writer,
"[tool] result call_id={} is_error={}",
result.call_id, result.is_error
)?;
}
AgentEvent::ApprovalRequired(request) => {
writeln!(
writer,
"[approval] {} {:?}",
request.summary, request.reason
)?;
}
AgentEvent::ApprovalResolved { approved } => {
writeln!(writer, "[approval] resolved approved={approved}")?;
}
AgentEvent::ToolCatalogChanged(event) => {
writeln!(
writer,
"[tools] catalog changed source={} added={} removed={} changed={}",
event.source,
event.added.len(),
event.removed.len(),
event.changed.len()
)?;
}
AgentEvent::MutationStarted {
turn_id,
mutator,
point,
..
} => {
writeln!(
writer,
"[mutation] started turn={} mutator={mutator} point={point:?}",
turn_id
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "none".into()),
)?;
}
AgentEvent::MutationFinished {
turn_id,
mutator,
dirty,
..
} => {
writeln!(
writer,
"[mutation] finished turn={} mutator={mutator} dirty={dirty}",
turn_id
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "none".into()),
)?;
}
AgentEvent::UsageUpdated(usage) if show_usage => {
writeln!(writer, "[usage] {}", format_usage(usage))?;
}
AgentEvent::UsageUpdated(_) => {}
AgentEvent::Warning { message } => {
writeln!(writer, "[warning] {message}")?;
}
AgentEvent::RunFailed { message } => {
writeln!(writer, "[error] {message}")?;
}
AgentEvent::TurnFinished(result) => {
writeln!(
writer,
"[turn] finished reason={:?} items={}",
result.finish_reason,
result.items.len()
)?;
for item in &result.items {
write_item_summary(writer, item)?;
}
if show_usage && let Some(usage) = &result.usage {
writeln!(writer, "[usage] {}", format_usage(usage))?;
}
}
}
writer.flush()?;
Ok(())
}
fn write_item_summary<W>(writer: &mut W, item: &Item) -> Result<(), ReportError>
where
W: Write,
{
writeln!(writer, " [{}]", item_kind_name(item.kind))?;
for part in &item.parts {
match part {
Part::Text(text) => writeln!(writer, " [text] {}", text.text)?,
Part::Reasoning(reasoning) => {
if let Some(summary) = &reasoning.summary {
writeln!(writer, " [reasoning] {summary}")?;
} else {
writeln!(writer, " [reasoning]")?;
}
}
Part::ToolCall(call) => {
writeln!(writer, " [tool-call] {} {}", call.name, call.input)?
}
Part::ToolResult(result) => writeln!(
writer,
" [tool-result] call={} error={}",
result.call_id, result.is_error
)?,
Part::Structured(value) => writeln!(writer, " [structured] {}", value.value)?,
Part::Media(media) => writeln!(
writer,
" [media] {:?} {}",
media.modality, media.mime_type
)?,
Part::File(file) => writeln!(
writer,
" [file] {}",
file.name.as_deref().unwrap_or("<unnamed>")
)?,
Part::Custom(custom) => writeln!(writer, " [custom] {}", custom.kind)?,
}
}
Ok(())
}
fn item_kind_name(kind: ItemKind) -> &'static str {
match kind {
ItemKind::System => "system",
ItemKind::Developer => "developer",
ItemKind::User => "user",
ItemKind::Assistant => "assistant",
ItemKind::Tool => "tool",
ItemKind::Context => "context",
ItemKind::Notification => "notification",
}
}
fn format_usage(usage: &Usage) -> String {
match &usage.tokens {
Some(TokenUsage {
input_tokens,
output_tokens,
reasoning_tokens,
cached_input_tokens,
cache_write_input_tokens,
}) => format!(
"input={} output={} reasoning={} cached_input={} cache_write_input={}",
input_tokens,
output_tokens,
reasoning_tokens.unwrap_or_default(),
cached_input_tokens.unwrap_or_default(),
cache_write_input_tokens.unwrap_or_default()
),
None => "no token usage".into(),
}
}
#[cfg(test)]
mod tests {
use super::*;
use agentkit_core::{FinishReason, MetadataMap, SessionId, TextPart};
use agentkit_loop::TurnResult;
#[test]
fn usage_reporter_accumulates_usage_events_and_turn_results() {
let reporter = UsageReporter::new();
reporter.handle_event(AgentEvent::UsageUpdated(Usage {
tokens: Some(TokenUsage {
input_tokens: 10,
output_tokens: 5,
reasoning_tokens: Some(2),
cached_input_tokens: Some(1),
cache_write_input_tokens: Some(7),
}),
cost: None,
metadata: MetadataMap::new(),
}));
reporter.handle_event(AgentEvent::TurnFinished(TurnResult {
turn_id: "turn-1".into(),
finish_reason: FinishReason::Completed,
items: Vec::new(),
usage: Some(Usage {
tokens: Some(TokenUsage {
input_tokens: 3,
output_tokens: 4,
reasoning_tokens: Some(1),
cached_input_tokens: None,
cache_write_input_tokens: None,
}),
cost: None,
metadata: MetadataMap::new(),
}),
metadata: MetadataMap::new(),
}));
let summary = reporter.summary();
assert_eq!(summary.events_seen, 2);
assert_eq!(summary.usage_events_seen, 2);
assert_eq!(summary.turn_results_seen, 1);
assert_eq!(summary.totals.input_tokens, 13);
assert_eq!(summary.totals.output_tokens, 9);
assert_eq!(summary.totals.reasoning_tokens, 3);
assert_eq!(summary.totals.cached_input_tokens, 1);
assert_eq!(summary.totals.cache_write_input_tokens, 7);
}
#[test]
fn transcript_reporter_tracks_inputs_and_outputs() {
let reporter = TranscriptReporter::new();
reporter.handle_event(AgentEvent::InputAccepted {
session_id: SessionId::new("session-1"),
items: vec![Item {
id: None,
kind: ItemKind::User,
parts: vec![Part::Text(TextPart {
text: "hello".into(),
metadata: MetadataMap::new(),
})],
metadata: MetadataMap::new(),
usage: None,
finish_reason: None,
created_at: None,
}],
});
reporter.handle_event(AgentEvent::TurnFinished(TurnResult {
turn_id: "turn-1".into(),
finish_reason: FinishReason::Completed,
items: vec![Item {
id: None,
kind: ItemKind::Assistant,
parts: vec![Part::Text(TextPart {
text: "hi".into(),
metadata: MetadataMap::new(),
})],
metadata: MetadataMap::new(),
usage: None,
finish_reason: None,
created_at: None,
}],
usage: None,
metadata: MetadataMap::new(),
}));
assert_eq!(reporter.transcript().items.len(), 2);
assert_eq!(reporter.transcript().items[0].kind, ItemKind::User);
assert_eq!(reporter.transcript().items[1].kind, ItemKind::Assistant);
}
#[test]
fn jsonl_reporter_serializes_event_envelopes() {
let reporter = JsonlReporter::new(Vec::new());
reporter.handle_event(AgentEvent::RunStarted {
session_id: SessionId::new("session-1"),
});
let output = String::from_utf8(reporter.into_inner()).unwrap();
assert!(output.contains("\"RunStarted\""));
assert!(output.contains("session-1"));
}
fn run_started_event() -> AgentEvent {
AgentEvent::RunStarted {
session_id: SessionId::new("s1"),
}
}
#[test]
fn buffered_reporter_flushes_at_capacity() {
let reporter = BufferedReporter::new(UsageReporter::new(), 2);
reporter.handle_event(run_started_event());
assert_eq!(reporter.pending(), 1);
assert_eq!(reporter.inner().summary().events_seen, 0);
reporter.handle_event(run_started_event());
assert_eq!(reporter.pending(), 0);
assert_eq!(reporter.inner().summary().events_seen, 2);
}
#[test]
fn buffered_reporter_manual_flush() {
let reporter = BufferedReporter::new(UsageReporter::new(), 0);
reporter.handle_event(run_started_event());
reporter.handle_event(run_started_event());
assert_eq!(reporter.pending(), 2);
reporter.flush();
assert_eq!(reporter.pending(), 0);
assert_eq!(reporter.inner().summary().events_seen, 2);
}
#[test]
fn buffered_reporter_flushes_on_drop() {
let inner = {
let reporter = BufferedReporter::new(UsageReporter::new(), 100);
reporter.handle_event(run_started_event());
reporter.handle_event(run_started_event());
assert_eq!(reporter.inner().summary().events_seen, 0);
assert_eq!(reporter.pending(), 2);
reporter
};
assert_eq!(inner.inner().summary().events_seen, 0);
}
#[test]
fn channel_reporter_delivers_events() {
let (reporter, rx) = ChannelReporter::pair();
reporter.handle_event(run_started_event());
reporter.handle_event(run_started_event());
let events: Vec<_> = rx.try_iter().collect();
assert_eq!(events.len(), 2);
}
#[test]
fn channel_reporter_survives_dropped_receiver() {
let (reporter, rx) = ChannelReporter::pair();
drop(rx);
reporter.handle_event(run_started_event());
}
#[test]
fn channel_reporter_fallible_returns_error_on_dropped_receiver() {
let (reporter, rx) = ChannelReporter::pair();
drop(rx);
let result = reporter.try_handle_event(&run_started_event());
assert!(matches!(result, Err(ReportError::ChannelSend)));
}
#[test]
fn policy_reporter_ignore_swallows_errors() {
let (reporter, rx) = ChannelReporter::pair();
drop(rx);
let policy = PolicyReporter::new(reporter, FailurePolicy::Ignore);
policy.handle_event(run_started_event());
assert!(policy.take_errors().is_empty());
}
#[test]
fn policy_reporter_accumulate_collects_errors() {
let (reporter, rx) = ChannelReporter::pair();
drop(rx);
let policy = PolicyReporter::new(reporter, FailurePolicy::Accumulate);
policy.handle_event(run_started_event());
policy.handle_event(run_started_event());
let errors = policy.take_errors();
assert_eq!(errors.len(), 2);
assert!(matches!(errors[0], ReportError::ChannelSend));
}
#[test]
#[should_panic(expected = "reporter error: channel send failed")]
fn policy_reporter_fail_fast_panics() {
let (reporter, rx) = ChannelReporter::pair();
drop(rx);
let policy = PolicyReporter::new(reporter, FailurePolicy::FailFast);
policy.handle_event(run_started_event());
}
}