Skip to main content

tokio_postgres/
client.rs

1#[cfg(feature = "runtime")]
2use crate::Socket;
3use crate::codec::{BackendMessages, FrontendMessage};
4use crate::config::{SslMode, SslNegotiation};
5use crate::connection::{Request, RequestMessages};
6use crate::copy_out::CopyOutStream;
7#[cfg(feature = "runtime")]
8use crate::keepalive::KeepaliveConfig;
9use crate::query::RowStream;
10use crate::simple_query::SimpleQueryStream;
11#[cfg(feature = "runtime")]
12use crate::tls::MakeTlsConnect;
13use crate::tls::TlsConnect;
14use crate::types::{Oid, ToSql, Type};
15use crate::{
16    CancelToken, CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
17    TransactionBuilder, copy_in, copy_out, prepare, query, simple_query, slice_iter,
18};
19use bytes::{Buf, BytesMut};
20use fallible_iterator::FallibleIterator;
21use futures_channel::mpsc;
22use futures_util::{StreamExt, TryStreamExt};
23use parking_lot::Mutex;
24use postgres_protocol::message::backend::Message;
25use postgres_protocol::message::frontend;
26use postgres_types::{BorrowToSql, FromSqlOwned};
27use std::collections::HashMap;
28use std::fmt;
29use std::future;
30#[cfg(feature = "runtime")]
31use std::net::IpAddr;
32#[cfg(feature = "runtime")]
33use std::path::PathBuf;
34use std::pin::pin;
35use std::sync::Arc;
36use std::task::{Context, Poll, ready};
37#[cfg(feature = "runtime")]
38use std::time::Duration;
39use tokio::io::{AsyncRead, AsyncWrite};
40
41pub struct Responses {
42    receiver: mpsc::Receiver<BackendMessages>,
43    cur: BackendMessages,
44}
45
46impl Responses {
47    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
48        loop {
49            match self.cur.next().map_err(Error::parse)? {
50                Some(Message::ErrorResponse(body)) => return Poll::Ready(Err(Error::db(body))),
51                Some(message) => return Poll::Ready(Ok(message)),
52                None => {}
53            }
54
55            match ready!(self.receiver.poll_next_unpin(cx)) {
56                Some(messages) => self.cur = messages,
57                None => return Poll::Ready(Err(Error::closed())),
58            }
59        }
60    }
61
62    pub async fn next(&mut self) -> Result<Message, Error> {
63        future::poll_fn(|cx| self.poll_next(cx)).await
64    }
65}
66
67/// A cache of type info and prepared statements for fetching type info
68/// (corresponding to the queries in the [prepare](prepare) module).
69#[derive(Default)]
70struct CachedTypeInfo {
71    /// A statement for basic information for a type from its
72    /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
73    /// fallback).
74    typeinfo: Option<Statement>,
75    /// A statement for getting information for a composite type from its OID.
76    /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
77    typeinfo_composite: Option<Statement>,
78    /// A statement for getting information for a composite type from its OID.
79    /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
80    /// its fallback).
81    typeinfo_enum: Option<Statement>,
82
83    /// Cache of types already looked up.
84    types: HashMap<Oid, Type>,
85}
86
87pub struct InnerClient {
88    sender: mpsc::UnboundedSender<Request>,
89    cached_typeinfo: Mutex<CachedTypeInfo>,
90
91    /// A buffer to use when writing out postgres commands.
92    buffer: Mutex<BytesMut>,
93}
94
95impl InnerClient {
96    pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
97        let (sender, receiver) = mpsc::channel(1);
98        let request = Request { messages, sender };
99        self.sender
100            .unbounded_send(request)
101            .map_err(|_| Error::closed())?;
102
103        Ok(Responses {
104            receiver,
105            cur: BackendMessages::empty(),
106        })
107    }
108
109    pub fn typeinfo(&self) -> Option<Statement> {
110        self.cached_typeinfo.lock().typeinfo.clone()
111    }
112
113    pub fn set_typeinfo(&self, statement: &Statement) {
114        self.cached_typeinfo.lock().typeinfo = Some(statement.clone());
115    }
116
117    pub fn typeinfo_composite(&self) -> Option<Statement> {
118        self.cached_typeinfo.lock().typeinfo_composite.clone()
119    }
120
121    pub fn set_typeinfo_composite(&self, statement: &Statement) {
122        self.cached_typeinfo.lock().typeinfo_composite = Some(statement.clone());
123    }
124
125    pub fn typeinfo_enum(&self) -> Option<Statement> {
126        self.cached_typeinfo.lock().typeinfo_enum.clone()
127    }
128
129    pub fn set_typeinfo_enum(&self, statement: &Statement) {
130        self.cached_typeinfo.lock().typeinfo_enum = Some(statement.clone());
131    }
132
133    pub fn type_(&self, oid: Oid) -> Option<Type> {
134        self.cached_typeinfo.lock().types.get(&oid).cloned()
135    }
136
137    pub fn set_type(&self, oid: Oid, type_: &Type) {
138        self.cached_typeinfo.lock().types.insert(oid, type_.clone());
139    }
140
141    pub fn clear_type_cache(&self) {
142        self.cached_typeinfo.lock().types.clear();
143    }
144
145    /// Call the given function with a buffer to be used when writing out
146    /// postgres commands.
147    pub fn with_buf<F, R>(&self, f: F) -> R
148    where
149        F: FnOnce(&mut BytesMut) -> R,
150    {
151        let mut buffer = self.buffer.lock();
152        let r = f(&mut buffer);
153        buffer.clear();
154        r
155    }
156}
157
158#[cfg(feature = "runtime")]
159#[derive(Clone)]
160pub(crate) struct SocketConfig {
161    pub addr: Addr,
162    pub hostname: Option<String>,
163    pub port: u16,
164    pub connect_timeout: Option<Duration>,
165    pub tcp_user_timeout: Option<Duration>,
166    pub keepalive: Option<KeepaliveConfig>,
167}
168
169#[cfg(feature = "runtime")]
170#[derive(Clone)]
171pub(crate) enum Addr {
172    Tcp(IpAddr),
173    #[cfg(unix)]
174    Unix(PathBuf),
175}
176
177/// An asynchronous PostgreSQL client.
178///
179/// The client is one half of what is returned when a connection is established. Users interact with the database
180/// through this client object.
181pub struct Client {
182    inner: Arc<InnerClient>,
183    #[cfg(feature = "runtime")]
184    socket_config: Option<SocketConfig>,
185    ssl_mode: SslMode,
186    ssl_negotiation: SslNegotiation,
187    process_id: i32,
188    secret_key: i32,
189}
190
191impl Client {
192    pub(crate) fn new(
193        sender: mpsc::UnboundedSender<Request>,
194        ssl_mode: SslMode,
195        ssl_negotiation: SslNegotiation,
196        process_id: i32,
197        secret_key: i32,
198    ) -> Client {
199        Client {
200            inner: Arc::new(InnerClient {
201                sender,
202                cached_typeinfo: Default::default(),
203                buffer: Default::default(),
204            }),
205            #[cfg(feature = "runtime")]
206            socket_config: None,
207            ssl_mode,
208            ssl_negotiation,
209            process_id,
210            secret_key,
211        }
212    }
213
214    pub(crate) fn inner(&self) -> &Arc<InnerClient> {
215        &self.inner
216    }
217
218    #[cfg(feature = "runtime")]
219    pub(crate) fn set_socket_config(&mut self, socket_config: SocketConfig) {
220        self.socket_config = Some(socket_config);
221    }
222
223    /// Creates a new prepared statement.
224    ///
225    /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
226    /// which are set when executed. Prepared statements can only be used with the connection that created them.
227    pub async fn prepare(&self, query: &str) -> Result<Statement, Error> {
228        self.prepare_typed(query, &[]).await
229    }
230
231    /// Like `prepare`, but allows the types of query parameters to be explicitly specified.
232    ///
233    /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
234    /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
235    pub async fn prepare_typed(
236        &self,
237        query: &str,
238        parameter_types: &[Type],
239    ) -> Result<Statement, Error> {
240        prepare::prepare(&self.inner, query, parameter_types).await
241    }
242
243    /// Executes a statement, returning a vector of the resulting rows.
244    ///
245    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
246    /// provided, 1-indexed.
247    ///
248    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
249    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
250    /// with the `prepare` method.
251    pub async fn query<T>(
252        &self,
253        statement: &T,
254        params: &[&(dyn ToSql + Sync)],
255    ) -> Result<Vec<Row>, Error>
256    where
257        T: ?Sized + ToStatement,
258    {
259        self.query_raw(statement, slice_iter(params))
260            .await?
261            .try_collect()
262            .await
263    }
264
265    /// Returns a vector of scalars.
266    pub async fn query_scalar<R: FromSqlOwned, T>(
267        &self,
268        statement: &T,
269        params: &[&(dyn ToSql + Sync)],
270    ) -> Result<Vec<R>, Error>
271    where
272        T: ?Sized + ToStatement + fmt::Debug,
273    {
274        let rows: Vec<Row> = self
275            .query_raw(statement, slice_iter(params))
276            .await?
277            .try_collect()
278            .await?;
279
280        if let Some(row) = rows.first() {
281            if row.len() != 1 {
282                return Err(Error::column_count());
283            }
284        };
285
286        rows.into_iter().map(|r| r.try_get(0)).collect()
287    }
288
289    /// Executes a statement which returns a single row, returning it.
290    ///
291    /// Returns an error if the query does not return exactly one row.
292    ///
293    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
294    /// provided, 1-indexed.
295    ///
296    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
297    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
298    /// with the `prepare` method.
299    pub async fn query_one<T>(
300        &self,
301        statement: &T,
302        params: &[&(dyn ToSql + Sync)],
303    ) -> Result<Row, Error>
304    where
305        T: ?Sized + ToStatement,
306    {
307        self.query_opt(statement, params)
308            .await
309            .and_then(|res| res.ok_or_else(Error::row_count))
310    }
311
312    /// Like [`Client::query_one`] but returns one scalar.
313    pub async fn query_one_scalar<R: FromSqlOwned, T>(
314        &self,
315        statement: &T,
316        params: &[&(dyn ToSql + Sync)],
317    ) -> Result<R, Error>
318    where
319        T: ?Sized + ToStatement + fmt::Debug,
320    {
321        let row = self.query_one(statement, params).await?;
322
323        if row.len() != 1 {
324            return Err(Error::column_count());
325        }
326
327        row.try_get(0)
328    }
329
330    /// Executes a statements which returns zero or one rows, returning it.
331    ///
332    /// Returns an error if the query returns more than one row.
333    ///
334    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
335    /// provided, 1-indexed.
336    ///
337    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
338    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
339    /// with the `prepare` method.
340    pub async fn query_opt<T>(
341        &self,
342        statement: &T,
343        params: &[&(dyn ToSql + Sync)],
344    ) -> Result<Option<Row>, Error>
345    where
346        T: ?Sized + ToStatement,
347    {
348        let mut stream = pin!(self.query_raw(statement, slice_iter(params)).await?);
349
350        let mut first = None;
351
352        // Originally this was two calls to `try_next().await?`,
353        // once for the first element, and second to error if more than one.
354        //
355        // However, this new form with only one .await in a loop generates
356        // slightly smaller codegen/stack usage for the resulting future.
357        while let Some(row) = stream.try_next().await? {
358            if first.is_some() {
359                return Err(Error::row_count());
360            }
361
362            first = Some(row);
363        }
364
365        Ok(first)
366    }
367
368    /// Like [`Client::query_opt`] but returns an optional scalar.
369    pub async fn query_opt_scalar<R: FromSqlOwned, T>(
370        &self,
371        statement: &T,
372        params: &[&(dyn ToSql + Sync)],
373    ) -> Result<Option<R>, Error>
374    where
375        T: ?Sized + ToStatement + fmt::Debug,
376    {
377        let row = self.query_opt(statement, params).await?;
378
379        if let Some(row) = &row {
380            if row.len() != 1 {
381                return Err(Error::column_count());
382            }
383        }
384
385        row.map(|x| x.try_get::<_, R>(0)).transpose()
386    }
387
388    /// The maximally flexible version of [`query`].
389    ///
390    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
391    /// provided, 1-indexed.
392    ///
393    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
394    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
395    /// with the `prepare` method.
396    ///
397    /// [`query`]: #method.query
398    ///
399    /// # Examples
400    ///
401    /// ```no_run
402    /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
403    /// use std::pin::pin;
404    /// use futures_util::TryStreamExt;
405    ///
406    /// let params: Vec<String> = vec![
407    ///     "first param".into(),
408    ///     "second param".into(),
409    /// ];
410    /// let mut it = pin!(client.query_raw(
411    ///     "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
412    ///     params,
413    /// ).await?);
414    ///
415    /// while let Some(row) = it.try_next().await? {
416    ///     let foo: i32 = row.get("foo");
417    ///     println!("foo: {}", foo);
418    /// }
419    /// # Ok(())
420    /// # }
421    /// ```
422    pub async fn query_raw<T, P, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
423    where
424        T: ?Sized + ToStatement,
425        P: BorrowToSql,
426        I: IntoIterator<Item = P>,
427        I::IntoIter: ExactSizeIterator,
428    {
429        let statement = statement.__convert().into_statement(&self.inner).await?;
430        query::query(&self.inner, statement, params).await
431    }
432
433    /// Like `query`, but requires the types of query parameters to be explicitly specified.
434    ///
435    /// Compared to `query`, this method allows performing queries without three round trips (for
436    /// prepare, execute, and close) by requiring the caller to specify parameter values along with
437    /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
438    /// supported (such as Cloudflare Workers with Hyperdrive).
439    ///
440    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
441    /// parameter of the list provided, 1-indexed.
442    pub async fn query_typed(
443        &self,
444        query: &str,
445        params: &[(&(dyn ToSql + Sync), Type)],
446    ) -> Result<Vec<Row>, Error> {
447        self.query_typed_raw(query, params.iter().map(|(v, t)| (*v, t.clone())))
448            .await?
449            .try_collect()
450            .await
451    }
452
453    /// Like `query_one`, but requires the types of query parameters to be explicitly specified.
454    ///
455    /// Compared to `query_one`, this method allows performing queries without three round trips (for
456    /// prepare, execute, and close) by requiring the caller to specify parameter values along with
457    /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
458    /// supported (such as Cloudflare Workers with Hyperdrive).
459    ///
460    /// Executes a statement which returns a single row, returning it.
461    ///
462    /// Returns an error if the query does not return exactly one row.
463    ///
464    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
465    /// provided, 1-indexed.
466    ///
467    pub async fn query_typed_one(
468        &self,
469        statement: &str,
470        params: &[(&(dyn ToSql + Sync), Type)],
471    ) -> Result<Row, Error> {
472        self.query_typed_opt(statement, params)
473            .await
474            .and_then(|res| res.ok_or_else(Error::row_count))
475    }
476
477    /// Like `query_one`, but requires the types of query parameters to be explicitly specified.
478    ///
479    /// Compared to `query_one`, this method allows performing queries without three round trips (for
480    /// prepare, execute, and close) by requiring the caller to specify parameter values along with
481    /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
482    /// supported (such as Cloudflare Workers with Hyperdrive).
483    ///
484    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
485    /// parameter of the list provided, 1-indexed.
486    /// Executes a statements which returns zero or one rows, returning it.
487    ///
488    /// Returns an error if the query returns more than one row.
489    pub async fn query_typed_opt(
490        &self,
491        statement: &str,
492        params: &[(&(dyn ToSql + Sync), Type)],
493    ) -> Result<Option<Row>, Error> {
494        let mut stream = pin!(
495            self.query_typed_raw(statement, params.iter().map(|(v, t)| (*v, t.clone())))
496                .await?
497        );
498
499        let mut first = None;
500
501        // Originally this was two calls to `try_next().await?`,
502        // once for the first element, and second to error if more than one.
503        //
504        // However, this new form with only one .await in a loop generates
505        // slightly smaller codegen/stack usage for the resulting future.
506        while let Some(row) = stream.try_next().await? {
507            if first.is_some() {
508                return Err(Error::row_count());
509            }
510
511            first = Some(row);
512        }
513
514        Ok(first)
515    }
516
517    /// The maximally flexible version of [`query_typed`].
518    ///
519    /// Compared to `query`, this method allows performing queries without three round trips (for
520    /// prepare, execute, and close) by requiring the caller to specify parameter values along with
521    /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
522    /// supported (such as Cloudflare Workers with Hyperdrive).
523    ///
524    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
525    /// parameter of the list provided, 1-indexed.
526    ///
527    /// [`query_typed`]: #method.query_typed
528    ///
529    /// # Examples
530    ///
531    /// ```no_run
532    /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
533    /// use std::pin::pin;
534    /// use futures_util::{TryStreamExt};
535    /// use tokio_postgres::types::Type;
536    ///
537    /// let params: Vec<(String, Type)> = vec![
538    ///     ("first param".into(), Type::TEXT),
539    ///     ("second param".into(), Type::TEXT),
540    /// ];
541    /// let mut it = pin!(client.query_typed_raw(
542    ///     "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
543    ///     params,
544    /// ).await?);
545    ///
546    /// while let Some(row) = it.try_next().await? {
547    ///     let foo: i32 = row.get("foo");
548    ///     println!("foo: {}", foo);
549    /// }
550    /// # Ok(())
551    /// # }
552    /// ```
553    pub async fn query_typed_raw<P, I>(&self, query: &str, params: I) -> Result<RowStream, Error>
554    where
555        P: BorrowToSql,
556        I: IntoIterator<Item = (P, Type)>,
557    {
558        query::query_typed(&self.inner, query, params).await
559    }
560
561    /// Executes a statement, returning the number of rows modified.
562    ///
563    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
564    /// provided, 1-indexed.
565    ///
566    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
567    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
568    /// with the `prepare` method.
569    ///
570    /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
571    pub async fn execute<T>(
572        &self,
573        statement: &T,
574        params: &[&(dyn ToSql + Sync)],
575    ) -> Result<u64, Error>
576    where
577        T: ?Sized + ToStatement,
578    {
579        self.execute_raw(statement, slice_iter(params)).await
580    }
581
582    /// Executes a statement, returning the number of rows modified.
583    ///
584    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
585    /// provided, 1-indexed.
586    ///
587    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
588    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
589    /// with the `prepare` method.
590    ///
591    /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
592    pub async fn execute_typed(
593        &self,
594        statement: &str,
595        params: &[(&(dyn ToSql + Sync), Type)],
596    ) -> Result<u64, Error> {
597        query::execute_typed(
598            &self.inner,
599            statement,
600            params.iter().map(|(v, t)| (*v, t.clone())),
601        )
602        .await
603    }
604
605    /// The maximally flexible version of [`execute`].
606    ///
607    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
608    /// provided, 1-indexed.
609    ///
610    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
611    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
612    /// with the `prepare` method.
613    ///
614    /// [`execute`]: #method.execute
615    pub async fn execute_raw<T, P, I>(&self, statement: &T, params: I) -> Result<u64, Error>
616    where
617        T: ?Sized + ToStatement,
618        P: BorrowToSql,
619        I: IntoIterator<Item = P>,
620        I::IntoIter: ExactSizeIterator,
621    {
622        let statement = statement.__convert().into_statement(&self.inner).await?;
623        query::execute(self.inner(), statement, params).await
624    }
625
626    /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
627    ///
628    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
629    /// be explicitly completed via the `Sink::close` or `finish` methods. If it is not, the copy will be aborted.
630    pub async fn copy_in<T, U>(&self, statement: &T) -> Result<CopyInSink<U>, Error>
631    where
632        T: ?Sized + ToStatement,
633        U: Buf + 'static + Send,
634    {
635        let statement = statement.__convert().into_statement(&self.inner).await?;
636        copy_in::copy_in(self.inner(), statement).await
637    }
638
639    /// Executes a `COPY TO STDOUT` statement, returning a stream of the resulting data.
640    ///
641    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
642    pub async fn copy_out<T>(&self, statement: &T) -> Result<CopyOutStream, Error>
643    where
644        T: ?Sized + ToStatement,
645    {
646        let statement = statement.__convert().into_statement(&self.inner).await?;
647        copy_out::copy_out(self.inner(), statement).await
648    }
649
650    /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
651    ///
652    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
653    /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
654    /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a list of the
655    /// rows, this method returns a list of an enum which indicates either the completion of one of the commands,
656    /// or a row of data. This preserves the framing between the separate statements in the request.
657    ///
658    /// # Warning
659    ///
660    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
661    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
662    /// them to this method!
663    pub async fn simple_query(&self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
664        self.simple_query_raw(query).await?.try_collect().await
665    }
666
667    /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows as a stream.
668    ///
669    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
670    /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
671    /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a list of the
672    /// rows, this method returns a list of an enum which indicates either the completion of one of the commands,
673    /// or a row of data. This preserves the framing between the separate statements in the request.
674    ///
675    /// # Warning
676    ///
677    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
678    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
679    /// them to this method!
680    pub async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
681        simple_query::simple_query(self.inner(), query).await
682    }
683
684    /// Executes a sequence of SQL statements using the simple query protocol.
685    ///
686    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
687    /// point. This is intended for use when, for example, initializing a database schema.
688    ///
689    /// # Warning
690    ///
691    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
692    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
693    /// them to this method!
694    pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
695        simple_query::batch_execute(self.inner(), query).await
696    }
697
698    /// Check that the connection is alive and wait for the confirmation.
699    pub async fn check_connection(&self) -> Result<(), Error> {
700        // sync is a very quick message to test the connection health.
701        query::sync(self.inner()).await
702    }
703
704    /// Begins a new database transaction.
705    ///
706    /// The transaction will roll back by default - use the `commit` method to commit it.
707    pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
708        self.build_transaction().start().await
709    }
710
711    /// Returns a builder for a transaction with custom settings.
712    ///
713    /// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
714    /// attributes.
715    pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
716        TransactionBuilder::new(self)
717    }
718
719    /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
720    /// connection associated with this client.
721    pub fn cancel_token(&self) -> CancelToken {
722        CancelToken {
723            #[cfg(feature = "runtime")]
724            socket_config: self.socket_config.clone(),
725            ssl_mode: self.ssl_mode,
726            ssl_negotiation: self.ssl_negotiation,
727            process_id: self.process_id,
728            secret_key: self.secret_key,
729        }
730    }
731
732    /// Attempts to cancel an in-progress query.
733    ///
734    /// The server provides no information about whether a cancellation attempt was successful or not. An error will
735    /// only be returned if the client was unable to connect to the database.
736    ///
737    /// Requires the `runtime` Cargo feature (enabled by default).
738    #[cfg(feature = "runtime")]
739    #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
740    pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
741    where
742        T: MakeTlsConnect<Socket>,
743    {
744        self.cancel_token().cancel_query(tls).await
745    }
746
747    /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
748    /// connection itself.
749    #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
750    pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
751    where
752        S: AsyncRead + AsyncWrite + Unpin,
753        T: TlsConnect<S>,
754    {
755        self.cancel_token().cancel_query_raw(stream, tls).await
756    }
757
758    /// Clears the client's type information cache.
759    ///
760    /// When user-defined types are used in a query, the client loads their definitions from the database and caches
761    /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
762    /// to flush the local cache and allow the new, updated definitions to be loaded.
763    pub fn clear_type_cache(&self) {
764        self.inner().clear_type_cache();
765    }
766
767    /// Determines if the connection to the server has already closed.
768    ///
769    /// In that case, all future queries will fail.
770    pub fn is_closed(&self) -> bool {
771        self.inner.sender.is_closed()
772    }
773
774    #[doc(hidden)]
775    pub fn __private_api_rollback(&self, name: Option<&str>) {
776        let buf = self.inner().with_buf(|buf| {
777            if let Some(name) = name {
778                frontend::query(&format!("ROLLBACK TO {}", name), buf).unwrap();
779            } else {
780                frontend::query("ROLLBACK", buf).unwrap();
781            }
782            buf.split().freeze()
783        });
784        let _ = self
785            .inner()
786            .send(RequestMessages::Single(FrontendMessage::Raw(buf)));
787    }
788
789    #[doc(hidden)]
790    pub fn __private_api_close(&mut self) {
791        self.inner.sender.close_channel()
792    }
793}
794
795impl fmt::Debug for Client {
796    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
797        f.debug_struct("Client").finish()
798    }
799}