llm_memory_graph/observatory/
emitter.rs1use super::events::MemoryGraphEvent;
42use super::publisher::EventPublisher;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::sync::Arc;
45use tokio::sync::RwLock;
46
47#[derive(Clone)]
53pub struct AsyncEventEmitter<P: EventPublisher + 'static> {
54 publisher: Arc<P>,
56 stats: Arc<EmissionStats>,
58 log_errors: bool,
60}
61
62impl<P: EventPublisher + 'static> AsyncEventEmitter<P> {
63 pub fn new(publisher: Arc<P>) -> Self {
79 Self {
80 publisher,
81 stats: Arc::new(EmissionStats::new()),
82 log_errors: true,
83 }
84 }
85
86 pub fn new_silent(publisher: Arc<P>) -> Self {
88 Self {
89 publisher,
90 stats: Arc::new(EmissionStats::new()),
91 log_errors: false,
92 }
93 }
94
95 pub fn emit(&self, event: MemoryGraphEvent) {
124 let publisher = Arc::clone(&self.publisher);
125 let stats = Arc::clone(&self.stats);
126 let log_errors = self.log_errors;
127
128 tokio::spawn(async move {
129 stats.inc_submitted();
130
131 match publisher.publish(event).await {
132 Ok(()) => {
133 stats.inc_emitted();
134 }
135 Err(e) => {
136 stats.inc_failed();
137 if log_errors {
138 tracing::warn!("Failed to emit event: {}", e);
139 }
140 }
141 }
142 });
143 }
144
145 pub fn emit_batch(&self, events: Vec<MemoryGraphEvent>) {
151 let publisher = Arc::clone(&self.publisher);
152 let stats = Arc::clone(&self.stats);
153 let log_errors = self.log_errors;
154 let count = events.len() as u64;
155
156 tokio::spawn(async move {
157 stats.inc_submitted_by(count);
158
159 match publisher.publish_batch(events).await {
160 Ok(()) => {
161 stats.inc_emitted_by(count);
162 }
163 Err(e) => {
164 stats.inc_failed_by(count);
165 if log_errors {
166 tracing::warn!("Failed to emit event batch: {}", e);
167 }
168 }
169 }
170 });
171 }
172
173 pub async fn emit_sync(&self, event: MemoryGraphEvent) -> crate::error::Result<()> {
186 self.stats.inc_submitted();
187
188 match self.publisher.publish(event).await {
189 Ok(()) => {
190 self.stats.inc_emitted();
191 Ok(())
192 }
193 Err(e) => {
194 self.stats.inc_failed();
195 if self.log_errors {
196 tracing::warn!("Failed to emit event: {}", e);
197 }
198 Err(e)
199 }
200 }
201 }
202
203 pub async fn stats(&self) -> EmissionStatsSnapshot {
209 self.stats.snapshot().await
210 }
211
212 pub async fn reset_stats(&self) {
214 self.stats.reset().await;
215 }
216
217 pub fn publisher(&self) -> &Arc<P> {
219 &self.publisher
220 }
221}
222
223struct EmissionStats {
225 events_submitted: AtomicU64,
227 events_emitted: AtomicU64,
229 events_failed: AtomicU64,
231 peak_concurrent: RwLock<u64>,
233}
234
235impl EmissionStats {
236 fn new() -> Self {
237 Self {
238 events_submitted: AtomicU64::new(0),
239 events_emitted: AtomicU64::new(0),
240 events_failed: AtomicU64::new(0),
241 peak_concurrent: RwLock::new(0),
242 }
243 }
244
245 fn inc_submitted(&self) {
246 self.events_submitted.fetch_add(1, Ordering::Relaxed);
247 }
248
249 fn inc_submitted_by(&self, count: u64) {
250 self.events_submitted.fetch_add(count, Ordering::Relaxed);
251 }
252
253 fn inc_emitted(&self) {
254 self.events_emitted.fetch_add(1, Ordering::Relaxed);
255 }
256
257 fn inc_emitted_by(&self, count: u64) {
258 self.events_emitted.fetch_add(count, Ordering::Relaxed);
259 }
260
261 fn inc_failed(&self) {
262 self.events_failed.fetch_add(1, Ordering::Relaxed);
263 }
264
265 fn inc_failed_by(&self, count: u64) {
266 self.events_failed.fetch_add(count, Ordering::Relaxed);
267 }
268
269 async fn snapshot(&self) -> EmissionStatsSnapshot {
270 EmissionStatsSnapshot {
271 events_submitted: self.events_submitted.load(Ordering::Relaxed),
272 events_emitted: self.events_emitted.load(Ordering::Relaxed),
273 events_failed: self.events_failed.load(Ordering::Relaxed),
274 peak_concurrent: *self.peak_concurrent.read().await,
275 }
276 }
277
278 async fn reset(&self) {
279 self.events_submitted.store(0, Ordering::Relaxed);
280 self.events_emitted.store(0, Ordering::Relaxed);
281 self.events_failed.store(0, Ordering::Relaxed);
282 *self.peak_concurrent.write().await = 0;
283 }
284}
285
286#[derive(Debug, Clone, Copy, PartialEq, Eq)]
288pub struct EmissionStatsSnapshot {
289 pub events_submitted: u64,
291 pub events_emitted: u64,
293 pub events_failed: u64,
295 pub peak_concurrent: u64,
297}
298
299impl EmissionStatsSnapshot {
300 pub fn success_rate(&self) -> f64 {
302 if self.events_submitted == 0 {
303 100.0
304 } else {
305 (self.events_emitted as f64 / self.events_submitted as f64) * 100.0
306 }
307 }
308
309 pub fn failure_rate(&self) -> f64 {
311 if self.events_submitted == 0 {
312 0.0
313 } else {
314 (self.events_failed as f64 / self.events_submitted as f64) * 100.0
315 }
316 }
317}
318
319#[cfg(test)]
320mod tests {
321 use super::*;
322 use crate::observatory::publisher::InMemoryPublisher;
323 use crate::types::{NodeId, NodeType, SessionId};
324 use chrono::Utc;
325 use std::collections::HashMap;
326 use tokio::time::{sleep, Duration};
327
328 #[tokio::test]
329 async fn test_emitter_creation() {
330 let publisher = Arc::new(InMemoryPublisher::new());
331 let emitter = AsyncEventEmitter::new(publisher.clone());
332
333 let stats = emitter.stats().await;
334 assert_eq!(stats.events_submitted, 0);
335 assert_eq!(stats.events_emitted, 0);
336 assert_eq!(stats.events_failed, 0);
337 }
338
339 #[tokio::test]
340 async fn test_emit_single_event() {
341 let publisher = Arc::new(InMemoryPublisher::new());
342 let emitter = AsyncEventEmitter::new(publisher.clone());
343
344 let event = MemoryGraphEvent::NodeCreated {
345 node_id: NodeId::new(),
346 node_type: NodeType::Prompt,
347 session_id: Some(SessionId::new()),
348 timestamp: Utc::now(),
349 metadata: HashMap::new(),
350 };
351
352 emitter.emit(event);
353
354 sleep(Duration::from_millis(50)).await;
356
357 let stats = emitter.stats().await;
358 assert_eq!(stats.events_submitted, 1);
359 assert_eq!(stats.events_emitted, 1);
360 assert_eq!(stats.events_failed, 0);
361
362 let published = publisher.get_events().await;
364 assert_eq!(published.len(), 1);
365 }
366
367 #[tokio::test]
368 async fn test_emit_multiple_events() {
369 let publisher = Arc::new(InMemoryPublisher::new());
370 let emitter = AsyncEventEmitter::new(publisher.clone());
371
372 for _ in 0..10 {
373 let event = MemoryGraphEvent::QueryExecuted {
374 query_type: "test".to_string(),
375 results_count: 5,
376 duration_ms: 10,
377 timestamp: Utc::now(),
378 };
379 emitter.emit(event);
380 }
381
382 sleep(Duration::from_millis(100)).await;
384
385 let stats = emitter.stats().await;
386 assert_eq!(stats.events_submitted, 10);
387 assert_eq!(stats.events_emitted, 10);
388 assert_eq!(stats.events_failed, 0);
389
390 let published = publisher.get_events().await;
391 assert_eq!(published.len(), 10);
392 }
393
394 #[tokio::test]
395 async fn test_emit_batch() {
396 let publisher = Arc::new(InMemoryPublisher::new());
397 let emitter = AsyncEventEmitter::new(publisher.clone());
398
399 let events = vec![
400 MemoryGraphEvent::NodeCreated {
401 node_id: NodeId::new(),
402 node_type: NodeType::Prompt,
403 session_id: None,
404 timestamp: Utc::now(),
405 metadata: HashMap::new(),
406 },
407 MemoryGraphEvent::NodeCreated {
408 node_id: NodeId::new(),
409 node_type: NodeType::Response,
410 session_id: None,
411 timestamp: Utc::now(),
412 metadata: HashMap::new(),
413 },
414 MemoryGraphEvent::QueryExecuted {
415 query_type: "batch".to_string(),
416 results_count: 2,
417 duration_ms: 15,
418 timestamp: Utc::now(),
419 },
420 ];
421
422 emitter.emit_batch(events);
423
424 sleep(Duration::from_millis(50)).await;
426
427 let stats = emitter.stats().await;
428 assert_eq!(stats.events_submitted, 3);
429 assert_eq!(stats.events_emitted, 3);
430
431 let published = publisher.get_events().await;
432 assert_eq!(published.len(), 3);
433 }
434
435 #[tokio::test]
436 async fn test_emit_sync() {
437 let publisher = Arc::new(InMemoryPublisher::new());
438 let emitter = AsyncEventEmitter::new(publisher.clone());
439
440 let event = MemoryGraphEvent::NodeCreated {
441 node_id: NodeId::new(),
442 node_type: NodeType::Prompt,
443 session_id: None,
444 timestamp: Utc::now(),
445 metadata: HashMap::new(),
446 };
447
448 emitter.emit_sync(event).await.unwrap();
450
451 let stats = emitter.stats().await;
452 assert_eq!(stats.events_submitted, 1);
453 assert_eq!(stats.events_emitted, 1);
454
455 let published = publisher.get_events().await;
456 assert_eq!(published.len(), 1);
457 }
458
459 #[tokio::test]
460 async fn test_concurrent_emission() {
461 let publisher = Arc::new(InMemoryPublisher::new());
462 let emitter = AsyncEventEmitter::new(publisher.clone());
463
464 let mut handles = vec![];
465
466 for i in 0..50 {
467 let emitter_clone = emitter.clone();
468 let handle = tokio::spawn(async move {
469 let event = MemoryGraphEvent::QueryExecuted {
470 query_type: format!("query_{}", i),
471 results_count: i,
472 duration_ms: 10,
473 timestamp: Utc::now(),
474 };
475 emitter_clone.emit(event);
476 });
477 handles.push(handle);
478 }
479
480 for handle in handles {
481 handle.await.unwrap();
482 }
483
484 sleep(Duration::from_millis(200)).await;
486
487 let stats = emitter.stats().await;
488 assert_eq!(stats.events_submitted, 50);
489 assert_eq!(stats.events_emitted, 50);
490
491 let published = publisher.get_events().await;
492 assert_eq!(published.len(), 50);
493 }
494
495 #[tokio::test]
496 async fn test_stats_snapshot() {
497 let publisher = Arc::new(InMemoryPublisher::new());
498 let emitter = AsyncEventEmitter::new(publisher);
499
500 let event = MemoryGraphEvent::NodeCreated {
501 node_id: NodeId::new(),
502 node_type: NodeType::Prompt,
503 session_id: None,
504 timestamp: Utc::now(),
505 metadata: HashMap::new(),
506 };
507
508 emitter.emit(event);
509 sleep(Duration::from_millis(50)).await;
510
511 let stats = emitter.stats().await;
512 assert_eq!(stats.success_rate(), 100.0);
513 assert_eq!(stats.failure_rate(), 0.0);
514 }
515
516 #[tokio::test]
517 async fn test_reset_stats() {
518 let publisher = Arc::new(InMemoryPublisher::new());
519 let emitter = AsyncEventEmitter::new(publisher);
520
521 let event = MemoryGraphEvent::QueryExecuted {
522 query_type: "test".to_string(),
523 results_count: 1,
524 duration_ms: 10,
525 timestamp: Utc::now(),
526 };
527
528 emitter.emit(event);
529 sleep(Duration::from_millis(50)).await;
530
531 let stats_before = emitter.stats().await;
532 assert_eq!(stats_before.events_emitted, 1);
533
534 emitter.reset_stats().await;
535
536 let stats_after = emitter.stats().await;
537 assert_eq!(stats_after.events_submitted, 0);
538 assert_eq!(stats_after.events_emitted, 0);
539 assert_eq!(stats_after.events_failed, 0);
540 }
541
542 #[tokio::test]
543 async fn test_silent_emitter() {
544 let publisher = Arc::new(InMemoryPublisher::new());
545 let emitter = AsyncEventEmitter::new_silent(publisher.clone());
546
547 let event = MemoryGraphEvent::NodeCreated {
548 node_id: NodeId::new(),
549 node_type: NodeType::Prompt,
550 session_id: None,
551 timestamp: Utc::now(),
552 metadata: HashMap::new(),
553 };
554
555 emitter.emit(event);
557 sleep(Duration::from_millis(50)).await;
558
559 let published = publisher.get_events().await;
560 assert_eq!(published.len(), 1);
561 }
562
563 #[tokio::test]
564 async fn test_mixed_emit_modes() {
565 let publisher = Arc::new(InMemoryPublisher::new());
566 let emitter = AsyncEventEmitter::new(publisher.clone());
567
568 let event1 = MemoryGraphEvent::NodeCreated {
570 node_id: NodeId::new(),
571 node_type: NodeType::Prompt,
572 session_id: None,
573 timestamp: Utc::now(),
574 metadata: HashMap::new(),
575 };
576
577 let event2 = MemoryGraphEvent::QueryExecuted {
578 query_type: "test".to_string(),
579 results_count: 1,
580 duration_ms: 10,
581 timestamp: Utc::now(),
582 };
583
584 emitter.emit(event1);
585 emitter.emit_sync(event2).await.unwrap();
586
587 sleep(Duration::from_millis(50)).await;
588
589 let stats = emitter.stats().await;
590 assert_eq!(stats.events_submitted, 2);
591 assert_eq!(stats.events_emitted, 2);
592
593 let published = publisher.get_events().await;
594 assert_eq!(published.len(), 2);
595 }
596
597 #[tokio::test]
598 async fn test_high_throughput() {
599 let publisher = Arc::new(InMemoryPublisher::new());
600 let emitter = AsyncEventEmitter::new(publisher.clone());
601
602 for i in 0..1000 {
604 let event = MemoryGraphEvent::QueryExecuted {
605 query_type: format!("query_{}", i),
606 results_count: i,
607 duration_ms: 1,
608 timestamp: Utc::now(),
609 };
610 emitter.emit(event);
611 }
612
613 sleep(Duration::from_millis(500)).await;
615
616 let stats = emitter.stats().await;
617 assert_eq!(stats.events_submitted, 1000);
618 assert_eq!(stats.events_emitted, 1000);
619 assert_eq!(stats.events_failed, 0);
620
621 let published = publisher.get_events().await;
622 assert_eq!(published.len(), 1000);
623 }
624
625 #[tokio::test]
626 async fn test_success_failure_rates() {
627 let publisher = Arc::new(InMemoryPublisher::new());
628 let emitter = AsyncEventEmitter::new(publisher);
629
630 for _ in 0..10 {
632 let event = MemoryGraphEvent::NodeCreated {
633 node_id: NodeId::new(),
634 node_type: NodeType::Prompt,
635 session_id: None,
636 timestamp: Utc::now(),
637 metadata: HashMap::new(),
638 };
639 emitter.emit(event);
640 }
641
642 sleep(Duration::from_millis(100)).await;
643
644 let stats = emitter.stats().await;
645 assert_eq!(stats.success_rate(), 100.0);
646 assert_eq!(stats.failure_rate(), 0.0);
647 }
648}