chromiumoxide/runtime_release.rs
1//! Background batcher for `Runtime.releaseObject` calls.
2//!
3//! # Lock-freedom
4//!
5//! Every hot-path operation reduces to one or two relaxed-ordering atomic
6//! ops, with no mutex, no `Once`-style sync after the first init:
7//!
8//! | operation | primitive |
9//! | --------------- | -------------------------------------------------------- |
10//! | `try_release` | `OnceLock::get()` (atomic load) + `UnboundedSender::send` (tokio's lock-free MPSC list) |
11//! | `init_worker` | `OnceLock::get()` fast-path check; `get_or_init` only on the *first* call ever |
12//! | `Drop` guards | `Arc::clone` (atomic fetch-add) + `try_release` |
13//!
14//! The **first** `init_worker` call runs `OnceLock::get_or_init`, which
15//! briefly uses the `Once` sync primitive to serialise racing initialisers.
16//! This is a one-time per-process cost; subsequent calls skip it entirely
17//! via an explicit `get()` fast-path check.
18//!
19//! # Deadlock-freedom
20//!
21//! - `Drop` handlers never await, never acquire a lock another async task
22//! holds, and never allocate beyond an atomic pointer push into the MPSC.
23//! - The worker holds no locks across its awaits; `rx.recv().await` parks
24//! on a lock-free `Notify`, and `page.execute` goes through the existing
25//! CDP command pipeline (no new locks introduced).
26//! - If the worker task panics or the runtime shuts down, `try_release`
27//! continues to succeed (channel send does not require a live receiver);
28//! releases are silently dropped and V8 reclaims on context teardown.
29//!
30//! # Batching
31//!
32//! The worker drains up to [`MAX_BATCH`] pending releases per round and
33//! fires them **concurrently** through `futures::join_all`, multiplexing on
34//! the shared CDP connection rather than stalling per call.
35
36use crate::page::Page;
37use chromiumoxide_cdp::cdp::js_protocol::runtime::{ReleaseObjectParams, RemoteObjectId};
38use std::sync::OnceLock;
39use tokio::sync::mpsc;
40
41/// Max items drained per worker round. Cap keeps a burst from holding a
42/// giant `Vec` while we issue concurrent CDP commands.
43const MAX_BATCH: usize = 64;
44
45static RELEASE_TX: OnceLock<mpsc::UnboundedSender<(Page, RemoteObjectId)>> = OnceLock::new();
46
47/// Spawn the worker and return its sender. Only ever invoked once, from
48/// inside the `OnceLock::get_or_init` closure on the very first init.
49fn spawn_worker() -> mpsc::UnboundedSender<(Page, RemoteObjectId)> {
50 let (tx, mut rx) = mpsc::unbounded_channel::<(Page, RemoteObjectId)>();
51
52 tokio::spawn(async move {
53 let mut batch: Vec<(Page, RemoteObjectId)> = Vec::with_capacity(MAX_BATCH);
54 loop {
55 // Block until at least one release shows up.
56 let first = match rx.recv().await {
57 Some(v) => v,
58 None => break, // channel closed — no more producers
59 };
60 batch.push(first);
61
62 // Drain up to MAX_BATCH-1 more without waiting.
63 while batch.len() < MAX_BATCH {
64 match rx.try_recv() {
65 Ok(v) => batch.push(v),
66 Err(_) => break,
67 }
68 }
69
70 // Fire all release commands concurrently. Each `page.execute`
71 // multiplexes onto the shared CDP connection; `join_all` lets
72 // Chrome process them in parallel rather than strict serial.
73 let futs = batch.drain(..).map(|(page, id)| async move {
74 let _ = page.execute(ReleaseObjectParams::new(id)).await;
75 });
76 futures_util::future::join_all(futs).await;
77 }
78 });
79
80 tx
81}
82
83/// Ensure the background release worker is running.
84///
85/// Hot path is a single atomic `OnceLock::get()` load; only the very first
86/// call ever touches `get_or_init` (and thus the `Once` sync primitive).
87/// Must be invoked from a tokio runtime context on the first call.
88#[inline]
89pub fn init_worker() {
90 // Explicit fast path: one atomic load, zero sync primitives after the
91 // first init.
92 if RELEASE_TX.get().is_some() {
93 return;
94 }
95 let _ = RELEASE_TX.get_or_init(spawn_worker);
96}
97
98/// Returns `true` if the worker has been initialised in this process.
99#[inline]
100pub fn worker_inited() -> bool {
101 RELEASE_TX.get().is_some()
102}
103
104/// Enqueue a remote-object release for background processing.
105///
106/// Lock-free (one atomic load + one wait-free MPSC push), safe to invoke
107/// from a `Drop` implementation on any thread. If the worker has not yet
108/// been initialised the release is silently dropped; V8 reclaims the
109/// object on the next execution-context teardown.
110#[inline]
111pub fn try_release(page: Page, object_id: RemoteObjectId) {
112 if let Some(tx) = RELEASE_TX.get() {
113 let _ = tx.send((page, object_id));
114 }
115}