Skip to main content

ferridriver_script/
session_table.rs

1//! Persistent per-session script VMs, as one owned aggregate, with a
2//! crisp two-tier lifetime:
3//!
4//! - **The VM** (`globalThis`, compiled plugin bytecode, timers) is the
5//!   heavy, disposable tier. It is rebuilt on poison (timeout/OOM), on a
6//!   browser-session swap (relaunch/reconnect under the same name), and
7//!   dropped under the warm-VM cap when another session needs a slot.
8//! - **`vars`** is the light, durable tier: a string store that lives
9//!   for the *logical session's* whole lifetime. It survives every VM
10//!   rebuild above — cap eviction drops only the VM, not the session
11//!   record. The single thing `globalThis` cannot give you.
12//!
13//! A logical session ends (and its `vars` are released) only on: an
14//! idle-TTL reap, an explicit [`SessionTable::remove`], or
15//! [`SessionTable::clear`] (server shutdown). That is the whole `vars`
16//! durability contract — no fuzzier than that.
17//!
18//! Browser-agnostic by construction: a `RunContext` carries whatever
19//! browser handles a call has (or `None`) and the browser `epoch` is
20//! passed in, so every policy here is unit-testable without a browser.
21
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use dashmap::DashMap;
26use tokio::sync::Mutex as AsyncMutex;
27
28use crate::bundle::CompiledBundle;
29use crate::engine::{RunContext, RunOptions, ScriptEngineConfig, Session, SessionRun};
30use crate::error::ScriptError;
31use crate::result::ScriptResult;
32use crate::vars::InMemoryVars;
33
/// One logical session: the (disposable) persistent VM, the (durable)
/// session-scoped `vars`, and the browser generation the live VM was
/// built against. Access is serialized by the [`AsyncMutex`]
/// [`SessionTable`] hands out — that slot lock IS the per-session
/// execution guard, so the invariant is structural, not a comment.
pub struct BrowserSession {
  /// The warm script VM, if any. `None` before the first run, after a
  /// poisoned run, after an epoch change, and after a cap eviction; it
  /// is rebuilt lazily by the next run.
  vm: Option<Session>,
  /// Durable session-scoped string store. Created once with the
  /// session record and never replaced; callers obtain a shared handle
  /// through [`BrowserSession::vars`].
  vars: Arc<InMemoryVars>,
  /// Persistent child processes (servers/watchers) started by this
  /// session. Durable tier alongside `vars`: survives VM rebuild;
  /// `Drop` (idle reap / close / shutdown) SIGKILLs every group.
  procs: Arc<crate::session_procs::SessionProcs>,
  /// When the session was created or last finished a run. Read by
  /// [`SessionTable::acquire`] for both the idle-TTL reap and the
  /// least-recently-used choice under the warm-VM cap.
  last_used: Instant,
  /// Browser instance generation the live `vm` was built against.
  /// `None` until first build, or when no browser is bound.
  epoch: Option<u64>,
}
51
52impl BrowserSession {
53  fn new() -> Self {
54    Self {
55      vm: None,
56      vars: Arc::new(InMemoryVars::new()),
57      procs: Arc::new(crate::session_procs::SessionProcs::default()),
58      last_used: Instant::now(),
59      epoch: None,
60    }
61  }
62
63  /// The session-scoped `vars` store. Outlives every VM rebuild and cap
64  /// eviction for the session's whole lifetime; released only when the
65  /// session record itself is dropped (idle-TTL reap / explicit close).
66  /// The caller threads this into the `RunContext` for [`Self::run`].
67  #[must_use]
68  pub fn vars(&self) -> Arc<InMemoryVars> {
69    self.vars.clone()
70  }
71
72  fn has_vm(&self) -> bool {
73    self.vm.is_some()
74  }
75
76  /// Drop only the VM, keeping the durable `vars` and the session
77  /// record. Used by the warm-VM cap: a capped-out session keeps its
78  /// identity + `vars`, just loses its compiled VM until next call.
79  fn drop_vm(&mut self) {
80    self.vm = None;
81  }
82
83  /// Execute one script against the persistent VM.
84  ///
85  /// Rebuilds the VM when: it does not exist yet, a prior call poisoned
86  /// it (timeout/OOM force-halt), or `epoch` no longer matches the
87  /// browser session it was built against (relaunch/reconnect under the
88  /// same session name — a *different* browser, so any JS handles the
89  /// old `globalThis` cached are dead and must not be reachable). In
90  /// every one of those cases `vars` is untouched.
91  /// Ensure the session VM exists for `epoch`, rebuilding it when the
92  /// epoch changed (a browser reattach invalidates the prior VM).
93  async fn ensure_vm(
94    &mut self,
95    config: ScriptEngineConfig,
96    context: &RunContext,
97    epoch: Option<u64>,
98  ) -> Result<(), ScriptError> {
99    if self.vm.is_some() && self.epoch != epoch {
100      self.vm = None;
101    }
102    if self.vm.is_none() {
103      self.vm = Some(Session::create(config, context).await?);
104      self.epoch = epoch;
105    }
106    Ok(())
107  }
108
109  /// Apply the poison rule (discard the VM after a timeout/OOM force-halt
110  /// so the next call rebuilds; a plain throw keeps the warm VM) and the
111  /// last-used bookkeeping, returning the run's result.
112  fn finish_run(&mut self, run: SessionRun) -> ScriptResult {
113    if run.poisoned {
114      self.vm = None;
115    }
116    self.last_used = Instant::now();
117    run.result
118  }
119
120  pub async fn run(
121    &mut self,
122    config: ScriptEngineConfig,
123    source: &str,
124    args: &[serde_json::Value],
125    options: RunOptions,
126    context: RunContext,
127    epoch: Option<u64>,
128  ) -> ScriptResult {
129    if let Err(e) = self.ensure_vm(config, &context, epoch).await {
130      self.last_used = Instant::now();
131      return ScriptResult::err(e, 0, Vec::new());
132    }
133    let Some(vm) = self.vm.as_ref() else {
134      return ScriptResult::err(
135        ScriptError::internal("session vm unexpectedly absent".to_string()),
136        0,
137        Vec::new(),
138      );
139    };
140    // Re-install the durable process registry on every (re)built VM so a
141    // tool's `commands` start/status/stop reaches the SAME registry that
142    // outlives the VM.
143    vm.install_session_procs(self.procs.clone()).await;
144    let run = vm.execute(source, args, options, &context).await;
145    self.finish_run(run)
146  }
147
148  /// Like [`Self::run`], but executes a precompiled bundled ES module — the
149  /// TypeScript / `import` path. The run's result is the module's
150  /// `default` export. Shares VM lifecycle + poison handling with `run`.
151  pub async fn run_module(
152    &mut self,
153    config: ScriptEngineConfig,
154    bundle: &CompiledBundle,
155    args: &[serde_json::Value],
156    options: RunOptions,
157    context: RunContext,
158    epoch: Option<u64>,
159  ) -> ScriptResult {
160    if let Err(e) = self.ensure_vm(config, &context, epoch).await {
161      self.last_used = Instant::now();
162      return ScriptResult::err(e, 0, Vec::new());
163    }
164    let Some(vm) = self.vm.as_ref() else {
165      return ScriptResult::err(
166        ScriptError::internal("session vm unexpectedly absent".to_string()),
167        0,
168        Vec::new(),
169      );
170    };
171    vm.install_session_procs(self.procs.clone()).await;
172    let run = vm.execute_module(bundle, args, options, &context).await;
173    self.finish_run(run)
174  }
175}
176
/// The set of live sessions plus the retention policy. Cheap to share
/// (`Arc` it); every method takes `&self`.
pub struct SessionTable {
  /// Session name → slot. The slot's [`AsyncMutex`] is the per-session
  /// execution guard; the `Arc` lets a caller keep using a slot it has
  /// already acquired even after the entry leaves the map.
  map: DashMap<String, Arc<AsyncMutex<BrowserSession>>>,
  /// Upper bound on concurrently-warm VMs (not session records).
  max_vms: usize,
  /// How long a session may sit unused before `acquire` reaps it whole.
  /// `None` disables idle reaping.
  idle_ttl: Option<Duration>,
}
185
186impl SessionTable {
187  #[must_use]
188  pub fn new(max_vms: usize, idle_ttl: Option<Duration>) -> Self {
189    Self {
190      map: DashMap::new(),
191      max_vms: max_vms.max(1),
192      idle_ttl,
193    }
194  }
195
196  /// Get (or create) the slot for `name`. Before returning it this:
197  ///
198  /// 1. Reaps idle sessions whole (past `idle_ttl`) — the only implicit
199  ///    end of a logical session; its `vars` go with it.
200  /// 2. If this acquire will build a VM and the warm-VM cap is already
201  ///    met, drops the *VM* of the least-recently-used other session
202  ///    (its session record + `vars` stay; it rebuilds on next use).
203  ///
204  /// A slot currently locked (execution in flight) is never reaped nor
205  /// VM-evicted — the cap is soft; correctness over the bound. The
206  /// returned slot's [`AsyncMutex`] is the per-session execution guard:
207  /// `lock().await` it, build a `RunContext` with its `vars()`, then
208  /// call [`BrowserSession::run`].
209  pub fn acquire(&self, name: &str) -> Arc<AsyncMutex<BrowserSession>> {
210    if let Some(ttl) = self.idle_ttl {
211      let now = Instant::now();
212      let expired: Vec<String> = self
213        .map
214        .iter()
215        .filter_map(|entry| match entry.value().try_lock() {
216          Ok(s) if now.duration_since(s.last_used) >= ttl => Some(entry.key().clone()),
217          Ok(_) | Err(_) => None,
218        })
219        .collect();
220      for key in expired {
221        self.map.remove(&key);
222      }
223    }
224
225    // A build happens unless an entry already holds a live VM for this
226    // name (a locked entry owns its own VM lifecycle — don't second-guess).
227    let will_build = self.map.get(name).is_none_or(|slot| match slot.value().try_lock() {
228      Ok(s) => !s.has_vm(),
229      Err(_) => false,
230    });
231
232    if will_build {
233      let mut live: Vec<(String, Instant)> = self
234        .map
235        .iter()
236        .filter(|entry| entry.key().as_str() != name)
237        .filter_map(|entry| {
238          entry
239            .value()
240            .try_lock()
241            .ok()
242            .and_then(|g| g.has_vm().then(|| (entry.key().clone(), g.last_used)))
243        })
244        .collect();
245      if live.len() >= self.max_vms {
246        live.sort_by_key(|(_, t)| *t);
247        if let Some((victim, _)) = live.first()
248          && let Some(slot) = self.map.get(victim)
249          && let Ok(mut g) = slot.value().try_lock()
250        {
251          g.drop_vm();
252        }
253      }
254    }
255
256    let entry = self
257      .map
258      .entry(name.to_string())
259      .or_insert_with(|| Arc::new(AsyncMutex::new(BrowserSession::new())));
260    Arc::clone(entry.value())
261  }
262
263  /// End a logical session (explicit close / browser shutdown): drops
264  /// the slot and its durable `vars`. A later `acquire` starts fresh.
265  pub fn remove(&self, name: &str) {
266    self.map.remove(name);
267  }
268
269  /// End every session (server shutdown).
270  pub fn clear(&self) {
271    self.map.clear();
272  }
273
274  /// Number of live session records (durable tier), warm or not.
275  #[must_use]
276  pub fn len(&self) -> usize {
277    self.map.len()
278  }
279
280  #[must_use]
281  pub fn is_empty(&self) -> bool {
282    self.len() == 0
283  }
284
285  /// Number of sessions currently holding a warm VM (heavy tier).
286  /// Bounded by `max_vms` modulo in-flight soft-cap slack.
287  #[must_use]
288  pub fn live_vm_count(&self) -> usize {
289    self
290      .map
291      .iter()
292      .filter(|entry| entry.value().try_lock().map_or(true, |g| g.has_vm()))
293      .count()
294  }
295}
296
// Policy tests for the two-tier lifetime. Every `RunContext` here has no
// page / browser handles, so only the table and session logic is exercised.
#[cfg(test)]
mod tests {
  use std::sync::Arc;

