Skip to main content

ferridriver_script/
session_table.rs

1//! Persistent per-session script VMs, as one owned aggregate, with a
2//! crisp two-tier lifetime:
3//!
4//! - **The VM** (`globalThis`, compiled plugin bytecode, timers) is the
5//!   heavy, disposable tier. It is rebuilt on poison (timeout/OOM), on a
6//!   browser-session swap (relaunch/reconnect under the same name), and
7//!   dropped under the warm-VM cap when another session needs a slot.
8//! - **`vars`** is the light, durable tier: a string store that lives
9//!   for the *logical session's* whole lifetime. It survives every VM
10//!   rebuild above — cap eviction drops only the VM, not the session
11//!   record. The single thing `globalThis` cannot give you.
12//!
13//! A logical session ends (and its `vars` are released) only on: an
14//! idle-TTL reap, an explicit [`SessionTable::remove`], or
15//! [`SessionTable::clear`] (server shutdown). That is the whole `vars`
16//! durability contract — no fuzzier than that.
17//!
18//! Browser-agnostic by construction: a `RunContext` carries whatever
19//! browser handles a call has (or `None`) and the browser `epoch` is
20//! passed in, so every policy here is unit-testable without a browser.
21
22use std::collections::HashMap;
23use std::sync::{Arc, Mutex, PoisonError};
24use std::time::{Duration, Instant};
25
26use tokio::sync::Mutex as AsyncMutex;
27
28use crate::engine::{RunContext, RunOptions, ScriptEngineConfig, Session};
29use crate::error::ScriptError;
30use crate::result::ScriptResult;
31use crate::vars::InMemoryVars;
32
33/// One logical session: the (disposable) persistent VM, the (durable)
34/// session-scoped `vars`, and the browser generation the live VM was
35/// built against. Access is serialized by the [`AsyncMutex`]
36/// [`SessionTable`] hands out — that slot lock IS the per-session
37/// execution guard, so the invariant is structural, not a comment.
38pub struct BrowserSession {
39  vm: Option<Session>,
40  vars: Arc<InMemoryVars>,
41  /// Persistent child processes (servers/watchers) started by this
42  /// session. Durable tier alongside `vars`: survives VM rebuild;
43  /// `Drop` (idle reap / close / shutdown) SIGKILLs every group.
44  procs: Arc<crate::session_procs::SessionProcs>,
45  last_used: Instant,
46  /// Browser instance generation the live `vm` was built against.
47  /// `None` until first build, or when no browser is bound.
48  epoch: Option<u64>,
49}
50
51impl BrowserSession {
52  fn new() -> Self {
53    Self {
54      vm: None,
55      vars: Arc::new(InMemoryVars::new()),
56      procs: Arc::new(crate::session_procs::SessionProcs::default()),
57      last_used: Instant::now(),
58      epoch: None,
59    }
60  }
61
62  /// The session-scoped `vars` store. Outlives every VM rebuild and cap
63  /// eviction for the session's whole lifetime; released only when the
64  /// session record itself is dropped (idle-TTL reap / explicit close).
65  /// The caller threads this into the `RunContext` for [`Self::run`].
66  #[must_use]
67  pub fn vars(&self) -> Arc<InMemoryVars> {
68    self.vars.clone()
69  }
70
71  fn has_vm(&self) -> bool {
72    self.vm.is_some()
73  }
74
75  /// Drop only the VM, keeping the durable `vars` and the session
76  /// record. Used by the warm-VM cap: a capped-out session keeps its
77  /// identity + `vars`, just loses its compiled VM until next call.
78  fn drop_vm(&mut self) {
79    self.vm = None;
80  }
81
82  /// Execute one script against the persistent VM.
83  ///
84  /// Rebuilds the VM when: it does not exist yet, a prior call poisoned
85  /// it (timeout/OOM force-halt), or `epoch` no longer matches the
86  /// browser session it was built against (relaunch/reconnect under the
87  /// same session name — a *different* browser, so any JS handles the
88  /// old `globalThis` cached are dead and must not be reachable). In
89  /// every one of those cases `vars` is untouched.
90  pub async fn run(
91    &mut self,
92    config: ScriptEngineConfig,
93    source: &str,
94    args: &[serde_json::Value],
95    options: RunOptions,
96    context: RunContext,
97    epoch: Option<u64>,
98  ) -> ScriptResult {
99    if self.vm.is_some() && self.epoch != epoch {
100      self.vm = None;
101    }
102
103    if self.vm.is_none() {
104      match Session::create(config, &context).await {
105        Ok(vm) => {
106          self.vm = Some(vm);
107          self.epoch = epoch;
108        },
109        Err(e) => {
110          self.last_used = Instant::now();
111          return ScriptResult::err(e, 0, Vec::new());
112        },
113      }
114    }
115
116    let run = match self.vm.as_ref() {
117      Some(vm) => {
118        // Re-install the durable process registry on every (re)built VM
119        // so a tool's `commands` start/status/stop reaches the SAME
120        // registry that outlives the VM.
121        vm.install_session_procs(self.procs.clone()).await;
122        vm.execute(source, args, options, &context).await
123      },
124      None => {
125        return ScriptResult::err(
126          ScriptError::internal("session vm unexpectedly absent".to_string()),
127          0,
128          Vec::new(),
129        );
130      },
131    };
132
133    // A poisoning fault (timeout interrupt / OOM) left the interpreter
134    // halted at an arbitrary point — discard so the NEXT call rebuilds.
135    // A plain JS throw is not poisoning and keeps the warm VM.
136    if run.poisoned {
137      self.vm = None;
138    }
139    self.last_used = Instant::now();
140    run.result
141  }
142}
143
144/// The set of live sessions plus the retention policy. Cheap to share
145/// (`Arc` it); every method takes `&self`.
146pub struct SessionTable {
147  map: Mutex<HashMap<String, Arc<AsyncMutex<BrowserSession>>>>,
148  /// Upper bound on concurrently-warm VMs (not session records).
149  max_vms: usize,
150  idle_ttl: Option<Duration>,
151}
152
153impl SessionTable {
154  #[must_use]
155  pub fn new(max_vms: usize, idle_ttl: Option<Duration>) -> Self {
156    Self {
157      map: Mutex::new(HashMap::new()),
158      max_vms: max_vms.max(1),
159      idle_ttl,
160    }
161  }
162
163  /// Get (or create) the slot for `name`. Before returning it this:
164  ///
165  /// 1. Reaps idle sessions whole (past `idle_ttl`) — the only implicit
166  ///    end of a logical session; its `vars` go with it.
167  /// 2. If this acquire will build a VM and the warm-VM cap is already
168  ///    met, drops the *VM* of the least-recently-used other session
169  ///    (its session record + `vars` stay; it rebuilds on next use).
170  ///
171  /// A slot currently locked (execution in flight) is never reaped nor
172  /// VM-evicted — the cap is soft; correctness over the bound. The
173  /// returned slot's [`AsyncMutex`] is the per-session execution guard:
174  /// `lock().await` it, build a `RunContext` with its `vars()`, then
175  /// call [`BrowserSession::run`].
176  pub fn acquire(&self, name: &str) -> Arc<AsyncMutex<BrowserSession>> {
177    let mut map = self.map.lock().unwrap_or_else(PoisonError::into_inner);
178
179    if let Some(ttl) = self.idle_ttl {
180      let now = Instant::now();
181      map.retain(|_, slot| match slot.try_lock() {
182        Ok(s) => now.duration_since(s.last_used) < ttl,
183        Err(_) => true, // in flight — keep
184      });
185    }
186
187    // A build happens unless an entry already holds a live VM for this
188    // name (a locked entry owns its own VM lifecycle — don't second-guess).
189    let will_build = match map.get(name).map(|s| s.try_lock()) {
190      Some(Ok(s)) => !s.has_vm(),
191      Some(Err(_)) => false,
192      None => true,
193    };
194
195    if will_build {
196      let mut live: Vec<(String, Instant)> = map
197        .iter()
198        .filter(|(k, _)| k.as_str() != name)
199        .filter_map(|(k, s)| {
200          s.try_lock()
201            .ok()
202            .and_then(|g| g.has_vm().then(|| (k.clone(), g.last_used)))
203        })
204        .collect();
205      if live.len() >= self.max_vms {
206        live.sort_by_key(|(_, t)| *t);
207        if let Some((victim, _)) = live.first()
208          && let Some(slot) = map.get(victim)
209          && let Ok(mut g) = slot.try_lock()
210        {
211          g.drop_vm();
212        }
213      }
214    }
215
216    map
217      .entry(name.to_string())
218      .or_insert_with(|| Arc::new(AsyncMutex::new(BrowserSession::new())))
219      .clone()
220  }
221
222  /// End a logical session (explicit close / browser shutdown): drops
223  /// the slot and its durable `vars`. A later `acquire` starts fresh.
224  pub fn remove(&self, name: &str) {
225    self.map.lock().unwrap_or_else(PoisonError::into_inner).remove(name);
226  }
227
228  /// End every session (server shutdown).
229  pub fn clear(&self) {
230    self.map.lock().unwrap_or_else(PoisonError::into_inner).clear();
231  }
232
233  /// Number of live session records (durable tier), warm or not.
234  #[must_use]
235  pub fn len(&self) -> usize {
236    self.map.lock().unwrap_or_else(PoisonError::into_inner).len()
237  }
238
239  #[must_use]
240  pub fn is_empty(&self) -> bool {
241    self.len() == 0
242  }
243
244  /// Number of sessions currently holding a warm VM (heavy tier).
245  /// Bounded by `max_vms` modulo in-flight soft-cap slack.
246  #[must_use]
247  pub fn live_vm_count(&self) -> usize {
248    self
249      .map
250      .lock()
251      .unwrap_or_else(PoisonError::into_inner)
252      .values()
253      .filter(|s| s.try_lock().map_or(true, |g| g.has_vm()))
254      .count()
255  }
256}
257
#[cfg(test)]
mod tests {
  use std::sync::Arc;

