1use crate::listeners::{EventListenerRequest, EventListeners};
2use chromiumoxide_cdp::cdp::browser_protocol::browser::*;
3use chromiumoxide_cdp::cdp::browser_protocol::target::*;
4use chromiumoxide_cdp::cdp::events::CdpEvent;
5use chromiumoxide_cdp::cdp::events::CdpEventMessage;
6use chromiumoxide_types::{CallId, Message, Method, Response};
7use chromiumoxide_types::{MethodId, Request as CdpRequest};
8use fnv::FnvHashMap;
9use futures_util::Stream;
10use hashbrown::{HashMap, HashSet};
11use spider_network_blocker::intercept_manager::NetworkInterceptManager;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::time::{Duration, Instant};
15use tokio::sync::mpsc::Receiver;
16use tokio::sync::oneshot::Sender as OneshotSender;
17use tokio_tungstenite::tungstenite::error::ProtocolError;
18use tokio_tungstenite::tungstenite::Error;
19
20use std::sync::Arc;
21use tokio::sync::Notify;
22
23use crate::cmd::{to_command_response, CommandMessage};
24use crate::conn::Connection;
25use crate::error::{CdpError, Result};
26use crate::handler::browser::BrowserContext;
27use crate::handler::frame::FrameRequestedNavigation;
28use crate::handler::frame::{NavigationError, NavigationId, NavigationOk};
29use crate::handler::job::PeriodicJob;
30use crate::handler::session::Session;
31use crate::handler::target::TargetEvent;
32use crate::handler::target::{Target, TargetConfig};
33use crate::handler::viewport::Viewport;
34use crate::page::Page;
35pub(crate) use page::PageInner;
36
/// Default request timeout value.
///
/// NOTE(review): the magnitude (`30_000`) suggests milliseconds (30 s), but
/// the unit is not established in this file — confirm at the use sites.
pub const REQUEST_TIMEOUT: u64 = 30_000;
39
40pub mod blockers;
41pub mod browser;
42pub mod commandfuture;
43pub mod domworld;
44pub mod emulation;
45pub mod frame;
46pub mod http;
47pub mod httpfuture;
48mod job;
49pub mod network;
50pub mod network_utils;
51pub mod page;
52pub mod sender;
53mod session;
54pub mod target;
55pub mod target_message_future;
56pub mod viewport;
57
/// Central state for one browser connection: tracks targets, sessions,
/// in-flight commands and navigations, and routes responses and events
/// received over the connection.
///
/// It can be driven either by polling it as a [`Stream`] or by awaiting
/// [`Handler::run`], which takes ownership of the connection.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Handler {
    /// Context assigned to targets that report no browser context id;
    /// replaced by the first inserted context while it still has no id.
    pub default_browser_context: BrowserContext,
    /// Known browser contexts. When non-empty, newly created targets that
    /// report a context outside this set are ignored.
    pub browser_contexts: HashSet<BrowserContext>,
    /// In-flight commands keyed by call id: how to handle the response, the
    /// method that was sent, and the instant recorded at submission.
    pending_commands: FnvHashMap<CallId, (PendingRequest, MethodId, Instant)>,
    /// Incoming messages for the handler.
    from_browser: Receiver<HandlerMessage>,
    /// Ids of the targets that are visited on every drive iteration.
    target_ids: Vec<TargetId>,
    /// All tracked targets, keyed by target id.
    targets: HashMap<TargetId, Target>,
    /// Navigations awaiting their command response and/or lifecycle result.
    navigations: FnvHashMap<NavigationId, NavigationRequest>,
    /// Attached sessions, used to route session-scoped events to a target.
    sessions: HashMap<SessionId, Session>,
    /// The connection; `None` once `run()` has taken it.
    conn: Option<Connection<CdpEventMessage>>,
    /// Periodic trigger for evicting timed-out commands in the `Stream` path.
    evict_command_timeout: PeriodicJob,
    /// Counter backing the next `NavigationId` (wraps on overflow).
    next_navigation_id: usize,
    /// Handler configuration; also the source of each new `TargetConfig`.
    config: HandlerConfig,
    /// Listeners receiving events that were not routed to a target.
    event_listeners: EventListeners,
    /// Set once the browser close response arrived, a context was disposed,
    /// or the socket was found closed; ends the drive loop.
    closing: bool,
    /// Remaining byte budget, decremented on `TargetEvent::BytesConsumed`;
    /// `None` means no budget is tracked.
    remaining_bytes: Option<u64>,
    /// Set when `remaining_bytes` reaches zero; all targets are then told to
    /// block all network traffic.
    budget_exhausted: bool,
    /// Target ids pruned when a target is destroyed/crashed or its context is
    /// disposed. NOTE(review): nothing in the visible code inserts into this
    /// set — confirm where it is populated.
    attached_targets: HashSet<TargetId>,
    /// Notifier created by `run()` and handed to each new target's config so
    /// the `run()` loop can be woken.
    page_wake: Option<Arc<Notify>>,
}
104
105lazy_static::lazy_static! {
106 static ref DISCOVER_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
108 let discover = SetDiscoverTargetsParams::new(true);
109 (discover.identifier(), serde_json::to_value(discover).expect("valid discover target params"))
110 };
111 static ref TARGET_PARAMS_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
113 let msg = GetTargetsParams { filter: None };
114 (msg.identifier(), serde_json::to_value(msg).expect("valid paramtarget"))
115 };
116 static ref CLOSE_PARAMS_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
118 let close_msg = CloseParams::default();
119 (close_msg.identifier(), serde_json::to_value(close_msg).expect("valid close params"))
120 };
121}
122
123fn maybe_store_attach_session_id(target: &mut Target, method: &MethodId, resp: &Response) {
124 if method.as_ref() != AttachToTargetParams::IDENTIFIER {
125 return;
126 }
127
128 if let Ok(resp) = to_command_response::<AttachToTargetParams>(resp.clone(), method.clone()) {
129 target.set_session_id(resp.result.session_id);
130 }
131}
132
133impl Handler {
134 pub(crate) fn new(
137 mut conn: Connection<CdpEventMessage>,
138 rx: Receiver<HandlerMessage>,
139 config: HandlerConfig,
140 ) -> Self {
141 let discover = DISCOVER_ID.clone();
142 let _ = conn.submit_command(discover.0, None, discover.1);
143 let conn = Some(conn);
144
145 let browser_contexts = config
146 .context_ids
147 .iter()
148 .map(|id| BrowserContext::from(id.clone()))
149 .collect();
150
151 Self {
152 pending_commands: Default::default(),
153 from_browser: rx,
154 default_browser_context: Default::default(),
155 browser_contexts,
156 target_ids: Default::default(),
157 targets: Default::default(),
158 navigations: Default::default(),
159 sessions: Default::default(),
160 conn,
161 evict_command_timeout: PeriodicJob::new(config.request_timeout),
162 next_navigation_id: 0,
163 config,
164 event_listeners: Default::default(),
165 closing: false,
166 remaining_bytes: None,
167 budget_exhausted: false,
168 attached_targets: Default::default(),
169 page_wake: None,
170 }
171 }
172
173 #[inline]
176 fn conn(&mut self) -> Result<&mut Connection<CdpEventMessage>> {
177 self.conn
178 .as_mut()
179 .ok_or_else(|| CdpError::msg("connection consumed by Handler::run()"))
180 }
181
182 pub fn get_target(&self, target_id: &TargetId) -> Option<&Target> {
184 self.targets.get(target_id)
185 }
186
187 pub fn targets(&self) -> impl Iterator<Item = &Target> + '_ {
189 self.targets.values()
190 }
191
192 pub fn default_browser_context(&self) -> &BrowserContext {
194 &self.default_browser_context
195 }
196
197 pub fn browser_contexts(&self) -> impl Iterator<Item = &BrowserContext> + '_ {
199 self.browser_contexts.iter()
200 }
201
202 fn on_navigation_response(&mut self, id: NavigationId, resp: Response) {
204 if let Some(nav) = self.navigations.remove(&id) {
205 match nav {
206 NavigationRequest::Navigate(mut nav) => {
207 if nav.navigated {
208 let _ = nav.tx.send(Ok(resp));
209 } else {
210 nav.set_response(resp);
211 self.navigations
212 .insert(id, NavigationRequest::Navigate(nav));
213 }
214 }
215 }
216 }
217 }
218
219 fn on_navigation_lifecycle_completed(&mut self, res: Result<NavigationOk, NavigationError>) {
221 match res {
222 Ok(ok) => {
223 let id = *ok.navigation_id();
224 if let Some(nav) = self.navigations.remove(&id) {
225 match nav {
226 NavigationRequest::Navigate(mut nav) => {
227 if let Some(resp) = nav.response.take() {
228 let _ = nav.tx.send(Ok(resp));
229 } else {
230 nav.set_navigated();
231 self.navigations
232 .insert(id, NavigationRequest::Navigate(nav));
233 }
234 }
235 }
236 }
237 }
238 Err(err) => {
239 if let Some(nav) = self.navigations.remove(err.navigation_id()) {
240 match nav {
241 NavigationRequest::Navigate(nav) => {
242 let _ = nav.tx.send(Err(err.into()));
243 }
244 }
245 }
246 }
247 }
248 }
249
250 fn on_response(&mut self, resp: Response) {
252 if let Some((req, method, _)) = self.pending_commands.remove(&resp.id) {
253 match req {
254 PendingRequest::CreateTarget(tx) => {
255 match to_command_response::<CreateTargetParams>(resp, method) {
256 Ok(resp) => {
257 if let Some(target) = self.targets.get_mut(&resp.target_id) {
258 target.set_initiator(tx);
259 } else {
260 let _ = tx.send(Err(CdpError::NotFound)).ok();
261 }
262 }
263 Err(err) => {
264 let _ = tx.send(Err(err)).ok();
265 }
266 }
267 }
268 PendingRequest::GetTargets(tx) => {
269 match to_command_response::<GetTargetsParams>(resp, method) {
270 Ok(resp) => {
271 let targets = resp.result.target_infos;
272 let results = targets.clone();
273
274 for target_info in targets {
275 let event: EventTargetCreated = EventTargetCreated { target_info };
276 self.on_target_created(event);
277 }
278
279 let _ = tx.send(Ok(results)).ok();
280 }
281 Err(err) => {
282 let _ = tx.send(Err(err)).ok();
283 }
284 }
285 }
286 PendingRequest::Navigate(id) => {
287 self.on_navigation_response(id, resp);
288 if self.config.only_html && !self.config.created_first_target {
289 self.config.created_first_target = true;
290 }
291 }
292 PendingRequest::ExternalCommand { tx, .. } => {
293 let _ = tx.send(Ok(resp)).ok();
294 }
295 PendingRequest::InternalCommand(target_id) => {
296 if let Some(target) = self.targets.get_mut(&target_id) {
297 maybe_store_attach_session_id(target, &method, &resp);
298 target.on_response(resp, method.as_ref());
299 }
300 }
301 PendingRequest::CloseBrowser(tx) => {
302 self.closing = true;
303 let _ = tx.send(Ok(CloseReturns {})).ok();
304 }
305 }
306 }
307 }
308
309 pub(crate) fn submit_external_command(
311 &mut self,
312 msg: CommandMessage,
313 now: Instant,
314 ) -> Result<()> {
315 let target_id = msg
320 .session_id
321 .as_ref()
322 .and_then(|sid| self.sessions.get(sid.as_ref()))
323 .map(|s| s.target_id().clone());
324 let call_id =
325 self.conn()?
326 .submit_command(msg.method.clone(), msg.session_id, msg.params)?;
327 self.pending_commands.insert(
328 call_id,
329 (
330 PendingRequest::ExternalCommand {
331 tx: msg.sender,
332 target_id,
333 },
334 msg.method,
335 now,
336 ),
337 );
338 Ok(())
339 }
340
341 pub(crate) fn submit_internal_command(
342 &mut self,
343 target_id: TargetId,
344 req: CdpRequest,
345 now: Instant,
346 ) -> Result<()> {
347 let call_id = self.conn()?.submit_command(
348 req.method.clone(),
349 req.session_id.map(Into::into),
350 req.params,
351 )?;
352 self.pending_commands.insert(
353 call_id,
354 (PendingRequest::InternalCommand(target_id), req.method, now),
355 );
356 Ok(())
357 }
358
359 fn submit_fetch_targets(&mut self, tx: OneshotSender<Result<Vec<TargetInfo>>>, now: Instant) {
360 let msg = TARGET_PARAMS_ID.clone();
361
362 if let Some(conn) = self.conn.as_mut() {
363 if let Ok(call_id) = conn.submit_command(msg.0.clone(), None, msg.1) {
364 self.pending_commands
365 .insert(call_id, (PendingRequest::GetTargets(tx), msg.0, now));
366 }
367 }
368 }
369
370 fn submit_navigation(&mut self, id: NavigationId, req: CdpRequest, now: Instant) {
373 if let Some(conn) = self.conn.as_mut() {
374 if let Ok(call_id) = conn.submit_command(
375 req.method.clone(),
376 req.session_id.map(Into::into),
377 req.params,
378 ) {
379 self.pending_commands
380 .insert(call_id, (PendingRequest::Navigate(id), req.method, now));
381 }
382 }
383 }
384
385 fn submit_close(&mut self, tx: OneshotSender<Result<CloseReturns>>, now: Instant) {
386 let close_msg = CLOSE_PARAMS_ID.clone();
387
388 if let Some(conn) = self.conn.as_mut() {
389 if let Ok(call_id) = conn.submit_command(close_msg.0.clone(), None, close_msg.1) {
390 self.pending_commands.insert(
391 call_id,
392 (PendingRequest::CloseBrowser(tx), close_msg.0, now),
393 );
394 }
395 }
396 }
397
398 fn on_target_message(&mut self, target: &mut Target, msg: CommandMessage, now: Instant) {
400 if msg.is_navigation() {
401 let (req, tx) = msg.split();
402 let id = self.next_navigation_id();
403
404 target.goto(FrameRequestedNavigation::new(
405 id,
406 req,
407 self.config.request_timeout,
408 ));
409
410 self.navigations.insert(
411 id,
412 NavigationRequest::Navigate(NavigationInProgress::new(tx)),
413 );
414 } else {
415 let _ = self.submit_external_command(msg, now);
416 }
417 }
418
419 fn next_navigation_id(&mut self) -> NavigationId {
421 let id = NavigationId(self.next_navigation_id);
422 self.next_navigation_id = self.next_navigation_id.wrapping_add(1);
423 id
424 }
425
426 fn create_page(&mut self, params: CreateTargetParams, tx: OneshotSender<Result<Page>>) {
437 let about_blank = params.url == "about:blank";
438 let http_check =
439 !about_blank && params.url.starts_with("http") || params.url.starts_with("file://");
440
441 if about_blank || http_check {
442 let method = params.identifier();
443
444 let Some(conn) = self.conn.as_mut() else {
445 let _ = tx.send(Err(CdpError::msg("connection consumed"))).ok();
446 return;
447 };
448 match serde_json::to_value(params) {
449 Ok(params) => match conn.submit_command(method.clone(), None, params) {
450 Ok(call_id) => {
451 self.pending_commands.insert(
452 call_id,
453 (PendingRequest::CreateTarget(tx), method, Instant::now()),
454 );
455 }
456 Err(err) => {
457 let _ = tx.send(Err(err.into())).ok();
458 }
459 },
460 Err(err) => {
461 let _ = tx.send(Err(err.into())).ok();
462 }
463 }
464 } else {
465 let _ = tx.send(Err(CdpError::NotFound)).ok();
466 }
467 }
468
469 fn on_event(&mut self, event: CdpEventMessage) {
471 if let Some(session_id) = &event.session_id {
472 if let Some(session) = self.sessions.get(session_id.as_str()) {
473 if let Some(target) = self.targets.get_mut(session.target_id()) {
474 return target.on_event(event);
475 }
476 }
477 }
478 let CdpEventMessage { params, method, .. } = event;
479
480 match params {
481 CdpEvent::TargetTargetCreated(ref ev) => self.on_target_created((**ev).clone()),
482 CdpEvent::TargetAttachedToTarget(ref ev) => self.on_attached_to_target(ev.clone()),
483 CdpEvent::TargetTargetDestroyed(ref ev) => self.on_target_destroyed(ev.clone()),
484 CdpEvent::TargetTargetCrashed(ref ev) => self.on_target_crashed(ev.clone()),
485 CdpEvent::TargetDetachedFromTarget(ref ev) => self.on_detached_from_target(ev.clone()),
486 _ => {}
487 }
488
489 chromiumoxide_cdp::consume_event!(match params {
490 |ev| self.event_listeners.start_send(ev),
491 |json| { let _ = self.event_listeners.try_send_custom(&method, json);}
492 });
493 }
494
495 fn on_target_created(&mut self, event: EventTargetCreated) {
499 if !self.browser_contexts.is_empty() {
500 if let Some(ref context_id) = event.target_info.browser_context_id {
501 let bc = BrowserContext {
502 id: Some(context_id.clone()),
503 };
504 if !self.browser_contexts.contains(&bc) {
505 return;
506 }
507 }
508 }
509 let browser_ctx = event
510 .target_info
511 .browser_context_id
512 .clone()
513 .map(BrowserContext::from)
514 .unwrap_or_else(|| self.default_browser_context.clone());
515 let target = Target::new(
516 event.target_info,
517 TargetConfig {
518 ignore_https_errors: self.config.ignore_https_errors,
519 request_timeout: self.config.request_timeout,
520 viewport: self.config.viewport.clone(),
521 request_intercept: self.config.request_intercept,
522 cache_enabled: self.config.cache_enabled,
523 service_worker_enabled: self.config.service_worker_enabled,
524 ignore_visuals: self.config.ignore_visuals,
525 ignore_stylesheets: self.config.ignore_stylesheets,
526 ignore_javascript: self.config.ignore_javascript,
527 ignore_analytics: self.config.ignore_analytics,
528 ignore_prefetch: self.config.ignore_prefetch,
529 extra_headers: self.config.extra_headers.clone(),
530 only_html: self.config.only_html && self.config.created_first_target,
531 intercept_manager: self.config.intercept_manager,
532 max_bytes_allowed: self.config.max_bytes_allowed,
533 max_redirects: self.config.max_redirects,
534 max_main_frame_navigations: self.config.max_main_frame_navigations,
535 whitelist_patterns: self.config.whitelist_patterns.clone(),
536 blacklist_patterns: self.config.blacklist_patterns.clone(),
537 #[cfg(feature = "adblock")]
538 adblock_filter_rules: self.config.adblock_filter_rules.clone(),
539 page_wake: self.page_wake.clone(),
540 page_channel_capacity: self.config.page_channel_capacity,
541 },
542 browser_ctx,
543 );
544
545 let tid = target.target_id().clone();
546 self.target_ids.push(tid.clone());
547 self.targets.insert(tid, target);
548 }
549
550 fn on_attached_to_target(&mut self, event: Box<EventAttachedToTarget>) {
552 let session = Session::new(event.session_id.clone(), event.target_info.target_id);
553 if let Some(target) = self.targets.get_mut(session.target_id()) {
554 target.set_session_id(session.session_id().clone())
555 }
556 self.sessions.insert(event.session_id, session);
557 }
558
559 fn on_detached_from_target(&mut self, event: EventDetachedFromTarget) {
563 if let Some(session) = self.sessions.remove(&event.session_id) {
565 if let Some(target) = self.targets.get_mut(session.target_id()) {
566 target.session_id_mut().take();
567 }
568 }
569 }
570
571 fn on_target_destroyed(&mut self, event: EventTargetDestroyed) {
573 self.attached_targets.remove(&event.target_id);
574
575 if let Some(target) = self.targets.remove(&event.target_id) {
576 if let Some(session) = target.session_id() {
578 self.sessions.remove(session);
579 }
580 }
581 }
582
583 fn on_target_crashed(&mut self, event: EventTargetCrashed) {
606 let crashed_id = event.target_id.clone();
607 let status = event.status.clone();
608 let error_code = event.error_code;
609
610 let to_cancel: Vec<CallId> = self
616 .pending_commands
617 .iter()
618 .filter_map(|(&call_id, (req, _, _))| match req {
619 PendingRequest::ExternalCommand {
620 target_id: Some(tid),
621 ..
622 } if *tid == crashed_id => Some(call_id),
623 PendingRequest::InternalCommand(tid) if *tid == crashed_id => Some(call_id),
624 _ => None,
625 })
626 .collect();
627
628 for call_id in to_cancel {
629 if let Some((req, _, _)) = self.pending_commands.remove(&call_id) {
630 match req {
631 PendingRequest::ExternalCommand { tx, .. } => {
632 let _ = tx.send(Err(CdpError::msg(format!(
633 "target {:?} crashed: {} (errorCode={})",
634 crashed_id, status, error_code
635 ))));
636 }
637 PendingRequest::InternalCommand(_) => {
638 }
641 _ => {}
642 }
643 }
644 }
645
646 self.attached_targets.remove(&crashed_id);
648 if let Some(target) = self.targets.remove(&crashed_id) {
649 if let Some(session) = target.session_id() {
650 self.sessions.remove(session);
651 }
652 }
653 }
654
655 fn evict_timed_out_commands(&mut self, now: Instant) {
660 let deadline = match now.checked_sub(self.config.request_timeout) {
661 Some(d) => d,
662 None => return,
663 };
664
665 let timed_out: Vec<_> = self
666 .pending_commands
667 .iter()
668 .filter(|(_, (_, _, timestamp))| *timestamp < deadline)
669 .map(|(k, _)| *k)
670 .collect();
671
672 for call in timed_out {
673 if let Some((req, _, _)) = self.pending_commands.remove(&call) {
674 match req {
675 PendingRequest::CreateTarget(tx) => {
676 let _ = tx.send(Err(CdpError::Timeout));
677 }
678 PendingRequest::GetTargets(tx) => {
679 let _ = tx.send(Err(CdpError::Timeout));
680 }
681 PendingRequest::Navigate(nav) => {
682 if let Some(nav) = self.navigations.remove(&nav) {
683 match nav {
684 NavigationRequest::Navigate(nav) => {
685 let _ = nav.tx.send(Err(CdpError::Timeout));
686 }
687 }
688 }
689 }
690 PendingRequest::ExternalCommand { tx, .. } => {
691 let _ = tx.send(Err(CdpError::Timeout));
692 }
693 PendingRequest::InternalCommand(_) => {}
694 PendingRequest::CloseBrowser(tx) => {
695 let _ = tx.send(Err(CdpError::Timeout));
696 }
697 }
698 }
699 }
700 }
701
702 pub fn event_listeners_mut(&mut self) -> &mut EventListeners {
703 &mut self.event_listeners
704 }
705
    /// Drives the handler to completion as a single async task.
    ///
    /// Takes the connection out of the handler and converts it into its
    /// async form (reader, command sender, writer task handle). Each loop
    /// iteration first drains and advances every target, flushes listeners
    /// and applies the byte-budget block, then waits on whichever comes
    /// first: a handler message, a websocket frame, a page wake-up, the
    /// eviction timer, or the writer task finishing.
    ///
    /// # Errors
    /// Returns an error if the connection was already taken, on websocket
    /// errors other than "already closed" / "reset without closing
    /// handshake", or if the writer task fails or panics.
    pub async fn run(mut self) -> Result<()> {
        use chromiumoxide_types::Message;
        use tokio::time::MissedTickBehavior;
        use tokio_tungstenite::tungstenite::{self, error::ProtocolError};

        // Stored on the handler so targets created later receive it through
        // their `TargetConfig`.
        let page_wake = Arc::new(Notify::new());
        self.page_wake = Some(page_wake.clone());

        let conn = self
            .conn
            .take()
            .ok_or_else(|| CdpError::msg("Handler::run() called with no connection"))?;
        let async_conn = conn.into_async();
        let mut ws_reader = async_conn.reader;
        let ws_tx = async_conn.cmd_tx;
        let mut writer_handle = async_conn.writer_handle;
        let mut next_call_id = async_conn.next_id;

        // Allocates call ids locally, continuing from the connection's counter.
        let mut alloc_call_id = || {
            let id = chromiumoxide_types::CallId::new(next_call_id);
            next_call_id = next_call_id.wrapping_add(1);
            id
        };

        // First tick fires one full timeout after start.
        let mut evict_timer = tokio::time::interval_at(
            tokio::time::Instant::now() + self.config.request_timeout,
            self.config.request_timeout,
        );
        evict_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // Builds a `MethodCall` with a fresh call id and queues it on the
        // writer channel without waiting; evaluates to `Ok(call_id)` or an
        // error when the channel is full or closed.
        macro_rules! ws_submit {
            ($method:expr, $session_id:expr, $params:expr) => {{
                let id = alloc_call_id();
                let call = chromiumoxide_types::MethodCall {
                    id,
                    method: $method,
                    session_id: $session_id,
                    params: $params,
                };
                match ws_tx.try_send(call) {
                    Ok(()) => Ok::<_, CdpError>(id),
                    Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
                        tracing::warn!("WS command channel full — dropping command");
                        Err(CdpError::msg("WS command channel full"))
                    }
                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
                        Err(CdpError::msg("WS writer closed"))
                    }
                }
            }};
        }

        loop {
            // Captured once per iteration; also used by the select branches
            // below, so it predates whatever wakes the select.
            let now = std::time::Instant::now();

            // Upper bound on page messages drained per target per iteration.
            const PER_TARGET_DRAIN_BUDGET: usize = 128;

            for n in (0..self.target_ids.len()).rev() {
                let target_id = self.target_ids.swap_remove(n);

                // The target is temporarily removed from the map so it can be
                // mutated alongside `self`; ids without a target are dropped.
                if let Some((id, mut target)) = self.targets.remove_entry(&target_id) {
                    {
                        // Drain queued page messages first, then feed them to
                        // the target.
                        let mut msgs = Vec::new();
                        if let Some(handle) = target.page_mut() {
                            while msgs.len() < PER_TARGET_DRAIN_BUDGET {
                                match handle.rx.try_recv() {
                                    Ok(msg) => msgs.push(msg),
                                    Err(_) => break,
                                }
                            }
                        }
                        for msg in msgs {
                            target.on_page_message(msg);
                        }
                    }

                    while let Some(event) = target.advance(now) {
                        match event {
                            TargetEvent::Request(req) => {
                                if let Ok(call_id) =
                                    ws_submit!(req.method.clone(), req.session_id, req.params)
                                {
                                    self.pending_commands.insert(
                                        call_id,
                                        (
                                            PendingRequest::InternalCommand(
                                                target.target_id().clone(),
                                            ),
                                            req.method,
                                            now,
                                        ),
                                    );
                                }
                            }
                            TargetEvent::Command(msg) => {
                                if msg.is_navigation() {
                                    let (req, tx) = msg.split();
                                    let nav_id = self.next_navigation_id();
                                    target.goto(FrameRequestedNavigation::new(
                                        nav_id,
                                        req.clone(),
                                        self.config.request_timeout,
                                    ));
                                    // NOTE(review): the request is both handed
                                    // to the target and submitted here —
                                    // confirm this does not send it twice.
                                    if let Ok(call_id) =
                                        ws_submit!(req.method.clone(), req.session_id, req.params)
                                    {
                                        self.pending_commands.insert(
                                            call_id,
                                            (PendingRequest::Navigate(nav_id), req.method, now),
                                        );
                                    }
                                    self.navigations.insert(
                                        nav_id,
                                        NavigationRequest::Navigate(NavigationInProgress::new(tx)),
                                    );
                                } else if let Ok(call_id) = ws_submit!(
                                    msg.method.clone(),
                                    msg.session_id.map(Into::into),
                                    msg.params
                                ) {
                                    // Recorded so the command can be cancelled
                                    // if this target crashes.
                                    let target_id = Some(target.target_id().clone());
                                    self.pending_commands.insert(
                                        call_id,
                                        (
                                            PendingRequest::ExternalCommand {
                                                tx: msg.sender,
                                                target_id,
                                            },
                                            msg.method,
                                            now,
                                        ),
                                    );
                                }
                            }
                            TargetEvent::NavigationRequest(nav_id, req) => {
                                if let Ok(call_id) =
                                    ws_submit!(req.method.clone(), req.session_id, req.params)
                                {
                                    self.pending_commands.insert(
                                        call_id,
                                        (PendingRequest::Navigate(nav_id), req.method, now),
                                    );
                                }
                            }
                            TargetEvent::NavigationResult(res) => {
                                self.on_navigation_lifecycle_completed(res);
                            }
                            TargetEvent::BytesConsumed(n) => {
                                // Only tracked when a byte budget is set.
                                if let Some(rem) = self.remaining_bytes.as_mut() {
                                    *rem = rem.saturating_sub(n);
                                    if *rem == 0 {
                                        self.budget_exhausted = true;
                                    }
                                }
                            }
                        }
                    }

                    target.event_listeners_mut().flush();

                    // Put the target back and keep its id scheduled.
                    self.targets.insert(id, target);
                    self.target_ids.push(target_id);
                }
            }

            self.event_listeners.flush();

            // Once the byte budget is spent, every target blocks all traffic.
            if self.budget_exhausted {
                for t in self.targets.values_mut() {
                    t.network_manager.set_block_all(true);
                }
            }

            if self.closing {
                break;
            }

            tokio::select! {
                // Messages addressed to the handler.
                msg = self.from_browser.recv() => {
                    match msg {
                        Some(msg) => {
                            match msg {
                                HandlerMessage::Command(cmd) => {
                                    // Resolve the owning target before the
                                    // session id is moved into the command.
                                    let target_id = cmd
                                        .session_id
                                        .as_ref()
                                        .and_then(|sid| self.sessions.get(sid.as_ref()))
                                        .map(|s| s.target_id().clone());
                                    if let Ok(call_id) = ws_submit!(
                                        cmd.method.clone(),
                                        cmd.session_id.map(Into::into),
                                        cmd.params
                                    ) {
                                        self.pending_commands.insert(
                                            call_id,
                                            (
                                                PendingRequest::ExternalCommand {
                                                    tx: cmd.sender,
                                                    target_id,
                                                },
                                                cmd.method,
                                                now,
                                            ),
                                        );
                                    }
                                }
                                HandlerMessage::FetchTargets(tx) => {
                                    let msg = TARGET_PARAMS_ID.clone();
                                    if let Ok(call_id) = ws_submit!(msg.0.clone(), None, msg.1) {
                                        self.pending_commands.insert(
                                            call_id,
                                            (PendingRequest::GetTargets(tx), msg.0, now),
                                        );
                                    }
                                }
                                HandlerMessage::CloseBrowser(tx) => {
                                    let close_msg = CLOSE_PARAMS_ID.clone();
                                    if let Ok(call_id) = ws_submit!(close_msg.0.clone(), None, close_msg.1) {
                                        self.pending_commands.insert(
                                            call_id,
                                            (PendingRequest::CloseBrowser(tx), close_msg.0, now),
                                        );
                                    }
                                }
                                HandlerMessage::CreatePage(params, tx) => {
                                    // Register the context first so the new
                                    // target passes the context filter.
                                    if let Some(ref id) = params.browser_context_id {
                                        self.browser_contexts.insert(BrowserContext::from(id.clone()));
                                    }
                                    self.create_page_async(params, tx, &mut alloc_call_id, &ws_tx, now);
                                }
                                HandlerMessage::GetPages(tx) => {
                                    let pages: Vec<_> = self.targets.values_mut()
                                        .filter(|p| p.is_page())
                                        .filter_map(|target| target.get_or_create_page())
                                        .map(|page| Page::from(page.clone()))
                                        .collect();
                                    let _ = tx.send(pages);
                                }
                                HandlerMessage::InsertContext(ctx) => {
                                    // The first inserted context becomes the
                                    // default while the default has no id.
                                    if self.default_browser_context.id().is_none() {
                                        self.default_browser_context = ctx.clone();
                                    }
                                    self.browser_contexts.insert(ctx);
                                }
                                HandlerMessage::DisposeContext(ctx) => {
                                    self.browser_contexts.remove(&ctx);
                                    // Keep only ids whose target is unknown,
                                    // has no context, or is in another context.
                                    self.attached_targets.retain(|tid| {
                                        self.targets.get(tid)
                                            .and_then(|t| t.browser_context_id())
                                            .map(|id| Some(id) != ctx.id())
                                            .unwrap_or(true)
                                    });
                                    self.closing = true;
                                }
                                HandlerMessage::GetPage(target_id, tx) => {
                                    let page = self.targets.get_mut(&target_id)
                                        .and_then(|target| target.get_or_create_page())
                                        .map(|page| Page::from(page.clone()));
                                    let _ = tx.send(page);
                                }
                                HandlerMessage::AddEventListener(req) => {
                                    self.event_listeners.add_listener(req);
                                }
                            }
                        }
                        // All senders are gone: stop the loop.
                        None => break,
                    }
                }

                // Frames read from the websocket.
                frame = ws_reader.next_message() => {
                    match frame {
                        Some(Ok(boxed_msg)) => match *boxed_msg {
                            Message::Response(resp) => {
                                self.on_response(resp);
                            }
                            Message::Event(ev) => {
                                self.on_event(ev);
                            }
                        },
                        Some(Err(err)) => {
                            tracing::error!("WS Connection error: {:?}", err);
                            // A socket that is already closed or was reset ends
                            // the loop normally; any other error is returned.
                            if let CdpError::Ws(ref ws_error) = err {
                                match ws_error {
                                    tungstenite::Error::AlreadyClosed => break,
                                    tungstenite::Error::Protocol(detail)
                                        if detail == &ProtocolError::ResetWithoutClosingHandshake =>
                                    {
                                        break;
                                    }
                                    _ => return Err(err),
                                }
                            } else {
                                return Err(err);
                            }
                        }
                        // Reader finished: stop the loop.
                        None => break,
                    }
                }

                // Wake-up only: the work happens at the top of the next
                // iteration when targets are drained.
                _ = page_wake.notified() => {
                }

                _ = evict_timer.tick() => {
                    self.evict_timed_out_commands(now);
                    for t in self.targets.values_mut() {
                        t.network_manager.evict_stale_entries(now);
                        t.frame_manager_mut().evict_stale_context_ids();
                    }
                }

                // The writer task finished (cleanly, with an error, or by
                // panicking).
                result = &mut writer_handle => {
                    match result {
                        Ok(Ok(())) => break,
                        Ok(Err(e)) => return Err(e),
                        Err(e) => return Err(CdpError::msg(format!("WS writer panicked: {e}"))),
                    }
                }
            }
        }

        writer_handle.abort();
        Ok(())
    }
