1use anyhow::Result;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::{broadcast, RwLock};
13#[cfg(feature = "opentelemetry")]
14use tracing::info;
15use tracing::{debug, error, warn};
16use uuid::Uuid;
17
18#[cfg(feature = "opentelemetry")]
21use opentelemetry::global::BoxedTracer;
22
23use crate::StreamEvent;
24
/// Configuration for telemetry collection (tracing, metrics, profiling).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TelemetryConfig {
    /// Whether OpenTelemetry tracing should be initialized.
    pub enable_opentelemetry: bool,
    /// Jaeger collector endpoint, if trace export is desired.
    pub jaeger_endpoint: Option<String>,
    /// Prometheus endpoint, if metrics export is desired.
    pub prometheus_endpoint: Option<String>,
    /// How often metrics are collected/exported.
    pub metrics_interval: Duration,
    /// Whether runtime profiling is enabled.
    pub enable_profiling: bool,
    /// Fraction of traces to sample (0.0–1.0).
    pub trace_sampling_rate: f64,
    /// Whether business-level counters are tracked.
    pub enable_business_metrics: bool,
    /// Cap on concurrently tracked in-memory spans; spans started beyond
    /// this limit are dropped rather than recorded.
    pub max_spans_in_memory: usize,
}
45
46impl Default for TelemetryConfig {
47 fn default() -> Self {
48 Self {
49 enable_opentelemetry: true,
50 jaeger_endpoint: Some("http://localhost:14268/api/traces".to_string()),
51 prometheus_endpoint: Some("http://localhost:9090".to_string()),
52 metrics_interval: Duration::from_secs(30),
53 enable_profiling: false,
54 trace_sampling_rate: 0.1,
55 enable_business_metrics: true,
56 max_spans_in_memory: 10000,
57 }
58 }
59}
60
/// Point-in-time snapshot of streaming performance metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingMetrics {
    /// Event throughput in events per second.
    pub events_per_second: f64,
    /// Running average processing latency in milliseconds.
    pub avg_latency_ms: f64,
    /// 95th-percentile latency in milliseconds.
    pub p95_latency_ms: f64,
    /// 99th-percentile latency in milliseconds.
    pub p99_latency_ms: f64,
    /// Memory usage in megabytes.
    pub memory_usage_mb: f64,
    /// CPU utilization percentage.
    pub cpu_usage_percent: f64,
    /// Network I/O throughput in bytes per second.
    pub network_io_bps: f64,
    /// Failed events as a percentage of all events.
    pub error_rate_percent: f64,
    /// Number of events waiting to be processed.
    pub queue_depth: u64,
    /// Number of currently open connections.
    pub active_connections: u64,
    /// When this snapshot was taken (UTC).
    pub timestamp: DateTime<Utc>,
}
87
88impl Default for StreamingMetrics {
89 fn default() -> Self {
90 Self {
91 events_per_second: 0.0,
92 avg_latency_ms: 0.0,
93 p95_latency_ms: 0.0,
94 p99_latency_ms: 0.0,
95 memory_usage_mb: 0.0,
96 cpu_usage_percent: 0.0,
97 network_io_bps: 0.0,
98 error_rate_percent: 0.0,
99 queue_depth: 0,
100 active_connections: 0,
101 timestamp: Utc::now(),
102 }
103 }
104}
105
/// Domain-level counters and quality scores derived from event content.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BusinessMetrics {
    /// Events processed successfully.
    pub total_events_processed: u64,
    /// Events that failed processing.
    pub total_events_failed: u64,
    /// Events classified as revenue/order related (by subject substring).
    pub revenue_events_count: u64,
    /// Events classified as customer related (by subject substring).
    pub customer_events_count: u64,
    /// Overall health score (1.0 = fully healthy).
    pub health_score: f64,
    /// Data quality score (1.0 = perfect quality).
    pub data_quality_score: f64,
    /// Additional user-defined metrics keyed by name.
    pub custom_metrics: HashMap<String, f64>,
}
124
125impl Default for BusinessMetrics {
126 fn default() -> Self {
127 Self {
128 total_events_processed: 0,
129 total_events_failed: 0,
130 revenue_events_count: 0,
131 customer_events_count: 0,
132 health_score: 1.0,
133 data_quality_score: 1.0,
134 custom_metrics: HashMap::new(),
135 }
136 }
137}
138
/// A single unit of traced work, stored in memory until finished.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceSpan {
    /// Unique identifier of this span (UUID v4).
    pub span_id: String,
    /// Identifier of the parent span, if this span is nested.
    pub parent_span_id: Option<String>,
    /// Identifier of the trace this span belongs to.
    pub trace_id: String,
    /// Human-readable name of the traced operation.
    pub operation_name: String,
    /// When the span was started (UTC).
    pub start_time: DateTime<Utc>,
    /// Total wall-clock duration; `None` while the span is still active.
    pub duration: Option<Duration>,
    /// Arbitrary key/value annotations.
    pub tags: HashMap<String, String>,
    /// Timestamped log entries attached to the span.
    pub logs: Vec<SpanLog>,
    /// Final (or current) status of the span.
    pub status: SpanStatus,
}
161
/// A timestamped log entry attached to a trace span.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpanLog {
    /// When the entry was recorded (UTC).
    pub timestamp: DateTime<Utc>,
    /// Log level label (e.g. "info", "error").
    pub level: String,
    /// Log message text.
    pub message: String,
    /// Additional structured fields.
    pub fields: HashMap<String, String>,
}
170
/// Terminal outcome of a trace span.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SpanStatus {
    /// Completed successfully.
    Ok,
    /// Failed, with an error description.
    Error { message: String },
    /// Exceeded its allotted time.
    Timeout,
    /// Aborted before completion.
    Cancelled,
}
179
/// Thresholds and delivery settings for the alerting subsystem.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
    /// Master switch; when false no alerts are evaluated or sent.
    pub enabled: bool,
    /// Average-latency threshold in milliseconds.
    pub latency_threshold_ms: f64,
    /// Error-rate threshold as a percentage.
    pub error_rate_threshold_percent: f64,
    /// Memory-usage threshold as a percentage.
    pub memory_threshold_percent: f64,
    /// Queue-depth threshold in queued items.
    pub queue_depth_threshold: u64,
    /// HTTP endpoints that should receive alert notifications.
    pub notification_endpoints: Vec<String>,
    /// Minimum time between repeated alerts of the same type.
    pub cooldown_period: Duration,
}
198
199impl Default for AlertConfig {
200 fn default() -> Self {
201 Self {
202 enabled: true,
203 latency_threshold_ms: 100.0,
204 error_rate_threshold_percent: 5.0,
205 memory_threshold_percent: 80.0,
206 queue_depth_threshold: 10000,
207 notification_endpoints: vec!["http://localhost:9093/api/v1/alerts".to_string()],
208 cooldown_period: Duration::from_secs(300),
209 }
210 }
211}
212
/// Central observability hub for the streaming subsystem: collects
/// performance and business metrics, tracks in-memory trace spans, and
/// broadcasts threshold-based alerts to subscribers.
pub struct StreamObservability {
    /// Telemetry settings (sampling, endpoints, span capacity).
    config: TelemetryConfig,
    /// Alert thresholds and notification settings.
    alert_config: AlertConfig,
    /// Latest rolling performance snapshot.
    streaming_metrics: Arc<RwLock<StreamingMetrics>>,
    /// Domain-level counters and quality scores.
    business_metrics: Arc<RwLock<BusinessMetrics>>,
    /// Spans started but not yet finished, keyed by span id.
    active_spans: Arc<RwLock<HashMap<String, TraceSpan>>>,
    /// Bounded history of metric snapshots (most recent 1000 kept).
    metrics_history: Arc<RwLock<Vec<StreamingMetrics>>>,
    /// Broadcast channel fanning alerts out to subscribers.
    alert_sender: broadcast::Sender<AlertEvent>,
    /// Last emission time per alert type, used for cooldown suppression.
    last_alert_times: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
    /// OpenTelemetry tracer, present only when the feature is compiled in.
    #[cfg(feature = "opentelemetry")]
    tracer: Option<Arc<BoxedTracer>>,
    /// Placeholder field so construction code is uniform when the feature is off.
    #[cfg(not(feature = "opentelemetry"))]
    tracer: Option<()>,
}
229
/// A single alert emitted when a metric crosses its configured threshold.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertEvent {
    /// Unique identifier of this alert (UUID v4).
    pub alert_id: String,
    /// What kind of condition fired.
    pub alert_type: AlertType,
    /// How serious the condition is.
    pub severity: AlertSeverity,
    /// Human-readable description of the breach.
    pub message: String,
    /// Observed metric value at trigger time.
    pub metric_value: f64,
    /// Threshold that was exceeded.
    pub threshold: f64,
    /// When the alert was raised (UTC).
    pub timestamp: DateTime<Utc>,
    /// Extra contextual key/value data.
    pub metadata: HashMap<String, String>,
}
242
/// Category of condition that triggered an alert.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertType {
    /// Average latency exceeded its threshold.
    HighLatency,
    /// Error rate exceeded its threshold.
    HighErrorRate,
    /// Memory usage exceeded its threshold.
    HighMemoryUsage,
    /// Queue depth exceeded its threshold.
    QueueBacklog,
    /// A connection failure was detected.
    ConnectionFailure,
    /// A data quality problem was detected.
    DataQualityIssue,
    /// User-defined alert category.
    Custom(String),
}
254
/// Severity level of an alert, from informational to emergency.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertSeverity {
    /// Informational only; no action required.
    Info,
    /// Degradation worth investigating.
    Warning,
    /// Serious problem requiring prompt action.
    Critical,
    /// Immediate intervention required.
    Emergency,
}
263
impl StreamObservability {
    /// Creates a new observability hub from telemetry and alert configuration.
    ///
    /// Sets up the alert broadcast channel and, when compiled with the
    /// `opentelemetry` feature, attempts to initialize a tracer.
    pub fn new(config: TelemetryConfig, alert_config: AlertConfig) -> Self {
        // Capacity 1000: slow subscribers lag (drop messages) rather than block senders.
        let (alert_sender, _) = broadcast::channel(1000);

        #[cfg(feature = "opentelemetry")]
        let tracer = if config.enable_opentelemetry {
            // Tracer setup failure is non-fatal: observability degrades to
            // in-memory spans instead of failing construction.
            Self::setup_jaeger_tracer(&config).ok()
        } else {
            None
        };
        #[cfg(not(feature = "opentelemetry"))]
        let tracer = if config.enable_opentelemetry {
            warn!("OpenTelemetry requested but feature not enabled. Compile with --features opentelemetry");
            None
        } else {
            None
        };

        Self {
            config,
            alert_config,
            streaming_metrics: Arc::new(RwLock::new(StreamingMetrics::default())),
            business_metrics: Arc::new(RwLock::new(BusinessMetrics::default())),
            active_spans: Arc::new(RwLock::new(HashMap::new())),
            metrics_history: Arc::new(RwLock::new(Vec::new())),
            alert_sender,
            last_alert_times: Arc::new(RwLock::new(HashMap::new())),
            tracer,
        }
    }

