Skip to main content

zendriver_interception/
actor.rs

1//! Background interception actor.
2//!
3//! The crate-private `run_actor` is the rule-driven loop spawned by
4//! [`InterceptBuilder::start`](crate::builder::InterceptBuilder::start). It
5//! owns a single tab's `Fetch.*` interception lifecycle:
6//!
7//! 1. Subscribes to `Fetch.requestPaused` on the supplied [`SessionHandle`]
8//!    **before** firing `Fetch.enable`. Mirrors the subscriber pattern used
9//!    by the zendriver core's frame-lifecycle and network-idle trackers —
10//!    events Chrome fires between the enable round-trip and our subscription
11//!    would otherwise be dropped, and the `MockConnection` test harness in
12//!    `zendriver-transport` (gated `feature = "testing"`) never replies to
13//!    fire-and-forget enables anyway.
14//! 2. Sends `Fetch.enable { patterns, handleAuthRequests }` with the
15//!    explicit pattern list supplied by the builder; `handleAuthRequests`
16//!    flips to `true` when the builder also called
17//!    [`InterceptBuilder::handle_auth`](crate::builder::InterceptBuilder::handle_auth)
18//!    so Chrome surfaces `Fetch.authRequired` events the actor answers with
19//!    `Fetch.continueWithAuth`.
20//! 3. Per `Fetch.requestPaused` event: walks `rules` in registration order,
21//!    first match wins, dispatches the matching action's CDP call. No
22//!    match → plain `Fetch.continueRequest` (let through).
23//! 4. On cancellation: fires `Fetch.disable` and exits. The handle returned
24//!    by the builder owns a [`CancellationToken`] that fires on `Drop`, so
25//!    interception always tears down deterministically when the handle
26//!    leaves scope.
27//!
28//! [`InterceptHandle`] is the user-facing RAII guard. Its [`stop`] method is
29//! the explicit-shutdown path — it cancels the token *and* awaits a oneshot
30//! the actor signals on exit, so the caller observes `Fetch.disable` has
31//! reached the wire before `stop()` returns.
32//!
33//! [`stop`]: InterceptHandle::stop
34
35use base64::Engine as _;
36use base64::engine::general_purpose::STANDARD as BASE64;
37use futures::StreamExt;
38use serde::Deserialize;
39use serde_json::{Map, Value, json};
40use std::collections::HashMap;
41use tokio::sync::oneshot;
42use tokio_util::sync::CancellationToken;
43use tracing::{trace, warn};
44use zendriver_transport::SessionHandle;
45
46use crate::builder::RequestPattern;
47use crate::error::InterceptionError;
48use crate::rule::Rule;
49use crate::types::{RequestInfo, RequestOverrides, ResourceType, ResponseInfo};
50
51/// RAII guard returned by `InterceptBuilder::start` (Task 7).
52///
53/// The guard cancels the actor on [`Drop`] so interception always tears down
54/// when the handle leaves scope. Call [`stop`](Self::stop) instead when the
55/// caller needs to observe `Fetch.disable` reaching the wire before
56/// proceeding — `Drop` is fire-and-forget by construction.
57#[derive(Debug)]
58#[must_use = "interception stops when the handle is dropped — bind it to a variable to keep it alive"]
59pub struct InterceptHandle {
60    cancel: CancellationToken,
61    // `Option` so `stop(self)` can `.take()` the receiver without `Drop`
62    // racing on a half-moved field. `None` after `stop()` consumed it.
63    done: Option<oneshot::Receiver<()>>,
64}
65
66impl InterceptHandle {
67    /// Construct a handle from the cancel token + actor-exit receiver. The
68    /// constructor is `pub(crate)` so the only public path is via
69    /// [`InterceptBuilder::start`](crate::builder::InterceptBuilder::start).
70    pub(crate) fn new(cancel: CancellationToken, done: oneshot::Receiver<()>) -> Self {
71        Self {
72            cancel,
73            done: Some(done),
74        }
75    }
76
77    /// Test-support constructor: build a no-op handle backed by an unused
78    /// cancel token + a `oneshot::channel`'s receiver whose sender is
79    /// immediately dropped. Intended for downstream unit tests that need
80    /// to populate a registry of `InterceptHandle`s without going through
81    /// the actor pipeline. Dropping the handle still calls `.cancel()`
82    /// on the token (no observable side effect — nothing is listening).
83    ///
84    /// Gated behind the `test-support` feature so production builds don't
85    /// expose it.
86    #[cfg(any(test, feature = "test-support"))]
87    #[doc(hidden)]
88    pub fn for_tests() -> Self {
89        let (_done_tx, done_rx) = oneshot::channel();
90        Self {
91            cancel: CancellationToken::new(),
92            done: Some(done_rx),
93        }
94    }
95
96    /// Stop the actor and wait for it to acknowledge exit.
97    ///
98    /// Cancels the actor's token, then awaits the oneshot the actor sends
99    /// after `Fetch.disable` reaches the wire. Returns
100    /// [`InterceptionError::SubscriptionClosed`] if the actor was already
101    /// gone (channel closed without a signal — e.g. transport torn down
102    /// mid-flight); callers can usually treat that as success since the
103    /// effect (interception is off) is identical.
104    pub async fn stop(mut self) -> Result<(), InterceptionError> {
105        self.cancel.cancel();
106        match self.done.take() {
107            Some(rx) => rx.await.map_err(|_| InterceptionError::SubscriptionClosed),
108            None => Ok(()),
109        }
110    }
111}
112
113impl Drop for InterceptHandle {
114    fn drop(&mut self) {
115        // Fire-and-forget on drop: cancel the actor's token. The actor's
116        // own `Fetch.disable` call will race the transport teardown, but
117        // since `Fetch.disable` is harmless when the session is already
118        // closing we don't try to await anything here.
119        self.cancel.cancel();
120    }
121}
122
123/// Decoded `Fetch.requestPaused` event payload.
124///
125/// Projects only the fields the actor consumes. Extra fields Chrome sends
126/// (e.g. `frameId`, `networkId`) are deliberately ignored — the rule API
127/// surfaces what callers asked for via [`RequestInfo`] / [`ResponseInfo`].
128///
129/// `pub(crate)` so [`crate::builder::InterceptBuilder::subscribe`] can reuse
130/// the same projection on the stream path.
131#[derive(Debug, Deserialize)]
132pub(crate) struct RequestPausedEvent {
133    #[serde(rename = "requestId")]
134    pub(crate) request_id: String,
135    pub(crate) request: RequestPayload,
136    #[serde(rename = "resourceType", default)]
137    pub(crate) resource_type: Option<String>,
138    // Only populated at the `Response` stage.
139    #[serde(rename = "responseStatusCode", default)]
140    pub(crate) response_status_code: Option<u16>,
141    #[serde(rename = "responseStatusText", default)]
142    pub(crate) response_status_text: Option<String>,
143    #[serde(rename = "responseHeaders", default)]
144    pub(crate) response_headers: Option<Vec<HeaderPair>>,
145}
146
147#[derive(Debug, Deserialize)]
148pub(crate) struct RequestPayload {
149    pub(crate) url: String,
150    pub(crate) method: String,
151    #[serde(default)]
152    pub(crate) headers: HashMap<String, String>,
153    /// Chrome's text representation of the request body. For multipart /
154    /// binary uploads this can be lossy — Chrome rebuilds via UTF-8 best
155    /// effort. Prefer [`Self::post_data_entries`] when present.
156    #[serde(rename = "postData", default)]
157    pub(crate) post_data: Option<String>,
158    #[serde(rename = "hasPostData", default)]
159    _has_post_data: Option<bool>,
160    /// Per-chunk base64-encoded bytes. Chrome emits this for binary /
161    /// multipart bodies where the text representation would be lossy.
162    /// When present, it is the canonical source of truth for the body.
163    #[serde(rename = "postDataEntries", default)]
164    pub(crate) post_data_entries: Option<Vec<PostDataEntry>>,
165}
166
167#[derive(Debug, Deserialize)]
168pub(crate) struct PostDataEntry {
169    /// Base64-encoded body bytes. Per CDP `Network.PostDataEntry`.
170    #[serde(default)]
171    pub(crate) bytes: Option<String>,
172}
173
174#[derive(Debug, Deserialize)]
175pub(crate) struct HeaderPair {
176    pub(crate) name: String,
177    pub(crate) value: String,
178}
179
180/// Run the interception actor until `cancel` fires.
181///
182/// See the module-level docs for the lifecycle contract. The function exits
183/// after `Fetch.disable` is dispatched on cancellation, or immediately if
184/// the event stream closes (e.g. transport torn down).
185///
186/// `done` is the oneshot the actor signals on exit so the matching
187/// [`InterceptHandle::stop`] call can synchronize on actor teardown.
188pub(crate) async fn run_actor(
189    session: SessionHandle,
190    rules: Vec<Rule>,
191    patterns: Vec<RequestPattern>,
192    auth: Option<(String, String)>,
193    cancel: CancellationToken,
194    done: oneshot::Sender<()>,
195) {
196    // Step 1: subscribe BEFORE enable (P4 pattern). Events Chrome emits
197    // between our enable round-trip and the subscription registration would
198    // otherwise be lost. Also: the mock test harness never replies to the
199    // synthetic `Fetch.enable` call, so awaiting it first would deadlock the
200    // actor before any subscription existed.
201    let mut paused = session.subscribe::<Value>("Fetch.requestPaused");
202    // When `handleAuthRequests: true`, Chrome additionally emits
203    // `Fetch.authRequired` events for proxy / HTTP basic-auth challenges.
204    // Subscribe up-front for the same race-free reason as `requestPaused`.
205    let mut auth_required = session.subscribe::<Value>("Fetch.authRequired");
206
207    // Step 2: fire-and-forget `Fetch.enable`. Mirrors `InFlightTracker::run`
208    // / `frame::lifecycle::run`: a failed enable surfaces as a `warn!` but
209    // the actor keeps running (no events arrive — interception silently
210    // no-ops — which is the same observable behavior the user gets from
211    // any other torn-down session).
212    let enable_session = session.clone();
213    let enable_patterns: Vec<Value> = patterns.iter().map(serialize_pattern).collect();
214    let handle_auth_requests = auth.is_some();
215    tokio::spawn(async move {
216        if let Err(e) = enable_session
217            .call(
218                "Fetch.enable",
219                json!({
220                    "patterns": enable_patterns,
221                    "handleAuthRequests": handle_auth_requests,
222                }),
223            )
224            .await
225        {
226            warn!(error = %e, "interception: Fetch.enable failed; interception inactive");
227        }
228    });
229
230    // Step 3: event loop.
231    loop {
232        tokio::select! {
233            () = cancel.cancelled() => {
234                trace!("interception: cancellation received, disabling Fetch and exiting");
235                break;
236            }
237            Some(ev_value) = paused.next() => {
238                // Decode to our projection. Chrome may add fields in future
239                // protocol versions — we skip ones we don't understand.
240                let ev: RequestPausedEvent = match serde_json::from_value(ev_value) {
241                    Ok(ev) => ev,
242                    Err(e) => {
243                        warn!(error = %e, "interception: skipping malformed Fetch.requestPaused event");
244                        continue;
245                    }
246                };
247                if let Err(e) = handle_paused(&session, &rules, ev).await {
248                    warn!(error = %e, "interception: handler dispatch failed");
249                }
250            }
251            Some(ev_value) = auth_required.next() => {
252                // `Fetch.authRequired` carries a `requestId` we must echo
253                // back via `Fetch.continueWithAuth`. If `auth` is None the
254                // user didn't ask for auth handling — fall back to
255                // `Default` so Chrome surfaces a normal auth dialog instead
256                // of hanging the pause forever.
257                let Some(request_id) = ev_value
258                    .get("requestId")
259                    .and_then(Value::as_str)
260                    .map(str::to_owned)
261                else {
262                    warn!("interception: Fetch.authRequired without requestId");
263                    continue;
264                };
265                let response = match &auth {
266                    Some((user, pass)) => json!({
267                        "response": "ProvideCredentials",
268                        "username": user,
269                        "password": pass,
270                    }),
271                    None => json!({ "response": "Default" }),
272                };
273                if let Err(e) = session
274                    .call(
275                        "Fetch.continueWithAuth",
276                        json!({
277                            "requestId": request_id,
278                            "authChallengeResponse": response,
279                        }),
280                    )
281                    .await
282                {
283                    warn!(error = %e, "interception: Fetch.continueWithAuth failed");
284                }
285            }
286            else => {
287                // Stream closed (transport gone). Nothing left to observe.
288                trace!("interception: event stream closed, exiting without Fetch.disable");
289                // Skip the disable below — the transport is gone, the call
290                // would fail anyway.
291                let _ = done.send(());
292                return;
293            }
294        }
295    }
296
297    // Step 4: best-effort `Fetch.disable` on shutdown. If it fails (session
298    // already torn down) we log and exit — the handle's caller still gets
299    // the oneshot signal so `stop()` doesn't hang.
300    if let Err(e) = session.call("Fetch.disable", json!({})).await {
301        warn!(error = %e, "interception: Fetch.disable failed during shutdown");
302    }
303    // Signal exit. The receiver may already be gone (handle dropped without
304    // `stop()`), which is fine — the `Drop` path didn't await it.
305    let _ = done.send(());
306}
307
308/// Walk the rule list against `ev.request.url` and dispatch the first match.
309/// No match → plain `Fetch.continueRequest` so Chrome proceeds as if no
310/// interception were registered.
311async fn handle_paused(
312    session: &SessionHandle,
313    rules: &[Rule],
314    ev: RequestPausedEvent,
315) -> Result<(), InterceptionError> {
316    let url = ev.request.url.clone();
317
318    // Find the first rule whose pattern matches. Walk the slice rather than
319    // building an iterator — the rule list is small (typically < 10) and
320    // this keeps the borrow checker quiet without `find` + closure lifetimes.
321    let matched = rules.iter().find(|r| r.matches(&url));
322
323    match matched {
324        Some(Rule::Block { .. }) => fail_request(session, &ev.request_id, "BlockedByClient").await,
325        Some(Rule::Redirect { to, .. }) => continue_with_url(session, &ev.request_id, to).await,
326        Some(Rule::Respond {
327            status,
328            headers,
329            body,
330            ..
331        }) => fulfill_request(session, &ev.request_id, *status, headers, body).await,
332        Some(Rule::Modify { modify, .. }) => {
333            let info = build_request_info(&ev);
334            let overrides = modify(&info);
335            continue_with_overrides(session, &ev.request_id, overrides).await
336        }
337        None => continue_passthrough(session, &ev.request_id).await,
338    }
339}
340
341/// Serialize a [`RequestPattern`] into the JSON shape CDP expects on
342/// `Fetch.enable.patterns[]`. All three fields are optional per CDP.
343pub(crate) fn serialize_pattern(p: &RequestPattern) -> Value {
344    let mut obj = Map::new();
345    if let Some(url) = &p.url_pattern {
346        obj.insert("urlPattern".into(), Value::String(url.clone()));
347    }
348    if let Some(rt) = p.resource_type {
349        obj.insert("resourceType".into(), Value::String(rt.as_cdp_str().into()));
350    }
351    if let Some(stage) = p.request_stage {
352        obj.insert(
353            "requestStage".into(),
354            Value::String(stage.as_cdp_str().into()),
355        );
356    }
357    Value::Object(obj)
358}
359
360/// Build a [`RequestInfo`] from the decoded event for `Modify` closures.
361///
362/// Body precedence: `postDataEntries` (canonical, base64-decoded + concatenated)
363/// when present, else `postData` interpreted as UTF-8 bytes. The string
364/// fallback is necessarily lossy for binary bodies — Chrome only emits
365/// `postDataEntries` when it knows the text form would mangle the bytes.
366///
367/// Headers come from `Network.Request.headers` (CDP object) so we materialize
368/// them as a `Vec<(name, value)>` on the boundary; the upstream HashMap may
369/// have collapsed duplicates already, but for the request side CDP also
370/// pre-merges so this is faithful.
371pub(crate) fn build_request_info(ev: &RequestPausedEvent) -> RequestInfo {
372    RequestInfo {
373        url: ev.request.url.clone(),
374        method: ev.request.method.clone(),
375        headers: ev
376            .request
377            .headers
378            .iter()
379            .map(|(k, v)| (k.clone(), v.clone()))
380            .collect(),
381        post_data: decode_post_data(&ev.request),
382        resource_type: parse_resource_type(ev.resource_type.as_deref()),
383    }
384}
385
386fn decode_post_data(req: &RequestPayload) -> Option<Vec<u8>> {
387    use base64::Engine as _;
388    use base64::engine::general_purpose::STANDARD as BASE64;
389
390    if let Some(entries) = req.post_data_entries.as_ref() {
391        let mut buf = Vec::new();
392        for entry in entries {
393            let Some(b64) = entry.bytes.as_deref() else {
394                continue;
395            };
396            match BASE64.decode(b64) {
397                Ok(bytes) => buf.extend_from_slice(&bytes),
398                Err(e) => {
399                    tracing::warn!(error = %e, "interception: bad base64 in postDataEntries; skipping entry");
400                }
401            }
402        }
403        return Some(buf);
404    }
405    req.post_data.as_deref().map(|s| s.as_bytes().to_vec())
406}
407
408/// Build a [`ResponseInfo`] from the decoded event when Chrome paused at the
409/// `Response` stage. Returns `None` at the `Request` stage (the event
410/// payload's `responseStatusCode` is absent).
411///
412/// Used on both the rule-driven actor path and the
413/// [`crate::builder::InterceptBuilder::subscribe`] stream path.
414pub(crate) fn build_response_info(ev: &RequestPausedEvent) -> Option<ResponseInfo> {
415    let status = ev.response_status_code?;
416    let status_text = ev.response_status_text.clone().unwrap_or_default();
417    let headers: Vec<(String, String)> = ev
418        .response_headers
419        .as_ref()
420        .map(|hs| {
421            hs.iter()
422                .map(|h| (h.name.clone(), h.value.clone()))
423                .collect()
424        })
425        .unwrap_or_default();
426    Some(ResponseInfo {
427        status,
428        status_text,
429        headers,
430    })
431}
432
433/// Serialize a `[(name, value)]` slice into CDP's `[{name, value}]` JSON
434/// array shape used by `Fetch.continueRequest.headers` and
435/// `Fetch.fulfillRequest.responseHeaders`.
436pub(crate) fn headers_to_cdp(headers: &[(String, String)]) -> Vec<Value> {
437    headers
438        .iter()
439        .map(|(name, value)| json!({ "name": name, "value": value }))
440        .collect()
441}
442
443/// Best-effort parse of a CDP `Network.ResourceType` string into our enum.
444/// Defaults to [`ResourceType::Other`] for unknown strings rather than
445/// failing the whole event — Chrome occasionally adds new types we don't
446/// know about yet, and dropping a real intercepted request for that would
447/// be a worse failure mode than reporting `Other`.
448fn parse_resource_type(s: Option<&str>) -> ResourceType {
449    match s.unwrap_or("Other") {
450        "Document" => ResourceType::Document,
451        "Stylesheet" => ResourceType::Stylesheet,
452        "Image" => ResourceType::Image,
453        "Media" => ResourceType::Media,
454        "Font" => ResourceType::Font,
455        "Script" => ResourceType::Script,
456        "TextTrack" => ResourceType::TextTrack,
457        "XHR" => ResourceType::XHR,
458        "Fetch" => ResourceType::Fetch,
459        "EventSource" => ResourceType::EventSource,
460        "WebSocket" => ResourceType::WebSocket,
461        "Manifest" => ResourceType::Manifest,
462        "SignedExchange" => ResourceType::SignedExchange,
463        "Ping" => ResourceType::Ping,
464        "CSPViolationReport" => ResourceType::CSPViolationReport,
465        "Preflight" => ResourceType::Preflight,
466        _ => ResourceType::Other,
467    }
468}
469
470// --- CDP dispatch helpers --------------------------------------------------
471
472async fn fail_request(
473    session: &SessionHandle,
474    request_id: &str,
475    error_reason: &str,
476) -> Result<(), InterceptionError> {
477    session
478        .call(
479            "Fetch.failRequest",
480            json!({
481                "requestId": request_id,
482                "errorReason": error_reason,
483            }),
484        )
485        .await?;
486    Ok(())
487}
488
489async fn continue_passthrough(
490    session: &SessionHandle,
491    request_id: &str,
492) -> Result<(), InterceptionError> {
493    session
494        .call("Fetch.continueRequest", json!({ "requestId": request_id }))
495        .await?;
496    Ok(())
497}
498
499async fn continue_with_url(
500    session: &SessionHandle,
501    request_id: &str,
502    url: &str,
503) -> Result<(), InterceptionError> {
504    session
505        .call(
506            "Fetch.continueRequest",
507            json!({
508                "requestId": request_id,
509                "url": url,
510            }),
511        )
512        .await?;
513    Ok(())
514}
515
516async fn continue_with_overrides(
517    session: &SessionHandle,
518    request_id: &str,
519    overrides: RequestOverrides,
520) -> Result<(), InterceptionError> {
521    let mut params = Map::new();
522    params.insert("requestId".into(), Value::String(request_id.into()));
523    if let Some(url) = overrides.url {
524        params.insert("url".into(), Value::String(url));
525    }
526    if let Some(method) = overrides.method {
527        params.insert("method".into(), Value::String(method));
528    }
529    if let Some(headers) = overrides.headers {
530        params.insert("headers".into(), Value::Array(headers_to_cdp(&headers)));
531    }
532    if let Some(post_data) = overrides.post_data {
533        params.insert("postData".into(), Value::String(BASE64.encode(&post_data)));
534    }
535    session
536        .call("Fetch.continueRequest", Value::Object(params))
537        .await?;
538    Ok(())
539}
540
541async fn fulfill_request(
542    session: &SessionHandle,
543    request_id: &str,
544    status: u16,
545    headers: &[(String, String)],
546    body: &[u8],
547) -> Result<(), InterceptionError> {
548    let response_headers = headers_to_cdp(headers);
549    session
550        .call(
551            "Fetch.fulfillRequest",
552            json!({
553                "requestId": request_id,
554                "responseCode": status,
555                "responseHeaders": response_headers,
556                "body": BASE64.encode(body),
557            }),
558        )
559        .await?;
560    Ok(())
561}
562
563#[cfg(test)]
564#[allow(clippy::panic, clippy::unwrap_used)]
565mod tests {
566    use super::*;
567    use crate::url_pattern::UrlPattern;
568    use std::time::Duration;
569    use zendriver_transport::testing::MockConnection;
570
571    /// End-to-end mock drive of the rule-based actor:
572    ///   1. Spawn `run_actor` with a single Block rule for `*/blocked/*`.
573    ///   2. Expect the fire-and-forget `Fetch.enable` and reply.
574    ///   3. Emit a matching `Fetch.requestPaused` event.
575    ///   4. Assert the actor dispatches `Fetch.failRequest` with
576    ///      `errorReason = BlockedByClient`.
577    ///   5. Cancel + expect `Fetch.disable` (RAII teardown contract).
578    #[tokio::test]
579    async fn block_rule_dispatches_fail_request_with_blocked_by_client() {
580        let (mut mock, conn) = MockConnection::pair();
581        let sess = SessionHandle::new(conn.clone(), "S1");
582
583        let rules = vec![Rule::Block {
584            pattern: UrlPattern::new("*/blocked/*").unwrap(),
585        }];
586        let patterns = vec![RequestPattern {
587            url_pattern: Some("*".into()),
588            ..RequestPattern::default()
589        }];
590        let cancel = CancellationToken::new();
591        let (done_tx, done_rx) = oneshot::channel();
592        let actor_cancel = cancel.clone();
593        let actor = tokio::spawn(async move {
594            run_actor(sess, rules, patterns, None, actor_cancel, done_tx).await;
595        });
596
597        // Step 1: the actor fires `Fetch.enable` in a side-task. The mock
598        // never replies to the call (per the P4 pattern — InFlightTracker /
599        // frame::lifecycle do the same); we just observe it landed so the
600        // subsequent `emit_event_for_session` runs after the subscription
601        // is in place.
602        let enable_id =
603            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.enable"))
604                .await
605                .expect("actor did not send Fetch.enable within 2s");
606        let enable_params = mock.last_sent()["params"].clone();
607        assert_eq!(enable_params["handleAuthRequests"], false);
608        assert_eq!(enable_params["patterns"][0]["urlPattern"], "*");
609        // Reply so the side-task completes cleanly (not strictly required —
610        // the mock harness usually doesn't — but it keeps the warn! quiet).
611        mock.reply(enable_id, json!({})).await;
612
613        // Step 2: emit a `Fetch.requestPaused` event whose URL matches the
614        // Block rule. The actor should dispatch `Fetch.failRequest`.
615        mock.emit_event_for_session(
616            "Fetch.requestPaused",
617            json!({
618                "requestId": "REQ-1",
619                "request": {
620                    "url": "https://example.test/blocked/banner.png",
621                    "method": "GET",
622                    "headers": {},
623                },
624                "resourceType": "Image",
625            }),
626            "S1",
627        )
628        .await;
629
630        // Step 3: expect the fail_request dispatch.
631        let fail_id =
632            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.failRequest"))
633                .await
634                .expect("actor did not send Fetch.failRequest within 2s");
635        let fail_params = mock.last_sent()["params"].clone();
636        assert_eq!(fail_params["requestId"], "REQ-1");
637        assert_eq!(fail_params["errorReason"], "BlockedByClient");
638        mock.reply(fail_id, json!({})).await;
639
640        // Step 4: cancel the actor + verify it dispatches `Fetch.disable`
641        // on shutdown and signals exit through the oneshot.
642        cancel.cancel();
643        let disable_id =
644            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.disable"))
645                .await
646                .expect("actor did not send Fetch.disable on cancel");
647        mock.reply(disable_id, json!({})).await;
648
649        tokio::time::timeout(Duration::from_secs(2), done_rx)
650            .await
651            .expect("actor did not signal exit within 2s")
652            .expect("oneshot sender dropped without sending");
653        actor.await.unwrap();
654        conn.shutdown();
655    }
656
657    #[tokio::test]
658    async fn actor_handles_auth_required_with_credentials() {
659        // cdpdriver/zendriver#208: proxy / HTTP basic-auth support. When the
660        // builder is configured with `handle_auth(user, pass)`, the actor
661        // must (a) send `Fetch.enable { handleAuthRequests: true }` and
662        // (b) respond to each `Fetch.authRequired` event with
663        // `Fetch.continueWithAuth { authChallengeResponse:
664        // ProvideCredentials + user/pass }`.
665        let (mut mock, conn) = MockConnection::pair();
666        let sess = SessionHandle::new(conn.clone(), "S1");
667        let cancel = CancellationToken::new();
668        let (done_tx, done_rx) = oneshot::channel();
669        let actor_cancel = cancel.clone();
670        let auth = Some(("user1".to_string(), "pass1".to_string()));
671        let actor = tokio::spawn(async move {
672            run_actor(
673                sess,
674                Vec::new(),
675                vec![RequestPattern {
676                    url_pattern: Some("*".into()),
677                    ..RequestPattern::default()
678                }],
679                auth,
680                actor_cancel,
681                done_tx,
682            )
683            .await;
684        });
685
686        let enable_id =
687            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.enable"))
688                .await
689                .expect("actor did not send Fetch.enable within 2s");
690        assert_eq!(
691            mock.last_sent()["params"]["handleAuthRequests"],
692            true,
693            "auth-enabled actor must flip handleAuthRequests"
694        );
695        mock.reply(enable_id, json!({})).await;
696
697        mock.emit_event_for_session(
698            "Fetch.authRequired",
699            json!({
700                "requestId": "AUTH-REQ-1",
701                "request": { "url": "https://example.test/", "method": "GET" },
702                "frameId": "F1",
703                "resourceType": "Document",
704                "authChallenge": {
705                    "source": "Proxy",
706                    "origin": "http://proxy.test",
707                    "scheme": "basic",
708                    "realm": "",
709                },
710            }),
711            "S1",
712        )
713        .await;
714
715        let auth_id = tokio::time::timeout(
716            Duration::from_secs(2),
717            mock.expect_cmd("Fetch.continueWithAuth"),
718        )
719        .await
720        .expect("actor did not send Fetch.continueWithAuth within 2s");
721        let params = mock.last_sent()["params"].clone();
722        assert_eq!(params["requestId"], "AUTH-REQ-1");
723        assert_eq!(
724            params["authChallengeResponse"]["response"],
725            "ProvideCredentials"
726        );
727        assert_eq!(params["authChallengeResponse"]["username"], "user1");
728        assert_eq!(params["authChallengeResponse"]["password"], "pass1");
729        mock.reply(auth_id, json!({})).await;
730
731        cancel.cancel();
732        let disable_id =
733            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.disable"))
734                .await
735                .expect("actor did not send Fetch.disable on cancel");
736        mock.reply(disable_id, json!({})).await;
737        tokio::time::timeout(Duration::from_secs(2), done_rx)
738            .await
739            .expect("actor did not signal exit")
740            .expect("oneshot sender dropped");
741        actor.await.unwrap();
742        conn.shutdown();
743    }
744
745    #[tokio::test]
746    async fn actor_without_auth_responds_default_to_auth_required() {
747        // Defensive: even when the builder did NOT configure auth, an
748        // `authRequired` event must be released (Default response) so Chrome
749        // doesn't hang. handleAuthRequests stays false so this path only
750        // triggers if the server pushed a stray event we didn't ask for —
751        // exercising it confirms the actor degrades gracefully.
752        let (mut mock, conn) = MockConnection::pair();
753        let sess = SessionHandle::new(conn.clone(), "S2");
754        let cancel = CancellationToken::new();
755        let (done_tx, done_rx) = oneshot::channel();
756        let actor_cancel = cancel.clone();
757        let actor = tokio::spawn(async move {
758            run_actor(
759                sess,
760                Vec::new(),
761                vec![RequestPattern {
762                    url_pattern: Some("*".into()),
763                    ..RequestPattern::default()
764                }],
765                None,
766                actor_cancel,
767                done_tx,
768            )
769            .await;
770        });
771
772        let enable_id =
773            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.enable"))
774                .await
775                .expect("actor did not send Fetch.enable");
776        assert_eq!(mock.last_sent()["params"]["handleAuthRequests"], false);
777        mock.reply(enable_id, json!({})).await;
778
779        mock.emit_event_for_session(
780            "Fetch.authRequired",
781            json!({ "requestId": "AUTH-REQ-2" }),
782            "S2",
783        )
784        .await;
785
786        let auth_id = tokio::time::timeout(
787            Duration::from_secs(2),
788            mock.expect_cmd("Fetch.continueWithAuth"),
789        )
790        .await
791        .expect("actor did not respond to stray authRequired");
792        assert_eq!(
793            mock.last_sent()["params"]["authChallengeResponse"]["response"],
794            "Default"
795        );
796        mock.reply(auth_id, json!({})).await;
797
798        cancel.cancel();
799        let disable_id =
800            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.disable"))
801                .await
802                .expect("actor did not send Fetch.disable");
803        mock.reply(disable_id, json!({})).await;
804        tokio::time::timeout(Duration::from_secs(2), done_rx)
805            .await
806            .expect("actor did not exit")
807            .expect("oneshot dropped");
808        actor.await.unwrap();
809        conn.shutdown();
810    }
811}