xitca_postgres/client.rs
1use core::future::Future;
2
3use std::{collections::HashMap, sync::Mutex};
4
5use xitca_unsafe_collection::no_hash::NoHashBuilder;
6
7use super::{
8 copy::{CopyIn, CopyOut},
9 driver::{
10 DriverTx,
11 codec::{Response, encode::Encode, response::IntoResponse},
12 },
13 error::Error,
14 session::Session,
15 statement::Statement,
16 transaction::{Transaction, TransactionBuilder},
17 types::{Oid, Type},
18};
19
20/// a marker trait to confirm a mut reference of Client can be borrowed from self.
21///
22/// this is necessary for custom [Client] new types who want to utilize [Transaction] and [CopyIn].
23/// these types and their functions only work properly when [Client] is exclusively borrowed.
24///
25/// # Examples
26/// ```rust
27/// use std::sync::Arc;
28///
29/// use xitca_postgres::{dev::{ClientBorrow, ClientBorrowMut}, transaction::TransactionBuilder, Client, Error};
30///
31/// // a client wrapper use reference counted smart pointer.
32/// // it's easy to create multiple instance of &mut SharedClient with help of cloning of smart pointer
33/// // and none of them can be used correctly with Transaction nor CopyIn
34/// #[derive(Clone)]
35/// struct SharedClient(Arc<Client>);
36///
37/// // boilerplate impl required by ClientBorrowMut trait
38/// impl ClientBorrow for SharedClient {
39/// fn borrow_cli_ref(&self) -> &Client {
40/// &self.0
41/// }
42/// }
43///
44/// // client new type has to impl this trait to mark they can truly offer a mutable reference to Client
45/// impl ClientBorrowMut for SharedClient {
46/// fn borrow_cli_mut(&mut self) -> &mut Client {
47/// Arc::get_mut(&mut self.0).expect("you can't safely implement this trait with SharedClient.")
48/// }
49/// }
50///
51/// async fn borrow_mut(cli: Client) -> Result<(), Error> {
52/// let mut cli = SharedClient(Arc::new(cli));
53///
54/// // this line works fine as shared client is exclusivly borrowed by transaction
55/// let _ = TransactionBuilder::new().begin(&mut cli).await?;
56///
57/// // clone shared client before starting a transaction.
58/// let _cli2 = cli.clone();
59///
60/// // this line will panic as shared client has another copy being held by cli2 varaiable
61/// let _ = TransactionBuilder::new().begin(&mut cli).await?;
62///
63/// Ok(())
64/// }
65/// ```
66///
67/// [Transaction]: crate::transaction::Transaction
68/// [CopyIn]: crate::copy::CopyIn
69pub trait ClientBorrowMut: ClientBorrow {
70 fn borrow_cli_mut(&mut self) -> &mut Client;
71}
72
73impl<T> ClientBorrowMut for &mut T
74where
75 T: ClientBorrowMut,
76{
77 #[inline]
78 fn borrow_cli_mut(&mut self) -> &mut Client {
79 T::borrow_cli_mut(*self)
80 }
81}
82
83/// super set of [`ClientBorrowMut`] trait
84pub trait ClientBorrow {
85 fn borrow_cli_ref(&self) -> &Client;
86}
87
88impl<T> ClientBorrow for &T
89where
90 T: ClientBorrow,
91{
92 #[inline]
93 fn borrow_cli_ref(&self) -> &Client {
94 T::borrow_cli_ref(*self)
95 }
96}
97
98impl<T> ClientBorrow for &mut T
99where
100 T: ClientBorrow,
101{
102 #[inline]
103 fn borrow_cli_ref(&self) -> &Client {
104 T::borrow_cli_ref(&**self)
105 }
106}
107/// Client is a handler type for [`Driver`]. it interacts with latter using channel and message for IO operation
108/// and de/encoding of postgres protocol in byte format.
109///
110/// Client expose a set of high level API to make the interaction represented in Rust function and types.
111///
112/// # Lifetime
113/// Client and [`Driver`] have a dependent lifetime where either side can trigger the other part to shutdown.
114/// From Client side it's in the form of dropping ownership.
115/// ## Examples
116/// ```
117/// # use core::future::IntoFuture;
118/// # use xitca_postgres::{error::Error, Config, Postgres};
119/// # async fn shut_down(cfg: Config) -> Result<(), Error> {
120/// // connect to a database and spawn driver as async task
121/// let (cli, drv) = Postgres::new(cfg).connect().await?;
122/// let handle = tokio::spawn(drv.into_future());
123///
124/// // drop client after finished usage
125/// drop(cli);
126///
127/// // client would notify driver to shutdown when it's dropped.
128/// // await on the handle would return a Result of the shutdown outcome from driver side.
129/// let _ = handle.await.unwrap();
130///
131/// # Ok(())
132/// # }
133/// ```
134///
135/// [`Driver`]: crate::driver::Driver
136pub struct Client {
137 pub(crate) tx: DriverTx,
138 pub(crate) cache: Box<ClientCache>,
139}
140
141pub(crate) struct ClientCache {
142 session: Session,
143 type_info: Mutex<CachedTypeInfo>,
144}
145
146/// A cache of type info and prepared statements for fetching type info
147/// (corresponding to the queries in the [prepare](prepare) module).
148struct CachedTypeInfo {
149 /// A statement for basic information for a type from its
150 /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
151 /// fallback).
152 typeinfo: Option<Statement>,
153 /// A statement for getting information for a composite type from its OID.
154 /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
155 typeinfo_composite: Option<Statement>,
156 /// A statement for getting information for a composite type from its OID.
157 /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
158 /// its fallback).
159 typeinfo_enum: Option<Statement>,
160 /// Cache of types already looked up.
161 types: HashMap<Oid, Type, NoHashBuilder>,
162}
163
164impl Client {
165 /// start a transaction
166 #[inline]
167 pub fn transaction(&mut self) -> impl Future<Output = Result<Transaction<'_, Self>, Error>> + Send {
168 TransactionBuilder::new().begin(self)
169 }
170
171 /// owned version of [`Client::transaction`]
172 #[inline]
173 pub fn transaction_owned(self) -> impl Future<Output = Result<Transaction<'static, Self>, Error>> + Send {
174 TransactionBuilder::new().begin_owned(self)
175 }
176
177 /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
178 ///
179 /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
180 /// be explicitly completed via [`CopyIn::finish`]. If it is not, the copy will be aborted.
181 #[inline]
182 pub fn copy_in(&mut self, stmt: &Statement) -> impl Future<Output = Result<CopyIn<'_, Self>, Error>> + Send {
183 CopyIn::new(self, stmt)
184 }
185
186 /// Executes a `COPY TO STDOUT` statement, returning async stream of the resulting data.
187 ///
188 /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
189 #[inline]
190 pub async fn copy_out(&self, stmt: &Statement) -> Result<CopyOut, Error> {
191 CopyOut::new(self, stmt).await
192 }
193
194 /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
195 /// connection associated with this client.
196 pub fn cancel_token(&self) -> Session {
197 Session::clone(&self.cache.session)
198 }
199
200 /// a lossy hint of running state of io driver. an io driver shutdown can happen at the same time this api is called.
201 pub fn closed(&self) -> bool {
202 self.tx.is_closed()
203 }
204
205 pub fn typeinfo(&self) -> Option<Statement> {
206 self.cache
207 .type_info
208 .lock()
209 .unwrap()
210 .typeinfo
211 .as_ref()
212 .map(Statement::duplicate)
213 }
214
215 pub fn set_typeinfo(&self, statement: &Statement) {
216 self.cache.type_info.lock().unwrap().typeinfo = Some(statement.duplicate());
217 }
218
219 pub fn typeinfo_composite(&self) -> Option<Statement> {
220 self.cache
221 .type_info
222 .lock()
223 .unwrap()
224 .typeinfo_composite
225 .as_ref()
226 .map(Statement::duplicate)
227 }
228
229 pub fn set_typeinfo_composite(&self, statement: &Statement) {
230 self.cache.type_info.lock().unwrap().typeinfo_composite = Some(statement.duplicate());
231 }
232
233 pub fn typeinfo_enum(&self) -> Option<Statement> {
234 self.cache
235 .type_info
236 .lock()
237 .unwrap()
238 .typeinfo_enum
239 .as_ref()
240 .map(Statement::duplicate)
241 }
242
243 pub fn set_typeinfo_enum(&self, statement: &Statement) {
244 self.cache.type_info.lock().unwrap().typeinfo_enum = Some(statement.duplicate());
245 }
246
247 pub fn type_(&self, oid: Oid) -> Option<Type> {
248 self.cache.type_info.lock().unwrap().types.get(&oid).cloned()
249 }
250
251 pub fn set_type(&self, oid: Oid, type_: &Type) {
252 self.cache.type_info.lock().unwrap().types.insert(oid, type_.clone());
253 }
254
255 /// Clears the client's type information cache.
256 ///
257 /// When user-defined types are used in a query, the client loads their definitions from the database and caches
258 /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
259 /// to flush the local cache and allow the new, updated definitions to be loaded.
260 pub fn clear_type_cache(&self) {
261 self.cache.type_info.lock().unwrap().types.clear();
262 }
263
264 #[inline]
265 pub(crate) fn query<S>(&self, stmt: S) -> Result<<S::Output as IntoResponse>::Response, Error>
266 where
267 S: Encode,
268 {
269 self.query_raw(stmt).map(|(opt, res)| opt.into_response(res))
270 }
271
272 #[inline]
273 pub(crate) fn query_raw<S>(&self, stmt: S) -> Result<(S::Output, Response), Error>
274 where
275 S: Encode,
276 {
277 self.tx.send(|buf| stmt.encode(buf))
278 }
279
280 pub(crate) fn new(tx: DriverTx, session: Session) -> Self {
281 Self {
282 tx,
283 cache: Box::new(ClientCache {
284 session,
285 type_info: Mutex::new(CachedTypeInfo {
286 typeinfo: None,
287 typeinfo_composite: None,
288 typeinfo_enum: None,
289 types: HashMap::default(),
290 }),
291 }),
292 }
293 }
294}
295
296impl ClientBorrowMut for Client {
297 #[inline]
298 fn borrow_cli_mut(&mut self) -> &mut Client {
299 self
300 }
301}
302
303impl ClientBorrow for Client {
304 #[inline]
305 fn borrow_cli_ref(&self) -> &Client {
306 self
307 }
308}
309
310impl Drop for Client {
311 fn drop(&mut self) {
312 // convert leaked statements to guarded statements.
313 // this is to cancel the statement on client go away.
314 let (type_info, typeinfo_composite, typeinfo_enum) = {
315 let cache = self.cache.type_info.get_mut().unwrap();
316 (
317 cache.typeinfo.take(),
318 cache.typeinfo_composite.take(),
319 cache.typeinfo_enum.take(),
320 )
321 };
322
323 if let Some(stmt) = type_info {
324 drop(stmt.into_guarded(&*self));
325 }
326
327 if let Some(stmt) = typeinfo_composite {
328 drop(stmt.into_guarded(&*self));
329 }
330
331 if let Some(stmt) = typeinfo_enum {
332 drop(stmt.into_guarded(&*self));
333 }
334 }
335}