use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock, RwLock};
use serde::{Deserialize, Serialize};
use crate::event_log::{AnyEventLog, EventLog, LogEvent as EventLogRecord, Topic};
use crate::tool_annotations::ToolKind;
/// Lifecycle transition of a spawned worker task.
///
/// Terminal variants are `WorkerCompleted`, `WorkerFailed`, and
/// `WorkerCancelled`; `WorkerSpawned` marks the start of a run.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum WorkerEvent {
    /// Worker has started; reported as status `"running"`.
    WorkerSpawned,
    /// Worker finished successfully; status `"completed"`.
    WorkerCompleted,
    /// Worker ended with an error; status `"failed"`.
    WorkerFailed,
    /// Worker was cancelled before finishing; status `"cancelled"`.
    WorkerCancelled,
}
impl WorkerEvent {
pub fn as_status(self) -> &'static str {
match self {
Self::WorkerSpawned => "running",
Self::WorkerCompleted => "completed",
Self::WorkerFailed => "failed",
Self::WorkerCancelled => "cancelled",
}
}
pub fn as_str(self) -> &'static str {
match self {
Self::WorkerSpawned => "WorkerSpawned",
Self::WorkerCompleted => "WorkerCompleted",
Self::WorkerFailed => "WorkerFailed",
Self::WorkerCancelled => "WorkerCancelled",
}
}
}
/// Progress state of a tool invocation, serialized in snake_case
/// (e.g. `"in_progress"`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolCallStatus {
    /// Call has been issued but not started.
    Pending,
    /// Call is currently executing.
    InProgress,
    /// Call finished successfully.
    Completed,
    /// Call finished with an error.
    Failed,
}
/// An event emitted during an agent session and fanned out to registered
/// [`AgentEventSink`]s.
///
/// Serialized with an internal `"type"` tag in snake_case (so
/// `AgentMessageChunk` becomes `{"type": "agent_message_chunk", ...}`),
/// which the JSONL and event-log sinks persist verbatim. Every variant
/// carries a `session_id`, exposed via [`AgentEvent::session_id`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// Incremental chunk of the agent's user-visible message text.
    AgentMessageChunk {
        session_id: String,
        content: String,
    },
    /// Incremental chunk of the agent's reasoning/thought stream.
    AgentThoughtChunk {
        session_id: String,
        content: String,
    },
    /// A tool call was issued.
    ToolCall {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        kind: Option<ToolKind>,
        status: ToolCallStatus,
        // Raw tool input as produced by the model; schema depends on the tool.
        raw_input: serde_json::Value,
    },
    /// Status/result update for a previously issued tool call.
    ToolCallUpdate {
        session_id: String,
        tool_call_id: String,
        tool_name: String,
        status: ToolCallStatus,
        raw_output: Option<serde_json::Value>,
        error: Option<String>,
    },
    /// A plan payload emitted by the agent.
    Plan {
        session_id: String,
        // Opaque plan document; schema defined by the producer — not visible here.
        plan: serde_json::Value,
    },
    /// Start of agent-loop iteration `iteration`.
    TurnStart {
        session_id: String,
        iteration: usize,
    },
    /// End of agent-loop iteration `iteration`.
    TurnEnd {
        session_id: String,
        iteration: usize,
        // Per-turn metadata blob; schema defined by the producer.
        turn_info: serde_json::Value,
    },
    /// Feedback text injected into the transcript (kind is a free-form label).
    FeedbackInjected {
        session_id: String,
        kind: String,
        content: String,
    },
    /// The loop hit its iteration budget and stopped.
    BudgetExhausted {
        session_id: String,
        max_iterations: usize,
    },
    /// The loop was judged stuck after exhausting its nudges.
    LoopStuck {
        session_id: String,
        max_nudges: usize,
        last_iteration: usize,
        // Tail of the transcript captured for diagnostics.
        tail_excerpt: String,
    },
    /// A daemon watchdog gave up after `attempts` tries over `elapsed_ms`.
    DaemonWatchdogTripped {
        session_id: String,
        attempts: usize,
        elapsed_ms: u64,
    },
    /// A skill was activated at `iteration` for the stated reason.
    SkillActivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
        reason: String,
    },
    /// A previously active skill was deactivated at `iteration`.
    SkillDeactivated {
        session_id: String,
        skill_name: String,
        iteration: usize,
    },
    /// A skill restricted the tool set to `allowed_tools`.
    SkillScopeTools {
        session_id: String,
        skill_name: String,
        allowed_tools: Vec<String>,
    },
    /// A tool-search query was executed (strategy/mode are free-form labels).
    ToolSearchQuery {
        session_id: String,
        tool_use_id: String,
        name: String,
        query: serde_json::Value,
        strategy: String,
        mode: String,
    },
    /// Result of a tool search; `promoted` lists tools made available.
    ToolSearchResult {
        session_id: String,
        tool_use_id: String,
        promoted: Vec<String>,
        strategy: String,
        mode: String,
    },
    /// The transcript was compacted; token counts are estimates.
    TranscriptCompacted {
        session_id: String,
        mode: String,
        strategy: String,
        archived_messages: usize,
        estimated_tokens_before: usize,
        estimated_tokens_after: usize,
        // Asset id of the pre-compaction snapshot, when one was taken.
        snapshot_asset_id: Option<String>,
    },
}
impl AgentEvent {
    /// Returns the session this event belongs to.
    ///
    /// Every variant carries a `session_id` field, so this is total; the
    /// registry in [`emit_event`] uses it to route events to per-session
    /// sinks.
    pub fn session_id(&self) -> &str {
        match self {
            Self::AgentMessageChunk { session_id, .. } => session_id.as_str(),
            Self::AgentThoughtChunk { session_id, .. } => session_id.as_str(),
            Self::ToolCall { session_id, .. } => session_id.as_str(),
            Self::ToolCallUpdate { session_id, .. } => session_id.as_str(),
            Self::Plan { session_id, .. } => session_id.as_str(),
            Self::TurnStart { session_id, .. } => session_id.as_str(),
            Self::TurnEnd { session_id, .. } => session_id.as_str(),
            Self::FeedbackInjected { session_id, .. } => session_id.as_str(),
            Self::BudgetExhausted { session_id, .. } => session_id.as_str(),
            Self::LoopStuck { session_id, .. } => session_id.as_str(),
            Self::DaemonWatchdogTripped { session_id, .. } => session_id.as_str(),
            Self::SkillActivated { session_id, .. } => session_id.as_str(),
            Self::SkillDeactivated { session_id, .. } => session_id.as_str(),
            Self::SkillScopeTools { session_id, .. } => session_id.as_str(),
            Self::ToolSearchQuery { session_id, .. } => session_id.as_str(),
            Self::ToolSearchResult { session_id, .. } => session_id.as_str(),
            Self::TranscriptCompacted { session_id, .. } => session_id.as_str(),
        }
    }
}
/// Receiver for [`AgentEvent`]s.
///
/// Implementations must be `Send + Sync`: events may be delivered from any
/// thread that holds a reference. `handle_event` returns nothing, so sinks
/// are expected to absorb their own failures.
pub trait AgentEventSink: Send + Sync {
    /// Handles one event.
    fn handle_event(&self, event: &AgentEvent);
}
/// On-disk envelope written by [`JsonlEventSink`], one per JSONL line.
///
/// `#[serde(flatten)]` merges the event's own tagged fields into the
/// envelope object, so a line looks like
/// `{"index":0,"emitted_at_ms":...,"type":"turn_start",...}`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedAgentEvent {
    // Per-sink sequence number, contiguous from 0.
    pub index: u64,
    // Milliseconds since the Unix epoch at write time (0 on clock error).
    pub emitted_at_ms: i64,
    // NOTE(review): always None in this file (JsonlEventSink sets it to None);
    // presumably populated by another producer — confirm.
    pub frame_depth: Option<u32>,
    #[serde(flatten)]
    pub event: AgentEvent,
}
/// Sink that appends events to a JSONL file, rotating to suffixed segments
/// once a size cap is reached (see [`JsonlEventSink::ROTATE_BYTES`]).
pub struct JsonlEventSink {
    // Writer + counters, guarded so the sink can be shared across threads.
    state: Mutex<JsonlEventSinkState>,
    // Path of the first segment; rotated segments derive their names from it.
    base_path: std::path::PathBuf,
}
/// Mutable state of a [`JsonlEventSink`], kept behind its mutex.
struct JsonlEventSinkState {
    // Buffered writer over the current segment file.
    writer: std::io::BufWriter<std::fs::File>,
    // Next event index to assign.
    index: u64,
    // Bytes written to the current segment (reset on rotation).
    bytes_written: u64,
    // Number of rotations performed so far; used in the segment suffix.
    rotation: u32,
}
impl JsonlEventSink {
    /// Rotate to a new segment once the current one reaches this size (100 MiB).
    pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;

