// File: oxirs_stream/observability.rs
//! # Advanced Observability and Telemetry for OxiRS Stream
//!
//! Comprehensive monitoring, metrics collection, distributed tracing, and
//! observability features for production deployment of OxiRS streaming systems.

6use anyhow::Result;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::{broadcast, RwLock};
13#[cfg(feature = "opentelemetry")]
14use tracing::info;
15use tracing::{debug, error, warn};
16use uuid::Uuid;
17
18// OpenTelemetry imports for enhanced observability (simplified for now)
19// Full OpenTelemetry integration can be enabled when dependencies are stable
20#[cfg(feature = "opentelemetry")]
21use opentelemetry::global::BoxedTracer;
22
23use crate::StreamEvent;
24
/// Comprehensive telemetry configuration.
///
/// Controls tracing, metrics collection, and profiling behavior for the
/// streaming observability subsystem. See the `Default` impl for the
/// production-oriented defaults.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelemetryConfig {
    /// Enable OpenTelemetry integration (effective only when the crate is
    /// compiled with the `opentelemetry` feature).
    pub enable_opentelemetry: bool,
    /// Jaeger endpoint for distributed tracing
    pub jaeger_endpoint: Option<String>,
    /// Prometheus metrics endpoint
    pub prometheus_endpoint: Option<String>,
    /// Custom metrics collection interval
    pub metrics_interval: Duration,
    /// Enable detailed performance profiling
    pub enable_profiling: bool,
    /// Sampling rate for traces (0.0 to 1.0)
    pub trace_sampling_rate: f64,
    /// Enable custom business metrics
    pub enable_business_metrics: bool,
    /// Maximum number of spans to keep in memory; spans started beyond this
    /// limit are not tracked (see `StreamObservability::start_span`).
    pub max_spans_in_memory: usize,
}
45
46impl Default for TelemetryConfig {
47    fn default() -> Self {
48        Self {
49            enable_opentelemetry: true,
50            jaeger_endpoint: Some("http://localhost:14268/api/traces".to_string()),
51            prometheus_endpoint: Some("http://localhost:9090".to_string()),
52            metrics_interval: Duration::from_secs(30),
53            enable_profiling: false,
54            trace_sampling_rate: 0.1,
55            enable_business_metrics: true,
56            max_spans_in_memory: 10000,
57        }
58    }
59}
60
/// Detailed performance metrics for streaming operations.
///
/// A single point-in-time sample; `StreamObservability` keeps the latest
/// sample plus a bounded history for trend analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingMetrics {
    /// Events processed per second
    pub events_per_second: f64,
    /// Average processing latency in milliseconds
    pub avg_latency_ms: f64,
    /// P95 latency in milliseconds
    /// (NOTE(review): maintained as a running maximum, not a true percentile —
    /// see `record_event`.)
    pub p95_latency_ms: f64,
    /// P99 latency in milliseconds (same running-maximum approximation)
    pub p99_latency_ms: f64,
    /// Memory usage in megabytes
    pub memory_usage_mb: f64,
    /// CPU usage percentage
    pub cpu_usage_percent: f64,
    /// Network I/O bytes per second
    pub network_io_bps: f64,
    /// Error rate as percentage
    pub error_rate_percent: f64,
    /// Queue depth/backlog size
    pub queue_depth: u64,
    /// Active connections count
    pub active_connections: u64,
    /// Timestamp of metrics collection
    pub timestamp: DateTime<Utc>,
}
87
88impl Default for StreamingMetrics {
89    fn default() -> Self {
90        Self {
91            events_per_second: 0.0,
92            avg_latency_ms: 0.0,
93            p95_latency_ms: 0.0,
94            p99_latency_ms: 0.0,
95            memory_usage_mb: 0.0,
96            cpu_usage_percent: 0.0,
97            network_io_bps: 0.0,
98            error_rate_percent: 0.0,
99            queue_depth: 0,
100            active_connections: 0,
101            timestamp: Utc::now(),
102        }
103    }
104}
105
/// Business-level streaming metrics.
///
/// Cumulative counters since startup plus health/quality scores; updated by
/// `StreamObservability::record_event` and `record_error`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BusinessMetrics {
    /// Total events processed since startup
    pub total_events_processed: u64,
    /// Total events failed since startup
    pub total_events_failed: u64,
    /// Revenue-related events count (subject IRI contains "revenue"/"order")
    pub revenue_events_count: u64,
    /// Customer interaction events count (subject IRI contains "customer")
    pub customer_events_count: u64,
    /// System health score (0.0 to 1.0)
    pub health_score: f64,
    /// Data quality score (0.0 to 1.0)
    pub data_quality_score: f64,
    /// Custom business metrics, keyed by metric name
    pub custom_metrics: HashMap<String, f64>,
}
124
125impl Default for BusinessMetrics {
126    fn default() -> Self {
127        Self {
128            total_events_processed: 0,
129            total_events_failed: 0,
130            revenue_events_count: 0,
131            customer_events_count: 0,
132            health_score: 1.0,
133            data_quality_score: 1.0,
134            custom_metrics: HashMap::new(),
135        }
136    }
137}
138
/// Distributed tracing span information.
///
/// In-memory representation of one traced operation; lives in
/// `StreamObservability::active_spans` from `start_span` until `finish_span`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceSpan {
    /// Unique span identifier (UUID v4)
    pub span_id: String,
    /// Parent span identifier if any
    pub parent_span_id: Option<String>,
    /// Trace identifier shared by all spans of one logical request
    pub trace_id: String,
    /// Operation name
    pub operation_name: String,
    /// Span start time
    pub start_time: DateTime<Utc>,
    /// Span duration; `None` until the span is finished
    pub duration: Option<Duration>,
    /// Span tags/attributes
    pub tags: HashMap<String, String>,
    /// Span logs/events
    pub logs: Vec<SpanLog>,
    /// Span status
    pub status: SpanStatus,
}
161
/// Span log entry: a timestamped message attached to a span via
/// `StreamObservability::add_span_log`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpanLog {
    /// When the log entry was recorded.
    pub timestamp: DateTime<Utc>,
    /// Free-form level string (e.g. "info"); not validated.
    pub level: String,
    /// Log message text.
    pub message: String,
    /// Additional structured fields (currently always empty when created via
    /// `add_span_log`).
    pub fields: HashMap<String, String>,
}
170
/// Span execution status, set when a span is finished.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SpanStatus {
    /// Operation completed successfully.
    Ok,
    /// Operation failed; carries a human-readable description.
    Error { message: String },
    /// Operation exceeded its time budget.
    Timeout,
    /// Operation was cancelled before completion.
    Cancelled,
}
179
/// Alert configuration and management.
///
/// Thresholds are evaluated in `check_and_trigger_alerts`; the cooldown
/// prevents the same alert type from firing repeatedly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
    /// Enable alerting system
    pub enabled: bool,
    /// Latency threshold for alerts (milliseconds)
    pub latency_threshold_ms: f64,
    /// Error rate threshold for alerts (percentage)
    pub error_rate_threshold_percent: f64,
    /// Memory usage threshold for alerts (percentage)
    pub memory_threshold_percent: f64,
    /// Queue depth threshold for alerts
    pub queue_depth_threshold: u64,
    /// Alert notification endpoints
    /// (NOTE(review): stored but not yet used for delivery anywhere in this
    /// file — alerts are only broadcast in-process.)
    pub notification_endpoints: Vec<String>,
    /// Alert cooldown period to prevent spam
    pub cooldown_period: Duration,
}
198
199impl Default for AlertConfig {
200    fn default() -> Self {
201        Self {
202            enabled: true,
203            latency_threshold_ms: 100.0,
204            error_rate_threshold_percent: 5.0,
205            memory_threshold_percent: 80.0,
206            queue_depth_threshold: 10000,
207            notification_endpoints: vec!["http://localhost:9093/api/v1/alerts".to_string()],
208            cooldown_period: Duration::from_secs(300),
209        }
210    }
211}
212
/// Advanced observability system for streaming operations.
///
/// All mutable state sits behind `Arc<RwLock<…>>` so a single instance can be
/// shared across tasks; alerts fan out through a broadcast channel.
pub struct StreamObservability {
    /// Telemetry settings (sampling rate, span limits, endpoints).
    config: TelemetryConfig,
    /// Alert thresholds and cooldown configuration.
    alert_config: AlertConfig,
    /// Latest performance metrics sample.
    streaming_metrics: Arc<RwLock<StreamingMetrics>>,
    /// Cumulative business-level counters and scores.
    business_metrics: Arc<RwLock<BusinessMetrics>>,
    /// Spans started but not yet finished, keyed by span id.
    active_spans: Arc<RwLock<HashMap<String, TraceSpan>>>,
    /// Rolling history of metrics samples (bounded to 1000 entries).
    metrics_history: Arc<RwLock<Vec<StreamingMetrics>>>,
    /// Broadcast channel delivering alerts to subscribers.
    alert_sender: broadcast::Sender<AlertEvent>,
    /// Last firing time per alert type, used to enforce the cooldown period.
    last_alert_times: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
    /// OpenTelemetry tracer for enhanced distributed tracing
    #[cfg(feature = "opentelemetry")]
    tracer: Option<Arc<BoxedTracer>>,
    // Unit placeholder so construction code can assign `tracer` uniformly
    // when the feature is disabled.
    #[cfg(not(feature = "opentelemetry"))]
    tracer: Option<()>,
}
229
/// Alert event structure delivered to `subscribe_to_alerts` receivers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertEvent {
    /// Unique alert identifier (UUID v4).
    pub alert_id: String,
    /// Which threshold category fired.
    pub alert_type: AlertType,
    /// How severe the condition is.
    pub severity: AlertSeverity,
    /// Human-readable description including the offending values.
    pub message: String,
    /// Observed metric value that triggered the alert.
    pub metric_value: f64,
    /// Configured threshold the value exceeded.
    pub threshold: f64,
    /// When the alert fired.
    pub timestamp: DateTime<Utc>,
    /// Extra key/value context (currently always empty at creation).
    pub metadata: HashMap<String, String>,
}
242
/// Types of alerts that can be triggered.
///
/// The `Debug` representation is used as the cooldown key in
/// `trigger_alert`, so each variant (including each distinct `Custom` string)
/// cools down independently.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertType {
    HighLatency,
    HighErrorRate,
    HighMemoryUsage,
    QueueBacklog,
    ConnectionFailure,
    DataQualityIssue,
    /// User-defined alert category.
    Custom(String),
}
254
/// Alert severity levels, in increasing order of urgency.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
    Emergency,
}
263
264impl StreamObservability {
    /// Create a new observability system.
    ///
    /// Sets up the alert broadcast channel (capacity 1000) and, when the
    /// `opentelemetry` feature is compiled in and `config.enable_opentelemetry`
    /// is set, attempts to initialize the Jaeger tracer. Tracer setup failures
    /// are swallowed via `.ok()` so observability never blocks startup.
    pub fn new(config: TelemetryConfig, alert_config: AlertConfig) -> Self {
        let (alert_sender, _) = broadcast::channel(1000);

        // Initialize OpenTelemetry tracer if enabled
        #[cfg(feature = "opentelemetry")]
        let tracer = if config.enable_opentelemetry {
            Self::setup_jaeger_tracer(&config).ok()
        } else {
            None
        };
        // Without the feature the flag cannot be honored; warn so the
        // misconfiguration is visible.
        #[cfg(not(feature = "opentelemetry"))]
        let tracer = if config.enable_opentelemetry {
            warn!("OpenTelemetry requested but feature not enabled. Compile with --features opentelemetry");
            None
        } else {
            None
        };

        Self {
            config,
            alert_config,
            streaming_metrics: Arc::new(RwLock::new(StreamingMetrics::default())),
            business_metrics: Arc::new(RwLock::new(BusinessMetrics::default())),
            active_spans: Arc::new(RwLock::new(HashMap::new())),
            metrics_history: Arc::new(RwLock::new(Vec::new())),
            alert_sender,
            last_alert_times: Arc::new(RwLock::new(HashMap::new())),
            tracer,
        }
    }
296
    /// Set up Jaeger tracer for distributed tracing.
    ///
    /// Currently a placeholder: it returns the global no-op tracer and only
    /// logs the configured endpoint. Full OpenTelemetry export will be wired
    /// in once the dependency stack is stable.
    #[cfg(feature = "opentelemetry")]
    fn setup_jaeger_tracer(config: &TelemetryConfig) -> Result<Arc<BoxedTracer>> {
        // For now, return a placeholder implementation
        // Full OpenTelemetry integration will be implemented when dependencies are stable
        warn!("OpenTelemetry Jaeger integration is disabled pending dependency stability");

        if let Some(jaeger_endpoint) = &config.jaeger_endpoint {
            info!("Jaeger endpoint configured: {}", jaeger_endpoint);
        }

        // Return a no-op tracer for now
        let tracer = opentelemetry::global::tracer("oxirs-stream");
        Ok(Arc::new(tracer))
    }
312
313    /// Start a new distributed trace span
314    pub async fn start_span(&self, operation_name: &str, parent_span_id: Option<String>) -> String {
315        let span_id = Uuid::new_v4().to_string();
316        let trace_id = parent_span_id
317            .clone()
318            .unwrap_or_else(|| Uuid::new_v4().to_string());
319
320        let span = TraceSpan {
321            span_id: span_id.clone(),
322            parent_span_id,
323            trace_id,
324            operation_name: operation_name.to_string(),
325            start_time: Utc::now(),
326            duration: None,
327            tags: HashMap::new(),
328            logs: Vec::new(),
329            status: SpanStatus::Ok,
330        };
331
332        // Add span to active spans if we haven't exceeded the limit
333        {
334            let mut active_spans = self.active_spans.write().await;
335            if active_spans.len() < self.config.max_spans_in_memory {
336                active_spans.insert(span_id.clone(), span);
337            }
338        }
339
340        debug!(
341            "Started span: {} for operation: {}",
342            span_id, operation_name
343        );
344        span_id
345    }
346
347    /// Finish a trace span
348    pub async fn finish_span(&self, span_id: &str, status: SpanStatus) -> Result<()> {
349        let mut active_spans = self.active_spans.write().await;
350
351        if let Some(mut span) = active_spans.remove(span_id) {
352            span.duration = Some(Utc::now().signed_duration_since(span.start_time).to_std()?);
353            span.status = status;
354
355            debug!(
356                "Finished span: {} with duration: {:?}",
357                span_id, span.duration
358            );
359
360            // In a real implementation, you'd send this to your tracing backend
361            if self.config.enable_opentelemetry {
362                self.export_span_to_jaeger(&span).await?;
363            }
364        }
365
366        Ok(())
367    }
368
369    /// Add a tag to an active span
370    pub async fn add_span_tag(&self, span_id: &str, key: &str, value: &str) -> Result<()> {
371        let mut active_spans = self.active_spans.write().await;
372
373        if let Some(span) = active_spans.get_mut(span_id) {
374            span.tags.insert(key.to_string(), value.to_string());
375        }
376
377        Ok(())
378    }
379
380    /// Add a log entry to an active span
381    pub async fn add_span_log(&self, span_id: &str, level: &str, message: &str) -> Result<()> {
382        let mut active_spans = self.active_spans.write().await;
383
384        if let Some(span) = active_spans.get_mut(span_id) {
385            span.logs.push(SpanLog {
386                timestamp: Utc::now(),
387                level: level.to_string(),
388                message: message.to_string(),
389                fields: HashMap::new(),
390            });
391        }
392
393        Ok(())
394    }
395
    /// Record a streaming event for metrics collection.
    ///
    /// Updates latency statistics, business counters (when enabled), and then
    /// re-evaluates alert thresholds.
    ///
    /// NOTE(review): the latency statistics below are deliberate
    /// approximations — avg is an exponential moving average with alpha 0.5,
    /// and p95/p99 are running maxima (high-watermarks), not true percentiles.
    /// `events_per_second` is not updated here.
    pub async fn record_event(
        &self,
        event: &StreamEvent,
        processing_duration: Duration,
    ) -> Result<()> {
        let processing_time_ms = processing_duration.as_millis() as f64;

        // Update streaming metrics
        {
            let mut metrics = self.streaming_metrics.write().await;

            // Simple moving average for latency (EMA, alpha = 0.5)
            metrics.avg_latency_ms = (metrics.avg_latency_ms + processing_time_ms) / 2.0;

            // Update P95/P99 approximation (simplified: running maximum only,
            // never decays)
            if processing_time_ms > metrics.p95_latency_ms {
                metrics.p95_latency_ms = processing_time_ms;
            }
            if processing_time_ms > metrics.p99_latency_ms {
                metrics.p99_latency_ms = processing_time_ms;
            }

            metrics.timestamp = Utc::now();
        }

        // Update business metrics if enabled
        if self.config.enable_business_metrics {
            let mut business_metrics = self.business_metrics.write().await;
            business_metrics.total_events_processed += 1;

            // Classify business events based on event type — a substring
            // heuristic over the subject IRI.
            if let StreamEvent::TripleAdded { subject, .. } = event {
                if subject.contains("customer") {
                    business_metrics.customer_events_count += 1;
                } else if subject.contains("revenue") || subject.contains("order") {
                    business_metrics.revenue_events_count += 1;
                }
            }
        }

        // Check for alerts
        self.check_and_trigger_alerts().await?;

        Ok(())
    }
442
443    /// Record an error for metrics and alerting
444    pub async fn record_error(&self, error: &anyhow::Error, context: &str) -> Result<()> {
445        error!("Streaming error in {}: {}", context, error);
446
447        // Update error metrics
448        {
449            let mut business_metrics = self.business_metrics.write().await;
450            business_metrics.total_events_failed += 1;
451
452            // Calculate error rate
453            let total_events =
454                business_metrics.total_events_processed + business_metrics.total_events_failed;
455            if total_events > 0 {
456                let error_rate =
457                    (business_metrics.total_events_failed as f64 / total_events as f64) * 100.0;
458
459                let mut streaming_metrics = self.streaming_metrics.write().await;
460                streaming_metrics.error_rate_percent = error_rate;
461            }
462        }
463
464        // Trigger error rate alert if necessary
465        self.check_and_trigger_alerts().await?;
466
467        Ok(())
468    }
469
470    /// Update system resource metrics
471    pub async fn update_system_metrics(
472        &self,
473        memory_mb: f64,
474        cpu_percent: f64,
475        network_bps: f64,
476    ) -> Result<()> {
477        let mut metrics = self.streaming_metrics.write().await;
478        metrics.memory_usage_mb = memory_mb;
479        metrics.cpu_usage_percent = cpu_percent;
480        metrics.network_io_bps = network_bps;
481        metrics.timestamp = Utc::now();
482
483        // Add to history for trend analysis
484        {
485            let mut history = self.metrics_history.write().await;
486            history.push(metrics.clone());
487
488            // Keep only last 1000 entries
489            if history.len() > 1000 {
490                history.remove(0);
491            }
492        }
493
494        Ok(())
495    }
496
    /// Get a snapshot (clone) of the current streaming metrics.
    pub async fn get_streaming_metrics(&self) -> StreamingMetrics {
        self.streaming_metrics.read().await.clone()
    }

    /// Get a snapshot (clone) of the current business metrics.
    pub async fn get_business_metrics(&self) -> BusinessMetrics {
        self.business_metrics.read().await.clone()
    }

    /// Get the metrics history (up to the last 1000 samples) for trend analysis.
    pub async fn get_metrics_history(&self) -> Vec<StreamingMetrics> {
        self.metrics_history.read().await.clone()
    }

    /// Subscribe to alert notifications. Each receiver observes alerts sent
    /// after the subscription is created (broadcast channel, capacity 1000).
    pub fn subscribe_to_alerts(&self) -> broadcast::Receiver<AlertEvent> {
        self.alert_sender.subscribe()
    }
516
517    /// Check metrics against thresholds and trigger alerts
518    async fn check_and_trigger_alerts(&self) -> Result<()> {
519        if !self.alert_config.enabled {
520            return Ok(());
521        }
522
523        let metrics = self.streaming_metrics.read().await;
524        let _now = Utc::now();
525
526        // Check latency threshold
527        if metrics.avg_latency_ms > self.alert_config.latency_threshold_ms {
528            self.trigger_alert(
529                AlertType::HighLatency,
530                AlertSeverity::Warning,
531                &format!(
532                    "Average latency ({:.2}ms) exceeds threshold ({:.2}ms)",
533                    metrics.avg_latency_ms, self.alert_config.latency_threshold_ms
534                ),
535                metrics.avg_latency_ms,
536                self.alert_config.latency_threshold_ms,
537            )
538            .await?;
539        }
540
541        // Check error rate threshold
542        if metrics.error_rate_percent > self.alert_config.error_rate_threshold_percent {
543            self.trigger_alert(
544                AlertType::HighErrorRate,
545                AlertSeverity::Critical,
546                &format!(
547                    "Error rate ({:.2}%) exceeds threshold ({:.2}%)",
548                    metrics.error_rate_percent, self.alert_config.error_rate_threshold_percent
549                ),
550                metrics.error_rate_percent,
551                self.alert_config.error_rate_threshold_percent,
552            )
553            .await?;
554        }
555
556        // Check memory usage threshold
557        let memory_percent = (metrics.memory_usage_mb / 1024.0) * 100.0; // Rough approximation
558        if memory_percent > self.alert_config.memory_threshold_percent {
559            self.trigger_alert(
560                AlertType::HighMemoryUsage,
561                AlertSeverity::Warning,
562                &format!(
563                    "Memory usage ({:.2}%) exceeds threshold ({:.2}%)",
564                    memory_percent, self.alert_config.memory_threshold_percent
565                ),
566                memory_percent,
567                self.alert_config.memory_threshold_percent,
568            )
569            .await?;
570        }
571
572        // Check queue depth threshold
573        if metrics.queue_depth > self.alert_config.queue_depth_threshold {
574            self.trigger_alert(
575                AlertType::QueueBacklog,
576                AlertSeverity::Critical,
577                &format!(
578                    "Queue depth ({}) exceeds threshold ({})",
579                    metrics.queue_depth, self.alert_config.queue_depth_threshold
580                ),
581                metrics.queue_depth as f64,
582                self.alert_config.queue_depth_threshold as f64,
583            )
584            .await?;
585        }
586
587        Ok(())
588    }
589
590    /// Trigger an alert if cooldown period has passed
591    async fn trigger_alert(
592        &self,
593        alert_type: AlertType,
594        severity: AlertSeverity,
595        message: &str,
596        metric_value: f64,
597        threshold: f64,
598    ) -> Result<()> {
599        let alert_key = format!("{alert_type:?}");
600        let now = Utc::now();
601
602        // Check cooldown period
603        {
604            let last_alert_times = self.last_alert_times.read().await;
605            if let Some(last_time) = last_alert_times.get(&alert_key) {
606                if now.signed_duration_since(*last_time).to_std()?
607                    < self.alert_config.cooldown_period
608                {
609                    return Ok(()); // Still in cooldown
610                }
611            }
612        }
613
614        // Update last alert time
615        {
616            let mut last_alert_times = self.last_alert_times.write().await;
617            last_alert_times.insert(alert_key, now);
618        }
619
620        // Create and send alert
621        let alert = AlertEvent {
622            alert_id: Uuid::new_v4().to_string(),
623            alert_type,
624            severity,
625            message: message.to_string(),
626            metric_value,
627            threshold,
628            timestamp: now,
629            metadata: HashMap::new(),
630        };
631
632        // Send alert to subscribers
633        let _ = self.alert_sender.send(alert.clone());
634
635        warn!("Alert triggered: {} - {}", alert.alert_id, alert.message);
636
637        Ok(())
638    }
639
    /// Export span to Jaeger using OpenTelemetry tracer.
    ///
    /// Currently a no-op in both build configurations: with the
    /// `opentelemetry` feature it only logs (pending a stable dependency
    /// stack); without it, it logs that export is skipped. Always returns Ok.
    async fn export_span_to_jaeger(&self, span: &TraceSpan) -> Result<()> {
        #[cfg(feature = "opentelemetry")]
        {
            if let Some(_tracer) = &self.tracer {
                debug!(
                    "OpenTelemetry span export is disabled pending dependency stability. Span: {}",
                    span.span_id
                );
                // Full OpenTelemetry integration will be implemented when dependencies are stable
            } else if let Some(jaeger_endpoint) = &self.config.jaeger_endpoint {
                debug!(
                    "Tracer not initialized, skipping span export to {}",
                    jaeger_endpoint
                );
            }
        }

        #[cfg(not(feature = "opentelemetry"))]
        {
            if let Some(jaeger_endpoint) = &self.config.jaeger_endpoint {
                debug!(
                    "OpenTelemetry feature not enabled, skipping span export to {}. Span: {}",
                    jaeger_endpoint, span.span_id
                );
            }
        }

        Ok(())
    }
670
    /// Generate a comprehensive observability report.
    ///
    /// Returns a Markdown document summarizing performance metrics, business
    /// metrics, system health, and the active configuration. Intended for
    /// dashboards or operator inspection; the text format is load-bearing for
    /// downstream consumers (see tests), so keep it stable.
    pub async fn generate_observability_report(&self) -> Result<String> {
        let streaming_metrics = self.get_streaming_metrics().await;
        let business_metrics = self.get_business_metrics().await;
        let metrics_history = self.get_metrics_history().await;

        let report = format!(
            r#"
# OxiRS Stream Observability Report
Generated: {}

## Performance Metrics
- Events per Second: {:.2}
- Average Latency: {:.2}ms
- P95 Latency: {:.2}ms
- P99 Latency: {:.2}ms
- Error Rate: {:.2}%
- Memory Usage: {:.2}MB
- CPU Usage: {:.2}%
- Network I/O: {:.2} Bps

## Business Metrics
- Total Events Processed: {}
- Total Events Failed: {}
- Revenue Events: {}
- Customer Events: {}
- Health Score: {:.2}
- Data Quality Score: {:.2}

## System Health
- Active Connections: {}
- Queue Depth: {}
- Metrics History Length: {}

## Configuration
- OpenTelemetry Enabled: {}
- Profiling Enabled: {}
- Trace Sampling Rate: {:.1}%
- Alert Thresholds:
  - Latency: {:.2}ms
  - Error Rate: {:.2}%
  - Memory: {:.2}%
  - Queue Depth: {}
"#,
            Utc::now(),
            streaming_metrics.events_per_second,
            streaming_metrics.avg_latency_ms,
            streaming_metrics.p95_latency_ms,
            streaming_metrics.p99_latency_ms,
            streaming_metrics.error_rate_percent,
            streaming_metrics.memory_usage_mb,
            streaming_metrics.cpu_usage_percent,
            streaming_metrics.network_io_bps,
            business_metrics.total_events_processed,
            business_metrics.total_events_failed,
            business_metrics.revenue_events_count,
            business_metrics.customer_events_count,
            business_metrics.health_score,
            business_metrics.data_quality_score,
            streaming_metrics.active_connections,
            streaming_metrics.queue_depth,
            metrics_history.len(),
            self.config.enable_opentelemetry,
            self.config.enable_profiling,
            self.config.trace_sampling_rate * 100.0,
            self.alert_config.latency_threshold_ms,
            self.alert_config.error_rate_threshold_percent,
            self.alert_config.memory_threshold_percent,
            self.alert_config.queue_depth_threshold,
        );

        Ok(report)
    }
744}
745
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    /// A freshly constructed system reports zeroed streaming metrics.
    #[tokio::test]
    async fn test_observability_creation() {
        let config = TelemetryConfig::default();
        let alert_config = AlertConfig::default();
        let observability = StreamObservability::new(config, alert_config);

        assert_eq!(
            observability
                .get_streaming_metrics()
                .await
                .events_per_second,
            0.0
        );
    }

    /// A span can be started, tagged, logged to, and finished; finishing
    /// removes it from the active set.
    #[tokio::test]
    async fn test_span_lifecycle() {
        let config = TelemetryConfig::default();
        let alert_config = AlertConfig::default();
        let observability = StreamObservability::new(config, alert_config);

        let span_id = observability.start_span("test_operation", None).await;
        assert!(!span_id.is_empty());

        observability
            .add_span_tag(&span_id, "test_key", "test_value")
            .await
            .unwrap();
        observability
            .add_span_log(&span_id, "info", "Test log message")
            .await
            .unwrap();

        observability
            .finish_span(&span_id, SpanStatus::Ok)
            .await
            .unwrap();

        // Span should be removed from active spans after finishing
        let active_spans = observability.active_spans.read().await;
        assert!(!active_spans.contains_key(&span_id));
    }

    /// Recording one event updates latency stats and business counters.
    #[tokio::test]
    async fn test_metrics_recording() {
        let config = TelemetryConfig::default();
        let alert_config = AlertConfig::default();
        let observability = StreamObservability::new(config, alert_config);

        let event = crate::event::StreamEvent::TripleAdded {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "\"test_object\"".to_string(),
            graph: None,
            metadata: crate::event::EventMetadata::default(),
        };

        observability
            .record_event(&event, Duration::from_millis(50))
            .await
            .unwrap();

        let metrics = observability.get_streaming_metrics().await;
        assert!(metrics.avg_latency_ms > 0.0);

        let business_metrics = observability.get_business_metrics().await;
        assert_eq!(business_metrics.total_events_processed, 1);
    }

    /// A high-latency event should trip the HighLatency alert.
    /// NOTE(review): this test is timing-dependent — the sleep branch of the
    /// select! silently passes if the alert is not delivered in 100ms.
    #[tokio::test]
    async fn test_alert_system() {
        let config = TelemetryConfig::default();
        let alert_config = AlertConfig {
            latency_threshold_ms: 10.0,
            ..Default::default()
        }; // Very low threshold for testing

        let observability = StreamObservability::new(config, alert_config);
        let mut alert_receiver = observability.subscribe_to_alerts();

        let event = crate::event::StreamEvent::TripleAdded {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "\"test_object\"".to_string(),
            graph: None,
            metadata: crate::event::EventMetadata::default(),
        };

        // Record an event with high latency to trigger alert
        observability
            .record_event(&event, Duration::from_millis(100))
            .await
            .unwrap();

        // Check if alert was triggered
        tokio::select! {
            alert = alert_receiver.recv() => {
                assert!(alert.is_ok());
                let alert = alert.unwrap();
                assert!(matches!(alert.alert_type, AlertType::HighLatency));
            }
            _ = sleep(Duration::from_millis(100)) => {
                // Alert might not be triggered due to timing
            }
        }
    }

    /// The generated report embeds the system metrics that were set.
    #[tokio::test]
    async fn test_observability_report() {
        let config = TelemetryConfig::default();
        let alert_config = AlertConfig::default();
        let observability = StreamObservability::new(config, alert_config);

        // Add some test data
        observability
            .update_system_metrics(100.0, 50.0, 1000.0)
            .await
            .unwrap();

        let report = observability.generate_observability_report().await.unwrap();
        assert!(report.contains("OxiRS Stream Observability Report"));
        assert!(report.contains("Memory Usage: 100.00MB"));
        assert!(report.contains("CPU Usage: 50.00%"));
    }
}
875}