1use std::{
17 sync::Arc,
18 time::{Duration, Instant},
19};
20
21use crate::{
22 application::resilience::{
23 ResilienceMetrics, ResilienceOrchestrationError, ResilienceOrchestrator,
24 },
25 domain::resilience::{ResilienceDomainError, ResiliencePolicy},
26};
27
28#[cfg(feature = "resilience")]
29use crate::resilience::{CircuitBreaker, RateLimiter};
30
31#[cfg(not(feature = "resilience"))]
32use crate::application::resilience::{CircuitBreaker, RateLimiter};
33
/// Aggregates the metrics and tracing sinks used to observe resilience
/// operations.
///
/// Cloning is cheap: both components sit behind `Arc`s, so clones share the
/// same underlying collector/tracer.
#[derive(Clone)]
pub struct ResilienceObservability {
    // Destination for counters/histograms/gauges; a no-op by default.
    metrics_collector: Arc<dyn MetricsCollector>,
    // Destination for spans/events; a no-op by default.
    tracer: Arc<dyn ResilienceTracer>,
}
40
/// `Default` delegates to [`ResilienceObservability::new`], i.e. no-op
/// metrics and tracing sinks.
impl Default for ResilienceObservability {
    fn default() -> Self {
        Self::new()
    }
}
46
47impl ResilienceObservability {
48 pub fn new() -> Self {
50 Self {
51 metrics_collector: Arc::new(NoOpMetricsCollector),
52 tracer: Arc::new(NoOpTracer),
53 }
54 }
55
    /// Creates an observability facade with caller-supplied metrics and
    /// tracing backends.
    pub fn with_components(
        metrics_collector: Arc<dyn MetricsCollector>,
        tracer: Arc<dyn ResilienceTracer>,
    ) -> Self {
        Self {
            metrics_collector,
            tracer,
        }
    }
66
67 pub fn record_operation_start(&self, operation_id: &str, policy: &ResiliencePolicy) {
69 self.metrics_collector.increment_counter(
70 "resilience_operations_total",
71 &[("operation", operation_id)],
72 );
73 self.tracer.start_span(
74 "resilience_operation",
75 &[
76 ("operation_id", operation_id),
77 ("policy_type", &policy_type_name(policy)),
78 ],
79 );
80 }
81
82 pub fn record_operation_complete(
84 &self,
85 operation_id: &str,
86 policy: &ResiliencePolicy,
87 duration: Duration,
88 result: &Result<(), ResilienceOrchestrationError>,
89 ) {
90 let status = if result.is_ok() { "success" } else { "failure" };
91 let duration_ms = duration.as_millis() as f64;
92
93 self.metrics_collector.increment_counter(
95 "resilience_operations_completed_total",
96 &[("operation", operation_id), ("status", status)],
97 );
98
99 self.metrics_collector.record_histogram(
100 "resilience_operation_duration_ms",
101 duration_ms,
102 &[
103 ("operation", operation_id),
104 ("policy_type", &policy_type_name(policy)),
105 ],
106 );
107
108 match policy {
110 ResiliencePolicy::Retry { max_attempts, .. } => {
111 self.metrics_collector.record_histogram(
112 "resilience_retry_max_attempts",
113 *max_attempts as f64,
114 &[("operation", operation_id)],
115 );
116 }
117 ResiliencePolicy::CircuitBreaker {
118 failure_threshold, ..
119 } => {
120 self.metrics_collector.record_gauge(
121 "resilience_circuit_breaker_failure_threshold",
122 *failure_threshold as f64,
123 &[("operation", operation_id)],
124 );
125 }
126 ResiliencePolicy::RateLimit {
127 requests_per_second,
128 ..
129 } => {
130 self.metrics_collector.record_gauge(
131 "resilience_rate_limit_rps",
132 *requests_per_second as f64,
133 &[("operation", operation_id)],
134 );
135 }
136 _ => {}
137 }
138
139 if let Err(error) = result {
141 self.record_operation_error(operation_id, error);
142 }
143
144 self.tracer.end_span(&[
146 ("duration_ms", &duration_ms.to_string()),
147 ("status", status),
148 ]);
149 }
150
151 pub fn record_operation_error(&self, operation_id: &str, error: &ResilienceOrchestrationError) {
153 let error_type = match error {
154 ResilienceOrchestrationError::Domain(domain_error) => match domain_error {
155 ResilienceDomainError::RetryExhausted { .. } => "retry_exhausted",
156 ResilienceDomainError::CircuitOpen => "circuit_open",
157 ResilienceDomainError::RateLimited { .. } => "rate_limited",
158 ResilienceDomainError::Timeout { .. } => "timeout",
159 ResilienceDomainError::Infrastructure { .. } => "infrastructure",
160 _ => "domain_error",
161 },
162 ResilienceOrchestrationError::Infrastructure(_) => "infrastructure",
163 ResilienceOrchestrationError::Configuration(_) => "configuration",
164 ResilienceOrchestrationError::Cancelled => "cancelled",
165 };
166
167 self.metrics_collector.increment_counter(
168 "resilience_operation_errors_total",
169 &[("operation", operation_id), ("error_type", error_type)],
170 );
171
172 self.tracer.add_event(
173 "resilience_error",
174 &[("operation_id", operation_id), ("error_type", error_type)],
175 );
176 }
177
178 pub fn record_circuit_breaker_state_change(
180 &self,
181 circuit_breaker_id: &str,
182 old_state: CircuitBreakerState,
183 new_state: CircuitBreakerState,
184 ) {
185 self.metrics_collector.increment_counter(
186 "resilience_circuit_breaker_state_changes_total",
187 &[
188 ("circuit_breaker", circuit_breaker_id),
189 ("old_state", old_state.as_str()),
190 ("new_state", new_state.as_str()),
191 ],
192 );
193
194 self.tracer.add_event(
195 "circuit_breaker_state_change",
196 &[
197 ("circuit_breaker_id", circuit_breaker_id),
198 ("old_state", old_state.as_str()),
199 ("new_state", new_state.as_str()),
200 ],
201 );
202 }
203
    /// Returns a snapshot of resilience subsystem health.
    ///
    /// NOTE(review): currently a hard-coded stub — it always reports
    /// `Healthy` with zero open breakers and zero degraded services; only
    /// `last_updated` reflects real state. Wire this to live circuit-breaker
    /// state before relying on it.
    pub fn health_status(&self) -> ResilienceHealthStatus {
        ResilienceHealthStatus {
            overall_health: HealthLevel::Healthy,
            circuit_breakers_open: 0,
            services_degraded: 0,
            last_updated: std::time::SystemTime::now(),
        }
    }
214
215 pub fn export_prometheus_metrics(&self) -> String {
217 "# AllFrame Resilience Metrics\n# (Implementation would export actual metrics)\n"
219 .to_string()
220 }
221}
222
/// Circuit-breaker lifecycle states as reported to metrics and traces.
///
/// `Eq` accompanies the existing `PartialEq` (the equality is total), and
/// `Hash` allows states to be used as set/map keys — both are backward
/// compatible additions.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum CircuitBreakerState {
    Closed,
    Open,
    HalfOpen,
}
230
231impl CircuitBreakerState {
232 pub fn as_str(&self) -> &'static str {
233 match self {
234 CircuitBreakerState::Closed => "closed",
235 CircuitBreakerState::Open => "open",
236 CircuitBreakerState::HalfOpen => "half_open",
237 }
238 }
239}
240
/// Point-in-time summary of resilience subsystem health.
#[derive(Clone, Debug)]
pub struct ResilienceHealthStatus {
    /// Coarse overall health classification.
    pub overall_health: HealthLevel,
    /// Number of circuit breakers currently open.
    pub circuit_breakers_open: u32,
    /// Number of services considered degraded.
    pub services_degraded: u32,
    /// Wall-clock time at which this snapshot was produced.
    pub last_updated: std::time::SystemTime,
}
249
/// Coarse health classification for [`ResilienceHealthStatus`].
///
/// `Eq` and `Hash` are added alongside the existing `PartialEq` (the
/// equality is total) so levels can serve as map/set keys — a backward
/// compatible change.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum HealthLevel {
    Healthy,
    Degraded,
    Unhealthy,
    Unknown,
}
258
/// Sink for resilience metrics. Implementations should be cheap and
/// non-blocking: these hooks run inline on the operation's path.
///
/// All three methods are synchronous, so the previous
/// `#[async_trait::async_trait]` attribute was a no-op proc-macro dependency
/// and has been removed (the `NoOpMetricsCollector` impl already omitted it).
pub trait MetricsCollector: Send + Sync {
    /// Increments a monotonically increasing counter identified by `name`,
    /// partitioned by the given `(key, value)` label pairs.
    fn increment_counter(&self, name: &str, labels: &[(&str, &str)]);

