mecha10_core/
rpc_breaker.rs

1//! RPC with Circuit Breaker Integration
2//!
3//! Combines RPC calls with circuit breaker pattern for improved fault tolerance.
4//! Prevents cascading failures when services are down or slow.
5//!
6//! # Features
7//!
8//! - Automatic circuit breaking for failing RPC endpoints
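//! - Per-endpoint circuit breaker configuration (applied when an endpoint's breaker is first created)
//! - Process-wide circuit breaker registry, so every call to an endpoint shares one breaker and its health state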
11//! - Fail-fast when circuit is open
12//! - Compatible with existing RPC API
13//!
14//! # Example
15//!
//! ```rust,ignore
17//! use mecha10::prelude::*;
18//! use mecha10::rpc_breaker::{RpcBreakerExt, CircuitBreakerConfig};
19//!
20//! # async fn example(ctx: &Context) -> Result<()> {
21//! // Make RPC call with circuit breaker
22//! let result: CommandResult = ctx
23//!     .request_with_breaker(
24//!         "/motor/control",
25//!         &command,
26//!         CircuitBreakerConfig {
27//!             failure_threshold: 5,
28//!             success_threshold: 2,
29//!             timeout: Duration::from_secs(30),
30//!             name: Some("motor_control".to_string()),
31//!         }
32//!     )
33//!     .timeout(Duration::from_secs(5))
34//!     .execute()
35//!     .await?;
36//! # Ok(())
37//! # }
38//! ```
39
40use crate::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
41use crate::context::Context;
42use crate::error::Result;
43use crate::messages::Message;
44use crate::rpc::RpcExt;
45use serde::{de::DeserializeOwned, Serialize};
46use std::collections::HashMap;
47use std::sync::Arc;
48use std::time::Duration;
49use tokio::sync::RwLock;
50
51// ============================================================================
52// Circuit Breaker Registry
53// ============================================================================
54
55/// Global registry of circuit breakers for RPC endpoints
56///
57/// Maintains one circuit breaker per endpoint to track health across all calls.
58#[derive(Clone)]
59pub struct CircuitBreakerRegistry {
60    breakers: Arc<RwLock<HashMap<String, CircuitBreaker>>>,
61}
62
63impl CircuitBreakerRegistry {
64    /// Create a new empty registry
65    pub fn new() -> Self {
66        Self {
67            breakers: Arc::new(RwLock::new(HashMap::new())),
68        }
69    }
70
71    /// Get or create a circuit breaker for an endpoint
72    ///
73    /// # Arguments
74    ///
75    /// * `endpoint` - Endpoint identifier (typically the topic)
76    /// * `config` - Configuration for the circuit breaker
77    ///
78    /// # Returns
79    ///
80    /// A circuit breaker for this endpoint
81    pub async fn get_or_create(&self, endpoint: &str, config: CircuitBreakerConfig) -> CircuitBreaker {
82        let mut breakers = self.breakers.write().await;
83
84        breakers
85            .entry(endpoint.to_string())
86            .or_insert_with(|| CircuitBreaker::new(config))
87            .clone()
88    }
89
90    /// Get statistics for all circuit breakers
91    pub async fn stats(&self) -> HashMap<String, CircuitBreakerStats> {
92        let breakers = self.breakers.read().await;
93        let mut stats_map = HashMap::new();
94
95        for (endpoint, breaker) in breakers.iter() {
96            let (total_requests, total_successes, total_failures, total_rejected) = breaker.stats().await;
97            let success_rate = breaker.success_rate().await;
98            let state = if breaker.is_open().await {
99                "open".to_string()
100            } else if breaker.is_half_open().await {
101                "half-open".to_string()
102            } else {
103                "closed".to_string()
104            };
105
106            stats_map.insert(
107                endpoint.clone(),
108                CircuitBreakerStats {
109                    state,
110                    total_requests,
111                    total_successes,
112                    total_failures,
113                    total_rejected,
114                    success_rate,
115                },
116            );
117        }
118
119        stats_map
120    }
121
122    /// Reset all circuit breakers
123    pub async fn reset_all(&self) {
124        let mut breakers = self.breakers.write().await;
125        breakers.clear();
126    }
127}
128
129impl Default for CircuitBreakerRegistry {
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
/// Statistics for a single endpoint's circuit breaker, as reported by
/// [`CircuitBreakerRegistry::stats`].
#[derive(Debug, Clone, PartialEq)]
pub struct CircuitBreakerStats {
    /// Breaker state at collection time: `"open"`, `"half-open"` or `"closed"`.
    pub state: String,
    /// Total requests counted by the breaker.
    pub total_requests: u64,
    /// Requests the breaker counted as successful.
    pub total_successes: u64,
    /// Requests the breaker counted as failed.
    pub total_failures: u64,
    /// Requests rejected by the breaker (presumably while the circuit was open).
    pub total_rejected: u64,
    /// Success rate as reported by the breaker; see `CircuitBreaker::success_rate`
    /// for its exact range/units.
    pub success_rate: f64,
}
145
146// ============================================================================
147// RPC Extension with Circuit Breaker
148// ============================================================================
149
150/// Extension trait for RPC with circuit breaker support
151pub trait RpcBreakerExt {
152    /// Make an RPC request with circuit breaker protection
153    ///
154    /// # Arguments
155    ///
156    /// * `topic` - Base topic for the RPC
157    /// * `request` - Request payload
158    /// * `config` - Circuit breaker configuration
159    ///
160    /// # Returns
161    ///
162    /// A request builder that can be configured and executed
163    ///
164    /// # Example
165    ///
166    /// ```rust
167    /// use mecha10::prelude::*;
168    /// use mecha10::rpc_breaker::{RpcBreakerExt, CircuitBreakerConfig};
169    ///
170    /// # async fn example(ctx: &Context) -> Result<()> {
171    /// let result: Response = ctx
172    ///     .request_with_breaker(
173    ///         "/motor/control",
174    ///         &command,
175    ///         CircuitBreakerConfig::default()
176    ///     )
177    ///     .timeout(Duration::from_secs(5))
178    ///     .execute()
179    ///     .await?;
180    /// # Ok(())
181    /// # }
182    /// ```
183    fn request_with_breaker<Req, Resp>(
184        &self,
185        topic: &str,
186        request: &Req,
187        config: CircuitBreakerConfig,
188    ) -> BreakerRequestBuilder<'_, Req, Resp>
189    where
190        Req: Message + Serialize + Clone,
191        Resp: Message + DeserializeOwned + Send + 'static;
192}
193
194impl RpcBreakerExt for Context {
195    fn request_with_breaker<Req, Resp>(
196        &self,
197        topic: &str,
198        request: &Req,
199        config: CircuitBreakerConfig,
200    ) -> BreakerRequestBuilder<'_, Req, Resp>
201    where
202        Req: Message + Serialize + Clone,
203        Resp: Message + DeserializeOwned + Send + 'static,
204    {
205        BreakerRequestBuilder {
206            ctx: self,
207            topic: topic.to_string(),
208            request: request.clone(),
209            config,
210            timeout: Duration::from_secs(10),
211            _phantom: std::marker::PhantomData,
212        }
213    }
214}
215
216/// Builder for RPC requests with circuit breaker
217pub struct BreakerRequestBuilder<'a, Req, Resp> {
218    ctx: &'a Context,
219    topic: String,
220    request: Req,
221    config: CircuitBreakerConfig,
222    timeout: Duration,
223    _phantom: std::marker::PhantomData<Resp>,
224}
225
226impl<'a, Req, Resp> BreakerRequestBuilder<'a, Req, Resp>
227where
228    Req: Message + Serialize + Clone,
229    Resp: Message + DeserializeOwned + Send + 'static,
230{
231    /// Set the request timeout
232    ///
233    /// # Example
234    ///
235    /// ```rust
236    /// # use mecha10::prelude::*;
237    /// # use mecha10::rpc_breaker::RpcBreakerExt;
238    /// # async fn example(ctx: &Context) -> Result<()> {
239    /// let result = ctx
240    ///     .request_with_breaker("/topic", &request, Default::default())
241    ///     .timeout(Duration::from_secs(5))
242    ///     .await?;
243    /// # Ok(())
244    /// # }
245    /// ```
246    pub fn timeout(mut self, timeout: Duration) -> Self {
247        self.timeout = timeout;
248        self
249    }
250
251    /// Execute the RPC request with circuit breaker protection
252    ///
253    /// # Returns
254    ///
255    /// - `Ok(Resp)` if request succeeded
256    /// - `Err(CircuitBreakerOpen)` if circuit is open
257    /// - `Err(...)` if request failed or timed out
258    pub async fn execute(self) -> Result<Resp> {
259        // Get or create circuit breaker for this endpoint
260        let registry = get_global_registry();
261        let breaker = registry.get_or_create(&self.topic, self.config).await;
262
263        // Execute request through circuit breaker
264        let ctx = self.ctx;
265        let topic = self.topic.clone();
266        let request = self.request.clone();
267        let timeout = self.timeout;
268
269        breaker
270            .call(|| async move { ctx.request(&topic, &request).timeout(timeout).execute().await })
271            .await
272    }
273}
274
275// ============================================================================
276// Global Registry
277// ============================================================================
278
279use once_cell::sync::Lazy;
280
281static GLOBAL_REGISTRY: Lazy<CircuitBreakerRegistry> = Lazy::new(CircuitBreakerRegistry::new);
282
283/// Get the global circuit breaker registry
284pub fn get_global_registry() -> &'static CircuitBreakerRegistry {
285    &GLOBAL_REGISTRY
286}
287
288/// Get statistics for all circuit breakers
289pub async fn get_all_stats() -> HashMap<String, CircuitBreakerStats> {
290    GLOBAL_REGISTRY.stats().await
291}
292
293/// Reset all circuit breakers (useful for testing)
294pub async fn reset_all_breakers() {
295    GLOBAL_REGISTRY.reset_all().await;
296}
297
298// ============================================================================
299// Convenience Configs
300// ============================================================================
301
302impl CircuitBreakerConfig {
303    /// Configuration for critical services (aggressive circuit breaking)
304    ///
305    /// - Opens after 3 failures
306    /// - Requires 3 successes to close
307    /// - 60 second timeout
308    pub fn critical(name: &str) -> Self {
309        Self {
310            failure_threshold: 3,
311            success_threshold: 3,
312            timeout: Duration::from_secs(60),
313            name: Some(name.to_string()),
314        }
315    }
316
317    /// Configuration for standard services (balanced)
318    ///
319    /// - Opens after 5 failures
320    /// - Requires 2 successes to close
321    /// - 30 second timeout
322    pub fn standard(name: &str) -> Self {
323        Self {
324            failure_threshold: 5,
325            success_threshold: 2,
326            timeout: Duration::from_secs(30),
327            name: Some(name.to_string()),
328        }
329    }
330
331    /// Configuration for resilient services (lenient circuit breaking)
332    ///
333    /// - Opens after 10 failures
334    /// - Requires 2 successes to close
335    /// - 15 second timeout
336    pub fn resilient(name: &str) -> Self {
337        Self {
338            failure_threshold: 10,
339            success_threshold: 2,
340            timeout: Duration::from_secs(15),
341            name: Some(name.to_string()),
342        }
343    }
344}