  use super::*;
  use crate::fs::PathSandbox;

  /// Build a browserless `RunContext` around `vars`, sandboxed to a
  /// freshly created temp dir. The `TempDir` handle is returned next to
  /// the context so the caller decides how long it lives.
  fn ctx_with(vars: Arc<InMemoryVars>) -> (tempfile::TempDir, RunContext) {
    let tmp = tempfile::tempdir().expect("tempdir");
    let sandbox = PathSandbox::new(tmp.path()).expect("sandbox");
    let ctx = RunContext {
      vars,
      sandbox: Arc::new(sandbox),
      artifacts: None,
      page: None,
      browser_context: None,
      request: None,
      browser: None,
      plugins: Vec::new(),
      trusted_modules: false,
      host: crate::engine::ExtensionHost::Script,
      caps: crate::engine::ScriptCaps::default(),
    };
    (tmp, ctx)
  }

  /// Lock `slot` and run `src` with default config/options and no
  /// arguments, against the given browser `epoch`.
  async fn run(slot: &Arc<AsyncMutex<BrowserSession>>, src: &str, epoch: Option<u64>) -> ScriptResult {
    let mut session = slot.lock().await;
    let (_tmp, ctx) = ctx_with(session.vars());
    session
      .run(
        ScriptEngineConfig::default(),
        src,
        &[],
        RunOptions::default(),
        ctx,
        epoch,
      )
      .await
  }

