xitca_postgres/
lib.rs

1#![doc = include_str!("../README.md")]
2#![forbid(unsafe_code)]
3
4mod cancel;
5mod client;
6mod column;
7mod config;
8mod driver;
9mod execute;
10mod from_sql;
11mod prepare;
12mod query;
13mod session;
14
15pub mod copy;
16pub mod error;
17pub mod iter;
18pub mod pipeline;
19pub mod pool;
20pub mod row;
21pub mod statement;
22pub mod transaction;
23pub mod types;
24
25#[cfg(feature = "quic")]
26pub mod proxy;
27#[cfg(feature = "quic")]
28pub use driver::quic::QuicStream;
29
30pub use self::{
31    client::Client,
32    config::Config,
33    driver::Driver,
34    error::Error,
35    execute::{Execute, ExecuteBlocking, ExecuteMut},
36    from_sql::FromSqlExt,
37    query::{RowSimpleStream, RowSimpleStreamOwned, RowStream, RowStreamOwned},
38    session::Session,
39    statement::Statement,
40};
41
#[cfg(feature = "compat")]
pub mod compat {
    //! Compatibility adapters for working with the [`futures::Stream`] trait.
    //!
    //! By default this crate uses an async lending iterator so that database rows can be handled with zero copy
    //! when possible. That approach can be difficult to hook into existing libraries and crates, so this module
    //! offers adapter types for the cases where a traditional async iterator is needed.
    //!
    //! # Examples
    //! ```
    //! # use xitca_postgres::{Client, Error, Execute, Statement};
    //! # async fn convert(client: Client) -> Result<(), Error> {
    //! // prepare a statement and query for rows.
    //! let stmt = Statement::named("SELECT * from users", &[]).execute(&client).await?;
    //! let mut stream = stmt.query(&client).await?;
    //!
    //! // suppose we want to spawn a tokio async task and handle the stream inside it.
    //! // the code below would not work, because stream is a borrowed type that implements a lending iterator.
    //! // tokio::spawn(async move {
    //! //    let stream = stream;
    //! // })
    //!
    //! // to remove the lifetime constraint we can do the following:
    //!
    //! // convert the borrowed stream to an owned stream, which lifts the lifetime constraints.
    //! let mut stream = xitca_postgres::RowStreamOwned::from(stream);
    //!
    //! // spawn an async task and move the owned stream into it.
    //! tokio::spawn(async move {
    //!     // use the async iterator api to handle rows.
    //!     use futures::stream::TryStreamExt;
    //!     while let Some(row) = stream.try_next().await? {
    //!         // handle row
    //!     }
    //!     Ok::<_, Error>(())
    //! });
    //! # Ok(())
    //! # }
    //! ```
    //!
    //! [`futures::Stream`]: futures_core::stream::Stream

