xitca_postgres/client.rs
1use core::future::Future;
2
3use std::{
4 collections::HashMap,
5 sync::{Arc, Mutex},
6};
7
8use xitca_io::bytes::BytesMut;
9use xitca_unsafe_collection::no_hash::NoHashBuilder;
10
11use super::{
12 copy::{r#Copy, CopyIn, CopyOut},
13 driver::{
14 codec::{
15 encode::{self, Encode},
16 Response,
17 },
18 DriverTx,
19 },
20 error::Error,
21 prepare::Prepare,
22 query::Query,
23 session::Session,
24 statement::Statement,
25 transaction::Transaction,
26 types::{Oid, Type},
27};
28
29/// a marker trait to confirm a mut reference of Client can be borrowed from self.
30///
31/// this is necessary for custom [Client] new types who want to utilize [Transaction] and [CopyIn].
32/// these types and their functions only work properly when [Client] is exclusively borrowed.
33///
34/// # Examples
35/// ```rust
36/// use std::sync::Arc;
37///
38/// use xitca_postgres::{dev::ClientBorrowMut, Client};
39///
40/// // a client wrapper use reference counted smart pointer.
41/// // it's easy to create multiple instance of &mut SharedClient with help of cloning of smart pointer
42/// // and none of them can be used correctly with Transaction nor CopyIn
43/// #[derive(Clone)]
44/// struct SharedClient(Arc<Client>);
45///
46/// // client new type has to impl this trait to mark they can truly offer a mutable reference to Client
47/// impl ClientBorrowMut for SharedClient {
48/// fn _borrow_mut(&mut self) -> &mut Client {
49/// panic!("you can't safely implement this trait with SharedClient. and Transaction::new will cause a panic with it")
50/// }
51/// }
52///
53/// // another client wrapper without indirect
54/// struct ExclusiveClient(Client);
55///
56/// // trait can be implemented correctly. marking this new type can be accept by Transaction and CopyIn
57/// impl ClientBorrowMut for ExclusiveClient {
58/// fn _borrow_mut(&mut self) -> &mut Client {
59/// &mut self.0
60/// }
61/// }
62/// ```
63///
64/// [Transaction]: crate::transaction::Transaction
65/// [CopyIn]: crate::copy::CopyIn
66pub trait ClientBorrowMut {
67 fn _borrow_mut(&mut self) -> &mut Client;
68}
69
70/// Client is a handler type for [`Driver`]. it interacts with latter using channel and message for IO operation
71/// and de/encoding of postgres protocol in byte format.
72///
73/// Client expose a set of high level API to make the interaction represented in Rust function and types.
74///
75/// # Lifetime
76/// Client and [`Driver`] have a dependent lifetime where either side can trigger the other part to shutdown.
77/// From Client side it's in the form of dropping ownership.
78/// ## Examples
79/// ```
80/// # use core::future::IntoFuture;
81/// # use xitca_postgres::{error::Error, Config, Postgres};
82/// # async fn shut_down(cfg: Config) -> Result<(), Error> {
83/// // connect to a database and spawn driver as async task
84/// let (cli, drv) = Postgres::new(cfg).connect().await?;
85/// let handle = tokio::spawn(drv.into_future());
86///
87/// // drop client after finished usage
88/// drop(cli);
89///
90/// // client would notify driver to shutdown when it's dropped.
91/// // await on the handle would return a Result of the shutdown outcome from driver side.
92/// let _ = handle.await.unwrap();
93///
94/// # Ok(())
95/// # }
96/// ```
97///
98/// [`Driver`]: crate::driver::Driver
99pub struct Client {
100 pub(crate) tx: DriverTx,
101 pub(crate) cache: Box<ClientCache>,
102}
103
104pub(crate) struct ClientCache {
105 session: Session,
106 type_info: Mutex<CachedTypeInfo>,
107}
108
109/// A cache of type info and prepared statements for fetching type info
110/// (corresponding to the queries in the [prepare](prepare) module).
111struct CachedTypeInfo {
112 /// A statement for basic information for a type from its
113 /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
114 /// fallback).
115 typeinfo: Option<Statement>,
116 /// A statement for getting information for a composite type from its OID.
117 /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
118 typeinfo_composite: Option<Statement>,
119 /// A statement for getting information for a composite type from its OID.
120 /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
121 /// its fallback).
122 typeinfo_enum: Option<Statement>,
123 /// Cache of types already looked up.
124 types: HashMap<Oid, Type, NoHashBuilder>,
125}
126
127impl Client {
128 /// start a transaction
129 #[inline]
130 pub fn transaction(&mut self) -> impl Future<Output = Result<Transaction<Self>, Error>> + Send {
131 Transaction::<Self>::builder().begin(self)
132 }
133
134 /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
135 ///
136 /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
137 /// be explicitly completed via [`CopyIn::finish`]. If it is not, the copy will be aborted.
138 #[inline]
139 pub fn copy_in(&mut self, stmt: &Statement) -> impl Future<Output = Result<CopyIn<Self>, Error>> + Send {
140 CopyIn::new(self, stmt)
141 }
142
143 /// Executes a `COPY TO STDOUT` statement, returning async stream of the resulting data.
144 ///
145 /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
146 #[inline]
147 pub async fn copy_out(&self, stmt: &Statement) -> Result<CopyOut, Error> {
148 CopyOut::new(self, stmt).await
149 }
150
151 /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
152 /// connection associated with this client.
153 pub fn cancel_token(&self) -> Session {
154 Session::clone(&self.cache.session)
155 }
156
157 /// a lossy hint of running state of io driver. an io driver shutdown can happen
158 /// at the same time this api is called.
159 pub fn closed(&self) -> bool {
160 self.tx.is_closed()
161 }
162
163 pub fn typeinfo(&self) -> Option<Statement> {
164 self.cache
165 .type_info
166 .lock()
167 .unwrap()
168 .typeinfo
169 .as_ref()
170 .map(Statement::duplicate)
171 }
172
173 pub fn set_typeinfo(&self, statement: &Statement) {
174 self.cache.type_info.lock().unwrap().typeinfo = Some(statement.duplicate());
175 }
176
177 pub fn typeinfo_composite(&self) -> Option<Statement> {
178 self.cache
179 .type_info
180 .lock()
181 .unwrap()
182 .typeinfo_composite
183 .as_ref()
184 .map(Statement::duplicate)
185 }
186
187 pub fn set_typeinfo_composite(&self, statement: &Statement) {
188 self.cache.type_info.lock().unwrap().typeinfo_composite = Some(statement.duplicate());
189 }
190
191 pub fn typeinfo_enum(&self) -> Option<Statement> {
192 self.cache
193 .type_info
194 .lock()
195 .unwrap()
196 .typeinfo_enum
197 .as_ref()
198 .map(Statement::duplicate)
199 }
200
201 pub fn set_typeinfo_enum(&self, statement: &Statement) {
202 self.cache.type_info.lock().unwrap().typeinfo_enum = Some(statement.duplicate());
203 }
204
205 pub fn type_(&self, oid: Oid) -> Option<Type> {
206 self.cache.type_info.lock().unwrap().types.get(&oid).cloned()
207 }
208
209 pub fn set_type(&self, oid: Oid, type_: &Type) {
210 self.cache.type_info.lock().unwrap().types.insert(oid, type_.clone());
211 }
212
213 /// Clears the client's type information cache.
214 ///
215 /// When user-defined types are used in a query, the client loads their definitions from the database and caches
216 /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
217 /// to flush the local cache and allow the new, updated definitions to be loaded.
218 pub fn clear_type_cache(&self) {
219 self.cache.type_info.lock().unwrap().types.clear();
220 }
221
222 pub(crate) fn new(tx: DriverTx, session: Session) -> Self {
223 Self {
224 tx,
225 cache: Box::new(ClientCache {
226 session,
227 type_info: Mutex::new(CachedTypeInfo {
228 typeinfo: None,
229 typeinfo_composite: None,
230 typeinfo_enum: None,
231 types: HashMap::default(),
232 }),
233 }),
234 }
235 }
236}
237
238impl ClientBorrowMut for Client {
239 #[inline]
240 fn _borrow_mut(&mut self) -> &mut Client {
241 self
242 }
243}
244
245impl Prepare for Arc<Client> {
246 #[inline]
247 fn _get_type(&self, oid: Oid) -> crate::BoxedFuture<'_, Result<Type, Error>> {
248 Client::_get_type(self, oid)
249 }
250
251 #[inline]
252 fn _get_type_blocking(&self, oid: Oid) -> Result<Type, Error> {
253 Client::_get_type_blocking(self, oid)
254 }
255}
256
257impl Query for Arc<Client> {
258 #[inline]
259 fn _send_encode_query<S>(&self, stmt: S) -> Result<(S::Output, Response), Error>
260 where
261 S: Encode,
262 {
263 Client::_send_encode_query(self, stmt)
264 }
265}
266
267impl Query for Client {
268 #[inline]
269 fn _send_encode_query<S>(&self, stmt: S) -> Result<(S::Output, Response), Error>
270 where
271 S: Encode,
272 {
273 encode::send_encode_query(&self.tx, stmt)
274 }
275}
276
277impl r#Copy for Client {
278 #[inline]
279 fn send_one_way<F>(&self, func: F) -> Result<(), Error>
280 where
281 F: FnOnce(&mut BytesMut) -> Result<(), Error>,
282 {
283 self.tx.send_one_way(func)
284 }
285}
286
287impl Drop for Client {
288 fn drop(&mut self) {
289 // convert leaked statements to guarded statements.
290 // this is to cancel the statement on client go away.
291 let (type_info, typeinfo_composite, typeinfo_enum) = {
292 let cache = self.cache.type_info.get_mut().unwrap();
293 (
294 cache.typeinfo.take(),
295 cache.typeinfo_composite.take(),
296 cache.typeinfo_enum.take(),
297 )
298 };
299
300 if let Some(stmt) = type_info {
301 drop(stmt.into_guarded(&*self));
302 }
303
304 if let Some(stmt) = typeinfo_composite {
305 drop(stmt.into_guarded(&*self));
306 }
307
308 if let Some(stmt) = typeinfo_enum {
309 drop(stmt.into_guarded(&*self));
310 }
311 }
312}