1use crate::listeners::{EventListenerRequest, EventListeners};
2use chromiumoxide_cdp::cdp::browser_protocol::browser::*;
3use chromiumoxide_cdp::cdp::browser_protocol::target::*;
4use chromiumoxide_cdp::cdp::events::CdpEvent;
5use chromiumoxide_cdp::cdp::events::CdpEventMessage;
6use chromiumoxide_types::{CallId, Message, Method, Response};
7use chromiumoxide_types::{MethodId, Request as CdpRequest};
8use fnv::FnvHashMap;
9use futures_util::Stream;
10use hashbrown::{HashMap, HashSet};
11use spider_network_blocker::intercept_manager::NetworkInterceptManager;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::time::{Duration, Instant};
15use tokio::sync::mpsc::Receiver;
16use tokio::sync::oneshot::Sender as OneshotSender;
17use tokio_tungstenite::tungstenite::error::ProtocolError;
18use tokio_tungstenite::tungstenite::Error;
19
20use std::sync::Arc;
21use tokio::sync::Notify;
22
23use crate::cmd::{to_command_response, CommandMessage};
24use crate::conn::Connection;
25use crate::error::{CdpError, Result};
26use crate::handler::browser::BrowserContext;
27use crate::handler::frame::FrameRequestedNavigation;
28use crate::handler::frame::{NavigationError, NavigationId, NavigationOk};
29use crate::handler::job::PeriodicJob;
30use crate::handler::session::Session;
31use crate::handler::target::TargetEvent;
32use crate::handler::target::{Target, TargetConfig};
33use crate::handler::viewport::Viewport;
34use crate::page::Page;
35pub(crate) use page::PageInner;
36
/// Default request timeout in milliseconds; `HandlerConfig::default()` turns
/// it into `request_timeout` via `Duration::from_millis`.
pub const REQUEST_TIMEOUT: u64 = 30_000;
39
40pub mod blockers;
41pub mod browser;
42pub mod commandfuture;
43pub mod domworld;
44pub mod emulation;
45pub mod frame;
46pub mod http;
47pub mod httpfuture;
48mod job;
49pub mod network;
50pub mod network_utils;
51pub mod page;
52pub mod sender;
53mod session;
54pub mod target;
55pub mod target_message_future;
56pub mod viewport;
57
/// Central state for one CDP connection.
///
/// Tracks targets, sessions, in-flight commands and navigations. It is driven
/// either by polling it as a `Stream` or by awaiting `Handler::run()`.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Handler {
    /// Context assigned to targets that report no `browser_context_id`.
    pub default_browser_context: BrowserContext,
    /// Known browser contexts. When non-empty, newly created targets that
    /// belong to a context outside this set are ignored.
    pub browser_contexts: HashSet<BrowserContext>,
    /// Commands submitted to the browser that still await a response, keyed by
    /// call id, together with the method name and the submit time (used for
    /// timeout eviction).
    pending_commands: FnvHashMap<CallId, (PendingRequest, MethodId, Instant)>,
    /// Incoming `HandlerMessage`s for this handler.
    from_browser: Receiver<HandlerMessage>,
    /// Ids of the targets to drive on each pass of the event loop.
    target_ids: Vec<TargetId>,
    /// All tracked targets, keyed by target id.
    targets: HashMap<TargetId, Target>,
    /// Navigations that have been requested but not yet resolved.
    navigations: FnvHashMap<NavigationId, NavigationRequest>,
    /// Attached sessions, keyed by session id.
    sessions: HashMap<SessionId, Session>,
    /// The websocket connection; `None` once `run()` has taken it.
    conn: Option<Connection<CdpEventMessage>>,
    /// Periodic trigger for timeout eviction in the `Stream` code path.
    evict_command_timeout: PeriodicJob,
    /// Counter backing `next_navigation_id()`.
    next_navigation_id: usize,
    /// Handler configuration; most of it is copied into each `TargetConfig`.
    config: HandlerConfig,
    /// Listeners for events that are not routed to a specific target.
    event_listeners: EventListeners,
    /// Set once the close-browser response arrives or a context is disposed;
    /// ends the event loop.
    closing: bool,
    /// Remaining byte budget, reduced by `TargetEvent::BytesConsumed`.
    /// `None` means no budget is tracked.
    remaining_bytes: Option<u64>,
    /// Set when `remaining_bytes` reaches zero; all targets are then told to
    /// block all network traffic.
    budget_exhausted: bool,
    /// Target ids tracked as attached; entries are dropped when the target is
    /// destroyed or its browser context is disposed.
    attached_targets: HashSet<TargetId>,
    /// Notifier installed by `run()` and handed to each new target's config so
    /// the `run()` loop can be woken.
    page_wake: Option<Arc<Notify>>,
}
104
// Pre-serialized (method identifier, JSON params) pairs for commands the
// handler sends repeatedly, built once on first use.
lazy_static::lazy_static! {
    // `SetDiscoverTargetsParams` with discovery enabled; submitted by `Handler::new`.
    static ref DISCOVER_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
        let discover = SetDiscoverTargetsParams::new(true);
        (discover.identifier(), serde_json::to_value(discover).expect("valid discover target params"))
    };
    // `GetTargetsParams` without a filter; used when targets are fetched.
    static ref TARGET_PARAMS_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
        let msg = GetTargetsParams { filter: None };
        (msg.identifier(), serde_json::to_value(msg).expect("valid paramtarget"))
    };
    // Default `CloseParams`; used when the browser is asked to close.
    static ref CLOSE_PARAMS_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
        let close_msg = CloseParams::default();
        (close_msg.identifier(), serde_json::to_value(close_msg).expect("valid close params"))
    };
}
122
123fn maybe_store_attach_session_id(target: &mut Target, method: &MethodId, resp: &Response) {
124 if method.as_ref() != AttachToTargetParams::IDENTIFIER {
125 return;
126 }
127
128 if let Ok(resp) = to_command_response::<AttachToTargetParams>(resp.clone(), method.clone()) {
129 target.set_session_id(resp.result.session_id);
130 }
131}
132
133impl Handler {
134 pub(crate) fn new(
137 mut conn: Connection<CdpEventMessage>,
138 rx: Receiver<HandlerMessage>,
139 config: HandlerConfig,
140 ) -> Self {
141 let discover = DISCOVER_ID.clone();
142 let _ = conn.submit_command(discover.0, None, discover.1);
143 let conn = Some(conn);
144
145 let browser_contexts = config
146 .context_ids
147 .iter()
148 .map(|id| BrowserContext::from(id.clone()))
149 .collect();
150
151 Self {
152 pending_commands: Default::default(),
153 from_browser: rx,
154 default_browser_context: Default::default(),
155 browser_contexts,
156 target_ids: Default::default(),
157 targets: Default::default(),
158 navigations: Default::default(),
159 sessions: Default::default(),
160 conn,
161 evict_command_timeout: PeriodicJob::new(config.request_timeout),
162 next_navigation_id: 0,
163 config,
164 event_listeners: Default::default(),
165 closing: false,
166 remaining_bytes: None,
167 budget_exhausted: false,
168 attached_targets: Default::default(),
169 page_wake: None,
170 }
171 }
172
173 #[inline]
176 fn conn(&mut self) -> Result<&mut Connection<CdpEventMessage>> {
177 self.conn
178 .as_mut()
179 .ok_or_else(|| CdpError::msg("connection consumed by Handler::run()"))
180 }
181
    /// Returns the target with the given id, if the handler tracks it.
    pub fn get_target(&self, target_id: &TargetId) -> Option<&Target> {
        self.targets.get(target_id)
    }

    /// Iterates over all targets the handler currently tracks.
    pub fn targets(&self) -> impl Iterator<Item = &Target> + '_ {
        self.targets.values()
    }

    /// The context assigned to targets that report no browser context id.
    pub fn default_browser_context(&self) -> &BrowserContext {
        &self.default_browser_context
    }

    /// Iterates over all browser contexts known to the handler.
    pub fn browser_contexts(&self) -> impl Iterator<Item = &BrowserContext> + '_ {
        self.browser_contexts.iter()
    }
