xitca_postgres/
lib.rs

1#![doc = include_str!("../README.md")]
2#![forbid(unsafe_code)]
3#![cfg_attr(feature = "io-uring", feature(async_iterator, gen_blocks))]
4#![cfg_attr(feature = "nightly", feature(impl_trait_in_assoc_type))]
5
6mod cancel;
7mod client;
8mod column;
9mod config;
10mod driver;
11mod execute;
12mod from_sql;
13mod prepare;
14mod query;
15mod session;
16
17pub mod copy;
18pub mod error;
19pub mod iter;
20pub mod pool;
21pub mod row;
22pub mod statement;
23pub mod transaction;
24pub mod types;
25
26#[cfg(feature = "quic")]
27pub mod proxy;
28#[cfg(feature = "quic")]
29pub use driver::quic::QuicStream;
30
31pub use self::{
32    client::Client,
33    column::Column,
34    config::Config,
35    driver::Driver,
36    error::Error,
37    execute::{Execute, ExecuteBlocking},
38    from_sql::FromSqlExt,
39    query::{RowSimpleStream, RowSimpleStreamOwned, RowStream, RowStreamOwned},
40    session::Session,
41    statement::Statement,
42};
43
#[cfg(feature = "compat")]
pub mod compat {
    //! compatibility mod to work with the [`futures::Stream`] trait.
    //!
    //! by default this crate uses an async lending iterator to enable zero copy database row handling when possible.
    //! this approach can be difficult to hook into existing libraries and crates. in case a traditional async
    //! iterator is needed this module offers types as adapters.
    //!
    //! # Examples
    //! ```
    //! # use xitca_postgres::{Client, Error, Execute, Statement};
    //! # async fn convert(client: Client) -> Result<(), Error> {
    //! // prepare a statement and query for rows.
    //! let stmt = Statement::named("SELECT * from users", &[]).execute(&client).await?;
    //! let mut stream = stmt.query(&client).await?;
    //!
    //! // assume we want to spawn a tokio async task and handle the stream inside it.
    //! // the code below would not work, because the stream is a borrowed type that
    //! // implements a lending iterator.
    //! // tokio::spawn(async move {
    //! //    let stream = stream;
    //! // })
    //!
    //! // in order to remove the lifetime constraint we can do the following:
    //!
    //! // convert the borrowed stream to an owned stream where lifetime constraints are lifted.
    //! let mut stream = xitca_postgres::RowStreamOwned::from(stream);
    //!
    //! // spawn async task and move stream into it.
    //! tokio::spawn(async move {
    //!     // use async iterator to handle rows.
    //!     use futures::stream::TryStreamExt;
    //!     while let Some(row) = stream.try_next().await? {
    //!         // handle row
    //!     }
    //!     Ok::<_, Error>(())
    //! });
    //! # Ok(())
    //! # }
    //! ```
    //!
    //! [`futures::Stream`]: futures_core::stream::Stream
}
86
87pub mod dev {
88    //! traits for extending functionalities through external crate
89
90    pub use crate::client::ClientBorrowMut;
91    pub use crate::copy::r#Copy;
92    pub use crate::driver::codec::{Response, encode::Encode, response::IntoResponse};
93    pub use crate::prepare::Prepare;
94    pub use crate::query::Query;
95}
96
97use core::{future::Future, pin::Pin, sync::atomic::AtomicUsize};
98
99use xitca_io::io::AsyncIo;
100
/// Process-wide counter, starting at zero.
///
/// NOTE(review): presumably used by other modules of this crate to hand out unique
/// ids — the consumers are not visible from this file, confirm before relying on it.
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
102
103#[derive(Debug)]
104pub struct Postgres {
105    cfg: Result<Config, Error>,
106}
107
108impl Postgres {
109    pub fn new<C>(cfg: C) -> Self
110    where
111        Config: TryFrom<C>,
112        Error: From<<Config as TryFrom<C>>::Error>,
113    {
114        Self {
115            cfg: Config::try_from(cfg).map_err(Into::into),
116        }
117    }
118}
119
120impl Postgres {
121    /// Connect to database, returning [Client] and [Driver] on success
122    ///
123    /// # Examples:
124    /// ```rust
125    /// use std::future::IntoFuture;
126    /// use xitca_postgres::{Execute, Postgres};
127    ///
128    /// # async fn connect() {
129    /// let cfg = String::from("postgres://user:pass@localhost/db");
130    /// let (client, driver) = Postgres::new(cfg).connect().await.unwrap();
131    ///
132    /// // spawn driver as async task.
133    /// tokio::spawn(driver.into_future());
134    ///
135    /// // use client for query.
136    /// "SELECT 1".execute(&client).await.unwrap();
137    /// # }
138    ///
139    /// ```
140    pub async fn connect(self) -> Result<(Client, Driver), Error> {
141        let mut cfg = self.cfg?;
142        driver::connect(&mut cfg).await
143    }
144
145    /// Connect to database with an already established Io type.
146    /// Io type must impl [AsyncIo] trait to instruct the client and driver how to transmit
147    /// data through the Io.
148    pub async fn connect_io<Io>(self, io: Io) -> Result<(Client, Driver), Error>
149    where
150        Io: AsyncIo + Send + 'static,
151    {
152        let mut cfg = self.cfg?;
153        driver::connect_io(io, &mut cfg).await
154    }
155
156    #[cfg(feature = "quic")]
157    pub async fn connect_quic(self) -> Result<(Client, Driver), Error> {
158        use config::Host;
159
160        let mut cfg = self.cfg?;
161        cfg.host = cfg
162            .host
163            .into_iter()
164            .map(|host| match host {
165                Host::Tcp(host) => Host::Quic(host),
166                host => host,
167            })
168            .collect();
169        driver::connect(&mut cfg).await
170    }
171}
172
/// Shorthand for a pinned, heap allocated, type erased future that is `Send` and may
/// borrow data for the lifetime `'a`, resolving to `O`.
type BoxedFuture<'a, O> = Pin<Box<dyn Future<Output = O> + Send + 'a>>;
174
/// Compile-time check helper: only accepts a value whose type is `Send`.
/// The value itself is ignored.
fn _assert_send<F: Send>(_: F) {}

