// mongodb/action/transaction.rs

use std::time::Duration;

use crate::{
    client::options::TransactionOptions,
    options::{ReadConcern, WriteConcern},
    selection_criteria::SelectionCriteria,
    ClientSession,
};

use super::{export_doc, option_setters, options_doc};
11
12impl ClientSession {
13    /// Starts a new transaction on this session. If no options are set, the session's
14    /// `defaultTransactionOptions` will be used. This session must be passed into each operation
15    /// within the transaction; otherwise, the operation will be executed outside of the
16    /// transaction.
17    ///
18    /// Errors returned from operations executed within a transaction may include a
19    /// [`crate::error::TRANSIENT_TRANSACTION_ERROR`] label. This label indicates that the entire
20    /// transaction can be retried with a reasonable expectation that it will succeed.
21    ///
22    /// ```rust
23    /// # use mongodb::{bson::{doc, Document}, error::Result, Client, ClientSession};
24    /// #
25    /// # async fn do_stuff() -> Result<()> {
26    /// # let client = Client::with_uri_str("mongodb://example.com").await?;
27    /// # let coll = client.database("foo").collection::<Document>("bar");
28    /// # let mut session = client.start_session().await?;
29    /// session.start_transaction().await?;
30    /// let result = coll.insert_one(doc! { "x": 1 }).session(&mut session).await?;
31    /// session.commit_transaction().await?;
32    /// # Ok(())
33    /// # }
34    /// ```
35    ///
36    /// `await` will return [`Result<()>`].
37    #[options_doc(start_transaction)]
38    pub fn start_transaction(&mut self) -> StartTransaction<&mut Self> {
39        StartTransaction {
40            session: self,
41            options: None,
42        }
43    }
44
45    /// Commits the transaction that is currently active on this session.
46    ///
47    /// This method may return an error with a [`crate::error::UNKNOWN_TRANSACTION_COMMIT_RESULT`]
48    /// label. This label indicates that it is unknown whether the commit has satisfied the write
49    /// concern associated with the transaction. If an error with this label is returned, it is
50    /// safe to retry the commit until the write concern is satisfied or an error without the label
51    /// is returned.
52    ///
53    /// ```rust
54    /// # use mongodb::{bson::{doc, Document}, error::Result, Client, ClientSession};
55    /// #
56    /// # async fn do_stuff() -> Result<()> {
57    /// # let client = Client::with_uri_str("mongodb://example.com").await?;
58    /// # let coll = client.database("foo").collection::<Document>("bar");
59    /// # let mut session = client.start_session().await?;
60    /// session.start_transaction().await?;
61    /// let result = coll.insert_one(doc! { "x": 1 }).session(&mut session).await?;
62    /// session.commit_transaction().await?;
63    /// # Ok(())
64    /// # }
65    /// ```
66    ///
67    /// This operation will retry once upon failure if the connection and encountered error support
68    /// retryability. See the documentation
69    /// [here](https://www.mongodb.com/docs/manual/core/retryable-writes/) for more information on
70    /// retryable writes.
71    ///
72    /// `await` will return [`Result<()>`].
73    pub fn commit_transaction(&mut self) -> CommitTransaction<'_> {
74        CommitTransaction { session: self }
75    }
76
77    /// Aborts the transaction that is currently active on this session. Any open transaction will
78    /// be aborted automatically in the `Drop` implementation of `ClientSession`.
79    ///
80    /// ```rust
81    /// # use mongodb::{bson::{doc, Document}, error::Result, Client, ClientSession, Collection};
82    /// #
83    /// # async fn do_stuff() -> Result<()> {
84    /// # let client = Client::with_uri_str("mongodb://example.com").await?;
85    /// # let coll = client.database("foo").collection::<Document>("bar");
86    /// # let mut session = client.start_session().await?;
87    /// session.start_transaction().await?;
88    /// match execute_transaction(&coll, &mut session).await {
89    ///     Ok(_) => session.commit_transaction().await?,
90    ///     Err(_) => session.abort_transaction().await?,
91    /// }
92    /// # Ok(())
93    /// # }
94    ///
95    /// async fn execute_transaction(coll: &Collection<Document>, session: &mut ClientSession) -> Result<()> {
96    ///     coll.insert_one(doc! { "x": 1 }).session(&mut *session).await?;
97    ///     coll.delete_one(doc! { "y": 2 }).session(&mut *session).await?;
98    ///     Ok(())
99    /// }
100    /// ```
101    ///
102    /// This operation will retry once upon failure if the connection and encountered error support
103    /// retryability. See the documentation
104    /// [here](https://www.mongodb.com/docs/manual/core/retryable-writes/) for more information on
105    /// retryable writes.
106    ///
107    /// `await` will return [`Result<()>`].
108    pub fn abort_transaction(&mut self) -> AbortTransaction<'_> {
109        AbortTransaction { session: self }
110    }
111}
112
#[cfg(feature = "sync")]
impl crate::sync::ClientSession {
    /// Starts a new transaction on this session. If no options are set, the session's
    /// `defaultTransactionOptions` will be used. This session must be passed into each operation
    /// within the transaction; otherwise, the operation will be executed outside of the
    /// transaction.
    ///
    /// Errors returned from operations executed within a transaction may include a
    /// [`crate::error::TRANSIENT_TRANSACTION_ERROR`] label. This label indicates that the entire
    /// transaction can be retried with a reasonable expectation that it will succeed.
    ///
    /// ```rust
    /// # use mongodb::{bson::{doc, Document}, error::Result, sync::{Client, ClientSession}};
    /// #
    /// # fn do_stuff() -> Result<()> {
    /// # let client = Client::with_uri_str("mongodb://example.com")?;
    /// # let coll = client.database("foo").collection::<Document>("bar");
    /// # let mut session = client.start_session().run()?;
    /// session.start_transaction().run()?;
    /// let result = coll.insert_one(doc! { "x": 1 }).session(&mut session).run()?;
    /// session.commit_transaction().run()?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`run`](StartTransaction::run) will return [`Result<()>`].
    #[options_doc(start_transaction, "run")]
    pub fn start_transaction(&mut self) -> StartTransaction<&mut Self> {
        StartTransaction {
            session: self,
            // `None` records that no explicit options have been set on this action yet.
            options: None,
        }
    }

