stygian_graph/adapters/fallback.rs
1//! Fallback chain service adapter.
2//!
3//! Implements [`crate::ports::ScrapingService`] by trying a prioritised list of inner services
4//! with per-service circuit breakers. When a service's circuit is **Open** or
5//! its execution fails, the chain automatically moves to the next lower-priority
6//! service.
7//!
8//! # Behaviour
9//!
10//! 1. Services are tried in registration order (index 0 = highest priority).
11//! 2. A service is **skipped** when its circuit breaker is [`crate::ports::CircuitState::Open`]
12//! and the reset timeout has not yet elapsed. The chain then probes it once
13//! the timeout passes (half-open probe).
14//! 3. On **success** the corresponding circuit breaker records the success and the
15//! result is returned immediately — no further services are tried.
16//! 4. On **failure** the circuit breaker records the failure and the next service
17//! is tried.
18//! 5. If every service is exhausted the last error is propagated.
19//!
20//! # Example
21//!
22//! ```
23//! use std::sync::Arc;
24//! use std::time::Duration;
25//! use stygian_graph::adapters::fallback::FallbackChainService;
26//! use stygian_graph::adapters::noop::NoopService;
27//! use stygian_graph::adapters::resilience::CircuitBreakerImpl;
28//!
29//! let chain = FallbackChainService::builder()
30//! .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
31//! .named("primary-with-plugin-fallback")
32//! .build();
33//! ```
34
35use std::sync::Arc;
36use std::time::Duration;
37
38use async_trait::async_trait;
39use tracing::{debug, info, warn};
40
41use crate::adapters::resilience::CircuitBreakerImpl;
42use crate::domain::error::{Result, ServiceError, StygianError};
43use crate::ports::{CircuitBreaker, CircuitState, ScrapingService, ServiceInput, ServiceOutput};
44
45// ── Chain entry ───────────────────────────────────────────────────────────────
46
47/// A single link in the fallback chain: a service paired with its circuit breaker.
48struct ChainEntry {
49 service: Arc<dyn ScrapingService>,
50 breaker: Arc<CircuitBreakerImpl>,
51}
52
53// ── FallbackChainService ──────────────────────────────────────────────────────
54
55/// A [`ScrapingService`] that tries multiple inner services in priority order,
56/// automatically routing around open circuit breakers and failed services.
57///
58/// Construct via [`FallbackChainService::builder()`].
59///
60/// # Example
61///
62/// ```
63/// use std::sync::Arc;
64/// use std::time::Duration;
65/// use stygian_graph::adapters::fallback::FallbackChainService;
66/// use stygian_graph::adapters::noop::NoopService;
67/// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
68/// use stygian_graph::ports::ScrapingService;
69///
70/// let chain = FallbackChainService::builder()
71/// .add(Arc::new(NoopService), CircuitBreakerImpl::new(5, Duration::from_secs(60)))
72/// .build();
73///
74/// assert_eq!(chain.name(), "fallback-chain");
75/// ```
76pub struct FallbackChainService {
77 entries: Vec<ChainEntry>,
78 name: &'static str,
79}
80
81impl FallbackChainService {
82 /// Return a [`FallbackChainBuilder`] for ergonomic construction.
83 ///
84 /// # Example
85 ///
86 /// ```
87 /// use stygian_graph::adapters::fallback::FallbackChainService;
88 ///
89 /// let builder = FallbackChainService::builder();
90 /// ```
91 pub const fn builder() -> FallbackChainBuilder {
92 FallbackChainBuilder::new()
93 }
94
95 /// Return the number of services in this chain.
96 ///
97 /// # Example
98 ///
99 /// ```
100 /// use std::sync::Arc;
101 /// use std::time::Duration;
102 /// use stygian_graph::adapters::fallback::FallbackChainService;
103 /// use stygian_graph::adapters::noop::NoopService;
104 /// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
105 ///
106 /// let chain = FallbackChainService::builder()
107 /// .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
108 /// .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
109 /// .build();
110 ///
111 /// assert_eq!(chain.len(), 2);
112 /// ```
113 pub const fn len(&self) -> usize {
114 self.entries.len()
115 }
116
117 /// Return `true` when no services are registered.
118 ///
119 /// An empty chain always returns [`ServiceError::Unavailable`].
120 ///
121 /// # Example
122 ///
123 /// ```
124 /// use stygian_graph::adapters::fallback::FallbackChainService;
125 ///
126 /// let chain = FallbackChainService::builder().build();
127 /// assert!(chain.is_empty());
128 /// ```
129 pub const fn is_empty(&self) -> bool {
130 self.entries.is_empty()
131 }
132}
133
134#[async_trait]
135impl ScrapingService for FallbackChainService {
136 /// Execute the fallback chain.
137 ///
138 /// Tries each registered service in order, respecting circuit breaker state.
139 /// Returns the first successful result, or the last error if all services fail.
140 async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
141 let mut last_err: Option<StygianError> = None;
142
143 for (idx, entry) in self.entries.iter().enumerate() {
144 let state = entry.breaker.state();
145
146 // Skip services whose circuit is Open and hasn't timed out yet.
147 if state == CircuitState::Open {
148 if !entry.breaker.attempt_reset() {
149 debug!(
150 service = entry.service.name(),
151 chain = self.name,
152 idx,
153 "circuit open — skipping service in fallback chain"
154 );
155 continue;
156 }
157 debug!(
158 service = entry.service.name(),
159 chain = self.name,
160 idx,
161 "circuit half-open — probing service"
162 );
163 }
164
165 debug!(
166 service = entry.service.name(),
167 chain = self.name,
168 idx,
169 url = %input.url,
170 "fallback chain: attempting service"
171 );
172
173 match entry.service.execute(input.clone()).await {
174 Ok(output) => {
175 entry.breaker.record_success();
176 info!(
177 service = entry.service.name(),
178 chain = self.name,
179 idx,
180 "fallback chain: service succeeded"
181 );
182 return Ok(output);
183 }
184 Err(e) => {
185 entry.breaker.record_failure();
186 warn!(
187 service = entry.service.name(),
188 chain = self.name,
189 idx,
190 error = %e,
191 "fallback chain: service failed — advancing to next"
192 );
193 last_err = Some(e);
194 }
195 }
196 }
197
198 Err(last_err.unwrap_or_else(|| {
199 StygianError::Service(ServiceError::Unavailable(format!(
200 "fallback chain '{}' exhausted: no services registered or available",
201 self.name
202 )))
203 }))
204 }
205
206 fn name(&self) -> &'static str {
207 self.name
208 }
209}
210
211// ── FallbackChainBuilder ──────────────────────────────────────────────────────
212
213/// Builder for [`FallbackChainService`].
214///
215/// Services are tried in the order they are added. Add the highest-priority
216/// (cheapest / most reliable) service first; add the plugin extraction adapter
217/// last as the final fallback.
218///
219/// # Example
220///
221/// ```
222/// use std::sync::Arc;
223/// use std::time::Duration;
224/// use stygian_graph::adapters::fallback::FallbackChainBuilder;
225/// use stygian_graph::adapters::noop::NoopService;
226/// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
227/// use stygian_graph::ports::ScrapingService;
228///
229/// let chain = FallbackChainBuilder::new()
230/// .add(Arc::new(NoopService), CircuitBreakerImpl::new(5, Duration::from_secs(60)))
231/// .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
232/// .named("http-to-plugin")
233/// .build();
234///
235/// assert_eq!(chain.len(), 2);
236/// assert_eq!(chain.name(), "http-to-plugin");
237/// ```
238pub struct FallbackChainBuilder {
239 entries: Vec<ChainEntry>,
240 name: &'static str,
241}
242
243impl FallbackChainBuilder {
244 /// Create an empty builder with the default name `"fallback-chain"`.
245 ///
246 /// # Example
247 ///
248 /// ```
249 /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
250 ///
251 /// let builder = FallbackChainBuilder::new();
252 /// ```
253 pub const fn new() -> Self {
254 Self {
255 entries: Vec::new(),
256 name: "fallback-chain",
257 }
258 }
259
260 /// Add a service and its dedicated circuit breaker (highest to lowest priority).
261 ///
262 /// # Arguments
263 ///
264 /// * `service` — The [`ScrapingService`] to add.
265 /// * `breaker` — A [`CircuitBreakerImpl`] configured for this specific service.
266 ///
267 /// # Example
268 ///
269 /// ```
270 /// use std::sync::Arc;
271 /// use std::time::Duration;
272 /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
273 /// use stygian_graph::adapters::noop::NoopService;
274 /// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
275 ///
276 /// let builder = FallbackChainBuilder::new()
277 /// .add(Arc::new(NoopService), CircuitBreakerImpl::new(5, Duration::from_secs(60)));
278 /// ```
279 #[must_use]
280 pub fn add(mut self, service: Arc<dyn ScrapingService>, breaker: CircuitBreakerImpl) -> Self {
281 self.entries.push(ChainEntry {
282 service,
283 breaker: Arc::new(breaker),
284 });
285 self
286 }
287
288 /// Override the static name reported by [`ScrapingService::name`].
289 ///
290 /// # Example
291 ///
292 /// ```
293 /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
294 /// use stygian_graph::ports::ScrapingService;
295 ///
296 /// let chain = FallbackChainBuilder::new().named("http-to-plugin-fallback").build();
297 /// assert_eq!(chain.name(), "http-to-plugin-fallback");
298 /// ```
299 #[must_use]
300 pub const fn named(mut self, name: &'static str) -> Self {
301 self.name = name;
302 self
303 }
304
305 /// Build the [`FallbackChainService`].
306 ///
307 /// An empty chain (no services added) is valid but will immediately return
308 /// [`ServiceError::Unavailable`] on every call.
309 ///
310 /// # Example
311 ///
312 /// ```
313 /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
314 ///
315 /// let chain = FallbackChainBuilder::new().build();
316 /// assert!(chain.is_empty());
317 /// ```
318 pub fn build(self) -> FallbackChainService {
319 FallbackChainService {
320 entries: self.entries,
321 name: self.name,
322 }
323 }
324}
325
326impl Default for FallbackChainBuilder {
327 fn default() -> Self {
328 Self::new()
329 }
330}
331
332// ── Default circuit breaker parameters ───────────────────────────────────────
333
334/// Sensible default for a production circuit breaker on a primary scraper.
335///
336/// Opens after **5 consecutive failures** and attempts reset after **30 seconds**.
337///
338/// # Example
339///
340/// ```
341/// use stygian_graph::adapters::fallback::default_primary_breaker;
342///
343/// let breaker = default_primary_breaker();
344/// ```
345pub fn default_primary_breaker() -> CircuitBreakerImpl {
346 CircuitBreakerImpl::new(5, Duration::from_secs(30))
347}
348
349/// Sensible default for a production circuit breaker on a fallback scraper.
350///
351/// Opens after **3 consecutive failures** and attempts reset after **60 seconds**.
352/// The longer reset timeout gives the fallback more time to recover since it is
353/// typically a heavier operation.
354///
355/// # Example
356///
357/// ```
358/// use stygian_graph::adapters::fallback::default_fallback_breaker;
359///
360/// let breaker = default_fallback_breaker();
361/// ```
362pub fn default_fallback_breaker() -> CircuitBreakerImpl {
363 #[allow(clippy::duration_suboptimal_units)]
364 {
365 CircuitBreakerImpl::new(3, Duration::from_secs(60))
366 }
367}
368
369// ── Unit tests ────────────────────────────────────────────────────────────────
370
371#[cfg(test)]
372mod tests {
373 use super::*;
374 use crate::adapters::noop::NoopService;
375 use crate::domain::error::ServiceError;
376 use crate::ports::{ServiceInput, ServiceOutput};
377 use serde_json::json;
378
379 // ── helper: always-failing service ────────────────────────────────────
380
381 struct AlwaysFailService;
382
383 #[async_trait]
384 impl ScrapingService for AlwaysFailService {
385 async fn execute(&self, _input: ServiceInput) -> Result<ServiceOutput> {
386 Err(StygianError::Service(ServiceError::Unavailable(
387 "simulated failure".into(),
388 )))
389 }
390
391 fn name(&self) -> &'static str {
392 "always-fail"
393 }
394 }
395
396 fn make_input() -> ServiceInput {
397 ServiceInput {
398 url: "https://example.com".to_string(),
399 params: json!({}),
400 }
401 }
402
403 // ── tests ─────────────────────────────────────────────────────────────
404
405 #[tokio::test]
406 async fn test_first_service_succeeds() -> Result<()> {
407 let chain = FallbackChainService::builder()
408 .add(
409 Arc::new(NoopService),
410 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
411 )
412 .add(
413 Arc::new(AlwaysFailService),
414 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
415 )
416 .build();
417
418 let output = chain.execute(make_input()).await?;
419 match output.metadata.get("service") {
420 Some(service) => assert_eq!(service, "noop", "noop should win"),
421 None => {
422 return Err(
423 ServiceError::Unavailable("service key should exist".to_string()).into(),
424 );
425 }
426 }
427 Ok(())
428 }
429
430 #[tokio::test]
431 async fn test_fallback_fires_when_primary_fails() -> Result<()> {
432 let chain = FallbackChainService::builder()
433 .add(
434 Arc::new(AlwaysFailService),
435 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
436 )
437 .add(
438 Arc::new(NoopService),
439 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
440 )
441 .named("primary-then-noop")
442 .build();
443
444 let output = chain.execute(make_input()).await?;
445 match output.metadata.get("service") {
446 Some(service) => assert_eq!(
447 service, "noop",
448 "fallback noop should win after primary failure"
449 ),
450 None => {
451 return Err(
452 ServiceError::Unavailable("service key should exist".to_string()).into(),
453 );
454 }
455 }
456 Ok(())
457 }
458
459 #[tokio::test]
460 async fn test_all_services_fail_returns_error() {
461 let chain = FallbackChainService::builder()
462 .add(
463 Arc::new(AlwaysFailService),
464 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
465 )
466 .add(
467 Arc::new(AlwaysFailService),
468 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
469 )
470 .build();
471
472 let result = chain.execute(make_input()).await;
473 assert!(result.is_err(), "all-failing chain must return error");
474 }
475
476 #[tokio::test]
477 async fn test_empty_chain_returns_unavailable() {
478 let chain = FallbackChainService::builder().build();
479 let result = chain.execute(make_input()).await;
480 assert!(
481 result.is_err(),
482 "empty chain must return ServiceError::Unavailable"
483 );
484 }
485
486 #[tokio::test]
487 async fn test_chain_name_default() {
488 let chain = FallbackChainService::builder().build();
489 assert_eq!(chain.name(), "fallback-chain");
490 }
491
492 #[tokio::test]
493 async fn test_chain_name_custom() {
494 let chain = FallbackChainService::builder()
495 .named("http-to-plugin")
496 .build();
497 assert_eq!(chain.name(), "http-to-plugin");
498 }
499
500 #[tokio::test]
501 async fn test_open_circuit_skipped_advances_to_next() -> Result<()> {
502 // Breaker with threshold 1: one failure opens the circuit
503 let failing_breaker = CircuitBreakerImpl::new(1, {
504 #[allow(clippy::duration_suboptimal_units)]
505 {
506 Duration::from_secs(3600)
507 } // 1 hour
508 });
509
510 // Pre-open the circuit by recording the one required failure
511 failing_breaker.record_failure();
512 assert_eq!(
513 failing_breaker.state(),
514 CircuitState::Open,
515 "breaker should be open after threshold hit"
516 );
517
518 let chain = FallbackChainService::builder()
519 .add(Arc::new(AlwaysFailService), failing_breaker)
520 .add(
521 Arc::new(NoopService),
522 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
523 )
524 .named("open-circuit-skip-test")
525 .build();
526
527 // The first service's circuit is open and the timeout is 3600s so it
528 // should be skipped entirely, and noop (second) should succeed.
529 let output = chain.execute(make_input()).await?;
530 match output.metadata.get("service") {
531 Some(service) => assert_eq!(
532 service, "noop",
533 "open-circuit service must be skipped; noop must serve the request"
534 ),
535 None => {
536 return Err(
537 ServiceError::Unavailable("service key should exist".to_string()).into(),
538 );
539 }
540 }
541 Ok(())
542 }
543
544 #[tokio::test]
545 async fn test_circuit_records_success_on_recovery() -> Result<()> {
546 // Build a chain with two noop services
547 let chain = FallbackChainService::builder()
548 .add(
549 Arc::new(NoopService),
550 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
551 )
552 .build();
553
554 // Execute twice — both should succeed and circuit stays closed
555 chain.execute(make_input()).await?;
556 chain.execute(make_input()).await?;
557 Ok(())
558 }
559
560 #[tokio::test]
561 async fn test_len_and_is_empty() {
562 let empty = FallbackChainService::builder().build();
563 assert!(empty.is_empty());
564 assert_eq!(empty.len(), 0);
565
566 let one = FallbackChainService::builder()
567 .add(
568 Arc::new(NoopService),
569 CircuitBreakerImpl::new(5, Duration::from_secs(30)),
570 )
571 .build();
572 assert!(!one.is_empty());
573 assert_eq!(one.len(), 1);
574 }
575
576 #[tokio::test]
577 async fn test_default_breaker_helpers() {
578 let primary = default_primary_breaker();
579 let fallback = default_fallback_breaker();
580 assert_eq!(primary.state(), CircuitState::Closed);
581 assert_eq!(fallback.state(), CircuitState::Closed);
582 }
583}