    /// Obtains a tracer named "oxirs-stream" from the global OpenTelemetry
    /// provider.
    ///
    /// NOTE(review): actual Jaeger export is currently disabled (see the
    /// warn! below); the endpoint from `config` is only logged, not used.
    #[cfg(feature = "opentelemetry")]
    fn setup_jaeger_tracer(config: &TelemetryConfig) -> Result<Arc<BoxedTracer>> {
        warn!("OpenTelemetry Jaeger integration is disabled pending dependency stability");

        if let Some(jaeger_endpoint) = &config.jaeger_endpoint {
            info!("Jaeger endpoint configured: {}", jaeger_endpoint);
        }

        let tracer = opentelemetry::global::tracer("oxirs-stream");
        Ok(Arc::new(tracer))
    }

    /// Starts a new in-memory trace span and returns its generated span id.
    ///
    /// NOTE(review): when a parent is supplied, the parent's *span id* is
    /// reused as this span's trace id; trace-propagation conventions would
    /// normally carry over the parent's trace id — confirm intended semantics.
    pub async fn start_span(&self, operation_name: &str, parent_span_id: Option<String>) -> String {
        let span_id = Uuid::new_v4().to_string();
        let trace_id = parent_span_id
            .clone()
            .unwrap_or_else(|| Uuid::new_v4().to_string());

        let span = TraceSpan {
            span_id: span_id.clone(),
            parent_span_id,
            trace_id,
            operation_name: operation_name.to_string(),
            start_time: Utc::now(),
            duration: None,
            tags: HashMap::new(),
            logs: Vec::new(),
            status: SpanStatus::Ok,
        };

        {
            // Enforce the in-memory span cap: past the limit, spans are
            // silently dropped (finishing a dropped id is then a no-op).
            let mut active_spans = self.active_spans.write().await;
            if active_spans.len() < self.config.max_spans_in_memory {
                active_spans.insert(span_id.clone(), span);
            }
        }

        debug!(
            "Started span: {} for operation: {}",
            span_id, operation_name
        );
        span_id
    }

