postgrpc/pools/
mod.rs

1use futures_util::TryStream;
2use std::fmt;
3use tonic::{async_trait, Status};
4
5#[cfg_attr(doc, doc(cfg(feature = "deadpool")))] // docs builds only: mark the module as gated on "deadpool"
6#[cfg(any(doc, feature = "deadpool"))] // compiled when building docs or when "deadpool" is enabled
7pub mod deadpool;
8mod protocol; // private module, not gated on any feature
9#[cfg_attr(doc, doc(cfg(feature = "transaction")))] // docs builds only: mark the module as gated on "transaction"
10#[cfg(any(doc, feature = "transaction"))] // compiled when building docs or when "transaction" is enabled
11pub mod transaction;
12
13/// Newtype wrapper around dynamically-typed, JSON-compatible [`pbjson_types::Value`]s.
14///
15/// This implements [`tokio_postgres::types::ToSql`] as a means of interfacing between the loose types of
16/// JSON and the richer type system encoded in the Postgres protocol. The wrapped value is a private field.
17#[derive(Debug, Clone)]
18pub struct Parameter(pbjson_types::Value);
19
20/// gRPC-compatible connection behavior across database connection types. All inputs and outputs
21/// are based on protobuf's JSON-compatible well-known-types [`pbjson_types::Struct`] and
22/// [`pbjson_types::Value`].
23///
24/// #### Example:
25///
26/// ```rust
27/// use postgrpc::pools::{Connection, Parameter};
28/// use tonic::{async_trait, Status};
29///
30/// // implementing a real StructStream (a fallible stream of Structs)
31/// // is an exercise left to the reader
32/// struct StructStream;
33///
34/// impl From<Vec<tokio_postgres::Row>> for StructStream {
35///   // also an exercise for the reader
36/// }
37///
38/// #[async_trait]
39/// impl Connection for tokio_postgres::Client {
40///   type Error = Status;
41///   type RowStream = StructStream;
42///
43///   async fn query(
44///     &self,
45///     statement: &str,
46///     parameters: &[Parameter],
47///   ) -> Result<Self::RowStream, Self::Error> {
48///     // Parameter implements ToSql, so can be used directly in query()
49///     let rows: Vec<_> = self
50///       .query(statement, parameters)
51///       .await
52///       .map_err(|error| Status::invalid_argument(error.to_string()))?;
53///
54///     // it's best to stream rows instead of collecting them into a Vec
55///     // but this is only meant to be a quick example
56///     Ok(StructStream::from(rows))
57///   }
58///
59///   #[tracing::instrument(skip(self))]
60///   async fn batch(&self, query: &str) -> Result<(), Self::Error> {
61///     tracing::trace!("Executing batch query on Connection");
62///     self.batch_execute(query).await.map_err(|error| Status::invalid_argument(error.to_string()))
63///   }
64/// }
65/// ```
66
67#[async_trait]
68pub trait Connection: Send + Sync {
69    /// A fallible stream of rows returned from the database as protobuf structs
70    type RowStream: TryStream<Ok = pbjson_types::Struct, Error = Self::Error> + Send + Sync;
71
72    /// Error type on the connection encompassing top-level errors (i.e. "bad connection") and
73    /// errors within a RowStream
74    type Error: std::error::Error + Into<Status> + Send + Sync;
75
76    /// Run a statement with the given slice of [`Parameter`] values, returning a RowStream on success
77    async fn query(
78        &self,
79        statement: &str,
80        parameters: &[Parameter],
81    ) -> Result<Self::RowStream, Self::Error>;
82
83    /// Run a set of SQL statements using the simple query protocol
84    async fn batch(&self, query: &str) -> Result<(), Self::Error>;
85}
86
87/// Connection pool behavior that can be customized across async pool implementations.
88///
89/// The key difference between a [`Pool`] and most other connection pools is the way new
90/// connections are accessed: by building connection logic around a `Key` that can be derived from
91/// a [`tonic::Request`], all connection isolation and preparation can be handled internally to the
92/// pool. Furthermore, pools don't _have_ to be traditional pools, but can hand out shared access
93/// to a single [`Connection`].
94///
95/// #### Example:
96///
97/// ```
98/// use postgrpc::pools::{Pool, Connection};
99/// use std::{collections::BTreeMap, sync::Arc};
100/// use tokio::sync::RwLock;
101/// use uuid::Uuid;
102/// use tonic::Status;
103///
104/// // a simple connection wrapper
105/// // (implementing postgrpc::pools::Connection is an exercise for the reader)
106/// #[derive(Clone)]
107/// struct MyConnection(Arc<tokio_postgres::Client>);
108///
109/// // a toy pool wrapping a collection of tokio_postgres::Clients
110/// // accessible by unique IDs that are provided by the caller
111/// struct MyPool {
112///     connections: RwLock<BTreeMap<Uuid, MyConnection>>,
113///     config: tokio_postgres::config::Config
114/// }
115///
116/// #[postgrpc::async_trait]
117/// impl Pool for MyPool {
118///     type Key = Uuid;
119///     type Connection = MyConnection;
120///     type Error = Status;
121///
122///     async fn get_connection(&self, key: Self::Key) -> Result<Self::Connection, Self::Error> {
123///         // get a connection from the pool or store one for later
124///         let connections = self.connections.read().await;
125///
126///         match connections.get(&key) {
127///             Some(connection) => Ok(connection.clone()),
128///             None => {
129///                 // drop the previous lock on the connections
130///                 drop(connections);
131///
132///                 // connect to the database using the configuration
133///                 let (client, connection) = self.config
134///                     .connect(tokio_postgres::NoTls)
135///                     .await
136///                     .map_err(|error| Status::internal(error.to_string()))?;
137///
138///                 // spawn the raw connection off onto an executor
139///                 tokio::spawn(async move {
140///                     if let Err(error) = connection.await {
141///                         eprintln!("connection error: {}", error);
142///                     }
143///                 });
144///
145///                 // store a reference to the connection for later
146///                 let connection = MyConnection(Arc::new(client));
147///                 self.connections.write().await.insert(key, connection.clone());
148///
149///                 // return another reference to the connection for use
150///                 Ok(connection)
151///             }
152///         }
153///     }
154/// }
155/// ```
156#[async_trait]
157pub trait Pool: Send + Sync {
158    /// The key by which connections are selected from the Pool, allowing for custom
159    /// connection-fetching logic in Pool implementations
160    type Key: fmt::Debug + Send + Sync;
161
162    /// The underlying connection type returned from the Pool
163    type Connection: Connection;
164
165    /// Errors from fetching Connections; convertible from the Connection's Error and into a [`Status`]
166    type Error: std::error::Error
167        + From<<Self::Connection as Connection>::Error>
168        + Into<Status>
169        + Send
170        + Sync;
171
172    /// Get a single connection from the pool using some key
173    async fn get_connection(&self, key: Self::Key) -> Result<Self::Connection, Self::Error>;
174}