201
202 fn on_navigation_response(&mut self, id: NavigationId, resp: Response) {
204 if let Some(nav) = self.navigations.remove(&id) {
205 match nav {
206 NavigationRequest::Navigate(mut nav) => {
207 if nav.navigated {
208 let _ = nav.tx.send(Ok(resp));
209 } else {
210 nav.set_response(resp);
211 self.navigations
212 .insert(id, NavigationRequest::Navigate(nav));
213 }
214 }
215 }
216 }
217 }
218
219 fn on_navigation_lifecycle_completed(&mut self, res: Result<NavigationOk, NavigationError>) {
221 match res {
222 Ok(ok) => {
223 let id = *ok.navigation_id();
224 if let Some(nav) = self.navigations.remove(&id) {
225 match nav {
226 NavigationRequest::Navigate(mut nav) => {
227 if let Some(resp) = nav.response.take() {
228 let _ = nav.tx.send(Ok(resp));
229 } else {
230 nav.set_navigated();
231 self.navigations
232 .insert(id, NavigationRequest::Navigate(nav));
233 }
234 }
235 }
236 }
237 }
238 Err(err) => {
239 if let Some(nav) = self.navigations.remove(err.navigation_id()) {
240 match nav {
241 NavigationRequest::Navigate(nav) => {
242 let _ = nav.tx.send(Err(err.into()));
243 }
244 }
245 }
246 }
247 }
248 }
249
250 fn on_response(&mut self, resp: Response) {
252 if let Some((req, method, _)) = self.pending_commands.remove(&resp.id) {
253 match req {
254 PendingRequest::CreateTarget(tx) => {
255 match to_command_response::<CreateTargetParams>(resp, method) {
256 Ok(resp) => {
257 if let Some(target) = self.targets.get_mut(&resp.target_id) {
258 target.set_initiator(tx);
259 } else {
260 let _ = tx.send(Err(CdpError::NotFound)).ok();
261 }
262 }
263 Err(err) => {
264 let _ = tx.send(Err(err)).ok();
265 }
266 }
267 }
268 PendingRequest::GetTargets(tx) => {
269 match to_command_response::<GetTargetsParams>(resp, method) {
270 Ok(resp) => {
271 let targets = resp.result.target_infos;
272 let results = targets.clone();
273
274 for target_info in targets {
275 let event: EventTargetCreated = EventTargetCreated { target_info };
276 self.on_target_created(event);
277 }
278
279 let _ = tx.send(Ok(results)).ok();
280 }
281 Err(err) => {
282 let _ = tx.send(Err(err)).ok();
283 }
284 }
285 }
286 PendingRequest::Navigate(id) => {
287 self.on_navigation_response(id, resp);
288 if self.config.only_html && !self.config.created_first_target {
289 self.config.created_first_target = true;
290 }
291 }
292 PendingRequest::ExternalCommand(tx) => {
293 let _ = tx.send(Ok(resp)).ok();
294 }
295 PendingRequest::InternalCommand(target_id) => {
296 if let Some(target) = self.targets.get_mut(&target_id) {
297 maybe_store_attach_session_id(target, &method, &resp);
298 target.on_response(resp, method.as_ref());
299 }
300 }
301 PendingRequest::CloseBrowser(tx) => {
302 self.closing = true;
303 let _ = tx.send(Ok(CloseReturns {})).ok();
304 }
305 }
306 }
307 }
308
309 pub(crate) fn submit_external_command(
311 &mut self,
312 msg: CommandMessage,
313 now: Instant,
314 ) -> Result<()> {
315 let call_id =
316 self.conn()?
317 .submit_command(msg.method.clone(), msg.session_id, msg.params)?;
318 self.pending_commands.insert(
319 call_id,
320 (PendingRequest::ExternalCommand(msg.sender), msg.method, now),
321 );
322 Ok(())
323 }
324
325 pub(crate) fn submit_internal_command(
326 &mut self,
327 target_id: TargetId,
328 req: CdpRequest,
329 now: Instant,
330 ) -> Result<()> {
331 let call_id = self.conn()?.submit_command(
332 req.method.clone(),
333 req.session_id.map(Into::into),
334 req.params,
335 )?;
336 self.pending_commands.insert(
337 call_id,
338 (PendingRequest::InternalCommand(target_id), req.method, now),
339 );
340 Ok(())
341 }
342
343 fn submit_fetch_targets(&mut self, tx: OneshotSender<Result<Vec<TargetInfo>>>, now: Instant) {
344 let msg = TARGET_PARAMS_ID.clone();
345
346 if let Some(conn) = self.conn.as_mut() {
347 if let Ok(call_id) = conn.submit_command(msg.0.clone(), None, msg.1) {
348 self.pending_commands
349 .insert(call_id, (PendingRequest::GetTargets(tx), msg.0, now));
350 }
351 }
352 }
353
354 fn submit_navigation(&mut self, id: NavigationId, req: CdpRequest, now: Instant) {
357 if let Some(conn) = self.conn.as_mut() {
358 if let Ok(call_id) = conn.submit_command(
359 req.method.clone(),
360 req.session_id.map(Into::into),
361 req.params,
362 ) {
363 self.pending_commands
364 .insert(call_id, (PendingRequest::Navigate(id), req.method, now));
365 }
366 }
367 }
368
369 fn submit_close(&mut self, tx: OneshotSender<Result<CloseReturns>>, now: Instant) {
370 let close_msg = CLOSE_PARAMS_ID.clone();
371
372 if let Some(conn) = self.conn.as_mut() {
373 if let Ok(call_id) = conn.submit_command(close_msg.0.clone(), None, close_msg.1) {
374 self.pending_commands.insert(
375 call_id,
376 (PendingRequest::CloseBrowser(tx), close_msg.0, now),
377 );
378 }
379 }
380 }
381
382 fn on_target_message(&mut self, target: &mut Target, msg: CommandMessage, now: Instant) {
384 if msg.is_navigation() {
385 let (req, tx) = msg.split();
386 let id = self.next_navigation_id();
387
388 target.goto(FrameRequestedNavigation::new(
389 id,
390 req,
391 self.config.request_timeout,
392 ));
393
394 self.navigations.insert(
395 id,
396 NavigationRequest::Navigate(NavigationInProgress::new(tx)),
397 );
398 } else {
399 let _ = self.submit_external_command(msg, now);
400 }
401 }
402
403 fn next_navigation_id(&mut self) -> NavigationId {
405 let id = NavigationId(self.next_navigation_id);
406 self.next_navigation_id = self.next_navigation_id.wrapping_add(1);
407 id
408 }
409
410 fn create_page(&mut self, params: CreateTargetParams, tx: OneshotSender<Result<Page>>) {
421 let about_blank = params.url == "about:blank";
422 let http_check =
423 !about_blank && params.url.starts_with("http") || params.url.starts_with("file://");
424
425 if about_blank || http_check {
426 let method = params.identifier();
427
428 let Some(conn) = self.conn.as_mut() else {
429 let _ = tx.send(Err(CdpError::msg("connection consumed"))).ok();
430 return;
431 };
432 match serde_json::to_value(params) {
433 Ok(params) => match conn.submit_command(method.clone(), None, params) {
434 Ok(call_id) => {
435 self.pending_commands.insert(
436 call_id,
437 (PendingRequest::CreateTarget(tx), method, Instant::now()),
438 );
439 }
440 Err(err) => {
441 let _ = tx.send(Err(err.into())).ok();
442 }
443 },
444 Err(err) => {
445 let _ = tx.send(Err(err.into())).ok();
446 }
447 }
448 } else {
449 let _ = tx.send(Err(CdpError::NotFound)).ok();
450 }
451 }
452
    /// Routes an incoming CDP event.
    ///
    /// Events that carry a session id belonging to a tracked target are handed
    /// to that target and processing stops there. All other events update the
    /// handler's target/session bookkeeping and are then passed to the
    /// handler-level event listeners.
    fn on_event(&mut self, event: CdpEventMessage) {
        // Session-scoped event: hand it to the owning target, if we know it.
        if let Some(session_id) = &event.session_id {
            if let Some(session) = self.sessions.get(session_id.as_str()) {
                if let Some(target) = self.targets.get_mut(session.target_id()) {
                    return target.on_event(event);
                }
            }
        }
        let CdpEventMessage { params, method, .. } = event;

        // Target lifecycle events drive the handler's own bookkeeping.
        match params {
            CdpEvent::TargetTargetCreated(ref ev) => self.on_target_created((**ev).clone()),
            CdpEvent::TargetAttachedToTarget(ref ev) => self.on_attached_to_target(ev.clone()),
            CdpEvent::TargetTargetDestroyed(ref ev) => self.on_target_destroyed(ev.clone()),
            CdpEvent::TargetDetachedFromTarget(ref ev) => self.on_detached_from_target(ev.clone()),
            _ => {}
        }

        // Forward the event to listeners: typed events via `start_send`,
        // custom (JSON) events via `try_send_custom` keyed by method name.
        chromiumoxide_cdp::consume_event!(match params {
            |ev| self.event_listeners.start_send(ev),
            |json| { let _ = self.event_listeners.try_send_custom(&method, json);}
        });
    }
