yb_postgres/
client.rs

1use crate::connection::Connection;
2use crate::{
3    CancelToken, Config, CopyInWriter, CopyOutReader, Notifications, RowIter, Statement,
4    ToStatement, Transaction, TransactionBuilder,
5};
6use std::task::Poll;
7use std::time::Duration;
8use yb_tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
9use yb_tokio_postgres::types::{BorrowToSql, ToSql, Type};
10use yb_tokio_postgres::{Error, Row, SimpleQueryMessage, Socket, close};
11
12/// A synchronous PostgreSQL client.
13pub struct Client {
14    connection: Connection,
15    client: yb_tokio_postgres::Client,
16}
17
18impl Drop for Client {
19    fn drop(&mut self) {
20        close(&self.client);
21        let _ = self.close_inner();
22    }
23}
24
25impl Client {
26    pub(crate) fn new(connection: Connection, client: yb_tokio_postgres::Client) -> Client {
27        Client { connection, client }
28    }
29
30    /// A convenience function which parses a configuration string into a `Config` and then connects to the database.
31    ///
32    /// See the documentation for [`Config`] for information about the connection syntax.
33    ///
34    /// [`Config`]: config/struct.Config.html
35    pub fn connect<T>(params: &str, tls_mode: T) -> Result<Client, Error>
36    where
37        T: MakeTlsConnect<Socket> + 'static + Send,
38        T::TlsConnect: Send,
39        T::Stream: Send,
40        <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
41    {
42        params.parse::<Config>()?.connect(tls_mode)
43    }
44
45    /// Returns a new `Config` object which can be used to configure and connect to a database.
46    pub fn configure() -> Config {
47        Config::new()
48    }
49
50    /// Executes a statement, returning the number of rows modified.
51    ///
52    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
53    /// provided, 1-indexed.
54    ///
55    /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
56    ///
57    /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
58    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
59    /// with the `prepare` method.
60    ///
61    /// # Example
62    ///
63    /// ```no_run
64    /// use yb_postgres::{Client, NoTls};
65    ///
66    /// # fn main() -> Result<(), yb_postgres::Error> {
67    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
68    ///
69    /// let bar = 1i32;
70    /// let baz = true;
71    /// let rows_updated = client.execute(
72    ///     "UPDATE foo SET bar = $1 WHERE baz = $2",
73    ///     &[&bar, &baz],
74    /// )?;
75    ///
76    /// println!("{} rows updated", rows_updated);
77    /// # Ok(())
78    /// # }
79    /// ```
80    pub fn execute<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error>
81    where
82        T: ?Sized + ToStatement,
83    {
84        self.connection.block_on(self.client.execute(query, params))
85    }
86
87    /// Executes a statement, returning the resulting rows.
88    ///
89    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
90    /// provided, 1-indexed.
91    ///
92    /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
93    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
94    /// with the `prepare` method.
95    ///
96    /// # Examples
97    ///
98    /// ```no_run
99    /// use yb_postgres::{Client, NoTls};
100    ///
101    /// # fn main() -> Result<(), yb_postgres::Error> {
102    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
103    ///
104    /// let baz = true;
105    /// for row in client.query("SELECT foo FROM bar WHERE baz = $1", &[&baz])? {
106    ///     let foo: i32 = row.get("foo");
107    ///     println!("foo: {}", foo);
108    /// }
109    /// # Ok(())
110    /// # }
111    /// ```
112    pub fn query<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>, Error>
113    where
114        T: ?Sized + ToStatement,
115    {
116        self.connection.block_on(self.client.query(query, params))
117    }
118
119    /// Executes a statement which returns a single row, returning it.
120    ///
121    /// Returns an error if the query does not return exactly one row.
122    ///
123    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
124    /// provided, 1-indexed.
125    ///
126    /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
127    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
128    /// with the `prepare` method.
129    ///
130    /// # Examples
131    ///
132    /// ```no_run
133    /// use yb_postgres::{Client, NoTls};
134    ///
135    /// # fn main() -> Result<(), yb_postgres::Error> {
136    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
137    ///
138    /// let baz = true;
139    /// let row = client.query_one("SELECT foo FROM bar WHERE baz = $1", &[&baz])?;
140    /// let foo: i32 = row.get("foo");
141    /// println!("foo: {}", foo);
142    /// # Ok(())
143    /// # }
144    /// ```
145    pub fn query_one<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Row, Error>
146    where
147        T: ?Sized + ToStatement,
148    {
149        self.connection
150            .block_on(self.client.query_one(query, params))
151    }
152
153    /// Executes a statement which returns zero or one rows, returning it.
154    ///
155    /// Returns an error if the query returns more than one row.
156    ///
157    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
158    /// provided, 1-indexed.
159    ///
160    /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
161    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
162    /// with the `prepare` method.
163    ///
164    /// # Examples
165    ///
166    /// ```no_run
167    /// use yb_postgres::{Client, NoTls};
168    ///
169    /// # fn main() -> Result<(), yb_postgres::Error> {
170    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
171    ///
172    /// let baz = true;
173    /// let row = client.query_opt("SELECT foo FROM bar WHERE baz = $1", &[&baz])?;
174    /// match row {
175    ///     Some(row) => {
176    ///         let foo: i32 = row.get("foo");
177    ///         println!("foo: {}", foo);
178    ///     }
179    ///     None => println!("no matching foo"),
180    /// }
181    /// # Ok(())
182    /// # }
183    /// ```
184    pub fn query_opt<T>(
185        &mut self,
186        query: &T,
187        params: &[&(dyn ToSql + Sync)],
188    ) -> Result<Option<Row>, Error>
189    where
190        T: ?Sized + ToStatement,
191    {
192        self.connection
193            .block_on(self.client.query_opt(query, params))
194    }
195
196    /// A maximally-flexible version of `query`.
197    ///
198    /// It takes an iterator of parameters rather than a slice, and returns an iterator of rows rather than collecting
199    /// them into an array.
200    ///
201    /// # Examples
202    ///
203    /// ```no_run
204    /// use yb_postgres::{Client, NoTls};
205    /// use fallible_iterator::FallibleIterator;
206    /// use std::iter;
207    ///
208    /// # fn main() -> Result<(), yb_postgres::Error> {
209    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
210    ///
211    /// let baz = true;
212    /// let mut it = client.query_raw("SELECT foo FROM bar WHERE baz = $1", iter::once(baz))?;
213    ///
214    /// while let Some(row) = it.next()? {
215    ///     let foo: i32 = row.get("foo");
216    ///     println!("foo: {}", foo);
217    /// }
218    /// # Ok(())
219    /// # }
220    /// ```
221    ///
222    /// If you have a type like `Vec<T>` where `T: ToSql` Rust will not know how to use it as params. To get around
223    /// this the type must explicitly be converted to `&dyn ToSql`.
224    ///
225    /// ```no_run
226    /// # use yb_postgres::{Client, NoTls};
227    /// use yb_postgres::types::ToSql;
228    /// use fallible_iterator::FallibleIterator;
229    /// # fn main() -> Result<(), yb_postgres::Error> {
230    /// # let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
231    ///
232    /// let params: Vec<String> = vec![
233    ///     "first param".into(),
234    ///     "second param".into(),
235    /// ];
236    /// let mut it = client.query_raw(
237    ///     "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
238    ///     params,
239    /// )?;
240    ///
241    /// while let Some(row) = it.next()? {
242    ///     let foo: i32 = row.get("foo");
243    ///     println!("foo: {}", foo);
244    /// }
245    /// # Ok(())
246    /// # }
247    /// ```
248    pub fn query_raw<T, P, I>(&mut self, query: &T, params: I) -> Result<RowIter<'_>, Error>
249    where
250        T: ?Sized + ToStatement,
251        P: BorrowToSql,
252        I: IntoIterator<Item = P>,
253        I::IntoIter: ExactSizeIterator,
254    {
255        let stream = self
256            .connection
257            .block_on(self.client.query_raw(query, params))?;
258        Ok(RowIter::new(self.connection.as_ref(), stream))
259    }
260
261    /// Creates a new prepared statement.
262    ///
263    /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
264    /// which are set when executed. Prepared statements can only be used with the connection that created them.
265    ///
266    /// # Examples
267    ///
268    /// ```no_run
269    /// use yb_postgres::{Client, NoTls};
270    ///
271    /// # fn main() -> Result<(), yb_postgres::Error> {
272    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
273    ///
274    /// let statement = client.prepare("SELECT name FROM people WHERE id = $1")?;
275    ///
276    /// for id in 0..10 {
277    ///     let rows = client.query(&statement, &[&id])?;
278    ///     let name: &str = rows[0].get(0);
279    ///     println!("name: {}", name);
280    /// }
281    /// # Ok(())
282    /// # }
283    /// ```
284    pub fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
285        self.connection.block_on(self.client.prepare(query))
286    }
287
288    /// Like `prepare`, but allows the types of query parameters to be explicitly specified.
289    ///
290    /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
291    /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
292    ///
293    /// # Examples
294    ///
295    /// ```no_run
296    /// use yb_postgres::{Client, NoTls};
297    /// use yb_postgres::types::Type;
298    ///
299    /// # fn main() -> Result<(), yb_postgres::Error> {
300    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
301    ///
302    /// let statement = client.prepare_typed(
303    ///     "SELECT name FROM people WHERE id = $1",
304    ///     &[Type::INT8],
305    /// )?;
306    ///
307    /// for id in 0..10 {
308    ///     let rows = client.query(&statement, &[&id])?;
309    ///     let name: &str = rows[0].get(0);
310    ///     println!("name: {}", name);
311    /// }
312    /// # Ok(())
313    /// # }
314    /// ```
315    pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result<Statement, Error> {
316        self.connection
317            .block_on(self.client.prepare_typed(query, types))
318    }
319
320    /// Executes a `COPY FROM STDIN` statement, returning the number of rows created.
321    ///
322    /// The `query` argument can either be a `Statement`, or a raw query string. The data in the provided reader is
323    /// passed along to the server verbatim; it is the caller's responsibility to ensure it uses the proper format.
324    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
325    ///
326    /// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
327    ///
328    /// # Examples
329    ///
330    /// ```no_run
331    /// use yb_postgres::{Client, NoTls};
332    /// use std::io::Write;
333    ///
334    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
335    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
336    ///
337    /// let mut writer = client.copy_in("COPY people FROM stdin")?;
338    /// writer.write_all(b"1\tjohn\n2\tjane\n")?;
339    /// writer.finish()?;
340    /// # Ok(())
341    /// # }
342    /// ```
343    pub fn copy_in<T>(&mut self, query: &T) -> Result<CopyInWriter<'_>, Error>
344    where
345        T: ?Sized + ToStatement,
346    {
347        let sink = self.connection.block_on(self.client.copy_in(query))?;
348        Ok(CopyInWriter::new(self.connection.as_ref(), sink))
349    }
350
351    /// Executes a `COPY TO STDOUT` statement, returning a reader of the resulting data.
352    ///
353    /// The `query` argument can either be a `Statement`, or a raw query string. PostgreSQL does not support parameters
354    /// in `COPY` statements, so this method does not take any.
355    ///
356    /// # Examples
357    ///
358    /// ```no_run
359    /// use yb_postgres::{Client, NoTls};
360    /// use std::io::Read;
361    ///
362    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
363    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
364    ///
365    /// let mut reader = client.copy_out("COPY people TO stdout")?;
366    /// let mut buf = vec![];
367    /// reader.read_to_end(&mut buf)?;
368    /// # Ok(())
369    /// # }
370    /// ```
371    pub fn copy_out<T>(&mut self, query: &T) -> Result<CopyOutReader<'_>, Error>
372    where
373        T: ?Sized + ToStatement,
374    {
375        let stream = self.connection.block_on(self.client.copy_out(query))?;
376        Ok(CopyOutReader::new(self.connection.as_ref(), stream))
377    }
378
379    /// Executes a sequence of SQL statements using the simple query protocol.
380    ///
381    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
382    /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
383    /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning the rows, this
384    /// method returns a sequence of an enum which indicates either the completion of one of the commands, or a row of
385    /// data. This preserves the framing between the separate statements in the request.
386    ///
387    /// This is a simple convenience method over `simple_query_iter`.
388    ///
389    /// # Warning
390    ///
    /// Prepared statements should be used for any query which contains user-specified data, as they provide the
