Skip to main content

zendriver_interception/
actor.rs

1//! Background interception actor.
2//!
3//! The crate-private `run_actor` is the rule-driven loop spawned by
4//! [`InterceptBuilder::start`](crate::builder::InterceptBuilder::start). It
5//! owns a single tab's `Fetch.*` interception lifecycle:
6//!
7//! 1. Subscribes to `Fetch.requestPaused` on the supplied [`SessionHandle`]
8//!    **before** firing `Fetch.enable`. Mirrors the subscriber pattern used
9//!    by the zendriver core's frame-lifecycle and network-idle trackers —
10//!    events Chrome fires between the enable round-trip and our subscription
11//!    would otherwise be dropped, and the `MockConnection` test harness in
12//!    `zendriver-transport` (gated `feature = "testing"`) never replies to
13//!    fire-and-forget enables anyway.
14//! 2. Sends `Fetch.enable { patterns, handleAuthRequests }` with the
15//!    explicit pattern list supplied by the builder; `handleAuthRequests`
16//!    flips to `true` when the builder also called
17//!    [`InterceptBuilder::handle_auth`](crate::builder::InterceptBuilder::handle_auth)
18//!    so Chrome surfaces `Fetch.authRequired` events the actor answers with
19//!    `Fetch.continueWithAuth`.
20//! 3. Per `Fetch.requestPaused` event: walks `rules` in registration order,
21//!    first match wins, dispatches the matching action's CDP call. No
22//!    match → plain `Fetch.continueRequest` (let through).
23//! 4. On cancellation: fires `Fetch.disable` and exits. The handle returned
24//!    by the builder owns a [`CancellationToken`] that fires on `Drop`, so
25//!    interception always tears down deterministically when the handle
26//!    leaves scope.
27//!
28//! [`InterceptHandle`] is the user-facing RAII guard. Its [`stop`] method is
29//! the explicit-shutdown path — it cancels the token *and* awaits a oneshot
30//! the actor signals on exit, so the caller observes `Fetch.disable` has
31//! reached the wire before `stop()` returns.
32//!
33//! [`stop`]: InterceptHandle::stop
34
35use base64::Engine as _;
36use base64::engine::general_purpose::STANDARD as BASE64;
37use futures::StreamExt;
38use serde::Deserialize;
39use serde_json::{Map, Value, json};
40use std::collections::HashMap;
41use tokio::sync::oneshot;
42use tokio_util::sync::CancellationToken;
43use tracing::{trace, warn};
44use zendriver_transport::SessionHandle;
45
46use crate::builder::RequestPattern;
47use crate::error::InterceptionError;
48use crate::rule::Rule;
49use crate::types::{RequestInfo, RequestOverrides, ResourceType, ResponseInfo, ResponseOverrides};
50
51/// RAII guard returned by `InterceptBuilder::start` (Task 7).
52///
53/// The guard cancels the actor on [`Drop`] so interception always tears down
54/// when the handle leaves scope. Call [`stop`](Self::stop) instead when the
55/// caller needs to observe `Fetch.disable` reaching the wire before
56/// proceeding — `Drop` is fire-and-forget by construction.
57#[derive(Debug)]
58#[must_use = "interception stops when the handle is dropped — bind it to a variable to keep it alive"]
59pub struct InterceptHandle {
60    cancel: CancellationToken,
61    // `Option` so `stop(self)` can `.take()` the receiver without `Drop`
62    // racing on a half-moved field. `None` after `stop()` consumed it.
63    done: Option<oneshot::Receiver<()>>,
64}
65
66impl InterceptHandle {
67    /// Construct a handle from the cancel token + actor-exit receiver. The
68    /// constructor is `pub(crate)` so the only public path is via
69    /// [`InterceptBuilder::start`](crate::builder::InterceptBuilder::start).
70    pub(crate) fn new(cancel: CancellationToken, done: oneshot::Receiver<()>) -> Self {
71        Self {
72            cancel,
73            done: Some(done),
74        }
75    }
76
77    /// Test-support constructor: build a no-op handle backed by an unused
78    /// cancel token + a `oneshot::channel`'s receiver whose sender is
79    /// immediately dropped. Intended for downstream unit tests that need
80    /// to populate a registry of `InterceptHandle`s without going through
81    /// the actor pipeline. Dropping the handle still calls `.cancel()`
82    /// on the token (no observable side effect — nothing is listening).
83    ///
84    /// Gated behind the `test-support` feature so production builds don't
85    /// expose it.
86    #[cfg(any(test, feature = "test-support"))]
87    #[doc(hidden)]
88    pub fn for_tests() -> Self {
89        let (_done_tx, done_rx) = oneshot::channel();
90        Self {
91            cancel: CancellationToken::new(),
92            done: Some(done_rx),
93        }
94    }
95
96    /// Stop the actor and wait for it to acknowledge exit.
97    ///
98    /// Cancels the actor's token, then awaits the oneshot the actor sends
99    /// after `Fetch.disable` reaches the wire. Returns
100    /// [`InterceptionError::SubscriptionClosed`] if the actor was already
101    /// gone (channel closed without a signal — e.g. transport torn down
102    /// mid-flight); callers can usually treat that as success since the
103    /// effect (interception is off) is identical.
104    pub async fn stop(mut self) -> Result<(), InterceptionError> {
105        self.cancel.cancel();
106        match self.done.take() {
107            Some(rx) => rx.await.map_err(|_| InterceptionError::SubscriptionClosed),
108            None => Ok(()),
109        }
110    }
111}
112
113impl Drop for InterceptHandle {
114    fn drop(&mut self) {
115        // Fire-and-forget on drop: cancel the actor's token. The actor's
116        // own `Fetch.disable` call will race the transport teardown, but
117        // since `Fetch.disable` is harmless when the session is already
118        // closing we don't try to await anything here.
119        self.cancel.cancel();
120    }
121}
122
123/// Decoded `Fetch.requestPaused` event payload.
124///
125/// Projects only the fields the actor consumes. Extra fields Chrome sends
126/// (e.g. `frameId`, `networkId`) are deliberately ignored — the rule API
127/// surfaces what callers asked for via [`RequestInfo`] / [`ResponseInfo`].
128///
129/// `pub(crate)` so [`crate::builder::InterceptBuilder::subscribe`] can reuse
130/// the same projection on the stream path.
131#[derive(Debug, Deserialize)]
132pub(crate) struct RequestPausedEvent {
133    #[serde(rename = "requestId")]
134    pub(crate) request_id: String,
135    pub(crate) request: RequestPayload,
136    #[serde(rename = "resourceType", default)]
137    pub(crate) resource_type: Option<String>,
138    // Only populated at the `Response` stage.
139    #[serde(rename = "responseStatusCode", default)]
140    pub(crate) response_status_code: Option<u16>,
141    #[serde(rename = "responseStatusText", default)]
142    pub(crate) response_status_text: Option<String>,
143    #[serde(rename = "responseHeaders", default)]
144    pub(crate) response_headers: Option<Vec<HeaderPair>>,
145}
146
147#[derive(Debug, Deserialize)]
148pub(crate) struct RequestPayload {
149    pub(crate) url: String,
150    pub(crate) method: String,
151    #[serde(default)]
152    pub(crate) headers: HashMap<String, String>,
153    /// Chrome's text representation of the request body. For multipart /
154    /// binary uploads this can be lossy — Chrome rebuilds via UTF-8 best
155    /// effort. Prefer [`Self::post_data_entries`] when present.
156    #[serde(rename = "postData", default)]
157    pub(crate) post_data: Option<String>,
158    #[serde(rename = "hasPostData", default)]
159    _has_post_data: Option<bool>,
160    /// Per-chunk base64-encoded bytes. Chrome emits this for binary /
161    /// multipart bodies where the text representation would be lossy.
162    /// When present, it is the canonical source of truth for the body.
163    #[serde(rename = "postDataEntries", default)]
164    pub(crate) post_data_entries: Option<Vec<PostDataEntry>>,
165}
166
167#[derive(Debug, Deserialize)]
168pub(crate) struct PostDataEntry {
169    /// Base64-encoded body bytes. Per CDP `Network.PostDataEntry`.
170    #[serde(default)]
171    pub(crate) bytes: Option<String>,
172}
173
174#[derive(Debug, Deserialize)]
175pub(crate) struct HeaderPair {
176    pub(crate) name: String,
177    pub(crate) value: String,
178}
179
180/// Run the interception actor until `cancel` fires.
181///
182/// See the module-level docs for the lifecycle contract. The function exits
183/// after `Fetch.disable` is dispatched on cancellation, or immediately if
184/// the event stream closes (e.g. transport torn down).
185///
186/// `done` is the oneshot the actor signals on exit so the matching
187/// [`InterceptHandle::stop`] call can synchronize on actor teardown.
188pub(crate) async fn run_actor(
189    session: SessionHandle,
190    rules: Vec<Rule>,
191    patterns: Vec<RequestPattern>,
192    auth: Option<(String, String)>,
193    cancel: CancellationToken,
194    done: oneshot::Sender<()>,
195) {
196    // Step 1: subscribe BEFORE enable (P4 pattern). Events Chrome emits
197    // between our enable round-trip and the subscription registration would
198    // otherwise be lost. Also: the mock test harness never replies to the
199    // synthetic `Fetch.enable` call, so awaiting it first would deadlock the
200    // actor before any subscription existed.
201    let mut paused = session.subscribe::<Value>("Fetch.requestPaused");
202    // When `handleAuthRequests: true`, Chrome additionally emits
203    // `Fetch.authRequired` events for proxy / HTTP basic-auth challenges.
204    // Subscribe up-front for the same race-free reason as `requestPaused`.
205    let mut auth_required = session.subscribe::<Value>("Fetch.authRequired");
206
207    // Step 2: fire-and-forget `Fetch.enable`. Mirrors `InFlightTracker::run`
208    // / `frame::lifecycle::run`: a failed enable surfaces as a `warn!` but
209    // the actor keeps running (no events arrive — interception silently
210    // no-ops — which is the same observable behavior the user gets from
211    // any other torn-down session).
212    let enable_session = session.clone();
213    let enable_patterns: Vec<Value> = patterns.iter().map(serialize_pattern).collect();
214    let handle_auth_requests = auth.is_some();
215    tokio::spawn(async move {
216        if let Err(e) = enable_session
217            .call(
218                "Fetch.enable",
219                json!({
220                    "patterns": enable_patterns,
221                    "handleAuthRequests": handle_auth_requests,
222                }),
223            )
224            .await
225        {
226            warn!(error = %e, "interception: Fetch.enable failed; interception inactive");
227        }
228    });
229
230    // Step 3: event loop.
231    loop {
232        tokio::select! {
233            () = cancel.cancelled() => {
234                trace!("interception: cancellation received, disabling Fetch and exiting");
235                break;
236            }
237            Some(ev_value) = paused.next() => {
238                // Decode to our projection. Chrome may add fields in future
239                // protocol versions — we skip ones we don't understand.
240                let ev: RequestPausedEvent = match serde_json::from_value(ev_value) {
241                    Ok(ev) => ev,
242                    Err(e) => {
243                        warn!(error = %e, "interception: skipping malformed Fetch.requestPaused event");
244                        continue;
245                    }
246                };
247                if let Err(e) = handle_paused(&session, &rules, ev).await {
248                    warn!(error = %e, "interception: handler dispatch failed");
249                }
250            }
251            Some(ev_value) = auth_required.next() => {
252                // `Fetch.authRequired` carries a `requestId` we must echo
253                // back via `Fetch.continueWithAuth`. If `auth` is None the
254                // user didn't ask for auth handling — fall back to
255                // `Default` so Chrome surfaces a normal auth dialog instead
256                // of hanging the pause forever.
257                let Some(request_id) = ev_value
258                    .get("requestId")
259                    .and_then(Value::as_str)
260                    .map(str::to_owned)
261                else {
262                    warn!("interception: Fetch.authRequired without requestId");
263                    continue;
264                };
265                let response = match &auth {
266                    Some((user, pass)) => json!({
267                        "response": "ProvideCredentials",
268                        "username": user,
269                        "password": pass,
270                    }),
271                    None => json!({ "response": "Default" }),
272                };
273                if let Err(e) = session
274                    .call(
275                        "Fetch.continueWithAuth",
276                        json!({
277                            "requestId": request_id,
278                            "authChallengeResponse": response,
279                        }),
280                    )
281                    .await
282                {
283                    warn!(error = %e, "interception: Fetch.continueWithAuth failed");
284                }
285            }
286            else => {
287                // Stream closed (transport gone). Nothing left to observe.
288                trace!("interception: event stream closed, exiting without Fetch.disable");
289                // Skip the disable below — the transport is gone, the call
290                // would fail anyway.
291                let _ = done.send(());
292                return;
293            }
294        }
295    }
296
297    // Step 4: best-effort `Fetch.disable` on shutdown. If it fails (session
298    // already torn down) we log and exit — the handle's caller still gets
299    // the oneshot signal so `stop()` doesn't hang.
300    if let Err(e) = session.call("Fetch.disable", json!({})).await {
301        warn!(error = %e, "interception: Fetch.disable failed during shutdown");
302    }
303    // Signal exit. The receiver may already be gone (handle dropped without
304    // `stop()`), which is fine — the `Drop` path didn't await it.
305    let _ = done.send(());
306}
307
308/// Walk the rule list against `ev.request.url` and dispatch the first match.
309/// No match → plain `Fetch.continueRequest` so Chrome proceeds as if no
310/// interception were registered.
311async fn handle_paused(
312    session: &SessionHandle,
313    rules: &[Rule],
314    ev: RequestPausedEvent,
315) -> Result<(), InterceptionError> {
316    let url = ev.request.url.clone();
317
318    // Find the first rule whose pattern matches. Walk the slice rather than
319    // building an iterator — the rule list is small (typically < 10) and
320    // this keeps the borrow checker quiet without `find` + closure lifetimes.
321    let matched = rules.iter().find(|r| r.matches(&url));
322
323    match matched {
324        Some(Rule::Block { .. }) | Some(Rule::BlockHosts { .. }) => {
325            fail_request(session, &ev.request_id, "BlockedByClient").await
326        }
327        Some(Rule::Redirect { to, .. }) => continue_with_url(session, &ev.request_id, to).await,
328        Some(Rule::Respond {
329            status,
330            headers,
331            body,
332            ..
333        }) => fulfill_request(session, &ev.request_id, *status, headers, body).await,
334        Some(Rule::Modify { modify, .. }) => {
335            let info = build_request_info(&ev);
336            let overrides = modify(&info);
337            continue_with_overrides(session, &ev.request_id, overrides).await
338        }
339        Some(Rule::ModifyResponse { modify, .. }) => match build_response_info(&ev) {
340            Some(info) => {
341                let overrides = modify(&info);
342                continue_response_with_overrides(session, &ev.request_id, overrides).await
343            }
344            None => {
345                // Matched at the Request stage: Chrome has no response yet, so
346                // `Fetch.continueResponse` would be rejected. Let the request
347                // through unchanged so the pause is still released — the
348                // closure runs later if a Response-stage pattern re-pauses it.
349                tracing::debug!(
350                    request_id = %ev.request_id,
351                    url = %url,
352                    "interception: ModifyResponse matched at Request stage; no response yet, passing through"
353                );
354                continue_passthrough(session, &ev.request_id).await
355            }
356        },
357        None => continue_passthrough(session, &ev.request_id).await,
358    }
359}
360
361/// Serialize a [`RequestPattern`] into the JSON shape CDP expects on
362/// `Fetch.enable.patterns[]`. All three fields are optional per CDP.
363pub(crate) fn serialize_pattern(p: &RequestPattern) -> Value {
364    let mut obj = Map::new();
365    if let Some(url) = &p.url_pattern {
366        obj.insert("urlPattern".into(), Value::String(url.clone()));
367    }
368    if let Some(rt) = p.resource_type {
369        obj.insert("resourceType".into(), Value::String(rt.as_cdp_str().into()));
370    }
371    if let Some(stage) = p.request_stage {
372        obj.insert(
373            "requestStage".into(),
374            Value::String(stage.as_cdp_str().into()),
375        );
376    }
377    Value::Object(obj)
378}
379
380/// Build a [`RequestInfo`] from the decoded event for `Modify` closures.
381///
382/// Body precedence: `postDataEntries` (canonical, base64-decoded + concatenated)
383/// when present, else `postData` interpreted as UTF-8 bytes. The string
384/// fallback is necessarily lossy for binary bodies — Chrome only emits
385/// `postDataEntries` when it knows the text form would mangle the bytes.
386///
387/// Headers come from `Network.Request.headers` (CDP object) so we materialize
388/// them as a `Vec<(name, value)>` on the boundary; the upstream HashMap may
389/// have collapsed duplicates already, but for the request side CDP also
390/// pre-merges so this is faithful.
391pub(crate) fn build_request_info(ev: &RequestPausedEvent) -> RequestInfo {
392    RequestInfo {
393        url: ev.request.url.clone(),
394        method: ev.request.method.clone(),
395        headers: ev
396            .request
397            .headers
398            .iter()
399            .map(|(k, v)| (k.clone(), v.clone()))
400            .collect(),
401        post_data: decode_post_data(&ev.request),
402        resource_type: parse_resource_type(ev.resource_type.as_deref()),
403    }
404}
405
406fn decode_post_data(req: &RequestPayload) -> Option<Vec<u8>> {
407    use base64::Engine as _;
408    use base64::engine::general_purpose::STANDARD as BASE64;
409
410    if let Some(entries) = req.post_data_entries.as_ref() {
411        let mut buf = Vec::new();
412        for entry in entries {
413            let Some(b64) = entry.bytes.as_deref() else {
414                continue;
415            };
416            match BASE64.decode(b64) {
417                Ok(bytes) => buf.extend_from_slice(&bytes),
418                Err(e) => {
419                    tracing::warn!(error = %e, "interception: bad base64 in postDataEntries; skipping entry");
420                }
421            }
422        }
423        return Some(buf);
424    }
425    req.post_data.as_deref().map(|s| s.as_bytes().to_vec())
426}
427
428/// Build a [`ResponseInfo`] from the decoded event when Chrome paused at the
429/// `Response` stage. Returns `None` at the `Request` stage (the event
430/// payload's `responseStatusCode` is absent).
431///
432/// Used on both the rule-driven actor path and the
433/// [`crate::builder::InterceptBuilder::subscribe`] stream path.
434pub(crate) fn build_response_info(ev: &RequestPausedEvent) -> Option<ResponseInfo> {
435    let status = ev.response_status_code?;
436    let status_text = ev.response_status_text.clone().unwrap_or_default();
437    let headers: Vec<(String, String)> = ev
438        .response_headers
439        .as_ref()
440        .map(|hs| {
441            hs.iter()
442                .map(|h| (h.name.clone(), h.value.clone()))
443                .collect()
444        })
445        .unwrap_or_default();
446    Some(ResponseInfo {
447        status,
448        status_text,
449        headers,
450    })
451}
452
453/// Serialize a `[(name, value)]` slice into CDP's `[{name, value}]` JSON
454/// array shape used by `Fetch.continueRequest.headers` and
455/// `Fetch.fulfillRequest.responseHeaders`.
456pub(crate) fn headers_to_cdp(headers: &[(String, String)]) -> Vec<Value> {
457    headers
458        .iter()
459        .map(|(name, value)| json!({ "name": name, "value": value }))
460        .collect()
461}
462
463/// Best-effort parse of a CDP `Network.ResourceType` string into our enum.
464/// Defaults to [`ResourceType::Other`] for unknown strings rather than
465/// failing the whole event — Chrome occasionally adds new types we don't
466/// know about yet, and dropping a real intercepted request for that would
467/// be a worse failure mode than reporting `Other`.
468fn parse_resource_type(s: Option<&str>) -> ResourceType {
469    match s.unwrap_or("Other") {
470        "Document" => ResourceType::Document,
471        "Stylesheet" => ResourceType::Stylesheet,
472        "Image" => ResourceType::Image,
473        "Media" => ResourceType::Media,
474        "Font" => ResourceType::Font,
475        "Script" => ResourceType::Script,
476        "TextTrack" => ResourceType::TextTrack,
477        "XHR" => ResourceType::XHR,
478        "Fetch" => ResourceType::Fetch,
479        "EventSource" => ResourceType::EventSource,
480        "WebSocket" => ResourceType::WebSocket,
481        "Manifest" => ResourceType::Manifest,
482        "SignedExchange" => ResourceType::SignedExchange,
483        "Ping" => ResourceType::Ping,
484        "CSPViolationReport" => ResourceType::CSPViolationReport,
485        "Preflight" => ResourceType::Preflight,
486        _ => ResourceType::Other,
487    }
488}
489
490// --- CDP dispatch helpers --------------------------------------------------
491
492async fn fail_request(
493    session: &SessionHandle,
494    request_id: &str,
495    error_reason: &str,
496) -> Result<(), InterceptionError> {
497    session
498        .call(
499            "Fetch.failRequest",
500            json!({
501                "requestId": request_id,
502                "errorReason": error_reason,
503            }),
504        )
505        .await?;
506    Ok(())
507}
508
509async fn continue_passthrough(
510    session: &SessionHandle,
511    request_id: &str,
512) -> Result<(), InterceptionError> {
513    session
514        .call("Fetch.continueRequest", json!({ "requestId": request_id }))
515        .await?;
516    Ok(())
517}
518
519async fn continue_with_url(
520    session: &SessionHandle,
521    request_id: &str,
522    url: &str,
523) -> Result<(), InterceptionError> {
524    session
525        .call(
526            "Fetch.continueRequest",
527            json!({
528                "requestId": request_id,
529                "url": url,
530            }),
531        )
532        .await?;
533    Ok(())
534}
535
536async fn continue_with_overrides(
537    session: &SessionHandle,
538    request_id: &str,
539    overrides: RequestOverrides,
540) -> Result<(), InterceptionError> {
541    let mut params = Map::new();
542    params.insert("requestId".into(), Value::String(request_id.into()));
543    if let Some(url) = overrides.url {
544        params.insert("url".into(), Value::String(url));
545    }
546    if let Some(method) = overrides.method {
547        params.insert("method".into(), Value::String(method));
548    }
549    if let Some(headers) = overrides.headers {
550        params.insert("headers".into(), Value::Array(headers_to_cdp(&headers)));
551    }
552    if let Some(post_data) = overrides.post_data {
553        params.insert("postData".into(), Value::String(BASE64.encode(&post_data)));
554    }
555    session
556        .call("Fetch.continueRequest", Value::Object(params))
557        .await?;
558    Ok(())
559}
560
561async fn fulfill_request(
562    session: &SessionHandle,
563    request_id: &str,
564    status: u16,
565    headers: &[(String, String)],
566    body: &[u8],
567) -> Result<(), InterceptionError> {
568    let response_headers = headers_to_cdp(headers);
569    session
570        .call(
571            "Fetch.fulfillRequest",
572            json!({
573                "requestId": request_id,
574                "responseCode": status,
575                "responseHeaders": response_headers,
576                "body": BASE64.encode(body),
577            }),
578        )
579        .await?;
580    Ok(())
581}
582
583/// Dispatch `Fetch.continueResponse` with the closure-produced overrides for a
584/// [`Rule::ModifyResponse`](crate::rule::Rule::ModifyResponse) match. Mirrors
585/// [`PausedRequest::continue_response`](crate::PausedRequest::continue_response):
586/// `None` fields are omitted so Chrome keeps its originals, and
587/// `responseHeaders` is *replacement* per CDP. The caller guarantees the
588/// `Response` stage (we only reach here after `build_response_info` succeeded).
589async fn continue_response_with_overrides(
590    session: &SessionHandle,
591    request_id: &str,
592    overrides: ResponseOverrides,
593) -> Result<(), InterceptionError> {
594    let mut params = Map::new();
595    params.insert("requestId".into(), Value::String(request_id.into()));
596    if let Some(status) = overrides.status {
597        params.insert("responseCode".into(), Value::from(status));
598    }
599    if let Some(phrase) = overrides.phrase {
600        params.insert("responsePhrase".into(), Value::String(phrase));
601    }
602    if let Some(headers) = overrides.headers {
603        params.insert(
604            "responseHeaders".into(),
605            Value::Array(headers_to_cdp(&headers)),
606        );
607    }
608    session
609        .call("Fetch.continueResponse", Value::Object(params))
610        .await?;
611    Ok(())
612}
613
614#[cfg(test)]
615#[allow(clippy::panic, clippy::unwrap_used)]
616mod tests {
617    use super::*;
618    use crate::url_pattern::UrlPattern;
619    use std::time::Duration;
620    use zendriver_transport::testing::MockConnection;
621
622    /// End-to-end mock drive of the rule-based actor:
623    ///   1. Spawn `run_actor` with a single Block rule for `*/blocked/*`.
624    ///   2. Expect the fire-and-forget `Fetch.enable` and reply.
625    ///   3. Emit a matching `Fetch.requestPaused` event.
626    ///   4. Assert the actor dispatches `Fetch.failRequest` with
627    ///      `errorReason = BlockedByClient`.
628    ///   5. Cancel + expect `Fetch.disable` (RAII teardown contract).
629    #[tokio::test]
630    async fn block_rule_dispatches_fail_request_with_blocked_by_client() {
631        let (mut mock, conn) = MockConnection::pair();
632        let sess = SessionHandle::new(conn.clone(), "S1");
633
634        let rules = vec![Rule::Block {
635            pattern: UrlPattern::new("*/blocked/*").unwrap(),
636        }];
637        let patterns = vec![RequestPattern {
638            url_pattern: Some("*".into()),
639            ..RequestPattern::default()
640        }];
641        let cancel = CancellationToken::new();
642        let (done_tx, done_rx) = oneshot::channel();
643        let actor_cancel = cancel.clone();
644        let actor = tokio::spawn(async move {
645            run_actor(sess, rules, patterns, None, actor_cancel, done_tx).await;
646        });
647
648        // Step 1: the actor fires `Fetch.enable` in a side-task. The mock
649        // never replies to the call (per the P4 pattern — InFlightTracker /
650        // frame::lifecycle do the same); we just observe it landed so the
651        // subsequent `emit_event_for_session` runs after the subscription
652        // is in place.
653        let enable_id =
654            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.enable"))
655                .await
656                .expect("actor did not send Fetch.enable within 2s");
657        let enable_params = mock.last_sent()["params"].clone();
658        assert_eq!(enable_params["handleAuthRequests"], false);
659        assert_eq!(enable_params["patterns"][0]["urlPattern"], "*");
660        // Reply so the side-task completes cleanly (not strictly required —
661        // the mock harness usually doesn't — but it keeps the warn! quiet).
662        mock.reply(enable_id, json!({})).await;
663
664        // Step 2: emit a `Fetch.requestPaused` event whose URL matches the
665        // Block rule. The actor should dispatch `Fetch.failRequest`.
666        mock.emit_event_for_session(
667            "Fetch.requestPaused",
668            json!({
669                "requestId": "REQ-1",
670                "request": {
671                    "url": "https://example.test/blocked/banner.png",
672                    "method": "GET",
673                    "headers": {},
674                },
675                "resourceType": "Image",
676            }),
677            "S1",
678        )
679        .await;
680
681        // Step 3: expect the fail_request dispatch.
682        let fail_id =
683            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.failRequest"))
684                .await
685                .expect("actor did not send Fetch.failRequest within 2s");
686        let fail_params = mock.last_sent()["params"].clone();
687        assert_eq!(fail_params["requestId"], "REQ-1");
688        assert_eq!(fail_params["errorReason"], "BlockedByClient");
689        mock.reply(fail_id, json!({})).await;
690
691        // Step 4: cancel the actor + verify it dispatches `Fetch.disable`
692        // on shutdown and signals exit through the oneshot.
693        cancel.cancel();
694        let disable_id =
695            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.disable"))
696                .await
697                .expect("actor did not send Fetch.disable on cancel");
698        mock.reply(disable_id, json!({})).await;
699
700        tokio::time::timeout(Duration::from_secs(2), done_rx)
701            .await
702            .expect("actor did not signal exit within 2s")
703            .expect("oneshot sender dropped without sending");
704        actor.await.unwrap();
705        conn.shutdown();
706    }
707
708    /// Same end-to-end drive as the `Block` test, but the rule is a
709    /// `BlockHosts` matcher and the request matches by host (subdomain walk).
710    #[tokio::test]
711    async fn block_hosts_rule_dispatches_fail_request() {
712        use crate::host_matcher::HostMatcher;
713        let (mut mock, conn) = MockConnection::pair();
714        let sess = SessionHandle::new(conn.clone(), "S1");
715
716        let rules = vec![Rule::BlockHosts {
717            matcher: std::sync::Arc::new(HostMatcher::new(["evil.com".to_string()])),
718        }];
719        let patterns = vec![RequestPattern {
720            url_pattern: Some("*".into()),
721            ..RequestPattern::default()
722        }];
723        let cancel = CancellationToken::new();
724        let (done_tx, done_rx) = oneshot::channel();
725        let actor_cancel = cancel.clone();
726        let actor = tokio::spawn(async move {
727            run_actor(sess, rules, patterns, None, actor_cancel, done_tx).await;
728        });
729
730        let enable_id =
731            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.enable"))
732                .await
733                .expect("actor did not send Fetch.enable within 2s");
734        mock.reply(enable_id, json!({})).await;
735
736        // Subdomain of a listed host -> must be failed.
737        mock.emit_event_for_session(
738            "Fetch.requestPaused",
739            json!({
740                "requestId": "REQ-1",
741                "request": {
742                    "url": "https://cdn.evil.com/fp.js",
743                    "method": "GET",
744                    "headers": {},
745                },
746                "resourceType": "Script",
747            }),
748            "S1",
749        )
750        .await;
751
752        let fail_id =
753            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.failRequest"))
754                .await
755                .expect("actor did not send Fetch.failRequest within 2s");
756        let fail_params = mock.last_sent()["params"].clone();
757        assert_eq!(fail_params["requestId"], "REQ-1");
758        assert_eq!(fail_params["errorReason"], "BlockedByClient");
759        mock.reply(fail_id, json!({})).await;
760
761        cancel.cancel();
762        let disable_id =
763            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.disable"))
764                .await
765                .expect("actor did not send Fetch.disable on cancel");
766        mock.reply(disable_id, json!({})).await;
767
768        tokio::time::timeout(Duration::from_secs(2), done_rx)
769            .await
770            .expect("actor did not signal exit within 2s")
771            .expect("oneshot sender dropped without sending");
772        actor.await.unwrap();
773        conn.shutdown();
774    }
775
776    #[tokio::test]
777    async fn actor_handles_auth_required_with_credentials() {
778        // cdpdriver/zendriver#208: proxy / HTTP basic-auth support. When the
779        // builder is configured with `handle_auth(user, pass)`, the actor
780        // must (a) send `Fetch.enable { handleAuthRequests: true }` and
781        // (b) respond to each `Fetch.authRequired` event with
782        // `Fetch.continueWithAuth { authChallengeResponse:
783        // ProvideCredentials + user/pass }`.
784        let (mut mock, conn) = MockConnection::pair();
785        let sess = SessionHandle::new(conn.clone(), "S1");
786        let cancel = CancellationToken::new();
787        let (done_tx, done_rx) = oneshot::channel();
788        let actor_cancel = cancel.clone();
789        let auth = Some(("user1".to_string(), "pass1".to_string()));
790        let actor = tokio::spawn(async move {
791            run_actor(
792                sess,
793                Vec::new(),
794                vec![RequestPattern {
795                    url_pattern: Some("*".into()),
796                    ..RequestPattern::default()
797                }],
798                auth,
799                actor_cancel,
800                done_tx,
801            )
802            .await;
803        });
804
805        let enable_id =
806            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.enable"))
807                .await
808                .expect("actor did not send Fetch.enable within 2s");
809        assert_eq!(
810            mock.last_sent()["params"]["handleAuthRequests"],
811            true,
812            "auth-enabled actor must flip handleAuthRequests"
813        );
814        mock.reply(enable_id, json!({})).await;
815
816        mock.emit_event_for_session(
817            "Fetch.authRequired",
818            json!({
819                "requestId": "AUTH-REQ-1",
820                "request": { "url": "https://example.test/", "method": "GET" },
821                "frameId": "F1",
822                "resourceType": "Document",
823                "authChallenge": {
824                    "source": "Proxy",
825                    "origin": "http://proxy.test",
826                    "scheme": "basic",
827                    "realm": "",
828                },
829            }),
830            "S1",
831        )
832        .await;
833
834        let auth_id = tokio::time::timeout(
835            Duration::from_secs(2),
836            mock.expect_cmd("Fetch.continueWithAuth"),
837        )
838        .await
839        .expect("actor did not send Fetch.continueWithAuth within 2s");
840        let params = mock.last_sent()["params"].clone();
841        assert_eq!(params["requestId"], "AUTH-REQ-1");
842        assert_eq!(
843            params["authChallengeResponse"]["response"],
844            "ProvideCredentials"
845        );
846        assert_eq!(params["authChallengeResponse"]["username"], "user1");
847        assert_eq!(params["authChallengeResponse"]["password"], "pass1");
848        mock.reply(auth_id, json!({})).await;
849
850        cancel.cancel();
851        let disable_id =
852            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.disable"))
853                .await
854                .expect("actor did not send Fetch.disable on cancel");
855        mock.reply(disable_id, json!({})).await;
856        tokio::time::timeout(Duration::from_secs(2), done_rx)
857            .await
858            .expect("actor did not signal exit")
859            .expect("oneshot sender dropped");
860        actor.await.unwrap();
861        conn.shutdown();
862    }
863
864    #[tokio::test]
865    async fn actor_without_auth_responds_default_to_auth_required() {
866        // Defensive: even when the builder did NOT configure auth, an
867        // `authRequired` event must be released (Default response) so Chrome
868        // doesn't hang. handleAuthRequests stays false so this path only
869        // triggers if the server pushed a stray event we didn't ask for —
870        // exercising it confirms the actor degrades gracefully.
871        let (mut mock, conn) = MockConnection::pair();
872        let sess = SessionHandle::new(conn.clone(), "S2");
873        let cancel = CancellationToken::new();
874        let (done_tx, done_rx) = oneshot::channel();
875        let actor_cancel = cancel.clone();
876        let actor = tokio::spawn(async move {
877            run_actor(
878                sess,
879                Vec::new(),
880                vec![RequestPattern {
881                    url_pattern: Some("*".into()),
882                    ..RequestPattern::default()
883                }],
884                None,
885                actor_cancel,
886                done_tx,
887            )
888            .await;
889        });
890
891        let enable_id =
892            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.enable"))
893                .await
894                .expect("actor did not send Fetch.enable");
895        assert_eq!(mock.last_sent()["params"]["handleAuthRequests"], false);
896        mock.reply(enable_id, json!({})).await;
897
898        mock.emit_event_for_session(
899            "Fetch.authRequired",
900            json!({ "requestId": "AUTH-REQ-2" }),
901            "S2",
902        )
903        .await;
904
905        let auth_id = tokio::time::timeout(
906            Duration::from_secs(2),
907            mock.expect_cmd("Fetch.continueWithAuth"),
908        )
909        .await
910        .expect("actor did not respond to stray authRequired");
911        assert_eq!(
912            mock.last_sent()["params"]["authChallengeResponse"]["response"],
913            "Default"
914        );
915        mock.reply(auth_id, json!({})).await;
916
917        cancel.cancel();
918        let disable_id =
919            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.disable"))
920                .await
921                .expect("actor did not send Fetch.disable");
922        mock.reply(disable_id, json!({})).await;
923        tokio::time::timeout(Duration::from_secs(2), done_rx)
924            .await
925            .expect("actor did not exit")
926            .expect("oneshot dropped");
927        actor.await.unwrap();
928        conn.shutdown();
929    }
930}