apub_breaker_session/
lib.rs1#![deny(missing_docs)]
12
13use apub_core::session::Session;
14use dashmap::DashMap;
15use std::{
16 sync::Arc,
17 time::{Duration, Instant},
18};
19use url::{Host, Url};
20
/// Failure bookkeeping for a single `(host, port)` pair.
#[derive(Debug)]
struct Breaker {
    /// Failures recorded since the counter was last reset to zero.
    failure_count: usize,

    /// Timestamp stored when the entry was created, and overwritten whenever a recorded failure
    /// leaves `failure_count` at or above the session's limit.
    broken_at: Instant,
}
26
27type BreakerKey = (Host<String>, Option<u16>);
28
29#[derive(Clone, Debug)]
31pub struct BreakerSession {
32 limit: usize,
33 breaker_duration: Duration,
34 hosts: Arc<DashMap<BreakerKey, Breaker>>,
35}
36
37impl BreakerSession {
38 pub fn limit(limit: usize, breaker_duration: Duration) -> Self {
48 Self {
49 limit,
50 breaker_duration,
51 hosts: Arc::new(DashMap::new()),
52 }
53 }
54}
55
56impl Session for BreakerSession {
57 fn should_procede(&mut self, url: &Url) -> bool {
58 if let Some(host) = url.host() {
59 let key = (host.to_owned(), url.port());
60
61 let mut breaker = self.hosts.entry(key).or_default();
62
63 if breaker.failure_count < self.limit {
64 return true;
65 }
66
67 if Instant::now() > breaker.broken_at + self.breaker_duration {
68 breaker.failure_count = 0;
69 }
70
71 false
72 } else {
73 true
74 }
75 }
76
77 fn mark_success(&mut self, url: &Url) {
78 if let Some(host) = url.host() {
79 let key = (host.to_owned(), url.port());
80 let mut breaker = self.hosts.entry(key).or_default();
81
82 breaker.failure_count = 0;
83 }
84 }
85
86 fn mark_failure(&mut self, url: &Url) {
87 if let Some(host) = url.host() {
88 let key = (host.to_owned(), url.port());
89 let mut breaker = self.hosts.entry(key).or_default();
90
91 breaker.failure_count += 1;
92 if breaker.failure_count >= self.limit {
93 breaker.broken_at = Instant::now();
94 }
95 }
96 }
97}
98
99impl Default for Breaker {
100 fn default() -> Self {
101 Self {
102 failure_count: 0,
103 broken_at: Instant::now(),
104 }
105 }
106}