mecha10_core/rpc_breaker.rs
1//! RPC with Circuit Breaker Integration
2//!
3//! Combines RPC calls with circuit breaker pattern for improved fault tolerance.
4//! Prevents cascading failures when services are down or slow.
5//!
6//! # Features
7//!
8//! - Automatic circuit breaking for failing RPC endpoints
9//! - Per-endpoint circuit breaker configuration
10//! - Shared circuit breaker registry for efficiency
11//! - Fail-fast when circuit is open
12//! - Compatible with existing RPC API
13//!
14//! # Example
15//!
//! ```rust,ignore
17//! use mecha10::prelude::*;
18//! use mecha10::rpc_breaker::{RpcBreakerExt, CircuitBreakerConfig};
19//!
20//! # async fn example(ctx: &Context) -> Result<()> {
21//! // Make RPC call with circuit breaker
22//! let result: CommandResult = ctx
23//! .request_with_breaker(
24//! "/motor/control",
25//! &command,
26//! CircuitBreakerConfig {
27//! failure_threshold: 5,
28//! success_threshold: 2,
29//! timeout: Duration::from_secs(30),
30//! name: Some("motor_control".to_string()),
31//! }
32//! )
33//! .timeout(Duration::from_secs(5))
34//! .execute()
35//! .await?;
36//! # Ok(())
37//! # }
38//! ```
39
40use crate::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
41use crate::context::Context;
42use crate::error::Result;
43use crate::messages::Message;
44use crate::rpc::RpcExt;
45use serde::{de::DeserializeOwned, Serialize};
46use std::collections::HashMap;
47use std::sync::Arc;
48use std::time::Duration;
49use tokio::sync::RwLock;
50
51// ============================================================================
52// Circuit Breaker Registry
53// ============================================================================
54
55/// Global registry of circuit breakers for RPC endpoints
56///
57/// Maintains one circuit breaker per endpoint to track health across all calls.
58#[derive(Clone)]
59pub struct CircuitBreakerRegistry {
60 breakers: Arc<RwLock<HashMap<String, CircuitBreaker>>>,
61}
62
63impl CircuitBreakerRegistry {
64 /// Create a new empty registry
65 pub fn new() -> Self {
66 Self {
67 breakers: Arc::new(RwLock::new(HashMap::new())),
68 }
69 }
70
71 /// Get or create a circuit breaker for an endpoint
72 ///
73 /// # Arguments
74 ///
75 /// * `endpoint` - Endpoint identifier (typically the topic)
76 /// * `config` - Configuration for the circuit breaker
77 ///
78 /// # Returns
79 ///
80 /// A circuit breaker for this endpoint
81 pub async fn get_or_create(&self, endpoint: &str, config: CircuitBreakerConfig) -> CircuitBreaker {
82 let mut breakers = self.breakers.write().await;
83
84 breakers
85 .entry(endpoint.to_string())
86 .or_insert_with(|| CircuitBreaker::new(config))
87 .clone()
88 }
89
90 /// Get statistics for all circuit breakers
91 pub async fn stats(&self) -> HashMap<String, CircuitBreakerStats> {
92 let breakers = self.breakers.read().await;
93 let mut stats_map = HashMap::new();
94
95 for (endpoint, breaker) in breakers.iter() {
96 let (total_requests, total_successes, total_failures, total_rejected) = breaker.stats().await;
97 let success_rate = breaker.success_rate().await;
98 let state = if breaker.is_open().await {
99 "open".to_string()
100 } else if breaker.is_half_open().await {
101 "half-open".to_string()
102 } else {
103 "closed".to_string()
104 };
105
106 stats_map.insert(
107 endpoint.clone(),
108 CircuitBreakerStats {
109 state,
110 total_requests,
111 total_successes,
112 total_failures,
113 total_rejected,
114 success_rate,
115 },
116 );
117 }
118
119 stats_map
120 }
121
122 /// Reset all circuit breakers
123 pub async fn reset_all(&self) {
124 let mut breakers = self.breakers.write().await;
125 breakers.clear();
126 }
127}
128
129impl Default for CircuitBreakerRegistry {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
/// Point-in-time statistics for one endpoint's circuit breaker.
///
/// Produced by `CircuitBreakerRegistry::stats` and `get_all_stats`.
#[derive(Debug, Clone, PartialEq)]
pub struct CircuitBreakerStats {
    /// Breaker state: one of `"open"`, `"half-open"` or `"closed"`.
    pub state: String,
    /// Total requests counted by the breaker (first value of `CircuitBreaker::stats`).
    pub total_requests: u64,
    /// Requests the breaker recorded as successful.
    pub total_successes: u64,
    /// Requests the breaker recorded as failed.
    pub total_failures: u64,
    /// Requests counted as rejected by the breaker.
    // NOTE(review): presumably calls fast-failed while the circuit was open — confirm
    // against `CircuitBreaker::stats`.
    pub total_rejected: u64,
    /// Value reported by `CircuitBreaker::success_rate`.
    // NOTE(review): scale (fraction in 0..=1 vs. percentage) is not visible here — confirm.
    pub success_rate: f64,
}
145
146// ============================================================================
147// RPC Extension with Circuit Breaker
148// ============================================================================
149
150/// Extension trait for RPC with circuit breaker support
151pub trait RpcBreakerExt {
152 /// Make an RPC request with circuit breaker protection
153 ///
154 /// # Arguments
155 ///
156 /// * `topic` - Base topic for the RPC
157 /// * `request` - Request payload
158 /// * `config` - Circuit breaker configuration
159 ///
160 /// # Returns
161 ///
162 /// A request builder that can be configured and executed
163 ///
164 /// # Example
165 ///
166 /// ```rust
167 /// use mecha10::prelude::*;
168 /// use mecha10::rpc_breaker::{RpcBreakerExt, CircuitBreakerConfig};
169 ///
170 /// # async fn example(ctx: &Context) -> Result<()> {
171 /// let result: Response = ctx
172 /// .request_with_breaker(
173 /// "/motor/control",
174 /// &command,
175 /// CircuitBreakerConfig::default()
176 /// )
177 /// .timeout(Duration::from_secs(5))
178 /// .execute()
179 /// .await?;
180 /// # Ok(())
181 /// # }
182 /// ```
183 fn request_with_breaker<Req, Resp>(
184 &self,
185 topic: &str,
186 request: &Req,
187 config: CircuitBreakerConfig,
188 ) -> BreakerRequestBuilder<'_, Req, Resp>
189 where
190 Req: Message + Serialize + Clone,
191 Resp: Message + DeserializeOwned + Send + 'static;
192}
193
194impl RpcBreakerExt for Context {
195 fn request_with_breaker<Req, Resp>(
196 &self,
197 topic: &str,
198 request: &Req,
199 config: CircuitBreakerConfig,
200 ) -> BreakerRequestBuilder<'_, Req, Resp>
201 where
202 Req: Message + Serialize + Clone,
203 Resp: Message + DeserializeOwned + Send + 'static,
204 {
205 BreakerRequestBuilder {
206 ctx: self,
207 topic: topic.to_string(),
208 request: request.clone(),
209 config,
210 timeout: Duration::from_secs(10),
211 _phantom: std::marker::PhantomData,
212 }
213 }
214}
215
216/// Builder for RPC requests with circuit breaker
217pub struct BreakerRequestBuilder<'a, Req, Resp> {
218 ctx: &'a Context,
219 topic: String,
220 request: Req,
221 config: CircuitBreakerConfig,
222 timeout: Duration,
223 _phantom: std::marker::PhantomData<Resp>,
224}
225
226impl<'a, Req, Resp> BreakerRequestBuilder<'a, Req, Resp>
227where
228 Req: Message + Serialize + Clone,
229 Resp: Message + DeserializeOwned + Send + 'static,
230{
231 /// Set the request timeout
232 ///
233 /// # Example
234 ///
235 /// ```rust
236 /// # use mecha10::prelude::*;
237 /// # use mecha10::rpc_breaker::RpcBreakerExt;
238 /// # async fn example(ctx: &Context) -> Result<()> {
239 /// let result = ctx
240 /// .request_with_breaker("/topic", &request, Default::default())
241 /// .timeout(Duration::from_secs(5))
242 /// .await?;
243 /// # Ok(())
244 /// # }
245 /// ```
246 pub fn timeout(mut self, timeout: Duration) -> Self {
247 self.timeout = timeout;
248 self
249 }
250
251 /// Execute the RPC request with circuit breaker protection
252 ///
253 /// # Returns
254 ///
255 /// - `Ok(Resp)` if request succeeded
256 /// - `Err(CircuitBreakerOpen)` if circuit is open
257 /// - `Err(...)` if request failed or timed out
258 pub async fn execute(self) -> Result<Resp> {
259 // Get or create circuit breaker for this endpoint
260 let registry = get_global_registry();
261 let breaker = registry.get_or_create(&self.topic, self.config).await;
262
263 // Execute request through circuit breaker
264 let ctx = self.ctx;
265 let topic = self.topic.clone();
266 let request = self.request.clone();
267 let timeout = self.timeout;
268
269 breaker
270 .call(|| async move { ctx.request(&topic, &request).timeout(timeout).execute().await })
271 .await
272 }
273}
274
275// ============================================================================
276// Global Registry
277// ============================================================================
278
279use once_cell::sync::Lazy;
280
281static GLOBAL_REGISTRY: Lazy<CircuitBreakerRegistry> = Lazy::new(CircuitBreakerRegistry::new);
282
283/// Get the global circuit breaker registry
284pub fn get_global_registry() -> &'static CircuitBreakerRegistry {
285 &GLOBAL_REGISTRY
286}
287
288/// Get statistics for all circuit breakers
289pub async fn get_all_stats() -> HashMap<String, CircuitBreakerStats> {
290 GLOBAL_REGISTRY.stats().await
291}
292
293/// Reset all circuit breakers (useful for testing)
294pub async fn reset_all_breakers() {
295 GLOBAL_REGISTRY.reset_all().await;
296}
297
298// ============================================================================
299// Convenience Configs
300// ============================================================================
301
302impl CircuitBreakerConfig {
303 /// Configuration for critical services (aggressive circuit breaking)
304 ///
305 /// - Opens after 3 failures
306 /// - Requires 3 successes to close
307 /// - 60 second timeout
308 pub fn critical(name: &str) -> Self {
309 Self {
310 failure_threshold: 3,
311 success_threshold: 3,
312 timeout: Duration::from_secs(60),
313 name: Some(name.to_string()),
314 }
315 }
316
317 /// Configuration for standard services (balanced)
318 ///
319 /// - Opens after 5 failures
320 /// - Requires 2 successes to close
321 /// - 30 second timeout
322 pub fn standard(name: &str) -> Self {
323 Self {
324 failure_threshold: 5,
325 success_threshold: 2,
326 timeout: Duration::from_secs(30),
327 name: Some(name.to_string()),
328 }
329 }
330
331 /// Configuration for resilient services (lenient circuit breaking)
332 ///
333 /// - Opens after 10 failures
334 /// - Requires 2 successes to close
335 /// - 15 second timeout
336 pub fn resilient(name: &str) -> Self {
337 Self {
338 failure_threshold: 10,
339 success_threshold: 2,
340 timeout: Duration::from_secs(15),
341 name: Some(name.to_string()),
342 }
343 }
344}