Skip to main content

stygian_graph/adapters/
fallback.rs

1//! Fallback chain service adapter.
2//!
3//! Implements [`crate::ports::ScrapingService`] by trying a prioritised list of inner services
4//! with per-service circuit breakers.  When a service's circuit is **Open** or
5//! its execution fails, the chain automatically moves to the next lower-priority
6//! service.
7//!
8//! # Behaviour
9//!
10//! 1. Services are tried in registration order (index 0 = highest priority).
11//! 2. A service is **skipped** when its circuit breaker is [`crate::ports::CircuitState::Open`]
12//!    and the reset timeout has not yet elapsed.  The chain then probes it once
13//!    the timeout passes (half-open probe).
14//! 3. On **success** the corresponding circuit breaker records the success and the
15//!    result is returned immediately — no further services are tried.
16//! 4. On **failure** the circuit breaker records the failure and the next service
17//!    is tried.
18//! 5. If every service is exhausted the last error is propagated.
19//!
20//! # Example
21//!
22//! ```
23//! use std::sync::Arc;
24//! use std::time::Duration;
25//! use stygian_graph::adapters::fallback::FallbackChainService;
26//! use stygian_graph::adapters::noop::NoopService;
27//! use stygian_graph::adapters::resilience::CircuitBreakerImpl;
28//!
29//! let chain = FallbackChainService::builder()
30//!     .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
31//!     .named("primary-with-plugin-fallback")
32//!     .build();
33//! ```
34
35use std::sync::Arc;
36use std::time::Duration;
37
38use async_trait::async_trait;
39use tracing::{debug, info, warn};
40
41use crate::adapters::resilience::CircuitBreakerImpl;
42use crate::domain::error::{Result, ServiceError, StygianError};
43use crate::ports::{CircuitBreaker, CircuitState, ScrapingService, ServiceInput, ServiceOutput};
44
45// ── Chain entry ───────────────────────────────────────────────────────────────
46
47/// A single link in the fallback chain: a service paired with its circuit breaker.
48struct ChainEntry {
49    service: Arc<dyn ScrapingService>,
50    breaker: Arc<CircuitBreakerImpl>,
51}
52
53// ── FallbackChainService ──────────────────────────────────────────────────────
54
55/// A [`ScrapingService`] that tries multiple inner services in priority order,
56/// automatically routing around open circuit breakers and failed services.
57///
58/// Construct via [`FallbackChainService::builder()`].
59///
60/// # Example
61///
62/// ```
63/// use std::sync::Arc;
64/// use std::time::Duration;
65/// use stygian_graph::adapters::fallback::FallbackChainService;
66/// use stygian_graph::adapters::noop::NoopService;
67/// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
68/// use stygian_graph::ports::ScrapingService;
69///
70/// let chain = FallbackChainService::builder()
71///     .add(Arc::new(NoopService), CircuitBreakerImpl::new(5, Duration::from_secs(60)))
72///     .build();
73///
74/// assert_eq!(chain.name(), "fallback-chain");
75/// ```
76pub struct FallbackChainService {
77    entries: Vec<ChainEntry>,
78    name: &'static str,
79}
80
81impl FallbackChainService {
82    /// Return a [`FallbackChainBuilder`] for ergonomic construction.
83    ///
84    /// # Example
85    ///
86    /// ```
87    /// use stygian_graph::adapters::fallback::FallbackChainService;
88    ///
89    /// let builder = FallbackChainService::builder();
90    /// ```
91    pub const fn builder() -> FallbackChainBuilder {
92        FallbackChainBuilder::new()
93    }
94
95    /// Return the number of services in this chain.
96    ///
97    /// # Example
98    ///
99    /// ```
100    /// use std::sync::Arc;
101    /// use std::time::Duration;
102    /// use stygian_graph::adapters::fallback::FallbackChainService;
103    /// use stygian_graph::adapters::noop::NoopService;
104    /// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
105    ///
106    /// let chain = FallbackChainService::builder()
107    ///     .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
108    ///     .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
109    ///     .build();
110    ///
111    /// assert_eq!(chain.len(), 2);
112    /// ```
113    pub const fn len(&self) -> usize {
114        self.entries.len()
115    }
116
117    /// Return `true` when no services are registered.
118    ///
119    /// An empty chain always returns [`ServiceError::Unavailable`].
120    ///
121    /// # Example
122    ///
123    /// ```
124    /// use stygian_graph::adapters::fallback::FallbackChainService;
125    ///
126    /// let chain = FallbackChainService::builder().build();
127    /// assert!(chain.is_empty());
128    /// ```
129    pub const fn is_empty(&self) -> bool {
130        self.entries.is_empty()
131    }
132}
133
134#[async_trait]
135impl ScrapingService for FallbackChainService {
136    /// Execute the fallback chain.
137    ///
138    /// Tries each registered service in order, respecting circuit breaker state.
139    /// Returns the first successful result, or the last error if all services fail.
140    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
141        let mut last_err: Option<StygianError> = None;
142
143        for (idx, entry) in self.entries.iter().enumerate() {
144            let state = entry.breaker.state();
145
146            // Skip services whose circuit is Open and hasn't timed out yet.
147            if state == CircuitState::Open {
148                if !entry.breaker.attempt_reset() {
149                    debug!(
150                        service = entry.service.name(),
151                        chain = self.name,
152                        idx,
153                        "circuit open — skipping service in fallback chain"
154                    );
155                    continue;
156                }
157                debug!(
158                    service = entry.service.name(),
159                    chain = self.name,
160                    idx,
161                    "circuit half-open — probing service"
162                );
163            }
164
165            debug!(
166                service = entry.service.name(),
167                chain = self.name,
168                idx,
169                url = %input.url,
170                "fallback chain: attempting service"
171            );
172
173            match entry.service.execute(input.clone()).await {
174                Ok(output) => {
175                    entry.breaker.record_success();
176                    info!(
177                        service = entry.service.name(),
178                        chain = self.name,
179                        idx,
180                        "fallback chain: service succeeded"
181                    );
182                    return Ok(output);
183                }
184                Err(e) => {
185                    entry.breaker.record_failure();
186                    warn!(
187                        service = entry.service.name(),
188                        chain = self.name,
189                        idx,
190                        error = %e,
191                        "fallback chain: service failed — advancing to next"
192                    );
193                    last_err = Some(e);
194                }
195            }
196        }
197
198        Err(last_err.unwrap_or_else(|| {
199            StygianError::Service(ServiceError::Unavailable(format!(
200                "fallback chain '{}' exhausted: no services registered or available",
201                self.name
202            )))
203        }))
204    }
205
206    fn name(&self) -> &'static str {
207        self.name
208    }
209}
210
211// ── FallbackChainBuilder ──────────────────────────────────────────────────────
212
213/// Builder for [`FallbackChainService`].
214///
215/// Services are tried in the order they are added.  Add the highest-priority
216/// (cheapest / most reliable) service first; add the plugin extraction adapter
217/// last as the final fallback.
218///
219/// # Example
220///
221/// ```
222/// use std::sync::Arc;
223/// use std::time::Duration;
224/// use stygian_graph::adapters::fallback::FallbackChainBuilder;
225/// use stygian_graph::adapters::noop::NoopService;
226/// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
227/// use stygian_graph::ports::ScrapingService;
228///
229/// let chain = FallbackChainBuilder::new()
230///     .add(Arc::new(NoopService), CircuitBreakerImpl::new(5, Duration::from_secs(60)))
231///     .add(Arc::new(NoopService), CircuitBreakerImpl::new(3, Duration::from_secs(30)))
232///     .named("http-to-plugin")
233///     .build();
234///
235/// assert_eq!(chain.len(), 2);
236/// assert_eq!(chain.name(), "http-to-plugin");
237/// ```
238pub struct FallbackChainBuilder {
239    entries: Vec<ChainEntry>,
240    name: &'static str,
241}
242
243impl FallbackChainBuilder {
244    /// Create an empty builder with the default name `"fallback-chain"`.
245    ///
246    /// # Example
247    ///
248    /// ```
249    /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
250    ///
251    /// let builder = FallbackChainBuilder::new();
252    /// ```
253    pub const fn new() -> Self {
254        Self {
255            entries: Vec::new(),
256            name: "fallback-chain",
257        }
258    }
259
260    /// Add a service and its dedicated circuit breaker (highest to lowest priority).
261    ///
262    /// # Arguments
263    ///
264    /// * `service` — The [`ScrapingService`] to add.
265    /// * `breaker` — A [`CircuitBreakerImpl`] configured for this specific service.
266    ///
267    /// # Example
268    ///
269    /// ```
270    /// use std::sync::Arc;
271    /// use std::time::Duration;
272    /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
273    /// use stygian_graph::adapters::noop::NoopService;
274    /// use stygian_graph::adapters::resilience::CircuitBreakerImpl;
275    ///
276    /// let builder = FallbackChainBuilder::new()
277    ///     .add(Arc::new(NoopService), CircuitBreakerImpl::new(5, Duration::from_secs(60)));
278    /// ```
279    #[must_use]
280    pub fn add(mut self, service: Arc<dyn ScrapingService>, breaker: CircuitBreakerImpl) -> Self {
281        self.entries.push(ChainEntry {
282            service,
283            breaker: Arc::new(breaker),
284        });
285        self
286    }
287
288    /// Override the static name reported by [`ScrapingService::name`].
289    ///
290    /// # Example
291    ///
292    /// ```
293    /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
294    /// use stygian_graph::ports::ScrapingService;
295    ///
296    /// let chain = FallbackChainBuilder::new().named("http-to-plugin-fallback").build();
297    /// assert_eq!(chain.name(), "http-to-plugin-fallback");
298    /// ```
299    #[must_use]
300    pub const fn named(mut self, name: &'static str) -> Self {
301        self.name = name;
302        self
303    }
304
305    /// Build the [`FallbackChainService`].
306    ///
307    /// An empty chain (no services added) is valid but will immediately return
308    /// [`ServiceError::Unavailable`] on every call.
309    ///
310    /// # Example
311    ///
312    /// ```
313    /// use stygian_graph::adapters::fallback::FallbackChainBuilder;
314    ///
315    /// let chain = FallbackChainBuilder::new().build();
316    /// assert!(chain.is_empty());
317    /// ```
318    pub fn build(self) -> FallbackChainService {
319        FallbackChainService {
320            entries: self.entries,
321            name: self.name,
322        }
323    }
324}
325
326impl Default for FallbackChainBuilder {
327    fn default() -> Self {
328        Self::new()
329    }
330}
331
332// ── Default circuit breaker parameters ───────────────────────────────────────
333
334/// Sensible default for a production circuit breaker on a primary scraper.
335///
336/// Opens after **5 consecutive failures** and attempts reset after **30 seconds**.
337///
338/// # Example
339///
340/// ```
341/// use stygian_graph::adapters::fallback::default_primary_breaker;
342///
343/// let breaker = default_primary_breaker();
344/// ```
345pub fn default_primary_breaker() -> CircuitBreakerImpl {
346    CircuitBreakerImpl::new(5, Duration::from_secs(30))
347}
348
349/// Sensible default for a production circuit breaker on a fallback scraper.
350///
351/// Opens after **3 consecutive failures** and attempts reset after **60 seconds**.
352/// The longer reset timeout gives the fallback more time to recover since it is
353/// typically a heavier operation.
354///
355/// # Example
356///
357/// ```
358/// use stygian_graph::adapters::fallback::default_fallback_breaker;
359///
360/// let breaker = default_fallback_breaker();
361/// ```
362pub fn default_fallback_breaker() -> CircuitBreakerImpl {
363    #[allow(clippy::duration_suboptimal_units)]
364    {
365        CircuitBreakerImpl::new(3, Duration::from_secs(60))
366    }
367}
368
369// ── Unit tests ────────────────────────────────────────────────────────────────
370
371#[cfg(test)]
372mod tests {
373    use super::*;
374    use crate::adapters::noop::NoopService;
375    use crate::domain::error::ServiceError;
376    use crate::ports::{ServiceInput, ServiceOutput};
377    use serde_json::json;
378
379    // ── helper: always-failing service ────────────────────────────────────
380
381    struct AlwaysFailService;
382
383    #[async_trait]
384    impl ScrapingService for AlwaysFailService {
385        async fn execute(&self, _input: ServiceInput) -> Result<ServiceOutput> {
386            Err(StygianError::Service(ServiceError::Unavailable(
387                "simulated failure".into(),
388            )))
389        }
390
391        fn name(&self) -> &'static str {
392            "always-fail"
393        }
394    }
395
396    fn make_input() -> ServiceInput {
397        ServiceInput {
398            url: "https://example.com".to_string(),
399            params: json!({}),
400        }
401    }
402
403    // ── tests ─────────────────────────────────────────────────────────────
404
405    #[tokio::test]
406    async fn test_first_service_succeeds() -> Result<()> {
407        let chain = FallbackChainService::builder()
408            .add(
409                Arc::new(NoopService),
410                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
411            )
412            .add(
413                Arc::new(AlwaysFailService),
414                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
415            )
416            .build();
417
418        let output = chain.execute(make_input()).await?;
419        match output.metadata.get("service") {
420            Some(service) => assert_eq!(service, "noop", "noop should win"),
421            None => {
422                return Err(
423                    ServiceError::Unavailable("service key should exist".to_string()).into(),
424                );
425            }
426        }
427        Ok(())
428    }
429
430    #[tokio::test]
431    async fn test_fallback_fires_when_primary_fails() -> Result<()> {
432        let chain = FallbackChainService::builder()
433            .add(
434                Arc::new(AlwaysFailService),
435                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
436            )
437            .add(
438                Arc::new(NoopService),
439                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
440            )
441            .named("primary-then-noop")
442            .build();
443
444        let output = chain.execute(make_input()).await?;
445        match output.metadata.get("service") {
446            Some(service) => assert_eq!(
447                service, "noop",
448                "fallback noop should win after primary failure"
449            ),
450            None => {
451                return Err(
452                    ServiceError::Unavailable("service key should exist".to_string()).into(),
453                );
454            }
455        }
456        Ok(())
457    }
458
459    #[tokio::test]
460    async fn test_all_services_fail_returns_error() {
461        let chain = FallbackChainService::builder()
462            .add(
463                Arc::new(AlwaysFailService),
464                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
465            )
466            .add(
467                Arc::new(AlwaysFailService),
468                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
469            )
470            .build();
471
472        let result = chain.execute(make_input()).await;
473        assert!(result.is_err(), "all-failing chain must return error");
474    }
475
476    #[tokio::test]
477    async fn test_empty_chain_returns_unavailable() {
478        let chain = FallbackChainService::builder().build();
479        let result = chain.execute(make_input()).await;
480        assert!(
481            result.is_err(),
482            "empty chain must return ServiceError::Unavailable"
483        );
484    }
485
486    #[tokio::test]
487    async fn test_chain_name_default() {
488        let chain = FallbackChainService::builder().build();
489        assert_eq!(chain.name(), "fallback-chain");
490    }
491
492    #[tokio::test]
493    async fn test_chain_name_custom() {
494        let chain = FallbackChainService::builder()
495            .named("http-to-plugin")
496            .build();
497        assert_eq!(chain.name(), "http-to-plugin");
498    }
499
500    #[tokio::test]
501    async fn test_open_circuit_skipped_advances_to_next() -> Result<()> {
502        // Breaker with threshold 1: one failure opens the circuit
503        let failing_breaker = CircuitBreakerImpl::new(1, {
504            #[allow(clippy::duration_suboptimal_units)]
505            {
506                Duration::from_secs(3600)
507            } // 1 hour
508        });
509
510        // Pre-open the circuit by recording the one required failure
511        failing_breaker.record_failure();
512        assert_eq!(
513            failing_breaker.state(),
514            CircuitState::Open,
515            "breaker should be open after threshold hit"
516        );
517
518        let chain = FallbackChainService::builder()
519            .add(Arc::new(AlwaysFailService), failing_breaker)
520            .add(
521                Arc::new(NoopService),
522                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
523            )
524            .named("open-circuit-skip-test")
525            .build();
526
527        // The first service's circuit is open and the timeout is 3600s so it
528        // should be skipped entirely, and noop (second) should succeed.
529        let output = chain.execute(make_input()).await?;
530        match output.metadata.get("service") {
531            Some(service) => assert_eq!(
532                service, "noop",
533                "open-circuit service must be skipped; noop must serve the request"
534            ),
535            None => {
536                return Err(
537                    ServiceError::Unavailable("service key should exist".to_string()).into(),
538                );
539            }
540        }
541        Ok(())
542    }
543
544    #[tokio::test]
545    async fn test_circuit_records_success_on_recovery() -> Result<()> {
546        // Build a chain with two noop services
547        let chain = FallbackChainService::builder()
548            .add(
549                Arc::new(NoopService),
550                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
551            )
552            .build();
553
554        // Execute twice — both should succeed and circuit stays closed
555        chain.execute(make_input()).await?;
556        chain.execute(make_input()).await?;
557        Ok(())
558    }
559
560    #[tokio::test]
561    async fn test_len_and_is_empty() {
562        let empty = FallbackChainService::builder().build();
563        assert!(empty.is_empty());
564        assert_eq!(empty.len(), 0);
565
566        let one = FallbackChainService::builder()
567            .add(
568                Arc::new(NoopService),
569                CircuitBreakerImpl::new(5, Duration::from_secs(30)),
570            )
571            .build();
572        assert!(!one.is_empty());
573        assert_eq!(one.len(), 1);
574    }
575
576    #[tokio::test]
577    async fn test_default_breaker_helpers() {
578        let primary = default_primary_breaker();
579        let fallback = default_fallback_breaker();
580        assert_eq!(primary.state(), CircuitState::Closed);
581        assert_eq!(fallback.state(), CircuitState::Closed);
582    }
583}