1use oxi_sdk::EventBus as SdkEventBus;
15use oxi_sdk::observability::{AuditAction, AuditTrail};
16use serde::{Deserialize, Serialize};
17use std::sync::Arc;
18use uuid::Uuid;
19
20use crate::types::AgentId;
21
22pub type EventBus = SdkEventBus<KernelEvent>;
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub enum KernelEvent {
32 AgentCreated {
34 id: AgentId,
36 name: String,
38 },
39 AgentStarted {
41 id: AgentId,
43 },
44 AgentStopped {
46 id: AgentId,
48 },
49 AgentFailed {
51 id: AgentId,
53 error: String,
55 },
56 MessageReceived {
58 from: AgentId,
60 content: String,
62 },
63 SeedCreated {
65 seed_id: uuid::Uuid,
67 },
68 EvaluationComplete {
70 seed_id: uuid::Uuid,
72 passed: bool,
74 },
75 PhaseStarted {
77 session_id: String,
79 phase: oxios_ouroboros::Phase,
81 },
82 PhaseCompleted {
84 session_id: String,
86 phase: oxios_ouroboros::Phase,
88 result_summary: String,
90 },
91 AgentOutput {
93 session_id: String,
95 agent_id: AgentId,
97 output: String,
99 },
100 ApprovalRequested {
102 id: uuid::Uuid,
104 tool_name: String,
106 action: String,
108 resource: String,
110 reason: String,
112 session_id: Option<String>,
114 },
115 ApprovalResolved {
117 id: uuid::Uuid,
119 approved: bool,
121 },
122 MemoryStored {
124 id: String,
126 memory_type: String,
128 source: String,
130 },
131 MemoryRecalled {
133 query: String,
135 count: usize,
137 },
138 AgentGroupCreated {
140 group_id: uuid::Uuid,
142 agent_count: usize,
144 },
145 AgentGroupMemberCompleted {
147 group_id: uuid::Uuid,
149 agent_id: uuid::Uuid,
151 success: bool,
153 },
154 ProjectCreated {
156 project_id: uuid::Uuid,
158 name: String,
160 source: String,
162 },
163 ProjectActivated {
165 project_id: uuid::Uuid,
167 name: String,
169 },
170 EvolutionStarted {
172 seed_id: uuid::Uuid,
174 new_seed_id: uuid::Uuid,
176 iteration: u32,
178 },
179 EvolutionMaxReached {
181 seed_id: uuid::Uuid,
183 final_score: f64,
185 iterations: u32,
187 },
188
189 ToolExecutionStarted {
194 session_id: String,
196 tool_name: String,
198 tool_call_id: String,
200 tool_args: serde_json::Value,
202 #[serde(default, skip_serializing_if = "Option::is_none")]
205 context: Option<serde_json::Value>,
206 },
207 ToolExecutionFinished {
209 session_id: String,
211 tool_call_id: String,
213 tool_name: String,
215 duration_ms: u64,
217 is_error: bool,
219 output_summary: String,
221 },
222 ToolExecutionProgress {
224 session_id: String,
226 tool_call_id: String,
228 tool_name: String,
230 progress: String,
232 #[serde(default, skip_serializing_if = "Option::is_none")]
236 tab_id: Option<Uuid>,
237 #[serde(default, skip_serializing_if = "Option::is_none")]
243 context: Option<serde_json::Value>,
244 },
245 MemoryRecallUsed {
247 session_id: String,
249 query: String,
251 count: usize,
253 source: String,
255 },
256 TokenUsageUpdate {
258 session_id: String,
260 input_tokens: u64,
262 output_tokens: u64,
264 },
265 ReasoningFragment {
267 session_id: String,
269 content: String,
271 source: String,
273 },
274
275 CalendarEventCreated {
278 uid: String,
280 title: String,
282 start: String,
284 end: String,
286 },
287 CalendarEventUpdated {
289 uid: String,
291 title: String,
293 },
294 CalendarEventDeleted {
296 uid: String,
298 title: String,
300 },
301 EmailSent {
303 subject: String,
305 message_id: String,
307 #[serde(default, skip_serializing_if = "Option::is_none")]
309 template_name: Option<String>,
310 },
311
312 KnowledgePersisted {
315 session_id: String,
316 message_index: usize,
317 path: String,
318 source: String, },
320 KnowledgeRemoved {
322 session_id: String,
323 message_index: usize,
324 },
325}
326
327pub fn kernel_event_to_audit_action(event: &KernelEvent) -> AuditAction {
329 match event {
330 KernelEvent::AgentCreated { name, .. } => AuditAction::AgentSpawn {
331 task_type: name.clone(),
332 },
333 KernelEvent::AgentStarted { .. } => AuditAction::AgentSpawn {
334 task_type: "started".to_string(),
335 },
336 KernelEvent::AgentStopped { .. } => AuditAction::AgentExit {
337 reason: "stopped".to_string(),
338 },
339 KernelEvent::AgentFailed { error, .. } => AuditAction::AgentExit {
340 reason: error.clone(),
341 },
342 KernelEvent::MessageReceived { content, .. } => AuditAction::Other {
343 detail: format!("message: {content}"),
344 },
345 KernelEvent::SeedCreated { seed_id, .. } => AuditAction::Other {
346 detail: format!("seed_created:{seed_id}"),
347 },
348 KernelEvent::EvaluationComplete { seed_id, passed } => AuditAction::Other {
349 detail: format!("evaluation:{seed_id}:{passed}"),
350 },
351 KernelEvent::PhaseStarted { session_id, phase } => AuditAction::Other {
352 detail: format!("phase_started:{session_id}:{phase}"),
353 },
354 KernelEvent::PhaseCompleted {
355 session_id,
356 phase,
357 result_summary,
358 } => AuditAction::Other {
359 detail: format!("phase_completed:{session_id}:{phase}:{result_summary}"),
360 },
361 KernelEvent::AgentOutput { output, .. } => AuditAction::Other {
362 detail: format!("agent_output:{output}"),
363 },
364 KernelEvent::ApprovalRequested {
365 id,
366 action,
367 resource,
368 ..
369 } => AuditAction::Other {
370 detail: format!("approval_requested:{id}:{action}:{resource}"),
371 },
372 KernelEvent::ApprovalResolved { id, approved } => AuditAction::Other {
373 detail: format!("approval_resolved:{id}:{approved}"),
374 },
375 KernelEvent::MemoryStored {
376 id, memory_type, ..
377 } => AuditAction::MemoryWrite {
378 entry_id: format!("{id}:{memory_type}"),
379 },
380 KernelEvent::MemoryRecalled { query, count } => AuditAction::MemoryRead {
381 entry_id: format!("query:{query}:{count}results"),
382 },
383 KernelEvent::AgentGroupCreated {
384 group_id,
385 agent_count,
386 } => AuditAction::Other {
387 detail: format!("group_created:{group_id}:{agent_count}agents"),
388 },
389 KernelEvent::AgentGroupMemberCompleted {
390 group_id,
391 agent_id,
392 success,
393 } => AuditAction::Other {
394 detail: format!("group_member_completed:{group_id}:{agent_id}:{success}"),
395 },
396 KernelEvent::EvolutionStarted {
397 seed_id,
398 new_seed_id,
399 iteration,
400 } => AuditAction::Other {
401 detail: format!("evolution:{seed_id}->{new_seed_id}:iter{iteration}"),
402 },
403 KernelEvent::EvolutionMaxReached {
404 seed_id,
405 final_score,
406 iterations,
407 } => AuditAction::Other {
408 detail: format!("evolution_max:{seed_id}:score={final_score}:iters={iterations}"),
409 },
410 KernelEvent::ProjectCreated {
411 project_id: _,
412 name,
413 source,
414 } => AuditAction::Other {
415 detail: format!("project_created:{name}:{source}"),
416 },
417 KernelEvent::ProjectActivated {
418 project_id: _,
419 name,
420 } => AuditAction::Other {
421 detail: format!("project_activated:{name}"),
422 },
423 KernelEvent::ToolExecutionStarted { tool_name, .. } => AuditAction::Other {
425 detail: format!("tool_started:{tool_name}"),
426 },
427 KernelEvent::ToolExecutionFinished {
428 tool_name,
429 is_error,
430 ..
431 } => AuditAction::Other {
432 detail: format!(
433 "tool_finished:{tool_name}:{}",
434 if *is_error { "error" } else { "ok" }
435 ),
436 },
437 KernelEvent::ToolExecutionProgress {
438 tool_name,
439 tab_id,
440 context,
441 ..
442 } => AuditAction::Other {
443 detail: {
444 let mut d = format!("tool_progress:{tool_name}");
445 if let Some(id) = tab_id {
446 d.push_str(&format!(":tab={id}"));
447 }
448 if let Some(ctx) = context
449 .as_ref()
450 .and_then(|c| c.get("kind"))
451 .and_then(|k| k.as_str())
452 {
453 d.push_str(&format!(":{ctx}"));
454 }
455 d
456 },
457 },
458 KernelEvent::MemoryRecallUsed { query, count, .. } => AuditAction::MemoryRead {
459 entry_id: format!("recall:{query}:{count}results"),
460 },
461 KernelEvent::TokenUsageUpdate {
462 input_tokens,
463 output_tokens,
464 ..
465 } => AuditAction::Other {
466 detail: format!("tokens:in={input_tokens}:out={output_tokens}"),
467 },
468 KernelEvent::ReasoningFragment { source, .. } => AuditAction::Other {
469 detail: format!("reasoning:{source}"),
470 },
471 KernelEvent::CalendarEventCreated { uid, title, .. } => AuditAction::Other {
472 detail: format!("calendar:created:{uid}:{title}"),
473 },
474 KernelEvent::CalendarEventUpdated { uid, title } => AuditAction::Other {
475 detail: format!("calendar:updated:{uid}:{title}"),
476 },
477 KernelEvent::CalendarEventDeleted { uid, title } => AuditAction::Other {
478 detail: format!("calendar:deleted:{uid}:{title}"),
479 },
480 KernelEvent::EmailSent {
481 subject,
482 message_id,
483 template_name,
484 } => AuditAction::Other {
485 detail: format!("email:sent:{subject} (msg={message_id}, tpl={template_name:?})"),
486 },
487 KernelEvent::KnowledgePersisted {
488 session_id,
489 message_index,
490 path,
491 source,
492 } => AuditAction::Other {
493 detail: format!("knowledge:persisted:{session_id}:{message_index}:{path}:{source}"),
494 },
495 KernelEvent::KnowledgeRemoved {
496 session_id,
497 message_index,
498 } => AuditAction::Other {
499 detail: format!("knowledge:removed:{session_id}:{message_index}"),
500 },
501 }
502}
503
504fn extract_agent_id(event: &KernelEvent) -> String {
506 match event {
507 KernelEvent::AgentCreated { id, .. } => id.to_string(),
508 KernelEvent::AgentStarted { id, .. } => id.to_string(),
509 KernelEvent::AgentStopped { id, .. } => id.to_string(),
510 KernelEvent::AgentFailed { id, .. } => id.to_string(),
511 KernelEvent::MessageReceived { from, .. } => from.to_string(),
512 KernelEvent::AgentOutput { agent_id, .. } => agent_id.to_string(),
513 KernelEvent::AgentGroupMemberCompleted { agent_id, .. } => agent_id.to_string(),
514 KernelEvent::ProjectActivated { project_id, .. } => format!("project:{project_id}"),
515 KernelEvent::ToolExecutionStarted { session_id, .. } => format!("session:{session_id}"),
517 KernelEvent::ToolExecutionFinished { session_id, .. } => format!("session:{session_id}"),
518 KernelEvent::ToolExecutionProgress { session_id, .. } => format!("session:{session_id}"),
519 KernelEvent::MemoryRecallUsed { session_id, .. } => format!("session:{session_id}"),
520 KernelEvent::TokenUsageUpdate { session_id, .. } => format!("session:{session_id}"),
521 KernelEvent::ReasoningFragment { session_id, .. } => format!("session:{session_id}"),
522 KernelEvent::KnowledgePersisted { session_id, .. } => format!("session:{session_id}"),
523 KernelEvent::KnowledgeRemoved { session_id, .. } => format!("session:{session_id}"),
524 _ => "system".to_string(),
525 }
526}
527
528pub fn attach_audit_trail(bus: &EventBus, audit: Arc<AuditTrail>) {
534 let mut rx = bus.subscribe();
535 tokio::spawn(async move {
536 loop {
537 match rx.recv().await {
538 Ok(event) => {
539 let actor = extract_agent_id(&event);
540 let action = kernel_event_to_audit_action(&event);
541 let resource = format!("{event:?}");
542 audit.append(actor, action, resource);
543 }
544 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
545 crate::metrics::get_metrics().audit_lagged_events.inc_by(n);
549 tracing::warn!(
550 skipped = n,
551 "Audit trail subscriber lagged, skipping events"
552 );
553 continue;
554 }
555 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
556 tracing::info!("Audit trail event bus closed, exiting");
557 break;
558 }
559 }
560 }
561 });
562}
563
564#[cfg(test)]
565mod tests {
566 use super::*;
567
568 fn sample_event(name: &str) -> KernelEvent {
569 KernelEvent::AgentCreated {
570 id: AgentId::new_v4(),
571 name: name.to_string(),
572 }
573 }
574
575 #[test]
576 fn test_event_bus_uses_sdk() {
577 let bus: EventBus = EventBus::new(256);
578 assert!(format!("{:?}", bus).contains("EventBus"));
579 }
580
581 #[tokio::test]
582 async fn test_publish_no_subscribers_ok() {
583 let bus = EventBus::new(16);
584 let result = bus.publish(sample_event("orphan"));
585 assert!(result.is_ok());
586 }
587
588 #[tokio::test]
589 async fn test_single_subscriber_receives_event() {
590 let bus = EventBus::new(16);
591 let mut rx = bus.subscribe();
592
593 let event = sample_event("test-agent");
594 bus.publish(event.clone()).unwrap();
595
596 let received = rx.try_recv().expect("should receive event");
597 match received {
598 KernelEvent::AgentCreated { name, .. } => assert_eq!(name, "test-agent"),
599 _ => panic!("wrong event type"),
600 }
601 }
602
603 #[tokio::test]
604 async fn test_multiple_subscribers_receive_events() {
605 let bus = EventBus::new(16);
606 let mut rx1 = bus.subscribe();
607 let mut rx2 = bus.subscribe();
608
609 let event = sample_event("multi");
610 bus.publish(event.clone()).unwrap();
611
612 let r1 = rx1.try_recv().expect("rx1 should receive event");
613 let r2 = rx2.try_recv().expect("rx2 should receive event");
614
615 assert!(matches!(r1, KernelEvent::AgentCreated { .. }));
616 assert!(matches!(r2, KernelEvent::AgentCreated { .. }));
617 }
618
619 #[tokio::test]
620 async fn test_kernel_event_to_audit_action() {
621 let event = KernelEvent::AgentFailed {
622 id: AgentId::new_v4(),
623 error: "boom".to_string(),
624 };
625 let action = kernel_event_to_audit_action(&event);
626 match action {
627 AuditAction::AgentExit { reason } => assert_eq!(reason, "boom"),
628 other => panic!("expected AgentExit, got {other:?}"),
629 }
630 }
631
632 #[test]
638 fn test_rfc015_event_round_trip_json() {
639 let cases: Vec<KernelEvent> = vec![
640 KernelEvent::ToolExecutionStarted {
641 session_id: "s1".into(),
642 tool_name: "read_file".into(),
643 tool_call_id: "call_1".into(),
644 tool_args: serde_json::json!({"path": "/src/main.rs"}),
645 context: None,
646 },
647 KernelEvent::ToolExecutionFinished {
648 session_id: "s1".into(),
649 tool_call_id: "call_1".into(),
650 tool_name: "read_file".into(),
651 duration_ms: 234,
652 is_error: false,
653 output_summary: "fn main() {}".into(),
654 },
655 KernelEvent::ToolExecutionProgress {
656 session_id: "s1".into(),
657 tool_call_id: "call_1".into(),
658 tool_name: "read_file".into(),
659 progress: "reading line 42/100".into(),
660 tab_id: None,
661 context: None,
662 },
663 KernelEvent::MemoryRecallUsed {
664 session_id: "s1".into(),
665 query: "rust errors".into(),
666 count: 3,
667 source: "warm".into(),
668 },
669 KernelEvent::TokenUsageUpdate {
670 session_id: "s1".into(),
671 input_tokens: 1234,
672 output_tokens: 567,
673 },
674 KernelEvent::ReasoningFragment {
675 session_id: "s1".into(),
676 content: "compaction done".into(),
677 source: "compaction".into(),
678 },
679 ];
680 for event in cases {
681 let json = serde_json::to_string(&event).expect("serialize");
682 let back: KernelEvent = serde_json::from_str(&json).expect("deserialize");
683 let json2 = serde_json::to_string(&back).expect("serialize round-trip");
684 assert_eq!(json, json2, "round-trip should be stable");
685 }
686 }
687
688 #[test]
691 fn test_tool_execution_progress_serde_round_trip() {
692 let event = KernelEvent::ToolExecutionProgress {
693 session_id: "s-abc".into(),
694 tool_call_id: "call_42".into(),
695 tool_name: "browse".into(),
696 progress: "loading https://example.com".into(),
697 tab_id: Some(Uuid::new_v4()),
698 context: None,
699 };
700 let json = serde_json::to_string(&event).expect("serialize");
701 let back: KernelEvent = serde_json::from_str(&json).expect("deserialize");
702 match back {
703 KernelEvent::ToolExecutionProgress {
704 ref session_id,
705 ref tool_call_id,
706 ref tool_name,
707 ref progress,
708 tab_id,
709 ..
710 } => {
711 assert_eq!(session_id, "s-abc");
712 assert_eq!(tool_call_id, "call_42");
713 assert_eq!(tool_name, "browse");
714 assert_eq!(progress, "loading https://example.com");
715 assert!(tab_id.is_some(), "tab_id should round-trip when present");
716 }
717 other => panic!("expected ToolExecutionProgress, got {other:?}"),
718 }
719 }
720
721 #[test]
727 fn test_tool_execution_progress_audit_action() {
728 let with_tab = KernelEvent::ToolExecutionProgress {
729 session_id: "s1".into(),
730 tool_call_id: "c1".into(),
731 tool_name: "browse".into(),
732 progress: "navigating".into(),
733 tab_id: Some(Uuid::new_v4()),
734 context: None,
735 };
736 match kernel_event_to_audit_action(&with_tab) {
737 AuditAction::Other { detail } => {
738 assert!(detail.contains("tool_progress"), "detail: {detail}");
739 assert!(detail.contains("browse"), "detail: {detail}");
740 assert!(
741 detail.contains(":tab="),
742 "detail should include tab id: {detail}"
743 );
744 }
745 other => panic!("expected Other, got {other:?}"),
746 }
747 let without_tab = KernelEvent::ToolExecutionProgress {
748 session_id: "s1".into(),
749 tool_call_id: "c1".into(),
750 tool_name: "browse".into(),
751 progress: "navigating".into(),
752 tab_id: None,
753 context: None,
754 };
755 match kernel_event_to_audit_action(&without_tab) {
756 AuditAction::Other { detail } => {
757 assert_eq!(detail, "tool_progress:browse");
758 }
759 other => panic!("expected Other, got {other:?}"),
760 }
761 }
762
763 #[test]
767 fn test_tool_execution_progress_tab_id_optional_in_serde() {
768 let legacy_json = r#"{
771 "ToolExecutionProgress": {
772 "session_id": "s-old",
773 "tool_call_id": "call_legacy",
774 "tool_name": "browse",
775 "progress": "step 1"
776 }
777 }"#;
778 let event: KernelEvent = serde_json::from_str(legacy_json).expect("deserialize legacy");
779 match &event {
780 KernelEvent::ToolExecutionProgress {
781 session_id,
782 tool_call_id,
783 tool_name,
784 progress,
785 tab_id,
786 ..
787 } => {
788 assert_eq!(session_id, "s-old");
789 assert_eq!(tool_call_id, "call_legacy");
790 assert_eq!(tool_name, "browse");
791 assert_eq!(progress, "step 1");
792 assert!(tab_id.is_none(), "missing field should default to None");
793 }
794 other => panic!("expected ToolExecutionProgress, got {other:?}"),
795 }
796 let json = serde_json::to_string(&event).expect("serialize");
799 assert!(
800 !json.contains("tab_id"),
801 "tab_id should be omitted when None: {json}"
802 );
803 }
804
805 #[test]
809 fn test_rfc015_extract_agent_id() {
810 let event = KernelEvent::ToolExecutionStarted {
811 session_id: "abc-123".into(),
812 tool_name: "bash".into(),
813 tool_call_id: "c1".into(),
814 tool_args: serde_json::Value::Null,
815 context: None,
816 };
817 let action = kernel_event_to_audit_action(&event);
820 match action {
821 AuditAction::Other { detail } => {
822 assert!(
823 detail.contains("bash"),
824 "tool name in audit detail: {detail}"
825 );
826 }
827 other => panic!("expected Other, got {other:?}"),
828 }
829 }
830}