1use crate::listeners::{EventListenerRequest, EventListeners};
2use chromiumoxide_cdp::cdp::browser_protocol::browser::*;
3use chromiumoxide_cdp::cdp::browser_protocol::target::*;
4use chromiumoxide_cdp::cdp::events::CdpEvent;
5use chromiumoxide_cdp::cdp::events::CdpEventMessage;
6use chromiumoxide_types::{CallId, Message, Method, Response};
7use chromiumoxide_types::{MethodId, Request as CdpRequest};
8use fnv::FnvHashMap;
9use futures_util::Stream;
10use hashbrown::{HashMap, HashSet};
11use spider_network_blocker::intercept_manager::NetworkInterceptManager;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::time::{Duration, Instant};
15use tokio::sync::mpsc::Receiver;
16use tokio::sync::oneshot::Sender as OneshotSender;
17use tokio_tungstenite::tungstenite::error::ProtocolError;
18use tokio_tungstenite::tungstenite::Error;
19
20use std::sync::Arc;
21use tokio::sync::Notify;
22
23use crate::cmd::{to_command_response, CommandMessage};
24use crate::conn::Connection;
25use crate::error::{CdpError, Result};
26use crate::handler::browser::BrowserContext;
27use crate::handler::frame::FrameRequestedNavigation;
28use crate::handler::frame::{NavigationError, NavigationId, NavigationOk};
29use crate::handler::job::PeriodicJob;
30use crate::handler::session::Session;
31use crate::handler::target::TargetEvent;
32use crate::handler::target::{Target, TargetConfig};
33use crate::handler::viewport::Viewport;
34use crate::page::Page;
35pub(crate) use page::PageInner;
36
/// Default request timeout value (presumably milliseconds — TODO confirm against the
/// `HandlerConfig` default that consumes it).
pub const REQUEST_TIMEOUT: u64 = 30_000;

