// allframe_core/application/resilience_observability.rs

1//! Observability for resilience operations.
2//!
3//! This module provides metrics collection, tracing instrumentation, and
4//! monitoring capabilities for resilience operations, enabling visibility into
5//! system reliability.
6//!
7//! # Features
8//!
9//! - **Metrics Collection**: Counters, histograms, and gauges for resilience
10//!   operations
11//! - **Tracing Instrumentation**: Detailed traces for policy execution and
12//!   failures
13//! - **Health Checks**: Circuit breaker and service health monitoring
14//! - **Alerting Integration**: Threshold-based alerting for resilience events
15
16use std::{
17    sync::Arc,
18    time::{Duration, Instant},
19};
20
21use crate::{
22    application::resilience::{
23        ResilienceMetrics, ResilienceOrchestrationError, ResilienceOrchestrator,
24    },
25    domain::resilience::{ResilienceDomainError, ResiliencePolicy},
26};
27
28#[cfg(feature = "resilience")]
29use crate::resilience::{CircuitBreaker, RateLimiter};
30
31#[cfg(not(feature = "resilience"))]
32use crate::application::resilience::{CircuitBreaker, RateLimiter};
33
/// Observability service for resilience operations
///
/// Thin facade that fans out to a pluggable [`MetricsCollector`] and
/// [`ResilienceTracer`]. Cloning is cheap: two `Arc` refcount bumps.
#[derive(Clone)]
pub struct ResilienceObservability {
    // Sink for counters/histograms/gauges; a no-op by default (see `new`).
    metrics_collector: Arc<dyn MetricsCollector>,
    // Sink for spans/events; a no-op by default (see `new`).
    tracer: Arc<dyn ResilienceTracer>,
}
40
41impl Default for ResilienceObservability {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47impl ResilienceObservability {
48    /// Create a new observability service with default implementations
49    pub fn new() -> Self {
50        Self {
51            metrics_collector: Arc::new(NoOpMetricsCollector),
52            tracer: Arc::new(NoOpTracer),
53        }
54    }
55
56    /// Create with custom collector and tracer
57    pub fn with_components(
58        metrics_collector: Arc<dyn MetricsCollector>,
59        tracer: Arc<dyn ResilienceTracer>,
60    ) -> Self {
61        Self {
62            metrics_collector,
63            tracer,
64        }
65    }
66
67    /// Record the start of a resilience operation
68    pub fn record_operation_start(&self, operation_id: &str, policy: &ResiliencePolicy) {
69        self.metrics_collector.increment_counter(
70            "resilience_operations_total",
71            &[("operation", operation_id)],
72        );
73        self.tracer.start_span(
74            "resilience_operation",
75            &[
76                ("operation_id", operation_id),
77                ("policy_type", &policy_type_name(policy)),
78            ],
79        );
80    }
81
82    /// Record the completion of a resilience operation
83    pub fn record_operation_complete(
84        &self,
85        operation_id: &str,
86        policy: &ResiliencePolicy,
87        duration: Duration,
88        result: &Result<(), ResilienceOrchestrationError>,
89    ) {
90        let status = if result.is_ok() { "success" } else { "failure" };
91        let duration_ms = duration.as_millis() as f64;
92
93        // Record metrics
94        self.metrics_collector.increment_counter(
95            "resilience_operations_completed_total",
96            &[("operation", operation_id), ("status", status)],
97        );
98
99        self.metrics_collector.record_histogram(
100            "resilience_operation_duration_ms",
101            duration_ms,
102            &[
103                ("operation", operation_id),
104                ("policy_type", &policy_type_name(policy)),
105            ],
106        );
107
108        // Record policy-specific metrics
109        match policy {
110            ResiliencePolicy::Retry { max_attempts, .. } => {
111                self.metrics_collector.record_histogram(
112                    "resilience_retry_max_attempts",
113                    *max_attempts as f64,
114                    &[("operation", operation_id)],
115                );
116            }
117            ResiliencePolicy::CircuitBreaker {
118                failure_threshold, ..
119            } => {
120                self.metrics_collector.record_gauge(
121                    "resilience_circuit_breaker_failure_threshold",
122                    *failure_threshold as f64,
123                    &[("operation", operation_id)],
124                );
125            }
126            ResiliencePolicy::RateLimit {
127                requests_per_second,
128                ..
129            } => {
130                self.metrics_collector.record_gauge(
131                    "resilience_rate_limit_rps",
132                    *requests_per_second as f64,
133                    &[("operation", operation_id)],
134                );
135            }
136            _ => {}
137        }
138
139        // Handle errors specifically
140        if let Err(error) = result {
141            self.record_operation_error(operation_id, error);
142        }
143
144        // End tracing span
145        self.tracer.end_span(&[
146            ("duration_ms", &duration_ms.to_string()),
147            ("status", status),
148        ]);
149    }
150
151    /// Record resilience-specific errors
152    pub fn record_operation_error(&self, operation_id: &str, error: &ResilienceOrchestrationError) {
153        let error_type = match error {
154            ResilienceOrchestrationError::Domain(domain_error) => match domain_error {
155                ResilienceDomainError::RetryExhausted { .. } => "retry_exhausted",
156                ResilienceDomainError::CircuitOpen => "circuit_open",
157                ResilienceDomainError::RateLimited { .. } => "rate_limited",
158                ResilienceDomainError::Timeout { .. } => "timeout",
159                ResilienceDomainError::Infrastructure { .. } => "infrastructure",
160                _ => "domain_error",
161            },
162            ResilienceOrchestrationError::Infrastructure(_) => "infrastructure",
163            ResilienceOrchestrationError::Configuration(_) => "configuration",
164            ResilienceOrchestrationError::Cancelled => "cancelled",
165        };
166
167        self.metrics_collector.increment_counter(
168            "resilience_operation_errors_total",
169            &[("operation", operation_id), ("error_type", error_type)],
170        );
171
172        self.tracer.add_event(
173            "resilience_error",
174            &[("operation_id", operation_id), ("error_type", error_type)],
175        );
176    }
177
178    /// Record circuit breaker state changes
179    pub fn record_circuit_breaker_state_change(
180        &self,
181        circuit_breaker_id: &str,
182        old_state: CircuitBreakerState,
183        new_state: CircuitBreakerState,
184    ) {
185        self.metrics_collector.increment_counter(
186            "resilience_circuit_breaker_state_changes_total",
187            &[
188                ("circuit_breaker", circuit_breaker_id),
189                ("old_state", old_state.as_str()),
190                ("new_state", new_state.as_str()),
191            ],
192        );
193
194        self.tracer.add_event(
195            "circuit_breaker_state_change",
196            &[
197                ("circuit_breaker_id", circuit_breaker_id),
198                ("old_state", old_state.as_str()),
199                ("new_state", new_state.as_str()),
200            ],
201        );
202    }
203
204    /// Get current health status
205    pub fn health_status(&self) -> ResilienceHealthStatus {
206        // This would typically aggregate metrics to determine overall health
207        ResilienceHealthStatus {
208            overall_health: HealthLevel::Healthy,
209            circuit_breakers_open: 0,
210            services_degraded: 0,
211            last_updated: std::time::SystemTime::now(),
212        }
213    }
214
215    /// Export metrics in Prometheus format
216    pub fn export_prometheus_metrics(&self) -> String {
217        // This would collect all metrics and format them for Prometheus
218        "# AllFrame Resilience Metrics\n# (Implementation would export actual metrics)\n"
219            .to_string()
220    }
221}
222
/// Circuit breaker states for observability
///
/// `Eq` and `Hash` are derived (a `PartialEq`-only derive trips clippy's
/// `derive_partial_eq_without_eq` and prevents use as a map key).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum CircuitBreakerState {
    /// Requests flow normally.
    Closed,
    /// Requests are rejected.
    Open,
    /// Probing whether the downstream has recovered.
    HalfOpen,
}