477
478 fn on_target_created(&mut self, event: EventTargetCreated) {
482 if !self.browser_contexts.is_empty() {
483 if let Some(ref context_id) = event.target_info.browser_context_id {
484 let bc = BrowserContext {
485 id: Some(context_id.clone()),
486 };
487 if !self.browser_contexts.contains(&bc) {
488 return;
489 }
490 }
491 }
492 let browser_ctx = event
493 .target_info
494 .browser_context_id
495 .clone()
496 .map(BrowserContext::from)
497 .unwrap_or_else(|| self.default_browser_context.clone());
498 let target = Target::new(
499 event.target_info,
500 TargetConfig {
501 ignore_https_errors: self.config.ignore_https_errors,
502 request_timeout: self.config.request_timeout,
503 viewport: self.config.viewport.clone(),
504 request_intercept: self.config.request_intercept,
505 cache_enabled: self.config.cache_enabled,
506 service_worker_enabled: self.config.service_worker_enabled,
507 ignore_visuals: self.config.ignore_visuals,
508 ignore_stylesheets: self.config.ignore_stylesheets,
509 ignore_javascript: self.config.ignore_javascript,
510 ignore_analytics: self.config.ignore_analytics,
511 ignore_prefetch: self.config.ignore_prefetch,
512 extra_headers: self.config.extra_headers.clone(),
513 only_html: self.config.only_html && self.config.created_first_target,
514 intercept_manager: self.config.intercept_manager,
515 max_bytes_allowed: self.config.max_bytes_allowed,
516 whitelist_patterns: self.config.whitelist_patterns.clone(),
517 blacklist_patterns: self.config.blacklist_patterns.clone(),
518 #[cfg(feature = "adblock")]
519 adblock_filter_rules: self.config.adblock_filter_rules.clone(),
520 page_wake: self.page_wake.clone(),
521 },
522 browser_ctx,
523 );
524
525 let tid = target.target_id().clone();
526 self.target_ids.push(tid.clone());
527 self.targets.insert(tid, target);
528 }
529
530 fn on_attached_to_target(&mut self, event: Box<EventAttachedToTarget>) {
532 let session = Session::new(event.session_id.clone(), event.target_info.target_id);
533 if let Some(target) = self.targets.get_mut(session.target_id()) {
534 target.set_session_id(session.session_id().clone())
535 }
536 self.sessions.insert(event.session_id, session);
537 }
538
539 fn on_detached_from_target(&mut self, event: EventDetachedFromTarget) {
543 if let Some(session) = self.sessions.remove(&event.session_id) {
545 if let Some(target) = self.targets.get_mut(session.target_id()) {
546 target.session_id_mut().take();
547 }
548 }
549 }
550
551 fn on_target_destroyed(&mut self, event: EventTargetDestroyed) {
553 self.attached_targets.remove(&event.target_id);
554
555 if let Some(target) = self.targets.remove(&event.target_id) {
556 if let Some(session) = target.session_id() {
558 self.sessions.remove(session);
559 }
560 }
561 }
562
563 fn evict_timed_out_commands(&mut self, now: Instant) {
568 let deadline = match now.checked_sub(self.config.request_timeout) {
569 Some(d) => d,
570 None => return,
571 };
572
573 let timed_out: Vec<_> = self
574 .pending_commands
575 .iter()
576 .filter(|(_, (_, _, timestamp))| *timestamp < deadline)
577 .map(|(k, _)| *k)
578 .collect();
579
580 for call in timed_out {
581 if let Some((req, _, _)) = self.pending_commands.remove(&call) {
582 match req {
583 PendingRequest::CreateTarget(tx) => {
584 let _ = tx.send(Err(CdpError::Timeout));
585 }
586 PendingRequest::GetTargets(tx) => {
587 let _ = tx.send(Err(CdpError::Timeout));
588 }
589 PendingRequest::Navigate(nav) => {
590 if let Some(nav) = self.navigations.remove(&nav) {
591 match nav {
592 NavigationRequest::Navigate(nav) => {
593 let _ = nav.tx.send(Err(CdpError::Timeout));
594 }
595 }
596 }
597 }
598 PendingRequest::ExternalCommand(tx) => {
599 let _ = tx.send(Err(CdpError::Timeout));
600 }
601 PendingRequest::InternalCommand(_) => {}
602 PendingRequest::CloseBrowser(tx) => {
603 let _ = tx.send(Err(CdpError::Timeout));
604 }
605 }
606 }
607 }
608 }
609
    /// Mutable access to the handler-level event listeners.
    pub fn event_listeners_mut(&mut self) -> &mut EventListeners {
        &mut self.event_listeners
    }
