1use std::sync::Arc;
13
14use pulsedb::{ExperienceId, InsightId, RelationId};
15use serde::{Deserialize, Serialize};
16use tokio::sync::broadcast;
17
18use crate::agent::{AgentKindTag, AgentOutcome};
19use crate::export::EventExporter;
20
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, `duration_since` fails and
/// the zero duration is used instead, so the function returns `0` rather than
/// panicking.
pub fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();

    // Build the millisecond count from the `u64` seconds and `u32` sub-second
    // parts with saturating arithmetic. This avoids the lossy `u128 as u64`
    // cast: an out-of-range value clamps to `u64::MAX` instead of wrapping.
    since_epoch
        .as_secs()
        .saturating_mul(1_000)
        .saturating_add(u64::from(since_epoch.subsec_millis()))
}
30
/// Event payloads carried by [`EventEmitter`]'s broadcast channel.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// written in snake_case under the `"type"` key next to the variant's own
/// fields (for example `LlmCallCompleted` becomes `"type":"llm_call_completed"`).
///
/// Every variant carries `timestamp_ms`; the tests in this file fill it from
/// [`now_ms`], i.e. milliseconds since the Unix epoch.
///
/// Field order is part of the serialized layout, so keep it stable.
// NOTE(review): the per-variant summaries below are read off the variant and
// field names only; the code that emits these events is not in this file.
// Confirm the exact semantics against the emit sites.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum HiveEvent {
    // --- Agent lifecycle ---
    /// An agent started; carries its id, name, and kind tag.
    AgentStarted {
        timestamp_ms: u64,
        agent_id: String,
        name: String,
        kind: AgentKindTag,
    },
    /// An agent finished; carries the resulting [`AgentOutcome`].
    AgentCompleted {
        timestamp_ms: u64,
        agent_id: String,
        outcome: AgentOutcome,
    },

    // --- LLM calls ---
    /// An LLM call began for `model`.
    LlmCallStarted {
        timestamp_ms: u64,
        agent_id: String,
        model: String,
        // presumably the number of messages in the request — confirm
        message_count: usize,
    },
    /// An LLM call finished; carries its duration and token counts.
    LlmCallCompleted {
        timestamp_ms: u64,
        agent_id: String,
        model: String,
        // presumably elapsed call time in milliseconds (per the `_ms` suffix)
        duration_ms: u64,
        input_tokens: u32,
        output_tokens: u32,
    },
    /// A single streamed token from an LLM response.
    LlmTokenStreamed {
        timestamp_ms: u64,
        agent_id: String,
        token: String,
    },

    // --- Tool calls ---
    /// A tool invocation began.
    ToolCallStarted {
        timestamp_ms: u64,
        agent_id: String,
        tool_name: String,
        // Plain string; the tests in this file pass JSON text such as
        // `{"query":"test"}`. NOTE(review): confirm it is always JSON.
        params: String,
    },
    /// A tool invocation finished.
    ToolCallCompleted {
        timestamp_ms: u64,
        agent_id: String,
        tool_name: String,
        duration_ms: u64,
        // presumably a shortened rendering of the tool result — confirm
        result_preview: String,
    },
    /// Approval was requested for a tool invocation.
    ToolApprovalRequested {
        timestamp_ms: u64,
        agent_id: String,
        tool_name: String,
        description: String,
    },

    // --- Substrate (pulsedb) writes ---
    /// An experience was recorded; identified by a pulsedb [`ExperienceId`].
    ExperienceRecorded {
        timestamp_ms: u64,
        experience_id: ExperienceId,
        agent_id: String,
        content_preview: String,
        // Free-form string; the tests in this file use "Generic".
        experience_type: String,
        // NOTE(review): valid range not visible here (tests use 0.5) — confirm.
        importance: f32,
    },
    /// A relation was inferred; identified by a pulsedb [`RelationId`].
    RelationshipInferred {
        timestamp_ms: u64,
        relation_id: RelationId,
        agent_id: String,
    },
    /// An insight was generated; identified by a pulsedb [`InsightId`].
    InsightGenerated {
        timestamp_ms: u64,
        insight_id: InsightId,
        // presumably how many sources fed the insight — confirm
        source_count: usize,
        agent_id: String,
    },

    // --- Substrate reads ---
    /// An agent perceived the substrate; carries experience and insight counts.
    SubstratePerceived {
        timestamp_ms: u64,
        agent_id: String,
        experience_count: usize,
        insight_count: usize,
    },

    // --- Embeddings ---
    /// An embedding was computed; carries its dimensionality and duration.
    EmbeddingComputed {
        timestamp_ms: u64,
        agent_id: String,
        dimensions: usize,
        duration_ms: u64,
    },

    // --- Watch stream ---
    /// A watch notification for an experience within a collective.
    WatchNotification {
        timestamp_ms: u64,
        experience_id: ExperienceId,
        collective_id: pulsedb::CollectiveId,
        // Free-form string; the tests in this file use "Created".
        event_type: String,
    },
}
172
/// Cloneable handle that publishes [`HiveEvent`]s on a Tokio broadcast channel
/// and optionally forwards each event to an [`EventExporter`].
///
/// Cloning copies the channel sender and the `Arc` to the exporter, so a clone
/// publishes to the same channel as the original: a receiver obtained from one
/// handle sees events emitted through the other.
#[derive(Clone)]
pub struct EventEmitter {
    /// Sending half of the broadcast channel; receivers come from `subscribe`.
    sender: broadcast::Sender<HiveEvent>,
    /// Exporter that `emit` forwards every event to; `None` when built via `new`.
    exporter: Option<Arc<dyn EventExporter>>,
}
184
185impl EventEmitter {
186 pub fn new(capacity: usize) -> Self {
188 let (sender, _) = broadcast::channel(capacity);
189 Self {
190 sender,
191 exporter: None,
192 }
193 }
194
195 pub fn with_exporter(capacity: usize, exporter: Arc<dyn EventExporter>) -> Self {
200 let (sender, _) = broadcast::channel(capacity);
201 Self {
202 sender,
203 exporter: Some(exporter),
204 }
205 }
206
207 pub fn emit(&self, event: HiveEvent) {
210 if let Some(exporter) = &self.exporter {
211 let exporter = Arc::clone(exporter);
212 let event_clone = event.clone();
213 tokio::spawn(async move {
214 exporter.export(&event_clone).await;
215 });
216 }
217 let _ = self.sender.send(event);
218 }
219
220 pub fn subscribe(&self) -> broadcast::Receiver<HiveEvent> {
222 self.sender.subscribe()
223 }
224}
225
226impl Default for EventEmitter {
227 fn default() -> Self {
228 Self::new(256)
229 }
230}
231
232impl std::fmt::Debug for EventEmitter {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 f.debug_struct("EventEmitter")
235 .field("subscriber_count", &self.sender.receiver_count())
236 .finish()
237 }
238}
239
/// Alternate name for [`EventEmitter`]; the two are the same type.
// NOTE(review): presumably kept so callers can use either name — confirm
// whether one of them is the preferred spelling.
pub type EventBus = EventEmitter;
243
#[cfg(test)]
mod tests {
    use super::*;