1081
1082 fn create_page_async(
1084 &mut self,
1085 params: CreateTargetParams,
1086 tx: OneshotSender<Result<Page>>,
1087 alloc_call_id: &mut impl FnMut() -> chromiumoxide_types::CallId,
1088 ws_tx: &tokio::sync::mpsc::Sender<chromiumoxide_types::MethodCall>,
1089 now: std::time::Instant,
1090 ) {
1091 let about_blank = params.url == "about:blank";
1092 let http_check =
1093 !about_blank && params.url.starts_with("http") || params.url.starts_with("file://");
1094
1095 if about_blank || http_check {
1096 let method = params.identifier();
1097 match serde_json::to_value(params) {
1098 Ok(params) => {
1099 let id = alloc_call_id();
1100 let call = chromiumoxide_types::MethodCall {
1101 id,
1102 method: method.clone(),
1103 session_id: None,
1104 params,
1105 };
1106 match ws_tx.try_send(call) {
1107 Ok(()) => {
1108 self.pending_commands
1109 .insert(id, (PendingRequest::CreateTarget(tx), method, now));
1110 }
1111 Err(_) => {
1112 let _ = tx
1113 .send(Err(CdpError::msg("WS command channel full or closed")))
1114 .ok();
1115 }
1116 }
1117 }
1118 Err(err) => {
1119 let _ = tx.send(Err(err.into())).ok();
1120 }
1121 }
1122 } else {
1123 let _ = tx.send(Err(CdpError::NotFound)).ok();
1124 }
1125 }
1126}
1127
/// Poll-based driver for the handler.
///
/// The stream yields `Some(Err(_))` on fatal errors, `None` once the handler
/// is closing, and otherwise stays `Pending` while it keeps processing
/// handler messages, target events and websocket messages.
impl Stream for Handler {
    type Item = Result<()>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Per-call processing caps; when one is reached the task re-wakes
        // itself and returns `Pending` instead of continuing to loop.
        const BROWSER_MSG_BUDGET: usize = 128;
        const PER_TARGET_DRAIN_BUDGET: usize = 128;
        const WS_MSG_BUDGET: usize = 512;

