llm_memory_graph/observatory/
emitter.rs

1//! Async event emitter for non-blocking event emission
2//!
3//! This module provides async event emission that doesn't block the main operation flow.
4//! Events are emitted in background tasks using `tokio::spawn`, ensuring that event
5//! publishing never delays critical operations.
6//!
7//! # Features
8//!
9//! - **Non-blocking**: Events are sent in background tasks
10//! - **Error resilience**: Emission errors don't affect main operations
11//! - **Statistics**: Track emission success/failure rates
12//! - **Graceful degradation**: Continues operating even if event system fails
13//!
14//! # Examples
15//!
16//! ```no_run
17//! use llm_memory_graph::observatory::{AsyncEventEmitter, InMemoryPublisher, MemoryGraphEvent};
18//! use std::sync::Arc;
19//!
20//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
21//! let publisher = Arc::new(InMemoryPublisher::new());
22//! let emitter = AsyncEventEmitter::new(publisher);
23//!
24//! // Emit event without blocking
25//! let event = MemoryGraphEvent::QueryExecuted {
26//!     query_type: "test".to_string(),
27//!     results_count: 10,
28//!     duration_ms: 50,
29//!     timestamp: chrono::Utc::now(),
30//! };
31//!
32//! emitter.emit(event);
33//!
34//! // Get statistics
35//! let stats = emitter.stats().await;
36//! println!("Emitted: {}, Failed: {}", stats.events_emitted, stats.events_failed);
37//! # Ok(())
38//! # }
39//! ```
40
41use super::events::MemoryGraphEvent;
42use super::publisher::EventPublisher;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::sync::Arc;
45use tokio::sync::RwLock;
46
47/// Async event emitter for non-blocking event emission
48///
49/// This struct wraps an EventPublisher and provides fire-and-forget
50/// emission semantics. Events are sent in background tasks using
51/// `tokio::spawn`, ensuring they never block the caller.
52#[derive(Clone)]
53pub struct AsyncEventEmitter<P: EventPublisher + 'static> {
54    /// The underlying event publisher
55    publisher: Arc<P>,
56    /// Statistics tracking
57    stats: Arc<EmissionStats>,
58    /// Whether to log errors
59    log_errors: bool,
60}
61
62impl<P: EventPublisher + 'static> AsyncEventEmitter<P> {
63    /// Create a new async event emitter
64    ///
65    /// # Arguments
66    ///
67    /// * `publisher` - The event publisher to use for sending events
68    ///
69    /// # Examples
70    ///
71    /// ```
72    /// use llm_memory_graph::observatory::{AsyncEventEmitter, InMemoryPublisher};
73    /// use std::sync::Arc;
74    ///
75    /// let publisher = Arc::new(InMemoryPublisher::new());
76    /// let emitter = AsyncEventEmitter::new(publisher);
77    /// ```
78    pub fn new(publisher: Arc<P>) -> Self {
79        Self {
80            publisher,
81            stats: Arc::new(EmissionStats::new()),
82            log_errors: true,
83        }
84    }
85
86    /// Create a new async event emitter without error logging
87    pub fn new_silent(publisher: Arc<P>) -> Self {
88        Self {
89            publisher,
90            stats: Arc::new(EmissionStats::new()),
91            log_errors: false,
92        }
93    }
94
95    /// Emit an event without blocking
96    ///
97    /// This method spawns a background task to publish the event and returns
98    /// immediately. Errors during emission are logged but don't affect the caller.
99    ///
100    /// # Arguments
101    ///
102    /// * `event` - The event to emit
103    ///
104    /// # Examples
105    ///
106    /// ```
107    /// # use llm_memory_graph::observatory::{AsyncEventEmitter, InMemoryPublisher, MemoryGraphEvent};
108    /// # use std::sync::Arc;
109    /// # async fn example() {
110    /// let publisher = Arc::new(InMemoryPublisher::new());
111    /// let emitter = AsyncEventEmitter::new(publisher);
112    ///
113    /// let event = MemoryGraphEvent::QueryExecuted {
114    ///     query_type: "test".to_string(),
115    ///     results_count: 10,
116    ///     duration_ms: 50,
117    ///     timestamp: chrono::Utc::now(),
118    /// };
119    ///
120    /// emitter.emit(event); // Returns immediately
121    /// # }
122    /// ```
123    pub fn emit(&self, event: MemoryGraphEvent) {
124        let publisher = Arc::clone(&self.publisher);
125        let stats = Arc::clone(&self.stats);
126        let log_errors = self.log_errors;
127
128        tokio::spawn(async move {
129            stats.inc_submitted();
130
131            match publisher.publish(event).await {
132                Ok(()) => {
133                    stats.inc_emitted();
134                }
135                Err(e) => {
136                    stats.inc_failed();
137                    if log_errors {
138                        tracing::warn!("Failed to emit event: {}", e);
139                    }
140                }
141            }
142        });
143    }
144
145    /// Emit multiple events without blocking
146    ///
147    /// # Arguments
148    ///
149    /// * `events` - Vector of events to emit
150    pub fn emit_batch(&self, events: Vec<MemoryGraphEvent>) {
151        let publisher = Arc::clone(&self.publisher);
152        let stats = Arc::clone(&self.stats);
153        let log_errors = self.log_errors;
154        let count = events.len() as u64;
155
156        tokio::spawn(async move {
157            stats.inc_submitted_by(count);
158
159            match publisher.publish_batch(events).await {
160                Ok(()) => {
161                    stats.inc_emitted_by(count);
162                }
163                Err(e) => {
164                    stats.inc_failed_by(count);
165                    if log_errors {
166                        tracing::warn!("Failed to emit event batch: {}", e);
167                    }
168                }
169            }
170        });
171    }
172
173    /// Emit an event and wait for completion
174    ///
175    /// Unlike `emit()`, this method waits for the event to be published
176    /// and returns any errors. Useful for testing and critical events.
177    ///
178    /// # Arguments
179    ///
180    /// * `event` - The event to emit
181    ///
182    /// # Returns
183    ///
184    /// Returns `Ok(())` if the event was successfully published
185    pub async fn emit_sync(&self, event: MemoryGraphEvent) -> crate::error::Result<()> {
186        self.stats.inc_submitted();
187
188        match self.publisher.publish(event).await {
189            Ok(()) => {
190                self.stats.inc_emitted();
191                Ok(())
192            }
193            Err(e) => {
194                self.stats.inc_failed();
195                if self.log_errors {
196                    tracing::warn!("Failed to emit event: {}", e);
197                }
198                Err(e)
199            }
200        }
201    }
202
203    /// Get emission statistics
204    ///
205    /// # Returns
206    ///
207    /// Returns a snapshot of current emission statistics
208    pub async fn stats(&self) -> EmissionStatsSnapshot {
209        self.stats.snapshot().await
210    }
211
212    /// Reset all statistics to zero
213    pub async fn reset_stats(&self) {
214        self.stats.reset().await;
215    }
216
217    /// Get the underlying publisher
218    pub fn publisher(&self) -> &Arc<P> {
219        &self.publisher
220    }
221}
222
223/// Statistics for event emission
224struct EmissionStats {
225    /// Total events submitted for emission
226    events_submitted: AtomicU64,
227    /// Total events successfully emitted
228    events_emitted: AtomicU64,
229    /// Total events that failed to emit
230    events_failed: AtomicU64,
231    /// Peak concurrent emissions (for monitoring)
232    peak_concurrent: RwLock<u64>,
233}
234
235impl EmissionStats {
236    fn new() -> Self {
237        Self {
238            events_submitted: AtomicU64::new(0),
239            events_emitted: AtomicU64::new(0),
240            events_failed: AtomicU64::new(0),
241            peak_concurrent: RwLock::new(0),
242        }
243    }
244
245    fn inc_submitted(&self) {
246        self.events_submitted.fetch_add(1, Ordering::Relaxed);
247    }
248
249    fn inc_submitted_by(&self, count: u64) {
250        self.events_submitted.fetch_add(count, Ordering::Relaxed);
251    }
252
253    fn inc_emitted(&self) {
254        self.events_emitted.fetch_add(1, Ordering::Relaxed);
255    }
256
257    fn inc_emitted_by(&self, count: u64) {
258        self.events_emitted.fetch_add(count, Ordering::Relaxed);
259    }
260
261    fn inc_failed(&self) {
262        self.events_failed.fetch_add(1, Ordering::Relaxed);
263    }
264
265    fn inc_failed_by(&self, count: u64) {
266        self.events_failed.fetch_add(count, Ordering::Relaxed);
267    }
268
269    async fn snapshot(&self) -> EmissionStatsSnapshot {
270        EmissionStatsSnapshot {
271            events_submitted: self.events_submitted.load(Ordering::Relaxed),
272            events_emitted: self.events_emitted.load(Ordering::Relaxed),
273            events_failed: self.events_failed.load(Ordering::Relaxed),
274            peak_concurrent: *self.peak_concurrent.read().await,
275        }
276    }
277
278    async fn reset(&self) {
279        self.events_submitted.store(0, Ordering::Relaxed);
280        self.events_emitted.store(0, Ordering::Relaxed);
281        self.events_failed.store(0, Ordering::Relaxed);
282        *self.peak_concurrent.write().await = 0;
283    }
284}
285
/// Point-in-time copy of the emitter's statistics
///
/// A snapshot is plain data: it does not change after it has been taken.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmissionStatsSnapshot {
    /// Number of events that were submitted for emission
    pub events_submitted: u64,
    /// Number of events that were published successfully
    pub events_emitted: u64,
    /// Number of events whose publication failed
    pub events_failed: u64,
    /// Highest number of concurrent emissions recorded
    pub peak_concurrent: u64,
}

