mod cluster_time;
mod pool;
#[cfg(test)]
mod test;
use std::{
collections::HashSet,
time::{Duration, Instant},
};
use lazy_static::lazy_static;
use uuid::Uuid;
use crate::{
bson::{doc, spec::BinarySubtype, Binary, Bson, Document},
Client,
RUNTIME,
};
pub(crate) use cluster_time::ClusterTime;
pub(super) use pool::ServerSessionPool;
lazy_static! {
    /// Lowercase command names for which, per this constant's name, sessions are
    /// unsupported.
    ///
    /// NOTE(review): presumably consulted before attaching a session to an outgoing
    /// command, with the command name lowercased first — confirm at the call sites.
    pub(crate) static ref SESSIONS_UNSUPPORTED_COMMANDS: HashSet<&'static str> =
        ["killcursors", "parallelcollectionscan"]
            .iter()
            .copied()
            .collect();
}
/// Client-side session state: a `ServerSession` paired with the `Client` it is
/// returned to when this value is dropped.
#[derive(Debug)]
pub(crate) struct ClientSession {
/// Cluster time recorded for this session, if any. `advance_cluster_time` only ever
/// replaces it with a strictly greater value.
cluster_time: Option<ClusterTime>,
/// Server session state (id, last use, dirty flag, transaction number) backing this
/// client session.
server_session: ServerSession,
/// Client that the server session is checked back into on drop.
client: Client,
/// `true` when the session was built through `new_implicit`.
is_implicit: bool,
}
impl ClientSession {
    /// Builds a session flagged as implicit around `server_session`, owned by `client`.
    ///
    /// No cluster time is recorded initially.
    pub(crate) fn new_implicit(server_session: ServerSession, client: Client) -> Self {
        Self {
            cluster_time: None,
            server_session,
            client,
            is_implicit: true,
        }
    }

    /// Returns the id document of the underlying server session.
    pub(crate) fn id(&self) -> &Document {
        &self.server_session.id
    }

    /// Reports whether this session was created as an implicit session.
    pub(crate) fn is_implicit(&self) -> bool {
        self.is_implicit
    }

    /// Returns the cluster time currently recorded for this session, if any.
    pub(crate) fn cluster_time(&self) -> Option<&ClusterTime> {
        self.cluster_time.as_ref()
    }

    /// Moves the recorded cluster time forward to `to`.
    ///
    /// The stored value is replaced by a clone of `to` when nothing has been recorded
    /// yet or when the recorded value compares strictly less than `to`; otherwise the
    /// session is left untouched.
    pub(crate) fn advance_cluster_time(&mut self, to: &ClusterTime) {
        let is_behind = match self.cluster_time {
            Some(ref current) => current < to,
            // Nothing recorded yet: any cluster time counts as an advance.
            None => true,
        };

        if is_behind {
            self.cluster_time = Some(to.clone());
        }
    }

    /// Sets the dirty flag on the underlying server session.
    pub(crate) fn mark_dirty(&mut self) {
        self.server_session.dirty = true;
    }

    /// Stamps the underlying server session's last-use time with the current instant.
    pub(crate) fn update_last_use(&mut self) {
        self.server_session.last_use = Instant::now();
    }

    /// Increments the server session's transaction number and returns the new
    /// (post-increment) value.
    pub(crate) fn get_and_increment_txn_number(&mut self) -> u64 {
        let session = &mut self.server_session;
        session.txn_number += 1;
        session.txn_number
    }
}
impl Drop for ClientSession {
    /// Returns the server session to the client when the client session goes away.
    ///
    /// `drop` only receives `&mut self`, so the server session cannot be moved out;
    /// instead a field-by-field copy is built (cloning the id document) and handed,
    /// together with a clone of the client, to a future submitted via
    /// `RUNTIME.execute` that awaits `check_in_server_session`.
    fn drop(&mut self) {
        let current = &self.server_session;
        let returned_session = ServerSession {
            id: current.id.clone(),
            last_use: current.last_use,
            dirty: current.dirty,
            txn_number: current.txn_number,
        };
        let owner = self.client.clone();

        RUNTIME.execute(async move {
            owner.check_in_server_session(returned_session).await;
        });
    }
}
#[derive(Debug)]
pub(crate) struct ServerSession {
id: Document,
last_use: std::time::Instant,
dirty: bool,
txn_number: u64,
}
impl ServerSession {
    /// Creates a fresh server session.
    ///
    /// The id is a document `{ "id": <binary> }` whose binary value has the UUID
    /// subtype and holds the bytes of a newly generated v4 UUID. The session starts
    /// out clean (`dirty == false`), with transaction number 0 and `last_use` set to
    /// the current instant.
    fn new() -> Self {
        let binary = Bson::Binary(Binary {
            subtype: BinarySubtype::Uuid,
            bytes: Uuid::new_v4().as_bytes().to_vec(),
        });

        Self {
            id: doc! { "id": binary },
            last_use: Instant::now(),
            dirty: false,
            txn_number: 0,
        }
    }

    /// Reports whether the session expires less than 60 seconds from now.
    ///
    /// The expiration point is `last_use + logical_session_timeout`. If that sum
    /// cannot be represented as an `Instant` (e.g. an extremely large timeout), the
    /// session is treated as not about to expire instead of panicking on overflow.
    fn is_about_to_expire(&self, logical_session_timeout: Duration) -> bool {
        // Sessions whose expiration falls before this point are considered stale.
        let cutoff = Instant::now() + Duration::from_secs(60);

        match self.last_use.checked_add(logical_session_timeout) {
            Some(expiration_date) => expiration_date < cutoff,
            // An unrepresentably distant expiration is effectively "never".
            None => false,
        }
    }
}