1use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use dashmap::DashMap;
26use tokio::sync::Mutex as AsyncMutex;
27
28use crate::bundle::CompiledBundle;
29use crate::engine::{RunContext, RunOptions, ScriptEngineConfig, Session, SessionRun};
30use crate::error::ScriptError;
31use crate::result::ScriptResult;
32use crate::vars::InMemoryVars;
33
34pub struct BrowserSession {
40 vm: Option<Session>,
41 vars: Arc<InMemoryVars>,
42 procs: Arc<crate::session_procs::SessionProcs>,
46 last_used: Instant,
47 epoch: Option<u64>,
50}
51
52impl BrowserSession {
53 fn new() -> Self {
54 Self {
55 vm: None,
56 vars: Arc::new(InMemoryVars::new()),
57 procs: Arc::new(crate::session_procs::SessionProcs::default()),
58 last_used: Instant::now(),
59 epoch: None,
60 }
61 }
62
63 #[must_use]
68 pub fn vars(&self) -> Arc<InMemoryVars> {
69 self.vars.clone()
70 }
71
72 fn has_vm(&self) -> bool {
73 self.vm.is_some()
74 }
75
76 fn drop_vm(&mut self) {
80 self.vm = None;
81 }
82
83 async fn ensure_vm(
94 &mut self,
95 config: ScriptEngineConfig,
96 context: &RunContext,
97 epoch: Option<u64>,
98 ) -> Result<(), ScriptError> {
99 if self.vm.is_some() && self.epoch != epoch {
100 self.vm = None;
101 }
102 if self.vm.is_none() {
103 self.vm = Some(Session::create(config, context).await?);
104 self.epoch = epoch;
105 }
106 Ok(())
107 }
108
109 fn finish_run(&mut self, run: SessionRun) -> ScriptResult {
113 if run.poisoned {
114 self.vm = None;
115 }
116 self.last_used = Instant::now();
117 run.result
118 }
119
120 pub async fn run(
121 &mut self,
122 config: ScriptEngineConfig,
123 source: &str,
124 args: &[serde_json::Value],
125 options: RunOptions,
126 context: RunContext,
127 epoch: Option<u64>,
128 ) -> ScriptResult {
129 if let Err(e) = self.ensure_vm(config, &context, epoch).await {
130 self.last_used = Instant::now();
131 return ScriptResult::err(e, 0, Vec::new());
132 }
133 let Some(vm) = self.vm.as_ref() else {
134 return ScriptResult::err(
135 ScriptError::internal("session vm unexpectedly absent".to_string()),
136 0,
137 Vec::new(),
138 );
139 };
140 vm.install_session_procs(self.procs.clone()).await;
144 let run = vm.execute(source, args, options, &context).await;
145 self.finish_run(run)
146 }
147
148 pub async fn run_module(
152 &mut self,
153 config: ScriptEngineConfig,
154 bundle: &CompiledBundle,
155 args: &[serde_json::Value],
156 options: RunOptions,
157 context: RunContext,
158 epoch: Option<u64>,
159 ) -> ScriptResult {
160 if let Err(e) = self.ensure_vm(config, &context, epoch).await {
161 self.last_used = Instant::now();
162 return ScriptResult::err(e, 0, Vec::new());
163 }
164 let Some(vm) = self.vm.as_ref() else {
165 return ScriptResult::err(
166 ScriptError::internal("session vm unexpectedly absent".to_string()),
167 0,
168 Vec::new(),
169 );
170 };
171 vm.install_session_procs(self.procs.clone()).await;
172 let run = vm.execute_module(bundle, args, options, &context).await;
173 self.finish_run(run)
174 }
175}
176
177pub struct SessionTable {
180 map: DashMap<String, Arc<AsyncMutex<BrowserSession>>>,
181 max_vms: usize,
183 idle_ttl: Option<Duration>,
184}
185
186impl SessionTable {
187 #[must_use]
188 pub fn new(max_vms: usize, idle_ttl: Option<Duration>) -> Self {
189 Self {
190 map: DashMap::new(),
191 max_vms: max_vms.max(1),
192 idle_ttl,
193 }
194 }
195
196 pub fn acquire(&self, name: &str) -> Arc<AsyncMutex<BrowserSession>> {
210 if let Some(ttl) = self.idle_ttl {
211 let now = Instant::now();
212 let expired: Vec<String> = self
213 .map
214 .iter()
215 .filter_map(|entry| match entry.value().try_lock() {
216 Ok(s) if now.duration_since(s.last_used) >= ttl => Some(entry.key().clone()),
217 Ok(_) | Err(_) => None,
218 })
219 .collect();
220 for key in expired {
221 self.map.remove(&key);
222 }
223 }
224
225 let will_build = self.map.get(name).is_none_or(|slot| match slot.value().try_lock() {
228 Ok(s) => !s.has_vm(),
229 Err(_) => false,
230 });
231
232 if will_build {
233 let mut live: Vec<(String, Instant)> = self
234 .map
235 .iter()
236 .filter(|entry| entry.key().as_str() != name)
237 .filter_map(|entry| {
238 entry
239 .value()
240 .try_lock()
241 .ok()
242 .and_then(|g| g.has_vm().then(|| (entry.key().clone(), g.last_used)))
243 })
244 .collect();
245 if live.len() >= self.max_vms {
246 live.sort_by_key(|(_, t)| *t);
247 if let Some((victim, _)) = live.first()
248 && let Some(slot) = self.map.get(victim)
249 && let Ok(mut g) = slot.value().try_lock()
250 {
251 g.drop_vm();
252 }
253 }
254 }
255
256 let entry = self
257 .map
258 .entry(name.to_string())
259 .or_insert_with(|| Arc::new(AsyncMutex::new(BrowserSession::new())));
260 Arc::clone(entry.value())
261 }
262
263 pub fn remove(&self, name: &str) {
266 self.map.remove(name);
267 }
268
269 pub fn clear(&self) {
271 self.map.clear();
272 }
273
274 #[must_use]
276 pub fn len(&self) -> usize {
277 self.map.len()
278 }
279
280 #[must_use]
281 pub fn is_empty(&self) -> bool {
282 self.len() == 0
283 }
284
285 #[must_use]
288 pub fn live_vm_count(&self) -> usize {
289 self
290 .map
291 .iter()
292 .filter(|entry| entry.value().try_lock().map_or(true, |g| g.has_vm()))
293 .count()
294 }
295}
296
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::fs::PathSandbox;

    /// Builds a `RunContext` around `vars` with a sandbox rooted in a fresh
    /// temporary directory. The `TempDir` is returned alongside so the caller
    /// decides how long the directory stays alive.
    fn ctx_with(vars: Arc<InMemoryVars>) -> (tempfile::TempDir, RunContext) {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sandbox = Arc::new(PathSandbox::new(tmp.path()).expect("sandbox"));
        let ctx = RunContext {
            vars,
            sandbox,
            artifacts: None,
            page: None,
            browser_context: None,
            request: None,
            browser: None,
            plugins: Vec::new(),
            trusted_modules: false,
            host: crate::engine::ExtensionHost::Script,
            caps: crate::engine::ScriptCaps::default(),
        };
        (tmp, ctx)
    }

    /// Locks `slot` and runs `src` in it with default config/options, no
    /// arguments, and a context built from the session's own variable store.
    async fn run(
        slot: &Arc<AsyncMutex<BrowserSession>>,
        src: &str,
        epoch: Option<u64>,
    ) -> ScriptResult {
        let mut session = slot.lock().await;
        let (_tmp, ctx) = ctx_with(session.vars());
        session
            .run(
                ScriptEngineConfig::default(),
                src,
                &[],
                RunOptions::default(),
                ctx,
                epoch,
            )
            .await
    }

    /// Asserts that `actual` is a successful outcome carrying exactly `expected`.
    #[track_caller]
    fn assert_ok(actual: &ScriptResult, expected: serde_json::Value) {
        use crate::result::Outcome;

        match &actual.outcome {
            Outcome::Ok { success } => {
                assert_eq!(success.value, expected, "unexpected script value");
            }
            Outcome::Error { error } => {
                panic!("expected ok {expected}, got error: {error:?}");
            }
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn vars_persist_but_globalthis_dies_on_browser_swap() {
        let table = SessionTable::new(8, None);
        let slot = table.acquire("s");

        // Same epoch: the global set by the first run is visible to the second.
        let first = run(
            &slot,
            "globalThis.k = 7; vars.set('v', 'keep'); return 'a';",
            Some(1),
        )
        .await;
        assert!(first.is_ok(), "{first:?}");
        let same_epoch = run(&slot, "return globalThis.k ?? 'gone';", Some(1)).await;
        assert_ok(&same_epoch, serde_json::json!(7));

        // New epoch: the VM is rebuilt, so the global is gone but vars remain.
        let new_epoch = run(&slot, "return globalThis.k ?? 'gone';", Some(2)).await;
        assert_ok(&new_epoch, serde_json::json!("gone"));
        let stored = run(&slot, "return vars.get('v') ?? 'missing';", Some(2)).await;
        assert_ok(&stored, serde_json::json!("keep"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn cap_evicts_vm_but_vars_survive_the_eviction() {
        let table = SessionTable::new(1, None);
        let a = table.acquire("a");
        let seeded = run(
            &a,
            "globalThis.g = 1; vars.set('tok', 'abc'); return 1;",
            None,
        )
        .await;
        assert!(seeded.is_ok(), "{seeded:?}");

        // Acquiring a second session under a cap of one evicts a's VM.
        let b = table.acquire("b");
        let _ = run(&b, "return 1;", None).await;

        assert_eq!(table.len(), 2, "both session records live (vars tier)");
        {
            let record = table.map.get("a").unwrap();
            let guard = record.value().try_lock().unwrap();
            assert!(!guard.has_vm(), "a's VM was evicted under the cap");
        }

        let rebuilt = run(&a, "return globalThis.g ?? 'rebuilt';", None).await;
        assert_ok(&rebuilt, serde_json::json!("rebuilt"));
        let token = run(&a, "return vars.get('tok') ?? 'lost';", None).await;
        assert_ok(&token, serde_json::json!("abc"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn in_flight_vm_is_never_cap_evicted() {
        let table = SessionTable::new(1, None);
        let a = table.acquire("a");
        run(&a, "return 1;", None).await;

        // Holding a's lock stands in for a run that is still in progress.
        let held = a.lock().await;
        let b = table.acquire("b");
        run(&b, "return 1;", None).await;
        assert!(held.has_vm(), "in-flight VM kept despite cap pressure");
        drop(held);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn idle_ttl_reaps_whole_session_including_vars() {
        let table = SessionTable::new(64, Some(Duration::from_millis(60)));
        let a = table.acquire("a");
        run(&a, "vars.set('x','1'); return 1;", None).await;
        tokio::time::sleep(Duration::from_millis(120)).await;

        // The next acquire performs the reap; "a" is past the TTL by now.
        let _b = table.acquire("b");
        let present = (table.map.contains_key("a"), table.map.contains_key("b"));
        assert_eq!(present, (false, true), "idle session reaped whole; fresh kept");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn poison_on_timeout_rebuilds_next_call() {
        let table = SessionTable::new(8, None);
        let slot = table.acquire("s");
        {
            let mut session = slot.lock().await;
            let (_tmp, ctx) = ctx_with(session.vars());
            let opts = RunOptions {
                timeout: Some(Duration::from_millis(50)),
                ..RunOptions::default()
            };
            let timed_out = session
                .run(
                    ScriptEngineConfig::default(),
                    "globalThis.before = 1; while (true) {}",
                    &[],
                    opts,
                    ctx,
                    None,
                )
                .await;
            assert!(timed_out.is_err(), "infinite loop must time out");
            assert!(!session.has_vm(), "timeout must poison (discard) the VM");
        }
        let fresh = run(&slot, "return globalThis.before ?? 'fresh';", None).await;
        assert_ok(&fresh, serde_json::json!("fresh"));
    }
}