use std::{
sync::Arc,
time::{Duration, Instant},
};
use crate::{
application::resilience::{
ResilienceMetrics, ResilienceOrchestrationError, ResilienceOrchestrator,
},
domain::resilience::{ResilienceDomainError, ResiliencePolicy},
};
#[cfg(feature = "resilience")]
use crate::resilience::{CircuitBreaker, RateLimiter};
#[cfg(not(feature = "resilience"))]
use crate::application::resilience::{CircuitBreaker, RateLimiter};
/// Entry point for resilience telemetry: forwards metrics to a [`MetricsCollector`]
/// and trace data to a [`ResilienceTracer`].
///
/// Cloning is cheap because both components are held behind `Arc`.
#[derive(Clone)]
pub struct ResilienceObservability {
/// Receives the counters, histograms and gauges emitted by this type.
metrics_collector: Arc<dyn MetricsCollector>,
/// Receives span start/end notifications and events emitted by this type.
tracer: Arc<dyn ResilienceTracer>,
}
impl Default for ResilienceObservability {
fn default() -> Self {
Self::new()
}
}
impl ResilienceObservability {
pub fn new() -> Self {
Self {
metrics_collector: Arc::new(NoOpMetricsCollector),
tracer: Arc::new(NoOpTracer),
}
}
pub fn with_components(
metrics_collector: Arc<dyn MetricsCollector>,
tracer: Arc<dyn ResilienceTracer>,
) -> Self {
Self {
metrics_collector,
tracer,
}
}
pub fn record_operation_start(&self, operation_id: &str, policy: &ResiliencePolicy) {
self.metrics_collector.increment_counter(
"resilience_operations_total",
&[("operation", operation_id)],
);
self.tracer.start_span(
"resilience_operation",
&[
("operation_id", operation_id),
("policy_type", &policy_type_name(policy)),
],
);
}
pub fn record_operation_complete(
&self,
operation_id: &str,
policy: &ResiliencePolicy,
duration: Duration,
result: &Result<(), ResilienceOrchestrationError>,
) {
let status = if result.is_ok() { "success" } else { "failure" };
let duration_ms = duration.as_millis() as f64;
self.metrics_collector.increment_counter(
"resilience_operations_completed_total",
&[("operation", operation_id), ("status", status)],
);
self.metrics_collector.record_histogram(
"resilience_operation_duration_ms",
duration_ms,
&[
("operation", operation_id),
("policy_type", &policy_type_name(policy)),
],
);
match policy {
ResiliencePolicy::Retry { max_attempts, .. } => {
self.metrics_collector.record_histogram(
"resilience_retry_max_attempts",
*max_attempts as f64,
&[("operation", operation_id)],
);
}
ResiliencePolicy::CircuitBreaker {
failure_threshold, ..
} => {
self.metrics_collector.record_gauge(
"resilience_circuit_breaker_failure_threshold",
*failure_threshold as f64,
&[("operation", operation_id)],
);
}
ResiliencePolicy::RateLimit {
requests_per_second,
..
} => {
self.metrics_collector.record_gauge(
"resilience_rate_limit_rps",
*requests_per_second as f64,
&[("operation", operation_id)],
);
}
_ => {}
}
if let Err(error) = result {
self.record_operation_error(operation_id, error);
}
self.tracer.end_span(&[
("duration_ms", &duration_ms.to_string()),
("status", status),
]);
}
pub fn record_operation_error(&self, operation_id: &str, error: &ResilienceOrchestrationError) {
let error_type = match error {
ResilienceOrchestrationError::Domain(domain_error) => match domain_error {
ResilienceDomainError::RetryExhausted { .. } => "retry_exhausted",
ResilienceDomainError::CircuitOpen => "circuit_open",
ResilienceDomainError::RateLimited { .. } => "rate_limited",
ResilienceDomainError::Timeout { .. } => "timeout",
ResilienceDomainError::Infrastructure { .. } => "infrastructure",
_ => "domain_error",
},
ResilienceOrchestrationError::Infrastructure(_) => "infrastructure",
ResilienceOrchestrationError::Configuration(_) => "configuration",
ResilienceOrchestrationError::Cancelled => "cancelled",
};
self.metrics_collector.increment_counter(
"resilience_operation_errors_total",
&[("operation", operation_id), ("error_type", error_type)],
);
self.tracer.add_event(
"resilience_error",
&[("operation_id", operation_id), ("error_type", error_type)],
);
}
pub fn record_circuit_breaker_state_change(
&self,
circuit_breaker_id: &str,
old_state: CircuitBreakerState,
new_state: CircuitBreakerState,
) {
self.metrics_collector.increment_counter(
"resilience_circuit_breaker_state_changes_total",
&[
("circuit_breaker", circuit_breaker_id),
("old_state", old_state.as_str()),
("new_state", new_state.as_str()),
],
);
self.tracer.add_event(
"circuit_breaker_state_change",
&[
("circuit_breaker_id", circuit_breaker_id),
("old_state", old_state.as_str()),
("new_state", new_state.as_str()),
],
);
}
pub fn health_status(&self) -> ResilienceHealthStatus {
ResilienceHealthStatus {
overall_health: HealthLevel::Healthy,
circuit_breakers_open: 0,
services_degraded: 0,
last_updated: std::time::SystemTime::now(),
}
}
pub fn export_prometheus_metrics(&self) -> String {
"# AllFrame Resilience Metrics\n# (Implementation would export actual metrics)\n"
.to_string()
}
}
/// Circuit breaker state as reported in metrics labels and trace events.
///
/// Implements `Eq` and `Hash` so states can be compared exhaustively and used
/// as keys in hash-based collections (for example, per-state counters).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum CircuitBreakerState {
    /// Reported with the label `"closed"`.
    Closed,
    /// Reported with the label `"open"`.
    Open,
    /// Reported with the label `"half_open"`.
    HalfOpen,
}

