immortal_http/
session.rs

1use std::collections::HashMap;
2use std::sync::atomic::AtomicBool;
3use std::time::{Instant, Duration};
4use std::sync::Arc;
5use std::sync::atomic::Ordering::Relaxed;
6
7use atomic_time::{AtomicDuration, AtomicInstant};
8use dashmap::DashMap;
9use debug_print::debug_eprintln;
10use uuid::Uuid;
11
12#[cfg(feature = "threading")]
13use rayon::prelude::*;
14
15use crate::cookie::Cookie;
16
17pub struct Session {
18    data: HashMap<String, String>,
19    created: Instant,
20    last_mutated: Instant,
21    last_accessed: Instant,
22}
23
24impl Session {
25    fn new() -> Session {
26        let now = Instant::now();
27        Session {
28            data: HashMap::new(),
29            created: now,
30            last_mutated: now,
31            last_accessed: now,
32        }
33    }
34}
35
36pub fn session_prune_task(
37    session_manager: Arc<SessionManager>,
38    stop: Arc<AtomicBool>,
39) {
40    loop {
41        if stop.load(std::sync::atomic::Ordering::Relaxed) {
42            return;
43        }
44        session_manager.prune();
45        std::thread::sleep(Duration::from_secs(2));
46    }
47}
48
49/// provides APIs to interact with user session stores.
50pub struct SessionManager {
51    is_enabled: AtomicBool,
52    store: DashMap<Uuid, Session>,
53    /// The maximum duration that a session may be allowed to persist for
54    /// regardless of inactivity.
55    session_duration: AtomicDuration,
56    /// The duration a session will persist for if inactive.
57    inactive_duration: AtomicDuration,
58    /// How often the session store is pruned
59    prune_rate: AtomicDuration,
60    /// When the list was last pruned
61    last_prune: AtomicInstant,
62}
63
64impl Default for SessionManager {
65    fn default() -> Self {
66        SessionManager::new(
67            Duration::from_secs(12 * 3600), // 12 hours session duration
68            Duration::from_secs(3600),      //  1 hour  inactive duration
69            Duration::from_secs(60)         //  1 min   prune rate
70        )
71    }
72}
73
74impl SessionManager {
75    pub fn new(
76        session_duration: Duration,
77        inactive_duration: Duration,
78        prune_rate: Duration,
79    ) -> SessionManager {
80        SessionManager {
81            is_enabled: AtomicBool::new(false),
82            store: DashMap::new(),
83            session_duration: AtomicDuration::new(session_duration),
84            inactive_duration: AtomicDuration::new(inactive_duration),
85            prune_rate: AtomicDuration::new(prune_rate),
86            last_prune: AtomicInstant::now(),
87        }
88    }
89
90    /// returns true if sessions are enabled
91    pub fn is_enabled(&self) -> bool {
92        self.is_enabled.load(Relaxed)
93    }
94
95    /// enables sessions
96    pub fn enable(&self) {
97        self.is_enabled.store(true, Relaxed);
98    }
99
100    /// disables sessions and clears all existing sessions
101    pub fn disable(&self) {
102        self.is_enabled.store(false, Relaxed);
103        self.store.clear();
104    }
105
106    /// sets the maximum duration that a session may be allowed to persist for 
107    /// regardless of inactivity
108    pub fn set_session_duration(&self, duration: Duration) {
109        self.session_duration.store(duration, Relaxed);
110    }
111
112    /// sets the expiry duration for sessions
113    pub fn set_inactive_duration(&self, duration: Duration) {
114        self.inactive_duration.store(duration, Relaxed);
115    }
116
117    /// sets the prune rate for sessions, will attempt to prune old sessions every `duration`
118    pub fn set_prune_rate(&self, duration: Duration) {
119        self.prune_rate.store(duration, Relaxed);
120    }
121
122    /// generates a new session id without storing a session
123    pub fn generate_id() -> Uuid {
124        uuid::Uuid::new_v4()
125    }
126
127    /// generates a new session and returns the ID
128    pub fn create_session(&self) -> Uuid {
129        if !self.is_enabled() {
130            return Uuid::nil();
131        }
132        let mut out;
133        loop {
134            out = Self::generate_id();
135            if !self.store.contains_key(&out) {
136                break;
137            }
138        }
139        self.store.insert(out, Session::new());
140
141        #[cfg(not(feature = "threading"))]
142        self.prune();
143        out
144    }
145
146    /// writes `value` to the key-value session store as `key` for the `session_id` session store.
147    pub fn write_session(&self, session_id: Uuid, key: &str, value: &str) -> bool {
148        if !self.is_enabled() {
149            return false;
150        }
151        if session_id.is_nil() {
152            #[cfg(not(feature = "threading"))]
153            self.prune();
154            return false;
155        }
156        if let Some(mut session) = self.store.get_mut(&session_id) {
157            let now = Instant::now();
158            session.last_mutated = now;
159            session.last_accessed = now;
160            if value.is_empty() {
161                session.data.remove(key);
162                session.data.shrink_to_fit();
163            } else {
164                session.data.insert(key.to_owned(), value.to_owned());
165            }
166            return true;
167        }
168        #[cfg(not(feature = "threading"))]
169        self.prune();
170        false
171    }
172
173    /// reads the value associated with `key` for the `session_id` session store.
174    pub fn read_session(&self, session_id: Uuid, key: &str) -> Option<String> {
175        if !self.is_enabled() {
176            return None;
177        }
178        if session_id.is_nil() {
179            return None;
180        }
181        if let Some(mut session) = self.store.get_mut(&session_id) {
182            session.last_accessed = Instant::now();
183            if let Some(value) = session.data.get(key) {
184                return Some(value.to_owned());
185            }
186        }
187        None
188    }
189
190    /// empties the session store for `session_id`
191    pub fn clear_session(&self, session_id: Uuid) {
192        if !self.is_enabled() {
193            return;
194        }
195        if session_id.is_nil() {
196            #[cfg(not(feature = "threading"))]
197            self.prune();
198            return;
199        }
200        if let Some(mut session) = self.store.get_mut(&session_id) {
201            let now = Instant::now();
202            session.last_mutated = now;
203            session.last_accessed = now;
204            session.data.clear();
205            session.data.shrink_to_fit();
206        } else {
207            #[cfg(not(feature = "threading"))]
208            self.prune();
209        }
210    }
211
212    /// removes the session store for `session_id`
213    pub fn delete_session(&self, session_id: Uuid) {
214        if !self.is_enabled() {
215            return;
216        }
217        if session_id.is_nil() {
218            return;
219        }
220        self.store.remove(&session_id);
221        #[cfg(not(feature = "threading"))]
222        self.prune();
223    }
224
225    /// checks if a session store for `session_id` exists
226    pub fn session_exists(&self, session_id: Uuid) -> bool {
227        if !self.is_enabled() {
228            return false;
229        }
230        if session_id.is_nil() {
231            return false;
232        }
233        self.store.contains_key(&session_id)
234    }
235
236    /// Accepts a session id, creates a session with it if the ID is not already for an existing
237    /// session.
238    /// Returns false if the session id was not good or the store already contains the ID
239    pub fn add_session(&self, session_id: Uuid) -> bool {
240        if !self.is_enabled() {
241            return false;
242        }
243        if session_id.is_nil() {
244            #[cfg(not(feature = "threading"))]
245            self.prune();
246            return false;
247        }
248        if !self.store.contains_key(&session_id) {
249            let session = Session::new();
250            self.store.insert(session_id, session);
251            return true;
252        }
253        #[cfg(not(feature = "threading"))]
254        self.prune();
255        false
256    }
257
258    /// tries to get an existing session 
259    /// if a session does not exist, a session is created.
260    /// The returned tuple contains the session id and a boolean, the boolean is true if the
261    /// created session is new, if it is false, the session id is for an existing session.
262    pub fn get_or_create_session(&self, cookies: &HashMap<String, Cookie>) -> Option<(Uuid, bool)>{
263        if !self.is_enabled() {
264            return None;
265        }
266        let mut session_id = Uuid::nil();
267
268        let mut is_new_session = false;
269        if cookies.contains_key("id") {
270            session_id = cookies.get("id")
271                .and_then(|id| id.value.parse::<Uuid>().ok())
272                .unwrap_or(Uuid::nil());
273            if !self.session_exists(session_id) {
274                session_id = self.create_session();
275                is_new_session = true;
276            }
277        } else if !self.session_exists(session_id) {
278            session_id = self.create_session();
279            is_new_session = true;
280        }
281        if !session_id.is_nil() {
282            Some((session_id, is_new_session))
283        } else {
284            #[cfg(not(feature = "threading"))]
285            self.prune();
286            None
287        }
288    }
289
290    pub fn prune(&self) {
291        #[cfg(not(feature = "threading"))]
292        if !self.is_enabled() {
293            return;
294        }
295
296        if self.last_prune.load(Relaxed).elapsed() < self.prune_rate.load(Relaxed) { return; }
297        self.last_prune.store(Instant::now(), Relaxed);
298
299        #[cfg(feature = "threading")]
300        {
301            let to_remove = self.store.par_iter()
302                .filter(|pair| (pair.last_accessed.elapsed() >= self.inactive_duration.load(Relaxed)) 
303                            || (pair.created.elapsed()       >=  self.session_duration.load(Relaxed)))
304                .map(|pair| *pair.pair().0)
305                .collect::<Vec<Uuid>>();
306            let _total = self.store.len();
307            to_remove.par_iter().for_each(|id| { self.store.remove(id); });
308            debug_eprintln!("Pruned {}/{} sessions.", to_remove.len(), _total);
309        }
310
311        #[cfg(not(feature = "threading"))]
312        {
313            let mut to_remove = Vec::new();
314            for session in self.store.iter() {
315                if (session.value().last_accessed.elapsed() >= self.inactive_duration.load(Relaxed)) 
316                            || (session.created.elapsed() >=  self.session_duration.load(Relaxed)) {
317                    to_remove.push(*session.key());
318                }
319            }
320            let _total = self.store.len();
321            for id in &to_remove {
322                self.store.remove(id);
323            }
324            debug_eprintln!("Pruned {}/{} sessions.", to_remove.len(), _total);
325        }
326        self.store.shrink_to_fit();
327    }
328}
329