yb_postgres/client.rs
1use crate::connection::Connection;
2use crate::{
3 CancelToken, Config, CopyInWriter, CopyOutReader, Notifications, RowIter, Statement,
4 ToStatement, Transaction, TransactionBuilder,
5};
6use std::task::Poll;
7use std::time::Duration;
8use yb_tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
9use yb_tokio_postgres::types::{BorrowToSql, ToSql, Type};
10use yb_tokio_postgres::{Error, Row, SimpleQueryMessage, Socket, close};
11
12/// A synchronous PostgreSQL client.
13pub struct Client {
14 connection: Connection,
15 client: yb_tokio_postgres::Client,
16}
17
18impl Drop for Client {
19 fn drop(&mut self) {
20 close(&self.client);
21 let _ = self.close_inner();
22 }
23}
24
25impl Client {
26 pub(crate) fn new(connection: Connection, client: yb_tokio_postgres::Client) -> Client {
27 Client { connection, client }
28 }
29
30 /// A convenience function which parses a configuration string into a `Config` and then connects to the database.
31 ///
32 /// See the documentation for [`Config`] for information about the connection syntax.
33 ///
34 /// [`Config`]: config/struct.Config.html
35 pub fn connect<T>(params: &str, tls_mode: T) -> Result<Client, Error>
36 where
37 T: MakeTlsConnect<Socket> + 'static + Send,
38 T::TlsConnect: Send,
39 T::Stream: Send,
40 <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
41 {
42 params.parse::<Config>()?.connect(tls_mode)
43 }
44
45 /// Returns a new `Config` object which can be used to configure and connect to a database.
46 pub fn configure() -> Config {
47 Config::new()
48 }
49
50 /// Executes a statement, returning the number of rows modified.
51 ///
52 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
53 /// provided, 1-indexed.
54 ///
55 /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
56 ///
57 /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
58 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
59 /// with the `prepare` method.
60 ///
61 /// # Example
62 ///
63 /// ```no_run
64 /// use yb_postgres::{Client, NoTls};
65 ///
66 /// # fn main() -> Result<(), yb_postgres::Error> {
67 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
68 ///
69 /// let bar = 1i32;
70 /// let baz = true;
71 /// let rows_updated = client.execute(
72 /// "UPDATE foo SET bar = $1 WHERE baz = $2",
73 /// &[&bar, &baz],
74 /// )?;
75 ///
76 /// println!("{} rows updated", rows_updated);
77 /// # Ok(())
78 /// # }
79 /// ```
80 pub fn execute<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<u64, Error>
81 where
82 T: ?Sized + ToStatement,
83 {
84 self.connection.block_on(self.client.execute(query, params))
85 }
86
87 /// Executes a statement, returning the resulting rows.
88 ///
89 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
90 /// provided, 1-indexed.
91 ///
92 /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
93 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
94 /// with the `prepare` method.
95 ///
96 /// # Examples
97 ///
98 /// ```no_run
99 /// use yb_postgres::{Client, NoTls};
100 ///
101 /// # fn main() -> Result<(), yb_postgres::Error> {
102 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
103 ///
104 /// let baz = true;
105 /// for row in client.query("SELECT foo FROM bar WHERE baz = $1", &[&baz])? {
106 /// let foo: i32 = row.get("foo");
107 /// println!("foo: {}", foo);
108 /// }
109 /// # Ok(())
110 /// # }
111 /// ```
112 pub fn query<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>, Error>
113 where
114 T: ?Sized + ToStatement,
115 {
116 self.connection.block_on(self.client.query(query, params))
117 }
118
119 /// Executes a statement which returns a single row, returning it.
120 ///
121 /// Returns an error if the query does not return exactly one row.
122 ///
123 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
124 /// provided, 1-indexed.
125 ///
126 /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
127 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
128 /// with the `prepare` method.
129 ///
130 /// # Examples
131 ///
132 /// ```no_run
133 /// use yb_postgres::{Client, NoTls};
134 ///
135 /// # fn main() -> Result<(), yb_postgres::Error> {
136 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
137 ///
138 /// let baz = true;
139 /// let row = client.query_one("SELECT foo FROM bar WHERE baz = $1", &[&baz])?;
140 /// let foo: i32 = row.get("foo");
141 /// println!("foo: {}", foo);
142 /// # Ok(())
143 /// # }
144 /// ```
145 pub fn query_one<T>(&mut self, query: &T, params: &[&(dyn ToSql + Sync)]) -> Result<Row, Error>
146 where
147 T: ?Sized + ToStatement,
148 {
149 self.connection
150 .block_on(self.client.query_one(query, params))
151 }
152
153 /// Executes a statement which returns zero or one rows, returning it.
154 ///
155 /// Returns an error if the query returns more than one row.
156 ///
157 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
158 /// provided, 1-indexed.
159 ///
160 /// The `query` argument can either be a `Statement`, or a raw query string. If the same statement will be
161 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
162 /// with the `prepare` method.
163 ///
164 /// # Examples
165 ///
166 /// ```no_run
167 /// use yb_postgres::{Client, NoTls};
168 ///
169 /// # fn main() -> Result<(), yb_postgres::Error> {
170 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
171 ///
172 /// let baz = true;
173 /// let row = client.query_opt("SELECT foo FROM bar WHERE baz = $1", &[&baz])?;
174 /// match row {
175 /// Some(row) => {
176 /// let foo: i32 = row.get("foo");
177 /// println!("foo: {}", foo);
178 /// }
179 /// None => println!("no matching foo"),
180 /// }
181 /// # Ok(())
182 /// # }
183 /// ```
184 pub fn query_opt<T>(
185 &mut self,
186 query: &T,
187 params: &[&(dyn ToSql + Sync)],
188 ) -> Result<Option<Row>, Error>
189 where
190 T: ?Sized + ToStatement,
191 {
192 self.connection
193 .block_on(self.client.query_opt(query, params))
194 }
195
196 /// A maximally-flexible version of `query`.
197 ///
198 /// It takes an iterator of parameters rather than a slice, and returns an iterator of rows rather than collecting
199 /// them into an array.
200 ///
201 /// # Examples
202 ///
203 /// ```no_run
204 /// use yb_postgres::{Client, NoTls};
205 /// use fallible_iterator::FallibleIterator;
206 /// use std::iter;
207 ///
208 /// # fn main() -> Result<(), yb_postgres::Error> {
209 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
210 ///
211 /// let baz = true;
212 /// let mut it = client.query_raw("SELECT foo FROM bar WHERE baz = $1", iter::once(baz))?;
213 ///
214 /// while let Some(row) = it.next()? {
215 /// let foo: i32 = row.get("foo");
216 /// println!("foo: {}", foo);
217 /// }
218 /// # Ok(())
219 /// # }
220 /// ```
221 ///
222 /// If you have a type like `Vec<T>` where `T: ToSql` Rust will not know how to use it as params. To get around
223 /// this the type must explicitly be converted to `&dyn ToSql`.
224 ///
225 /// ```no_run
226 /// # use yb_postgres::{Client, NoTls};
227 /// use yb_postgres::types::ToSql;
228 /// use fallible_iterator::FallibleIterator;
229 /// # fn main() -> Result<(), yb_postgres::Error> {
230 /// # let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
231 ///
232 /// let params: Vec<String> = vec![
233 /// "first param".into(),
234 /// "second param".into(),
235 /// ];
236 /// let mut it = client.query_raw(
237 /// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
238 /// params,
239 /// )?;
240 ///
241 /// while let Some(row) = it.next()? {
242 /// let foo: i32 = row.get("foo");
243 /// println!("foo: {}", foo);
244 /// }
245 /// # Ok(())
246 /// # }
247 /// ```
248 pub fn query_raw<T, P, I>(&mut self, query: &T, params: I) -> Result<RowIter<'_>, Error>
249 where
250 T: ?Sized + ToStatement,
251 P: BorrowToSql,
252 I: IntoIterator<Item = P>,
253 I::IntoIter: ExactSizeIterator,
254 {
255 let stream = self
256 .connection
257 .block_on(self.client.query_raw(query, params))?;
258 Ok(RowIter::new(self.connection.as_ref(), stream))
259 }
260
261 /// Creates a new prepared statement.
262 ///
263 /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
264 /// which are set when executed. Prepared statements can only be used with the connection that created them.
265 ///
266 /// # Examples
267 ///
268 /// ```no_run
269 /// use yb_postgres::{Client, NoTls};
270 ///
271 /// # fn main() -> Result<(), yb_postgres::Error> {
272 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
273 ///
274 /// let statement = client.prepare("SELECT name FROM people WHERE id = $1")?;
275 ///
276 /// for id in 0..10 {
277 /// let rows = client.query(&statement, &[&id])?;
278 /// let name: &str = rows[0].get(0);
279 /// println!("name: {}", name);
280 /// }
281 /// # Ok(())
282 /// # }
283 /// ```
284 pub fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
285 self.connection.block_on(self.client.prepare(query))
286 }
287
288 /// Like `prepare`, but allows the types of query parameters to be explicitly specified.
289 ///
290 /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
291 /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
292 ///
293 /// # Examples
294 ///
295 /// ```no_run
296 /// use yb_postgres::{Client, NoTls};
297 /// use yb_postgres::types::Type;
298 ///
299 /// # fn main() -> Result<(), yb_postgres::Error> {
300 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
301 ///
302 /// let statement = client.prepare_typed(
303 /// "SELECT name FROM people WHERE id = $1",
304 /// &[Type::INT8],
305 /// )?;
306 ///
307 /// for id in 0..10 {
308 /// let rows = client.query(&statement, &[&id])?;
309 /// let name: &str = rows[0].get(0);
310 /// println!("name: {}", name);
311 /// }
312 /// # Ok(())
313 /// # }
314 /// ```
315 pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result<Statement, Error> {
316 self.connection
317 .block_on(self.client.prepare_typed(query, types))
318 }
319
320 /// Executes a `COPY FROM STDIN` statement, returning the number of rows created.
321 ///
322 /// The `query` argument can either be a `Statement`, or a raw query string. The data in the provided reader is
323 /// passed along to the server verbatim; it is the caller's responsibility to ensure it uses the proper format.
324 /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
325 ///
326 /// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
327 ///
328 /// # Examples
329 ///
330 /// ```no_run
331 /// use yb_postgres::{Client, NoTls};
332 /// use std::io::Write;
333 ///
334 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
335 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
336 ///
337 /// let mut writer = client.copy_in("COPY people FROM stdin")?;
338 /// writer.write_all(b"1\tjohn\n2\tjane\n")?;
339 /// writer.finish()?;
340 /// # Ok(())
341 /// # }
342 /// ```
343 pub fn copy_in<T>(&mut self, query: &T) -> Result<CopyInWriter<'_>, Error>
344 where
345 T: ?Sized + ToStatement,
346 {
347 let sink = self.connection.block_on(self.client.copy_in(query))?;
348 Ok(CopyInWriter::new(self.connection.as_ref(), sink))
349 }
350
351 /// Executes a `COPY TO STDOUT` statement, returning a reader of the resulting data.
352 ///
353 /// The `query` argument can either be a `Statement`, or a raw query string. PostgreSQL does not support parameters
354 /// in `COPY` statements, so this method does not take any.
355 ///
356 /// # Examples
357 ///
358 /// ```no_run
359 /// use yb_postgres::{Client, NoTls};
360 /// use std::io::Read;
361 ///
362 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
363 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
364 ///
365 /// let mut reader = client.copy_out("COPY people TO stdout")?;
366 /// let mut buf = vec![];
367 /// reader.read_to_end(&mut buf)?;
368 /// # Ok(())
369 /// # }
370 /// ```
371 pub fn copy_out<T>(&mut self, query: &T) -> Result<CopyOutReader<'_>, Error>
372 where
373 T: ?Sized + ToStatement,
374 {
375 let stream = self.connection.block_on(self.client.copy_out(query))?;
376 Ok(CopyOutReader::new(self.connection.as_ref(), stream))
377 }
378
379 /// Executes a sequence of SQL statements using the simple query protocol.
380 ///
381 /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
382 /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
383 /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning the rows, this
384 /// method returns a sequence of an enum which indicates either the completion of one of the commands, or a row of
385 /// data. This preserves the framing between the separate statements in the request.
386 ///
387 /// This is a simple convenience method over `simple_query_iter`.
388 ///
389 /// # Warning
390 ///
391 /// Prepared statements should be used for any query which contains user-specified data, as they provided the
392 /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
393 /// them to this method!
394 pub fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
395 self.connection.block_on(self.client.simple_query(query))
396 }
397
398 /// Validates the connection by performing a simple no-op query.
399 ///
400 /// If the specified timeout is reached before the backend responds, an error will be returned.
401 pub fn is_valid(&mut self, timeout: Duration) -> Result<(), Error> {
402 let inner_client = &self.client;
403 self.connection.block_on(async {
404 let trivial_query = inner_client.simple_query("");
405 tokio::time::timeout(timeout, trivial_query)
406 .await
407 .map_err(|_| Error::__private_api_timeout())?
408 .map(|_| ())
409 })
410 }
411
412 /// Executes a sequence of SQL statements using the simple query protocol.
413 ///
414 /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
415 /// point. This is intended for use when, for example, initializing a database schema.
416 ///
417 /// # Warning
418 ///
419 /// Prepared statements should be use for any query which contains user-specified data, as they provided the
420 /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
421 /// them to this method!
422 pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
423 self.connection.block_on(self.client.batch_execute(query))
424 }
425
426 /// Begins a new database transaction.
427 ///
428 /// The transaction will roll back by default - use the `commit` method to commit it.
429 ///
430 /// # Examples
431 ///
432 /// ```no_run
433 /// use yb_postgres::{Client, NoTls};
434 ///
435 /// # fn main() -> Result<(), postgres::Error> {
436 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
437 ///
438 /// let mut transaction = client.transaction()?;
439 /// transaction.execute("UPDATE foo SET bar = 10", &[])?;
440 /// // ...
441 ///
442 /// transaction.commit()?;
443 /// # Ok(())
444 /// # }
445 /// ```
446 pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
447 let transaction = self.connection.block_on(self.client.transaction())?;
448 Ok(Transaction::new(self.connection.as_ref(), transaction))
449 }
450
451 /// Returns a builder for a transaction with custom settings.
452 ///
453 /// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
454 /// attributes.
455 ///
456 /// # Examples
457 ///
458 /// ```no_run
459 /// use yb_postgres::{Client, IsolationLevel, NoTls};
460 ///
461 /// # fn main() -> Result<(), yb_postgres::Error> {
462 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
463 ///
464 /// let mut transaction = client.build_transaction()
465 /// .isolation_level(IsolationLevel::RepeatableRead)
466 /// .start()?;
467 /// transaction.execute("UPDATE foo SET bar = 10", &[])?;
468 /// // ...
469 ///
470 /// transaction.commit()?;
471 /// # Ok(())
472 /// # }
473 /// ```
474 pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
475 TransactionBuilder::new(self.connection.as_ref(), self.client.build_transaction())
476 }
477
478 /// Returns a structure providing access to asynchronous notifications.
479 ///
480 /// Use the `LISTEN` command to register this connection for notifications.
481 pub fn notifications(&mut self) -> Notifications<'_> {
482 Notifications::new(self.connection.as_ref())
483 }
484
485 /// Constructs a cancellation token that can later be used to request cancellation of a query running on this
486 /// connection.
487 ///
488 /// # Examples
489 ///
490 /// ```no_run
491 /// use yb_postgres::{Client, NoTls};
492 /// use yb_postgres::error::SqlState;
493 /// use std::thread;
494 /// use std::time::Duration;
495 ///
496 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
497 /// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
498 ///
499 /// let cancel_token = client.cancel_token();
500 ///
501 /// thread::spawn(move || {
502 /// // Abort the query after 5s.
503 /// thread::sleep(Duration::from_secs(5));
504 /// let _ = cancel_token.cancel_query(NoTls);
505 /// });
506 ///
507 /// match client.simple_query("SELECT long_running_query()") {
508 /// Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {
509 /// // Handle canceled query.
510 /// }
511 /// Err(err) => return Err(err.into()),
512 /// Ok(rows) => {
513 /// // ...
514 /// }
515 /// }
516 /// // ...
517 ///
518 /// # Ok(())
519 /// # }
520 /// ```
521 pub fn cancel_token(&self) -> CancelToken {
522 CancelToken::new(self.client.cancel_token())
523 }
524
525 /// Clears the client's type information cache.
526 ///
527 /// When user-defined types are used in a query, the client loads their definitions from the database and caches
528 /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
529 /// to flush the local cache and allow the new, updated definitions to be loaded.
530 pub fn clear_type_cache(&self) {
531 self.client.clear_type_cache();
532 }
533
534 /// Determines if the client's connection has already closed.
535 ///
536 /// If this returns `true`, the client is no longer usable.
537 pub fn is_closed(&self) -> bool {
538 self.client.is_closed()
539 }
540
541 /// Closes the client's connection to the server.
542 ///
543 /// This is equivalent to `Client`'s `Drop` implementation, except that it returns any error encountered to the
544 /// caller.
545 pub fn close(mut self) -> Result<(), Error> {
546 self.close_inner()
547 }
548
549 fn close_inner(&mut self) -> Result<(), Error> {
550 self.client.__private_api_close();
551
552 self.connection.poll_block_on(|_, _, done| {
553 if done {
554 Poll::Ready(Ok(()))
555 } else {
556 Poll::Pending
557 }
558 })
559 }
560}