392    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
393    /// them to this method!
394    pub fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
395        self.connection.block_on(self.client.simple_query(query))
396    }
397
398    /// Validates the connection by performing a simple no-op query.
399    ///
400    /// If the specified timeout is reached before the backend responds, an error will be returned.
401    pub fn is_valid(&mut self, timeout: Duration) -> Result<(), Error> {
402        let inner_client = &self.client;
403        self.connection.block_on(async {
404            let trivial_query = inner_client.simple_query("");
405            tokio::time::timeout(timeout, trivial_query)
406                .await
407                .map_err(|_| Error::__private_api_timeout())?
408                .map(|_| ())
409        })
410    }
411
412    /// Executes a sequence of SQL statements using the simple query protocol.
413    ///
414    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
415    /// point. This is intended for use when, for example, initializing a database schema.
416    ///
417    /// # Warning
418    ///
    /// Prepared statements should be used for any query which contains user-specified data, as they provide the
420    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
421    /// them to this method!
422    pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
423        self.connection.block_on(self.client.batch_execute(query))
424    }
425
426    /// Begins a new database transaction.
427    ///
428    /// The transaction will roll back by default - use the `commit` method to commit it.
429    ///
430    /// # Examples
431    ///
432    /// ```no_run
433    /// use yb_postgres::{Client, NoTls};
434    ///
    /// # fn main() -> Result<(), yb_postgres::Error> {
