mod cluster_time;
mod pool;
#[cfg(test)]
mod test;
use std::{
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
};
use lazy_static::lazy_static;
use uuid::Uuid;
use crate::{
bson::{doc, spec::BinarySubtype, Binary, Bson, Document, Timestamp},
error::{ErrorKind, Result},
operation::{AbortTransaction, CommitTransaction, Operation},
options::{SessionOptions, TransactionOptions},
sdam::{ServerInfo, TransactionSupportStatus},
selection_criteria::SelectionCriteria,
Client,
RUNTIME,
};
pub use cluster_time::ClusterTime;
pub(super) use pool::ServerSessionPool;
use super::options::ServerAddress;
lazy_static! {
    /// Lowercase command names that are treated as not supporting sessions.
    ///
    /// The set is built once, on first access.
    pub(crate) static ref SESSIONS_UNSUPPORTED_COMMANDS: HashSet<&'static str> =
        ["killcursors", "parallelcollectionscan"]
            .iter()
            .copied()
            .collect();
}
/// A client-side session: wraps a server session together with the state
/// tracked for it (cluster time, transaction state, snapshot time).
///
/// Dropping a `ClientSession` schedules background work on the runtime: a
/// transaction that is still in progress is aborted, otherwise the server
/// session is checked back in with the client.
#[derive(Debug)]
pub struct ClientSession {
/// Most recent cluster time recorded on this session; only ever moved
/// forward by `advance_cluster_time`.
cluster_time: Option<ClusterTime>,
/// The underlying server session (id, last use, dirty flag, txn number).
server_session: ServerSession,
/// Handle to the client used to run commit/abort operations and to check the
/// server session back in on drop.
client: Client,
/// Flag supplied at construction time and exposed via `is_implicit()`.
/// NOTE(review): presumably true for sessions the driver creates on the
/// user's behalf -- confirm against callers of `ClientSession::new`.
is_implicit: bool,
/// Options this session was created with, if any.
options: Option<SessionOptions>,
/// State of the current (or most recent) transaction on this session.
pub(crate) transaction: Transaction,
/// Timestamp associated with snapshot reads; starts out as `None` and is not
/// modified by the code in this module.
pub(crate) snapshot_time: Option<Timestamp>,
}
/// Bookkeeping for the transaction associated with a `ClientSession`.
#[derive(Clone, Debug)]
pub(crate) struct Transaction {
/// Where the transaction currently is in its lifecycle.
pub(crate) state: TransactionState,
/// Options the transaction was started with; cleared on abort and reset.
pub(crate) options: Option<TransactionOptions>,
/// Selection criteria pinning the transaction to a single server address
/// (set by `ClientSession::pin_mongos`); cleared on abort and reset.
pub(crate) pinned_mongos: Option<SelectionCriteria>,
/// Recovery token document; cleared when a transaction is started or reset.
/// NOTE(review): it is never assigned a value in this module -- presumably
/// populated from server responses elsewhere; confirm.
pub(crate) recovery_token: Option<Document>,
}
impl Transaction {
    /// Moves the transaction into the `Starting` state with the given options.
    ///
    /// Any recovery token left over from a previous transaction is discarded.
    /// The pinned mongos is left untouched.
    pub(crate) fn start(&mut self, options: Option<TransactionOptions>) {
        self.recovery_token = None;
        self.options = options;
        self.state = TransactionState::Starting;
    }

    /// Marks the transaction as committed.
    ///
    /// `data_committed` is stored in the state so later calls can tell
    /// whether a commit was actually issued for this transaction.
    pub(crate) fn commit(&mut self, data_committed: bool) {
        let committed = TransactionState::Committed { data_committed };
        self.state = committed;
    }

    /// Marks the transaction as aborted, dropping its options and pinned
    /// mongos. The recovery token is kept as-is.
    pub(crate) fn abort(&mut self) {
        self.pinned_mongos = None;
        self.options = None;
        self.state = TransactionState::Aborted;
    }

