duckdb/
transaction.rs

1use crate::{Connection, Result};
2use std::ops::Deref;
3
4/// Options for transaction behavior. See [BEGIN
5/// TRANSACTION](http://www.sqlite.org/lang_transaction.html) for details.
6#[derive(Copy, Clone)]
7#[non_exhaustive]
8pub enum TransactionBehavior {
9    /// DEFERRED means that the transaction does not actually start until the
10    /// database is first accessed.
11    Deferred,
12    /// IMMEDIATE cause the database connection to start a new write
13    /// immediately, without waiting for a writes statement.
14    Immediate,
15    /// EXCLUSIVE prevents other database connections from reading the database
16    /// while the transaction is underway.
17    Exclusive,
18}
19
20/// Options for how a Transaction or Savepoint should behave when it is dropped.
21#[derive(Copy, Clone, Debug, PartialEq, Eq)]
22#[non_exhaustive]
23pub enum DropBehavior {
24    /// Roll back the changes. This is the default.
25    Rollback,
26
27    /// Commit the changes.
28    Commit,
29
30    /// Do not commit or roll back changes - this will leave the transaction or
31    /// savepoint open, so should be used with care.
32    Ignore,
33
34    /// Panic. Used to enforce intentional behavior during development.
35    Panic,
36}
37
38/// Represents a transaction on a database connection.
39///
40/// ## Note
41///
42/// Transactions will roll back by default. Use `commit` method to explicitly
43/// commit the transaction, or use `set_drop_behavior` to change what happens
44/// when the transaction is dropped.
45///
46/// ## Example
47///
48/// ```rust,no_run
49/// # use duckdb::{Connection, Result};
50/// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
51/// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
52/// fn perform_queries(conn: &mut Connection) -> Result<()> {
53///     let tx = conn.transaction()?;
54///
55///     do_queries_part_1(&tx)?; // tx causes rollback if this fails
56///     do_queries_part_2(&tx)?; // tx causes rollback if this fails
57///
58///     tx.commit()
59/// }
60/// ```
61#[derive(Debug)]
62pub struct Transaction<'conn> {
63    conn: &'conn Connection,
64    drop_behavior: DropBehavior,
65}
66
67/// Represents a savepoint on a database connection.
68///
69/// ## Note
70///
71/// Savepoints will roll back by default. Use `commit` method to explicitly
72/// commit the savepoint, or use `set_drop_behavior` to change what happens
73/// when the savepoint is dropped.
74///
75/// ## Example
76///
77/// ```rust,no_run
78/// # use duckdb::{Connection, Result};
79/// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
80/// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
81/// fn perform_queries(conn: &mut Connection) -> Result<()> {
82///     let sp = conn.savepoint()?;
83///
84///     do_queries_part_1(&sp)?; // sp causes rollback if this fails
85///     do_queries_part_2(&sp)?; // sp causes rollback if this fails
86///
87///     sp.commit()
88/// }
89/// ```
90#[derive(Debug)]
91pub struct Savepoint<'conn> {
92    conn: &'conn Connection,
93    name: String,
94    depth: u32,
95    drop_behavior: DropBehavior,
96    committed: bool,
97}
98
99impl Transaction<'_> {
100    /// Begin a new transaction. Cannot be nested; see `savepoint` for nested
101    /// transactions.
102    ///
103    /// Even though we don't mutate the connection, we take a `&mut Connection`
104    /// so as to prevent nested transactions on the same connection. For cases
105    /// where this is unacceptable, [`Transaction::new_unchecked`] is available.
106    #[inline]
107    pub fn new(conn: &mut Connection) -> Result<Transaction<'_>> {
108        Self::new_unchecked(conn, TransactionBehavior::Deferred)
109    }
110
111    /// Begin a new transaction, failing if a transaction is open.
112    ///
113    /// If a transaction is already open, this will return an error. Where
114    /// possible, [`Transaction::new`] should be preferred, as it provides a
115    /// compile-time guarantee that transactions are not nested.
116    #[inline]
117    fn new_unchecked(conn: &Connection, _: TransactionBehavior) -> Result<Transaction<'_>> {
118        // TODO(wangfenjin): not supported
119        // let query = match behavior {
120        //     TransactionBehavior::Deferred => "BEGIN DEFERRED",
121        //     TransactionBehavior::Immediate => "BEGIN IMMEDIATE",
122        //     TransactionBehavior::Exclusive => "BEGIN EXCLUSIVE",
123        // };
124        let query = "BEGIN Transaction";
125        conn.execute_batch(query).map(move |_| Transaction {
126            conn,
127            drop_behavior: DropBehavior::Rollback,
128        })
129    }
130
131    /// Starts a new [savepoint](http://www.sqlite.org/lang_savepoint.html), allowing nested
132    /// transactions.
133    ///
134    /// ## Note
135    ///
136    /// Just like outer level transactions, savepoint transactions rollback by
137    /// default.
138    ///
139    /// ## Example
140    ///
141    /// ```rust,no_run
142    /// # use duckdb::{Connection, Result};
143    /// # fn perform_queries_part_1_succeeds(_conn: &Connection) -> bool { true }
144    /// fn perform_queries(conn: &mut Connection) -> Result<()> {
145    ///     let mut tx = conn.transaction()?;
146    ///
147    ///     {
148    ///         let sp = tx.savepoint()?;
149    ///         if perform_queries_part_1_succeeds(&sp) {
150    ///             sp.commit()?;
151    ///         }
152    ///         // otherwise, sp will rollback
153    ///     }
154    ///
155    ///     tx.commit()
156    /// }
157    /// ```
158    #[inline]
159    pub fn savepoint(&mut self) -> Result<Savepoint<'_>> {
160        Savepoint::with_depth(self.conn, 1)
161    }
162
163    /// Create a new savepoint with a custom savepoint name. See `savepoint()`.
164    #[inline]
165    pub fn savepoint_with_name<T: Into<String>>(&mut self, name: T) -> Result<Savepoint<'_>> {
166        Savepoint::with_depth_and_name(self.conn, 1, name)
167    }
168
169    /// Get the current setting for what happens to the transaction when it is
170    /// dropped.
171    #[inline]
172    pub fn drop_behavior(&self) -> DropBehavior {
173        self.drop_behavior
174    }
175
176    /// Configure the transaction to perform the specified action when it is
177    /// dropped.
178    #[inline]
179    pub fn set_drop_behavior(&mut self, drop_behavior: DropBehavior) {
180        self.drop_behavior = drop_behavior
181    }
182
183    /// A convenience method which consumes and commits a transaction.
184    #[inline]
185    pub fn commit(mut self) -> Result<()> {
186        self.commit_()
187    }
188
189    #[inline]
190    fn commit_(&mut self) -> Result<()> {
191        self.conn.execute_batch("COMMIT")?;
192        Ok(())
193    }
194
195    /// A convenience method which consumes and rolls back a transaction.
196    #[inline]
197    pub fn rollback(mut self) -> Result<()> {
198        self.rollback_()
199    }
200
201    #[inline]
202    fn rollback_(&mut self) -> Result<()> {
203        self.conn.execute_batch("ROLLBACK")?;
204        Ok(())
205    }
206
207    /// Consumes the transaction, committing or rolling back according to the
208    /// current setting (see `drop_behavior`).
209    ///
210    /// Functionally equivalent to the `Drop` implementation, but allows
211    /// callers to see any errors that occur.
212    #[inline]
213    pub fn finish(mut self) -> Result<()> {
214        self.finish_()
215    }
216
217    #[inline]
218    fn finish_(&mut self) -> Result<()> {
219        // if self.conn.is_autocommit() {
220        //     println!("is autocommit");
221        //     return Ok(());
222        // }
223        match self.drop_behavior() {
224            DropBehavior::Commit => self.commit_().or_else(|_| self.rollback_()),
225            DropBehavior::Rollback => self.rollback_(),
226            DropBehavior::Ignore => Ok(()),
227            DropBehavior::Panic => panic!("Transaction dropped unexpectedly."),
228        }
229    }
230}
231
232impl Deref for Transaction<'_> {
233    type Target = Connection;
234
235    #[inline]
236    fn deref(&self) -> &Connection {
237        self.conn
238    }
239}
240
241#[allow(unused_must_use)]
242impl Drop for Transaction<'_> {
243    #[inline]
244    fn drop(&mut self) {
245        self.finish_();
246    }
247}
248
249impl Savepoint<'_> {
250    #[inline]
251    fn with_depth_and_name<T: Into<String>>(conn: &Connection, depth: u32, name: T) -> Result<Savepoint<'_>> {
252        let name = name.into();
253        conn.execute_batch(&format!("SAVEPOINT {name}")).map(|_| Savepoint {
254            conn,
255            name,
256            depth,
257            drop_behavior: DropBehavior::Rollback,
258            committed: false,
259        })
260    }
261
262    #[inline]
263    fn with_depth(conn: &Connection, depth: u32) -> Result<Savepoint<'_>> {
264        let name = format!("_duckdb_sp_{depth}");
265        Savepoint::with_depth_and_name(conn, depth, name)
266    }
267
268    /// Begin a new savepoint. Can be nested.
269    #[inline]
270    pub fn new(conn: &mut Connection) -> Result<Savepoint<'_>> {
271        Savepoint::with_depth(conn, 0)
272    }
273
274    /// Begin a new savepoint with a user-provided savepoint name.
275    #[inline]
276    pub fn with_name<T: Into<String>>(conn: &mut Connection, name: T) -> Result<Savepoint<'_>> {
277        Savepoint::with_depth_and_name(conn, 0, name)
278    }
279
280    /// Begin a nested savepoint.
281    #[inline]
282    pub fn savepoint(&mut self) -> Result<Savepoint<'_>> {
283        Savepoint::with_depth(self.conn, self.depth + 1)
284    }
285
286    /// Begin a nested savepoint with a user-provided savepoint name.
287    #[inline]
288    pub fn savepoint_with_name<T: Into<String>>(&mut self, name: T) -> Result<Savepoint<'_>> {
289        Savepoint::with_depth_and_name(self.conn, self.depth + 1, name)
290    }
291
292    /// Get the current setting for what happens to the savepoint when it is
293    /// dropped.
294    #[inline]
295    pub fn drop_behavior(&self) -> DropBehavior {
296        self.drop_behavior
297    }
298
299    /// Configure the savepoint to perform the specified action when it is
300    /// dropped.
301    #[inline]
302    pub fn set_drop_behavior(&mut self, drop_behavior: DropBehavior) {
303        self.drop_behavior = drop_behavior
304    }
305
306    /// A convenience method which consumes and commits a savepoint.
307    #[inline]
308    pub fn commit(mut self) -> Result<()> {
309        self.commit_()
310    }
311
312    #[inline]
313    fn commit_(&mut self) -> Result<()> {
314        self.conn.execute_batch(&format!("RELEASE {}", self.name))?;
315        self.committed = true;
316        Ok(())
317    }
318
319    /// A convenience method which rolls back a savepoint.
320    ///
321    /// ## Note
322    ///
323    /// Unlike `Transaction`s, savepoints remain active after they have been
324    /// rolled back, and can be rolled back again or committed.
325    #[inline]
326    pub fn rollback(&mut self) -> Result<()> {
327        self.conn.execute_batch(&format!("ROLLBACK TO {}", self.name))
328    }
329
330    /// Consumes the savepoint, committing or rolling back according to the
331    /// current setting (see `drop_behavior`).
332    ///
333    /// Functionally equivalent to the `Drop` implementation, but allows
334    /// callers to see any errors that occur.
335    #[inline]
336    pub fn finish(mut self) -> Result<()> {
337        self.finish_()
338    }
339
340    #[inline]
341    fn finish_(&mut self) -> Result<()> {
342        if self.committed {
343            return Ok(());
344        }
345        match self.drop_behavior() {
346            DropBehavior::Commit => self.commit_().or_else(|_| self.rollback()),
347            DropBehavior::Rollback => self.rollback(),
348            DropBehavior::Ignore => Ok(()),
349            DropBehavior::Panic => panic!("Savepoint dropped unexpectedly."),
350        }
351    }
352}
353
354impl Deref for Savepoint<'_> {
355    type Target = Connection;
356
357    #[inline]
358    fn deref(&self) -> &Connection {
359        self.conn
360    }
361}
362
363#[allow(unused_must_use)]
364impl Drop for Savepoint<'_> {
365    #[inline]
366    fn drop(&mut self) {
367        self.finish_();
368    }
369}
370
371impl Connection {
372    /// Begin a new transaction with the default behavior (DEFERRED).
373    ///
374    /// The transaction defaults to rolling back when it is dropped. If you
375    /// want the transaction to commit, you must call
376    /// [`commit`](Transaction::commit) or [`set_drop_behavior(DropBehavior:
377    /// :Commit)`](Transaction::set_drop_behavior).
378    ///
379    /// ## Example
380    ///
381    /// ```rust,no_run
382    /// # use duckdb::{Connection, Result};
383    /// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
384    /// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
385    /// fn perform_queries(conn: &mut Connection) -> Result<()> {
386    ///     let tx = conn.transaction()?;
387    ///
388    ///     do_queries_part_1(&tx)?; // tx causes rollback if this fails
389    ///     do_queries_part_2(&tx)?; // tx causes rollback if this fails
390    ///
391    ///     tx.commit()
392    /// }
393    /// ```
394    ///
395    /// # Failure
396    ///
397    /// Will return `Err` if the underlying DuckDB call fails.
398    #[inline]
399    pub fn transaction(&mut self) -> Result<Transaction<'_>> {
400        Transaction::new(self)
401    }
402
403    /// Begin a new transaction with a specified behavior.
404    ///
405    /// See [`transaction`](Connection::transaction).
406    ///
407    /// # Failure
408    ///
409    /// Will return `Err` if the underlying DuckDB call fails.
410    #[inline]
411    #[allow(dead_code)]
412    fn transaction_with_behavior(&mut self, behavior: TransactionBehavior) -> Result<Transaction<'_>> {
413        Transaction::new_unchecked(self, behavior)
414    }
415
416    /// Begin a new transaction with the default behavior (DEFERRED).
417    ///
418    /// Attempt to open a nested transaction will result in a DuckDB error.
419    /// `Connection::transaction` prevents this at compile time by taking `&mut
420    /// self`, but `Connection::unchecked_transaction()` may be used to defer
421    /// the checking until runtime.
422    ///
423    /// See [`Connection::transaction`] and [`Transaction::new_unchecked`]
424    /// (which can be used if the default transaction behavior is undesirable).
425    ///
426    /// ## Example
427    ///
428    /// ```rust,no_run
429    /// # use duckdb::{Connection, Result};
430    /// # use std::rc::Rc;
431    /// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
432    /// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
433    /// fn perform_queries(conn: Rc<Connection>) -> Result<()> {
434    ///     let tx = conn.unchecked_transaction()?;
435    ///
436    ///     do_queries_part_1(&tx)?; // tx causes rollback if this fails
437    ///     do_queries_part_2(&tx)?; // tx causes rollback if this fails
438    ///
439    ///     tx.commit()
440    /// }
441    /// ```
442    ///
443    /// # Failure
444    ///
445    /// Will return `Err` if the underlying DuckDB call fails. The specific
446    /// error returned if transactions are nested is currently unspecified.
447    pub fn unchecked_transaction(&self) -> Result<Transaction<'_>> {
448        Transaction::new_unchecked(self, TransactionBehavior::Deferred)
449    }
450
451    /// Begin a new savepoint with the default behavior (DEFERRED).
452    ///
453    /// The savepoint defaults to rolling back when it is dropped. If you want
454    /// the savepoint to commit, you must call [`commit`](Savepoint::commit) or
455    /// [`set_drop_behavior(DropBehavior::Commit)`](Savepoint::
456    /// set_drop_behavior).
457    ///
458    /// ## Example
459    ///
460    /// ```rust,no_run
461    /// # use duckdb::{Connection, Result};
462    /// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
463    /// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
464    /// fn perform_queries(conn: &mut Connection) -> Result<()> {
465    ///     let sp = conn.savepoint()?;
466    ///
467    ///     do_queries_part_1(&sp)?; // sp causes rollback if this fails
468    ///     do_queries_part_2(&sp)?; // sp causes rollback if this fails
469    ///
470    ///     sp.commit()
471    /// }
472    /// ```
473    ///
474    /// # Failure
475    ///
476    /// Will return `Err` if the underlying DuckDB call fails.
477    #[inline]
478    pub fn savepoint(&mut self) -> Result<Savepoint<'_>> {
479        Savepoint::new(self)
480    }
481
482    /// Begin a new savepoint with a specified name.
483    ///
484    /// See [`savepoint`](Connection::savepoint).
485    ///
486    /// # Failure
487    ///
488    /// Will return `Err` if the underlying DuckDB call fails.
489    #[inline]
490    pub fn savepoint_with_name<T: Into<String>>(&mut self, name: T) -> Result<Savepoint<'_>> {
491        Savepoint::with_name(self, name)
492    }
493}
494
495#[cfg(test)]
496mod test {
497    use super::DropBehavior;
498    use crate::{Connection, Result};
499
500    fn checked_no_autocommit_memory_handle() -> Result<Connection> {
501        let db = Connection::open_in_memory()?;
502        db.execute_batch(
503            r"
504            CREATE TABLE foo (x INTEGER);
505            -- SET AUTOCOMMIT TO OFF;
506        ",
507        )?;
508        Ok(db)
509    }
510
511    #[test]
512    fn test_drop() -> Result<()> {
513        let mut db = checked_no_autocommit_memory_handle()?;
514        {
515            let tx = db.transaction()?;
516            assert!(tx.execute_batch("INSERT INTO foo VALUES(1)").is_ok());
517            // default: rollback
518        }
519        {
520            let mut tx = db.transaction()?;
521            tx.execute_batch("INSERT INTO foo VALUES(2)")?;
522            tx.set_drop_behavior(DropBehavior::Commit);
523        }
524        {
525            let tx = db.transaction()?;
526            assert_eq!(
527                2i32,
528                tx.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
529            );
530        }
531        Ok(())
532    }
533
534    #[test]
535    fn test_unchecked_nesting() -> Result<()> {
536        let db = checked_no_autocommit_memory_handle()?;
537
538        {
539            db.unchecked_transaction()?;
540            // default: rollback
541        }
542        {
543            let tx = db.unchecked_transaction()?;
544            tx.execute_batch("INSERT INTO foo VALUES(1)")?;
545            // Ensure this doesn't interfere with ongoing transaction
546            // let e = tx.unchecked_transaction().unwrap_err();
547            // assert_nested_tx_error(e);
548            tx.execute_batch("INSERT INTO foo VALUES(1)")?;
549            tx.commit()?;
550        }
551
552        assert_eq!(
553            2i32,
554            db.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
555        );
556        Ok(())
557    }
558
559    #[test]
560    fn test_explicit_rollback_commit() -> Result<()> {
561        let mut db = checked_no_autocommit_memory_handle()?;
562        {
563            let tx = db.transaction()?;
564            tx.execute_batch("INSERT INTO foo VALUES(1)")?;
565            tx.rollback()?;
566        }
567        {
568            let tx = db.transaction()?;
569            tx.execute_batch("INSERT INTO foo VALUES(4)")?;
570            tx.commit()?;
571        }
572        {
573            let tx = db.transaction()?;
574            assert_eq!(
575                4i32,
576                tx.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
577            );
578        }
579        Ok(())
580    }
581
582    #[test]
583    #[ignore = "not supported"]
584    fn test_savepoint() -> Result<()> {
585        let mut db = checked_no_autocommit_memory_handle()?;
586        {
587            let mut tx = db.transaction()?;
588            tx.execute_batch("INSERT INTO foo VALUES(1)")?;
589            assert_current_sum(1, &tx)?;
590            tx.set_drop_behavior(DropBehavior::Commit);
591            {
592                let mut sp1 = tx.savepoint()?;
593                sp1.execute_batch("INSERT INTO foo VALUES(2)")?;
594                assert_current_sum(3, &sp1)?;
595                // will rollback sp1
596                {
597                    let mut sp2 = sp1.savepoint()?;
598                    sp2.execute_batch("INSERT INTO foo VALUES(4)")?;
599                    assert_current_sum(7, &sp2)?;
600                    // will rollback sp2
601                    {
602                        let sp3 = sp2.savepoint()?;
603                        sp3.execute_batch("INSERT INTO foo VALUES(8)")?;
604                        assert_current_sum(15, &sp3)?;
605                        sp3.commit()?;
606                        // committed sp3, but will be erased by sp2 rollback
607                    }
608                    assert_current_sum(15, &sp2)?;
609                }
610                assert_current_sum(3, &sp1)?;
611            }
612            assert_current_sum(1, &tx)?;
613        }
614        assert_current_sum(1, &db)?;
615        Ok(())
616    }
617
618    #[test]
619    #[ignore = "not supported"]
620    fn test_ignore_drop_behavior() -> Result<()> {
621        let mut db = checked_no_autocommit_memory_handle()?;
622
623        let mut tx = db.transaction()?;
624        {
625            let mut sp1 = tx.savepoint()?;
626            insert(1, &sp1)?;
627            sp1.rollback()?;
628            insert(2, &sp1)?;
629            {
630                let mut sp2 = sp1.savepoint()?;
631                sp2.set_drop_behavior(DropBehavior::Ignore);
632                insert(4, &sp2)?;
633            }
634            assert_current_sum(6, &sp1)?;
635            sp1.commit()?;
636        }
637        assert_current_sum(6, &tx)?;
638        Ok(())
639    }
640
641    #[test]
642    #[ignore = "not supported"]
643    fn test_savepoint_names() -> Result<()> {
644        let mut db = checked_no_autocommit_memory_handle()?;
645
646        {
647            let mut sp1 = db.savepoint_with_name("my_sp")?;
648            insert(1, &sp1)?;
649            assert_current_sum(1, &sp1)?;
650            {
651                let mut sp2 = sp1.savepoint_with_name("my_sp")?;
652                sp2.set_drop_behavior(DropBehavior::Commit);
653                insert(2, &sp2)?;
654                assert_current_sum(3, &sp2)?;
655                sp2.rollback()?;
656                assert_current_sum(1, &sp2)?;
657                insert(4, &sp2)?;
658            }
659            assert_current_sum(5, &sp1)?;
660            sp1.rollback()?;
661            {
662                let mut sp2 = sp1.savepoint_with_name("my_sp")?;
663                sp2.set_drop_behavior(DropBehavior::Ignore);
664                insert(8, &sp2)?;
665            }
666            assert_current_sum(8, &sp1)?;
667            sp1.commit()?;
668        }
669        assert_current_sum(8, &db)?;
670        Ok(())
671    }
672
673    #[test]
674    fn test_rc() -> Result<()> {
675        use std::rc::Rc;
676        let mut conn = Connection::open_in_memory()?;
677        let rc_txn = Rc::new(conn.transaction()?);
678
679        // This will compile only if Transaction is Debug
680        Rc::try_unwrap(rc_txn).unwrap();
681        Ok(())
682    }
683
684    fn insert(x: i32, conn: &Connection) -> Result<usize> {
685        conn.execute("INSERT INTO foo VALUES(?)", [x])
686    }
687
688    fn assert_current_sum(x: i32, conn: &Connection) -> Result<()> {
689        let i = conn.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?;
690        assert_eq!(x, i);
691        Ok(())
692    }
693}