mongodb/client/session/action.rs
1use std::time::{Duration, Instant};
2
3use crate::{
4 action::{action_impl, AbortTransaction, CommitTransaction, StartTransaction},
5 client::options::TransactionOptions,
6 error::{ErrorKind, Result},
7 operation::{self, Operation},
8 sdam::TransactionSupportStatus,
9 BoxFuture,
10 ClientSession,
11};
12
13use super::TransactionState;
14
15impl ClientSession {
16 async fn start_transaction_impl(&mut self, options: Option<TransactionOptions>) -> Result<()> {
17 if self
18 .options
19 .as_ref()
20 .and_then(|o| o.snapshot)
21 .unwrap_or(false)
22 {
23 return Err(ErrorKind::Transaction {
24 message: "Transactions are not supported in snapshot sessions".into(),
25 }
26 .into());
27 }
28 match self.transaction.state {
29 TransactionState::Starting | TransactionState::InProgress => {
30 return Err(ErrorKind::Transaction {
31 message: "transaction already in progress".into(),
32 }
33 .into());
34 }
35 TransactionState::Committed { .. } => {
36 self.unpin(); // Unpin session if previous transaction is committed.
37 }
38 _ => {}
39 }
40 match self.client.transaction_support_status().await? {
41 TransactionSupportStatus::Supported => {
42 let mut options = match options {
43 Some(mut options) => {
44 if let Some(defaults) = self.default_transaction_options() {
45 merge_options!(
46 defaults,
47 options,
48 [
49 read_concern,
50 write_concern,
51 selection_criteria,
52 max_commit_time
53 ]
54 );
55 }
56 Some(options)
57 }
58 None => self.default_transaction_options().cloned(),
59 };
60 resolve_options!(
61 self.client,
62 options,
63 [read_concern, write_concern, selection_criteria]
64 );
65
66 if let Some(ref options) = options {
67 if !options
68 .write_concern
69 .as_ref()
70 .map(|wc| wc.is_acknowledged())
71 .unwrap_or(true)
72 {
73 return Err(ErrorKind::Transaction {
74 message: "transactions do not support unacknowledged write concerns"
75 .into(),
76 }
77 .into());
78 }
79 }
80
81 self.increment_txn_number();
82 self.transaction.start(
83 options,
84 #[cfg(feature = "opentelemetry")]
85 self.client.start_transaction_span(),
86 );
87 Ok(())
88 }
89 _ => Err(ErrorKind::Transaction {
90 message: "Transactions are not supported by this deployment".into(),
91 }
92 .into()),
93 }
94 }
95}
96
97#[action_impl]
98impl<'a> Action for StartTransaction<&'a mut ClientSession> {
99 type Future = StartTransactionFuture;
100
101 async fn execute(self) -> Result<()> {
102 self.session.start_transaction_impl(self.options).await
103 }
104}
105
// Retry loop shared by the async and sync `and_run*` convenience methods.
//
// Arguments, in order: the (async) session whose transaction state is inspected, then the
// expressions that start the transaction, invoke the callback, abort the transaction and commit
// the transaction. Each expression is re-evaluated every time the loop reaches it. The macro
// expands inside a function returning `Result<R>` and leaves it via `return` / `?`.
macro_rules! convenient_run {
    (
        $session:expr,
        $start_transaction:expr,
        $callback:expr,
        $abort_transaction:expr,
        $commit_transaction:expr,
    ) => {{
        use crate::error::{TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT};

        // Retries are only attempted within this window; test builds may override it.
        let retry_window = Duration::from_secs(120);
        #[cfg(test)]
        let retry_window = $session
            .convenient_transaction_timeout
            .unwrap_or(retry_window);
        let began = Instant::now();

        'attempt: loop {
            $start_transaction?;

            let value = match $callback {
                Ok(value) => value,
                Err(callback_err) => {
                    // Only abort if the callback left the transaction open.
                    let still_open = matches!(
                        $session.transaction.state,
                        TransactionState::Starting | TransactionState::InProgress
                    );
                    if still_open {
                        $abort_transaction?;
                    }
                    let may_retry = callback_err.contains_label(TRANSIENT_TRANSACTION_ERROR)
                        && began.elapsed() < retry_window;
                    if may_retry {
                        continue 'attempt;
                    }
                    return Err(callback_err);
                }
            };

            // The callback finished the transaction itself; there is nothing left to commit.
            let already_finished = matches!(
                $session.transaction.state,
                TransactionState::None
                    | TransactionState::Aborted
                    | TransactionState::Committed { .. }
            );
            if already_finished {
                return Ok(value);
            }

            'commit_attempt: loop {
                let commit_err = match $commit_transaction {
                    Ok(()) => return Ok(value),
                    Err(err) => err,
                };
                if commit_err.is_max_time_ms_expired_error() || began.elapsed() >= retry_window {
                    return Err(commit_err);
                }
                if commit_err.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) {
                    // Retry just the commit.
                    continue 'commit_attempt;
                }
                if commit_err.contains_label(TRANSIENT_TRANSACTION_ERROR) {
                    // Restart the whole transaction, callback included.
                    continue 'attempt;
                }
                return Err(commit_err);
            }
        }
    }};
}
166
167impl StartTransaction<&mut ClientSession> {
168 /// Starts a transaction, runs the given callback, and commits or aborts the transaction. In
169 /// most circumstances, [`and_run2`](StartTransaction::and_run2) will be more convenient.
170 ///
171 /// Transient transaction errors will cause the callback or the commit to be retried;
172 /// other errors will cause the transaction to be aborted and the error returned to the
173 /// caller. If the callback needs to provide its own error information, the
174 /// [`Error::custom`](crate::error::Error::custom) method can accept an arbitrary payload that
175 /// can be retrieved via [`Error::get_custom`](crate::error::Error::get_custom).
176 ///
177 /// If a command inside the callback fails, it may cause the transaction on the server to be
178 /// aborted. This situation is normally handled transparently by the driver. However, if the
179 /// application does not return that error from the callback, the driver will not be able to
180 /// determine whether the transaction was aborted or not. The driver will then retry the
181 /// callback indefinitely. To avoid this situation, the application MUST NOT silently handle
182 /// errors within the callback. If the application needs to handle errors within the
183 /// callback, it MUST return them after doing so.
184 ///
185 /// Because the callback can be repeatedly executed and because it returns a future, the rust
186 /// closure borrowing rules for captured values can be overly restrictive. As a
187 /// convenience, `and_run` accepts a context argument that will be passed to the
188 /// callback along with the session:
189 ///
190 /// ```no_run
191 /// # use mongodb::{bson::{doc, Document}, error::Result, Client};
192 /// # use futures::FutureExt;
193 /// # async fn wrapper() -> Result<()> {
194 /// # let client = Client::with_uri_str("mongodb://example.com").await?;
195 /// # let mut session = client.start_session().await?;
196 /// let coll = client.database("mydb").collection::<Document>("mycoll");
197 /// let my_data = "my data".to_string();
198 /// // This works:
199 /// session.start_transaction().and_run(
200 /// (&coll, &my_data),
201 /// |session, (coll, my_data)| async move {
202 /// coll.insert_one(doc! { "data": *my_data }).session(session).await
203 /// }.boxed()
204 /// ).await?;
205 /// /* This will not compile with a "variable moved due to use in generator" error:
206 /// session.start_transaction().and_run(
207 /// (),
208 /// |session, _| async move {
209 /// coll.insert_one(doc! { "data": my_data }).session(session).await
210 /// }.boxed()
211 /// ).await?;
212 /// */
213 /// # Ok(())
214 /// # }
215 /// ```
216 pub async fn and_run<R, C, F>(self, mut context: C, mut callback: F) -> Result<R>
217 where
218 F: for<'b> FnMut(&'b mut ClientSession, &'b mut C) -> BoxFuture<'b, Result<R>>,
219 {
220 convenient_run!(
221 self.session,
222 self.session
223 .start_transaction()
224 .with_options(self.options.clone())
225 .await,
226 callback(self.session, &mut context).await,
227 self.session.abort_transaction().await,
228 self.session.commit_transaction().await,
229 )
230 }
231
232 /// Starts a transaction, runs the given callback, and commits or aborts the transaction.
233 ///
234 /// Transient transaction errors will cause the callback or the commit to be retried;
235 /// other errors will cause the transaction to be aborted and the error returned to the
236 /// caller. If the callback needs to provide its own error information, the
237 /// [`Error::custom`](crate::error::Error::custom) method can accept an arbitrary payload that
238 /// can be retrieved via [`Error::get_custom`](crate::error::Error::get_custom).
239 ///
240 /// If a command inside the callback fails, it may cause the transaction on the server to be
241 /// aborted. This situation is normally handled transparently by the driver. However, if the
242 /// application does not return that error from the callback, the driver will not be able to
243 /// determine whether the transaction was aborted or not. The driver will then retry the
244 /// callback indefinitely. To avoid this situation, the application MUST NOT silently handle
245 /// errors within the callback. If the application needs to handle errors within the
246 /// callback, it MUST return them after doing so.
247 ///
248 /// This version of the method uses an async closure, which means it's both more convenient and
249 /// avoids the lifetime issues of `and_run`, but is only available in Rust versions 1.85 and
250 /// above.
251 ///
252 /// In some circumstances, using this method can trigger a
253 /// [compiler bug](https://github.com/rust-lang/rust/issues/96865) that results in
254 /// `implementation of Send is not general enough` errors. If this is encountered, we
255 /// recommend these workarounds:
256 /// * Avoid capturing references in the transaction closure (e.g. by cloning)
257 /// * Use the `context` parameter of [`and_run`](StartTransaction::and_run).
258 ///
259 /// Because the callback can be repeatedly executed, code within the callback cannot consume
260 /// owned values, even values owned by the callback itself:
261 ///
262 /// ```no_run
263 /// # use mongodb::{bson::{doc, Document}, error::Result, Client};
264 /// # use futures::FutureExt;
265 /// # async fn wrapper() -> Result<()> {
266 /// # let client = Client::with_uri_str("mongodb://example.com").await?;
267 /// # let mut session = client.start_session().await?;
268 /// let coll = client.database("mydb").collection::<Document>("mycoll");
269 /// let my_data = "my data".to_string();
270 /// // This works:
271 /// session.start_transaction().and_run2(
272 /// async move |session| {
273 /// coll.insert_one(doc! { "data": my_data.clone() }).session(session).await
274 /// }
275 /// ).await?;
276 /// /* This will not compile:
277 /// session.start_transaction().and_run2(
278 /// async move |session| {
279 /// coll.insert_one(doc! { "data": my_data }).session(session).await
280 /// }
281 /// ).await?;
282 /// */
283 /// # Ok(())
284 /// # }
285 /// ```
286 #[rustversion::since(1.85)]
287 pub async fn and_run2<R, F>(self, mut callback: F) -> Result<R>
288 where
289 F: for<'b> AsyncFnMut(&'b mut ClientSession) -> Result<R>,
290 {
291 convenient_run!(
292 self.session,
293 self.session
294 .start_transaction()
295 .with_options(self.options.clone())
296 .await,
297 callback(self.session).await,
298 self.session.abort_transaction().await,
299 self.session.commit_transaction().await,
300 )
301 }
302}
303
#[cfg(feature = "sync")]
impl StartTransaction<&mut crate::sync::ClientSession> {
    /// Synchronously execute this action.
    pub fn run(self) -> Result<()> {
        let Self {
            session, options, ..
        } = self;
        // Drive the async implementation to completion on the shared runtime.
        let start = session
            .async_client_session
            .start_transaction_impl(options);
        crate::sync::TOKIO_RUNTIME.block_on(start)
    }

