1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
//! # An asynchronous connection pool. //! some code come from //! [bb8](https://docs.rs/bb8/0.3.1/bb8/) //! [L3-37](https://github.com/OneSignal/L3-37/) //! //! ## feature //! //! * `default` - multi thread pool where `Send` bound is needed for all futures. //! * `no-send` - single thread pool where `!Send` futures are accepted. //! //! # Known Limitation: //! can't be used in nested runtimes. //! //! # Example: //! ``` //! // This example shows how to implement the pool on async_std runtime. //! // Most of the xxx-tang crates are implemented with tokio runtime so they can be seen as examples on that matter. //! //! use std::fmt::{Debug, Formatter, Result as FmtResult}; //! use std::future::Future; //! use std::sync::atomic::{AtomicUsize, Ordering}; //! use std::time::{Duration, Instant}; //! //! use async_std::task; //! use smol::Timer; //! use tang_rs::{Builder, Manager, ManagerFuture, ManagerTimeout}; //! //! // our test pool would just generate usize from 0 as connections. //! struct TestPoolManager(AtomicUsize); //! //! impl TestPoolManager { //! fn new() -> Self { //! TestPoolManager(AtomicUsize::new(0)) //! } //! } //! //! // dummy error type //! struct TestPoolError; //! //! impl Debug for TestPoolError { //! fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { //! f.debug_struct("TestPoolError") //! .field("source", &"Unknown") //! .finish() //! } //! } //! //! // convert instant as timeout error to our pool error. //! impl From<Instant> for TestPoolError { //! fn from(_: Instant) -> Self { //! TestPoolError //! } //! } //! //! impl Manager for TestPoolManager { //! type Connection = usize; //! type Error = TestPoolError; //! type Timeout = Timer; //! type TimeoutError = Instant; //! //! fn connect(&self) -> ManagerFuture<'_, Result<Self::Connection, Self::Error>> { //! // how we generate new connections and put them into pool. //! Box::pin(async move { Ok(self.0.fetch_add(1, Ordering::SeqCst)) }) //! } //! //! fn is_valid<'a>( //! &self, //! _conn: &'a mut Self::Connection, //! ) -> ManagerFuture<'a, Result<(), Self::Error>> { //! Box::pin(async { //! // when the connection is pulled from the pool we can check if it's valid. //! Ok(()) //! }) //! } //! //! fn is_closed(&self, _conn: &mut Self::Connection) -> bool { //! // return true if you check the connection and want it to be dropped from the pool because it's closed. //! false //! } //! //! fn spawn<Fut>(&self, fut: Fut) //! where //! Fut: Future<Output = ()> + Send + 'static, //! { //! // some pool inner functions would want to spawn on your executor. //! // you can use the handler to further manage them if you want. //! // normally we just spawn the task and forget about it. //! let _handler = task::spawn(fut); //! } //! //! // Boilerplate implement for runtime specific timeout future. //! fn timeout<Fut: Future>(&self,fut: Fut, dur: Duration) -> ManagerTimeout<Fut, Self::Timeout> { //! ManagerTimeout::new(fut, Timer::after(dur)) //! } //! } //! //! #[async_std::main] //! async fn main() { //! let mgr = TestPoolManager::new(); //! //! let builder = Builder::new() //! .always_check(false) //! .idle_timeout(None) //! .max_lifetime(None) //! .min_idle(24) //! .max_size(24) //! .build(mgr); //! //! let pool = builder.await.expect("fail to build pool"); //! //! // spawn 24 futures and pull connections from pool at the same time. //! let (tx, rx) = async_std::sync::channel(100); //! for _i in 0..24 { //! let pool = pool.clone(); //! let tx = tx.clone(); //! task::spawn(async move { //! let mut pool_ref = pool.get().await.expect("fail to get PoolRef"); //! let conn_ref = &*pool_ref; //! println!("we have the reference of a connection : {:?}", conn_ref); //! //! // we can also get a mut reference from pool_ref //! let conn_ref = &mut *pool_ref; //! //! let _ = tx.send(*conn_ref); //! }); //! } //! drop(tx); //! //! while let Ok(_connection) = rx.recv().await { //! // We just wait until all connections are pulled out once //! } //! } //!``` pub use builder::Builder; pub use manager::{GarbageCollect, Manager, ManagerFuture, ManagerInterval, ScheduleReaping}; pub use pool::{Pool, PoolRef, PoolRefOwned, SharedManagedPool}; pub use util::timeout::ManagerTimeout; mod builder; mod manager; mod pool; mod pool_inner; mod util; #[cfg(all(feature = "default", feature = "no-send"))] compile_error!("only one of 'default' or 'no-send' features can be enabled");