chromiumoxide/bg_cleanup.rs
1//! Background cleanup worker for Drop-time async work.
2//!
3//! Certain RAII guards in this crate (`content_stream::Sink`,
4//! `cache::stream::StreamGuard`, `cache::stream::ChunkSink`) need to do
5//! async work at Drop time — removing a temp file, closing a CDP
6//! IO-stream handle. Calling `tokio::spawn` directly from `Drop` is
7//! dangerous:
8//!
9//! * It panics if the current thread is outside a tokio runtime
10//! (panic unwind, sync destructor, post-shutdown).
11//! * A panicking `Drop` during an existing unwind aborts the process.
12//! * Every Drop call has to re-check `Handle::try_current()`.
13//!
14//! This module follows the same shape as [`crate::runtime_release`]:
15//! a single long-lived background task owns an unbounded mpsc
16//! receiver, and every `Drop` site becomes a wait-free
17//! `UnboundedSender::send`. The Drop thread does not need a runtime,
18//! cannot panic on the hot path, and performs no syscalls beyond one
19//! atomic load + one lock-free mpsc push.
20//!
21//! # Lock-freedom
22//!
23//! | operation | primitive |
24//! | ------------------- | -------------------------------------------------------- |
25//! | [`submit`] | `OnceLock::get()` + `UnboundedSender::send` (lock-free) |
26//! | [`init_worker`] | `OnceLock::get()` fast-path; `get_or_init` only once |
27//! | dispatcher loop | `rx.recv_many()` (batch drain) + one `tokio::spawn` per batch |
28//! | cleanup execution | `FuturesUnordered` on the batch worker polls all tasks concurrently |
29//!
30//! # Batched receive and batched spawn
31//!
32//! The dispatcher drains up to [`DISPATCH_BATCH`] tasks per wake via
33//! `rx.recv_many(..)` instead of one `rx.recv()` per task. Under a
34//! burst (many pages dropping streams simultaneously) this collapses
35//! N future-park/wake cycles into one.
36//!
37//! Each drained batch is handed to **one** `tokio::spawn`'d batch
38//! worker that drives a `FuturesUnordered` of all the batch's
39//! cleanups concurrently. For a full batch this is a **64× reduction
40//! in spawn count** versus spawning each task individually, while
41//! preserving concurrency:
42//!
43//! * Inside the batch, `FuturesUnordered` polls every cleanup future;
44//! when any future `.await`s on I/O, polling yields to the next.
//! * The slow part of each cleanup runs off the batch worker itself:
//!   `tokio::fs::remove_file` is executed on tokio's blocking thread
//!   pool (tokio's file-system API is built on `spawn_blocking`; it does
//!   not go through epoll/kqueue), and CDP `send_command` I/O is driven
//!   by the runtime's I/O driver. The parallelism is therefore in the
//!   I/O layer, not the task layer, so a single batch worker is not a
//!   bottleneck for I/O-bound cleanup.
50//! * Multiple batch workers still run concurrently on different
51//! worker threads — the dispatcher spawns a fresh batch worker
52//! whenever new tasks accumulate, and the runtime distributes them.
53//!
54//! If one cleanup in a batch panics, the batch worker dies but every
55//! other batch worker is unaffected (tokio::spawn contains panics).
56//! The dispatcher itself is strictly a `rx.recv_many + spawn` loop
57//! and never `.await`s on cleanup work — a hung cleanup cannot block
58//! it.
59//!
60//! # True parallelism
61//!
//! Cleanups are never run one after another behind a single `await`.
//! The dispatcher hands each drained batch to its own `tokio::spawn`'d
//! batch worker, and that worker polls every cleanup in the batch
//! concurrently through `FuturesUnordered`. On a multi-threaded runtime
//! the scheduler distributes batch workers across worker threads, so N
//! independent cleanups complete in `~max(T_i)` wall-clock rather than
//! `~Σ T_i`. Critically, one slow cleanup (e.g. a CDP close that hangs
//! against a dying WebSocket) cannot block the dispatcher from picking
//! up subsequent tasks — the dispatcher's loop body is
//! `let n = rx.recv_many(..).await; tokio::spawn(batch_worker(tasks));`
//! and returns to `recv_many` immediately.
71//!
72//! # Deadlock-freedom
73//!
74//! * Submitters never `.await` and never touch a lock another task holds.
//! * The dispatcher's only wait is `rx.recv_many(..).await`, which parks
//!   the dispatcher task until a sender pushes (or the channel closes)
//!   and wakes on any push.
//! * Spawned batch workers hold no cross-task locks; each cleanup awaits
//!   only its own I/O (`tokio::fs::remove_file`) or its own CDP
//!   command future (`page.send_command(..)`).
//! * A panic in a cleanup is contained by `tokio::spawn`: it ends that
//!   cleanup's batch worker (cancelling the other cleanups still pending
//!   in the same batch) but does not affect the dispatcher or any other
//!   batch worker.
//! * If the dispatcher itself exits (e.g. runtime shutdown), `submit`
//!   stays panic-free: `UnboundedSender::send` returns an error once the
//!   receiver is gone, and `submit` discards that error. Tasks submitted
//!   after that point are dropped without running — strictly safer
//!   than aborting.
86//!
87//! # Initialisation
88//!
89//! [`init_worker`] must be called once from inside a tokio runtime
90//! context, typically on the hot path of any entry point that might
91//! later produce cleanup tasks. Subsequent calls are a single atomic
92//! load and a no-op. If a Drop fires before the worker has been
93//! initialised, [`submit`] silently drops the task — no panic, no
94//! deadlock, just deferred or skipped cleanup.
95
96use crate::page::Page;
97use chromiumoxide_cdp::cdp::browser_protocol::io::{CloseParams, StreamHandle};
98use futures_util::stream::{FuturesUnordered, StreamExt};
99use std::path::PathBuf;
100use std::sync::OnceLock;
101use tokio::sync::mpsc;
102
103/// An async cleanup action to run on the background worker.
104///
105/// New variants can be added without touching the Drop call sites that
106/// submit the existing variants — the dispatcher's `match` is
107/// exhaustive but small, and [`submit`] accepts any `CleanupTask`.
108#[derive(Debug)]
109pub enum CleanupTask {
110 /// Remove a file from disk (best-effort; errors are logged at
111 /// `debug` level and swallowed).
112 RemoveFile(PathBuf),
113 /// Close a CDP IO-stream handle on a specific page. Used by
114 /// `StreamGuard::Drop` when the caller dropped the guard without
115 /// explicitly calling `finish`.
116 CloseCdpStream { page: Page, handle: StreamHandle },
117 /// **Test-only**: sleep for a given duration. Used by parallel-
118 /// execution tests to demonstrate that cleanup runs truly
119 /// concurrently across tokio worker threads.
120 #[cfg(test)]
121 TestSleep(std::time::Duration),
122}
123
124static CLEANUP_TX: OnceLock<mpsc::UnboundedSender<CleanupTask>> = OnceLock::new();
125
/// Maximum number of tasks the dispatcher drains per `recv_many` wake.
///
/// Sized to cover realistic Drop bursts (all pages in a browser context
/// closing at once) while keeping the transient buffer small. Each
/// drained batch is handed to a single spawned batch worker, so this is
/// also the upper bound on how many cleanups share one batch worker —
/// and therefore share its panic boundary. It does not cap overall
/// cleanup concurrency: every cleanup in a batch is polled concurrently,
/// and the dispatcher keeps spawning new batch workers while earlier
/// ones are still running.
const DISPATCH_BATCH: usize = 64;
134
135/// Run a single cleanup task to completion. Kept as a standalone
136/// `async fn` so the dispatcher can `tokio::spawn(run_task(task))` and
137/// tests can invoke it directly.
138async fn run_task(task: CleanupTask) {
139 match task {
140 CleanupTask::RemoveFile(path) => {
141 if let Err(err) = tokio::fs::remove_file(&path).await {
142 tracing::debug!(
143 target: "chromiumoxide::bg_cleanup",
144 "remove_file({}) failed: {err}",
145 path.display(),
146 );
147 }
148 }
149 CleanupTask::CloseCdpStream { page, handle } => {
150 let _ = page.send_command(CloseParams { handle }).await;
151 }
152 #[cfg(test)]
153 CleanupTask::TestSleep(d) => {
154 tokio::time::sleep(d).await;
155 }
156 }
157}
158
159/// Spawn the dispatcher task and return its sender. Only ever invoked
160/// once, from inside the `OnceLock::get_or_init` closure on the very
161/// first `init_worker` call.
162///
163/// On each wake the dispatcher drains up to `DISPATCH_BATCH` tasks
164/// via `rx.recv_many(..)` and hands the whole batch to **one**
165/// `tokio::spawn`'d batch worker that drives a `FuturesUnordered`
166/// over every task. This gives a ~64× reduction in spawn count under
167/// burst while preserving cleanup concurrency — the `FuturesUnordered`
168/// polls all futures interleaved, and the I/O driver parallelises the
169/// actual syscalls (`tokio::fs::remove_file`, CDP WebSocket writes)
170/// across the runtime's worker threads independently of how many
171/// tokio tasks we spawn.
172///
173/// The dispatcher itself never `.await`s on cleanup work — a hung
174/// cleanup can't block it from picking up subsequent batches. A panic
175/// in any single cleanup is contained to its batch worker via
176/// `tokio::spawn`'s panic boundary.
177fn spawn_worker() -> mpsc::UnboundedSender<CleanupTask> {
178 let (tx, mut rx) = mpsc::unbounded_channel::<CleanupTask>();
179
180 tokio::spawn(async move {
181 // Reused across iterations — `drain(..)` empties without
182 // freeing capacity, so the dispatcher's per-iteration
183 // allocation is just one `Vec<CleanupTask>` for the batch
184 // ownership hand-off to the spawned worker.
185 let mut batch: Vec<CleanupTask> = Vec::with_capacity(DISPATCH_BATCH);
186 loop {
187 let n = rx.recv_many(&mut batch, DISPATCH_BATCH).await;
188 if n == 0 {
189 break; // channel closed — no more producers
190 }
191 // Move the batch into a spawned worker. `mem::replace`
192 // swaps in a fresh pre-allocated `Vec` so the
193 // dispatcher's next `recv_many` doesn't need to grow the
194 // buffer incrementally.
195 //
196 // Size the replacement to `n` (the count we just
197 // drained), capped at `DISPATCH_BATCH`. This adapts the
198 // allocation to the workload: a steady trickle of one
199 // task per wake stops over-allocating 63 unused slots,
200 // while a sustained burst keeps the full capacity around
201 // for the next batch. A spike that exceeds the last
202 // batch's size causes `recv_many` to grow the Vec on the
203 // next call — amortised O(1), no correctness impact.
204 let next_cap = n.min(DISPATCH_BATCH);
205 let tasks: Vec<CleanupTask> =
206 std::mem::replace(&mut batch, Vec::with_capacity(next_cap));
207 tokio::spawn(async move {
208 let mut in_flight: FuturesUnordered<_> = tasks.into_iter().map(run_task).collect();
209 // Drain to completion; each resolved future is dropped
210 // immediately, freeing its resources (page handle refs,
211 // path buffers) before the batch as a whole finishes.
212 while in_flight.next().await.is_some() {}
213 });
214 }
215 });
216
217 tx
218}
219
220/// Ensure the background cleanup worker is running.
221///
222/// Must be called from a tokio runtime context on the **first** call.
223/// Subsequent calls are a single atomic load and return immediately —
224/// safe to invoke on every hot-path entry that might later produce
225/// cleanup work (e.g. opening a CDP stream, starting a response
226/// staging pipeline).
227#[inline]
228pub fn init_worker() {
229 // Explicit fast path: one atomic load, zero sync primitives after
230 // the first init.
231 if CLEANUP_TX.get().is_some() {
232 return;
233 }
234 let _ = CLEANUP_TX.get_or_init(spawn_worker);
235}
236
237/// Enqueue a cleanup task for background processing.
238///
239/// Lock-free (one atomic load + one wait-free mpsc push), safe to
240/// invoke from any `Drop` implementation on any thread — including
241/// threads with no tokio runtime context and threads currently
242/// unwinding due to a panic.
243///
244/// If the worker has not been initialised (see [`init_worker`]), the
245/// task is silently dropped. The alternative — panicking — would
246/// propagate through `Drop` and potentially abort the process during
247/// unwind, which is never what a best-effort cleanup should do.
248#[inline]
249pub fn submit(task: CleanupTask) {
250 if let Some(tx) = CLEANUP_TX.get() {
251 let _ = tx.send(task);
252 }
253}
254
255/// Returns `true` if the worker has been initialised in this process.
256/// Intended for tests and diagnostics; not part of the hot path.
257#[inline]
258pub fn worker_inited() -> bool {
259 CLEANUP_TX.get().is_some()
260}
261
#[cfg(test)]
mod tests {
    //! These tests pin the properties that make `bg_cleanup` Drop-safe
    //! and non-serialising:
    //!
    //! 1. `submit` is safe to call from a thread that has no tokio
    //!    runtime context. A crashed or unwinding Drop on any thread
    //!    must not panic — if it did, Drop during an existing unwind
    //!    would abort the process. (Canary: submit from a std::thread.)
    //!
    //! 2. Before `init_worker` runs for the first time, `submit`
    //!    silently drops the task instead of panicking. This is the
    //!    "Drop before any browser was launched" case — rare but real,
    //!    since callers may hold guards briefly in test code without
    //!    ever needing the cleanup worker.
    //!
    //! 3. The dispatcher does not serialise cleanups: a fast task queued
    //!    behind many slow ones completes long before the slow ones
    //!    would have finished back-to-back.