    /// Records `value` into the distribution/histogram metric `name`.
    fn record_histogram(&self, name: &str, value: f64, labels: &[(&str, &str)]);

    /// Sets the gauge metric `name` to `value`.
    fn record_gauge(&self, name: &str, value: f64, labels: &[(&str, &str)]);
}
271
/// Sink for resilience trace data (spans and point-in-time events).
///
/// All methods are synchronous, so the previous
/// `#[async_trait::async_trait]` attribute was a no-op proc-macro dependency
/// and has been removed.
///
/// NOTE(review): spans are implicitly paired — `start_span` and `end_span`
/// carry no span handle, so implementations must track the current span
/// themselves (e.g. thread-locally); confirm this matches the tracing backend.
pub trait ResilienceTracer: Send + Sync {
    /// Opens a span with the given name and attributes.
    fn start_span(&self, name: &str, attributes: &[(&str, &str)]);

    /// Closes the most recently opened span, attaching final attributes.
    fn end_span(&self, attributes: &[(&str, &str)]);

    /// Records a point-in-time event with attributes.
    fn add_event(&self, name: &str, attributes: &[(&str, &str)]);
}
284
/// [`MetricsCollector`] that discards every sample; the default backend
/// installed by [`ResilienceObservability::new`].
pub struct NoOpMetricsCollector;

