use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use serde::{Deserialize, Serialize};
use crate::harness::cost::CostTotals;
use crate::harness::ids::{
CallId, ComponentId, EventId, ExecutionStatus, HarnessPhase, RunId, ThreadId,
};
use crate::harness::message::MessageDelta;
use crate::harness::usage::{Usage, UsageTotals};
/// Event payloads describing activity within a harness run: model and tool
/// calls, middleware, caching, retries, sub-agents, accounting, and run
/// lifecycle.
///
/// Serde representation: internally tagged. The variant name, converted to
/// `snake_case`, is stored under the `kind` key alongside the variant's own
/// fields (for example `RunStarted` is written with `"kind": "run_started"`).
/// These wire tags are distinct from the dotted names returned by
/// [`AgentEvent::kind`] (for example `"run.started"`).
///
/// NOTE(review): the one-line variant descriptions below are inferred from
/// the variant and field names; the code that emits these events is not
/// visible in this section — confirm against the emitters.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "kind")]
pub enum AgentEvent {
/// A run started.
RunStarted {
run_id: RunId,
/// Omitted from the serialized form when `None`; read as `None` when the
/// key is absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
thread_id: Option<ThreadId>,
},
/// A model call started; carries the call id and the model name.
ModelStarted {
call_id: CallId,
model: String,
},
/// An incremental message delta for a model call.
ModelDelta {
call_id: CallId,
delta: MessageDelta,
},
/// A model call completed.
ModelCompleted {
call_id: CallId,
/// Omitted from the serialized form when `None`; read as `None` when the
/// key is absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
usage: Option<Usage>,
},
/// A tool call started.
ToolStarted {
call_id: CallId,
tool_name: String,
},
/// A tool call completed.
ToolCompleted {
call_id: CallId,
tool_name: String,
},
/// State was updated; carries no payload.
StateUpdate,
/// The named middleware started.
MiddlewareStarted {
name: String,
},
/// The named middleware completed.
MiddlewareCompleted {
name: String,
},
/// A cache lookup for `key` hit.
CacheHit {
call_id: CallId,
key: String,
},
/// A cache lookup for `key` missed.
CacheMiss {
call_id: CallId,
key: String,
},
/// A retry was scheduled for a call; `attempt` is the attempt counter
/// (whether it is 0- or 1-based is not visible here — TODO confirm).
RetryScheduled {
call_id: CallId,
attempt: usize,
},
/// A rate-limit wait; `waited_ms` is presumably milliseconds, per its name.
RateLimitWaited {
waited_ms: u64,
},
/// A fallback was selected, switching from `from` to `to`.
FallbackSelected {
from: String,
to: String,
},
/// A sub-agent started at the given nesting `depth`.
SubAgentStarted {
name: String,
depth: usize,
},
/// A sub-agent completed at the given nesting `depth`.
SubAgentCompleted {
name: String,
depth: usize,
},
/// An existing sub-agent was reused on the given `turn`.
SubAgentReused {
name: String,
turn: usize,
},
/// A steering command of `command_kind` was processed; `accepted` records
/// whether it was accepted.
Steered {
command_kind: String,
accepted: bool,
},
/// Context was compressed from `from_tokens` to `to_tokens`.
Compressed {
from_tokens: u64,
to_tokens: u64,
},
/// A route was selected.
RouteSelected {
route: String,
},
/// Usage was recorded.
UsageRecorded {
usage: Usage,
},
/// Cost was recorded.
CostRecorded {
cost: CostTotals,
},
/// A limit was reached.
LimitReached {
/// Serialized under the key `limit_kind`; the key `kind` is already used
/// by this enum's internal serde tag.
#[serde(rename = "limit_kind")]
kind: LimitKind,
},
/// Memory was loaded; carries no payload.
MemoryLoaded,
/// Memory was saved; carries no payload.
MemorySaved,
/// A progress message for a tool call.
ToolProgress {
call_id: CallId,
message: String,
},
/// The named middleware failed; `error` holds the error text.
MiddlewareFailed {
name: String,
error: String,
},
/// The stream was closed; carries no payload.
StreamClosed,
/// A run completed.
RunCompleted {
run_id: RunId,
},
/// A run failed; `error` holds the error text.
RunFailed {
run_id: RunId,
error: String,
},
}
/// Which limit an [`AgentEvent::LimitReached`] event refers to.
///
/// Serialized by serde as a `snake_case` string (`"model_calls"`,
/// `"tool_calls"`, `"wall_clock"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LimitKind {
/// Limit on model calls (inferred from the name; enforcement is not visible here).
ModelCalls,
/// Limit on tool calls (inferred from the name; enforcement is not visible here).
ToolCalls,
/// Limit on wall-clock time (inferred from the name; enforcement is not visible here).
WallClock,
}
impl LimitKind {
pub fn as_str(&self) -> &'static str {
match self {
LimitKind::ModelCalls => "model_calls",
LimitKind::ToolCalls => "tool_calls",
LimitKind::WallClock => "wall_clock",
}
}
}
impl AgentEvent {
pub fn kind(&self) -> &'static str {
match self {
AgentEvent::RunStarted { .. } => "run.started",
AgentEvent::ModelStarted { .. } => "model.started",
AgentEvent::ModelDelta { .. } => "model.delta",
AgentEvent::ModelCompleted { .. } => "model.completed",
AgentEvent::ToolStarted { .. } => "tool.started",
AgentEvent::ToolCompleted { .. } => "tool.completed",
AgentEvent::StateUpdate => "state.update",
AgentEvent::MiddlewareStarted { .. } => "middleware.started",
AgentEvent::MiddlewareCompleted { .. } => "middleware.completed",
AgentEvent::CacheHit { .. } => "cache.hit",
AgentEvent::CacheMiss { .. } => "cache.miss",
AgentEvent::RetryScheduled { .. } => "retry.scheduled",
AgentEvent::RateLimitWaited { .. } => "rate_limit.waited",
AgentEvent::FallbackSelected { .. } => "model.fallback_selected",
AgentEvent::SubAgentStarted { .. } => "subagent.started",
AgentEvent::SubAgentCompleted { .. } => "subagent.completed",
AgentEvent::SubAgentReused { .. } => "subagent.reused",
AgentEvent::Steered { .. } => "agent.steered",
AgentEvent::Compressed { .. } => "context.compressed",
AgentEvent::RouteSelected { .. } => "route.selected",
AgentEvent::UsageRecorded { .. } => "usage.recorded",
AgentEvent::CostRecorded { .. } => "cost.recorded",
AgentEvent::LimitReached { .. } => "limit.reached",
AgentEvent::MemoryLoaded => "memory.loaded",
AgentEvent::MemorySaved => "memory.saved",
AgentEvent::ToolProgress { .. } => "tool.progress",
AgentEvent::MiddlewareFailed { .. } => "middleware.failed",
AgentEvent::StreamClosed => "stream.closed",
AgentEvent::RunCompleted { .. } => "run.completed",
AgentEvent::RunFailed { .. } => "run.failed",
}
}
}
/// An [`AgentEvent`] together with an identifier and a numeric offset.
///
/// This is the value handed to [`EventListener::on_event`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct EventRecord {
/// Identifier of this record.
pub id: EventId,
/// Numeric position of this record.
/// NOTE(review): presumably assigned from `EventSinkInner::next_offset`;
/// the assigning code is not visible here — confirm.
pub offset: u64,
/// The event payload.
pub event: AgentEvent,
}
/// Receiver of event records.
///
/// Implementors must be `Send + Sync`; `EventSinkInner` stores listeners as
/// `Arc<dyn EventListener>`.
pub trait EventListener: Send + Sync {
/// Called with a borrowed record. Takes `&self`, so an implementation that
/// accumulates state needs interior mutability.
fn on_event(&self, record: &EventRecord);
}
/// Handle to shared event-sink state.
///
/// The state lives behind `Arc<Mutex<_>>`, so cloning an `EventSink` yields
/// another handle to the same `EventSinkInner` rather than a copy of it.
#[derive(Clone)]
pub struct EventSink {
/// Shared, mutex-guarded sink state.
pub(crate) inner: Arc<Mutex<EventSinkInner>>,
}
/// State held inside an [`EventSink`]'s mutex.
pub(crate) struct EventSinkInner {
/// Offset counter.
/// NOTE(review): presumably the offset given to the next emitted
/// `EventRecord`; the emitting code is not visible here — confirm.
pub(crate) next_offset: u64,
/// Registered listeners, held as shared trait objects.
pub(crate) listeners: Vec<Arc<dyn EventListener>>,
}
/// Holder of a shared, mutex-guarded list of [`EventRecord`]s.
///
/// NOTE(review): by its name this presumably implements [`EventListener`] and
/// appends each received record; that impl is not visible in this section —
/// confirm.
pub struct RecordingListener {
/// Shared record buffer.
pub(crate) records: Arc<Mutex<Vec<EventRecord>>>,
}
/// Pairs a shared list of [`EventRecord`]s with an [`EventSink`].
///
/// NOTE(review): how `records` is populated from `sink` is not visible in this
/// section — confirm against the constructor.
pub struct EventJournal {
/// Shared record buffer.
pub(crate) records: Arc<Mutex<Vec<EventRecord>>>,
/// The sink associated with this journal.
pub(crate) sink: EventSink,
}
/// Serializable status of a harness run: identity, progress counters, active
/// calls, accumulated usage and cost, and timestamps.
///
/// Timestamps are (de)serialized through the `serde_system_time` /
/// `serde_system_time_opt` adapters, i.e. as whole seconds since the Unix
/// epoch. Optional fields marked `skip_serializing_if` are left out of the
/// output when `None` and read back as `None` when absent.
///
/// NOTE(review): field meanings not stated by a serde attribute are inferred
/// from names and types; the code that maintains this struct is not visible
/// in this section — confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HarnessRunStatus {
/// Id of this run.
pub run_id: RunId,
/// Id of the parent run, if any. Omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parent_run_id: Option<RunId>,
/// Id of the root run.
pub root_run_id: RunId,
/// Associated thread id, if any. Omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub thread_id: Option<ThreadId>,
/// Component this status belongs to.
pub component: ComponentId,
/// Execution status.
pub status: ExecutionStatus,
/// Current phase.
pub current_phase: HarnessPhase,
/// Model-call counter.
pub model_calls: usize,
/// Tool-call counter.
pub tool_calls: usize,
/// Model call currently in flight, if any. Omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_model_call: Option<CallId>,
/// Tool calls currently in flight. Always serialized; read as an empty
/// vector when the key is absent.
#[serde(default)]
pub active_tool_calls: Vec<CallId>,
/// Id of the most recent event, if any. Omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_event_id: Option<EventId>,
/// Accumulated usage totals.
pub usage: UsageTotals,
/// Accumulated cost totals.
pub cost: CostTotals,
/// Start time; serialized as whole seconds since the Unix epoch.
#[serde(with = "serde_system_time")]
pub started_at: SystemTime,
/// Last update time; serialized as whole seconds since the Unix epoch.
#[serde(with = "serde_system_time")]
pub updated_at: SystemTime,
/// End time, if set; serialized as whole seconds since the Unix epoch and
/// omitted from output when `None`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "serde_system_time_opt"
)]
pub ended_at: Option<SystemTime>,
/// Error text, if any. Omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// Free-form JSON metadata. Always serialized; read as the default
/// `serde_json::Value` (`Null`) when the key is absent.
#[serde(default)]
pub metadata: serde_json::Value,
}
mod serde_system_time {
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S: Serializer>(t: &SystemTime, s: S) -> Result<S::Ok, S::Error> {
let secs = t
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs();
s.serialize_u64(secs)
}
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<SystemTime, D::Error> {
let secs = u64::deserialize(d)?;
Ok(UNIX_EPOCH + Duration::from_secs(secs))
}
}
mod serde_system_time_opt {
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S: Serializer>(t: &Option<SystemTime>, s: S) -> Result<S::Ok, S::Error> {
match t {
Some(t) => {
let secs = t
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs();
s.serialize_some(&secs)
}
None => s.serialize_none(),
}
}
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Option<SystemTime>, D::Error> {
let secs = Option::<u64>::deserialize(d)?;
Ok(secs.map(|s| UNIX_EPOCH + Duration::from_secs(s)))
}
}