xitca_postgres/lib.rs
1#![doc = include_str!("../README.md")]
2#![forbid(unsafe_code)]
3
4mod cancel;
5mod client;
6mod column;
7mod config;
8mod driver;
9mod execute;
10mod from_sql;
11mod prepare;
12mod query;
13mod session;
14
15pub mod copy;
16pub mod error;
17pub mod iter;
18pub mod pipeline;
19pub mod pool;
20pub mod row;
21pub mod statement;
22pub mod transaction;
23pub mod types;
24
25#[cfg(feature = "quic")]
26pub mod proxy;
27#[cfg(feature = "quic")]
28pub use driver::quic::QuicStream;
29
30pub use self::{
31 client::Client,
32 config::Config,
33 driver::Driver,
34 error::Error,
35 execute::{Execute, ExecuteBlocking, ExecuteMut},
36 from_sql::FromSqlExt,
37 query::{RowSimpleStream, RowSimpleStreamOwned, RowStream, RowStreamOwned},
38 session::Session,
39 statement::Statement,
40};
41
42#[cfg(feature = "compat")]
43pub mod compat {
44 //! compatibility mod to work with [`futures::Stream`] trait.
45 //!
46 //! by default this crate uses an async lending iterator to enable zero copy database row handling when possible.
47 //! but this approach can be difficult to hook into existing libraries and crates. In case a traditional async iterator
48 //! is needed this module offers types as adapters.
49 //!
50 //! # Examples
51 //! ```
52 //! # use xitca_postgres::{Client, Error, Execute, Statement};
53 //! # async fn convert(client: Client) -> Result<(), Error> {
54 //! // prepare a statement and query for rows.
55 //! let stmt = Statement::named("SELECT * from users", &[]).execute(&client).await?;
56 //! let mut stream = stmt.query(&client).await?;
57 //!
58 //! // assuming we want to spawn a tokio async task and handle the stream inside it.
59 //! // but code below would not work as stream is a borrowed type with lending iterator implements.
60 //! // tokio::spawn(async move {
61 //! // let stream = stream;
62 //! // })
63 //!
64 //! // in order to remove lifetime constraint we can do following:
65 //!
66 //! // convert borrowed stream to owned stream where lifetime constraints are lifted.
67 //! let mut stream = xitca_postgres::RowStreamOwned::from(stream);
68 //!
69 //! // spawn async task and move stream into it.
70 //! tokio::spawn(async move {
71 //! // use async iterator to handle rows.
72 //! use futures::stream::TryStreamExt;
73 //! while let Some(row) = stream.try_next().await? {
74 //! // handle row
75 //! }
76 //! Ok::<_, Error>(())
77 //! });
78 //! # Ok(())
79 //! # }
80 //! ```
81 //!
82 //! [`futures::Stream`]: futures_core::stream::Stream
83
84 pub use crate::statement::compat::StatementGuarded;
85}
86
87pub mod dev {
88 //! traits for extending functionalities through external crate
89
90 pub use crate::client::ClientBorrowMut;
91 pub use crate::copy::r#Copy;
92 pub use crate::driver::codec::{encode::Encode, response::IntoResponse, Response};
93 pub use crate::prepare::Prepare;
94 pub use crate::query::Query;
95}
96
97use core::{future::Future, pin::Pin, sync::atomic::AtomicUsize};
98
99use xitca_io::io::AsyncIo;
100
101static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
102
103#[derive(Debug)]
104pub struct Postgres {
105 cfg: Result<Config, Error>,
106}
107
108impl Postgres {
109 pub fn new<C>(cfg: C) -> Self
110 where
111 Config: TryFrom<C>,
112 Error: From<<Config as TryFrom<C>>::Error>,
113 {
114 Self {
115 cfg: Config::try_from(cfg).map_err(Into::into),
116 }
117 }
118}
119
120impl Postgres {
121 /// Connect to database, returning [Client] and [Driver] on success
122 ///
123 /// # Examples:
124 /// ```rust
125 /// use std::future::IntoFuture;
126 /// use xitca_postgres::{Execute, Postgres};
127 ///
128 /// # async fn connect() {
129 /// let cfg = String::from("postgres://user:pass@localhost/db");
130 /// let (client, driver) = Postgres::new(cfg).connect().await.unwrap();
131 ///
132 /// // spawn driver as async task.
133 /// tokio::spawn(driver.into_future());
134 ///
135 /// // use client for query.
136 /// "SELECT 1".execute(&client).await.unwrap();
137 /// # }
138 ///
139 /// ```
140 pub async fn connect(self) -> Result<(Client, Driver), Error> {
141 let mut cfg = self.cfg?;
142 driver::connect(&mut cfg).await
143 }
144
145 /// Connect to database with an already established Io type.
146 /// Io type must impl [AsyncIo] trait to instruct the client and driver how to transmit
147 /// data through the Io.
148 pub async fn connect_io<Io>(self, io: Io) -> Result<(Client, Driver), Error>
149 where
150 Io: AsyncIo + Send + 'static,
151 {
152 let mut cfg = self.cfg?;
153 driver::connect_io(io, &mut cfg).await
154 }
155
156 #[cfg(feature = "quic")]
157 pub async fn connect_quic(self) -> Result<(Client, Driver), Error> {
158 use config::Host;
159
160 let mut cfg = self.cfg?;
161 cfg.host = cfg
162 .host
163 .into_iter()
164 .map(|host| match host {
165 Host::Tcp(host) => Host::Quic(host),
166 host => host,
167 })
168 .collect();
169 driver::connect(&mut cfg).await
170 }
171}
172
173type BoxedFuture<'a, O> = Pin<Box<dyn Future<Output = O> + Send + 'a>>;
174
175fn _assert_send<F: Send>(_: F) {}
176fn _assert_send2<F: Send>() {}
177
178fn _assert_connect_send() {
179 _assert_send(Postgres::new("postgres://postgres:postgres@localhost/postgres").connect());
180}
181
182fn _assert_driver_send() {
183 _assert_send2::<Driver>();
184}
185
186// temporary disabled test due to cargo workspace test bug.
187// #[cfg(feature = "quic")]
188// #[cfg(test)]
189// mod test {
190// use std::future::IntoFuture;
191
192// use quinn::{rustls::pki_types::PrivatePkcs8KeyDer, ServerConfig};
193
194// use crate::{proxy::Proxy, AsyncLendingIterator, Config, Postgres, QuicStream};
195
196// #[tokio::test]
197// async fn proxy() {
198// let name = vec!["127.0.0.1".to_string(), "localhost".to_string()];
199// let cert = rcgen::generate_simple_self_signed(name).unwrap();
200
201// let key = PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()).into();
202// let cert = cert.cert.der().clone();
203
204// let mut cfg = xitca_tls::rustls::ServerConfig::builder()
205// .with_no_client_auth()
206// .with_single_cert(vec![cert], key)
207// .unwrap();
208
209// cfg.alpn_protocols = vec![crate::driver::quic::QUIC_ALPN.to_vec()];
210
211// let cfg = quinn::crypto::rustls::QuicServerConfig::try_from(cfg).unwrap();
212
213// let config = ServerConfig::with_crypto(std::sync::Arc::new(cfg));
214
215// let upstream = tokio::net::lookup_host("localhost:5432").await.unwrap().next().unwrap();
216
217// tokio::spawn(
218// Proxy::with_config(config)
219// .upstream_addr(upstream)
220// .listen_addr("127.0.0.1:5432".parse().unwrap())
221// .run(),
222// );
223
224// let mut cfg = Config::new();
225
226// cfg.dbname("postgres").user("postgres").password("postgres");
227
228// let conn = crate::driver::quic::_connect_quic("127.0.0.1", &[5432]).await.unwrap();
229
230// let stream = conn.open_bi().await.unwrap();
231
232// let (cli, task) = Postgres::new(cfg).connect_io(QuicStream::from(stream)).await.unwrap();
233
234// let handle = tokio::spawn(task.into_future());
235
236// let _ = cli.query_simple("").await.unwrap().try_next().await;
237
238// drop(cli);
239
240// handle.await.unwrap();
241// }
242// }
243
244#[cfg(test)]
245mod test {
246 use super::*;
247
248 #[tokio::test]
249 async fn config_error() {
250 let mut cfg = Config::new();
251
252 cfg.dbname("postgres").user("postgres").password("postgres");
253
254 let mut cfg1 = cfg.clone();
255 cfg1.host("localhost");
256 Postgres::new(cfg1).connect().await.err().unwrap();
257
258 cfg.port(5432);
259 Postgres::new(cfg).connect().await.err().unwrap();
260 }
261}