use std::sync::Arc;
use std::time::{Duration, Instant};
use dashmap::DashMap;
use tokio::sync::Mutex as AsyncMutex;
use crate::bundle::CompiledBundle;
use crate::engine::{RunContext, RunOptions, ScriptEngineConfig, Session, SessionRun};
use crate::error::ScriptError;
use crate::result::ScriptResult;
use crate::vars::InMemoryVars;
pub struct BrowserSession {
vm: Option<Session>,
vars: Arc<InMemoryVars>,
procs: Arc<crate::session_procs::SessionProcs>,
last_used: Instant,
epoch: Option<u64>,
}
impl BrowserSession {
fn new() -> Self {
Self {
vm: None,
vars: Arc::new(InMemoryVars::new()),
procs: Arc::new(crate::session_procs::SessionProcs::default()),
last_used: Instant::now(),
epoch: None,
}
}
#[must_use]
pub fn vars(&self) -> Arc<InMemoryVars> {
self.vars.clone()
}
fn has_vm(&self) -> bool {
self.vm.is_some()
}
fn drop_vm(&mut self) {
self.vm = None;
}
async fn ensure_vm(
&mut self,
config: ScriptEngineConfig,
context: &RunContext,
epoch: Option<u64>,
) -> Result<(), ScriptError> {
if self.vm.is_some() && self.epoch != epoch {
self.vm = None;
}
if self.vm.is_none() {
self.vm = Some(Session::create(config, context).await?);
self.epoch = epoch;
}
Ok(())
}
fn finish_run(&mut self, run: SessionRun) -> ScriptResult {
if run.poisoned {
self.vm = None;
}
self.last_used = Instant::now();
run.result
}
pub async fn run(
&mut self,
config: ScriptEngineConfig,
source: &str,
args: &[serde_json::Value],
options: RunOptions,
context: RunContext,
epoch: Option<u64>,
) -> ScriptResult {
if let Err(e) = self.ensure_vm(config, &context, epoch).await {
self.last_used = Instant::now();
return ScriptResult::err(e, 0, Vec::new());
}
let Some(vm) = self.vm.as_ref() else {
return ScriptResult::err(
ScriptError::internal("session vm unexpectedly absent".to_string()),
0,
Vec::new(),
);
};
vm.install_session_procs(self.procs.clone()).await;
let run = vm.execute(source, args, options, &context).await;
self.finish_run(run)
}
pub async fn run_module(
&mut self,
config: ScriptEngineConfig,
bundle: &CompiledBundle,
args: &[serde_json::Value],
options: RunOptions,
context: RunContext,
epoch: Option<u64>,
) -> ScriptResult {
if let Err(e) = self.ensure_vm(config, &context, epoch).await {
self.last_used = Instant::now();
return ScriptResult::err(e, 0, Vec::new());
}
let Some(vm) = self.vm.as_ref() else {
return ScriptResult::err(
ScriptError::internal("session vm unexpectedly absent".to_string()),
0,
Vec::new(),
);
};
vm.install_session_procs(self.procs.clone()).await;
let run = vm.execute_module(bundle, args, options, &context).await;
self.finish_run(run)
}
}
pub struct SessionTable {
map: DashMap<String, Arc<AsyncMutex<BrowserSession>>>,
max_vms: usize,
idle_ttl: Option<Duration>,
}
impl SessionTable {
#[must_use]
pub fn new(max_vms: usize, idle_ttl: Option<Duration>) -> Self {
Self {
map: DashMap::new(),
max_vms: max_vms.max(1),
idle_ttl,
}
}
pub fn acquire(&self, name: &str) -> Arc<AsyncMutex<BrowserSession>> {
if let Some(ttl) = self.idle_ttl {
let now = Instant::now();
let expired: Vec<String> = self
.map
.iter()
.filter_map(|entry| match entry.value().try_lock() {
Ok(s) if now.duration_since(s.last_used) >= ttl => Some(entry.key().clone()),
Ok(_) | Err(_) => None,
})
.collect();
for key in expired {
self.map.remove(&key);
}
}
let will_build = self.map.get(name).is_none_or(|slot| match slot.value().try_lock() {
Ok(s) => !s.has_vm(),
Err(_) => false,
});
if will_build {
let mut live: Vec<(String, Instant)> = self
.map
.iter()
.filter(|entry| entry.key().as_str() != name)
.filter_map(|entry| {
entry
.value()
.try_lock()
.ok()
.and_then(|g| g.has_vm().then(|| (entry.key().clone(), g.last_used)))
})
.collect();
if live.len() >= self.max_vms {
live.sort_by_key(|(_, t)| *t);
if let Some((victim, _)) = live.first()
&& let Some(slot) = self.map.get(victim)
&& let Ok(mut g) = slot.value().try_lock()
{
g.drop_vm();
}
}
}
let entry = self
.map
.entry(name.to_string())
.or_insert_with(|| Arc::new(AsyncMutex::new(BrowserSession::new())));
Arc::clone(entry.value())
}
pub fn remove(&self, name: &str) {
self.map.remove(name);
}
pub fn clear(&self) {
self.map.clear();
}
#[must_use]
pub fn len(&self) -> usize {
self.map.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[must_use]
pub fn live_vm_count(&self) -> usize {
self
.map
.iter()
.filter(|entry| entry.value().try_lock().map_or(true, |g| g.has_vm()))
.count()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::fs::PathSandbox;
fn ctx_with(vars: Arc<InMemoryVars>) -> (tempfile::TempDir, RunContext) {
let tmp = tempfile::tempdir().expect("tempdir");
let ctx = RunContext {
vars,
sandbox: Arc::new(PathSandbox::new(tmp.path()).expect("sandbox")),
artifacts: None,
page: None,
browser_context: None,
request: None,
browser: None,
plugins: Vec::new(),
trusted_modules: false,
host: crate::engine::ExtensionHost::Script,
caps: crate::engine::ScriptCaps::default(),
};
(tmp, ctx)
}
async fn run(slot: &Arc<AsyncMutex<BrowserSession>>, src: &str, epoch: Option<u64>) -> ScriptResult {
let mut s = slot.lock().await;
let vars = s.vars();
let (_tmp, ctx) = ctx_with(vars);
s.run(
ScriptEngineConfig::default(),
src,
&[],
RunOptions::default(),
ctx,
epoch,
)
.await
}
#[track_caller]
fn assert_ok(actual: &ScriptResult, expected: serde_json::Value) {
match &actual.outcome {
crate::result::Outcome::Ok { success } => assert_eq!(success.value, expected, "unexpected script value"),
crate::result::Outcome::Error { error } => panic!("expected ok {expected}, got error: {error:?}"),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn vars_persist_but_globalthis_dies_on_browser_swap() {
let table = SessionTable::new(8, None);
let slot = table.acquire("s");
let r = run(&slot, "globalThis.k = 7; vars.set('v', 'keep'); return 'a';", Some(1)).await;
assert!(r.is_ok(), "{r:?}");
let r = run(&slot, "return globalThis.k ?? 'gone';", Some(1)).await;
assert_ok(&r, serde_json::json!(7));
let r = run(&slot, "return globalThis.k ?? 'gone';", Some(2)).await;
assert_ok(&r, serde_json::json!("gone"));
let r = run(&slot, "return vars.get('v') ?? 'missing';", Some(2)).await;
assert_ok(&r, serde_json::json!("keep"));
}
#[tokio::test(flavor = "multi_thread")]
async fn cap_evicts_vm_but_vars_survive_the_eviction() {
let table = SessionTable::new(1, None);
let a = table.acquire("a");
let r = run(&a, "globalThis.g = 1; vars.set('tok', 'abc'); return 1;", None).await;
assert!(r.is_ok(), "{r:?}");
let b = table.acquire("b");
let _ = run(&b, "return 1;", None).await;
assert_eq!(table.len(), 2, "both session records live (vars tier)");
{
let slot = table.map.get("a").unwrap();
let ga = slot.value().try_lock().unwrap();
assert!(!ga.has_vm(), "a's VM was evicted under the cap");
}
let r = run(&a, "return globalThis.g ?? 'rebuilt';", None).await;
assert_ok(&r, serde_json::json!("rebuilt"));
let r = run(&a, "return vars.get('tok') ?? 'lost';", None).await;
assert_ok(&r, serde_json::json!("abc"));
}
#[tokio::test(flavor = "multi_thread")]
async fn in_flight_vm_is_never_cap_evicted() {
let table = SessionTable::new(1, None);
let a = table.acquire("a");
run(&a, "return 1;", None).await;
let a_guard = a.lock().await;
let b = table.acquire("b");
run(&b, "return 1;", None).await;
assert!(a_guard.has_vm(), "in-flight VM kept despite cap pressure");
drop(a_guard);
}
#[tokio::test(flavor = "multi_thread")]
async fn idle_ttl_reaps_whole_session_including_vars() {
let table = SessionTable::new(64, Some(Duration::from_millis(60)));
let a = table.acquire("a");
run(&a, "vars.set('x','1'); return 1;", None).await;
tokio::time::sleep(Duration::from_millis(120)).await;
let _b = table.acquire("b"); let present = (table.map.contains_key("a"), table.map.contains_key("b"));
assert_eq!(present, (false, true), "idle session reaped whole; fresh kept");
}
#[tokio::test(flavor = "multi_thread")]
async fn poison_on_timeout_rebuilds_next_call() {
let table = SessionTable::new(8, None);
let slot = table.acquire("s");
{
let mut s = slot.lock().await;
let vars = s.vars();
let (_tmp, ctx) = ctx_with(vars);
let opts = RunOptions {
timeout: Some(Duration::from_millis(50)),
..RunOptions::default()
};
let r = s
.run(
ScriptEngineConfig::default(),
"globalThis.before = 1; while (true) {}",
&[],
opts,
ctx,
None,
)
.await;
assert!(r.is_err(), "infinite loop must time out");
assert!(!s.has_vm(), "timeout must poison (discard) the VM");
}
let r = run(&slot, "return globalThis.before ?? 'fresh';", None).await;
assert_ok(&r, serde_json::json!("fresh"));
}
}