mod action;
mod cluster_time;
mod pool;
#[cfg(test)]
mod test;
use std::{
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
};
use std::sync::LazyLock;
use uuid::Uuid;
use crate::{
bson::{doc, spec::BinarySubtype, Binary, Bson, Document, Timestamp},
cmap::conn::PinnedConnectionHandle,
operation::Retryability,
options::{SessionOptions, TransactionOptions},
sdam::ServerInfo,
selection_criteria::SelectionCriteria,
Client,
};
pub use cluster_time::ClusterTime;
pub(super) use pool::ServerSessionPool;
use super::{options::ServerAddress, AsyncDropToken};
/// Lower-cased names of the commands listed here as not supporting sessions.
///
/// The set is built lazily on first access. NOTE(review): callers live outside this file;
/// presumably they lower-case a command name before looking it up here — confirm.
pub(crate) static SESSIONS_UNSUPPORTED_COMMANDS: LazyLock<HashSet<&'static str>> =
    LazyLock::new(|| HashSet::from(["killcursors", "parallelcollectionscan"]));
/// A client-side session: pairs a checked-out `ServerSession` with the `Client` that
/// created it and tracks cluster time, operation time, and transaction state.
///
/// When dropped, an in-progress transaction is aborted on a spawned task; otherwise the
/// server session is checked back in to the client (see the `Drop` impl in this file).
#[derive(Debug)]
pub struct ClientSession {
/// Highest cluster time recorded on this session so far, if any; only ever moved
/// forward by `advance_cluster_time`.
cluster_time: Option<ClusterTime>,
/// The server session checked out from the client's session pool in `new`.
server_session: ServerSession,
/// The client this session belongs to.
client: Client,
/// Whether the session was flagged as implicit at construction. Implicit sessions
/// default to no causal consistency (see `causal_consistency`).
is_implicit: bool,
/// Options supplied at construction, if any.
options: Option<SessionOptions>,
/// Token obtained from `Client::register_async_drop`; used by `Drop` to spawn the
/// asynchronous cleanup work.
drop_token: AsyncDropToken,
/// State of the current (or most recent) transaction on this session.
pub(crate) transaction: Transaction,
/// Snapshot timestamp; initialised from `SessionOptions::snapshot_time` in `new`.
pub(crate) snapshot_time: Option<Timestamp>,
/// Latest operation time recorded on this session; only ever moved forward by
/// `advance_operation_time`.
pub(crate) operation_time: Option<Timestamp>,
/// Test-only knob, `None` by default.
/// NOTE(review): presumably overrides the convenient-transaction timeout — not read in
/// this file, confirm against the transaction helpers.
#[cfg(test)]
pub(crate) convenient_transaction_timeout: Option<Duration>,
/// Test-only knob, `None` by default.
/// NOTE(review): presumably fixes the retry jitter of the convenient-transaction API —
/// not read in this file, confirm against the transaction helpers.
#[cfg(test)]
pub(crate) convenient_transaction_jitter: Option<f64>,
}
/// Per-session transaction bookkeeping: current state plus the options, pin, and
/// recovery token associated with the transaction.
#[derive(Debug)]
pub(crate) struct Transaction {
/// Where the transaction currently is in its lifecycle.
pub(crate) state: TransactionState,
/// Options passed to `start`; cleared by `abort` and `reset`.
pub(crate) options: Option<TransactionOptions>,
/// What the transaction is pinned to (a mongos or a connection), if anything; cleared
/// by `abort` and `reset`.
pub(crate) pinned: Option<TransactionPin>,
/// Recovery token for the transaction; cleared by `start` and `reset`.
/// NOTE(review): presumably the `recoveryToken` returned by the server — it is never
/// populated in this file, confirm against the operation layer.
pub(crate) recovery_token: Option<Document>,
/// Tracing span covering the transaction (only with the `opentelemetry` feature);
/// set by `start`, cleared by `drop_span`.
#[cfg(feature = "opentelemetry")]
pub(crate) otel_span: Option<crate::otel::TxnSpan>,
}
impl Transaction {
    /// Moves the transaction into the `Starting` state with the given options.
    ///
    /// Any recovery token left over from a previous transaction is discarded. With the
    /// `opentelemetry` feature enabled, `otel_span` becomes the transaction's span.
    pub(crate) fn start(
        &mut self,
        options: Option<TransactionOptions>,
        #[cfg(feature = "opentelemetry")] otel_span: crate::otel::TxnSpan,
    ) {
        self.recovery_token = None;
        self.options = options;
        self.state = TransactionState::Starting;
        #[cfg(feature = "opentelemetry")]
        {
            self.otel_span = Some(otel_span);
        }
    }