613
    /// Drives the handler to completion as a single async task.
    ///
    /// Takes the connection out of the handler and converts it with
    /// `into_async()`, then loops: every pass first drains and advances all
    /// targets, then waits in `tokio::select!` for one of
    /// * a `HandlerMessage`,
    /// * a websocket frame,
    /// * a `page_wake` notification,
    /// * the eviction timer,
    /// * completion of the websocket writer handle.
    ///
    /// Returns `Ok(())` when the loop ends normally: the `closing` flag is
    /// set, the message channel or the websocket stream ends, the socket
    /// reports `AlreadyClosed` / `ResetWithoutClosingHandshake`, or the writer
    /// finishes successfully.
    ///
    /// # Errors
    /// Fails if the handler has no connection, on any other websocket error,
    /// or if the writer handle resolves to an error.
    pub async fn run(mut self) -> Result<()> {
        use chromiumoxide_types::Message;
        use tokio::time::MissedTickBehavior;
        use tokio_tungstenite::tungstenite::{self, error::ProtocolError};

        // Shared notifier; targets created from now on get a clone through
        // their `TargetConfig`.
        let page_wake = Arc::new(Notify::new());
        self.page_wake = Some(page_wake.clone());

        let conn = self
            .conn
            .take()
            .ok_or_else(|| CdpError::msg("Handler::run() called with no connection"))?;
        let async_conn = conn.into_async();
        let mut ws_reader = async_conn.reader;
        let ws_tx = async_conn.cmd_tx;
        let mut writer_handle = async_conn.writer_handle;
        let mut next_call_id = async_conn.next_id;

        // Call ids continue from where the connection left off.
        let mut alloc_call_id = || {
            let id = chromiumoxide_types::CallId::new(next_call_id);
            next_call_id = next_call_id.wrapping_add(1);
            id
        };

        // First tick fires one `request_timeout` from now, not immediately.
        let mut evict_timer = tokio::time::interval_at(
            tokio::time::Instant::now() + self.config.request_timeout,
            self.config.request_timeout,
        );
        evict_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // Allocates a call id and queues the command on the writer channel
        // without blocking; yields the call id, or an error when the channel
        // is full or closed.
        macro_rules! ws_submit {
            ($method:expr, $session_id:expr, $params:expr) => {{
                let id = alloc_call_id();
                let call = chromiumoxide_types::MethodCall {
                    id,
                    method: $method,
                    session_id: $session_id,
                    params: $params,
                };
                match ws_tx.try_send(call) {
                    Ok(()) => Ok::<_, CdpError>(id),
                    Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
                        tracing::warn!("WS command channel full — dropping command");
                        Err(CdpError::msg("WS command channel full"))
                    }
                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
                        Err(CdpError::msg("WS writer closed"))
                    }
                }
            }};
        }

        loop {
            let now = std::time::Instant::now();

            // Upper bound on page messages taken from one target per pass.
            const PER_TARGET_DRAIN_BUDGET: usize = 128;

            // Each target is taken out of the map while it is driven so that
            // `self` stays mutably usable, then put back afterwards. Ids with
            // no matching target are not pushed back.
            for n in (0..self.target_ids.len()).rev() {
                let target_id = self.target_ids.swap_remove(n);

                if let Some((id, mut target)) = self.targets.remove_entry(&target_id) {
                    {
                        // Drain queued page messages first, then feed them to
                        // the target once the page handle borrow has ended.
                        let mut msgs = Vec::new();
                        if let Some(handle) = target.page_mut() {
                            while msgs.len() < PER_TARGET_DRAIN_BUDGET {
                                match handle.rx.try_recv() {
                                    Ok(msg) => msgs.push(msg),
                                    Err(_) => break,
                                }
                            }
                        }
                        for msg in msgs {
                            target.on_page_message(msg);
                        }
                    }

                    while let Some(event) = target.advance(now) {
                        match event {
                            TargetEvent::Request(req) => {
                                if let Ok(call_id) = ws_submit!(
                                    req.method.clone(),
                                    req.session_id.map(Into::into),
                                    req.params
                                ) {
                                    self.pending_commands.insert(
                                        call_id,
                                        (
                                            PendingRequest::InternalCommand(
                                                target.target_id().clone(),
                                            ),
                                            req.method,
                                            now,
                                        ),
                                    );
                                }
                            }
                            TargetEvent::Command(msg) => {
                                if msg.is_navigation() {
                                    let (req, tx) = msg.split();
                                    let nav_id = self.next_navigation_id();
                                    target.goto(FrameRequestedNavigation::new(
                                        nav_id,
                                        req.clone(),
                                        self.config.request_timeout,
                                    ));
                                    if let Ok(call_id) = ws_submit!(
                                        req.method.clone(),
                                        req.session_id.map(Into::into),
                                        req.params
                                    ) {
                                        self.pending_commands.insert(
                                            call_id,
                                            (PendingRequest::Navigate(nav_id), req.method, now),
                                        );
                                    }
                                    // Registered even when the submit failed.
                                    self.navigations.insert(
                                        nav_id,
                                        NavigationRequest::Navigate(NavigationInProgress::new(tx)),
                                    );
                                } else {
                                    if let Ok(call_id) = ws_submit!(
                                        msg.method.clone(),
                                        msg.session_id.map(Into::into),
                                        msg.params
                                    ) {
                                        self.pending_commands.insert(
                                            call_id,
                                            (
                                                PendingRequest::ExternalCommand(msg.sender),
                                                msg.method,
                                                now,
                                            ),
                                        );
                                    }
                                }
                            }
                            TargetEvent::NavigationRequest(nav_id, req) => {
                                if let Ok(call_id) = ws_submit!(
                                    req.method.clone(),
                                    req.session_id.map(Into::into),
                                    req.params
                                ) {
                                    self.pending_commands.insert(
                                        call_id,
                                        (PendingRequest::Navigate(nav_id), req.method, now),
                                    );
                                }
                            }
                            TargetEvent::NavigationResult(res) => {
                                self.on_navigation_lifecycle_completed(res);
                            }
                            TargetEvent::BytesConsumed(n) => {
                                if let Some(rem) = self.remaining_bytes.as_mut() {
                                    *rem = rem.saturating_sub(n);
                                    if *rem == 0 {
                                        self.budget_exhausted = true;
                                    }
                                }
                            }
                        }
                    }

                    target.event_listeners_mut().flush();

                    self.targets.insert(id, target);
                    self.target_ids.push(target_id);
                }
            }

            self.event_listeners.flush();

            // Once the byte budget is used up, every target blocks all network.
            if self.budget_exhausted {
                for t in self.targets.values_mut() {
                    t.network_manager.set_block_all(true);
                }
            }

            if self.closing {
                break;
            }

            tokio::select! {
                // Instructions for the handler.
                msg = self.from_browser.recv() => {
                    match msg {
                        Some(msg) => {
                            match msg {
                                HandlerMessage::Command(cmd) => {
                                    if let Ok(call_id) = ws_submit!(
                                        cmd.method.clone(),
                                        cmd.session_id.map(Into::into),
                                        cmd.params
                                    ) {
                                        self.pending_commands.insert(
                                            call_id,
                                            (PendingRequest::ExternalCommand(cmd.sender), cmd.method, now),
                                        );
                                    }
                                }
                                HandlerMessage::FetchTargets(tx) => {
                                    let msg = TARGET_PARAMS_ID.clone();
                                    if let Ok(call_id) = ws_submit!(msg.0.clone(), None, msg.1) {
                                        self.pending_commands.insert(
                                            call_id,
                                            (PendingRequest::GetTargets(tx), msg.0, now),
                                        );
                                    }
                                }
                                HandlerMessage::CloseBrowser(tx) => {
                                    let close_msg = CLOSE_PARAMS_ID.clone();
                                    if let Ok(call_id) = ws_submit!(close_msg.0.clone(), None, close_msg.1) {
                                        self.pending_commands.insert(
                                            call_id,
                                            (PendingRequest::CloseBrowser(tx), close_msg.0, now),
                                        );
                                    }
                                }
                                HandlerMessage::CreatePage(params, tx) => {
                                    // Register the context so the new target is
                                    // not filtered out in `on_target_created`.
                                    if let Some(ref id) = params.browser_context_id {
                                        self.browser_contexts.insert(BrowserContext::from(id.clone()));
                                    }
                                    self.create_page_async(params, tx, &mut alloc_call_id, &ws_tx, now);
                                }
                                HandlerMessage::GetPages(tx) => {
                                    let pages: Vec<_> = self.targets.values_mut()
                                        .filter(|p| p.is_page())
                                        .filter_map(|target| target.get_or_create_page())
                                        .map(|page| Page::from(page.clone()))
                                        .collect();
                                    let _ = tx.send(pages);
                                }
                                HandlerMessage::InsertContext(ctx) => {
                                    // The first inserted context becomes the
                                    // default if none has an id yet.
                                    if self.default_browser_context.id().is_none() {
                                        self.default_browser_context = ctx.clone();
                                    }
                                    self.browser_contexts.insert(ctx);
                                }
                                HandlerMessage::DisposeContext(ctx) => {
                                    self.browser_contexts.remove(&ctx);
                                    // Keep only attached targets that are not
                                    // in the disposed context.
                                    self.attached_targets.retain(|tid| {
                                        self.targets.get(tid)
                                            .and_then(|t| t.browser_context_id())
                                            .map(|id| Some(id) != ctx.id())
                                            .unwrap_or(true)
                                    });
                                    self.closing = true;
                                }
                                HandlerMessage::GetPage(target_id, tx) => {
                                    let page = self.targets.get_mut(&target_id)
                                        .and_then(|target| target.get_or_create_page())
                                        .map(|page| Page::from(page.clone()));
                                    let _ = tx.send(page);
                                }
                                HandlerMessage::AddEventListener(req) => {
                                    self.event_listeners.add_listener(req);
                                }
                            }
                        }
                        // All senders are gone.
                        None => break,
                    }
                }

                // Frames read from the websocket.
                frame = ws_reader.next_message() => {
                    match frame {
                        Some(Ok(boxed_msg)) => match *boxed_msg {
                            Message::Response(resp) => {
                                self.on_response(resp);
                            }
                            Message::Event(ev) => {
                                self.on_event(ev);
                            }
                        },
                        Some(Err(err)) => {
                            tracing::error!("WS Connection error: {:?}", err);
                            // A closed or reset socket ends the loop cleanly;
                            // every other error is returned to the caller.
                            if let CdpError::Ws(ref ws_error) = err {
                                match ws_error {
                                    tungstenite::Error::AlreadyClosed => break,
                                    tungstenite::Error::Protocol(detail)
                                        if detail == &ProtocolError::ResetWithoutClosingHandshake =>
                                    {
                                        break;
                                    }
                                    _ => return Err(err),
                                }
                            } else {
                                return Err(err);
                            }
                        }
                        None => break,
                    }
                }

                // Wake-up only: the work happens at the top of the next pass.
                _ = page_wake.notified() => {
                }

                _ = evict_timer.tick() => {
                    self.evict_timed_out_commands(now);
                    for t in self.targets.values_mut() {
                        t.network_manager.evict_stale_entries(now);
                        t.frame_manager_mut().evict_stale_context_ids();
                    }
                }

                // The writer handle resolved, so the loop cannot continue.
                result = &mut writer_handle => {
                    match result {
                        Ok(Ok(())) => break,
                        Ok(Err(e)) => return Err(e),
                        Err(e) => return Err(CdpError::msg(format!("WS writer panicked: {e}"))),
                    }
                }
            }
        }

        writer_handle.abort();
        Ok(())
    }
