Skip to main content

mongodb/client/session/
action.rs

1use std::time::{Duration, Instant};
2
3use crate::{
4    action::{action_impl, AbortTransaction, CommitTransaction, StartTransaction},
5    client::options::TransactionOptions,
6    error::{ErrorKind, Result},
7    operation::{self, Operation},
8    sdam::TransactionSupportStatus,
9    BoxFuture,
10    ClientSession,
11};
12
13use super::TransactionState;
14
15impl ClientSession {
16    async fn start_transaction_impl(&mut self, options: Option<TransactionOptions>) -> Result<()> {
17        if self
18            .options
19            .as_ref()
20            .and_then(|o| o.snapshot)
21            .unwrap_or(false)
22        {
23            return Err(ErrorKind::Transaction {
24                message: "Transactions are not supported in snapshot sessions".into(),
25            }
26            .into());
27        }
28        match self.transaction.state {
29            TransactionState::Starting | TransactionState::InProgress => {
30                return Err(ErrorKind::Transaction {
31                    message: "transaction already in progress".into(),
32                }
33                .into());
34            }
35            TransactionState::Committed { .. } => {
36                self.unpin(); // Unpin session if previous transaction is committed.
37            }
38            _ => {}
39        }
40        match self.client.transaction_support_status().await? {
41            TransactionSupportStatus::Supported => {
42                let mut options = match options {
43                    Some(mut options) => {
44                        if let Some(defaults) = self.default_transaction_options() {
45                            merge_options!(
46                                defaults,
47                                options,
48                                [
49                                    read_concern,
50                                    write_concern,
51                                    selection_criteria,
52                                    max_commit_time
53                                ]
54                            );
55                        }
56                        Some(options)
57                    }
58                    None => self.default_transaction_options().cloned(),
59                };
60                resolve_options!(
61                    self.client,
62                    options,
63                    [read_concern, write_concern, selection_criteria]
64                );
65
66                if let Some(ref options) = options {
67                    if !options
68                        .write_concern
69                        .as_ref()
70                        .map(|wc| wc.is_acknowledged())
71                        .unwrap_or(true)
72                    {
73                        return Err(ErrorKind::Transaction {
74                            message: "transactions do not support unacknowledged write concerns"
75                                .into(),
76                        }
77                        .into());
78                    }
79                }
80
81                self.increment_txn_number();
82                self.transaction.start(
83                    options,
84                    #[cfg(feature = "opentelemetry")]
85                    self.client.start_transaction_span(),
86                );
87                Ok(())
88            }
89            _ => Err(ErrorKind::Transaction {
90                message: "Transactions are not supported by this deployment".into(),
91            }
92            .into()),
93        }
94    }
95}
96
// Awaiting a `StartTransaction` built on an async `ClientSession` forwards the configured
// options to `ClientSession::start_transaction_impl` and returns its result.
#[action_impl]
impl<'a> Action for StartTransaction<&'a mut ClientSession> {
    type Future = StartTransactionFuture;