impl EmissionStatsSnapshot {
    /// Share of submitted events that were emitted, as a percentage.
    ///
    /// Returns `100.0` when nothing has been submitted yet.
    pub fn success_rate(&self) -> f64 {
        self.percent_of_submitted(self.events_emitted, 100.0)
    }

    /// Share of submitted events that failed, as a percentage.
    ///
    /// Returns `0.0` when nothing has been submitted yet.
    pub fn failure_rate(&self) -> f64 {
        self.percent_of_submitted(self.events_failed, 0.0)
    }

    /// Express `part` as a percentage of the submitted events, falling back
    /// to `when_idle` when no event has been submitted (avoids dividing by
    /// zero).
    fn percent_of_submitted(&self, part: u64, when_idle: f64) -> f64 {
        match self.events_submitted {
            0 => when_idle,
            total => (part as f64 / total as f64) * 100.0,
        }
    }
}
318
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observatory::publisher::InMemoryPublisher;
    use crate::types::{NodeId, NodeType, SessionId};
    use chrono::Utc;
    use std::collections::HashMap;
    use tokio::time::{sleep, Duration};

    /// Upper bound on how long a test waits for background emissions.
    const SETTLE_TIMEOUT: Duration = Duration::from_secs(5);
    /// Pause between two looks at the statistics while waiting.
    const POLL_INTERVAL: Duration = Duration::from_millis(5);

    /// Wait until at least `expected` events have finished (emitted or failed).
    ///
    /// Polling the counters replaces fixed sleeps: the tests return as soon as
    /// the background tasks are done and do not fail just because a loaded
    /// machine needed longer than a hard-coded delay. Panics after
    /// `SETTLE_TIMEOUT` so a genuinely stuck emission still fails the test.
    async fn wait_for_settled<P: EventPublisher + 'static>(
        emitter: &AsyncEventEmitter<P>,
        expected: u64,
    ) {
        let deadline = tokio::time::Instant::now() + SETTLE_TIMEOUT;
        loop {
            let stats = emitter.stats().await;
            if stats.events_emitted + stats.events_failed >= expected {
                return;
            }
            if tokio::time::Instant::now() >= deadline {
                panic!(
                    "timed out waiting for {} emissions to settle; stats: {:?}",
                    expected, stats
                );
            }
            sleep(POLL_INTERVAL).await;
        }
    }

    /// Build a `NodeCreated` event with a fresh node id and empty metadata.
    fn node_created(node_type: NodeType, session_id: Option<SessionId>) -> MemoryGraphEvent {
        MemoryGraphEvent::NodeCreated {
            node_id: NodeId::new(),
            node_type,
            session_id,
            timestamp: Utc::now(),
            metadata: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn test_emitter_creation() {
        let publisher = Arc::new(InMemoryPublisher::new());
        let emitter = AsyncEventEmitter::new(Arc::clone(&publisher));

        let stats = emitter.stats().await;
        assert_eq!(stats.events_submitted, 0);
        assert_eq!(stats.events_emitted, 0);
        assert_eq!(stats.events_failed, 0);
    }

    #[tokio::test]
    async fn test_emit_single_event() {
        let publisher = Arc::new(InMemoryPublisher::new());
        let emitter = AsyncEventEmitter::new(Arc::clone(&publisher));

        emitter.emit(node_created(NodeType::Prompt, Some(SessionId::new())));

        wait_for_settled(&emitter, 1).await;

        let stats = emitter.stats().await;
        assert_eq!(stats.events_submitted, 1);
        assert_eq!(stats.events_emitted, 1);
        assert_eq!(stats.events_failed, 0);

        // Verify event was published
        let published = publisher.get_events().await;
        assert_eq!(published.len(), 1);
    }

    #[tokio::test]
    async fn test_emit_multiple_events() {
        let publisher = Arc::new(InMemoryPublisher::new());
        let emitter = AsyncEventEmitter::new(Arc::clone(&publisher));

        for _ in 0..10 {
            let event = MemoryGraphEvent::QueryExecuted {
                query_type: "test".to_string(),
                results_count: 5,
                duration_ms: 10,
                timestamp: Utc::now(),
            };
            emitter.emit(event);
        }

        wait_for_settled(&emitter, 10).await;

        let stats = emitter.stats().await;
        assert_eq!(stats.events_submitted, 10);
        assert_eq!(stats.events_emitted, 10);
        assert_eq!(stats.events_failed, 0);

        let published = publisher.get_events().await;
        assert_eq!(published.len(), 10);
    }

    #[tokio::test]
    async fn test_emit_batch() {
        let publisher = Arc::new(InMemoryPublisher::new());
        let emitter = AsyncEventEmitter::new(Arc::clone(&publisher));

        let events = vec![
            node_created(NodeType::Prompt, None),
            node_created(NodeType::Response, None),
            MemoryGraphEvent::QueryExecuted {
                query_type: "batch".to_string(),
                results_count: 2,
                duration_ms: 15,
                timestamp: Utc::now(),
            },
        ];

        emitter.emit_batch(events);

        wait_for_settled(&emitter, 3).await;

        let stats = emitter.stats().await;
        assert_eq!(stats.events_submitted, 3);
        assert_eq!(stats.events_emitted, 3);

        let published = publisher.get_events().await;
        assert_eq!(published.len(), 3);
    }

    #[tokio::test]
    async fn test_emit_sync() {
        let publisher = Arc::new(InMemoryPublisher::new());
        let emitter = AsyncEventEmitter::new(Arc::clone(&publisher));

        // `emit_sync` only returns once the publisher is done, so no waiting
        // is needed before the assertions.
        emitter
            .emit_sync(node_created(NodeType::Prompt, None))
            .await
            .unwrap();

        let stats = emitter.stats().await;
        assert_eq!(stats.events_submitted, 1);
        assert_eq!(stats.events_emitted, 1);

        let published = publisher.get_events().await;
        assert_eq!(published.len(), 1);
    }

    #[tokio::test]
    async fn test_concurrent_emission() {
        let publisher = Arc::new(InMemoryPublisher::new());
        let emitter = AsyncEventEmitter::new(Arc::clone(&publisher));

        let mut handles = vec![];

        for i in 0..50 {
            let emitter_clone = emitter.clone();
            let handle = tokio::spawn(async move {
                let event = MemoryGraphEvent::QueryExecuted {
                    query_type: format!("query_{}", i),
                    results_count: i,
                    duration_ms: 10,
                    timestamp: Utc::now(),
                };
                emitter_clone.emit(event);
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        wait_for_settled(&emitter, 50).await;

        let stats = emitter.stats().await;
        assert_eq!(stats.events_submitted, 50);
        assert_eq!(stats.events_emitted, 50);

        let published = publisher.get_events().await;
        assert_eq!(published.len(), 50);
    }

    #[tokio::test]
    async fn test_stats_snapshot() {
        let publisher = Arc::new(InMemoryPublisher::new());
        let emitter = AsyncEventEmitter::new(publisher);

        emitter.emit(node_created(NodeType::Prompt, None));
        wait_for_settled(&emitter, 1).await;

        let stats = emitter.stats().await;
        assert_eq!(stats.success_rate(), 100.0);
        assert_eq!(stats.failure_rate(), 0.0);
    }

    #[tokio::test]
    async fn test_reset_stats() {
        let publisher = Arc::new(InMemoryPublisher::new());
        let emitter = AsyncEventEmitter::new(publisher);

        let event = MemoryGraphEvent::QueryExecuted {
            query_type: "test".to_string(),
            results_count: 1,
            duration_ms: 10,
            timestamp: Utc::now(),
        };

        emitter.emit(event);
        wait_for_settled(&emitter, 1).await;

        let stats_before = emitter.stats().await;
        assert_eq!(stats_before.events_emitted, 1);

        emitter.reset_stats().await;

        let stats_after = emitter.stats().await;
        assert_eq!(stats_after.events_submitted, 0);
        assert_eq!(stats_after.events_emitted, 0);
        assert_eq!(stats_after.events_failed, 0);
    }

    #[tokio::test]
    async fn test_silent_emitter() {
        let publisher = Arc::new(InMemoryPublisher::new());
        let emitter = AsyncEventEmitter::new_silent(Arc::clone(&publisher));

        // Should emit without logging errors
        emitter.emit(node_created(NodeType::Prompt, None));
        wait_for_settled(&emitter, 1).await;

        let published = publisher.get_events().await;
        assert_eq!(published.len(), 1);
    }

    #[tokio::test]
    async fn test_mixed_emit_modes() {
        let publisher = Arc::new(InMemoryPublisher::new());
        let emitter = AsyncEventEmitter::new(Arc::clone(&publisher));

        // Mix async and sync emissions
        let event1 = node_created(NodeType::Prompt, None);

        let event2 = MemoryGraphEvent::QueryExecuted {
            query_type: "test".to_string(),
            results_count: 1,
            duration_ms: 10,
            timestamp: Utc::now(),
        };

        emitter.emit(event1);
        emitter.emit_sync(event2).await.unwrap();

        wait_for_settled(&emitter, 2).await;

        let stats = emitter.stats().await;
        assert_eq!(stats.events_submitted, 2);
        assert_eq!(stats.events_emitted, 2);

        let published = publisher.get_events().await;
        assert_eq!(published.len(), 2);
    }

    #[tokio::test]
    async fn test_high_throughput() {
        let publisher = Arc::new(InMemoryPublisher::new());
        let emitter = AsyncEventEmitter::new(Arc::clone(&publisher));

        // Emit 1000 events rapidly
        for i in 0..1000 {
            let event = MemoryGraphEvent::QueryExecuted {
                query_type: format!("query_{}", i),
                results_count: i,
                duration_ms: 1,
                timestamp: Utc::now(),
            };
            emitter.emit(event);
        }

        wait_for_settled(&emitter, 1000).await;

        let stats = emitter.stats().await;
        assert_eq!(stats.events_submitted, 1000);
        assert_eq!(stats.events_emitted, 1000);
        assert_eq!(stats.events_failed, 0);

        let published = publisher.get_events().await;
        assert_eq!(published.len(), 1000);
    }

    #[tokio::test]
    async fn test_success_failure_rates() {
        let publisher = Arc::new(InMemoryPublisher::new());
        let emitter = AsyncEventEmitter::new(publisher);

        // All events should succeed with InMemoryPublisher
        for _ in 0..10 {
            emitter.emit(node_created(NodeType::Prompt, None));
        }

        wait_for_settled(&emitter, 10).await;

        let stats = emitter.stats().await;
        assert_eq!(stats.success_rate(), 100.0);
        assert_eq!(stats.failure_rate(), 0.0);
    }
}