1use futures::{future, pin_mut, select, FutureExt};
2use std::collections::{hash_map::Entry, HashMap, HashSet};
3use std::iter::Iterator;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::sync::{mpsc, Mutex};
7use tokio::time;
8
9use crate::page::Page;
10
11#[derive(Debug, Clone)]
14pub struct Config {
15 pub heartbeat_interval: Duration,
17 pub keep_alive_duration: Duration,
21 pub default_buffer_len: usize,
25}
26
27pub struct Session {
31 touch_path: mpsc::UnboundedSender<String>,
32 active_paths: Arc<Mutex<HashSet<String>>>,
33 pages: Arc<Mutex<PageMap>>,
34 default_buffer_len: usize,
35}
36
37type PageMap = HashMap<String, (Instant, Arc<Page>)>;
39
40impl Session {
41 pub async fn start(
44 Config {
45 heartbeat_interval,
46 keep_alive_duration,
47 default_buffer_len,
48 }: Config,
49 ) -> Session {
50 let (touch_path, recv_path) = mpsc::unbounded_channel();
51 let session = Session {
52 touch_path,
53 active_paths: Arc::new(Mutex::new(HashSet::new())),
54 pages: Arc::new(Mutex::new(HashMap::new())),
55 default_buffer_len,
56 };
57 let heartbeat = heartbeat_loop(
58 heartbeat_interval,
59 keep_alive_duration,
60 recv_path,
61 session.active_paths.clone(),
62 session.pages.clone(),
63 );
64 tokio::spawn(heartbeat);
65 session
66 }
67
68 pub async fn page(&self, path: &str) -> Arc<Page> {
70 let page = match self.pages.lock().await.entry(path.to_string()) {
71 Entry::Vacant(e) => {
72 let page = Arc::new(Page::new(self.default_buffer_len));
73 e.insert((Instant::now(), page.clone()));
74 page
75 }
76 Entry::Occupied(mut e) => {
77 let (last_access, page) = e.get_mut();
78 *last_access = Instant::now();
79 page.clone()
80 }
81 };
82
83 self.touch_path.send(path.to_string()).unwrap_or(());
85
86 page
87 }
88}
89
90async fn heartbeat_loop(
93 interval: Duration,
94 keep_alive: Duration,
95 mut recv_path: mpsc::UnboundedReceiver<String>,
96 active_paths: Arc<Mutex<HashSet<String>>>,
97 pages: Arc<Mutex<PageMap>>,
98) {
99 let recv_paths = async {
101 while let Some(path) = recv_path.recv().await {
102 active_paths.lock().await.insert(path);
103 }
104 }
105 .fuse();
106
107 let heartbeat = async {
111 loop {
112 time::delay_for(interval).await;
114
115 let mut paths = active_paths.lock().await;
118 let pruned = Arc::new(Mutex::new(Vec::new()));
119 future::join_all(paths.iter().map(|path| {
120 let pruned = pruned.clone();
121 let pages = pages.clone();
122 async move {
123 let mut pages = pages.lock().await;
124 if let Some((path, (last_access, page))) = pages.remove_entry(path) {
125 if last_access.elapsed() < keep_alive || !page.is_empty().await {
126 pages.insert(path, (last_access, page));
127 } else {
128 pruned.lock().await.push(path);
129 }
130 }
131 }
132 }))
133 .await;
134
135 for path in pruned.lock().await.iter() {
137 paths.remove(path);
138 }
139
140 paths.shrink_to_fit();
142 pages.lock().await.shrink_to_fit();
143 }
144 }
145 .fuse();
146
147 pin_mut!(recv_paths, heartbeat);
149 select! {
150 () = recv_paths => (),
151 () = heartbeat => (),
152 }
153}