1use oxi_sdk::EventBus as SdkEventBus;
15use oxi_sdk::observability::{AuditAction, AuditTrail};
16use serde::{Deserialize, Serialize};
17use std::sync::Arc;
18use uuid::Uuid;
19
20use crate::types::AgentId;
21
/// Kernel event bus: the SDK's generic event bus (`oxi_sdk::EventBus`)
/// specialised to carry [`KernelEvent`] values.
pub type EventBus = SdkEventBus<KernelEvent>;
28
/// Events published on the kernel [`EventBus`].
///
/// Every variant is struct-like and the enum uses serde's default
/// (externally tagged) representation, so the JSON form is
/// `{"VariantName": {"field": ...}}`. Variant names, field names and field
/// order are therefore part of the serialized output — change them with care.
///
/// `kernel_event_to_audit_action` turns each variant into an audit action and
/// `extract_agent_id` derives the audit actor string; the per-variant notes
/// below describe what those two functions currently produce.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KernelEvent {
    /// Agent creation. Audited as `AgentSpawn` with `task_type` set to `name`;
    /// the audit actor is `id`.
    AgentCreated {
        /// Id of the agent.
        id: AgentId,
        /// Agent name (reused as the audit `task_type`).
        name: String,
    },
    /// Agent start. Audited as `AgentSpawn` with the fixed `task_type`
    /// `"started"`; the audit actor is `id`.
    AgentStarted {
        /// Id of the agent.
        id: AgentId,
    },
    /// Agent stop. Audited as `AgentExit` with reason `"completed"` when
    /// `success` is true and `"stopped"` otherwise.
    AgentStopped {
        /// Id of the agent.
        id: AgentId,
        /// Outcome flag. `#[serde(default)]` makes a payload without this
        /// field deserialize as `false`.
        #[serde(default)]
        success: bool,
    },
    /// Agent failure. Audited as `AgentExit` with `error` as the reason.
    AgentFailed {
        /// Id of the agent.
        id: AgentId,
        /// Error text (copied verbatim into the audit reason).
        error: String,
    },
    /// Message from an agent. Audit detail: `message: {content}`; the audit
    /// actor is `from`.
    MessageReceived {
        /// Id of the sending agent.
        from: AgentId,
        /// Message body.
        content: String,
    },
    /// Audit detail: `seed_created:{seed_id}`.
    SeedCreated {
        /// Id of the seed.
        seed_id: uuid::Uuid,
    },
    /// Audit detail: `evaluation:{seed_id}:{passed}`.
    EvaluationComplete {
        /// Id of the evaluated seed.
        seed_id: uuid::Uuid,
        /// Evaluation verdict.
        passed: bool,
    },
    /// Audit detail: `phase_started:{session_id}:{phase}` (`phase` is rendered
    /// through its `Display` impl).
    PhaseStarted {
        /// Session the phase belongs to.
        session_id: String,
        /// Phase value from `oxios_ouroboros`.
        phase: oxios_ouroboros::Phase,
    },
    /// Audit detail: `phase_completed:{session_id}:{phase}:{result_summary}`.
    PhaseCompleted {
        /// Session the phase belongs to.
        session_id: String,
        /// Phase value from `oxios_ouroboros`.
        phase: oxios_ouroboros::Phase,
        /// Summary text (copied verbatim into the audit detail).
        result_summary: String,
    },
    /// Output produced by an agent. Audit detail: `agent_output:{output}`;
    /// the audit actor is `agent_id` (not the session).
    AgentOutput {
        /// Session the output belongs to.
        session_id: String,
        /// Id of the producing agent.
        agent_id: AgentId,
        /// Output text.
        output: String,
    },
    /// Audit detail: `approval_requested:{id}:{action}:{resource}`;
    /// `tool_name`, `reason` and `session_id` are not part of the detail.
    ApprovalRequested {
        /// Id of the approval request (matched by `ApprovalResolved::id`,
        /// presumably — confirm with the publisher).
        id: uuid::Uuid,
        /// Name of the tool involved.
        tool_name: String,
        /// Action label.
        action: String,
        /// Resource label.
        resource: String,
        /// Reason text.
        reason: String,
        /// Optional session id; serialized even when `None` (no
        /// `skip_serializing_if` on this field).
        session_id: Option<String>,
    },
    /// Audit detail: `approval_resolved:{id}:{approved}`.
    ApprovalResolved {
        /// Id of the approval request.
        id: uuid::Uuid,
        /// Decision flag.
        approved: bool,
    },
    /// Audited as `MemoryWrite` with `entry_id` = `{id}:{memory_type}`.
    MemoryStored {
        /// Id of the memory entry.
        id: String,
        /// Memory type label.
        memory_type: String,
        /// Origin label; valid values are not visible in this file.
        source: String,
    },
    /// Audited as `MemoryRead` with `entry_id` = `query:{query}:{count}results`.
    MemoryRecalled {
        /// Query text.
        query: String,
        /// Number of results.
        count: usize,
    },
    /// Audit detail: `group_created:{group_id}:{agent_count}agents`.
    AgentGroupCreated {
        /// Id of the group.
        group_id: uuid::Uuid,
        /// Number of agents in the group.
        agent_count: usize,
    },
    /// Audit detail: `group_member_completed:{group_id}:{agent_id}:{success}`;
    /// the audit actor is `agent_id`.
    AgentGroupMemberCompleted {
        /// Id of the group.
        group_id: uuid::Uuid,
        /// Id of the member agent (typed as `uuid::Uuid` here rather than
        /// `AgentId`).
        agent_id: uuid::Uuid,
        /// Outcome flag.
        success: bool,
    },
    /// Audit detail: `project_created:{name}:{source}` (`project_id` is not
    /// included and the actor falls back to `"system"`).
    ProjectCreated {
        /// Id of the project.
        project_id: uuid::Uuid,
        /// Project name.
        name: String,
        /// Origin label; valid values are not visible in this file.
        source: String,
    },
    /// Audit detail: `project_activated:{name}`; the audit actor is
    /// `project:{project_id}`.
    ProjectActivated {
        /// Id of the project.
        project_id: uuid::Uuid,
        /// Project name.
        name: String,
    },
    /// Audit detail: `evolution:{seed_id}->{new_seed_id}:iter{iteration}`.
    EvolutionStarted {
        /// Id of the seed being evolved from.
        seed_id: uuid::Uuid,
        /// Id of the seed being evolved to.
        new_seed_id: uuid::Uuid,
        /// Iteration counter.
        iteration: u32,
    },
    /// Audit detail:
    /// `evolution_max:{seed_id}:score={final_score}:iters={iterations}`.
    EvolutionMaxReached {
        /// Id of the seed.
        seed_id: uuid::Uuid,
        /// Final score.
        final_score: f64,
        /// Number of iterations.
        iterations: u32,
    },

    /// Audit detail: `tool_started:{tool_name}`; the audit actor is
    /// `session:{session_id}`. Covered by `test_rfc015_event_round_trip_json`.
    ToolExecutionStarted {
        /// Session the tool call belongs to.
        session_id: String,
        /// Name of the tool.
        tool_name: String,
        /// Id of the tool call.
        tool_call_id: String,
        /// Tool arguments as arbitrary JSON.
        tool_args: serde_json::Value,
        /// Optional extra JSON context; defaults to `None` when absent and is
        /// omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        context: Option<serde_json::Value>,
    },
    /// Audit detail: `tool_finished:{tool_name}:error` or
    /// `tool_finished:{tool_name}:ok` depending on `is_error`; the audit actor
    /// is `session:{session_id}`.
    ToolExecutionFinished {
        /// Session the tool call belongs to.
        session_id: String,
        /// Id of the tool call.
        tool_call_id: String,
        /// Name of the tool.
        tool_name: String,
        /// Duration in milliseconds.
        duration_ms: u64,
        /// Whether the call ended in an error.
        is_error: bool,
        /// Summary of the output.
        output_summary: String,
    },
    /// Audit detail: `tool_progress:{tool_name}`, followed by `:tab={tab_id}`
    /// when `tab_id` is set and `:{kind}` when `context["kind"]` is a JSON
    /// string; the audit actor is `session:{session_id}`.
    ToolExecutionProgress {
        /// Session the tool call belongs to.
        session_id: String,
        /// Id of the tool call.
        tool_call_id: String,
        /// Name of the tool.
        tool_name: String,
        /// Progress text (not included in the audit detail).
        progress: String,
        /// Optional tab id; payloads without the field deserialize to `None`
        /// and `None` is omitted when serializing.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        tab_id: Option<Uuid>,
        /// Optional extra JSON context; same default/omit behaviour as
        /// `tab_id`. Only its `"kind"` string is read by the audit mapping.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        context: Option<serde_json::Value>,
    },
    /// Audited as `MemoryRead` with `entry_id` = `recall:{query}:{count}results`;
    /// the audit actor is `session:{session_id}`.
    MemoryRecallUsed {
        /// Session in which the recall happened.
        session_id: String,
        /// Query text.
        query: String,
        /// Number of results.
        count: usize,
        /// Origin label (the tests use `"warm"`); the full set of values is
        /// not visible in this file.
        source: String,
    },
    /// Audit detail: `tokens:in={input_tokens}:out={output_tokens}`; the audit
    /// actor is `session:{session_id}`.
    TokenUsageUpdate {
        /// Session the usage belongs to.
        session_id: String,
        /// Input token count.
        input_tokens: u64,
        /// Output token count.
        output_tokens: u64,
    },
    /// Audit detail: `reasoning:{source}` (`content` is not included); the
    /// audit actor is `session:{session_id}`.
    ReasoningFragment {
        /// Session the fragment belongs to.
        session_id: String,
        /// Fragment text.
        content: String,
        /// Origin label (the tests use `"compaction"`).
        source: String,
    },

    /// Audit detail: `calendar:created:{uid}:{title}`.
    CalendarEventCreated {
        /// Calendar event uid.
        uid: String,
        /// Event title.
        title: String,
        /// Start of the event as a string; format not visible here.
        start: String,
        /// End of the event as a string; format not visible here.
        end: String,
    },
    /// Audit detail: `calendar:updated:{uid}:{title}`.
    CalendarEventUpdated {
        /// Calendar event uid.
        uid: String,
        /// Event title.
        title: String,
    },
    /// Audit detail: `calendar:deleted:{uid}:{title}`.
    CalendarEventDeleted {
        /// Calendar event uid.
        uid: String,
        /// Event title.
        title: String,
    },
    /// Audit detail:
    /// `email:sent:{subject} (msg={message_id}, tpl={template_name:?})`
    /// — the template is rendered with `Debug`, i.e. `Some("x")` / `None`.
    EmailSent {
        /// Subject line.
        subject: String,
        /// Message id.
        message_id: String,
        /// Optional template name; defaults to `None` when absent and is
        /// omitted from the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        template_name: Option<String>,
    },

    /// Audit detail:
    /// `knowledge:persisted:{session_id}:{message_index}:{path}:{source}`;
    /// the audit actor is `session:{session_id}`.
    KnowledgePersisted {
        /// Session the message belongs to.
        session_id: String,
        /// Index of the message within the session.
        message_index: usize,
        /// Path string; what it points at is not visible in this file.
        path: String,
        /// Origin label; valid values are not visible in this file.
        source: String,
    },
    /// Audit detail: `knowledge:removed:{session_id}:{message_index}`; the
    /// audit actor is `session:{session_id}`.
    KnowledgeRemoved {
        /// Session the message belongs to.
        session_id: String,
        /// Index of the message within the session.
        message_index: usize,
    },
    /// Audit detail: `ask_user:{id}:{question}` (`options` is not included).
    AskUserRequest {
        /// Id of the request.
        id: String,
        /// Question text.
        question: String,
        /// Answer options offered with the question.
        options: Vec<String>,
    },
}
348
349pub fn kernel_event_to_audit_action(event: &KernelEvent) -> AuditAction {
351 match event {
352 KernelEvent::AgentCreated { name, .. } => AuditAction::AgentSpawn {
353 task_type: name.clone(),
354 },
355 KernelEvent::AgentStarted { .. } => AuditAction::AgentSpawn {
356 task_type: "started".to_string(),
357 },
358 KernelEvent::AgentStopped { success, .. } => AuditAction::AgentExit {
359 reason: if *success {
360 "completed".to_string()
361 } else {
362 "stopped".to_string()
363 },
364 },
365 KernelEvent::AgentFailed { error, .. } => AuditAction::AgentExit {
366 reason: error.clone(),
367 },
368 KernelEvent::MessageReceived { content, .. } => AuditAction::Other {
369 detail: format!("message: {content}"),
370 },
371 KernelEvent::SeedCreated { seed_id, .. } => AuditAction::Other {
372 detail: format!("seed_created:{seed_id}"),
373 },
374 KernelEvent::EvaluationComplete { seed_id, passed } => AuditAction::Other {
375 detail: format!("evaluation:{seed_id}:{passed}"),
376 },
377 KernelEvent::PhaseStarted { session_id, phase } => AuditAction::Other {
378 detail: format!("phase_started:{session_id}:{phase}"),
379 },
380 KernelEvent::PhaseCompleted {
381 session_id,
382 phase,
383 result_summary,
384 } => AuditAction::Other {
385 detail: format!("phase_completed:{session_id}:{phase}:{result_summary}"),
386 },
387 KernelEvent::AgentOutput { output, .. } => AuditAction::Other {
388 detail: format!("agent_output:{output}"),
389 },
390 KernelEvent::ApprovalRequested {
391 id,
392 action,
393 resource,
394 ..
395 } => AuditAction::Other {
396 detail: format!("approval_requested:{id}:{action}:{resource}"),
397 },
398 KernelEvent::ApprovalResolved { id, approved } => AuditAction::Other {
399 detail: format!("approval_resolved:{id}:{approved}"),
400 },
401 KernelEvent::MemoryStored {
402 id, memory_type, ..
403 } => AuditAction::MemoryWrite {
404 entry_id: format!("{id}:{memory_type}"),
405 },
406 KernelEvent::MemoryRecalled { query, count } => AuditAction::MemoryRead {
407 entry_id: format!("query:{query}:{count}results"),
408 },
409 KernelEvent::AgentGroupCreated {
410 group_id,
411 agent_count,
412 } => AuditAction::Other {
413 detail: format!("group_created:{group_id}:{agent_count}agents"),
414 },
415 KernelEvent::AgentGroupMemberCompleted {
416 group_id,
417 agent_id,
418 success,
419 } => AuditAction::Other {
420 detail: format!("group_member_completed:{group_id}:{agent_id}:{success}"),
421 },
422 KernelEvent::EvolutionStarted {
423 seed_id,
424 new_seed_id,
425 iteration,
426 } => AuditAction::Other {
427 detail: format!("evolution:{seed_id}->{new_seed_id}:iter{iteration}"),
428 },
429 KernelEvent::EvolutionMaxReached {
430 seed_id,
431 final_score,
432 iterations,
433 } => AuditAction::Other {
434 detail: format!("evolution_max:{seed_id}:score={final_score}:iters={iterations}"),
435 },
436 KernelEvent::ProjectCreated {
437 project_id: _,
438 name,
439 source,
440 } => AuditAction::Other {
441 detail: format!("project_created:{name}:{source}"),
442 },
443 KernelEvent::ProjectActivated {
444 project_id: _,
445 name,
446 } => AuditAction::Other {
447 detail: format!("project_activated:{name}"),
448 },
449 KernelEvent::ToolExecutionStarted { tool_name, .. } => AuditAction::Other {
451 detail: format!("tool_started:{tool_name}"),
452 },
453 KernelEvent::ToolExecutionFinished {
454 tool_name,
455 is_error,
456 ..
457 } => AuditAction::Other {
458 detail: format!(
459 "tool_finished:{tool_name}:{}",
460 if *is_error { "error" } else { "ok" }
461 ),
462 },
463 KernelEvent::ToolExecutionProgress {
464 tool_name,
465 tab_id,
466 context,
467 ..
468 } => AuditAction::Other {
469 detail: {
470 let mut d = format!("tool_progress:{tool_name}");
471 if let Some(id) = tab_id {
472 d.push_str(&format!(":tab={id}"));
473 }
474 if let Some(ctx) = context
475 .as_ref()
476 .and_then(|c| c.get("kind"))
477 .and_then(|k| k.as_str())
478 {
479 d.push_str(&format!(":{ctx}"));
480 }
481 d
482 },
483 },
484 KernelEvent::MemoryRecallUsed { query, count, .. } => AuditAction::MemoryRead {
485 entry_id: format!("recall:{query}:{count}results"),
486 },
487 KernelEvent::TokenUsageUpdate {
488 input_tokens,
489 output_tokens,
490 ..
491 } => AuditAction::Other {
492 detail: format!("tokens:in={input_tokens}:out={output_tokens}"),
493 },
494 KernelEvent::ReasoningFragment { source, .. } => AuditAction::Other {
495 detail: format!("reasoning:{source}"),
496 },
497 KernelEvent::CalendarEventCreated { uid, title, .. } => AuditAction::Other {
498 detail: format!("calendar:created:{uid}:{title}"),
499 },
500 KernelEvent::CalendarEventUpdated { uid, title } => AuditAction::Other {
501 detail: format!("calendar:updated:{uid}:{title}"),
502 },
503 KernelEvent::CalendarEventDeleted { uid, title } => AuditAction::Other {
504 detail: format!("calendar:deleted:{uid}:{title}"),
505 },
506 KernelEvent::EmailSent {
507 subject,
508 message_id,
509 template_name,
510 } => AuditAction::Other {
511 detail: format!("email:sent:{subject} (msg={message_id}, tpl={template_name:?})"),
512 },
513 KernelEvent::KnowledgePersisted {
514 session_id,
515 message_index,
516 path,
517 source,
518 } => AuditAction::Other {
519 detail: format!("knowledge:persisted:{session_id}:{message_index}:{path}:{source}"),
520 },
521 KernelEvent::KnowledgeRemoved {
522 session_id,
523 message_index,
524 } => AuditAction::Other {
525 detail: format!("knowledge:removed:{session_id}:{message_index}"),
526 },
527 KernelEvent::AskUserRequest { id, question, .. } => AuditAction::Other {
528 detail: format!("ask_user:{id}:{question}"),
529 },
530 }
531}
532
533fn extract_agent_id(event: &KernelEvent) -> String {
535 match event {
536 KernelEvent::AgentCreated { id, .. } => id.to_string(),
537 KernelEvent::AgentStarted { id, .. } => id.to_string(),
538 KernelEvent::AgentStopped { id, .. } => id.to_string(),
539 KernelEvent::AgentFailed { id, .. } => id.to_string(),
540 KernelEvent::MessageReceived { from, .. } => from.to_string(),
541 KernelEvent::AgentOutput { agent_id, .. } => agent_id.to_string(),
542 KernelEvent::AgentGroupMemberCompleted { agent_id, .. } => agent_id.to_string(),
543 KernelEvent::ProjectActivated { project_id, .. } => format!("project:{project_id}"),
544 KernelEvent::ToolExecutionStarted { session_id, .. } => format!("session:{session_id}"),
546 KernelEvent::ToolExecutionFinished { session_id, .. } => format!("session:{session_id}"),
547 KernelEvent::ToolExecutionProgress { session_id, .. } => format!("session:{session_id}"),
548 KernelEvent::MemoryRecallUsed { session_id, .. } => format!("session:{session_id}"),
549 KernelEvent::TokenUsageUpdate { session_id, .. } => format!("session:{session_id}"),
550 KernelEvent::ReasoningFragment { session_id, .. } => format!("session:{session_id}"),
551 KernelEvent::KnowledgePersisted { session_id, .. } => format!("session:{session_id}"),
552 KernelEvent::KnowledgeRemoved { session_id, .. } => format!("session:{session_id}"),
553 _ => "system".to_string(),
554 }
555}
556
557pub fn attach_audit_trail(bus: &EventBus, audit: Arc<AuditTrail>) {
563 let mut rx = bus.subscribe();
564 tokio::spawn(async move {
565 loop {
566 match rx.recv().await {
567 Ok(event) => {
568 let actor = extract_agent_id(&event);
569 let action = kernel_event_to_audit_action(&event);
570 let resource = format!("{event:?}");
571 audit.append(actor, action, resource);
572 }
573 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
574 crate::metrics::get_metrics().audit_lagged_events.inc_by(n);
578 tracing::warn!(
579 skipped = n,
580 "Audit trail subscriber lagged, skipping events"
581 );
582 continue;
583 }
584 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
585 tracing::info!("Audit trail event bus closed, exiting");
586 break;
587 }
588 }
589 }
590 });
591}
592
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `AgentCreated` event with a fresh random id and the given name.
    fn sample_event(name: &str) -> KernelEvent {
        KernelEvent::AgentCreated {
            id: AgentId::new_v4(),
            name: name.to_string(),
        }
    }

    /// Builds a `ToolExecutionProgress` event for tool `browse` with the given
    /// optional parts, so the audit tests only spell out what they vary.
    fn browse_progress(tab_id: Option<Uuid>, context: Option<serde_json::Value>) -> KernelEvent {
        KernelEvent::ToolExecutionProgress {
            session_id: "s1".into(),
            tool_call_id: "c1".into(),
            tool_name: "browse".into(),
            progress: "navigating".into(),
            tab_id,
            context,
        }
    }

    /// Unwraps the `detail` of an `AuditAction::Other`, panicking otherwise.
    fn other_detail(event: &KernelEvent) -> String {
        match kernel_event_to_audit_action(event) {
            AuditAction::Other { detail } => detail,
            other => panic!("expected Other, got {other:?}"),
        }
    }

    #[test]
    fn test_event_bus_uses_sdk() {
        let bus: EventBus = EventBus::new(256);
        assert!(format!("{:?}", bus).contains("EventBus"));
    }

    #[tokio::test]
    async fn test_publish_no_subscribers_ok() {
        let bus = EventBus::new(16);
        let result = bus.publish(sample_event("orphan"));
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_single_subscriber_receives_event() {
        let bus = EventBus::new(16);
        let mut rx = bus.subscribe();

        let event = sample_event("test-agent");
        bus.publish(event.clone()).unwrap();

        let received = rx.try_recv().expect("should receive event");
        match received {
            KernelEvent::AgentCreated { name, .. } => assert_eq!(name, "test-agent"),
            _ => panic!("wrong event type"),
        }
    }

    #[tokio::test]
    async fn test_multiple_subscribers_receive_events() {
        let bus = EventBus::new(16);
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();

        let event = sample_event("multi");
        bus.publish(event.clone()).unwrap();

        let r1 = rx1.try_recv().expect("rx1 should receive event");
        let r2 = rx2.try_recv().expect("rx2 should receive event");

        assert!(matches!(r1, KernelEvent::AgentCreated { .. }));
        assert!(matches!(r2, KernelEvent::AgentCreated { .. }));
    }

    #[tokio::test]
    async fn test_kernel_event_to_audit_action() {
        let event = KernelEvent::AgentFailed {
            id: AgentId::new_v4(),
            error: "boom".to_string(),
        };
        let action = kernel_event_to_audit_action(&event);
        match action {
            AuditAction::AgentExit { reason } => assert_eq!(reason, "boom"),
            other => panic!("expected AgentExit, got {other:?}"),
        }
    }

    /// Serialize -> deserialize -> serialize must yield identical JSON for the
    /// tool/memory/token/reasoning events.
    #[test]
    fn test_rfc015_event_round_trip_json() {
        let cases: Vec<KernelEvent> = vec![
            KernelEvent::ToolExecutionStarted {
                session_id: "s1".into(),
                tool_name: "read_file".into(),
                tool_call_id: "call_1".into(),
                tool_args: serde_json::json!({"path": "/src/main.rs"}),
                context: None,
            },
            KernelEvent::ToolExecutionFinished {
                session_id: "s1".into(),
                tool_call_id: "call_1".into(),
                tool_name: "read_file".into(),
                duration_ms: 234,
                is_error: false,
                output_summary: "fn main() {}".into(),
            },
            KernelEvent::ToolExecutionProgress {
                session_id: "s1".into(),
                tool_call_id: "call_1".into(),
                tool_name: "read_file".into(),
                progress: "reading line 42/100".into(),
                tab_id: None,
                context: None,
            },
            KernelEvent::MemoryRecallUsed {
                session_id: "s1".into(),
                query: "rust errors".into(),
                count: 3,
                source: "warm".into(),
            },
            KernelEvent::TokenUsageUpdate {
                session_id: "s1".into(),
                input_tokens: 1234,
                output_tokens: 567,
            },
            KernelEvent::ReasoningFragment {
                session_id: "s1".into(),
                content: "compaction done".into(),
                source: "compaction".into(),
            },
        ];
        for event in cases {
            let json = serde_json::to_string(&event).expect("serialize");
            let back: KernelEvent = serde_json::from_str(&json).expect("deserialize");
            let json2 = serde_json::to_string(&back).expect("serialize round-trip");
            assert_eq!(json, json2, "round-trip should be stable");
        }
    }

    /// A present `tab_id` must survive a JSON round trip with its exact value.
    #[test]
    fn test_tool_execution_progress_serde_round_trip() {
        let tab = Uuid::new_v4();
        let event = KernelEvent::ToolExecutionProgress {
            session_id: "s-abc".into(),
            tool_call_id: "call_42".into(),
            tool_name: "browse".into(),
            progress: "loading https://example.com".into(),
            tab_id: Some(tab),
            context: None,
        };
        let json = serde_json::to_string(&event).expect("serialize");
        let back: KernelEvent = serde_json::from_str(&json).expect("deserialize");
        match back {
            KernelEvent::ToolExecutionProgress {
                ref session_id,
                ref tool_call_id,
                ref tool_name,
                ref progress,
                tab_id,
                ..
            } => {
                assert_eq!(session_id, "s-abc");
                assert_eq!(tool_call_id, "call_42");
                assert_eq!(tool_name, "browse");
                assert_eq!(progress, "loading https://example.com");
                // Compare the value, not just presence: a different UUID
                // coming back would otherwise go unnoticed.
                assert_eq!(tab_id, Some(tab), "tab_id should round-trip unchanged");
            }
            other => panic!("expected ToolExecutionProgress, got {other:?}"),
        }
    }

    /// The progress audit detail appends `:tab=<id>` and `:<kind>` only when
    /// the corresponding data is present.
    #[test]
    fn test_tool_execution_progress_audit_action() {
        let tab = Uuid::new_v4();

        let detail = other_detail(&browse_progress(Some(tab), None));
        assert!(detail.contains("tool_progress"), "detail: {detail}");
        assert!(detail.contains("browse"), "detail: {detail}");
        assert!(
            detail.contains(":tab="),
            "detail should include tab id: {detail}"
        );
        assert_eq!(detail, format!("tool_progress:browse:tab={tab}"));

        assert_eq!(
            other_detail(&browse_progress(None, None)),
            "tool_progress:browse"
        );

        // `context["kind"]` is appended when it is a JSON string ...
        let kind = serde_json::json!({"kind": "navigate"});
        assert_eq!(
            other_detail(&browse_progress(None, Some(kind.clone()))),
            "tool_progress:browse:navigate"
        );
        // ... after the tab id when both are present ...
        assert_eq!(
            other_detail(&browse_progress(Some(tab), Some(kind))),
            format!("tool_progress:browse:tab={tab}:navigate")
        );
        // ... and ignored when it is missing or not a string.
        assert_eq!(
            other_detail(&browse_progress(None, Some(serde_json::json!({"kind": 42})))),
            "tool_progress:browse"
        );
        assert_eq!(
            other_detail(&browse_progress(None, Some(serde_json::json!({"other": "x"})))),
            "tool_progress:browse"
        );
    }

    /// Payloads written before `tab_id` / `context` existed must still
    /// deserialize, and `None` values must not be written back out.
    #[test]
    fn test_tool_execution_progress_tab_id_optional_in_serde() {
        let legacy_json = r#"{
            "ToolExecutionProgress": {
                "session_id": "s-old",
                "tool_call_id": "call_legacy",
                "tool_name": "browse",
                "progress": "step 1"
            }
        }"#;
        let event: KernelEvent = serde_json::from_str(legacy_json).expect("deserialize legacy");
        match &event {
            KernelEvent::ToolExecutionProgress {
                session_id,
                tool_call_id,
                tool_name,
                progress,
                tab_id,
                context,
            } => {
                assert_eq!(session_id, "s-old");
                assert_eq!(tool_call_id, "call_legacy");
                assert_eq!(tool_name, "browse");
                assert_eq!(progress, "step 1");
                assert!(tab_id.is_none(), "missing field should default to None");
                assert!(context.is_none(), "missing field should default to None");
            }
            other => panic!("expected ToolExecutionProgress, got {other:?}"),
        }
        let json = serde_json::to_string(&event).expect("serialize");
        assert!(
            !json.contains("tab_id"),
            "tab_id should be omitted when None: {json}"
        );
        assert!(
            !json.contains("context"),
            "context should be omitted when None: {json}"
        );
    }

    /// Checks both halves of the audit record for a session-scoped tool event:
    /// the actor from `extract_agent_id` and the action detail.
    #[test]
    fn test_rfc015_extract_agent_id() {
        let event = KernelEvent::ToolExecutionStarted {
            session_id: "abc-123".into(),
            tool_name: "bash".into(),
            tool_call_id: "c1".into(),
            tool_args: serde_json::Value::Null,
            context: None,
        };

        // Session-scoped events are attributed to `session:<id>`.
        assert_eq!(extract_agent_id(&event), "session:abc-123");

        let action = kernel_event_to_audit_action(&event);
        match action {
            AuditAction::Other { detail } => {
                assert!(
                    detail.contains("bash"),
                    "tool name in audit detail: {detail}"
                );
            }
            other => panic!("expected Other, got {other:?}"),
        }
    }

    /// Agent events are attributed to the agent id; events without an
    /// explicit mapping fall back to `"system"`.
    #[test]
    fn test_extract_agent_id_agent_and_fallback() {
        let id = AgentId::new_v4();
        let expected = id.to_string();
        assert_eq!(extract_agent_id(&KernelEvent::AgentStarted { id }), expected);

        let seed = KernelEvent::SeedCreated {
            seed_id: Uuid::new_v4(),
        };
        assert_eq!(extract_agent_id(&seed), "system");
    }
}