mod types;
pub use types::*;
use types::EventSinkInner;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use crate::harness::cost::CostTotals;
use crate::harness::ids::{ComponentId, ExecutionStatus, HarnessPhase, RunId, ThreadId};
use crate::harness::usage::UsageTotals;
impl EventSink {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(EventSinkInner {
next_offset: 0,
listeners: Vec::new(),
})),
}
}
pub fn subscribe(&self, listener: Arc<dyn EventListener>) {
let mut inner = self.inner.lock().expect("EventSink lock poisoned");
inner.listeners.push(listener);
}
pub fn emit(&self, event: AgentEvent) -> EventRecord {
let mut inner = self.inner.lock().expect("EventSink lock poisoned");
let offset = inner.next_offset;
inner.next_offset += 1;
let id = crate::harness::ids::EventId::new(format!("evt-{offset}"));
let record = EventRecord { id, offset, event };
for listener in &inner.listeners {
listener.on_event(&record);
}
record
}
pub fn len(&self) -> usize {
self.inner
.lock()
.expect("EventSink lock poisoned")
.listeners
.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl Default for EventSink {
fn default() -> Self {
Self::new()
}
}
impl RecordingListener {
    /// Creates a listener that has not recorded anything yet.
    pub fn new() -> Self {
        let records = Arc::new(Mutex::new(Vec::new()));
        Self { records }
    }

    /// Returns a cloned snapshot of every record captured so far.
    ///
    /// # Panics
    ///
    /// Panics if the listener's internal lock is poisoned.
    pub fn events(&self) -> Vec<EventRecord> {
        let captured = self
            .records
            .lock()
            .expect("RecordingListener lock poisoned");
        captured.to_vec()
    }

    /// Returns how many records have been captured.
    ///
    /// # Panics
    ///
    /// Panics if the listener's internal lock is poisoned.
    pub fn len(&self) -> usize {
        let captured = self
            .records
            .lock()
            .expect("RecordingListener lock poisoned");
        captured.len()
    }

    /// Returns `true` when nothing has been captured.
    ///
    /// # Panics
    ///
    /// Panics if the listener's internal lock is poisoned.
    pub fn is_empty(&self) -> bool {
        let captured = self
            .records
            .lock()
            .expect("RecordingListener lock poisoned");
        captured.is_empty()
    }
}
impl EventListener for RecordingListener {
    /// Stores a clone of `record` at the end of the captured list.
    ///
    /// Panics if the listener's internal lock is poisoned.
    fn on_event(&self, record: &EventRecord) {
        let mut captured = self
            .records
            .lock()
            .expect("RecordingListener lock poisoned");
        captured.push(record.clone());
    }
}
impl Default for RecordingListener {
fn default() -> Self {
Self::new()
}
}
impl EventJournal {
    /// Creates an empty journal backed by its own fresh [`EventSink`].
    pub fn new() -> Self {
        Self {
            records: Arc::new(Mutex::new(Vec::new())),
            sink: EventSink::new(),
        }
    }

    /// Emits `event` through the journal's sink, stores a clone of the
    /// resulting record, and returns the record.
    ///
    /// Records are kept in ascending offset order even when several threads
    /// append concurrently.
    ///
    /// # Panics
    ///
    /// Panics if the journal's (or the sink's) internal lock is poisoned.
    pub fn append(&self, event: AgentEvent) -> EventRecord {
        // The offset is assigned under the sink's lock, which is released
        // before the journal's own lock is taken below. A concurrent `append`
        // may therefore reach the journal first with a higher offset.
        let record = self.sink.emit(event);
        let mut records = self.records.lock().expect("EventJournal lock poisoned");

        let arrived_late =
            matches!(records.last(), Some(last) if last.offset > record.offset);
        if arrived_late {
            // Slot the late arrival into its offset-sorted position so that
            // replay order always matches emission order.
            let at = records.partition_point(|stored| stored.offset <= record.offset);
            records.insert(at, record.clone());
        } else {
            // Usual case: offsets arrive in order, so appending keeps the
            // list sorted.
            records.push(record.clone());
        }
        record
    }

    /// Returns clones of every stored record whose offset is at least
    /// `from_offset`, in stored order.
    ///
    /// # Panics
    ///
    /// Panics if the journal's internal lock is poisoned.
    pub fn replay_from(&self, from_offset: u64) -> Vec<EventRecord> {
        self.records
            .lock()
            .expect("EventJournal lock poisoned")
            .iter()
            .filter(|r| r.offset >= from_offset)
            .cloned()
            .collect()
    }

    /// Returns the number of stored records.
    ///
    /// # Panics
    ///
    /// Panics if the journal's internal lock is poisoned.
    pub fn len(&self) -> usize {
        self.records
            .lock()
            .expect("EventJournal lock poisoned")
            .len()
    }

    /// Returns `true` when the journal holds no records.
    ///
    /// # Panics
    ///
    /// Panics if the journal's internal lock is poisoned.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl Default for EventJournal {
fn default() -> Self {
Self::new()
}
}
impl HarnessRunStatus {
    /// Builds the initial status for `run_id` executing `component`.
    ///
    /// The run starts as its own root, with no parent or thread, in the
    /// `Pending` status and `Idle` phase, with zeroed counters, and with
    /// `started_at` and `updated_at` set to the same instant.
    pub fn new(run_id: RunId, component: ComponentId) -> Self {
        let created = SystemTime::now();
        // Until `with_parent` says otherwise, the run is its own root.
        let root_run_id = run_id.clone();
        Self {
            root_run_id,
            run_id,
            parent_run_id: None,
            thread_id: None,
            component,
            status: ExecutionStatus::Pending,
            current_phase: HarnessPhase::Idle,
            model_calls: 0,
            tool_calls: 0,
            active_model_call: None,
            active_tool_calls: Vec::new(),
            last_event_id: None,
            usage: UsageTotals::new(),
            cost: CostTotals::default(),
            started_at: created,
            updated_at: created,
            ended_at: None,
            error: None,
            metadata: serde_json::Value::Null,
        }
    }

    /// Switches the run to `Running` in the given `phase` and refreshes
    /// `updated_at`.
    pub fn mark_running(&mut self, phase: HarnessPhase) {
        self.current_phase = phase;
        self.status = ExecutionStatus::Running;
        self.touch();
    }

    /// Marks the run `Completed`, moves it to the `Done` phase, and stamps
    /// `ended_at` and `updated_at` with the current time.
    pub fn mark_completed(&mut self) {
        self.status = ExecutionStatus::Completed;
        self.current_phase = HarnessPhase::Done;
        self.stamp_end();
    }

    /// Marks the run `Failed` with the given error message, moves it to the
    /// `Done` phase, and stamps `ended_at` and `updated_at` with the current
    /// time.
    pub fn mark_failed(&mut self, error: impl Into<String>) {
        self.status = ExecutionStatus::Failed;
        self.current_phase = HarnessPhase::Done;
        self.error = Some(error.into());
        self.stamp_end();
    }

    /// Marks the run `Interrupted` and refreshes `updated_at`; the phase and
    /// `ended_at` are left untouched.
    pub fn mark_interrupted(&mut self) {
        self.status = ExecutionStatus::Interrupted;
        self.touch();
    }

    /// Records `id` as the most recent event seen for this run and refreshes
    /// `updated_at`.
    pub fn set_last_event(&mut self, id: crate::harness::ids::EventId) {
        self.last_event_id = Some(id);
        self.touch();
    }

    /// Builder-style setter that attaches `thread_id` to the run.
    pub fn with_thread(self, thread_id: ThreadId) -> Self {
        Self {
            thread_id: Some(thread_id),
            ..self
        }
    }

    /// Builder-style setter that records the run's parent and overrides its
    /// root run id.
    pub fn with_parent(self, parent_run_id: RunId, root_run_id: RunId) -> Self {
        Self {
            parent_run_id: Some(parent_run_id),
            root_run_id,
            ..self
        }
    }

    /// Sets `updated_at` to the current time.
    fn touch(&mut self) {
        self.updated_at = SystemTime::now();
    }

    /// Stamps `ended_at` and `updated_at` with one shared reading of the
    /// clock.
    fn stamp_end(&mut self) {
        let finished = SystemTime::now();
        self.ended_at = Some(finished);
        self.updated_at = finished;
    }
}
// Declares the `test` submodule, whose contents live in a separate file and
// are compiled only for test builds.
#[cfg(test)]
mod test;