    /// Creates the JSONL log at `base_path`, creating parent directories as
    /// needed, and returns the sink ready for sharing.
    ///
    /// NOTE(review): `truncate(true)` discards any pre-existing log at this
    /// path — confirm overwrite-on-open (rather than append) is intended.
    pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
        let base_path = base_path.into();
        if let Some(parent) = base_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file = std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(&base_path)?;
        Ok(Arc::new(Self {
            state: Mutex::new(JsonlEventSinkState {
                writer: std::io::BufWriter::new(file),
                index: 0,
                bytes_written: 0,
                rotation: 0,
            }),
            base_path,
        }))
    }

    /// Flushes buffered lines to the current segment file.
    pub fn flush(&self) -> std::io::Result<()> {
        use std::io::Write as _;
        self.state
            .lock()
            .expect("jsonl sink mutex poisoned")
            .writer
            .flush()
    }

    /// Number of events handled so far (equivalently, the next index).
    pub fn event_count(&self) -> u64 {
        self.state.lock().expect("jsonl sink mutex poisoned").index
    }

    /// Switches the writer to a fresh `-NNNNNN`-suffixed segment once the
    /// current one has reached [`Self::ROTATE_BYTES`].
    ///
    /// Earlier segments are left on disk untouched; only the write target
    /// changes. The new segment keeps the base path's stem and extension
    /// (defaults `event_log` / `jsonl` when they are missing or non-UTF-8).
    fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
        use std::io::Write as _;
        if state.bytes_written < Self::ROTATE_BYTES {
            return Ok(());
        }
        // Flush before abandoning the old writer so no buffered data is lost.
        state.writer.flush()?;
        state.rotation += 1;
        let suffix = format!("-{:06}", state.rotation);
        let rotated = self.base_path.with_file_name({
            let stem = self
                .base_path
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("event_log");
            let ext = self
                .base_path
                .extension()
                .and_then(|e| e.to_str())
                .unwrap_or("jsonl");
            format!("{stem}{suffix}.{ext}")
        });
        let file = std::fs::OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(&rotated)?;
        state.writer = std::io::BufWriter::new(file);
        state.bytes_written = 0;
        Ok(())
    }
}
/// Sink that forwards events to a shared [`AnyEventLog`] under a
/// per-session observability topic.
pub struct EventLogSink {
    log: Arc<AnyEventLog>,
    // Topic of the form `observability.agent_events.<sanitized session id>`.
    topic: Topic,
    session_id: String,
}
impl EventLogSink {
    /// Builds a sink that appends this session's events to `log` under the
    /// topic `observability.agent_events.<sanitized session id>`.
    ///
    /// Panics if the sanitized session id still does not form a valid topic.
    pub fn new(log: Arc<AnyEventLog>, session_id: impl Into<String>) -> Arc<Self> {
        let session_id = session_id.into();
        let sanitized = crate::event_log::sanitize_topic_component(&session_id);
        let topic_name = format!("observability.agent_events.{sanitized}");
        let topic = Topic::new(topic_name)
            .expect("session id should sanitize to a valid topic");
        Arc::new(Self {
            log,
            topic,
            session_id,
        })
    }
}
impl AgentEventSink for JsonlEventSink {
fn handle_event(&self, event: &AgentEvent) {
use std::io::Write as _;
let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
let index = state.index;
state.index += 1;
let emitted_at_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(0);
let envelope = PersistedAgentEvent {
index,
emitted_at_ms,
frame_depth: None,
event: event.clone(),
};
if let Ok(line) = serde_json::to_string(&envelope) {
let _ = state.writer.write_all(line.as_bytes());
let _ = state.writer.write_all(b"\n");
state.bytes_written += line.len() as u64 + 1;
let _ = self.rotate_if_needed(&mut state);
}
}
}
impl AgentEventSink for EventLogSink {
    /// Appends `event` to the backing event log under this sink's topic.
    ///
    /// Events that fail to serialize are silently dropped. When called from
    /// inside a tokio runtime the append is spawned fire-and-forget, so
    /// neither completion nor ordering relative to later events is
    /// guaranteed; outside a runtime the append is driven to completion
    /// synchronously via `block_on` (append errors are ignored either way).
    fn handle_event(&self, event: &AgentEvent) {
        let event_json = match serde_json::to_value(event) {
            Ok(value) => value,
            Err(_) => return,
        };
        // The serde tag ("type") doubles as the record kind; fall back to a
        // generic label if the tag is somehow absent.
        let event_kind = event_json
            .get("type")
            .and_then(|value| value.as_str())
            .unwrap_or("agent_event")
            .to_string();
        let payload = serde_json::json!({
            // A wall-clock timestamp used as a loose ordering hint, not a
            // real sequence number.
            "index_hint": now_ms(),
            "session_id": self.session_id,
            "event": event_json,
        });
        let mut headers = std::collections::BTreeMap::new();
        headers.insert("session_id".to_string(), self.session_id.clone());
        // Clone handles so the record can be moved into the spawned task.
        let log = self.log.clone();
        let topic = self.topic.clone();
        let record = EventLogRecord::new(event_kind, payload).with_headers(headers);
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            handle.spawn(async move {
                let _ = log.append(&topic, record).await;
            });
        } else {
            let _ = futures::executor::block_on(log.append(&topic, record));
        }
    }
}
impl Drop for JsonlEventSink {
    /// Best-effort flush of buffered lines on teardown; a poisoned mutex or
    /// flush error is silently ignored.
    fn drop(&mut self) {
        use std::io::Write as _;
        let Ok(mut state) = self.state.lock() else {
            return;
        };
        let _ = state.writer.flush();
    }
}
/// Fan-out sink that forwards each event to every registered child sink,
/// in registration order.
pub struct MultiSink {
    // Child sinks; mutex-guarded so sinks can be added while sharing &self.
    sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
}
impl MultiSink {
pub fn new() -> Self {
Self {
sinks: Mutex::new(Vec::new()),
}
}
pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
self.sinks.lock().expect("sink mutex poisoned").push(sink);
}
pub fn len(&self) -> usize {
self.sinks.lock().expect("sink mutex poisoned").len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl Default for MultiSink {
fn default() -> Self {
Self::new()
}
}
impl AgentEventSink for MultiSink {
fn handle_event(&self, event: &AgentEvent) {
let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
for sink in sinks {
sink.handle_event(event);
}
}
}
/// Process-wide registry mapping session id -> externally registered sinks.
type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
/// Lazily-initialized global registry backing [`register_sink`],
/// [`emit_event`], [`clear_session_sinks`], and [`reset_all_sinks`].
fn external_sinks() -> &'static ExternalSinkRegistry {
    static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
    REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
}
/// Registers `sink` to receive events emitted for `session_id`.
///
/// Multiple sinks may be registered per session; each receives every event
/// for that session in registration order.
pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
    let key = session_id.into();
    external_sinks()
        .write()
        .expect("sink registry poisoned")
        .entry(key)
        .or_default()
        .push(sink);
}
/// Drops every sink registered for `session_id`; a no-op for unknown ids.
pub fn clear_session_sinks(session_id: &str) {
    let mut registry = external_sinks().write().expect("sink registry poisoned");
    registry.remove(session_id);
}
/// Clears the entire sink registry (all sessions) and resets the session
/// store. Intended for test isolation.
pub fn reset_all_sinks() {
    {
        // Scope the write guard so it is released before the store reset.
        let mut registry = external_sinks().write().expect("sink registry poisoned");
        registry.clear();
    }
    crate::agent_sessions::reset_session_store();
}
pub fn emit_event(event: &AgentEvent) {
let sinks: Vec<Arc<dyn AgentEventSink>> = {
let reg = external_sinks().read().expect("sink registry poisoned");
reg.get(event.session_id()).cloned().unwrap_or_default()
};
for sink in sinks {
sink.handle_event(event);
}
}
/// Current wall-clock time as milliseconds since the Unix epoch, or 0 if
/// the system clock reads before the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
/// Number of externally registered sinks for `session_id` (0 if none).
pub fn session_external_sink_count(session_id: &str) -> usize {
    let registry = external_sinks().read().expect("sink registry poisoned");
    registry.get(session_id).map_or(0, |sinks| sinks.len())
}
/// Number of closure-based subscribers the session store tracks for
/// `session_id`; thin delegate to `crate::agent_sessions`.
pub fn session_closure_subscriber_count(session_id: &str) -> usize {
    crate::agent_sessions::subscriber_count(session_id)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test sink that only counts how many events it receives.
    struct CountingSink(Arc<AtomicUsize>);
    impl AgentEventSink for CountingSink {
        fn handle_event(&self, _event: &AgentEvent) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Every sink pushed into a MultiSink must see each event exactly once.
    #[test]
    fn multi_sink_fans_out_in_order() {
        let multi = MultiSink::new();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        multi.push(Arc::new(CountingSink(a.clone())));
        multi.push(Arc::new(CountingSink(b.clone())));
        let event = AgentEvent::TurnStart {
            session_id: "s1".into(),
            iteration: 1,
        };
        multi.handle_event(&event);
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
    }

    // The global registry must route an event only to its session's sinks.
    // NOTE(review): relies on the process-wide registry, so it resets global
    // state at entry and exit; could interfere with tests run in parallel.
    #[test]
    fn session_scoped_sink_routing() {
        reset_all_sinks();
        let a = Arc::new(AtomicUsize::new(0));
        let b = Arc::new(AtomicUsize::new(0));
        register_sink("session-a", Arc::new(CountingSink(a.clone())));
        register_sink("session-b", Arc::new(CountingSink(b.clone())));
        emit_event(&AgentEvent::TurnStart {
            session_id: "session-a".into(),
            iteration: 0,
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 0);
        emit_event(&AgentEvent::TurnEnd {
            session_id: "session-b".into(),
            iteration: 0,
            turn_info: serde_json::json!({}),
        });
        assert_eq!(a.load(Ordering::SeqCst), 1);
        assert_eq!(b.load(Ordering::SeqCst), 1);
        clear_session_sinks("session-a");
        assert_eq!(session_external_sink_count("session-a"), 0);
        assert_eq!(session_external_sink_count("session-b"), 1);
        reset_all_sinks();
    }

    // The JSONL sink must write contiguous indices and non-decreasing
    // timestamps, one flattened envelope per line.
    #[test]
    fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
        use std::io::{BufRead, BufReader};
        // Per-process temp dir; the file is removed at the end, the dir is not.
        let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("event_log.jsonl");
        let sink = JsonlEventSink::open(&path).unwrap();
        for i in 0..5 {
            sink.handle_event(&AgentEvent::TurnStart {
                session_id: "s".into(),
                iteration: i,
            });
        }
        assert_eq!(sink.event_count(), 5);
        sink.flush().unwrap();
        let file = std::fs::File::open(&path).unwrap();
        let mut last_idx: i64 = -1;
        let mut last_ts: i64 = 0;
        for line in BufReader::new(file).lines() {
            let line = line.unwrap();
            let val: serde_json::Value = serde_json::from_str(&line).unwrap();
            let idx = val["index"].as_i64().unwrap();
            let ts = val["emitted_at_ms"].as_i64().unwrap();
            assert_eq!(idx, last_idx + 1, "indices must be contiguous");
            assert!(ts >= last_ts, "timestamps must be non-decreasing");
            last_idx = idx;
            last_ts = ts;
            // The serde tag is flattened into the envelope line.
            assert_eq!(val["type"], "turn_start");
        }
        assert_eq!(last_idx, 4);
        let _ = std::fs::remove_file(&path);
    }

    // Pins the snake_case wire format of ToolCallStatus.
    #[test]
    fn tool_call_status_serde() {
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
            "\"pending\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
            "\"in_progress\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
            "\"completed\""
        );
        assert_eq!(
            serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
            "\"failed\""
        );
    }
}