// Handler sub-modules. `parallel` is only compiled with the `parallel-handler` feature.
pub mod blockers;
pub mod browser;
pub mod commandfuture;
pub mod domworld;
pub mod emulation;
pub mod frame;
pub mod http;
pub mod httpfuture;
mod job;
pub mod network;
pub mod network_utils;
pub mod page;
#[cfg(feature = "parallel-handler")]
pub mod parallel;
pub mod sender;
mod session;
pub mod target;
pub mod target_message_future;
pub mod viewport;
59
/// Central CDP message router.
///
/// Owns the browser connection and routes between three parties: `HandlerMessage`s coming
/// from the API side, command responses and events coming from the browser, and the
/// per-target state machines (`Target`).
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Handler {
    /// Context assigned to targets that report no browser context id.
    pub default_browser_context: BrowserContext,
    /// Known browser contexts. When non-empty, newly created targets that report a context
    /// id outside this set are ignored.
    pub browser_contexts: HashSet<BrowserContext>,
    /// In-flight commands keyed by call id: how to route the response, the method name, and
    /// the submission time used by timeout eviction.
    pending_commands: FnvHashMap<CallId, (PendingRequest, MethodId, Instant)>,
    /// Incoming requests from the API side (commands, page creation, context management, ...).
    from_browser: Receiver<HandlerMessage>,
    /// Target ids in processing order. May still contain ids of removed targets; those are
    /// dropped lazily by the processing loop when their target is no longer found.
    target_ids: Vec<TargetId>,
    /// All tracked targets.
    targets: HashMap<TargetId, Target>,
    /// Navigations that wait for both their command response and lifecycle completion.
    navigations: FnvHashMap<NavigationId, NavigationRequest>,
    /// Attached sessions; each `Session` records the target it belongs to.
    sessions: HashMap<SessionId, Session>,
    /// The CDP connection; `None` once `run` / `run_parallel` has taken it.
    conn: Option<Connection<CdpEventMessage>>,
    /// Periodic job created with `config.request_timeout` as its period (presumably drives
    /// timeout eviction in the polling path — TODO confirm).
    evict_command_timeout: PeriodicJob,
    /// Counter for the next `NavigationId`; wraps on overflow.
    next_navigation_id: usize,
    /// Handler configuration; also the source of each new target's `TargetConfig`.
    config: HandlerConfig,
    /// Handler-level event listeners (events not consumed by a target).
    event_listeners: EventListeners,
    /// Set when the close-browser response arrives or a browser context is disposed; makes
    /// the processing loop stop.
    closing: bool,
    /// Remaining byte budget, if one is tracked; decremented by `TargetEvent::BytesConsumed`.
    remaining_bytes: Option<u64>,
    /// Set once `remaining_bytes` hits zero; every target's network manager is then switched
    /// to `set_block_all(true)`.
    budget_exhausted: bool,
    /// Set of target ids; entries are removed when a target is destroyed or crashes, or when
    /// its browser context is disposed.
    attached_targets: HashSet<TargetId>,
    /// Wake-up notifier handed to targets created after `run` starts (via `TargetConfig`);
    /// `run` selects on it to re-poll targets.
    page_wake: Option<Arc<Notify>>,
}
106
107lazy_static::lazy_static! {
108 static ref DISCOVER_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
110 let discover = SetDiscoverTargetsParams::new(true);
111 (discover.identifier(), serde_json::to_value(discover).expect("valid discover target params"))
112 };
113 static ref TARGET_PARAMS_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
115 let msg = GetTargetsParams { filter: None };
116 (msg.identifier(), serde_json::to_value(msg).expect("valid paramtarget"))
117 };
118 static ref CLOSE_PARAMS_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
120 let close_msg = CloseParams::default();
121 (close_msg.identifier(), serde_json::to_value(close_msg).expect("valid close params"))
122 };
123}
124
125fn maybe_store_attach_session_id(target: &mut Target, method: &MethodId, resp: &Response) {
126 if method.as_ref() != AttachToTargetParams::IDENTIFIER {
127 return;
128 }
129
130 if let Ok(resp) = to_command_response::<AttachToTargetParams>(resp.clone(), method.clone()) {
131 target.set_session_id(resp.result.session_id);
132 }
133}
134
135impl Handler {
136 pub(crate) fn new(
139 mut conn: Connection<CdpEventMessage>,
140 rx: Receiver<HandlerMessage>,
141 config: HandlerConfig,
142 ) -> Self {
143 let discover = DISCOVER_ID.clone();
144 let _ = conn.submit_command(discover.0, None, discover.1);
145 let conn = Some(conn);
146
147 let browser_contexts = config
148 .context_ids
149 .iter()
150 .map(|id| BrowserContext::from(id.clone()))
151 .collect();
152
153 Self {
154 pending_commands: Default::default(),
155 from_browser: rx,
156 default_browser_context: Default::default(),
157 browser_contexts,
158 target_ids: Default::default(),
159 targets: Default::default(),
160 navigations: Default::default(),
161 sessions: Default::default(),
162 conn,
163 evict_command_timeout: PeriodicJob::new(config.request_timeout),
164 next_navigation_id: 0,
165 config,
166 event_listeners: Default::default(),
167 closing: false,
168 remaining_bytes: None,
169 budget_exhausted: false,
170 attached_targets: Default::default(),
171 page_wake: None,
172 }
173 }
174
175 #[inline]
178 fn conn(&mut self) -> Result<&mut Connection<CdpEventMessage>> {
179 self.conn
180 .as_mut()
181 .ok_or_else(|| CdpError::msg("connection consumed by Handler::run()"))
182 }
183
184 pub fn get_target(&self, target_id: &TargetId) -> Option<&Target> {
186 self.targets.get(target_id)
187 }
188
189 pub fn targets(&self) -> impl Iterator<Item = &Target> + '_ {
191 self.targets.values()
192 }
193
194 pub fn default_browser_context(&self) -> &BrowserContext {
196 &self.default_browser_context
197 }
198
199 pub fn browser_contexts(&self) -> impl Iterator<Item = &BrowserContext> + '_ {
201 self.browser_contexts.iter()
202 }
203
204 fn on_navigation_response(&mut self, id: NavigationId, resp: Response) {
206 if let Some(nav) = self.navigations.remove(&id) {
207 match nav {
208 NavigationRequest::Navigate(mut nav) => {
209 if nav.navigated {
210 let _ = nav.tx.send(Ok(resp));
211 } else {
212 nav.set_response(resp);
213 self.navigations
214 .insert(id, NavigationRequest::Navigate(nav));
215 }
216 }
217 }
218 }
219 }
220
221 fn on_navigation_lifecycle_completed(&mut self, res: Result<NavigationOk, NavigationError>) {
223 match res {
224 Ok(ok) => {
225 let id = *ok.navigation_id();
226 if let Some(nav) = self.navigations.remove(&id) {
227 match nav {
228 NavigationRequest::Navigate(mut nav) => {
229 if let Some(resp) = nav.response.take() {
230 let _ = nav.tx.send(Ok(resp));
231 } else {
232 nav.set_navigated();
233 self.navigations
234 .insert(id, NavigationRequest::Navigate(nav));
235 }
236 }
237 }
238 }
239 }
240 Err(err) => {
241 if let Some(nav) = self.navigations.remove(err.navigation_id()) {
242 match nav {
243 NavigationRequest::Navigate(nav) => {
244 let _ = nav.tx.send(Err(err.into()));
245 }
246 }
247 }
248 }
249 }
250 }
251
252 fn on_response(&mut self, resp: Response) {
254 if let Some((req, method, _)) = self.pending_commands.remove(&resp.id) {
255 match req {
256 PendingRequest::CreateTarget(tx) => {
257 match to_command_response::<CreateTargetParams>(resp, method) {
258 Ok(resp) => {
259 if let Some(target) = self.targets.get_mut(&resp.target_id) {
260 target.set_initiator(tx);
261 } else {
262 let _ = tx.send(Err(CdpError::NotFound)).ok();
263 }
264 }
265 Err(err) => {
266 let _ = tx.send(Err(err)).ok();
267 }
268 }
269 }
270 PendingRequest::GetTargets(tx) => {
271 match to_command_response::<GetTargetsParams>(resp, method) {
272 Ok(resp) => {
273 let targets = resp.result.target_infos;
274 let results = targets.clone();
275
276 for target_info in targets {
277 let event: EventTargetCreated = EventTargetCreated { target_info };
278 self.on_target_created(event);
279 }
280
281 let _ = tx.send(Ok(results)).ok();
282 }
283 Err(err) => {
284 let _ = tx.send(Err(err)).ok();
285 }
286 }
287 }
288 PendingRequest::Navigate(id) => {
289 self.on_navigation_response(id, resp);
290 if self.config.only_html && !self.config.created_first_target {
291 self.config.created_first_target = true;
292 }
293 }
294 PendingRequest::ExternalCommand { tx, .. } => {
295 let _ = tx.send(Ok(resp)).ok();
296 }
297 PendingRequest::InternalCommand(target_id) => {
298 if let Some(target) = self.targets.get_mut(&target_id) {
299 maybe_store_attach_session_id(target, &method, &resp);
300 target.on_response(resp, method.as_ref());
301 }
302 }
303 PendingRequest::CloseBrowser(tx) => {
304 self.closing = true;
305 let _ = tx.send(Ok(CloseReturns {})).ok();
306 }
307 }
308 }
309 }
310
311 pub(crate) fn submit_external_command(
313 &mut self,
314 msg: CommandMessage,
315 now: Instant,
316 ) -> Result<()> {
317 let target_id = msg
322 .session_id
323 .as_ref()
324 .and_then(|sid| self.sessions.get(sid.as_ref()))
325 .map(|s| s.target_id().clone());
326 let call_id =
327 self.conn()?
328 .submit_command(msg.method.clone(), msg.session_id, msg.params)?;
329 self.pending_commands.insert(
330 call_id,
331 (
332 PendingRequest::ExternalCommand {
333 tx: msg.sender,
334 target_id,
335 },
336 msg.method,
337 now,
338 ),
339 );
340 Ok(())
341 }
342
343 pub(crate) fn submit_internal_command(
344 &mut self,
345 target_id: TargetId,
346 req: CdpRequest,
347 now: Instant,
348 ) -> Result<()> {
349 let call_id = self.conn()?.submit_command(
350 req.method.clone(),
351 req.session_id.map(Into::into),
352 req.params,
353 )?;
354 self.pending_commands.insert(
355 call_id,
356 (PendingRequest::InternalCommand(target_id), req.method, now),
357 );
358 Ok(())
359 }
360
361 fn submit_fetch_targets(&mut self, tx: OneshotSender<Result<Vec<TargetInfo>>>, now: Instant) {
362 let msg = TARGET_PARAMS_ID.clone();
363
364 if let Some(conn) = self.conn.as_mut() {
365 if let Ok(call_id) = conn.submit_command(msg.0.clone(), None, msg.1) {
366 self.pending_commands
367 .insert(call_id, (PendingRequest::GetTargets(tx), msg.0, now));
368 }
369 }
370 }
371
372 fn submit_navigation(&mut self, id: NavigationId, req: CdpRequest, now: Instant) {
375 if let Some(conn) = self.conn.as_mut() {
376 if let Ok(call_id) = conn.submit_command(
377 req.method.clone(),
378 req.session_id.map(Into::into),
379 req.params,
380 ) {
381 self.pending_commands
382 .insert(call_id, (PendingRequest::Navigate(id), req.method, now));
383 }
384 }
385 }
386
387 fn submit_close(&mut self, tx: OneshotSender<Result<CloseReturns>>, now: Instant) {
388 let close_msg = CLOSE_PARAMS_ID.clone();
389
390 if let Some(conn) = self.conn.as_mut() {
391 if let Ok(call_id) = conn.submit_command(close_msg.0.clone(), None, close_msg.1) {
392 self.pending_commands.insert(
393 call_id,
394 (PendingRequest::CloseBrowser(tx), close_msg.0, now),
395 );
396 }
397 }
398 }
399
400 fn on_target_message(&mut self, target: &mut Target, msg: CommandMessage, now: Instant) {
402 if msg.is_navigation() {
403 let (req, tx) = msg.split();
404 let id = self.next_navigation_id();
405
406 target.goto(FrameRequestedNavigation::new(
407 id,
408 req,
409 self.config.request_timeout,
410 ));
411
412 self.navigations.insert(
413 id,
414 NavigationRequest::Navigate(NavigationInProgress::new(tx)),
415 );
416 } else {
417 let _ = self.submit_external_command(msg, now);
418 }
419 }
420
421 fn next_navigation_id(&mut self) -> NavigationId {
423 let id = NavigationId(self.next_navigation_id);
424 self.next_navigation_id = self.next_navigation_id.wrapping_add(1);
425 id
426 }
427
428 fn create_page(&mut self, params: CreateTargetParams, tx: OneshotSender<Result<Page>>) {
439 let about_blank = params.url == "about:blank";
440 let http_check =
441 !about_blank && params.url.starts_with("http") || params.url.starts_with("file://");
442
443 if about_blank || http_check {
444 let method = params.identifier();
445
446 let Some(conn) = self.conn.as_mut() else {
447 let _ = tx.send(Err(CdpError::msg("connection consumed"))).ok();
448 return;
449 };
450 match serde_json::to_value(params) {
451 Ok(params) => match conn.submit_command(method.clone(), None, params) {
452 Ok(call_id) => {
453 self.pending_commands.insert(
454 call_id,
455 (PendingRequest::CreateTarget(tx), method, Instant::now()),
456 );
457 }
458 Err(err) => {
459 let _ = tx.send(Err(err.into())).ok();
460 }
461 },
462 Err(err) => {
463 let _ = tx.send(Err(err.into())).ok();
464 }
465 }
466 } else {
467 let _ = tx.send(Err(CdpError::NotFound)).ok();
468 }
469 }
470
/// Routes a CDP event.
///
/// An event whose session id maps to a known session and target is handed to that target
/// only. Every other event first updates target/session bookkeeping (created, attached,
/// destroyed, crashed, detached). It is then forwarded to the handler-level listeners:
/// `start_send` for typed events, and `try_send_custom` with the method name for the raw
/// JSON case (presumably events without a typed variant).
fn on_event(&mut self, event: CdpEventMessage) {
    if let Some(session_id) = &event.session_id {
        if let Some(session) = self.sessions.get(session_id.as_str()) {
            if let Some(target) = self.targets.get_mut(session.target_id()) {
                return target.on_event(event);
            }
        }
    }
    let CdpEventMessage { params, method, .. } = event;

    // Bookkeeping only: the event is still delivered to listeners below.
    match params {
        CdpEvent::TargetTargetCreated(ref ev) => self.on_target_created((**ev).clone()),
        CdpEvent::TargetAttachedToTarget(ref ev) => self.on_attached_to_target(ev.clone()),
        CdpEvent::TargetTargetDestroyed(ref ev) => self.on_target_destroyed(ev.clone()),
        CdpEvent::TargetTargetCrashed(ref ev) => self.on_target_crashed(ev.clone()),
        CdpEvent::TargetDetachedFromTarget(ref ev) => self.on_detached_from_target(ev.clone()),
        _ => {}
    }

    chromiumoxide_cdp::consume_event!(match params {
        |ev| self.event_listeners.start_send(ev),
        |json| { let _ = self.event_listeners.try_send_custom(&method, json);}
    });
}
496
497 fn on_target_created(&mut self, event: EventTargetCreated) {
501 if !self.browser_contexts.is_empty() {
502 if let Some(ref context_id) = event.target_info.browser_context_id {
503 let bc = BrowserContext {
504 id: Some(context_id.clone()),
505 };
506 if !self.browser_contexts.contains(&bc) {
507 return;
508 }
509 }
510 }
511 let browser_ctx = event
512 .target_info
513 .browser_context_id
514 .clone()
515 .map(BrowserContext::from)
516 .unwrap_or_else(|| self.default_browser_context.clone());
517 let target = Target::new(
518 event.target_info,
519 TargetConfig {
520 ignore_https_errors: self.config.ignore_https_errors,
521 request_timeout: self.config.request_timeout,
522 viewport: self.config.viewport.clone(),
523 request_intercept: self.config.request_intercept,
524 cache_enabled: self.config.cache_enabled,
525 service_worker_enabled: self.config.service_worker_enabled,
526 ignore_visuals: self.config.ignore_visuals,
527 ignore_stylesheets: self.config.ignore_stylesheets,
528 ignore_javascript: self.config.ignore_javascript,
529 ignore_analytics: self.config.ignore_analytics,
530 ignore_prefetch: self.config.ignore_prefetch,
531 allow_first_party_stylesheets: self.config.allow_first_party_stylesheets,
532 allow_first_party_javascript: self.config.allow_first_party_javascript,
533 allow_first_party_visuals: self.config.allow_first_party_visuals,
534 extra_headers: self.config.extra_headers.clone(),
535 only_html: self.config.only_html && self.config.created_first_target,
536 intercept_manager: self.config.intercept_manager,
537 remote_local_policy: self.config.remote_local_policy,
538 max_bytes_allowed: self.config.max_bytes_allowed,
539 max_redirects: self.config.max_redirects,
540 max_main_frame_navigations: self.config.max_main_frame_navigations,
541 whitelist_patterns: self.config.whitelist_patterns.clone(),
542 blacklist_patterns: self.config.blacklist_patterns.clone(),
543 #[cfg(feature = "adblock")]
544 adblock_filter_rules: self.config.adblock_filter_rules.clone(),
545 page_wake: self.page_wake.clone(),
546 page_channel_capacity: self.config.page_channel_capacity,
547 },
548 browser_ctx,
549 );
550
551 let tid = target.target_id().clone();
552 self.target_ids.push(tid.clone());
553 self.targets.insert(tid, target);
554 }
555
556 fn on_attached_to_target(&mut self, event: Box<EventAttachedToTarget>) {
558 let session = Session::new(event.session_id.clone(), event.target_info.target_id);
559 if let Some(target) = self.targets.get_mut(session.target_id()) {
560 target.set_session_id(session.session_id().clone())
561 }
562 self.sessions.insert(event.session_id, session);
563 }
564
565 fn on_detached_from_target(&mut self, event: EventDetachedFromTarget) {
569 if let Some(session) = self.sessions.remove(&event.session_id) {
571 if let Some(target) = self.targets.get_mut(session.target_id()) {
572 target.session_id_mut().take();
573 }
574 }
575 }
576
577 fn on_target_destroyed(&mut self, event: EventTargetDestroyed) {
579 self.attached_targets.remove(&event.target_id);
580
581 if let Some(target) = self.targets.remove(&event.target_id) {
582 if let Some(session) = target.session_id() {
584 self.sessions.remove(session);
585 }
586 }
587 }
588
589 fn on_target_crashed(&mut self, event: EventTargetCrashed) {
612 let crashed_id = event.target_id.clone();
613 let status = event.status.clone();
614 let error_code = event.error_code;
615
616 let to_cancel: Vec<CallId> = self
622 .pending_commands
623 .iter()
624 .filter_map(|(&call_id, (req, _, _))| match req {
625 PendingRequest::ExternalCommand {
626 target_id: Some(tid),
627 ..
628 } if *tid == crashed_id => Some(call_id),
629 PendingRequest::InternalCommand(tid) if *tid == crashed_id => Some(call_id),
630 _ => None,
631 })
632 .collect();
633
634 for call_id in to_cancel {
635 if let Some((req, _, _)) = self.pending_commands.remove(&call_id) {
636 match req {
637 PendingRequest::ExternalCommand { tx, .. } => {
638 let _ = tx.send(Err(CdpError::msg(format!(
639 "target {:?} crashed: {} (errorCode={})",
640 crashed_id, status, error_code
641 ))));
642 }
643 PendingRequest::InternalCommand(_) => {
644 }
647 _ => {}
648 }
649 }
650 }
651
652 self.attached_targets.remove(&crashed_id);
654 if let Some(target) = self.targets.remove(&crashed_id) {
655 if let Some(session) = target.session_id() {
656 self.sessions.remove(session);
657 }
658 }
659 }
660
661 fn evict_timed_out_commands(&mut self, now: Instant) {
666 let deadline = match now.checked_sub(self.config.request_timeout) {
667 Some(d) => d,
668 None => return,
669 };
670
671 let timed_out: Vec<_> = self
672 .pending_commands
673 .iter()
674 .filter(|(_, (_, _, timestamp))| *timestamp < deadline)
675 .map(|(k, _)| *k)
676 .collect();
677
678 for call in timed_out {
679 if let Some((req, _, _)) = self.pending_commands.remove(&call) {
680 match req {
681 PendingRequest::CreateTarget(tx) => {
682 let _ = tx.send(Err(CdpError::Timeout));
683 }
684 PendingRequest::GetTargets(tx) => {
685 let _ = tx.send(Err(CdpError::Timeout));
686 }
687 PendingRequest::Navigate(nav) => {
688 if let Some(nav) = self.navigations.remove(&nav) {
689 match nav {
690 NavigationRequest::Navigate(nav) => {
691 let _ = nav.tx.send(Err(CdpError::Timeout));
692 }
693 }
694 }
695 }
696 PendingRequest::ExternalCommand { tx, .. } => {
697 let _ = tx.send(Err(CdpError::Timeout));
698 }
699 PendingRequest::InternalCommand(_) => {}
700 PendingRequest::CloseBrowser(tx) => {
701 let _ = tx.send(Err(CdpError::Timeout));
702 }
703 }
704 }
705 }
706 }
707
708 pub fn event_listeners_mut(&mut self) -> &mut EventListeners {
709 &mut self.event_listeners
710 }
711
/// Drives the handler as an async task that owns the websocket connection.
///
/// Takes `self.conn` (failing if it was already consumed) and converts it into its async
/// form: a reader, a command sender, a writer task and a reader task. It then loops:
///
/// 1. drain up to `PER_TARGET_DRAIN_BUDGET` page messages per target and advance every
///    target, submitting the requests and commands it produces;
/// 2. flush listeners and, if the byte budget is exhausted, call `set_block_all(true)` on
///    every target's network manager;
/// 3. stop if `closing` is set; otherwise `select!` on browser messages, websocket frames,
///    page wake-ups, the eviction timer and writer-task completion.
///
/// On exit the command sender is dropped, the writer gets up to 500 ms to finish before it
/// is aborted, and the reader task is aborted.
///
/// # Errors
///
/// Returns an error when no connection is available, when the websocket reader yields an
/// error other than `AlreadyClosed` / `ResetWithoutClosingHandshake`, or when the writer task
/// returns an error or panics. Every other termination path returns `Ok(())`.
pub async fn run(mut self) -> Result<()> {
    use chromiumoxide_types::Message;
    use tokio::time::MissedTickBehavior;
    use tokio_tungstenite::tungstenite::{self, error::ProtocolError};

    // Handed to targets created from now on (via `TargetConfig`); selected on below.
    let page_wake = Arc::new(Notify::new());
    self.page_wake = Some(page_wake.clone());

    let conn = self
        .conn
        .take()
        .ok_or_else(|| CdpError::msg("Handler::run() called with no connection"))?;
    let async_conn = conn.into_async();
    let mut ws_reader = async_conn.reader;
    let ws_tx = async_conn.cmd_tx;
    let mut writer_handle = async_conn.writer_handle;
    let reader_handle = async_conn.reader_handle;
    let mut next_call_id = async_conn.next_id;

    // Call ids continue from the connection's `next_id`.
    let mut alloc_call_id = || {
        let id = chromiumoxide_types::CallId::new(next_call_id);
        next_call_id = next_call_id.wrapping_add(1);
        id
    };

    // First tick after one full timeout, then every `request_timeout`.
    let mut evict_timer = tokio::time::interval_at(
        tokio::time::Instant::now() + self.config.request_timeout,
        self.config.request_timeout,
    );
    evict_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);

    // Non-blocking submit to the writer task. Yields the allocated call id, or an error when
    // the command channel is full or closed (the command is dropped in that case).
    macro_rules! ws_submit {
        ($method:expr, $session_id:expr, $params:expr) => {{
            let id = alloc_call_id();
            let call = chromiumoxide_types::MethodCall {
                id,
                method: $method,
                session_id: $session_id,
                params: $params,
            };
            match ws_tx.try_send(call) {
                Ok(()) => Ok::<_, CdpError>(id),
                Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
                    tracing::warn!("WS command channel full — dropping command");
                    Err(CdpError::msg("WS command channel full"))
                }
                Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
                    Err(CdpError::msg("WS writer closed"))
                }
            }
        }};
    }

    let run_result: Result<()> = loop {
        // One timestamp per iteration. NOTE(review): it is taken before the `select!` wait
        // below, so commands submitted after that wait are stamped with this earlier time,
        // and the eviction branch also uses it.
        let now = std::time::Instant::now();

        // Cap on page messages drained from a single target per iteration.
        const PER_TARGET_DRAIN_BUDGET: usize = 128;

        // Each target is taken out of the map while processed (so it can be borrowed mutably
        // alongside `self`) and re-inserted afterwards. Ids whose target no longer exists are
        // not pushed back, which prunes stale entries from `target_ids`.
        for n in (0..self.target_ids.len()).rev() {
            let target_id = self.target_ids.swap_remove(n);

            if let Some((id, mut target)) = self.targets.remove_entry(&target_id) {
                {
                    let mut msgs = Vec::new();
                    if let Some(handle) = target.page_mut() {
                        while msgs.len() < PER_TARGET_DRAIN_BUDGET {
                            match handle.rx.try_recv() {
                                Ok(msg) => msgs.push(msg),
                                Err(_) => break,
                            }
                        }
                    }
                    for msg in msgs {
                        target.on_page_message(msg);
                    }
                }

                while let Some(event) = target.advance(now) {
                    match event {
                        TargetEvent::Request(req) => {
                            if let Ok(call_id) =
                                ws_submit!(req.method.clone(), req.session_id, req.params)
                            {
                                self.pending_commands.insert(
                                    call_id,
                                    (
                                        PendingRequest::InternalCommand(
                                            target.target_id().clone(),
                                        ),
                                        req.method,
                                        now,
                                    ),
                                );
                            }
                        }
                        TargetEvent::Command(msg) => {
                            if msg.is_navigation() {
                                // NOTE(review): the request is both queued on the target via
                                // `goto` and submitted directly here. The `poll_next` path
                                // relies on `goto` later producing a
                                // `TargetEvent::NavigationRequest` for the same id. If that
                                // also happens via `advance`, this navigation is sent twice.
                                // Confirm against `Target::goto`.
                                let (req, tx) = msg.split();
                                let nav_id = self.next_navigation_id();
                                target.goto(FrameRequestedNavigation::new(
                                    nav_id,
                                    req.clone(),
                                    self.config.request_timeout,
                                ));
                                if let Ok(call_id) =
                                    ws_submit!(req.method.clone(), req.session_id, req.params)
                                {
                                    self.pending_commands.insert(
                                        call_id,
                                        (PendingRequest::Navigate(nav_id), req.method, now),
                                    );
                                }
                                self.navigations.insert(
                                    nav_id,
                                    NavigationRequest::Navigate(NavigationInProgress::new(tx)),
                                );
                            } else if let Ok(call_id) = ws_submit!(
                                msg.method.clone(),
                                msg.session_id.map(Into::into),
                                msg.params
                            ) {
                                let target_id = Some(target.target_id().clone());
                                self.pending_commands.insert(
                                    call_id,
                                    (
                                        PendingRequest::ExternalCommand {
                                            tx: msg.sender,
                                            target_id,
                                        },
                                        msg.method,
                                        now,
                                    ),
                                );
                            }
                        }
                        TargetEvent::NavigationRequest(nav_id, req) => {
                            if let Ok(call_id) =
                                ws_submit!(req.method.clone(), req.session_id, req.params)
                            {
                                self.pending_commands.insert(
                                    call_id,
                                    (PendingRequest::Navigate(nav_id), req.method, now),
                                );
                            }
                        }
                        TargetEvent::NavigationResult(res) => {
                            self.on_navigation_lifecycle_completed(res);
                        }
                        TargetEvent::BytesConsumed(n) => {
                            // Only tracked when a byte budget is set.
                            if let Some(rem) = self.remaining_bytes.as_mut() {
                                *rem = rem.saturating_sub(n);
                                if *rem == 0 {
                                    self.budget_exhausted = true;
                                }
                            }
                        }
                    }
                }

                target.event_listeners_mut().flush();

                self.targets.insert(id, target);
                self.target_ids.push(target_id);
            }
        }

        self.event_listeners.flush();

        if self.budget_exhausted {
            for t in self.targets.values_mut() {
                t.network_manager.set_block_all(true);
            }
        }

        if self.closing {
            break Ok(());
        }

        tokio::select! {
            msg = self.from_browser.recv() => {
                match msg {
                    Some(msg) => {
                        match msg {
                            HandlerMessage::Command(cmd) => {
                                // Attribute the command to a target when its session is known.
                                let target_id = cmd
                                    .session_id
                                    .as_ref()
                                    .and_then(|sid| self.sessions.get(sid.as_ref()))
                                    .map(|s| s.target_id().clone());
                                if let Ok(call_id) = ws_submit!(
                                    cmd.method.clone(),
                                    cmd.session_id.map(Into::into),
                                    cmd.params
                                ) {
                                    self.pending_commands.insert(
                                        call_id,
                                        (
                                            PendingRequest::ExternalCommand {
                                                tx: cmd.sender,
                                                target_id,
                                            },
                                            cmd.method,
                                            now,
                                        ),
                                    );
                                }
                            }
                            HandlerMessage::FetchTargets(tx) => {
                                let msg = TARGET_PARAMS_ID.clone();
                                if let Ok(call_id) = ws_submit!(msg.0.clone(), None, msg.1) {
                                    self.pending_commands.insert(
                                        call_id,
                                        (PendingRequest::GetTargets(tx), msg.0, now),
                                    );
                                }
                            }
                            HandlerMessage::CloseBrowser(tx) => {
                                let close_msg = CLOSE_PARAMS_ID.clone();
                                if let Ok(call_id) = ws_submit!(close_msg.0.clone(), None, close_msg.1) {
                                    self.pending_commands.insert(
                                        call_id,
                                        (PendingRequest::CloseBrowser(tx), close_msg.0, now),
                                    );
                                }
                            }
                            HandlerMessage::CreatePage(params, tx) => {
                                if let Some(ref id) = params.browser_context_id {
                                    self.browser_contexts.insert(BrowserContext::from(id.clone()));
                                }
                                self.create_page_async(params, tx, &mut alloc_call_id, &ws_tx, now);
                            }
                            HandlerMessage::GetPages(tx) => {
                                let pages: Vec<_> = self.targets.values_mut()
                                    .filter(|p| p.is_page())
                                    .filter_map(|target| target.get_or_create_page())
                                    .map(|page| Page::from(page.clone()))
                                    .collect();
                                let _ = tx.send(pages);
                            }
                            HandlerMessage::InsertContext(ctx) => {
                                if self.default_browser_context.id().is_none() {
                                    self.default_browser_context = ctx.clone();
                                }
                                self.browser_contexts.insert(ctx);
                            }
                            HandlerMessage::DisposeContext(ctx) => {
                                self.browser_contexts.remove(&ctx);
                                // Keep only targets that do not belong to the disposed context.
                                self.attached_targets.retain(|tid| {
                                    self.targets.get(tid)
                                        .and_then(|t| t.browser_context_id())
                                        .map(|id| Some(id) != ctx.id())
                                        .unwrap_or(true)
                                });
                                // Disposing a context also ends this loop.
                                self.closing = true;
                            }
                            HandlerMessage::GetPage(target_id, tx) => {
                                let page = self.targets.get_mut(&target_id)
                                    .and_then(|target| target.get_or_create_page())
                                    .map(|page| Page::from(page.clone()));
                                let _ = tx.send(page);
                            }
                            HandlerMessage::AddEventListener(req) => {
                                self.event_listeners.add_listener(req);
                            }
                        }
                    }
                    // All browser-side senders are gone.
                    None => break Ok(()),
                }
            }

            frame = ws_reader.next_message() => {
                match frame {
                    Some(Ok(boxed_msg)) => match *boxed_msg {
                        Message::Response(resp) => {
                            self.on_response(resp);
                        }
                        Message::Event(ev) => {
                            self.on_event(ev);
                        }
                    },
                    Some(Err(err)) => {
                        tracing::error!("WS Connection error: {:?}", err);
                        // A closed socket or an abrupt reset is treated as a clean shutdown.
                        if let CdpError::Ws(ref ws_error) = err {
                            match ws_error {
                                tungstenite::Error::AlreadyClosed => break Ok(()),
                                tungstenite::Error::Protocol(detail)
                                    if detail == &ProtocolError::ResetWithoutClosingHandshake =>
                                {
                                    break Ok(());
                                }
                                _ => break Err(err),
                            }
                        } else {
                            break Err(err);
                        }
                    }
                    None => break Ok(()),
                }
            }

            // Wake-up only; targets are re-polled at the top of the next iteration.
            _ = page_wake.notified() => {
            }

            _ = evict_timer.tick() => {
                self.evict_timed_out_commands(now);
                for t in self.targets.values_mut() {
                    t.network_manager.evict_stale_entries(now);
                    t.frame_manager_mut().evict_stale_context_ids();
                }
            }

            result = &mut writer_handle => {
                match result {
                    Ok(Ok(())) => break Ok(()),
                    Ok(Err(e)) => break Err(e),
                    Err(e) => break Err(CdpError::msg(format!("WS writer panicked: {e}"))),
                }
            }
        }
    };

    // Closing the command channel lets the writer finish; give it a short grace period.
    drop(ws_tx);

    if !writer_handle.is_finished() {
        let _ = tokio::time::timeout(std::time::Duration::from_millis(500), &mut writer_handle)
            .await;
        if !writer_handle.is_finished() {
            writer_handle.abort();
        }
    }

    reader_handle.abort();

    run_result
}
1121
1122 fn create_page_async(
1124 &mut self,
1125 params: CreateTargetParams,
1126 tx: OneshotSender<Result<Page>>,
1127 alloc_call_id: &mut impl FnMut() -> chromiumoxide_types::CallId,
1128 ws_tx: &tokio::sync::mpsc::Sender<chromiumoxide_types::MethodCall>,
1129 now: std::time::Instant,
1130 ) {
1131 let about_blank = params.url == "about:blank";
1132 let http_check =
1133 !about_blank && params.url.starts_with("http") || params.url.starts_with("file://");
1134
1135 if about_blank || http_check {
1136 let method = params.identifier();
1137 match serde_json::to_value(params) {
1138 Ok(params) => {
1139 let id = alloc_call_id();
1140 let call = chromiumoxide_types::MethodCall {
1141 id,
1142 method: method.clone(),
1143 session_id: None,
1144 params,
1145 };
1146 match ws_tx.try_send(call) {
1147 Ok(()) => {
1148 self.pending_commands
1149 .insert(id, (PendingRequest::CreateTarget(tx), method, now));
1150 }
1151 Err(_) => {
1152 let _ = tx
1153 .send(Err(CdpError::msg("WS command channel full or closed")))
1154 .ok();
1155 }
1156 }
1157 }
1158 Err(err) => {
1159 let _ = tx.send(Err(err.into())).ok();
1160 }
1161 }
1162 } else {
1163 let _ = tx.send(Err(CdpError::NotFound)).ok();
1164 }
1165 }
1166
/// Runs the handler with the parallel router (`parallel-handler` feature).
///
/// Takes the connection, converts it to its async form, and hands the configuration, default
/// context, browser channel, websocket reader and command sender to `parallel::Router`.
/// When the router finishes, the writer and reader tasks are aborted and the router's result
/// is returned.
///
/// # Errors
/// Fails immediately if the connection was already consumed; otherwise returns the router's
/// result.
#[cfg(feature = "parallel-handler")]
pub async fn run_parallel(mut self) -> Result<()> {
    let connection = match self.conn.take() {
        Some(connection) => connection,
        None => {
            return Err(CdpError::msg(
                "Handler::run_parallel() called with no connection",
            ))
        }
    };
    let async_conn = connection.into_async();

    // The discover command submitted by `new` presumably used the id right before `next_id`.
    // The router is given that id and method, presumably to recognize its response.
    // NOTE(review): confirm `Connection` allocates ids sequentially.
    let first_free_id = async_conn.next_id;
    let boot_call_id = chromiumoxide_types::CallId::new(first_free_id.saturating_sub(1));
    let boot_method = DISCOVER_ID.0.clone();

    let router = parallel::Router::new(
        self.config,
        self.default_browser_context,
        self.from_browser,
        async_conn.reader,
        async_conn.cmd_tx,
        boot_call_id,
        boot_method,
        first_free_id,
    );
    let outcome = router.run().await;

    async_conn.writer_handle.abort();
    async_conn.reader_handle.abort();

    outcome
}
1207}
1208
1209impl Stream for Handler {
1210 type Item = Result<()>;
1211
1212 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1213 const BROWSER_MSG_BUDGET: usize = 128;
1219 const PER_TARGET_DRAIN_BUDGET: usize = 128;
1220 const WS_MSG_BUDGET: usize = 512;
1221
1222 let pin = self.get_mut();
1223
1224 let mut dispose = false;
1225 let mut budget_hit = false;
1226
1227 let now = Instant::now();
1228
1229 loop {
1230 let mut browser_msgs = 0usize;
1234 while let Poll::Ready(Some(msg)) = pin.from_browser.poll_recv(cx) {
1235 match msg {
1236 HandlerMessage::Command(cmd) => {
1237 pin.submit_external_command(cmd, now)?;
1238 }
1239 HandlerMessage::FetchTargets(tx) => {
1240 pin.submit_fetch_targets(tx, now);
1241 }
1242 HandlerMessage::CloseBrowser(tx) => {
1243 pin.submit_close(tx, now);
1244 }
1245 HandlerMessage::CreatePage(params, tx) => {
1246 if let Some(ref id) = params.browser_context_id {
1247 pin.browser_contexts
1248 .insert(BrowserContext::from(id.clone()));
1249 }
1250 pin.create_page(params, tx);
1251 }
1252 HandlerMessage::GetPages(tx) => {
1253 let pages: Vec<_> = pin
1254 .targets
1255 .values_mut()
1256 .filter(|p: &&mut Target| p.is_page())
1257 .filter_map(|target| target.get_or_create_page())
1258 .map(|page| Page::from(page.clone()))
1259 .collect();
1260 let _ = tx.send(pages);
1261 }
1262 HandlerMessage::InsertContext(ctx) => {
1263 if pin.default_browser_context.id().is_none() {
1264 pin.default_browser_context = ctx.clone();
1265 }
1266 pin.browser_contexts.insert(ctx);
1267 }
1268 HandlerMessage::DisposeContext(ctx) => {
1269 pin.browser_contexts.remove(&ctx);
1270 pin.attached_targets.retain(|tid| {
1271 pin.targets
1272 .get(tid)
1273 .and_then(|t| t.browser_context_id()) .map(|id| Some(id) != ctx.id())
1275 .unwrap_or(true)
1276 });
1277 pin.closing = true;
1278 dispose = true;
1279 }
1280 HandlerMessage::GetPage(target_id, tx) => {
1281 let page = pin
1282 .targets
1283 .get_mut(&target_id)
1284 .and_then(|target| target.get_or_create_page())
1285 .map(|page| Page::from(page.clone()));
1286 let _ = tx.send(page);
1287 }
1288 HandlerMessage::AddEventListener(req) => {
1289 pin.event_listeners.add_listener(req);
1290 }
1291 }
1292 browser_msgs += 1;
1293 if browser_msgs >= BROWSER_MSG_BUDGET {
1294 budget_hit = true;
1295 break;
1296 }
1297 }
1298
1299 for n in (0..pin.target_ids.len()).rev() {
1300 let target_id = pin.target_ids.swap_remove(n);
1301
1302 if let Some((id, mut target)) = pin.targets.remove_entry(&target_id) {
1303 let mut drained = 0usize;
1304 while let Some(event) = target.poll(cx, now) {
1305 match event {
1306 TargetEvent::Request(req) => {
1307 let _ = pin.submit_internal_command(
1308 target.target_id().clone(),
1309 req,
1310 now,
1311 );
1312 }
1313 TargetEvent::Command(msg) => {
1314 pin.on_target_message(&mut target, msg, now);
1315 }
1316 TargetEvent::NavigationRequest(id, req) => {
1317 pin.submit_navigation(id, req, now);
1318 }
1319 TargetEvent::NavigationResult(res) => {
1320 pin.on_navigation_lifecycle_completed(res)
1321 }
1322 TargetEvent::BytesConsumed(n) => {
1323 if let Some(rem) = pin.remaining_bytes.as_mut() {
1324 *rem = rem.saturating_sub(n);
1325 if *rem == 0 {
1326 pin.budget_exhausted = true;
1327 }
1328 }
1329 }
1330 }
1331 drained += 1;
1332 if drained >= PER_TARGET_DRAIN_BUDGET {
1333 budget_hit = true;
1334 break;
1335 }
1336 }
1337
1338 target.event_listeners_mut().poll(cx);
1340
1341 pin.targets.insert(id, target);
1342 pin.target_ids.push(target_id);
1343 }
1344 }
1345
1346 pin.event_listeners_mut().poll(cx);
1349
1350 let mut done = true;
1351
1352 let mut ws_msgs = Vec::new();
1355 let mut ws_err = None;
1356 {
1357 let Some(conn) = pin.conn.as_mut() else {
1358 return Poll::Ready(Some(Err(CdpError::msg(
1359 "connection consumed by Handler::run()",
1360 ))));
1361 };
1362 while let Poll::Ready(Some(ev)) = Pin::new(&mut *conn).poll_next(cx) {
1363 match ev {
1364 Ok(msg) => ws_msgs.push(msg),
1365 Err(err) => {
1366 ws_err = Some(err);
1367 break;
1368 }
1369 }
1370 if ws_msgs.len() >= WS_MSG_BUDGET {
1371 budget_hit = true;
1372 break;
1373 }
1374 }
1375 }
1376
1377 for boxed_msg in ws_msgs {
1378 match *boxed_msg {
1379 Message::Response(resp) => {
1380 pin.on_response(resp);
1381 if pin.closing {
1382 return Poll::Ready(None);
1383 }
1384 }
1385 Message::Event(ev) => {
1386 pin.on_event(ev);
1387 }
1388 }
1389 done = false;
1390 }
1391
1392 if let Some(err) = ws_err {
1393 tracing::error!("WS Connection error: {:?}", err);
1394 if let CdpError::Ws(ref ws_error) = err {
1395 match ws_error {
1396 Error::AlreadyClosed => {
1397 pin.closing = true;
1398 dispose = true;
1399 }
1400 Error::Protocol(detail)
1401 if detail == &ProtocolError::ResetWithoutClosingHandshake =>
1402 {
1403 pin.closing = true;
1404 dispose = true;
1405 }
1406 _ => return Poll::Ready(Some(Err(err))),
1407 }
1408 } else {
1409 return Poll::Ready(Some(Err(err)));
1410 }
1411 }
1412
1413 if pin.evict_command_timeout.poll_ready(cx) {
1414 pin.evict_timed_out_commands(now);
1416 for t in pin.targets.values_mut() {
1419 t.network_manager.evict_stale_entries(now);
1420 t.frame_manager_mut().evict_stale_context_ids();
1421 }
1422 }
1423
1424 if pin.budget_exhausted {
1425 for t in pin.targets.values_mut() {
1426 t.network_manager.set_block_all(true);
1427 }
1428 }
1429
1430 if dispose {
1431 return Poll::Ready(None);
1432 }
1433
1434 if budget_hit {
1435 cx.waker().wake_by_ref();
1439 return Poll::Pending;
1440 }
1441
1442 if done {
1443 return Poll::Pending;
1445 }
1446 }
1447 }
1448}
1449
/// Configuration consumed when constructing the CDP `Handler`.
///
/// The defaults mentioned below are the values produced by
/// `HandlerConfig::default()`.
#[derive(Debug, Clone)]
pub struct HandlerConfig {
    /// Ignore HTTPS certificate errors. Default: `true`.
    pub ignore_https_errors: bool,
    /// Viewport to apply to pages, if any. Default: `None`.
    pub viewport: Option<Viewport>,
    /// Browser context ids supplied up front. Default: empty.
    pub context_ids: Vec<BrowserContextId>,
    /// Timeout applied to CDP requests. Default: `REQUEST_TIMEOUT` ms (30 000 ms).
    pub request_timeout: Duration,
    /// Enable request interception. Default: `false`.
    pub request_intercept: bool,
    /// Enable the browser cache. Default: `true`.
    pub cache_enabled: bool,
    /// Enable service workers. Default: `true`.
    pub service_worker_enabled: bool,
    /// Ignore "visual" resources. The exact resource set is decided by the
    /// network layer (TODO confirm). Default: `false`.
    pub ignore_visuals: bool,
    /// Ignore stylesheets. Default: `false`.
    pub ignore_stylesheets: bool,
    /// Ignore JavaScript resources. Default: `false`.
    pub ignore_javascript: bool,
    /// Still allow first-party stylesheets when stylesheets are otherwise
    /// ignored (presumably; verify in the network manager). Default: `true`.
    pub allow_first_party_stylesheets: bool,
    /// Still allow first-party JavaScript when JavaScript is otherwise
    /// ignored (presumably; verify in the network manager). Default: `true`.
    pub allow_first_party_javascript: bool,
    /// Still allow first-party visuals when visuals are otherwise ignored
    /// (presumably; verify in the network manager). Default: `true`.
    pub allow_first_party_visuals: bool,
    /// Ignore analytics requests. Default: `true`.
    pub ignore_analytics: bool,
    /// Ignore prefetch requests. Default: `true`.
    pub ignore_prefetch: bool,
    /// Ignore ad requests. Default: `false`.
    pub ignore_ads: bool,
    /// Extra HTTP headers to attach, if any. Default: `None`.
    pub extra_headers: Option<std::collections::HashMap<String, String>>,
    /// Restrict loading to HTML only. Default: `false`.
    pub only_html: bool,
    /// Whether the first target has already been created. Default: `false`.
    pub created_first_target: bool,
    /// Network intercept strategy. Default: `NetworkInterceptManager::Unknown`.
    pub intercept_manager: NetworkInterceptManager,
    /// Optional cap on bytes consumed. Presumably seeds the handler's
    /// remaining-byte budget; TODO confirm. Default: `None` (no cap).
    pub max_bytes_allowed: Option<u64>,
    /// Optional cap on redirects. Default: `None`.
    pub max_redirects: Option<usize>,
    /// Optional cap on main-frame navigations. Default: `None`.
    pub max_main_frame_navigations: Option<u32>,
    /// Optional allow-list patterns. Default: `None`.
    pub whitelist_patterns: Option<Vec<String>>,
    /// Optional block-list patterns. Default: `None`.
    pub blacklist_patterns: Option<Vec<String>>,
    /// Remote/local request policy toggle. Default: `false`.
    pub remote_local_policy: bool,
    /// Extra adblock filter rules (only with the `adblock` feature). Default: `None`.
    #[cfg(feature = "adblock")]
    pub adblock_filter_rules: Option<Vec<String>>,
    /// Capacity of the handler's message channel. Default: 4096.
    pub channel_capacity: usize,
    /// Capacity of each page's channel.
    /// Default: `DEFAULT_PAGE_CHANNEL_CAPACITY` (2048).
    pub page_channel_capacity: usize,
    /// Number of connection retries. Default: `crate::conn::DEFAULT_CONNECTION_RETRIES`.
    pub connection_retries: u32,
}
1543
1544impl Default for HandlerConfig {
1545 fn default() -> Self {
1546 Self {
1547 ignore_https_errors: true,
1548 viewport: Default::default(),
1549 context_ids: Vec::new(),
1550 request_timeout: Duration::from_millis(REQUEST_TIMEOUT),
1551 request_intercept: false,
1552 cache_enabled: true,
1553 service_worker_enabled: true,
1554 ignore_visuals: false,
1555 ignore_stylesheets: false,
1556 ignore_ads: false,
1557 ignore_javascript: false,
1558 allow_first_party_stylesheets: true,
1559 allow_first_party_javascript: true,
1560 allow_first_party_visuals: true,
1561 ignore_analytics: true,
1562 ignore_prefetch: true,
1563 only_html: false,
1564 extra_headers: Default::default(),
1565 created_first_target: false,
1566 intercept_manager: NetworkInterceptManager::Unknown,
1567 max_bytes_allowed: None,
1568 max_redirects: None,
1569 max_main_frame_navigations: None,
1570 whitelist_patterns: None,
1571 blacklist_patterns: None,
1572 remote_local_policy: false,
1573 #[cfg(feature = "adblock")]
1574 adblock_filter_rules: None,
1575 channel_capacity: 4096,
1576 page_channel_capacity: crate::handler::page::DEFAULT_PAGE_CHANNEL_CAPACITY,
1577 connection_retries: crate::conn::DEFAULT_CONNECTION_RETRIES,
1578 }
1579 }
1580}
1581
/// Book-keeping for a navigation that has been submitted but not yet resolved.
///
/// `T` is the value eventually delivered to the waiter over `tx`.
#[derive(Debug)]
pub struct NavigationInProgress<T> {
    /// Becomes `true` once `set_navigated` has been called.
    navigated: bool,
    /// The CDP response recorded via `set_response`, if any.
    response: Option<Response>,
    /// One-shot channel on which the final result is sent to the waiter.
    tx: OneshotSender<T>,
}
1592
1593impl<T> NavigationInProgress<T> {
1594 pub(crate) fn new(tx: OneshotSender<T>) -> Self {
1595 Self {
1596 navigated: false,
1597 response: None,
1598 tx,
1599 }
1600 }
1601
1602 pub(crate) fn set_response(&mut self, resp: Response) {
1604 self.response = Some(resp);
1605 }
1606
1607 pub(crate) fn set_navigated(&mut self) {
1609 self.navigated = true;
1610 }
1611
1612 #[cfg_attr(not(feature = "parallel-handler"), allow(dead_code))]
1616 pub(crate) fn is_navigated(&self) -> bool {
1617 self.navigated
1618 }
1619
1620 #[cfg_attr(not(feature = "parallel-handler"), allow(dead_code))]
1621 pub(crate) fn take_response(&mut self) -> Option<Response> {
1622 self.response.take()
1623 }
1624
1625 #[cfg_attr(not(feature = "parallel-handler"), allow(dead_code))]
1626 pub(crate) fn into_tx(self) -> OneshotSender<T> {
1627 self.tx
1628 }
1629}
1630
/// A navigation tracked by the handler.
#[derive(Debug)]
enum NavigationRequest {
    /// In-flight navigation whose waiter ultimately receives `Result<Response>`.
    Navigate(NavigationInProgress<Result<Response>>),
}
1638
/// What the handler does with a CDP response once it arrives, keyed
/// (presumably) by the call id of the request that was sent. TODO confirm
/// against `on_response`.
#[derive(Debug)]
enum PendingRequest {
    /// Target creation (from `HandlerMessage::CreatePage`); resolves to a `Page`.
    CreateTarget(OneshotSender<Result<Page>>),
    /// Target listing (from `HandlerMessage::FetchTargets`).
    GetTargets(OneshotSender<Result<Vec<TargetInfo>>>),
    /// A navigation command tied to the given navigation id.
    Navigate(NavigationId),
    /// A command submitted from outside the handler (`HandlerMessage::Command`).
    ExternalCommand {
        /// Channel for the raw CDP response.
        tx: OneshotSender<Result<Response>>,
        /// Target the command was addressed to, if any.
        target_id: Option<TargetId>,
    },
    /// A command emitted internally by the given target (`TargetEvent::Request`).
    InternalCommand(TargetId),
    /// Browser close request (from `HandlerMessage::CloseBrowser`).
    CloseBrowser(OneshotSender<Result<CloseReturns>>),
}
1673
/// Messages sent from the browser-side API to the handler.
///
/// They are drained from `from_browser` inside `poll_next`.
#[derive(Debug)]
pub(crate) enum HandlerMessage {
    /// Create a new page.
    ///
    /// If the params name a browser context, that context is registered first.
    CreatePage(CreateTargetParams, OneshotSender<Result<Page>>),
    /// Fetch the list of browser targets.
    FetchTargets(OneshotSender<Result<Vec<TargetInfo>>>),
    /// Register a browser context.
    ///
    /// The context also becomes the default if the current default has no id.
    InsertContext(BrowserContext),
    /// Remove a browser context and prune attached targets that belong to it.
    ///
    /// This also marks the handler as closing, and the stream then ends.
    DisposeContext(BrowserContext),
    /// Reply with `Page` handles for all page targets.
    GetPages(OneshotSender<Vec<Page>>),
    /// Forward an arbitrary CDP command.
    Command(CommandMessage),
    /// Reply with the `Page` for the given target, if one exists.
    GetPage(TargetId, OneshotSender<Option<Page>>),
    /// Register a handler-level event listener.
    AddEventListener(EventListenerRequest),
    /// Close the browser.
    CloseBrowser(OneshotSender<Result<CloseReturns>>),
}
1689
#[cfg(test)]
mod tests {
    use super::*;
    use chromiumoxide_cdp::cdp::browser_protocol::target::{AttachToTargetReturns, TargetInfo};