impl MetricsCollector for NoOpMetricsCollector {
    fn increment_counter(&self, _name: &str, _labels: &[(&str, &str)]) {}
    fn record_histogram(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
    fn record_gauge(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
}
293
294pub struct NoOpTracer;
296
297#[async_trait::async_trait]
298impl ResilienceTracer for NoOpTracer {
299 fn start_span(&self, _name: &str, _attributes: &[(&str, &str)]) {}
300 fn end_span(&self, _attributes: &[(&str, &str)]) {}
301 fn add_event(&self, _name: &str, _attributes: &[(&str, &str)]) {}
302}
303
304#[cfg(feature = "prometheus")]
306pub mod prometheus_metrics {
307 use std::collections::HashMap;
308 use std::sync::RwLock;
309
310 use ::prometheus::{CounterVec, GaugeVec, HistogramVec, Opts};
311
312 use super::*;
313
    /// [`MetricsCollector`] backed by the `prometheus` crate.
    ///
    /// Metric families are created lazily on first use, keyed by metric
    /// name, and cached here; `RwLock` lets concurrent recorders share the
    /// fast read path.
    pub struct PrometheusMetricsCollector {
        counters: RwLock<HashMap<String, CounterVec>>,
        histograms: RwLock<HashMap<String, HistogramVec>>,
        gauges: RwLock<HashMap<String, GaugeVec>>,
    }
320
321 impl PrometheusMetricsCollector {
322 pub fn new() -> Self {
324 Self {
325 counters: RwLock::new(HashMap::new()),
326 histograms: RwLock::new(HashMap::new()),
327 gauges: RwLock::new(HashMap::new()),
328 }
329 }
330
331 fn label_values<'a>(labels: &'a [(&'a str, &'a str)]) -> Vec<&'a str> {
332 labels.iter().map(|(_, v)| *v).collect()
333 }
334 }
335
    impl MetricsCollector for PrometheusMetricsCollector {
        /// Increments the named counter, creating and registering the
        /// `CounterVec` family on first use.
        ///
        /// NOTE(review): the family's label names are fixed at first creation;
        /// a later call with a different label set fails the lookup and the
        /// sample is silently dropped.
        fn increment_counter(&self, name: &str, labels: &[(&str, &str)]) {
            let label_names: Vec<&str> = labels.iter().map(|(k, _)| *k).collect();
            let label_vals = Self::label_values(labels);

            // Fast path: shared read lock when the family already exists.
            let counters = self.counters.read().unwrap();
            if let Some(counter) = counters.get(name) {
                if let Ok(m) = counter.get_metric_with_label_values(&label_vals) {
                    m.inc();
                }
                return;
            }
            // Release the read lock before taking the write lock (RwLock
            // upgrades would otherwise deadlock).
            drop(counters);

            // Slow path: create under the write lock. `entry` re-checks, so a
            // racing creator between the two locks is handled correctly.
            let mut counters = self.counters.write().unwrap();
            let counter = counters.entry(name.to_string()).or_insert_with(|| {
                let c = CounterVec::new(Opts::new(name, name), &label_names)
                    .expect("Failed to create counter");
                // Registration failure (e.g. duplicate name) is deliberately
                // ignored; the local family is still usable.
                let _ = ::prometheus::register(Box::new(c.clone()));
                c
            });
            if let Ok(m) = counter.get_metric_with_label_values(&label_vals) {
                m.inc();
            }
        }