    async fn execute(self) -> Result<()> {
        // All validation and state changes live in `start_transaction_impl`.
        self.session.start_transaction_impl(self.options).await
    }
}
105
// Shared body of the "convenient" transaction helpers (`and_run`, `and_run2` and the sync
// `and_run`).
//
// Arguments, all expressions that are re-evaluated each time they appear in the expansion:
//   * `$session`            - the async `ClientSession` whose `transaction.state` is inspected
//   * `$start_transaction`  - starts a transaction, yielding a `Result`
//   * `$callback`           - runs the user callback, yielding `Result<R>`
//   * `$abort_transaction`  - aborts the current transaction, yielding a `Result`
//   * `$commit_transaction` - commits the current transaction, yielding `Result<()>`
//
// The expansion uses `?` and `return`, so it must be invoked inside a function returning
// `Result<R>`; every exit from the loops is a `return` out of that function.
macro_rules! convenient_run {
    (
        $session:expr,
        $start_transaction:expr,
        $callback:expr,
        $abort_transaction:expr,
        $commit_transaction:expr,
    ) => {{
        // Overall retry budget: 120 seconds. Test builds may override it through the session's
        // `convenient_transaction_timeout` field.
        let timeout = Duration::from_secs(120);
        #[cfg(test)]
        let timeout = $session.convenient_transaction_timeout.unwrap_or(timeout);
        let start = Instant::now();

        use crate::error::{TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT};

        // Each iteration is one full attempt: start, run the callback, then commit.
        'transaction: loop {
            $start_transaction?;
            let ret = match $callback {
                Ok(v) => v,
                Err(e) => {
                    // Only abort if the callback left a transaction open; an abort failure is
                    // propagated in place of the callback error.
                    if matches!(
                        $session.transaction.state,
                        TransactionState::Starting | TransactionState::InProgress
                    ) {
                        $abort_transaction?;
                    }
                    // Transient errors restart the whole attempt while the budget lasts.
                    if e.contains_label(TRANSIENT_TRANSACTION_ERROR) && start.elapsed() < timeout {
                        continue 'transaction;
                    }
                    return Err(e);
                }
            };
            // The callback already ended the transaction itself (or none is active), so there
            // is nothing left to commit.
            if matches!(
                $session.transaction.state,
                TransactionState::None
                    | TransactionState::Aborted
                    | TransactionState::Committed { .. }
            ) {
                return Ok(ret);
            }
            // Retry only the commit for as long as its outcome is unknown.
            'commit: loop {
                match $commit_transaction {
                    Ok(()) => return Ok(ret),
                    Err(e) => {
                        // Give up on a max-time-expired error or once the budget is spent.
                        if e.is_max_time_ms_expired_error() || start.elapsed() >= timeout {
                            return Err(e);
                        }
                        if e.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) {
                            continue 'commit;
                        }
                        // A transient failure means the callback must be run again from scratch.
                        if e.contains_label(TRANSIENT_TRANSACTION_ERROR) {
                            continue 'transaction;
                        }
                        return Err(e);
                    }
                }
            }
        }
    }};
}
166
impl StartTransaction<&mut ClientSession> {
    /// Starts a transaction, runs the given callback, and commits or aborts the transaction.  In
    /// most circumstances, [`and_run2`](StartTransaction::and_run2) will be more convenient.
    ///
    /// Transient transaction errors will cause the callback or the commit to be retried;
    /// other errors will cause the transaction to be aborted and the error returned to the
    /// caller.  If the callback needs to provide its own error information, the
    /// [`Error::custom`](crate::error::Error::custom) method can accept an arbitrary payload that
    /// can be retrieved via [`Error::get_custom`](crate::error::Error::get_custom).
    ///
    /// If a command inside the callback fails, it may cause the transaction on the server to be
    /// aborted. This situation is normally handled transparently by the driver. However, if the
    /// application does not return that error from the callback, the driver will not be able to
    /// determine whether the transaction was aborted or not. The driver will then retry the
    /// callback indefinitely. To avoid this situation, the application MUST NOT silently handle
    /// errors within the callback. If the application needs to handle errors within the
    /// callback, it MUST return them after doing so.
    ///
    /// Because the callback can be repeatedly executed and because it returns a future, the Rust
    /// closure borrowing rules for captured values can be overly restrictive.  As a
    /// convenience, `and_run` accepts a context argument that will be passed to the
    /// callback along with the session:
    ///
    /// ```no_run
    /// # use mongodb::{bson::{doc, Document}, error::Result, Client};
    /// # use futures::FutureExt;
    /// # async fn wrapper() -> Result<()> {
    /// # let client = Client::with_uri_str("mongodb://example.com").await?;
    /// # let mut session = client.start_session().await?;
    /// let coll = client.database("mydb").collection::<Document>("mycoll");
    /// let my_data = "my data".to_string();
    /// // This works:
    /// session.start_transaction().and_run(
    ///     (&coll, &my_data),
    ///     |session, (coll, my_data)| async move {
    ///         coll.insert_one(doc! { "data": *my_data }).session(session).await
    ///     }.boxed()
    /// ).await?;
    /// /* This will not compile with a "variable moved due to use in generator" error:
    /// session.start_transaction().and_run(
    ///     (),
    ///     |session, _| async move {
    ///         coll.insert_one(doc! { "data": my_data }).session(session).await
    ///     }.boxed()
    /// ).await?;
    /// */
    /// # Ok(())
    /// # }
    /// ```
    pub async fn and_run<R, C, F>(self, mut context: C, mut callback: F) -> Result<R>
    where
        F: for<'b> FnMut(&'b mut ClientSession, &'b mut C) -> BoxFuture<'b, Result<R>>,
    {
        // The options are cloned on every attempt because the start expression is re-evaluated
        // each time `convenient_run!` restarts the transaction.
        convenient_run!(
            self.session,
            self.session
                .start_transaction()
                .with_options(self.options.clone())
                .await,
            callback(self.session, &mut context).await,
            self.session.abort_transaction().await,
            self.session.commit_transaction().await,
        )
    }

