// chromiumoxide — runtime_release.rs
1//! Background batcher for `Runtime.releaseObject` calls.
2//!
3//! # Lock-freedom
4//!
//! Every hot-path operation reduces to one or two atomic operations (an
//! acquire-ordering `OnceLock` load plus a queue push), with no mutex and
//! no `Once`-style sync after the first init:
7//!
8//! | operation       | primitive                                                |
9//! | --------------- | -------------------------------------------------------- |
10//! | `try_release`   | `OnceLock::get()` (atomic load) + `UnboundedSender::send` (tokio's lock-free MPSC list) |
11//! | `init_worker`   | `OnceLock::get()` fast-path check; `get_or_init` only on the *first* call ever |
12//! | `Drop` guards   | `Arc::clone` (atomic fetch-add) + `try_release`          |
13//!
14//! The **first** `init_worker` call runs `OnceLock::get_or_init`, which
15//! briefly uses the `Once` sync primitive to serialise racing initialisers.
16//! This is a one-time per-process cost; subsequent calls skip it entirely
17//! via an explicit `get()` fast-path check.
18//!
19//! # Deadlock-freedom
20//!
21//! - `Drop` handlers never await, never acquire a lock another async task
22//!   holds, and never allocate beyond an atomic pointer push into the MPSC.
23//! - The worker holds no locks across its awaits; `rx.recv().await` parks
24//!   on a lock-free `Notify`, and `page.execute` goes through the existing
25//!   CDP command pipeline (no new locks introduced).
26//! - If the worker task panics or the runtime shuts down, `try_release`
27//!   continues to succeed (channel send does not require a live receiver);
28//!   releases are silently dropped and V8 reclaims on context teardown.
29//!
30//! # Batching + per-batch spawn
31//!
32//! The dispatcher drains up to [`MAX_BATCH`] pending releases per wake
33//! via `rx.recv_many(..)`, then hands the batch to **one** spawned
34//! worker task that drives a `FuturesUnordered` of every release
35//! concurrently. This gives:
36//!
37//! * ~64× fewer `tokio::spawn` calls than spawn-per-release, while
38//!   preserving concurrent execution inside each batch.
39//! * **Slow-release isolation**: one `page.execute` that hits
40//!   `request_timeout` (e.g. a crashed target) can no longer block
41//!   the dispatcher from picking up subsequent batches — the
42//!   dispatcher's loop body is strictly `drain → spawn batch-worker`
43//!   with no `.await` on cleanup work. Multiple batch workers can be
44//!   in flight simultaneously across runtime worker threads.
//! * **No `Arc<PageInner>` retention backlog**: a queued `(Page, id)`
//!   pair keeps its `Page` alive until it is drained. Previously, a
//!   single `page.execute` stalling for 30s could pin up to 63 other
//!   queued Pages for the duration of the stall. With per-batch spawn,
//!   each batch's Pages are freed as its `FuturesUnordered` completes,
//!   independently of other batches.
51
52use crate::page::Page;
53use chromiumoxide_cdp::cdp::js_protocol::runtime::{ReleaseObjectParams, RemoteObjectId};
54use futures_util::stream::{FuturesUnordered, StreamExt};
55use std::sync::OnceLock;
56use tokio::sync::mpsc;
57
/// Max items drained per dispatcher wake. Under burst the dispatcher
/// spawns one batch worker per drain; concurrent batches are not
/// limited by this constant.
const MAX_BATCH: usize = 64;

/// Global sender into the dispatcher's queue. Empty until the first
/// `init_worker` call installs it; afterwards read with a single atomic
/// load on every `try_release`.
static RELEASE_TX: OnceLock<mpsc::UnboundedSender<(Page, RemoteObjectId)>> = OnceLock::new();
64
65/// Spawn the dispatcher and return its sender. Only ever invoked once,
66/// from inside the `OnceLock::get_or_init` closure on the very first
67/// init.
68///
69/// The dispatcher is strictly a `recv_many + spawn` loop: it waits
70/// for at least one release, then hands the whole batch to a spawned
71/// worker that drives a `FuturesUnordered` of `page.execute` calls.
72/// The dispatcher never `.await`s on CDP work, so a slow or hung
73/// release cannot block it from picking up the next batch.
74fn spawn_worker() -> mpsc::UnboundedSender<(Page, RemoteObjectId)> {
75    let (tx, mut rx) = mpsc::unbounded_channel::<(Page, RemoteObjectId)>();
76
77    tokio::spawn(async move {
78        // Reused across iterations; `mem::replace` swaps in a fresh
79        // capacity-sized buffer so the next `recv_many` never grows
80        // incrementally.
81        let mut batch: Vec<(Page, RemoteObjectId)> = Vec::with_capacity(MAX_BATCH);
82        loop {
83            let n = rx.recv_many(&mut batch, MAX_BATCH).await;
84            if n == 0 {
85                break; // channel closed — no more producers
86            }
87            let next_cap = n.min(MAX_BATCH);
88            let releases: Vec<(Page, RemoteObjectId)> =
89                std::mem::replace(&mut batch, Vec::with_capacity(next_cap));
90            tokio::spawn(async move {
91                let mut in_flight: FuturesUnordered<_> = releases
92                    .into_iter()
93                    .map(|(page, id)| async move {
94                        let _ = page.execute(ReleaseObjectParams::new(id)).await;
95                    })
96                    .collect();
97                // Drain to completion; each resolved future is dropped
98                // immediately, freeing its `Arc<PageInner>` before the
99                // batch as a whole finishes.
100                while in_flight.next().await.is_some() {}
101            });
102        }
103    });
104
105    tx
106}
107
108/// Ensure the background release worker is running.
109///
110/// Hot path is a single atomic `OnceLock::get()` load; only the very first
111/// call ever touches `get_or_init` (and thus the `Once` sync primitive).
112/// Must be invoked from a tokio runtime context on the first call.
113#[inline]
114pub fn init_worker() {
115    // Explicit fast path: one atomic load, zero sync primitives after the
116    // first init.
117    if RELEASE_TX.get().is_some() {
118        return;
119    }
120    let _ = RELEASE_TX.get_or_init(spawn_worker);
121}
122
/// Returns `true` if the worker has been initialised in this process.
///
/// Single atomic load; never blocks and never initialises anything.
#[inline]
pub fn worker_inited() -> bool {
    RELEASE_TX.get().is_some()
}
128
129/// Enqueue a remote-object release for background processing.
130///
131/// Lock-free (one atomic load + one wait-free MPSC push), safe to invoke
132/// from a `Drop` implementation on any thread.  If the worker has not yet
133/// been initialised the release is silently dropped; V8 reclaims the
134/// object on the next execution-context teardown.
#[inline]
pub fn try_release(page: Page, object_id: RemoteObjectId) {
    // One atomic load; if the worker was never initialised the release is
    // dropped on purpose (best-effort cleanup — see module docs).
    if let Some(tx) = RELEASE_TX.get() {
        // Unbounded send never blocks; a closed channel (worker/runtime
        // gone) is deliberately ignored, keeping this safe to call from
        // `Drop` on any thread.
        let _ = tx.send((page, object_id));
    }
140}