    /// Finishes an active span: records its wall-clock duration and final
    /// status, removes it from the active set, and — if OpenTelemetry is
    /// enabled in the config — forwards it for export.
    ///
    /// Unknown span ids are ignored silently.
    pub async fn finish_span(&self, span_id: &str, status: SpanStatus) -> Result<()> {
        let mut active_spans = self.active_spans.write().await;

        if let Some(mut span) = active_spans.remove(span_id) {
            // to_std() fails only when the duration is negative (clock skew).
            span.duration = Some(Utc::now().signed_duration_since(span.start_time).to_std()?);
            span.status = status;

            debug!(
                "Finished span: {} with duration: {:?}",
                span_id, span.duration
            );

            if self.config.enable_opentelemetry {
                // NOTE(review): the active_spans write lock is held across this
                // await; harmless today (export only logs), but worth narrowing
                // if export ever does real I/O.
                self.export_span_to_jaeger(&span).await?;
            }
        }

        Ok(())
    }

    /// Attaches a key/value tag to an active span; no-op for unknown ids.
    pub async fn add_span_tag(&self, span_id: &str, key: &str, value: &str) -> Result<()> {
        let mut active_spans = self.active_spans.write().await;

        if let Some(span) = active_spans.get_mut(span_id) {
            span.tags.insert(key.to_string(), value.to_string());
        }

        Ok(())
    }

