//! Background cleanup worker for Drop-time async work.
//!
//! Certain RAII guards in this crate (`content_stream::Sink`,
//! `cache::stream::StreamGuard`, `cache::stream::ChunkSink`) need to do
//! async work at Drop time, such as removing a temp file or closing a
//! CDP IO-stream handle. Calling `tokio::spawn` directly from `Drop` is
//! dangerous:
//!
//! * It panics if the current thread is outside a tokio runtime
//!   (panic unwind, sync destructor, post-shutdown).
//! * A panicking `Drop` during an existing unwind aborts the process.
//! * Guarding against that means every Drop call has to re-check
//!   `Handle::try_current()`.
//!
//! This module follows the same shape as [`crate::runtime_release`]:
//! a single long-lived background task owns an unbounded mpsc
//! receiver, and every `Drop` site becomes a non-blocking
//! `UnboundedSender::send`. The Drop thread does not need a runtime,
//! cannot panic on the hot path, and does no blocking work: the cost
//! is one atomic load plus one lock-free mpsc push.
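//!
//! A minimal, std-only sketch of that shape, for illustration only.
//! It swaps `std::sync::mpsc` in for tokio's unbounded channel so it
//! runs without a runtime; `DEMO_TX`, `demo_submit`, `demo_init`,
//! `demo_round_trip` and `TempFileGuard` are hypothetical names, not
//! items of this crate. A `static` holding a `std` `Sender` assumes
//! Rust 1.72 or newer, where `Sender` is `Sync`.
//!
//! ```rust
//! use std::path::PathBuf;
//! use std::sync::mpsc::{self, Receiver, Sender};
//! use std::sync::OnceLock;
//!
//! // Hypothetical stand-in for this module's `CLEANUP_TX`.
//! static DEMO_TX: OnceLock<Sender<PathBuf>> = OnceLock::new();
//!
//! // Stand-in for `submit`: a no-op before init, never a panic.
//! fn demo_submit(path: PathBuf) {
//!     if let Some(tx) = DEMO_TX.get() {
//!         // A closed channel yields `Err`, which is discarded.
//!         let _ = tx.send(path);
//!     }
//! }
//!
//! // Hypothetical guard whose `Drop` only enqueues work.
//! struct TempFileGuard(PathBuf);
//!
//! impl Drop for TempFileGuard {
//!     fn drop(&mut self) {
//!         demo_submit(std::mem::take(&mut self.0));
//!     }
//! }
//!
//! // Stand-in for `init_worker`: returns the receiver on the first
//! // call only, `None` if the sender was already installed.
//! fn demo_init() -> Option<Receiver<PathBuf>> {
//!     let (tx, rx) = mpsc::channel();
//!     DEMO_TX.set(tx).ok().map(|()| rx)
//! }
//!
//! // Drop a guard on a plain thread and return what was queued.
//! fn demo_round_trip() -> Option<PathBuf> {
//!     let rx = demo_init()?;
//!     std::thread::spawn(|| drop(TempFileGuard(PathBuf::from("demo.tmp"))))
//!         .join()
//!         .ok()?;
//!     rx.try_recv().ok()
//! }
//! ```
//!
//! In this sketch the guard's `Drop` runs on a thread with no runtime
//! and only pushes the path onto the channel; whoever owns the
//! receiver decides when and where the real cleanup happens.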
//!
//! # Lock-freedom
//!
//! | operation         | primitive                                                           |
//! | ----------------- | ------------------------------------------------------------------- |
//! | [`submit`]        | `OnceLock::get()` + `UnboundedSender::send` (lock-free)             |
//! | [`init_worker`]   | `OnceLock::get()` fast-path; `get_or_init` only once                |
//! | dispatcher loop   | `rx.recv_many()` (batch drain) + one `tokio::spawn` per batch       |
//! | cleanup execution | `FuturesUnordered` on the batch worker polls all tasks concurrently |
//!
//! # Batched receive and batched spawn
//!
//! The dispatcher drains up to [`DISPATCH_BATCH`] tasks per wake via
//! `rx.recv_many(..)` instead of one `rx.recv()` per task. Under a
//! burst (many pages dropping streams simultaneously) this collapses
//! up to `DISPATCH_BATCH` future-park/wake cycles into one.
//!
//! Each drained batch is handed to **one** `tokio::spawn`'d batch
//! worker that drives a `FuturesUnordered` of all the batch's
//! cleanups concurrently. For a full batch this is a **64× reduction
//! in spawn count** versus spawning each task individually, while
//! preserving concurrency:
//!
//! * Inside the batch, `FuturesUnordered` polls every cleanup future;
//!   when one future is pending on I/O, the worker moves on to the
//!   next one that is ready.
//! * The waiting itself happens outside the batch worker:
//!   `tokio::fs::remove_file` runs on tokio's blocking thread pool,
//!   and CDP `send_command` traffic goes over the WebSocket, which
//!   the runtime's I/O driver (epoll/kqueue) services. The
//!   parallelism is in the I/O layer, not the task layer, so a
//!   single batch worker is not a bottleneck for I/O-bound cleanup.
//! * Multiple batch workers still run concurrently on different
//!   worker threads: the dispatcher spawns a fresh batch worker
//!   whenever new tasks accumulate, and the runtime distributes them.
//!
//! If one cleanup in a batch panics, its batch worker dies and the
//! unfinished cleanups in that batch are dropped, but the dispatcher
//! and every other batch worker are unaffected (`tokio::spawn`
//! contains panics). The dispatcher itself is strictly a
//! `rx.recv_many` + `tokio::spawn` loop and never `.await`s on
//! cleanup work, so a hung cleanup cannot block it.
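//!
//! The drain-then-hand-off loop can be sketched without tokio. The
//! following is a std-only analogue, not this module's code: a
//! blocking `recv` plus `try_recv` stands in for `recv_many`, and one
//! thread per batch stands in for the spawned batch worker (which in
//! the real module polls its cleanups concurrently rather than in
//! sequence). `DEMO_BATCH`, `recv_batch`, `dispatch_demo` and
//! `demo_batch_sizes` are hypothetical names.
//!
//! ```rust
//! use std::sync::mpsc::{self, Receiver};
//! use std::thread;
//!
//! // Hypothetical stand-in for `DISPATCH_BATCH`.
//! const DEMO_BATCH: usize = 4;
//!
//! // Block for one item, then take whatever else is already queued,
//! // up to `limit`. Returns 0 once every sender is gone and the
//! // queue is empty.
//! fn recv_batch<T>(rx: &Receiver<T>, buf: &mut Vec<T>, limit: usize) -> usize {
//!     let start = buf.len();
//!     match rx.recv() {
//!         Ok(first) => buf.push(first),
//!         Err(_) => return 0,
//!     }
//!     while buf.len() - start < limit {
//!         match rx.try_recv() {
//!             Ok(item) => buf.push(item),
//!             Err(_) => break, // nothing else queued right now
//!         }
//!     }
//!     buf.len() - start
//! }
//!
//! // Drain `rx` in batches, one worker thread per batch, and report
//! // the size of each batch that was handed off.
//! fn dispatch_demo(rx: Receiver<u32>) -> Vec<usize> {
//!     let mut sizes = Vec::new();
//!     let mut workers = Vec::new();
//!     let mut batch = Vec::with_capacity(DEMO_BATCH);
//!     loop {
//!         let n = recv_batch(&rx, &mut batch, DEMO_BATCH);
//!         if n == 0 {
//!             break;
//!         }
//!         sizes.push(n);
//!         // Hand the filled buffer to the worker, keep a fresh one.
//!         let tasks = std::mem::replace(&mut batch, Vec::with_capacity(n));
//!         workers.push(thread::spawn(move || tasks.into_iter().for_each(drop)));
//!     }
//!     for worker in workers {
//!         let _ = worker.join();
//!     }
//!     sizes
//! }
//!
//! // Queue ten items up front, close the channel, then dispatch.
//! fn demo_batch_sizes() -> Vec<usize> {
//!     let (tx, rx) = mpsc::channel();
//!     for i in 0..10u32 {
//!         tx.send(i).expect("receiver is alive");
//!     }
//!     drop(tx);
//!     dispatch_demo(rx)
//! }
//! ```
//!
//! With ten items already queued and a batch limit of four, the
//! sketch hands off three batches instead of spawning ten workers.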
//!
//! # True parallelism
//!
//! Cleanups are not multiplexed through one long-lived worker that
//! awaits them in turn. Within a batch, `FuturesUnordered` overlaps
//! the waits, so N independent I/O-bound cleanups complete in
//! `~max(T_i)` wall-clock rather than `~Σ T_i`. Across batches, each
//! batch worker is its own `tokio::spawn`, which a multi-threaded
//! runtime distributes across worker threads. Critically, one slow
//! cleanup (e.g. a CDP close that hangs against a dying WebSocket)
//! cannot block the dispatcher from picking up subsequent tasks:
//! the dispatcher's loop body is
//! `let n = rx.recv_many(..).await; tokio::spawn(batch_worker);`
//! and returns to `recv_many` immediately.
//!
//! # Deadlock-freedom
//!
//! * Submitters never `.await` and never touch a lock another task holds.
//! * The dispatcher's only wait is `rx.recv_many(..).await`, which
//!   parks the dispatcher task and wakes it on any push.
//! * Cleanup futures hold no cross-task locks; each one awaits
//!   only its own I/O (`tokio::fs::remove_file`) or its own CDP
//!   command future (`page.send_command(..)`).
//! * A panic in any cleanup is contained by `tokio::spawn` to its
//!   batch worker and does not affect the dispatcher or cleanups
//!   in other batches.
//! * If the dispatcher itself exits (e.g. runtime shutdown),
//!   `submit` stays safe: `UnboundedSender::send` returns `Err`
//!   once the receiver is gone, and `submit` discards that error.
//!   Pending and later tasks are dropped without running, which is
//!   strictly safer than aborting.
//!
//! # Initialisation
//!
//! [`init_worker`] must be called once from inside a tokio runtime
//! context, typically on the hot path of any entry point that might
//! later produce cleanup tasks. Subsequent calls are a single atomic
//! load and a no-op. If a Drop fires before the worker has been
//! initialised, [`submit`] silently drops the task: no panic, no
//! deadlock, just skipped cleanup.

use crate::page::Page;
use chromiumoxide_cdp::cdp::browser_protocol::io::{CloseParams, StreamHandle};
use futures_util::stream::{FuturesUnordered, StreamExt};
use std::path::PathBuf;
use std::sync::OnceLock;
use tokio::sync::mpsc;

/// An async cleanup action to run on the background worker.
///
/// New variants can be added without touching the Drop call sites that
/// submit the existing variants: the `match` in `run_task` is
/// exhaustive but small, and [`submit`] accepts any `CleanupTask`.
#[derive(Debug)]
pub enum CleanupTask {
    /// Remove a file from disk (best-effort; errors are logged at
    /// `debug` level and swallowed).
    RemoveFile(PathBuf),
    /// Close a CDP IO-stream handle on a specific page. Used by
    /// `StreamGuard::Drop` when the caller dropped the guard without
    /// explicitly calling `finish`.
    CloseCdpStream { page: Page, handle: StreamHandle },
    /// **Test-only**: sleep for a given duration. Used by the
    /// parallel-execution test to demonstrate that cleanups run
    /// concurrently instead of one after another.
    #[cfg(test)]
    TestSleep(std::time::Duration),
}

static CLEANUP_TX: OnceLock<mpsc::UnboundedSender<CleanupTask>> = OnceLock::new();

/// Maximum number of tasks the dispatcher drains per `recv_many` wake.
///
/// Sized to cover realistic Drop bursts (all pages in a browser context
/// closing at once) while keeping the transient buffer small enough to
/// fit comfortably in cache. Each drained batch goes to one spawned
/// batch worker that polls all of its cleanups concurrently, and later
/// batches get their own workers, so this does not bound concurrent
/// cleanup. It only bounds how many tasks we hand off per dispatcher
/// wake.
const DISPATCH_BATCH: usize = 64;

/// Run a single cleanup task to completion. Kept as a standalone
/// `async fn` so the batch worker can map it over a drained batch
/// and tests can invoke it directly.
async fn run_task(task: CleanupTask) {
    match task {
        CleanupTask::RemoveFile(path) => {
            if let Err(err) = tokio::fs::remove_file(&path).await {
                tracing::debug!(
                    target: "chromiumoxide::bg_cleanup",
                    "remove_file({}) failed: {err}",
                    path.display(),
                );
            }
        }
        CleanupTask::CloseCdpStream { page, handle } => {
            // Best-effort: the stream may already be closed or the
            // connection gone, so the result is intentionally ignored.
            let _ = page.send_command(CloseParams { handle }).await;
        }
        #[cfg(test)]
        CleanupTask::TestSleep(d) => {
            tokio::time::sleep(d).await;
        }
    }
}

/// Spawn the dispatcher task and return its sender. In production
/// code this is only ever invoked once, from inside the
/// `OnceLock::get_or_init` closure on the very first `init_worker`
/// call.
///
/// On each wake the dispatcher drains up to `DISPATCH_BATCH` tasks
/// via `rx.recv_many(..)` and hands the whole batch to **one**
/// `tokio::spawn`'d batch worker that drives a `FuturesUnordered`
/// over every task. This gives up to a 64× reduction in spawn count
/// under burst while preserving cleanup concurrency: the
/// `FuturesUnordered` polls all futures interleaved, and the actual
/// waiting happens elsewhere (`tokio::fs::remove_file` on the
/// blocking thread pool, CDP WebSocket writes via the I/O driver),
/// independently of how many tokio tasks we spawn.
///
/// The dispatcher itself never `.await`s on cleanup work, so a hung
/// cleanup can't block it from picking up subsequent batches. A panic
/// in any single cleanup is contained to its batch worker via
/// `tokio::spawn`'s panic boundary.
fn spawn_worker() -> mpsc::UnboundedSender<CleanupTask> {
    let (tx, mut rx) = mpsc::unbounded_channel::<CleanupTask>();

    tokio::spawn(async move {
        // Receive buffer. Each iteration moves the filled `Vec` into
        // the spawned batch worker and swaps in a fresh one, so the
        // dispatcher's per-iteration allocation is a single
        // `Vec<CleanupTask>`.
        let mut batch: Vec<CleanupTask> = Vec::with_capacity(DISPATCH_BATCH);
        loop {
            let n = rx.recv_many(&mut batch, DISPATCH_BATCH).await;
            if n == 0 {
                break; // channel closed: no more producers
            }
            // Move the batch into a spawned worker. `mem::replace`
            // swaps in a fresh pre-allocated `Vec` so the
            // dispatcher's next `recv_many` doesn't need to grow the
            // buffer incrementally.
            //
            // Size the replacement to `n` (the count we just
            // drained). `recv_many` never returns more than its
            // limit, so the `min` is only a defensive cap. This
            // adapts the allocation to the workload: a steady trickle
            // of one task per wake stops over-allocating 63 unused
            // slots, while a sustained burst keeps the full capacity
            // around for the next batch. A spike that exceeds the
            // last batch's size causes `recv_many` to grow the Vec on
            // the next call (amortised O(1), no correctness impact).
            let next_cap = n.min(DISPATCH_BATCH);
            let tasks: Vec<CleanupTask> =
                std::mem::replace(&mut batch, Vec::with_capacity(next_cap));
            tokio::spawn(async move {
                let mut in_flight: FuturesUnordered<_> = tasks.into_iter().map(run_task).collect();
                // Drain to completion; each resolved future is dropped
                // immediately, freeing its resources (page handle refs,
                // path buffers) before the batch as a whole finishes.
                while in_flight.next().await.is_some() {}
            });
        }
    });

    tx
}

/// Ensure the background cleanup worker is running.
///
/// Must be called from a tokio runtime context on the **first** call.
/// Subsequent calls are a single atomic load and return immediately,
/// so it is safe to invoke on every hot-path entry that might later
/// produce cleanup work (e.g. opening a CDP stream, starting a
/// response staging pipeline).
#[inline]
pub fn init_worker() {
    // Explicit fast path: one atomic load, zero sync primitives after
    // the first init.
    if CLEANUP_TX.get().is_some() {
        return;
    }
    let _ = CLEANUP_TX.get_or_init(spawn_worker);
}

/// Enqueue a cleanup task for background processing.
///
/// Lock-free (one atomic load + one lock-free mpsc push), safe to
/// invoke from any `Drop` implementation on any thread, including
/// threads with no tokio runtime context and threads currently
/// unwinding due to a panic.
///
/// If the worker has not been initialised (see [`init_worker`]), or
/// its dispatcher has already exited, the task is silently dropped.
/// The alternative, panicking, would propagate through `Drop` and
/// potentially abort the process during unwind, which is never what
/// a best-effort cleanup should do.
#[inline]
pub fn submit(task: CleanupTask) {
    if let Some(tx) = CLEANUP_TX.get() {
        // `send` only fails when the receiver is gone; ignore it.
        let _ = tx.send(task);
    }
}

/// Returns `true` if the worker has been initialised in this process.
/// Intended for tests and diagnostics; not part of the hot path.
#[inline]
pub fn worker_inited() -> bool {
    CLEANUP_TX.get().is_some()
}

#[cfg(test)]
mod tests {
    //! These tests pin the two properties that make `bg_cleanup`
    //! Drop-safe:
    //!
    //! 1. `submit` is safe to call from a thread that has no tokio
    //!    runtime context. A crashed or unwinding Drop on any thread
    //!    must not panic; if it did, Drop during an existing unwind
    //!    would abort the process. (Canary: submit from a std::thread.)
    //!
    //! 2. Before `init_worker` runs for the first time, `submit`
    //!    silently drops the task instead of panicking. This is the
    //!    "Drop before any browser was launched" case: rare but real,
    //!    since callers may hold guards briefly in test code without
    //!    ever needing the cleanup worker.

    use super::*;

    #[test]
    fn submit_without_init_is_silent_noop() {
        // Without a prior `init_worker`, the OnceLock is empty. We can
        // only verify this with a fresh OnceLock, because the real
        // static is process-global and may have been initialised by
        // an earlier test. Instead, exercise the `get()`-is-None
        // branch directly via a fresh local lock.
        let local: OnceLock<mpsc::UnboundedSender<CleanupTask>> = OnceLock::new();
        assert!(local.get().is_none(), "fresh OnceLock must be empty");
        // The production `submit` does exactly:
        //   if let Some(tx) = CLEANUP_TX.get() { let _ = tx.send(..); }
        // which is a provable no-op when get() returns None.
    }

    #[test]
    fn submit_from_plain_thread_does_not_panic() {
        // Runs from a std::thread with no tokio runtime context,
        // the critical property for Drop safety.
        std::thread::spawn(|| {
            assert!(
                tokio::runtime::Handle::try_current().is_err(),
                "test thread must not be inside a tokio runtime"
            );
            submit(CleanupTask::RemoveFile(PathBuf::from(
                "/tmp/chromey-bg-cleanup-canary-does-not-exist",
            )));
        })
        .join()
        .expect("submit must not panic on a plain std::thread");
    }

    #[test]
    fn init_worker_inside_runtime_then_submit_succeeds() {
        // Init inside a runtime, then submit from inside it. This
        // checks that the OnceLock is populated and that `submit`
        // does not panic; it does not observe the cleanup itself.
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        rt.block_on(async {
            init_worker();
            assert!(worker_inited(), "init_worker must populate the OnceLock");
            // Submitting from inside the runtime obviously works; the
            // safety-relevant case is submitting from outside, which
            // the previous test covers.
            submit(CleanupTask::RemoveFile(PathBuf::from(
                "/tmp/chromey-bg-cleanup-canary-does-not-exist-2",
            )));
        });
    }

    /// Check that slow cleanups overlap instead of serialising:
    /// submit N tasks that each sleep `DELAY_MS`, then a `RemoveFile`
    /// for a sentinel file, and assert the sentinel disappears long
    /// before `N × DELAY_MS` has elapsed. This would fail if the
    /// worker awaited tasks one at a time, because the removal would
    /// queue behind every sleep.
    ///
    /// The test drives a private dispatcher from `spawn_worker()`
    /// rather than the process-global one, which may have been
    /// spawned on another test's runtime that has since shut down.
    #[test]
    fn cleanup_tasks_run_truly_in_parallel() {
        use std::time::{Duration, Instant};

        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(4)
            .enable_all()
            .build()
            .expect("runtime");

        rt.block_on(async {
            let tx = spawn_worker();

            const N: usize = 20;
            const DELAY_MS: u64 = 200;

            // Sentinel file whose removal marks "the task submitted
            // after all N sleeps has run".
            let sentinel = std::env::temp_dir().join(format!(
                "chromey-bg-cleanup-sentinel-{}",
                std::process::id()
            ));
            std::fs::write(&sentinel, b"sentinel").expect("create sentinel file");

            let start = Instant::now();
            for _ in 0..N {
                tx.send(CleanupTask::TestSleep(Duration::from_millis(DELAY_MS)))
                    .expect("dispatcher must be alive");
            }
            tx.send(CleanupTask::RemoveFile(sentinel.clone()))
                .expect("dispatcher must be alive");

            // Serialised cleanup would need ≥ N * DELAY_MS = 4s before
            // the sentinel is removed. Pin a loose 1s bound (4x below
            // that) to avoid flakes while still catching
            // serialization regressions.
            let serial_lower_bound = Duration::from_millis((N as u64) * DELAY_MS / 4);
            while sentinel.exists() && start.elapsed() < serial_lower_bound {
                tokio::time::sleep(Duration::from_millis(5)).await;
            }
            let elapsed = start.elapsed();
            let removed = !sentinel.exists();
            // Clean up in case the assertion below is about to fail.
            let _ = std::fs::remove_file(&sentinel);

            assert!(
                removed,
                "sentinel still present after {elapsed:?} behind {N} cleanup \
                 tasks of {DELAY_MS}ms each (bound {serial_lower_bound:?}); \
                 serialization regression?"
            );
        });
    }
}