resolute 0.5.0

Compile-time-checked PostgreSQL queries with a pure-Rust wire protocol driver.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
//! Pool integration: typed client backed by pg-pool's ConnPool.
//!
//! Connections are reused across checkouts — the `AsyncConn` (with its
//! reader/writer tasks) survives checkout/return cycles.

use std::sync::Arc;

use pg_pool::async_wire::AsyncPoolable;
use pg_pool::{ConnPool, ConnPoolConfig, LifecycleHooks, PoolError, PoolGuard};

use crate::encode::SqlParam;
use crate::error::TypedError;
use crate::row::Row;

/// A pool of typed database connections.
///
/// Connections are `AsyncConn` instances that persist across checkouts.
/// Each checkout (see [`ExclusivePool::get`]) returns a [`PooledClient`]
/// that auto-returns the connection to the pool on drop.
///
/// ```no_run
/// # async fn example() -> Result<(), resolute::TypedError> {
/// use resolute::ExclusivePool;
/// let pool = ExclusivePool::connect("127.0.0.1:5432", "user", "pass", "mydb", 10).await?;
/// let client = pool.get().await?;
/// let rows = client.query("SELECT 1::int4 AS n", &[]).await?;
/// # let _ = rows;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct ExclusivePool {
    // Reference-counted handle to the underlying pg-pool `ConnPool`; every
    // method on `ExclusivePool` delegates to it.
    pool: Arc<ConnPool<AsyncPoolable>>,
}

impl ExclusivePool {
    /// Build a typed pool from an explicit configuration and lifecycle hooks.
    ///
    /// # Errors
    ///
    /// Returns `PoolError::Connect(PgWireError)` when the initial
    /// minimum-size set of connections cannot be established.
    pub async fn new(
        config: ConnPoolConfig,
        hooks: LifecycleHooks<AsyncPoolable>,
    ) -> Result<Self, PoolError<pg_wired::PgWireError>> {
        Ok(Self {
            pool: ConnPool::new(config, hooks).await?,
        })
    }

    /// Convenience constructor: starts from `ConnPoolConfig::default()`,
    /// overrides only the address, credentials, database and `max_size`, and
    /// uses default lifecycle hooks.
    ///
    /// # Errors
    ///
    /// Same cases as [`ExclusivePool::new`].
    pub async fn connect(
        addr: &str,
        user: &str,
        password: &str,
        database: &str,
        max_size: usize,
    ) -> Result<Self, PoolError<pg_wired::PgWireError>> {
        // Fields are assigned one by one on top of the defaults so that any
        // setting not listed here keeps its default value.
        let mut cfg = ConnPoolConfig::default();
        cfg.max_size = max_size;
        cfg.addr = addr.to_owned();
        cfg.database = database.to_owned();
        cfg.user = user.to_owned();
        cfg.password = password.to_owned();

        let hooks = LifecycleHooks::default();
        Self::new(cfg, hooks).await
    }

    /// Check out a connection from the pool.
    ///
    /// The returned [`PooledClient`] wraps the checkout guard and hands the
    /// connection back to the pool when it is dropped.
    ///
    /// # Errors
    ///
    /// Returns `TypedError::Pool` wrapping `PoolError::Timeout` if the pool is
    /// at `max_size` and no connection becomes available before the configured
    /// `checkout_timeout`, `PoolError::Connect(PgWireError)` if a new
    /// connection was needed but couldn't be established, or
    /// `PoolError::Draining` / `PoolError::Closed` if the pool is shutting
    /// down.
    pub async fn get(&self) -> Result<PooledClient, TypedError> {
        // NOTE(review): earlier docs said the client implements
        // `Deref<Target = AsyncConn>` and works with the `Executor` trait;
        // neither impl lives in this file — confirm before re-advertising.
        tracing::debug!("pool checkout");
        crate::metrics::record_pool_checkout();

        match self.pool.get().await {
            Ok(guard) => Ok(PooledClient { guard }),
            Err(e) => {
                tracing::warn!(error = %e, "pool checkout failed");
                // NOTE(review): the timeout counter is bumped for every
                // checkout failure, not only `PoolError::Timeout` — confirm
                // this is the intended meaning of the metric.
                crate::metrics::record_pool_timeout();
                Err(TypedError::from(e))
            }
        }
    }