    /// Appends a timestamped log entry to an active span; no-op for unknown ids.
    pub async fn add_span_log(&self, span_id: &str, level: &str, message: &str) -> Result<()> {
        let mut active_spans = self.active_spans.write().await;

        if let Some(span) = active_spans.get_mut(span_id) {
            span.logs.push(SpanLog {
                timestamp: Utc::now(),
                level: level.to_string(),
                message: message.to_string(),
                fields: HashMap::new(),
            });
        }

        Ok(())
    }

    /// Records a successfully processed event and its processing latency,
    /// updating performance metrics, optional business counters, and then
    /// re-evaluating alert thresholds.
    pub async fn record_event(
        &self,
        event: &StreamEvent,
        processing_duration: Duration,
    ) -> Result<()> {
        let processing_time_ms = processing_duration.as_millis() as f64;

        {
            let mut metrics = self.streaming_metrics.write().await;

            // NOTE(review): this is a running midpoint, not a true mean —
            // the newest sample always carries weight 1/2.
            metrics.avg_latency_ms = (metrics.avg_latency_ms + processing_time_ms) / 2.0;

            // NOTE(review): p95/p99 here only track the running maximum;
            // real percentiles would require a histogram or sketch.
            if processing_time_ms > metrics.p95_latency_ms {
                metrics.p95_latency_ms = processing_time_ms;
            }
            if processing_time_ms > metrics.p99_latency_ms {
                metrics.p99_latency_ms = processing_time_ms;
            }

            metrics.timestamp = Utc::now();
        }

        if self.config.enable_business_metrics {
            let mut business_metrics = self.business_metrics.write().await;
            business_metrics.total_events_processed += 1;

            // Crude subject-substring classification into business categories.
            if let StreamEvent::TripleAdded { subject, .. } = event {
                if subject.contains("customer") {
                    business_metrics.customer_events_count += 1;
                } else if subject.contains("revenue") || subject.contains("order") {
                    business_metrics.revenue_events_count += 1;
                }
            }
        }

        self.check_and_trigger_alerts().await?;

        Ok(())
    }

