bsql-core 0.25.0

Runtime support for bsql — compile-time safe SQL for Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
//! Database transactions with commit/rollback.
//!
//! Created via [`Pool::begin()`](crate::pool::Pool::begin). A transaction
//! holds a single connection from the pool for its entire lifetime. Queries
//! executed through the `Executor` trait run within the transaction.
//!
//! # Drop behavior
//!
//! If a `Transaction` is dropped without calling [`commit()`](Transaction::commit)
//! or [`rollback()`](Transaction::rollback), the driver discards the connection
//! from the pool. PostgreSQL auto-rollbacks when the connection closes. A warning
//! is emitted via `log::warn!` to help detect forgotten commits during development.

use std::fmt;

use bsql_driver_postgres::codec::Encode;

use crate::error::{BsqlError, BsqlResult, QueryError};
use crate::executor::OwnedResult;

/// Transaction isolation levels supported by PostgreSQL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
}

impl IsolationLevel {
    /// SQL representation for `SET TRANSACTION ISOLATION LEVEL ...`.
    fn as_sql(&self) -> &'static str {
        match self {
            IsolationLevel::ReadUncommitted => "READ UNCOMMITTED",
            IsolationLevel::ReadCommitted => "READ COMMITTED",
            IsolationLevel::RepeatableRead => "REPEATABLE READ",
            IsolationLevel::Serializable => "SERIALIZABLE",
        }
    }
}

impl fmt::Display for IsolationLevel {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_sql())
    }
}

/// A database transaction.
///
/// Created by [`Pool::begin()`](crate::pool::Pool::begin). Must be explicitly
/// committed via [`commit()`](Transaction::commit). If dropped without
/// `commit()`, the connection is discarded from the pool and a warning is logged.
///
/// Use `.defer(&mut tx)` on queries to buffer writes, then `tx.commit()` to flush
/// them all in a single pipeline. Use `.execute(&mut tx)` or `.fetch(&mut tx)` for immediate
/// execution within the transaction.
///
/// # Example
///
/// ```rust,ignore
/// use bsql::Pool;
///
/// let pool = Pool::connect("postgres://user:pass@localhost/mydb")?;
/// let tx = pool.begin()?;
///
/// // Buffer writes with .defer() — nothing hits the network yet
/// bsql::query!("INSERT INTO log (msg) VALUES ($msg: &str)")
///     .defer(&mut tx)?;
///
/// // Or execute immediately within the transaction
/// bsql::query!("UPDATE accounts SET balance = 0 WHERE id = $id: i32")
///     .execute(&mut tx)?;
///
/// // commit() flushes all deferred operations, then commits
/// tx.commit()?;
/// ```
pub struct Transaction {
    inner: Option<bsql_driver_postgres::Transaction>,
    /// Set to true when commit() or rollback() is called.
    finished: bool,
}

impl Transaction {
    /// Wrap a driver-level transaction.
    pub(crate) fn from_driver(tx: bsql_driver_postgres::Transaction) -> Self {
        Self {
            inner: Some(tx),
            finished: false,
        }
    }

    /// Return a "transaction already consumed" error.
    fn consumed_error() -> BsqlError {
        BsqlError::Query(QueryError {
            message: "transaction already consumed".into(),
            pg_code: None,
            source: None,
        })
    }

    /// Commit the transaction and return the connection to the pool.
    ///
    /// Consumes `self` — the transaction cannot be used after commit.
    pub async fn commit(mut self) -> BsqlResult<()> {
        self.finished = true;
        let tx = self.inner.take().ok_or_else(Self::consumed_error)?;
        tx.commit().map_err(BsqlError::from)
    }

    /// Explicitly roll back the transaction and return the connection to the pool.
    ///
    /// Consumes `self` — the transaction cannot be used after rollback.
    pub async fn rollback(mut self) -> BsqlResult<()> {
        self.finished = true;
        let tx = self.inner.take().ok_or_else(Self::consumed_error)?;
        tx.rollback().map_err(BsqlError::from)
    }