    /// Snapshot of the underlying pool's metrics.
    pub fn metrics(&self) -> pg_pool::PoolMetrics {
        let pool = &self.pool;
        pool.metrics()
    }

    /// Pre-populate the pool to a target number of connections.
    /// Avoids cold-start latency on the first requests.
    ///
    /// ```no_run
    /// # async fn _doctest() -> Result<(), Box<dyn std::error::Error>> {
    /// # use resolute::ExclusivePool;
    /// let pool = ExclusivePool::connect("127.0.0.1:5432", "user", "pass", "db", 10).await?;
    /// pool.warm_up(5).await;  // pre-create 5 connections
    /// # Ok(()) }
    /// ```
    pub async fn warm_up(&self, target: usize) {
        let pool = &self.pool;
        pool.warm_up(target).await;
    }

    /// Drain the pool — all idle connections are closed.
    pub async fn drain(&self) {
        let pool = &self.pool;
        pool.drain().await;
    }
}

/// A typed client checked out from the pool.
///
/// Created by [`ExclusivePool::get`]. Queries go through the pooled
/// `AsyncConn`. When this is dropped, the connection is returned to the
/// pool for reuse.
pub struct PooledClient {
    // Checkout guard from pg-pool; every method reaches the connection
    // through `guard.conn()`.
    guard: PoolGuard<AsyncPoolable>,
}

impl std::fmt::Debug for PooledClient {
    /// Renders as `PooledClient { .. }`; the pool guard is intentionally
    /// left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("PooledClient");
        builder.finish_non_exhaustive()
    }
}

impl PooledClient {
    /// Borrow the underlying `AsyncConn` for direct use.
    pub fn conn(&self) -> &pg_wired::AsyncConn {
        self.guard.conn()
    }

    /// Run a parameterised query on the pooled connection and return its rows.
    ///
    /// # Errors
    ///
    /// Propagates whatever `Client::query_on_conn` reports.
    pub async fn query(&self, sql: &str, params: &[&dyn SqlParam]) -> Result<Vec<Row>, TypedError> {
        let conn = self.guard.conn();
        crate::query::Client::query_on_conn(conn, sql, params).await
    }

    /// Run a parameterised statement on the pooled connection.
    ///
    /// # Errors
    ///
    /// Propagates whatever `Client::execute_on_conn` reports.
    pub async fn execute(&self, sql: &str, params: &[&dyn SqlParam]) -> Result<u64, TypedError> {
        let conn = self.guard.conn();
        crate::query::Client::execute_on_conn(conn, sql, params).await
    }

    /// Send a simple text query.
    ///
    /// Simple-query SQL is opaque to the driver, so the connection is
    /// conservatively flagged state-mutated before the query is sent. This
    /// forces a `DISCARD ALL` on checkout return, which is the safe default
    /// for arbitrary user SQL (could be `SET`, `LISTEN`, `BEGIN`, etc.).
    ///
    /// # Errors
    ///
    /// Propagates whatever `Client::simple_query_on_conn` reports.
    pub async fn simple_query(&self, sql: &str) -> Result<(), TypedError> {
        let conn = self.guard.conn();
        conn.mark_state_mutated();
        crate::query::Client::simple_query_on_conn(conn, sql).await
    }

    /// Bulk-load data via COPY FROM STDIN.
    ///
    /// The connection is flagged state-mutated before the COPY is issued.
    ///
    /// # Errors
    ///
    /// A failure from the connection is converted to `TypedError` and
    /// annotated with `copy_sql`.
    pub async fn copy_in(&self, copy_sql: &str, data: &[u8]) -> Result<u64, TypedError> {
        let conn = self.guard.conn();
        conn.mark_state_mutated();
        match conn.copy_in(copy_sql, data).await {
            Ok(count) => Ok(count),
            Err(e) => Err(TypedError::from(e).with_sql(copy_sql)),
        }
    }