impl CircuitBreakerState {
    /// Returns the lowercase, snake_case label used for this state in metrics
    /// and trace attributes.
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::Closed => "closed",
            Self::Open => "open",
            Self::HalfOpen => "half_open",
        }
    }
}
/// Health snapshot returned by `ResilienceObservability::health_status`.
#[derive(Clone, Debug)]
pub struct ResilienceHealthStatus {
/// Aggregate health level of the resilience layer.
pub overall_health: HealthLevel,
/// Number of circuit breakers counted as open in this snapshot.
pub circuit_breakers_open: u32,
/// Number of services counted as degraded in this snapshot.
pub services_degraded: u32,
/// System time at which the snapshot was built.
pub last_updated: std::time::SystemTime,
}
/// Coarse health classification carried by [`ResilienceHealthStatus`].
///
/// Implements `Eq` and `Hash` so levels can be used as keys in hash-based
/// collections (for example, to tally components per level).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum HealthLevel {
    /// No problems reported.
    Healthy,
    /// Operating, but with reduced capability.
    Degraded,
    /// Not operating correctly.
    Unhealthy,
    /// Health could not be determined.
    Unknown,
}
/// Sink for the metrics emitted by the resilience layer.
///
/// `labels` is a slice of `(label_name, label_value)` pairs. Implementations must
/// be `Send + Sync` because the collector is shared behind an `Arc`.
// NOTE(review): none of the methods are async, so the `async_trait` attribute
// looks unnecessary here — confirm before removing.
#[async_trait::async_trait]
pub trait MetricsCollector: Send + Sync {
/// Adds one to the counter called `name` for the given label set.
fn increment_counter(&self, name: &str, labels: &[(&str, &str)]);
/// Records one observation `value` in the histogram called `name`.
fn record_histogram(&self, name: &str, value: f64, labels: &[(&str, &str)]);
/// Sets the gauge called `name` to `value`.
fn record_gauge(&self, name: &str, value: f64, labels: &[(&str, &str)]);
}
/// Sink for the trace data emitted by the resilience layer.
///
/// `attributes` is a slice of `(key, value)` pairs. Note that `end_span` and
/// `add_event` take no span identifier, so an implementation has to track the
/// "current" span itself.
// NOTE(review): none of the methods are async, so the `async_trait` attribute
// looks unnecessary here — confirm before removing.
#[async_trait::async_trait]
pub trait ResilienceTracer: Send + Sync {
/// Starts a span called `name` with the given attributes.
fn start_span(&self, name: &str, attributes: &[(&str, &str)]);
/// Ends the current span, attaching the given attributes.
fn end_span(&self, attributes: &[(&str, &str)]);
/// Adds an event called `name` with the given attributes.
fn add_event(&self, name: &str, attributes: &[(&str, &str)]);
}
/// [`MetricsCollector`] that discards every metric.
///
/// Used as the collector of a default-constructed `ResilienceObservability`.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoOpMetricsCollector;