436    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
437    ///
438    /// let mut transaction = client.transaction()?;
439    /// transaction.execute("UPDATE foo SET bar = 10", &[])?;
440    /// // ...
441    ///
442    /// transaction.commit()?;
443    /// # Ok(())
444    /// # }
445    /// ```
446    pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
447        let transaction = self.connection.block_on(self.client.transaction())?;
448        Ok(Transaction::new(self.connection.as_ref(), transaction))
449    }
450
451    /// Returns a builder for a transaction with custom settings.
452    ///
453    /// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
454    /// attributes.
455    ///
456    /// # Examples
457    ///
458    /// ```no_run
459    /// use yb_postgres::{Client, IsolationLevel, NoTls};
460    ///
461    /// # fn main() -> Result<(), yb_postgres::Error> {
462    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
463    ///
464    /// let mut transaction = client.build_transaction()
465    ///     .isolation_level(IsolationLevel::RepeatableRead)
466    ///     .start()?;
467    /// transaction.execute("UPDATE foo SET bar = 10", &[])?;
468    /// // ...
469    ///
470    /// transaction.commit()?;
471    /// # Ok(())
472    /// # }
473    /// ```
474    pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
475        TransactionBuilder::new(self.connection.as_ref(), self.client.build_transaction())
476    }
477
478    /// Returns a structure providing access to asynchronous notifications.
479    ///
480    /// Use the `LISTEN` command to register this connection for notifications.
481    pub fn notifications(&mut self) -> Notifications<'_> {
482        Notifications::new(self.connection.as_ref())
483    }
484
485    /// Constructs a cancellation token that can later be used to request cancellation of a query running on this
486    /// connection.
487    ///
488    /// # Examples
489    ///
490    /// ```no_run
491    /// use yb_postgres::{Client, NoTls};
492    /// use yb_postgres::error::SqlState;
493    /// use std::thread;
494    /// use std::time::Duration;
495    ///
496    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
497    /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
498    ///
499    /// let cancel_token = client.cancel_token();
500    ///
501    /// thread::spawn(move || {
502    ///     // Abort the query after 5s.
503    ///     thread::sleep(Duration::from_secs(5));
504    ///     let _ = cancel_token.cancel_query(NoTls);
505    /// });
506    ///
507    /// match client.simple_query("SELECT long_running_query()") {
508    ///     Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {
509    ///         // Handle canceled query.
510    ///     }
511    ///     Err(err) => return Err(err.into()),
512    ///     Ok(rows) => {
513    ///         // ...
514    ///     }
515    /// }
516    /// // ...
517    ///
518    /// # Ok(())
519    /// # }
520    /// ```
521    pub fn cancel_token(&self) -> CancelToken {
522        CancelToken::new(self.client.cancel_token())
523    }
524
525    /// Clears the client's type information cache.
526    ///
527    /// When user-defined types are used in a query, the client loads their definitions from the database and caches
528    /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
529    /// to flush the local cache and allow the new, updated definitions to be loaded.
530    pub fn clear_type_cache(&self) {
531        self.client.clear_type_cache();
532    }
533
534    /// Determines if the client's connection has already closed.
535    ///
536    /// If this returns `true`, the client is no longer usable.
537    pub fn is_closed(&self) -> bool {
538        self.client.is_closed()
539    }
540
541    /// Closes the client's connection to the server.
542    ///
543    /// This is equivalent to `Client`'s `Drop` implementation, except that it returns any error encountered to the
544    /// caller.
545    pub fn close(mut self) -> Result<(), Error> {
546        self.close_inner()
547    }
548
549    fn close_inner(&mut self) -> Result<(), Error> {
550        self.client.__private_api_close();
551
552        self.connection.poll_block_on(|_, _, done| {
553            if done {
554                Poll::Ready(Ok(()))
555            } else {
556                Poll::Pending
557            }
558        })
559    }
560}