    /// Marks the transaction as committed, recording `data_committed` in the state.
    ///
    /// Options, pin, and recovery token are deliberately left untouched.
    pub(crate) fn commit(&mut self, data_committed: bool) {
        self.state = TransactionState::Committed { data_committed };
    }

    /// Marks the transaction as aborted and drops its options and pin.
    ///
    /// The recovery token and any tracing span are left in place.
    pub(crate) fn abort(&mut self) {
        self.options = None;
        self.pinned = None;
        self.state = TransactionState::Aborted;
    }

    /// Returns the transaction to its pristine state: `TransactionState::None`, with no
    /// options, pin, recovery token, or tracing span.
    pub(crate) fn reset(&mut self) {
        self.options = None;
        self.pinned = None;
        self.recovery_token = None;
        self.state = TransactionState::None;
        self.drop_span();
    }

    /// Discards the tracing span, if any. A no-op without the `opentelemetry` feature.
    pub(crate) fn drop_span(&mut self) {
        #[cfg(feature = "opentelemetry")]
        {
            self.otel_span = None;
        }
    }

    /// Test helper: whether the transaction is pinned to anything.
    #[cfg(test)]
    pub(crate) fn is_pinned(&self) -> bool {
        self.pinned.is_some()
    }

    /// Returns the selection criteria of the pinned mongos, or `None` when the
    /// transaction is unpinned or pinned to a connection instead.
    pub(crate) fn pinned_mongos(&self) -> Option<&SelectionCriteria> {
        self.pinned.as_ref().and_then(|pin| match pin {
            TransactionPin::Mongos(criteria) => Some(criteria),
            TransactionPin::Connection(_) => None,
        })
    }