    pub use crate::statement::compat::StatementGuarded;
}
86
87pub mod dev {
88    //! traits for extending functionalities through external crate
89
90    pub use crate::client::ClientBorrowMut;
91    pub use crate::copy::r#Copy;
92    pub use crate::driver::codec::{encode::Encode, response::IntoResponse, Response};
93    pub use crate::prepare::Prepare;
94    pub use crate::query::Query;
95}
96
97use core::{future::Future, pin::Pin, sync::atomic::AtomicUsize};
98
99use xitca_io::io::AsyncIo;
100
// Crate-wide atomic counter, starting at zero.
// NOTE(review): presumably used elsewhere in the crate to hand out unique ids; no use is visible in this file.
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
102
103#[derive(Debug)]
104pub struct Postgres {
105    cfg: Result<Config, Error>,
106}
107
108impl Postgres {
109    pub fn new<C>(cfg: C) -> Self
110    where
111        Config: TryFrom<C>,
112        Error: From<<Config as TryFrom<C>>::Error>,
113    {
114        Self {
115            cfg: Config::try_from(cfg).map_err(Into::into),
116        }
117    }
118}
119
120impl Postgres {
121    /// Connect to database, returning [Client] and [Driver] on success
122    ///
123    /// # Examples:
124    /// ```rust
125    /// use std::future::IntoFuture;
126    /// use xitca_postgres::{Execute, Postgres};
127    ///
128    /// # async fn connect() {
129    /// let cfg = String::from("postgres://user:pass@localhost/db");
130    /// let (client, driver) = Postgres::new(cfg).connect().await.unwrap();
131    ///
132    /// // spawn driver as async task.
133    /// tokio::spawn(driver.into_future());
134    ///
135    /// // use client for query.
136    /// "SELECT 1".execute(&client).await.unwrap();
137    /// # }
138    ///
139    /// ```
140    pub async fn connect(self) -> Result<(Client, Driver), Error> {
141        let mut cfg = self.cfg?;
142        driver::connect(&mut cfg).await
143    }
144
145    /// Connect to database with an already established Io type.
146    /// Io type must impl [AsyncIo] trait to instruct the client and driver how to transmit
147    /// data through the Io.
148    pub async fn connect_io<Io>(self, io: Io) -> Result<(Client, Driver), Error>
149    where
150        Io: AsyncIo + Send + 'static,
151    {
152        let mut cfg = self.cfg?;
153        driver::connect_io(io, &mut cfg).await
154    }
155
156    #[cfg(feature = "quic")]
157    pub async fn connect_quic(self) -> Result<(Client, Driver), Error> {
158        use config::Host;
159
160        let mut cfg = self.cfg?;
161        cfg.host = cfg
162            .host
163            .into_iter()
164            .map(|host| match host {
165                Host::Tcp(host) => Host::Quic(host),
166                host => host,
167            })
168            .collect();
169        driver::connect(&mut cfg).await
170    }
171}
172
// heap allocated, pinned and type erased future that is `Send` and resolves to `T`.
type BoxedFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
174
// compile time check helper: only accepts values whose type is `Send`. does nothing at runtime.
fn _assert_send<F>(_value: F)
where
    F: Send,
{
}
// compile time check helper: only accepts a type argument that is `Send`. does nothing at runtime.
fn _assert_send2<F>()
where
    F: Send,
{
}
177
178fn _assert_connect_send() {
179    _assert_send(Postgres::new("postgres://postgres:postgres@localhost/postgres").connect());
180}
181
182fn _assert_driver_send() {
183    _assert_send2::<Driver>();
184}
185
// temporarily disabled test due to a cargo workspace test bug.
187// #[cfg(feature = "quic")]
188// #[cfg(test)]
189// mod test {
190//     use std::future::IntoFuture;
191
192//     use quinn::{rustls::pki_types::PrivatePkcs8KeyDer, ServerConfig};
193
194//     use crate::{proxy::Proxy, AsyncLendingIterator, Config, Postgres, QuicStream};
195
196//     #[tokio::test]
197//     async fn proxy() {
198//         let name = vec!["127.0.0.1".to_string(), "localhost".to_string()];
199//         let cert = rcgen::generate_simple_self_signed(name).unwrap();
200
201//         let key = PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()).into();
202//         let cert = cert.cert.der().clone();
203
204//         let mut cfg = xitca_tls::rustls::ServerConfig::builder()
205//             .with_no_client_auth()
206//             .with_single_cert(vec![cert], key)
207//             .unwrap();
208
209//         cfg.alpn_protocols = vec![crate::driver::quic::QUIC_ALPN.to_vec()];
210
211//         let cfg = quinn::crypto::rustls::QuicServerConfig::try_from(cfg).unwrap();
212
213//         let config = ServerConfig::with_crypto(std::sync::Arc::new(cfg));
214
215//         let upstream = tokio::net::lookup_host("localhost:5432").await.unwrap().next().unwrap();
216
217//         tokio::spawn(
218//             Proxy::with_config(config)
219//                 .upstream_addr(upstream)
220//                 .listen_addr("127.0.0.1:5432".parse().unwrap())
221//                 .run(),
222//         );
223
224//         let mut cfg = Config::new();
225
226//         cfg.dbname("postgres").user("postgres").password("postgres");
227
228//         let conn = crate::driver::quic::_connect_quic("127.0.0.1", &[5432]).await.unwrap();
229
230//         let stream = conn.open_bi().await.unwrap();
231
232//         let (cli, task) = Postgres::new(cfg).connect_io(QuicStream::from(stream)).await.unwrap();
233
234//         let handle = tokio::spawn(task.into_future());
235
236//         let _ = cli.query_simple("").await.unwrap().try_next().await;
237
238//         drop(cli);
239
240//         handle.await.unwrap();
241//     }
242// }
243
#[cfg(test)]
mod test {
    use super::*;

    /// Connecting with either of the two configurations built below must return an error.
    #[tokio::test]
    async fn config_error() {
        // settings shared by both configurations.
        let mut base = Config::new();
        base.dbname("postgres").user("postgres").password("postgres");

        // first configuration: shared settings plus a host.
        let mut with_host = base.clone();
        with_host.host("localhost");
        let outcome = Postgres::new(with_host).connect().await;
        outcome.err().unwrap();

        // second configuration: shared settings plus a port.
        base.port(5432);
        let outcome = Postgres::new(base).connect().await;
        outcome.err().unwrap();
    }
}
261}