976
977 fn create_page_async(
979 &mut self,
980 params: CreateTargetParams,
981 tx: OneshotSender<Result<Page>>,
982 alloc_call_id: &mut impl FnMut() -> chromiumoxide_types::CallId,
983 ws_tx: &tokio::sync::mpsc::Sender<chromiumoxide_types::MethodCall>,
984 now: std::time::Instant,
985 ) {
986 let about_blank = params.url == "about:blank";
987 let http_check =
988 !about_blank && params.url.starts_with("http") || params.url.starts_with("file://");
989
990 if about_blank || http_check {
991 let method = params.identifier();
992 match serde_json::to_value(params) {
993 Ok(params) => {
994 let id = alloc_call_id();
995 let call = chromiumoxide_types::MethodCall {
996 id,
997 method: method.clone(),
998 session_id: None,
999 params,
1000 };
1001 match ws_tx.try_send(call) {
1002 Ok(()) => {
1003 self.pending_commands
1004 .insert(id, (PendingRequest::CreateTarget(tx), method, now));
1005 }
1006 Err(_) => {
1007 let _ = tx
1008 .send(Err(CdpError::msg("WS command channel full or closed")))
1009 .ok();
1010 }
1011 }
1012 }
1013 Err(err) => {
1014 let _ = tx.send(Err(err.into())).ok();
1015 }
1016 }
1017 } else {
1018 let _ = tx.send(Err(CdpError::NotFound)).ok();
1019 }
1020 }
1021}
1022
/// Poll-based driver for the handler (the alternative to `Handler::run()`).
impl Stream for Handler {
    type Item = Result<()>;