impl CircuitBreakerState {
    /// Stable, lowercase label value for metrics and trace attributes.
    pub fn as_str(&self) -> &'static str {
        match self {
            CircuitBreakerState::Closed => "closed",
            CircuitBreakerState::Open => "open",
            CircuitBreakerState::HalfOpen => "half_open",
        }
    }
}
240
/// Overall health status of the resilience system
#[derive(Clone, Debug)]
pub struct ResilienceHealthStatus {
    /// Aggregate health level across all resilience components.
    pub overall_health: HealthLevel,
    /// Number of circuit breakers currently in the open state.
    pub circuit_breakers_open: u32,
    /// Number of services currently considered degraded.
    pub services_degraded: u32,
    /// Wall-clock time at which this snapshot was produced.
    pub last_updated: std::time::SystemTime,
}
249
/// Health levels for services
///
/// `Eq` and `Hash` are derived so levels can be compared and used as map
/// keys (a `PartialEq`-only derive trips clippy's
/// `derive_partial_eq_without_eq`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum HealthLevel {
    /// Operating normally.
    Healthy,
    /// Operating with reduced capacity or elevated errors.
    Degraded,
    /// Not operating correctly.
    Unhealthy,
    /// Health could not be determined.
    Unknown,
}
258
/// Metrics collection trait
///
/// Synchronous sink for resilience metrics. Implementations must be
/// thread-safe (`Send + Sync`) because collectors are shared via `Arc`.
/// The previous `#[async_trait::async_trait]` attribute was removed: the
/// trait has no async methods, and the existing implementations
/// (`NoOpMetricsCollector`, `PrometheusMetricsCollector`) do not use it.
pub trait MetricsCollector: Send + Sync {
    /// Increment a counter metric
    fn increment_counter(&self, name: &str, labels: &[(&str, &str)]);