    /// Create a savepoint within the transaction.
    ///
    /// The `name` must be a valid SQL identifier: ASCII alphanumeric and
    /// underscores only, starting with a letter or underscore. Maximum 63 characters.
    pub async fn savepoint(&mut self, name: &str) -> BsqlResult<()> {
        validate_savepoint_name(name)?;
        let sql = format!("SAVEPOINT {name}");
        let tx = self.inner.as_mut().ok_or_else(Self::consumed_error)?;
        tx.simple_query(&sql).map_err(BsqlError::from_driver_query)
    }

    /// Release (destroy) a savepoint, keeping its effects.
    ///
    /// The `name` must match a previously created savepoint.
    pub async fn release_savepoint(&mut self, name: &str) -> BsqlResult<()> {
        validate_savepoint_name(name)?;
        let sql = format!("RELEASE SAVEPOINT {name}");
        let tx = self.inner.as_mut().ok_or_else(Self::consumed_error)?;
        tx.simple_query(&sql).map_err(BsqlError::from_driver_query)
    }

    /// Roll back to a savepoint, undoing changes made after it was created.
    ///
    /// The savepoint remains valid after this call (can be rolled back to again).
    pub async fn rollback_to(&mut self, name: &str) -> BsqlResult<()> {
        validate_savepoint_name(name)?;
        let sql = format!("ROLLBACK TO SAVEPOINT {name}");
        let tx = self.inner.as_mut().ok_or_else(Self::consumed_error)?;
        tx.simple_query(&sql).map_err(BsqlError::from_driver_query)
    }

    /// Set the isolation level for this transaction.
    ///
    /// Must be called before the first query in the transaction (immediately
    /// after `begin()`). PostgreSQL rejects `SET TRANSACTION` after any
    /// data-modifying statement.
    pub async fn set_isolation(&mut self, level: IsolationLevel) -> BsqlResult<()> {
        let sql = format!("SET TRANSACTION ISOLATION LEVEL {}", level.as_sql());
        let tx = self.inner.as_mut().ok_or_else(Self::consumed_error)?;
        tx.simple_query(&sql).map_err(BsqlError::from_driver_query)
    }

