Skip to main content

chromiumoxide/handler/
mod.rs

1use crate::listeners::{EventListenerRequest, EventListeners};
2use chromiumoxide_cdp::cdp::browser_protocol::browser::*;
3use chromiumoxide_cdp::cdp::browser_protocol::target::*;
4use chromiumoxide_cdp::cdp::events::CdpEvent;
5use chromiumoxide_cdp::cdp::events::CdpEventMessage;
6use chromiumoxide_types::{CallId, Message, Method, Response};
7use chromiumoxide_types::{MethodId, Request as CdpRequest};
8use fnv::FnvHashMap;
9use futures_util::Stream;
10use hashbrown::{HashMap, HashSet};
11use spider_network_blocker::intercept_manager::NetworkInterceptManager;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::time::{Duration, Instant};
15use tokio::sync::mpsc::Receiver;
16use tokio::sync::oneshot::Sender as OneshotSender;
17use tokio_tungstenite::tungstenite::error::ProtocolError;
18use tokio_tungstenite::tungstenite::Error;
19
20use std::sync::Arc;
21use tokio::sync::Notify;
22
23use crate::cmd::{to_command_response, CommandMessage};
24use crate::conn::Connection;
25use crate::error::{CdpError, Result};
26use crate::handler::browser::BrowserContext;
27use crate::handler::frame::FrameRequestedNavigation;
28use crate::handler::frame::{NavigationError, NavigationId, NavigationOk};
29use crate::handler::job::PeriodicJob;
30use crate::handler::session::Session;
31use crate::handler::target::TargetEvent;
32use crate::handler::target::{Target, TargetConfig};
33use crate::handler::viewport::Viewport;
34use crate::page::Page;
35pub(crate) use page::PageInner;
36
/// Default request timeout, in milliseconds (30 seconds).
pub const REQUEST_TIMEOUT: u64 = 30 * 1_000;
39
40pub mod blockers;
41pub mod browser;
42pub mod commandfuture;
43pub mod domworld;
44pub mod emulation;
45pub mod frame;
46pub mod http;
47pub mod httpfuture;
48mod job;
49pub mod network;
50pub mod network_utils;
51pub mod page;
52#[cfg(feature = "parallel-handler")]
53pub mod parallel;
54pub mod sender;
55mod session;
56pub mod target;
57pub mod target_message_future;
58pub mod viewport;
59
/// The handler that monitors the state of the chromium browser and drives all
/// the requests and events.
///
/// It is driven either by polling it as a `Stream` or by awaiting
/// [`Handler::run()`]; nothing happens until it is driven.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Handler {
    /// Browser context assigned to targets whose `TargetInfo` carries no
    /// `browser_context_id`.
    pub default_browser_context: BrowserContext,
    /// Browser contexts this handler is restricted to, seeded from
    /// `HandlerConfig::context_ids`. When non-empty, targets that report a
    /// context id outside this set are ignored on creation.
    pub browser_contexts: HashSet<BrowserContext>,
    /// Commands that are being processed and awaiting a response from the
    /// chromium instance together with the timestamp when the request
    /// started.
    pending_commands: FnvHashMap<CallId, (PendingRequest, MethodId, Instant)>,
    /// Channel on which `HandlerMessage`s arrive (presumably sent by the
    /// `Browser` handle; NOTE(review): confirm).
    from_browser: Receiver<HandlerMessage>,
    /// Used to loop over all targets in a consistent manner
    target_ids: Vec<TargetId>,
    /// The targets known to the handler (created, and possibly attached),
    /// keyed by target id.
    targets: HashMap<TargetId, Target>,
    /// Navigations queued for targets that have not been resolved yet.
    navigations: FnvHashMap<NavigationId, NavigationRequest>,
    /// Keeps track of all the currently active sessions, keyed by session id.
    ///
    /// There can be multiple sessions per target.
    sessions: HashMap<SessionId, Session>,
    /// The websocket connection to the chromium instance.
    /// `Option` so that `run()` can `.take()` it for splitting.
    conn: Option<Connection<CdpEventMessage>>,
    /// Evicts timed out requests periodically
    evict_command_timeout: PeriodicJob,
    /// Counter used to mint the next `NavigationId` (wraps on overflow).
    next_navigation_id: usize,
    /// How this handler will configure targets etc.
    config: HandlerConfig,
    /// All registered event subscriptions
    event_listeners: EventListeners,
    /// Set once a response to the browser close command has been received.
    closing: bool,
    /// Remaining byte budget before network requests are blocked
    /// (presumably `None` when no budget is tracked; NOTE(review): confirm).
    remaining_bytes: Option<u64>,
    /// The byte budget is exhausted.
    budget_exhausted: bool,
    /// Tracks which targets we've already attached to, to avoid multiple sessions per target.
    attached_targets: HashSet<TargetId>,
    /// Optional notify for waking `Handler::run()`'s `tokio::select!` loop
    /// when a page sends a message.  `None` when using the `Stream` API.
    page_wake: Option<Arc<Notify>>,
}
106
107lazy_static::lazy_static! {
108    /// Set the discovery ID target.
109    static ref DISCOVER_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
110        let discover = SetDiscoverTargetsParams::new(true);
111        (discover.identifier(), serde_json::to_value(discover).expect("valid discover target params"))
112    };
113    /// Targets params id.
114    static ref TARGET_PARAMS_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
115        let msg = GetTargetsParams { filter: None };
116        (msg.identifier(), serde_json::to_value(msg).expect("valid paramtarget"))
117    };
118    /// Set the close targets.
119    static ref CLOSE_PARAMS_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
120        let close_msg = CloseParams::default();
121        (close_msg.identifier(), serde_json::to_value(close_msg).expect("valid close params"))
122    };
123}
124
125fn maybe_store_attach_session_id(target: &mut Target, method: &MethodId, resp: &Response) {
126    if method.as_ref() != AttachToTargetParams::IDENTIFIER {
127        return;
128    }
129
130    if let Ok(resp) = to_command_response::<AttachToTargetParams>(resp.clone(), method.clone()) {
131        target.set_session_id(resp.result.session_id);
132    }
133}
134
135impl Handler {
136    /// Create a new `Handler` that drives the connection and listens for
137    /// messages on the receiver `rx`.
138    pub(crate) fn new(
139        mut conn: Connection<CdpEventMessage>,
140        rx: Receiver<HandlerMessage>,
141        config: HandlerConfig,
142    ) -> Self {
143        let discover = DISCOVER_ID.clone();
144        let _ = conn.submit_command(discover.0, None, discover.1);
145        let conn = Some(conn);
146
147        let browser_contexts = config
148            .context_ids
149            .iter()
150            .map(|id| BrowserContext::from(id.clone()))
151            .collect();
152
153        Self {
154            pending_commands: Default::default(),
155            from_browser: rx,
156            default_browser_context: Default::default(),
157            browser_contexts,
158            target_ids: Default::default(),
159            targets: Default::default(),
160            navigations: Default::default(),
161            sessions: Default::default(),
162            conn,
163            evict_command_timeout: PeriodicJob::new(config.request_timeout),
164            next_navigation_id: 0,
165            config,
166            event_listeners: Default::default(),
167            closing: false,
168            remaining_bytes: None,
169            budget_exhausted: false,
170            attached_targets: Default::default(),
171            page_wake: None,
172        }
173    }
174
175    /// Borrow the WebSocket connection, returning an error if it has been
176    /// consumed by [`Handler::run()`].
177    #[inline]
178    fn conn(&mut self) -> Result<&mut Connection<CdpEventMessage>> {
179        self.conn
180            .as_mut()
181            .ok_or_else(|| CdpError::msg("connection consumed by Handler::run()"))
182    }
183
184    /// Return the target with the matching `target_id`
185    pub fn get_target(&self, target_id: &TargetId) -> Option<&Target> {
186        self.targets.get(target_id)
187    }
188
189    /// Iterator over all currently attached targets
190    pub fn targets(&self) -> impl Iterator<Item = &Target> + '_ {
191        self.targets.values()
192    }
193
194    /// The default Browser context
195    pub fn default_browser_context(&self) -> &BrowserContext {
196        &self.default_browser_context
197    }
198
199    /// Iterator over all currently available browser contexts
200    pub fn browser_contexts(&self) -> impl Iterator<Item = &BrowserContext> + '_ {
201        self.browser_contexts.iter()
202    }
203
204    /// received a response to a navigation request like `Page.navigate`
205    fn on_navigation_response(&mut self, id: NavigationId, resp: Response) {
206        if let Some(nav) = self.navigations.remove(&id) {
207            match nav {
208                NavigationRequest::Navigate(mut nav) => {
209                    if nav.navigated {
210                        let _ = nav.tx.send(Ok(resp));
211                    } else {
212                        nav.set_response(resp);
213                        self.navigations
214                            .insert(id, NavigationRequest::Navigate(nav));
215                    }
216                }
217            }
218        }
219    }
220
221    /// A navigation has finished.
222    fn on_navigation_lifecycle_completed(&mut self, res: Result<NavigationOk, NavigationError>) {
223        match res {
224            Ok(ok) => {
225                let id = *ok.navigation_id();
226                if let Some(nav) = self.navigations.remove(&id) {
227                    match nav {
228                        NavigationRequest::Navigate(mut nav) => {
229                            if let Some(resp) = nav.response.take() {
230                                let _ = nav.tx.send(Ok(resp));
231                            } else {
232                                nav.set_navigated();
233                                self.navigations
234                                    .insert(id, NavigationRequest::Navigate(nav));
235                            }
236                        }
237                    }
238                }
239            }
240            Err(err) => {
241                if let Some(nav) = self.navigations.remove(err.navigation_id()) {
242                    match nav {
243                        NavigationRequest::Navigate(nav) => {
244                            let _ = nav.tx.send(Err(err.into()));
245                        }
246                    }
247                }
248            }
249        }
250    }
251
252    /// Received a response to a request.
253    fn on_response(&mut self, resp: Response) {
254        if let Some((req, method, _)) = self.pending_commands.remove(&resp.id) {
255            match req {
256                PendingRequest::CreateTarget(tx) => {
257                    match to_command_response::<CreateTargetParams>(resp, method) {
258                        Ok(resp) => {
259                            if let Some(target) = self.targets.get_mut(&resp.target_id) {
260                                target.set_initiator(tx);
261                            } else {
262                                let _ = tx.send(Err(CdpError::NotFound)).ok();
263                            }
264                        }
265                        Err(err) => {
266                            let _ = tx.send(Err(err)).ok();
267                        }
268                    }
269                }
270                PendingRequest::GetTargets(tx) => {
271                    match to_command_response::<GetTargetsParams>(resp, method) {
272                        Ok(resp) => {
273                            let targets = resp.result.target_infos;
274                            let results = targets.clone();
275
276                            for target_info in targets {
277                                let event: EventTargetCreated = EventTargetCreated { target_info };
278                                self.on_target_created(event);
279                            }
280
281                            let _ = tx.send(Ok(results)).ok();
282                        }
283                        Err(err) => {
284                            let _ = tx.send(Err(err)).ok();
285                        }
286                    }
287                }
288                PendingRequest::Navigate(id) => {
289                    self.on_navigation_response(id, resp);
290                    if self.config.only_html && !self.config.created_first_target {
291                        self.config.created_first_target = true;
292                    }
293                }
294                PendingRequest::ExternalCommand { tx, .. } => {
295                    let _ = tx.send(Ok(resp)).ok();
296                }
297                PendingRequest::InternalCommand(target_id) => {
298                    if let Some(target) = self.targets.get_mut(&target_id) {
299                        maybe_store_attach_session_id(target, &method, &resp);
300                        target.on_response(resp, method.as_ref());
301                    }
302                }
303                PendingRequest::CloseBrowser(tx) => {
304                    self.closing = true;
305                    let _ = tx.send(Ok(CloseReturns {})).ok();
306                }
307            }
308        }
309    }
310
311    /// Submit a command initiated via channel
312    pub(crate) fn submit_external_command(
313        &mut self,
314        msg: CommandMessage,
315        now: Instant,
316    ) -> Result<()> {
317        // Resolve session_id → target_id before `submit_command`
318        // consumes `msg.session_id`. `None` when the session hasn't
319        // landed in `self.sessions` yet; that command then relies on
320        // the normal request_timeout path if the target later crashes.
321        let target_id = msg
322            .session_id
323            .as_ref()
324            .and_then(|sid| self.sessions.get(sid.as_ref()))
325            .map(|s| s.target_id().clone());
326        let call_id =
327            self.conn()?
328                .submit_command(msg.method.clone(), msg.session_id, msg.params)?;
329        self.pending_commands.insert(
330            call_id,
331            (
332                PendingRequest::ExternalCommand {
333                    tx: msg.sender,
334                    target_id,
335                },
336                msg.method,
337                now,
338            ),
339        );
340        Ok(())
341    }
342
343    pub(crate) fn submit_internal_command(
344        &mut self,
345        target_id: TargetId,
346        req: CdpRequest,
347        now: Instant,
348    ) -> Result<()> {
349        let call_id = self.conn()?.submit_command(
350            req.method.clone(),
351            req.session_id.map(Into::into),
352            req.params,
353        )?;
354        self.pending_commands.insert(
355            call_id,
356            (PendingRequest::InternalCommand(target_id), req.method, now),
357        );
358        Ok(())
359    }
360
361    fn submit_fetch_targets(&mut self, tx: OneshotSender<Result<Vec<TargetInfo>>>, now: Instant) {
362        let msg = TARGET_PARAMS_ID.clone();
363
364        if let Some(conn) = self.conn.as_mut() {
365            if let Ok(call_id) = conn.submit_command(msg.0.clone(), None, msg.1) {
366                self.pending_commands
367                    .insert(call_id, (PendingRequest::GetTargets(tx), msg.0, now));
368            }
369        }
370    }
371
372    /// Send the Request over to the server and store its identifier to handle
373    /// the response once received.
374    fn submit_navigation(&mut self, id: NavigationId, req: CdpRequest, now: Instant) {
375        if let Some(conn) = self.conn.as_mut() {
376            if let Ok(call_id) = conn.submit_command(
377                req.method.clone(),
378                req.session_id.map(Into::into),
379                req.params,
380            ) {
381                self.pending_commands
382                    .insert(call_id, (PendingRequest::Navigate(id), req.method, now));
383            }
384        }
385    }
386
387    fn submit_close(&mut self, tx: OneshotSender<Result<CloseReturns>>, now: Instant) {
388        let close_msg = CLOSE_PARAMS_ID.clone();
389
390        if let Some(conn) = self.conn.as_mut() {
391            if let Ok(call_id) = conn.submit_command(close_msg.0.clone(), None, close_msg.1) {
392                self.pending_commands.insert(
393                    call_id,
394                    (PendingRequest::CloseBrowser(tx), close_msg.0, now),
395                );
396            }
397        }
398    }
399
400    /// Process a message received by the target's page via channel
401    fn on_target_message(&mut self, target: &mut Target, msg: CommandMessage, now: Instant) {
402        if msg.is_navigation() {
403            let (req, tx) = msg.split();
404            let id = self.next_navigation_id();
405
406            target.goto(FrameRequestedNavigation::new(
407                id,
408                req,
409                self.config.request_timeout,
410            ));
411
412            self.navigations.insert(
413                id,
414                NavigationRequest::Navigate(NavigationInProgress::new(tx)),
415            );
416        } else {
417            let _ = self.submit_external_command(msg, now);
418        }
419    }
420
421    /// An identifier for queued `NavigationRequest`s.
422    fn next_navigation_id(&mut self) -> NavigationId {
423        let id = NavigationId(self.next_navigation_id);
424        self.next_navigation_id = self.next_navigation_id.wrapping_add(1);
425        id
426    }
427
428    /// Create a new page and send it to the receiver when ready
429    ///
430    /// First a `CreateTargetParams` is send to the server, this will trigger
431    /// `EventTargetCreated` which results in a new `Target` being created.
432    /// Once the response to the request is received the initialization process
433    /// of the target kicks in. This triggers a queue of initialization requests
434    /// of the `Target`, once those are all processed and the `url` fo the
435    /// `CreateTargetParams` has finished loading (The `Target`'s `Page` is
436    /// ready and idle), the `Target` sends its newly created `Page` as response
437    /// to the initiator (`tx`) of the `CreateTargetParams` request.
438    fn create_page(&mut self, params: CreateTargetParams, tx: OneshotSender<Result<Page>>) {
439        let about_blank = params.url == "about:blank";
440        let http_check =
441            !about_blank && params.url.starts_with("http") || params.url.starts_with("file://");
442
443        if about_blank || http_check {
444            let method = params.identifier();
445
446            let Some(conn) = self.conn.as_mut() else {
447                let _ = tx.send(Err(CdpError::msg("connection consumed"))).ok();
448                return;
449            };
450            match serde_json::to_value(params) {
451                Ok(params) => match conn.submit_command(method.clone(), None, params) {
452                    Ok(call_id) => {
453                        self.pending_commands.insert(
454                            call_id,
455                            (PendingRequest::CreateTarget(tx), method, Instant::now()),
456                        );
457                    }
458                    Err(err) => {
459                        let _ = tx.send(Err(err.into())).ok();
460                    }
461                },
462                Err(err) => {
463                    let _ = tx.send(Err(err.into())).ok();
464                }
465            }
466        } else {
467            let _ = tx.send(Err(CdpError::NotFound)).ok();
468        }
469    }
470
    /// Process an incoming event read from the websocket.
    ///
    /// Some events carry a `session_id` that maps to a known session whose
    /// target is still tracked. Those are handed to that `Target` and go no
    /// further. All other events are first checked for target lifecycle
    /// notifications (created / attached / destroyed / crashed / detached),
    /// which update the handler's bookkeeping. They are then dispatched to
    /// the registered event listeners.
    fn on_event(&mut self, event: CdpEventMessage) {
        // Session-scoped event for a live target: the target handles it exclusively.
        if let Some(session_id) = &event.session_id {
            if let Some(session) = self.sessions.get(session_id.as_str()) {
                if let Some(target) = self.targets.get_mut(session.target_id()) {
                    return target.on_event(event);
                }
            }
        }
        let CdpEventMessage { params, method, .. } = event;

        // Browser-level target bookkeeping. Matching by reference and cloning
        // keeps `params` available for listener dispatch below.
        match params {
            CdpEvent::TargetTargetCreated(ref ev) => self.on_target_created((**ev).clone()),
            CdpEvent::TargetAttachedToTarget(ref ev) => self.on_attached_to_target(ev.clone()),
            CdpEvent::TargetTargetDestroyed(ref ev) => self.on_target_destroyed(ev.clone()),
            CdpEvent::TargetTargetCrashed(ref ev) => self.on_target_crashed(ev.clone()),
            CdpEvent::TargetDetachedFromTarget(ref ev) => self.on_detached_from_target(ev.clone()),
            _ => {}
        }

        // Forward to subscribers. NOTE(review): the first closure presumably
        // receives typed events and the second raw JSON for custom/unknown
        // events (keyed by `method`); confirm against `consume_event!`.
        chromiumoxide_cdp::consume_event!(match params {
            |ev| self.event_listeners.start_send(ev),
            |json| { let _ = self.event_listeners.try_send_custom(&method, json);}
        });
    }