    /// Commits the transaction that is currently active on this session.
    ///
    /// This method may return an error with a [`crate::error::UNKNOWN_TRANSACTION_COMMIT_RESULT`]
    /// label. This label indicates that it is unknown whether the commit has satisfied the write
    /// concern associated with the transaction. If an error with this label is returned, it is
    /// safe to retry the commit until the write concern is satisfied or an error without the label
    /// is returned.
    ///
    /// ```rust
    /// # use mongodb::{bson::{doc, Document}, error::Result, sync::{Client, ClientSession}};
    /// #
    /// # fn do_stuff() -> Result<()> {
    /// # let client = Client::with_uri_str("mongodb://example.com")?;
    /// # let coll = client.database("foo").collection::<Document>("bar");
    /// # let mut session = client.start_session().run()?;
    /// session.start_transaction().run()?;
    /// let result = coll.insert_one(doc! { "x": 1 }).session(&mut session).run()?;
    /// session.commit_transaction().run()?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// This operation will retry once upon failure if the connection and encountered error support
    /// retryability. See the documentation
    /// [here](https://www.mongodb.com/docs/manual/core/retryable-writes/) for more information on
    /// retryable writes.
    ///
    /// [`run`](CommitTransaction::run) will return [`Result<()>`].
    pub fn commit_transaction(&mut self) -> CommitTransaction<'_> {
        // Delegates to the wrapped async session; the sync type adds no state of its own here.
        self.async_client_session.commit_transaction()
    }

    /// Aborts the transaction that is currently active on this session. Any open transaction will
    /// be aborted automatically in the `Drop` implementation of `ClientSession`.
    ///
    /// ```rust
    /// # use mongodb::{bson::{doc, Document}, error::Result, sync::{Client, ClientSession, Collection}};
    /// #
    /// # fn do_stuff() -> Result<()> {
    /// # let client = Client::with_uri_str("mongodb://example.com")?;
    /// # let coll = client.database("foo").collection::<Document>("bar");
    /// # let mut session = client.start_session().run()?;
    /// session.start_transaction().run()?;
    /// match execute_transaction(&coll, &mut session) {
    ///     Ok(_) => session.commit_transaction().run()?,
    ///     Err(_) => session.abort_transaction().run()?,
    /// }
    /// # Ok(())
    /// # }
    ///
    /// fn execute_transaction(coll: &Collection<Document>, session: &mut ClientSession) -> Result<()> {
    ///     coll.insert_one(doc! { "x": 1 }).session(&mut *session).run()?;
    ///     coll.delete_one(doc! { "y": 2 }).session(&mut *session).run()?;
    ///     Ok(())
    /// }
    /// ```
    ///
    /// This operation will retry once upon failure if the connection and encountered error support
    /// retryability. See the documentation
    /// [here](https://www.mongodb.com/docs/manual/core/retryable-writes/) for more information on
    /// retryable writes.
    ///
    /// [`run`](AbortTransaction::run) will return [`Result<()>`].
    pub fn abort_transaction(&mut self) -> AbortTransaction<'_> {
        // Delegates to the wrapped async session; the sync type adds no state of its own here.
        self.async_client_session.abort_transaction()
    }
}
204
205/// Start a new transaction.  Construct with [`ClientSession::start_transaction`].
206#[must_use]
207pub struct StartTransaction<S> {
208    pub(crate) session: S,
209    pub(crate) options: Option<TransactionOptions>,
210}
211
212#[option_setters(crate::client::options::TransactionOptions)]
213#[export_doc(start_transaction)]
214impl<S> StartTransaction<S> {}
215
216/// Commits a currently-active transaction.  Construct with [`ClientSession::commit_transaction`].
217#[must_use]
218pub struct CommitTransaction<'a> {
219    pub(crate) session: &'a mut ClientSession,
220}
221
222/// Abort the currently active transaction on a session.  Construct with
223/// [`ClientSession::abort_transaction`].
224#[must_use]
225pub struct AbortTransaction<'a> {
226    pub(crate) session: &'a mut ClientSession,
227}
228
// The action impls for these types are at src/client/session/action.rs.