use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::broadcast;
use crate::audit_trail::{AuditAction, AuditTrail};
use crate::types::AgentId;
/// Events carried by the kernel [`EventBus`].
///
/// The enum derives serde's `Serialize`/`Deserialize`, and every variant is
/// mapped to an [`AuditAction`] by [`kernel_event_to_audit_action`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KernelEvent {
/// An agent was created; carries the agent's `id` and `name`.
AgentCreated {
id: AgentId,
name: String,
},
/// The agent identified by `id` started.
AgentStarted {
id: AgentId,
},
/// The agent identified by `id` stopped.
AgentStopped {
id: AgentId,
},
/// The agent identified by `id` failed; `error` holds the failure text.
AgentFailed {
id: AgentId,
error: String,
},
/// A message with the given `content` was received from agent `from`.
MessageReceived {
from: AgentId,
content: String,
},
/// A seed identified by `seed_id` was created.
// NOTE(review): what a "seed" represents is defined outside this file.
SeedCreated {
seed_id: uuid::Uuid,
},
/// Evaluation of seed `seed_id` completed; `passed` is the outcome.
EvaluationComplete {
seed_id: uuid::Uuid,
passed: bool,
},
/// Phase `phase` started within session `session_id`.
PhaseStarted {
session_id: String,
phase: oxios_ouroboros::Phase,
},
/// Phase `phase` completed within session `session_id`, with a textual
/// summary of its result.
PhaseCompleted {
session_id: String,
phase: oxios_ouroboros::Phase,
result_summary: String,
},
/// Output text produced by agent `agent_id` within session `session_id`.
AgentOutput {
session_id: String,
agent_id: AgentId,
output: String,
},
/// Approval was requested for `action` on `resource`, with a stated `reason`.
ApprovalRequested {
id: uuid::Uuid,
action: String,
resource: String,
reason: String,
},
/// An approval was resolved; `approved` is the decision.
// NOTE(review): `id` presumably matches the `ApprovalRequested` id — confirm.
ApprovalResolved {
id: uuid::Uuid,
approved: bool,
},
/// A memory entry `id` of kind `memory_type` was stored.
// NOTE(review): `source` presumably names where the entry came from — confirm.
MemoryStored {
id: String,
memory_type: String,
source: String,
},
/// A memory recall for `query` produced `count` results.
MemoryRecalled {
query: String,
count: usize,
},
/// An agent group `group_id` was created with `agent_count` agents.
AgentGroupCreated {
group_id: uuid::Uuid,
agent_count: usize,
},
/// Agent `agent_id` of group `group_id` completed; `success` is the outcome.
AgentGroupMemberCompleted {
group_id: uuid::Uuid,
agent_id: uuid::Uuid,
success: bool,
},
/// A project was created; carries its id, `name` and `source`.
ProjectCreated {
project_id: uuid::Uuid,
name: String,
source: String,
},
/// The project `project_id` (`name`) was activated.
ProjectActivated {
project_id: uuid::Uuid,
name: String,
},
/// An evolution step from seed `seed_id` to `new_seed_id` started at the
/// given `iteration`.
EvolutionStarted {
seed_id: uuid::Uuid,
new_seed_id: uuid::Uuid,
iteration: u32,
},
/// Evolution of seed `seed_id` reached its maximum; carries the final score
/// and the iteration count.
// NOTE(review): the limit itself is enforced elsewhere — not visible here.
EvolutionMaxReached {
seed_id: uuid::Uuid,
final_score: f64,
iterations: u32,
},
}
pub fn kernel_event_to_audit_action(event: &KernelEvent) -> AuditAction {
match event {
KernelEvent::AgentCreated { name, .. } => AuditAction::AgentSpawn {
task_type: name.clone(),
},
KernelEvent::AgentStarted { .. } => AuditAction::AgentSpawn {
task_type: "started".to_string(),
},
KernelEvent::AgentStopped { .. } => AuditAction::AgentExit {
reason: "stopped".to_string(),
},
KernelEvent::AgentFailed { error, .. } => AuditAction::AgentExit {
reason: error.clone(),
},
KernelEvent::MessageReceived { content, .. } => AuditAction::Other {
detail: format!("message: {content}"),
},
KernelEvent::SeedCreated { seed_id, .. } => AuditAction::Other {
detail: format!("seed_created:{seed_id}"),
},
KernelEvent::EvaluationComplete { seed_id, passed } => AuditAction::Other {
detail: format!("evaluation:{seed_id}:{passed}"),
},
KernelEvent::PhaseStarted { session_id, phase } => AuditAction::Other {
detail: format!("phase_started:{session_id}:{phase}"),
},
KernelEvent::PhaseCompleted {
session_id,
phase,
result_summary,
} => AuditAction::Other {
detail: format!("phase_completed:{session_id}:{phase}:{result_summary}"),
},
KernelEvent::AgentOutput { output, .. } => AuditAction::Other {
detail: format!("agent_output:{output}"),
},
KernelEvent::ApprovalRequested {
id,
action,
resource,
reason: _,
} => AuditAction::Other {
detail: format!("approval_requested:{id}:{action}:{resource}"),
},
KernelEvent::ApprovalResolved { id, approved } => AuditAction::Other {
detail: format!("approval_resolved:{id}:{approved}"),
},
KernelEvent::MemoryStored {
id, memory_type, ..
} => AuditAction::MemoryWrite {
entry_id: format!("{id}:{memory_type}"),
},
KernelEvent::MemoryRecalled { query, count } => AuditAction::MemoryRead {
entry_id: format!("query:{query}:{count}results"),
},
KernelEvent::AgentGroupCreated {
group_id,
agent_count,
} => AuditAction::Other {
detail: format!("group_created:{group_id}:{agent_count}agents"),
},
KernelEvent::AgentGroupMemberCompleted {
group_id,
agent_id,
success,
} => AuditAction::Other {
detail: format!("group_member_completed:{group_id}:{agent_id}:{success}"),
},
KernelEvent::EvolutionStarted {
seed_id,
new_seed_id,
iteration,
} => AuditAction::Other {
detail: format!("evolution:{seed_id}->{new_seed_id}:iter{iteration}"),
},
KernelEvent::EvolutionMaxReached {
seed_id,
final_score,
iterations,
} => AuditAction::Other {
detail: format!("evolution_max:{seed_id}:score={final_score}:iters={iterations}"),
},
KernelEvent::ProjectCreated {
project_id: _,
name,
source,
} => AuditAction::Other {
detail: format!("project_created:{name}:{source}"),
},
KernelEvent::ProjectActivated {
project_id: _,
name,
} => AuditAction::Other {
detail: format!("project_activated:{name}"),
},
}
}
/// Returns the actor string used when an event is written to the audit trail.
///
/// Events that carry an agent id yield that id's string form,
/// `ProjectActivated` yields `project:<project_id>`, and every other event is
/// attributed to `"system"`.
fn extract_agent_id(event: &KernelEvent) -> String {
    match event {
        // All of these bind an `AgentId`, so they can share a single arm.
        KernelEvent::AgentCreated { id, .. }
        | KernelEvent::AgentStarted { id, .. }
        | KernelEvent::AgentStopped { id, .. }
        | KernelEvent::AgentFailed { id, .. }
        | KernelEvent::MessageReceived { from: id, .. }
        | KernelEvent::AgentOutput { agent_id: id, .. } => id.to_string(),
        // Declared as `uuid::Uuid` rather than `AgentId`, hence its own arm.
        KernelEvent::AgentGroupMemberCompleted {
            agent_id: member, ..
        } => member.to_string(),
        KernelEvent::ProjectActivated { project_id, .. } => format!("project:{project_id}"),
        _ => String::from("system"),
    }
}
/// Handle for publishing and subscribing to [`KernelEvent`]s.
///
/// Wraps the sending half of a Tokio broadcast channel. The type derives
/// `Clone`, so cloning the bus clones that sender.
#[derive(Clone)]
pub struct EventBus {
/// Sending half of the broadcast channel; receivers are obtained from it
/// via `subscribe()`.
sender: broadcast::Sender<KernelEvent>,
}
impl EventBus {
pub fn new(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self { sender }
}
pub fn subscribe(&self) -> broadcast::Receiver<KernelEvent> {
self.sender.subscribe()
}
pub fn publish(&self, event: KernelEvent) -> Result<()> {
let _ = self.sender.send(event);
Ok(())
}
pub fn attach_audit_trail(&self, audit: Arc<AuditTrail>) {
let mut rx = self.subscribe();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
let actor = extract_agent_id(&event);
let action = kernel_event_to_audit_action(&event);
let resource = format!("{event:?}");
audit.append(actor, action, resource);
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(
skipped = n,
"Audit trail subscriber lagged, skipping events"
);
continue;
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("Audit trail event bus closed, exiting");
break;
}
}
}
});
}
}
/// Opaque `Debug` output: only the type name is printed, never the sender.
impl std::fmt::Debug for EventBus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less debug struct renders as just its name, so writing the
        // name directly produces the same text.
        f.write_str("EventBus")
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `AgentCreated` event with a fresh id and the given name.
    fn sample_event(name: &str) -> KernelEvent {
        KernelEvent::AgentCreated {
            id: AgentId::new_v4(),
            name: name.to_string(),
        }
    }

    /// The bus can be constructed and its `Debug` output names the type.
    #[test]
    fn test_event_bus_new_and_debug() {
        let bus = EventBus::new(256);
        assert!(format!("{:?}", bus).contains("EventBus"));
    }

    /// Publishing without any subscriber is not an error.
    #[tokio::test]
    async fn test_publish_no_subscribers_ok() {
        let bus = EventBus::new(16);
        let result = bus.publish(sample_event("orphan"));
        assert!(result.is_ok());
    }

    /// A single subscriber sees the published event.
    #[tokio::test]
    async fn test_single_subscriber_receives_event() {
        let bus = EventBus::new(16);
        let mut rx = bus.subscribe();
        let event = sample_event("test-agent");
        bus.publish(event.clone()).unwrap();
        let received = rx.try_recv().expect("should receive event");
        match received {
            KernelEvent::AgentCreated { name, .. } => assert_eq!(name, "test-agent"),
            _ => panic!("wrong event type"),
        }
    }

    /// Every subscriber gets its own copy of a published event.
    #[tokio::test]
    async fn test_multiple_subscribers_receive_events() {
        let bus = EventBus::new(16);
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();
        bus.publish(sample_event("multi")).unwrap();
        assert!(rx1.try_recv().is_ok());
        assert!(rx2.try_recv().is_ok());
    }

    /// Events published before subscribing are not delivered to the receiver.
    #[tokio::test]
    async fn test_subscriber_receives_only_post_subscribe() {
        let bus = EventBus::new(16);
        bus.publish(sample_event("before")).unwrap();
        let mut rx = bus.subscribe();
        bus.publish(sample_event("after")).unwrap();
        let received = rx.try_recv().expect("should receive event");
        match received {
            KernelEvent::AgentCreated { name, .. } => assert_eq!(name, "after"),
            _ => panic!("wrong event type"),
        }
        assert!(rx.try_recv().is_err());
    }

    /// Serializing, deserializing and re-serializing yields identical JSON.
    #[test]
    fn test_kernel_event_serialization_roundtrip() {
        let events = vec![
            KernelEvent::AgentCreated {
                id: AgentId::new_v4(),
                name: "agent-1".to_string(),
            },
            KernelEvent::AgentFailed {
                id: AgentId::new_v4(),
                error: "timeout".to_string(),
            },
            KernelEvent::SeedCreated {
                seed_id: uuid::Uuid::new_v4(),
            },
            KernelEvent::EvaluationComplete {
                seed_id: uuid::Uuid::new_v4(),
                passed: true,
            },
            KernelEvent::MemoryStored {
                id: "mem-123".to_string(),
                memory_type: "fact".to_string(),
                source: "session".to_string(),
            },
            KernelEvent::MemoryRecalled {
                query: "test query".to_string(),
                count: 5,
            },
            KernelEvent::EvolutionStarted {
                seed_id: uuid::Uuid::new_v4(),
                new_seed_id: uuid::Uuid::new_v4(),
                iteration: 2,
            },
            KernelEvent::EvolutionMaxReached {
                seed_id: uuid::Uuid::new_v4(),
                final_score: 0.85,
                iterations: 10,
            },
        ];
        for event in events {
            let json = serde_json::to_string(&event).unwrap();
            let restored: KernelEvent = serde_json::from_str(&json).unwrap();
            let json2 = serde_json::to_string(&restored).unwrap();
            assert_eq!(json, json2, "roundtrip failed for {:?}", event);
        }
    }

    /// Spot-checks the event-to-audit-action mapping for the dedicated actions.
    #[test]
    fn test_kernel_event_to_audit_action() {
        let event = KernelEvent::AgentCreated {
            id: AgentId::new_v4(),
            name: "worker".to_string(),
        };
        let action = kernel_event_to_audit_action(&event);
        match action {
            AuditAction::AgentSpawn { task_type } => assert_eq!(task_type, "worker"),
            _ => panic!("expected AgentSpawn"),
        }

        let event = KernelEvent::AgentFailed {
            id: AgentId::new_v4(),
            error: "OOM".to_string(),
        };
        let action = kernel_event_to_audit_action(&event);
        match action {
            AuditAction::AgentExit { reason } => assert_eq!(reason, "OOM"),
            _ => panic!("expected AgentExit"),
        }

        let event = KernelEvent::MemoryStored {
            id: "m1".to_string(),
            memory_type: "fact".to_string(),
            source: "auto".to_string(),
        };
        let action = kernel_event_to_audit_action(&event);
        match action {
            AuditAction::MemoryWrite { entry_id } => {
                assert!(entry_id.contains("m1"));
                assert!(entry_id.contains("fact"));
            }
            _ => panic!("expected MemoryWrite"),
        }

        let event = KernelEvent::MemoryRecalled {
            query: "rust".to_string(),
            count: 3,
        };
        let action = kernel_event_to_audit_action(&event);
        match action {
            AuditAction::MemoryRead { entry_id } => {
                assert!(entry_id.contains("rust"));
                assert!(entry_id.contains("3"));
            }
            _ => panic!("expected MemoryRead"),
        }
    }

    /// Covers every explicit arm of `extract_agent_id` plus the `"system"`
    /// fallback.
    #[test]
    fn test_extract_agent_id() {
        let id = AgentId::new_v4();

        let event = KernelEvent::AgentCreated {
            id,
            name: "a".to_string(),
        };
        assert_eq!(extract_agent_id(&event), id.to_string());

        let event = KernelEvent::AgentStarted { id };
        assert_eq!(extract_agent_id(&event), id.to_string());

        let event = KernelEvent::AgentStopped { id };
        assert_eq!(extract_agent_id(&event), id.to_string());

        let event = KernelEvent::AgentFailed {
            id,
            error: "err".to_string(),
        };
        assert_eq!(extract_agent_id(&event), id.to_string());

        let event = KernelEvent::MessageReceived {
            from: id,
            content: "hello".to_string(),
        };
        assert_eq!(extract_agent_id(&event), id.to_string());

        let event = KernelEvent::AgentOutput {
            session_id: "s1".to_string(),
            agent_id: id,
            output: "out".to_string(),
        };
        assert_eq!(extract_agent_id(&event), id.to_string());

        let member = uuid::Uuid::new_v4();
        let event = KernelEvent::AgentGroupMemberCompleted {
            group_id: uuid::Uuid::new_v4(),
            agent_id: member,
            success: true,
        };
        assert_eq!(extract_agent_id(&event), member.to_string());

        let project_id = uuid::Uuid::new_v4();
        let event = KernelEvent::ProjectActivated {
            project_id,
            name: "demo".to_string(),
        };
        assert_eq!(extract_agent_id(&event), format!("project:{project_id}"));

        let event = KernelEvent::SeedCreated {
            seed_id: uuid::Uuid::new_v4(),
        };
        assert_eq!(extract_agent_id(&event), "system");
    }

    /// An attached audit trail eventually records a published event.
    #[tokio::test]
    async fn test_attach_audit_trail_forwards_events() {
        let bus = EventBus::new(64);
        let audit = Arc::new(AuditTrail::new(1000));
        bus.attach_audit_trail(audit.clone());
        bus.publish(KernelEvent::AgentCreated {
            id: AgentId::new_v4(),
            name: "audit-test".to_string(),
        })
        .unwrap();

        // Poll under an upper bound instead of sleeping a fixed interval: the
        // test finishes as soon as the forwarder has run, and a slow machine
        // gets up to two seconds before the test is declared failed.
        let recorded = tokio::time::timeout(std::time::Duration::from_secs(2), async {
            while audit.len() < 1 {
                tokio::time::sleep(std::time::Duration::from_millis(5)).await;
            }
        })
        .await;
        assert!(
            recorded.is_ok(),
            "audit trail should have recorded the event"
        );
        assert!(
            audit.len() >= 1,
            "audit trail should have recorded the event"
        );
    }
}