1mod action;
2mod cluster_time;
3mod pool;
4#[cfg(test)]
5mod test;
6
7use std::{
8 collections::HashSet,
9 sync::Arc,
10 time::{Duration, Instant},
11};
12
13use once_cell::sync::Lazy;
14use uuid::Uuid;
15
16use crate::{
17 bson::{doc, spec::BinarySubtype, Binary, Bson, Document, Timestamp},
18 cmap::conn::PinnedConnectionHandle,
19 options::{SessionOptions, TransactionOptions},
20 sdam::ServerInfo,
21 selection_criteria::SelectionCriteria,
22 Client,
23};
24pub use cluster_time::ClusterTime;
25pub(super) use pool::ServerSessionPool;
26
27use super::{options::ServerAddress, AsyncDropToken};
28
29pub(crate) static SESSIONS_UNSUPPORTED_COMMANDS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
30 let mut hash_set = HashSet::new();
31 hash_set.insert("killcursors");
32 hash_set.insert("parallelcollectionscan");
33 hash_set
34});
35
36#[derive(Debug)]
100pub struct ClientSession {
101 cluster_time: Option<ClusterTime>,
102 server_session: ServerSession,
103 client: Client,
104 is_implicit: bool,
105 options: Option<SessionOptions>,
106 drop_token: AsyncDropToken,
107 pub(crate) transaction: Transaction,
108 pub(crate) snapshot_time: Option<Timestamp>,
109 pub(crate) operation_time: Option<Timestamp>,
110 #[cfg(test)]
111 pub(crate) convenient_transaction_timeout: Option<Duration>,
112}
113
114#[derive(Debug)]
115pub(crate) struct Transaction {
116 pub(crate) state: TransactionState,
117 pub(crate) options: Option<TransactionOptions>,
118 pub(crate) pinned: Option<TransactionPin>,
119 pub(crate) recovery_token: Option<Document>,
120}
121
122impl Transaction {
123 pub(crate) fn start(&mut self, options: Option<TransactionOptions>) {
124 self.state = TransactionState::Starting;
125 self.options = options;
126 self.recovery_token = None;
127 }
128
129 pub(crate) fn commit(&mut self, data_committed: bool) {
130 self.state = TransactionState::Committed { data_committed };
131 }
132
133 pub(crate) fn abort(&mut self) {
134 self.state = TransactionState::Aborted;
135 self.options = None;
136 self.pinned = None;
137 }
138
139 pub(crate) fn reset(&mut self) {
140 self.state = TransactionState::None;
141 self.options = None;
142 self.pinned = None;
143 self.recovery_token = None;
144 }
145
146 #[cfg(test)]
147 pub(crate) fn is_pinned(&self) -> bool {
148 self.pinned.is_some()
149 }
150
151 pub(crate) fn pinned_mongos(&self) -> Option<&SelectionCriteria> {
152 match &self.pinned {
153 Some(TransactionPin::Mongos(s)) => Some(s),
154 _ => None,
155 }
156 }
157
158 pub(crate) fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
159 match &self.pinned {
160 Some(TransactionPin::Connection(c)) => Some(c),
161 _ => None,
162 }
163 }
164
165 fn take(&mut self) -> Self {
166 Transaction {
167 state: self.state.clone(),
168 options: self.options.take(),
169 pinned: self.pinned.take(),
170 recovery_token: self.recovery_token.take(),
171 }
172 }
173}
174
175impl Default for Transaction {
176 fn default() -> Self {
177 Self {
178 state: TransactionState::None,
179 options: None,
180 pinned: None,
181 recovery_token: None,
182 }
183 }
184}
185
186#[derive(Clone, Debug, PartialEq)]
187pub(crate) enum TransactionState {
188 None,
189 Starting,
190 InProgress,
191 Committed {
192 data_committed: bool,
196 },
197 Aborted,
198}
199
200#[derive(Debug)]
201pub(crate) enum TransactionPin {
202 Mongos(SelectionCriteria),
203 Connection(PinnedConnectionHandle),
204}
205
206impl ClientSession {
207 pub(crate) async fn new(
210 client: Client,
211 options: Option<SessionOptions>,
212 is_implicit: bool,
213 ) -> Self {
214 let timeout = client.inner.topology.logical_session_timeout();
215 let server_session = client.inner.session_pool.check_out(timeout).await;
216 Self {
217 drop_token: client.register_async_drop(),
218 client,
219 server_session,
220 cluster_time: None,
221 is_implicit,
222 options,
223 transaction: Default::default(),
224 snapshot_time: None,
225 operation_time: None,
226 #[cfg(test)]
227 convenient_transaction_timeout: None,
228 }
229 }
230
231 pub fn client(&self) -> Client {
233 self.client.clone()
234 }
235
236 pub fn id(&self) -> &Document {
238 &self.server_session.id
239 }
240
241 pub(crate) fn is_implicit(&self) -> bool {
243 self.is_implicit
244 }
245
246 pub(crate) fn in_transaction(&self) -> bool {
248 self.transaction.state == TransactionState::Starting
249 || self.transaction.state == TransactionState::InProgress
250 }
251
252 pub fn cluster_time(&self) -> Option<&ClusterTime> {
255 self.cluster_time.as_ref()
256 }
257
258 pub(crate) fn options(&self) -> Option<&SessionOptions> {
260 self.options.as_ref()
261 }
262
263 pub fn advance_cluster_time(&mut self, to: &ClusterTime) {
266 if self.cluster_time().map(|ct| ct < to).unwrap_or(true) {
267 self.cluster_time = Some(to.clone());
268 }
269 }
270
271 pub fn advance_operation_time(&mut self, ts: Timestamp) {
274 self.operation_time = match self.operation_time {
275 Some(current_op_time) if current_op_time < ts => Some(ts),
276 None => Some(ts),
277 _ => self.operation_time,
278 }
279 }
280
281 pub fn operation_time(&self) -> Option<Timestamp> {
283 self.operation_time
284 }
285
286 pub(crate) fn causal_consistency(&self) -> bool {
287 self.options()
288 .and_then(|opts| opts.causal_consistency)
289 .unwrap_or(!self.is_implicit())
290 }
291
292 pub(crate) fn mark_dirty(&mut self) {
294 self.server_session.dirty = true;
295 }
296
297 pub(crate) fn update_last_use(&mut self) {
300 self.server_session.last_use = Instant::now();
301 }
302
303 pub(crate) fn txn_number(&self) -> i64 {
305 self.server_session.txn_number
306 }
307
308 pub(crate) fn increment_txn_number(&mut self) {
310 self.server_session.txn_number += 1;
311 }
312
313 pub(crate) fn get_and_increment_txn_number(&mut self) -> i64 {
315 self.increment_txn_number();
316 self.server_session.txn_number
317 }
318
319 pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
321 self.transaction.pinned = Some(TransactionPin::Mongos(SelectionCriteria::Predicate(
322 Arc::new(move |server_info: &ServerInfo| *server_info.address() == address),
323 )));
324 }
325
326 pub(crate) fn pin_connection(&mut self, handle: PinnedConnectionHandle) {
328 self.transaction.pinned = Some(TransactionPin::Connection(handle));
329 }
330
331 pub(crate) fn unpin(&mut self) {
332 self.transaction.pinned = None;
333 }
334
335 #[cfg(test)]
337 pub(crate) fn is_dirty(&self) -> bool {
338 self.server_session.dirty
339 }
340
341 fn default_transaction_options(&self) -> Option<&TransactionOptions> {
342 self.options
343 .as_ref()
344 .and_then(|options| options.default_transaction_options.as_ref())
345 }
346}
347
348struct DroppedClientSession {
349 cluster_time: Option<ClusterTime>,
350 server_session: ServerSession,
351 client: Client,
352 is_implicit: bool,
353 options: Option<SessionOptions>,
354 transaction: Transaction,
355 snapshot_time: Option<Timestamp>,
356 operation_time: Option<Timestamp>,
357}
358
359impl From<DroppedClientSession> for ClientSession {
360 fn from(dropped_session: DroppedClientSession) -> Self {
361 Self {
362 cluster_time: dropped_session.cluster_time,
363 server_session: dropped_session.server_session,
364 drop_token: dropped_session.client.register_async_drop(),
365 client: dropped_session.client,
366 is_implicit: dropped_session.is_implicit,
367 options: dropped_session.options,
368 transaction: dropped_session.transaction,
369 snapshot_time: dropped_session.snapshot_time,
370 operation_time: dropped_session.operation_time,
371 #[cfg(test)]
372 convenient_transaction_timeout: None,
373 }
374 }
375}
376
377impl Drop for ClientSession {
378 fn drop(&mut self) {
379 if self.transaction.state == TransactionState::InProgress {
380 let dropped_session = DroppedClientSession {
381 cluster_time: self.cluster_time.clone(),
382 server_session: self.server_session.clone(),
383 client: self.client.clone(),
384 is_implicit: self.is_implicit,
385 options: self.options.clone(),
386 transaction: self.transaction.take(),
387 snapshot_time: self.snapshot_time,
388 operation_time: self.operation_time,
389 };
390 self.drop_token.spawn(async move {
391 let mut session: ClientSession = dropped_session.into();
392 let _result = session.abort_transaction().await;
393 });
394 } else {
395 let client = self.client.clone();
396 let server_session = self.server_session.clone();
397 self.drop_token.spawn(async move {
398 client.check_in_server_session(server_session).await;
399 });
400 }
401 }
402}
403
404#[derive(Clone, Debug)]
407pub(crate) struct ServerSession {
408 pub(crate) id: Document,
410
411 last_use: std::time::Instant,
413
414 dirty: bool,
416
417 txn_number: i64,
419}
420
421impl ServerSession {
422 fn new() -> Self {
424 let binary = Bson::Binary(Binary {
425 subtype: BinarySubtype::Uuid,
426 bytes: Uuid::new_v4().as_bytes().to_vec(),
427 });
428
429 Self {
430 id: doc! { "id": binary },
431 last_use: Instant::now(),
432 dirty: false,
433 txn_number: 0,
434 }
435 }
436
437 fn is_about_to_expire(&self, logical_session_timeout: Option<Duration>) -> bool {
439 let timeout = match logical_session_timeout {
440 Some(t) => t,
441 None => return false,
442 };
443 let expiration_date = self.last_use + timeout;
444 expiration_date < Instant::now() + Duration::from_secs(60)
445 }
446}