//! Background worker for the Fetch-interceptor response-stage cache
//! dump path.
//!
//! `spawn_fetch_interceptor_cache_listener` produces a stream of
//! `Fetch.requestPaused` events. At the *response* stage, each
//! eligible event drives a multi-step async pipeline:
//!
//! 1. Stream the response body via CDP `IO.read`.
//! 2. Build an `HttpResponse` from headers + streamed body.
//! 3. Hand off to `put_hybrid_cache`, which enqueues the remote
//! dump to the shared `spider_remote_cache` worker.
//!
//! Previously, every response-stage event was dispatched via its
//! own `tokio::spawn`. Each spawn allocates a task header (~2-4 KB)
//! and costs ~1 µs of setup — manageable per-event, but it adds
//! up under burst (one page with 100 subresources = 100 spawns,
//! ~200-400 KB transient task memory).
//!
//! This module consolidates dispatch behind a singleton
//! `OnceLock<UnboundedSender<FetchResponseJob>>` + batched
//! dispatcher, identical in shape to [`crate::bg_cleanup`] and
//! [`crate::runtime_release`]:
//!
//! * Submit is a single atomic `OnceLock::get()` + lock-free
//! `UnboundedSender::send` — wait-free, safe from any async
//! context.
//! * The dispatcher drains up to [`DISPATCH_BATCH`] jobs per wake
//! via `rx.recv_many(..)` and hands each batch to **one**
//! `tokio::spawn`'d batch worker.
//! * The batch worker drives a `FuturesUnordered` of every job's
//! pipeline, polling all of them concurrently. Independent jobs
//! (different `request_id`s, different URLs) have no cross-job
//! ordering constraint, so unordered polling is correct and
//! maximally parallel.
//! * One spawn per batch vs per job → ~64× reduction in spawn
//! count while preserving full concurrency of the I/O pipeline.
//!
//! Safety properties:
//!
//! * **No deadlock.** Dispatcher never `.await`s on job work;
//! loop body is `drain → spawn batch-worker`. A slow job cannot
//! block subsequent batches from being picked up.
//! * **No panic propagation.** Any panic inside a job is contained
//! by the batch worker's `tokio::spawn` boundary and does not
//! affect the dispatcher or other batches.
//! * **No cross-job ordering needed.** CDP dispatches events in
//! arrival order and each job's `request_id` is unique, so the
//! per-job pipelines are independent.
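//!
//! # Example
//!
//! A sketch of the intended call sites. The module path and these
//! function names are reconstructions from the doc comments below,
//! not a confirmed API:
//!
//! ```no_run
//! // On the first eligible response-stage event, make sure the
//! // background worker is running (idempotent after the first call):
//! crate::fetch_response::init_worker();
//! // Then enqueue each job; this is wait-free and never blocks the
//! // CDP event loop:
//! crate::fetch_response::submit(job);
//! ```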
// NOTE: import paths reconstructed where the source was garbled; the
// exact module paths in this crate may differ.
use crate::cdp::browser_protocol::fetch::EventRequestPaused;
use crate::Page;
use futures::stream::{FuturesUnordered, StreamExt};
use std::sync::{Arc, OnceLock};
use tokio::sync::mpsc;
/// Maximum number of jobs drained per dispatcher wake. Under burst
/// the dispatcher spawns one batch worker per drain; concurrent
/// batches are not limited by this constant.
const DISPATCH_BATCH: usize = 64;
// NOTE: the item bodies below were stripped from the source and are
// reconstructed from the surviving doc comments. Concrete names
// (`FetchResponseJob` fields, `CacheStrategy`, `run_response_pipeline`)
// are best-effort placeholders and may differ from the real crate.

/// A queued response-stage handling job.
///
/// Carries only cheap-clone / `Arc`-wrapped references:
/// `Page` is `Arc<PageInner>`, `ev` is already `Arc<_>` from the
/// event listener, `auth` is an owned `String`, `cache_strategy`
/// is `Copy`. No heavy clone is performed at submit time.
pub(crate) struct FetchResponseJob {
    pub page: Page,
    pub ev: Arc<EventRequestPaused>,
    pub auth: String,
    // Field type reconstructed from the doc comment above; the
    // concrete `Copy` strategy enum lives elsewhere in the crate.
    pub cache_strategy: CacheStrategy,
}

/// Singleton sender feeding the batched dispatcher; set exactly once
/// by [`init_worker`].
static FETCH_TX: OnceLock<mpsc::UnboundedSender<FetchResponseJob>> = OnceLock::new();

/// Spawn the dispatcher and return its sender. Only ever invoked
/// once, from inside the `OnceLock::get_or_init` closure on the
/// very first `init_worker` call.
fn spawn_dispatcher() -> mpsc::UnboundedSender<FetchResponseJob> {
    let (tx, mut rx) = mpsc::unbounded_channel::<FetchResponseJob>();
    tokio::spawn(async move {
        let mut batch = Vec::with_capacity(DISPATCH_BATCH);
        // Drain up to DISPATCH_BATCH jobs per wake, then hand the
        // whole batch to a single spawned worker. The dispatcher
        // never awaits job work, so a slow job cannot stall intake.
        while rx.recv_many(&mut batch, DISPATCH_BATCH).await > 0 {
            let jobs: Vec<FetchResponseJob> = batch.drain(..).collect();
            tokio::spawn(async move {
                // Poll every job's pipeline concurrently; jobs are
                // independent, so unordered completion is correct.
                let mut pipelines: FuturesUnordered<_> =
                    jobs.into_iter().map(run_response_pipeline).collect();
                while pipelines.next().await.is_some() {}
            });
        }
    });
    tx
}

/// Ensure the fetch-response background worker is running.
///
/// Must be called from a tokio runtime context on the first call.
/// Subsequent calls are a single atomic load and return
/// immediately.
pub(crate) fn init_worker() {
    FETCH_TX.get_or_init(spawn_dispatcher);
}

/// Enqueue a fetch-response job for background processing.
///
/// Lock-free (one atomic load + one wait-free mpsc push). If the
/// worker has not been initialised the job is silently dropped —
/// the caller's page will fall back to the non-streaming
/// Network-listener path.
pub(crate) fn submit(job: FetchResponseJob) {
    if let Some(tx) = FETCH_TX.get() {
        // `send` only fails if the dispatcher task has exited; per
        // the contract above, the job is simply dropped then.
        let _ = tx.send(job);
    }
}

/// Returns `true` if the worker has been initialised. Intended for
/// diagnostics; not part of the hot path.
pub(crate) fn is_initialised() -> bool {
    FETCH_TX.get().is_some()
}