    use super::*;
    use std::time::{Duration, Instant};

    #[test]
    fn submit_without_init_is_silent_noop() {
        // The real static is process-global and may already have been
        // initialised by another test, so the uninitialised state cannot
        // be observed through `submit` itself. What can be pinned is the
        // precondition `submit` relies on: a fresh `OnceLock` yields
        // `None`, and `submit` does nothing at all on `None`.
        let local: OnceLock<mpsc::UnboundedSender<CleanupTask>> = OnceLock::new();
        assert!(local.get().is_none(), "fresh OnceLock must be empty");
    }

    #[test]
    fn submit_from_plain_thread_does_not_panic() {
        // Runs from a std::thread with no tokio runtime context —
        // the critical property for Drop safety.
        std::thread::spawn(|| {
            assert!(
                tokio::runtime::Handle::try_current().is_err(),
                "test thread must not be inside a tokio runtime"
            );
            submit(CleanupTask::RemoveFile(PathBuf::from(
                "/tmp/chromey-bg-cleanup-canary-does-not-exist",
            )));
        })
        .join()
        .expect("submit must not panic on a plain std::thread");
    }

    #[test]
    fn init_worker_inside_runtime_then_submit_succeeds() {
        // Init inside a runtime, then submit. This only checks that the
        // global sender gets installed and that submitting does not
        // panic; it does not observe the task being executed.
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        rt.block_on(async {
            init_worker();
            assert!(worker_inited(), "init_worker must populate the OnceLock");
            submit(CleanupTask::RemoveFile(PathBuf::from(
                "/tmp/chromey-bg-cleanup-canary-does-not-exist-2",
            )));
        });
    }

