1use core::{ops::Deref, sync::atomic::Ordering};
4
5use std::sync::Arc;
6
7use super::{
8 client::ClientBorrow,
9 column::Column,
10 driver::codec::AsParams,
11 types::{ToSql, Type},
12};
13
14pub struct StatementGuarded<'a, C>
20where
21 C: ClientBorrow,
22{
23 stmt: Option<Statement>,
24 cli: &'a C,
25}
26
27impl<C> AsRef<Statement> for StatementGuarded<'_, C>
28where
29 C: ClientBorrow,
30{
31 #[inline]
32 fn as_ref(&self) -> &Statement {
33 self
34 }
35}
36
37impl<C> Deref for StatementGuarded<'_, C>
38where
39 C: ClientBorrow,
40{
41 type Target = Statement;
42
43 fn deref(&self) -> &Self::Target {
44 self.stmt.as_ref().unwrap()
45 }
46}
47
48impl<C> Drop for StatementGuarded<'_, C>
49where
50 C: ClientBorrow,
51{
52 fn drop(&mut self) {
53 if let Some(stmt) = self.stmt.take() {
54 let _ = self.cli.borrow_cli_ref().query_raw(stmt.cancel());
55 }
56 }
57}
58
59impl<C> StatementGuarded<'_, C>
60where
61 C: ClientBorrow,
62{
63 pub fn leak(mut self) -> Statement {
66 self.stmt.take().unwrap()
67 }
68}
69
70#[derive(Default)]
84pub struct Statement {
85 name: Arc<str>,
86 params: Arc<[Type]>,
87 columns: Arc<[Column]>,
88}
89
90impl Statement {
91 pub(crate) fn new(name: String, params: Vec<Type>, columns: Vec<Column>) -> Self {
92 Self {
93 name: name.into(),
94 params: params.into(),
95 columns: columns.into(),
96 }
97 }
98
99 pub(crate) fn duplicate(&self) -> Self {
101 Self {
102 name: self.name.clone(),
103 params: self.params.clone(),
104 columns: self.columns.clone(),
105 }
106 }
107
108 pub(crate) fn name(&self) -> &str {
109 &self.name
110 }
111
112 pub(crate) fn columns_owned(&self) -> Arc<[Column]> {
113 self.columns.clone()
114 }
115
116 fn cancel(&self) -> StatementPreparedCancel<'_> {
117 StatementPreparedCancel { name: self.name() }
118 }
119
120 #[inline]
125 pub const fn named<'a>(stmt: &'a str, types: &'a [Type]) -> StatementNamed<'a> {
126 StatementNamed { stmt, types }
127 }
128
129 #[inline]
144 pub fn bind<P>(&self, params: P) -> StatementPreparedQuery<'_, P>
145 where
146 P: AsParams,
147 {
148 StatementPreparedQuery { stmt: self, params }
149 }
150
151 #[inline]
161 pub fn bind_dyn<'p, 't>(
162 &self,
163 params: &'p [&'t (dyn ToSql + Sync)],
164 ) -> StatementPreparedQuery<'_, impl ExactSizeIterator<Item = &'t (dyn ToSql + Sync)> + Clone + 'p> {
165 self.bind(params.iter().cloned())
166 }
167
168 #[inline]
171 pub fn bind_none(&self) -> StatementPreparedQuery<'_, [bool; 0]> {
172 self.bind([])
173 }
174
175 #[inline]
177 pub fn params(&self) -> &[Type] {
178 &self.params
179 }
180
181 #[inline]
183 pub fn columns(&self) -> &[Column] {
184 &self.columns
185 }
186
187 #[inline]
189 pub fn into_guarded<C>(self, cli: &C) -> StatementGuarded<'_, C>
190 where
191 C: ClientBorrow,
192 {
193 StatementGuarded { stmt: Some(self), cli }
194 }
195}
196
197pub struct StatementNamed<'a> {
199 pub(crate) stmt: &'a str,
200 pub(crate) types: &'a [Type],
201}
202
203impl<'a> StatementNamed<'a> {
204 fn name() -> String {
205 let id = crate::NEXT_ID.fetch_add(1, Ordering::Relaxed);
206 format!("s{id}")
207 }
208
209 #[inline]
211 pub fn bind<P>(self, params: P) -> StatementQuery<'a, P> {
212 StatementQuery {
213 stmt: self.stmt,
214 types: self.types,
215 params,
216 }
217 }
218
219 #[inline]
221 pub fn bind_dyn<'p, 't>(
222 self,
223 params: &'p [&'t (dyn ToSql + Sync)],
224 ) -> StatementQuery<'a, impl ExactSizeIterator<Item = &'t (dyn ToSql + Sync)> + Clone + 'p> {
225 self.bind(params.iter().cloned())
226 }
227
228 #[inline]
230 pub fn bind_none(self) -> StatementQuery<'a, [bool; 0]> {
231 StatementQuery {
232 stmt: self.stmt,
233 types: self.types,
234 params: [],
235 }
236 }
237}
238
239pub(crate) struct StatementCreate<'a, 'c, C> {
240 pub(crate) name: String,
241 pub(crate) stmt: &'a str,
242 pub(crate) types: &'a [Type],
243 pub(crate) cli: &'c C,
244}
245
246impl<'a, 'c, C> From<(StatementNamed<'a>, &'c C)> for StatementCreate<'a, 'c, C> {
247 fn from((stmt, cli): (StatementNamed<'a>, &'c C)) -> Self {
248 Self {
249 name: StatementNamed::name(),
250 stmt: stmt.stmt,
251 types: stmt.types,
252 cli,
253 }
254 }
255}
256
257pub(crate) struct StatementCreateBlocking<'a, 'c, C> {
258 pub(crate) name: String,
259 pub(crate) stmt: &'a str,
260 pub(crate) types: &'a [Type],
261 pub(crate) cli: &'c C,
262}
263
264impl<'a, 'c, C> From<(StatementNamed<'a>, &'c C)> for StatementCreateBlocking<'a, 'c, C> {
265 fn from((stmt, cli): (StatementNamed<'a>, &'c C)) -> Self {
266 Self {
267 name: StatementNamed::name(),
268 stmt: stmt.stmt,
269 types: stmt.types,
270 cli,
271 }
272 }
273}
274
275pub(crate) struct StatementPreparedCancel<'a> {
276 pub(crate) name: &'a str,
277}
278
279pub struct StatementPreparedQuery<'a, P> {
286 pub(crate) stmt: &'a Statement,
287 pub(crate) params: P,
288}
289
290impl<'a, P> StatementPreparedQuery<'a, P> {
291 #[inline]
292 pub fn into_owned(self) -> StatementPreparedQueryOwned<'a, P> {
293 StatementPreparedQueryOwned {
294 stmt: self.stmt,
295 params: self.params,
296 }
297 }
298}
299
300pub struct StatementPreparedQueryOwned<'a, P> {
307 pub(crate) stmt: &'a Statement,
308 pub(crate) params: P,
309}
310
311pub struct StatementQuery<'a, P> {
327 pub(crate) stmt: &'a str,
328 pub(crate) types: &'a [Type],
329 pub(crate) params: P,
330}
331
332impl<'a, P> StatementQuery<'a, P> {
333 pub fn into_single_rtt(self) -> StatementSingleRTTQuery<'a, P> {
337 StatementSingleRTTQuery { query: self }
338 }
339}
340
341pub struct StatementSingleRTTQuery<'a, P> {
344 query: StatementQuery<'a, P>,
345}
346
347impl<'a, P> StatementSingleRTTQuery<'a, P> {
348 pub(crate) fn into_with_cli<'c, C>(self, cli: &'c C) -> StatementSingleRTTQueryWithCli<'a, 'c, P, C> {
349 StatementSingleRTTQueryWithCli { query: self.query, cli }
350 }
351}
352
353pub(crate) struct StatementSingleRTTQueryWithCli<'a, 'c, P, C> {
354 pub(crate) query: StatementQuery<'a, P>,
355 pub(crate) cli: &'c C,
356}
357
358pub struct StatementGuardedOwned<C>
362where
363 C: ClientBorrow,
364{
365 stmt: Statement,
366 cli: C,
367}
368
369impl<C> Clone for StatementGuardedOwned<C>
370where
371 C: ClientBorrow + Clone,
372{
373 fn clone(&self) -> Self {
374 Self {
375 stmt: self.stmt.duplicate(),
376 cli: self.cli.clone(),
377 }
378 }
379}
380
381impl<C> Drop for StatementGuardedOwned<C>
382where
383 C: ClientBorrow,
384{
385 fn drop(&mut self) {
386 if Arc::strong_count(&self.stmt.name) == 1 {
388 debug_assert_eq!(Arc::strong_count(&self.stmt.params), 1);
389 debug_assert_eq!(Arc::strong_count(&self.stmt.columns), 1);
390 let _ = self.cli.borrow_cli_ref().query_raw(self.stmt.cancel());
391 }
392 }
393}
394
395impl<C> Deref for StatementGuardedOwned<C>
396where
397 C: ClientBorrow,
398{
399 type Target = Statement;
400
401 fn deref(&self) -> &Self::Target {
402 &self.stmt
403 }
404}
405
406impl<C> AsRef<Statement> for StatementGuardedOwned<C>
407where
408 C: ClientBorrow,
409{
410 fn as_ref(&self) -> &Statement {
411 &self.stmt
412 }
413}
414
415impl<C> StatementGuardedOwned<C>
416where
417 C: ClientBorrow,
418{
419 pub fn new(stmt: Statement, cli: C) -> Self {
421 Self { stmt, cli }
422 }
423
424 pub fn client(&self) -> &C {
427 &self.cli
428 }
429}
430
431#[cfg(test)]
432mod test {
433 use core::future::IntoFuture;
434
435 use crate::{
436 Postgres,
437 error::{DbError, SqlState},
438 execute::Execute,
439 iter::AsyncLendingIterator,
440 statement::Statement,
441 };
442
443 #[tokio::test]
444 async fn cancel_statement() {
445 let (cli, drv) = Postgres::new("postgres://postgres:postgres@localhost:5432")
446 .connect()
447 .await
448 .unwrap();
449
450 tokio::task::spawn(drv.into_future());
451
452 std::path::Path::new("./samples/test.sql").execute(&cli).await.unwrap();
453
454 let stmt = Statement::named("SELECT id, name FROM foo ORDER BY id", &[])
455 .execute(&cli)
456 .await
457 .unwrap();
458
459 let stmt_raw = stmt.duplicate();
460
461 drop(stmt);
462
463 let mut stream = stmt_raw.query(&cli).await.unwrap();
464
465 let e = stream.try_next().await.err().unwrap();
466
467 let e = e.downcast_ref::<DbError>().unwrap();
468
469 assert_eq!(e.code(), &SqlState::INVALID_SQL_STATEMENT_NAME);
470 }
471}