xitca_postgres/
client.rs

1use core::future::Future;
2
3use std::{
4    collections::HashMap,
5    sync::{Arc, Mutex},
6};
7
8use xitca_unsafe_collection::no_hash::NoHashBuilder;
9
10use super::{
11    copy::{CopyIn, CopyOut},
12    driver::{
13        DriverTx,
14        codec::{
15            Response,
16            encode::{self, Encode},
17        },
18    },
19    error::Error,
20    query::Query,
21    session::Session,
22    statement::Statement,
23    transaction::{Transaction, TransactionBuilder},
24    types::{Oid, Type},
25};
26
/// A marker trait confirming that a mutable reference to [Client] can be borrowed from `self`.
///
/// This is necessary for custom [Client] new types that want to utilize [Transaction] and [CopyIn].
/// These types and their functions only work properly when [Client] is exclusively borrowed.
///
/// # Examples
/// ```rust
/// use std::sync::Arc;
///
/// use xitca_postgres::{dev::ClientBorrowMut, transaction::TransactionBuilder, Client, Error};
/// # use xitca_postgres::dev::{Encode, Prepare, Query, Response};
/// # use xitca_postgres::types::{Oid, Type};
///
/// // a client wrapper that uses a reference counted smart pointer.
/// // cloning the smart pointer makes it easy to create multiple instances of &mut SharedClient,
/// // and none of them can be used correctly with Transaction or CopyIn
/// #[derive(Clone)]
/// struct SharedClient(Arc<Client>);
///
/// # impl Query for SharedClient {
/// #     fn _send_encode_query<S>(&self, stmt: S) -> Result<(S::Output, Response), Error>
/// #     where
/// #         S: Encode,
/// #     {
/// #         self.0._send_encode_query(stmt)
/// #     }
/// # }
/// # impl Prepare for SharedClient {
/// #     async fn _get_type(&self, oid: Oid) -> Result<Type, Error> {
/// #         self.0._get_type(oid).await
/// #     }
/// #     fn _get_type_blocking(&self, oid: Oid) -> Result<Type, Error> {
/// #         self.0._get_type_blocking(oid)
/// #     }
/// # }
///
/// // a client new type has to implement this trait to state that it can truly offer a mutable
/// // reference to Client
/// impl ClientBorrowMut for SharedClient {
///     fn _borrow_mut(&mut self) -> &mut Client {
///         Arc::get_mut(&mut self.0).expect("you can't safely implement this trait with SharedClient.")
///     }
/// }
///
/// async fn borrow_mut(cli: Client) -> Result<(), Error> {
///     let mut cli = SharedClient(Arc::new(cli));
///
///     // this line works fine as the shared client is exclusively borrowed by the transaction
///     let _ = TransactionBuilder::new().begin(&mut cli).await?;
///
///     // clone the shared client before starting a transaction.
///     let _cli2 = cli.clone();
///
///     // this line will panic as another copy of the shared client is held by the _cli2 variable
///     let _ = TransactionBuilder::new().begin(&mut cli).await?;
///
///     Ok(())
/// }
/// ```
///
/// [Transaction]: crate::transaction::Transaction
/// [CopyIn]: crate::copy::CopyIn
pub trait ClientBorrowMut {
    /// Borrow the underlying [Client] mutably through `self`.
    ///
    /// Implementors must only return when the borrow is truly exclusive; as the example above
    /// shows, a wrapper that cannot guarantee this has no better option than to panic.
    fn _borrow_mut(&mut self) -> &mut Client;
}
92
93impl<T> ClientBorrowMut for &mut T
94where
95    T: ClientBorrowMut,
96{
97    #[inline]
98    fn _borrow_mut(&mut self) -> &mut Client {
99        T::_borrow_mut(*self)
100    }
101}
102
/// Client is a handle type for [`Driver`]. It interacts with the latter through a channel and messages
/// for IO operations and for de/encoding of the postgres protocol in byte format.
///
/// Client exposes a set of high level APIs that represent the interaction as Rust functions and types.
///
/// # Lifetime
/// Client and [`Driver`] have dependent lifetimes where either side can trigger the other to shut down.
/// From the Client side this takes the form of dropping ownership.
/// ## Examples
/// ```
/// # use core::future::IntoFuture;
/// # use xitca_postgres::{error::Error, Config, Postgres};
/// # async fn shut_down(cfg: Config) -> Result<(), Error> {
/// // connect to a database and spawn driver as async task
/// let (cli, drv) = Postgres::new(cfg).connect().await?;
/// let handle = tokio::spawn(drv.into_future());
///
/// // drop client after finished usage
/// drop(cli);
///
/// // client would notify driver to shutdown when it's dropped.
/// // await on the handle would return a Result of the shutdown outcome from driver side.
/// let _ = handle.await.unwrap();
///
/// # Ok(())
/// # }
/// ```
///
/// [`Driver`]: crate::driver::Driver
pub struct Client {
    /// channel handle to the driver; queries are sent through it and `Client::closed` polls it.
    /// NOTE(review): presumably the sending half of the driver channel — confirm against `DriverTx`.
    pub(crate) tx: DriverTx,
    /// boxed per-client state: the session value and the mutex-guarded type information cache.
    pub(crate) cache: Box<ClientCache>,
}
136
/// Per-client state stored behind a `Box` inside [`Client`].
pub(crate) struct ClientCache {
    /// session value; a clone of it is handed out by `Client::cancel_token`.
    session: Session,
    /// cached type-info statements and resolved types, guarded by a mutex so they can be
    /// read and updated through a shared `&Client`.
    type_info: Mutex<CachedTypeInfo>,
}
141
/// A cache of type info and prepared statements for fetching type info
/// (corresponding to the queries in the [prepare](prepare) module).
struct CachedTypeInfo {
    /// A statement for basic information for a type from its
    /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
    /// fallback).
    typeinfo: Option<Statement>,
    /// A statement for getting information for a composite type from its OID.
    /// Corresponds to [TYPEINFO_COMPOSITE_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
    typeinfo_composite: Option<Statement>,
    /// A statement for getting information for an enum type from its OID.
    /// NOTE(review): the previous text was a copy of the composite entry; this presumably
    /// corresponds to the enum query of the prepare module (or its fallback) — confirm the
    /// exact constant name there.
    typeinfo_enum: Option<Statement>,
    /// Cache of types already looked up, keyed by OID.
    types: HashMap<Oid, Type, NoHashBuilder>,
}
159
160impl Client {
161    /// start a transaction
162    #[inline]
163    pub fn transaction(&mut self) -> impl Future<Output = Result<Transaction<&mut Self>, Error>> + Send {
164        TransactionBuilder::new().begin(self)
165    }
166
167    /// owned version of [`Client::transaction`]
168    #[inline]
169    pub fn transaction_owned(self) -> impl Future<Output = Result<Transaction<Self>, Error>> + Send {
170        TransactionBuilder::new().begin(self)
171    }
172
173    /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
174    ///
175    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
176    /// be explicitly completed via [`CopyIn::finish`]. If it is not, the copy will be aborted.
177    #[inline]
178    pub fn copy_in(&mut self, stmt: &Statement) -> impl Future<Output = Result<CopyIn<'_, Self>, Error>> + Send {
179        CopyIn::new(self, stmt)
180    }
181
182    /// Executes a `COPY TO STDOUT` statement, returning async stream of the resulting data.
183    ///
184    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
185    #[inline]
186    pub async fn copy_out(&self, stmt: &Statement) -> Result<CopyOut, Error> {
187        CopyOut::new(self, stmt).await
188    }
189
190    /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
191    /// connection associated with this client.
192    pub fn cancel_token(&self) -> Session {
193        Session::clone(&self.cache.session)
194    }
195
196    /// a lossy hint of running state of io driver. an io driver shutdown can happen at the same time this api is called.
197    pub fn closed(&self) -> bool {
198        self.tx.is_closed()
199    }
200
201    pub fn typeinfo(&self) -> Option<Statement> {
202        self.cache
203            .type_info
204            .lock()
205            .unwrap()
206            .typeinfo
207            .as_ref()
208            .map(Statement::duplicate)
209    }
210
211    pub fn set_typeinfo(&self, statement: &Statement) {
212        self.cache.type_info.lock().unwrap().typeinfo = Some(statement.duplicate());
213    }
214
215    pub fn typeinfo_composite(&self) -> Option<Statement> {
216        self.cache
217            .type_info
218            .lock()
219            .unwrap()
220            .typeinfo_composite
221            .as_ref()
222            .map(Statement::duplicate)
223    }
224
225    pub fn set_typeinfo_composite(&self, statement: &Statement) {
226        self.cache.type_info.lock().unwrap().typeinfo_composite = Some(statement.duplicate());
227    }
228
229    pub fn typeinfo_enum(&self) -> Option<Statement> {
230        self.cache
231            .type_info
232            .lock()
233            .unwrap()
234            .typeinfo_enum
235            .as_ref()
236            .map(Statement::duplicate)
237    }
238
239    pub fn set_typeinfo_enum(&self, statement: &Statement) {
240        self.cache.type_info.lock().unwrap().typeinfo_enum = Some(statement.duplicate());
241    }
242
243    pub fn type_(&self, oid: Oid) -> Option<Type> {
244        self.cache.type_info.lock().unwrap().types.get(&oid).cloned()
245    }
246
247    pub fn set_type(&self, oid: Oid, type_: &Type) {
248        self.cache.type_info.lock().unwrap().types.insert(oid, type_.clone());
249    }
250
251    /// Clears the client's type information cache.
252    ///
253    /// When user-defined types are used in a query, the client loads their definitions from the database and caches
254    /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
255    /// to flush the local cache and allow the new, updated definitions to be loaded.
256    pub fn clear_type_cache(&self) {
257        self.cache.type_info.lock().unwrap().types.clear();
258    }
259
260    pub(crate) fn new(tx: DriverTx, session: Session) -> Self {
261        Self {
262            tx,
263            cache: Box::new(ClientCache {
264                session,
265                type_info: Mutex::new(CachedTypeInfo {
266                    typeinfo: None,
267                    typeinfo_composite: None,
268                    typeinfo_enum: None,
269                    types: HashMap::default(),
270                }),
271            }),
272        }
273    }
274}
275
// Client trivially satisfies the marker trait: an exclusive borrow of a Client is the
// exclusive borrow the trait asks for.
impl ClientBorrowMut for Client {
    #[inline]
    fn _borrow_mut(&mut self) -> &mut Client {
        self
    }
}
282
283impl Query for Client {
284    #[inline]
285    fn _send_encode_query<S>(&self, stmt: S) -> Result<(S::Output, Response), Error>
286    where
287        S: Encode,
288    {
289        encode::send_encode_query(&self.tx, stmt)
290    }
291}
292
293impl Query for Arc<Client> {
294    #[inline]
295    fn _send_encode_query<S>(&self, stmt: S) -> Result<(S::Output, Response), Error>
296    where
297        S: Encode,
298    {
299        Client::_send_encode_query(&**self, stmt)
300    }
301}
302
303impl Drop for Client {
304    fn drop(&mut self) {
305        // convert leaked statements to guarded statements.
306        // this is to cancel the statement on client go away.
307        let (type_info, typeinfo_composite, typeinfo_enum) = {
308            let cache = self.cache.type_info.get_mut().unwrap();
309            (
310                cache.typeinfo.take(),
311                cache.typeinfo_composite.take(),
312                cache.typeinfo_enum.take(),
313            )
314        };
315
316        if let Some(stmt) = type_info {
317            drop(stmt.into_guarded(&*self));
318        }
319
320        if let Some(stmt) = typeinfo_composite {
321            drop(stmt.into_guarded(&*self));
322        }
323
324        if let Some(stmt) = typeinfo_enum {
325            drop(stmt.into_guarded(&*self));
326        }
327    }
328}