    /// Fixture: an unattached `about:blank` page target named `target-1`.
    fn blank_page_target_info() -> TargetInfo {
        TargetInfo::builder()
            .target_id("target-1".to_string())
            .r#type("page")
            .title("")
            .url("about:blank")
            .attached(false)
            .can_access_opener(false)
            .build()
            .expect("target info")
    }

    /// The `Target.attachToTarget` response alone must populate the session id.
    ///
    /// The later `Target.attachedToTarget` event is not needed for this.
    #[test]
    fn attach_to_target_response_sets_session_id_before_event_arrives() {
        let mut target = Target::new(
            blank_page_target_info(),
            TargetConfig::default(),
            BrowserContext::default(),
        );

        let attach_method: MethodId = AttachToTargetParams::IDENTIFIER.into();
        let attach_payload =
            serde_json::to_value(AttachToTargetReturns::new("session-1".to_string()))
                .expect("attach result");
        let attach_response = Response {
            id: CallId::new(1),
            result: Some(attach_payload),
            error: None,
        };

        maybe_store_attach_session_id(&mut target, &attach_method, &attach_response);

        assert_eq!(
            target.session_id().map(AsRef::as_ref),
            Some("session-1"),
            "attach response should seed the flat session id even before Target.attachedToTarget"
        );
    }

    /// Every config layer must agree on the historical 2048-slot page channel.
    #[test]
    fn page_channel_capacity_defaults_to_2048_across_configs() {
        use crate::browser::BrowserConfigBuilder;
        use crate::handler::page::DEFAULT_PAGE_CHANNEL_CAPACITY;

        assert_eq!(DEFAULT_PAGE_CHANNEL_CAPACITY, 2048);

        let handler_default = HandlerConfig::default().page_channel_capacity;
        assert_eq!(
            handler_default,
            DEFAULT_PAGE_CHANNEL_CAPACITY,
            "HandlerConfig default must match the historical 2048 slot count"
        );

        let target_default = TargetConfig::default().page_channel_capacity;
        assert_eq!(
            target_default,
            DEFAULT_PAGE_CHANNEL_CAPACITY,
            "TargetConfig default must match the historical 2048 slot count"
        );

        // The builder's default is checked through its Debug rendering.
        let bc = format!("{:?}", BrowserConfigBuilder::default());
        assert!(
            bc.contains("page_channel_capacity: 2048"),
            "BrowserConfigBuilder must default page_channel_capacity to 2048, got: {bc}",
        );
    }
}