xitca_postgres/client.rs
1use core::future::Future;
2
3use std::{
4 collections::HashMap,
5 sync::{Arc, Mutex},
6};
7
8use xitca_unsafe_collection::no_hash::NoHashBuilder;
9
10use super::{
11 copy::{CopyIn, CopyOut},
12 driver::{
13 DriverTx,
14 codec::{
15 Response,
16 encode::{self, Encode},
17 },
18 },
19 error::Error,
20 query::Query,
21 session::Session,
22 statement::Statement,
23 transaction::{Transaction, TransactionBuilder},
24 types::{Oid, Type},
25};
26
27/// a marker trait to confirm a mut reference of Client can be borrowed from self.
28///
29/// this is necessary for custom [Client] new types who want to utilize [Transaction] and [CopyIn].
30/// these types and their functions only work properly when [Client] is exclusively borrowed.
31///
32/// # Examples
33/// ```rust
34/// use std::sync::Arc;
35///
36/// use xitca_postgres::{dev::ClientBorrowMut, transaction::TransactionBuilder, Client, Error};
37/// # use xitca_postgres::dev::{Encode, Prepare, Query, Response};
38/// # use xitca_postgres::types::{Oid, Type};
39///
40/// // a client wrapper use reference counted smart pointer.
41/// // it's easy to create multiple instance of &mut SharedClient with help of cloning of smart pointer
42/// // and none of them can be used correctly with Transaction nor CopyIn
43/// #[derive(Clone)]
44/// struct SharedClient(Arc<Client>);
45///
46/// # impl Query for SharedClient {
47/// # fn _send_encode_query<S>(&self, stmt: S) -> Result<(S::Output, Response), Error>
48/// # where
49/// # S: Encode,
50/// # {
51/// # self.0._send_encode_query(stmt)
52/// # }
53/// # }
54/// # impl Prepare for SharedClient {
55/// # async fn _get_type(&self, oid: Oid) -> Result<Type, Error> {
56/// # self.0._get_type(oid).await
57/// # }
58/// # fn _get_type_blocking(&self, oid: Oid) -> Result<Type, Error> {
59/// # self.0._get_type_blocking(oid)
60/// # }
61/// # }
62///
63/// // client new type has to impl this trait to mark they can truly offer a mutable reference to Client
64/// //
65/// impl ClientBorrowMut for SharedClient {
66/// fn _borrow_mut(&mut self) -> &mut Client {
67/// Arc::get_mut(&mut self.0).expect("you can't safely implement this trait with SharedClient.")
68/// }
69/// }
70///
71/// async fn borrow_mut(cli: Client) -> Result<(), Error> {
72/// let mut cli = SharedClient(Arc::new(cli));
73///
74/// // this line works fine as shared client is exclusivly borrowed by transaction
75/// let _ = TransactionBuilder::new().begin(&mut cli).await?;
76///
77/// // clone shared client before starting a transaction.
78/// let _cli2 = cli.clone();
79///
80/// // this line will panic as shared client has another copy being held by cli2 varaiable
81/// let _ = TransactionBuilder::new().begin(&mut cli).await?;
82///
83/// Ok(())
84/// }
85/// ```
86///
87/// [Transaction]: crate::transaction::Transaction
88/// [CopyIn]: crate::copy::CopyIn
89pub trait ClientBorrowMut {
90 fn _borrow_mut(&mut self) -> &mut Client;
91}
92
93impl<T> ClientBorrowMut for &mut T
94where
95 T: ClientBorrowMut,
96{
97 #[inline]
98 fn _borrow_mut(&mut self) -> &mut Client {
99 T::_borrow_mut(*self)
100 }
101}
102
103/// Client is a handler type for [`Driver`]. it interacts with latter using channel and message for IO operation
104/// and de/encoding of postgres protocol in byte format.
105///
106/// Client expose a set of high level API to make the interaction represented in Rust function and types.
107///
108/// # Lifetime
109/// Client and [`Driver`] have a dependent lifetime where either side can trigger the other part to shutdown.
110/// From Client side it's in the form of dropping ownership.
111/// ## Examples
112/// ```
113/// # use core::future::IntoFuture;
114/// # use xitca_postgres::{error::Error, Config, Postgres};
115/// # async fn shut_down(cfg: Config) -> Result<(), Error> {
116/// // connect to a database and spawn driver as async task
117/// let (cli, drv) = Postgres::new(cfg).connect().await?;
118/// let handle = tokio::spawn(drv.into_future());
119///
120/// // drop client after finished usage
121/// drop(cli);
122///
123/// // client would notify driver to shutdown when it's dropped.
124/// // await on the handle would return a Result of the shutdown outcome from driver side.
125/// let _ = handle.await.unwrap();
126///
127/// # Ok(())
128/// # }
129/// ```
130///
131/// [`Driver`]: crate::driver::Driver
132pub struct Client {
133 pub(crate) tx: DriverTx,
134 pub(crate) cache: Box<ClientCache>,
135}
136
137pub(crate) struct ClientCache {
138 session: Session,
139 type_info: Mutex<CachedTypeInfo>,
140}
141
142/// A cache of type info and prepared statements for fetching type info
143/// (corresponding to the queries in the [prepare](prepare) module).
144struct CachedTypeInfo {
145 /// A statement for basic information for a type from its
146 /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
147 /// fallback).
148 typeinfo: Option<Statement>,
149 /// A statement for getting information for a composite type from its OID.
150 /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
151 typeinfo_composite: Option<Statement>,
152 /// A statement for getting information for a composite type from its OID.
153 /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
154 /// its fallback).
155 typeinfo_enum: Option<Statement>,
156 /// Cache of types already looked up.
157 types: HashMap<Oid, Type, NoHashBuilder>,
158}
159
160impl Client {
161 /// start a transaction
162 #[inline]
163 pub fn transaction(&mut self) -> impl Future<Output = Result<Transaction<&mut Self>, Error>> + Send {
164 TransactionBuilder::new().begin(self)
165 }
166
167 /// owned version of [`Client::transaction`]
168 #[inline]
169 pub fn transaction_owned(self) -> impl Future<Output = Result<Transaction<Self>, Error>> + Send {
170 TransactionBuilder::new().begin(self)
171 }
172
173 /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
174 ///
175 /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
176 /// be explicitly completed via [`CopyIn::finish`]. If it is not, the copy will be aborted.
177 #[inline]
178 pub fn copy_in(&mut self, stmt: &Statement) -> impl Future<Output = Result<CopyIn<'_, Self>, Error>> + Send {
179 CopyIn::new(self, stmt)
180 }
181
182 /// Executes a `COPY TO STDOUT` statement, returning async stream of the resulting data.
183 ///
184 /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
185 #[inline]
186 pub async fn copy_out(&self, stmt: &Statement) -> Result<CopyOut, Error> {
187 CopyOut::new(self, stmt).await
188 }
189
190 /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
191 /// connection associated with this client.
192 pub fn cancel_token(&self) -> Session {
193 Session::clone(&self.cache.session)
194 }
195
196 /// a lossy hint of running state of io driver. an io driver shutdown can happen at the same time this api is called.
197 pub fn closed(&self) -> bool {
198 self.tx.is_closed()
199 }
200
201 pub fn typeinfo(&self) -> Option<Statement> {
202 self.cache
203 .type_info
204 .lock()
205 .unwrap()
206 .typeinfo
207 .as_ref()
208 .map(Statement::duplicate)
209 }
210
211 pub fn set_typeinfo(&self, statement: &Statement) {
212 self.cache.type_info.lock().unwrap().typeinfo = Some(statement.duplicate());
213 }
214
215 pub fn typeinfo_composite(&self) -> Option<Statement> {
216 self.cache
217 .type_info
218 .lock()
219 .unwrap()
220 .typeinfo_composite
221 .as_ref()
222 .map(Statement::duplicate)
223 }
224
225 pub fn set_typeinfo_composite(&self, statement: &Statement) {
226 self.cache.type_info.lock().unwrap().typeinfo_composite = Some(statement.duplicate());
227 }
228
229 pub fn typeinfo_enum(&self) -> Option<Statement> {
230 self.cache
231 .type_info
232 .lock()
233 .unwrap()
234 .typeinfo_enum
235 .as_ref()
236 .map(Statement::duplicate)
237 }
238
239 pub fn set_typeinfo_enum(&self, statement: &Statement) {
240 self.cache.type_info.lock().unwrap().typeinfo_enum = Some(statement.duplicate());
241 }
242
243 pub fn type_(&self, oid: Oid) -> Option<Type> {
244 self.cache.type_info.lock().unwrap().types.get(&oid).cloned()
245 }
246
247 pub fn set_type(&self, oid: Oid, type_: &Type) {
248 self.cache.type_info.lock().unwrap().types.insert(oid, type_.clone());
249 }
250
251 /// Clears the client's type information cache.
252 ///
253 /// When user-defined types are used in a query, the client loads their definitions from the database and caches
254 /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
255 /// to flush the local cache and allow the new, updated definitions to be loaded.
256 pub fn clear_type_cache(&self) {
257 self.cache.type_info.lock().unwrap().types.clear();
258 }
259
260 pub(crate) fn new(tx: DriverTx, session: Session) -> Self {
261 Self {
262 tx,
263 cache: Box::new(ClientCache {
264 session,
265 type_info: Mutex::new(CachedTypeInfo {
266 typeinfo: None,
267 typeinfo_composite: None,
268 typeinfo_enum: None,
269 types: HashMap::default(),
270 }),
271 }),
272 }
273 }
274}
275
276impl ClientBorrowMut for Client {
277 #[inline]
278 fn _borrow_mut(&mut self) -> &mut Client {
279 self
280 }
281}
282
283impl Query for Client {
284 #[inline]
285 fn _send_encode_query<S>(&self, stmt: S) -> Result<(S::Output, Response), Error>
286 where
287 S: Encode,
288 {
289 encode::send_encode_query(&self.tx, stmt)
290 }
291}
292
293impl Query for Arc<Client> {
294 #[inline]
295 fn _send_encode_query<S>(&self, stmt: S) -> Result<(S::Output, Response), Error>
296 where
297 S: Encode,
298 {
299 Client::_send_encode_query(&**self, stmt)
300 }
301}
302
303impl Drop for Client {
304 fn drop(&mut self) {
305 // convert leaked statements to guarded statements.
306 // this is to cancel the statement on client go away.
307 let (type_info, typeinfo_composite, typeinfo_enum) = {
308 let cache = self.cache.type_info.get_mut().unwrap();
309 (
310 cache.typeinfo.take(),
311 cache.typeinfo_composite.take(),
312 cache.typeinfo_enum.take(),
313 )
314 };
315
316 if let Some(stmt) = type_info {
317 drop(stmt.into_guarded(&*self));
318 }
319
320 if let Some(stmt) = typeinfo_composite {
321 drop(stmt.into_guarded(&*self));
322 }
323
324 if let Some(stmt) = typeinfo_enum {
325 drop(stmt.into_guarded(&*self));
326 }
327 }
328}