    /// Runs passes over the handler until there is nothing more to do.
    ///
    /// Each pass handles queued `HandlerMessage`s, polls every target, polls
    /// the event listeners, reads pending websocket messages, and evicts
    /// timed-out commands when the periodic job is ready.
    ///
    /// Returns `Ready(None)` when the handler is closing or disposed,
    /// `Ready(Some(Err(_)))` on an unrecoverable error (including a connection
    /// already taken by `run()`), and `Pending` after a pass that processed no
    /// websocket message.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let pin = self.get_mut();

        let mut dispose = false;

        // One timestamp is shared by all passes of this poll call.
        let now = Instant::now();

        loop {
            // Handle all queued handler messages.
            while let Poll::Ready(Some(msg)) = pin.from_browser.poll_recv(cx) {
                match msg {
                    HandlerMessage::Command(cmd) => {
                        pin.submit_external_command(cmd, now)?;
                    }
                    HandlerMessage::FetchTargets(tx) => {
                        pin.submit_fetch_targets(tx, now);
                    }
                    HandlerMessage::CloseBrowser(tx) => {
                        pin.submit_close(tx, now);
                    }
                    HandlerMessage::CreatePage(params, tx) => {
                        // Register the context so the new target is not
                        // filtered out in `on_target_created`.
                        if let Some(ref id) = params.browser_context_id {
                            pin.browser_contexts
                                .insert(BrowserContext::from(id.clone()));
                        }
                        pin.create_page(params, tx);
                    }
                    HandlerMessage::GetPages(tx) => {
                        let pages: Vec<_> = pin
                            .targets
                            .values_mut()
                            .filter(|p: &&mut Target| p.is_page())
                            .filter_map(|target| target.get_or_create_page())
                            .map(|page| Page::from(page.clone()))
                            .collect();
                        let _ = tx.send(pages);
                    }
                    HandlerMessage::InsertContext(ctx) => {
                        // The first inserted context becomes the default if
                        // none has an id yet.
                        if pin.default_browser_context.id().is_none() {
                            pin.default_browser_context = ctx.clone();
                        }
                        pin.browser_contexts.insert(ctx);
                    }
                    HandlerMessage::DisposeContext(ctx) => {
                        pin.browser_contexts.remove(&ctx);
                        // Keep only attached targets that are not in the
                        // disposed context.
                        pin.attached_targets.retain(|tid| {
                            pin.targets
                                .get(tid)
                                .and_then(|t| t.browser_context_id())
                                .map(|id| Some(id) != ctx.id())
                                .unwrap_or(true)
                        });
                        pin.closing = true;
                        dispose = true;
                    }
                    HandlerMessage::GetPage(target_id, tx) => {
                        let page = pin
                            .targets
                            .get_mut(&target_id)
                            .and_then(|target| target.get_or_create_page())
                            .map(|page| Page::from(page.clone()));
                        let _ = tx.send(page);
                    }
                    HandlerMessage::AddEventListener(req) => {
                        pin.event_listeners.add_listener(req);
                    }
                }
            }

            // Each target is taken out of the map while it is polled so that
            // `pin` stays mutably usable, then put back afterwards. Ids with
            // no matching target are not pushed back.
            for n in (0..pin.target_ids.len()).rev() {
                let target_id = pin.target_ids.swap_remove(n);

                if let Some((id, mut target)) = pin.targets.remove_entry(&target_id) {
                    while let Some(event) = target.poll(cx, now) {
                        match event {
                            TargetEvent::Request(req) => {
                                let _ = pin.submit_internal_command(
                                    target.target_id().clone(),
                                    req,
                                    now,
                                );
                            }
                            TargetEvent::Command(msg) => {
                                pin.on_target_message(&mut target, msg, now);
                            }
                            TargetEvent::NavigationRequest(id, req) => {
                                pin.submit_navigation(id, req, now);
                            }
                            TargetEvent::NavigationResult(res) => {
                                pin.on_navigation_lifecycle_completed(res)
                            }
                            TargetEvent::BytesConsumed(n) => {
                                if let Some(rem) = pin.remaining_bytes.as_mut() {
                                    *rem = rem.saturating_sub(n);
                                    if *rem == 0 {
                                        pin.budget_exhausted = true;
                                    }
                                }
                            }
                        }
                    }

                    target.event_listeners_mut().poll(cx);

                    pin.targets.insert(id, target);
                    pin.target_ids.push(target_id);
                }
            }

            pin.event_listeners_mut().poll(cx);

            // Cleared below as soon as a websocket message is processed.
            let mut done = true;

            // Buffer websocket messages first so the borrow of `pin.conn` ends
            // before the `&mut self` handlers run.
            let mut ws_msgs = Vec::new();
            let mut ws_err = None;
            {
                let Some(conn) = pin.conn.as_mut() else {
                    return Poll::Ready(Some(Err(CdpError::msg(
                        "connection consumed by Handler::run()",
                    ))));
                };
                while let Poll::Ready(Some(ev)) = Pin::new(&mut *conn).poll_next(cx) {
                    match ev {
                        Ok(msg) => ws_msgs.push(msg),
                        Err(err) => {
                            ws_err = Some(err);
                            break;
                        }
                    }
                }
            }

            for boxed_msg in ws_msgs {
                match *boxed_msg {
                    Message::Response(resp) => {
                        pin.on_response(resp);
                        // A close-browser response ends the stream; any
                        // remaining buffered messages are discarded.
                        if pin.closing {
                            return Poll::Ready(None);
                        }
                    }
                    Message::Event(ev) => {
                        pin.on_event(ev);
                    }
                }
                done = false;
            }

            if let Some(err) = ws_err {
                tracing::error!("WS Connection error: {:?}", err);
                // A closed or reset socket ends the stream; every other error
                // is yielded to the caller.
                if let CdpError::Ws(ref ws_error) = err {
                    match ws_error {
                        Error::AlreadyClosed => {
                            pin.closing = true;
                            dispose = true;
                        }
                        Error::Protocol(detail)
                            if detail == &ProtocolError::ResetWithoutClosingHandshake =>
                        {
                            pin.closing = true;
                            dispose = true;
                        }
                        _ => return Poll::Ready(Some(Err(err))),
                    }
                } else {
                    return Poll::Ready(Some(Err(err)));
                }
            }

            if pin.evict_command_timeout.poll_ready(cx) {
                pin.evict_timed_out_commands(now);
                for t in pin.targets.values_mut() {
                    t.network_manager.evict_stale_entries(now);
                    t.frame_manager_mut().evict_stale_context_ids();
                }
            }

            // Once the byte budget is used up, every target blocks all network.
            if pin.budget_exhausted {
                for t in pin.targets.values_mut() {
                    t.network_manager.set_block_all(true);
                }
            }

            if dispose {
                return Poll::Ready(None);
            }

            if done {
                return Poll::Pending;
            }
        }
    }
}
1229
/// Configuration for a `Handler`.
///
/// Unless noted otherwise, a field is copied unchanged into the `TargetConfig`
/// of every target the handler creates; what it means there is defined by the
/// target code, not by this module.
#[derive(Debug, Clone)]
pub struct HandlerConfig {
    /// Copied into each `TargetConfig`.
    pub ignore_https_errors: bool,
    /// Copied into each `TargetConfig`.
    pub viewport: Option<Viewport>,
    /// Browser context ids used to seed the handler's set of known contexts.
    pub context_ids: Vec<BrowserContextId>,
    /// Age after which a pending command is evicted with a timeout error; also
    /// passed to targets and to requested navigations.
    pub request_timeout: Duration,
    /// Copied into each `TargetConfig`.
    pub request_intercept: bool,
    /// Copied into each `TargetConfig`.
    pub cache_enabled: bool,
    /// Copied into each `TargetConfig`.
    pub service_worker_enabled: bool,
    /// Copied into each `TargetConfig`.
    pub ignore_visuals: bool,
    /// Copied into each `TargetConfig`.
    pub ignore_stylesheets: bool,
    /// Copied into each `TargetConfig`.
    pub ignore_javascript: bool,
    /// Copied into each `TargetConfig`.
    pub ignore_analytics: bool,
    /// Copied into each `TargetConfig`.
    pub ignore_prefetch: bool,
    /// NOTE(review): not read anywhere in this module — presumably consumed
    /// elsewhere; confirm.
    pub ignore_ads: bool,
    /// Copied into each `TargetConfig`.
    pub extra_headers: Option<std::collections::HashMap<String, String>>,
    /// Passed to targets as `only_html` only once `created_first_target` is set.
    pub only_html: bool,
    /// Set by the handler after the first navigation response when `only_html`
    /// is enabled.
    pub created_first_target: bool,
    /// Copied into each `TargetConfig`.
    pub intercept_manager: NetworkInterceptManager,
    /// Copied into each `TargetConfig`.
    pub max_bytes_allowed: Option<u64>,
    /// Copied into each `TargetConfig`.
    pub whitelist_patterns: Option<Vec<String>>,
    /// Copied into each `TargetConfig`.
    pub blacklist_patterns: Option<Vec<String>>,
    /// Copied into each `TargetConfig` (only with the `adblock` feature).
    #[cfg(feature = "adblock")]
    pub adblock_filter_rules: Option<Vec<String>>,
    /// NOTE(review): not read in this module — presumably a channel capacity
    /// used during connection/handler setup; confirm.
    pub channel_capacity: usize,
    /// NOTE(review): not read in this module — presumably the number of
    /// connection attempts; confirm against `crate::conn`.
    pub connection_retries: u32,
}
1283
1284impl Default for HandlerConfig {
1285 fn default() -> Self {
1286 Self {
1287 ignore_https_errors: true,
1288 viewport: Default::default(),
1289 context_ids: Vec::new(),
1290 request_timeout: Duration::from_millis(REQUEST_TIMEOUT),
1291 request_intercept: false,
1292 cache_enabled: true,
1293 service_worker_enabled: true,
1294 ignore_visuals: false,
1295 ignore_stylesheets: false,
1296 ignore_ads: false,
1297 ignore_javascript: false,
1298 ignore_analytics: true,
1299 ignore_prefetch: true,
1300 only_html: false,
1301 extra_headers: Default::default(),
1302 created_first_target: false,
1303 intercept_manager: NetworkInterceptManager::Unknown,
1304 max_bytes_allowed: None,
1305 whitelist_patterns: None,
1306 blacklist_patterns: None,
1307 #[cfg(feature = "adblock")]
1308 adblock_filter_rules: None,
1309 channel_capacity: 4096,
1310 connection_retries: crate::conn::DEFAULT_CONNECTION_RETRIES,
1311 }
1312 }
1313}
1314
/// State of a navigation that was requested but has not yet been answered.
///
/// A navigation is resolved once both the command response has arrived and
/// the navigation lifecycle has completed, in either order.
#[derive(Debug)]
pub struct NavigationInProgress<T> {
    /// Whether the navigation lifecycle has completed.
    navigated: bool,
    /// The command response, if it arrived before the lifecycle completed.
    response: Option<Response>,
    /// Sender used to deliver the final result to the caller.
    tx: OneshotSender<T>,
}
1325
1326impl<T> NavigationInProgress<T> {
1327 fn new(tx: OneshotSender<T>) -> Self {
1328 Self {
1329 navigated: false,
1330 response: None,
1331 tx,
1332 }
1333 }
1334
1335 fn set_response(&mut self, resp: Response) {
1337 self.response = Some(resp);
1338 }
1339
1340 fn set_navigated(&mut self) {
1342 self.navigated = true;
1343 }
1344}
1345
/// A navigation tracked by the handler.
#[derive(Debug)]
enum NavigationRequest {
    /// A navigation whose caller waits for the resulting `Response`.
    Navigate(NavigationInProgress<Result<Response>>),
 }