impl MetricsCollector for NoOpMetricsCollector {
    /// Does nothing.
    fn increment_counter(&self, _name: &str, _labels: &[(&str, &str)]) {}

    /// Does nothing.
    fn record_histogram(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}

    /// Does nothing.
    fn record_gauge(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {}
}
/// [`ResilienceTracer`] that discards every span and event.
///
/// Used as the tracer of a default-constructed `ResilienceObservability`.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoOpTracer;

#[async_trait::async_trait]
impl ResilienceTracer for NoOpTracer {
    /// Does nothing.
    fn start_span(&self, _name: &str, _attributes: &[(&str, &str)]) {}

    /// Does nothing.
    fn end_span(&self, _attributes: &[(&str, &str)]) {}

    /// Does nothing.
    fn add_event(&self, _name: &str, _attributes: &[(&str, &str)]) {}
}
/// Prometheus-backed implementation of [`MetricsCollector`].
#[cfg(feature = "prometheus")]
pub mod prometheus_metrics {
    use std::collections::hash_map::{Entry, HashMap};
    use std::sync::{PoisonError, RwLock};

    use ::prometheus::core::Collector;
    use ::prometheus::{CounterVec, GaugeVec, HistogramVec, Opts};

    use super::*;

    /// [`MetricsCollector`] that lazily creates one Prometheus metric vector per
    /// metric name and registers it with the default Prometheus registry.
    ///
    /// The label names of a metric are fixed by the first call that uses its
    /// name; later calls whose label values do not fit are silently dropped.
    #[derive(Default)]
    pub struct PrometheusMetricsCollector {
        /// Counter vectors created so far, keyed by metric name.
        counters: RwLock<HashMap<String, CounterVec>>,
        /// Histogram vectors created so far, keyed by metric name.
        histograms: RwLock<HashMap<String, HistogramVec>>,
        /// Gauge vectors created so far, keyed by metric name.
        gauges: RwLock<HashMap<String, GaugeVec>>,
    }

    impl PrometheusMetricsCollector {
        /// Creates a collector with no metrics defined yet.
        pub fn new() -> Self {
            Self::default()
        }

        /// Extracts the label values, in order, from `(name, value)` pairs.
        fn label_values<'a>(labels: &'a [(&'a str, &'a str)]) -> Vec<&'a str> {
            labels.iter().map(|(_, value)| *value).collect()
        }

