Skip to main content

zendriver_interception/
actor.rs

1//! Background interception actor.
2//!
3//! The crate-private `run_actor` is the rule-driven loop spawned by
4//! [`InterceptBuilder::start`](crate::builder::InterceptBuilder::start). It
5//! owns a single tab's `Fetch.*` interception lifecycle:
6//!
7//! 1. Subscribes to `Fetch.requestPaused` on the supplied [`SessionHandle`]
8//!    **before** firing `Fetch.enable`. Mirrors the subscriber pattern used
9//!    by the zendriver core's frame-lifecycle and network-idle trackers —
10//!    events Chrome fires between the enable round-trip and our subscription
11//!    would otherwise be dropped, and the `MockConnection` test harness in
12//!    `zendriver-transport` (gated `feature = "testing"`) never replies to
13//!    fire-and-forget enables anyway.
14//! 2. Sends `Fetch.enable { patterns, handleAuthRequests }` with the
15//!    explicit pattern list supplied by the builder; `handleAuthRequests`
16//!    flips to `true` when the builder also called
17//!    [`InterceptBuilder::handle_auth`](crate::builder::InterceptBuilder::handle_auth)
18//!    so Chrome surfaces `Fetch.authRequired` events the actor answers with
19//!    `Fetch.continueWithAuth`.
20//! 3. Per `Fetch.requestPaused` event: walks `rules` in registration order,
21//!    first match wins, dispatches the matching action's CDP call. No
22//!    match → plain `Fetch.continueRequest` (let through).
23//! 4. On cancellation: fires `Fetch.disable` and exits. The handle returned
24//!    by the builder owns a [`CancellationToken`] that fires on `Drop`, so
25//!    interception always tears down deterministically when the handle
26//!    leaves scope.
27//!
28//! [`InterceptHandle`] is the user-facing RAII guard. Its [`stop`] method is
29//! the explicit-shutdown path — it cancels the token *and* awaits a oneshot
30//! the actor signals on exit, so the caller observes `Fetch.disable` has
31//! reached the wire before `stop()` returns.
32//!
33//! [`stop`]: InterceptHandle::stop
34
35use base64::Engine as _;
36use base64::engine::general_purpose::STANDARD as BASE64;
37use futures::StreamExt;
38use serde::Deserialize;
39use serde_json::{Map, Value, json};
40use std::collections::HashMap;
41use tokio::sync::oneshot;
42use tokio_util::sync::CancellationToken;
43use tracing::{trace, warn};
44use zendriver_transport::SessionHandle;
45
46use crate::builder::RequestPattern;
47use crate::error::InterceptionError;
48use crate::rule::Rule;
49use crate::types::{RequestInfo, RequestOverrides, ResourceType, ResponseInfo};
50
51/// RAII guard returned by `InterceptBuilder::start` (Task 7).
52///
53/// The guard cancels the actor on [`Drop`] so interception always tears down
54/// when the handle leaves scope. Call [`stop`](Self::stop) instead when the
55/// caller needs to observe `Fetch.disable` reaching the wire before
56/// proceeding — `Drop` is fire-and-forget by construction.
57#[derive(Debug)]
58#[must_use = "interception stops when the handle is dropped — bind it to a variable to keep it alive"]
59pub struct InterceptHandle {
60    cancel: CancellationToken,
61    // `Option` so `stop(self)` can `.take()` the receiver without `Drop`
62    // racing on a half-moved field. `None` after `stop()` consumed it.
63    done: Option<oneshot::Receiver<()>>,
64}
65
66impl InterceptHandle {
67    /// Construct a handle from the cancel token + actor-exit receiver. The
68    /// constructor is `pub(crate)` so the only public path is via
69    /// [`InterceptBuilder::start`](crate::builder::InterceptBuilder::start).
70    pub(crate) fn new(cancel: CancellationToken, done: oneshot::Receiver<()>) -> Self {
71        Self {
72            cancel,
73            done: Some(done),
74        }
75    }
76
77    /// Stop the actor and wait for it to acknowledge exit.
78    ///
79    /// Cancels the actor's token, then awaits the oneshot the actor sends
80    /// after `Fetch.disable` reaches the wire. Returns
81    /// [`InterceptionError::SubscriptionClosed`] if the actor was already
82    /// gone (channel closed without a signal — e.g. transport torn down
83    /// mid-flight); callers can usually treat that as success since the
84    /// effect (interception is off) is identical.
85    pub async fn stop(mut self) -> Result<(), InterceptionError> {
86        self.cancel.cancel();
87        match self.done.take() {
88            Some(rx) => rx.await.map_err(|_| InterceptionError::SubscriptionClosed),
89            None => Ok(()),
90        }
91    }
92}
93
94impl Drop for InterceptHandle {
95    fn drop(&mut self) {
96        // Fire-and-forget on drop: cancel the actor's token. The actor's
97        // own `Fetch.disable` call will race the transport teardown, but
98        // since `Fetch.disable` is harmless when the session is already
99        // closing we don't try to await anything here.
100        self.cancel.cancel();
101    }
102}
103
104/// Decoded `Fetch.requestPaused` event payload.
105///
106/// Projects only the fields the actor consumes. Extra fields Chrome sends
107/// (e.g. `frameId`, `networkId`) are deliberately ignored — the rule API
108/// surfaces what callers asked for via [`RequestInfo`] / [`ResponseInfo`].
109///
110/// `pub(crate)` so [`crate::builder::InterceptBuilder::subscribe`] can reuse
111/// the same projection on the stream path.
112#[derive(Debug, Deserialize)]
113pub(crate) struct RequestPausedEvent {
114    #[serde(rename = "requestId")]
115    pub(crate) request_id: String,
116    pub(crate) request: RequestPayload,
117    #[serde(rename = "resourceType", default)]
118    pub(crate) resource_type: Option<String>,
119    // Only populated at the `Response` stage.
120    #[serde(rename = "responseStatusCode", default)]
121    pub(crate) response_status_code: Option<u16>,
122    #[serde(rename = "responseStatusText", default)]
123    pub(crate) response_status_text: Option<String>,
124    #[serde(rename = "responseHeaders", default)]
125    pub(crate) response_headers: Option<Vec<HeaderPair>>,
126}
127
128#[derive(Debug, Deserialize)]
129pub(crate) struct RequestPayload {
130    pub(crate) url: String,
131    pub(crate) method: String,
132    #[serde(default)]
133    pub(crate) headers: HashMap<String, String>,
134    /// Chrome's text representation of the request body. For multipart /
135    /// binary uploads this can be lossy — Chrome rebuilds via UTF-8 best
136    /// effort. Prefer [`Self::post_data_entries`] when present.
137    #[serde(rename = "postData", default)]
138    pub(crate) post_data: Option<String>,
139    #[serde(rename = "hasPostData", default)]
140    _has_post_data: Option<bool>,
141    /// Per-chunk base64-encoded bytes. Chrome emits this for binary /
142    /// multipart bodies where the text representation would be lossy.
143    /// When present, it is the canonical source of truth for the body.
144    #[serde(rename = "postDataEntries", default)]
145    pub(crate) post_data_entries: Option<Vec<PostDataEntry>>,
146}
147
148#[derive(Debug, Deserialize)]
149pub(crate) struct PostDataEntry {
150    /// Base64-encoded body bytes. Per CDP `Network.PostDataEntry`.
151    #[serde(default)]
152    pub(crate) bytes: Option<String>,
153}
154
155#[derive(Debug, Deserialize)]
156pub(crate) struct HeaderPair {
157    pub(crate) name: String,
158    pub(crate) value: String,
159}
160
161/// Run the interception actor until `cancel` fires.
162///
163/// See the module-level docs for the lifecycle contract. The function exits
164/// after `Fetch.disable` is dispatched on cancellation, or immediately if
165/// the event stream closes (e.g. transport torn down).
166///
167/// `done` is the oneshot the actor signals on exit so the matching
168/// [`InterceptHandle::stop`] call can synchronize on actor teardown.
169pub(crate) async fn run_actor(
170    session: SessionHandle,
171    rules: Vec<Rule>,
172    patterns: Vec<RequestPattern>,
173    auth: Option<(String, String)>,
174    cancel: CancellationToken,
175    done: oneshot::Sender<()>,
176) {
177    // Step 1: subscribe BEFORE enable (P4 pattern). Events Chrome emits
178    // between our enable round-trip and the subscription registration would
179    // otherwise be lost. Also: the mock test harness never replies to the
180    // synthetic `Fetch.enable` call, so awaiting it first would deadlock the
181    // actor before any subscription existed.
182    let mut paused = session.subscribe::<Value>("Fetch.requestPaused");
183    // When `handleAuthRequests: true`, Chrome additionally emits
184    // `Fetch.authRequired` events for proxy / HTTP basic-auth challenges.
185    // Subscribe up-front for the same race-free reason as `requestPaused`.
186    let mut auth_required = session.subscribe::<Value>("Fetch.authRequired");
187
188    // Step 2: fire-and-forget `Fetch.enable`. Mirrors `InFlightTracker::run`
189    // / `frame::lifecycle::run`: a failed enable surfaces as a `warn!` but
190    // the actor keeps running (no events arrive — interception silently
191    // no-ops — which is the same observable behavior the user gets from
192    // any other torn-down session).
193    let enable_session = session.clone();
194    let enable_patterns: Vec<Value> = patterns.iter().map(serialize_pattern).collect();
195    let handle_auth_requests = auth.is_some();
196    tokio::spawn(async move {
197        if let Err(e) = enable_session
198            .call(
199                "Fetch.enable",
200                json!({
201                    "patterns": enable_patterns,
202                    "handleAuthRequests": handle_auth_requests,
203                }),
204            )
205            .await
206        {
207            warn!(error = %e, "interception: Fetch.enable failed; interception inactive");
208        }
209    });
210
211    // Step 3: event loop.
212    loop {
213        tokio::select! {
214            () = cancel.cancelled() => {
215                trace!("interception: cancellation received, disabling Fetch and exiting");
216                break;
217            }
218            Some(ev_value) = paused.next() => {
219                // Decode to our projection. Chrome may add fields in future
220                // protocol versions — we skip ones we don't understand.
221                let ev: RequestPausedEvent = match serde_json::from_value(ev_value) {
222                    Ok(ev) => ev,
223                    Err(e) => {
224                        warn!(error = %e, "interception: skipping malformed Fetch.requestPaused event");
225                        continue;
226                    }
227                };
228                if let Err(e) = handle_paused(&session, &rules, ev).await {
229                    warn!(error = %e, "interception: handler dispatch failed");
230                }
231            }
232            Some(ev_value) = auth_required.next() => {
233                // `Fetch.authRequired` carries a `requestId` we must echo
234                // back via `Fetch.continueWithAuth`. If `auth` is None the
235                // user didn't ask for auth handling — fall back to
236                // `Default` so Chrome surfaces a normal auth dialog instead
237                // of hanging the pause forever.
238                let Some(request_id) = ev_value
239                    .get("requestId")
240                    .and_then(Value::as_str)
241                    .map(str::to_owned)
242                else {
243                    warn!("interception: Fetch.authRequired without requestId");
244                    continue;
245                };
246                let response = match &auth {
247                    Some((user, pass)) => json!({
248                        "response": "ProvideCredentials",
249                        "username": user,
250                        "password": pass,
251                    }),
252                    None => json!({ "response": "Default" }),
253                };
254                if let Err(e) = session
255                    .call(
256                        "Fetch.continueWithAuth",
257                        json!({
258                            "requestId": request_id,
259                            "authChallengeResponse": response,
260                        }),
261                    )
262                    .await
263                {
264                    warn!(error = %e, "interception: Fetch.continueWithAuth failed");
265                }
266            }
267            else => {
268                // Stream closed (transport gone). Nothing left to observe.
269                trace!("interception: event stream closed, exiting without Fetch.disable");
270                // Skip the disable below — the transport is gone, the call
271                // would fail anyway.
272                let _ = done.send(());
273                return;
274            }
275        }
276    }
277
278    // Step 4: best-effort `Fetch.disable` on shutdown. If it fails (session
279    // already torn down) we log and exit — the handle's caller still gets
280    // the oneshot signal so `stop()` doesn't hang.
281    if let Err(e) = session.call("Fetch.disable", json!({})).await {
282        warn!(error = %e, "interception: Fetch.disable failed during shutdown");
283    }
284    // Signal exit. The receiver may already be gone (handle dropped without
285    // `stop()`), which is fine — the `Drop` path didn't await it.
286    let _ = done.send(());
287}
288
289/// Walk the rule list against `ev.request.url` and dispatch the first match.
290/// No match → plain `Fetch.continueRequest` so Chrome proceeds as if no
291/// interception were registered.
292async fn handle_paused(
293    session: &SessionHandle,
294    rules: &[Rule],
295    ev: RequestPausedEvent,
296) -> Result<(), InterceptionError> {
297    let url = ev.request.url.clone();
298
299    // Find the first rule whose pattern matches. Walk the slice rather than
300    // building an iterator — the rule list is small (typically < 10) and
301    // this keeps the borrow checker quiet without `find` + closure lifetimes.
302    let matched = rules.iter().find(|r| r.matches(&url));
303
304    match matched {
305        Some(Rule::Block { .. }) => fail_request(session, &ev.request_id, "BlockedByClient").await,
306        Some(Rule::Redirect { to, .. }) => continue_with_url(session, &ev.request_id, to).await,
307        Some(Rule::Respond {
308            status,
309            headers,
310            body,
311            ..
312        }) => fulfill_request(session, &ev.request_id, *status, headers, body).await,
313        Some(Rule::Modify { modify, .. }) => {
314            let info = build_request_info(&ev);
315            let overrides = modify(&info);
316            continue_with_overrides(session, &ev.request_id, overrides).await
317        }
318        None => continue_passthrough(session, &ev.request_id).await,
319    }
320}
321
322/// Serialize a [`RequestPattern`] into the JSON shape CDP expects on
323/// `Fetch.enable.patterns[]`. All three fields are optional per CDP.
324pub(crate) fn serialize_pattern(p: &RequestPattern) -> Value {
325    let mut obj = Map::new();
326    if let Some(url) = &p.url_pattern {
327        obj.insert("urlPattern".into(), Value::String(url.clone()));
328    }
329    if let Some(rt) = p.resource_type {
330        obj.insert("resourceType".into(), Value::String(rt.as_cdp_str().into()));
331    }
332    if let Some(stage) = p.request_stage {
333        obj.insert(
334            "requestStage".into(),
335            Value::String(stage.as_cdp_str().into()),
336        );
337    }
338    Value::Object(obj)
339}
340
341/// Build a [`RequestInfo`] from the decoded event for `Modify` closures.
342///
343/// Body precedence: `postDataEntries` (canonical, base64-decoded + concatenated)
344/// when present, else `postData` interpreted as UTF-8 bytes. The string
345/// fallback is necessarily lossy for binary bodies — Chrome only emits
346/// `postDataEntries` when it knows the text form would mangle the bytes.
347///
348/// Headers come from `Network.Request.headers` (CDP object) so we materialize
349/// them as a `Vec<(name, value)>` on the boundary; the upstream HashMap may
350/// have collapsed duplicates already, but for the request side CDP also
351/// pre-merges so this is faithful.
352pub(crate) fn build_request_info(ev: &RequestPausedEvent) -> RequestInfo {
353    RequestInfo {
354        url: ev.request.url.clone(),
355        method: ev.request.method.clone(),
356        headers: ev
357            .request
358            .headers
359            .iter()
360            .map(|(k, v)| (k.clone(), v.clone()))
361            .collect(),
362        post_data: decode_post_data(&ev.request),
363        resource_type: parse_resource_type(ev.resource_type.as_deref()),
364    }
365}
366
367fn decode_post_data(req: &RequestPayload) -> Option<Vec<u8>> {
368    use base64::Engine as _;
369    use base64::engine::general_purpose::STANDARD as BASE64;
370
371    if let Some(entries) = req.post_data_entries.as_ref() {
372        let mut buf = Vec::new();
373        for entry in entries {
374            let Some(b64) = entry.bytes.as_deref() else {
375                continue;
376            };
377            match BASE64.decode(b64) {
378                Ok(bytes) => buf.extend_from_slice(&bytes),
379                Err(e) => {
380                    tracing::warn!(error = %e, "interception: bad base64 in postDataEntries; skipping entry");
381                }
382            }
383        }
384        return Some(buf);
385    }
386    req.post_data.as_deref().map(|s| s.as_bytes().to_vec())
387}
388
389/// Build a [`ResponseInfo`] from the decoded event when Chrome paused at the
390/// `Response` stage. Returns `None` at the `Request` stage (the event
391/// payload's `responseStatusCode` is absent).
392///
393/// Used on both the rule-driven actor path and the
394/// [`crate::builder::InterceptBuilder::subscribe`] stream path.
395pub(crate) fn build_response_info(ev: &RequestPausedEvent) -> Option<ResponseInfo> {
396    let status = ev.response_status_code?;
397    let status_text = ev.response_status_text.clone().unwrap_or_default();
398    let headers: Vec<(String, String)> = ev
399        .response_headers
400        .as_ref()
401        .map(|hs| {
402            hs.iter()
403                .map(|h| (h.name.clone(), h.value.clone()))
404                .collect()
405        })
406        .unwrap_or_default();
407    Some(ResponseInfo {
408        status,
409        status_text,
410        headers,
411    })
412}
413
414/// Serialize a `[(name, value)]` slice into CDP's `[{name, value}]` JSON
415/// array shape used by `Fetch.continueRequest.headers` and
416/// `Fetch.fulfillRequest.responseHeaders`.
417pub(crate) fn headers_to_cdp(headers: &[(String, String)]) -> Vec<Value> {
418    headers
419        .iter()
420        .map(|(name, value)| json!({ "name": name, "value": value }))
421        .collect()
422}
423
424/// Best-effort parse of a CDP `Network.ResourceType` string into our enum.
425/// Defaults to [`ResourceType::Other`] for unknown strings rather than
426/// failing the whole event — Chrome occasionally adds new types we don't
427/// know about yet, and dropping a real intercepted request for that would
428/// be a worse failure mode than reporting `Other`.
429fn parse_resource_type(s: Option<&str>) -> ResourceType {
430    match s.unwrap_or("Other") {
431        "Document" => ResourceType::Document,
432        "Stylesheet" => ResourceType::Stylesheet,
433        "Image" => ResourceType::Image,
434        "Media" => ResourceType::Media,
435        "Font" => ResourceType::Font,
436        "Script" => ResourceType::Script,
437        "TextTrack" => ResourceType::TextTrack,
438        "XHR" => ResourceType::XHR,
439        "Fetch" => ResourceType::Fetch,
440        "EventSource" => ResourceType::EventSource,
441        "WebSocket" => ResourceType::WebSocket,
442        "Manifest" => ResourceType::Manifest,
443        "SignedExchange" => ResourceType::SignedExchange,
444        "Ping" => ResourceType::Ping,
445        "CSPViolationReport" => ResourceType::CSPViolationReport,
446        "Preflight" => ResourceType::Preflight,
447        _ => ResourceType::Other,
448    }
449}
450
451// --- CDP dispatch helpers --------------------------------------------------
452
453async fn fail_request(
454    session: &SessionHandle,
455    request_id: &str,
456    error_reason: &str,
457) -> Result<(), InterceptionError> {
458    session
459        .call(
460            "Fetch.failRequest",
461            json!({
462                "requestId": request_id,
463                "errorReason": error_reason,
464            }),
465        )
466        .await?;
467    Ok(())
468}
469
470async fn continue_passthrough(
471    session: &SessionHandle,
472    request_id: &str,
473) -> Result<(), InterceptionError> {
474    session
475        .call("Fetch.continueRequest", json!({ "requestId": request_id }))
476        .await?;
477    Ok(())
478}
479
480async fn continue_with_url(
481    session: &SessionHandle,
482    request_id: &str,
483    url: &str,
484) -> Result<(), InterceptionError> {
485    session
486        .call(
487            "Fetch.continueRequest",
488            json!({
489                "requestId": request_id,
490                "url": url,
491            }),
492        )
493        .await?;
494    Ok(())
495}
496
497async fn continue_with_overrides(
498    session: &SessionHandle,
499    request_id: &str,
500    overrides: RequestOverrides,
501) -> Result<(), InterceptionError> {
502    let mut params = Map::new();
503    params.insert("requestId".into(), Value::String(request_id.into()));
504    if let Some(url) = overrides.url {
505        params.insert("url".into(), Value::String(url));
506    }
507    if let Some(method) = overrides.method {
508        params.insert("method".into(), Value::String(method));
509    }
510    if let Some(headers) = overrides.headers {
511        params.insert("headers".into(), Value::Array(headers_to_cdp(&headers)));
512    }
513    if let Some(post_data) = overrides.post_data {
514        params.insert("postData".into(), Value::String(BASE64.encode(&post_data)));
515    }
516    session
517        .call("Fetch.continueRequest", Value::Object(params))
518        .await?;
519    Ok(())
520}
521
522async fn fulfill_request(
523    session: &SessionHandle,
524    request_id: &str,
525    status: u16,
526    headers: &[(String, String)],
527    body: &[u8],
528) -> Result<(), InterceptionError> {
529    let response_headers = headers_to_cdp(headers);
530    session
531        .call(
532            "Fetch.fulfillRequest",
533            json!({
534                "requestId": request_id,
535                "responseCode": status,
536                "responseHeaders": response_headers,
537                "body": BASE64.encode(body),
538            }),
539        )
540        .await?;
541    Ok(())
542}
543
// Mock-driven tests for `run_actor`. Each test spawns the actor on a
// `MockConnection`-backed session, observes the commands it sends, emits
// synthetic events, then cancels and checks the shutdown sequence.
//
// Test code calls `unwrap()` freely; the matching lints are relaxed for
// this module only.
#[cfg(test)]
#[allow(clippy::panic, clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::url_pattern::UrlPattern;
    use std::time::Duration;
    use zendriver_transport::testing::MockConnection;

    /// End-to-end mock drive of the rule-based actor:
    ///   1. Spawn `run_actor` with a single Block rule for `*/blocked/*`.
    ///   2. Expect the fire-and-forget `Fetch.enable` and reply.
    ///   3. Emit a matching `Fetch.requestPaused` event.
    ///   4. Assert the actor dispatches `Fetch.failRequest` with
    ///      `errorReason = BlockedByClient`.
    ///   5. Cancel + expect `Fetch.disable` (RAII teardown contract).
    #[tokio::test]
    async fn block_rule_dispatches_fail_request_with_blocked_by_client() {
        let (mut mock, conn) = MockConnection::pair();
        let sess = SessionHandle::new(conn.clone(), "S1");

        let rules = vec![Rule::Block {
            pattern: UrlPattern::new("*/blocked/*").unwrap(),
        }];
        // Intercept everything; the rule list decides what gets blocked.
        let patterns = vec![RequestPattern {
            url_pattern: Some("*".into()),
            ..RequestPattern::default()
        }];
        let cancel = CancellationToken::new();
        let (done_tx, done_rx) = oneshot::channel();
        let actor_cancel = cancel.clone();
        let actor = tokio::spawn(async move {
            run_actor(sess, rules, patterns, None, actor_cancel, done_tx).await;
        });

        // Step 1: the actor fires `Fetch.enable` in a side-task. The mock
        // never replies to the call (per the P4 pattern — InFlightTracker /
        // frame::lifecycle do the same); we just observe it landed so the
        // subsequent `emit_event_for_session` runs after the subscription
        // is in place.
        let enable_id =
            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.enable"))
                .await
                .expect("actor did not send Fetch.enable within 2s");
        let enable_params = mock.last_sent()["params"].clone();
        assert_eq!(enable_params["handleAuthRequests"], false);
        assert_eq!(enable_params["patterns"][0]["urlPattern"], "*");
        // Reply so the side-task completes cleanly (not strictly required —
        // the mock harness usually doesn't — but it keeps the warn! quiet).
        mock.reply(enable_id, json!({})).await;

        // Step 2: emit a `Fetch.requestPaused` event whose URL matches the
        // Block rule. The actor should dispatch `Fetch.failRequest`.
        mock.emit_event_for_session(
            "Fetch.requestPaused",
            json!({
                "requestId": "REQ-1",
                "request": {
                    "url": "https://example.test/blocked/banner.png",
                    "method": "GET",
                    "headers": {},
                },
                "resourceType": "Image",
            }),
            "S1",
        )
        .await;

        // Step 3: expect the fail_request dispatch.
        let fail_id =
            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.failRequest"))
                .await
                .expect("actor did not send Fetch.failRequest within 2s");
        let fail_params = mock.last_sent()["params"].clone();
        assert_eq!(fail_params["requestId"], "REQ-1");
        assert_eq!(fail_params["errorReason"], "BlockedByClient");
        mock.reply(fail_id, json!({})).await;

        // Step 4: cancel the actor + verify it dispatches `Fetch.disable`
        // on shutdown and signals exit through the oneshot.
        cancel.cancel();
        let disable_id =
            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.disable"))
                .await
                .expect("actor did not send Fetch.disable on cancel");
        mock.reply(disable_id, json!({})).await;

        tokio::time::timeout(Duration::from_secs(2), done_rx)
            .await
            .expect("actor did not signal exit within 2s")
            .expect("oneshot sender dropped without sending");
        actor.await.unwrap();
        conn.shutdown();
    }

    /// With credentials configured, the actor must enable auth handling and
    /// answer `Fetch.authRequired` with `ProvideCredentials`.
    #[tokio::test]
    async fn actor_handles_auth_required_with_credentials() {
        // cdpdriver/zendriver#208: proxy / HTTP basic-auth support. When the
        // builder is configured with `handle_auth(user, pass)`, the actor
        // must (a) send `Fetch.enable { handleAuthRequests: true }` and
        // (b) respond to each `Fetch.authRequired` event with
        // `Fetch.continueWithAuth { authChallengeResponse:
        // ProvideCredentials + user/pass }`.
        let (mut mock, conn) = MockConnection::pair();
        let sess = SessionHandle::new(conn.clone(), "S1");
        let cancel = CancellationToken::new();
        let (done_tx, done_rx) = oneshot::channel();
        let actor_cancel = cancel.clone();
        let auth = Some(("user1".to_string(), "pass1".to_string()));
        // No rules: this test only exercises the auth branch of the loop.
        let actor = tokio::spawn(async move {
            run_actor(
                sess,
                Vec::new(),
                vec![RequestPattern {
                    url_pattern: Some("*".into()),
                    ..RequestPattern::default()
                }],
                auth,
                actor_cancel,
                done_tx,
            )
            .await;
        });

        // (a) `Fetch.enable` must carry `handleAuthRequests: true`.
        let enable_id =
            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.enable"))
                .await
                .expect("actor did not send Fetch.enable within 2s");
        assert_eq!(
            mock.last_sent()["params"]["handleAuthRequests"],
            true,
            "auth-enabled actor must flip handleAuthRequests"
        );
        mock.reply(enable_id, json!({})).await;

        // Emit a proxy auth challenge for session "S1".
        mock.emit_event_for_session(
            "Fetch.authRequired",
            json!({
                "requestId": "AUTH-REQ-1",
                "request": { "url": "https://example.test/", "method": "GET" },
                "frameId": "F1",
                "resourceType": "Document",
                "authChallenge": {
                    "source": "Proxy",
                    "origin": "http://proxy.test",
                    "scheme": "basic",
                    "realm": "",
                },
            }),
            "S1",
        )
        .await;

        // (b) The reply must echo the request id and carry the credentials.
        let auth_id = tokio::time::timeout(
            Duration::from_secs(2),
            mock.expect_cmd("Fetch.continueWithAuth"),
        )
        .await
        .expect("actor did not send Fetch.continueWithAuth within 2s");
        let params = mock.last_sent()["params"].clone();
        assert_eq!(params["requestId"], "AUTH-REQ-1");
        assert_eq!(
            params["authChallengeResponse"]["response"],
            "ProvideCredentials"
        );
        assert_eq!(params["authChallengeResponse"]["username"], "user1");
        assert_eq!(params["authChallengeResponse"]["password"], "pass1");
        mock.reply(auth_id, json!({})).await;

        // Shutdown: cancel, expect `Fetch.disable`, then the exit signal.
        cancel.cancel();
        let disable_id =
            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.disable"))
                .await
                .expect("actor did not send Fetch.disable on cancel");
        mock.reply(disable_id, json!({})).await;
        tokio::time::timeout(Duration::from_secs(2), done_rx)
            .await
            .expect("actor did not signal exit")
            .expect("oneshot sender dropped");
        actor.await.unwrap();
        conn.shutdown();
    }

    /// Without credentials, a stray `Fetch.authRequired` must still be
    /// answered, with the `Default` response.
    #[tokio::test]
    async fn actor_without_auth_responds_default_to_auth_required() {
        // Defensive: even when the builder did NOT configure auth, an
        // `authRequired` event must be released (Default response) so Chrome
        // doesn't hang. handleAuthRequests stays false so this path only
        // triggers if the server pushed a stray event we didn't ask for —
        // exercising it confirms the actor degrades gracefully.
        let (mut mock, conn) = MockConnection::pair();
        let sess = SessionHandle::new(conn.clone(), "S2");
        let cancel = CancellationToken::new();
        let (done_tx, done_rx) = oneshot::channel();
        let actor_cancel = cancel.clone();
        let actor = tokio::spawn(async move {
            run_actor(
                sess,
                Vec::new(),
                vec![RequestPattern {
                    url_pattern: Some("*".into()),
                    ..RequestPattern::default()
                }],
                None,
                actor_cancel,
                done_tx,
            )
            .await;
        });

        // `auth` is `None`, so the enable call must leave auth handling off.
        let enable_id =
            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.enable"))
                .await
                .expect("actor did not send Fetch.enable");
        assert_eq!(mock.last_sent()["params"]["handleAuthRequests"], false);
        mock.reply(enable_id, json!({})).await;

        // Minimal event: only the `requestId` the actor needs to echo back.
        mock.emit_event_for_session(
            "Fetch.authRequired",
            json!({ "requestId": "AUTH-REQ-2" }),
            "S2",
        )
        .await;

        let auth_id = tokio::time::timeout(
            Duration::from_secs(2),
            mock.expect_cmd("Fetch.continueWithAuth"),
        )
        .await
        .expect("actor did not respond to stray authRequired");
        assert_eq!(
            mock.last_sent()["params"]["authChallengeResponse"]["response"],
            "Default"
        );
        mock.reply(auth_id, json!({})).await;

        // Shutdown: cancel, expect `Fetch.disable`, then the exit signal.
        cancel.cancel();
        let disable_id =
            tokio::time::timeout(Duration::from_secs(2), mock.expect_cmd("Fetch.disable"))
                .await
                .expect("actor did not send Fetch.disable");
        mock.reply(disable_id, json!({})).await;
        tokio::time::timeout(Duration::from_secs(2), done_rx)
            .await
            .expect("actor did not exit")
            .expect("oneshot dropped");
        actor.await.unwrap();
        conn.shutdown();
    }
}