    /// Starts a transaction, runs the given callback, and commits or aborts the transaction.
    ///
    /// Transient transaction errors will cause the callback or the commit to be retried;
    /// other errors will cause the transaction to be aborted and the error returned to the
    /// caller.  If the callback needs to provide its own error information, the
    /// [`Error::custom`](crate::error::Error::custom) method can accept an arbitrary payload that
    /// can be retrieved via [`Error::get_custom`](crate::error::Error::get_custom).
    ///
    /// If a command inside the callback fails, it may cause the transaction on the server to be
    /// aborted. This situation is normally handled transparently by the driver. However, if the
    /// application does not return that error from the callback, the driver will not be able to
    /// determine whether the transaction was aborted or not. The driver will then retry the
    /// callback indefinitely. To avoid this situation, the application MUST NOT silently handle
    /// errors within the callback. If the application needs to handle errors within the
    /// callback, it MUST return them after doing so.
    ///
    /// This version of the method uses an async closure, which means it's both more convenient and
    /// avoids the lifetime issues of `and_run`, but is only available in Rust versions 1.85 and
    /// above.
    ///
    /// In some circumstances, using this method can trigger a
    /// [compiler bug](https://github.com/rust-lang/rust/issues/96865) that results in
    /// `implementation of Send is not general enough` errors.  If this is encountered, we
    /// recommend these workarounds:
    ///   * Avoid capturing references in the transaction closure (e.g. by cloning)
    ///   * Use the `context` parameter of [`and_run`](StartTransaction::and_run).
    ///
    /// Because the callback can be repeatedly executed, code within the callback cannot consume
    /// owned values, even values owned by the callback itself:
    ///
    /// ```no_run
    /// # use mongodb::{bson::{doc, Document}, error::Result, Client};
    /// # use futures::FutureExt;
    /// # async fn wrapper() -> Result<()> {
    /// # let client = Client::with_uri_str("mongodb://example.com").await?;
    /// # let mut session = client.start_session().await?;
    /// let coll = client.database("mydb").collection::<Document>("mycoll");
    /// let my_data = "my data".to_string();
    /// // This works:
    /// session.start_transaction().and_run2(
    ///     async move |session| {
    ///         coll.insert_one(doc! { "data": my_data.clone() }).session(session).await
    ///     }
    /// ).await?;
    /// /* This will not compile:
    /// session.start_transaction().and_run2(
    ///     async move |session| {
    ///         coll.insert_one(doc! { "data": my_data }).session(session).await
    ///     }
    /// ).await?;
    /// */
    /// # Ok(())
    /// # }
    /// ```
    #[rustversion::since(1.85)]
    pub async fn and_run2<R, F>(self, mut callback: F) -> Result<R>
    where
        F: for<'b> AsyncFnMut(&'b mut ClientSession) -> Result<R>,
    {
        // Same retry driver as `and_run`; only the callback invocation differs.
        convenient_run!(
            self.session,
            self.session
                .start_transaction()
                .with_options(self.options.clone())
                .await,
            callback(self.session).await,
            self.session.abort_transaction().await,
            self.session.commit_transaction().await,
        )
    }
}
303
#[cfg(feature = "sync")]
impl StartTransaction<&mut crate::sync::ClientSession> {
    /// Synchronously execute this action.
    ///
    /// Blocks the calling thread on [`crate::sync::TOKIO_RUNTIME`] until the wrapped async
    /// session has finished starting the transaction with the configured options.
    pub fn run(self) -> Result<()> {
        crate::sync::TOKIO_RUNTIME.block_on(
            self.session
                .async_client_session
                .start_transaction_impl(self.options),
        )
    }

