1mod action;
2mod cluster_time;
3mod pool;
4#[cfg(test)]
5mod test;
6
7use std::{
8 collections::HashSet,
9 sync::Arc,
10 time::{Duration, Instant},
11};
12
13use std::sync::LazyLock;
14use uuid::Uuid;
15
16use crate::{
17 bson::{doc, spec::BinarySubtype, Binary, Bson, Document, Timestamp},
18 cmap::conn::PinnedConnectionHandle,
19 operation::Retryability,
20 options::{SessionOptions, TransactionOptions},
21 sdam::ServerInfo,
22 selection_criteria::SelectionCriteria,
23 Client,
24};
25pub use cluster_time::ClusterTime;
26pub(super) use pool::ServerSessionPool;
27
28use super::{options::ServerAddress, AsyncDropToken};
29
/// Command names (stored in lower case) that, going by this constant's name, cannot be run with
/// a session attached.
///
/// The set is built lazily on first access.
pub(crate) static SESSIONS_UNSUPPORTED_COMMANDS: LazyLock<HashSet<&'static str>> =
    LazyLock::new(|| HashSet::from(["killcursors", "parallelcollectionscan"]));
37
/// A client-side session handle that pairs a [`Client`] with a pooled server session.
///
/// Construction checks a server session out of the client's session pool. Dropping the value
/// hands that server session back asynchronously, first aborting any transaction that is still
/// in progress.
#[derive(Debug)]
pub struct ClientSession {
    /// Latest cluster time recorded on this session; `advance_cluster_time` only moves it
    /// forward.
    cluster_time: Option<ClusterTime>,
    /// The server session checked out of the client's pool for this handle.
    server_session: ServerSession,
    /// The client this session belongs to.
    client: Client,
    /// Flag supplied at construction. When no explicit `causal_consistency` option is set,
    /// causal consistency defaults to the negation of this flag.
    is_implicit: bool,
    /// Options supplied at construction, if any.
    options: Option<SessionOptions>,
    /// Token obtained from the client; `Drop` uses it to spawn asynchronous cleanup work.
    drop_token: AsyncDropToken,
    /// State of the transaction (if any) associated with this session.
    pub(crate) transaction: Transaction,
    /// Snapshot timestamp; seeded from `SessionOptions::snapshot_time` at construction.
    pub(crate) snapshot_time: Option<Timestamp>,
    /// Latest operation time recorded on this session; `advance_operation_time` only moves it
    /// forward.
    pub(crate) operation_time: Option<Timestamp>,
    /// Test-only override. NOTE(review): not read in this part of the file; presumably consumed
    /// by the convenient-transaction helper -- confirm.
    #[cfg(test)]
    pub(crate) convenient_transaction_timeout: Option<Duration>,
    /// Test-only override. NOTE(review): not read in this part of the file; presumably consumed
    /// by the convenient-transaction helper -- confirm.
    #[cfg(test)]
    pub(crate) convenient_transaction_jitter: Option<f64>,
}
116
/// Bookkeeping for the transaction associated with a session.
#[derive(Debug)]
pub(crate) struct Transaction {
    /// Where the transaction currently is in its lifecycle.
    pub(crate) state: TransactionState,
    /// Options supplied when the transaction was started; cleared by `abort` and `reset`.
    pub(crate) options: Option<TransactionOptions>,
    /// What the transaction is pinned to, if anything (a mongos or a single connection).
    pub(crate) pinned: Option<TransactionPin>,
    /// Cleared by `start` and `reset`, but kept across `abort`.
    /// NOTE(review): presumably the server-issued recovery token for sharded transactions --
    /// confirm against the code that sets it.
    pub(crate) recovery_token: Option<Document>,
    /// Tracing span stored by `start` and released by `drop_span`.
    #[cfg(feature = "opentelemetry")]
    pub(crate) otel_span: Option<crate::otel::TxnSpan>,
}
126
127impl Transaction {
128 pub(crate) fn start(
129 &mut self,
130 options: Option<TransactionOptions>,
131 #[cfg(feature = "opentelemetry")] otel_span: crate::otel::TxnSpan,
132 ) {
133 self.state = TransactionState::Starting;
134 self.options = options;
135 self.recovery_token = None;
136 #[cfg(feature = "opentelemetry")]
137 {
138 self.otel_span = Some(otel_span);
139 }
140 }
141
142 pub(crate) fn commit(&mut self, data_committed: bool) {
143 self.state = TransactionState::Committed { data_committed };
144 }
145
146 pub(crate) fn abort(&mut self) {
147 self.state = TransactionState::Aborted;
148 self.options = None;
149 self.pinned = None;
150 }
151
152 pub(crate) fn reset(&mut self) {
153 self.state = TransactionState::None;
154 self.options = None;
155 self.pinned = None;
156 self.recovery_token = None;
157 self.drop_span();
158 }
159
160 pub(crate) fn drop_span(&mut self) {
161 #[cfg(feature = "opentelemetry")]
162 {
163 self.otel_span = None;
164 }
165 }
166
167 #[cfg(test)]
168 pub(crate) fn is_pinned(&self) -> bool {
169 self.pinned.is_some()
170 }
171
172 pub(crate) fn pinned_mongos(&self) -> Option<&SelectionCriteria> {
173 match &self.pinned {
174 Some(TransactionPin::Mongos(s)) => Some(s),
175 _ => None,
176 }
177 }
178
179 pub(crate) fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
180 match &self.pinned {
181 Some(TransactionPin::Connection(c)) => Some(c),
182 _ => None,
183 }
184 }
185
186 fn take(&mut self) -> Self {
187 Transaction {
188 state: self.state.clone(),
189 options: self.options.take(),
190 pinned: self.pinned.take(),
191 recovery_token: self.recovery_token.take(),
192 #[cfg(feature = "opentelemetry")]
193 otel_span: self.otel_span.take(),
194 }
195 }
196}
197
198impl Default for Transaction {
199 fn default() -> Self {
200 Self {
201 state: TransactionState::None,
202 options: None,
203 pinned: None,
204 recovery_token: None,
205 #[cfg(feature = "opentelemetry")]
206 otel_span: None,
207 }
208 }
209}
210
/// Lifecycle state of a session's transaction.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum TransactionState {
    /// No transaction; this is the default state and the one `Transaction::reset` restores.
    None,
    /// Set by `Transaction::start`; counts as "in a transaction".
    Starting,
    /// Counts as "in a transaction"; a session dropped in this state aborts the transaction.
    /// NOTE(review): the `Starting` -> `InProgress` transition happens outside this part of the
    /// file.
    InProgress,
    /// Set by `Transaction::commit`.
    Committed {
        /// Flag passed to `Transaction::commit`.
        /// NOTE(review): presumably records whether any data was actually committed -- confirm
        /// against the commit logic.
        data_committed: bool,
    },
    /// Set by `Transaction::abort`.
    Aborted,
}
224
/// The target a transaction is pinned to.
#[derive(Debug)]
pub(crate) enum TransactionPin {
    /// Pinned via selection criteria; `ClientSession::pin_mongos` stores a predicate that accepts
    /// only the server with one specific address.
    Mongos(SelectionCriteria),
    /// Pinned to a specific connection through its handle.
    Connection(PinnedConnectionHandle),
}
230
231impl ClientSession {
232 pub(crate) async fn new(
235 client: Client,
236 options: Option<SessionOptions>,
237 is_implicit: bool,
238 ) -> Self {
239 let timeout = client.inner.topology.watcher().logical_session_timeout();
240 let server_session = client.inner.session_pool.check_out(timeout).await;
241 let snapshot_time = options.as_ref().and_then(|o| o.snapshot_time);
242 Self {
243 drop_token: client.register_async_drop(),
244 client,
245 server_session,
246 cluster_time: None,
247 is_implicit,
248 options,
249 transaction: Default::default(),
250 snapshot_time,
251 operation_time: None,
252 #[cfg(test)]
253 convenient_transaction_timeout: None,
254 #[cfg(test)]
255 convenient_transaction_jitter: None,
256 }
257 }
258
259 pub fn client(&self) -> Client {
261 self.client.clone()
262 }
263
264 pub fn id(&self) -> &Document {
266 &self.server_session.id
267 }
268
269 pub(crate) fn is_implicit(&self) -> bool {
271 self.is_implicit
272 }
273
274 pub(crate) fn in_transaction(&self) -> bool {
276 matches!(
277 self.transaction.state,
278 TransactionState::Starting | TransactionState::InProgress
279 )
280 }
281
282 pub fn cluster_time(&self) -> Option<&ClusterTime> {
285 self.cluster_time.as_ref()
286 }
287
288 pub(crate) fn options(&self) -> Option<&SessionOptions> {
290 self.options.as_ref()
291 }
292
293 pub fn advance_cluster_time(&mut self, to: &ClusterTime) {
296 if self.cluster_time().map(|ct| ct < to).unwrap_or(true) {
297 self.cluster_time = Some(to.clone());
298 }
299 }
300
301 pub fn advance_operation_time(&mut self, ts: Timestamp) {
304 self.operation_time = match self.operation_time {
305 Some(current_op_time) if current_op_time < ts => Some(ts),
306 None => Some(ts),
307 _ => self.operation_time,
308 }
309 }
310
311 pub fn operation_time(&self) -> Option<Timestamp> {
313 self.operation_time
314 }
315
316 pub fn snapshot_time(&self) -> Option<Timestamp> {
320 self.snapshot_time
321 }
322
323 pub(crate) fn causal_consistency(&self) -> bool {
324 self.options()
325 .and_then(|opts| opts.causal_consistency)
326 .unwrap_or(!self.is_implicit())
327 }
328
329 pub(crate) fn mark_dirty(&mut self) {
331 self.server_session.dirty = true;
332 }
333
334 pub(crate) fn update_last_use(&mut self) {
337 self.server_session.last_use = Instant::now();
338 }
339
340 pub(crate) fn txn_number(&self) -> i64 {
342 self.server_session.txn_number
343 }
344
345 pub(crate) fn increment_txn_number(&mut self) {
347 self.server_session.txn_number += 1;
348 }
349
350 pub(crate) fn get_txn_number_for_operation(
353 &mut self,
354 retryability: Retryability,
355 ) -> Option<i64> {
356 if self.transaction.state != TransactionState::None {
357 Some(self.txn_number())
358 } else if retryability == Retryability::Write {
359 self.increment_txn_number();
360 Some(self.txn_number())
361 } else {
362 None
363 }
364 }
365
366 pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
368 self.transaction.pinned = Some(TransactionPin::Mongos(SelectionCriteria::Predicate(
369 Arc::new(move |server_info: &ServerInfo| *server_info.address() == address),
370 )));
371 }
372
373 pub(crate) fn pin_connection(&mut self, handle: PinnedConnectionHandle) {
375 self.transaction.pinned = Some(TransactionPin::Connection(handle));
376 }
377
378 pub(crate) fn unpin(&mut self) {
379 self.transaction.pinned = None;
380 }
381
382 #[cfg(test)]
384 pub(crate) fn is_dirty(&self) -> bool {
385 self.server_session.dirty
386 }
387
388 fn default_transaction_options(&self) -> Option<&TransactionOptions> {
389 self.options
390 .as_ref()
391 .and_then(|options| options.default_transaction_options.as_ref())
392 }
393}
394
/// Snapshot of a `ClientSession`'s state, built in `Drop` so it can be moved into an
/// asynchronous cleanup task and turned back into a `ClientSession` there.
///
/// It mirrors `ClientSession`'s fields except for the drop token (a fresh one is registered
/// during conversion) and the test-only fields (which the conversion resets to `None`).
struct DroppedClientSession {
    cluster_time: Option<ClusterTime>,
    server_session: ServerSession,
    client: Client,
    is_implicit: bool,
    options: Option<SessionOptions>,
    transaction: Transaction,
    snapshot_time: Option<Timestamp>,
    operation_time: Option<Timestamp>,
}
405
406impl From<DroppedClientSession> for ClientSession {
407 fn from(dropped_session: DroppedClientSession) -> Self {
408 Self {
409 cluster_time: dropped_session.cluster_time,
410 server_session: dropped_session.server_session,
411 drop_token: dropped_session.client.register_async_drop(),
412 client: dropped_session.client,
413 is_implicit: dropped_session.is_implicit,
414 options: dropped_session.options,
415 transaction: dropped_session.transaction,
416 snapshot_time: dropped_session.snapshot_time,
417 operation_time: dropped_session.operation_time,
418 #[cfg(test)]
419 convenient_transaction_timeout: None,
420 #[cfg(test)]
421 convenient_transaction_jitter: None,
422 }
423 }
424}
425
426impl Drop for ClientSession {
427 fn drop(&mut self) {
428 if self.transaction.state == TransactionState::InProgress {
429 let dropped_session = DroppedClientSession {
430 cluster_time: self.cluster_time.clone(),
431 server_session: self.server_session.clone(),
432 client: self.client.clone(),
433 is_implicit: self.is_implicit,
434 options: self.options.clone(),
435 transaction: self.transaction.take(),
436 snapshot_time: self.snapshot_time,
437 operation_time: self.operation_time,
438 };
439 self.drop_token.spawn(async move {
440 let mut session: ClientSession = dropped_session.into();
441 let _result = session.abort_transaction().await;
442 });
443 } else {
444 let client = self.client.clone();
445 let server_session = self.server_session.clone();
446 self.drop_token.spawn(async move {
447 client.check_in_server_session(server_session).await;
448 });
449 }
450 }
451}
452
/// Server-side session state carried by a `ClientSession`.
#[derive(Clone, Debug)]
pub(crate) struct ServerSession {
    /// Session identifier document; `ServerSession::new` builds it as `{ "id": <UUID binary> }`.
    pub(crate) id: Document,

    /// When the session was last used; refreshed by `ClientSession::update_last_use` and
    /// consulted by `is_about_to_expire`.
    last_use: std::time::Instant,

    /// Starts out `false` and is set by `ClientSession::mark_dirty`.
    /// NOTE(review): presumably keeps the session from being reused by the pool -- confirm.
    dirty: bool,

    /// Transaction number; starts at 0 and is bumped by `ClientSession::increment_txn_number`.
    txn_number: i64,
}
469
470impl ServerSession {
471 fn new() -> Self {
473 let binary = Bson::Binary(Binary {
474 subtype: BinarySubtype::Uuid,
475 bytes: Uuid::new_v4().as_bytes().to_vec(),
476 });
477
478 Self {
479 id: doc! { "id": binary },
480 last_use: Instant::now(),
481 dirty: false,
482 txn_number: 0,
483 }
484 }
485
486 fn is_about_to_expire(&self, logical_session_timeout: Option<Duration>) -> bool {
488 let timeout = match logical_session_timeout {
489 Some(t) => t,
490 None => return false,
491 };
492 let expiration_date = self.last_use + timeout;
493 expiration_date < Instant::now() + Duration::from_secs(60)
494 }
495}