    /// Export data via COPY TO STDOUT.
    ///
    /// The connection is flagged state-mutated before the COPY is issued.
    ///
    /// # Errors
    ///
    /// A failure from the connection is converted to `TypedError` and
    /// annotated with `copy_sql`.
    pub async fn copy_out(&self, copy_sql: &str) -> Result<Vec<u8>, TypedError> {
        let conn = self.guard.conn();
        conn.mark_state_mutated();
        match conn.copy_out(copy_sql).await {
            Ok(bytes) => Ok(bytes),
            Err(e) => Err(TypedError::from(e).with_sql(copy_sql)),
        }
    }

    /// Report whether the underlying connection is alive.
    pub fn is_alive(&self) -> bool {
        let conn = self.guard.conn();
        conn.is_alive()
    }

    /// Get a `CancelToken` that can be used to cancel an in-flight query on
    /// this connection from another task.
    pub fn cancel_token(&self) -> pg_wired::CancelToken {
        let conn = self.guard.conn();
        conn.cancel_token()
    }

    /// Acquire a session-level advisory lock. Blocks until the lock is granted.
    ///
    /// # Errors
    ///
    /// Returns `TypedError::Wire` if the lock query fails.
    pub async fn advisory_lock(&self, key: i64) -> Result<(), TypedError> {
        // A session-level lock outlives the statement that took it and needs
        // DISCARD ALL on return to be released, so flag the connection dirty
        // up front and let the pool reset it.
        self.guard.conn().mark_state_mutated();
        self.execute("SELECT pg_advisory_lock($1)", &[&key])
            .await
            .map(|_| ())
    }

    /// Try to acquire a session-level advisory lock without blocking. Returns
    /// `true` if the lock was acquired, `false` otherwise.
    ///
    /// # Errors
    ///
    /// Returns `TypedError::Wire` if the lock query fails.
    ///
    /// # Panics
    ///
    /// Panics if the query yields no rows (the first row is indexed directly).
    pub async fn try_advisory_lock(&self, key: i64) -> Result<bool, TypedError> {
        // The lock is session-scoped; flag the connection dirty so the pool
        // releases it via DISCARD ALL on return.
        self.guard.conn().mark_state_mutated();
        let rows = self
            .query("SELECT pg_try_advisory_lock($1)", &[&key])
            .await?;
        let first = &rows[0];
        first.get::<bool>(0)
    }

    /// Release a session-level advisory lock previously acquired with
    /// [`advisory_lock`](Self::advisory_lock). Returns `true` if a lock was
    /// released, `false` if no matching lock was held.
    ///
    /// # Errors
    ///
    /// Returns `TypedError::Wire` if the unlock query fails.
    ///
    /// # Panics
    ///
    /// Panics if the query yields no rows (the first row is indexed directly).
    pub async fn advisory_unlock(&self, key: i64) -> Result<bool, TypedError> {
        let rows = self.query("SELECT pg_advisory_unlock($1)", &[&key]).await?;
        let first = &rows[0];
        first.get::<bool>(0)
    }

    /// Acquire a transaction-scoped advisory lock. Released automatically at
    /// transaction end.
    ///
    /// # Errors
    ///
    /// Returns `TypedError::Wire` if the lock query fails.
    pub async fn advisory_xact_lock(&self, key: i64) -> Result<(), TypedError> {
        self.execute("SELECT pg_advisory_xact_lock($1)", &[&key])
            .await
            .map(|_| ())
    }

    /// Try to acquire a transaction-scoped advisory lock without blocking.
    ///
    /// # Errors
    ///
    /// Returns `TypedError::Wire` if the lock query fails.
    ///
    /// # Panics
    ///
    /// Panics if the query yields no rows (the first row is indexed directly).
    pub async fn try_advisory_xact_lock(&self, key: i64) -> Result<bool, TypedError> {
        let rows = self
            .query("SELECT pg_try_advisory_xact_lock($1)", &[&key])
            .await?;
        let first = &rows[0];
        first.get::<bool>(0)
    }

