duckdb/transaction.rs
1use crate::{Connection, Result};
2use std::ops::Deref;
3
/// Options for transaction behavior.
///
/// The variants mirror the SQLite `BEGIN` modes; see [BEGIN
/// TRANSACTION](http://www.sqlite.org/lang_transaction.html) for their
/// original meaning.
///
/// NOTE: in this file the selected behavior is currently accepted but not
/// used: the transaction constructor always issues a plain
/// `BEGIN Transaction` statement regardless of the variant passed in.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum TransactionBehavior {
    /// DEFERRED means that the transaction does not actually start until the
    /// database is first accessed.
    Deferred,
    /// IMMEDIATE causes the database connection to start a new write
    /// immediately, without waiting for a write statement.
    Immediate,
    /// EXCLUSIVE prevents other database connections from reading the database
    /// while the transaction is underway.
    Exclusive,
}
19
/// What a `Transaction` or `Savepoint` does with its pending changes when it
/// goes out of scope without having been finished explicitly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DropBehavior {
    /// Discard the changes. This is what newly created transactions and
    /// savepoints use.
    Rollback,

    /// Keep the changes.
    Commit,

    /// Neither commit nor roll back. The transaction or savepoint is left
    /// open, so this must be used with care.
    Ignore,

    /// Panic. Useful during development to make sure every transaction is
    /// finished deliberately rather than by falling out of scope.
    Panic,
}
37
38/// Represents a transaction on a database connection.
39///
40/// ## Note
41///
42/// Transactions will roll back by default. Use `commit` method to explicitly
43/// commit the transaction, or use `set_drop_behavior` to change what happens
44/// when the transaction is dropped.
45///
46/// ## Example
47///
48/// ```rust,no_run
49/// # use duckdb::{Connection, Result};
50/// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
51/// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
52/// fn perform_queries(conn: &mut Connection) -> Result<()> {
53/// let tx = conn.transaction()?;
54///
55/// do_queries_part_1(&tx)?; // tx causes rollback if this fails
56/// do_queries_part_2(&tx)?; // tx causes rollback if this fails
57///
58/// tx.commit()
59/// }
60/// ```
61#[derive(Debug)]
62pub struct Transaction<'conn> {
63 conn: &'conn Connection,
64 drop_behavior: DropBehavior,
65}
66
67/// Represents a savepoint on a database connection.
68///
69/// ## Note
70///
71/// Savepoints will roll back by default. Use `commit` method to explicitly
72/// commit the savepoint, or use `set_drop_behavior` to change what happens
73/// when the savepoint is dropped.
74///
75/// ## Example
76///
77/// ```rust,no_run
78/// # use duckdb::{Connection, Result};
79/// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
80/// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
81/// fn perform_queries(conn: &mut Connection) -> Result<()> {
82/// let sp = conn.savepoint()?;
83///
84/// do_queries_part_1(&sp)?; // sp causes rollback if this fails
85/// do_queries_part_2(&sp)?; // sp causes rollback if this fails
86///
87/// sp.commit()
88/// }
89/// ```
90#[derive(Debug)]
91pub struct Savepoint<'conn> {
92 conn: &'conn Connection,
93 name: String,
94 depth: u32,
95 drop_behavior: DropBehavior,
96 committed: bool,
97}
98
99impl Transaction<'_> {
100 /// Begin a new transaction. Cannot be nested; see `savepoint` for nested
101 /// transactions.
102 ///
103 /// Even though we don't mutate the connection, we take a `&mut Connection`
104 /// so as to prevent nested transactions on the same connection. For cases
105 /// where this is unacceptable, [`Transaction::new_unchecked`] is available.
106 #[inline]
107 pub fn new(conn: &mut Connection) -> Result<Transaction<'_>> {
108 Self::new_unchecked(conn, TransactionBehavior::Deferred)
109 }
110
111 /// Begin a new transaction, failing if a transaction is open.
112 ///
113 /// If a transaction is already open, this will return an error. Where
114 /// possible, [`Transaction::new`] should be preferred, as it provides a
115 /// compile-time guarantee that transactions are not nested.
116 #[inline]
117 fn new_unchecked(conn: &Connection, _: TransactionBehavior) -> Result<Transaction<'_>> {
118 // TODO(wangfenjin): not supported
119 // let query = match behavior {
120 // TransactionBehavior::Deferred => "BEGIN DEFERRED",
121 // TransactionBehavior::Immediate => "BEGIN IMMEDIATE",
122 // TransactionBehavior::Exclusive => "BEGIN EXCLUSIVE",
123 // };
124 let query = "BEGIN Transaction";
125 conn.execute_batch(query).map(move |_| Transaction {
126 conn,
127 drop_behavior: DropBehavior::Rollback,
128 })
129 }
130
131 /// Starts a new [savepoint](http://www.sqlite.org/lang_savepoint.html), allowing nested
132 /// transactions.
133 ///
134 /// ## Note
135 ///
136 /// Just like outer level transactions, savepoint transactions rollback by
137 /// default.
138 ///
139 /// ## Example
140 ///
141 /// ```rust,no_run
142 /// # use duckdb::{Connection, Result};
143 /// # fn perform_queries_part_1_succeeds(_conn: &Connection) -> bool { true }
144 /// fn perform_queries(conn: &mut Connection) -> Result<()> {
145 /// let mut tx = conn.transaction()?;
146 ///
147 /// {
148 /// let sp = tx.savepoint()?;
149 /// if perform_queries_part_1_succeeds(&sp) {
150 /// sp.commit()?;
151 /// }
152 /// // otherwise, sp will rollback
153 /// }
154 ///
155 /// tx.commit()
156 /// }
157 /// ```
158 #[inline]
159 pub fn savepoint(&mut self) -> Result<Savepoint<'_>> {
160 Savepoint::with_depth(self.conn, 1)
161 }
162
163 /// Create a new savepoint with a custom savepoint name. See `savepoint()`.
164 #[inline]
165 pub fn savepoint_with_name<T: Into<String>>(&mut self, name: T) -> Result<Savepoint<'_>> {
166 Savepoint::with_depth_and_name(self.conn, 1, name)
167 }
168
169 /// Get the current setting for what happens to the transaction when it is
170 /// dropped.
171 #[inline]
172 pub fn drop_behavior(&self) -> DropBehavior {
173 self.drop_behavior
174 }
175
176 /// Configure the transaction to perform the specified action when it is
177 /// dropped.
178 #[inline]
179 pub fn set_drop_behavior(&mut self, drop_behavior: DropBehavior) {
180 self.drop_behavior = drop_behavior
181 }
182
183 /// A convenience method which consumes and commits a transaction.
184 #[inline]
185 pub fn commit(mut self) -> Result<()> {
186 self.commit_()
187 }
188
189 #[inline]
190 fn commit_(&mut self) -> Result<()> {
191 self.conn.execute_batch("COMMIT")?;
192 Ok(())
193 }
194
195 /// A convenience method which consumes and rolls back a transaction.
196 #[inline]
197 pub fn rollback(mut self) -> Result<()> {
198 self.rollback_()
199 }
200
201 #[inline]
202 fn rollback_(&mut self) -> Result<()> {
203 self.conn.execute_batch("ROLLBACK")?;
204 Ok(())
205 }
206
207 /// Consumes the transaction, committing or rolling back according to the
208 /// current setting (see `drop_behavior`).
209 ///
210 /// Functionally equivalent to the `Drop` implementation, but allows
211 /// callers to see any errors that occur.
212 #[inline]
213 pub fn finish(mut self) -> Result<()> {
214 self.finish_()
215 }
216
217 #[inline]
218 fn finish_(&mut self) -> Result<()> {
219 // if self.conn.is_autocommit() {
220 // println!("is autocommit");
221 // return Ok(());
222 // }
223 match self.drop_behavior() {
224 DropBehavior::Commit => self.commit_().or_else(|_| self.rollback_()),
225 DropBehavior::Rollback => self.rollback_(),
226 DropBehavior::Ignore => Ok(()),
227 DropBehavior::Panic => panic!("Transaction dropped unexpectedly."),
228 }
229 }
230}
231
232impl Deref for Transaction<'_> {
233 type Target = Connection;
234
235 #[inline]
236 fn deref(&self) -> &Connection {
237 self.conn
238 }
239}
240
241#[allow(unused_must_use)]
242impl Drop for Transaction<'_> {
243 #[inline]
244 fn drop(&mut self) {
245 self.finish_();
246 }
247}
248
249impl Savepoint<'_> {
250 #[inline]
251 fn with_depth_and_name<T: Into<String>>(conn: &Connection, depth: u32, name: T) -> Result<Savepoint<'_>> {
252 let name = name.into();
253 conn.execute_batch(&format!("SAVEPOINT {name}")).map(|_| Savepoint {
254 conn,
255 name,
256 depth,
257 drop_behavior: DropBehavior::Rollback,
258 committed: false,
259 })
260 }
261
262 #[inline]
263 fn with_depth(conn: &Connection, depth: u32) -> Result<Savepoint<'_>> {
264 let name = format!("_duckdb_sp_{depth}");
265 Savepoint::with_depth_and_name(conn, depth, name)
266 }
267
268 /// Begin a new savepoint. Can be nested.
269 #[inline]
270 pub fn new(conn: &mut Connection) -> Result<Savepoint<'_>> {
271 Savepoint::with_depth(conn, 0)
272 }
273
274 /// Begin a new savepoint with a user-provided savepoint name.
275 #[inline]
276 pub fn with_name<T: Into<String>>(conn: &mut Connection, name: T) -> Result<Savepoint<'_>> {
277 Savepoint::with_depth_and_name(conn, 0, name)
278 }
279
280 /// Begin a nested savepoint.
281 #[inline]
282 pub fn savepoint(&mut self) -> Result<Savepoint<'_>> {
283 Savepoint::with_depth(self.conn, self.depth + 1)
284 }
285
286 /// Begin a nested savepoint with a user-provided savepoint name.
287 #[inline]
288 pub fn savepoint_with_name<T: Into<String>>(&mut self, name: T) -> Result<Savepoint<'_>> {
289 Savepoint::with_depth_and_name(self.conn, self.depth + 1, name)
290 }
291
292 /// Get the current setting for what happens to the savepoint when it is
293 /// dropped.
294 #[inline]
295 pub fn drop_behavior(&self) -> DropBehavior {
296 self.drop_behavior
297 }
298
299 /// Configure the savepoint to perform the specified action when it is
300 /// dropped.
301 #[inline]
302 pub fn set_drop_behavior(&mut self, drop_behavior: DropBehavior) {
303 self.drop_behavior = drop_behavior
304 }
305
306 /// A convenience method which consumes and commits a savepoint.
307 #[inline]
308 pub fn commit(mut self) -> Result<()> {
309 self.commit_()
310 }
311
312 #[inline]
313 fn commit_(&mut self) -> Result<()> {
314 self.conn.execute_batch(&format!("RELEASE {}", self.name))?;
315 self.committed = true;
316 Ok(())
317 }
318
319 /// A convenience method which rolls back a savepoint.
320 ///
321 /// ## Note
322 ///
323 /// Unlike `Transaction`s, savepoints remain active after they have been
324 /// rolled back, and can be rolled back again or committed.
325 #[inline]
326 pub fn rollback(&mut self) -> Result<()> {
327 self.conn.execute_batch(&format!("ROLLBACK TO {}", self.name))
328 }
329
330 /// Consumes the savepoint, committing or rolling back according to the
331 /// current setting (see `drop_behavior`).
332 ///
333 /// Functionally equivalent to the `Drop` implementation, but allows
334 /// callers to see any errors that occur.
335 #[inline]
336 pub fn finish(mut self) -> Result<()> {
337 self.finish_()
338 }
339
340 #[inline]
341 fn finish_(&mut self) -> Result<()> {
342 if self.committed {
343 return Ok(());
344 }
345 match self.drop_behavior() {
346 DropBehavior::Commit => self.commit_().or_else(|_| self.rollback()),
347 DropBehavior::Rollback => self.rollback(),
348 DropBehavior::Ignore => Ok(()),
349 DropBehavior::Panic => panic!("Savepoint dropped unexpectedly."),
350 }
351 }
352}
353
354impl Deref for Savepoint<'_> {
355 type Target = Connection;
356
357 #[inline]
358 fn deref(&self) -> &Connection {
359 self.conn
360 }
361}
362
363#[allow(unused_must_use)]
364impl Drop for Savepoint<'_> {
365 #[inline]
366 fn drop(&mut self) {
367 self.finish_();
368 }
369}
370
371impl Connection {
372 /// Begin a new transaction with the default behavior (DEFERRED).
373 ///
374 /// The transaction defaults to rolling back when it is dropped. If you
375 /// want the transaction to commit, you must call
376 /// [`commit`](Transaction::commit) or [`set_drop_behavior(DropBehavior:
377 /// :Commit)`](Transaction::set_drop_behavior).
378 ///
379 /// ## Example
380 ///
381 /// ```rust,no_run
382 /// # use duckdb::{Connection, Result};
383 /// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
384 /// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
385 /// fn perform_queries(conn: &mut Connection) -> Result<()> {
386 /// let tx = conn.transaction()?;
387 ///
388 /// do_queries_part_1(&tx)?; // tx causes rollback if this fails
389 /// do_queries_part_2(&tx)?; // tx causes rollback if this fails
390 ///
391 /// tx.commit()
392 /// }
393 /// ```
394 ///
395 /// # Failure
396 ///
397 /// Will return `Err` if the underlying DuckDB call fails.
398 #[inline]
399 pub fn transaction(&mut self) -> Result<Transaction<'_>> {
400 Transaction::new(self)
401 }
402
403 /// Begin a new transaction with a specified behavior.
404 ///
405 /// See [`transaction`](Connection::transaction).
406 ///
407 /// # Failure
408 ///
409 /// Will return `Err` if the underlying DuckDB call fails.
410 #[inline]
411 #[allow(dead_code)]
412 fn transaction_with_behavior(&mut self, behavior: TransactionBehavior) -> Result<Transaction<'_>> {
413 Transaction::new_unchecked(self, behavior)
414 }
415
416 /// Begin a new transaction with the default behavior (DEFERRED).
417 ///
418 /// Attempt to open a nested transaction will result in a DuckDB error.
419 /// `Connection::transaction` prevents this at compile time by taking `&mut
420 /// self`, but `Connection::unchecked_transaction()` may be used to defer
421 /// the checking until runtime.
422 ///
423 /// See [`Connection::transaction`] and [`Transaction::new_unchecked`]
424 /// (which can be used if the default transaction behavior is undesirable).
425 ///
426 /// ## Example
427 ///
428 /// ```rust,no_run
429 /// # use duckdb::{Connection, Result};
430 /// # use std::rc::Rc;
431 /// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
432 /// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
433 /// fn perform_queries(conn: Rc<Connection>) -> Result<()> {
434 /// let tx = conn.unchecked_transaction()?;
435 ///
436 /// do_queries_part_1(&tx)?; // tx causes rollback if this fails
437 /// do_queries_part_2(&tx)?; // tx causes rollback if this fails
438 ///
439 /// tx.commit()
440 /// }
441 /// ```
442 ///
443 /// # Failure
444 ///
445 /// Will return `Err` if the underlying DuckDB call fails. The specific
446 /// error returned if transactions are nested is currently unspecified.
447 pub fn unchecked_transaction(&self) -> Result<Transaction<'_>> {
448 Transaction::new_unchecked(self, TransactionBehavior::Deferred)
449 }
450
451 /// Begin a new savepoint with the default behavior (DEFERRED).
452 ///
453 /// The savepoint defaults to rolling back when it is dropped. If you want
454 /// the savepoint to commit, you must call [`commit`](Savepoint::commit) or
455 /// [`set_drop_behavior(DropBehavior::Commit)`](Savepoint::
456 /// set_drop_behavior).
457 ///
458 /// ## Example
459 ///
460 /// ```rust,no_run
461 /// # use duckdb::{Connection, Result};
462 /// # fn do_queries_part_1(_conn: &Connection) -> Result<()> { Ok(()) }
463 /// # fn do_queries_part_2(_conn: &Connection) -> Result<()> { Ok(()) }
464 /// fn perform_queries(conn: &mut Connection) -> Result<()> {
465 /// let sp = conn.savepoint()?;
466 ///
467 /// do_queries_part_1(&sp)?; // sp causes rollback if this fails
468 /// do_queries_part_2(&sp)?; // sp causes rollback if this fails
469 ///
470 /// sp.commit()
471 /// }
472 /// ```
473 ///
474 /// # Failure
475 ///
476 /// Will return `Err` if the underlying DuckDB call fails.
477 #[inline]
478 pub fn savepoint(&mut self) -> Result<Savepoint<'_>> {
479 Savepoint::new(self)
480 }
481
482 /// Begin a new savepoint with a specified name.
483 ///
484 /// See [`savepoint`](Connection::savepoint).
485 ///
486 /// # Failure
487 ///
488 /// Will return `Err` if the underlying DuckDB call fails.
489 #[inline]
490 pub fn savepoint_with_name<T: Into<String>>(&mut self, name: T) -> Result<Savepoint<'_>> {
491 Savepoint::with_name(self, name)
492 }
493}
494
#[cfg(test)]
mod test {
    use super::DropBehavior;
    use crate::{Connection, Result};

