Skip to main content

mongodb/client/
session.rs

1mod action;
2mod cluster_time;
3mod pool;
4#[cfg(test)]
5mod test;
6
7use std::{
8    collections::HashSet,
9    sync::Arc,
10    time::{Duration, Instant},
11};
12
13use std::sync::LazyLock;
14use uuid::Uuid;
15
16use crate::{
17    bson::{doc, spec::BinarySubtype, Binary, Bson, Document, Timestamp},
18    cmap::conn::PinnedConnectionHandle,
19    operation::Retryability,
20    options::{SessionOptions, TransactionOptions},
21    sdam::ServerInfo,
22    selection_criteria::SelectionCriteria,
23    Client,
24};
25pub use cluster_time::ClusterTime;
26pub(super) use pool::ServerSessionPool;
27
28use super::{options::ServerAddress, AsyncDropToken};
29
30pub(crate) static SESSIONS_UNSUPPORTED_COMMANDS: LazyLock<HashSet<&'static str>> =
31    LazyLock::new(|| {
32        let mut hash_set = HashSet::new();
33        hash_set.insert("killcursors");
34        hash_set.insert("parallelcollectionscan");
35        hash_set
36    });
37
38/// A MongoDB client session. This struct represents a logical session used for ordering sequential
39/// operations. To create a `ClientSession`, call `start_session` on a `Client`.
40///
41/// `ClientSession` instances are not thread safe or fork safe. They can only be used by one thread
42/// or process at a time.
43///
44/// ## Transactions
45/// Transactions are used to execute a series of operations across multiple documents and
46/// collections atomically. For more information about when and how to use transactions in MongoDB,
47/// see the [manual](https://www.mongodb.com/docs/manual/core/transactions/).
48///
49/// Transactions are associated with a `ClientSession`. To begin a transaction, call
50/// [`ClientSession::start_transaction`] on a `ClientSession`. The `ClientSession` must be passed to
51/// operations to be executed within the transaction.
52///
53/// ```rust
54/// use mongodb::{
55///     bson::{doc, Document},
56///     error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT},
57///     options::{Acknowledgment, ReadConcern, TransactionOptions, WriteConcern},
58/// #   Client,
59///     ClientSession,
60///     Collection,
61/// };
62///
63/// # async fn do_stuff() -> Result<()> {
64/// # let client = Client::with_uri_str("mongodb://example.com").await?;
65/// # let coll: Collection<Document> = client.database("foo").collection("bar");
66/// let mut session = client.start_session().await?;
67/// session
68///     .start_transaction()
69///     .read_concern(ReadConcern::majority())
70///     .write_concern(WriteConcern::majority())
71///     .await?;
72/// // A "TransientTransactionError" label indicates that the entire transaction can be retried
73/// // with a reasonable expectation that it will succeed.
74/// while let Err(error) = execute_transaction(&coll, &mut session).await {
75///     if !error.contains_label(TRANSIENT_TRANSACTION_ERROR) {
76///         break;
77///     }
78/// }
79/// # Ok(())
80/// # }
81///
82/// async fn execute_transaction(coll: &Collection<Document>, session: &mut ClientSession) -> Result<()> {
83///     coll.insert_one(doc! { "x": 1 }).session(&mut *session).await?;
84///     coll.delete_one(doc! { "y": 2 }).session(&mut *session).await?;
85///     // An "UnknownTransactionCommitResult" label indicates that it is unknown whether the
86///     // commit has satisfied the write concern associated with the transaction. If an error
87///     // with this label is returned, it is safe to retry the commit until the write concern is
88///     // satisfied or an error without the label is returned.
89///     loop {
90///         let result = session.commit_transaction().await;
91///         if let Err(ref error) = result {
92///             if error.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) {
93///                 continue;
94///             }
95///         }
96///         result?
97///     }
98/// }
99/// ```
100#[derive(Debug)]
101pub struct ClientSession {
102    cluster_time: Option<ClusterTime>,
103    server_session: ServerSession,
104    client: Client,
105    is_implicit: bool,
106    options: Option<SessionOptions>,
107    drop_token: AsyncDropToken,
108    pub(crate) transaction: Transaction,
109    pub(crate) snapshot_time: Option<Timestamp>,
110    pub(crate) operation_time: Option<Timestamp>,
111    #[cfg(test)]
112    pub(crate) convenient_transaction_timeout: Option<Duration>,
113    #[cfg(test)]
114    pub(crate) convenient_transaction_jitter: Option<f64>,
115}
116
117#[derive(Debug)]
118pub(crate) struct Transaction {
119    pub(crate) state: TransactionState,
120    pub(crate) options: Option<TransactionOptions>,
121    pub(crate) pinned: Option<TransactionPin>,
122    pub(crate) recovery_token: Option<Document>,
123    #[cfg(feature = "opentelemetry")]
124    pub(crate) otel_span: Option<crate::otel::TxnSpan>,
125}
126
127impl Transaction {
128    pub(crate) fn start(
129        &mut self,
130        options: Option<TransactionOptions>,
131        #[cfg(feature = "opentelemetry")] otel_span: crate::otel::TxnSpan,
132    ) {
133        self.state = TransactionState::Starting;
134        self.options = options;
135        self.recovery_token = None;
136        #[cfg(feature = "opentelemetry")]
137        {
138            self.otel_span = Some(otel_span);
139        }
140    }
141
142    pub(crate) fn commit(&mut self, data_committed: bool) {
143        self.state = TransactionState::Committed { data_committed };
144    }
145
146    pub(crate) fn abort(&mut self) {
147        self.state = TransactionState::Aborted;
148        self.options = None;
149        self.pinned = None;
150    }
151
152    pub(crate) fn reset(&mut self) {
153        self.state = TransactionState::None;
154        self.options = None;
155        self.pinned = None;
156        self.recovery_token = None;
157        self.drop_span();
158    }
159
160    pub(crate) fn drop_span(&mut self) {
161        #[cfg(feature = "opentelemetry")]
162        {
163            self.otel_span = None;
164        }
165    }
166
167    #[cfg(test)]
168    pub(crate) fn is_pinned(&self) -> bool {
169        self.pinned.is_some()
170    }
171
172    pub(crate) fn pinned_mongos(&self) -> Option<&SelectionCriteria> {
173        match &self.pinned {
174            Some(TransactionPin::Mongos(s)) => Some(s),
175            _ => None,
176        }
177    }
178
179    pub(crate) fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
180        match &self.pinned {
181            Some(TransactionPin::Connection(c)) => Some(c),
182            _ => None,
183        }
184    }
185
186    fn take(&mut self) -> Self {
187        Transaction {
188            state: self.state.clone(),
189            options: self.options.take(),
190            pinned: self.pinned.take(),
191            recovery_token: self.recovery_token.take(),
192            #[cfg(feature = "opentelemetry")]
193            otel_span: self.otel_span.take(),
194        }
195    }
196}
197
198impl Default for Transaction {
199    fn default() -> Self {
200        Self {
201            state: TransactionState::None,
202            options: None,
203            pinned: None,
204            recovery_token: None,
205            #[cfg(feature = "opentelemetry")]
206            otel_span: None,
207        }
208    }
209}
210
211#[derive(Clone, Debug, PartialEq)]
212pub(crate) enum TransactionState {
213    None,
214    Starting,
215    InProgress,
216    Committed {
217        /// Whether any data was committed when commit_transaction was initially called. This is
218        /// required to determine whether a commitTransaction command should be run if the user
219        /// calls commit_transaction again.
220        data_committed: bool,
221    },
222    Aborted,
223}
224
225#[derive(Debug)]
226pub(crate) enum TransactionPin {
227    Mongos(SelectionCriteria),
228    Connection(PinnedConnectionHandle),
229}
230
231impl ClientSession {
232    /// Creates a new `ClientSession` by checking out a corresponding `ServerSession` from the
233    /// provided client's session pool.
234    pub(crate) async fn new(
235        client: Client,
236        options: Option<SessionOptions>,
237        is_implicit: bool,
238    ) -> Self {
239        let timeout = client.inner.topology.watcher().logical_session_timeout();
240        let server_session = client.inner.session_pool.check_out(timeout).await;
241        let snapshot_time = options.as_ref().and_then(|o| o.snapshot_time);
242        Self {
243            drop_token: client.register_async_drop(),
244            client,
245            server_session,
246            cluster_time: None,
247            is_implicit,
248            options,
249            transaction: Default::default(),
250            snapshot_time,
251            operation_time: None,
252            #[cfg(test)]
253            convenient_transaction_timeout: None,
254            #[cfg(test)]
255            convenient_transaction_jitter: None,
256        }
257    }
258
259    /// The client used to create this session.
260    pub fn client(&self) -> Client {
261        self.client.clone()
262    }
263
264    /// The id of this session.
265    pub fn id(&self) -> &Document {
266        &self.server_session.id
267    }
268
269    /// Whether this session was created implicitly by the driver or explcitly by the user.
270    pub(crate) fn is_implicit(&self) -> bool {
271        self.is_implicit
272    }
273
274    /// Whether this session is currently in a transaction.
275    pub(crate) fn in_transaction(&self) -> bool {
276        matches!(
277            self.transaction.state,
278            TransactionState::Starting | TransactionState::InProgress
279        )
280    }
281
282    /// The highest seen cluster time this session has seen so far.
283    /// This will be `None` if this session has not been used in an operation yet.
284    pub fn cluster_time(&self) -> Option<&ClusterTime> {
285        self.cluster_time.as_ref()
286    }
287
288    /// The options used to create this session.
289    pub(crate) fn options(&self) -> Option<&SessionOptions> {
290        self.options.as_ref()
291    }
292
293    /// Set the cluster time to the provided one if it is greater than this session's highest seen
294    /// cluster time or if this session's cluster time is `None`.
295    pub fn advance_cluster_time(&mut self, to: &ClusterTime) {
296        if self.cluster_time().map(|ct| ct < to).unwrap_or(true) {
297            self.cluster_time = Some(to.clone());
298        }
299    }
300
301    /// Advance operation time for this session. If the provided timestamp is earlier than this
302    /// session's current operation time, then the operation time is unchanged.
303    pub fn advance_operation_time(&mut self, ts: Timestamp) {
304        self.operation_time = match self.operation_time {
305            Some(current_op_time) if current_op_time < ts => Some(ts),
306            None => Some(ts),
307            _ => self.operation_time,
308        }
309    }
310
311    /// The operation time returned by the last operation executed in this session.
312    pub fn operation_time(&self) -> Option<Timestamp> {
313        self.operation_time
314    }
315
316    /// The snapshot time for a snapshot session. This will return `None` if this session is not a
317    /// snapshot session or if [`snapshot_time`](SessionOptions::snapshot_time) was not set and the
318    /// server has not yet responded with a snapshot time.
319    pub fn snapshot_time(&self) -> Option<Timestamp> {
320        self.snapshot_time
321    }
322
323    pub(crate) fn causal_consistency(&self) -> bool {
324        self.options()
325            .and_then(|opts| opts.causal_consistency)
326            .unwrap_or(!self.is_implicit())
327    }
328
329    /// Mark this session (and the underlying server session) as dirty.
330    pub(crate) fn mark_dirty(&mut self) {
331        self.server_session.dirty = true;
332    }
333
334    /// Updates the date that the underlying server session was last used as part of an operation
335    /// sent to the server.
336    pub(crate) fn update_last_use(&mut self) {
337        self.server_session.last_use = Instant::now();
338    }
339
340    /// Gets the current txn_number.
341    pub(crate) fn txn_number(&self) -> i64 {
342        self.server_session.txn_number
343    }
344
345    /// Increments the txn_number.
346    pub(crate) fn increment_txn_number(&mut self) {
347        self.server_session.txn_number += 1;
348    }
349
350    /// Gets the txn_number to use for an operation based on the current transaction status and the
351    /// operation's retryability.
352    pub(crate) fn get_txn_number_for_operation(
353        &mut self,
354        retryability: Retryability,
355    ) -> Option<i64> {
356        if self.transaction.state != TransactionState::None {
357            Some(self.txn_number())
358        } else if retryability == Retryability::Write {
359            self.increment_txn_number();
360            Some(self.txn_number())
361        } else {
362            None
363        }
364    }
365
366    /// Pin mongos to session.
367    pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
368        self.transaction.pinned = Some(TransactionPin::Mongos(SelectionCriteria::Predicate(
369            Arc::new(move |server_info: &ServerInfo| *server_info.address() == address),
370        )));
371    }
372
373    /// Pin the connection to the session.
374    pub(crate) fn pin_connection(&mut self, handle: PinnedConnectionHandle) {
375        self.transaction.pinned = Some(TransactionPin::Connection(handle));
376    }
377
378    pub(crate) fn unpin(&mut self) {
379        self.transaction.pinned = None;
380    }
381
382    /// Whether this session is dirty.
383    #[cfg(test)]
384    pub(crate) fn is_dirty(&self) -> bool {
385        self.server_session.dirty
386    }
387
388    fn default_transaction_options(&self) -> Option<&TransactionOptions> {
389        self.options
390            .as_ref()
391            .and_then(|options| options.default_transaction_options.as_ref())
392    }
393}
394
395struct DroppedClientSession {
396    cluster_time: Option<ClusterTime>,
397    server_session: ServerSession,
398    client: Client,
399    is_implicit: bool,
400    options: Option<SessionOptions>,
401    transaction: Transaction,
402    snapshot_time: Option<Timestamp>,
403    operation_time: Option<Timestamp>,
404}
405
406impl From<DroppedClientSession> for ClientSession {
407    fn from(dropped_session: DroppedClientSession) -> Self {
408        Self {
409            cluster_time: dropped_session.cluster_time,
410            server_session: dropped_session.server_session,
411            drop_token: dropped_session.client.register_async_drop(),
412            client: dropped_session.client,
413            is_implicit: dropped_session.is_implicit,
414            options: dropped_session.options,
415            transaction: dropped_session.transaction,
416            snapshot_time: dropped_session.snapshot_time,
417            operation_time: dropped_session.operation_time,
418            #[cfg(test)]
419            convenient_transaction_timeout: None,
420            #[cfg(test)]
421            convenient_transaction_jitter: None,
422        }
423    }
424}
425
426impl Drop for ClientSession {
427    fn drop(&mut self) {
428        if self.transaction.state == TransactionState::InProgress {
429            let dropped_session = DroppedClientSession {
430                cluster_time: self.cluster_time.clone(),
431                server_session: self.server_session.clone(),
432                client: self.client.clone(),
433                is_implicit: self.is_implicit,
434                options: self.options.clone(),
435                transaction: self.transaction.take(),
436                snapshot_time: self.snapshot_time,
437                operation_time: self.operation_time,
438            };
439            self.drop_token.spawn(async move {
440                let mut session: ClientSession = dropped_session.into();
441                let _result = session.abort_transaction().await;
442            });
443        } else {
444            let client = self.client.clone();
445            let server_session = self.server_session.clone();
446            self.drop_token.spawn(async move {
447                client.check_in_server_session(server_session).await;
448            });
449        }
450    }
451}
452
453/// Client side abstraction of a server session. These are pooled and may be associated with
454/// multiple `ClientSession`s over the course of their lifetime.
455#[derive(Clone, Debug)]
456pub(crate) struct ServerSession {
457    /// The id of the server session to which this corresponds.
458    pub(crate) id: Document,
459
460    /// The last time an operation was executed with this session.
461    last_use: std::time::Instant,
462
463    /// Whether a network error was encountered while using this session.
464    dirty: bool,
465
466    /// A monotonically increasing transaction number for this session.
467    txn_number: i64,
468}
469
470impl ServerSession {
471    /// Creates a new session, generating the id client side.
472    fn new() -> Self {
473        let binary = Bson::Binary(Binary {
474            subtype: BinarySubtype::Uuid,
475            bytes: Uuid::new_v4().as_bytes().to_vec(),
476        });
477
478        Self {
479            id: doc! { "id": binary },
480            last_use: Instant::now(),
481            dirty: false,
482            txn_number: 0,
483        }
484    }
485
486    /// Determines if this server session is about to expire in a short amount of time (1 minute).
487    fn is_about_to_expire(&self, logical_session_timeout: Option<Duration>) -> bool {
488        let timeout = match logical_session_timeout {
489            Some(t) => t,
490            None => return false,
491        };
492        let expiration_date = self.last_use + timeout;
493        expiration_date < Instant::now() + Duration::from_secs(60)
494    }
495}