tower_sessions_auto_memory_store/
lib.rs

1use std::{collections::{HashMap, VecDeque}, sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use tokio::{sync::RwLock, task::AbortHandle};
5use tower_sessions::{cookie::time::OffsetDateTime, session::{Id, Record}, session_store, SessionStore};
6
/// Why? By default the tower-sessions `MemoryStore` does not delete expired entries in time, which may cause a memory leak over a long run.
/// Java's automatic session management has no such memory leak. Rust's Axum should not have one either!
///
/// The `ExpiringMemoryStore` automatically deletes sessions that have expired.
/// The data is stored internally as `Arc<RwLock<HashMap<Id, Record>>>`.
/// Expiry is tracked by a `VecDeque` of session ids and expiry dates, kept in creation order.
///
/// Upon creation, a background Tokio task is immediately started to delete expired sessions automatically every 5 seconds.
/// Upon dropping the `ExpiringMemoryStore`, the background Tokio task is automatically cancelled.
///
/// # Example
18/// ```rust,no_run
19/// use axum::routing::get;
20/// use axum::Router;
21/// use std::net::SocketAddr;
22/// use tower_sessions::{cookie::time::{self, Duration, OffsetDateTime}, Expiry, Session, SessionManagerLayer};
23/// 
24/// #[tokio::main]
25/// async fn main() {
26///   let session_store = tower_sessions_auto_memory_store::ExpiringMemoryStore::new();
27///   let session_layer = SessionManagerLayer::new(session_store)
28///      .with_secure(false)
29///      .with_expiry(Expiry::OnInactivity(Duration::seconds(1800)));
30///   let app = Router::new()
31///      .route("/debug/test", get(root))
32///      .layer(session_layer);
33///    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
34///    println!("listening on {}", addr);
35///    axum_server::bind(addr)
36///      .serve(app.into_make_service())
37///      .await
38///      .unwrap();
39/// }
40/// 
41/// async fn root() -> &'static str {
42///   "Hello, World!"
43/// }
44/// ```
45#[derive(Debug, Clone)]
46pub struct ExpiringMemoryStore {
47    data: Arc<RwLock<HashMap<Id, Record>>>,
48    order: Arc<RwLock<VecDeque<(Id, OffsetDateTime)>>>,
49    handle: Option<AbortHandle>,
50}
51
52/// Upon dropping, the background thread will be killed.
53impl Drop for ExpiringMemoryStore {
54    fn drop(&mut self) {
55        let hdl = &self.handle;
56        match hdl {
57            None => {
58            },
59            Some(inner) => {
60                inner.abort();
61            }
62        }
63    }
64}
65
66impl ExpiringMemoryStore {
67    /// Create a new ExpiringMemoryStore
68    pub fn new() -> Self {
69        let data = Default::default();
70        let order = Default::default();
71        let mut result = ExpiringMemoryStore {
72            data: Arc::clone(&data),
73            handle: None,
74            order: Arc::clone(&order),
75        };
76        let handle = tokio::spawn( async move {
77            loop {
78                let mut session_map = data.write().await;
79                let mut order_list = order.write().await;
80                let mut to_delete = Vec::new();
81                loop {
82                    let front = order_list.front();
83                    if let Some((id, offset)) = front {
84                        if is_expired(*offset) {
85                            to_delete.push(id.clone());
86                            order_list.pop_front();
87                        } else {
88                            break;
89                        }
90                    } else {
91                        break;
92                    }
93                }
94                for id in &to_delete {
95                    let remove_result = session_map.remove(&id);
96                    if let Some(_) = remove_result {
97                    } else {
98                    }
99                }
100                //println!("Dropped {} sessions. Remaining {}:{}", to_delete.len(), order_list.len(), session_map.len());
101                drop(order_list);
102                drop(session_map);
103                tokio::time::sleep(Duration::from_millis(5000)).await;
104            }
105        });
106        let hdl1 = handle.abort_handle();
107        result.handle.replace(hdl1);
108        return result;
109    }
110}
111
112#[async_trait]
113impl SessionStore for ExpiringMemoryStore {
114    async fn save(&self, record: &Record) ->  session_store::Result<()> {
115        let mut store_guard = self.data.write().await;
116        store_guard.insert(record.id, record.clone());
117        Ok(())
118    }
119    async fn create(&self, record: &mut Record) -> session_store::Result<()> {
120        let mut store_guard = self.data.write().await;
121        while store_guard.contains_key(&record.id) {
122            // Session ID collision mitigation.
123            record.id = Id::default();
124        }
125        store_guard.insert(record.id, record.clone());
126        let mut order_guard = self.order.write().await;
127        order_guard.push_back((record.id.clone(), record.expiry_date.clone()));
128        drop(order_guard);
129        Ok(())
130    }
131    async fn load(&self,session_id: &Id) ->  session_store::Result<Option<Record>>{
132        Ok(self
133            .data
134            .read()
135            .await
136            .get(session_id)
137            .filter(|Record { expiry_date, .. }| is_active(*expiry_date))
138            .cloned())
139    }
140
141    async fn delete(&self,session_id: &Id) ->  session_store::Result<()> {
142        let mut w = self.data.write().await;
143        w.remove(session_id);
144        Ok(())
145    }
146}
147
148fn is_active(expiry_date: OffsetDateTime) -> bool {
149    !is_expired(expiry_date)
150}
151
152fn is_expired(expiry_date: OffsetDateTime) -> bool {
153    expiry_date + Duration::from_secs(60) < OffsetDateTime::now_utc()
154}
155
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty record that expires one hour from now.
    fn active_record() -> Record {
        Record {
            id: Id::default(),
            data: Default::default(),
            expiry_date: OffsetDateTime::now_utc() + Duration::from_secs(3600),
        }
    }

    /// Creating and dropping a store must not panic.
    #[tokio::test]
    async fn it_works() {
        let store = ExpiringMemoryStore::new();
        drop(store);
    }

    /// A record that was created can be loaded back under its (possibly reassigned) id.
    #[tokio::test]
    async fn create_then_load_returns_record() {
        let store = ExpiringMemoryStore::new();
        let mut record = active_record();
        store.create(&mut record).await.expect("create failed");

        let loaded = store.load(&record.id).await.expect("load failed");
        assert_eq!(loaded.map(|r| r.id), Some(record.id));
    }

    /// A deleted record is no longer returned by `load`.
    #[tokio::test]
    async fn delete_removes_record() {
        let store = ExpiringMemoryStore::new();
        let mut record = active_record();
        store.create(&mut record).await.expect("create failed");
        store.delete(&record.id).await.expect("delete failed");

        let loaded = store.load(&record.id).await.expect("load failed");
        assert!(loaded.is_none());
    }

    /// A record whose expiry date lies well in the past is never returned by `load`.
    #[tokio::test]
    async fn load_skips_expired_record() {
        let store = ExpiringMemoryStore::new();
        let mut record = active_record();
        record.expiry_date = OffsetDateTime::now_utc() - Duration::from_secs(3600);
        store.save(&record).await.expect("save failed");

        let loaded = store.load(&record.id).await.expect("load failed");
        assert!(loaded.is_none());
    }
}