    /// Inserts a single value into `foo` through a parameterized statement.
    fn insert(x: i32, conn: &Connection) -> Result<usize> {
        conn.execute("INSERT INTO foo VALUES(?)", [x])
    }

    /// Asserts that the values currently visible in `foo` add up to `x`.
    fn assert_current_sum(x: i32, conn: &Connection) -> Result<()> {
        let i = conn.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?;
        assert_eq!(x, i);
        Ok(())
    }

    /// Opens an in-memory database that contains an empty `foo` table.
    fn checked_no_autocommit_memory_handle() -> Result<Connection> {
        let schema = r"
            CREATE TABLE foo (x INTEGER);
            -- SET AUTOCOMMIT TO OFF;
            ";
        let db = Connection::open_in_memory()?;
        db.execute_batch(schema)?;
        Ok(db)
    }

    /// Dropping rolls back by default and commits when asked to.
    #[test]
    fn test_drop() -> Result<()> {
        let mut db = checked_no_autocommit_memory_handle()?;
        {
            let tx = db.transaction()?;
            assert!(tx.execute_batch("INSERT INTO foo VALUES(1)").is_ok());
            // default: rollback
        }
        {
            let mut tx = db.transaction()?;
            tx.execute_batch("INSERT INTO foo VALUES(2)")?;
            tx.set_drop_behavior(DropBehavior::Commit);
        }
        {
            // Only the committed row is visible.
            let tx = db.transaction()?;
            assert_current_sum(2, &tx)?;
        }
        Ok(())
    }