    /// Returns the pinned connection handle, or `None` when the transaction is unpinned
    /// or pinned to a mongos instead.
    pub(crate) fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
        self.pinned.as_ref().and_then(|pin| match pin {
            TransactionPin::Connection(handle) => Some(handle),
            TransactionPin::Mongos(_) => None,
        })
    }

    /// Moves the options, pin, recovery token (and tracing span, when enabled) out of
    /// `self` into a new `Transaction`, leaving `None` behind in each.
    ///
    /// The state is cloned rather than moved, so `self` keeps reporting its current state.
    fn take(&mut self) -> Self {
        let state = self.state.clone();
        let options = self.options.take();
        let pinned = self.pinned.take();
        let recovery_token = self.recovery_token.take();
        Transaction {
            state,
            options,
            pinned,
            recovery_token,
            #[cfg(feature = "opentelemetry")]
            otel_span: self.otel_span.take(),
        }
    }
}
impl Default for Transaction {
    /// A transaction that has not been started: state `TransactionState::None` and every
    /// optional field empty.
    fn default() -> Self {
        Self {
            recovery_token: None,
            pinned: None,
            options: None,
            #[cfg(feature = "opentelemetry")]
            otel_span: None,
            state: TransactionState::None,
        }
    }
}
/// The lifecycle states a session's transaction can be in.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum TransactionState {
/// No transaction: the default state, and the one restored by `Transaction::reset`.
None,
/// Set by `Transaction::start`; counts as "in a transaction".
Starting,
/// Counts as "in a transaction"; dropping a session in this state aborts the
/// transaction. NOTE(review): presumably entered once the first operation of the
/// transaction has run — the transition is not in this file, confirm.
InProgress,
/// Set by `Transaction::commit`.
Committed {
/// Flag passed through from `Transaction::commit`.
/// NOTE(review): presumably whether a commit was actually sent to the server —
/// confirm against the commit action.
data_committed: bool,
},
/// Set by `Transaction::abort`.
Aborted,
}
/// What a transaction is pinned to.
#[derive(Debug)]
pub(crate) enum TransactionPin {
/// Pinned to a mongos, expressed as selection criteria; `ClientSession::pin_mongos`
/// builds a predicate matching a single server address.
Mongos(SelectionCriteria),
/// Pinned to a specific connection via its handle (see `ClientSession::pin_connection`).
Connection(PinnedConnectionHandle),
}
impl ClientSession {
    /// Creates a session for `client`, checking a server session out of the client's
    /// session pool.
    ///
    /// The snapshot time is seeded from `options.snapshot_time` when options are given;
    /// cluster time and operation time start out unset and the transaction starts in its
    /// default state.
    pub(crate) async fn new(
        client: Client,
        options: Option<SessionOptions>,
        is_implicit: bool,
    ) -> Self {
        let timeout = client.inner.topology.watcher().logical_session_timeout();
        let server_session = client.inner.session_pool.check_out(timeout).await;
        let snapshot_time = match &options {
            Some(opts) => opts.snapshot_time,
            None => None,
        };
        // Must be obtained while `client` is still borrowable, i.e. before it is moved
        // into the struct below.
        let drop_token = client.register_async_drop();
        Self {
            cluster_time: None,
            server_session,
            client,
            is_implicit,
            options,
            drop_token,
            transaction: Default::default(),
            snapshot_time,
            operation_time: None,
            #[cfg(test)]
            convenient_transaction_timeout: None,
            #[cfg(test)]
            convenient_transaction_jitter: None,
        }
    }

    /// Returns a clone of the client this session was created from.
    pub fn client(&self) -> Client {
        self.client.clone()
    }

    /// Returns the id document of the underlying server session.
    pub fn id(&self) -> &Document {
        &self.server_session.id
    }

    /// Whether this session was created as an implicit session.
    pub(crate) fn is_implicit(&self) -> bool {
        self.is_implicit
    }

    /// Whether a transaction is currently starting or in progress on this session.
    pub(crate) fn in_transaction(&self) -> bool {
        match self.transaction.state {
            TransactionState::Starting | TransactionState::InProgress => true,
            TransactionState::None
            | TransactionState::Committed { .. }
            | TransactionState::Aborted => false,
        }
    }

    /// Returns the highest cluster time recorded on this session, if any.
    pub fn cluster_time(&self) -> Option<&ClusterTime> {
        self.cluster_time.as_ref()
    }

    /// Returns the options this session was created with, if any.
    pub(crate) fn options(&self) -> Option<&SessionOptions> {
        self.options.as_ref()
    }

    /// Records `to` as the session's cluster time if it is greater than the current one
    /// or if no cluster time has been recorded yet; otherwise does nothing.
    pub fn advance_cluster_time(&mut self, to: &ClusterTime) {
        let is_newer = match self.cluster_time.as_ref() {
            Some(current) => current < to,
            None => true,
        };
        if is_newer {
            self.cluster_time = Some(to.clone());
        }
    }

    /// Records `ts` as the session's operation time if it is greater than the current
    /// one or if no operation time has been recorded yet; otherwise does nothing.
    pub fn advance_operation_time(&mut self, ts: Timestamp) {
        let is_newer = match self.operation_time {
            Some(current) => current < ts,
            None => true,
        };
        if is_newer {
            self.operation_time = Some(ts);
        }
    }

    /// Returns the latest operation time recorded on this session, if any.
    pub fn operation_time(&self) -> Option<Timestamp> {
        self.operation_time
    }

    /// Returns the session's snapshot time, if any.
    pub fn snapshot_time(&self) -> Option<Timestamp> {
        self.snapshot_time
    }