    /// Execute a query within the transaction (used by QueryTarget dispatch).
    pub(crate) fn query_inner(
        &mut self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<OwnedResult> {
        let tx = self.inner.as_mut().ok_or_else(Self::consumed_error)?;
        let result = tx
            .query(sql, sql_hash, params)
            .map_err(BsqlError::from_driver_query)?;
        Ok(OwnedResult::without_arena(result))
    }

    /// Execute without result rows within the transaction (used by QueryTarget dispatch).
    pub(crate) fn execute_inner(
        &mut self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<u64> {
        let tx = self.inner.as_mut().ok_or_else(Self::consumed_error)?;
        tx.execute(sql, sql_hash, params)
            .map_err(BsqlError::from_driver_query)
    }

    /// Execute the same statement N times with different params in one pipeline.
    ///
    /// Sends all N Bind+Execute messages + one Sync. One round-trip for
    /// N operations within the transaction. Returns the affected row count
    /// for each parameter set.
    pub async fn execute_pipeline(
        &mut self,
        sql: &str,
        sql_hash: u64,
        param_sets: &[&[&(dyn Encode + Sync)]],
    ) -> BsqlResult<Vec<u64>> {
        let tx = self.inner.as_mut().ok_or_else(Self::consumed_error)?;
        tx.execute_pipeline(sql, sql_hash, param_sets)
            .map_err(BsqlError::from_driver_query)
    }

    // --- Deferred pipeline API ---

    /// Buffer an execute for deferred pipeline flush.
    ///
    /// The operation is not sent to the server immediately. Instead, the
    /// Bind+Execute message bytes are buffered internally. The buffered
    /// operations are sent as a single pipeline on [`commit()`](Self::commit)
    /// or [`flush_deferred()`](Self::flush_deferred).
    ///
    /// If the statement has not been prepared yet, a single round-trip is
    /// made to prepare it. After that, the Bind+Execute bytes are buffered
    /// with no I/O.
    ///
    /// Any read operation (`query_inner`, `for_each_raw`, `simple_query`, etc.)
    /// automatically flushes deferred operations first to ensure
    /// read-your-writes consistency.
    #[doc(hidden)]
    pub async fn defer_execute(
        &mut self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
    ) -> BsqlResult<()> {
        let tx = self.inner.as_mut().ok_or_else(Self::consumed_error)?;
        tx.defer_execute(sql, sql_hash, params)
            .map_err(BsqlError::from_driver_query)
    }

    /// Flush all deferred operations as a single pipeline.
    ///
    /// Sends all buffered Bind+Execute messages + one Sync in a single TCP write.
    /// Returns the affected row count for each deferred operation.
    #[doc(hidden)]
    pub async fn flush_deferred(&mut self) -> BsqlResult<Vec<u64>> {
        let tx = self.inner.as_mut().ok_or_else(Self::consumed_error)?;
        tx.flush_deferred().map_err(BsqlError::from_driver_query)
    }

    /// Number of operations currently buffered for deferred execution.
    ///
    /// This is a diagnostic method primarily for testing. Most users should
    /// not need to call this -- deferred operations are flushed automatically
    /// on commit or before any read.
    #[doc(hidden)]
    pub fn deferred_count(&self) -> usize {
        match self.inner.as_ref() {
            Some(tx) => tx.deferred_count(),
            None => 0,
        }
    }

    /// Process each row directly from the wire buffer within this transaction.
    ///
    /// Zero arena allocation — the closure receives a `PgDataRow` that reads
    /// columns directly from the DataRow message bytes.
    pub async fn for_each_raw<F>(
        &mut self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
        mut f: F,
    ) -> BsqlResult<()>
    where
        F: FnMut(bsql_driver_postgres::PgDataRow<'_>) -> BsqlResult<()>,
    {
        let tx = self.inner.as_mut().ok_or_else(Self::consumed_error)?;
        let mut user_err: Option<BsqlError> = None;
        let driver_result = tx.for_each(sql, sql_hash, params, |row| match f(row) {
            Ok(()) => Ok(()),
            Err(e) => {
                user_err = Some(e);
                Err(bsql_driver_postgres::DriverError::Protocol(
                    "for_each closure error".into(),
                ))
            }
        });
        if let Some(e) = user_err {
            return Err(e);
        }
        driver_result.map_err(BsqlError::from_driver_query)
    }

    /// Process each DataRow as raw bytes within this transaction.
    ///
    /// Like `for_each_raw` but passes the raw `&[u8]` DataRow payload directly
    /// to the closure — no `PgDataRow` construction, no SmallVec pre-scan.
    #[doc(hidden)]
    pub async fn __for_each_raw_bytes<F>(
        &mut self,
        sql: &str,
        sql_hash: u64,
        params: &[&(dyn Encode + Sync)],
        mut f: F,
    ) -> BsqlResult<()>
    where
        F: FnMut(&[u8]) -> BsqlResult<()>,
    {
        let tx = self.inner.as_mut().ok_or_else(Self::consumed_error)?;
        let mut user_err: Option<BsqlError> = None;
        let driver_result = tx.for_each_raw(sql, sql_hash, params, |data| match f(data) {
            Ok(()) => Ok(()),
            Err(e) => {
                user_err = Some(e);
                Err(bsql_driver_postgres::DriverError::Protocol(
                    "for_each closure error".into(),
                ))
            }
        });
        if let Some(e) = user_err {
            return Err(e);
        }
        driver_result.map_err(BsqlError::from_driver_query)
    }
}

impl fmt::Debug for Transaction {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Transaction")
            .field("finished", &self.finished)
            .finish()
    }
}

impl Drop for Transaction {
    fn drop(&mut self) {
        if !self.finished {
            // The transaction was dropped without commit() or rollback().
            // The driver-level Transaction::drop discards the connection from the
            // pool — PG server auto-rollbacks when it sees the disconnect.
            // Log a warning to help catch forgotten commits during development.
            log::warn!(
                "bsql: Transaction dropped without commit() or rollback() — \
                 connection discarded from pool. This is safe but wasteful."
            );
        }
    }
}

/// Delegate to shared savepoint name validator.
fn validate_savepoint_name(name: &str) -> BsqlResult<()> {
    crate::util::validate_savepoint_name(name)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn validate_savepoint_name_valid() {
        assert!(validate_savepoint_name("sp1").is_ok());
        assert!(validate_savepoint_name("_sp").is_ok());
        assert!(validate_savepoint_name("my_savepoint_123").is_ok());
    }

    #[test]
    fn validate_savepoint_name_empty() {
        assert!(validate_savepoint_name("").is_err());
    }

    #[test]
    fn validate_savepoint_name_too_long() {
        let long = "a".repeat(64);
        assert!(validate_savepoint_name(&long).is_err());
    }

    #[test]
    fn validate_savepoint_name_max_length() {
        let max = "a".repeat(63);
        assert!(validate_savepoint_name(&max).is_ok());
    }

    #[test]
    fn validate_savepoint_name_starts_with_digit() {
        assert!(validate_savepoint_name("1sp").is_err());
    }

    #[test]
    fn validate_savepoint_name_starts_with_underscore() {
        assert!(validate_savepoint_name("_sp").is_ok());
    }

    #[test]
    fn validate_savepoint_name_special_chars() {
        assert!(validate_savepoint_name("sp-1").is_err());
        assert!(validate_savepoint_name("sp.1").is_err());
        assert!(validate_savepoint_name("sp 1").is_err());
        assert!(validate_savepoint_name("sp;1").is_err());
        assert!(validate_savepoint_name("sp'1").is_err());
    }

    #[test]
    fn isolation_level_display() {
        assert_eq!(
            IsolationLevel::ReadUncommitted.to_string(),
            "READ UNCOMMITTED"
        );
        assert_eq!(IsolationLevel::ReadCommitted.to_string(), "READ COMMITTED");
        assert_eq!(
            IsolationLevel::RepeatableRead.to_string(),
            "REPEATABLE READ"
        );
        assert_eq!(IsolationLevel::Serializable.to_string(), "SERIALIZABLE");
    }

    // --- IsolationLevel traits ---

    #[test]
    fn isolation_level_clone() {
        let level = IsolationLevel::Serializable;
        let cloned = level;
        assert_eq!(level, cloned);
    }

    #[test]
    fn isolation_level_debug() {
        let level = IsolationLevel::RepeatableRead;
        let dbg = format!("{level:?}");
        assert!(
            dbg.contains("RepeatableRead"),
            "Debug should show variant name: {dbg}"
        );
    }

    #[test]
    fn isolation_level_eq() {
        assert_eq!(IsolationLevel::Serializable, IsolationLevel::Serializable);
        assert_ne!(IsolationLevel::Serializable, IsolationLevel::ReadCommitted);
    }

    // --- Transaction Debug ---

    #[test]
    fn transaction_debug_shows_finished_false() {
        // Transaction cannot be constructed in tests without a driver,
        // but we verify the Debug impl exists at compile time.
        fn _assert_debug<T: std::fmt::Debug>() {}
        _assert_debug::<Transaction>();
    }

    // --- Send + Sync assertions ---

    fn _assert_send<T: Send>() {}

    #[test]
    fn transaction_is_send() {
        _assert_send::<Transaction>();
    }

    #[test]
    fn isolation_level_is_send() {
        _assert_send::<IsolationLevel>();
    }

    // --- IsolationLevel as_sql covers all variants ---

    #[test]
    fn isolation_level_as_sql_all_variants() {
        assert_eq!(IsolationLevel::ReadUncommitted.as_sql(), "READ UNCOMMITTED");
        assert_eq!(IsolationLevel::ReadCommitted.as_sql(), "READ COMMITTED");
        assert_eq!(IsolationLevel::RepeatableRead.as_sql(), "REPEATABLE READ");
        assert_eq!(IsolationLevel::Serializable.as_sql(), "SERIALIZABLE");
    }

    // --- Savepoint name validation edge cases ---

    #[test]
    fn validate_savepoint_name_single_char() {
        assert!(validate_savepoint_name("a").is_ok());
        assert!(validate_savepoint_name("_").is_ok());
    }

    #[test]
    fn validate_savepoint_name_all_digits_after_letter() {
        assert!(validate_savepoint_name("a123456789").is_ok());
    }

    #[test]
    fn validate_savepoint_name_all_underscores() {
        assert!(validate_savepoint_name("___").is_ok());
    }

    #[test]
    fn validate_savepoint_name_unicode_rejected() {
        assert!(
            validate_savepoint_name("sp_\u{00e9}").is_err(),
            "unicode chars should be rejected"
        );
    }

    #[test]
    fn validate_savepoint_name_sql_injection_rejected() {
        assert!(validate_savepoint_name("sp; DROP TABLE").is_err());
        assert!(validate_savepoint_name("sp'--").is_err());
        assert!(validate_savepoint_name("sp\"test").is_err());
    }

    // --- Transaction consumed error message ---

    #[test]
    fn consumed_error_message_is_descriptive() {
        let e = Transaction::consumed_error();
        let display = e.to_string();
        assert!(
            display.contains("transaction already consumed"),
            "consumed error should be descriptive: {display}"
        );
    }

    // --- IsolationLevel as_sql is idempotent ---

    #[test]
    fn isolation_level_as_sql_is_idempotent() {
        let level = IsolationLevel::Serializable;
        assert_eq!(level.as_sql(), level.as_sql());
        assert_eq!(level.as_sql(), "SERIALIZABLE");
    }

    // --- IsolationLevel Display matches as_sql ---

    #[test]
    fn isolation_level_display_matches_as_sql() {
        for level in [
            IsolationLevel::ReadUncommitted,
            IsolationLevel::ReadCommitted,
            IsolationLevel::RepeatableRead,
            IsolationLevel::Serializable,
        ] {
            assert_eq!(level.to_string(), level.as_sql());
        }
    }

    // --- Transaction from_driver sets finished to false ---

    // Cannot construct without driver, but verify at compile-time level
    #[test]
    fn transaction_from_driver_compiles() {
        fn _check(_tx: bsql_driver_postgres::Transaction) -> Transaction {
            Transaction::from_driver(_tx)
        }
    }

    // --- Savepoint name: null bytes rejected ---

    #[test]
    fn validate_savepoint_name_null_byte_rejected() {
        assert!(
            validate_savepoint_name("sp\0name").is_err(),
            "null byte in savepoint name should be rejected"
        );
    }

    // --- Savepoint name: exactly 63 chars OK, 64 chars error ---

    #[test]
    fn validate_savepoint_name_boundary_63_and_64() {
        let ok_63 = format!("a{}", "b".repeat(62));
        assert!(validate_savepoint_name(&ok_63).is_ok());
        let err_64 = format!("a{}", "b".repeat(63));
        assert!(validate_savepoint_name(&err_64).is_err());
    }

    // --- consumed_error produces Query variant ---

    #[test]
    fn consumed_error_is_query_variant() {
        let e = Transaction::consumed_error();
        assert!(
            matches!(e, BsqlError::Query(_)),
            "consumed_error should be Query variant"
        );
    }
}