    /// Transactions opened through a shared borrow behave like checked ones.
    #[test]
    fn test_unchecked_nesting() -> Result<()> {
        let db = checked_no_autocommit_memory_handle()?;

        // Open and immediately drop a transaction (default: rollback).
        drop(db.unchecked_transaction()?);

        {
            let tx = db.unchecked_transaction()?;
            tx.execute_batch("INSERT INTO foo VALUES(1)")?;
            // Ensure this doesn't interfere with ongoing transaction
            // let e = tx.unchecked_transaction().unwrap_err();
            // assert_nested_tx_error(e);
            tx.execute_batch("INSERT INTO foo VALUES(1)")?;
            tx.commit()?;
        }

        assert_current_sum(2, &db)?;
        Ok(())
    }

    /// Explicit `rollback` discards changes; explicit `commit` keeps them.
    #[test]
    fn test_explicit_rollback_commit() -> Result<()> {
        let mut db = checked_no_autocommit_memory_handle()?;
        {
            let tx = db.transaction()?;
            tx.execute_batch("INSERT INTO foo VALUES(1)")?;
            tx.rollback()?;
        }
        {
            let tx = db.transaction()?;
            tx.execute_batch("INSERT INTO foo VALUES(4)")?;
            tx.commit()?;
        }
        {
            let tx = db.transaction()?;
            assert_current_sum(4, &tx)?;
        }
        Ok(())
    }

