//! Periodic garbage collection of expired sessions.
use crate::{backend::SessionBackend, utils::now};
use std::time::Duration;
use tokio::{
    sync::mpsc::{channel, Receiver, Sender},
    time::interval,
};

/// Garbage collector for sessions
///
/// Owns a session backend and, while [`SessionCollector::run`] is being
/// polled, periodically removes sessions that have reached the configured
/// lifetime. The loop is stopped through a [`SessionCollectorHandle`].
pub struct SessionCollector<B> {
    /// Backend the sessions are listed from and removed in
    backend: B,
    /// Interval between two GC passes
    period: Duration,
    /// Threshold a session has to reach before it is removed
    lifetime: Duration,
    /// Sending half of the stop channel; cloned into every handle
    sender: Sender<()>,
    /// Receiving half of the stop channel; checked by the GC loop
    receiver: Receiver<()>,
}

impl<B> SessionCollector<B>
where
    B: SessionBackend,
{
    /// Creates a new collector
    ///
    /// GC removes every session that is at least `lifetime` old.
    /// The comparison is done on whole seconds (`lifetime.as_secs()`).
    ///
    /// # Arguments
    ///
    /// * backend - Store backend
    /// * period - Interval between GC calls
    /// * lifetime - Minimum session lifetime
    pub fn new(backend: B, period: Duration, lifetime: Duration) -> Self {
        // The channel only ever carries a "stop" signal, a capacity of 1 is enough.
        let (sender, receiver) = channel(1);
        Self {
            backend,
            period,
            lifetime,
            sender,
            receiver,
        }
    }

    /// Returns a session collector handle
    ///
    /// The handle holds a clone of the stop channel sender and can be used
    /// to terminate the loop started by [`SessionCollector::run`].
    pub fn get_handle(&self) -> SessionCollectorHandle {
        SessionCollectorHandle {
            sender: self.sender.clone(),
        }
    }

    /// Performs a single GC pass over all sessions known to the backend
    ///
    /// # Errors
    ///
    /// Returns the stringified error of the first failing backend call
    /// (or of `now()`); sessions that follow the failing one are not
    /// processed in this pass.
    async fn collect(&mut self) -> Result<(), String> {
        let lifetime = self.lifetime.as_secs();
        let session_ids = self
            .backend
            .get_sessions()
            .await
            .map_err(|err| err.to_string())?;
        // Taken once so that every session of this pass is compared
        // against the same point in time.
        let timestamp = now().map_err(|err| err.to_string())?;
        for session_id in session_ids {
            // NOTE(review): the value is subtracted from the current timestamp,
            // so it is presumably a timestamp in the same unit as `now()` —
            // confirm against the `SessionBackend` contract.
            if let Some(age) = self
                .backend
                .get_session_age(&session_id)
                .await
                .map_err(|err| err.to_string())?
            {
                // A value ahead of `timestamp` must not underflow: a plain `-`
                // panics in debug builds and wraps to a huge number in release
                // builds, which would remove a session that is not expired.
                if timestamp.saturating_sub(age) >= lifetime {
                    self.backend
                        .remove_session(&session_id)
                        .await
                        .map_err(|err| err.to_string())?;
                }
            }
        }
        Ok(())
    }

    /// Returns `true` and closes the stop channel if a shutdown was requested
    fn shutdown_requested(&mut self) -> bool {
        if self.receiver.try_recv().is_ok() {
            self.receiver.close();
            true
        } else {
            false
        }
    }

    /// Starts GC loop
    ///
    /// Runs a GC pass on every tick of an interval with the configured
    /// period until a shutdown is requested through a handle. Errors of a
    /// pass are logged and do not stop the loop.
    ///
    /// A shutdown request is observed before and after waiting for a tick,
    /// so it may take up to one period until the loop actually exits.
    ///
    /// # Panics
    ///
    /// `tokio::time::interval` panics when the period is zero.
    pub async fn run(&mut self) {
        let mut interval = interval(self.period);
        loop {
            if self.shutdown_requested() {
                break;
            }
            interval.tick().await;
            // The request may have arrived while waiting for the tick;
            // do not start another pass in that case.
            if self.shutdown_requested() {
                break;
            }
            if let Err(err) = self.collect().await {
                log::error!("An error occurred in session GC: {}", err)
            }
        }
    }
}

/// GC handle
///
/// Obtained from [`SessionCollector::get_handle`]; used to stop the GC loop.
pub struct SessionCollectorHandle {
    /// Clone of the collector's stop channel sender
    sender: Sender<()>,
}

impl SessionCollectorHandle {
    /// Stop GC loop
    ///
    /// Consumes the handle and sends the stop signal to the collector.
    /// A failed send is ignored.
    pub async fn shutdown(mut self) {
        let _delivered = self.sender.send(()).await.is_ok();
    }
}