1use pulsedb::{ExperienceId, InsightId, RelationId};
13use serde::{Deserialize, Serialize};
14use tokio::sync::broadcast;
15
16use crate::agent::{AgentKindTag, AgentOutcome};
17
18pub fn now_ms() -> u64 {
22 std::time::SystemTime::now()
23 .duration_since(std::time::UNIX_EPOCH)
24 .unwrap_or_default()
25 .as_millis() as u64
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(tag = "type", rename_all = "snake_case")]
40pub enum HiveEvent {
41 AgentStarted {
44 timestamp_ms: u64,
45 agent_id: String,
46 name: String,
47 kind: AgentKindTag,
48 },
49 AgentCompleted {
51 timestamp_ms: u64,
52 agent_id: String,
53 outcome: AgentOutcome,
54 },
55
56 LlmCallStarted {
59 timestamp_ms: u64,
60 agent_id: String,
61 model: String,
62 message_count: usize,
63 },
64 LlmCallCompleted {
66 timestamp_ms: u64,
67 agent_id: String,
68 model: String,
69 duration_ms: u64,
70 input_tokens: u32,
72 output_tokens: u32,
74 },
75 LlmTokenStreamed {
77 timestamp_ms: u64,
78 agent_id: String,
79 token: String,
80 },
81
82 ToolCallStarted {
85 timestamp_ms: u64,
86 agent_id: String,
87 tool_name: String,
88 params: String,
90 },
91 ToolCallCompleted {
93 timestamp_ms: u64,
94 agent_id: String,
95 tool_name: String,
96 duration_ms: u64,
97 result_preview: String,
99 },
100 ToolApprovalRequested {
102 timestamp_ms: u64,
103 agent_id: String,
104 tool_name: String,
105 description: String,
106 },
107
108 ExperienceRecorded {
111 timestamp_ms: u64,
112 experience_id: ExperienceId,
113 agent_id: String,
114 content_preview: String,
116 experience_type: String,
118 importance: f32,
120 },
121 RelationshipInferred {
123 timestamp_ms: u64,
124 relation_id: RelationId,
125 agent_id: String,
127 },
128 InsightGenerated {
130 timestamp_ms: u64,
131 insight_id: InsightId,
132 source_count: usize,
133 agent_id: String,
135 },
136
137 SubstratePerceived {
140 timestamp_ms: u64,
141 agent_id: String,
142 experience_count: usize,
143 insight_count: usize,
144 },
145
146 EmbeddingComputed {
149 timestamp_ms: u64,
150 agent_id: String,
151 dimensions: usize,
152 duration_ms: u64,
153 },
154
155 WatchNotification {
162 timestamp_ms: u64,
163 experience_id: ExperienceId,
164 collective_id: pulsedb::CollectiveId,
165 event_type: String,
167 },
168}
169
170#[derive(Clone)]
177pub struct EventEmitter {
178 sender: broadcast::Sender<HiveEvent>,
179}
180
181impl EventEmitter {
182 pub fn new(capacity: usize) -> Self {
184 let (sender, _) = broadcast::channel(capacity);
185 Self { sender }
186 }
187
188 pub fn emit(&self, event: HiveEvent) {
191 let _ = self.sender.send(event);
192 }
193
194 pub fn subscribe(&self) -> broadcast::Receiver<HiveEvent> {
196 self.sender.subscribe()
197 }
198}
199
200impl Default for EventEmitter {
201 fn default() -> Self {
202 Self::new(256)
203 }
204}
205
206impl std::fmt::Debug for EventEmitter {
207 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208 f.debug_struct("EventEmitter")
209 .field("subscriber_count", &self.sender.receiver_count())
210 .finish()
211 }
212}
213
214pub type EventBus = EventEmitter;
217
218#[cfg(test)]
219mod tests {
220 use super::*;
221
222 #[test]
223 fn test_hive_event_is_debug_clone() {
224 let event = HiveEvent::AgentStarted {
225 timestamp_ms: now_ms(),
226 agent_id: "a1".into(),
227 name: "researcher".into(),
228 kind: AgentKindTag::Llm,
229 };
230 let cloned = event.clone();
231 let debug = format!("{:?}", cloned);
232 assert!(debug.contains("researcher"));
233 }
234
235 #[test]
236 fn test_hive_event_serializes_to_json() {
237 let event = HiveEvent::LlmCallCompleted {
238 timestamp_ms: 1711500000000,
239 agent_id: "agent-1".into(),
240 model: "gpt-4".into(),
241 duration_ms: 1500,
242 input_tokens: 200,
243 output_tokens: 50,
244 };
245 let json = serde_json::to_string(&event).unwrap();
246 assert!(json.contains("\"type\":\"llm_call_completed\""));
247 assert!(json.contains("\"input_tokens\":200"));
248 assert!(json.contains("\"output_tokens\":50"));
249
250 let deserialized: HiveEvent = serde_json::from_str(&json).unwrap();
252 assert!(matches!(
253 deserialized,
254 HiveEvent::LlmCallCompleted {
255 input_tokens: 200,
256 output_tokens: 50,
257 ..
258 }
259 ));
260 }
261
262 #[test]
263 fn test_hive_event_serialize_tool_call() {
264 let event = HiveEvent::ToolCallStarted {
265 timestamp_ms: now_ms(),
266 agent_id: "a1".into(),
267 tool_name: "search".into(),
268 params: r#"{"query":"test"}"#.into(),
269 };
270 let json = serde_json::to_string(&event).unwrap();
271 assert!(json.contains("\"params\""));
272 assert!(json.contains("\"tool_call_started\""));
273 }
274
275 #[tokio::test]
276 async fn test_event_emitter_send_receive() {
277 let emitter = EventEmitter::new(16);
278 let mut rx = emitter.subscribe();
279
280 emitter.emit(HiveEvent::AgentStarted {
281 timestamp_ms: now_ms(),
282 agent_id: "a1".into(),
283 name: "test".into(),
284 kind: AgentKindTag::Llm,
285 });
286
287 let event = rx.recv().await.unwrap();
288 assert!(matches!(event, HiveEvent::AgentStarted { agent_id, .. } if agent_id == "a1"));
289 }
290
291 #[tokio::test]
292 async fn test_event_emitter_multiple_subscribers() {
293 let emitter = EventEmitter::new(16);
294 let mut rx1 = emitter.subscribe();
295 let mut rx2 = emitter.subscribe();
296
297 emitter.emit(HiveEvent::ToolCallStarted {
298 timestamp_ms: now_ms(),
299 agent_id: "a1".into(),
300 tool_name: "search".into(),
301 params: "{}".into(),
302 });
303
304 let e1 = rx1.recv().await.unwrap();
305 let e2 = rx2.recv().await.unwrap();
306 assert!(matches!(e1, HiveEvent::ToolCallStarted { .. }));
307 assert!(matches!(e2, HiveEvent::ToolCallStarted { .. }));
308 }
309
310 #[test]
311 fn test_event_emitter_no_subscribers_no_panic() {
312 let emitter = EventEmitter::new(16);
313 emitter.emit(HiveEvent::ExperienceRecorded {
314 timestamp_ms: now_ms(),
315 experience_id: ExperienceId::new(),
316 agent_id: "a1".into(),
317 content_preview: "test".into(),
318 experience_type: "Generic".into(),
319 importance: 0.5,
320 });
321 }
322
323 #[test]
324 fn test_event_emitter_clone_is_cheap() {
325 let emitter = EventEmitter::default();
326 let cloned = emitter.clone();
327 let mut rx = cloned.subscribe();
328 emitter.emit(HiveEvent::SubstratePerceived {
329 timestamp_ms: now_ms(),
330 agent_id: "a1".into(),
331 experience_count: 10,
332 insight_count: 2,
333 });
334 assert!(rx.try_recv().is_ok());
335 }
336
337 #[test]
338 fn test_event_emitter_debug() {
339 let emitter = EventEmitter::default();
340 let debug = format!("{:?}", emitter);
341 assert!(debug.contains("EventEmitter"));
342 }
343
344 #[test]
345 fn test_all_event_variants_clone() {
346 let events: Vec<HiveEvent> = vec![
347 HiveEvent::AgentStarted {
348 timestamp_ms: 0,
349 agent_id: "a".into(),
350 name: "n".into(),
351 kind: AgentKindTag::Llm,
352 },
353 HiveEvent::AgentCompleted {
354 timestamp_ms: 0,
355 agent_id: "a".into(),
356 outcome: AgentOutcome::Complete {
357 response: "done".into(),
358 },
359 },
360 HiveEvent::LlmCallStarted {
361 timestamp_ms: 0,
362 agent_id: "a".into(),
363 model: "gpt-4".into(),
364 message_count: 3,
365 },
366 HiveEvent::LlmCallCompleted {
367 timestamp_ms: 0,
368 agent_id: "a".into(),
369 model: "gpt-4".into(),
370 duration_ms: 1500,
371 input_tokens: 100,
372 output_tokens: 50,
373 },
374 HiveEvent::LlmTokenStreamed {
375 timestamp_ms: 0,
376 agent_id: "a".into(),
377 token: "hello".into(),
378 },
379 HiveEvent::ToolCallStarted {
380 timestamp_ms: 0,
381 agent_id: "a".into(),
382 tool_name: "search".into(),
383 params: "{}".into(),
384 },
385 HiveEvent::ToolCallCompleted {
386 timestamp_ms: 0,
387 agent_id: "a".into(),
388 tool_name: "search".into(),
389 duration_ms: 200,
390 result_preview: "found it".into(),
391 },
392 HiveEvent::ToolApprovalRequested {
393 timestamp_ms: 0,
394 agent_id: "a".into(),
395 tool_name: "delete".into(),
396 description: "Delete file".into(),
397 },
398 HiveEvent::ExperienceRecorded {
399 timestamp_ms: 0,
400 experience_id: ExperienceId::new(),
401 agent_id: "a".into(),
402 content_preview: "test".into(),
403 experience_type: "Generic".into(),
404 importance: 0.5,
405 },
406 HiveEvent::RelationshipInferred {
407 timestamp_ms: 0,
408 relation_id: RelationId::new(),
409 agent_id: "a".into(),
410 },
411 HiveEvent::InsightGenerated {
412 timestamp_ms: 0,
413 insight_id: InsightId::new(),
414 source_count: 5,
415 agent_id: "a".into(),
416 },
417 HiveEvent::SubstratePerceived {
418 timestamp_ms: 0,
419 agent_id: "a".into(),
420 experience_count: 10,
421 insight_count: 2,
422 },
423 HiveEvent::EmbeddingComputed {
424 timestamp_ms: 0,
425 agent_id: "a".into(),
426 dimensions: 384,
427 duration_ms: 100,
428 },
429 HiveEvent::WatchNotification {
430 timestamp_ms: 0,
431 experience_id: ExperienceId::new(),
432 collective_id: pulsedb::CollectiveId::new(),
433 event_type: "Created".into(),
434 },
435 ];
436 let _cloned: Vec<HiveEvent> = events.to_vec();
437 assert_eq!(events.len(), 14);
438 }
439
440 #[test]
441 fn test_now_ms_returns_nonzero() {
442 let ts = now_ms();
443 assert!(ts > 0, "Timestamp should be non-zero");
444 assert!(ts > 1_700_000_000_000, "Timestamp should be after 2023");
445 }
446}