Skip to main content

xitca_postgres/
client.rs

1use core::future::Future;
2
3use std::{collections::HashMap, sync::Mutex};
4
5use xitca_unsafe_collection::no_hash::NoHashBuilder;
6
7use super::{
8    copy::{CopyIn, CopyOut},
9    driver::{
10        DriverTx,
11        codec::{Response, encode::Encode, response::IntoResponse},
12    },
13    error::Error,
14    session::Session,
15    statement::Statement,
16    transaction::{Transaction, TransactionBuilder},
17    types::{Oid, Type},
18};
19
20/// a marker trait to confirm a mut reference of Client can be borrowed from self.
21///
22/// this is necessary for custom [Client] new types who want to utilize [Transaction] and [CopyIn].
23/// these types and their functions only work properly when [Client] is exclusively borrowed.
24///
25/// # Examples
26/// ```rust
27/// use std::sync::Arc;
28///
29/// use xitca_postgres::{dev::{ClientBorrow, ClientBorrowMut}, transaction::TransactionBuilder, Client, Error};
30///
31/// // a client wrapper use reference counted smart pointer.
32/// // it's easy to create multiple instance of &mut SharedClient with help of cloning of smart pointer
33/// // and none of them can be used correctly with Transaction nor CopyIn
34/// #[derive(Clone)]
35/// struct SharedClient(Arc<Client>);
36///
37/// // boilerplate impl required by ClientBorrowMut trait
38/// impl ClientBorrow for SharedClient {
39///     fn borrow_cli_ref(&self) -> &Client {
40///         &self.0
41///     }
42/// }
43///
44/// // client new type has to impl this trait to mark they can truly offer a mutable reference to Client
45/// impl ClientBorrowMut for SharedClient {
46///     fn borrow_cli_mut(&mut self) -> &mut Client {
47///         Arc::get_mut(&mut self.0).expect("you can't safely implement this trait with SharedClient.")
48///     }
49/// }
50///
51/// async fn borrow_mut(cli: Client) -> Result<(), Error> {
52///     let mut cli = SharedClient(Arc::new(cli));
53///
54///     // this line works fine as shared client is exclusivly borrowed by transaction
55///     let _ = TransactionBuilder::new().begin(&mut cli).await?;    
56///
57///     // clone shared client before starting a transaction.
58///     let _cli2 = cli.clone();
59///
60///     // this line will panic as shared client has another copy being held by cli2 varaiable
61///     let _ = TransactionBuilder::new().begin(&mut cli).await?;
62///
63///     Ok(())
64/// }
65/// ```
66///
67/// [Transaction]: crate::transaction::Transaction
68/// [CopyIn]: crate::copy::CopyIn
69pub trait ClientBorrowMut: ClientBorrow {
70    fn borrow_cli_mut(&mut self) -> &mut Client;
71}
72
73impl<T> ClientBorrowMut for &mut T
74where
75    T: ClientBorrowMut,
76{
77    #[inline]
78    fn borrow_cli_mut(&mut self) -> &mut Client {
79        T::borrow_cli_mut(*self)
80    }
81}
82
83/// super set of [`ClientBorrowMut`] trait
84pub trait ClientBorrow {
85    fn borrow_cli_ref(&self) -> &Client;
86}
87
88impl<T> ClientBorrow for &T
89where
90    T: ClientBorrow,
91{
92    #[inline]
93    fn borrow_cli_ref(&self) -> &Client {
94        T::borrow_cli_ref(*self)
95    }
96}
97
98impl<T> ClientBorrow for &mut T
99where
100    T: ClientBorrow,
101{
102    #[inline]
103    fn borrow_cli_ref(&self) -> &Client {
104        T::borrow_cli_ref(&**self)
105    }
106}
107/// Client is a handler type for [`Driver`]. it interacts with latter using channel and message for IO operation
108/// and de/encoding of postgres protocol in byte format.
109///
110/// Client expose a set of high level API to make the interaction represented in Rust function and types.
111///
112/// # Lifetime
113/// Client and [`Driver`] have a dependent lifetime where either side can trigger the other part to shutdown.
114/// From Client side it's in the form of dropping ownership.
115/// ## Examples
116/// ```
117/// # use core::future::IntoFuture;
118/// # use xitca_postgres::{error::Error, Config, Postgres};
119/// # async fn shut_down(cfg: Config) -> Result<(), Error> {
120/// // connect to a database and spawn driver as async task
121/// let (cli, drv) = Postgres::new(cfg).connect().await?;
122/// let handle = tokio::spawn(drv.into_future());
123///
124/// // drop client after finished usage
125/// drop(cli);
126///
127/// // client would notify driver to shutdown when it's dropped.
128/// // await on the handle would return a Result of the shutdown outcome from driver side.  
129/// let _ = handle.await.unwrap();
130///
131/// # Ok(())
132/// # }
133/// ```
134///
135/// [`Driver`]: crate::driver::Driver
136pub struct Client {
137    pub(crate) tx: DriverTx,
138    pub(crate) cache: Box<ClientCache>,
139}
140
141pub(crate) struct ClientCache {
142    session: Session,
143    type_info: Mutex<CachedTypeInfo>,
144}
145
146/// A cache of type info and prepared statements for fetching type info
147/// (corresponding to the queries in the [prepare](prepare) module).
148struct CachedTypeInfo {
149    /// A statement for basic information for a type from its
150    /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
151    /// fallback).
152    typeinfo: Option<Statement>,
153    /// A statement for getting information for a composite type from its OID.
154    /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
155    typeinfo_composite: Option<Statement>,
156    /// A statement for getting information for a composite type from its OID.
157    /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
158    /// its fallback).
159    typeinfo_enum: Option<Statement>,
160    /// Cache of types already looked up.
161    types: HashMap<Oid, Type, NoHashBuilder>,
162}
163
164impl Client {
165    /// start a transaction
166    #[inline]
167    pub fn transaction(&mut self) -> impl Future<Output = Result<Transaction<'_, Self>, Error>> + Send {
168        TransactionBuilder::new().begin(self)
169    }
170
171    /// owned version of [`Client::transaction`]
172    #[inline]
173    pub fn transaction_owned(self) -> impl Future<Output = Result<Transaction<'static, Self>, Error>> + Send {
174        TransactionBuilder::new().begin_owned(self)
175    }
176
177    /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
178    ///
179    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
180    /// be explicitly completed via [`CopyIn::finish`]. If it is not, the copy will be aborted.
181    #[inline]
182    pub fn copy_in(&mut self, stmt: &Statement) -> impl Future<Output = Result<CopyIn<'_, Self>, Error>> + Send {
183        CopyIn::new(self, stmt)
184    }
185
186    /// Executes a `COPY TO STDOUT` statement, returning async stream of the resulting data.
187    ///
188    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
189    #[inline]
190    pub async fn copy_out(&self, stmt: &Statement) -> Result<CopyOut, Error> {
191        CopyOut::new(self, stmt).await
192    }
193
194    /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
195    /// connection associated with this client.
196    pub fn cancel_token(&self) -> Session {
197        Session::clone(&self.cache.session)
198    }
199
200    /// a lossy hint of running state of io driver. an io driver shutdown can happen at the same time this api is called.
201    pub fn closed(&self) -> bool {
202        self.tx.is_closed()
203    }
204
205    pub fn typeinfo(&self) -> Option<Statement> {
206        self.cache
207            .type_info
208            .lock()
209            .unwrap()
210            .typeinfo
211            .as_ref()
212            .map(Statement::duplicate)
213    }
214
215    pub fn set_typeinfo(&self, statement: &Statement) {
216        self.cache.type_info.lock().unwrap().typeinfo = Some(statement.duplicate());
217    }
218
219    pub fn typeinfo_composite(&self) -> Option<Statement> {
220        self.cache
221            .type_info
222            .lock()
223            .unwrap()
224            .typeinfo_composite
225            .as_ref()
226            .map(Statement::duplicate)
227    }
228
229    pub fn set_typeinfo_composite(&self, statement: &Statement) {
230        self.cache.type_info.lock().unwrap().typeinfo_composite = Some(statement.duplicate());
231    }
232
233    pub fn typeinfo_enum(&self) -> Option<Statement> {
234        self.cache
235            .type_info
236            .lock()
237            .unwrap()
238            .typeinfo_enum
239            .as_ref()
240            .map(Statement::duplicate)
241    }
242
243    pub fn set_typeinfo_enum(&self, statement: &Statement) {
244        self.cache.type_info.lock().unwrap().typeinfo_enum = Some(statement.duplicate());
245    }
246
247    pub fn type_(&self, oid: Oid) -> Option<Type> {
248        self.cache.type_info.lock().unwrap().types.get(&oid).cloned()
249    }
250
251    pub fn set_type(&self, oid: Oid, type_: &Type) {
252        self.cache.type_info.lock().unwrap().types.insert(oid, type_.clone());
253    }
254
255    /// Clears the client's type information cache.
256    ///
257    /// When user-defined types are used in a query, the client loads their definitions from the database and caches
258    /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
259    /// to flush the local cache and allow the new, updated definitions to be loaded.
260    pub fn clear_type_cache(&self) {
261        self.cache.type_info.lock().unwrap().types.clear();
262    }
263
264    #[inline]
265    pub(crate) fn query<S>(&self, stmt: S) -> Result<<S::Output as IntoResponse>::Response, Error>
266    where
267        S: Encode,
268    {
269        self.query_raw(stmt).map(|(opt, res)| opt.into_response(res))
270    }
271
272    #[inline]
273    pub(crate) fn query_raw<S>(&self, stmt: S) -> Result<(S::Output, Response), Error>
274    where
275        S: Encode,
276    {
277        self.tx.send(|buf| stmt.encode(buf))
278    }
279
280    pub(crate) fn new(tx: DriverTx, session: Session) -> Self {
281        Self {
282            tx,
283            cache: Box::new(ClientCache {
284                session,
285                type_info: Mutex::new(CachedTypeInfo {
286                    typeinfo: None,
287                    typeinfo_composite: None,
288                    typeinfo_enum: None,
289                    types: HashMap::default(),
290                }),
291            }),
292        }
293    }
294}
295
296impl ClientBorrowMut for Client {
297    #[inline]
298    fn borrow_cli_mut(&mut self) -> &mut Client {
299        self
300    }
301}
302
303impl ClientBorrow for Client {
304    #[inline]
305    fn borrow_cli_ref(&self) -> &Client {
306        self
307    }
308}
309
310impl Drop for Client {
311    fn drop(&mut self) {
312        // convert leaked statements to guarded statements.
313        // this is to cancel the statement on client go away.
314        let (type_info, typeinfo_composite, typeinfo_enum) = {
315            let cache = self.cache.type_info.get_mut().unwrap();
316            (
317                cache.typeinfo.take(),
318                cache.typeinfo_composite.take(),
319                cache.typeinfo_enum.take(),
320            )
321        };
322
323        if let Some(stmt) = type_info {
324            drop(stmt.into_guarded(&*self));
325        }
326
327        if let Some(stmt) = typeinfo_composite {
328            drop(stmt.into_guarded(&*self));
329        }
330
331        if let Some(stmt) = typeinfo_enum {
332            drop(stmt.into_guarded(&*self));
333        }
334    }
335}