    /// Records a processing failure: logs it, bumps the failure counter,
    /// recomputes the overall error rate, and re-evaluates alert thresholds.
    pub async fn record_error(&self, error: &anyhow::Error, context: &str) -> Result<()> {
        error!("Streaming error in {}: {}", context, error);

        {
            let mut business_metrics = self.business_metrics.write().await;
            business_metrics.total_events_failed += 1;

            // Error rate = failed / (processed + failed), as a percentage.
            let total_events =
                business_metrics.total_events_processed + business_metrics.total_events_failed;
            if total_events > 0 {
                let error_rate =
                    (business_metrics.total_events_failed as f64 / total_events as f64) * 100.0;

                let mut streaming_metrics = self.streaming_metrics.write().await;
                streaming_metrics.error_rate_percent = error_rate;
            }
        }

        self.check_and_trigger_alerts().await?;

        Ok(())
    }

    /// Overwrites the system-level metrics (memory, CPU, network) and appends
    /// the resulting snapshot to the bounded history.
    pub async fn update_system_metrics(
        &self,
        memory_mb: f64,
        cpu_percent: f64,
        network_bps: f64,
    ) -> Result<()> {
        let mut metrics = self.streaming_metrics.write().await;
        metrics.memory_usage_mb = memory_mb;
        metrics.cpu_usage_percent = cpu_percent;
        metrics.network_io_bps = network_bps;
        metrics.timestamp = Utc::now();

        {
            let mut history = self.metrics_history.write().await;
            history.push(metrics.clone());

            // Keep the history bounded at 1000 snapshots, dropping the oldest.
            if history.len() > 1000 {
                history.remove(0);
            }
        }

        Ok(())
    }

    /// Returns a copy of the current performance metrics snapshot.
    pub async fn get_streaming_metrics(&self) -> StreamingMetrics {
        self.streaming_metrics.read().await.clone()
    }

    /// Returns a copy of the current business metrics.
    pub async fn get_business_metrics(&self) -> BusinessMetrics {
        self.business_metrics.read().await.clone()
    }

    /// Returns a copy of the recorded metrics history (oldest first).
    pub async fn get_metrics_history(&self) -> Vec<StreamingMetrics> {
        self.metrics_history.read().await.clone()
    }

    /// Subscribes to the alert broadcast stream. Each receiver gets every
    /// alert emitted after the subscription is created.
    pub fn subscribe_to_alerts(&self) -> broadcast::Receiver<AlertEvent> {
        self.alert_sender.subscribe()
    }

    /// Compares current metrics against configured thresholds and emits an
    /// alert for each breached one (subject to per-type cooldown).
    async fn check_and_trigger_alerts(&self) -> Result<()> {
        if !self.alert_config.enabled {
            return Ok(());
        }

        let metrics = self.streaming_metrics.read().await;
        let _now = Utc::now();

        // Latency threshold.
        if metrics.avg_latency_ms > self.alert_config.latency_threshold_ms {
            self.trigger_alert(
                AlertType::HighLatency,
                AlertSeverity::Warning,
                &format!(
                    "Average latency ({:.2}ms) exceeds threshold ({:.2}ms)",
                    metrics.avg_latency_ms, self.alert_config.latency_threshold_ms
                ),
                metrics.avg_latency_ms,
                self.alert_config.latency_threshold_ms,
            )
            .await?;
        }

        // Error-rate threshold.
        if metrics.error_rate_percent > self.alert_config.error_rate_threshold_percent {
            self.trigger_alert(
                AlertType::HighErrorRate,
                AlertSeverity::Critical,
                &format!(
                    "Error rate ({:.2}%) exceeds threshold ({:.2}%)",
                    metrics.error_rate_percent, self.alert_config.error_rate_threshold_percent
                ),
                metrics.error_rate_percent,
                self.alert_config.error_rate_threshold_percent,
            )
            .await?;
        }

        // NOTE(review): converts MB to a percentage assuming a fixed 1024 MB
        // total — TODO confirm and derive from actual system memory instead.
        let memory_percent = (metrics.memory_usage_mb / 1024.0) * 100.0;
        if memory_percent > self.alert_config.memory_threshold_percent {
            self.trigger_alert(
                AlertType::HighMemoryUsage,
                AlertSeverity::Warning,
                &format!(
                    "Memory usage ({:.2}%) exceeds threshold ({:.2}%)",
                    memory_percent, self.alert_config.memory_threshold_percent
                ),
                memory_percent,
                self.alert_config.memory_threshold_percent,
            )
            .await?;
        }

        // Queue-depth threshold.
        if metrics.queue_depth > self.alert_config.queue_depth_threshold {
            self.trigger_alert(
                AlertType::QueueBacklog,
                AlertSeverity::Critical,
                &format!(
                    "Queue depth ({}) exceeds threshold ({})",
                    metrics.queue_depth, self.alert_config.queue_depth_threshold
                ),
                metrics.queue_depth as f64,
                self.alert_config.queue_depth_threshold as f64,
            )
            .await?;
        }

        Ok(())
    }

