yb_tokio_postgres/lib.rs
//! An asynchronous, pipelined, PostgreSQL client.
//!
//! # Example
//!
//! ```no_run
//! use yb_tokio_postgres::{NoTls, Error};
//!
//! # #[cfg(not(feature = "runtime"))] fn main() {}
//! # #[cfg(feature = "runtime")]
//! #[tokio::main] // By default, yb_tokio_postgres uses the tokio crate as its runtime.
//! async fn main() -> Result<(), Error> {
//!     // Connect to the database.
//!     let (client, connection) =
//!         yb_tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
//!
//!     // The connection object performs the actual communication with the database,
//!     // so spawn it off to run on its own.
//!     tokio::spawn(async move {
//!         if let Err(e) = connection.await {
//!             eprintln!("connection error: {}", e);
//!         }
//!     });
//!
//!     // Now we can execute a simple statement that just returns its parameter.
//!     let rows = client
//!         .query("SELECT $1::TEXT", &[&"hello world"])
//!         .await?;
//!
//!     // And then check that we got back the same string we sent over.
//!     let value: &str = rows[0].get(0);
//!     assert_eq!(value, "hello world");
//!
//!     Ok(())
//! }
//! ```
//!
//! # Behavior
//!
//! Calling a method like `Client::query` on its own does nothing. The associated request is not sent to the database
//! until the future returned by the method is first polled. Requests are executed in the order that they are first
//! polled, not in the order that their futures are created.
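//!
//! The laziness and first-poll ordering described above can be sketched with plain standard-library futures.
//! This is a minimal illustration under stated assumptions, not this crate's implementation: `request`,
//! `poll_order`, and `NoopWake` are hypothetical stand-ins, and "sending" a request is modeled as pushing a
//! label onto a log the first time the future is polled.
//!
//! ```rust
//! use std::cell::RefCell;
//! use std::future::Future;
//! use std::sync::Arc;
//! use std::task::{Context, Wake, Waker};
//!
//! // A waker that does nothing; it is enough to poll futures by hand.
//! struct NoopWake;
//!
//! impl Wake for NoopWake {
//!     fn wake(self: Arc<Self>) {}
//! }
//!
//! // Hypothetical stand-in for a request: it is "sent" (logged) only when first polled.
//! async fn request(log: &RefCell<Vec<&'static str>>, label: &'static str) {
//!     log.borrow_mut().push(label);
//! }
//!
//! // Creates two requests in one order, polls them in the other, and returns the send order.
//! fn poll_order() -> Vec<&'static str> {
//!     let log = RefCell::new(Vec::new());
//!     let waker = Waker::from(Arc::new(NoopWake));
//!     let mut cx = Context::from_waker(&waker);
//!     {
//!         // Creating the futures sends nothing.
//!         let mut first = Box::pin(request(&log, "first"));
//!         let mut second = Box::pin(request(&log, "second"));
//!         assert!(log.borrow().is_empty());
//!
//!         // Polling is what triggers each request, so `second` goes out before `first`.
//!         assert!(second.as_mut().poll(&mut cx).is_ready());
//!         assert!(first.as_mut().poll(&mut cx).is_ready());
//!     }
//!     log.into_inner()
//! }
//! ```
//!
//! Calling `poll_order()` returns the labels in the order the futures were polled (`second`, then `first`),
//! regardless of the order in which they were created.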
//!
//! # Pipelining
//!
//! The client supports *pipelined* requests. Pipelining can improve performance in use cases in which multiple,
//! independent queries need to be executed. In a traditional workflow, each query is sent to the server after the
//! previous query completes. In contrast, pipelining allows the client to send all of the queries to the server up
//! front, minimizing time spent by one side waiting for the other to finish sending data:
//!
//! ```not_rust
//!             Sequential                              Pipelined
//! | Client         | Server          |    | Client         | Server          |
//! |----------------|-----------------|    |----------------|-----------------|
//! | send query 1   |                 |    | send query 1   |                 |
//! |                | process query 1 |    | send query 2   | process query 1 |
//! | receive rows 1 |                 |    | send query 3   | process query 2 |
//! | send query 2   |                 |    | receive rows 1 | process query 3 |
//! |                | process query 2 |    | receive rows 2 |                 |
//! | receive rows 2 |                 |    | receive rows 3 |                 |
//! | send query 3   |                 |
//! |                | process query 3 |
//! | receive rows 3 |                 |
//! ```
//!
//! In both cases, the PostgreSQL server is executing the queries sequentially - pipelining just allows both sides of
//! the connection to work concurrently when possible.
//!
//! Pipelining happens automatically when futures are polled concurrently (for example, by using the futures `join`
//! combinator):
//!
//! ```rust
//! use futures_util::future;
//! use std::future::Future;
//! use yb_tokio_postgres::{Client, Error, Statement};
//!
//! async fn pipelined_prepare(
//!     client: &Client,
//! ) -> Result<(Statement, Statement), Error>
//! {
//!     future::try_join(
//!         client.prepare("SELECT * FROM foo"),
//!         client.prepare("INSERT INTO bar (id, name) VALUES ($1, $2)")
//!     ).await
//! }
//! ```
//!
//! # Runtime
//!
//! The client works with arbitrary `AsyncRead + AsyncWrite` streams. Convenience APIs are provided to handle the
//! connection process, but these are gated by the `runtime` Cargo feature, which is enabled by default. If disabled,
//! all dependence on the tokio runtime is removed.
//!
//! # SSL/TLS support
//!
//! TLS support is implemented via external libraries. The `connect` function and `Config::connect` take a TLS
//! implementation as an argument. The `NoTls` type in this crate can be used when TLS is not required. Otherwise, the
//! `postgres-openssl` and `postgres-native-tls` crates provide implementations backed by the `openssl` and `native-tls`
//! crates, respectively.
//!
//! # Features
//!
//! The following features can be enabled from `Cargo.toml`:
//!
//! | Feature | Description | Extra dependencies | Default |
//! | ------- | ----------- | ------------------ | ------- |
//! | `runtime` | Enable convenience API for the connection process based on the `tokio` crate. | [tokio](https://crates.io/crates/tokio) 1.0 with the features `net` and `time` | yes |
//! | `array-impls` | Enable `ToSql` and `FromSql` trait impls for arrays. | - | no |
//! | `with-bit-vec-0_6` | Enable support for the `bit-vec` crate. | [bit-vec](https://crates.io/crates/bit-vec) 0.6 | no |
//! | `with-chrono-0_4` | Enable support for the `chrono` crate. | [chrono](https://crates.io/crates/chrono) 0.4 | no |
//! | `with-eui48-0_4` | Enable support for the 0.4 version of the `eui48` crate. This is deprecated and will be removed. | [eui48](https://crates.io/crates/eui48) 0.4 | no |
//! | `with-eui48-1` | Enable support for the 1.0 version of the `eui48` crate. | [eui48](https://crates.io/crates/eui48) 1.0 | no |
//! | `with-geo-types-0_6` | Enable support for the 0.6 version of the `geo-types` crate. | [geo-types](https://crates.io/crates/geo-types/0.6.0) 0.6 | no |
//! | `with-geo-types-0_7` | Enable support for the 0.7 version of the `geo-types` crate. | [geo-types](https://crates.io/crates/geo-types/0.7.0) 0.7 | no |
//! | `with-serde_json-1` | Enable support for the `serde_json` crate. | [serde_json](https://crates.io/crates/serde_json) 1.0 | no |
//! | `with-uuid-0_8` | Enable support for the 0.8 version of the `uuid` crate. | [uuid](https://crates.io/crates/uuid) 0.8 | no |
//! | `with-uuid-1` | Enable support for the 1.0 version of the `uuid` crate. | [uuid](https://crates.io/crates/uuid) 1.0 | no |
//! | `with-time-0_2` | Enable support for the 0.2 version of the `time` crate. | [time](https://crates.io/crates/time/0.2.0) 0.2 | no |
//! | `with-time-0_3` | Enable support for the 0.3 version of the `time` crate. | [time](https://crates.io/crates/time/0.3.0) 0.3 | no |
#![warn(rust_2018_idioms, clippy::all, missing_docs)]

use config::Host;
use connect::decrease_connection_count;
use log::info;

pub use crate::cancel_token::CancelToken;
pub use crate::client::Client;
pub use crate::config::Config;
pub use crate::connection::Connection;
pub use crate::copy_in::CopyInSink;
pub use crate::copy_out::CopyOutStream;
use crate::error::DbError;
pub use crate::error::Error;
pub use crate::generic_client::GenericClient;
pub use crate::portal::Portal;
pub use crate::query::RowStream;
pub use crate::row::{Row, SimpleQueryRow};
pub use crate::simple_query::SimpleQueryStream;
#[cfg(feature = "runtime")]
pub use crate::socket::Socket;
pub use crate::statement::{Column, Statement};
#[cfg(feature = "runtime")]
use crate::tls::MakeTlsConnect;
pub use crate::tls::NoTls;
pub use crate::to_statement::ToStatement;
pub use crate::transaction::Transaction;
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
use crate::types::ToSql;

pub mod binary_copy;
mod bind;
#[cfg(feature = "runtime")]
mod cancel_query;
mod cancel_query_raw;
mod cancel_token;
mod client;
mod codec;
pub mod config;
#[cfg(feature = "runtime")]
mod connect;
mod connect_raw;
#[cfg(feature = "runtime")]
mod connect_socket;
mod connect_tls;
mod connection;
mod copy_in;
mod copy_out;
pub mod error;
mod generic_client;
#[cfg(not(target_arch = "wasm32"))]
mod keepalive;
mod maybe_tls_stream;
mod portal;
mod prepare;
mod query;
pub mod row;
mod simple_query;
#[cfg(feature = "runtime")]
mod socket;
mod statement;
pub mod tls;
mod to_statement;
mod transaction;
mod transaction_builder;
pub mod types;

/// A convenience function which parses a connection string and connects to the database.
///
/// See the documentation for [`Config`] for details on the connection string format.
///
/// Requires the `runtime` Cargo feature (enabled by default).
///
/// [`Config`]: config/struct.Config.html
#[cfg(feature = "runtime")]
pub async fn connect<T>(
    config: &str,
    tls: T,
) -> Result<(Client, Connection<Socket, T::Stream>), Error>
where
    T: MakeTlsConnect<Socket>,
{
    let config = config.parse::<Config>()?;
    config.connect(tls).await
}

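/// Marks the connection behind `client` as closed.
///
/// If the client has a socket configuration with a hostname, the connection count tracked for
/// that host is decremented. Otherwise, this function does nothing.
pub fn close(client: &Client) {
    // Match on both options instead of unwrapping, so a missing hostname cannot panic.
    if let Some(socket_config) = client.get_socket_config() {
        if let Some(host) = socket_config.hostname {
            info!("closing one connection to {:?}", host);
            decrease_connection_count(Host::Tcp(host));
        }
    }
}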

/// An asynchronous notification.
#[derive(Clone, Debug)]
pub struct Notification {
    process_id: i32,
    channel: String,
    payload: String,
}

impl Notification {
    /// The process ID of the notifying backend process.
    pub fn process_id(&self) -> i32 {
        self.process_id
    }

    /// The name of the channel that the notify has been raised on.
    pub fn channel(&self) -> &str {
        &self.channel
    }

    /// The "payload" string passed from the notifying process.
    pub fn payload(&self) -> &str {
        &self.payload
    }
}

/// An asynchronous message from the server.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum AsyncMessage {
    /// A notice.
    ///
    /// Notices use the same format as errors, but aren't "errors" per se.
    Notice(DbError),
    /// A notification.
    ///
    /// Connections can subscribe to notifications with the `LISTEN` command.
    Notification(Notification),
}

/// Message returned by the `SimpleQuery` stream.
#[derive(Debug)]
#[non_exhaustive]
pub enum SimpleQueryMessage {
    /// A row of data.
    Row(SimpleQueryRow),
    /// A statement in the query has completed.
    ///
    /// The number of rows modified or selected is returned.
    CommandComplete(u64),
}

fn slice_iter<'a>(
    s: &'a [&'a (dyn ToSql + Sync)],
) -> impl ExactSizeIterator<Item = &'a dyn ToSql> + 'a {
    s.iter().map(|s| *s as _)
}