    /// Returns the transaction to its pristine state: `TransactionState::None`
    /// with no options, no pinned mongos and no recovery token.
    pub(crate) fn reset(&mut self) {
        // `Default` produces exactly the pristine field values.
        *self = Self::default();
    }
}
impl Default for Transaction {
fn default() -> Self {
Self {
state: TransactionState::None,
options: None,
pinned_mongos: None,
recovery_token: None,
}
}
}
/// Lifecycle states of a session's transaction.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum TransactionState {
/// No transaction has been started (also the state after `Transaction::reset`).
None,
/// `Transaction::start` has been called. Committing or aborting from this
/// state completes locally without running an operation.
Starting,
/// The transaction is active; committing or aborting from this state runs
/// the corresponding operation.
/// NOTE(review): this state is not assigned in the code visible here --
/// presumably set once the first operation of the transaction executes.
InProgress,
/// The transaction has been committed.
Committed {
/// `true` when the commit was issued from `InProgress` (a commit operation
/// was run), `false` when it was issued from `Starting`.
data_committed: bool,
},
/// The transaction has been aborted.
Aborted,
}
impl ClientSession {
    /// Creates a session around `server_session`.
    ///
    /// The new session has no cluster time, no snapshot time and a default
    /// (not started) transaction.
    pub(crate) fn new(
        server_session: ServerSession,
        client: Client,
        options: Option<SessionOptions>,
        is_implicit: bool,
    ) -> Self {
        Self {
            cluster_time: None,
            snapshot_time: None,
            transaction: Transaction::default(),
            server_session,
            client,
            options,
            is_implicit,
        }
    }

    /// Returns a clone of the client this session was created with.
    pub fn client(&self) -> Client {
        self.client.clone()
    }

    /// Returns the id document of the underlying server session.
    pub fn id(&self) -> &Document {
        &self.server_session.id
    }

    /// Returns the `is_implicit` flag this session was constructed with.
    pub(crate) fn is_implicit(&self) -> bool {
        self.is_implicit
    }

    /// Returns `true` while the transaction is `Starting` or `InProgress`.
    pub(crate) fn in_transaction(&self) -> bool {
        matches!(
            self.transaction.state,
            TransactionState::Starting | TransactionState::InProgress
        )
    }

    /// Returns the cluster time currently recorded on this session, if any.
    pub fn cluster_time(&self) -> Option<&ClusterTime> {
        self.cluster_time.as_ref()
    }

    /// Returns the options this session was created with, if any.
    pub fn options(&self) -> Option<&SessionOptions> {
        self.options.as_ref()
    }

    /// Records `to` as the session's cluster time if the session has none yet
    /// or its current one is strictly less than `to`; otherwise does nothing.
    pub fn advance_cluster_time(&mut self, to: &ClusterTime) {
        let should_advance = match self.cluster_time.as_ref() {
            Some(current) => current < to,
            None => true,
        };
        if should_advance {
            self.cluster_time = Some(to.clone());
        }
    }

    /// Flags the underlying server session as dirty.
    pub(crate) fn mark_dirty(&mut self) {
        self.server_session.dirty = true;
    }

    /// Stamps the underlying server session's last-use time with the current instant.
    pub(crate) fn update_last_use(&mut self) {
        self.server_session.last_use = Instant::now();
    }

    /// Returns the server session's current transaction number.
    pub(crate) fn txn_number(&self) -> i64 {
        self.server_session.txn_number
    }

    /// Bumps the server session's transaction number by one.
    pub(crate) fn increment_txn_number(&mut self) {
        self.server_session.txn_number += 1;
    }

    /// Bumps the transaction number by one and returns the new
    /// (post-increment) value.
    pub(crate) fn get_and_increment_txn_number(&mut self) -> i64 {
        let server_session = &mut self.server_session;
        server_session.txn_number += 1;
        server_session.txn_number
    }