    /// Begin a transaction on the pooled connection.
    ///
    /// The returned `PooledTransaction` borrows from this client, so the
    /// connection is pinned to this checkout for the transaction's lifetime
    /// and released back to the pool when this `PooledClient` is dropped.
    ///
    /// # Errors
    ///
    /// Returns `TypedError::Wire` if the `BEGIN` statement fails or the
    /// connection is broken.
    pub async fn begin(&self) -> Result<PooledTransaction<'_>, TypedError> {
        match self.simple_query("BEGIN").await {
            Ok(()) => Ok(PooledTransaction {
                client: self,
                done: false,
            }),
            Err(e) => Err(e),
        }
    }

    /// Begin a transaction with a specific isolation level on the pooled
    /// connection. See [`crate::IsolationLevel`] for the available levels.
    ///
    /// # Errors
    ///
    /// Same cases as [`PooledClient::begin`].
    pub async fn begin_with(
        &self,
        level: crate::IsolationLevel,
    ) -> Result<PooledTransaction<'_>, TypedError> {
        let begin_sql = format!("BEGIN ISOLATION LEVEL {}", level.as_sql());
        match self.simple_query(&begin_sql).await {
            Ok(()) => Ok(PooledTransaction {
                client: self,
                done: false,
            }),
            Err(e) => Err(e),
        }
    }
}

/// A transaction scoped to a pooled connection checkout.
///
/// Mirrors [`crate::Transaction`] but borrows from a [`PooledClient`],
/// so the pool guard is held for the transaction's lifetime.
///
/// **Drop behavior:** if the transaction is dropped without an explicit
/// `commit()` or `rollback()` (e.g. a `?` propagated an error out of the
/// block), the destructor enqueues a `ROLLBACK` on the connection's writer
/// task via a non-blocking send. The writer processes requests in FIFO
/// order, so the rollback runs before the pool's `DISCARD ALL` reset on
/// return, restoring the session to idle and keeping the connection in
/// the pool. No partial work is committed.
///
/// If queueing the rollback fails (writer queue full or task already gone),
/// the connection is marked broken via [`pg_wired::AsyncConn::mark_broken`]
/// and the pool destroys it on return rather than handing back a session
/// in an open or aborted transaction. Either way, the next caller is safe.
///
/// For clarity and to surface rollback errors, prefer calling `rollback()`
/// explicitly on the error path. The drop path is best-effort and does not
/// block on the rollback completing.
pub struct PooledTransaction<'a> {
    // The checked-out client this transaction runs on; all statements are
    // forwarded to it.
    client: &'a PooledClient,
    // Set by `commit()` / `rollback()`. While it is still `false`, `Drop`
    // attempts the best-effort `ROLLBACK` described above.
    done: bool,
}

impl std::fmt::Debug for PooledTransaction<'_> {
    /// Shows only the `done` flag; the borrowed client is elided from the
    /// output (`PooledTransaction { done: .., .. }`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("PooledTransaction");
        builder.field("done", &self.done);
        builder.finish_non_exhaustive()
    }
}

