1use crate::{Connection, Result, executor::Executor, transport::PgTransport};
3
4mod config;
5
6#[cfg(feature = "tokio")]
7mod worker;
8
9pub use config::PoolConfig;
10
11#[derive(Debug)]
13#[clippy::has_significant_drop]
14pub struct Pool {
15 conn: Option<Connection>,
16 #[cfg(feature = "tokio")]
17 handle: worker::WorkerHandle,
18 #[cfg(not(feature = "tokio"))]
19 handle: mock_handle::WorkerHandle,
20}
21
22impl Drop for Pool {
23 fn drop(&mut self) {
24 if let Some(conn) = self.conn.take() {
25 self.handle.release(conn);
26 }
27 }
28}
29
30impl Clone for Pool {
31 fn clone(&self) -> Self {
32 Self {
33 conn: None,
34 handle: self.handle.clone(),
35 }
36 }
37}
38
39impl Pool {
40 pub async fn connect(url: &str) -> Result<Self> {
42 PoolConfig::from_env().connect(url).await
43 }
44
45 pub fn connect_lazy(url: &str) -> Result<Self> {
47 PoolConfig::from_env().connect_lazy(url)
48 }
49
50 pub async fn connect_env() -> Result<Pool> {
56 Self::connect_with(PoolConfig::from_env()).await
57 }
58
59 pub async fn connect_with(config: PoolConfig) -> Result<Self> {
61 #[cfg(feature = "tokio")]
62 {
63 let (handle,worker) = worker::WorkerHandle::new(config);
64 tokio::spawn(worker);
65 Ok(Self { conn: None, handle })
66 }
67
68 #[cfg(not(feature = "tokio"))]
69 {
70 let _ = config;
71 panic!("runtime disabled")
72 }
73 }
74
75 pub fn connect_lazy_with(config: PoolConfig) -> Self {
77 #[cfg(feature = "tokio")]
78 {
79 let (handle,worker) = worker::WorkerHandle::new(config);
80 tokio::spawn(worker);
81 Self { conn: None, handle }
82 }
83
84 #[cfg(not(feature = "tokio"))]
85 {
86 let _ = config;
87 panic!("runtime disabled")
88 }
89 }
90
91 fn poll_connection(&mut self, cx: &mut std::task::Context) -> std::task::Poll<Result<Connection>> {
92 self.handle.poll_acquire(cx)
93 }
94}
95
96impl Executor for Pool {
97 type Transport = PoolConnection<'static>;
98
99 type Future = PoolConnect<'static>;
100
101 fn connection(self) -> Self::Future {
102 PoolConnect { pool: Some(PoolCow::Owned(self)) }
103 }
104}
105
106impl Executor for &Pool {
107 type Transport = PoolConnection<'static>;
108
109 type Future = PoolConnect<'static>;
110
111 fn connection(self) -> Self::Future {
112 PoolConnect { pool: Some(PoolCow::Owned(self.clone())) }
113 }
114}
115
116impl<'a> Executor for &'a mut Pool {
117 type Transport = PoolConnection<'a>;
118
119 type Future = PoolConnect<'a>;
120
121 fn connection(self) -> Self::Future {
122 PoolConnect { pool: Some(PoolCow::Borrow(self)) }
123 }
124}
125
126#[derive(Debug)]
128pub struct PoolConnect<'a> {
129 pool: Option<PoolCow<'a>>,
130}
131
132impl<'a> Future for PoolConnect<'a> {
133 type Output = Result<PoolConnection<'a>>;
134
135 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
136 use std::task::Poll::*;
137 if let Some(conn) = self.pool.as_mut().unwrap().as_mut().conn.take() {
138 return Ready(Ok(PoolConnection { conn: Some(conn), pool: self.pool.take().unwrap() }))
139 }
140 let conn = std::task::ready!(self.pool.as_mut().unwrap().as_mut().poll_connection(cx)?);
141 crate::common::verbose!(target: "pool_handle", "pool connection checkout");
142 Ready(Ok(PoolConnection { conn: Some(conn), pool: self.pool.take().unwrap() }))
143 }
144}
145
146#[derive(Debug)]
148pub struct PoolConnection<'a> {
149 pool: PoolCow<'a>,
150 conn: Option<Connection>,
151}
152
153#[derive(Debug)]
154enum PoolCow<'a> {
155 Borrow(&'a mut Pool),
156 Owned(Pool),
157}
158
159impl PoolCow<'_> {
160 fn as_ref(&self) -> &Pool {
161 match self {
162 PoolCow::Borrow(pool) => pool,
163 PoolCow::Owned(pool) => pool,
164 }
165 }
166
167 fn as_mut(&mut self) -> &mut Pool {
168 match self {
169 PoolCow::Borrow(pool) => pool,
170 PoolCow::Owned(pool) => pool,
171 }
172 }
173}
174
175impl PoolConnection<'_> {
176 pub fn pool(&self) -> &Pool {
178 self.pool.as_ref()
179 }
180
181 pub fn connection(&mut self) -> &mut Connection {
183 self.conn.as_mut().unwrap()
185 }
186}
187
188impl Drop for PoolConnection<'_> {
189 fn drop(&mut self) {
190 self.pool.as_mut().conn = self.conn.take();
191 }
192}
193
194impl PgTransport for PoolConnection<'_> {
195 fn poll_flush(&mut self, cx: &mut std::task::Context) -> std::task::Poll<std::io::Result<()>> {
196 self.connection().poll_flush(cx)
197 }
198
199 fn poll_recv<B: crate::postgres::BackendProtocol>(&mut self, cx: &mut std::task::Context) -> std::task::Poll<Result<B>> {
200 self.connection().poll_recv(cx)
201 }
202
203 fn ready_request(&mut self) {
204 self.connection().ready_request();
205 }
206
207 fn send<F: crate::postgres::FrontendProtocol>(&mut self, message: F) {
208 self.connection().send(message);
209 }
210
211 fn send_startup(&mut self, startup: crate::postgres::frontend::Startup) {
212 self.connection().send_startup(startup);
213 }
214
215 fn get_stmt(&mut self, sql: u64) -> Option<crate::statement::StatementName> {
216 self.connection().get_stmt(sql)
217 }
218
219 fn add_stmt(&mut self, sql: u64, id: crate::statement::StatementName) {
220 self.connection().add_stmt(sql, id);
221 }
222}
223
224#[cfg(not(feature = "tokio"))]
225mod mock_handle {
226 use std::task::{Context, Poll};
227
228 use crate::{Connection, Result};
229
230 #[derive(Debug, Clone)]
231 pub struct WorkerHandle;
232
233 impl WorkerHandle {
234 pub fn poll_acquire(&mut self, _: &mut Context) -> Poll<Result<Connection>> {
235 unreachable!()
236 }
237
238 pub fn release(&self, _: Connection) {
239 unreachable!()
240 }
241 }
242}
243