496
497    /// Fired when a new target was created on the chromium instance
498    ///
499    /// Creates a new `Target` instance and keeps track of it
500    fn on_target_created(&mut self, event: EventTargetCreated) {
501        if !self.browser_contexts.is_empty() {
502            if let Some(ref context_id) = event.target_info.browser_context_id {
503                let bc = BrowserContext {
504                    id: Some(context_id.clone()),
505                };
506                if !self.browser_contexts.contains(&bc) {
507                    return;
508                }
509            }
510        }
511        let browser_ctx = event
512            .target_info
513            .browser_context_id
514            .clone()
515            .map(BrowserContext::from)
516            .unwrap_or_else(|| self.default_browser_context.clone());
517        let target = Target::new(
518            event.target_info,
519            TargetConfig {
520                ignore_https_errors: self.config.ignore_https_errors,
521                request_timeout: self.config.request_timeout,
522                viewport: self.config.viewport.clone(),
523                request_intercept: self.config.request_intercept,
524                cache_enabled: self.config.cache_enabled,
525                service_worker_enabled: self.config.service_worker_enabled,
526                ignore_visuals: self.config.ignore_visuals,
527                ignore_stylesheets: self.config.ignore_stylesheets,
528                ignore_javascript: self.config.ignore_javascript,
529                ignore_analytics: self.config.ignore_analytics,
530                ignore_prefetch: self.config.ignore_prefetch,
531                allow_first_party_stylesheets: self.config.allow_first_party_stylesheets,
532                allow_first_party_javascript: self.config.allow_first_party_javascript,
533                allow_first_party_visuals: self.config.allow_first_party_visuals,
534                extra_headers: self.config.extra_headers.clone(),
535                only_html: self.config.only_html && self.config.created_first_target,
536                intercept_manager: self.config.intercept_manager,
537                remote_local_policy: self.config.remote_local_policy,
538                max_bytes_allowed: self.config.max_bytes_allowed,
539                max_redirects: self.config.max_redirects,
540                max_main_frame_navigations: self.config.max_main_frame_navigations,
541                whitelist_patterns: self.config.whitelist_patterns.clone(),
542                blacklist_patterns: self.config.blacklist_patterns.clone(),
543                #[cfg(feature = "adblock")]
544                adblock_filter_rules: self.config.adblock_filter_rules.clone(),
545                page_wake: self.page_wake.clone(),
546                page_channel_capacity: self.config.page_channel_capacity,
547            },
548            browser_ctx,
549        );
550
551        let tid = target.target_id().clone();
552        self.target_ids.push(tid.clone());
553        self.targets.insert(tid, target);
554    }
555
556    /// A new session is attached to a target
557    fn on_attached_to_target(&mut self, event: Box<EventAttachedToTarget>) {
558        let session = Session::new(event.session_id.clone(), event.target_info.target_id);
559        if let Some(target) = self.targets.get_mut(session.target_id()) {
560            target.set_session_id(session.session_id().clone())
561        }
562        self.sessions.insert(event.session_id, session);
563    }
564
565    /// The session was detached from target.
566    /// Can be issued multiple times per target if multiple session have been
567    /// attached to it.
568    fn on_detached_from_target(&mut self, event: EventDetachedFromTarget) {
569        // remove the session
570        if let Some(session) = self.sessions.remove(&event.session_id) {
571            if let Some(target) = self.targets.get_mut(session.target_id()) {
572                target.session_id_mut().take();
573            }
574        }
575    }
576
577    /// Fired when the target was destroyed in the browser
578    fn on_target_destroyed(&mut self, event: EventTargetDestroyed) {
579        self.attached_targets.remove(&event.target_id);
580
581        if let Some(target) = self.targets.remove(&event.target_id) {
582            // TODO shutdown?
583            if let Some(session) = target.session_id() {
584                self.sessions.remove(session);
585            }
586        }
587    }
588
589    /// Fired when a target has crashed (`Target.targetCrashed`).
590    ///
591    /// Unlike `targetDestroyed` (clean teardown), a crash means any
592    /// in-flight commands on that target will never receive a
593    /// response. Without explicit cancellation those commands sit in
594    /// `pending_commands` until the `request_timeout` evicts them,
595    /// which surfaces to callers as long latency tails on what is
596    /// really an immediate failure.
597    ///
598    /// Cancellation policy:
599    /// * `ExternalCommand { target_id: Some(crashed), .. }` — the
600    ///   caller's oneshot resolves with an error carrying the
601    ///   termination `status` + `errorCode` from the crash event.
602    /// * `InternalCommand(crashed)` — dropped silently; these are
603    ///   target-init commands whose caller is the target itself,
604    ///   which we're about to remove.
605    /// * `ExternalCommand { target_id: None, .. }` — left alone;
606    ///   browser-level or pre-attach-race commands aren't bound to
607    ///   this target.
608    /// * `Navigate(_)` and entries in `self.navigations` — left to
609    ///   the normal timeout path; `on_navigation_response` drops
610    ///   late responses once the target is removed below.
611    fn on_target_crashed(&mut self, event: EventTargetCrashed) {
612        let crashed_id = event.target_id.clone();
613        let status = event.status.clone();
614        let error_code = event.error_code;
615
616        // Two-pass cancellation: collect matching call-ids, then
617        // remove + signal. Can't signal inside `iter()` because
618        // `OneshotSender::send` consumes the sender, and the
619        // borrow checker disallows taking ownership from inside
620        // the iterator.
621        let to_cancel: Vec<CallId> = self
622            .pending_commands
623            .iter()
624            .filter_map(|(&call_id, (req, _, _))| match req {
625                PendingRequest::ExternalCommand {
626                    target_id: Some(tid),
627                    ..
628                } if *tid == crashed_id => Some(call_id),
629                PendingRequest::InternalCommand(tid) if *tid == crashed_id => Some(call_id),
630                _ => None,
631            })
632            .collect();
633
634        for call_id in to_cancel {
635            if let Some((req, _, _)) = self.pending_commands.remove(&call_id) {
636                match req {
637                    PendingRequest::ExternalCommand { tx, .. } => {
638                        let _ = tx.send(Err(CdpError::msg(format!(
639                            "target {:?} crashed: {} (errorCode={})",
640                            crashed_id, status, error_code
641                        ))));
642                    }
643                    PendingRequest::InternalCommand(_) => {
644                        // Target-init command — the target is gone,
645                        // nobody is waiting on a user-facing reply.
646                    }
647                    _ => {}
648                }
649            }
650        }
651
652        // Same map cleanup as `on_target_destroyed`.
653        self.attached_targets.remove(&crashed_id);
654        if let Some(target) = self.targets.remove(&crashed_id) {
655            if let Some(session) = target.session_id() {
656                self.sessions.remove(session);
657            }
658        }
659    }
660
661    /// House keeping of commands
662    ///
663    /// Remove all commands where `now` > `timestamp of command starting point +
664    /// request timeout` and notify the senders that their request timed out.
665    fn evict_timed_out_commands(&mut self, now: Instant) {
666        let deadline = match now.checked_sub(self.config.request_timeout) {
667            Some(d) => d,
668            None => return,
669        };
670
671        let timed_out: Vec<_> = self
672            .pending_commands
673            .iter()
674            .filter(|(_, (_, _, timestamp))| *timestamp < deadline)
675            .map(|(k, _)| *k)
676            .collect();
677
678        for call in timed_out {
679            if let Some((req, _, _)) = self.pending_commands.remove(&call) {
680                match req {
681                    PendingRequest::CreateTarget(tx) => {
682                        let _ = tx.send(Err(CdpError::Timeout));
683                    }
684                    PendingRequest::GetTargets(tx) => {
685                        let _ = tx.send(Err(CdpError::Timeout));
686                    }
687                    PendingRequest::Navigate(nav) => {
688                        if let Some(nav) = self.navigations.remove(&nav) {
689                            match nav {
690                                NavigationRequest::Navigate(nav) => {
691                                    let _ = nav.tx.send(Err(CdpError::Timeout));
692                                }
693                            }
694                        }
695                    }
696                    PendingRequest::ExternalCommand { tx, .. } => {
697                        let _ = tx.send(Err(CdpError::Timeout));
698                    }
699                    PendingRequest::InternalCommand(_) => {}
700                    PendingRequest::CloseBrowser(tx) => {
701                        let _ = tx.send(Err(CdpError::Timeout));
702                    }
703                }
704            }
705        }
706    }
707
708    pub fn event_listeners_mut(&mut self) -> &mut EventListeners {
709        &mut self.event_listeners
710    }
711
712    // ------------------------------------------------------------------
713    //  Tokio-native async entry point
714    // ------------------------------------------------------------------
715
716    /// Run the handler as a fully async tokio task.
717    ///
718    /// This is the high-performance alternative to polling `Handler` as a
719    /// `Stream`.  Internally it:
720    ///
721    /// * Splits the WebSocket into independent read/write halves — the
722    ///   writer runs in its own tokio task with natural batching.
723    /// * Uses `tokio::select!` to multiplex the browser channel, page
724    ///   notifications, WebSocket reads, the eviction timer, and writer
725    ///   health.
726    /// * Drains every target's page channel via `try_recv()` (non-blocking)
727    ///   after each event, with an `Arc<Notify>` ensuring the select loop
728    ///   wakes up whenever a page sends a message.
729    ///
730    /// # Usage
731    ///
732    /// ```rust,no_run
733    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
734    /// use chromiumoxide::Browser;
735    /// let (browser, handler) = Browser::launch(Default::default()).await?;
736    /// let handler_task = tokio::spawn(handler.run());
737    /// // … use browser …
738    /// # Ok(())
739    /// # }
740    /// ```
741    pub async fn run(mut self) -> Result<()> {
742        use chromiumoxide_types::Message;
743        use tokio::time::MissedTickBehavior;
744        use tokio_tungstenite::tungstenite::{self, error::ProtocolError};
745
746        // --- set up page notification ---
747        let page_wake = Arc::new(Notify::new());
748        self.page_wake = Some(page_wake.clone());
749
750        // --- split WebSocket ---
751        let conn = self
752            .conn
753            .take()
754            .ok_or_else(|| CdpError::msg("Handler::run() called with no connection"))?;
755        let async_conn = conn.into_async();
756        let mut ws_reader = async_conn.reader;
757        let ws_tx = async_conn.cmd_tx;
758        let mut writer_handle = async_conn.writer_handle;
759        let reader_handle = async_conn.reader_handle;
760        let mut next_call_id = async_conn.next_id;
761
762        // Helper to mint call-ids without &mut self.conn.
763        let mut alloc_call_id = || {
764            let id = chromiumoxide_types::CallId::new(next_call_id);
765            next_call_id = next_call_id.wrapping_add(1);
766            id
767        };
768
769        // --- eviction timer ---
770        let mut evict_timer = tokio::time::interval_at(
771            tokio::time::Instant::now() + self.config.request_timeout,
772            self.config.request_timeout,
773        );
774        evict_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
775
776        // Helper closure: submit a MethodCall through the WS writer.
777        macro_rules! ws_submit {
778            ($method:expr, $session_id:expr, $params:expr) => {{
779                let id = alloc_call_id();
780                let call = chromiumoxide_types::MethodCall {
781                    id,
782                    method: $method,
783                    session_id: $session_id,
784                    params: $params,
785                };
786                match ws_tx.try_send(call) {
787                    Ok(()) => Ok::<_, CdpError>(id),
788                    Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
789                        tracing::warn!("WS command channel full — dropping command");
790                        Err(CdpError::msg("WS command channel full"))
791                    }
792                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
793                        Err(CdpError::msg("WS writer closed"))
794                    }
795                }
796            }};
797        }
798
799        // ---- main event loop ----
800        //
801        // Modeled as an expression-loop producing `Result<()>` so that every
802        // exit path falls through to the graceful-shutdown block below
803        // (drop ws_tx → writer drains queue + sends WS Close → reader
804        // aborted). This matters for remote browsers (`Browser::connect`)
805        // where there is no child process whose death closes the socket.
806        let run_result: Result<()> = loop {
807            let now = std::time::Instant::now();
808
809            // 1. Drain all target page channels (non-blocking) & advance
810            //    state machines.
811            //
812            // Budget: drain at most 128 messages per target per iteration
813            // so a single chatty page cannot starve the rest.
814            const PER_TARGET_DRAIN_BUDGET: usize = 128;
815
816            for n in (0..self.target_ids.len()).rev() {
817                let target_id = self.target_ids.swap_remove(n);
818
819                if let Some((id, mut target)) = self.targets.remove_entry(&target_id) {
820                    // Drain page channel (non-blocking — waker is the Notify).
821                    {
822                        let mut msgs = Vec::new();
823                        if let Some(handle) = target.page_mut() {
824                            while msgs.len() < PER_TARGET_DRAIN_BUDGET {
825                                match handle.rx.try_recv() {
826                                    Ok(msg) => msgs.push(msg),
827                                    Err(_) => break,
828                                }
829                            }
830                        }
831                        for msg in msgs {
832                            target.on_page_message(msg);
833                        }
834                    }
835
836                    // Advance target state machine & process events.
837                    while let Some(event) = target.advance(now) {
838                        match event {
839                            TargetEvent::Request(req) => {
840                                if let Ok(call_id) =
841                                    ws_submit!(req.method.clone(), req.session_id, req.params)
842                                {
843                                    self.pending_commands.insert(
844                                        call_id,
845                                        (
846                                            PendingRequest::InternalCommand(
847                                                target.target_id().clone(),
848                                            ),
849                                            req.method,
850                                            now,
851                                        ),
852                                    );
853                                }
854                            }
855                            TargetEvent::Command(msg) => {
856                                if msg.is_navigation() {
857                                    let (req, tx) = msg.split();
858                                    let nav_id = self.next_navigation_id();
859                                    target.goto(FrameRequestedNavigation::new(
860                                        nav_id,
861                                        req.clone(),
862                                        self.config.request_timeout,
863                                    ));
864                                    if let Ok(call_id) =
865                                        ws_submit!(req.method.clone(), req.session_id, req.params)
866                                    {
867                                        self.pending_commands.insert(
868                                            call_id,
869                                            (PendingRequest::Navigate(nav_id), req.method, now),
870                                        );
871                                    }
872                                    self.navigations.insert(
873                                        nav_id,
874                                        NavigationRequest::Navigate(NavigationInProgress::new(tx)),
875                                    );
876                                } else if let Ok(call_id) = ws_submit!(
877                                    msg.method.clone(),
878                                    msg.session_id.map(Into::into),
879                                    msg.params
880                                ) {
881                                    // `target` is in scope here, so bind
882                                    // the pending command to its target_id
883                                    // directly.
884                                    let target_id = Some(target.target_id().clone());
885                                    self.pending_commands.insert(
886                                        call_id,
887                                        (
888                                            PendingRequest::ExternalCommand {
889                                                tx: msg.sender,
890                                                target_id,
891                                            },
892                                            msg.method,
893                                            now,
894                                        ),
895                                    );
896                                }
897                            }
898                            TargetEvent::NavigationRequest(nav_id, req) => {
899                                if let Ok(call_id) =
900                                    ws_submit!(req.method.clone(), req.session_id, req.params)
901                                {
902                                    self.pending_commands.insert(
903                                        call_id,
904                                        (PendingRequest::Navigate(nav_id), req.method, now),
905                                    );
906                                }
907                            }
908                            TargetEvent::NavigationResult(res) => {
909                                self.on_navigation_lifecycle_completed(res);
910                            }
911                            TargetEvent::BytesConsumed(n) => {
912                                if let Some(rem) = self.remaining_bytes.as_mut() {
913                                    *rem = rem.saturating_sub(n);
914                                    if *rem == 0 {
915                                        self.budget_exhausted = true;
916                                    }
917                                }
918                            }
919                        }
920                    }
921
922                    // Flush event listeners (no Context needed).
923                    target.event_listeners_mut().flush();
924
925                    self.targets.insert(id, target);
926                    self.target_ids.push(target_id);
927                }
928            }
929
930            // Flush handler-level event listeners.
931            self.event_listeners.flush();
932
933            if self.budget_exhausted {
934                for t in self.targets.values_mut() {
935                    t.network_manager.set_block_all(true);
936                }
937            }
938
939            if self.closing {
940                break Ok(());
941            }
942
943            // 2. Multiplex all event sources via tokio::select!
944            tokio::select! {
945                msg = self.from_browser.recv() => {
946                    match msg {
947                        Some(msg) => {
948                            match msg {
949                                HandlerMessage::Command(cmd) => {
950                                    // See `submit_external_command` for
951                                    // the session_id → target_id resolve.
952                                    let target_id = cmd
953                                        .session_id
954                                        .as_ref()
955                                        .and_then(|sid| self.sessions.get(sid.as_ref()))
956                                        .map(|s| s.target_id().clone());
957                                    if let Ok(call_id) = ws_submit!(
958                                        cmd.method.clone(),
959                                        cmd.session_id.map(Into::into),
960                                        cmd.params
961                                    ) {
962                                        self.pending_commands.insert(
963                                            call_id,
964                                            (
965                                                PendingRequest::ExternalCommand {
966                                                    tx: cmd.sender,
967                                                    target_id,
968                                                },
969                                                cmd.method,
970                                                now,
971                                            ),
972                                        );
973                                    }
974                                }
975                                HandlerMessage::FetchTargets(tx) => {
976                                    let msg = TARGET_PARAMS_ID.clone();
977                                    if let Ok(call_id) = ws_submit!(msg.0.clone(), None, msg.1) {
978                                        self.pending_commands.insert(
979                                            call_id,
980                                            (PendingRequest::GetTargets(tx), msg.0, now),
981                                        );
982                                    }
983                                }
984                                HandlerMessage::CloseBrowser(tx) => {
985                                    let close_msg = CLOSE_PARAMS_ID.clone();
986                                    if let Ok(call_id) = ws_submit!(close_msg.0.clone(), None, close_msg.1) {
987                                        self.pending_commands.insert(
988                                            call_id,
989                                            (PendingRequest::CloseBrowser(tx), close_msg.0, now),
990                                        );
991                                    }
992                                }
993                                HandlerMessage::CreatePage(params, tx) => {
994                                    if let Some(ref id) = params.browser_context_id {
995                                        self.browser_contexts.insert(BrowserContext::from(id.clone()));
996                                    }
997                                    self.create_page_async(params, tx, &mut alloc_call_id, &ws_tx, now);
998                                }
999                                HandlerMessage::GetPages(tx) => {
1000                                    let pages: Vec<_> = self.targets.values_mut()
1001                                        .filter(|p| p.is_page())
1002                                        .filter_map(|target| target.get_or_create_page())
1003                                        .map(|page| Page::from(page.clone()))
1004                                        .collect();
1005                                    let _ = tx.send(pages);
1006                                }
1007                                HandlerMessage::InsertContext(ctx) => {
1008                                    if self.default_browser_context.id().is_none() {
1009                                        self.default_browser_context = ctx.clone();
1010                                    }
1011                                    self.browser_contexts.insert(ctx);
1012                                }
1013                                HandlerMessage::DisposeContext(ctx) => {
1014                                    self.browser_contexts.remove(&ctx);
1015                                    self.attached_targets.retain(|tid| {
1016                                        self.targets.get(tid)
1017                                            .and_then(|t| t.browser_context_id())
1018                                            .map(|id| Some(id) != ctx.id())
1019                                            .unwrap_or(true)
1020                                    });
1021                                    self.closing = true;
1022                                }
1023                                HandlerMessage::GetPage(target_id, tx) => {
1024                                    let page = self.targets.get_mut(&target_id)
1025                                        .and_then(|target| target.get_or_create_page())
1026                                        .map(|page| Page::from(page.clone()));
1027                                    let _ = tx.send(page);
1028                                }
1029                                HandlerMessage::AddEventListener(req) => {
1030                                    self.event_listeners.add_listener(req);
1031                                }
1032                            }
1033                        }
1034                        None => break Ok(()), // browser handle dropped
1035                    }
1036                }
1037
1038                frame = ws_reader.next_message() => {
1039                    match frame {
1040                        Some(Ok(boxed_msg)) => match *boxed_msg {
1041                            Message::Response(resp) => {
1042                                self.on_response(resp);
1043                            }
1044                            Message::Event(ev) => {
1045                                self.on_event(ev);
1046                            }
1047                        },
1048                        Some(Err(err)) => {
1049                            tracing::error!("WS Connection error: {:?}", err);
1050                            if let CdpError::Ws(ref ws_error) = err {
1051                                match ws_error {
1052                                    tungstenite::Error::AlreadyClosed => break Ok(()),
1053                                    tungstenite::Error::Protocol(detail)
1054                                        if detail == &ProtocolError::ResetWithoutClosingHandshake =>
1055                                    {
1056                                        break Ok(());
1057                                    }
1058                                    _ => break Err(err),
1059                                }
1060                            } else {
1061                                break Err(err);
1062                            }
1063                        }
1064                        None => break Ok(()), // WS closed
1065                    }
1066                }
1067
1068                _ = page_wake.notified() => {
1069                    // A page sent a message — loop back to drain targets.
1070                }
1071
1072                _ = evict_timer.tick() => {
1073                    self.evict_timed_out_commands(now);
1074                    for t in self.targets.values_mut() {
1075                        t.network_manager.evict_stale_entries(now);
1076                        t.frame_manager_mut().evict_stale_context_ids();
1077                    }
1078                }
1079
1080                result = &mut writer_handle => {
1081                    // WS writer exited — propagate error or break.
1082                    match result {
1083                        Ok(Ok(())) => break Ok(()),
1084                        Ok(Err(e)) => break Err(e),
1085                        Err(e) => break Err(CdpError::msg(format!("WS writer panicked: {e}"))),
1086                    }
1087                }
1088            }
1089        };
1090
1091        // ---- graceful shutdown ----
1092        //
1093        // Drop the WS command sender so the writer task's `rx.recv()`
1094        // returns `None`. The writer drains any queued commands, sends a
1095        // WebSocket Close frame to Chrome, and exits. For remote browsers
1096        // this is the only mechanism that closes the WS — there's no child
1097        // process whose death would close the socket.
1098        drop(ws_tx);
1099
1100        // Wait briefly for the writer to send the Close frame. If it's
1101        // already done (e.g. exited via the writer-handle select arm),
1102        // skip the wait. Polling a finished `JoinHandle` again would
1103        // panic.
1104        if !writer_handle.is_finished() {
1105            let _ = tokio::time::timeout(std::time::Duration::from_millis(500), &mut writer_handle)
1106                .await;
1107            if !writer_handle.is_finished() {
1108                writer_handle.abort();
1109            }
1110        }
1111
1112        // Reader may be parked on `stream.next().await` waiting for
1113        // frames from Chrome. Its output channel receiver (`ws_reader`)
1114        // is dropped at function exit, so there is no consumer either
1115        // way — abort directly rather than waiting for the remote to
1116        // ack the Close frame.
1117        reader_handle.abort();
1118
1119        run_result
1120    }
1121
1122    /// `create_page` variant for the `run()` path that submits via `ws_tx`.
1123    fn create_page_async(
1124        &mut self,
1125        params: CreateTargetParams,
1126        tx: OneshotSender<Result<Page>>,
1127        alloc_call_id: &mut impl FnMut() -> chromiumoxide_types::CallId,
1128        ws_tx: &tokio::sync::mpsc::Sender<chromiumoxide_types::MethodCall>,
1129        now: std::time::Instant,
1130    ) {
1131        let about_blank = params.url == "about:blank";
1132        let http_check =
1133            !about_blank && params.url.starts_with("http") || params.url.starts_with("file://");
1134
1135        if about_blank || http_check {
1136            let method = params.identifier();
1137            match serde_json::to_value(params) {
1138                Ok(params) => {
1139                    let id = alloc_call_id();
1140                    let call = chromiumoxide_types::MethodCall {
1141                        id,
1142                        method: method.clone(),
1143                        session_id: None,
1144                        params,
1145                    };
1146                    match ws_tx.try_send(call) {
1147                        Ok(()) => {
1148                            self.pending_commands
1149                                .insert(id, (PendingRequest::CreateTarget(tx), method, now));
1150                        }
1151                        Err(_) => {
1152                            let _ = tx
1153                                .send(Err(CdpError::msg("WS command channel full or closed")))
1154                                .ok();
1155                        }
1156                    }
1157                }
1158                Err(err) => {
1159                    let _ = tx.send(Err(err.into())).ok();
1160                }
1161            }
1162        } else {
1163            let _ = tx.send(Err(CdpError::NotFound)).ok();
1164        }
1165    }
1166
1167    /// Run the handler with one task per attached page (parallel handler).
1168    ///
1169    /// Opt-in via the `parallel-handler` Cargo feature. The single-task
1170    /// `Handler::run()` path is unchanged. See `src/handler/parallel/mod.rs`
1171    /// for the architectural notes and current scope limits.
1172    #[cfg(feature = "parallel-handler")]
1173    pub async fn run_parallel(mut self) -> Result<()> {
1174        // Reuse the existing setup that `run()` did inline: split the WS
1175        // connection, kick the boot `Target.setDiscoverTargets` command,
1176        // and hand everything to the Router.
1177        let conn = self
1178            .conn
1179            .take()
1180            .ok_or_else(|| CdpError::msg("Handler::run_parallel() called with no connection"))?;
1181        let async_conn = conn.into_async();
1182
1183        // The boot command has already been pushed by `Handler::new`; it
1184        // sits at call_id `next_id - 1`.
1185        let next_id = async_conn.next_id;
1186        let boot_call_id = chromiumoxide_types::CallId::new(next_id.saturating_sub(1));
1187        let boot_method = DISCOVER_ID.0.clone();
1188
1189        let router = parallel::Router::new(
1190            self.config,
1191            self.default_browser_context,
1192            self.from_browser,
1193            async_conn.reader,
1194            async_conn.cmd_tx,
1195            boot_call_id,
1196            boot_method,
1197            next_id,
1198        );
1199        let result = router.run().await;
1200
1201        // Make sure the writer drains and the reader task exits cleanly.
1202        async_conn.writer_handle.abort();
1203        async_conn.reader_handle.abort();
1204
1205        result
1206    }
1207}
1208
1209impl Stream for Handler {
1210    type Item = Result<()>;
1211
1212    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1213        // Budgets prevent a single chatty target or WS flood from
1214        // starving other futures on the runtime. Mirror the caps
1215        // used in `Handler::run()`; on exhaustion, self-wake and
1216        // return Pending so the executor gets a chance to schedule
1217        // other work before we resume.
1218        const BROWSER_MSG_BUDGET: usize = 128;
1219        const PER_TARGET_DRAIN_BUDGET: usize = 128;
1220        const WS_MSG_BUDGET: usize = 512;
1221
1222        let pin = self.get_mut();
1223
1224        let mut dispose = false;
1225        let mut budget_hit = false;
1226
1227        let now = Instant::now();
1228
1229        loop {
1230            // temporary pinning of the browser receiver should be safe as we are pinning
1231            // through the already pinned self. with the receivers we can also
1232            // safely ignore exhaustion as those are fused.
1233            let mut browser_msgs = 0usize;
1234            while let Poll::Ready(Some(msg)) = pin.from_browser.poll_recv(cx) {
1235                match msg {
1236                    HandlerMessage::Command(cmd) => {
1237                        pin.submit_external_command(cmd, now)?;
1238                    }
1239                    HandlerMessage::FetchTargets(tx) => {
1240                        pin.submit_fetch_targets(tx, now);
1241                    }
1242                    HandlerMessage::CloseBrowser(tx) => {
1243                        pin.submit_close(tx, now);
1244                    }
1245                    HandlerMessage::CreatePage(params, tx) => {
1246                        if let Some(ref id) = params.browser_context_id {
1247                            pin.browser_contexts
1248                                .insert(BrowserContext::from(id.clone()));
1249                        }
1250                        pin.create_page(params, tx);
1251                    }
1252                    HandlerMessage::GetPages(tx) => {
1253                        let pages: Vec<_> = pin
1254                            .targets
1255                            .values_mut()
1256                            .filter(|p: &&mut Target| p.is_page())
1257                            .filter_map(|target| target.get_or_create_page())
1258                            .map(|page| Page::from(page.clone()))
1259                            .collect();
1260                        let _ = tx.send(pages);
1261                    }
1262                    HandlerMessage::InsertContext(ctx) => {
1263                        if pin.default_browser_context.id().is_none() {
1264                            pin.default_browser_context = ctx.clone();
1265                        }
1266                        pin.browser_contexts.insert(ctx);
1267                    }
1268                    HandlerMessage::DisposeContext(ctx) => {
1269                        pin.browser_contexts.remove(&ctx);
1270                        pin.attached_targets.retain(|tid| {
1271                            pin.targets
1272                                .get(tid)
1273                                .and_then(|t| t.browser_context_id()) // however you expose it
1274                                .map(|id| Some(id) != ctx.id())
1275                                .unwrap_or(true)
1276                        });
1277                        pin.closing = true;
1278                        dispose = true;
1279                    }
1280                    HandlerMessage::GetPage(target_id, tx) => {
1281                        let page = pin
1282                            .targets
1283                            .get_mut(&target_id)
1284                            .and_then(|target| target.get_or_create_page())
1285                            .map(|page| Page::from(page.clone()));
1286                        let _ = tx.send(page);
1287                    }
1288                    HandlerMessage::AddEventListener(req) => {
1289                        pin.event_listeners.add_listener(req);
1290                    }
1291                }
1292                browser_msgs += 1;
1293                if browser_msgs >= BROWSER_MSG_BUDGET {
1294                    budget_hit = true;
1295                    break;
1296                }
1297            }
1298
1299            for n in (0..pin.target_ids.len()).rev() {
1300                let target_id = pin.target_ids.swap_remove(n);
1301
1302                if let Some((id, mut target)) = pin.targets.remove_entry(&target_id) {
1303                    let mut drained = 0usize;
1304                    while let Some(event) = target.poll(cx, now) {
1305                        match event {
1306                            TargetEvent::Request(req) => {
1307                                let _ = pin.submit_internal_command(
1308                                    target.target_id().clone(),
1309                                    req,
1310                                    now,
1311                                );
1312                            }
1313                            TargetEvent::Command(msg) => {
1314                                pin.on_target_message(&mut target, msg, now);
1315                            }
1316                            TargetEvent::NavigationRequest(id, req) => {
1317                                pin.submit_navigation(id, req, now);
1318                            }
1319                            TargetEvent::NavigationResult(res) => {
1320                                pin.on_navigation_lifecycle_completed(res)
1321                            }
1322                            TargetEvent::BytesConsumed(n) => {
1323                                if let Some(rem) = pin.remaining_bytes.as_mut() {
1324                                    *rem = rem.saturating_sub(n);
1325                                    if *rem == 0 {
1326                                        pin.budget_exhausted = true;
1327                                    }
1328                                }
1329                            }
1330                        }
1331                        drained += 1;
1332                        if drained >= PER_TARGET_DRAIN_BUDGET {
1333                            budget_hit = true;
1334                            break;
1335                        }
1336                    }
1337
1338                    // poll the target's event listeners
1339                    target.event_listeners_mut().poll(cx);
1340
1341                    pin.targets.insert(id, target);
1342                    pin.target_ids.push(target_id);
1343                }
1344            }
1345
1346            // poll the handler-level event listeners once per iteration,
1347            // not once per target.
1348            pin.event_listeners_mut().poll(cx);
1349
1350            let mut done = true;
1351
1352            // Read WS messages into a temporary buffer so the conn borrow
1353            // is released before we process them (which needs &mut pin).
1354            let mut ws_msgs = Vec::new();
1355            let mut ws_err = None;
1356            {
1357                let Some(conn) = pin.conn.as_mut() else {
1358                    return Poll::Ready(Some(Err(CdpError::msg(
1359                        "connection consumed by Handler::run()",
1360                    ))));
1361                };
1362                while let Poll::Ready(Some(ev)) = Pin::new(&mut *conn).poll_next(cx) {
1363                    match ev {
1364                        Ok(msg) => ws_msgs.push(msg),
1365                        Err(err) => {
1366                            ws_err = Some(err);
1367                            break;
1368                        }
1369                    }
1370                    if ws_msgs.len() >= WS_MSG_BUDGET {
1371                        budget_hit = true;
1372                        break;
1373                    }
1374                }
1375            }
1376
1377            for boxed_msg in ws_msgs {
1378                match *boxed_msg {
1379                    Message::Response(resp) => {
1380                        pin.on_response(resp);
1381                        if pin.closing {
1382                            return Poll::Ready(None);
1383                        }
1384                    }
1385                    Message::Event(ev) => {
1386                        pin.on_event(ev);
1387                    }
1388                }
1389                done = false;
1390            }
1391
1392            if let Some(err) = ws_err {
1393                tracing::error!("WS Connection error: {:?}", err);
1394                if let CdpError::Ws(ref ws_error) = err {
1395                    match ws_error {
1396                        Error::AlreadyClosed => {
1397                            pin.closing = true;
1398                            dispose = true;
1399                        }
1400                        Error::Protocol(detail)
1401                            if detail == &ProtocolError::ResetWithoutClosingHandshake =>
1402                        {
1403                            pin.closing = true;
1404                            dispose = true;
1405                        }
1406                        _ => return Poll::Ready(Some(Err(err))),
1407                    }
1408                } else {
1409                    return Poll::Ready(Some(Err(err)));
1410                }
1411            }
1412
1413            if pin.evict_command_timeout.poll_ready(cx) {
1414                // evict all commands that timed out
1415                pin.evict_timed_out_commands(now);
1416                // evict stale network race-condition buffers and
1417                // orphaned context_ids / frame entries
1418                for t in pin.targets.values_mut() {
1419                    t.network_manager.evict_stale_entries(now);
1420                    t.frame_manager_mut().evict_stale_context_ids();
1421                }
1422            }
1423
1424            if pin.budget_exhausted {
1425                for t in pin.targets.values_mut() {
1426                    t.network_manager.set_block_all(true);
1427                }
1428            }
1429
1430            if dispose {
1431                return Poll::Ready(None);
1432            }
1433
1434            if budget_hit {
1435                // yield to the scheduler; self-wake so the remaining
1436                // work resumes on the next tick without waiting for
1437                // a WS event.
1438                cx.waker().wake_by_ref();
1439                return Poll::Pending;
1440            }
1441
1442            if done {
1443                // no events/responses were read from the websocket
1444                return Poll::Pending;
1445            }
1446        }
1447    }
1448}
1449
1450/// How to configure the handler
1451#[derive(Debug, Clone)]
1452pub struct HandlerConfig {
1453    /// Whether the `NetworkManager`s should ignore https errors
1454    pub ignore_https_errors: bool,
1455    /// Window and device settings
1456    pub viewport: Option<Viewport>,
1457    /// Context ids to set from the get go
1458    pub context_ids: Vec<BrowserContextId>,
1459    /// default request timeout to use
1460    pub request_timeout: Duration,
1461    /// Whether to enable request interception
1462    pub request_intercept: bool,
1463    /// Whether to enable cache
1464    pub cache_enabled: bool,
1465    /// Whether to enable Service Workers
1466    pub service_worker_enabled: bool,
1467    /// Whether to ignore visuals.
1468    pub ignore_visuals: bool,
1469    /// Whether to ignore stylesheets.
1470    pub ignore_stylesheets: bool,
1471    /// Whether to ignore Javascript only allowing critical framework or lib based rendering.
1472    pub ignore_javascript: bool,
1473    /// When `ignore_stylesheets` would skip a stylesheet, allow it through if
1474    /// the request URL is first-party (registrable domain matches the page's
1475    /// primary frame). Default `true` so SPAs that load their own CSS via
1476    /// dynamic imports still hydrate. Set `false` for strict block-all.
1477    pub allow_first_party_stylesheets: bool,
1478    /// When a downstream blocker (intercept manager / adblock / blocklists)
1479    /// would skip a script, allow it through if first-party. Default `true`
1480    /// so SPA bootloaders are not collateral damage from third-party rules.
1481    pub allow_first_party_javascript: bool,
1482    /// When `ignore_visuals` would skip an image/media/font, allow it through
1483    /// if the request URL is first-party. Default `true`. Set `false` for
1484    /// strict bandwidth-minimal crawls that drop ALL visuals.
1485    pub allow_first_party_visuals: bool,
1486    /// Whether to ignore analytics.
1487    pub ignore_analytics: bool,
1488    /// Ignore prefetch request. Defaults to true.
1489    pub ignore_prefetch: bool,
1490    /// Whether to ignore ads.
1491    pub ignore_ads: bool,
1492    /// Extra headers.
1493    pub extra_headers: Option<std::collections::HashMap<String, String>>,
1494    /// Only Html.
1495    pub only_html: bool,
1496    /// Created the first target.
1497    pub created_first_target: bool,
1498    /// The network intercept manager.
1499    pub intercept_manager: NetworkInterceptManager,
1500    /// The max bytes to receive.
1501    pub max_bytes_allowed: Option<u64>,
1502    /// Cap on main-frame Document redirect hops (per navigation).
1503    ///
1504    /// `None` disables enforcement (default); `Some(n)` aborts once the chain length
1505    /// exceeds `n` by emitting `net::ERR_TOO_MANY_REDIRECTS` and calling
1506    /// `Page.stopLoading`. Preserves the accumulated `redirect_chain` on the failed
1507    /// request so consumers can inspect it.
1508    pub max_redirects: Option<usize>,
1509    /// Cap on main-frame cross-document navigations per `goto`. Defends against
1510    /// JS / meta-refresh loops that bypass the HTTP redirect guard. `None`
1511    /// disables the guard.
1512    pub max_main_frame_navigations: Option<u32>,
1513    /// Optional per-run/per-site whitelist of URL substrings (scripts/resources).
1514    pub whitelist_patterns: Option<Vec<String>>,
1515    /// Optional per-run/per-site blacklist of URL substrings (scripts/resources).
1516    pub blacklist_patterns: Option<Vec<String>>,
1517    /// Push the interception policy to a capable remote engine once per
1518    /// navigation (`Interception.setPolicy`) so it can resolve block/allow
1519    /// locally instead of round-tripping each `Fetch.requestPaused`. Default
1520    /// `false`; safe to enable against any target (unknown method is ignored).
1521    pub remote_local_policy: bool,
1522    /// Extra ABP/uBO filter rules for the adblock engine.
1523    #[cfg(feature = "adblock")]
1524    pub adblock_filter_rules: Option<Vec<String>>,
1525    /// Capacity of the channel between browser handle and handler.
1526    /// Defaults to 1000.
1527    pub channel_capacity: usize,
1528    /// Capacity of the per-page mpsc channel carrying `TargetMessage`s
1529    /// from each `Page` to the handler.
1530    ///
1531    /// Defaults to `DEFAULT_PAGE_CHANNEL_CAPACITY` (2048) — the previous
1532    /// hard-coded value. Tune upward for pages that burst many commands
1533    /// (heavy `evaluate`/selector use, high-concurrency tasks sharing
1534    /// one page) to avoid pushing each extra command onto the
1535    /// `CommandFuture` async-send fallback path on `TrySendError::Full`.
1536    /// Tune downward to apply back-pressure sooner. Values of `0` are
1537    /// clamped to `1` at channel creation.
1538    pub page_channel_capacity: usize,
1539    /// Number of WebSocket connection retry attempts with exponential backoff.
1540    /// Defaults to 4.
1541    pub connection_retries: u32,
1542}
1543
1544impl Default for HandlerConfig {
1545    fn default() -> Self {
1546        Self {
1547            ignore_https_errors: true,
1548            viewport: Default::default(),
1549            context_ids: Vec::new(),
1550            request_timeout: Duration::from_millis(REQUEST_TIMEOUT),
1551            request_intercept: false,
1552            cache_enabled: true,
1553            service_worker_enabled: true,
1554            ignore_visuals: false,
1555            ignore_stylesheets: false,
1556            ignore_ads: false,
1557            ignore_javascript: false,
1558            allow_first_party_stylesheets: true,
1559            allow_first_party_javascript: true,
1560            allow_first_party_visuals: true,
1561            ignore_analytics: true,
1562            ignore_prefetch: true,
1563            only_html: false,
1564            extra_headers: Default::default(),
1565            created_first_target: false,
1566            intercept_manager: NetworkInterceptManager::Unknown,
1567            max_bytes_allowed: None,
1568            max_redirects: None,
1569            max_main_frame_navigations: None,
1570            whitelist_patterns: None,
1571            blacklist_patterns: None,
1572            remote_local_policy: false,
1573            #[cfg(feature = "adblock")]
1574            adblock_filter_rules: None,
1575            channel_capacity: 4096,
1576            page_channel_capacity: crate::handler::page::DEFAULT_PAGE_CHANNEL_CAPACITY,
1577            connection_retries: crate::conn::DEFAULT_CONNECTION_RETRIES,
1578        }
1579    }
1580}
1581
1582/// Wraps the sender half of the channel who requested a navigation
1583#[derive(Debug)]
1584pub struct NavigationInProgress<T> {
1585    /// Marker to indicate whether a navigation lifecycle has completed
1586    navigated: bool,
1587    /// The response of the issued navigation request
1588    response: Option<Response>,
1589    /// Sender who initiated the navigation request
1590    tx: OneshotSender<T>,
1591}
1592
1593impl<T> NavigationInProgress<T> {
1594    pub(crate) fn new(tx: OneshotSender<T>) -> Self {
1595        Self {
1596            navigated: false,
1597            response: None,
1598            tx,
1599        }
1600    }
1601
1602    /// The response to the cdp request has arrived
1603    pub(crate) fn set_response(&mut self, resp: Response) {
1604        self.response = Some(resp);
1605    }
1606
1607    /// The navigation process has finished, the page finished loading.
1608    pub(crate) fn set_navigated(&mut self) {
1609        self.navigated = true;
1610    }
1611
1612    /// Used by the parallel handler when reconciling Page.navigate response
1613    /// vs. lifecycle completion order — the existing serial handler reads
1614    /// the field directly so these accessors are otherwise inert.
1615    #[cfg_attr(not(feature = "parallel-handler"), allow(dead_code))]
1616    pub(crate) fn is_navigated(&self) -> bool {
1617        self.navigated
1618    }
1619
1620    #[cfg_attr(not(feature = "parallel-handler"), allow(dead_code))]
1621    pub(crate) fn take_response(&mut self) -> Option<Response> {
1622        self.response.take()
1623    }
1624
1625    #[cfg_attr(not(feature = "parallel-handler"), allow(dead_code))]
1626    pub(crate) fn into_tx(self) -> OneshotSender<T> {
1627        self.tx
1628    }
1629}
1630
/// Request type for navigation.
///
/// Each variant carries the `NavigationInProgress` tracker whose oneshot
/// sender delivers a `Result<Response>` back to the requester.
#[derive(Debug)]
enum NavigationRequest {
    /// Represents a simple `NavigateParams` ("Page.navigate")
    Navigate(NavigationInProgress<Result<Response>>),
    // TODO: are there more navigation request kinds to model here?
}
1638
1639/// Different kind of submitted request submitted from the  `Handler` to the
1640/// `Connection` and being waited on for the response.
1641#[derive(Debug)]
1642enum PendingRequest {
1643    /// A Request to create a new `Target` that results in the creation of a
1644    /// `Page` that represents a browser page.
1645    CreateTarget(OneshotSender<Result<Page>>),
1646    /// A Request to fetch old `Target`s created before connection
1647    GetTargets(OneshotSender<Result<Vec<TargetInfo>>>),
1648    /// A Request to navigate a specific `Target`.
1649    ///
1650    /// Navigation requests are not automatically completed once the response to
1651    /// the raw cdp navigation request (like `NavigateParams`) arrives, but only
1652    /// after the `Target` notifies the `Handler` that the `Page` has finished
1653    /// loading, which comes after the response.
1654    Navigate(NavigationId),
1655    /// A common request received via a channel (`Page`).
1656    ///
1657    /// `target_id` is resolved at submit time from the caller's
1658    /// `session_id` against `self.sessions`, so `on_target_crashed`
1659    /// can cancel in-flight user commands immediately. `None` when
1660    /// the command has no session (browser-level) or was sent
1661    /// before the attach event arrived — those fall back to the
1662    /// normal `request_timeout` eviction.
1663    ExternalCommand {
1664        tx: OneshotSender<Result<Response>>,
1665        target_id: Option<TargetId>,
1666    },
1667    /// Requests that are initiated directly from a `Target` (all the
1668    /// initialization commands).
1669    InternalCommand(TargetId),
1670    // A Request to close the browser.
1671    CloseBrowser(OneshotSender<Result<CloseReturns>>),
1672}
1673
/// Events used internally to communicate with the handler, which are executed
/// in the background.
///
/// Variants that carry a `OneshotSender` expect the handler to reply on it.
// TODO rename to BrowserMessage
#[derive(Debug)]
pub(crate) enum HandlerMessage {
    /// Create a new page from `CreateTargetParams`; the `Page` (or error) is
    /// sent back on the oneshot.
    CreatePage(CreateTargetParams, OneshotSender<Result<Page>>),
    /// Fetch the browser's existing targets as `TargetInfo`s.
    FetchTargets(OneshotSender<Result<Vec<TargetInfo>>>),
    /// Hand a `BrowserContext` to the handler (presumably to register it — confirm).
    InsertContext(BrowserContext),
    /// Ask the handler to dispose of a `BrowserContext`.
    DisposeContext(BrowserContext),
    /// Request all pages known to the handler.
    GetPages(OneshotSender<Vec<Page>>),
    /// Forward a raw command to the browser.
    Command(CommandMessage),
    /// Look up the page for a `TargetId`; `None` presumably means no such page.
    GetPage(TargetId, OneshotSender<Option<Page>>),
    /// Register an event listener.
    AddEventListener(EventListenerRequest),
    /// Close the browser; replies with the `CloseReturns` result.
    CloseBrowser(OneshotSender<Result<CloseReturns>>),
}
1689
#[cfg(test)]
mod tests {
    use super::*;
    use chromiumoxide_cdp::cdp::browser_protocol::target::{AttachToTargetReturns, TargetInfo};

