postgrpc/pools/mod.rs
1use futures_util::TryStream;
2use std::fmt;
3use tonic::{async_trait, Status};
4
5#[cfg_attr(doc, doc(cfg(feature = "deadpool")))]
6#[cfg(any(doc, feature = "deadpool"))]
7pub mod deadpool;
8mod protocol;
9#[cfg_attr(doc, doc(cfg(feature = "transaction")))]
10#[cfg(any(doc, feature = "transaction"))]
11pub mod transaction;
12
13/// Newtype wrapper around dynamically-typed, JSON-compatible [`pbjson_types::Value`]s.
14///
15/// This implements [`tokio_postgres::types::ToSql`] as a means of interfacing between the loose types of
16/// JSON and the richer type system encoded in the Postgres protocol.
17#[derive(Debug, Clone)]
18pub struct Parameter(pbjson_types::Value);
19
20/// gRPC-compatible connection behavior across database connection types. All inputs and outputs
21/// are based on protobuf's JSON-compatible well-known-types [`pbjson_types::Struct`] and
22/// [`pbjson_types::Value`].
23///
24/// #### Example:
25///
26/// ```rust
27/// use postgrpc::pools::{Connection, Parameter};
28/// use tonic::{Row, Status};
29///
30/// // implementing a real StructStream (a fallible stream of Structs)
31/// // is an exercise left to the reader
32/// struct StructStream;
33///
34/// impl From<Vec<Row>> for StructStream {
35/// // also an exercise for the reader
36/// }
37///
38/// #[async_trait]
39/// impl Connection for tokio_postgres::Client {
40/// type Error = Status;
41/// type RowStream = StructStream;
42///
43/// async fn query(
44/// &self,
45/// statement: &str,
46/// parameters: &[Parameter],
47/// ) -> Result<Self::RowStream, Self::Error> {
48/// // Parameter implements ToSql, so can be used directly in query()
49/// let rows: Vec<_> = self
50/// .query(statement, parameters)
51/// .await
52/// .map_error(|error| Status::invalid_argument(error.to_string()))?;
53///
54/// // it's best to stream rows instead of collecting them into a Vec
55/// // but this is only meant to be a quick example
56/// Ok(StructStream::from(rows))
57/// }
58///
59/// #[tracing::instrument(skip(self))]
60/// async fn batch(&self, query: &str) -> Result<(), Self::Error> {
61/// tracing::trace!("Executing batch query on Connection");
62/// self.batch_execute(query).await.map_err(|error| Status::invalid_argument(error.to_string()))
63/// }
64/// }
65/// ```
66
67#[async_trait]
68pub trait Connection: Send + Sync {
69 /// A fallible stream of rows returned from the database as protobuf structs
70 type RowStream: TryStream<Ok = pbjson_types::Struct, Error = Self::Error> + Send + Sync;
71
72 /// Error type on the connection encompassing top-level errors (i.e. "bad connection") and
73 /// errors within a RowStream
74 type Error: std::error::Error + Into<Status> + Send + Sync;
75
76 /// Run a query parameterized by the Connection's associated Parameter, returning a RowStream
77 async fn query(
78 &self,
79 statement: &str,
80 parameters: &[Parameter],
81 ) -> Result<Self::RowStream, Self::Error>;
82
83 /// Run a set of SQL statements using the simple query protocol
84 async fn batch(&self, query: &str) -> Result<(), Self::Error>;
85}
86
87/// Connection pool behavior that can be customized across async pool implementations.
88///
89/// The key difference between a [`Pool`] and most other connection pools is the way new
90/// connections are accessed: by building connection logic around a `Key` that can be derived from
91/// a [`tonic::Request`], all connection isolation and preparation can be handled internally to the
92/// pool. Furthermore, pools don't _have_ to be traditional pools, but can hand out shared access
93/// to a single [`Connection`].
94///
95/// #### Example:
96///
97/// ```
98/// use postgrpc::pools::{Pool, Connection};
99/// use std::collections::BTreeMap;
100/// use tokio::sync::RwLock;
101/// use uuid::Uuid;
102/// use tonic::Status;
103///
104/// // a simple connection wrapper
105/// // (implementing postgrpc::Connection is an exercise for the reader)
106/// #[derive(Clone)]
107/// struct MyConnection(Arc<tokio_postgres::Client>);
108///
109/// // a toy pool wrapping a collection of tokio_postgres::Clients
110/// // accessible by unique IDs that are provided by the caller
111/// struct MyPool {
112/// connections: RwLock<BTreeMap<Uuid, MyConnection>>,
113/// config: tokio_postgres::config::Config
114/// }
115///
116/// #[postgrpc::async_trait]
117/// impl Pool for MyPool {
118/// type Key: Uuid;
119/// type Connection: MyConnection;
120/// type Error: Status;
121///
122/// async fn get_connection(&self, key: Self::Key) -> Result<Self::Connection, Self::Error> {
123/// // get a connection from the pool or store one for later
124/// let connections = self.connections.read().await;
125///
126/// match connections.get(&key) {
127/// Some(connection) => Ok(Arc::clone(connection.0)),
128/// None => {
129/// // drop the previous lock on the connections
130/// drop(connections);
131///
132/// // connect to the database using the configuration
133/// let (client, connection) = self
134/// .config
135/// .connect(tokio_postgres::NoTls)
136/// .map_error(|error| Status::internal(error.to_string()))?;
137///
138/// // spawn the raw connection off onto an executor
139/// tokio::spawn(async move {
140/// if let Err(error) = connection.await {
141/// eprintln!("connection error: {}", error);
142/// }
143/// });
144///
145/// // store a reference to the connection for later
146/// let connection = MyConnection(Arc::new(client));
147/// self.connections.write().await.insert(key, connection);
148///
149/// // return another reference to the connection for use
150/// Ok(connection)
151/// }
152/// }
153/// }
154/// }
155/// ```
156#[async_trait]
157pub trait Pool: Send + Sync {
158 /// The key by which connections are selected from the Pool, allowing for custom
159 /// connection-fetching logic in Pool implementations
160 type Key: fmt::Debug + Send + Sync;
161
162 /// The underlying connection type returned from the Pool
163 type Connection: Connection;
164
165 /// Errors related to fetching Connections from the Pool
166 type Error: std::error::Error
167 + From<<Self::Connection as Connection>::Error>
168 + Into<Status>
169 + Send
170 + Sync;
171
172 /// Get a single connection from the pool using some key
173 async fn get_connection(&self, key: Self::Key) -> Result<Self::Connection, Self::Error>;
174}