        /// Extracts the label names, in order, from `(name, value)` pairs.
        fn label_names<'a>(labels: &'a [(&'a str, &'a str)]) -> Vec<&'a str> {
            labels.iter().map(|(name, _)| *name).collect()
        }

        /// Looks up the metric vector called `name` in `cache`, creating and
        /// registering it with `build` on first use, then hands it to `record`.
        ///
        /// Recording metrics is best-effort and must never take the caller down:
        /// * a poisoned lock is recovered instead of propagating the panic;
        /// * if `build` fails (e.g. an invalid metric or label name) the sample
        ///   is dropped rather than panicking while the write lock is held,
        ///   which would poison the lock for every later call.
        fn record_with<V, B, R>(cache: &RwLock<HashMap<String, V>>, name: &str, build: B, record: R)
        where
            V: Collector + Clone + 'static,
            B: FnOnce() -> ::prometheus::Result<V>,
            R: FnOnce(&V),
        {
            // Fast path: the metric already exists, a shared lock is enough.
            {
                let existing = cache.read().unwrap_or_else(PoisonError::into_inner);
                if let Some(metric) = existing.get(name) {
                    record(metric);
                    return;
                }
            }

            // Slow path: re-check under the exclusive lock, because another thread
            // may have created the metric between the two lock acquisitions.
            let mut existing = cache.write().unwrap_or_else(PoisonError::into_inner);
            let metric = match existing.entry(name.to_string()) {
                Entry::Occupied(slot) => slot.into_mut(),
                Entry::Vacant(slot) => match build() {
                    Ok(created) => {
                        // Registration failure (e.g. the name is already registered)
                        // is deliberately ignored; the metric is still cached locally.
                        let _ = ::prometheus::register(Box::new(created.clone()));
                        slot.insert(created)
                    }
                    Err(_) => return,
                },
            };
            record(metric);
        }
    }

    impl MetricsCollector for PrometheusMetricsCollector {
        /// Increments the counter `name` for the given labels, creating it on first use.
        fn increment_counter(&self, name: &str, labels: &[(&str, &str)]) {
            let label_vals = Self::label_values(labels);
            Self::record_with(
                &self.counters,
                name,
                || CounterVec::new(Opts::new(name, name), &Self::label_names(labels)),
                |counter: &CounterVec| {
                    if let Ok(m) = counter.get_metric_with_label_values(&label_vals) {
                        m.inc();
                    }
                },
            );
        }

        /// Observes `value` in the histogram `name` for the given labels, creating it on first use.
        fn record_histogram(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
            let label_vals = Self::label_values(labels);
            Self::record_with(
                &self.histograms,
                name,
                || {
                    HistogramVec::new(
                        ::prometheus::HistogramOpts::new(name, name),
                        &Self::label_names(labels),
                    )
                },
                |hist: &HistogramVec| {
                    if let Ok(m) = hist.get_metric_with_label_values(&label_vals) {
                        m.observe(value);
                    }
                },
            );
        }

        /// Sets the gauge `name` to `value` for the given labels, creating it on first use.
        fn record_gauge(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
            let label_vals = Self::label_values(labels);
            Self::record_with(
                &self.gauges,
                name,
                || GaugeVec::new(Opts::new(name, name), &Self::label_names(labels)),
                |gauge: &GaugeVec| {
                    if let Ok(m) = gauge.get_metric_with_label_values(&label_vals) {
                        m.set(value);
                    }
                },
            );
        }
    }
}
/// Returns the snake_case label that identifies the kind of `policy` in
/// metric labels and span attributes.
fn policy_type_name(policy: &ResiliencePolicy) -> String {
    let label: &'static str = match policy {
        ResiliencePolicy::None => "none",
        ResiliencePolicy::Retry { .. } => "retry",
        ResiliencePolicy::CircuitBreaker { .. } => "circuit_breaker",
        ResiliencePolicy::RateLimit { .. } => "rate_limit",
        ResiliencePolicy::Timeout { .. } => "timeout",
        ResiliencePolicy::Combined { .. } => "combined",
    };
    String::from(label)
}
/// Decorator around a [`ResilienceOrchestrator`] that reports every executed
/// operation through a [`ResilienceObservability`] instance.
pub struct InstrumentedResilienceOrchestrator<T: ResilienceOrchestrator> {
/// Orchestrator that actually executes the operations.
inner: T,
/// Receives the metrics and trace data recorded around each execution.
observability: ResilienceObservability,
}
impl<T: ResilienceOrchestrator> InstrumentedResilienceOrchestrator<T> {
pub fn new(inner: T, observability: ResilienceObservability) -> Self {
Self {
inner,
observability,
}
}
}
#[async_trait::async_trait]
impl<T: ResilienceOrchestrator> ResilienceOrchestrator for InstrumentedResilienceOrchestrator<T> {
async fn execute_with_policy<V, F, Fut, E>(
&self,
policy: ResiliencePolicy,
operation: F,
) -> Result<V, ResilienceOrchestrationError>
where
F: FnMut() -> Fut + Send,
Fut: std::future::Future<Output = Result<V, E>> + Send,
E: Into<ResilienceOrchestrationError> + Send,
{
let operation_id = "anonymous_operation"; let start_time = Instant::now();
let policy_clone = policy.clone();
self.observability
.record_operation_start(operation_id, &policy_clone);
let result = self.inner.execute_with_policy(policy, operation).await;
let duration = start_time.elapsed();
match &result {
Ok(_) => {
self.observability.record_operation_complete(
operation_id,
&policy_clone,
duration,
&Ok(()),
);
}
Err(ref err) => {
let cloned_err = match err {
ResilienceOrchestrationError::Domain(d) => {
ResilienceOrchestrationError::Domain(d.clone())
}
ResilienceOrchestrationError::Infrastructure(s) => {
ResilienceOrchestrationError::Infrastructure(s.clone())
}
ResilienceOrchestrationError::Configuration(s) => {
ResilienceOrchestrationError::Configuration(s.clone())
}
ResilienceOrchestrationError::Cancelled => {
ResilienceOrchestrationError::Cancelled
}
};
self.observability.record_operation_complete(
operation_id,
&policy_clone,
duration,
&Err(cloned_err),
);
}
}
result
}
fn get_circuit_breaker(&self, name: &str) -> Option<&CircuitBreaker> {
self.inner.get_circuit_breaker(name)
}
fn get_rate_limiter(&self, name: &str) -> Option<&RateLimiter> {
self.inner.get_rate_limiter(name)
}
fn metrics(&self) -> ResilienceMetrics {
self.inner.metrics()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the retry policy shared by the tests below.
    fn retry_policy() -> ResiliencePolicy {
        ResiliencePolicy::Retry {
            max_attempts: 3,
            backoff: crate::domain::resilience::BackoffStrategy::default(),
        }
    }

    /// Recording a full start/complete cycle with the no-op components must not
    /// panic, and the placeholder health snapshot stays healthy.
    #[tokio::test]
    async fn test_observability_recording() {
        let obs = ResilienceObservability::new();
        let retry = retry_policy();

        obs.record_operation_start("test_operation", &retry);
        obs.record_operation_complete(
            "test_operation",
            &retry,
            Duration::from_millis(150),
            &Ok(()),
        );

        assert_eq!(obs.health_status().overall_health, HealthLevel::Healthy);
    }

    /// Each policy variant maps to its snake_case label.
    #[test]
    fn test_policy_type_name() {
        let none = ResiliencePolicy::None;
        assert_eq!(policy_type_name(&none), "none");

        let retry = retry_policy();
        assert_eq!(policy_type_name(&retry), "retry");

        let breaker = ResiliencePolicy::CircuitBreaker {
            failure_threshold: 5,
            recovery_timeout: Duration::from_secs(30),
            success_threshold: 3,
        };
        assert_eq!(policy_type_name(&breaker), "circuit_breaker");
    }

    /// Recording a state change does not alter the placeholder health snapshot.
    #[test]
    fn test_circuit_breaker_state_transitions() {
        let obs = ResilienceObservability::new();
        let (from, to) = (CircuitBreakerState::Closed, CircuitBreakerState::Open);

        obs.record_circuit_breaker_state_change("test_circuit", from, to);

        assert_eq!(obs.health_status().circuit_breakers_open, 0);
    }

    /// The placeholder export contains at least one comment marker.
    #[test]
    fn test_prometheus_export() {
        let exported = ResilienceObservability::new().export_prometheus_metrics();
        assert!(exported.contains("#"));
    }
}