        let pin = self.get_mut();

        // `dispose` ends the stream at the bottom of the current iteration.
        let mut dispose = false;
        let mut budget_hit = false;

        // One timestamp for the whole poll call.
        let now = Instant::now();

        loop {
            let mut browser_msgs = 0usize;
            // 1. Drain messages addressed to the handler.
            while let Poll::Ready(Some(msg)) = pin.from_browser.poll_recv(cx) {
                match msg {
                    HandlerMessage::Command(cmd) => {
                        // A submission failure is returned from `poll_next`.
                        pin.submit_external_command(cmd, now)?;
                    }
                    HandlerMessage::FetchTargets(tx) => {
                        pin.submit_fetch_targets(tx, now);
                    }
                    HandlerMessage::CloseBrowser(tx) => {
                        pin.submit_close(tx, now);
                    }
                    HandlerMessage::CreatePage(params, tx) => {
                        // Register the context first so the new target passes
                        // the context filter.
                        if let Some(ref id) = params.browser_context_id {
                            pin.browser_contexts
                                .insert(BrowserContext::from(id.clone()));
                        }
                        pin.create_page(params, tx);
                    }
                    HandlerMessage::GetPages(tx) => {
                        let pages: Vec<_> = pin
                            .targets
                            .values_mut()
                            .filter(|p: &&mut Target| p.is_page())
                            .filter_map(|target| target.get_or_create_page())
                            .map(|page| Page::from(page.clone()))
                            .collect();
                        let _ = tx.send(pages);
                    }
                    HandlerMessage::InsertContext(ctx) => {
                        // The first inserted context becomes the default
                        // while the default has no id.
                        if pin.default_browser_context.id().is_none() {
                            pin.default_browser_context = ctx.clone();
                        }
                        pin.browser_contexts.insert(ctx);
                    }
                    HandlerMessage::DisposeContext(ctx) => {
                        pin.browser_contexts.remove(&ctx);
                        // Keep only ids whose target is unknown, has no
                        // context, or is in another context.
                        pin.attached_targets.retain(|tid| {
                            pin.targets
                                .get(tid)
                                .and_then(|t| t.browser_context_id())
                                .map(|id| Some(id) != ctx.id())
                                .unwrap_or(true)
                        });
                        pin.closing = true;
                        dispose = true;
                    }
                    HandlerMessage::GetPage(target_id, tx) => {
                        let page = pin
                            .targets
                            .get_mut(&target_id)
                            .and_then(|target| target.get_or_create_page())
                            .map(|page| Page::from(page.clone()));
                        let _ = tx.send(page);
                    }
                    HandlerMessage::AddEventListener(req) => {
                        pin.event_listeners.add_listener(req);
                    }
                }
                browser_msgs += 1;
                if browser_msgs >= BROWSER_MSG_BUDGET {
                    budget_hit = true;
                    break;
                }
            }

            // 2. Poll every target; each is temporarily removed from the map
            //    so it can be mutated alongside `pin`. Ids without a target
            //    are dropped from the schedule.
            for n in (0..pin.target_ids.len()).rev() {
                let target_id = pin.target_ids.swap_remove(n);

                if let Some((id, mut target)) = pin.targets.remove_entry(&target_id) {
                    let mut drained = 0usize;
                    while let Some(event) = target.poll(cx, now) {
                        match event {
                            TargetEvent::Request(req) => {
                                let _ = pin.submit_internal_command(
                                    target.target_id().clone(),
                                    req,
                                    now,
                                );
                            }
                            TargetEvent::Command(msg) => {
                                pin.on_target_message(&mut target, msg, now);
                            }
                            TargetEvent::NavigationRequest(id, req) => {
                                pin.submit_navigation(id, req, now);
                            }
                            TargetEvent::NavigationResult(res) => {
                                pin.on_navigation_lifecycle_completed(res)
                            }
                            TargetEvent::BytesConsumed(n) => {
                                // Only tracked when a byte budget is set.
                                if let Some(rem) = pin.remaining_bytes.as_mut() {
                                    *rem = rem.saturating_sub(n);
                                    if *rem == 0 {
                                        pin.budget_exhausted = true;
                                    }
                                }
                            }
                        }
                        drained += 1;
                        if drained >= PER_TARGET_DRAIN_BUDGET {
                            budget_hit = true;
                            break;
                        }
                    }

                    target.event_listeners_mut().poll(cx);

                    // Put the target back and keep its id scheduled.
                    pin.targets.insert(id, target);
                    pin.target_ids.push(target_id);
                }
            }

            pin.event_listeners_mut().poll(cx);

            // Cleared as soon as any websocket message is processed below.
            let mut done = true;

            // 3. Collect websocket messages first, then process them, so the
            //    borrow of the connection ends before `pin` is mutated.
            let mut ws_msgs = Vec::new();
            let mut ws_err = None;
            {
                let Some(conn) = pin.conn.as_mut() else {
                    return Poll::Ready(Some(Err(CdpError::msg(
                        "connection consumed by Handler::run()",
                    ))));
                };
                while let Poll::Ready(Some(ev)) = Pin::new(&mut *conn).poll_next(cx) {
                    match ev {
                        Ok(msg) => ws_msgs.push(msg),
                        Err(err) => {
                            ws_err = Some(err);
                            break;
                        }
                    }
                    if ws_msgs.len() >= WS_MSG_BUDGET {
                        budget_hit = true;
                        break;
                    }
                }
            }

            for boxed_msg in ws_msgs {
                match *boxed_msg {
                    Message::Response(resp) => {
                        pin.on_response(resp);
                        // A handled close response ends the stream at once.
                        if pin.closing {
                            return Poll::Ready(None);
                        }
                    }
                    Message::Event(ev) => {
                        pin.on_event(ev);
                    }
                }
                done = false;
            }

            if let Some(err) = ws_err {
                tracing::error!("WS Connection error: {:?}", err);
                // A socket that is already closed or was reset ends the
                // stream; any other error is yielded to the caller.
                if let CdpError::Ws(ref ws_error) = err {
                    match ws_error {
                        Error::AlreadyClosed => {
                            pin.closing = true;
                            dispose = true;
                        }
                        Error::Protocol(detail)
                            if detail == &ProtocolError::ResetWithoutClosingHandshake =>
                        {
                            pin.closing = true;
                            dispose = true;
                        }
                        _ => return Poll::Ready(Some(Err(err))),
                    }
                } else {
                    return Poll::Ready(Some(Err(err)));
                }
            }

            // 4. Periodic housekeeping.
            if pin.evict_command_timeout.poll_ready(cx) {
                pin.evict_timed_out_commands(now);
                for t in pin.targets.values_mut() {
                    t.network_manager.evict_stale_entries(now);
                    t.frame_manager_mut().evict_stale_context_ids();
                }
            }

            // Once the byte budget is spent, every target blocks all traffic.
            if pin.budget_exhausted {
                for t in pin.targets.values_mut() {
                    t.network_manager.set_block_all(true);
                }
            }

            if dispose {
                return Poll::Ready(None);
            }

            // A budget was reached: schedule another poll right away rather
            // than continuing the loop.
            if budget_hit {
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }

            // No websocket message was processed this iteration.
            if done {
                return Poll::Pending;
            }
        }
    }
}
1368
// NOTE(review): the code that consumes these fields lives outside this part of
// the file. The defaults quoted below come from the `Default` impl; the
// one-line field descriptions are inferred from names and types and should be
// confirmed against the call sites.
/// Configuration values for the handler.
#[derive(Debug, Clone)]
pub struct HandlerConfig {
    /// Whether HTTPS errors are ignored. Defaults to `true`.
    pub ignore_https_errors: bool,
    /// Optional viewport settings. Defaults to `None`.
    pub viewport: Option<Viewport>,
    /// Browser context ids known up front. Defaults to an empty list.
    pub context_ids: Vec<BrowserContextId>,
    /// Request timeout. Defaults to [`REQUEST_TIMEOUT`] milliseconds.
    pub request_timeout: Duration,
    /// Whether request interception is enabled. Defaults to `false`.
    pub request_intercept: bool,
    /// Whether the cache is enabled. Defaults to `true`.
    pub cache_enabled: bool,
    /// Whether service workers are enabled. Defaults to `true`.
    pub service_worker_enabled: bool,
    /// Whether visual resources are ignored. Defaults to `false`.
    pub ignore_visuals: bool,
    /// Whether stylesheets are ignored. Defaults to `false`.
    pub ignore_stylesheets: bool,
    /// Whether JavaScript is ignored. Defaults to `false`.
    pub ignore_javascript: bool,
    /// Whether analytics requests are ignored. Defaults to `true`.
    pub ignore_analytics: bool,
    /// Whether prefetch requests are ignored. Defaults to `true`.
    pub ignore_prefetch: bool,
    /// Whether ads are ignored. Defaults to `false`.
    pub ignore_ads: bool,
    /// Optional extra headers as name/value pairs. Defaults to `None`.
    pub extra_headers: Option<std::collections::HashMap<String, String>>,
    /// Whether only HTML is wanted. Defaults to `false`.
    pub only_html: bool,
    /// Whether the first target has already been created. Defaults to `false`.
    pub created_first_target: bool,
    /// Network intercept manager selection. Defaults to
    /// `NetworkInterceptManager::Unknown`.
    pub intercept_manager: NetworkInterceptManager,
    /// Optional byte budget. Defaults to `None`.
    // NOTE(review): presumably seeds the handler's remaining-bytes counter that
    // the poll loop decrements on `TargetEvent::BytesConsumed` — confirm.
    pub max_bytes_allowed: Option<u64>,
    /// Optional cap on redirects. Defaults to `None`.
    pub max_redirects: Option<usize>,
    /// Optional cap on main-frame navigations. Defaults to `None`.
    pub max_main_frame_navigations: Option<u32>,
    /// Optional whitelist patterns. Defaults to `None`.
    pub whitelist_patterns: Option<Vec<String>>,
    /// Optional blacklist patterns. Defaults to `None`.
    pub blacklist_patterns: Option<Vec<String>>,
    /// Optional adblock filter rules; only present with the `adblock` feature.
    /// Defaults to `None`.
    #[cfg(feature = "adblock")]
    pub adblock_filter_rules: Option<Vec<String>>,
    /// Channel capacity. Defaults to `4096`.
    pub channel_capacity: usize,
    /// Page channel capacity. Defaults to
    /// `crate::handler::page::DEFAULT_PAGE_CHANNEL_CAPACITY`.
    pub page_channel_capacity: usize,
    /// Number of connection retries. Defaults to
    /// `crate::conn::DEFAULT_CONNECTION_RETRIES`.
    pub connection_retries: u32,
}
1444
1445impl Default for HandlerConfig {
1446 fn default() -> Self {
1447 Self {
1448 ignore_https_errors: true,
1449 viewport: Default::default(),
1450 context_ids: Vec::new(),
1451 request_timeout: Duration::from_millis(REQUEST_TIMEOUT),
1452 request_intercept: false,
1453 cache_enabled: true,
1454 service_worker_enabled: true,
1455 ignore_visuals: false,
1456 ignore_stylesheets: false,
1457 ignore_ads: false,
1458 ignore_javascript: false,
1459 ignore_analytics: true,
1460 ignore_prefetch: true,
1461 only_html: false,
1462 extra_headers: Default::default(),
1463 created_first_target: false,
1464 intercept_manager: NetworkInterceptManager::Unknown,
1465 max_bytes_allowed: None,
1466 max_redirects: None,
1467 max_main_frame_navigations: None,
1468 whitelist_patterns: None,
1469 blacklist_patterns: None,
1470 #[cfg(feature = "adblock")]
1471 adblock_filter_rules: None,
1472 channel_capacity: 4096,
1473 page_channel_capacity: crate::handler::page::DEFAULT_PAGE_CHANNEL_CAPACITY,
1474 connection_retries: crate::conn::DEFAULT_CONNECTION_RETRIES,
1475 }
1476 }
1477}
1478
/// State kept for a navigation whose outcome of type `T` is delivered through
/// a oneshot sender.
#[derive(Debug)]
pub struct NavigationInProgress<T> {
    /// Starts as `false`; flipped to `true` by `set_navigated`.
    navigated: bool,
    /// Starts as `None`; filled in by `set_response`.
    response: Option<Response>,
    /// Oneshot sender for the value of type `T`.
    tx: OneshotSender<T>,
}
1489
1490impl<T> NavigationInProgress<T> {
1491 fn new(tx: OneshotSender<T>) -> Self {
1492 Self {
1493 navigated: false,
1494 response: None,
1495 tx,
1496 }
1497 }
1498
1499 fn set_response(&mut self, resp: Response) {
1501 self.response = Some(resp);
1502 }
1503
1504 fn set_navigated(&mut self) {
1506 self.navigated = true;
1507 }
1508}
1509
/// A navigation request together with its progress state.
#[derive(Debug)]
enum NavigationRequest {
    /// A navigation whose progress state carries a sender of `Result<Response>`.
    Navigate(NavigationInProgress<Result<Response>>),
}
1517
// NOTE(review): where these values are stored and how they are resolved is
// outside this part of the file; the variant notes describe only the payload
// each variant carries.
/// The kinds of request that can be pending.
#[derive(Debug)]
enum PendingRequest {
    /// Target creation; carries the oneshot sender of `Result<Page>`.
    CreateTarget(OneshotSender<Result<Page>>),
    /// Target listing; carries the oneshot sender of `Result<Vec<TargetInfo>>`.
    GetTargets(OneshotSender<Result<Vec<TargetInfo>>>),
    /// A navigation, identified by its [`NavigationId`].
    Navigate(NavigationId),
    /// A command with a reply sender and an optional associated target.
    ExternalCommand {
        /// Oneshot sender of the command's `Result<Response>`.
        tx: OneshotSender<Result<Response>>,
        /// Target associated with the command, if any.
        target_id: Option<TargetId>,
    },
    /// A command associated with the given target; carries no reply sender.
    InternalCommand(TargetId),
    /// Browser shutdown; carries the oneshot sender of `Result<CloseReturns>`.
    CloseBrowser(OneshotSender<Result<CloseReturns>>),
}
1552
// NOTE(review): only the `AddEventListener` arm of the handler's poll loop is
// fully visible from here; the other variant notes describe the payload and
// should be confirmed against the loop's match arms.
/// Messages matched on by the handler's poll loop.
#[derive(Debug)]
pub(crate) enum HandlerMessage {
    /// Page creation parameters plus the oneshot sender of `Result<Page>`.
    CreatePage(CreateTargetParams, OneshotSender<Result<Page>>),
    /// Oneshot sender of `Result<Vec<TargetInfo>>`.
    FetchTargets(OneshotSender<Result<Vec<TargetInfo>>>),
    /// A browser context to insert.
    InsertContext(BrowserContext),
    /// A browser context to dispose.
    DisposeContext(BrowserContext),
    /// Oneshot sender of a `Vec<Page>`.
    GetPages(OneshotSender<Vec<Page>>),
    /// A command message.
    Command(CommandMessage),
    /// A target id plus the oneshot sender of an `Option<Page>`.
    GetPage(TargetId, OneshotSender<Option<Page>>),
    /// An event listener request; the poll loop passes it to
    /// `event_listeners.add_listener`.
    AddEventListener(EventListenerRequest),
    /// Oneshot sender of `Result<CloseReturns>`.
    CloseBrowser(OneshotSender<Result<CloseReturns>>),
}
1568
#[cfg(test)]
mod tests {
    use super::*;
    use chromiumoxide_cdp::cdp::browser_protocol::target::{AttachToTargetReturns, TargetInfo};

