1use anyhow::Result;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use tokio::sync::broadcast;
10
11use crate::audit_trail::{AuditAction, AuditTrail};
12use crate::types::AgentId;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub enum KernelEvent {
17 AgentCreated {
19 id: AgentId,
21 name: String,
23 },
24 AgentStarted {
26 id: AgentId,
28 },
29 AgentStopped {
31 id: AgentId,
33 },
34 AgentFailed {
36 id: AgentId,
38 error: String,
40 },
41 MessageReceived {
43 from: AgentId,
45 content: String,
47 },
48 SeedCreated {
50 seed_id: uuid::Uuid,
52 },
53 EvaluationComplete {
55 seed_id: uuid::Uuid,
57 passed: bool,
59 },
60 PhaseStarted {
62 session_id: String,
64 phase: oxios_ouroboros::Phase,
66 },
67 PhaseCompleted {
69 session_id: String,
71 phase: oxios_ouroboros::Phase,
73 result_summary: String,
75 },
76 AgentOutput {
78 session_id: String,
80 agent_id: AgentId,
82 output: String,
84 },
85 ApprovalRequested {
87 id: uuid::Uuid,
89 action: String,
91 resource: String,
93 reason: String,
95 },
96 ApprovalResolved {
98 id: uuid::Uuid,
100 approved: bool,
102 },
103 MemoryStored {
105 id: String,
107 memory_type: String,
109 source: String,
111 },
112 MemoryRecalled {
114 query: String,
116 count: usize,
118 },
119 AgentGroupCreated {
121 group_id: uuid::Uuid,
123 agent_count: usize,
125 },
126 AgentGroupMemberCompleted {
128 group_id: uuid::Uuid,
130 agent_id: uuid::Uuid,
132 success: bool,
134 },
135 SpaceCreated {
137 space_id: uuid::Uuid,
139 name: String,
141 source: String,
143 },
144 SpaceActivated {
146 space_id: uuid::Uuid,
148 name: String,
150 },
151 SpaceArchived {
153 space_id: uuid::Uuid,
155 name: String,
157 },
158 KnowledgeCrossReferenced {
160 from_space: uuid::Uuid,
162 to_space: uuid::Uuid,
164 entries: usize,
166 flow: String,
168 },
169 SpacesMerged {
171 survivor: uuid::Uuid,
173 absorbed: uuid::Uuid,
175 entries_migrated: usize,
177 },
178}
179
180pub fn kernel_event_to_audit_action(event: &KernelEvent) -> AuditAction {
182 match event {
183 KernelEvent::AgentCreated { name, .. } => AuditAction::AgentSpawn {
184 task_type: name.clone(),
185 },
186 KernelEvent::AgentStarted { .. } => AuditAction::AgentSpawn {
187 task_type: "started".to_string(),
188 },
189 KernelEvent::AgentStopped { .. } => AuditAction::AgentExit {
190 reason: "stopped".to_string(),
191 },
192 KernelEvent::AgentFailed { error, .. } => AuditAction::AgentExit {
193 reason: error.clone(),
194 },
195 KernelEvent::MessageReceived { content, .. } => AuditAction::Other {
196 detail: format!("message: {}", content),
197 },
198 KernelEvent::SeedCreated { seed_id, .. } => AuditAction::Other {
199 detail: format!("seed_created:{}", seed_id),
200 },
201 KernelEvent::EvaluationComplete { seed_id, passed } => AuditAction::Other {
202 detail: format!("evaluation:{}:{}", seed_id, passed),
203 },
204 KernelEvent::PhaseStarted { session_id, phase } => AuditAction::Other {
205 detail: format!("phase_started:{}:{}", session_id, phase),
206 },
207 KernelEvent::PhaseCompleted {
208 session_id,
209 phase,
210 result_summary,
211 } => AuditAction::Other {
212 detail: format!(
213 "phase_completed:{}:{}:{}",
214 session_id, phase, result_summary
215 ),
216 },
217 KernelEvent::AgentOutput { output, .. } => AuditAction::Other {
218 detail: format!("agent_output:{}", output),
219 },
220 KernelEvent::ApprovalRequested {
221 id,
222 action,
223 resource,
224 reason: _,
225 } => AuditAction::Other {
226 detail: format!("approval_requested:{}:{}:{}", id, action, resource),
227 },
228 KernelEvent::ApprovalResolved { id, approved } => AuditAction::Other {
229 detail: format!("approval_resolved:{}:{}", id, approved),
230 },
231 KernelEvent::MemoryStored {
232 id, memory_type, ..
233 } => AuditAction::MemoryWrite {
234 entry_id: format!("{}:{}", id, memory_type),
235 },
236 KernelEvent::MemoryRecalled { query, count } => AuditAction::MemoryRead {
237 entry_id: format!("query:{}:{}results", query, count),
238 },
239 KernelEvent::AgentGroupCreated {
240 group_id,
241 agent_count,
242 } => AuditAction::Other {
243 detail: format!("group_created:{}:{}agents", group_id, agent_count),
244 },
245 KernelEvent::AgentGroupMemberCompleted {
246 group_id,
247 agent_id,
248 success,
249 } => AuditAction::Other {
250 detail: format!(
251 "group_member_completed:{}:{}:{}",
252 group_id, agent_id, success
253 ),
254 },
255 KernelEvent::SpaceCreated {
256 space_id,
257 name,
258 source,
259 } => AuditAction::Other {
260 detail: format!("space_created:{}:{}:{}", space_id, name, source),
261 },
262 KernelEvent::SpaceActivated { space_id, name } => AuditAction::Other {
263 detail: format!("space_activated:{}:{}", space_id, name),
264 },
265 KernelEvent::SpaceArchived { space_id, name } => AuditAction::Other {
266 detail: format!("space_archived:{}:{}", space_id, name),
267 },
268 KernelEvent::KnowledgeCrossReferenced {
269 from_space,
270 to_space,
271 entries,
272 flow,
273 } => AuditAction::Other {
274 detail: format!(
275 "knowledge_xref:{}->{}:{}:{}entries",
276 from_space, to_space, flow, entries
277 ),
278 },
279 KernelEvent::SpacesMerged {
280 survivor,
281 absorbed,
282 entries_migrated,
283 } => AuditAction::Other {
284 detail: format!(
285 "spaces_merged:{}<-{}:{}entries",
286 survivor, absorbed, entries_migrated
287 ),
288 },
289 }
290}
291
292fn extract_agent_id(event: &KernelEvent) -> String {
294 match event {
295 KernelEvent::AgentCreated { id, .. } => id.to_string(),
296 KernelEvent::AgentStarted { id, .. } => id.to_string(),
297 KernelEvent::AgentStopped { id, .. } => id.to_string(),
298 KernelEvent::AgentFailed { id, .. } => id.to_string(),
299 KernelEvent::MessageReceived { from, .. } => from.to_string(),
300 KernelEvent::AgentOutput { agent_id, .. } => agent_id.to_string(),
301 KernelEvent::AgentGroupMemberCompleted { agent_id, .. } => agent_id.to_string(),
302 KernelEvent::SpaceActivated { space_id, .. } => format!("space:{}", space_id),
303 _ => "system".to_string(),
304 }
305}
306
307#[derive(Clone)]
312pub struct EventBus {
313 sender: broadcast::Sender<KernelEvent>,
314}
315
316impl EventBus {
317 pub fn new(capacity: usize) -> Self {
329 let (sender, _) = broadcast::channel(capacity);
330 Self { sender }
331 }
332
333 pub fn subscribe(&self) -> broadcast::Receiver<KernelEvent> {
335 self.sender.subscribe()
336 }
337
338 pub fn publish(&self, event: KernelEvent) -> Result<()> {
340 let _ = self.sender.send(event);
342 Ok(())
343 }
344
345 pub fn attach_audit_trail(&self, audit: Arc<AuditTrail>) {
348 let mut rx = self.subscribe();
349 tokio::spawn(async move {
350 while let Ok(event) = rx.recv().await {
351 let actor = extract_agent_id(&event);
352 let action = kernel_event_to_audit_action(&event);
353 let resource = format!("{:?}", event);
354 audit.append(actor, action, resource);
355 }
356 });
357 }
358}
359
360impl std::fmt::Debug for EventBus {
361 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
362 f.debug_struct("EventBus").finish()
363 }
364}