    /// Nested savepoints roll back independently of the enclosing scopes.
    #[test]
    #[ignore = "not supported"]
    fn test_savepoint() -> Result<()> {
        let mut db = checked_no_autocommit_memory_handle()?;
        {
            let mut tx = db.transaction()?;
            tx.execute_batch("INSERT INTO foo VALUES(1)")?;
            assert_current_sum(1, &tx)?;
            tx.set_drop_behavior(DropBehavior::Commit);
            {
                let mut sp1 = tx.savepoint()?;
                sp1.execute_batch("INSERT INTO foo VALUES(2)")?;
                assert_current_sum(3, &sp1)?;
                // will rollback sp1
                {
                    let mut sp2 = sp1.savepoint()?;
                    sp2.execute_batch("INSERT INTO foo VALUES(4)")?;
                    assert_current_sum(7, &sp2)?;
                    // will rollback sp2
                    {
                        let sp3 = sp2.savepoint()?;
                        sp3.execute_batch("INSERT INTO foo VALUES(8)")?;
                        assert_current_sum(15, &sp3)?;
                        sp3.commit()?;
                        // committed sp3, but will be erased by sp2 rollback
                    }
                    assert_current_sum(15, &sp2)?;
                }
                assert_current_sum(3, &sp1)?;
            }
            assert_current_sum(1, &tx)?;
        }
        assert_current_sum(1, &db)?;
        Ok(())
    }

