postgres/
client.rs

1use crate::connection::Connection;
2use crate::{
3    CancelToken, Config, CopyInWriter, CopyOutReader, Notifications, RowIter, Statement,
4    ToStatement, Transaction, TransactionBuilder,
5};
6use std::task::Poll;
7use std::time::Duration;
8use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
9use tokio_postgres::types::{BorrowToSql, ToSql, Type};
10use tokio_postgres::{Error, Row, SimpleQueryMessage, Socket};
11
12/// A synchronous PostgreSQL client.
13pub struct Client {
14    connection: Connection,
15    client: tokio_postgres::Client,
16}
17
18impl Drop for Client {
19    fn drop(&mut self) {
20        let _ = self.close_inner();
21    }
22}
23
24impl Client {
25    pub(crate) fn new(connection: Connection, client: tokio_postgres::Client) -> Client {
26        Client { connection, client }
27    }
28
29    /// A convenience function which parses a configuration string into a `Config` and then connects to the database.
30    ///
31    /// See the documentation for [`Config`] for information about the connection syntax.
32    ///
33    /// [`Config`]: config/struct.Config.html
34    pub fn connect<T>(params: &str, tls_mode: T) -> Result<Client, Error>
35    where
36        T: MakeTlsConnect<Socket> + 'static + Send,
37        T::TlsConnect: Send,
38        T::Stream: Send,
39        <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
40    {
41        params.parse::<Config>()?.connect(tls_mode)
42    }
43
44    /// Returns a new `Config` object which can be used to configure and connect to a database.
45    pub fn configure() -> Config {
46        Config::new()
47    }
48
49    /// Executes a statement, returning the number of rows modified.
50    ///
51    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
52    /// provided, 1-indexed.
53    ///
54    /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
55    ///
56    /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
57    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
58    /// with the `prepare` method.
59    ///
60    /// # Example
61    ///
62    /// ```no_run
63    /// use postgres::{Client, NoTls};
64    ///
65    /// # fn main() -> Result<(), postgres::Error> {
66    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
67    ///
68    /// let bar = 1i32;
69    /// let baz = true;
70    /// let rows_updated = client.execute(
71    ///     "UPDATE foo SET bar = $1 WHERE baz = $2",
72    ///     &[&bar, &baz],
73    /// )?;
74    ///
75    /// println!("{} rows updated", rows_updated);
76    /// # Ok(())
77    /// # }
78    /// ```
79    pub fn execute<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error>
80    where
81        T: ?Sized + ToStatement,
82    {
83        self.connection.block_on(self.client.execute(query, params))
84    }
85
86    /// Executes a statement, returning the resulting rows.
87    ///
88    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
89    /// provided, 1-indexed.
90    ///
91    /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
92    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
93    /// with the `prepare` method.
94    ///
95    /// # Examples
96    ///
97    /// ```no_run
98    /// use postgres::{Client, NoTls};
99    ///
100    /// # fn main() -> Result<(), postgres::Error> {
101    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
102    ///
103    /// let baz = true;
104    /// for row in client.query("SELECT foo FROM bar WHERE baz = $1", &[&baz])? {
105    ///     let foo: i32 = row.get("foo");
106    ///     println!("foo: {}", foo);
107    /// }
108    /// # Ok(())
109    /// # }
110    /// ```
111    pub fn query<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>, Error>
112    where
113        T: ?Sized + ToStatement,
114    {
115        self.connection.block_on(self.client.query(query, params))
116    }
117
118    /// Executes a statement which returns a single row, returning it.
119    ///
120    /// Returns an error if the query does not return exactly one row.
121    ///
122    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
123    /// provided, 1-indexed.
124    ///
125    /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
126    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
127    /// with the `prepare` method.
128    ///
129    /// # Examples
130    ///
131    /// ```no_run
132    /// use postgres::{Client, NoTls};
133    ///
134    /// # fn main() -> Result<(), postgres::Error> {
135    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
136    ///
137    /// let baz = true;
138    /// let row = client.query_one("SELECT foo FROM bar WHERE baz = $1", &[&baz])?;
139    /// let foo: i32 = row.get("foo");
140    /// println!("foo: {}", foo);
141    /// # Ok(())
142    /// # }
143    /// ```
144    pub fn query_one<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Row, Error>
145    where
146        T: ?Sized + ToStatement,
147    {
148        self.connection
149            .block_on(self.client.query_one(query, params))
150    }
151
152    /// Executes a statement which returns zero or one rows, returning it.
153    ///
154    /// Returns an error if the query returns more than one row.
155    ///
156    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
157    /// provided, 1-indexed.
158    ///
159    /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
160    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
161    /// with the `prepare` method.
162    ///
163    /// # Examples
164    ///
165    /// ```no_run
166    /// use postgres::{Client, NoTls};
167    ///
168    /// # fn main() -> Result<(), postgres::Error> {
169    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
170    ///
171    /// let baz = true;
172    /// let row = client.query_opt("SELECT foo FROM bar WHERE baz = $1", &[&baz])?;
173    /// match row {
174    ///     Some(row) => {
175    ///         let foo: i32 = row.get("foo");
176    ///         println!("foo: {}", foo);
177    ///     }
178    ///     None => println!("no matching foo"),
179    /// }
180    /// # Ok(())
181    /// # }
182    /// ```
183    pub fn query_opt<T>(
184        &mut self,
185        query: &T,
186        params: &[&(dyn ToSql + Sync)],
187    ) -> Result<Option<Row>, Error>
188    where
189        T: ?Sized + ToStatement,
190    {
191        self.connection
192            .block_on(self.client.query_opt(query, params))
193    }
194
195    /// A maximally-flexible version of `query`.
196    ///
197    /// It takes an iterator of parameters rather than a slice, and returns an iterator of rows rather than collecting
198    /// them into an array.
199    ///
200    /// # Examples
201    ///
202    /// ```no_run
203    /// use postgres::{Client, NoTls};
204    /// use fallible_iterator::FallibleIterator;
205    /// use std::iter;
206    ///
207    /// # fn main() -> Result<(), postgres::Error> {
208    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
209    ///
210    /// let baz = true;
211    /// let mut it = client.query_raw("SELECT foo FROM bar WHERE baz = $1", iter::once(baz))?;
212    ///
213    /// while let Some(row) = it.next()? {
214    ///     let foo: i32 = row.get("foo");
215    ///     println!("foo: {}", foo);
216    /// }
217    /// # Ok(())
218    /// # }
219    /// ```
220    ///
221    /// If you have a type like `Vec<T>` where `T: ToSql` Rust will not know how to use it as params. To get around
222    /// this the type must explicitly be converted to `&dyn ToSql`.
223    ///
224    /// ```no_run
225    /// # use postgres::{Client, NoTls};
226    /// use postgres::types::ToSql;
227    /// use fallible_iterator::FallibleIterator;
228    /// # fn main() -> Result<(), postgres::Error> {
229    /// # let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
230    ///
231    /// let params: Vec<String> = vec![
232    ///     "first param".into(),
233    ///     "second param".into(),
234    /// ];
235    /// let mut it = client.query_raw(
236    ///     "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
237    ///     params,
238    /// )?;
239    ///
240    /// while let Some(row) = it.next()? {
241    ///     let foo: i32 = row.get("foo");
242    ///     println!("foo: {}", foo);
243    /// }
244    /// # Ok(())
245    /// # }
246    /// ```
247    pub fn query_raw<T, P, I>(&mut self, query: &T, params: I) -> Result<RowIter<'_>, Error>
248    where
249        T: ?Sized + ToStatement,
250        P: BorrowToSql,
251        I: IntoIterator<Item = P>,
252        I::IntoIter: ExactSizeIterator,
253    {
254        let stream = self
255            .connection
256            .block_on(self.client.query_raw(query, params))?;
257        Ok(RowIter::new(self.connection.as_ref(), stream))
258    }
259
260    /// Like `query`, but requires the types of query parameters to be explicitly specified.
261    ///
262    /// Compared to `query`, this method allows performing queries without three round trips (for
263    /// prepare, execute, and close) by requiring the caller to specify parameter values along with
264    /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
265    /// supported (such as Cloudflare Workers with Hyperdrive).
266    ///
267    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
268    /// parameter of the list provided, 1-indexed.
269    pub fn query_typed(
270        &mut self,
271        query: &str,
272        params: &[(&(dyn ToSql + Sync), Type)],
273    ) -> Result<Vec<Row>, Error> {
274        self.connection
275            .block_on(self.client.query_typed(query, params))
276    }
277
278    /// The maximally flexible version of [`query_typed`].
279    ///
280    /// Compared to `query`, this method allows performing queries without three round trips (for
281    /// prepare, execute, and close) by requiring the caller to specify parameter values along with
282    /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
283    /// supported (such as Cloudflare Workers with Hyperdrive).
284    ///
285    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
286    /// parameter of the list provided, 1-indexed.
287    ///
288    /// [`query_typed`]: #method.query_typed
289    ///
290    /// # Examples
291    /// ```no_run
292    /// # use postgres::{Client, NoTls};
293    /// use postgres::types::{ToSql, Type};
294    /// use fallible_iterator::FallibleIterator;
295    /// # fn main() -> Result<(), postgres::Error> {
296    /// # let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
297    ///
298    /// let params: Vec<(String, Type)> = vec![
299    ///     ("first param".into(), Type::TEXT),
300    ///     ("second param".into(), Type::TEXT),
301    /// ];
302    /// let mut it = client.query_typed_raw(
303    ///     "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
304    ///     params,
305    /// )?;
306    ///
307    /// while let Some(row) = it.next()? {
308    ///     let foo: i32 = row.get("foo");
309    ///     println!("foo: {}", foo);
310    /// }
311    /// # Ok(())
312    /// # }
313    /// ```
314    pub fn query_typed_raw<P, I>(&mut self, query: &str, params: I) -> Result<RowIter<'_>, Error>
315    where
316        P: BorrowToSql,
317        I: IntoIterator<Item = (P, Type)>,
318    {
319        let stream = self
320            .connection
321            .block_on(self.client.query_typed_raw(query, params))?;
322        Ok(RowIter::new(self.connection.as_ref(), stream))
323    }
324
325    /// Creates a new prepared statement.
326    ///
327    /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
328    /// which are set when executed. Prepared statements can only be used with the connection that created them.
329    ///
330    /// # Examples
331    ///
332    /// ```no_run
333    /// use postgres::{Client, NoTls};
334    ///
335    /// # fn main() -> Result<(), postgres::Error> {
336    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
337    ///
338    /// let statement = client.prepare("SELECT name FROM people WHERE id = $1")?;
339    ///
340    /// for id in 0..10 {
341    ///     let rows = client.query(&statement, &[&id])?;
342    ///     let name: &str = rows[0].get(0);
343    ///     println!("name: {}", name);
344    /// }
345    /// # Ok(())
346    /// # }
347    /// ```
348    pub fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
349        self.connection.block_on(self.client.prepare(query))
350    }
351
352    /// Like `prepare`, but allows the types of query parameters to be explicitly specified.
353    ///
354    /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
355    /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
356    ///
357    /// # Examples
358    ///
359    /// ```no_run
360    /// use postgres::{Client, NoTls};
361    /// use postgres::types::Type;
362    ///
363    /// # fn main() -> Result<(), postgres::Error> {
364    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
365    ///
366    /// let statement = client.prepare_typed(
367    ///     "SELECT name FROM people WHERE id = $1",
368    ///     &[Type::INT8],
369    /// )?;
370    ///
371    /// for id in 0..10 {
372    ///     let rows = client.query(&statement, &[&id])?;
373    ///     let name: &str = rows[0].get(0);
374    ///     println!("name: {}", name);
375    /// }
376    /// # Ok(())
377    /// # }
378    /// ```
379    pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result<Statement, Error> {
380        self.connection
381            .block_on(self.client.prepare_typed(query, types))
382    }
383
384    /// Executes a `COPY FROM STDIN` statement, returning the number of rows created.
385    ///
386    /// The `query` argument can either be a `Statement`, or a raw query string. The data in the provided reader is
387    /// passed along to the server verbatim; it is the caller's responsibility to ensure it uses the proper format.
388    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
389    ///
390    /// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
391    ///
392    /// # Examples
393    ///
394    /// ```no_run
395    /// use postgres::{Client, NoTls};
396    /// use std::io::Write;
397    ///
398    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
399    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
400    ///
401    /// let mut writer = client.copy_in("COPY people FROM stdin")?;
402    /// writer.write_all(b"1\tjohn\n2\tjane\n")?;
403    /// writer.finish()?;
404    /// # Ok(())
405    /// # }
406    /// ```
407    pub fn copy_in<T>(&mut self, query: &T) -> Result<CopyInWriter<'_>, Error>
408    where
409        T: ?Sized + ToStatement,
410    {
411        let sink = self.connection.block_on(self.client.copy_in(query))?;
412        Ok(CopyInWriter::new(self.connection.as_ref(), sink))
413    }
414
415    /// Executes a `COPY TO STDOUT` statement, returning a reader of the resulting data.
416    ///
417    /// The `query` argument can either be a `Statement`, or a raw query string. PostgreSQL does not support parameters
418    /// in `COPY` statements, so this method does not take any.
419    ///
420    /// # Examples
421    ///
422    /// ```no_run
423    /// use postgres::{Client, NoTls};
424    /// use std::io::Read;
425    ///
426    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
427    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
428    ///
429    /// let mut reader = client.copy_out("COPY people TO stdout")?;
430    /// let mut buf = vec![];
431    /// reader.read_to_end(&mut buf)?;
432    /// # Ok(())
433    /// # }
434    /// ```
435    pub fn copy_out<T>(&mut self, query: &T) -> Result<CopyOutReader<'_>, Error>
436    where
437        T: ?Sized + ToStatement,
438    {
439        let stream = self.connection.block_on(self.client.copy_out(query))?;
440        Ok(CopyOutReader::new(self.connection.as_ref(), stream))
441    }
442
443    /// Executes a sequence of SQL statements using the simple query protocol.
444    ///
445    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
446    /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
447    /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning the rows, this
448    /// method returns a sequence of an enum which indicates either the completion of one of the commands, or a row of
449    /// data. This preserves the framing between the separate statements in the request.
450    ///
451    /// This is a simple convenience method over `simple_query_iter`.
452    ///
453    /// # Warning
454    ///
455    /// Prepared statements should be used for any query which contains user-specified data, as they provided the
456    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
457    /// them to this method!
458    pub fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
459        self.connection.block_on(self.client.simple_query(query))
460    }
461
462    /// Validates the connection by performing a simple no-op query.
463    ///
464    /// If the specified timeout is reached before the backend responds, an error will be returned.
465    pub fn is_valid(&mut self, timeout: Duration) -> Result<(), Error> {
466        let inner_client = &self.client;
467        self.connection.block_on(async {
468            let trivial_query = inner_client.simple_query("");
469            tokio::time::timeout(timeout, trivial_query)
470                .await
471                .map_err(|_| Error::__private_api_timeout())?
472                .map(|_| ())
473        })
474    }
475
476    /// Executes a sequence of SQL statements using the simple query protocol.
477    ///
478    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
479    /// point. This is intended for use when, for example, initializing a database schema.
480    ///
481    /// # Warning
482    ///
483    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
484    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
485    /// them to this method!
486    pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
487        self.connection.block_on(self.client.batch_execute(query))
488    }
489
490    /// Begins a new database transaction.
491    ///
492    /// The transaction will roll back by default - use the `commit` method to commit it.
493    ///
494    /// # Examples
495    ///
496    /// ```no_run
497    /// use postgres::{Client, NoTls};
498    ///
499    /// # fn main() -> Result<(), postgres::Error> {
500    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
501    ///
502    /// let mut transaction = client.transaction()?;
503    /// transaction.execute("UPDATE foo SET bar = 10", &[])?;
504    /// // ...
505    ///
506    /// transaction.commit()?;
507    /// # Ok(())
508    /// # }
509    /// ```
510    pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
511        let transaction = self.connection.block_on(self.client.transaction())?;
512        Ok(Transaction::new(self.connection.as_ref(), transaction))
513    }
514
515    /// Returns a builder for a transaction with custom settings.
516    ///
517    /// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
518    /// attributes.
519    ///
520    /// # Examples
521    ///
522    /// ```no_run
523    /// use postgres::{Client, IsolationLevel, NoTls};
524    ///
525    /// # fn main() -> Result<(), postgres::Error> {
526    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
527    ///
528    /// let mut transaction = client.build_transaction()
529    ///     .isolation_level(IsolationLevel::RepeatableRead)
530    ///     .start()?;
531    /// transaction.execute("UPDATE foo SET bar = 10", &[])?;
532    /// // ...
533    ///
534    /// transaction.commit()?;
535    /// # Ok(())
536    /// # }
537    /// ```
538    pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
539        TransactionBuilder::new(self.connection.as_ref(), self.client.build_transaction())
540    }
541
542    /// Returns a structure providing access to asynchronous notifications.
543    ///
544    /// Use the `LISTEN` command to register this connection for notifications.
545    pub fn notifications(&mut self) -> Notifications<'_> {
546        Notifications::new(self.connection.as_ref())
547    }
548
549    /// Constructs a cancellation token that can later be used to request cancellation of a query running on this
550    /// connection.
551    ///
552    /// # Examples
553    ///
554    /// ```no_run
555    /// use postgres::{Client, NoTls};
556    /// use postgres::error::SqlState;
557    /// use std::thread;
558    /// use std::time::Duration;
559    ///
560    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
561    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
562    ///
563    /// let cancel_token = client.cancel_token();
564    ///
565    /// thread::spawn(move || {
566    ///     // Abort the query after 5s.
567    ///     thread::sleep(Duration::from_secs(5));
568    ///     let _ = cancel_token.cancel_query(NoTls);
569    /// });
570    ///
571    /// match client.simple_query("SELECT long_running_query()") {
572    ///     Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {
573    ///         // Handle canceled query.
574    ///     }
575    ///     Err(err) => return Err(err.into()),
576    ///     Ok(rows) => {
577    ///         // ...
578    ///     }
579    /// }
580    /// // ...
581    ///
582    /// # Ok(())
583    /// # }
584    /// ```
585    pub fn cancel_token(&self) -> CancelToken {
586        CancelToken::new(self.client.cancel_token())
587    }
588
589    /// Clears the client's type information cache.
590    ///
591    /// When user-defined types are used in a query, the client loads their definitions from the database and caches
592    /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
593    /// to flush the local cache and allow the new, updated definitions to be loaded.
594    pub fn clear_type_cache(&self) {
595        self.client.clear_type_cache();
596    }
597
598    /// Determines if the client's connection has already closed.
599    ///
600    /// If this returns `true`, the client is no longer usable.
601    pub fn is_closed(&self) -> bool {
602        self.client.is_closed()
603    }
604
605    /// Closes the client's connection to the server.
606    ///
607    /// This is equivalent to `Client`'s `Drop` implementation, except that it returns any error encountered to the
608    /// caller.
609    pub fn close(mut self) -> Result<(), Error> {
610        self.close_inner()
611    }
612
613    fn close_inner(&mut self) -> Result<(), Error> {
614        self.client.__private_api_close();
615
616        self.connection.poll_block_on(|_, _, done| {
617            if done {
618                Poll::Ready(Ok(()))
619            } else {
620                Poll::Pending
621            }
622        })
623    }
624}