1pub use failsafe;
26use failsafe::futures::CircuitBreaker;
27
28#[derive(Clone)]
30pub struct FailsafeConnectionManager<T, U>
31where
32 T: bb8::ManageConnection,
33 U: CircuitBreaker + std::marker::Send + std::marker::Sync + 'static,
34{
35 connection_manager: T,
36 circuit_breaker: U,
37}
38
39impl<T, U> FailsafeConnectionManager<T, U>
40where
41 T: bb8::ManageConnection,
42 U: CircuitBreaker + std::marker::Send + std::marker::Sync + 'static,
43{
44 pub fn new(connection_manager: T, circuit_breaker: U) -> FailsafeConnectionManager<T, U> {
46 FailsafeConnectionManager {
47 connection_manager,
48 circuit_breaker,
49 }
50 }
51}
52
53impl<T, U> bb8::ManageConnection for FailsafeConnectionManager<T, U>
54where
55 T: bb8::ManageConnection,
56 U: CircuitBreaker + std::marker::Send + std::marker::Sync + 'static,
57{
58 type Connection = T::Connection;
59 type Error = failsafe::Error<T::Error>;
60
61 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
62 self.circuit_breaker
63 .call(self.connection_manager.connect())
64 .await
65 }
66
67 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
68 self.circuit_breaker
69 .call(self.connection_manager.is_valid(conn))
70 .await
71 }
72
73 fn has_broken(&self, conn: &mut Self::Connection) -> bool {
74 self.connection_manager.has_broken(conn)
75 }
76}
77
78#[cfg(test)]
79mod tests {
80 use crate::FailsafeConnectionManager;
81 use bb8::ManageConnection;
82 use std::sync::{Arc, Mutex};
83 use tokio::runtime::Runtime;
84
85 #[derive(Clone)]
86 struct FoobarConnectionManager {
87 counter: Arc<Mutex<u32>>,
88 }
89
90 impl FoobarConnectionManager {
91 fn new() -> FoobarConnectionManager {
92 FoobarConnectionManager {
93 counter: Arc::new(Mutex::new(0)),
94 }
95 }
96 }
97
98 impl bb8::ManageConnection for FoobarConnectionManager {
99 type Connection = ();
100 type Error = ();
101
102 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
103 let mut guard = self.counter.lock().unwrap();
104 *guard = *guard + 1;
105 if *guard > 3 {
106 return Err(());
107 }
108 return Ok(());
109 }
110
111 async fn is_valid(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> {
112 Ok(())
113 }
114
115 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
116 false
117 }
118 }
119
120 #[test]
121 fn simple() {
122 let circuit_breaker = failsafe::Config::new().build();
123 let foomanager = FoobarConnectionManager::new();
124
125 let rt = Runtime::new().unwrap();
126 let failsafemanager = FailsafeConnectionManager::new(foomanager, circuit_breaker);
127
128 rt.block_on(async {
129 for _ in 0..3 {
130 assert!(failsafemanager.connect().await.is_ok());
131 }
132
133 for _ in 4..5 {
134 match failsafemanager.connect().await {
135 Ok(_) => panic!(),
136 Err(e) => match e {
137 failsafe::Error::Rejected => panic!(),
138 failsafe::Error::Inner(_) => {}
139 },
140 }
141 }
142
143 for _ in 5..10 {
144 match failsafemanager.connect().await {
145 Ok(_) => panic!(),
146 Err(e) => match e {
147 failsafe::Error::Rejected => {}
148 failsafe::Error::Inner(_) => (),
149 },
150 }
151 }
152 });
153 }
154}