    /// Record a histogram value
    fn record_histogram(&self, name: &str, value: f64, labels: &[(&str, &str)]);

    /// Set a gauge value
    fn record_gauge(&self, name: &str, value: f64, labels: &[(&str, &str)]);
}
271
/// Tracing instrumentation trait
///
/// Synchronous span/event sink; implementations must be `Send + Sync` as
/// tracers are shared via `Arc`. The previous `#[async_trait::async_trait]`
/// attribute was removed: the trait declares no async methods, so the macro
/// was a no-op (attributed impls such as `NoOpTracer` remain valid).
pub trait ResilienceTracer: Send + Sync {
    /// Start a new trace span
    fn start_span(&self, name: &str, attributes: &[(&str, &str)]);

    /// End the current span
    fn end_span(&self, attributes: &[(&str, &str)]);

    /// Add an event to the current span
    fn add_event(&self, name: &str, attributes: &[(&str, &str)]);
}
284
/// No-op implementation for when observability is disabled
pub struct NoOpMetricsCollector;

// Every method silently discards its input; used as the default sink in
// `ResilienceObservability::new`.
impl MetricsCollector for NoOpMetricsCollector {
    fn increment_counter(&self, _name: &str, _labels: &[(&str, &str)]) {}
    fn record_histogram(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
    fn record_gauge(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
}
293
294/// No-op tracer implementation
295pub struct NoOpTracer;
296
297#[async_trait::async_trait]
298impl ResilienceTracer for NoOpTracer {
299    fn start_span(&self, _name: &str, _attributes: &[(&str, &str)]) {}
300    fn end_span(&self, _attributes: &[(&str, &str)]) {}
301    fn add_event(&self, _name: &str, _attributes: &[(&str, &str)]) {}
302}
303
/// Prometheus metrics collector implementation
#[cfg(feature = "prometheus")]
pub mod prometheus_metrics {
    use std::collections::HashMap;
    use std::sync::RwLock;

    use ::prometheus::{CounterVec, GaugeVec, HistogramVec, Opts};

    use super::*;

    /// Prometheus-backed metrics collector for resilience observability
    ///
    /// Metric families are created lazily on first use and registered with
    /// the default Prometheus registry. Each method takes the read lock on
    /// the fast path and only upgrades to the write lock when the family does
    /// not exist yet; `entry(..).or_insert_with` resolves the race between
    /// two writers. NOTE(review): a metric name is assumed to always be used
    /// with the same label-name set — the first call fixes the family's
    /// labels; later calls with different label names will fail to resolve a
    /// child metric and be silently dropped.
    pub struct PrometheusMetricsCollector {
        counters: RwLock<HashMap<String, CounterVec>>,
        histograms: RwLock<HashMap<String, HistogramVec>>,
        gauges: RwLock<HashMap<String, GaugeVec>>,
    }

    // `new()` takes no arguments, so provide `Default` as well
    // (clippy: `new_without_default`).
    impl Default for PrometheusMetricsCollector {
        fn default() -> Self {
            Self::new()
        }
    }

    impl PrometheusMetricsCollector {
        /// Create a new Prometheus metrics collector with empty registries.
        pub fn new() -> Self {
            Self {
                counters: RwLock::new(HashMap::new()),
                histograms: RwLock::new(HashMap::new()),
                gauges: RwLock::new(HashMap::new()),
            }
        }

        /// Extract the label values (in declaration order) from label pairs.
        fn label_values<'a>(labels: &'a [(&'a str, &'a str)]) -> Vec<&'a str> {
            labels.iter().map(|(_, v)| *v).collect()
        }
    }

