//! Background batcher for `Runtime.releaseObject` calls.
//!
//! # Lock-freedom
//!
//! Every hot-path operation reduces to a small number of atomic ops,
//! with no mutex and no `Once`-style synchronisation after the first init:
//!
//! | operation | primitive |
//! | --------------- | -------------------------------------------------------- |
//! | `try_release` | `OnceLock::get()` (atomic load) + `UnboundedSender::send` (tokio's lock-free MPSC list) |
//! | `init_worker` | `OnceLock::get()` fast-path check; `get_or_init` only on the *first* call ever |
//! | `Drop` guards | `Arc::clone` (atomic fetch-add) + `try_release` |
//!
//! The **first** `init_worker` call runs `OnceLock::get_or_init`, which
//! briefly uses the `Once` sync primitive to serialise racing initialisers.
//! This is a one-time per-process cost; subsequent calls skip it entirely
//! via an explicit `get()` fast-path check.
//!
//! # Deadlock-freedom
//!
//! - `Drop` handlers never await, never acquire a lock another async task
//! holds, and never allocate beyond an atomic pointer push into the MPSC.
//! - The worker holds no locks across its awaits; `rx.recv().await` parks
//! on a lock-free `Notify`, and `page.execute` goes through the existing
//! CDP command pipeline (no new locks introduced).
//! - If the worker task panics or the runtime shuts down, `try_release`
//! still returns immediately: `send` on a channel whose receiver is gone
//! fails without blocking or panicking, so releases are silently dropped
//! and V8 reclaims the objects on context teardown.
//!
//! # Batching + per-batch spawn
//!
//! The dispatcher drains up to [`MAX_BATCH`] pending releases per wake
//! via `rx.recv_many(..)`, then hands the batch to **one** spawned
//! worker task that drives a `FuturesUnordered` of every release
//! concurrently. This gives:
//!
//! * Up to 64× fewer `tokio::spawn` calls than spawn-per-release (one
//! spawn per batch of at most [`MAX_BATCH`] releases), while
//! preserving concurrent execution inside each batch.
//! * **Slow-release isolation**: one `page.execute` that hits
//! `request_timeout` (e.g. a crashed target) can no longer block
//! the dispatcher from picking up subsequent batches — the
//! dispatcher's loop body is strictly `drain → spawn batch-worker`
//! with no `.await` on cleanup work. Multiple batch workers can be
//! in flight simultaneously across runtime worker threads.
//! * **No `Arc<PageInner>` retention backlog**: queued `(Page, id)`
//! pairs hold Pages alive until drained, so spending 30s inside a
//! single `page.execute` used to keep up to 63 other Pages pinned
//! unnecessarily. With per-batch spawn, each batch's Pages are
//! freed as its `FuturesUnordered` completes, independently of
//! other batches.
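// Illustrative sketch, not the original implementation: the drain-then-spawn
// loop described above, modelled with the standard library only. It assumes
// `std::sync::mpsc` in place of tokio's channel, `recv` + `try_recv` in place
// of `rx.recv_many(..)`, `std::thread::spawn` in place of `tokio::spawn`, and
// plain `u64` ids in place of `(Page, id)` pairs. `next_batch` and `dispatch`
// are hypothetical names.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Same value as the module's `MAX_BATCH`.
const MAX_BATCH: usize = 64;

/// Blocks until at least one id arrives, then drains up to `MAX_BATCH`
/// ids without blocking. Returns `None` once all senders are gone and
/// the queue is empty. Rough std-only stand-in for `rx.recv_many(..)`.
fn next_batch(rx: &Receiver<u64>) -> Option<Vec<u64>> {
    let first = rx.recv().ok()?;
    let mut batch = Vec::with_capacity(MAX_BATCH);
    batch.push(first);
    while batch.len() < MAX_BATCH {
        match rx.try_recv() {
            Ok(id) => batch.push(id),
            Err(_) => break, // queue empty or channel closed
        }
    }
    Some(batch)
}

/// Dispatcher loop: drain, hand the batch to its own worker, repeat.
/// The loop never waits on a worker, so a slow batch cannot stall it.
/// Returns the size of every batch handed off (for illustration only).
fn dispatch(rx: Receiver<u64>) -> Vec<usize> {
    let mut sizes = Vec::new();
    let mut workers = Vec::new();
    while let Some(batch) = next_batch(&rx) {
        sizes.push(batch.len());
        // Placeholder cleanup; the real worker issues CDP release calls.
        workers.push(thread::spawn(move || batch.into_iter().for_each(drop)));
    }
    // Workers are joined only after the channel has closed, so the demo
    // exits cleanly; the loop above never blocks on them.
    for worker in workers {
        let _ = worker.join();
    }
    sizes
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for id in 0..150u64 {
        tx.send(id).unwrap();
    }
    drop(tx); // closing the channel ends the dispatcher loop
    println!("{:?}", dispatch(rx));
}
```

// With 150 ids queued before the dispatcher starts, the sketch hands off
// batches of 64, 64 and 22: the batch size is capped per wake, while the
// number of batches in flight is not.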
use crate::Page;
use ;
use ;
use std::sync::OnceLock;
use tokio::sync::mpsc;
/// Max items drained per dispatcher wake. Under burst the dispatcher
/// spawns one batch worker per drain; concurrent batches are not
/// limited by this constant.
const MAX_BATCH: usize = 64;
static RELEASE_TX: OnceLock<mpsc::UnboundedSender<(Page, RemoteObjectId)>> = OnceLock::new();
/// Spawn the dispatcher and return its sender. Only ever invoked once,
/// from inside the `OnceLock::get_or_init` closure on the very first
/// init.
///
/// The dispatcher is strictly a `recv_many + spawn` loop: it waits
/// for at least one release, then hands the whole batch to a spawned
/// worker that drives a `FuturesUnordered` of `page.execute` calls.
/// The dispatcher never `.await`s on CDP work, so a slow or hung
/// release cannot block it from picking up the next batch.

/// Ensure the background release worker is running.
///
/// Hot path is a single atomic `OnceLock::get()` load; only the very first
/// call ever touches `get_or_init` (and thus the `Once` sync primitive).
/// Must be invoked from a tokio runtime context on the first call.

/// Returns `true` if the worker has been initialised in this process.

/// Enqueue a remote-object release for background processing.
///
/// Lock-free (one atomic load + one wait-free MPSC push), safe to invoke
/// from a `Drop` implementation on any thread. If the worker has not yet
/// been initialised the release is silently dropped; V8 reclaims the
/// object on the next execution-context teardown.
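
// Illustrative sketch, not the original implementation: the `OnceLock`
// fast path and the fire-and-forget enqueue described in the doc comments
// above, modelled with the standard library only. It assumes
// `std::sync::mpsc` and a plain thread in place of tokio's unbounded channel
// and dispatcher task, and `u64` ids in place of `(Page, id)` pairs.
// `ObjectId`, `worker_inited` and `RemoteHandle` are hypothetical names, and
// `try_release` returns a `bool` here only so the behaviour is observable.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::OnceLock;
use std::thread;

/// Hypothetical stand-in for the queued `(Page, id)` pair.
type ObjectId = u64;

// A `static` needs `Sync`; `Sender` is `Sync` since Rust 1.72.
static RELEASE_TX: OnceLock<Sender<ObjectId>> = OnceLock::new();

/// Ensure the background worker is running.
fn init_worker() {
    // Fast path: after the first call this is a single atomic load.
    if RELEASE_TX.get().is_some() {
        return;
    }
    // Slow path: racing first callers are serialised; the closure runs once.
    RELEASE_TX.get_or_init(|| {
        let (tx, rx) = mpsc::channel::<ObjectId>();
        // Placeholder worker: receives ids and discards them.
        thread::spawn(move || for _id in rx {});
        tx
    });
}

/// Returns `true` if the worker has been initialised in this process.
fn worker_inited() -> bool {
    RELEASE_TX.get().is_some()
}

/// Enqueue a release. Never blocks; reports whether the id was queued.
fn try_release(id: ObjectId) -> bool {
    match RELEASE_TX.get() {
        // A failed send (receiver gone) is reported, never propagated.
        Some(tx) => tx.send(id).is_ok(),
        // Worker not initialised: the release is dropped.
        None => false,
    }
}

/// Hypothetical guard that enqueues its release when dropped.
struct RemoteHandle {
    id: ObjectId,
}

impl Drop for RemoteHandle {
    fn drop(&mut self) {
        // No waiting and no locks taken here; the result is ignored.
        let _ = try_release(self.id);
    }
}

fn main() {
    println!("queued before init: {}", try_release(1));
    init_worker();
    init_worker(); // second call takes the fast path
    println!("queued after init: {}", try_release(2));
    drop(RemoteHandle { id: 3 }); // enqueues via `Drop`
    println!("worker initialised: {}", worker_inited());
}
```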