  /// Assert that `actual` succeeded with exactly `expected` as value.
  #[track_caller]
  fn assert_ok(actual: &ScriptResult, expected: serde_json::Value) {
    use crate::result::Outcome;

    match &actual.outcome {
      Outcome::Ok { success } => assert_eq!(success.value, expected, "unexpected script value"),
      Outcome::Error { error } => panic!("expected ok {expected}, got error: {error:?}"),
    }
  }

  #[tokio::test(flavor = "multi_thread")]
  async fn vars_persist_but_globalthis_dies_on_browser_swap() {
    let table = SessionTable::new(8, None);
    let slot = table.acquire("s");

    // Same epoch twice: the VM (and its globalThis) is reused.
    let first = run(&slot, "globalThis.k = 7; vars.set('v', 'keep'); return 'a';", Some(1)).await;
    assert!(first.is_ok(), "{first:?}");
    let same_epoch = run(&slot, "return globalThis.k ?? 'gone';", Some(1)).await;
    assert_ok(&same_epoch, serde_json::json!(7));

    // New epoch (browser relaunched): fresh VM, so globalThis state is
    // gone while the durable vars are still there.
    let after_swap = run(&slot, "return globalThis.k ?? 'gone';", Some(2)).await;
    assert_ok(&after_swap, serde_json::json!("gone"));
    let durable = run(&slot, "return vars.get('v') ?? 'missing';", Some(2)).await;
    assert_ok(&durable, serde_json::json!("keep"));
  }