/// Compile-time check helper: only instantiates for a type that is `Send`.
/// Used when no value of the type is at hand.
fn _assert_send2<F: Send>() {}
177
178fn _assert_connect_send() {
179    _assert_send(Postgres::new("postgres://postgres:postgres@localhost/postgres").connect());
180}
181
182fn _assert_driver_send() {
183    _assert_send2::<Driver>();
184}
185
// temporarily disabled test due to cargo workspace test bug.
187// #[cfg(feature = "quic")]
188// #[cfg(test)]
189// mod test {
190//     use std::future::IntoFuture;
191
192//     use quinn::{rustls::pki_types::PrivatePkcs8KeyDer, ServerConfig};
193
194//     use crate::{proxy::Proxy, AsyncLendingIterator, Config, Postgres, QuicStream};
195
196//     #[tokio::test]
197//     async fn proxy() {
198//         let name = vec!["127.0.0.1".to_string(), "localhost".to_string()];
199//         let cert = rcgen::generate_simple_self_signed(name).unwrap();
200
201//         let key = PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()).into();
202//         let cert = cert.cert.der().clone();
203
204//         let mut cfg = xitca_tls::rustls::ServerConfig::builder()
205//             .with_no_client_auth()
206//             .with_single_cert(vec![cert], key)
207//             .unwrap();
208
209//         cfg.alpn_protocols = vec![crate::driver::quic::QUIC_ALPN.to_vec()];
210
211//         let cfg = quinn::crypto::rustls::QuicServerConfig::try_from(cfg).unwrap();
212
213//         let config = ServerConfig::with_crypto(std::sync::Arc::new(cfg));
214
215//         let upstream = tokio::net::lookup_host("localhost:5432").await.unwrap().next().unwrap();
216
217//         tokio::spawn(
218//             Proxy::with_config(config)
219//                 .upstream_addr(upstream)
220//                 .listen_addr("127.0.0.1:5432".parse().unwrap())
221//                 .run(),
222//         );
223
224//         let mut cfg = Config::new();
225
226//         cfg.dbname("postgres").user("postgres").password("postgres");
227
228//         let conn = crate::driver::quic::_connect_quic("127.0.0.1", &[5432]).await.unwrap();
229
230//         let stream = conn.open_bi().await.unwrap();
231
232//         let (cli, task) = Postgres::new(cfg).connect_io(QuicStream::from(stream)).await.unwrap();
233
234//         let handle = tokio::spawn(task.into_future());
235
236//         let _ = cli.query_simple("").await.unwrap().try_next().await;
237
238//         drop(cli);
239
240//         handle.await.unwrap();
241//     }
242// }
243
#[cfg(test)]
mod test {
    use super::*;

    /// Connecting must fail for both configs built below.
    ///
    /// NOTE(review): presumably the failure comes from the config being incomplete (host
    /// without port, then port without host) rather than from the network — confirm against
    /// the config validation in the driver.
    #[tokio::test]
    async fn config_error() {
        let mut cfg = Config::new();

        cfg.dbname("postgres").user("postgres").password("postgres");

        // host is set, port is not.
        let mut cfg1 = cfg.clone();
        cfg1.host("localhost");
        assert!(Postgres::new(cfg1).connect().await.is_err());

        // port is set, host is not.
        cfg.port(5432);
        assert!(Postgres::new(cfg).connect().await.is_err());
    }
}