  use super::*;
  use crate::fs::PathSandbox;

  /// Build a browser-less `RunContext` around `vars`, sandboxed to a
  /// fresh temp dir. The `TempDir` is returned alongside so the caller
  /// can hold it for as long as the context is in use.
  fn ctx_with(vars: Arc<InMemoryVars>) -> (tempfile::TempDir, RunContext) {
    let tmp = tempfile::tempdir().expect("tempdir");
    let ctx = RunContext {
      vars,
      sandbox: Arc::new(PathSandbox::new(tmp.path()).expect("sandbox")),
      artifacts: None,
      page: None,
      browser_context: None,
      request: None,
      browser: None,
      plugins: Vec::new(),
      trusted_modules: false,
      host: crate::engine::ExtensionHost::Script,
      caps: crate::engine::ScriptCaps::default(),
    };
    (tmp, ctx)
  }

  /// Lock `slot`, run `src` against it with default config/options and
  /// the session's own `vars`, under the given browser `epoch`.
  async fn run(slot: &Arc<AsyncMutex<BrowserSession>>, src: &str, epoch: Option<u64>) -> ScriptResult {
    let mut s = slot.lock().await;
    let vars = s.vars();
    let (_tmp, ctx) = ctx_with(vars);
    s.run(
      ScriptEngineConfig::default(),
      src,
      &[],
      RunOptions::default(),
      ctx,
      epoch,
    )
    .await
  }