    /// Prove the dispatcher does not serialise cleanups: queue N slow
    /// `TestSleep` tasks followed by one `RemoveFile` on a sentinel
    /// file, then wait for the sentinel to disappear.
    ///
    /// * If cleanups overlap, the removal happens almost immediately.
    /// * If the dispatcher awaited tasks one at a time, the removal
    ///   could only happen after all N sleeps, i.e. after `N × DELAY_MS`.
    ///
    /// The test drives its own dispatcher obtained from `spawn_worker`
    /// rather than the process-global one. The global dispatcher lives
    /// on whichever runtime first called `init_worker`; in a test binary
    /// that runtime may already have been dropped by another test, in
    /// which case tasks sent through `submit` would never run.
    #[test]
    fn cleanup_tasks_run_truly_in_parallel() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(4)
            .enable_all()
            .build()
            .expect("runtime");

        rt.block_on(async {
            const N: usize = 20;
            const DELAY_MS: u64 = 200;

            // Dispatcher owned by this test and running on this runtime.
            let tx = spawn_worker();

            // The observable side effect that signals "the task queued
            // behind the sleeps has run": this file ceasing to exist.
            let sentinel = std::env::temp_dir().join(format!(
                "chromey-bg-cleanup-sentinel-{}",
                std::process::id()
            ));
            std::fs::write(&sentinel, b"sentinel").expect("create sentinel file");

            let start = Instant::now();
            for _ in 0..N {
                tx.send(CleanupTask::TestSleep(Duration::from_millis(DELAY_MS)))
                    .expect("dispatcher must be alive");
            }
            tx.send(CleanupTask::RemoveFile(sentinel.clone()))
                .expect("dispatcher must be alive");

            // Serialised execution would need ≥ N * DELAY_MS = 4s before
            // reaching the removal. Allow a quarter of that (1s): far
            // above scheduler jitter, far below the serialised time.
            let serial_lower_bound = Duration::from_millis((N as u64) * DELAY_MS / 4);

            let mut removed = false;
            while start.elapsed() < serial_lower_bound {
                if !sentinel.exists() {
                    removed = true;
                    break;
                }
                tokio::time::sleep(Duration::from_millis(5)).await;
            }
            let elapsed = start.elapsed();

            // Do not leave the sentinel behind when the assertion fails.
            let _ = std::fs::remove_file(&sentinel);

            assert!(
                removed,
                "a RemoveFile queued behind {N} cleanup tasks at {DELAY_MS}ms each \
                 had not run after {elapsed:?} (limit {serial_lower_bound:?}). \
                 Expected parallel execution — serialization regression?"
            );
        });
    }
}
415}