tokio_postgres/client.rs
1#[cfg(feature = "runtime")]
2use crate::Socket;
3use crate::codec::{BackendMessages, FrontendMessage};
4use crate::config::{SslMode, SslNegotiation};
5use crate::connection::{Request, RequestMessages};
6use crate::copy_out::CopyOutStream;
7#[cfg(feature = "runtime")]
8use crate::keepalive::KeepaliveConfig;
9use crate::query::RowStream;
10use crate::simple_query::SimpleQueryStream;
11#[cfg(feature = "runtime")]
12use crate::tls::MakeTlsConnect;
13use crate::tls::TlsConnect;
14use crate::types::{Oid, ToSql, Type};
15use crate::{
16 CancelToken, CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
17 TransactionBuilder, copy_in, copy_out, prepare, query, simple_query, slice_iter,
18};
19use bytes::{Buf, BytesMut};
20use fallible_iterator::FallibleIterator;
21use futures_channel::mpsc;
22use futures_util::{StreamExt, TryStreamExt};
23use parking_lot::Mutex;
24use postgres_protocol::message::backend::Message;
25use postgres_protocol::message::frontend;
26use postgres_types::{BorrowToSql, FromSqlOwned};
27use std::collections::HashMap;
28use std::fmt;
29use std::future;
30#[cfg(feature = "runtime")]
31use std::net::IpAddr;
32#[cfg(feature = "runtime")]
33use std::path::PathBuf;
34use std::pin::pin;
35use std::sync::Arc;
36use std::task::{Context, Poll, ready};
37#[cfg(feature = "runtime")]
38use std::time::Duration;
39use tokio::io::{AsyncRead, AsyncWrite};
40
41pub struct Responses {
42 receiver: mpsc::Receiver<BackendMessages>,
43 cur: BackendMessages,
44}
45
46impl Responses {
47 pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
48 loop {
49 match self.cur.next().map_err(Error::parse)? {
50 Some(Message::ErrorResponse(body)) => return Poll::Ready(Err(Error::db(body))),
51 Some(message) => return Poll::Ready(Ok(message)),
52 None => {}
53 }
54
55 match ready!(self.receiver.poll_next_unpin(cx)) {
56 Some(messages) => self.cur = messages,
57 None => return Poll::Ready(Err(Error::closed())),
58 }
59 }
60 }
61
62 pub async fn next(&mut self) -> Result<Message, Error> {
63 future::poll_fn(|cx| self.poll_next(cx)).await
64 }
65}
66
67/// A cache of type info and prepared statements for fetching type info
68/// (corresponding to the queries in the [prepare](prepare) module).
69#[derive(Default)]
70struct CachedTypeInfo {
71 /// A statement for basic information for a type from its
72 /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
73 /// fallback).
74 typeinfo: Option<Statement>,
75 /// A statement for getting information for a composite type from its OID.
76 /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
77 typeinfo_composite: Option<Statement>,
78 /// A statement for getting information for a composite type from its OID.
79 /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
80 /// its fallback).
81 typeinfo_enum: Option<Statement>,
82
83 /// Cache of types already looked up.
84 types: HashMap<Oid, Type>,
85}
86
87pub struct InnerClient {
88 sender: mpsc::UnboundedSender<Request>,
89 cached_typeinfo: Mutex<CachedTypeInfo>,
90
91 /// A buffer to use when writing out postgres commands.
92 buffer: Mutex<BytesMut>,
93}
94
95impl InnerClient {
96 pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
97 let (sender, receiver) = mpsc::channel(1);
98 let request = Request { messages, sender };
99 self.sender
100 .unbounded_send(request)
101 .map_err(|_| Error::closed())?;
102
103 Ok(Responses {
104 receiver,
105 cur: BackendMessages::empty(),
106 })
107 }
108
109 pub fn typeinfo(&self) -> Option<Statement> {
110 self.cached_typeinfo.lock().typeinfo.clone()
111 }
112
113 pub fn set_typeinfo(&self, statement: &Statement) {
114 self.cached_typeinfo.lock().typeinfo = Some(statement.clone());
115 }
116
117 pub fn typeinfo_composite(&self) -> Option<Statement> {
118 self.cached_typeinfo.lock().typeinfo_composite.clone()
119 }
120
121 pub fn set_typeinfo_composite(&self, statement: &Statement) {
122 self.cached_typeinfo.lock().typeinfo_composite = Some(statement.clone());
123 }
124
125 pub fn typeinfo_enum(&self) -> Option<Statement> {
126 self.cached_typeinfo.lock().typeinfo_enum.clone()
127 }
128
129 pub fn set_typeinfo_enum(&self, statement: &Statement) {
130 self.cached_typeinfo.lock().typeinfo_enum = Some(statement.clone());
131 }
132
133 pub fn type_(&self, oid: Oid) -> Option<Type> {
134 self.cached_typeinfo.lock().types.get(&oid).cloned()
135 }
136
137 pub fn set_type(&self, oid: Oid, type_: &Type) {
138 self.cached_typeinfo.lock().types.insert(oid, type_.clone());
139 }
140
141 pub fn clear_type_cache(&self) {
142 self.cached_typeinfo.lock().types.clear();
143 }
144
145 /// Call the given function with a buffer to be used when writing out
146 /// postgres commands.
147 pub fn with_buf<F, R>(&self, f: F) -> R
148 where
149 F: FnOnce(&mut BytesMut) -> R,
150 {
151 let mut buffer = self.buffer.lock();
152 let r = f(&mut buffer);
153 buffer.clear();
154 r
155 }
156}
157
158#[cfg(feature = "runtime")]
159#[derive(Clone)]
160pub(crate) struct SocketConfig {
161 pub addr: Addr,
162 pub hostname: Option<String>,
163 pub port: u16,
164 pub connect_timeout: Option<Duration>,
165 pub tcp_user_timeout: Option<Duration>,
166 pub keepalive: Option<KeepaliveConfig>,
167}
168
169#[cfg(feature = "runtime")]
170#[derive(Clone)]
171pub(crate) enum Addr {
172 Tcp(IpAddr),
173 #[cfg(unix)]
174 Unix(PathBuf),
175}
176
177/// An asynchronous PostgreSQL client.
178///
179/// The client is one half of what is returned when a connection is established. Users interact with the database
180/// through this client object.
181pub struct Client {
182 inner: Arc<InnerClient>,
183 #[cfg(feature = "runtime")]
184 socket_config: Option<SocketConfig>,
185 ssl_mode: SslMode,
186 ssl_negotiation: SslNegotiation,
187 process_id: i32,
188 secret_key: i32,
189}
190
191impl Client {
192 pub(crate) fn new(
193 sender: mpsc::UnboundedSender<Request>,
194 ssl_mode: SslMode,
195 ssl_negotiation: SslNegotiation,
196 process_id: i32,
197 secret_key: i32,
198 ) -> Client {
199 Client {
200 inner: Arc::new(InnerClient {
201 sender,
202 cached_typeinfo: Default::default(),
203 buffer: Default::default(),
204 }),
205 #[cfg(feature = "runtime")]
206 socket_config: None,
207 ssl_mode,
208 ssl_negotiation,
209 process_id,
210 secret_key,
211 }
212 }
213
214 pub(crate) fn inner(&self) -> &Arc<InnerClient> {
215 &self.inner
216 }
217
218 #[cfg(feature = "runtime")]
219 pub(crate) fn set_socket_config(&mut self, socket_config: SocketConfig) {
220 self.socket_config = Some(socket_config);
221 }
222
223 /// Creates a new prepared statement.
224 ///
225 /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
226 /// which are set when executed. Prepared statements can only be used with the connection that created them.
227 pub async fn prepare(&self, query: &str) -> Result<Statement, Error> {
228 self.prepare_typed(query, &[]).await
229 }
230
231 /// Like `prepare`, but allows the types of query parameters to be explicitly specified.
232 ///
233 /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
234 /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
235 pub async fn prepare_typed(
236 &self,
237 query: &str,
238 parameter_types: &[Type],
239 ) -> Result<Statement, Error> {
240 prepare::prepare(&self.inner, query, parameter_types).await
241 }
242
243 /// Executes a statement, returning a vector of the resulting rows.
244 ///
245 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
246 /// provided, 1-indexed.
247 ///
248 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
249 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
250 /// with the `prepare` method.
251 pub async fn query<T>(
252 &self,
253 statement: &T,
254 params: &[&(dyn ToSql + Sync)],
255 ) -> Result<Vec<Row>, Error>
256 where
257 T: ?Sized + ToStatement,
258 {
259 self.query_raw(statement, slice_iter(params))
260 .await?
261 .try_collect()
262 .await
263 }
264
265 /// Returns a vector of scalars.
266 pub async fn query_scalar<R: FromSqlOwned, T>(
267 &self,
268 statement: &T,
269 params: &[&(dyn ToSql + Sync)],
270 ) -> Result<Vec<R>, Error>
271 where
272 T: ?Sized + ToStatement + fmt::Debug,
273 {
274 let rows: Vec<Row> = self
275 .query_raw(statement, slice_iter(params))
276 .await?
277 .try_collect()
278 .await?;
279
280 if let Some(row) = rows.first() {
281 if row.len() != 1 {
282 return Err(Error::column_count());
283 }
284 };
285
286 rows.into_iter().map(|r| r.try_get(0)).collect()
287 }
288
289 /// Executes a statement which returns a single row, returning it.
290 ///
291 /// Returns an error if the query does not return exactly one row.
292 ///
293 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
294 /// provided, 1-indexed.
295 ///
296 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
297 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
298 /// with the `prepare` method.
299 pub async fn query_one<T>(
300 &self,
301 statement: &T,
302 params: &[&(dyn ToSql + Sync)],
303 ) -> Result<Row, Error>
304 where
305 T: ?Sized + ToStatement,
306 {
307 self.query_opt(statement, params)
308 .await
309 .and_then(|res| res.ok_or_else(Error::row_count))
310 }
311
312 /// Like [`Client::query_one`] but returns one scalar.
313 pub async fn query_one_scalar<R: FromSqlOwned, T>(
314 &self,
315 statement: &T,
316 params: &[&(dyn ToSql + Sync)],
317 ) -> Result<R, Error>
318 where
319 T: ?Sized + ToStatement + fmt::Debug,
320 {
321 let row = self.query_one(statement, params).await?;
322
323 if row.len() != 1 {
324 return Err(Error::column_count());
325 }
326
327 row.try_get(0)
328 }
329
330 /// Executes a statements which returns zero or one rows, returning it.
331 ///
332 /// Returns an error if the query returns more than one row.
333 ///
334 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
335 /// provided, 1-indexed.
336 ///
337 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
338 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
339 /// with the `prepare` method.
340 pub async fn query_opt<T>(
341 &self,
342 statement: &T,
343 params: &[&(dyn ToSql + Sync)],
344 ) -> Result<Option<Row>, Error>
345 where
346 T: ?Sized + ToStatement,
347 {
348 let mut stream = pin!(self.query_raw(statement, slice_iter(params)).await?);
349
350 let mut first = None;
351
352 // Originally this was two calls to `try_next().await?`,
353 // once for the first element, and second to error if more than one.
354 //
355 // However, this new form with only one .await in a loop generates
356 // slightly smaller codegen/stack usage for the resulting future.
357 while let Some(row) = stream.try_next().await? {
358 if first.is_some() {
359 return Err(Error::row_count());
360 }
361
362 first = Some(row);
363 }
364
365 Ok(first)
366 }
367
368 /// Like [`Client::query_opt`] but returns an optional scalar.
369 pub async fn query_opt_scalar<R: FromSqlOwned, T>(
370 &self,
371 statement: &T,
372 params: &[&(dyn ToSql + Sync)],
373 ) -> Result<Option<R>, Error>
374 where
375 T: ?Sized + ToStatement + fmt::Debug,
376 {
377 let row = self.query_opt(statement, params).await?;
378
379 if let Some(row) = &row {
380 if row.len() != 1 {
381 return Err(Error::column_count());
382 }
383 }
384
385 row.map(|x| x.try_get::<_, R>(0)).transpose()
386 }
387
388 /// The maximally flexible version of [`query`].
389 ///
390 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
391 /// provided, 1-indexed.
392 ///
393 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
394 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
395 /// with the `prepare` method.
396 ///
397 /// [`query`]: #method.query
398 ///
399 /// # Examples
400 ///
401 /// ```no_run
402 /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
403 /// use std::pin::pin;
404 /// use futures_util::TryStreamExt;
405 ///
406 /// let params: Vec<String> = vec![
407 /// "first param".into(),
408 /// "second param".into(),
409 /// ];
410 /// let mut it = pin!(client.query_raw(
411 /// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
412 /// params,
413 /// ).await?);
414 ///
415 /// while let Some(row) = it.try_next().await? {
416 /// let foo: i32 = row.get("foo");
417 /// println!("foo: {}", foo);
418 /// }
419 /// # Ok(())
420 /// # }
421 /// ```
422 pub async fn query_raw<T, P, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
423 where
424 T: ?Sized + ToStatement,
425 P: BorrowToSql,
426 I: IntoIterator<Item = P>,
427 I::IntoIter: ExactSizeIterator,
428 {
429 let statement = statement.__convert().into_statement(&self.inner).await?;
430 query::query(&self.inner, statement, params).await
431 }
432
433 /// Like `query`, but requires the types of query parameters to be explicitly specified.
434 ///
435 /// Compared to `query`, this method allows performing queries without three round trips (for
436 /// prepare, execute, and close) by requiring the caller to specify parameter values along with
437 /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
438 /// supported (such as Cloudflare Workers with Hyperdrive).
439 ///
440 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
441 /// parameter of the list provided, 1-indexed.
442 pub async fn query_typed(
443 &self,
444 query: &str,
445 params: &[(&(dyn ToSql + Sync), Type)],
446 ) -> Result<Vec<Row>, Error> {
447 self.query_typed_raw(query, params.iter().map(|(v, t)| (*v, t.clone())))
448 .await?
449 .try_collect()
450 .await
451 }
452
453 /// Like `query_one`, but requires the types of query parameters to be explicitly specified.
454 ///
455 /// Compared to `query_one`, this method allows performing queries without three round trips (for
456 /// prepare, execute, and close) by requiring the caller to specify parameter values along with
457 /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
458 /// supported (such as Cloudflare Workers with Hyperdrive).
459 ///
460 /// Executes a statement which returns a single row, returning it.
461 ///
462 /// Returns an error if the query does not return exactly one row.
463 ///
464 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
465 /// provided, 1-indexed.
466 ///
467 pub async fn query_typed_one(
468 &self,
469 statement: &str,
470 params: &[(&(dyn ToSql + Sync), Type)],
471 ) -> Result<Row, Error> {
472 self.query_typed_opt(statement, params)
473 .await
474 .and_then(|res| res.ok_or_else(Error::row_count))
475 }
476
477 /// Like `query_one`, but requires the types of query parameters to be explicitly specified.
478 ///
479 /// Compared to `query_one`, this method allows performing queries without three round trips (for
480 /// prepare, execute, and close) by requiring the caller to specify parameter values along with
481 /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
482 /// supported (such as Cloudflare Workers with Hyperdrive).
483 ///
484 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
485 /// parameter of the list provided, 1-indexed.
486 /// Executes a statements which returns zero or one rows, returning it.
487 ///
488 /// Returns an error if the query returns more than one row.
489 pub async fn query_typed_opt(
490 &self,
491 statement: &str,
492 params: &[(&(dyn ToSql + Sync), Type)],
493 ) -> Result<Option<Row>, Error> {
494 let mut stream = pin!(
495 self.query_typed_raw(statement, params.iter().map(|(v, t)| (*v, t.clone())))
496 .await?
497 );
498
499 let mut first = None;
500
501 // Originally this was two calls to `try_next().await?`,
502 // once for the first element, and second to error if more than one.
503 //
504 // However, this new form with only one .await in a loop generates
505 // slightly smaller codegen/stack usage for the resulting future.
506 while let Some(row) = stream.try_next().await? {
507 if first.is_some() {
508 return Err(Error::row_count());
509 }
510
511 first = Some(row);
512 }
513
514 Ok(first)
515 }
516
517 /// The maximally flexible version of [`query_typed`].
518 ///
519 /// Compared to `query`, this method allows performing queries without three round trips (for
520 /// prepare, execute, and close) by requiring the caller to specify parameter values along with
521 /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
522 /// supported (such as Cloudflare Workers with Hyperdrive).
523 ///
524 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
525 /// parameter of the list provided, 1-indexed.
526 ///
527 /// [`query_typed`]: #method.query_typed
528 ///
529 /// # Examples
530 ///
531 /// ```no_run
532 /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
533 /// use std::pin::pin;
534 /// use futures_util::{TryStreamExt};
535 /// use tokio_postgres::types::Type;
536 ///
537 /// let params: Vec<(String, Type)> = vec![
538 /// ("first param".into(), Type::TEXT),
539 /// ("second param".into(), Type::TEXT),
540 /// ];
541 /// let mut it = pin!(client.query_typed_raw(
542 /// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
543 /// params,
544 /// ).await?);
545 ///
546 /// while let Some(row) = it.try_next().await? {
547 /// let foo: i32 = row.get("foo");
548 /// println!("foo: {}", foo);
549 /// }
550 /// # Ok(())
551 /// # }
552 /// ```
553 pub async fn query_typed_raw<P, I>(&self, query: &str, params: I) -> Result<RowStream, Error>
554 where
555 P: BorrowToSql,
556 I: IntoIterator<Item = (P, Type)>,
557 {
558 query::query_typed(&self.inner, query, params).await
559 }
560
561 /// Executes a statement, returning the number of rows modified.
562 ///
563 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
564 /// provided, 1-indexed.
565 ///
566 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
567 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
568 /// with the `prepare` method.
569 ///
570 /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
571 pub async fn execute<T>(
572 &self,
573 statement: &T,
574 params: &[&(dyn ToSql + Sync)],
575 ) -> Result<u64, Error>
576 where
577 T: ?Sized + ToStatement,
578 {
579 self.execute_raw(statement, slice_iter(params)).await
580 }
581
582 /// Executes a statement, returning the number of rows modified.
583 ///
584 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
585 /// provided, 1-indexed.
586 ///
587 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
588 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
589 /// with the `prepare` method.
590 ///
591 /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
592 pub async fn execute_typed(
593 &self,
594 statement: &str,
595 params: &[(&(dyn ToSql + Sync), Type)],
596 ) -> Result<u64, Error> {
597 query::execute_typed(
598 &self.inner,
599 statement,
600 params.iter().map(|(v, t)| (*v, t.clone())),
601 )
602 .await
603 }
604
605 /// The maximally flexible version of [`execute`].
606 ///
607 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
608 /// provided, 1-indexed.
609 ///
610 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
611 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
612 /// with the `prepare` method.
613 ///
614 /// [`execute`]: #method.execute
615 pub async fn execute_raw<T, P, I>(&self, statement: &T, params: I) -> Result<u64, Error>
616 where
617 T: ?Sized + ToStatement,
618 P: BorrowToSql,
619 I: IntoIterator<Item = P>,
620 I::IntoIter: ExactSizeIterator,
621 {
622 let statement = statement.__convert().into_statement(&self.inner).await?;
623 query::execute(self.inner(), statement, params).await
624 }
625
626 /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
627 ///
628 /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
629 /// be explicitly completed via the `Sink::close` or `finish` methods. If it is not, the copy will be aborted.
630 pub async fn copy_in<T, U>(&self, statement: &T) -> Result<CopyInSink<U>, Error>
631 where
632 T: ?Sized + ToStatement,
633 U: Buf + 'static + Send,
634 {
635 let statement = statement.__convert().into_statement(&self.inner).await?;
636 copy_in::copy_in(self.inner(), statement).await
637 }
638
639 /// Executes a `COPY TO STDOUT` statement, returning a stream of the resulting data.
640 ///
641 /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
642 pub async fn copy_out<T>(&self, statement: &T) -> Result<CopyOutStream, Error>
643 where
644 T: ?Sized + ToStatement,
645 {
646 let statement = statement.__convert().into_statement(&self.inner).await?;
647 copy_out::copy_out(self.inner(), statement).await
648 }
649
650 /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
651 ///
652 /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
653 /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
654 /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a list of the
655 /// rows, this method returns a list of an enum which indicates either the completion of one of the commands,
656 /// or a row of data. This preserves the framing between the separate statements in the request.
657 ///
658 /// # Warning
659 ///
660 /// Prepared statements should be use for any query which contains user-specified data, as they provided the
661 /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
662 /// them to this method!
663 pub async fn simple_query(&self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
664 self.simple_query_raw(query).await?.try_collect().await
665 }
666
667 /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows as a stream.
668 ///
669 /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
670 /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
671 /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a list of the
672 /// rows, this method returns a list of an enum which indicates either the completion of one of the commands,
673 /// or a row of data. This preserves the framing between the separate statements in the request.
674 ///
675 /// # Warning
676 ///
677 /// Prepared statements should be use for any query which contains user-specified data, as they provided the
678 /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
679 /// them to this method!
680 pub async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
681 simple_query::simple_query(self.inner(), query).await
682 }
683
684 /// Executes a sequence of SQL statements using the simple query protocol.
685 ///
686 /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
687 /// point. This is intended for use when, for example, initializing a database schema.
688 ///
689 /// # Warning
690 ///
691 /// Prepared statements should be use for any query which contains user-specified data, as they provided the
692 /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
693 /// them to this method!
694 pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
695 simple_query::batch_execute(self.inner(), query).await
696 }
697
698 /// Check that the connection is alive and wait for the confirmation.
699 pub async fn check_connection(&self) -> Result<(), Error> {
700 // sync is a very quick message to test the connection health.
701 query::sync(self.inner()).await
702 }
703
704 /// Begins a new database transaction.
705 ///
706 /// The transaction will roll back by default - use the `commit` method to commit it.
707 pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
708 self.build_transaction().start().await
709 }
710
711 /// Returns a builder for a transaction with custom settings.
712 ///
713 /// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
714 /// attributes.
715 pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
716 TransactionBuilder::new(self)
717 }
718
719 /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
720 /// connection associated with this client.
721 pub fn cancel_token(&self) -> CancelToken {
722 CancelToken {
723 #[cfg(feature = "runtime")]
724 socket_config: self.socket_config.clone(),
725 ssl_mode: self.ssl_mode,
726 ssl_negotiation: self.ssl_negotiation,
727 process_id: self.process_id,
728 secret_key: self.secret_key,
729 }
730 }
731
732 /// Attempts to cancel an in-progress query.
733 ///
734 /// The server provides no information about whether a cancellation attempt was successful or not. An error will
735 /// only be returned if the client was unable to connect to the database.
736 ///
737 /// Requires the `runtime` Cargo feature (enabled by default).
738 #[cfg(feature = "runtime")]
739 #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
740 pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
741 where
742 T: MakeTlsConnect<Socket>,
743 {
744 self.cancel_token().cancel_query(tls).await
745 }
746
747 /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
748 /// connection itself.
749 #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
750 pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
751 where
752 S: AsyncRead + AsyncWrite + Unpin,
753 T: TlsConnect<S>,
754 {
755 self.cancel_token().cancel_query_raw(stream, tls).await
756 }
757
758 /// Clears the client's type information cache.
759 ///
760 /// When user-defined types are used in a query, the client loads their definitions from the database and caches
761 /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
762 /// to flush the local cache and allow the new, updated definitions to be loaded.
763 pub fn clear_type_cache(&self) {
764 self.inner().clear_type_cache();
765 }
766
767 /// Determines if the connection to the server has already closed.
768 ///
769 /// In that case, all future queries will fail.
770 pub fn is_closed(&self) -> bool {
771 self.inner.sender.is_closed()
772 }
773
774 #[doc(hidden)]
775 pub fn __private_api_rollback(&self, name: Option<&str>) {
776 let buf = self.inner().with_buf(|buf| {
777 if let Some(name) = name {
778 frontend::query(&format!("ROLLBACK TO {}", name), buf).unwrap();
779 } else {
780 frontend::query("ROLLBACK", buf).unwrap();
781 }
782 buf.split().freeze()
783 });
784 let _ = self
785 .inner()
786 .send(RequestMessages::Single(FrontendMessage::Raw(buf)));
787 }
788
789 #[doc(hidden)]
790 pub fn __private_api_close(&mut self) {
791 self.inner.sender.close_channel()
792 }
793}
794
795impl fmt::Debug for Client {
796 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
797 f.debug_struct("Client").finish()
798 }
799}