surf_pool/
lib.rs

1//! Connection pool for Surf
2use async_std::sync::{Mutex, MutexGuardArc};
3use async_weighted_semaphore::{Semaphore, SemaphoreGuardArc};
4use std::sync::Arc;
5use surf::Client;
6use thiserror::Error;
7
8const MAX_POOL_SIZE: usize = 100;
9/// Convenient Result redefinition that uses [SurfPoolError] as Error
10pub type Result<T> = ::std::result::Result<T, SurfPoolError>;
11
12#[derive(Clone, Debug)]
13/// The main struct, used to get a valid connection
14pub struct SurfPool {
15    pool: Vec<Arc<Mutex<Client>>>,
16    semaphore: Arc<Semaphore>,
17    health_check: Option<surf::Request>,
18}
19
20/// The builder struct, used to create a SurfPool
21#[derive(Debug, Default)]
22pub struct SurfPoolBuilder {
23    size: usize,
24    health_check: Option<surf::RequestBuilder>,
25    pre_connect: bool,
26}
27
28#[derive(Debug, Error)]
29pub enum SurfPoolError {
30    #[error("Size {0} is not valid (0 < size < {})", MAX_POOL_SIZE)]
31    SizeNotValid(usize),
32}
33
34impl SurfPoolBuilder {
35    /// This function is used to create a new builder
36    /// The parameter size is checked if is a valid and reasonable number
37    /// It cannot be 0 or bigger than 100
38    ///
39    /// ```rust
40    /// use surf_pool::SurfPoolBuilder;
41    ///
42    /// SurfPoolBuilder::new(3).unwrap();
43    /// ```
44    pub fn new(size: usize) -> Result<Self> {
45        if size == 0 || size > MAX_POOL_SIZE {
46            return Err(SurfPoolError::SizeNotValid(size));
47        }
48        Ok(SurfPoolBuilder {
49            size,
50            ..Default::default()
51        })
52    }
53    /// The health_check is a URL used to manage the connection
54    /// It's used to check the connection health status, as keepalive and
55    /// as pre-connect URL
56    ///
57    /// ```rust
58    /// use surf_pool::SurfPoolBuilder;
59    ///
60    /// let builder = SurfPoolBuilder::new(3)
61    ///     .unwrap()
62    ///     .health_check(surf::get("https://httpbin.org"));
63    /// ```
64    pub fn health_check(mut self, health_check: surf::RequestBuilder) -> Self {
65        self.health_check = Some(health_check);
66        self
67    }
68    /// If true, the connections are established during the build phase, using
69    /// the health_check. If the health_check is not defined, the pre-connection
70    /// cannot be peformed, hence it will be ignored
71    ///
72    /// ```rust
73    /// use surf_pool::SurfPoolBuilder;
74    ///
75    /// let builder = SurfPoolBuilder::new(3).
76    ///     unwrap()
77    ///     .health_check(surf::get("https://httpbin.org"))
78    ///     .pre_connect(true);
79    /// ```
80    pub fn pre_connect(mut self, pre_connect: bool) -> Self {
81        self.pre_connect = pre_connect;
82        self
83    }
84    /// The build function that creates the @SurfPool
85    /// If a health_check is available and pre_connect is set to true
86    /// the connections are established in this function
87    ///
88    /// ```rust
89    /// use surf_pool::SurfPoolBuilder;
90    ///
91    /// let builder = SurfPoolBuilder::new(3).
92    ///     unwrap()
93    ///     .health_check(surf::get("https://httpbin.org"))
94    ///     .pre_connect(true);
95    /// let pool = builder.build();
96    /// ```
97    pub async fn build(self) -> SurfPool {
98        let mut pool = Vec::with_capacity(self.size);
99        for _ in 0..self.size {
100            let m = Arc::new(Mutex::new(Client::new()));
101            pool.push(m.clone());
102        }
103        let health_check = if let Some(req) = self.health_check {
104            let req = req.build();
105
106            if self.pre_connect {
107                for m in &pool {
108                    let c = m.lock().await;
109                    c.recv_bytes(req.clone()).await.unwrap_or_default();
110                }
111            }
112            Some(req)
113        } else {
114            None
115        };
116        SurfPool {
117            pool,
118            semaphore: Arc::new(Semaphore::new(self.size)),
119            health_check,
120        }
121    }
122}
123
124#[derive(Debug)]
125pub struct Handler {
126    sg: SemaphoreGuardArc,
127    mg: MutexGuardArc<Client>,
128}
129
130impl SurfPool {
131    pub fn get_pool_size(&self) -> usize {
132        self.pool.len()
133    }
134    /// This function return an handler representing a potential connection
135    /// available in the pool.
136    /// The handler is not a connection, but a Surf client can be obtained
137    /// via [`get_client`]
138    /// If the pool is empty, the function will wait until an handler is
139    /// available again
140    /// To not starve other clients, it's important to drop the handler after
141    /// it has been used
142    /// The return type is an [`Option`], but it should never return `None`,
143    /// the system is designed in a way that, once unblocked, at least one
144    /// resources should be available
145    /// ```rust
146    /// # futures_lite::future::block_on( async {
147    ///
148    /// use surf_pool::SurfPoolBuilder;
149    ///
150    /// let builder = SurfPoolBuilder::new(3).unwrap();
151    /// let pool = builder.build().await;
152    /// let handler = pool.get_handler().await.unwrap();
153    /// # } )
154    /// ```
155    pub async fn get_handler(&self) -> Option<Handler> {
156        let sg = self.semaphore.acquire_arc(1).await.unwrap();
157        for m in &self.pool {
158            if let Some(mg) = m.try_lock_arc() {
159                return Some(Handler { sg, mg });
160            }
161        }
162        None
163    }
164}
165
166impl Handler {
167    /// This function allows you to get a Surf client that can be used
168    /// to perform an async http call
169    /// If the connection is previously established, the connection is
170    /// already ready to use
171    /// ```rust
172    /// # futures_lite::future::block_on( async {
173    ///
174    /// use surf_pool::SurfPoolBuilder;
175    ///
176    /// let builder = SurfPoolBuilder::new(3).unwrap();
177    /// let pool = builder.build().await;
178    /// let handler = pool.get_handler().await.unwrap();
179    /// handler
180    ///     .get_client()
181    ///     .get("https://httpbin.org")
182    ///     .recv_string()
183    ///     .await;
184    /// # } )
185    /// ```
186    pub fn get_client(&self) -> &Client {
187        &*self.mg
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    /// Endpoint used both as health check and as request target.
    const TEST_URL: &str = "https://pot.pizzamig.dev";

    /// Builds a three-client pool with the given pre-connect setting and
    /// checks the reported size.
    async fn three_slot_pool(pre_connect: bool) -> SurfPool {
        let builder = SurfPoolBuilder::new(3)
            .unwrap()
            .health_check(surf::get(TEST_URL))
            .pre_connect(pre_connect);
        let uut = builder.build().await;
        assert_eq!(uut.get_pool_size(), 3);
        uut
    }

    /// Checks out a handler, asserts that one was returned, performs a GET
    /// through its client and hands the still-live handler back to the caller.
    async fn checkout_and_fetch(uut: &SurfPool) -> Handler {
        let handler = uut.get_handler().await;
        assert!(handler.is_some());
        let handler = handler.unwrap();
        handler
            .get_client()
            .get(TEST_URL)
            .recv_string()
            .await
            .unwrap();
        handler
    }

    #[async_std::test]
    async fn with_pre_connected_pool() {
        let uut = three_slot_pool(true).await;
        // The first handler stays alive while the second one is checked out.
        let _first = checkout_and_fetch(&uut).await;
        let _second = checkout_and_fetch(&uut).await;
    }

    #[async_std::test]
    async fn not_pre_connected_pool() {
        let uut = three_slot_pool(false).await;
        // The first handler is released before the second checkout.
        let first = checkout_and_fetch(&uut).await;
        drop(first);
        let _second = checkout_and_fetch(&uut).await;
    }
}