surf_pool/lib.rs
1//! Connection pool for Surf
2use async_std::sync::{Mutex, MutexGuardArc};
3use async_weighted_semaphore::{Semaphore, SemaphoreGuardArc};
4use std::sync::Arc;
5use surf::Client;
6use thiserror::Error;
7
8const MAX_POOL_SIZE: usize = 100;
9/// Convenient Result redefinition that uses [SurfPoolError] as Error
10pub type Result<T> = ::std::result::Result<T, SurfPoolError>;
11
12#[derive(Clone, Debug)]
13/// The main struct, used to get a valid connection
14pub struct SurfPool {
15 pool: Vec<Arc<Mutex<Client>>>,
16 semaphore: Arc<Semaphore>,
17 health_check: Option<surf::Request>,
18}
19
20/// The builder struct, used to create a SurfPool
21#[derive(Debug, Default)]
22pub struct SurfPoolBuilder {
23 size: usize,
24 health_check: Option<surf::RequestBuilder>,
25 pre_connect: bool,
26}
27
28#[derive(Debug, Error)]
29pub enum SurfPoolError {
30 #[error("Size {0} is not valid (0 < size < {})", MAX_POOL_SIZE)]
31 SizeNotValid(usize),
32}
33
34impl SurfPoolBuilder {
35 /// This function is used to create a new builder
36 /// The parameter size is checked if is a valid and reasonable number
37 /// It cannot be 0 or bigger than 100
38 ///
39 /// ```rust
40 /// use surf_pool::SurfPoolBuilder;
41 ///
42 /// SurfPoolBuilder::new(3).unwrap();
43 /// ```
44 pub fn new(size: usize) -> Result<Self> {
45 if size == 0 || size > MAX_POOL_SIZE {
46 return Err(SurfPoolError::SizeNotValid(size));
47 }
48 Ok(SurfPoolBuilder {
49 size,
50 ..Default::default()
51 })
52 }
53 /// The health_check is a URL used to manage the connection
54 /// It's used to check the connection health status, as keepalive and
55 /// as pre-connect URL
56 ///
57 /// ```rust
58 /// use surf_pool::SurfPoolBuilder;
59 ///
60 /// let builder = SurfPoolBuilder::new(3)
61 /// .unwrap()
62 /// .health_check(surf::get("https://httpbin.org"));
63 /// ```
64 pub fn health_check(mut self, health_check: surf::RequestBuilder) -> Self {
65 self.health_check = Some(health_check);
66 self
67 }
68 /// If true, the connections are established during the build phase, using
69 /// the health_check. If the health_check is not defined, the pre-connection
70 /// cannot be peformed, hence it will be ignored
71 ///
72 /// ```rust
73 /// use surf_pool::SurfPoolBuilder;
74 ///
75 /// let builder = SurfPoolBuilder::new(3).
76 /// unwrap()
77 /// .health_check(surf::get("https://httpbin.org"))
78 /// .pre_connect(true);
79 /// ```
80 pub fn pre_connect(mut self, pre_connect: bool) -> Self {
81 self.pre_connect = pre_connect;
82 self
83 }
84 /// The build function that creates the @SurfPool
85 /// If a health_check is available and pre_connect is set to true
86 /// the connections are established in this function
87 ///
88 /// ```rust
89 /// use surf_pool::SurfPoolBuilder;
90 ///
91 /// let builder = SurfPoolBuilder::new(3).
92 /// unwrap()
93 /// .health_check(surf::get("https://httpbin.org"))
94 /// .pre_connect(true);
95 /// let pool = builder.build();
96 /// ```
97 pub async fn build(self) -> SurfPool {
98 let mut pool = Vec::with_capacity(self.size);
99 for _ in 0..self.size {
100 let m = Arc::new(Mutex::new(Client::new()));
101 pool.push(m.clone());
102 }
103 let health_check = if let Some(req) = self.health_check {
104 let req = req.build();
105
106 if self.pre_connect {
107 for m in &pool {
108 let c = m.lock().await;
109 c.recv_bytes(req.clone()).await.unwrap_or_default();
110 }
111 }
112 Some(req)
113 } else {
114 None
115 };
116 SurfPool {
117 pool,
118 semaphore: Arc::new(Semaphore::new(self.size)),
119 health_check,
120 }
121 }
122}
123
124#[derive(Debug)]
125pub struct Handler {
126 sg: SemaphoreGuardArc,
127 mg: MutexGuardArc<Client>,
128}
129
130impl SurfPool {
131 pub fn get_pool_size(&self) -> usize {
132 self.pool.len()
133 }
134 /// This function return an handler representing a potential connection
135 /// available in the pool.
136 /// The handler is not a connection, but a Surf client can be obtained
137 /// via [`get_client`]
138 /// If the pool is empty, the function will wait until an handler is
139 /// available again
140 /// To not starve other clients, it's important to drop the handler after
141 /// it has been used
142 /// The return type is an [`Option`], but it should never return `None`,
143 /// the system is designed in a way that, once unblocked, at least one
144 /// resources should be available
145 /// ```rust
146 /// # futures_lite::future::block_on( async {
147 ///
148 /// use surf_pool::SurfPoolBuilder;
149 ///
150 /// let builder = SurfPoolBuilder::new(3).unwrap();
151 /// let pool = builder.build().await;
152 /// let handler = pool.get_handler().await.unwrap();
153 /// # } )
154 /// ```
155 pub async fn get_handler(&self) -> Option<Handler> {
156 let sg = self.semaphore.acquire_arc(1).await.unwrap();
157 for m in &self.pool {
158 if let Some(mg) = m.try_lock_arc() {
159 return Some(Handler { sg, mg });
160 }
161 }
162 None
163 }
164}
165
166impl Handler {
167 /// This function allows you to get a Surf client that can be used
168 /// to perform an async http call
169 /// If the connection is previously established, the connection is
170 /// already ready to use
171 /// ```rust
172 /// # futures_lite::future::block_on( async {
173 ///
174 /// use surf_pool::SurfPoolBuilder;
175 ///
176 /// let builder = SurfPoolBuilder::new(3).unwrap();
177 /// let pool = builder.build().await;
178 /// let handler = pool.get_handler().await.unwrap();
179 /// handler
180 /// .get_client()
181 /// .get("https://httpbin.org")
182 /// .recv_string()
183 /// .await;
184 /// # } )
185 /// ```
186 pub fn get_client(&self) -> &Client {
187 &*self.mg
188 }
189}
190
191#[cfg(test)]
192mod tests {
193 use super::*;
194 #[async_std::test]
195 async fn with_pre_connected_pool() {
196 let builder = SurfPoolBuilder::new(3)
197 .unwrap()
198 .health_check(surf::get("https://pot.pizzamig.dev"))
199 .pre_connect(true);
200 let uut = builder.build().await;
201 assert_eq!(uut.get_pool_size(), 3);
202 let handler = uut.get_handler().await;
203 assert!(handler.is_some());
204 let handler = handler.unwrap();
205 handler
206 .get_client()
207 .get("https://pot.pizzamig.dev")
208 .recv_string()
209 .await
210 .unwrap();
211 let h2 = uut.get_handler().await;
212 assert!(h2.is_some());
213 let h2 = h2.unwrap();
214 h2.get_client()
215 .get("https://pot.pizzamig.dev")
216 .recv_string()
217 .await
218 .unwrap();
219 }
220
221 #[async_std::test]
222 async fn not_pre_connected_pool() {
223 let builder = SurfPoolBuilder::new(3)
224 .unwrap()
225 .health_check(surf::get("https://pot.pizzamig.dev"))
226 .pre_connect(false);
227 let uut = builder.build().await;
228 assert_eq!(uut.get_pool_size(), 3);
229 let handler = uut.get_handler().await;
230 assert!(handler.is_some());
231 let handler = handler.unwrap();
232 handler
233 .get_client()
234 .get("https://pot.pizzamig.dev")
235 .recv_string()
236 .await
237 .unwrap();
238 drop(handler);
239 let h2 = uut.get_handler().await;
240 assert!(h2.is_some());
241 let h2 = h2.unwrap();
242 h2.get_client()
243 .get("https://pot.pizzamig.dev")
244 .recv_string()
245 .await
246 .unwrap();
247 }
248}