impl<'a> PooledTransaction<'a> {
    /// Access the underlying [`PooledClient`] this transaction is running on.
    pub fn client(&self) -> &'a PooledClient {
        self.client
    }

    /// Execute a query within the transaction.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by [`PooledClient::query`].
    pub async fn query(&self, sql: &str, params: &[&dyn SqlParam]) -> Result<Vec<Row>, TypedError> {
        self.client.query(sql, params).await
    }

    /// Execute a statement within the transaction. Returns affected row count.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by [`PooledClient::execute`].
    pub async fn execute(&self, sql: &str, params: &[&dyn SqlParam]) -> Result<u64, TypedError> {
        self.client.execute(sql, params).await
    }

    /// Execute a query with named parameters within the transaction.
    ///
    /// The SQL is rewritten by `crate::named_params::rewrite`, and the
    /// supplied `(name, value)` pairs are put into the resulting order by
    /// `crate::query::resolve_named_params` before the query is sent.
    ///
    /// # Errors
    ///
    /// Returns the error from `resolve_named_params` if the parameters cannot
    /// be resolved, otherwise propagates the error from
    /// [`PooledClient::query`].
    pub async fn query_named(
        &self,
        sql: &str,
        params: &[(&str, &dyn SqlParam)],
    ) -> Result<Vec<Row>, TypedError> {
        let (rewritten, names) = crate::named_params::rewrite(sql);
        let ordered = crate::query::resolve_named_params(&names, params)?;
        self.client.query(&rewritten, &ordered).await
    }

    /// Execute a named-param statement within the transaction. Returns affected row count.
    ///
    /// Parameter handling is the same as in
    /// [`query_named`](Self::query_named).
    ///
    /// # Errors
    ///
    /// Returns the error from `resolve_named_params` if the parameters cannot
    /// be resolved, otherwise propagates the error from
    /// [`PooledClient::execute`].
    pub async fn execute_named(
        &self,
        sql: &str,
        params: &[(&str, &dyn SqlParam)],
    ) -> Result<u64, TypedError> {
        let (rewritten, names) = crate::named_params::rewrite(sql);
        let ordered = crate::query::resolve_named_params(&names, params)?;
        self.client.execute(&rewritten, &ordered).await
    }

    /// Commit the transaction.
    ///
    /// If the returned future is dropped before it completes, the
    /// transaction is still treated as unfinished and the destructor's
    /// best-effort `ROLLBACK` is attempted.
    ///
    /// # Errors
    ///
    /// Propagates the error from sending `COMMIT` via
    /// [`PooledClient::simple_query`].
    pub async fn commit(mut self) -> Result<(), TypedError> {
        let outcome = self.client.simple_query("COMMIT").await;
        // Disarm the drop-time rollback only after the round-trip has
        // finished. Setting the flag before the await would leave a cancelled
        // `commit()` with a possibly still-open transaction and no rollback
        // queued behind it.
        self.done = true;
        outcome
    }

    /// Explicitly roll back the transaction.
    ///
    /// If the returned future is dropped before it completes, the
    /// destructor's best-effort `ROLLBACK` is still attempted.
    ///
    /// # Errors
    ///
    /// Propagates the error from sending `ROLLBACK` via
    /// [`PooledClient::simple_query`].
    pub async fn rollback(mut self) -> Result<(), TypedError> {
        let outcome = self.client.simple_query("ROLLBACK").await;
        // Same reasoning as in `commit`: keep the drop guard armed until the
        // explicit rollback has actually completed.
        self.done = true;
        outcome
    }
}

impl Drop for PooledTransaction<'_> {
    /// Best-effort cleanup for a transaction that was neither committed nor
    /// rolled back explicitly.
    fn drop(&mut self) {
        // Nothing to do once the transaction has been finished explicitly,
        // or when the connection is no longer alive.
        if self.done || !self.client.is_alive() {
            return;
        }

        // Drop is synchronous, so awaiting a ROLLBACK round-trip is not an
        // option. Instead a simple-query `ROLLBACK` is pushed onto the writer
        // task's request channel via `try_send` (no runtime handle needed).
        // The writer handles requests in FIFO order, so the rollback runs
        // ahead of any later request on this connection — including the
        // pool's `DISCARD ALL` reset — and the connection goes back to idle
        // without being destroyed.
        let conn = self.client.conn();
        if conn.enqueue_rollback() {
            return;
        }

        // Queueing failed (channel full or writer task gone): mark the
        // connection broken so the pool discards it on return instead of
        // handing out a session stuck in an open or aborted transaction.
        conn.mark_broken();
        tracing::warn!(
            "PooledTransaction dropped without commit/rollback; could not queue ROLLBACK, connection marked broken"
        );
    }
}