        /// Observes `value` in the named histogram, creating and registering
        /// the `HistogramVec` family on first use. Same fast/slow lock
        /// pattern as `increment_counter`.
        fn record_histogram(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
            let label_names: Vec<&str> = labels.iter().map(|(k, _)| *k).collect();
            let label_vals = Self::label_values(labels);

            let histograms = self.histograms.read().unwrap();
            if let Some(hist) = histograms.get(name) {
                if let Ok(m) = hist.get_metric_with_label_values(&label_vals) {
                    m.observe(value);
                }
                return;
            }
            drop(histograms);

            let mut histograms = self.histograms.write().unwrap();
            let hist = histograms.entry(name.to_string()).or_insert_with(|| {
                // Default prometheus buckets; customize HistogramOpts if the
                // latency distribution needs tighter resolution.
                let h = HistogramVec::new(
                    ::prometheus::HistogramOpts::new(name, name),
                    &label_names,
                )
                .expect("Failed to create histogram");
                let _ = ::prometheus::register(Box::new(h.clone()));
                h
            });
            if let Ok(m) = hist.get_metric_with_label_values(&label_vals) {
                m.observe(value);
            }
        }

        /// Sets the named gauge to `value`, creating and registering the
        /// `GaugeVec` family on first use. Same fast/slow lock pattern as
        /// `increment_counter`.
        fn record_gauge(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
            let label_names: Vec<&str> = labels.iter().map(|(k, _)| *k).collect();
            let label_vals = Self::label_values(labels);

            let gauges = self.gauges.read().unwrap();
            if let Some(gauge) = gauges.get(name) {
                if let Ok(m) = gauge.get_metric_with_label_values(&label_vals) {
                    m.set(value);
                }
                return;
            }
            drop(gauges);

            let mut gauges = self.gauges.write().unwrap();
            let gauge = gauges.entry(name.to_string()).or_insert_with(|| {
                let g = GaugeVec::new(Opts::new(name, name), &label_names)
                    .expect("Failed to create gauge");
                let _ = ::prometheus::register(Box::new(g.clone()));
                g
            });
            if let Ok(m) = gauge.get_metric_with_label_values(&label_vals) {
                m.set(value);
            }
        }
    }
415}
416
417fn policy_type_name(policy: &ResiliencePolicy) -> String {
419 match policy {
420 ResiliencePolicy::None => "none".to_string(),
421 ResiliencePolicy::Retry { .. } => "retry".to_string(),
422 ResiliencePolicy::CircuitBreaker { .. } => "circuit_breaker".to_string(),
423 ResiliencePolicy::RateLimit { .. } => "rate_limit".to_string(),
424 ResiliencePolicy::Timeout { .. } => "timeout".to_string(),
425 ResiliencePolicy::Combined { .. } => "combined".to_string(),
426 }
427}
428
429pub struct InstrumentedResilienceOrchestrator<T: ResilienceOrchestrator> {
431 inner: T,
432 observability: ResilienceObservability,
433}
434
impl<T: ResilienceOrchestrator> InstrumentedResilienceOrchestrator<T> {
    /// Wraps `inner` so every `execute_with_policy` call is recorded through
    /// `observability`.
    pub fn new(inner: T, observability: ResilienceObservability) -> Self {
        Self {
            inner,
            observability,
        }
    }
}
443
444#[async_trait::async_trait]
445impl<T: ResilienceOrchestrator> ResilienceOrchestrator for InstrumentedResilienceOrchestrator<T> {
446 async fn execute_with_policy<V, F, Fut, E>(
447 &self,
448 policy: ResiliencePolicy,
449 operation: F,
450 ) -> Result<V, ResilienceOrchestrationError>
451 where
452 F: FnMut() -> Fut + Send,
453 Fut: std::future::Future<Output = Result<V, E>> + Send,
454 E: Into<ResilienceOrchestrationError> + Send,
455 {
456 let operation_id = "anonymous_operation"; let start_time = Instant::now();
458
459 let policy_clone = policy.clone();
460
461 self.observability
462 .record_operation_start(operation_id, &policy_clone);
463
464 let result = self.inner.execute_with_policy(policy, operation).await;
465
466 let duration = start_time.elapsed();
467 match &result {
468 Ok(_) => {
469 self.observability.record_operation_complete(
470 operation_id,
471 &policy_clone,
472 duration,
473 &Ok(()),
474 );
475 }
476 Err(ref err) => {
477 let cloned_err = match err {
479 ResilienceOrchestrationError::Domain(d) => {
480 ResilienceOrchestrationError::Domain(d.clone())
481 }
482 ResilienceOrchestrationError::Infrastructure(s) => {
483 ResilienceOrchestrationError::Infrastructure(s.clone())
484 }
485 ResilienceOrchestrationError::Configuration(s) => {
486 ResilienceOrchestrationError::Configuration(s.clone())
487 }
488 ResilienceOrchestrationError::Cancelled => {
489 ResilienceOrchestrationError::Cancelled
490 }
491 };
492 self.observability.record_operation_complete(
493 operation_id,
494 &policy_clone,
495 duration,
496 &Err(cloned_err),
497 );
498 }
499 }
500
501 result
502 }
503
504 fn get_circuit_breaker(&self, name: &str) -> Option<&CircuitBreaker> {
505 self.inner.get_circuit_breaker(name)
506 }
507
508 fn get_rate_limiter(&self, name: &str) -> Option<&RateLimiter> {
509 self.inner.get_rate_limiter(name)
510 }
511
512 fn metrics(&self) -> ResilienceMetrics {
513 self.inner.metrics()
514 }
515}
516
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_observability_recording() {
        let observability = ResilienceObservability::new();
        let policy = ResiliencePolicy::Retry {
            max_attempts: 3,
            backoff: crate::domain::resilience::BackoffStrategy::default(),
        };

        // The no-op sinks must accept a start/complete pair without panicking.
        observability.record_operation_start("test_operation", &policy);

        let result: Result<(), ResilienceOrchestrationError> = Ok(());
        observability.record_operation_complete(
            "test_operation",
            &policy,
            Duration::from_millis(150),
            &result,
        );

        assert_eq!(
            observability.health_status().overall_health,
            HealthLevel::Healthy
        );
    }

    #[test]
    fn test_policy_type_name() {
        let retry = ResiliencePolicy::Retry {
            max_attempts: 3,
            backoff: crate::domain::resilience::BackoffStrategy::default(),
        };
        let breaker = ResiliencePolicy::CircuitBreaker {
            failure_threshold: 5,
            recovery_timeout: Duration::from_secs(30),
            success_threshold: 3,
        };

        assert_eq!(policy_type_name(&ResiliencePolicy::None), "none");
        assert_eq!(policy_type_name(&retry), "retry");
        assert_eq!(policy_type_name(&breaker), "circuit_breaker");
    }

    #[test]
    fn test_circuit_breaker_state_transitions() {
        let observability = ResilienceObservability::new();

        observability.record_circuit_breaker_state_change(
            "test_circuit",
            CircuitBreakerState::Closed,
            CircuitBreakerState::Open,
        );

        // health_status() is a stub, so the open-breaker count stays at zero
        // even after recording an open transition.
        assert_eq!(observability.health_status().circuit_breakers_open, 0);
    }

    #[test]
    fn test_prometheus_export() {
        let exported = ResilienceObservability::new().export_prometheus_metrics();
        assert!(exported.contains('#'));
    }
}