hypen_server/remote/
session_manager.rs1use std::collections::{HashMap, HashSet};
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, Instant};
4
5#[derive(Debug, Clone)]
7pub struct SessionInfo {
8 pub id: String,
9 pub created_at: Instant,
10 pub last_connected_at: Instant,
11 pub props: HashMap<String, serde_json::Value>,
12}
13
14pub struct SessionManagerConfig {
16 pub ttl: Duration,
18 pub generate_id: Option<Box<dyn Fn() -> String + Send + Sync>>,
20}
21
22impl Default for SessionManagerConfig {
23 fn default() -> Self {
24 Self {
25 ttl: Duration::from_secs(3600),
26 generate_id: None,
27 }
28 }
29}
30
31pub struct PendingSession {
33 pub info: SessionInfo,
34 pub saved_state: serde_json::Value,
35 cancel: Arc<Mutex<bool>>,
36}
37
38pub struct SessionManager {
71 ttl: Duration,
72 generate_id: Box<dyn Fn() -> String + Send + Sync>,
73 inner: Mutex<Inner>,
74}
75
76struct Inner {
77 active: HashMap<String, SessionInfo>,
78 pending: HashMap<String, PendingEntry>,
79 connections: HashMap<String, HashSet<u64>>,
80}
81
82struct PendingEntry {
83 info: SessionInfo,
84 saved_state: serde_json::Value,
85 cancel: Arc<Mutex<bool>>,
86}
87
88fn default_generate_id() -> String {
89 use std::sync::atomic::{AtomicU64, Ordering};
90 use std::time::SystemTime;
91 static COUNTER: AtomicU64 = AtomicU64::new(0);
92 let ns = SystemTime::now()
93 .duration_since(SystemTime::UNIX_EPOCH)
94 .unwrap_or_default()
95 .as_nanos() as u64;
96 let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
97 format!("{:016x}{:04x}", ns, seq & 0xFFFF)
98}
99
100impl SessionManager {
101 pub fn new(config: SessionManagerConfig) -> Self {
103 let generate_id = config
104 .generate_id
105 .unwrap_or_else(|| Box::new(default_generate_id));
106 Self {
107 ttl: config.ttl,
108 generate_id,
109 inner: Mutex::new(Inner {
110 active: HashMap::new(),
111 pending: HashMap::new(),
112 connections: HashMap::new(),
113 }),
114 }
115 }
116
117 pub fn create_session(
119 &self,
120 props: HashMap<String, serde_json::Value>,
121 ) -> SessionInfo {
122 let mut inner = self.inner.lock().unwrap();
123 let mut id = (self.generate_id)();
124 for _ in 0..10 {
125 if !inner.active.contains_key(&id) && !inner.pending.contains_key(&id) {
126 break;
127 }
128 id = (self.generate_id)();
129 }
130 let now = Instant::now();
131 let info = SessionInfo {
132 id: id.clone(),
133 created_at: now,
134 last_connected_at: now,
135 props,
136 };
137 inner.active.insert(id, info.clone());
138 info
139 }
140
141 pub fn get_active_session(&self, id: &str) -> Option<SessionInfo> {
143 self.inner.lock().unwrap().active.get(id).cloned()
144 }
145
146 pub fn suspend_session<F>(&self, id: &str, saved_state: serde_json::Value, on_expire: F) -> bool
155 where
156 F: FnOnce() + Send + 'static,
157 {
158 let mut inner = self.inner.lock().unwrap();
159 let info = match inner.active.remove(id) {
160 Some(s) => s,
161 None => return false,
162 };
163
164 let cancel = Arc::new(Mutex::new(false));
165 let entry = PendingEntry {
166 info: info.clone(),
167 saved_state,
168 cancel: Arc::clone(&cancel),
169 };
170 inner.pending.insert(id.to_string(), entry);
171 drop(inner);
172
173 let ttl = self.ttl;
176 let id_owned = id.to_string();
177 let inner_ref = &self.inner as *const Mutex<Inner>;
178 let inner_ptr = inner_ref as usize;
184 std::thread::spawn(move || {
185 std::thread::sleep(ttl);
186 if *cancel.lock().unwrap() {
187 return;
188 }
189 let inner: &Mutex<Inner> = unsafe { &*(inner_ptr as *const Mutex<Inner>) };
195 let mut guard = inner.lock().unwrap();
196 if guard.pending.remove(&id_owned).is_some() {
197 drop(guard);
198 on_expire();
199 }
200 });
201
202 true
203 }
204
205 pub fn resume_session(&self, id: &str) -> Option<PendingSession> {
208 let mut inner = self.inner.lock().unwrap();
209 let entry = inner.pending.remove(id)?;
210 *entry.cancel.lock().unwrap() = true;
211 let mut info = entry.info;
212 info.last_connected_at = Instant::now();
213 inner.active.insert(id.to_string(), info.clone());
214 Some(PendingSession {
215 info,
216 saved_state: entry.saved_state,
217 cancel: entry.cancel,
218 })
219 }
220
221 pub fn destroy_session(&self, id: &str) {
223 let mut inner = self.inner.lock().unwrap();
224 inner.active.remove(id);
225 if let Some(entry) = inner.pending.remove(id) {
226 *entry.cancel.lock().unwrap() = true;
227 }
228 inner.connections.remove(id);
229 }
230
231 pub fn track_connection(&self, session_id: &str, conn_id: u64) {
234 let mut inner = self.inner.lock().unwrap();
235 inner
236 .connections
237 .entry(session_id.to_string())
238 .or_default()
239 .insert(conn_id);
240 }
241
242 pub fn untrack_connection(&self, session_id: &str, conn_id: u64) {
244 let mut inner = self.inner.lock().unwrap();
245 if let Some(conns) = inner.connections.get_mut(session_id) {
246 conns.remove(&conn_id);
247 }
248 }
249
250 pub fn connection_count(&self, session_id: &str) -> usize {
252 self.inner
253 .lock()
254 .unwrap()
255 .connections
256 .get(session_id)
257 .map(|c| c.len())
258 .unwrap_or(0)
259 }
260
261 pub fn shutdown(&self) {
263 let mut inner = self.inner.lock().unwrap();
264 for (_, entry) in inner.pending.drain() {
265 *entry.cancel.lock().unwrap() = true;
266 }
267 inner.active.clear();
268 inner.connections.clear();
269 }
270}