1use oxi_sdk::observability::{AuditAction, AuditTrail};
15use oxi_sdk::EventBus as SdkEventBus;
16use serde::{Deserialize, Serialize};
17use std::sync::Arc;
18use uuid::Uuid;
19
20use crate::types::AgentId;
21
/// Kernel event bus: the SDK's generic event bus specialised to carry
/// [`KernelEvent`] values.
pub type EventBus = SdkEventBus<KernelEvent>;
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub enum KernelEvent {
32 AgentCreated {
34 id: AgentId,
36 name: String,
38 },
39 AgentStarted {
41 id: AgentId,
43 },
44 AgentStopped {
46 id: AgentId,
48 },
49 AgentFailed {
51 id: AgentId,
53 error: String,
55 },
56 MessageReceived {
58 from: AgentId,
60 content: String,
62 },
63 SeedCreated {
65 seed_id: uuid::Uuid,
67 },
68 EvaluationComplete {
70 seed_id: uuid::Uuid,
72 passed: bool,
74 },
75 PhaseStarted {
77 session_id: String,
79 phase: oxios_ouroboros::Phase,
81 },
82 PhaseCompleted {
84 session_id: String,
86 phase: oxios_ouroboros::Phase,
88 result_summary: String,
90 },
91 AgentOutput {
93 session_id: String,
95 agent_id: AgentId,
97 output: String,
99 },
100 ApprovalRequested {
102 id: uuid::Uuid,
104 action: String,
106 resource: String,
108 reason: String,
110 },
111 ApprovalResolved {
113 id: uuid::Uuid,
115 approved: bool,
117 },
118 MemoryStored {
120 id: String,
122 memory_type: String,
124 source: String,
126 },
127 MemoryRecalled {
129 query: String,
131 count: usize,
133 },
134 AgentGroupCreated {
136 group_id: uuid::Uuid,
138 agent_count: usize,
140 },
141 AgentGroupMemberCompleted {
143 group_id: uuid::Uuid,
145 agent_id: uuid::Uuid,
147 success: bool,
149 },
150 ProjectCreated {
152 project_id: uuid::Uuid,
154 name: String,
156 source: String,
158 },
159 ProjectActivated {
161 project_id: uuid::Uuid,
163 name: String,
165 },
166 EvolutionStarted {
168 seed_id: uuid::Uuid,
170 new_seed_id: uuid::Uuid,
172 iteration: u32,
174 },
175 EvolutionMaxReached {
177 seed_id: uuid::Uuid,
179 final_score: f64,
181 iterations: u32,
183 },
184
185 ToolExecutionStarted {
190 session_id: String,
192 tool_name: String,
194 tool_call_id: String,
196 tool_args: serde_json::Value,
198 },
199 ToolExecutionFinished {
201 session_id: String,
203 tool_call_id: String,
205 tool_name: String,
207 duration_ms: u64,
209 is_error: bool,
211 output_summary: String,
213 },
214 ToolExecutionProgress {
216 session_id: String,
218 tool_call_id: String,
220 tool_name: String,
222 progress: String,
224 #[serde(default, skip_serializing_if = "Option::is_none")]
228 tab_id: Option<Uuid>,
229 #[serde(default, skip_serializing_if = "Option::is_none")]
235 context: Option<serde_json::Value>,
236 },
237 MemoryRecallUsed {
239 session_id: String,
241 query: String,
243 count: usize,
245 source: String,
247 },
248 TokenUsageUpdate {
250 session_id: String,
252 input_tokens: u64,
254 output_tokens: u64,
256 },
257 ReasoningFragment {
259 session_id: String,
261 content: String,
263 source: String,
265 },
266
267 CalendarEventCreated {
270 uid: String,
272 title: String,
274 start: String,
276 end: String,
278 },
279 CalendarEventUpdated {
281 uid: String,
283 title: String,
285 },
286 CalendarEventDeleted {
288 uid: String,
290 title: String,
292 },
293 EmailSent {
295 subject: String,
297 message_id: String,
299 #[serde(default, skip_serializing_if = "Option::is_none")]
301 template_name: Option<String>,
302 },
303}
304
305pub fn kernel_event_to_audit_action(event: &KernelEvent) -> AuditAction {
307 match event {
308 KernelEvent::AgentCreated { name, .. } => AuditAction::AgentSpawn {
309 task_type: name.clone(),
310 },
311 KernelEvent::AgentStarted { .. } => AuditAction::AgentSpawn {
312 task_type: "started".to_string(),
313 },
314 KernelEvent::AgentStopped { .. } => AuditAction::AgentExit {
315 reason: "stopped".to_string(),
316 },
317 KernelEvent::AgentFailed { error, .. } => AuditAction::AgentExit {
318 reason: error.clone(),
319 },
320 KernelEvent::MessageReceived { content, .. } => AuditAction::Other {
321 detail: format!("message: {content}"),
322 },
323 KernelEvent::SeedCreated { seed_id, .. } => AuditAction::Other {
324 detail: format!("seed_created:{seed_id}"),
325 },
326 KernelEvent::EvaluationComplete { seed_id, passed } => AuditAction::Other {
327 detail: format!("evaluation:{seed_id}:{passed}"),
328 },
329 KernelEvent::PhaseStarted { session_id, phase } => AuditAction::Other {
330 detail: format!("phase_started:{session_id}:{phase}"),
331 },
332 KernelEvent::PhaseCompleted {
333 session_id,
334 phase,
335 result_summary,
336 } => AuditAction::Other {
337 detail: format!("phase_completed:{session_id}:{phase}:{result_summary}"),
338 },
339 KernelEvent::AgentOutput { output, .. } => AuditAction::Other {
340 detail: format!("agent_output:{output}"),
341 },
342 KernelEvent::ApprovalRequested {
343 id,
344 action,
345 resource,
346 reason: _,
347 } => AuditAction::Other {
348 detail: format!("approval_requested:{id}:{action}:{resource}"),
349 },
350 KernelEvent::ApprovalResolved { id, approved } => AuditAction::Other {
351 detail: format!("approval_resolved:{id}:{approved}"),
352 },
353 KernelEvent::MemoryStored {
354 id, memory_type, ..
355 } => AuditAction::MemoryWrite {
356 entry_id: format!("{id}:{memory_type}"),
357 },
358 KernelEvent::MemoryRecalled { query, count } => AuditAction::MemoryRead {
359 entry_id: format!("query:{query}:{count}results"),
360 },
361 KernelEvent::AgentGroupCreated {
362 group_id,
363 agent_count,
364 } => AuditAction::Other {
365 detail: format!("group_created:{group_id}:{agent_count}agents"),
366 },
367 KernelEvent::AgentGroupMemberCompleted {
368 group_id,
369 agent_id,
370 success,
371 } => AuditAction::Other {
372 detail: format!("group_member_completed:{group_id}:{agent_id}:{success}"),
373 },
374 KernelEvent::EvolutionStarted {
375 seed_id,
376 new_seed_id,
377 iteration,
378 } => AuditAction::Other {
379 detail: format!("evolution:{seed_id}->{new_seed_id}:iter{iteration}"),
380 },
381 KernelEvent::EvolutionMaxReached {
382 seed_id,
383 final_score,
384 iterations,
385 } => AuditAction::Other {
386 detail: format!("evolution_max:{seed_id}:score={final_score}:iters={iterations}"),
387 },
388 KernelEvent::ProjectCreated {
389 project_id: _,
390 name,
391 source,
392 } => AuditAction::Other {
393 detail: format!("project_created:{name}:{source}"),
394 },
395 KernelEvent::ProjectActivated {
396 project_id: _,
397 name,
398 } => AuditAction::Other {
399 detail: format!("project_activated:{name}"),
400 },
401 KernelEvent::ToolExecutionStarted { tool_name, .. } => AuditAction::Other {
403 detail: format!("tool_started:{tool_name}"),
404 },
405 KernelEvent::ToolExecutionFinished {
406 tool_name,
407 is_error,
408 ..
409 } => AuditAction::Other {
410 detail: format!(
411 "tool_finished:{tool_name}:{}",
412 if *is_error { "error" } else { "ok" }
413 ),
414 },
415 KernelEvent::ToolExecutionProgress {
416 tool_name,
417 tab_id,
418 context,
419 ..
420 } => AuditAction::Other {
421 detail: {
422 let mut d = format!("tool_progress:{tool_name}");
423 if let Some(id) = tab_id {
424 d.push_str(&format!(":tab={id}"));
425 }
426 if let Some(ctx) = context
427 .as_ref()
428 .and_then(|c| c.get("kind"))
429 .and_then(|k| k.as_str())
430 {
431 d.push_str(&format!(":{ctx}"));
432 }
433 d
434 },
435 },
436 KernelEvent::MemoryRecallUsed { query, count, .. } => AuditAction::MemoryRead {
437 entry_id: format!("recall:{query}:{count}results"),
438 },
439 KernelEvent::TokenUsageUpdate {
440 input_tokens,
441 output_tokens,
442 ..
443 } => AuditAction::Other {
444 detail: format!("tokens:in={input_tokens}:out={output_tokens}"),
445 },
446 KernelEvent::ReasoningFragment { source, .. } => AuditAction::Other {
447 detail: format!("reasoning:{source}"),
448 },
449 KernelEvent::CalendarEventCreated { uid, title, .. } => AuditAction::Other {
450 detail: format!("calendar:created:{uid}:{title}"),
451 },
452 KernelEvent::CalendarEventUpdated { uid, title } => AuditAction::Other {
453 detail: format!("calendar:updated:{uid}:{title}"),
454 },
455 KernelEvent::CalendarEventDeleted { uid, title } => AuditAction::Other {
456 detail: format!("calendar:deleted:{uid}:{title}"),
457 },
458 KernelEvent::EmailSent {
459 subject,
460 message_id,
461 template_name,
462 } => AuditAction::Other {
463 detail: format!("email:sent:{subject} (msg={message_id}, tpl={template_name:?})"),
464 },
465 }
466}
467
468fn extract_agent_id(event: &KernelEvent) -> String {
470 match event {
471 KernelEvent::AgentCreated { id, .. } => id.to_string(),
472 KernelEvent::AgentStarted { id, .. } => id.to_string(),
473 KernelEvent::AgentStopped { id, .. } => id.to_string(),
474 KernelEvent::AgentFailed { id, .. } => id.to_string(),
475 KernelEvent::MessageReceived { from, .. } => from.to_string(),
476 KernelEvent::AgentOutput { agent_id, .. } => agent_id.to_string(),
477 KernelEvent::AgentGroupMemberCompleted { agent_id, .. } => agent_id.to_string(),
478 KernelEvent::ProjectActivated { project_id, .. } => format!("project:{project_id}"),
479 KernelEvent::ToolExecutionStarted { session_id, .. } => format!("session:{session_id}"),
481 KernelEvent::ToolExecutionFinished { session_id, .. } => format!("session:{session_id}"),
482 KernelEvent::ToolExecutionProgress { session_id, .. } => format!("session:{session_id}"),
483 KernelEvent::MemoryRecallUsed { session_id, .. } => format!("session:{session_id}"),
484 KernelEvent::TokenUsageUpdate { session_id, .. } => format!("session:{session_id}"),
485 KernelEvent::ReasoningFragment { session_id, .. } => format!("session:{session_id}"),
486 _ => "system".to_string(),
487 }
488}
489
490pub fn attach_audit_trail(bus: &EventBus, audit: Arc<AuditTrail>) {
496 let mut rx = bus.subscribe();
497 tokio::spawn(async move {
498 loop {
499 match rx.recv().await {
500 Ok(event) => {
501 let actor = extract_agent_id(&event);
502 let action = kernel_event_to_audit_action(&event);
503 let resource = format!("{event:?}");
504 audit.append(actor, action, resource);
505 }
506 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
507 tracing::warn!(
508 skipped = n,
509 "Audit trail subscriber lagged, skipping events"
510 );
511 continue;
512 }
513 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
514 tracing::info!("Audit trail event bus closed, exiting");
515 break;
516 }
517 }
518 }
519 });
520}
521
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `AgentCreated` event with a fresh id and the given name.
    fn sample_event(name: &str) -> KernelEvent {
        KernelEvent::AgentCreated {
            id: AgentId::new_v4(),
            name: name.to_string(),
        }
    }

    /// The kernel `EventBus` alias resolves to the SDK bus type.
    #[test]
    fn test_event_bus_uses_sdk() {
        let bus: EventBus = EventBus::new(256);
        assert!(format!("{:?}", bus).contains("EventBus"));
    }

    /// Publishing with no subscribers is not an error.
    #[tokio::test]
    async fn test_publish_no_subscribers_ok() {
        let bus = EventBus::new(16);
        let result = bus.publish(sample_event("orphan"));
        assert!(result.is_ok());
    }

    /// A single subscriber receives the published event.
    #[tokio::test]
    async fn test_single_subscriber_receives_event() {
        let bus = EventBus::new(16);
        let mut rx = bus.subscribe();

        let event = sample_event("test-agent");
        bus.publish(event.clone()).unwrap();

        let received = rx.try_recv().expect("should receive event");
        match received {
            KernelEvent::AgentCreated { name, .. } => assert_eq!(name, "test-agent"),
            _ => panic!("wrong event type"),
        }
    }

    /// Every subscriber receives its own copy of a published event.
    #[tokio::test]
    async fn test_multiple_subscribers_receive_events() {
        let bus = EventBus::new(16);
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();

        let event = sample_event("multi");
        bus.publish(event.clone()).unwrap();

        let r1 = rx1.try_recv().expect("rx1 should receive event");
        let r2 = rx2.try_recv().expect("rx2 should receive event");

        assert!(matches!(r1, KernelEvent::AgentCreated { .. }));
        assert!(matches!(r2, KernelEvent::AgentCreated { .. }));
    }

    /// `AgentFailed` maps to `AgentExit` carrying the error text.
    ///
    /// The mapping is synchronous and pure, so no async runtime is needed.
    #[test]
    fn test_kernel_event_to_audit_action() {
        let event = KernelEvent::AgentFailed {
            id: AgentId::new_v4(),
            error: "boom".to_string(),
        };
        let action = kernel_event_to_audit_action(&event);
        match action {
            AuditAction::AgentExit { reason } => assert_eq!(reason, "boom"),
            other => panic!("expected AgentExit, got {other:?}"),
        }
    }

    /// Tool/telemetry events serialize to JSON and back without changing
    /// their serialized form.
    #[test]
    fn test_rfc015_event_round_trip_json() {
        let cases: Vec<KernelEvent> = vec![
            KernelEvent::ToolExecutionStarted {
                session_id: "s1".into(),
                tool_name: "read_file".into(),
                tool_call_id: "call_1".into(),
                tool_args: serde_json::json!({"path": "/src/main.rs"}),
            },
            KernelEvent::ToolExecutionFinished {
                session_id: "s1".into(),
                tool_call_id: "call_1".into(),
                tool_name: "read_file".into(),
                duration_ms: 234,
                is_error: false,
                output_summary: "fn main() {}".into(),
            },
            KernelEvent::ToolExecutionProgress {
                session_id: "s1".into(),
                tool_call_id: "call_1".into(),
                tool_name: "read_file".into(),
                progress: "reading line 42/100".into(),
                tab_id: None,
                context: None,
            },
            KernelEvent::MemoryRecallUsed {
                session_id: "s1".into(),
                query: "rust errors".into(),
                count: 3,
                source: "warm".into(),
            },
            KernelEvent::TokenUsageUpdate {
                session_id: "s1".into(),
                input_tokens: 1234,
                output_tokens: 567,
            },
            KernelEvent::ReasoningFragment {
                session_id: "s1".into(),
                content: "compaction done".into(),
                source: "compaction".into(),
            },
        ];
        for event in cases {
            let json = serde_json::to_string(&event).expect("serialize");
            let back: KernelEvent = serde_json::from_str(&json).expect("deserialize");
            let json2 = serde_json::to_string(&back).expect("serialize round-trip");
            assert_eq!(json, json2, "round-trip should be stable");
        }
    }

    /// A progress event with a tab id round-trips with every field intact,
    /// including the exact tab id value.
    #[test]
    fn test_tool_execution_progress_serde_round_trip() {
        let tab = Uuid::new_v4();
        let event = KernelEvent::ToolExecutionProgress {
            session_id: "s-abc".into(),
            tool_call_id: "call_42".into(),
            tool_name: "browse".into(),
            progress: "loading https://example.com".into(),
            tab_id: Some(tab),
            context: None,
        };
        let json = serde_json::to_string(&event).expect("serialize");
        let back: KernelEvent = serde_json::from_str(&json).expect("deserialize");
        match back {
            KernelEvent::ToolExecutionProgress {
                ref session_id,
                ref tool_call_id,
                ref tool_name,
                ref progress,
                tab_id,
                ref context,
            } => {
                assert_eq!(session_id, "s-abc");
                assert_eq!(tool_call_id, "call_42");
                assert_eq!(tool_name, "browse");
                assert_eq!(progress, "loading https://example.com");
                // Compare the value, not just presence, so a corrupted id fails.
                assert_eq!(tab_id, Some(tab), "tab_id should round-trip unchanged");
                assert!(context.is_none(), "context should stay None");
            }
            other => panic!("expected ToolExecutionProgress, got {other:?}"),
        }
    }

    /// The audit detail for a progress event contains the tool name, plus the
    /// tab id and the context `kind` when each is present.
    #[test]
    fn test_tool_execution_progress_audit_action() {
        let with_tab = KernelEvent::ToolExecutionProgress {
            session_id: "s1".into(),
            tool_call_id: "c1".into(),
            tool_name: "browse".into(),
            progress: "navigating".into(),
            tab_id: Some(Uuid::new_v4()),
            context: None,
        };
        match kernel_event_to_audit_action(&with_tab) {
            AuditAction::Other { detail } => {
                assert!(detail.contains("tool_progress"), "detail: {detail}");
                assert!(detail.contains("browse"), "detail: {detail}");
                assert!(
                    detail.contains(":tab="),
                    "detail should include tab id: {detail}"
                );
            }
            other => panic!("expected Other, got {other:?}"),
        }

        let without_tab = KernelEvent::ToolExecutionProgress {
            session_id: "s1".into(),
            tool_call_id: "c1".into(),
            tool_name: "browse".into(),
            progress: "navigating".into(),
            tab_id: None,
            context: None,
        };
        match kernel_event_to_audit_action(&without_tab) {
            AuditAction::Other { detail } => {
                assert_eq!(detail, "tool_progress:browse");
            }
            other => panic!("expected Other, got {other:?}"),
        }

        // A string `kind` entry in the context is appended as a suffix.
        let with_kind = KernelEvent::ToolExecutionProgress {
            session_id: "s1".into(),
            tool_call_id: "c1".into(),
            tool_name: "browse".into(),
            progress: "navigating".into(),
            tab_id: None,
            context: Some(serde_json::json!({"kind": "navigation"})),
        };
        match kernel_event_to_audit_action(&with_kind) {
            AuditAction::Other { detail } => {
                assert_eq!(detail, "tool_progress:browse:navigation");
            }
            other => panic!("expected Other, got {other:?}"),
        }
    }

    /// JSON without `tab_id` still deserializes, and a `None` tab id is left
    /// out when serializing.
    #[test]
    fn test_tool_execution_progress_tab_id_optional_in_serde() {
        let legacy_json = r#"{
            "ToolExecutionProgress": {
                "session_id": "s-old",
                "tool_call_id": "call_legacy",
                "tool_name": "browse",
                "progress": "step 1"
            }
        }"#;
        let event: KernelEvent = serde_json::from_str(legacy_json).expect("deserialize legacy");
        match &event {
            KernelEvent::ToolExecutionProgress {
                session_id,
                tool_call_id,
                tool_name,
                progress,
                tab_id,
                ..
            } => {
                assert_eq!(session_id, "s-old");
                assert_eq!(tool_call_id, "call_legacy");
                assert_eq!(tool_name, "browse");
                assert_eq!(progress, "step 1");
                assert!(tab_id.is_none(), "missing field should default to None");
            }
            other => panic!("expected ToolExecutionProgress, got {other:?}"),
        }
        let json = serde_json::to_string(&event).expect("serialize");
        assert!(
            !json.contains("tab_id"),
            "tab_id should be omitted when None: {json}"
        );
    }

    /// Tool events are attributed to their session, and the audit detail
    /// names the tool.
    #[test]
    fn test_rfc015_extract_agent_id() {
        let event = KernelEvent::ToolExecutionStarted {
            session_id: "abc-123".into(),
            tool_name: "bash".into(),
            tool_call_id: "c1".into(),
            tool_args: serde_json::Value::Null,
        };

        // `extract_agent_id` is private to the parent module, but a child
        // module can reach it through `use super::*`, so check it directly.
        assert_eq!(extract_agent_id(&event), "session:abc-123");

        let action = kernel_event_to_audit_action(&event);
        match action {
            AuditAction::Other { detail } => {
                assert!(
                    detail.contains("bash"),
                    "tool name in audit detail: {detail}"
                );
            }
            other => panic!("expected Other, got {other:?}"),
        }
    }

    /// Agent events are attributed to the agent id; events with no specific
    /// actor fall back to `"system"`.
    #[test]
    fn test_extract_agent_id_agent_and_system_fallback() {
        let id = AgentId::new_v4();
        let expected = id.to_string();
        let started = KernelEvent::AgentStarted { id };
        assert_eq!(extract_agent_id(&started), expected);

        let seed = KernelEvent::SeedCreated {
            seed_id: Uuid::new_v4(),
        };
        assert_eq!(extract_agent_id(&seed), "system");
    }
}