    /// Whether causal consistency is enabled for this session.
    ///
    /// An explicit `causal_consistency` option wins; without one, explicit sessions
    /// default to `true` and implicit sessions to `false`.
    pub(crate) fn causal_consistency(&self) -> bool {
        match self.options().and_then(|opts| opts.causal_consistency) {
            Some(explicit) => explicit,
            None => !self.is_implicit(),
        }
    }

    /// Flags the underlying server session as dirty.
    pub(crate) fn mark_dirty(&mut self) {
        self.server_session.dirty = true;
    }

    /// Stamps the underlying server session's last-use time with the current instant.
    pub(crate) fn update_last_use(&mut self) {
        self.server_session.last_use = Instant::now();
    }

    /// Returns the server session's current transaction number.
    pub(crate) fn txn_number(&self) -> i64 {
        self.server_session.txn_number
    }

    /// Bumps the server session's transaction number by one.
    pub(crate) fn increment_txn_number(&mut self) {
        self.server_session.txn_number += 1;
    }

    /// Returns the transaction number to attach to an operation, if one applies.
    ///
    /// * Whenever the transaction state is anything other than `None`, the current
    ///   number is returned unchanged.
    /// * Otherwise, for a retryable write, the number is incremented first and the new
    ///   value returned.
    /// * Otherwise no transaction number applies and `None` is returned.
    pub(crate) fn get_txn_number_for_operation(
        &mut self,
        retryability: Retryability,
    ) -> Option<i64> {
        if self.transaction.state == TransactionState::None {
            if retryability != Retryability::Write {
                return None;
            }
            self.increment_txn_number();
        }
        Some(self.txn_number())
    }