  /// Assert that `actual` succeeded with exactly `expected` as its value;
  /// panics with the error otherwise. `#[track_caller]` reports the
  /// failure at the calling test's line.
  #[track_caller]
  fn assert_ok(actual: &ScriptResult, expected: serde_json::Value) {
    match &actual.outcome {
      crate::result::Outcome::Ok { success } => assert_eq!(success.value, expected, "unexpected script value"),
      crate::result::Outcome::Error { error } => panic!("expected ok {expected}, got error: {error:?}"),
    }
  }

  /// An epoch change rebuilds the VM (`globalThis` state is lost) while
  /// the session's `vars` carry over.
  #[tokio::test(flavor = "multi_thread")]
  async fn vars_persist_but_globalthis_dies_on_browser_swap() {
    let table = SessionTable::new(8, None);
    let slot = table.acquire("s");

    let r = run(&slot, "globalThis.k = 7; vars.set('v', 'keep'); return 'a';", Some(1)).await;
    assert!(r.is_ok(), "{r:?}");
    let r = run(&slot, "return globalThis.k ?? 'gone';", Some(1)).await;
    assert_ok(&r, serde_json::json!(7));

    // Browser relaunched (epoch change): VM rebuilt, globalThis gone,
    // durable vars survive.
    let r = run(&slot, "return globalThis.k ?? 'gone';", Some(2)).await;
    assert_ok(&r, serde_json::json!("gone"));
    let r = run(&slot, "return vars.get('tok') ?? 'missing';".replace("tok", "v").as_str(), Some(2)).await;
    assert_ok(&r, serde_json::json!("keep"));
  }

