1use std::collections::HashMap;
22use std::sync::{Arc, Mutex, OnceLock, RwLock};
23
24use serde::{Deserialize, Serialize};
25
26use crate::tool_annotations::ToolKind;
27
28#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
33pub enum WorkerEvent {
34 WorkerSpawned,
35 WorkerCompleted,
36 WorkerFailed,
37 WorkerCancelled,
38}
39
40impl WorkerEvent {
41 pub fn as_status(self) -> &'static str {
42 match self {
43 Self::WorkerSpawned => "running",
44 Self::WorkerCompleted => "completed",
45 Self::WorkerFailed => "failed",
46 Self::WorkerCancelled => "cancelled",
47 }
48 }
49
50 pub fn as_str(self) -> &'static str {
51 match self {
52 Self::WorkerSpawned => "WorkerSpawned",
53 Self::WorkerCompleted => "WorkerCompleted",
54 Self::WorkerFailed => "WorkerFailed",
55 Self::WorkerCancelled => "WorkerCancelled",
56 }
57 }
58}
59
60#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub enum ToolCallStatus {
64 Pending,
66 InProgress,
68 Completed,
70 Failed,
72}
73
74#[derive(Clone, Debug, Serialize, Deserialize)]
77#[serde(tag = "type", rename_all = "snake_case")]
78pub enum AgentEvent {
79 AgentMessageChunk {
80 session_id: String,
81 content: String,
82 },
83 AgentThoughtChunk {
84 session_id: String,
85 content: String,
86 },
87 ToolCall {
88 session_id: String,
89 tool_call_id: String,
90 tool_name: String,
91 kind: Option<ToolKind>,
92 status: ToolCallStatus,
93 raw_input: serde_json::Value,
94 },
95 ToolCallUpdate {
96 session_id: String,
97 tool_call_id: String,
98 tool_name: String,
99 status: ToolCallStatus,
100 raw_output: Option<serde_json::Value>,
101 error: Option<String>,
102 },
103 Plan {
104 session_id: String,
105 plan: serde_json::Value,
106 },
107 TurnStart {
108 session_id: String,
109 iteration: usize,
110 },
111 TurnEnd {
112 session_id: String,
113 iteration: usize,
114 turn_info: serde_json::Value,
115 },
116 FeedbackInjected {
117 session_id: String,
118 kind: String,
119 content: String,
120 },
121 BudgetExhausted {
125 session_id: String,
126 max_iterations: usize,
127 },
128 LoopStuck {
132 session_id: String,
133 max_nudges: usize,
134 last_iteration: usize,
135 tail_excerpt: String,
136 },
137 DaemonWatchdogTripped {
142 session_id: String,
143 attempts: usize,
144 elapsed_ms: u64,
145 },
146 SkillActivated {
150 session_id: String,
151 skill_name: String,
152 iteration: usize,
153 reason: String,
154 },
155 SkillDeactivated {
158 session_id: String,
159 skill_name: String,
160 iteration: usize,
161 },
162 SkillScopeTools {
165 session_id: String,
166 skill_name: String,
167 allowed_tools: Vec<String>,
168 },
169 ToolSearchQuery {
177 session_id: String,
178 tool_use_id: String,
179 name: String,
180 query: serde_json::Value,
181 strategy: String,
182 mode: String,
183 },
184 ToolSearchResult {
188 session_id: String,
189 tool_use_id: String,
190 promoted: Vec<String>,
191 strategy: String,
192 mode: String,
193 },
194 TranscriptCompacted {
195 session_id: String,
196 mode: String,
197 strategy: String,
198 archived_messages: usize,
199 estimated_tokens_before: usize,
200 estimated_tokens_after: usize,
201 snapshot_asset_id: Option<String>,
202 },
203}
204
205impl AgentEvent {
206 pub fn session_id(&self) -> &str {
207 match self {
208 Self::AgentMessageChunk { session_id, .. }
209 | Self::AgentThoughtChunk { session_id, .. }
210 | Self::ToolCall { session_id, .. }
211 | Self::ToolCallUpdate { session_id, .. }
212 | Self::Plan { session_id, .. }
213 | Self::TurnStart { session_id, .. }
214 | Self::TurnEnd { session_id, .. }
215 | Self::FeedbackInjected { session_id, .. }
216 | Self::BudgetExhausted { session_id, .. }
217 | Self::LoopStuck { session_id, .. }
218 | Self::DaemonWatchdogTripped { session_id, .. }
219 | Self::SkillActivated { session_id, .. }
220 | Self::SkillDeactivated { session_id, .. }
221 | Self::SkillScopeTools { session_id, .. }
222 | Self::ToolSearchQuery { session_id, .. }
223 | Self::ToolSearchResult { session_id, .. }
224 | Self::TranscriptCompacted { session_id, .. } => session_id,
225 }
226 }
227}
228
229pub trait AgentEventSink: Send + Sync {
232 fn handle_event(&self, event: &AgentEvent);
233}
234
235#[derive(Clone, Debug, Serialize, Deserialize)]
242pub struct PersistedAgentEvent {
243 pub index: u64,
247 pub emitted_at_ms: i64,
251 pub frame_depth: Option<u32>,
255 #[serde(flatten)]
257 pub event: AgentEvent,
258}
259
260pub struct JsonlEventSink {
265 state: Mutex<JsonlEventSinkState>,
266 base_path: std::path::PathBuf,
267}
268
269struct JsonlEventSinkState {
270 writer: std::io::BufWriter<std::fs::File>,
271 index: u64,
272 bytes_written: u64,
273 rotation: u32,
274}
275
276impl JsonlEventSink {
277 pub const ROTATE_BYTES: u64 = 100 * 1024 * 1024;
281
282 pub fn open(base_path: impl Into<std::path::PathBuf>) -> std::io::Result<Arc<Self>> {
286 let base_path = base_path.into();
287 if let Some(parent) = base_path.parent() {
288 std::fs::create_dir_all(parent)?;
289 }
290 let file = std::fs::OpenOptions::new()
291 .create(true)
292 .truncate(true)
293 .write(true)
294 .open(&base_path)?;
295 Ok(Arc::new(Self {
296 state: Mutex::new(JsonlEventSinkState {
297 writer: std::io::BufWriter::new(file),
298 index: 0,
299 bytes_written: 0,
300 rotation: 0,
301 }),
302 base_path,
303 }))
304 }
305
306 pub fn flush(&self) -> std::io::Result<()> {
309 use std::io::Write as _;
310 self.state
311 .lock()
312 .expect("jsonl sink mutex poisoned")
313 .writer
314 .flush()
315 }
316
317 pub fn event_count(&self) -> u64 {
320 self.state.lock().expect("jsonl sink mutex poisoned").index
321 }
322
323 fn rotate_if_needed(&self, state: &mut JsonlEventSinkState) -> std::io::Result<()> {
324 use std::io::Write as _;
325 if state.bytes_written < Self::ROTATE_BYTES {
326 return Ok(());
327 }
328 state.writer.flush()?;
329 state.rotation += 1;
330 let suffix = format!("-{:06}", state.rotation);
331 let rotated = self.base_path.with_file_name({
332 let stem = self
333 .base_path
334 .file_stem()
335 .and_then(|s| s.to_str())
336 .unwrap_or("event_log");
337 let ext = self
338 .base_path
339 .extension()
340 .and_then(|e| e.to_str())
341 .unwrap_or("jsonl");
342 format!("{stem}{suffix}.{ext}")
343 });
344 let file = std::fs::OpenOptions::new()
345 .create(true)
346 .truncate(true)
347 .write(true)
348 .open(&rotated)?;
349 state.writer = std::io::BufWriter::new(file);
350 state.bytes_written = 0;
351 Ok(())
352 }
353}
354
355impl AgentEventSink for JsonlEventSink {
356 fn handle_event(&self, event: &AgentEvent) {
357 use std::io::Write as _;
358 let mut state = self.state.lock().expect("jsonl sink mutex poisoned");
359 let index = state.index;
360 state.index += 1;
361 let emitted_at_ms = std::time::SystemTime::now()
362 .duration_since(std::time::UNIX_EPOCH)
363 .map(|d| d.as_millis() as i64)
364 .unwrap_or(0);
365 let envelope = PersistedAgentEvent {
366 index,
367 emitted_at_ms,
368 frame_depth: None,
369 event: event.clone(),
370 };
371 if let Ok(line) = serde_json::to_string(&envelope) {
372 let _ = state.writer.write_all(line.as_bytes());
377 let _ = state.writer.write_all(b"\n");
378 state.bytes_written += line.len() as u64 + 1;
379 let _ = self.rotate_if_needed(&mut state);
380 }
381 }
382}
383
384impl Drop for JsonlEventSink {
385 fn drop(&mut self) {
386 if let Ok(mut state) = self.state.lock() {
387 use std::io::Write as _;
388 let _ = state.writer.flush();
389 }
390 }
391}
392
393pub struct MultiSink {
395 sinks: Mutex<Vec<Arc<dyn AgentEventSink>>>,
396}
397
398impl MultiSink {
399 pub fn new() -> Self {
400 Self {
401 sinks: Mutex::new(Vec::new()),
402 }
403 }
404 pub fn push(&self, sink: Arc<dyn AgentEventSink>) {
405 self.sinks.lock().expect("sink mutex poisoned").push(sink);
406 }
407 pub fn len(&self) -> usize {
408 self.sinks.lock().expect("sink mutex poisoned").len()
409 }
410 pub fn is_empty(&self) -> bool {
411 self.len() == 0
412 }
413}
414
415impl Default for MultiSink {
416 fn default() -> Self {
417 Self::new()
418 }
419}
420
421impl AgentEventSink for MultiSink {
422 fn handle_event(&self, event: &AgentEvent) {
423 let sinks = self.sinks.lock().expect("sink mutex poisoned").clone();
429 for sink in sinks {
430 sink.handle_event(event);
431 }
432 }
433}
434
435type ExternalSinkRegistry = RwLock<HashMap<String, Vec<Arc<dyn AgentEventSink>>>>;
436
437fn external_sinks() -> &'static ExternalSinkRegistry {
438 static REGISTRY: OnceLock<ExternalSinkRegistry> = OnceLock::new();
439 REGISTRY.get_or_init(|| RwLock::new(HashMap::new()))
440}
441
442pub fn register_sink(session_id: impl Into<String>, sink: Arc<dyn AgentEventSink>) {
443 let session_id = session_id.into();
444 let mut reg = external_sinks().write().expect("sink registry poisoned");
445 reg.entry(session_id).or_default().push(sink);
446}
447
448pub fn clear_session_sinks(session_id: &str) {
452 external_sinks()
453 .write()
454 .expect("sink registry poisoned")
455 .remove(session_id);
456}
457
458pub fn reset_all_sinks() {
459 external_sinks()
460 .write()
461 .expect("sink registry poisoned")
462 .clear();
463 crate::agent_sessions::reset_session_store();
464}
465
466pub fn emit_event(event: &AgentEvent) {
470 let sinks: Vec<Arc<dyn AgentEventSink>> = {
471 let reg = external_sinks().read().expect("sink registry poisoned");
472 reg.get(event.session_id()).cloned().unwrap_or_default()
473 };
474 for sink in sinks {
475 sink.handle_event(event);
476 }
477}
478
479pub fn session_external_sink_count(session_id: &str) -> usize {
480 external_sinks()
481 .read()
482 .expect("sink registry poisoned")
483 .get(session_id)
484 .map(|v| v.len())
485 .unwrap_or(0)
486}
487
488pub fn session_closure_subscriber_count(session_id: &str) -> usize {
489 crate::agent_sessions::subscriber_count(session_id)
490}
491
492#[cfg(test)]
493mod tests {
494 use super::*;
495 use std::sync::atomic::{AtomicUsize, Ordering};
496
497 struct CountingSink(Arc<AtomicUsize>);
498 impl AgentEventSink for CountingSink {
499 fn handle_event(&self, _event: &AgentEvent) {
500 self.0.fetch_add(1, Ordering::SeqCst);
501 }
502 }
503
504 #[test]
505 fn multi_sink_fans_out_in_order() {
506 let multi = MultiSink::new();
507 let a = Arc::new(AtomicUsize::new(0));
508 let b = Arc::new(AtomicUsize::new(0));
509 multi.push(Arc::new(CountingSink(a.clone())));
510 multi.push(Arc::new(CountingSink(b.clone())));
511 let event = AgentEvent::TurnStart {
512 session_id: "s1".into(),
513 iteration: 1,
514 };
515 multi.handle_event(&event);
516 assert_eq!(a.load(Ordering::SeqCst), 1);
517 assert_eq!(b.load(Ordering::SeqCst), 1);
518 }
519
520 #[test]
521 fn session_scoped_sink_routing() {
522 reset_all_sinks();
523 let a = Arc::new(AtomicUsize::new(0));
524 let b = Arc::new(AtomicUsize::new(0));
525 register_sink("session-a", Arc::new(CountingSink(a.clone())));
526 register_sink("session-b", Arc::new(CountingSink(b.clone())));
527 emit_event(&AgentEvent::TurnStart {
528 session_id: "session-a".into(),
529 iteration: 0,
530 });
531 assert_eq!(a.load(Ordering::SeqCst), 1);
532 assert_eq!(b.load(Ordering::SeqCst), 0);
533 emit_event(&AgentEvent::TurnEnd {
534 session_id: "session-b".into(),
535 iteration: 0,
536 turn_info: serde_json::json!({}),
537 });
538 assert_eq!(a.load(Ordering::SeqCst), 1);
539 assert_eq!(b.load(Ordering::SeqCst), 1);
540 clear_session_sinks("session-a");
541 assert_eq!(session_external_sink_count("session-a"), 0);
542 assert_eq!(session_external_sink_count("session-b"), 1);
543 reset_all_sinks();
544 }
545
546 #[test]
547 fn jsonl_sink_writes_monotonic_indices_and_timestamps() {
548 use std::io::{BufRead, BufReader};
549 let dir = std::env::temp_dir().join(format!("harn-event-log-{}", std::process::id()));
550 std::fs::create_dir_all(&dir).unwrap();
551 let path = dir.join("event_log.jsonl");
552 let sink = JsonlEventSink::open(&path).unwrap();
553 for i in 0..5 {
554 sink.handle_event(&AgentEvent::TurnStart {
555 session_id: "s".into(),
556 iteration: i,
557 });
558 }
559 assert_eq!(sink.event_count(), 5);
560 sink.flush().unwrap();
561
562 let file = std::fs::File::open(&path).unwrap();
564 let mut last_idx: i64 = -1;
565 let mut last_ts: i64 = 0;
566 for line in BufReader::new(file).lines() {
567 let line = line.unwrap();
568 let val: serde_json::Value = serde_json::from_str(&line).unwrap();
569 let idx = val["index"].as_i64().unwrap();
570 let ts = val["emitted_at_ms"].as_i64().unwrap();
571 assert_eq!(idx, last_idx + 1, "indices must be contiguous");
572 assert!(ts >= last_ts, "timestamps must be non-decreasing");
573 last_idx = idx;
574 last_ts = ts;
575 assert_eq!(val["type"], "turn_start");
577 }
578 assert_eq!(last_idx, 4);
579 let _ = std::fs::remove_file(&path);
580 }
581
582 #[test]
583 fn tool_call_status_serde() {
584 assert_eq!(
585 serde_json::to_string(&ToolCallStatus::Pending).unwrap(),
586 "\"pending\""
587 );
588 assert_eq!(
589 serde_json::to_string(&ToolCallStatus::InProgress).unwrap(),
590 "\"in_progress\""
591 );
592 assert_eq!(
593 serde_json::to_string(&ToolCallStatus::Completed).unwrap(),
594 "\"completed\""
595 );
596 assert_eq!(
597 serde_json::to_string(&ToolCallStatus::Failed).unwrap(),
598 "\"failed\""
599 );
600 }
601}