xitca_postgres/lib.rs
1#![doc = include_str!("../README.md")]
2#![forbid(unsafe_code)]
3#![cfg_attr(any(feature = "io-uring", feature = "compio"), feature(async_iterator, gen_blocks))]
4#![cfg_attr(feature = "nightly", feature(impl_trait_in_assoc_type))]
5
6mod cancel;
7mod client;
8mod column;
9mod config;
10mod driver;
11mod execute;
12mod from_sql;
13mod prepare;
14mod protocol;
15mod query;
16mod session;
17
18pub mod copy;
19pub mod error;
20pub mod iter;
21pub mod pool;
22pub mod row;
23pub mod statement;
24pub mod transaction;
25pub mod types;
26
27#[cfg(feature = "quic")]
28pub mod proxy;
29#[cfg(feature = "quic")]
30pub use driver::quic::QuicStream;
31
32#[cfg(feature = "compio")]
33pub use driver::compio::CompIoDriver;
34
35pub use self::{
36 client::Client,
37 column::Column,
38 config::Config,
39 driver::Driver,
40 error::Error,
41 execute::{Execute, ExecuteBlocking},
42 from_sql::FromSqlExt,
43 query::{RowSimpleStream, RowSimpleStreamOwned, RowStream, RowStreamOwned},
44 session::Session,
45 statement::Statement,
46};
47
48#[cfg(feature = "compat")]
49pub mod compat {
50 //! compatibility mod to work with [`futures::Stream`] trait.
51 //!
52 //! by default this crate uses an async lending iterator to enable zero copy database row handling when possible.
53 //! but this approach can be difficult to hook into existing libraries and crates. In case a traditional async iterator
54 //! is needed this module offers types as adapters.
55 //!
56 //! # Examples
57 //! ```
58 //! # use xitca_postgres::{Client, Error, Execute, Statement};
59 //! # async fn convert(client: Client) -> Result<(), Error> {
60 //! // prepare a statement and query for rows.
61 //! let stmt = Statement::named("SELECT * from users", &[]).execute(&client).await?;
62 //! let mut stream = stmt.query(&client).await?;
63 //!
64 //! // assuming we want to spawn a tokio async task and handle the stream inside it.
65 //! // but code below would not work as stream is a borrowed type with lending iterator implements.
66 //! // tokio::spawn(async move {
67 //! // let stream = stream;
68 //! // })
69 //!
70 //! // in order to remove lifetime constraint we can do following:
71 //!
72 //! // convert borrowed stream to owned stream where lifetime constraints are lifted.
73 //! let mut stream = xitca_postgres::RowStreamOwned::from(stream);
74 //!
75 //! // spawn async task and move stream into it.
76 //! tokio::spawn(async move {
77 //! // use async iterator to handle rows.
78 //! use futures::stream::TryStreamExt;
79 //! while let Some(row) = stream.try_next().await? {
80 //! // handle row
81 //! }
82 //! Ok::<_, Error>(())
83 //! });
84 //! # Ok(())
85 //! # }
86 //! ```
87 //!
88 //! [`futures::Stream`]: futures_core::stream::Stream
89}
90
91pub mod dev {
92 //! traits for extending functionalities through external crate
93
94 pub use crate::client::{ClientBorrow, ClientBorrowMut};
95 pub use crate::copy::r#Copy;
96 pub use crate::driver::codec::{Response, encode::Encode, response::IntoResponse};
97}
98
99use core::{future::Future, pin::Pin, sync::atomic::AtomicUsize};
100
101use xitca_io::io::AsyncIo;
102
103static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
104
105#[derive(Debug)]
106pub struct Postgres {
107 cfg: Result<Config, Error>,
108}
109
110impl Postgres {
111 pub fn new<C>(cfg: C) -> Self
112 where
113 Config: TryFrom<C>,
114 Error: From<<Config as TryFrom<C>>::Error>,
115 {
116 Self {
117 cfg: Config::try_from(cfg).map_err(Into::into),
118 }
119 }
120}
121
122impl Postgres {
123 /// Connect to database, returning [Client] and [Driver] on success
124 ///
125 /// # Examples:
126 /// ```rust
127 /// use std::future::IntoFuture;
128 /// use xitca_postgres::{Execute, Postgres};
129 ///
130 /// # async fn connect() {
131 /// let cfg = String::from("postgres://user:pass@localhost/db");
132 /// let (client, driver) = Postgres::new(cfg).connect().await.unwrap();
133 ///
134 /// // spawn driver as async task.
135 /// tokio::spawn(driver.into_future());
136 ///
137 /// // use client for query.
138 /// "SELECT 1".execute(&client).await.unwrap();
139 /// # }
140 ///
141 /// ```
142 pub async fn connect(self) -> Result<(Client, Driver), Error> {
143 let mut cfg = self.cfg?;
144 driver::connect(&mut cfg).await
145 }
146
147 /// Connect to database with an already established Io type.
148 /// Io type must impl [AsyncIo] trait to instruct the client and driver how to transmit
149 /// data through the Io.
150 pub async fn connect_io<Io>(self, io: Io) -> Result<(Client, Driver), Error>
151 where
152 Io: AsyncIo + Send + 'static,
153 {
154 let mut cfg = self.cfg?;
155 driver::connect_io(io, &mut cfg).await
156 }
157
158 #[cfg(feature = "quic")]
159 pub async fn connect_quic(self) -> Result<(Client, Driver), Error> {
160 use config::Host;
161
162 let mut cfg = self.cfg?;
163 cfg.host = cfg
164 .host
165 .into_iter()
166 .map(|host| match host {
167 Host::Tcp(host) => Host::Quic(host),
168 host => host,
169 })
170 .collect();
171 driver::connect(&mut cfg).await
172 }
173}
174
175type BoxedFuture<'a, O> = Pin<Box<dyn Future<Output = O> + Send + 'a>>;
176
177fn _assert_send<F: Send>(_: F) {}
178fn _assert_send2<F: Send>() {}
179
180fn _assert_connect_send() {
181 _assert_send(Postgres::new("postgres://postgres:postgres@localhost/postgres").connect());
182}
183
184fn _assert_driver_send() {
185 _assert_send2::<Driver>();
186}
187
188// temporary disabled test due to cargo workspace test bug.
189// #[cfg(feature = "quic")]
190// #[cfg(test)]
191// mod test {
192// use std::future::IntoFuture;
193
194// use quinn::{rustls::pki_types::PrivatePkcs8KeyDer, ServerConfig};
195
196// use crate::{proxy::Proxy, AsyncLendingIterator, Config, Postgres, QuicStream};
197
198// #[tokio::test]
199// async fn proxy() {
200// let name = vec!["127.0.0.1".to_string(), "localhost".to_string()];
201// let cert = rcgen::generate_simple_self_signed(name).unwrap();
202
203// let key = PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()).into();
204// let cert = cert.cert.der().clone();
205
206// let mut cfg = xitca_tls::rustls::ServerConfig::builder()
207// .with_no_client_auth()
208// .with_single_cert(vec![cert], key)
209// .unwrap();
210
211// cfg.alpn_protocols = vec![crate::driver::quic::QUIC_ALPN.to_vec()];
212
213// let cfg = quinn::crypto::rustls::QuicServerConfig::try_from(cfg).unwrap();
214
215// let config = ServerConfig::with_crypto(std::sync::Arc::new(cfg));
216
217// let upstream = tokio::net::lookup_host("localhost:5432").await.unwrap().next().unwrap();
218
219// tokio::spawn(
220// Proxy::with_config(config)
221// .upstream_addr(upstream)
222// .listen_addr("127.0.0.1:5432".parse().unwrap())
223// .run(),
224// );
225
226// let mut cfg = Config::new();
227
228// cfg.dbname("postgres").user("postgres").password("postgres");
229
230// let conn = crate::driver::quic::_connect_quic("127.0.0.1", &[5432]).await.unwrap();
231
232// let stream = conn.open_bi().await.unwrap();
233
234// let (cli, task) = Postgres::new(cfg).connect_io(QuicStream::from(stream)).await.unwrap();
235
236// let handle = tokio::spawn(task.into_future());
237
238// let _ = cli.query_simple("").await.unwrap().try_next().await;
239
240// drop(cli);
241
242// handle.await.unwrap();
243// }
244// }
245
246#[cfg(test)]
247mod test {
248 use super::*;
249
250 #[tokio::test]
251 async fn config_error() {
252 let mut cfg = Config::new();
253
254 cfg.dbname("postgres").user("postgres").password("postgres");
255
256 let mut cfg1 = cfg.clone();
257 cfg1.host("localhost");
258 Postgres::new(cfg1).connect().await.err().unwrap();
259
260 cfg.port(5432);
261 Postgres::new(cfg).connect().await.err().unwrap();
262 }
263}