chromiumoxide/runtime_release.rs
1//! Background batcher for `Runtime.releaseObject` calls.
2//!
3//! # Lock-freedom
4//!
5//! Every hot-path operation reduces to one or two relaxed-ordering atomic
6//! ops, with no mutex, no `Once`-style sync after the first init:
7//!
8//! | operation | primitive |
9//! | --------------- | -------------------------------------------------------- |
10//! | `try_release` | `OnceLock::get()` (atomic load) + `UnboundedSender::send` (tokio's lock-free MPSC list) |
11//! | `init_worker` | `OnceLock::get()` fast-path check; `get_or_init` only on the *first* call ever |
12//! | `Drop` guards | `Arc::clone` (atomic fetch-add) + `try_release` |
13//!
14//! The **first** `init_worker` call runs `OnceLock::get_or_init`, which
15//! briefly uses the `Once` sync primitive to serialise racing initialisers.
16//! This is a one-time per-process cost; subsequent calls skip it entirely
17//! via an explicit `get()` fast-path check.
18//!
19//! # Deadlock-freedom
20//!
21//! - `Drop` handlers never await, never acquire a lock another async task
22//! holds, and never allocate beyond an atomic pointer push into the MPSC.
23//! - The worker holds no locks across its awaits; `rx.recv().await` parks
24//! on a lock-free `Notify`, and `page.execute` goes through the existing
25//! CDP command pipeline (no new locks introduced).
26//! - If the worker task panics or the runtime shuts down, `try_release`
27//! continues to succeed (channel send does not require a live receiver);
28//! releases are silently dropped and V8 reclaims on context teardown.
29//!
30//! # Batching + per-batch spawn
31//!
32//! The dispatcher drains up to [`MAX_BATCH`] pending releases per wake
33//! via `rx.recv_many(..)`, then hands the batch to **one** spawned
34//! worker task that drives a `FuturesUnordered` of every release
35//! concurrently. This gives:
36//!
37//! * ~64× fewer `tokio::spawn` calls than spawn-per-release, while
38//! preserving concurrent execution inside each batch.
39//! * **Slow-release isolation**: one `page.execute` that hits
40//! `request_timeout` (e.g. a crashed target) can no longer block
41//! the dispatcher from picking up subsequent batches — the
42//! dispatcher's loop body is strictly `drain → spawn batch-worker`
43//! with no `.await` on cleanup work. Multiple batch workers can be
44//! in flight simultaneously across runtime worker threads.
45//! * **No Arc<PageInner> retention backlog**: queued `(Page, id)`
46//! pairs hold Pages alive until drained, so spending 30s inside a
47//! single `page.execute` used to keep up to 63 other Pages pinned
48//! unnecessarily. With per-batch spawn, each batch's Pages are
49//! freed as its `FuturesUnordered` completes, independently of
50//! other batches.
51
52use crate::page::Page;
53use chromiumoxide_cdp::cdp::js_protocol::runtime::{ReleaseObjectParams, RemoteObjectId};
54use futures_util::stream::{FuturesUnordered, StreamExt};
55use std::sync::OnceLock;
56use tokio::sync::mpsc;
57
58/// Max items drained per dispatcher wake. Under burst the dispatcher
59/// spawns one batch worker per drain; concurrent batches are not
60/// limited by this constant.
61const MAX_BATCH: usize = 64;
62
63static RELEASE_TX: OnceLock<mpsc::UnboundedSender<(Page, RemoteObjectId)>> = OnceLock::new();
64
65/// Spawn the dispatcher and return its sender. Only ever invoked once,
66/// from inside the `OnceLock::get_or_init` closure on the very first
67/// init.
68///
69/// The dispatcher is strictly a `recv_many + spawn` loop: it waits
70/// for at least one release, then hands the whole batch to a spawned
71/// worker that drives a `FuturesUnordered` of `page.execute` calls.
72/// The dispatcher never `.await`s on CDP work, so a slow or hung
73/// release cannot block it from picking up the next batch.
74fn spawn_worker() -> mpsc::UnboundedSender<(Page, RemoteObjectId)> {
75 let (tx, mut rx) = mpsc::unbounded_channel::<(Page, RemoteObjectId)>();
76
77 tokio::spawn(async move {
78 // Reused across iterations; `mem::replace` swaps in a fresh
79 // capacity-sized buffer so the next `recv_many` never grows
80 // incrementally.
81 let mut batch: Vec<(Page, RemoteObjectId)> = Vec::with_capacity(MAX_BATCH);
82 loop {
83 let n = rx.recv_many(&mut batch, MAX_BATCH).await;
84 if n == 0 {
85 break; // channel closed — no more producers
86 }
87 let next_cap = n.min(MAX_BATCH);
88 let releases: Vec<(Page, RemoteObjectId)> =
89 std::mem::replace(&mut batch, Vec::with_capacity(next_cap));
90 tokio::spawn(async move {
91 let mut in_flight: FuturesUnordered<_> = releases
92 .into_iter()
93 .map(|(page, id)| async move {
94 let _ = page.execute(ReleaseObjectParams::new(id)).await;
95 })
96 .collect();
97 // Drain to completion; each resolved future is dropped
98 // immediately, freeing its `Arc<PageInner>` before the
99 // batch as a whole finishes.
100 while in_flight.next().await.is_some() {}
101 });
102 }
103 });
104
105 tx
106}
107
108/// Ensure the background release worker is running.
109///
110/// Hot path is a single atomic `OnceLock::get()` load; only the very first
111/// call ever touches `get_or_init` (and thus the `Once` sync primitive).
112/// Must be invoked from a tokio runtime context on the first call.
113#[inline]
114pub fn init_worker() {
115 // Explicit fast path: one atomic load, zero sync primitives after the
116 // first init.
117 if RELEASE_TX.get().is_some() {
118 return;
119 }
120 let _ = RELEASE_TX.get_or_init(spawn_worker);
121}
122
123/// Returns `true` if the worker has been initialised in this process.
124#[inline]
125pub fn worker_inited() -> bool {
126 RELEASE_TX.get().is_some()
127}
128
129/// Enqueue a remote-object release for background processing.
130///
131/// Lock-free (one atomic load + one wait-free MPSC push), safe to invoke
132/// from a `Drop` implementation on any thread. If the worker has not yet
133/// been initialised the release is silently dropped; V8 reclaims the
134/// object on the next execution-context teardown.
135#[inline]
136pub fn try_release(page: Page, object_id: RemoteObjectId) {
137 if let Some(tx) = RELEASE_TX.get() {
138 let _ = tx.send((page, object_id));
139 }
140}