    /// Feeding an `AttachToTarget` response for `session-1` to
    /// `maybe_store_attach_session_id` must leave that session id on the target.
    #[test]
    fn attach_to_target_response_sets_session_id_before_event_arrives() {
        // A detached, blank page target.
        let target_info = TargetInfo::builder()
            .target_id("target-1".to_string())
            .r#type("page")
            .title("")
            .url("about:blank")
            .attached(false)
            .can_access_opener(false)
            .build()
            .expect("target info");
        let mut target = Target::new(
            target_info,
            TargetConfig::default(),
            BrowserContext::default(),
        );

        // The method id and response payload of the attach call.
        let attach_method: MethodId = AttachToTargetParams::IDENTIFIER.into();
        let attach_returns = AttachToTargetReturns::new("session-1".to_string());
        let response = Response {
            id: CallId::new(1),
            result: Some(serde_json::to_value(attach_returns).expect("attach result")),
            error: None,
        };

        maybe_store_attach_session_id(&mut target, &attach_method, &response);

        let stored_session: Option<&str> = target.session_id().map(AsRef::as_ref);
        assert_eq!(
            stored_session,
            Some("session-1"),
            "attach response should seed the flat session id even before Target.attachedToTarget"
        );
    }

    /// The page channel capacity default must be 2048 for the constant, for
    /// `HandlerConfig`, for `TargetConfig`, and in the `Debug` output of
    /// `BrowserConfigBuilder`.
    #[test]
    fn page_channel_capacity_defaults_to_2048_across_configs() {
        use crate::browser::BrowserConfigBuilder;
        use crate::handler::page::DEFAULT_PAGE_CHANNEL_CAPACITY;
        use crate::handler::target::TargetConfig;

        assert_eq!(DEFAULT_PAGE_CHANNEL_CAPACITY, 2048);

        let handler_capacity = HandlerConfig::default().page_channel_capacity;
        assert_eq!(
            handler_capacity, DEFAULT_PAGE_CHANNEL_CAPACITY,
            "HandlerConfig default must match the historical 2048 slot count"
        );

        let target_capacity = TargetConfig::default().page_channel_capacity;
        assert_eq!(
            target_capacity, DEFAULT_PAGE_CHANNEL_CAPACITY,
            "TargetConfig default must match the historical 2048 slot count"
        );

        // The builder is checked through its `Debug` rendering.
        let bc = format!("{:?}", BrowserConfigBuilder::default());
        assert!(
            bc.contains("page_channel_capacity: 2048"),
            "BrowserConfigBuilder must default page_channel_capacity to 2048, got: {bc}",
        );
    }
}