myxine_core/
session.rs

1use futures::{future, pin_mut, select, FutureExt};
2use std::collections::{hash_map::Entry, HashMap, HashSet};
3use std::iter::Iterator;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::sync::{mpsc, Mutex};
7use tokio::time;
8
9use crate::page::Page;
10
/// Settings for a [`Session`]: how often its background task sweeps the
/// pages, how long a page must sit idle before it may be collected, and the
/// buffer length handed to every newly created page.
#[derive(Clone, Debug)]
pub struct Config {
    /// How long the background task waits between two successive sweeps over
    /// all active pages.
    pub heartbeat_interval: Duration,
    /// How long a page must go without being touched before it becomes a
    /// candidate for garbage collection. A page touched more recently than
    /// this is always kept.
    pub keep_alive_duration: Duration,
    /// The event buffer length given to each new page. Larger values cost
    /// more memory per page, but let clients fall further behind before
    /// events are dropped.
    pub default_buffer_len: usize,
}
26
27/// A collection of `Page`s, uniquely keyed by a `String` path, which are
28/// periodically pruned by a garbage collector thread to remove inactive and
29/// empty pages from the pool.
30pub struct Session {
31    touch_path: mpsc::UnboundedSender<String>,
32    active_paths: Arc<Mutex<HashSet<String>>>,
33    pages: Arc<Mutex<PageMap>>,
34    default_buffer_len: usize,
35}
36
37/// The map from paths to pages tagged with last-touched instants.
38type PageMap = HashMap<String, (Instant, Arc<Page>)>;
39
40impl Session {
41    /// Create a new session, starting a thread to maintain heartbeats to any
42    /// Pages created in this session.
43    pub async fn start(
44        Config {
45            heartbeat_interval,
46            keep_alive_duration,
47            default_buffer_len,
48        }: Config,
49    ) -> Session {
50        let (touch_path, recv_path) = mpsc::unbounded_channel();
51        let session = Session {
52            touch_path,
53            active_paths: Arc::new(Mutex::new(HashSet::new())),
54            pages: Arc::new(Mutex::new(HashMap::new())),
55            default_buffer_len,
56        };
57        let heartbeat = heartbeat_loop(
58            heartbeat_interval,
59            keep_alive_duration,
60            recv_path,
61            session.active_paths.clone(),
62            session.pages.clone(),
63        );
64        tokio::spawn(heartbeat);
65        session
66    }
67
68    /// Retrieve or create a page at this path.
69    pub async fn page(&self, path: &str) -> Arc<Page> {
70        let page = match self.pages.lock().await.entry(path.to_string()) {
71            Entry::Vacant(e) => {
72                let page = Arc::new(Page::new(self.default_buffer_len));
73                e.insert((Instant::now(), page.clone()));
74                page
75            }
76            Entry::Occupied(mut e) => {
77                let (last_access, page) = e.get_mut();
78                *last_access = Instant::now();
79                page.clone()
80            }
81        };
82
83        // Make sure to send heartbeats to this page now
84        self.touch_path.send(path.to_string()).unwrap_or(());
85
86        page
87    }
88}
89
90/// Send a heartbeat message to keep all page connections alive, simultaneously
91/// pruning all pages from memory which have no content and no subscribers.
92async fn heartbeat_loop(
93    interval: Duration,
94    keep_alive: Duration,
95    mut recv_path: mpsc::UnboundedReceiver<String>,
96    active_paths: Arc<Mutex<HashSet<String>>>,
97    pages: Arc<Mutex<PageMap>>,
98) {
99    // Receive all new paths into the set of known active paths
100    let recv_paths = async {
101        while let Some(path) = recv_path.recv().await {
102            active_paths.lock().await.insert(path);
103        }
104    }
105    .fuse();
106
107    // At the specified `HEARTBEAT_INTERVAL`, traverse all active paths, sending
108    // heartbeats to all pages, and removing all pages which are identical to
109    // the initial dynamic page (to free up memory).
110    let heartbeat = async {
111        loop {
112            // Wait for next heartbeat interval...
113            time::delay_for(interval).await;
114
115            // Lock the active set of paths and send a heartbeat to each one,
116            // noting which paths are identical to the empty page
117            let mut paths = active_paths.lock().await;
118            let pruned = Arc::new(Mutex::new(Vec::new()));
119            future::join_all(paths.iter().map(|path| {
120                let pruned = pruned.clone();
121                let pages = pages.clone();
122                async move {
123                    let mut pages = pages.lock().await;
124                    if let Some((path, (last_access, page))) = pages.remove_entry(path) {
125                        if last_access.elapsed() < keep_alive || !page.is_empty().await {
126                            pages.insert(path, (last_access, page));
127                        } else {
128                            pruned.lock().await.push(path);
129                        }
130                    }
131                }
132            }))
133            .await;
134
135            // Remove all paths that are identical to the empty page
136            for path in pruned.lock().await.iter() {
137                paths.remove(path);
138            }
139
140            // Free memory for all the removed pages and paths
141            paths.shrink_to_fit();
142            pages.lock().await.shrink_to_fit();
143        }
144    }
145    .fuse();
146
147    // Run them both concurrently, quit when session is dropped
148    pin_mut!(recv_paths, heartbeat);
149    select! {
150        () = recv_paths => (),
151        () = heartbeat => (),
152    }
153}