    impl MetricsCollector for PrometheusMetricsCollector {
        fn increment_counter(&self, name: &str, labels: &[(&str, &str)]) {
            let label_names: Vec<&str> = labels.iter().map(|(k, _)| *k).collect();
            let label_vals = Self::label_values(labels);

            // Fast path: family already exists.
            let counters = self.counters.read().unwrap();
            if let Some(counter) = counters.get(name) {
                if let Ok(m) = counter.get_metric_with_label_values(&label_vals) {
                    m.inc();
                }
                return;
            }
            drop(counters);

            // Slow path: create and register the family, then increment.
            let mut counters = self.counters.write().unwrap();
            let counter = counters.entry(name.to_string()).or_insert_with(|| {
                let c = CounterVec::new(Opts::new(name, name), &label_names)
                    .expect("Failed to create counter");
                // Registration failure (e.g. duplicate name) is non-fatal.
                let _ = ::prometheus::register(Box::new(c.clone()));
                c
            });
            if let Ok(m) = counter.get_metric_with_label_values(&label_vals) {
                m.inc();
            }
        }

        fn record_histogram(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
            let label_names: Vec<&str> = labels.iter().map(|(k, _)| *k).collect();
            let label_vals = Self::label_values(labels);

            // Fast path: family already exists.
            let histograms = self.histograms.read().unwrap();
            if let Some(hist) = histograms.get(name) {
                if let Ok(m) = hist.get_metric_with_label_values(&label_vals) {
                    m.observe(value);
                }
                return;
            }
            drop(histograms);

            // Slow path: create and register the family, then observe.
            let mut histograms = self.histograms.write().unwrap();
            let hist = histograms.entry(name.to_string()).or_insert_with(|| {
                let h = HistogramVec::new(
                    ::prometheus::HistogramOpts::new(name, name),
                    &label_names,
                )
                .expect("Failed to create histogram");
                // Registration failure (e.g. duplicate name) is non-fatal.
                let _ = ::prometheus::register(Box::new(h.clone()));
                h
            });
            if let Ok(m) = hist.get_metric_with_label_values(&label_vals) {
                m.observe(value);
            }
        }

        fn record_gauge(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
            let label_names: Vec<&str> = labels.iter().map(|(k, _)| *k).collect();
            let label_vals = Self::label_values(labels);

            // Fast path: family already exists.
            let gauges = self.gauges.read().unwrap();
            if let Some(gauge) = gauges.get(name) {
                if let Ok(m) = gauge.get_metric_with_label_values(&label_vals) {
                    m.set(value);
                }
                return;
            }
            drop(gauges);

            // Slow path: create and register the family, then set.
            let mut gauges = self.gauges.write().unwrap();
            let gauge = gauges.entry(name.to_string()).or_insert_with(|| {
                let g = GaugeVec::new(Opts::new(name, name), &label_names)
                    .expect("Failed to create gauge");
                // Registration failure (e.g. duplicate name) is non-fatal.
                let _ = ::prometheus::register(Box::new(g.clone()));
                g
            });
            if let Ok(m) = gauge.get_metric_with_label_values(&label_vals) {
                m.set(value);
            }
        }
    }
}
416
417/// Helper function to get policy type name for metrics
418fn policy_type_name(policy: &ResiliencePolicy) -> String {
419    match policy {
420        ResiliencePolicy::None => "none".to_string(),
421        ResiliencePolicy::Retry { .. } => "retry".to_string(),
422        ResiliencePolicy::CircuitBreaker { .. } => "circuit_breaker".to_string(),
423        ResiliencePolicy::RateLimit { .. } => "rate_limit".to_string(),
424        ResiliencePolicy::Timeout { .. } => "timeout".to_string(),
425        ResiliencePolicy::Combined { .. } => "combined".to_string(),
426    }
427}
428
/// Instrumented wrapper for resilience orchestrator
///
/// Decorator that forwards orchestration calls to `inner` while emitting
/// metrics and traces through `observability`.
pub struct InstrumentedResilienceOrchestrator<T: ResilienceOrchestrator> {
    // The wrapped orchestrator that performs the actual work.
    inner: T,
    // Sink for operation metrics and traces.
    observability: ResilienceObservability,
}

