bb8_failsafe/
lib.rs

//! Thin wrapper around failsafe-rs that adds circuit breaker capabilities to bb8.
//!
//! # Example
//!
//! Using an imaginary "foodb" database.
//!
//! ```ignore
//! #[tokio::main]
//! async fn main() {
//!     let manager = bb8_foodb::FooConnectionManager::new("localhost:1234");
//!     let circuitbreaker = bb8_failsafe::failsafe::Config::new().build();
//!     let safemanager = bb8_failsafe::FailsafeConnectionManager::new(manager, circuitbreaker);
//!     let pool = bb8::Pool::builder().build(safemanager).await.unwrap();
//!
//!     for _ in 0..20 {
//!         let pool = pool.clone();
//!         tokio::spawn(async move {
//!             let conn = pool.get().await.unwrap();
//!             // use the connection
//!             // it will be returned to the pool when it falls out of scope.
//!         });
//!     }
//! }
//! ```
25pub use failsafe;
26use failsafe::futures::CircuitBreaker;
27
/// A generic `bb8::ManageConnection` paired with a failsafe-rs circuit breaker.
///
/// `T` is the wrapped connection manager and `U` is the circuit breaker that
/// guards calls made through it.
///
/// The struct itself carries no trait bounds: the `bb8::ManageConnection` and
/// `CircuitBreaker` requirements are stated on the `impl` blocks that actually
/// need them, so the type can be named (and cloned / debug-printed) without
/// repeating those bounds.
#[derive(Clone, Debug)]
pub struct FailsafeConnectionManager<T, U> {
    /// The underlying manager that opens and validates connections.
    connection_manager: T,
    /// The circuit breaker wrapped around calls to `connection_manager`.
    circuit_breaker: U,
}
38
39impl<T, U> FailsafeConnectionManager<T, U>
40where
41    T: bb8::ManageConnection,
42    U: CircuitBreaker + std::marker::Send + std::marker::Sync + 'static,
43{
44    /// Create a new FailsafeConnectionManager consuming a ConnectionManager and CircuitBreaker
45    pub fn new(connection_manager: T, circuit_breaker: U) -> FailsafeConnectionManager<T, U> {
46        FailsafeConnectionManager {
47            connection_manager,
48            circuit_breaker,
49        }
50    }
51}
52
53impl<T, U> bb8::ManageConnection for FailsafeConnectionManager<T, U>
54where
55    T: bb8::ManageConnection,
56    U: CircuitBreaker + std::marker::Send + std::marker::Sync + 'static,
57{
58    type Connection = T::Connection;
59    type Error = failsafe::Error<T::Error>;
60
61    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
62        self.circuit_breaker
63            .call(self.connection_manager.connect())
64            .await
65    }
66
67    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
68        self.circuit_breaker
69            .call(self.connection_manager.is_valid(conn))
70            .await
71    }
72
73    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
74        self.connection_manager.has_broken(conn)
75    }
76}
77
#[cfg(test)]
mod tests {
    use crate::FailsafeConnectionManager;
    use bb8::ManageConnection;
    use std::sync::{Arc, Mutex};
    use tokio::runtime::Runtime;

    /// Fake connection manager whose `connect` succeeds for the first three
    /// calls and fails on every call after that.
    #[derive(Clone)]
    struct FoobarConnectionManager {
        /// Number of `connect` calls made so far, shared between clones.
        counter: Arc<Mutex<u32>>,
    }

    impl FoobarConnectionManager {
        /// Starts with a call count of zero.
        fn new() -> Self {
            Self {
                counter: Arc::new(Mutex::new(0)),
            }
        }
    }

    impl bb8::ManageConnection for FoobarConnectionManager {
        type Connection = ();
        type Error = ();

        /// Bumps the call counter, then succeeds only while it is at most 3.
        async fn connect(&self) -> Result<Self::Connection, Self::Error> {
            let mut attempts = self.counter.lock().unwrap();
            *attempts += 1;
            if *attempts <= 3 {
                Ok(())
            } else {
                Err(())
            }
        }

        /// Every connection is reported as valid.
        async fn is_valid(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> {
            Ok(())
        }

        /// No connection is ever reported as broken.
        fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
            false
        }
    }

    #[test]
    fn simple() {
        let runtime = Runtime::new().unwrap();
        let manager = FailsafeConnectionManager::new(
            FoobarConnectionManager::new(),
            failsafe::Config::new().build(),
        );

        runtime.block_on(async {
            // Calls 1-3: the fake manager succeeds, so the wrapper must too.
            for _ in 0..3 {
                assert!(manager.connect().await.is_ok());
            }

            // Call 4: the first failure must surface as the inner error, not
            // as a rejection by the circuit breaker.
            match manager.connect().await {
                Err(failsafe::Error::Inner(_)) => {}
                Ok(_) | Err(failsafe::Error::Rejected) => panic!(),
            }

            // Calls 5-9: only an error is required; either variant is accepted.
            // NOTE(review): presumably lenient because the point at which the
            // breaker opens depends on failsafe's default policy; confirm.
            for _ in 0..5 {
                let outcome = manager.connect().await;
                if outcome.is_ok() {
                    panic!();
                }
            }
        });
    }
}
154}