    /// The `Target.attachToTarget` response alone must seed the target's
    /// flat session id, without waiting for the `attachedToTarget` event.
    #[test]
    fn attach_to_target_response_sets_session_id_before_event_arrives() {
        let target_info = TargetInfo::builder()
            .target_id("target-1".to_string())
            .r#type("page")
            .title("")
            .url("about:blank")
            .attached(false)
            .can_access_opener(false)
            .build()
            .expect("target info");
        let mut target = Target::new(
            target_info,
            TargetConfig::default(),
            BrowserContext::default(),
        );

        let attach_method: MethodId = AttachToTargetParams::IDENTIFIER.into();
        let attach_payload =
            serde_json::to_value(AttachToTargetReturns::new("session-1".to_string()))
                .expect("attach result");
        let attach_response = Response {
            id: CallId::new(1),
            result: Some(attach_payload),
            error: None,
        };

        maybe_store_attach_session_id(&mut target, &attach_method, &attach_response);

        assert_eq!(
            target.session_id().map(AsRef::as_ref),
            Some("session-1"),
            "attach response should seed the flat session id even before Target.attachedToTarget"
        );
    }

    /// Regression guard: `page_channel_capacity` must default to 2048
    /// everywhere, so existing callers see identical behavior to the
    /// previous hard-coded value. A failure here means every caller that
    /// relied on the implicit 2048-slot channel silently changed.
    #[test]
    fn page_channel_capacity_defaults_to_2048_across_configs() {
        use crate::browser::BrowserConfigBuilder;
        use crate::handler::page::DEFAULT_PAGE_CHANNEL_CAPACITY;

        assert_eq!(DEFAULT_PAGE_CHANNEL_CAPACITY, 2048);

        let handler_capacity = HandlerConfig::default().page_channel_capacity;
        assert_eq!(
            handler_capacity,
            DEFAULT_PAGE_CHANNEL_CAPACITY,
            "HandlerConfig default must match the historical 2048 slot count"
        );

        let target_capacity = TargetConfig::default().page_channel_capacity;
        assert_eq!(
            target_capacity,
            DEFAULT_PAGE_CHANNEL_CAPACITY,
            "TargetConfig default must match the historical 2048 slot count"
        );

        // Only the builder's Debug rendering is inspected, so `build()` (and
        // its executable lookup) is never invoked.
        let bc = format!("{:?}", BrowserConfigBuilder::default());
        assert!(
            bc.contains("page_channel_capacity: 2048"),
            "BrowserConfigBuilder must default page_channel_capacity to 2048, got: {bc}",
        );
    }
}