1use chromiumoxide_cdp::cdp::js_protocol::runtime::ExecutionContextId;
2use dashmap::DashMap;
3use std::collections::{HashMap, HashSet};
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use fnv::FnvHashMap;
9use futures::channel::mpsc::Receiver;
10use futures::channel::oneshot::Sender as OneshotSender;
11use futures::stream::{Fuse, Stream, StreamExt};
12use futures::task::{Context, Poll};
13
14use crate::listeners::{EventListenerRequest, EventListeners};
15use chromiumoxide_cdp::cdp::browser_protocol::browser::*;
16use chromiumoxide_cdp::cdp::browser_protocol::target::*;
17use chromiumoxide_cdp::cdp::events::CdpEvent;
18use chromiumoxide_cdp::cdp::events::CdpEventMessage;
19use chromiumoxide_types::{CallId, Message, Method, Response};
20use chromiumoxide_types::{MethodId, Request as CdpRequest};
21pub(crate) use page::PageInner;
22
23use crate::cmd::{to_command_response, CommandMessage};
24use crate::conn::Connection;
25use crate::error::{CdpError, Result};
26use crate::handler::browser::BrowserContext;
27use crate::handler::frame::FrameNavigationRequest;
28use crate::handler::frame::{NavigationError, NavigationId, NavigationOk};
29use crate::handler::job::PeriodicJob;
30use crate::handler::session::Session;
31use crate::handler::target::TargetEvent;
32use crate::handler::target::{Target, TargetConfig};
33use crate::handler::viewport::Viewport;
34use crate::page::Page;
35
36pub const REQUEST_TIMEOUT: u64 = 30_000;
38
39pub mod browser;
40pub mod commandfuture;
41pub mod domworld;
42pub mod emulation;
43pub mod frame;
44pub mod http;
45pub mod httpfuture;
46mod job;
47pub mod network;
48mod page;
49mod session;
50pub mod target;
51pub mod target_message_future;
52pub mod viewport;
53
54#[must_use = "streams do nothing unless polled"]
57#[derive(Debug)]
58pub struct Handler {
59 pending_commands: FnvHashMap<CallId, (PendingRequest, MethodId, Instant)>,
63 from_browser: Fuse<Receiver<HandlerMessage>>,
65 default_browser_context: BrowserContext,
66 browser_contexts: HashSet<BrowserContext>,
67 target_ids: Vec<TargetId>,
69 targets: HashMap<TargetId, Target>,
71 navigations: FnvHashMap<NavigationId, NavigationRequest>,
73 sessions: HashMap<SessionId, Session>,
77 conn: Connection<CdpEventMessage>,
79 evict_command_timeout: PeriodicJob,
81 next_navigation_id: usize,
83 config: HandlerConfig,
85 event_listeners: EventListeners,
87 closing: bool,
89 contexts: Arc<DashMap<TargetId, ExecutionContextId>>,
91}
92
93impl Handler {
94 pub(crate) fn new(
97 mut conn: Connection<CdpEventMessage>,
98 rx: Receiver<HandlerMessage>,
99 config: HandlerConfig,
100 ) -> Self {
101 let discover = SetDiscoverTargetsParams::new(true);
102 let _ = conn.submit_command(
103 discover.identifier(),
104 None,
105 serde_json::to_value(discover).unwrap(),
106 );
107
108 let browser_contexts = config
109 .context_ids
110 .iter()
111 .map(|id| BrowserContext::from(id.clone()))
112 .collect();
113
114 Self {
115 pending_commands: Default::default(),
116 from_browser: rx.fuse(),
117 default_browser_context: Default::default(),
118 browser_contexts,
119 target_ids: Default::default(),
120 targets: Default::default(),
121 navigations: Default::default(),
122 sessions: Default::default(),
123 conn,
124 evict_command_timeout: PeriodicJob::new(config.request_timeout),
125 next_navigation_id: 0,
126 config,
127 event_listeners: Default::default(),
128 closing: false,
129 contexts: Arc::new(DashMap::new()),
130 }
131 }
132
133 pub fn get_target(&self, target_id: &TargetId) -> Option<&Target> {
135 self.targets.get(target_id)
136 }
137
138 pub fn targets(&self) -> impl Iterator<Item = &Target> + '_ {
140 self.targets.values()
141 }
142
143 pub fn default_browser_context(&self) -> &BrowserContext {
145 &self.default_browser_context
146 }
147
148 pub fn browser_contexts(&self) -> impl Iterator<Item = &BrowserContext> + '_ {
150 self.browser_contexts.iter()
151 }
152
153 fn on_navigation_response(&mut self, id: NavigationId, resp: Response) {
155 if let Some(nav) = self.navigations.remove(&id) {
156 match nav {
157 NavigationRequest::Navigate(mut nav) => {
158 if nav.navigated {
159 let _ = nav.tx.send(Ok(resp));
160 } else {
161 nav.set_response(resp);
162 self.navigations
163 .insert(id, NavigationRequest::Navigate(nav));
164 }
165 }
166 }
167 }
168 }
169
170 fn on_navigation_lifecycle_completed(&mut self, res: Result<NavigationOk, NavigationError>) {
172 match res {
173 Ok(ok) => {
174 let id = *ok.navigation_id();
175 if let Some(nav) = self.navigations.remove(&id) {
176 match nav {
177 NavigationRequest::Navigate(mut nav) => {
178 if let Some(resp) = nav.response.take() {
179 let _ = nav.tx.send(Ok(resp));
180 } else {
181 nav.set_navigated();
182 self.navigations
183 .insert(id, NavigationRequest::Navigate(nav));
184 }
185 }
186 }
187 }
188 }
189 Err(err) => {
190 if let Some(nav) = self.navigations.remove(err.navigation_id()) {
191 match nav {
192 NavigationRequest::Navigate(nav) => {
193 let _ = nav.tx.send(Err(err.into()));
194 }
195 }
196 }
197 }
198 }
199 }
200
201 fn on_response(&mut self, resp: Response) {
203 if let Some((req, method, _)) = self.pending_commands.remove(&resp.id) {
204 match req {
205 PendingRequest::CreateTarget(tx) => {
206 match to_command_response::<CreateTargetParams>(resp, method) {
207 Ok(resp) => {
208 if let Some(target) = self.targets.get_mut(&resp.target_id) {
209 target.set_initiator(tx);
212 } else {
213 panic!("Created target not present")
215 }
216 }
217 Err(err) => {
218 let _ = tx.send(Err(err)).ok();
219 }
220 }
221 }
222 PendingRequest::GetTargets(tx) => {
223 match to_command_response::<GetTargetsParams>(resp, method) {
224 Ok(resp) => {
225 let targets: Vec<TargetInfo> = resp.result.target_infos;
226 let results = targets.clone();
227 for target_info in targets {
228 let target_id = target_info.target_id.clone();
229 let event: EventTargetCreated = EventTargetCreated { target_info };
230 self.on_target_created(event);
231 let attach = AttachToTargetParams::new(target_id);
232 let _ = self.conn.submit_command(
233 attach.identifier(),
234 None,
235 serde_json::to_value(attach).unwrap(),
236 );
237 }
238
239 let _ = tx.send(Ok(results)).ok();
240 }
241 Err(err) => {
242 let _ = tx.send(Err(err)).ok();
243 }
244 }
245 }
246 PendingRequest::Navigate(id) => {
247 self.on_navigation_response(id, resp);
248 }
249 PendingRequest::ExternalCommand(tx) => {
250 let _ = tx.send(Ok(resp)).ok();
251 }
252 PendingRequest::InternalCommand(target_id) => {
253 if let Some(target) = self.targets.get_mut(&target_id) {
254 target.on_response(resp, method.as_ref());
255 }
256 }
257 PendingRequest::CloseBrowser(tx) => {
258 self.closing = true;
259 let _ = tx.send(Ok(CloseReturns {})).ok();
260 }
261 }
262 }
263 }
264
265 pub(crate) fn submit_external_command(
267 &mut self,
268 msg: CommandMessage,
269 now: Instant,
270 ) -> Result<()> {
271 let call_id = self
272 .conn
273 .submit_command(msg.method.clone(), msg.session_id, msg.params)?;
274 self.pending_commands.insert(
275 call_id,
276 (PendingRequest::ExternalCommand(msg.sender), msg.method, now),
277 );
278 Ok(())
279 }
280
281 pub(crate) fn submit_internal_command(
282 &mut self,
283 target_id: TargetId,
284 req: CdpRequest,
285 now: Instant,
286 ) -> Result<()> {
287 let call_id = self.conn.submit_command(
288 req.method.clone(),
289 req.session_id.map(Into::into),
290 req.params,
291 )?;
292 self.pending_commands.insert(
293 call_id,
294 (PendingRequest::InternalCommand(target_id), req.method, now),
295 );
296 Ok(())
297 }
298
299 fn submit_fetch_targets(&mut self, tx: OneshotSender<Result<Vec<TargetInfo>>>, now: Instant) {
300 let msg = GetTargetsParams { filter: None };
301 let method = msg.identifier();
302 let call_id = self
303 .conn
304 .submit_command(method.clone(), None, serde_json::to_value(msg).unwrap())
305 .unwrap();
306
307 self.pending_commands
308 .insert(call_id, (PendingRequest::GetTargets(tx), method, now));
309 }
310
311 fn submit_navigation(&mut self, id: NavigationId, req: CdpRequest, now: Instant) {
314 let call_id = self
315 .conn
316 .submit_command(
317 req.method.clone(),
318 req.session_id.map(Into::into),
319 req.params,
320 )
321 .unwrap();
322
323 self.pending_commands
324 .insert(call_id, (PendingRequest::Navigate(id), req.method, now));
325 }
326
327 fn submit_close(&mut self, tx: OneshotSender<Result<CloseReturns>>, now: Instant) {
328 let close_msg = CloseParams::default();
329 let method = close_msg.identifier();
330
331 let call_id = self
332 .conn
333 .submit_command(
334 method.clone(),
335 None,
336 serde_json::to_value(close_msg).unwrap(),
337 )
338 .unwrap();
339
340 self.pending_commands
341 .insert(call_id, (PendingRequest::CloseBrowser(tx), method, now));
342 }
343
344 fn on_target_message(&mut self, target: &mut Target, msg: CommandMessage, now: Instant) {
346 if msg.is_navigation() {
348 let (req, tx) = msg.split();
349 let id = self.next_navigation_id();
350 target.goto(FrameNavigationRequest::new(id, req));
351 self.navigations.insert(
352 id,
353 NavigationRequest::Navigate(NavigationInProgress::new(tx)),
354 );
355 } else {
356 let _ = self.submit_external_command(msg, now);
357 }
358 }
359
360 fn next_navigation_id(&mut self) -> NavigationId {
362 let id = NavigationId(self.next_navigation_id);
363 self.next_navigation_id = self.next_navigation_id.wrapping_add(1);
364 id
365 }
366
367 fn create_page(&mut self, params: CreateTargetParams, tx: OneshotSender<Result<Page>>) {
378 match url::Url::parse(¶ms.url) {
379 Ok(_) => {
380 let method = params.identifier();
381 match serde_json::to_value(params) {
382 Ok(params) => match self.conn.submit_command(method.clone(), None, params) {
383 Ok(call_id) => {
384 self.pending_commands.insert(
385 call_id,
386 (PendingRequest::CreateTarget(tx), method, Instant::now()),
387 );
388 }
389 Err(err) => {
390 let _ = tx.send(Err(err.into())).ok();
391 }
392 },
393 Err(err) => {
394 let _ = tx.send(Err(err.into())).ok();
395 }
396 }
397 }
398 Err(err) => {
399 let _ = tx.send(Err(err.into())).ok();
400 }
401 }
402 }
403
404 fn on_event(&mut self, event: CdpEventMessage) {
406 if let CdpEvent::RuntimeBindingCalled(ev) = &event.params {
409 if ev.name == "chaser_init" {
410 if let Some(session_id) = &event.session_id {
411 if let Some(session) = self.sessions.get(session_id.as_str()) {
412 self.contexts
414 .insert(session.target_id().clone(), ev.execution_context_id);
415 }
416 }
417 }
418 }
419
420 if let Some(ref session_id) = event.session_id {
421 if let Some(session) = self.sessions.get(session_id.as_str()) {
422 if let Some(target) = self.targets.get_mut(session.target_id()) {
423 return target.on_event(event);
424 }
425 }
426 }
427
428 let CdpEventMessage { params, method, .. } = event;
429 match params.clone() {
430 CdpEvent::TargetTargetCreated(ev) => self.on_target_created(*ev),
431 CdpEvent::TargetAttachedToTarget(ev) => self.on_attached_to_target(ev),
432 CdpEvent::TargetTargetDestroyed(ev) => self.on_target_destroyed(ev),
433 CdpEvent::TargetDetachedFromTarget(ev) => self.on_detached_from_target(ev),
434 _ => {}
435 }
436 chromiumoxide_cdp::consume_event!(match params {
437 |ev| self.event_listeners.start_send(ev),
438 |json| { let _ = self.event_listeners.try_send_custom(&method, json);}
439 });
440 }
441
442 fn on_target_created(&mut self, event: EventTargetCreated) {
446 let browser_ctx = event
447 .target_info
448 .browser_context_id
449 .clone()
450 .map(BrowserContext::from)
451 .filter(|id| self.browser_contexts.contains(id))
452 .unwrap_or_else(|| self.default_browser_context.clone());
453 let target = Target::new(
454 event.target_info,
455 TargetConfig {
456 ignore_https_errors: self.config.ignore_https_errors,
457 request_timeout: self.config.request_timeout,
458 viewport: self.config.viewport.clone(),
459 request_intercept: self.config.request_intercept,
460 cache_enabled: self.config.cache_enabled,
461 },
462 browser_ctx,
463 );
464 self.target_ids.push(target.target_id().clone());
465 self.targets.insert(target.target_id().clone(), target);
466 }
467
468 fn on_attached_to_target(&mut self, event: Box<EventAttachedToTarget>) {
470 let session = Session::new(event.session_id.clone(), event.target_info.target_id);
471 if let Some(target) = self.targets.get_mut(session.target_id()) {
472 target.set_session_id(session.session_id().clone())
473 }
474 self.sessions.insert(event.session_id, session);
475 }
476
477 fn on_detached_from_target(&mut self, event: EventDetachedFromTarget) {
481 if let Some(session) = self.sessions.remove(&event.session_id) {
483 if let Some(target) = self.targets.get_mut(session.target_id()) {
484 target.session_id_mut().take();
485 }
486 }
487 }
488
489 fn on_target_destroyed(&mut self, event: EventTargetDestroyed) {
491 if let Some(target) = self.targets.remove(&event.target_id) {
492 if let Some(session) = target.session_id() {
494 self.sessions.remove(session);
495 }
496 }
497 }
498
499 fn evict_timed_out_commands(&mut self, now: Instant) {
504 let timed_out = self
505 .pending_commands
506 .iter()
507 .filter(|(_, (_, _, timestamp))| now > (*timestamp + self.config.request_timeout))
508 .map(|(k, _)| *k)
509 .collect::<Vec<_>>();
510 for call in timed_out {
511 if let Some((req, _, _)) = self.pending_commands.remove(&call) {
512 match req {
513 PendingRequest::CreateTarget(tx) => {
514 let _ = tx.send(Err(CdpError::Timeout));
515 }
516 PendingRequest::GetTargets(tx) => {
517 let _ = tx.send(Err(CdpError::Timeout));
518 }
519 PendingRequest::Navigate(nav) => {
520 if let Some(nav) = self.navigations.remove(&nav) {
521 match nav {
522 NavigationRequest::Navigate(nav) => {
523 let _ = nav.tx.send(Err(CdpError::Timeout));
524 }
525 }
526 }
527 }
528 PendingRequest::ExternalCommand(tx) => {
529 let _ = tx.send(Err(CdpError::Timeout));
530 }
531 PendingRequest::InternalCommand(_) => {}
532 PendingRequest::CloseBrowser(tx) => {
533 let _ = tx.send(Err(CdpError::Timeout));
534 }
535 }
536 }
537 }
538 }
539
540 pub fn event_listeners_mut(&mut self) -> &mut EventListeners {
541 &mut self.event_listeners
542 }
543
544 pub fn contexts(&self) -> Arc<DashMap<TargetId, ExecutionContextId>> {
545 self.contexts.clone()
546 }
547}
548
549impl Stream for Handler {
550 type Item = Result<()>;
551
552 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
553 let pin = self.get_mut();
554
555 loop {
556 let now = Instant::now();
557 while let Poll::Ready(Some(msg)) = Pin::new(&mut pin.from_browser).poll_next(cx) {
561 match msg {
562 HandlerMessage::Command(cmd) => {
563 pin.submit_external_command(cmd, now)?;
564 }
565 HandlerMessage::FetchTargets(tx) => {
566 pin.submit_fetch_targets(tx, now);
567 }
568 HandlerMessage::CloseBrowser(tx) => {
569 pin.submit_close(tx, now);
570 }
571 HandlerMessage::CreatePage(params, tx) => {
572 pin.create_page(params, tx);
573 }
574 HandlerMessage::GetPages(tx) => {
575 let pages: Vec<_> = pin
576 .targets
577 .values_mut()
578 .filter(|p| p.is_page())
579 .filter_map(|target| target.get_or_create_page())
580 .map(|page| Page::from(page.clone()))
581 .collect();
582 let _ = tx.send(pages);
583 }
584 HandlerMessage::InsertContext(ctx) => {
585 pin.browser_contexts.insert(ctx);
586 }
587 HandlerMessage::DisposeContext(ctx) => {
588 pin.browser_contexts.remove(&ctx);
589 }
590 HandlerMessage::GetPage(target_id, tx) => {
591 let page = pin
592 .targets
593 .get_mut(&target_id)
594 .and_then(|target| target.get_or_create_page())
595 .map(|page| Page::from(page.clone()));
596 let _ = tx.send(page);
597 }
598 HandlerMessage::AddEventListener(req) => {
599 pin.event_listeners.add_listener(req);
600 }
601 }
602 }
603
604 for n in (0..pin.target_ids.len()).rev() {
605 let target_id = pin.target_ids.swap_remove(n);
606 if let Some((id, mut target)) = pin.targets.remove_entry(&target_id) {
607 while let Some(event) = target.poll(cx, now) {
608 match event {
609 TargetEvent::Request(req) => {
610 let _ = pin.submit_internal_command(
611 target.target_id().clone(),
612 req,
613 now,
614 );
615 }
616 TargetEvent::Command(msg) => {
617 pin.on_target_message(&mut target, msg, now);
618 }
619 TargetEvent::NavigationRequest(id, req) => {
620 pin.submit_navigation(id, req, now);
621 }
622 TargetEvent::NavigationResult(res) => {
623 pin.on_navigation_lifecycle_completed(res)
624 }
625 }
626 }
627
628 target.event_listeners_mut().poll(cx);
630 pin.event_listeners_mut().poll(cx);
632
633 pin.targets.insert(id, target);
634 pin.target_ids.push(target_id);
635 }
636 }
637
638 let mut done = true;
639
640 while let Poll::Ready(Some(ev)) = Pin::new(&mut pin.conn).poll_next(cx) {
641 match ev {
642 Ok(Message::Response(resp)) => {
643 pin.on_response(resp);
644 if pin.closing {
645 return Poll::Ready(None);
647 }
648 }
649 Ok(Message::Event(ev)) => {
650 pin.on_event(ev);
651 }
652 Err(err @ CdpError::InvalidMessage(_, _)) => {
653 if pin.config.ignore_invalid_messages {
654 tracing::warn!("WS Invalid message: {}", err);
655 } else {
656 return Poll::Ready(Some(Err(err)));
657 }
658 }
659 Err(err) => {
660 tracing::error!("WS Connection error: {:?}", err);
661 return Poll::Ready(Some(Err(err)));
662 }
663 }
664 done = false;
665 }
666
667 if pin.evict_command_timeout.poll_ready(cx) {
668 pin.evict_timed_out_commands(now);
670 }
671
672 if done {
673 return Poll::Pending;
675 }
676 }
677 }
678}
679
680#[derive(Debug, Clone)]
682pub struct HandlerConfig {
683 pub ignore_https_errors: bool,
685 pub ignore_invalid_messages: bool,
687 pub viewport: Option<Viewport>,
689 pub context_ids: Vec<BrowserContextId>,
691 pub request_timeout: Duration,
693 pub request_intercept: bool,
695 pub cache_enabled: bool,
697}
698
699impl Default for HandlerConfig {
700 fn default() -> Self {
701 Self {
702 ignore_https_errors: true,
703 ignore_invalid_messages: true,
704 viewport: Default::default(),
705 context_ids: Vec::new(),
706 request_timeout: Duration::from_millis(REQUEST_TIMEOUT),
707 request_intercept: false,
708 cache_enabled: true,
709 }
710 }
711}
712
713#[derive(Debug)]
715pub struct NavigationInProgress<T> {
716 navigated: bool,
718 response: Option<Response>,
720 tx: OneshotSender<T>,
722}
723
724impl<T> NavigationInProgress<T> {
725 fn new(tx: OneshotSender<T>) -> Self {
726 Self {
727 navigated: false,
728 response: None,
729 tx,
730 }
731 }
732
733 fn set_response(&mut self, resp: Response) {
735 self.response = Some(resp);
736 }
737
738 fn set_navigated(&mut self) {
740 self.navigated = true;
741 }
742}
743
744#[derive(Debug)]
746enum NavigationRequest {
747 Navigate(NavigationInProgress<Result<Response>>),
749 }
751
752#[derive(Debug)]
755enum PendingRequest {
756 CreateTarget(OneshotSender<Result<Page>>),
759 GetTargets(OneshotSender<Result<Vec<TargetInfo>>>),
761 Navigate(NavigationId),
768 ExternalCommand(OneshotSender<Result<Response>>),
770 InternalCommand(TargetId),
773 CloseBrowser(OneshotSender<Result<CloseReturns>>),
775}
776
777#[derive(Debug)]
781pub(crate) enum HandlerMessage {
782 CreatePage(CreateTargetParams, OneshotSender<Result<Page>>),
783 FetchTargets(OneshotSender<Result<Vec<TargetInfo>>>),
784 InsertContext(BrowserContext),
785 DisposeContext(BrowserContext),
786 GetPages(OneshotSender<Vec<Page>>),
787 Command(CommandMessage),
788 GetPage(TargetId, OneshotSender<Option<Page>>),
789 AddEventListener(EventListenerRequest),
790 CloseBrowser(OneshotSender<Result<CloseReturns>>),
791}