1353
/// What to do with the response to a command that is still in flight.
#[derive(Debug)]
enum PendingRequest {
    /// `CreateTarget` command; the sender is handed to the created target as
    /// its initiator, or receives the error.
    CreateTarget(OneshotSender<Result<Page>>),
    /// `GetTargets` command; the sender receives the reported target infos.
    GetTargets(OneshotSender<Result<Vec<TargetInfo>>>),
    /// Navigation command; the response is matched with the navigation id.
    Navigate(NavigationId),
    /// Caller-issued command; the raw response is forwarded to the sender.
    ExternalCommand(OneshotSender<Result<Response>>),
    /// Command issued by the target with this id; the response goes back to it.
    InternalCommand(TargetId),
    /// Browser close command; the response puts the handler in closing state.
    CloseBrowser(OneshotSender<Result<CloseReturns>>),
}
1378
/// Instructions sent to the `Handler` over its message channel.
#[derive(Debug)]
pub(crate) enum HandlerMessage {
    /// Create a new page with the given target parameters.
    CreatePage(CreateTargetParams, OneshotSender<Result<Page>>),
    /// Fetch the browser's current targets.
    FetchTargets(OneshotSender<Result<Vec<TargetInfo>>>),
    /// Register a browser context; it becomes the default if none has an id.
    InsertContext(BrowserContext),
    /// Forget a browser context; this also puts the handler in closing state.
    DisposeContext(BrowserContext),
    /// Get a `Page` for every tracked page target.
    GetPages(OneshotSender<Vec<Page>>),
    /// Submit a command and forward its response to the caller.
    Command(CommandMessage),
    /// Get the `Page` for one target id, if available.
    GetPage(TargetId, OneshotSender<Option<Page>>),
    /// Register a handler-level event listener.
    AddEventListener(EventListenerRequest),
    /// Ask the browser to close.
    CloseBrowser(OneshotSender<Result<CloseReturns>>),
}
1394
#[cfg(test)]
mod tests {
    use super::*;
    use chromiumoxide_cdp::cdp::browser_protocol::target::{AttachToTargetReturns, TargetInfo};

    /// Builds an unattached `about:blank` page target with id `target-1`.
    fn unattached_page_target() -> Target {
        let info = TargetInfo::builder()
            .target_id("target-1".to_string())
            .r#type("page")
            .title("")
            .url("about:blank")
            .attached(false)
            .can_access_opener(false)
            .build()
            .expect("target info");
        Target::new(info, TargetConfig::default(), BrowserContext::default())
    }

    /// The attach response alone must be enough to set the session id.
    #[test]
    fn attach_to_target_response_sets_session_id_before_event_arrives() {
        let mut target = unattached_page_target();
        let attach_method: MethodId = AttachToTargetParams::IDENTIFIER.into();
        let attach_result =
            serde_json::to_value(AttachToTargetReturns::new("session-1".to_string()))
                .expect("attach result");
        let response = Response {
            id: CallId::new(1),
            result: Some(attach_result),
            error: None,
        };

        maybe_store_attach_session_id(&mut target, &attach_method, &response);

        assert_eq!(
            target.session_id().map(AsRef::as_ref),
            Some("session-1"),
            "attach response should seed the flat session id even before Target.attachedToTarget"
        );
    }
}