postgres/client.rs
1use crate::connection::Connection;
2use crate::{
3 CancelToken, Config, CopyInWriter, CopyOutReader, Notifications, RowIter, Statement,
4 ToStatement, Transaction, TransactionBuilder,
5};
6use std::task::Poll;
7use std::time::Duration;
8use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
9use tokio_postgres::types::{BorrowToSql, ToSql, Type};
10use tokio_postgres::{Error, Row, SimpleQueryMessage, Socket};
11
12/// A synchronous PostgreSQL client.
13pub struct Client {
14 connection: Connection,
15 client: tokio_postgres::Client,
16}
17
18impl Drop for Client {
19 fn drop(&mut self) {
20 let _ = self.close_inner();
21 }
22}
23
24impl Client {
25 pub(crate) fn new(connection: Connection, client: tokio_postgres::Client) -> Client {
26 Client { connection, client }
27 }
28
29 /// A convenience function which parses a configuration string into a `Config` and then connects to the database.
30 ///
31 /// See the documentation for [`Config`] for information about the connection syntax.
32 ///
33 /// [`Config`]: config/struct.Config.html
34 pub fn connect<T>(params: &str, tls_mode: T) -> Result<Client, Error>
35 where
36 T: MakeTlsConnect<Socket> + 'static + Send,
37 T::TlsConnect: Send,
38 T::Stream: Send,
39 <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
40 {
41 params.parse::<Config>()?.connect(tls_mode)
42 }
43
44 /// Returns a new `Config` object which can be used to configure and connect to a database.
45 pub fn configure() -> Config {
46 Config::new()
47 }
48
49 /// Executes a statement, returning the number of rows modified.
50 ///
51 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
52 /// provided, 1-indexed.
53 ///
54 /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
55 ///
56 /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
57 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
58 /// with the `prepare` method.
59 ///
60 /// # Example
61 ///
62 /// ```no_run
63 /// use postgres::{Client, NoTls};
64 ///
65 /// # fn main() -> Result<(), postgres::Error> {
66 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
67 ///
68 /// let bar = 1i32;
69 /// let baz = true;
70 /// let rows_updated = client.execute(
71 /// "UPDATE foo SET bar = $1 WHERE baz = $2",
72 /// &[&bar, &baz],
73 /// )?;
74 ///
75 /// println!("{} rows updated", rows_updated);
76 /// # Ok(())
77 /// # }
78 /// ```
79 pub fn execute<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error>
80 where
81 T: ?Sized + ToStatement,
82 {
83 self.connection.block_on(self.client.execute(query, params))
84 }
85
86 /// Executes a statement, returning the resulting rows.
87 ///
88 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
89 /// provided, 1-indexed.
90 ///
91 /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
92 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
93 /// with the `prepare` method.
94 ///
95 /// # Examples
96 ///
97 /// ```no_run
98 /// use postgres::{Client, NoTls};
99 ///
100 /// # fn main() -> Result<(), postgres::Error> {
101 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
102 ///
103 /// let baz = true;
104 /// for row in client.query("SELECT foo FROM bar WHERE baz = $1", &[&baz])? {
105 /// let foo: i32 = row.get("foo");
106 /// println!("foo: {}", foo);
107 /// }
108 /// # Ok(())
109 /// # }
110 /// ```
111 pub fn query<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>, Error>
112 where
113 T: ?Sized + ToStatement,
114 {
115 self.connection.block_on(self.client.query(query, params))
116 }
117
118 /// Executes a statement which returns a single row, returning it.
119 ///
120 /// Returns an error if the query does not return exactly one row.
121 ///
122 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
123 /// provided, 1-indexed.
124 ///
125 /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
126 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
127 /// with the `prepare` method.
128 ///
129 /// # Examples
130 ///
131 /// ```no_run
132 /// use postgres::{Client, NoTls};
133 ///
134 /// # fn main() -> Result<(), postgres::Error> {
135 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
136 ///
137 /// let baz = true;
138 /// let row = client.query_one("SELECT foo FROM bar WHERE baz = $1", &[&baz])?;
139 /// let foo: i32 = row.get("foo");
140 /// println!("foo: {}", foo);
141 /// # Ok(())
142 /// # }
143 /// ```
144 pub fn query_one<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Row, Error>
145 where
146 T: ?Sized + ToStatement,
147 {
148 self.connection
149 .block_on(self.client.query_one(query, params))
150 }
151
152 /// Executes a statement which returns zero or one rows, returning it.
153 ///
154 /// Returns an error if the query returns more than one row.
155 ///
156 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
157 /// provided, 1-indexed.
158 ///
159 /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
160 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
161 /// with the `prepare` method.
162 ///
163 /// # Examples
164 ///
165 /// ```no_run
166 /// use postgres::{Client, NoTls};
167 ///
168 /// # fn main() -> Result<(), postgres::Error> {
169 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
170 ///
171 /// let baz = true;
172 /// let row = client.query_opt("SELECT foo FROM bar WHERE baz = $1", &[&baz])?;
173 /// match row {
174 /// Some(row) => {
175 /// let foo: i32 = row.get("foo");
176 /// println!("foo: {}", foo);
177 /// }
178 /// None => println!("no matching foo"),
179 /// }
180 /// # Ok(())
181 /// # }
182 /// ```
183 pub fn query_opt<T>(
184 &mut self,
185 query: &T,
186 params: &[&(dyn ToSql + Sync)],
187 ) -> Result<Option<Row>, Error>
188 where
189 T: ?Sized + ToStatement,
190 {
191 self.connection
192 .block_on(self.client.query_opt(query, params))
193 }
194
195 /// A maximally-flexible version of `query`.
196 ///
197 /// It takes an iterator of parameters rather than a slice, and returns an iterator of rows rather than collecting
198 /// them into an array.
199 ///
200 /// # Examples
201 ///
202 /// ```no_run
203 /// use postgres::{Client, NoTls};
204 /// use fallible_iterator::FallibleIterator;
205 /// use std::iter;
206 ///
207 /// # fn main() -> Result<(), postgres::Error> {
208 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
209 ///
210 /// let baz = true;
211 /// let mut it = client.query_raw("SELECT foo FROM bar WHERE baz = $1", iter::once(baz))?;
212 ///
213 /// while let Some(row) = it.next()? {
214 /// let foo: i32 = row.get("foo");
215 /// println!("foo: {}", foo);
216 /// }
217 /// # Ok(())
218 /// # }
219 /// ```
220 ///
221 /// If you have a type like `Vec<T>` where `T: ToSql` Rust will not know how to use it as params. To get around
222 /// this the type must explicitly be converted to `&dyn ToSql`.
223 ///
224 /// ```no_run
225 /// # use postgres::{Client, NoTls};
226 /// use postgres::types::ToSql;
227 /// use fallible_iterator::FallibleIterator;
228 /// # fn main() -> Result<(), postgres::Error> {
229 /// # let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
230 ///
231 /// let params: Vec<String> = vec![
232 /// "first param".into(),
233 /// "second param".into(),
234 /// ];
235 /// let mut it = client.query_raw(
236 /// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
237 /// params,
238 /// )?;
239 ///
240 /// while let Some(row) = it.next()? {
241 /// let foo: i32 = row.get("foo");
242 /// println!("foo: {}", foo);
243 /// }
244 /// # Ok(())
245 /// # }
246 /// ```
247 pub fn query_raw<T, P, I>(&mut self, query: &T, params: I) -> Result<RowIter<'_>, Error>
248 where
249 T: ?Sized + ToStatement,
250 P: BorrowToSql,
251 I: IntoIterator<Item = P>,
252 I::IntoIter: ExactSizeIterator,
253 {
254 let stream = self
255 .connection
256 .block_on(self.client.query_raw(query, params))?;
257 Ok(RowIter::new(self.connection.as_ref(), stream))
258 }
259
260 /// Like `query`, but requires the types of query parameters to be explicitly specified.
261 ///
262 /// Compared to `query`, this method allows performing queries without three round trips (for
263 /// prepare, execute, and close) by requiring the caller to specify parameter values along with
264 /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
265 /// supported (such as Cloudflare Workers with Hyperdrive).
266 ///
267 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
268 /// parameter of the list provided, 1-indexed.
269 pub fn query_typed(
270 &mut self,
271 query: &str,
272 params: &[(&(dyn ToSql + Sync), Type)],
273 ) -> Result<Vec<Row>, Error> {
274 self.connection
275 .block_on(self.client.query_typed(query, params))
276 }
277
278 /// The maximally flexible version of [`query_typed`].
279 ///
280 /// Compared to `query`, this method allows performing queries without three round trips (for
281 /// prepare, execute, and close) by requiring the caller to specify parameter values along with
282 /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
283 /// supported (such as Cloudflare Workers with Hyperdrive).
284 ///
285 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
286 /// parameter of the list provided, 1-indexed.
287 ///
288 /// [`query_typed`]: #method.query_typed
289 ///
290 /// # Examples
291 /// ```no_run
292 /// # use postgres::{Client, NoTls};
293 /// use postgres::types::{ToSql, Type};
294 /// use fallible_iterator::FallibleIterator;
295 /// # fn main() -> Result<(), postgres::Error> {
296 /// # let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
297 ///
298 /// let params: Vec<(String, Type)> = vec![
299 /// ("first param".into(), Type::TEXT),
300 /// ("second param".into(), Type::TEXT),
301 /// ];
302 /// let mut it = client.query_typed_raw(
303 /// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
304 /// params,
305 /// )?;
306 ///
307 /// while let Some(row) = it.next()? {
308 /// let foo: i32 = row.get("foo");
309 /// println!("foo: {}", foo);
310 /// }
311 /// # Ok(())
312 /// # }
313 /// ```
314 pub fn query_typed_raw<P, I>(&mut self, query: &str, params: I) -> Result<RowIter<'_>, Error>
315 where
316 P: BorrowToSql,
317 I: IntoIterator<Item = (P, Type)>,
318 {
319 let stream = self
320 .connection
321 .block_on(self.client.query_typed_raw(query, params))?;
322 Ok(RowIter::new(self.connection.as_ref(), stream))
323 }
324
325 /// Creates a new prepared statement.
326 ///
327 /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
328 /// which are set when executed. Prepared statements can only be used with the connection that created them.
329 ///
330 /// # Examples
331 ///
332 /// ```no_run
333 /// use postgres::{Client, NoTls};
334 ///
335 /// # fn main() -> Result<(), postgres::Error> {
336 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
337 ///
338 /// let statement = client.prepare("SELECT name FROM people WHERE id = $1")?;
339 ///
340 /// for id in 0..10 {
341 /// let rows = client.query(&statement, &[&id])?;
342 /// let name: &str = rows[0].get(0);
343 /// println!("name: {}", name);
344 /// }
345 /// # Ok(())
346 /// # }
347 /// ```
348 pub fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
349 self.connection.block_on(self.client.prepare(query))
350 }
351
352 /// Like `prepare`, but allows the types of query parameters to be explicitly specified.
353 ///
354 /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
355 /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
356 ///
357 /// # Examples
358 ///
359 /// ```no_run
360 /// use postgres::{Client, NoTls};
361 /// use postgres::types::Type;
362 ///
363 /// # fn main() -> Result<(), postgres::Error> {
364 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
365 ///
366 /// let statement = client.prepare_typed(
367 /// "SELECT name FROM people WHERE id = $1",
368 /// &[Type::INT8],
369 /// )?;
370 ///
371 /// for id in 0..10 {
372 /// let rows = client.query(&statement, &[&id])?;
373 /// let name: &str = rows[0].get(0);
374 /// println!("name: {}", name);
375 /// }
376 /// # Ok(())
377 /// # }
378 /// ```
379 pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result<Statement, Error> {
380 self.connection
381 .block_on(self.client.prepare_typed(query, types))
382 }
383
384 /// Executes a `COPY FROM STDIN` statement, returning the number of rows created.
385 ///
386 /// The `query` argument can either be a `Statement`, or a raw query string. The data in the provided reader is
387 /// passed along to the server verbatim; it is the caller's responsibility to ensure it uses the proper format.
388 /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
389 ///
390 /// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
391 ///
392 /// # Examples
393 ///
394 /// ```no_run
395 /// use postgres::{Client, NoTls};
396 /// use std::io::Write;
397 ///
398 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
399 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
400 ///
401 /// let mut writer = client.copy_in("COPY people FROM stdin")?;
402 /// writer.write_all(b"1\tjohn\n2\tjane\n")?;
403 /// writer.finish()?;
404 /// # Ok(())
405 /// # }
406 /// ```
407 pub fn copy_in<T>(&mut self, query: &T) -> Result<CopyInWriter<'_>, Error>
408 where
409 T: ?Sized + ToStatement,
410 {
411 let sink = self.connection.block_on(self.client.copy_in(query))?;
412 Ok(CopyInWriter::new(self.connection.as_ref(), sink))
413 }
414
415 /// Executes a `COPY TO STDOUT` statement, returning a reader of the resulting data.
416 ///
417 /// The `query` argument can either be a `Statement`, or a raw query string. PostgreSQL does not support parameters
418 /// in `COPY` statements, so this method does not take any.
419 ///
420 /// # Examples
421 ///
422 /// ```no_run
423 /// use postgres::{Client, NoTls};
424 /// use std::io::Read;
425 ///
426 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
427 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
428 ///
429 /// let mut reader = client.copy_out("COPY people TO stdout")?;
430 /// let mut buf = vec![];
431 /// reader.read_to_end(&mut buf)?;
432 /// # Ok(())
433 /// # }
434 /// ```
435 pub fn copy_out<T>(&mut self, query: &T) -> Result<CopyOutReader<'_>, Error>
436 where
437 T: ?Sized + ToStatement,
438 {
439 let stream = self.connection.block_on(self.client.copy_out(query))?;
440 Ok(CopyOutReader::new(self.connection.as_ref(), stream))
441 }
442
443 /// Executes a sequence of SQL statements using the simple query protocol.
444 ///
445 /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
446 /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
447 /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning the rows, this
448 /// method returns a sequence of an enum which indicates either the completion of one of the commands, or a row of
449 /// data. This preserves the framing between the separate statements in the request.
450 ///
451 /// This is a simple convenience method over `simple_query_iter`.
452 ///
453 /// # Warning
454 ///
455 /// Prepared statements should be used for any query which contains user-specified data, as they provided the
456 /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
457 /// them to this method!
458 pub fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
459 self.connection.block_on(self.client.simple_query(query))
460 }
461
462 /// Validates the connection by performing a simple no-op query.
463 ///
464 /// If the specified timeout is reached before the backend responds, an error will be returned.
465 pub fn is_valid(&mut self, timeout: Duration) -> Result<(), Error> {
466 let inner_client = &self.client;
467 self.connection.block_on(async {
468 let trivial_query = inner_client.simple_query("");
469 tokio::time::timeout(timeout, trivial_query)
470 .await
471 .map_err(|_| Error::__private_api_timeout())?
472 .map(|_| ())
473 })
474 }
475
476 /// Executes a sequence of SQL statements using the simple query protocol.
477 ///
478 /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
479 /// point. This is intended for use when, for example, initializing a database schema.
480 ///
481 /// # Warning
482 ///
483 /// Prepared statements should be use for any query which contains user-specified data, as they provided the
484 /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
485 /// them to this method!
486 pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
487 self.connection.block_on(self.client.batch_execute(query))
488 }
489
490 /// Begins a new database transaction.
491 ///
492 /// The transaction will roll back by default - use the `commit` method to commit it.
493 ///
494 /// # Examples
495 ///
496 /// ```no_run
497 /// use postgres::{Client, NoTls};
498 ///
499 /// # fn main() -> Result<(), postgres::Error> {
500 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
501 ///
502 /// let mut transaction = client.transaction()?;
503 /// transaction.execute("UPDATE foo SET bar = 10", &[])?;
504 /// // ...
505 ///
506 /// transaction.commit()?;
507 /// # Ok(())
508 /// # }
509 /// ```
510 pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
511 let transaction = self.connection.block_on(self.client.transaction())?;
512 Ok(Transaction::new(self.connection.as_ref(), transaction))
513 }
514
515 /// Returns a builder for a transaction with custom settings.
516 ///
517 /// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
518 /// attributes.
519 ///
520 /// # Examples
521 ///
522 /// ```no_run
523 /// use postgres::{Client, IsolationLevel, NoTls};
524 ///
525 /// # fn main() -> Result<(), postgres::Error> {
526 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
527 ///
528 /// let mut transaction = client.build_transaction()
529 /// .isolation_level(IsolationLevel::RepeatableRead)
530 /// .start()?;
531 /// transaction.execute("UPDATE foo SET bar = 10", &[])?;
532 /// // ...
533 ///
534 /// transaction.commit()?;
535 /// # Ok(())
536 /// # }
537 /// ```
538 pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
539 TransactionBuilder::new(self.connection.as_ref(), self.client.build_transaction())
540 }
541
542 /// Returns a structure providing access to asynchronous notifications.
543 ///
544 /// Use the `LISTEN` command to register this connection for notifications.
545 pub fn notifications(&mut self) -> Notifications<'_> {
546 Notifications::new(self.connection.as_ref())
547 }
548
549 /// Constructs a cancellation token that can later be used to request cancellation of a query running on this
550 /// connection.
551 ///
552 /// # Examples
553 ///
554 /// ```no_run
555 /// use postgres::{Client, NoTls};
556 /// use postgres::error::SqlState;
557 /// use std::thread;
558 /// use std::time::Duration;
559 ///
560 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
561 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
562 ///
563 /// let cancel_token = client.cancel_token();
564 ///
565 /// thread::spawn(move || {
566 /// // Abort the query after 5s.
567 /// thread::sleep(Duration::from_secs(5));
568 /// let _ = cancel_token.cancel_query(NoTls);
569 /// });
570 ///
571 /// match client.simple_query("SELECT long_running_query()") {
572 /// Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {
573 /// // Handle canceled query.
574 /// }
575 /// Err(err) => return Err(err.into()),
576 /// Ok(rows) => {
577 /// // ...
578 /// }
579 /// }
580 /// // ...
581 ///
582 /// # Ok(())
583 /// # }
584 /// ```
585 pub fn cancel_token(&self) -> CancelToken {
586 CancelToken::new(self.client.cancel_token())
587 }
588
589 /// Clears the client's type information cache.
590 ///
591 /// When user-defined types are used in a query, the client loads their definitions from the database and caches
592 /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
593 /// to flush the local cache and allow the new, updated definitions to be loaded.
594 pub fn clear_type_cache(&self) {
595 self.client.clear_type_cache();
596 }
597
598 /// Determines if the client's connection has already closed.
599 ///
600 /// If this returns `true`, the client is no longer usable.
601 pub fn is_closed(&self) -> bool {
602 self.client.is_closed()
603 }
604
605 /// Closes the client's connection to the server.
606 ///
607 /// This is equivalent to `Client`'s `Drop` implementation, except that it returns any error encountered to the
608 /// caller.
609 pub fn close(mut self) -> Result<(), Error> {
610 self.close_inner()
611 }
612
613 fn close_inner(&mut self) -> Result<(), Error> {
614 self.client.__private_api_close();
615
616 self.connection.poll_block_on(|_, _, done| {
617 if done {
618 Poll::Ready(Ok(()))
619 } else {
620 Poll::Pending
621 }
622 })
623 }
624}