hypen_server/remote/
session_manager.rs1use std::collections::{HashMap, HashSet};
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, Instant};
4
5#[derive(Debug, Clone)]
7pub struct SessionInfo {
8 pub id: String,
9 pub created_at: Instant,
10 pub last_connected_at: Instant,
11 pub props: HashMap<String, serde_json::Value>,
12}
13
/// Construction-time options for a session manager.
///
/// Use [`Default::default`] for a one-hour TTL and the built-in id generator,
/// or override individual fields with struct-update syntax.
pub struct SessionManagerConfig {
    /// How long a suspended session is kept before it expires.
    pub ttl: Duration,
    /// Optional custom session-id generator. `None` selects the built-in one.
    pub generate_id: Option<Box<dyn Fn() -> String + Send + Sync>>,
}

impl Default for SessionManagerConfig {
    /// One hour TTL, built-in id generator.
    fn default() -> Self {
        Self {
            ttl: Duration::from_secs(3600),
            generate_id: None,
        }
    }
}

// `Debug` cannot be derived because the boxed closure is not `Debug`; the
// manual impl only reports whether a custom generator is present.
impl std::fmt::Debug for SessionManagerConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SessionManagerConfig")
            .field("ttl", &self.ttl)
            .field(
                "generate_id",
                &self.generate_id.as_ref().map(|_| "<custom fn>"),
            )
            .finish()
    }
}
30
31pub struct PendingSession {
33 pub info: SessionInfo,
34 pub saved_state: serde_json::Value,
35 cancel: Arc<Mutex<bool>>,
36}
37
38pub struct SessionManager {
71 ttl: Duration,
72 generate_id: Box<dyn Fn() -> String + Send + Sync>,
73 inner: Mutex<Inner>,
74}
75
76struct Inner {
77 active: HashMap<String, SessionInfo>,
78 pending: HashMap<String, PendingEntry>,
79 connections: HashMap<String, HashSet<u64>>,
80}
81
82struct PendingEntry {
83 info: SessionInfo,
84 saved_state: serde_json::Value,
85 cancel: Arc<Mutex<bool>>,
86}
87
88fn default_generate_id() -> String {
89 use std::sync::atomic::{AtomicU64, Ordering};
90 use std::time::SystemTime;
91 static COUNTER: AtomicU64 = AtomicU64::new(0);
92 let ns = SystemTime::now()
93 .duration_since(SystemTime::UNIX_EPOCH)
94 .unwrap_or_default()
95 .as_nanos() as u64;
96 let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
97 format!("{:016x}{:04x}", ns, seq & 0xFFFF)
98}
99
100impl SessionManager {
101 pub fn new(config: SessionManagerConfig) -> Self {
103 let generate_id = config
104 .generate_id
105 .unwrap_or_else(|| Box::new(default_generate_id));
106 Self {
107 ttl: config.ttl,
108 generate_id,
109 inner: Mutex::new(Inner {
110 active: HashMap::new(),
111 pending: HashMap::new(),
112 connections: HashMap::new(),
113 }),
114 }
115 }
116
117 pub fn create_session(&self, props: HashMap<String, serde_json::Value>) -> SessionInfo {
119 let mut inner = self.inner.lock().unwrap();
120 let mut id = (self.generate_id)();
121 for _ in 0..10 {
122 if !inner.active.contains_key(&id) && !inner.pending.contains_key(&id) {
123 break;
124 }
125 id = (self.generate_id)();
126 }
127 let now = Instant::now();
128 let info = SessionInfo {
129 id: id.clone(),
130 created_at: now,
131 last_connected_at: now,
132 props,
133 };
134 inner.active.insert(id, info.clone());
135 info
136 }
137
138 pub fn get_active_session(&self, id: &str) -> Option<SessionInfo> {
140 self.inner.lock().unwrap().active.get(id).cloned()
141 }
142
143 pub fn suspend_session<F>(&self, id: &str, saved_state: serde_json::Value, on_expire: F) -> bool
152 where
153 F: FnOnce() + Send + 'static,
154 {
155 let mut inner = self.inner.lock().unwrap();
156 let info = match inner.active.remove(id) {
157 Some(s) => s,
158 None => return false,
159 };
160
161 let cancel = Arc::new(Mutex::new(false));
162 let entry = PendingEntry {
163 info: info.clone(),
164 saved_state,
165 cancel: Arc::clone(&cancel),
166 };
167 inner.pending.insert(id.to_string(), entry);
168 drop(inner);
169
170 let ttl = self.ttl;
173 let id_owned = id.to_string();
174 let inner_ref = &self.inner as *const Mutex<Inner>;
175 let inner_ptr = inner_ref as usize;
181 std::thread::spawn(move || {
182 std::thread::sleep(ttl);
183 if *cancel.lock().unwrap() {
184 return;
185 }
186 let inner: &Mutex<Inner> = unsafe { &*(inner_ptr as *const Mutex<Inner>) };
192 let mut guard = inner.lock().unwrap();
193 if guard.pending.remove(&id_owned).is_some() {
194 drop(guard);
195 on_expire();
196 }
197 });
198
199 true
200 }
201
202 pub fn resume_session(&self, id: &str) -> Option<PendingSession> {
205 let mut inner = self.inner.lock().unwrap();
206 let entry = inner.pending.remove(id)?;
207 *entry.cancel.lock().unwrap() = true;
208 let mut info = entry.info;
209 info.last_connected_at = Instant::now();
210 inner.active.insert(id.to_string(), info.clone());
211 Some(PendingSession {
212 info,
213 saved_state: entry.saved_state,
214 cancel: entry.cancel,
215 })
216 }
217
218 pub fn destroy_session(&self, id: &str) {
220 let mut inner = self.inner.lock().unwrap();
221 inner.active.remove(id);
222 if let Some(entry) = inner.pending.remove(id) {
223 *entry.cancel.lock().unwrap() = true;
224 }
225 inner.connections.remove(id);
226 }
227
228 pub fn track_connection(&self, session_id: &str, conn_id: u64) {
231 let mut inner = self.inner.lock().unwrap();
232 inner
233 .connections
234 .entry(session_id.to_string())
235 .or_default()
236 .insert(conn_id);
237 }
238
239 pub fn untrack_connection(&self, session_id: &str, conn_id: u64) {
241 let mut inner = self.inner.lock().unwrap();
242 if let Some(conns) = inner.connections.get_mut(session_id) {
243 conns.remove(&conn_id);
244 }
245 }
246
247 pub fn connection_count(&self, session_id: &str) -> usize {
249 self.inner
250 .lock()
251 .unwrap()
252 .connections
253 .get(session_id)
254 .map(|c| c.len())
255 .unwrap_or(0)
256 }
257
258 pub fn shutdown(&self) {
260 let mut inner = self.inner.lock().unwrap();
261 for (_, entry) in inner.pending.drain() {
262 *entry.cancel.lock().unwrap() = true;
263 }
264 inner.active.clear();
265 inner.connections.clear();
266 }
267}