  /// With a cap of one warm VM, acquiring a second session evicts the
  /// first session's VM only: its record and `vars` remain.
  #[tokio::test(flavor = "multi_thread")]
  async fn cap_evicts_vm_but_vars_survive_the_eviction() {
    let table = SessionTable::new(1, None);
    let a = table.acquire("a");
    let r = run(&a, "globalThis.g = 1; vars.set('tok', 'abc'); return 1;", None).await;
    assert!(r.is_ok(), "{r:?}");

    // Acquiring + running "b" needs a VM; cap is 1, so "a"'s VM is
    // evicted — but its session record + vars stay.
    let b = table.acquire("b");
    let _ = run(&b, "return 1;", None).await;

    assert_eq!(table.len(), 2, "both session records live (vars tier)");
    {
      let slot = table.map.get("a").unwrap();
      let ga = slot.value().try_lock().unwrap();
      assert!(!ga.has_vm(), "a's VM was evicted under the cap");
    }

    // "a" rebuilds on next use: globalThis gone, durable vars intact.
    let r = run(&a, "return globalThis.g ?? 'rebuilt';", None).await;
    assert_ok(&r, serde_json::json!("rebuilt"));
    let r = run(&a, "return vars.get('tok') ?? 'lost';", None).await;
    assert_ok(&r, serde_json::json!("abc"));
  }

  /// A session whose slot lock is held is skipped by cap eviction, even
  /// when that leaves more warm VMs than the cap.
  #[tokio::test(flavor = "multi_thread")]
  async fn in_flight_vm_is_never_cap_evicted() {
    let table = SessionTable::new(1, None);
    let a = table.acquire("a");
    run(&a, "return 1;", None).await;

    // Hold "a" locked (in flight) while "b" forces cap pressure: "a"'s
    // VM must NOT be dropped (locked => skipped), soft cap slack.
    let a_guard = a.lock().await;
    let b = table.acquire("b");
    run(&b, "return 1;", None).await;
    assert!(a_guard.has_vm(), "in-flight VM kept despite cap pressure");
    drop(a_guard);
  }

  /// A session left unused past the idle TTL is removed from the table
  /// by the next `acquire`, even though this test still holds its `Arc`.
  #[tokio::test(flavor = "multi_thread")]
  async fn idle_ttl_reaps_whole_session_including_vars() {
    let table = SessionTable::new(64, Some(Duration::from_millis(60)));
    let a = table.acquire("a");
    run(&a, "vars.set('x','1'); return 1;", None).await;
    tokio::time::sleep(Duration::from_millis(120)).await;
    let _b = table.acquire("b"); // triggers reap sweep
    let present = (table.map.contains_key("a"), table.map.contains_key("b"));
    assert_eq!(present, (false, true), "idle session reaped whole; fresh kept");
  }

  /// A run that hits its timeout poisons the VM: it is discarded at once
  /// and the next run sees a fresh `globalThis`.
  #[tokio::test(flavor = "multi_thread")]
  async fn poison_on_timeout_rebuilds_next_call() {
    let table = SessionTable::new(8, None);
    let slot = table.acquire("s");
    {
      let mut s = slot.lock().await;
      let vars = s.vars();
      let (_tmp, ctx) = ctx_with(vars);
      let opts = RunOptions {
        timeout: Some(Duration::from_millis(50)),
        ..RunOptions::default()
      };
      let r = s
        .run(
          ScriptEngineConfig::default(),
          "globalThis.before = 1; while (true) {}",
          &[],
          opts,
          ctx,
          None,
        )
        .await;
      assert!(r.is_err(), "infinite loop must time out");
      assert!(!s.has_vm(), "timeout must poison (discard) the VM");
    }
    let r = run(&slot, "return globalThis.before ?? 'fresh';", None).await;
    assert_ok(&r, serde_json::json!("fresh"));
  }
}