    /// Emits a single alert event unless the same alert type already fired
    /// within the configured cooldown period.
    async fn trigger_alert(
        &self,
        alert_type: AlertType,
        severity: AlertSeverity,
        message: &str,
        metric_value: f64,
        threshold: f64,
    ) -> Result<()> {
        // Cooldown is keyed by the Debug rendering of the alert type.
        let alert_key = format!("{alert_type:?}");
        let now = Utc::now();

        {
            let last_alert_times = self.last_alert_times.read().await;
            if let Some(last_time) = last_alert_times.get(&alert_key) {
                if now.signed_duration_since(*last_time).to_std()?
                    < self.alert_config.cooldown_period
                {
                    // Still cooling down — suppress the duplicate alert.
                    return Ok(());
                }
            }
        }

        {
            let mut last_alert_times = self.last_alert_times.write().await;
            last_alert_times.insert(alert_key, now);
        }

        let alert = AlertEvent {
            alert_id: Uuid::new_v4().to_string(),
            alert_type,
            severity,
            message: message.to_string(),
            metric_value,
            threshold,
            timestamp: now,
            metadata: HashMap::new(),
        };

        // A send error only means there are currently no subscribers; ignore it.
        let _ = self.alert_sender.send(alert.clone());

        warn!("Alert triggered: {} - {}", alert.alert_id, alert.message);

        Ok(())
    }

    /// Exports a finished span to Jaeger.
    ///
    /// Currently a logging stub in both feature configurations: the span is
    /// described via debug! but never sent over the wire.
    async fn export_span_to_jaeger(&self, span: &TraceSpan) -> Result<()> {
        #[cfg(feature = "opentelemetry")]
        {
            if let Some(_tracer) = &self.tracer {
                debug!(
                    "OpenTelemetry span export is disabled pending dependency stability. Span: {}",
                    span.span_id
                );
            } else if let Some(jaeger_endpoint) = &self.config.jaeger_endpoint {
                debug!(
                    "Tracer not initialized, skipping span export to {}",
                    jaeger_endpoint
                );
            }
        }

        #[cfg(not(feature = "opentelemetry"))]
        {
            if let Some(jaeger_endpoint) = &self.config.jaeger_endpoint {
                debug!(
                    "OpenTelemetry feature not enabled, skipping span export to {}. Span: {}",
                    jaeger_endpoint, span.span_id
                );
            }
        }

        Ok(())
    }

