Skip to main content

mongodb/client/session/
pool.rs

1use std::{collections::VecDeque, time::Duration};
2
3use tokio::sync::Mutex;
4
5use super::ServerSession;
6use crate::bson::Document;
7
8#[derive(Debug)]
9pub(crate) struct ServerSessionPool {
10    pool: Mutex<VecDeque<ServerSession>>,
11}
12
13impl ServerSessionPool {
14    pub(crate) fn new() -> Self {
15        Self {
16            pool: Default::default(),
17        }
18    }
19
20    /// Checks out a server session from the pool. Before doing so, it first clears out all the
21    /// expired sessions. If there are no sessions left in the pool after clearing expired ones
22    /// out, a new session will be created.
23    pub(crate) async fn check_out(
24        &self,
25        logical_session_timeout: Option<Duration>,
26    ) -> ServerSession {
27        let mut pool = self.pool.lock().await;
28        while let Some(session) = pool.pop_front() {
29            // If a session is about to expire within the next minute, remove it from pool.
30            if session.is_about_to_expire(logical_session_timeout) {
31                continue;
32            }
33            return session;
34        }
35        ServerSession::new()
36    }
37
38    /// Checks in a server session to the pool. If it is about to expire or is dirty, it will be
39    /// discarded.
40    ///
41    /// This method will also clear out any expired session from the pool before checking in.
42    pub(crate) async fn check_in(
43        &self,
44        session: ServerSession,
45        logical_session_timeout: Option<Duration>,
46    ) {
47        let mut pool = self.pool.lock().await;
48        while let Some(pooled_session) = pool.pop_back() {
49            if session.is_about_to_expire(logical_session_timeout) {
50                continue;
51            }
52            pool.push_back(pooled_session);
53            break;
54        }
55
56        if !session.dirty && !session.is_about_to_expire(logical_session_timeout) {
57            pool.push_front(session);
58        }
59    }
60
61    #[cfg(test)]
62    pub(crate) async fn clear(&self) {
63        self.pool.lock().await.clear();
64    }
65
66    #[cfg(test)]
67    pub(crate) async fn contains(&self, id: &Document) -> bool {
68        self.pool.lock().await.iter().any(|s| &s.id == id)
69    }
70
71    /// Returns a list of the IDs of the sessions contained in the pool.
72    pub(crate) async fn get_session_ids(&self) -> Vec<Document> {
73        let sessions = self.pool.lock().await;
74        sessions.iter().map(|session| session.id.clone()).collect()
75    }
76}