// chromey 2.46.58 — concurrent Chrome DevTools Protocol automation library for Rust
//! Background cleanup worker for Drop-time async work.
//!
//! Certain RAII guards in this crate (`content_stream::Sink`,
//! `cache::stream::StreamGuard`, `cache::stream::ChunkSink`) need to do
//! async work at Drop time — removing a temp file, closing a CDP
//! IO-stream handle. Calling `tokio::spawn` directly from `Drop` is
//! dangerous:
//!
//! * It panics if the current thread is outside a tokio runtime
//!   (panic unwind, sync destructor, post-shutdown).
//! * A panicking `Drop` during an existing unwind aborts the process.
//! * Every Drop call has to re-check `Handle::try_current()`.
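//!
//! For instance, a destructor written this way (hypothetical `Guard`
//! type) aborts the process whenever the value is dropped during a
//! panic unwind on a thread with no entered runtime:
//!
//! ```ignore
//! impl Drop for Guard {
//!     fn drop(&mut self) {
//!         // Panics when no tokio runtime is entered on this thread;
//!         // a panic inside Drop during an existing unwind aborts
//!         // the process.
//!         tokio::spawn(async { /* cleanup */ });
//!     }
//! }
//! ```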
//!
//! This module follows the same shape as [`crate::runtime_release`]:
//! a single long-lived background task owns an unbounded mpsc
//! receiver, and every `Drop` site becomes a wait-free
//! `UnboundedSender::send`. The Drop thread does not need a runtime,
//! cannot panic on the hot path, and does nothing heavier than one
//! atomic load plus one lock-free mpsc push.
//!
//! # Lock-freedom
//!
//! | operation           | primitive                                                |
//! | ------------------- | -------------------------------------------------------- |
//! | [`submit`]          | `OnceLock::get()` + `UnboundedSender::send` (lock-free)  |
//! | [`init_worker`]     | `OnceLock::get()` fast-path; `get_or_init` only once     |
//! | dispatcher loop     | `rx.recv_many()` (batch drain) + one `tokio::spawn` per batch |
//! | cleanup execution   | `FuturesUnordered` on the batch worker polls all tasks concurrently |
//!
//! # Batched receive and batched spawn
//!
//! The dispatcher drains up to [`DISPATCH_BATCH`] tasks per wake via
//! `rx.recv_many(..)` instead of one `rx.recv()` per task. Under a
//! burst (many pages dropping streams simultaneously) this collapses
//! N future-park/wake cycles into one.
//!
//! Each drained batch is handed to **one** `tokio::spawn`'d batch
//! worker that drives a `FuturesUnordered` of all the batch's
//! cleanups concurrently. For a full batch this is a **64× reduction
//! in spawn count** versus spawning each task individually, while
//! preserving concurrency:
//!
//! * Inside the batch, `FuturesUnordered` polls every cleanup future;
//!   when any future `.await`s on I/O, polling yields to the next.
//! * The actual I/O proceeds in parallel below the task layer:
//!   `tokio::fs::remove_file` runs its syscall on tokio's blocking
//!   thread pool, and CDP `send_command` traffic is driven by the
//!   browser connection's own handler task, so a single batch worker
//!   is not a bottleneck for I/O-bound cleanup.
//! * Multiple batch workers still run concurrently on different
//!   worker threads — the dispatcher spawns a fresh batch worker
//!   whenever new tasks accumulate, and the runtime distributes them.
//!
//! If one cleanup in a batch panics, the batch worker dies but every
//! other batch worker is unaffected (tokio::spawn contains panics).
//! The dispatcher itself is strictly a `rx.recv_many + spawn` loop
//! and never `.await`s on cleanup work — a hung cleanup cannot block
//! it.
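//!
//! The dispatcher/batch-worker shape, reduced to a sketch (the real
//! loop lives in `spawn_worker` below; `do_cleanup` stands in for
//! [`CleanupTask`] execution):
//!
//! ```ignore
//! tokio::spawn(async move {
//!     let mut batch = Vec::with_capacity(DISPATCH_BATCH);
//!     while rx.recv_many(&mut batch, DISPATCH_BATCH).await > 0 {
//!         // One spawn per batch, not per task...
//!         let tasks = std::mem::take(&mut batch);
//!         tokio::spawn(async move {
//!             let mut in_flight: FuturesUnordered<_> =
//!                 tasks.into_iter().map(do_cleanup).collect();
//!             // ...yet every cleanup future in the batch is polled
//!             // concurrently until all complete.
//!             while in_flight.next().await.is_some() {}
//!         });
//!     }
//! });
//! ```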
//!
//! # Concurrency across batches
//!
//! Each batch worker is its own `tokio::spawn`'d task, so on a
//! multi-threaded runtime the scheduler distributes batch workers
//! across worker threads while `FuturesUnordered` interleaves every
//! cleanup future within a batch. N independent cleanups therefore
//! complete in `~max(T_i)` wall-clock rather than `~Σ T_i`.
//! Critically, one slow cleanup (e.g. a CDP close that hangs against
//! a dying WebSocket) cannot block the dispatcher from picking up
//! subsequent tasks — its loop body is `rx.recv_many(..).await`
//! followed by a single `tokio::spawn`, and it returns to `recv_many`
//! immediately.
//!
//! # Deadlock-freedom
//!
//! * Submitters never `.await` and never touch a lock another task holds.
//! * The dispatcher's only wait is `rx.recv_many(..).await`, which
//!   parks the task until at least one item has been pushed.
//! * Spawned cleanup tasks hold no cross-task locks; each one awaits
//!   only its own I/O (`tokio::fs::remove_file`) or its own CDP
//!   command future (`page.send_command(..)`).
//! * A panic in any cleanup task is contained by `tokio::spawn` and
//!   does not affect the dispatcher or other in-flight cleanups.
//! * If the dispatcher itself exits (e.g. runtime shutdown), [`submit`]
//!   still returns normally: the unbounded mpsc send simply fails and
//!   the task is dropped. Pending cleanup is silently skipped, which
//!   is strictly safer than aborting.
//!
//! # Initialisation
//!
//! [`init_worker`] must be called once from inside a tokio runtime
//! context, typically on the hot path of any entry point that might
//! later produce cleanup tasks. Subsequent calls are a single atomic
//! load and a no-op. If a Drop fires before the worker has been
//! initialised, [`submit`] silently drops the task — no panic, no
//! deadlock, just skipped best-effort cleanup.
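//!
//! # Example
//!
//! A minimal Drop-site integration, reduced to a sketch. `TempArtifact`
//! and the staging path are hypothetical; the real submitters are the
//! stream guards listed at the top of this module, and the module path
//! is assumed to be `bg_cleanup`:
//!
//! ```ignore
//! use std::path::PathBuf;
//! use crate::bg_cleanup::{self, CleanupTask};
//!
//! struct TempArtifact(PathBuf);
//!
//! impl Drop for TempArtifact {
//!     fn drop(&mut self) {
//!         // Never panics: safe even mid-unwind or off-runtime.
//!         bg_cleanup::submit(CleanupTask::RemoveFile(self.0.clone()));
//!     }
//! }
//!
//! async fn stage_response() {
//!     // On an entry point, inside the runtime:
//!     bg_cleanup::init_worker();
//!     let _artifact = TempArtifact(PathBuf::from("/tmp/staging.bin"));
//!     // ... dropping `_artifact` later enqueues the removal.
//! }
//! ```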

use crate::page::Page;
use chromiumoxide_cdp::cdp::browser_protocol::io::{CloseParams, StreamHandle};
use futures_util::stream::{FuturesUnordered, StreamExt};
use std::path::PathBuf;
use std::sync::OnceLock;
use tokio::sync::mpsc;

/// An async cleanup action to run on the background worker.
///
/// New variants can be added without touching the Drop call sites that
/// submit the existing variants — [`run_task`]'s `match` is exhaustive
/// but small, and [`submit`] accepts any `CleanupTask`.
#[derive(Debug)]
pub enum CleanupTask {
    /// Remove a file from disk (best-effort; errors are logged at
    /// `debug` level and swallowed).
    RemoveFile(PathBuf),
    /// Close a CDP IO-stream handle on a specific page. Used by
    /// `StreamGuard::Drop` when the caller dropped the guard without
    /// explicitly calling `finish`.
    CloseCdpStream { page: Page, handle: StreamHandle },
    /// **Test-only**: sleep for a given duration. Used by parallel-
    /// execution tests to demonstrate that cleanup runs truly
    /// concurrently across tokio worker threads.
    #[cfg(test)]
    TestSleep(std::time::Duration),
}

static CLEANUP_TX: OnceLock<mpsc::UnboundedSender<CleanupTask>> = OnceLock::new();

/// Maximum number of tasks the dispatcher drains per `recv_many` wake.
///
/// Sized to cover realistic Drop bursts (all pages in a browser context
/// closing at once) while keeping the transient buffer small enough to
/// fit comfortably in cache. Each drained batch goes to one batch
/// worker whose `FuturesUnordered` polls every task concurrently, so
/// this does not bound concurrent cleanup — only how many tasks we hand
/// off per dispatcher wake.
const DISPATCH_BATCH: usize = 64;

/// Run a single cleanup task to completion. Kept as a standalone
/// `async fn` so the batch worker can collect `run_task` futures into
/// its `FuturesUnordered` and tests can invoke it directly.
async fn run_task(task: CleanupTask) {
    match task {
        CleanupTask::RemoveFile(path) => {
            if let Err(err) = tokio::fs::remove_file(&path).await {
                tracing::debug!(
                    target: "chromiumoxide::bg_cleanup",
                    "remove_file({}) failed: {err}",
                    path.display(),
                );
            }
        }
        CleanupTask::CloseCdpStream { page, handle } => {
            let _ = page.send_command(CloseParams { handle }).await;
        }
        #[cfg(test)]
        CleanupTask::TestSleep(d) => {
            tokio::time::sleep(d).await;
        }
    }
}

/// Spawn the dispatcher task and return its sender. Only ever invoked
/// once, from inside the `OnceLock::get_or_init` closure on the very
/// first `init_worker` call.
///
/// On each wake the dispatcher drains up to `DISPATCH_BATCH` tasks
/// via `rx.recv_many(..)` and hands the whole batch to **one**
/// `tokio::spawn`'d batch worker that drives a `FuturesUnordered`
/// over every task. This gives a ~64× reduction in spawn count under
/// burst while preserving cleanup concurrency — the `FuturesUnordered`
/// polls all futures interleaved, and the I/O driver parallelises the
/// actual syscalls (`tokio::fs::remove_file`, CDP WebSocket writes)
/// across the runtime's worker threads independently of how many
/// tokio tasks we spawn.
///
/// The dispatcher itself never `.await`s on cleanup work — a hung
/// cleanup can't block it from picking up subsequent batches. A panic
/// in any single cleanup is contained to its batch worker via
/// `tokio::spawn`'s panic boundary.
fn spawn_worker() -> mpsc::UnboundedSender<CleanupTask> {
    let (tx, mut rx) = mpsc::unbounded_channel::<CleanupTask>();

    tokio::spawn(async move {
        // Reused across iterations — `recv_many` appends into this
        // buffer, and the `mem::replace` below hands the filled Vec to
        // the spawned batch worker, so the dispatcher's per-iteration
        // allocation is just one fresh `Vec<CleanupTask>`.
        let mut batch: Vec<CleanupTask> = Vec::with_capacity(DISPATCH_BATCH);
        loop {
            let n = rx.recv_many(&mut batch, DISPATCH_BATCH).await;
            if n == 0 {
                break; // channel closed — no more producers
            }
            // Move the batch into a spawned worker. `mem::replace`
            // swaps in a fresh pre-allocated `Vec` so the
            // dispatcher's next `recv_many` doesn't need to grow the
            // buffer incrementally.
            //
            // Size the replacement to `n` (the count we just
            // drained), capped at `DISPATCH_BATCH`. This adapts the
            // allocation to the workload: a steady trickle of one
            // task per wake stops over-allocating 63 unused slots,
            // while a sustained burst keeps the full capacity around
            // for the next batch. A spike that exceeds the last
            // batch's size causes `recv_many` to grow the Vec on the
            // next call — amortised O(1), no correctness impact.
            let next_cap = n.min(DISPATCH_BATCH);
            let tasks: Vec<CleanupTask> =
                std::mem::replace(&mut batch, Vec::with_capacity(next_cap));
            tokio::spawn(async move {
                let mut in_flight: FuturesUnordered<_> =
                    tasks.into_iter().map(run_task).collect();
                // Drain to completion; each resolved future is dropped
                // immediately, freeing its resources (page handle refs,
                // path buffers) before the batch as a whole finishes.
                while in_flight.next().await.is_some() {}
            });
        }
    });

    tx
}

/// Ensure the background cleanup worker is running.
///
/// Must be called from a tokio runtime context on the **first** call.
/// Subsequent calls are a single atomic load and return immediately —
/// safe to invoke on every hot-path entry that might later produce
/// cleanup work (e.g. opening a CDP stream, starting a response
/// staging pipeline).
#[inline]
pub fn init_worker() {
    // Explicit fast path: one atomic load, zero sync primitives after
    // the first init.
    if CLEANUP_TX.get().is_some() {
        return;
    }
    let _ = CLEANUP_TX.get_or_init(spawn_worker);
}

/// Enqueue a cleanup task for background processing.
///
/// Lock-free (one atomic load + one wait-free mpsc push), safe to
/// invoke from any `Drop` implementation on any thread — including
/// threads with no tokio runtime context and threads currently
/// unwinding due to a panic.
///
/// If the worker has not been initialised (see [`init_worker`]), the
/// task is silently dropped. The alternative — panicking — would
/// propagate through `Drop` and potentially abort the process during
/// unwind, which is never what a best-effort cleanup should do.
#[inline]
pub fn submit(task: CleanupTask) {
    if let Some(tx) = CLEANUP_TX.get() {
        let _ = tx.send(task);
    }
}

/// Returns `true` if the worker has been initialised in this process.
/// Intended for tests and diagnostics; not part of the hot path.
#[inline]
pub fn worker_inited() -> bool {
    CLEANUP_TX.get().is_some()
}

#[cfg(test)]
mod tests {
    //! These tests pin the two properties that make `bg_cleanup`
    //! Drop-safe:
    //!
    //! 1. `submit` is safe to call from a thread that has no tokio
    //!    runtime context. A crashed or unwinding Drop on any thread
    //!    must not panic — if it did, Drop during an existing unwind
    //!    would abort the process. (Canary: submit from a std::thread.)
    //!
    //! 2. Before `init_worker` runs for the first time, `submit`
    //!    silently drops the task instead of panicking. This is the
    //!    "Drop before any browser was launched" case — rare but real,
    //!    since callers may hold guards briefly in test code without
    //!    ever needing the cleanup worker.

    use super::*;

    #[test]
    fn submit_without_init_is_silent_noop() {
        // Without a prior `init_worker`, the OnceLock is empty. We can
        // only verify this with a fresh OnceLock — the real static is
        // process-global and may have been initialised by an earlier
        // test. Instead, exercise the `get()`-is-None branch directly
        // via a fresh local lock.
        let local: OnceLock<mpsc::UnboundedSender<CleanupTask>> = OnceLock::new();
        assert!(local.get().is_none(), "fresh OnceLock must be empty");
        // The production `submit` does exactly:
        //   if let Some(tx) = CLEANUP_TX.get() { let _ = tx.send(..); }
        // which is a provable no-op when get() returns None.
    }

    #[test]
    fn submit_from_plain_thread_does_not_panic() {
        // Runs from a std::thread with no tokio runtime context —
        // the critical property for Drop safety.
        std::thread::spawn(|| {
            assert!(
                tokio::runtime::Handle::try_current().is_err(),
                "test thread must not be inside a tokio runtime"
            );
            submit(CleanupTask::RemoveFile(PathBuf::from(
                "/tmp/chromey-bg-cleanup-canary-does-not-exist",
            )));
        })
        .join()
        .expect("submit must not panic on a plain std::thread");
    }

    /// Shared multi-thread runtime for the tests below. The dispatcher
    /// is spawned onto whichever runtime is current when `init_worker`
    /// first runs, and `CLEANUP_TX` is process-global — a per-test
    /// runtime that gets dropped would kill the dispatcher for every
    /// later test. A `'static` runtime keeps it alive for the whole
    /// test process.
    fn test_runtime() -> &'static tokio::runtime::Runtime {
        static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
        RT.get_or_init(|| {
            tokio::runtime::Builder::new_multi_thread()
                .worker_threads(4)
                .enable_all()
                .build()
                .expect("test runtime")
        })
    }

    #[test]
    fn init_worker_inside_runtime_then_submit_succeeds() {
        // Full round-trip: init inside a runtime, then submit from any
        // context must land on the worker.
        test_runtime().block_on(async {
            init_worker();
            assert!(
                worker_inited(),
                "init_worker must populate the OnceLock"
            );
            // Submitting from inside the runtime obviously works; the
            // safety-relevant case is submitting from outside, which
            // the previous test covers.
            submit(CleanupTask::RemoveFile(PathBuf::from(
                "/tmp/chromey-bg-cleanup-canary-does-not-exist-2",
            )));
        });
    }

    /// Prove cleanup is not serialized on the dispatcher: submit N slow
    /// `TestSleep` tasks followed by one observable `RemoveFile`. If
    /// the dispatcher awaited each cleanup in its own loop, the
    /// sentinel file would only disappear after ~`N × DELAY_MS`; with
    /// concurrent execution (batch workers driving `FuturesUnordered`)
    /// it disappears well within a single `DELAY_MS` quantum.
    #[test]
    fn cleanup_tasks_are_not_serialized() {
        test_runtime().block_on(async {
            init_worker();

            const N: usize = 20;
            const DELAY_MS: u64 = 200;

            // Observable sentinel: a real file the worker must remove.
            let sentinel = std::env::temp_dir().join(format!(
                "chromey-bg-cleanup-parallel-{}",
                std::process::id()
            ));
            std::fs::write(&sentinel, b"sentinel").expect("create sentinel");

            let start = std::time::Instant::now();
            for _ in 0..N {
                submit(CleanupTask::TestSleep(
                    std::time::Duration::from_millis(DELAY_MS),
                ));
            }
            submit(CleanupTask::RemoveFile(sentinel.clone()));

            // Serialized execution would need >= N * DELAY_MS = 4s to
            // reach the RemoveFile. Allow a quarter of that: generous
            // against scheduler jitter, still far below the serial
            // cost.
            let deadline =
                std::time::Duration::from_millis(N as u64 * DELAY_MS / 4);
            while sentinel.exists() {
                assert!(
                    start.elapsed() < deadline,
                    "sentinel still present after {:?}: cleanup appears \
                     serialized behind {N} tasks x {DELAY_MS}ms",
                    start.elapsed()
                );
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
        });
    }
}
}