mongodb/client/
session.rs

1mod action;
2mod cluster_time;
3mod pool;
4#[cfg(test)]
5mod test;
6
7use std::{
8    collections::HashSet,
9    sync::Arc,
10    time::{Duration, Instant},
11};
12
13use once_cell::sync::Lazy;
14use uuid::Uuid;
15
16use crate::{
17    bson::{doc, spec::BinarySubtype, Binary, Bson, Document, Timestamp},
18    cmap::conn::PinnedConnectionHandle,
19    options::{SessionOptions, TransactionOptions},
20    sdam::ServerInfo,
21    selection_criteria::SelectionCriteria,
22    Client,
23};
24pub use cluster_time::ClusterTime;
25pub(super) use pool::ServerSessionPool;
26
27use super::{options::ServerAddress, AsyncDropToken};
28
29pub(crate) static SESSIONS_UNSUPPORTED_COMMANDS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
30    let mut hash_set = HashSet::new();
31    hash_set.insert("killcursors");
32    hash_set.insert("parallelcollectionscan");
33    hash_set
34});
35
36/// A MongoDB client session. This struct represents a logical session used for ordering sequential
37/// operations. To create a `ClientSession`, call `start_session` on a `Client`.
38///
39/// `ClientSession` instances are not thread safe or fork safe. They can only be used by one thread
40/// or process at a time.
41///
42/// ## Transactions
43/// Transactions are used to execute a series of operations across multiple documents and
44/// collections atomically. For more information about when and how to use transactions in MongoDB,
45/// see the [manual](https://www.mongodb.com/docs/manual/core/transactions/).
46///
47/// Replica set transactions are supported on MongoDB 4.0+. Sharded transactions are supported on
48/// MongoDDB 4.2+. Transactions are associated with a `ClientSession`. To begin a transaction, call
49/// [`ClientSession::start_transaction`] on a `ClientSession`. The `ClientSession` must be passed to
50/// operations to be executed within the transaction.
51///
52/// ```rust
53/// use mongodb::{
54///     bson::{doc, Document},
55///     error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT},
56///     options::{Acknowledgment, ReadConcern, TransactionOptions, WriteConcern},
57/// #   Client,
58///     ClientSession,
59///     Collection,
60/// };
61///
62/// # async fn do_stuff() -> Result<()> {
63/// # let client = Client::with_uri_str("mongodb://example.com").await?;
64/// # let coll: Collection<Document> = client.database("foo").collection("bar");
65/// let mut session = client.start_session().await?;
66/// session
67///     .start_transaction()
68///     .read_concern(ReadConcern::majority())
69///     .write_concern(WriteConcern::majority())
70///     .await?;
71/// // A "TransientTransactionError" label indicates that the entire transaction can be retried
72/// // with a reasonable expectation that it will succeed.
73/// while let Err(error) = execute_transaction(&coll, &mut session).await {
74///     if !error.contains_label(TRANSIENT_TRANSACTION_ERROR) {
75///         break;
76///     }
77/// }
78/// # Ok(())
79/// # }
80///
81/// async fn execute_transaction(coll: &Collection<Document>, session: &mut ClientSession) -> Result<()> {
82///     coll.insert_one(doc! { "x": 1 }).session(&mut *session).await?;
83///     coll.delete_one(doc! { "y": 2 }).session(&mut *session).await?;
84///     // An "UnknownTransactionCommitResult" label indicates that it is unknown whether the
85///     // commit has satisfied the write concern associated with the transaction. If an error
86///     // with this label is returned, it is safe to retry the commit until the write concern is
87///     // satisfied or an error without the label is returned.
88///     loop {
89///         let result = session.commit_transaction().await;
90///         if let Err(ref error) = result {
91///             if error.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) {
92///                 continue;
93///             }
94///         }
95///         result?
96///     }
97/// }
98/// ```
99#[derive(Debug)]
100pub struct ClientSession {
101    cluster_time: Option<ClusterTime>,
102    server_session: ServerSession,
103    client: Client,
104    is_implicit: bool,
105    options: Option<SessionOptions>,
106    drop_token: AsyncDropToken,
107    pub(crate) transaction: Transaction,
108    pub(crate) snapshot_time: Option<Timestamp>,
109    pub(crate) operation_time: Option<Timestamp>,
110    #[cfg(test)]
111    pub(crate) convenient_transaction_timeout: Option<Duration>,
112}
113
/// Bookkeeping for the transaction associated with a `ClientSession`.
#[derive(Debug)]
pub(crate) struct Transaction {
    /// Where the transaction currently is in its lifecycle.
    pub(crate) state: TransactionState,
    /// Options supplied when the transaction was started; cleared on abort and reset.
    pub(crate) options: Option<TransactionOptions>,
    /// The mongos or connection this transaction is pinned to, if any.
    pub(crate) pinned: Option<TransactionPin>,
    /// Recovery token document; cleared whenever a transaction is started or reset.
    pub(crate) recovery_token: Option<Document>,
}
121
122impl Transaction {
123    pub(crate) fn start(&mut self, options: Option<TransactionOptions>) {
124        self.state = TransactionState::Starting;
125        self.options = options;
126        self.recovery_token = None;
127    }
128
129    pub(crate) fn commit(&mut self, data_committed: bool) {
130        self.state = TransactionState::Committed { data_committed };
131    }
132
133    pub(crate) fn abort(&mut self) {
134        self.state = TransactionState::Aborted;
135        self.options = None;
136        self.pinned = None;
137    }
138
139    pub(crate) fn reset(&mut self) {
140        self.state = TransactionState::None;
141        self.options = None;
142        self.pinned = None;
143        self.recovery_token = None;
144    }
145
146    #[cfg(test)]
147    pub(crate) fn is_pinned(&self) -> bool {
148        self.pinned.is_some()
149    }
150
151    pub(crate) fn pinned_mongos(&self) -> Option<&SelectionCriteria> {
152        match &self.pinned {
153            Some(TransactionPin::Mongos(s)) => Some(s),
154            _ => None,
155        }
156    }
157
158    pub(crate) fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
159        match &self.pinned {
160            Some(TransactionPin::Connection(c)) => Some(c),
161            _ => None,
162        }
163    }
164
165    fn take(&mut self) -> Self {
166        Transaction {
167            state: self.state.clone(),
168            options: self.options.take(),
169            pinned: self.pinned.take(),
170            recovery_token: self.recovery_token.take(),
171        }
172    }
173}
174
175impl Default for Transaction {
176    fn default() -> Self {
177        Self {
178            state: TransactionState::None,
179            options: None,
180            pinned: None,
181            recovery_token: None,
182        }
183    }
184}
185
/// Lifecycle states of the transaction tracked by a `ClientSession`.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum TransactionState {
    /// No transaction; this is the default state and the one `Transaction::reset` restores.
    None,
    /// Set by `Transaction::start`. Counts as being in a transaction.
    Starting,
    /// Counts as being in a transaction; a session dropped in this state attempts an abort.
    InProgress,
    /// Set by `Transaction::commit`.
    Committed {
        /// Whether any data was committed when commit_transaction was initially called. This is
        /// required to determine whether a commitTransaction command should be run if the user
        /// calls commit_transaction again.
        data_committed: bool,
    },
    /// Set by `Transaction::abort`.
    Aborted,
}
199
/// What a transaction is pinned to.
#[derive(Debug)]
pub(crate) enum TransactionPin {
    /// Pinned to a mongos, expressed as selection criteria that pick that server.
    Mongos(SelectionCriteria),
    /// Pinned to a specific connection via its handle.
    Connection(PinnedConnectionHandle),
}
205
206impl ClientSession {
207    /// Creates a new `ClientSession` by checking out a corresponding `ServerSession` from the
208    /// provided client's session pool.
209    pub(crate) async fn new(
210        client: Client,
211        options: Option<SessionOptions>,
212        is_implicit: bool,
213    ) -> Self {
214        let timeout = client.inner.topology.logical_session_timeout();
215        let server_session = client.inner.session_pool.check_out(timeout).await;
216        Self {
217            drop_token: client.register_async_drop(),
218            client,
219            server_session,
220            cluster_time: None,
221            is_implicit,
222            options,
223            transaction: Default::default(),
224            snapshot_time: None,
225            operation_time: None,
226            #[cfg(test)]
227            convenient_transaction_timeout: None,
228        }
229    }
230
231    /// The client used to create this session.
232    pub fn client(&self) -> Client {
233        self.client.clone()
234    }
235
236    /// The id of this session.
237    pub fn id(&self) -> &Document {
238        &self.server_session.id
239    }
240
241    /// Whether this session was created implicitly by the driver or explcitly by the user.
242    pub(crate) fn is_implicit(&self) -> bool {
243        self.is_implicit
244    }
245
246    /// Whether this session is currently in a transaction.
247    pub(crate) fn in_transaction(&self) -> bool {
248        self.transaction.state == TransactionState::Starting
249            || self.transaction.state == TransactionState::InProgress
250    }
251
252    /// The highest seen cluster time this session has seen so far.
253    /// This will be `None` if this session has not been used in an operation yet.
254    pub fn cluster_time(&self) -> Option<&ClusterTime> {
255        self.cluster_time.as_ref()
256    }
257
258    /// The options used to create this session.
259    pub(crate) fn options(&self) -> Option<&SessionOptions> {
260        self.options.as_ref()
261    }
262
263    /// Set the cluster time to the provided one if it is greater than this session's highest seen
264    /// cluster time or if this session's cluster time is `None`.
265    pub fn advance_cluster_time(&mut self, to: &ClusterTime) {
266        if self.cluster_time().map(|ct| ct < to).unwrap_or(true) {
267            self.cluster_time = Some(to.clone());
268        }
269    }
270
271    /// Advance operation time for this session. If the provided timestamp is earlier than this
272    /// session's current operation time, then the operation time is unchanged.
273    pub fn advance_operation_time(&mut self, ts: Timestamp) {
274        self.operation_time = match self.operation_time {
275            Some(current_op_time) if current_op_time < ts => Some(ts),
276            None => Some(ts),
277            _ => self.operation_time,
278        }
279    }
280
281    /// The operation time returned by the last operation executed in this session.
282    pub fn operation_time(&self) -> Option<Timestamp> {
283        self.operation_time
284    }
285
286    pub(crate) fn causal_consistency(&self) -> bool {
287        self.options()
288            .and_then(|opts| opts.causal_consistency)
289            .unwrap_or(!self.is_implicit())
290    }
291
292    /// Mark this session (and the underlying server session) as dirty.
293    pub(crate) fn mark_dirty(&mut self) {
294        self.server_session.dirty = true;
295    }
296
297    /// Updates the date that the underlying server session was last used as part of an operation
298    /// sent to the server.
299    pub(crate) fn update_last_use(&mut self) {
300        self.server_session.last_use = Instant::now();
301    }
302
303    /// Gets the current txn_number.
304    pub(crate) fn txn_number(&self) -> i64 {
305        self.server_session.txn_number
306    }
307
308    /// Increments the txn_number.
309    pub(crate) fn increment_txn_number(&mut self) {
310        self.server_session.txn_number += 1;
311    }
312
313    /// Increments the txn_number and returns the new value.
314    pub(crate) fn get_and_increment_txn_number(&mut self) -> i64 {
315        self.increment_txn_number();
316        self.server_session.txn_number
317    }
318
319    /// Pin mongos to session.
320    pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
321        self.transaction.pinned = Some(TransactionPin::Mongos(SelectionCriteria::Predicate(
322            Arc::new(move |server_info: &ServerInfo| *server_info.address() == address),
323        )));
324    }
325
326    /// Pin the connection to the session.
327    pub(crate) fn pin_connection(&mut self, handle: PinnedConnectionHandle) {
328        self.transaction.pinned = Some(TransactionPin::Connection(handle));
329    }
330
331    pub(crate) fn unpin(&mut self) {
332        self.transaction.pinned = None;
333    }
334
335    /// Whether this session is dirty.
336    #[cfg(test)]
337    pub(crate) fn is_dirty(&self) -> bool {
338        self.server_session.dirty
339    }
340
341    fn default_transaction_options(&self) -> Option<&TransactionOptions> {
342        self.options
343            .as_ref()
344            .and_then(|options| options.default_transaction_options.as_ref())
345    }
346}
347
/// Copy of a `ClientSession`'s state, minus its drop token, captured by `ClientSession::drop`
/// so the session can be rebuilt inside a spawned task (see the `From` impl below the struct).
struct DroppedClientSession {
    /// Cluster time cloned from the dropped session.
    cluster_time: Option<ClusterTime>,
    /// Server session cloned from the dropped session.
    server_session: ServerSession,
    /// Client cloned from the dropped session.
    client: Client,
    /// Whether the dropped session was implicit.
    is_implicit: bool,
    /// Options cloned from the dropped session.
    options: Option<SessionOptions>,
    /// Transaction state taken out of the dropped session.
    transaction: Transaction,
    /// Snapshot time copied from the dropped session.
    snapshot_time: Option<Timestamp>,
    /// Operation time copied from the dropped session.
    operation_time: Option<Timestamp>,
}
358
359impl From<DroppedClientSession> for ClientSession {
360    fn from(dropped_session: DroppedClientSession) -> Self {
361        Self {
362            cluster_time: dropped_session.cluster_time,
363            server_session: dropped_session.server_session,
364            drop_token: dropped_session.client.register_async_drop(),
365            client: dropped_session.client,
366            is_implicit: dropped_session.is_implicit,
367            options: dropped_session.options,
368            transaction: dropped_session.transaction,
369            snapshot_time: dropped_session.snapshot_time,
370            operation_time: dropped_session.operation_time,
371            #[cfg(test)]
372            convenient_transaction_timeout: None,
373        }
374    }
375}
376
377impl Drop for ClientSession {
378    fn drop(&mut self) {
379        if self.transaction.state == TransactionState::InProgress {
380            let dropped_session = DroppedClientSession {
381                cluster_time: self.cluster_time.clone(),
382                server_session: self.server_session.clone(),
383                client: self.client.clone(),
384                is_implicit: self.is_implicit,
385                options: self.options.clone(),
386                transaction: self.transaction.take(),
387                snapshot_time: self.snapshot_time,
388                operation_time: self.operation_time,
389            };
390            self.drop_token.spawn(async move {
391                let mut session: ClientSession = dropped_session.into();
392                let _result = session.abort_transaction().await;
393            });
394        } else {
395            let client = self.client.clone();
396            let server_session = self.server_session.clone();
397            self.drop_token.spawn(async move {
398                client.check_in_server_session(server_session).await;
399            });
400        }
401    }
402}
403
/// Client side abstraction of a server session. These are pooled and may be associated with
/// multiple `ClientSession`s over the course of their lifetime.
#[derive(Clone, Debug)]
pub(crate) struct ServerSession {
    /// The id of the server session to which this corresponds. Generated client side as a
    /// document holding a UUID under the `id` key.
    pub(crate) id: Document,

    /// The last time an operation was executed with this session. Initialized to the creation
    /// time and refreshed through `ClientSession::update_last_use`.
    last_use: std::time::Instant,

    /// Whether a network error was encountered while using this session. Set through
    /// `ClientSession::mark_dirty`.
    dirty: bool,

    /// A monotonically increasing transaction number for this session, starting at 0.
    txn_number: i64,
}
420
421impl ServerSession {
422    /// Creates a new session, generating the id client side.
423    fn new() -> Self {
424        let binary = Bson::Binary(Binary {
425            subtype: BinarySubtype::Uuid,
426            bytes: Uuid::new_v4().as_bytes().to_vec(),
427        });
428
429        Self {
430            id: doc! { "id": binary },
431            last_use: Instant::now(),
432            dirty: false,
433            txn_number: 0,
434        }
435    }
436
437    /// Determines if this server session is about to expire in a short amount of time (1 minute).
438    fn is_about_to_expire(&self, logical_session_timeout: Option<Duration>) -> bool {
439        let timeout = match logical_session_timeout {
440            Some(t) => t,
441            None => return false,
442        };
443        let expiration_date = self.last_use + timeout;
444        expiration_date < Instant::now() + Duration::from_secs(60)
445    }
446}