    /// Starts a transaction, runs the given callback, and commits or aborts the transaction.
    ///
    /// Transient transaction errors will cause the callback or the commit to be retried;
    /// other errors will cause the transaction to be aborted and the error returned to the
    /// caller.  If the callback needs to provide its own error information, the
    /// [`Error::custom`](crate::error::Error::custom) method can accept an arbitrary payload that
    /// can be retrieved via [`Error::get_custom`](crate::error::Error::get_custom).
    ///
    /// If a command inside the callback fails, it may cause the transaction on the server to be
    /// aborted. This situation is normally handled transparently by the driver. However, if the
    /// application does not return that error from the callback, the driver will not be able to
    /// determine whether the transaction was aborted or not. The driver will then retry the
    /// callback indefinitely. To avoid this situation, the application MUST NOT silently handle
    /// errors within the callback. If the application needs to handle errors within the
    /// callback, it MUST return them after doing so.
    pub fn and_run<R, F>(self, mut callback: F) -> Result<R>
    where
        F: for<'b> FnMut(&'b mut crate::sync::ClientSession) -> Result<R>,
    {
        // Transaction state lives on the wrapped async session, so that is what the retry
        // driver inspects; each step goes through the blocking `run()` wrappers.
        convenient_run!(
            self.session.async_client_session,
            self.session
                .start_transaction()
                .with_options(self.options.clone())
                .run(),
            callback(self.session),
            self.session.abort_transaction().run(),
            self.session.commit_transaction().run(),
        )
    }
}
345
346#[action_impl]
347impl<'a> Action for CommitTransaction<'a> {
348    type Future = CommitTransactionFuture;
349
350    async fn execute(self) -> Result<()> {
351        match &mut self.session.transaction.state {
352            TransactionState::None => Err(ErrorKind::Transaction {
353                message: "no transaction started".into(),
354            }
355            .into()),
356            TransactionState::Aborted => Err(ErrorKind::Transaction {
357                message: "Cannot call commitTransaction after calling abortTransaction".into(),
358            }
359            .into()),
360            TransactionState::Starting => {
361                self.session.transaction.commit(false);
362                self.session.transaction.drop_span();
363                Ok(())
364            }
365            TransactionState::InProgress => {
366                let commit_transaction =
367                    operation::CommitTransaction::new(self.session.transaction.options.clone());
368                self.session.transaction.commit(true);
369                let out = self
370                    .session
371                    .client
372                    .clone()
373                    .execute_operation(commit_transaction, &mut *self.session)
374                    .await;
375                self.session.transaction.drop_span();
376                out
377            }
378            TransactionState::Committed {
379                data_committed: true,
380            } => {
381                let mut commit_transaction =
382                    operation::CommitTransaction::new(self.session.transaction.options.clone());
383                commit_transaction.update_for_retry();
384                self.session
385                    .client
386                    .clone()
387                    .execute_operation(commit_transaction, self.session)
388                    .await
389            }
390            TransactionState::Committed {
391                data_committed: false,
392            } => Ok(()),
393        }
394    }
395}
396
397#[action_impl]
398impl<'a> Action for AbortTransaction<'a> {
399    type Future = AbortTransactionFuture;
400
401    async fn execute(self) -> Result<()> {
402        match self.session.transaction.state {
403            TransactionState::None => Err(ErrorKind::Transaction {
404                message: "no transaction started".into(),
405            }
406            .into()),
407            TransactionState::Committed { .. } => Err(ErrorKind::Transaction {
408                message: "Cannot call abortTransaction after calling commitTransaction".into(),
409            }
410            .into()),
411            TransactionState::Aborted => Err(ErrorKind::Transaction {
412                message: "cannot call abortTransaction twice".into(),
413            }
414            .into()),
415            TransactionState::Starting => {
416                self.session.transaction.abort();
417                self.session.transaction.drop_span();
418                Ok(())
419            }
420            TransactionState::InProgress => {
421                let write_concern = self
422                    .session
423                    .transaction
424                    .options
425                    .as_ref()
426                    .and_then(|options| options.write_concern.as_ref())
427                    .cloned();
428                let abort_transaction = operation::AbortTransaction::new(
429                    write_concern,
430                    self.session.transaction.pinned.take(),
431                );
432                self.session.transaction.abort();
433                // Errors returned from running an abortTransaction command should be ignored.
434                let _result = self
435                    .session
436                    .client
437                    .clone()
438                    .execute_operation(abort_transaction, &mut *self.session)
439                    .await;
440                self.session.transaction.drop_span();
441                Ok(())
442            }
443        }
444    }
445}