impl<T: ResilienceOrchestrator> InstrumentedResilienceOrchestrator<T> {
    /// Wrap `inner` so that every policy execution is instrumented.
    pub fn new(inner: T, observability: ResilienceObservability) -> Self {
        Self {
            inner,
            observability,
        }
    }
}
443
444#[async_trait::async_trait]
445impl<T: ResilienceOrchestrator> ResilienceOrchestrator for InstrumentedResilienceOrchestrator<T> {
446    async fn execute_with_policy<V, F, Fut, E>(
447        &self,
448        policy: ResiliencePolicy,
449        operation: F,
450    ) -> Result<V, ResilienceOrchestrationError>
451    where
452        F: FnMut() -> Fut + Send,
453        Fut: std::future::Future<Output = Result<V, E>> + Send,
454        E: Into<ResilienceOrchestrationError> + Send,
455    {
456        let operation_id = "anonymous_operation"; // In a real implementation, this would be configurable
457        let start_time = Instant::now();
458
459        let policy_clone = policy.clone();
460
461        self.observability
462            .record_operation_start(operation_id, &policy_clone);
463
464        let result = self.inner.execute_with_policy(policy, operation).await;
465
466        let duration = start_time.elapsed();
467        match &result {
468            Ok(_) => {
469                self.observability.record_operation_complete(
470                    operation_id,
471                    &policy_clone,
472                    duration,
473                    &Ok(()),
474                );
475            }
476            Err(ref err) => {
477                // Clone the original error to preserve its type for observability
478                let cloned_err = match err {
479                    ResilienceOrchestrationError::Domain(d) => {
480                        ResilienceOrchestrationError::Domain(d.clone())
481                    }
482                    ResilienceOrchestrationError::Infrastructure(s) => {
483                        ResilienceOrchestrationError::Infrastructure(s.clone())
484                    }
485                    ResilienceOrchestrationError::Configuration(s) => {
486                        ResilienceOrchestrationError::Configuration(s.clone())
487                    }
488                    ResilienceOrchestrationError::Cancelled => {
489                        ResilienceOrchestrationError::Cancelled
490                    }
491                };
492                self.observability.record_operation_complete(
493                    operation_id,
494                    &policy_clone,
495                    duration,
496                    &Err(cloned_err),
497                );
498            }
499        }
500
501        result
502    }
503
504    fn get_circuit_breaker(&self, name: &str) -> Option<&CircuitBreaker> {
505        self.inner.get_circuit_breaker(name)
506    }
507
508    fn get_rate_limiter(&self, name: &str) -> Option<&RateLimiter> {
509        self.inner.get_rate_limiter(name)
510    }
511
512    fn metrics(&self) -> ResilienceMetrics {
513        self.inner.metrics()
514    }
515}
516
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: a full start/complete lifecycle against the no-op sinks
    /// must not panic, and the placeholder health status reports Healthy.
    #[tokio::test]
    async fn test_observability_recording() {
        let observability = ResilienceObservability::new();
        let policy = ResiliencePolicy::Retry {
            max_attempts: 3,
            backoff: crate::domain::resilience::BackoffStrategy::default(),
        };

        // Record operation lifecycle
        observability.record_operation_start("test_operation", &policy);

        let duration = Duration::from_millis(150);
        let result = Ok(());

        observability.record_operation_complete("test_operation", &policy, duration, &result);

        // Verify health status works
        let health = observability.health_status();
        assert_eq!(health.overall_health, HealthLevel::Healthy);
    }

    /// Each policy variant must map to its fixed metric label value.
    #[test]
    fn test_policy_type_name() {
        assert_eq!(policy_type_name(&ResiliencePolicy::None), "none");
        assert_eq!(
            policy_type_name(&ResiliencePolicy::Retry {
                max_attempts: 3,
                backoff: crate::domain::resilience::BackoffStrategy::default(),
            }),
            "retry"
        );
        assert_eq!(
            policy_type_name(&ResiliencePolicy::CircuitBreaker {
                failure_threshold: 5,
                recovery_timeout: Duration::from_secs(30),
                success_threshold: 3,
            }),
            "circuit_breaker"
        );
    }

    /// State-change recording against the no-op sinks must not panic; the
    /// placeholder health status does not track open breakers.
    #[test]
    fn test_circuit_breaker_state_transitions() {
        let observability = ResilienceObservability::new();

        observability.record_circuit_breaker_state_change(
            "test_circuit",
            CircuitBreakerState::Closed,
            CircuitBreakerState::Open,
        );

        // In a real implementation, this would update metrics
        let health = observability.health_status();
        assert_eq!(health.circuit_breakers_open, 0); // No-op implementation
    }

    /// The placeholder Prometheus export must at least emit comment lines.
    #[test]
    fn test_prometheus_export() {
        let observability = ResilienceObservability::new();
        let metrics = observability.export_prometheus_metrics();
        assert!(metrics.contains("#"));
    }
}