apub_breaker_session/
lib.rs

1//! A Session implementation for limiting requests to domains that consistently fail
2//!
3//! ```rust
4//! use apub_breaker_session::BreakerSession;
5//! use std::time::Duration;
6//!
7//! // Create a session that refuses requests for an hour after 10 consecutive failures to a given domain
8//! let breaker_session = BreakerSession::limit(10, Duration::from_secs(60 * 60));
9//! ```
10
11#![deny(missing_docs)]
12
13use apub_core::session::Session;
14use dashmap::DashMap;
15use std::{
16    sync::Arc,
17    time::{Duration, Instant},
18};
19use url::{Host, Url};
20
/// Failure bookkeeping kept for a single host/port pair
#[derive(Debug)]
struct Breaker {
    /// Failures recorded since the counter was last reset to zero
    failure_count: usize,

    /// Timestamp written when the entry is created and again on each failure that leaves
    /// `failure_count` at or above the session limit
    broken_at: Instant,
}
26
27type BreakerKey = (Host<String>, Option<u16>);
28
29/// The BreakerSession type
30#[derive(Clone, Debug)]
31pub struct BreakerSession {
32    limit: usize,
33    breaker_duration: Duration,
34    hosts: Arc<DashMap<BreakerKey, Breaker>>,
35}
36
37impl BreakerSession {
38    /// Create a new BreakerSession
39    ///
40    /// ```rust
41    /// use apub_breaker_session::BreakerSession;
42    /// use std::time::Duration;
43    ///
44    /// // Create a session that refuses requests for an hour after 10 consecutive failures to a given domain
45    /// let breaker_session = BreakerSession::limit(10, Duration::from_secs(60 * 60));
46    /// ```
47    pub fn limit(limit: usize, breaker_duration: Duration) -> Self {
48        Self {
49            limit,
50            breaker_duration,
51            hosts: Arc::new(DashMap::new()),
52        }
53    }
54}
55
56impl Session for BreakerSession {
57    fn should_procede(&mut self, url: &Url) -> bool {
58        if let Some(host) = url.host() {
59            let key = (host.to_owned(), url.port());
60
61            let mut breaker = self.hosts.entry(key).or_default();
62
63            if breaker.failure_count < self.limit {
64                return true;
65            }
66
67            if Instant::now() > breaker.broken_at + self.breaker_duration {
68                breaker.failure_count = 0;
69            }
70
71            false
72        } else {
73            true
74        }
75    }
76
77    fn mark_success(&mut self, url: &Url) {
78        if let Some(host) = url.host() {
79            let key = (host.to_owned(), url.port());
80            let mut breaker = self.hosts.entry(key).or_default();
81
82            breaker.failure_count = 0;
83        }
84    }
85
86    fn mark_failure(&mut self, url: &Url) {
87        if let Some(host) = url.host() {
88            let key = (host.to_owned(), url.port());
89            let mut breaker = self.hosts.entry(key).or_default();
90
91            breaker.failure_count += 1;
92            if breaker.failure_count >= self.limit {
93                breaker.broken_at = Instant::now();
94            }
95        }
96    }
97}
98
99impl Default for Breaker {
100    fn default() -> Self {
101        Self {
102            failure_count: 0,
103            broken_at: Instant::now(),
104        }
105    }
106}