    /// Runs `callback` inside a transaction: the transaction is started, the callback is
    /// invoked, and the transaction is then committed or aborted.
    ///
    /// When a transient transaction error occurs, the callback or the commit is retried. Any
    /// other error aborts the transaction and is returned to the caller. A callback that needs
    /// to report its own error information can wrap an arbitrary payload with
    /// [`Error::custom`](crate::error::Error::custom) and retrieve it later via
    /// [`Error::get_custom`](crate::error::Error::get_custom).
    ///
    /// A failing command inside the callback may cause the server to abort the transaction.
    /// The driver normally handles this transparently, but only if the error is returned from
    /// the callback: if the application swallows it, the driver cannot tell whether the
    /// transaction was aborted and will repeatedly retry the callback. The application
    /// therefore MUST NOT silently handle errors within the callback; if it needs to handle
    /// them, it MUST still return them afterwards.
    pub fn and_run<R, F>(self, mut callback: F) -> Result<R>
    where
        F: for<'b> FnMut(&'b mut crate::sync::ClientSession) -> Result<R>,
    {
        let Self {
            session, options, ..
        } = self;
        convenient_run!(
            session.async_client_session,
            session
                .start_transaction()
                .with_options(options.clone())
                .run(),
            callback(&mut *session),
            session.abort_transaction().run(),
            session.commit_transaction().run(),
        )
    }
}
345
346#[action_impl]
347impl<'a> Action for CommitTransaction<'a> {
348 type Future = CommitTransactionFuture;
349
350 async fn execute(self) -> Result<()> {
351 match &mut self.session.transaction.state {
352 TransactionState::None => Err(ErrorKind::Transaction {
353 message: "no transaction started".into(),
354 }
355 .into()),
356 TransactionState::Aborted => Err(ErrorKind::Transaction {
357 message: "Cannot call commitTransaction after calling abortTransaction".into(),
358 }
359 .into()),
360 TransactionState::Starting => {
361 self.session.transaction.commit(false);
362 self.session.transaction.drop_span();
363 Ok(())
364 }
365 TransactionState::InProgress => {
366 let commit_transaction =
367 operation::CommitTransaction::new(self.session.transaction.options.clone());
368 self.session.transaction.commit(true);
369 let out = self
370 .session
371 .client
372 .clone()
373 .execute_operation(commit_transaction, &mut *self.session)
374 .await;
375 self.session.transaction.drop_span();
376 out
377 }
378 TransactionState::Committed {
379 data_committed: true,
380 } => {
381 let mut commit_transaction =
382 operation::CommitTransaction::new(self.session.transaction.options.clone());
383 commit_transaction.update_for_retry();
384 self.session
385 .client
386 .clone()
387 .execute_operation(commit_transaction, self.session)
388 .await
389 }
390 TransactionState::Committed {
391 data_committed: false,
392 } => Ok(()),
393 }
394 }
395}
396
397#[action_impl]
398impl<'a> Action for AbortTransaction<'a> {
399 type Future = AbortTransactionFuture;
400
401 async fn execute(self) -> Result<()> {
402 match self.session.transaction.state {
403 TransactionState::None => Err(ErrorKind::Transaction {
404 message: "no transaction started".into(),
405 }
406 .into()),
407 TransactionState::Committed { .. } => Err(ErrorKind::Transaction {
408 message: "Cannot call abortTransaction after calling commitTransaction".into(),
409 }
410 .into()),
411 TransactionState::Aborted => Err(ErrorKind::Transaction {
412 message: "cannot call abortTransaction twice".into(),
413 }
414 .into()),
415 TransactionState::Starting => {
416 self.session.transaction.abort();
417 self.session.transaction.drop_span();
418 Ok(())
419 }
420 TransactionState::InProgress => {
421 let write_concern = self
422 .session
423 .transaction
424 .options
425 .as_ref()
426 .and_then(|options| options.write_concern.as_ref())
427 .cloned();
428 let abort_transaction = operation::AbortTransaction::new(
429 write_concern,
430 self.session.transaction.pinned.take(),
431 );
432 self.session.transaction.abort();
433 // Errors returned from running an abortTransaction command should be ignored.
434 let _result = self
435 .session
436 .client
437 .clone()
438 .execute_operation(abort_transaction, &mut *self.session)
439 .await;
440 self.session.transaction.drop_span();
441 Ok(())
442 }
443 }
444 }
445}