    /// A savepoint dropped with `Ignore` leaves its changes in place.
    #[test]
    #[ignore = "not supported"]
    fn test_ignore_drop_behavior() -> Result<()> {
        let mut db = checked_no_autocommit_memory_handle()?;

        let mut tx = db.transaction()?;
        {
            let mut sp1 = tx.savepoint()?;
            insert(1, &sp1)?;
            sp1.rollback()?;
            insert(2, &sp1)?;
            {
                let mut sp2 = sp1.savepoint()?;
                sp2.set_drop_behavior(DropBehavior::Ignore);
                insert(4, &sp2)?;
            }
            assert_current_sum(6, &sp1)?;
            sp1.commit()?;
        }
        assert_current_sum(6, &tx)?;
        Ok(())
    }

    /// Savepoints may reuse a user-provided name at different nesting levels.
    #[test]
    #[ignore = "not supported"]
    fn test_savepoint_names() -> Result<()> {
        let mut db = checked_no_autocommit_memory_handle()?;

        {
            let mut sp1 = db.savepoint_with_name("my_sp")?;
            insert(1, &sp1)?;
            assert_current_sum(1, &sp1)?;
            {
                let mut sp2 = sp1.savepoint_with_name("my_sp")?;
                sp2.set_drop_behavior(DropBehavior::Commit);
                insert(2, &sp2)?;
                assert_current_sum(3, &sp2)?;
                sp2.rollback()?;
                assert_current_sum(1, &sp2)?;
                insert(4, &sp2)?;
            }
            assert_current_sum(5, &sp1)?;
            sp1.rollback()?;
            {
                let mut sp2 = sp1.savepoint_with_name("my_sp")?;
                sp2.set_drop_behavior(DropBehavior::Ignore);
                insert(8, &sp2)?;
            }
            assert_current_sum(8, &sp1)?;
            sp1.commit()?;
        }
        assert_current_sum(8, &db)?;
        Ok(())
    }

    /// A transaction can be wrapped in an `Rc` and unwrapped again.
    #[test]
    fn test_rc() -> Result<()> {
        use std::rc::Rc;
        let mut conn = Connection::open_in_memory()?;
        let rc_txn = Rc::new(conn.transaction()?);

        // This will compile only if Transaction is Debug
        Rc::try_unwrap(rc_txn).unwrap();
        Ok(())
    }
}