xitca_postgres/
client.rs

1use core::future::Future;
2
3use std::{
4    collections::HashMap,
5    sync::{Arc, Mutex},
6};
7
8use xitca_io::bytes::BytesMut;
9use xitca_unsafe_collection::no_hash::NoHashBuilder;
10
11use super::{
12    copy::{r#Copy, CopyIn, CopyOut},
13    driver::{
14        codec::{
15            encode::{self, Encode},
16            Response,
17        },
18        DriverTx,
19    },
20    error::Error,
21    prepare::Prepare,
22    query::Query,
23    session::Session,
24    statement::Statement,
25    transaction::Transaction,
26    types::{Oid, Type},
27};
28
29/// a marker trait to confirm a mut reference of Client can be borrowed from self.
30///
31/// this is necessary for custom [Client] new types who want to utilize [Transaction] and [CopyIn].
32/// these types and their functions only work properly when [Client] is exclusively borrowed.
33///
34/// # Examples
35/// ```rust
36/// use std::sync::Arc;
37///
38/// use xitca_postgres::{dev::ClientBorrowMut, Client};
39///
40/// // a client wrapper use reference counted smart pointer.
41/// // it's easy to create multiple instance of &mut SharedClient with help of cloning of smart pointer
42/// // and none of them can be used correctly with Transaction nor CopyIn
43/// #[derive(Clone)]
44/// struct SharedClient(Arc<Client>);
45///
46/// // client new type has to impl this trait to mark they can truly offer a mutable reference to Client
47/// impl ClientBorrowMut for SharedClient {
48///     fn _borrow_mut(&mut self) -> &mut Client {
49///         panic!("you can't safely implement this trait with SharedClient. and Transaction::new will cause a panic with it")
50///     }
51/// }
52///
53/// // another client wrapper without indirect
54/// struct ExclusiveClient(Client);
55///
56/// // trait can be implemented correctly. marking this new type can be accept by Transaction and CopyIn
57/// impl ClientBorrowMut for ExclusiveClient {
58///     fn _borrow_mut(&mut self) -> &mut Client {
59///         &mut self.0
60///     }
61/// }
62/// ```
63///
64/// [Transaction]: crate::transaction::Transaction
65/// [CopyIn]: crate::copy::CopyIn
66pub trait ClientBorrowMut {
67    fn _borrow_mut(&mut self) -> &mut Client;
68}
69
70/// Client is a handler type for [`Driver`]. it interacts with latter using channel and message for IO operation
71/// and de/encoding of postgres protocol in byte format.
72///
73/// Client expose a set of high level API to make the interaction represented in Rust function and types.
74///
75/// # Lifetime
76/// Client and [`Driver`] have a dependent lifetime where either side can trigger the other part to shutdown.
77/// From Client side it's in the form of dropping ownership.
78/// ## Examples
79/// ```
80/// # use core::future::IntoFuture;
81/// # use xitca_postgres::{error::Error, Config, Postgres};
82/// # async fn shut_down(cfg: Config) -> Result<(), Error> {
83/// // connect to a database and spawn driver as async task
84/// let (cli, drv) = Postgres::new(cfg).connect().await?;
85/// let handle = tokio::spawn(drv.into_future());
86///
87/// // drop client after finished usage
88/// drop(cli);
89///
90/// // client would notify driver to shutdown when it's dropped.
91/// // await on the handle would return a Result of the shutdown outcome from driver side.  
92/// let _ = handle.await.unwrap();
93///
94/// # Ok(())
95/// # }
96/// ```
97///
98/// [`Driver`]: crate::driver::Driver
99pub struct Client {
100    pub(crate) tx: DriverTx,
101    pub(crate) cache: Box<ClientCache>,
102}
103
104pub(crate) struct ClientCache {
105    session: Session,
106    type_info: Mutex<CachedTypeInfo>,
107}
108
109/// A cache of type info and prepared statements for fetching type info
110/// (corresponding to the queries in the [prepare](prepare) module).
111struct CachedTypeInfo {
112    /// A statement for basic information for a type from its
113    /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
114    /// fallback).
115    typeinfo: Option<Statement>,
116    /// A statement for getting information for a composite type from its OID.
117    /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
118    typeinfo_composite: Option<Statement>,
119    /// A statement for getting information for a composite type from its OID.
120    /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
121    /// its fallback).
122    typeinfo_enum: Option<Statement>,
123    /// Cache of types already looked up.
124    types: HashMap<Oid, Type, NoHashBuilder>,
125}
126
127impl Client {
128    /// start a transaction
129    #[inline]
130    pub fn transaction(&mut self) -> impl Future<Output = Result<Transaction<Self>, Error>> + Send {
131        Transaction::<Self>::builder().begin(self)
132    }
133
134    /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
135    ///
136    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
137    /// be explicitly completed via [`CopyIn::finish`]. If it is not, the copy will be aborted.
138    #[inline]
139    pub fn copy_in(&mut self, stmt: &Statement) -> impl Future<Output = Result<CopyIn<Self>, Error>> + Send {
140        CopyIn::new(self, stmt)
141    }
142
143    /// Executes a `COPY TO STDOUT` statement, returning async stream of the resulting data.
144    ///
145    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
146    #[inline]
147    pub async fn copy_out(&self, stmt: &Statement) -> Result<CopyOut, Error> {
148        CopyOut::new(self, stmt).await
149    }
150
151    /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
152    /// connection associated with this client.
153    pub fn cancel_token(&self) -> Session {
154        Session::clone(&self.cache.session)
155    }
156
157    /// a lossy hint of running state of io driver. an io driver shutdown can happen
158    /// at the same time this api is called.
159    pub fn closed(&self) -> bool {
160        self.tx.is_closed()
161    }
162
163    pub fn typeinfo(&self) -> Option<Statement> {
164        self.cache
165            .type_info
166            .lock()
167            .unwrap()
168            .typeinfo
169            .as_ref()
170            .map(Statement::duplicate)
171    }
172
173    pub fn set_typeinfo(&self, statement: &Statement) {
174        self.cache.type_info.lock().unwrap().typeinfo = Some(statement.duplicate());
175    }
176
177    pub fn typeinfo_composite(&self) -> Option<Statement> {
178        self.cache
179            .type_info
180            .lock()
181            .unwrap()
182            .typeinfo_composite
183            .as_ref()
184            .map(Statement::duplicate)
185    }
186
187    pub fn set_typeinfo_composite(&self, statement: &Statement) {
188        self.cache.type_info.lock().unwrap().typeinfo_composite = Some(statement.duplicate());
189    }
190
191    pub fn typeinfo_enum(&self) -> Option<Statement> {
192        self.cache
193            .type_info
194            .lock()
195            .unwrap()
196            .typeinfo_enum
197            .as_ref()
198            .map(Statement::duplicate)
199    }
200
201    pub fn set_typeinfo_enum(&self, statement: &Statement) {
202        self.cache.type_info.lock().unwrap().typeinfo_enum = Some(statement.duplicate());
203    }
204
205    pub fn type_(&self, oid: Oid) -> Option<Type> {
206        self.cache.type_info.lock().unwrap().types.get(&oid).cloned()
207    }
208
209    pub fn set_type(&self, oid: Oid, type_: &Type) {
210        self.cache.type_info.lock().unwrap().types.insert(oid, type_.clone());
211    }
212
213    /// Clears the client's type information cache.
214    ///
215    /// When user-defined types are used in a query, the client loads their definitions from the database and caches
216    /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
217    /// to flush the local cache and allow the new, updated definitions to be loaded.
218    pub fn clear_type_cache(&self) {
219        self.cache.type_info.lock().unwrap().types.clear();
220    }
221
222    pub(crate) fn new(tx: DriverTx, session: Session) -> Self {
223        Self {
224            tx,
225            cache: Box::new(ClientCache {
226                session,
227                type_info: Mutex::new(CachedTypeInfo {
228                    typeinfo: None,
229                    typeinfo_composite: None,
230                    typeinfo_enum: None,
231                    types: HashMap::default(),
232                }),
233            }),
234        }
235    }
236}
237
238impl ClientBorrowMut for Client {
239    #[inline]
240    fn _borrow_mut(&mut self) -> &mut Client {
241        self
242    }
243}
244
245impl Prepare for Arc<Client> {
246    #[inline]
247    fn _get_type(&self, oid: Oid) -> crate::BoxedFuture<'_, Result<Type, Error>> {
248        Client::_get_type(self, oid)
249    }
250
251    #[inline]
252    fn _get_type_blocking(&self, oid: Oid) -> Result<Type, Error> {
253        Client::_get_type_blocking(self, oid)
254    }
255}
256
257impl Query for Arc<Client> {
258    #[inline]
259    fn _send_encode_query<S>(&self, stmt: S) -> Result<(S::Output, Response), Error>
260    where
261        S: Encode,
262    {
263        Client::_send_encode_query(self, stmt)
264    }
265}
266
267impl Query for Client {
268    #[inline]
269    fn _send_encode_query<S>(&self, stmt: S) -> Result<(S::Output, Response), Error>
270    where
271        S: Encode,
272    {
273        encode::send_encode_query(&self.tx, stmt)
274    }
275}
276
277impl r#Copy for Client {
278    #[inline]
279    fn send_one_way<F>(&self, func: F) -> Result<(), Error>
280    where
281        F: FnOnce(&mut BytesMut) -> Result<(), Error>,
282    {
283        self.tx.send_one_way(func)
284    }
285}
286
287impl Drop for Client {
288    fn drop(&mut self) {
289        // convert leaked statements to guarded statements.
290        // this is to cancel the statement on client go away.
291        let (type_info, typeinfo_composite, typeinfo_enum) = {
292            let cache = self.cache.type_info.get_mut().unwrap();
293            (
294                cache.typeinfo.take(),
295                cache.typeinfo_composite.take(),
296                cache.typeinfo_enum.take(),
297            )
298        };
299
300        if let Some(stmt) = type_info {
301            drop(stmt.into_guarded(&*self));
302        }
303
304        if let Some(stmt) = typeinfo_composite {
305            drop(stmt.into_guarded(&*self));
306        }
307
308        if let Some(stmt) = typeinfo_enum {
309            drop(stmt.into_guarded(&*self));
310        }
311    }
312}