    /// Pins the transaction to the server at `address` by installing a
    /// selection predicate that only accepts servers with that address.
    pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
        let pinned = SelectionCriteria::Predicate(Arc::new(
            move |server_info: &ServerInfo| *server_info.address() == address,
        ));
        self.transaction.pinned_mongos = Some(pinned);
    }

    /// Removes any pinned-server selection criteria from the transaction.
    pub(crate) fn unpin_mongos(&mut self) {
        self.transaction.pinned_mongos = None;
    }

    /// Returns whether the underlying server session is flagged dirty.
    #[cfg(test)]
    pub(crate) fn is_dirty(&self) -> bool {
        self.server_session.dirty
    }

    /// Starts a new transaction on this session.
    ///
    /// Explicitly supplied `options` are merged with the session's default
    /// transaction options (explicit values are passed as the merge target)
    /// and then resolved against the client for read concern, write concern
    /// and selection criteria. On success the transaction number is
    /// incremented and the transaction enters the `Starting` state.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Transaction` error when:
    /// - the session was created with the snapshot option enabled,
    /// - a transaction is already `Starting` or `InProgress`,
    /// - the deployment does not report transactions as supported, or
    /// - the resolved write concern is unacknowledged.
    ///
    /// Errors from querying the client's transaction support status are
    /// propagated as-is.
    pub async fn start_transaction(
        &mut self,
        options: impl Into<Option<TransactionOptions>>,
    ) -> Result<()> {
        let is_snapshot_session = self
            .options
            .as_ref()
            .and_then(|session_options| session_options.snapshot)
            .unwrap_or(false);
        if is_snapshot_session {
            return Err(ErrorKind::Transaction {
                message: "Transactions are not supported in snapshot sessions".into(),
            }
            .into());
        }

        match self.transaction.state {
            TransactionState::Starting | TransactionState::InProgress => {
                return Err(ErrorKind::Transaction {
                    message: "transaction already in progress".into(),
                }
                .into());
            }
            // A previously committed transaction may have left the session
            // pinned; release the pin before starting over.
            TransactionState::Committed { .. } => self.unpin_mongos(),
            TransactionState::None | TransactionState::Aborted => {}
        }

        let support_status = self.client.transaction_support_status().await?;
        if !matches!(support_status, TransactionSupportStatus::Supported) {
            return Err(ErrorKind::Transaction {
                message: "Transactions are not supported by this deployment".into(),
            }
            .into());
        }

        // Combine explicit options with the session defaults; with no
        // explicit options the defaults (if any) are used unchanged.
        let mut options = match options.into() {
            Some(mut options) => {
                if let Some(defaults) = self.default_transaction_options() {
                    merge_options!(
                        defaults,
                        &mut options,
                        [
                            read_concern,
                            write_concern,
                            selection_criteria,
                            max_commit_time
                        ]
                    );
                }
                Some(options)
            }
            None => self.default_transaction_options().cloned(),
        };
        resolve_options!(
            self.client,
            options,
            [read_concern, write_concern, selection_criteria]
        );

        // A missing write concern counts as acknowledged.
        let write_concern_acknowledged = options
            .as_ref()
            .and_then(|resolved| resolved.write_concern.as_ref())
            .map(|wc| wc.is_acknowledged())
            .unwrap_or(true);
        if !write_concern_acknowledged {
            return Err(ErrorKind::Transaction {
                message: "transactions do not support unacknowledged write concerns".into(),
            }
            .into());
        }

        self.increment_txn_number();
        self.transaction.start(options);
        Ok(())
    }

    /// Commits the current transaction.
    ///
    /// Behavior depends on the transaction state:
    /// - `Starting`: marked committed locally; no operation is run.
    /// - `InProgress`: marked committed, then a commit operation is executed.
    /// - `Committed` with data committed: the commit operation is executed
    ///   again after being updated for retry.
    /// - `Committed` without data committed: returns `Ok(())` immediately.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Transaction` error if no transaction was started
    /// or the transaction was already aborted, and propagates any error from
    /// executing the commit operation.
    pub async fn commit_transaction(&mut self) -> Result<()> {
        match self.transaction.state {
            TransactionState::None => Err(ErrorKind::Transaction {
                message: "no transaction started".into(),
            }
            .into()),
            TransactionState::Aborted => Err(ErrorKind::Transaction {
                message: "Cannot call commitTransaction after calling abortTransaction".into(),
            }
            .into()),
            TransactionState::Starting => {
                self.transaction.commit(false);
                Ok(())
            }
            TransactionState::Committed {
                data_committed: false,
            } => Ok(()),
            TransactionState::InProgress => {
                let operation = CommitTransaction::new(self.transaction.options.clone());
                // The state is switched before the operation runs.
                self.transaction.commit(true);
                let client = self.client.clone();
                client.execute_operation(operation, self).await
            }
            TransactionState::Committed {
                data_committed: true,
            } => {
                let mut operation = CommitTransaction::new(self.transaction.options.clone());
                operation.update_for_retry();
                let client = self.client.clone();
                client.execute_operation(operation, self).await
            }
        }
    }

    /// Aborts the current transaction.
    ///
    /// From `Starting` the transaction is marked aborted locally. From
    /// `InProgress` it is marked aborted and an abort operation is executed
    /// with the transaction's write concern and pinned-server criteria; the
    /// outcome of that operation is deliberately ignored and `Ok(())` is
    /// returned regardless.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Transaction` error if no transaction was
    /// started, or it was already committed or aborted.
    pub async fn abort_transaction(&mut self) -> Result<()> {
        match self.transaction.state {
            TransactionState::None => Err(ErrorKind::Transaction {
                message: "no transaction started".into(),
            }
            .into()),
            TransactionState::Committed { .. } => Err(ErrorKind::Transaction {
                message: "Cannot call abortTransaction after calling commitTransaction".into(),
            }
            .into()),
            TransactionState::Aborted => Err(ErrorKind::Transaction {
                message: "cannot call abortTransaction twice".into(),
            }
            .into()),
            TransactionState::Starting => {
                self.transaction.abort();
                Ok(())
            }
            TransactionState::InProgress => {
                // Capture what the operation needs before `abort()` clears
                // the options and the pinned mongos.
                let write_concern = self
                    .transaction
                    .options
                    .as_ref()
                    .and_then(|txn_options| txn_options.write_concern.clone());
                let pinned_criteria = self.transaction.pinned_mongos.clone();
                let operation = AbortTransaction::new(write_concern, pinned_criteria);
                self.transaction.abort();

                let client = self.client.clone();
                // The result is intentionally discarded.
                let _ignored = client.execute_operation(operation, &mut *self).await;
                Ok(())
            }
        }
    }

    /// Returns the default transaction options from the session options, if
    /// both are present.
    fn default_transaction_options(&self) -> Option<&TransactionOptions> {
        self.options.as_ref()?.default_transaction_options.as_ref()
    }
}
/// Owned copy of a `ClientSession`'s fields.
///
/// Built in `ClientSession`'s `Drop` implementation so the session state can
/// be moved into a spawned task and converted back into a `ClientSession`
/// (via `From`) to abort an in-progress transaction.
struct DroppedClientSession {
// Each field mirrors the `ClientSession` field of the same name.
cluster_time: Option<ClusterTime>,
server_session: ServerSession,
client: Client,
is_implicit: bool,
options: Option<SessionOptions>,
transaction: Transaction,
snapshot_time: Option<Timestamp>,
}
impl From<DroppedClientSession> for ClientSession {
fn from(dropped_session: DroppedClientSession) -> Self {
Self {
cluster_time: dropped_session.cluster_time,
server_session: dropped_session.server_session,
client: dropped_session.client,
is_implicit: dropped_session.is_implicit,
options: dropped_session.options,
transaction: dropped_session.transaction,
snapshot_time: dropped_session.snapshot_time,
}
}
}
impl Drop for ClientSession {
fn drop(&mut self) {
if self.transaction.state == TransactionState::InProgress {
let dropped_session = DroppedClientSession {
cluster_time: self.cluster_time.clone(),
server_session: self.server_session.clone(),
client: self.client.clone(),
is_implicit: self.is_implicit,
options: self.options.clone(),
transaction: self.transaction.clone(),
snapshot_time: self.snapshot_time,
};
RUNTIME.execute(async move {
let mut session: ClientSession = dropped_session.into();
let _result = session.abort_transaction().await;
});
} else {
let client = self.client.clone();
let server_session = self.server_session.clone();
RUNTIME.execute(async move {
client.check_in_server_session(server_session).await;
});
}
}
}
/// Client-side record of a server session.
#[derive(Clone, Debug)]
pub(crate) struct ServerSession {
/// Session id document of the form `{ "id": <UUID binary> }`.
id: Document,
/// Instant at which the session was created or last marked as used.
last_use: std::time::Instant,
/// Dirty flag; starts `false` and is set through `ClientSession::mark_dirty`.
dirty: bool,
/// Transaction number; starts at 0 and is only ever incremented.
txn_number: i64,
}
impl ServerSession {
    /// Creates a server session with a freshly generated v4 UUID as its id,
    /// last used "now", not dirty, and with transaction number 0.
    fn new() -> Self {
        let binary = Bson::Binary(Binary {
            subtype: BinarySubtype::Uuid,
            bytes: Uuid::new_v4().as_bytes().to_vec(),
        });
        Self {
            id: doc! { "id": binary },
            last_use: Instant::now(),
            dirty: false,
            txn_number: 0,
        }
    }

    /// Returns `true` if the session would expire less than one minute from
    /// now, given the logical session timeout.
    ///
    /// Returns `false` when no timeout is provided. A timeout so large that
    /// `last_use + timeout` cannot be represented as an `Instant` is treated
    /// as "not about to expire" instead of panicking on overflow.
    fn is_about_to_expire(&self, logical_session_timeout: Option<Duration>) -> bool {
        let timeout = match logical_session_timeout {
            Some(t) => t,
            None => return false,
        };
        // `Instant + Duration` panics on overflow; `checked_add` lets an
        // unrepresentably distant expiration be handled explicitly.
        match self.last_use.checked_add(timeout) {
            Some(expiration_date) => expiration_date < Instant::now() + Duration::from_secs(60),
            None => false,
        }
    }
}