  #[tokio::test(flavor = "multi_thread")]
  async fn cap_evicts_vm_but_vars_survive_the_eviction() {
    let table = SessionTable::new(1, None);
    let a = table.acquire("a");
    let seeded = run(&a, "globalThis.g = 1; vars.set('tok', 'abc'); return 1;", None).await;
    assert!(seeded.is_ok(), "{seeded:?}");

    // With a cap of 1, bringing up "b" pushes "a"'s VM out; "a" keeps
    // its session record and vars.
    let b = table.acquire("b");
    let _ = run(&b, "return 1;", None).await;

    assert_eq!(table.len(), 2, "both session records live (vars tier)");
    let a_is_warm = {
      let sessions = table.map.lock().unwrap();
      let a_guard = sessions.get("a").unwrap().try_lock().unwrap();
      a_guard.has_vm()
    };
    assert!(!a_is_warm, "a's VM was evicted under the cap");

    // Next use of "a" builds a new VM: no globalThis state, vars intact.
    let rebuilt = run(&a, "return globalThis.g ?? 'rebuilt';", None).await;
    assert_ok(&rebuilt, serde_json::json!("rebuilt"));
    let token = run(&a, "return vars.get('tok') ?? 'lost';", None).await;
    assert_ok(&token, serde_json::json!("abc"));
  }

  #[tokio::test(flavor = "multi_thread")]
  async fn in_flight_vm_is_never_cap_evicted() {
    let table = SessionTable::new(1, None);
    let a = table.acquire("a");
    run(&a, "return 1;", None).await;

    // Keep "a" locked, as if a script were running, while "b" creates
    // cap pressure. A locked slot is skipped, so "a" must keep its VM
    // (this is the soft-cap slack).
    let held = a.lock().await;
    let b = table.acquire("b");
    run(&b, "return 1;", None).await;
    assert!(held.has_vm(), "in-flight VM kept despite cap pressure");
    drop(held);
  }

  #[tokio::test(flavor = "multi_thread")]
  async fn idle_ttl_reaps_whole_session_including_vars() {
    let table = SessionTable::new(64, Some(Duration::from_millis(60)));
    let a = table.acquire("a");
    run(&a, "vars.set('x','1'); return 1;", None).await;
    tokio::time::sleep(Duration::from_millis(120)).await;

    // The reap sweep runs as part of the next acquire.
    let _b = table.acquire("b");
    let (has_a, has_b) = {
      let sessions = table.map.lock().unwrap();
      (sessions.contains_key("a"), sessions.contains_key("b"))
    };
    assert_eq!((has_a, has_b), (false, true), "idle session reaped whole; fresh kept");
  }

  #[tokio::test(flavor = "multi_thread")]
  async fn poison_on_timeout_rebuilds_next_call() {
    let table = SessionTable::new(8, None);
    let slot = table.acquire("s");
    {
      // Run by hand (not via `run`) to pass a timeout and to inspect
      // the session while its lock is still held.
      let mut session = slot.lock().await;
      let (_tmp, ctx) = ctx_with(session.vars());
      let short_fuse = RunOptions {
        timeout: Some(Duration::from_millis(50)),
        ..RunOptions::default()
      };
      let outcome = session
        .run(
          ScriptEngineConfig::default(),
          "globalThis.before = 1; while (true) {}",
          &[],
          short_fuse,
          ctx,
          None,
        )
        .await;
      assert!(outcome.is_err(), "infinite loop must time out");
      assert!(!session.has_vm(), "timeout must poison (discard) the VM");
    }
    let next = run(&slot, "return globalThis.before ?? 'fresh';", None).await;
    assert_ok(&next, serde_json::json!("fresh"));
  }
}