    /// `HiveEvent` can be cloned and its `Debug` output includes field values.
    #[test]
    fn test_hive_event_is_debug_clone() {
        let event = HiveEvent::AgentStarted {
            timestamp_ms: now_ms(),
            agent_id: "a1".into(),
            name: "researcher".into(),
            kind: AgentKindTag::Llm,
        };
        let cloned = event.clone();
        let debug = format!("{:?}", cloned);
        assert!(debug.contains("researcher"));
    }

    /// JSON output is internally tagged with a snake_case `type` and
    /// round-trips back into the same variant.
    #[test]
    fn test_hive_event_serializes_to_json() {
        let event = HiveEvent::LlmCallCompleted {
            timestamp_ms: 1711500000000,
            agent_id: "agent-1".into(),
            model: "gpt-4".into(),
            duration_ms: 1500,
            input_tokens: 200,
            output_tokens: 50,
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"type\":\"llm_call_completed\""));
        assert!(json.contains("\"input_tokens\":200"));
        assert!(json.contains("\"output_tokens\":50"));

        let deserialized: HiveEvent = serde_json::from_str(&json).unwrap();
        assert!(matches!(
            deserialized,
            HiveEvent::LlmCallCompleted {
                input_tokens: 200,
                output_tokens: 50,
                ..
            }
        ));
    }

    /// Tool-call events serialize their `params` string and snake_case tag.
    #[test]
    fn test_hive_event_serialize_tool_call() {
        let event = HiveEvent::ToolCallStarted {
            timestamp_ms: now_ms(),
            agent_id: "a1".into(),
            tool_name: "search".into(),
            params: r#"{"query":"test"}"#.into(),
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"params\""));
        assert!(json.contains("\"tool_call_started\""));
    }

    /// A subscriber receives an event emitted after it subscribed.
    #[tokio::test]
    async fn test_event_emitter_send_receive() {
        let emitter = EventEmitter::new(16);
        let mut rx = emitter.subscribe();

        emitter.emit(HiveEvent::AgentStarted {
            timestamp_ms: now_ms(),
            agent_id: "a1".into(),
            name: "test".into(),
            kind: AgentKindTag::Llm,
        });

        let event = rx.recv().await.unwrap();
        assert!(matches!(event, HiveEvent::AgentStarted { agent_id, .. } if agent_id == "a1"));
    }

    /// Every subscriber gets its own copy of an emitted event.
    #[tokio::test]
    async fn test_event_emitter_multiple_subscribers() {
        let emitter = EventEmitter::new(16);
        let mut rx1 = emitter.subscribe();
        let mut rx2 = emitter.subscribe();

        emitter.emit(HiveEvent::ToolCallStarted {
            timestamp_ms: now_ms(),
            agent_id: "a1".into(),
            tool_name: "search".into(),
            params: "{}".into(),
        });

        let e1 = rx1.recv().await.unwrap();
        let e2 = rx2.recv().await.unwrap();
        assert!(matches!(e1, HiveEvent::ToolCallStarted { .. }));
        assert!(matches!(e2, HiveEvent::ToolCallStarted { .. }));
    }

    /// Emitting with no subscribers (and no exporter) must not panic, even
    /// outside a Tokio runtime.
    #[test]
    fn test_event_emitter_no_subscribers_no_panic() {
        let emitter = EventEmitter::new(16);
        emitter.emit(HiveEvent::ExperienceRecorded {
            timestamp_ms: now_ms(),
            experience_id: ExperienceId::new(),
            agent_id: "a1".into(),
            content_preview: "test".into(),
            experience_type: "Generic".into(),
            importance: 0.5,
        });
    }

    /// A clone shares the original's channel: a receiver taken from the clone
    /// sees events emitted through the original.
    #[test]
    fn test_event_emitter_clone_is_cheap() {
        let emitter = EventEmitter::default();
        let cloned = emitter.clone();
        let mut rx = cloned.subscribe();
        emitter.emit(HiveEvent::SubstratePerceived {
            timestamp_ms: now_ms(),
            agent_id: "a1".into(),
            experience_count: 10,
            insight_count: 2,
        });
        // Check the payload, not just that something arrived.
        assert!(matches!(
            rx.try_recv(),
            Ok(HiveEvent::SubstratePerceived {
                experience_count: 10,
                insight_count: 2,
                ..
            })
        ));
    }

    /// `Debug` output names the type and reports the subscriber count.
    #[test]
    fn test_event_emitter_debug() {
        let emitter = EventEmitter::default();
        let debug = format!("{:?}", emitter);
        assert!(debug.contains("EventEmitter"));
        assert!(debug.contains("subscriber_count"));
    }

    /// Every variant can be constructed and cloned, and the clones format
    /// identically to their sources.
    #[test]
    fn test_all_event_variants_clone() {
        let events: Vec<HiveEvent> = vec![
            HiveEvent::AgentStarted {
                timestamp_ms: 0,
                agent_id: "a".into(),
                name: "n".into(),
                kind: AgentKindTag::Llm,
            },
            HiveEvent::AgentCompleted {
                timestamp_ms: 0,
                agent_id: "a".into(),
                outcome: AgentOutcome::Complete {
                    response: "done".into(),
                },
            },
            HiveEvent::LlmCallStarted {
                timestamp_ms: 0,
                agent_id: "a".into(),
                model: "gpt-4".into(),
                message_count: 3,
            },
            HiveEvent::LlmCallCompleted {
                timestamp_ms: 0,
                agent_id: "a".into(),
                model: "gpt-4".into(),
                duration_ms: 1500,
                input_tokens: 100,
                output_tokens: 50,
            },
            HiveEvent::LlmTokenStreamed {
                timestamp_ms: 0,
                agent_id: "a".into(),
                token: "hello".into(),
            },
            HiveEvent::ToolCallStarted {
                timestamp_ms: 0,
                agent_id: "a".into(),
                tool_name: "search".into(),
                params: "{}".into(),
            },
            HiveEvent::ToolCallCompleted {
                timestamp_ms: 0,
                agent_id: "a".into(),
                tool_name: "search".into(),
                duration_ms: 200,
                result_preview: "found it".into(),
            },
            HiveEvent::ToolApprovalRequested {
                timestamp_ms: 0,
                agent_id: "a".into(),
                tool_name: "delete".into(),
                description: "Delete file".into(),
            },
            HiveEvent::ExperienceRecorded {
                timestamp_ms: 0,
                experience_id: ExperienceId::new(),
                agent_id: "a".into(),
                content_preview: "test".into(),
                experience_type: "Generic".into(),
                importance: 0.5,
            },
            HiveEvent::RelationshipInferred {
                timestamp_ms: 0,
                relation_id: RelationId::new(),
                agent_id: "a".into(),
            },
            HiveEvent::InsightGenerated {
                timestamp_ms: 0,
                insight_id: InsightId::new(),
                source_count: 5,
                agent_id: "a".into(),
            },
            HiveEvent::SubstratePerceived {
                timestamp_ms: 0,
                agent_id: "a".into(),
                experience_count: 10,
                insight_count: 2,
            },
            HiveEvent::EmbeddingComputed {
                timestamp_ms: 0,
                agent_id: "a".into(),
                dimensions: 384,
                duration_ms: 100,
            },
            HiveEvent::WatchNotification {
                timestamp_ms: 0,
                experience_id: ExperienceId::new(),
                collective_id: pulsedb::CollectiveId::new(),
                event_type: "Created".into(),
            },
        ];
        let cloned: Vec<HiveEvent> = events.to_vec();
        assert_eq!(events.len(), 14);
        // Assert on the clone itself so the test exercises `Clone`, not just
        // the literal above.
        assert_eq!(cloned.len(), events.len());
        assert_eq!(format!("{:?}", cloned), format!("{:?}", events));
    }

    /// `now_ms` yields a plausible epoch-milliseconds value.
    #[test]
    fn test_now_ms_returns_nonzero() {
        let ts = now_ms();
        assert!(ts > 0, "Timestamp should be non-zero");
        assert!(ts > 1_700_000_000_000, "Timestamp should be after 2023");
    }
}