    /// Pins the transaction to the mongos at `address` by installing a predicate that
    /// selects only servers with exactly that address.
    pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
        let criteria = SelectionCriteria::Predicate(Arc::new(
            move |server_info: &ServerInfo| *server_info.address() == address,
        ));
        self.transaction.pinned = Some(TransactionPin::Mongos(criteria));
    }

    /// Pins the transaction to the connection behind `handle`.
    pub(crate) fn pin_connection(&mut self, handle: PinnedConnectionHandle) {
        let pin = TransactionPin::Connection(handle);
        self.transaction.pinned = Some(pin);
    }

    /// Removes any mongos or connection pin from the transaction.
    pub(crate) fn unpin(&mut self) {
        self.transaction.pinned = None;
    }

    /// Test helper: whether the underlying server session has been marked dirty.
    #[cfg(test)]
    pub(crate) fn is_dirty(&self) -> bool {
        self.server_session.dirty
    }

    /// Returns the default transaction options configured on the session options, if
    /// both are present.
    fn default_transaction_options(&self) -> Option<&TransactionOptions> {
        self.options.as_ref()?.default_transaction_options.as_ref()
    }
}
/// The state of a `ClientSession` that is being dropped, minus its drop token.
///
/// `ClientSession`'s `Drop` impl builds one of these so the session's state can be moved
/// into a spawned task, where it is converted back into a `ClientSession` (via `From`)
/// in order to abort the in-progress transaction.
struct DroppedClientSession {
/// Cluster time copied from the dropped session.
cluster_time: Option<ClusterTime>,
/// Clone of the dropped session's server session.
server_session: ServerSession,
/// Clone of the dropped session's client.
client: Client,
/// Whether the dropped session was implicit.
is_implicit: bool,
/// Clone of the dropped session's options.
options: Option<SessionOptions>,
/// Transaction contents moved out of the dropped session with `Transaction::take`.
transaction: Transaction,
/// Snapshot time copied from the dropped session.
snapshot_time: Option<Timestamp>,
/// Operation time copied from the dropped session.
operation_time: Option<Timestamp>,
}
impl From<DroppedClientSession> for ClientSession {
    /// Rebuilds a full `ClientSession` from the state captured at drop time.
    ///
    /// A fresh async-drop token is registered with the client, and the test-only knobs
    /// are reset to `None`.
    fn from(dropped_session: DroppedClientSession) -> Self {
        // Register the token first: it borrows the client, which is moved just below.
        let drop_token = dropped_session.client.register_async_drop();
        Self {
            client: dropped_session.client,
            drop_token,
            server_session: dropped_session.server_session,
            cluster_time: dropped_session.cluster_time,
            operation_time: dropped_session.operation_time,
            snapshot_time: dropped_session.snapshot_time,
            transaction: dropped_session.transaction,
            options: dropped_session.options,
            is_implicit: dropped_session.is_implicit,
            #[cfg(test)]
            convenient_transaction_timeout: None,
            #[cfg(test)]
            convenient_transaction_jitter: None,
        }
    }
}
impl Drop for ClientSession {
    /// Cleans up the session asynchronously via the drop token.
    ///
    /// * If a transaction is in progress, the session's state is moved into a spawned
    ///   task that rebuilds a `ClientSession` and aborts the transaction; the result of
    ///   the abort is ignored.
    /// * Otherwise the server session is checked back in to the client.
    fn drop(&mut self) {
        // Both paths need their own handles because the spawned future must own its data.
        let client = self.client.clone();
        let server_session = self.server_session.clone();

        if !matches!(self.transaction.state, TransactionState::InProgress) {
            self.drop_token.spawn(async move {
                client.check_in_server_session(server_session).await;
            });
            return;
        }

        let dropped_session = DroppedClientSession {
            client,
            server_session,
            cluster_time: self.cluster_time.clone(),
            is_implicit: self.is_implicit,
            options: self.options.clone(),
            // Moves options/pin/recovery token out of `self`; the state itself is cloned.
            transaction: self.transaction.take(),
            snapshot_time: self.snapshot_time,
            operation_time: self.operation_time,
        };
        self.drop_token.spawn(async move {
            let mut revived: ClientSession = dropped_session.into();
            // Best effort: there is nobody left to report a failed abort to.
            let _result = revived.abort_transaction().await;
        });
    }
}
/// Client-side record of a server session: its id plus the bookkeeping used for
/// expiry checks and transaction numbering.
#[derive(Clone, Debug)]
pub(crate) struct ServerSession {
/// Session id document of the form `{ "id": <UUID binary> }` (see `ServerSession::new`).
pub(crate) id: Document,
/// When the session was created or last stamped via `ClientSession::update_last_use`;
/// the basis for `is_about_to_expire`.
last_use: std::time::Instant,
/// Set by `ClientSession::mark_dirty`; starts out `false`.
/// NOTE(review): presumably makes the pool discard the session on check-in — the pool
/// logic is not in this file, confirm.
dirty: bool,
/// Monotonically increasing transaction number; starts at 0 and is bumped by
/// `ClientSession::increment_txn_number`.
txn_number: i64,
}
impl ServerSession {
    /// Creates a fresh server session identified by a random (v4) UUID.
    ///
    /// The session starts out clean (`dirty == false`), with transaction number 0 and
    /// its last-use time set to now.
    fn new() -> Self {
        let binary = Bson::Binary(Binary {
            subtype: BinarySubtype::Uuid,
            bytes: Uuid::new_v4().as_bytes().to_vec(),
        });

        Self {
            id: doc! { "id": binary },
            last_use: Instant::now(),
            dirty: false,
            txn_number: 0,
        }
    }

    /// Returns whether the session will expire within the next minute.
    ///
    /// The expiry time is `last_use + logical_session_timeout`. When no timeout is known
    /// (`None`), the session is never considered about to expire. The same holds when the
    /// timeout is so large that the expiry time cannot be represented as an `Instant`.
    fn is_about_to_expire(&self, logical_session_timeout: Option<Duration>) -> bool {
        let Some(timeout) = logical_session_timeout else {
            return false;
        };

        // `Instant + Duration` panics when the result is unrepresentable. An expiry that
        // far in the future simply means "not about to expire", so use `checked_add`
        // rather than risking a panic on an absurd timeout value.
        let Some(expiration_date) = self.last_use.checked_add(timeout) else {
            return false;
        };

        expiration_date < Instant::now() + Duration::from_secs(60)
    }
}