Skip to main content

xitca_postgres/
statement.rs

1//! Statement module is mostly copy/paste from `tokio_postgres::statement`
2
3use core::{ops::Deref, sync::atomic::Ordering};
4
5use std::sync::Arc;
6
7use super::{
8    client::ClientBorrow,
9    column::Column,
10    driver::codec::AsParams,
11    types::{ToSql, Type},
12};
13
14/// a statement guard contains a prepared postgres statement.
15/// the guard can be dereferenced or borrowed as [`Statement`] which can be used for query apis.
16///
17/// the guard would cancel it's statement when dropped. generic C type must be a client type impl
18/// [`ClientBorrow`] trait to instruct the cancellation.
19pub struct StatementGuarded<'a, C>
20where
21    C: ClientBorrow,
22{
23    stmt: Option<Statement>,
24    cli: &'a C,
25}
26
27impl<C> AsRef<Statement> for StatementGuarded<'_, C>
28where
29    C: ClientBorrow,
30{
31    #[inline]
32    fn as_ref(&self) -> &Statement {
33        self
34    }
35}
36
37impl<C> Deref for StatementGuarded<'_, C>
38where
39    C: ClientBorrow,
40{
41    type Target = Statement;
42
43    fn deref(&self) -> &Self::Target {
44        self.stmt.as_ref().unwrap()
45    }
46}
47
48impl<C> Drop for StatementGuarded<'_, C>
49where
50    C: ClientBorrow,
51{
52    fn drop(&mut self) {
53        if let Some(stmt) = self.stmt.take() {
54            let _ = self.cli.borrow_cli_ref().query_raw(stmt.cancel());
55        }
56    }
57}
58
59impl<C> StatementGuarded<'_, C>
60where
61    C: ClientBorrow,
62{
63    /// leak the statement and it will lose automatic management
64    /// **DOES NOT** cause memory leak
65    pub fn leak(mut self) -> Statement {
66        self.stmt.take().unwrap()
67    }
68}
69
70/// named prepared postgres statement without information of which [`Client`] it belongs to and lifetime
71/// cycle management
72///
73/// this type is used as entry point for other statement types like [`StatementGuarded`] and [`CachedStatement`].
74/// itself is rarely directly used and main direct usage is for statement caching where owner of it is tasked
75/// with manual management of it's association and lifetime
76///
77/// [`Client`]: crate::client::Client
78/// [`CachedStatement`]: crate::pool::CachedStatement
79// Statement must not implement Clone trait. use `Statement::duplicate` if needed.
80// StatementGuarded impls Deref trait and with Clone trait it will be possible to copy Statement out of a
81// StatementGuarded. This is not a desired behavior and obtaining a Statement from it's guard should only
82// be possible with StatementGuarded::leak API.
83#[derive(Default)]
84pub struct Statement {
85    name: Arc<str>,
86    params: Arc<[Type]>,
87    columns: Arc<[Column]>,
88}
89
90impl Statement {
91    pub(crate) fn new(name: String, params: Vec<Type>, columns: Vec<Column>) -> Self {
92        Self {
93            name: name.into(),
94            params: params.into(),
95            columns: columns.into(),
96        }
97    }
98
99    // cloning of statement inside library must be carefully utlized to keep cancelation happen properly on drop.
100    pub(crate) fn duplicate(&self) -> Self {
101        Self {
102            name: self.name.clone(),
103            params: self.params.clone(),
104            columns: self.columns.clone(),
105        }
106    }
107
108    pub(crate) fn name(&self) -> &str {
109        &self.name
110    }
111
112    pub(crate) fn columns_owned(&self) -> Arc<[Column]> {
113        self.columns.clone()
114    }
115
116    fn cancel(&self) -> StatementPreparedCancel<'_> {
117        StatementPreparedCancel { name: self.name() }
118    }
119
120    /// construct a new named statement.
121    /// can be called with [`Execute::execute`] method for making a prepared statement.
122    ///
123    /// [`Execute::execute`]: crate::execute::Execute::execute
124    #[inline]
125    pub const fn named<'a>(stmt: &'a str, types: &'a [Type]) -> StatementNamed<'a> {
126        StatementNamed { stmt, types }
127    }
128
129    /// bind self to typed value parameters where they are encoded into a valid sql query in binary format
130    ///
131    /// # Examples
132    /// ```
133    /// # use xitca_postgres::{types::Type, Client, Error, Execute, Statement};
134    /// # async fn bind(cli: Client) -> Result<(), Error> {
135    /// // prepare a statement with typed parameters.
136    /// let stmt = Statement::named("SELECT * FROM users WHERE id = $1 AND age = $2", &[Type::INT4, Type::INT4])
137    ///     .execute(&cli).await?;
138    /// // bind statement to typed value parameters and start query
139    /// let row_stream = stmt.bind([9527_i32, 18]).query(&cli).await?;
140    /// # Ok(())
141    /// # }
142    /// ```
143    #[inline]
144    pub fn bind<P>(&self, params: P) -> StatementPreparedQuery<'_, P>
145    where
146        P: AsParams,
147    {
148        StatementPreparedQuery { stmt: self, params }
149    }
150
151    /// [Statement::bind] for dynamic typed parameters
152    ///
153    /// # Examples
154    /// ```
155    /// # fn bind_dyn(statement: xitca_postgres::statement::Statement) {
156    /// // bind to a dynamic typed slice where items have it's own concrete type.
157    /// let bind = statement.bind_dyn(&[&9527i32, &"nobody"]);
158    /// # }
159    /// ```
160    #[inline]
161    pub fn bind_dyn<'p, 't>(
162        &self,
163        params: &'p [&'t (dyn ToSql + Sync)],
164    ) -> StatementPreparedQuery<'_, impl ExactSizeIterator<Item = &'t (dyn ToSql + Sync)> + Clone + 'p> {
165        self.bind(params.iter().cloned())
166    }
167
168    /// specialized binding api for zero sized parameters.
169    /// function the same as `Statement::bind([])`
170    #[inline]
171    pub fn bind_none(&self) -> StatementPreparedQuery<'_, [bool; 0]> {
172        self.bind([])
173    }
174
175    /// Returns the expected types of the statement's parameters.
176    #[inline]
177    pub fn params(&self) -> &[Type] {
178        &self.params
179    }
180
181    /// Returns information about the columns returned when the statement is queried.
182    #[inline]
183    pub fn columns(&self) -> &[Column] {
184        &self.columns
185    }
186
187    /// Convert self to a drop guarded statement which would cancel on drop.
188    #[inline]
189    pub fn into_guarded<C>(self, cli: &C) -> StatementGuarded<'_, C>
190    where
191        C: ClientBorrow,
192    {
193        StatementGuarded { stmt: Some(self), cli }
194    }
195}
196
197/// a named statement that can be prepared separately
198pub struct StatementNamed<'a> {
199    pub(crate) stmt: &'a str,
200    pub(crate) types: &'a [Type],
201}
202
203impl<'a> StatementNamed<'a> {
204    fn name() -> String {
205        let id = crate::NEXT_ID.fetch_add(1, Ordering::Relaxed);
206        format!("s{id}")
207    }
208
209    /// function the same as [`Statement::bind`]
210    #[inline]
211    pub fn bind<P>(self, params: P) -> StatementQuery<'a, P> {
212        StatementQuery {
213            stmt: self.stmt,
214            types: self.types,
215            params,
216        }
217    }
218
219    /// function the same as [`Statement::bind_dyn`]
220    #[inline]
221    pub fn bind_dyn<'p, 't>(
222        self,
223        params: &'p [&'t (dyn ToSql + Sync)],
224    ) -> StatementQuery<'a, impl ExactSizeIterator<Item = &'t (dyn ToSql + Sync)> + Clone + 'p> {
225        self.bind(params.iter().cloned())
226    }
227
228    /// function the same as [`Statement::bind_none`]
229    #[inline]
230    pub fn bind_none(self) -> StatementQuery<'a, [bool; 0]> {
231        StatementQuery {
232            stmt: self.stmt,
233            types: self.types,
234            params: [],
235        }
236    }
237}
238
239pub(crate) struct StatementCreate<'a, 'c, C> {
240    pub(crate) name: String,
241    pub(crate) stmt: &'a str,
242    pub(crate) types: &'a [Type],
243    pub(crate) cli: &'c C,
244}
245
246impl<'a, 'c, C> From<(StatementNamed<'a>, &'c C)> for StatementCreate<'a, 'c, C> {
247    fn from((stmt, cli): (StatementNamed<'a>, &'c C)) -> Self {
248        Self {
249            name: StatementNamed::name(),
250            stmt: stmt.stmt,
251            types: stmt.types,
252            cli,
253        }
254    }
255}
256
257pub(crate) struct StatementCreateBlocking<'a, 'c, C> {
258    pub(crate) name: String,
259    pub(crate) stmt: &'a str,
260    pub(crate) types: &'a [Type],
261    pub(crate) cli: &'c C,
262}
263
264impl<'a, 'c, C> From<(StatementNamed<'a>, &'c C)> for StatementCreateBlocking<'a, 'c, C> {
265    fn from((stmt, cli): (StatementNamed<'a>, &'c C)) -> Self {
266        Self {
267            name: StatementNamed::name(),
268            stmt: stmt.stmt,
269            types: stmt.types,
270            cli,
271        }
272    }
273}
274
275pub(crate) struct StatementPreparedCancel<'a> {
276    pub(crate) name: &'a str,
277}
278
279/// a named and already prepared statement with it's query params
280///
281/// after [`Execute::query`] by certain excutor it would produce [`RowStream`] as response
282///
283/// [`Execute::query`]: crate::execute::Execute::query
284/// [`RowStream`]: crate::query::RowStream
285pub struct StatementPreparedQuery<'a, P> {
286    pub(crate) stmt: &'a Statement,
287    pub(crate) params: P,
288}
289
290impl<'a, P> StatementPreparedQuery<'a, P> {
291    #[inline]
292    pub fn into_owned(self) -> StatementPreparedQueryOwned<'a, P> {
293        StatementPreparedQueryOwned {
294            stmt: self.stmt,
295            params: self.params,
296        }
297    }
298}
299
300/// owned version of [`StatementPreparedQuery`]
301///
302/// after [`Execute::query`] by certain excutor it would produce [`RowStreamOwned`] as response
303///
304/// [`Execute::query`]: crate::execute::Execute::query
305/// [`RowStreamOwned`]: crate::query::RowStreamOwned
306pub struct StatementPreparedQueryOwned<'a, P> {
307    pub(crate) stmt: &'a Statement,
308    pub(crate) params: P,
309}
310
311/// an unprepared statement with it's query params
312///
313/// Certain executor can make use of unprepared statement and offer addtional functionality
314/// # Examples
315/// ```rust
316/// # use xitca_postgres::{pool::Pool, types::Type, Execute, Statement};
317/// async fn execute_with_pool(pool: &Pool) {
318///     // connection pool can execute unprepared statement directly where statement preparing
319///     // execution and caching happens internally
320///     let rows = Statement::named("SELECT * FROM user WHERE id = $1", &[Type::INT4])
321///         .bind([9527])
322///         .query(pool)
323///         .await;
324/// }
325/// ```
326pub struct StatementQuery<'a, P> {
327    pub(crate) stmt: &'a str,
328    pub(crate) types: &'a [Type],
329    pub(crate) params: P,
330}
331
332impl<'a, P> StatementQuery<'a, P> {
333    /// transform self to a single use of statement query with given executor
334    ///
335    /// See [`StatementSingleRTTQuery`] for explaination
336    pub fn into_single_rtt(self) -> StatementSingleRTTQuery<'a, P> {
337        StatementSingleRTTQuery { query: self }
338    }
339}
340
341/// an unprepared statement with it's query params and reference of certain executor
342/// given executor is tasked with prepare and query with a single round-trip to database
343pub struct StatementSingleRTTQuery<'a, P> {
344    query: StatementQuery<'a, P>,
345}
346
347impl<'a, P> StatementSingleRTTQuery<'a, P> {
348    pub(crate) fn into_with_cli<'c, C>(self, cli: &'c C) -> StatementSingleRTTQueryWithCli<'a, 'c, P, C> {
349        StatementSingleRTTQueryWithCli { query: self.query, cli }
350    }
351}
352
353pub(crate) struct StatementSingleRTTQueryWithCli<'a, 'c, P, C> {
354    pub(crate) query: StatementQuery<'a, P>,
355    pub(crate) cli: &'c C,
356}
357
358/// functions the same as [`StatementGuarded`]
359///
360/// instead of work with a reference this guard offers ownership without named lifetime constraint
361pub struct StatementGuardedOwned<C>
362where
363    C: ClientBorrow,
364{
365    stmt: Statement,
366    cli: C,
367}
368
369impl<C> Clone for StatementGuardedOwned<C>
370where
371    C: ClientBorrow + Clone,
372{
373    fn clone(&self) -> Self {
374        Self {
375            stmt: self.stmt.duplicate(),
376            cli: self.cli.clone(),
377        }
378    }
379}
380
381impl<C> Drop for StatementGuardedOwned<C>
382where
383    C: ClientBorrow,
384{
385    fn drop(&mut self) {
386        // cancel statement when the last copy is about to be dropped.
387        if Arc::strong_count(&self.stmt.name) == 1 {
388            debug_assert_eq!(Arc::strong_count(&self.stmt.params), 1);
389            debug_assert_eq!(Arc::strong_count(&self.stmt.columns), 1);
390            let _ = self.cli.borrow_cli_ref().query_raw(self.stmt.cancel());
391        }
392    }
393}
394
395impl<C> Deref for StatementGuardedOwned<C>
396where
397    C: ClientBorrow,
398{
399    type Target = Statement;
400
401    fn deref(&self) -> &Self::Target {
402        &self.stmt
403    }
404}
405
406impl<C> AsRef<Statement> for StatementGuardedOwned<C>
407where
408    C: ClientBorrow,
409{
410    fn as_ref(&self) -> &Statement {
411        &self.stmt
412    }
413}
414
415impl<C> StatementGuardedOwned<C>
416where
417    C: ClientBorrow,
418{
419    /// construct a new statement guard with raw statement and client
420    pub fn new(stmt: Statement, cli: C) -> Self {
421        Self { stmt, cli }
422    }
423
424    /// obtain client reference from guarded statement
425    /// can be helpful in use case where clinet object is not cheaply avaiable
426    pub fn client(&self) -> &C {
427        &self.cli
428    }
429}
430
431#[cfg(test)]
432mod test {
433    use core::future::IntoFuture;
434
435    use crate::{
436        Postgres,
437        error::{DbError, SqlState},
438        execute::Execute,
439        iter::AsyncLendingIterator,
440        statement::Statement,
441    };
442
443    #[tokio::test]
444    async fn cancel_statement() {
445        let (cli, drv) = Postgres::new("postgres://postgres:postgres@localhost:5432")
446            .connect()
447            .await
448            .unwrap();
449
450        tokio::task::spawn(drv.into_future());
451
452        std::path::Path::new("./samples/test.sql").execute(&cli).await.unwrap();
453
454        let stmt = Statement::named("SELECT id, name FROM foo ORDER BY id", &[])
455            .execute(&cli)
456            .await
457            .unwrap();
458
459        let stmt_raw = stmt.duplicate();
460
461        drop(stmt);
462
463        let mut stream = stmt_raw.query(&cli).await.unwrap();
464
465        let e = stream.try_next().await.err().unwrap();
466
467        let e = e.downcast_ref::<DbError>().unwrap();
468
469        assert_eq!(e.code(), &SqlState::INVALID_SQL_STATEMENT_NAME);
470    }
471}