    /// Renders a human-readable Markdown report covering performance metrics,
    /// business metrics, system health, and the active configuration.
    pub async fn generate_observability_report(&self) -> Result<String> {
        let streaming_metrics = self.get_streaming_metrics().await;
        let business_metrics = self.get_business_metrics().await;
        let metrics_history = self.get_metrics_history().await;

        let report = format!(
            r#"
# OxiRS Stream Observability Report
Generated: {}

## Performance Metrics
- Events per Second: {:.2}
- Average Latency: {:.2}ms
- P95 Latency: {:.2}ms
- P99 Latency: {:.2}ms
- Error Rate: {:.2}%
- Memory Usage: {:.2}MB
- CPU Usage: {:.2}%
- Network I/O: {:.2} Bps

## Business Metrics
- Total Events Processed: {}
- Total Events Failed: {}
- Revenue Events: {}
- Customer Events: {}
- Health Score: {:.2}
- Data Quality Score: {:.2}

## System Health
- Active Connections: {}
- Queue Depth: {}
- Metrics History Length: {}

## Configuration
- OpenTelemetry Enabled: {}
- Profiling Enabled: {}
- Trace Sampling Rate: {:.1}%
- Alert Thresholds:
  - Latency: {:.2}ms
  - Error Rate: {:.2}%
  - Memory: {:.2}%
  - Queue Depth: {}
"#,
            Utc::now(),
            streaming_metrics.events_per_second,
            streaming_metrics.avg_latency_ms,
            streaming_metrics.p95_latency_ms,
            streaming_metrics.p99_latency_ms,
            streaming_metrics.error_rate_percent,
            streaming_metrics.memory_usage_mb,
            streaming_metrics.cpu_usage_percent,
            streaming_metrics.network_io_bps,
            business_metrics.total_events_processed,
            business_metrics.total_events_failed,
            business_metrics.revenue_events_count,
            business_metrics.customer_events_count,
            business_metrics.health_score,
            business_metrics.data_quality_score,
            streaming_metrics.active_connections,
            streaming_metrics.queue_depth,
            metrics_history.len(),
            self.config.enable_opentelemetry,
            self.config.enable_profiling,
            self.config.trace_sampling_rate * 100.0,
            self.alert_config.latency_threshold_ms,
            self.alert_config.error_rate_threshold_percent,
            self.alert_config.memory_threshold_percent,
            self.alert_config.queue_depth_threshold,
        );

        Ok(report)
    }
}
745
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    /// Builds an observability hub with default telemetry and alert settings.
    fn default_observability() -> StreamObservability {
        StreamObservability::new(TelemetryConfig::default(), AlertConfig::default())
    }

    /// Sample triple-added event shared by the metric/alert tests.
    fn sample_event() -> crate::event::StreamEvent {
        crate::event::StreamEvent::TripleAdded {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "\"test_object\"".to_string(),
            graph: None,
            metadata: crate::event::EventMetadata::default(),
        }
    }

    #[tokio::test]
    async fn test_observability_creation() {
        let obs = default_observability();
        let snapshot = obs.get_streaming_metrics().await;
        assert_eq!(snapshot.events_per_second, 0.0);
    }

    #[tokio::test]
    async fn test_span_lifecycle() {
        let obs = default_observability();

        let span_id = obs.start_span("test_operation", None).await;
        assert!(!span_id.is_empty());

        obs.add_span_tag(&span_id, "test_key", "test_value")
            .await
            .unwrap();
        obs.add_span_log(&span_id, "info", "Test log message")
            .await
            .unwrap();
        obs.finish_span(&span_id, SpanStatus::Ok).await.unwrap();

        // A finished span must no longer appear in the active set.
        assert!(!obs.active_spans.read().await.contains_key(&span_id));
    }

    #[tokio::test]
    async fn test_metrics_recording() {
        let obs = default_observability();

        obs.record_event(&sample_event(), Duration::from_millis(50))
            .await
            .unwrap();

        assert!(obs.get_streaming_metrics().await.avg_latency_ms > 0.0);
        assert_eq!(obs.get_business_metrics().await.total_events_processed, 1);
    }

    #[tokio::test]
    async fn test_alert_system() {
        // Lower the latency threshold so a 100ms event trips the alert.
        let alerts = AlertConfig {
            latency_threshold_ms: 10.0,
            ..Default::default()
        };
        let obs = StreamObservability::new(TelemetryConfig::default(), alerts);
        let mut alert_rx = obs.subscribe_to_alerts();

        obs.record_event(&sample_event(), Duration::from_millis(100))
            .await
            .unwrap();

        tokio::select! {
            received = alert_rx.recv() => {
                assert!(received.is_ok());
                assert!(matches!(received.unwrap().alert_type, AlertType::HighLatency));
            }
            _ = sleep(Duration::from_millis(100)) => {
                // No alert within the window — tolerated as timing variance.
            }
        }
    }

    #[tokio::test]
    async fn test_observability_report() {
        let obs = default_observability();

        obs.update_system_metrics(100.0, 50.0, 1000.0)
            .await
            .unwrap();

        let report = obs.generate_observability_report().await.unwrap();
        assert!(report.contains("OxiRS Stream Observability Report"));
        assert!(report.contains("Memory Usage: 100.00MB"));
        assert!(report.contains("CPU Usage: 50.00%"));
    }
}