1use crate::handler::blockers::intercept_manager::NetworkInterceptManager;
2use hashbrown::{HashMap, HashSet};
3use std::pin::Pin;
4use std::time::{Duration, Instant};
5use tokio_tungstenite::tungstenite::error::ProtocolError;
6use tokio_tungstenite::tungstenite::Error;
7
8use fnv::FnvHashMap;
9use futures::channel::mpsc::Receiver;
10use futures::channel::oneshot::Sender as OneshotSender;
11use futures::stream::{Fuse, Stream, StreamExt};
12use futures::task::{Context, Poll};
13
14use crate::listeners::{EventListenerRequest, EventListeners};
15use chromiumoxide_cdp::cdp::browser_protocol::browser::*;
16use chromiumoxide_cdp::cdp::browser_protocol::target::*;
17use chromiumoxide_cdp::cdp::events::CdpEvent;
18use chromiumoxide_cdp::cdp::events::CdpEventMessage;
19use chromiumoxide_types::{CallId, Message, Method, Response};
20use chromiumoxide_types::{MethodId, Request as CdpRequest};
21pub(crate) use page::PageInner;
22
23use crate::cmd::{to_command_response, CommandMessage};
24use crate::conn::Connection;
25use crate::error::{CdpError, Result};
26use crate::handler::browser::BrowserContext;
27use crate::handler::frame::FrameRequestedNavigation;
28use crate::handler::frame::{NavigationError, NavigationId, NavigationOk};
29use crate::handler::job::PeriodicJob;
30use crate::handler::session::Session;
31use crate::handler::target::TargetEvent;
32use crate::handler::target::{Target, TargetConfig};
33use crate::handler::viewport::Viewport;
34use crate::page::Page;
35
36pub const REQUEST_TIMEOUT: u64 = 30_000;
38
39pub mod blockers;
40pub mod browser;
41pub mod commandfuture;
42pub mod domworld;
43pub mod emulation;
44pub mod frame;
45pub mod http;
46pub mod httpfuture;
47mod job;
48pub mod network;
49mod page;
50mod session;
51pub mod target;
52pub mod target_message_future;
53pub mod viewport;
54
55#[must_use = "streams do nothing unless polled"]
58#[derive(Debug)]
59pub struct Handler {
60 pub default_browser_context: BrowserContext,
61 pub browser_contexts: HashSet<BrowserContext>,
62 pending_commands: FnvHashMap<CallId, (PendingRequest, MethodId, Instant)>,
66 from_browser: Fuse<Receiver<HandlerMessage>>,
68 target_ids: Vec<TargetId>,
70 targets: HashMap<TargetId, Target>,
72 navigations: FnvHashMap<NavigationId, NavigationRequest>,
74 sessions: HashMap<SessionId, Session>,
78 conn: Connection<CdpEventMessage>,
80 evict_command_timeout: PeriodicJob,
82 next_navigation_id: usize,
84 config: HandlerConfig,
86 event_listeners: EventListeners,
88 closing: bool,
90}
91
92lazy_static::lazy_static! {
93 static ref DISCOVER_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
95 let discover = SetDiscoverTargetsParams::new(true);
96 (discover.identifier(), serde_json::to_value(discover).expect("valid discover target params"))
97 };
98 static ref TARGET_PARAMS_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
100 let msg = GetTargetsParams { filter: None };
101 (msg.identifier(), serde_json::to_value(msg).expect("valid paramtarget"))
102 };
103 static ref CLOSE_PARAMS_ID: (std::borrow::Cow<'static, str>, serde_json::Value) = {
105 let close_msg = CloseParams::default();
106 (close_msg.identifier(), serde_json::to_value(close_msg).expect("valid close params"))
107 };
108}
109
110impl Handler {
111 pub(crate) fn new(
114 mut conn: Connection<CdpEventMessage>,
115 rx: Receiver<HandlerMessage>,
116 config: HandlerConfig,
117 ) -> Self {
118 let discover = DISCOVER_ID.clone();
119 let _ = conn.submit_command(discover.0, None, discover.1);
120
121 let browser_contexts = config
122 .context_ids
123 .iter()
124 .map(|id| BrowserContext::from(id.clone()))
125 .collect();
126
127 Self {
128 pending_commands: Default::default(),
129 from_browser: rx.fuse(),
130 default_browser_context: Default::default(),
131 browser_contexts,
132 target_ids: Default::default(),
133 targets: Default::default(),
134 navigations: Default::default(),
135 sessions: Default::default(),
136 conn,
137 evict_command_timeout: PeriodicJob::new(config.request_timeout),
138 next_navigation_id: 0,
139 config,
140 event_listeners: Default::default(),
141 closing: false,
142 }
143 }
144
145 pub fn get_target(&self, target_id: &TargetId) -> Option<&Target> {
147 self.targets.get(target_id)
148 }
149
150 pub fn targets(&self) -> impl Iterator<Item = &Target> + '_ {
152 self.targets.values()
153 }
154
155 pub fn default_browser_context(&self) -> &BrowserContext {
157 &self.default_browser_context
158 }
159
160 pub fn browser_contexts(&self) -> impl Iterator<Item = &BrowserContext> + '_ {
162 self.browser_contexts.iter()
163 }
164
165 fn on_navigation_response(&mut self, id: NavigationId, resp: Response) {
167 if let Some(nav) = self.navigations.remove(&id) {
168 match nav {
169 NavigationRequest::Navigate(mut nav) => {
170 if nav.navigated {
171 let _ = nav.tx.send(Ok(resp));
172 } else {
173 nav.set_response(resp);
174 self.navigations
175 .insert(id, NavigationRequest::Navigate(nav));
176 }
177 }
178 }
179 }
180 }
181
182 fn on_navigation_lifecycle_completed(&mut self, res: Result<NavigationOk, NavigationError>) {
184 match res {
185 Ok(ok) => {
186 let id = *ok.navigation_id();
187 if let Some(nav) = self.navigations.remove(&id) {
188 match nav {
189 NavigationRequest::Navigate(mut nav) => {
190 if let Some(resp) = nav.response.take() {
191 let _ = nav.tx.send(Ok(resp));
192 } else {
193 nav.set_navigated();
194 self.navigations
195 .insert(id, NavigationRequest::Navigate(nav));
196 }
197 }
198 }
199 }
200 }
201 Err(err) => {
202 if let Some(nav) = self.navigations.remove(err.navigation_id()) {
203 match nav {
204 NavigationRequest::Navigate(nav) => {
205 let _ = nav.tx.send(Err(err.into()));
206 }
207 }
208 }
209 }
210 }
211 }
212
213 fn on_response(&mut self, resp: Response) {
215 if let Some((req, method, _)) = self.pending_commands.remove(&resp.id) {
216 match req {
217 PendingRequest::CreateTarget(tx) => {
218 match to_command_response::<CreateTargetParams>(resp, method) {
219 Ok(resp) => {
220 if let Some(target) = self.targets.get_mut(&resp.target_id) {
221 target.set_initiator(tx);
224 }
225 }
226 Err(err) => {
227 let _ = tx.send(Err(err)).ok();
228 }
229 }
230 }
231 PendingRequest::GetTargets(tx) => {
232 match to_command_response::<GetTargetsParams>(resp, method) {
233 Ok(resp) => {
234 let targets: Vec<TargetInfo> = resp.result.target_infos;
235 let results = targets.clone();
236 for target_info in targets {
237 let target_id = target_info.target_id.clone();
238 let event: EventTargetCreated = EventTargetCreated { target_info };
239 self.on_target_created(event);
240 let attach = AttachToTargetParams::new(target_id);
241
242 let _ = self.conn.submit_command(
243 attach.identifier(),
244 None,
245 serde_json::to_value(attach).unwrap_or_default(),
246 );
247 }
248
249 let _ = tx.send(Ok(results)).ok();
250 }
251 Err(err) => {
252 let _ = tx.send(Err(err)).ok();
253 }
254 }
255 }
256 PendingRequest::Navigate(id) => {
257 self.on_navigation_response(id, resp);
258 if self.config.only_html && !self.config.created_first_target {
259 self.config.created_first_target = true;
260 }
261 }
262 PendingRequest::ExternalCommand(tx) => {
263 let _ = tx.send(Ok(resp)).ok();
264 }
265 PendingRequest::InternalCommand(target_id) => {
266 if let Some(target) = self.targets.get_mut(&target_id) {
267 target.on_response(resp, method.as_ref());
268 }
269 }
270 PendingRequest::CloseBrowser(tx) => {
271 self.closing = true;
272 let _ = tx.send(Ok(CloseReturns {})).ok();
273 }
274 }
275 }
276 }
277
278 pub(crate) fn submit_external_command(
280 &mut self,
281 msg: CommandMessage,
282 now: Instant,
283 ) -> Result<()> {
284 let call_id = self
285 .conn
286 .submit_command(msg.method.clone(), msg.session_id, msg.params)?;
287 self.pending_commands.insert(
288 call_id,
289 (PendingRequest::ExternalCommand(msg.sender), msg.method, now),
290 );
291 Ok(())
292 }
293
294 pub(crate) fn submit_internal_command(
295 &mut self,
296 target_id: TargetId,
297 req: CdpRequest,
298 now: Instant,
299 ) -> Result<()> {
300 let call_id = self.conn.submit_command(
301 req.method.clone(),
302 req.session_id.map(Into::into),
303 req.params,
304 )?;
305 self.pending_commands.insert(
306 call_id,
307 (PendingRequest::InternalCommand(target_id), req.method, now),
308 );
309 Ok(())
310 }
311
312 fn submit_fetch_targets(&mut self, tx: OneshotSender<Result<Vec<TargetInfo>>>, now: Instant) {
313 let msg = TARGET_PARAMS_ID.clone();
314
315 if let Ok(call_id) = self.conn.submit_command(msg.0.clone(), None, msg.1) {
316 self.pending_commands
317 .insert(call_id, (PendingRequest::GetTargets(tx), msg.0, now));
318 }
319 }
320
321 fn submit_navigation(&mut self, id: NavigationId, req: CdpRequest, now: Instant) {
324 if let Ok(call_id) = self.conn.submit_command(
325 req.method.clone(),
326 req.session_id.map(Into::into),
327 req.params,
328 ) {
329 self.pending_commands
330 .insert(call_id, (PendingRequest::Navigate(id), req.method, now));
331 }
332 }
333
334 fn submit_close(&mut self, tx: OneshotSender<Result<CloseReturns>>, now: Instant) {
335 let close_msg = CLOSE_PARAMS_ID.clone();
336
337 if let Ok(call_id) = self
338 .conn
339 .submit_command(close_msg.0.clone(), None, close_msg.1)
340 {
341 self.pending_commands.insert(
342 call_id,
343 (PendingRequest::CloseBrowser(tx), close_msg.0, now),
344 );
345 }
346 }
347
348 fn on_target_message(&mut self, target: &mut Target, msg: CommandMessage, now: Instant) {
350 if msg.is_navigation() {
351 let (req, tx) = msg.split();
352 let id = self.next_navigation_id();
353
354 target.goto(FrameRequestedNavigation::new(id, req));
355
356 self.navigations.insert(
357 id,
358 NavigationRequest::Navigate(NavigationInProgress::new(tx)),
359 );
360 } else {
361 let _ = self.submit_external_command(msg, now);
362 }
363 }
364
365 fn next_navigation_id(&mut self) -> NavigationId {
367 let id = NavigationId(self.next_navigation_id);
368 self.next_navigation_id = self.next_navigation_id.wrapping_add(1);
369 id
370 }
371
372 fn create_page(&mut self, params: CreateTargetParams, tx: OneshotSender<Result<Page>>) {
383 let about_blank = params.url == "about:blank";
384 let http_check =
385 !about_blank && params.url.starts_with("http") || params.url.starts_with("file://");
386
387 if about_blank || http_check {
388 let method = params.identifier();
389
390 match serde_json::to_value(params) {
391 Ok(params) => match self.conn.submit_command(method.clone(), None, params) {
392 Ok(call_id) => {
393 self.pending_commands.insert(
394 call_id,
395 (PendingRequest::CreateTarget(tx), method, Instant::now()),
396 );
397 }
398 Err(err) => {
399 let _ = tx.send(Err(err.into())).ok();
400 }
401 },
402 Err(err) => {
403 let _ = tx.send(Err(err.into())).ok();
404 }
405 }
406 } else {
407 let _ = tx.send(Err(CdpError::NotFound)).ok();
408 }
409 }
410
411 fn on_event(&mut self, event: CdpEventMessage) {
413 if let Some(ref session_id) = event.session_id {
414 if let Some(session) = self.sessions.get(session_id.as_str()) {
415 if let Some(target) = self.targets.get_mut(session.target_id()) {
416 return target.on_event(event);
417 }
418 }
419 }
420 let CdpEventMessage { params, method, .. } = event;
421
422 match params {
423 CdpEvent::TargetTargetCreated(ref ev) => self.on_target_created(ev.clone()),
424 CdpEvent::TargetAttachedToTarget(ref ev) => self.on_attached_to_target(ev.clone()),
425 CdpEvent::TargetTargetDestroyed(ref ev) => self.on_target_destroyed(ev.clone()),
426 CdpEvent::TargetDetachedFromTarget(ref ev) => self.on_detached_from_target(ev.clone()),
427 _ => {}
428 }
429
430 chromiumoxide_cdp::consume_event!(match params {
431 |ev| self.event_listeners.start_send(ev),
432 |json| { let _ = self.event_listeners.try_send_custom(&method, json);}
433 });
434 }
435
436 fn on_target_created(&mut self, event: EventTargetCreated) {
440 let browser_ctx = match event.target_info.browser_context_id {
441 Some(ref context_id) => {
442 let browser_context = BrowserContext {
443 id: Some(context_id.clone()),
444 };
445 if self.default_browser_context.id.is_none() {
446 self.default_browser_context = browser_context.clone();
447 };
448 self.browser_contexts.insert(browser_context.clone());
449
450 browser_context
451 }
452 _ => event
453 .target_info
454 .browser_context_id
455 .clone()
456 .map(BrowserContext::from)
457 .filter(|id| self.browser_contexts.contains(id))
458 .unwrap_or_else(|| self.default_browser_context.clone()),
459 };
460
461 let target = Target::new(
462 event.target_info,
463 TargetConfig {
464 ignore_https_errors: self.config.ignore_https_errors,
465 request_timeout: self.config.request_timeout,
466 viewport: self.config.viewport.clone(),
467 request_intercept: self.config.request_intercept,
468 cache_enabled: self.config.cache_enabled,
469 service_worker_enabled: self.config.service_worker_enabled,
470 ignore_visuals: self.config.ignore_visuals,
471 ignore_stylesheets: self.config.ignore_stylesheets,
472 ignore_javascript: self.config.ignore_javascript,
473 ignore_analytics: self.config.ignore_analytics,
474 extra_headers: self.config.extra_headers.clone(),
475 only_html: self.config.only_html && self.config.created_first_target,
476 intercept_manager: self.config.intercept_manager,
477 },
478 browser_ctx,
479 );
480
481 self.target_ids.push(target.target_id().clone());
482 self.targets.insert(target.target_id().clone(), target);
483 }
484
485 fn on_attached_to_target(&mut self, event: Box<EventAttachedToTarget>) {
487 let session = Session::new(event.session_id.clone(), event.target_info.target_id);
488 if let Some(target) = self.targets.get_mut(session.target_id()) {
489 target.set_session_id(session.session_id().clone())
490 }
491 self.sessions.insert(event.session_id, session);
492 }
493
494 fn on_detached_from_target(&mut self, event: EventDetachedFromTarget) {
498 if let Some(session) = self.sessions.remove(&event.session_id) {
500 if let Some(target) = self.targets.get_mut(session.target_id()) {
501 target.session_id().take();
502 }
503 }
504 }
505
506 fn on_target_destroyed(&mut self, event: EventTargetDestroyed) {
508 if let Some(target) = self.targets.remove(&event.target_id) {
509 if let Some(session) = target.session_id() {
511 self.sessions.remove(session);
512 }
513 }
514 }
515
516 fn evict_timed_out_commands(&mut self, now: Instant) {
521 let timed_out = self
522 .pending_commands
523 .iter()
524 .filter(|(_, (_, _, timestamp))| now > (*timestamp + self.config.request_timeout))
525 .map(|(k, _)| *k)
526 .collect::<Vec<_>>();
527
528 for call in timed_out {
529 if let Some((req, _, _)) = self.pending_commands.remove(&call) {
530 match req {
531 PendingRequest::CreateTarget(tx) => {
532 let _ = tx.send(Err(CdpError::Timeout));
533 }
534 PendingRequest::GetTargets(tx) => {
535 let _ = tx.send(Err(CdpError::Timeout));
536 }
537 PendingRequest::Navigate(nav) => {
538 if let Some(nav) = self.navigations.remove(&nav) {
539 match nav {
540 NavigationRequest::Navigate(nav) => {
541 let _ = nav.tx.send(Err(CdpError::Timeout));
542 }
543 }
544 }
545 }
546 PendingRequest::ExternalCommand(tx) => {
547 let _ = tx.send(Err(CdpError::Timeout));
548 }
549 PendingRequest::InternalCommand(_) => {}
550 PendingRequest::CloseBrowser(tx) => {
551 let _ = tx.send(Err(CdpError::Timeout));
552 }
553 }
554 }
555 }
556 }
557
558 pub fn event_listeners_mut(&mut self) -> &mut EventListeners {
559 &mut self.event_listeners
560 }
561}
562
563impl Stream for Handler {
564 type Item = Result<()>;
565
566 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
567 let pin = self.get_mut();
568
569 let mut dispose = false;
570
571 loop {
572 let now = Instant::now();
573 while let Poll::Ready(Some(msg)) = Pin::new(&mut pin.from_browser).poll_next(cx) {
577 match msg {
578 HandlerMessage::Command(cmd) => {
579 pin.submit_external_command(cmd, now)?;
580 }
581 HandlerMessage::FetchTargets(tx) => {
582 pin.submit_fetch_targets(tx, now);
583 }
584 HandlerMessage::CloseBrowser(tx) => {
585 pin.submit_close(tx, now);
586 }
587 HandlerMessage::CreatePage(params, tx) => {
588 pin.create_page(params, tx);
589 }
590 HandlerMessage::GetPages(tx) => {
591 let pages: Vec<_> = pin
592 .targets
593 .values_mut()
594 .filter(|p: &&mut Target| p.is_page())
595 .filter_map(|target| target.get_or_create_page())
596 .map(|page| Page::from(page.clone()))
597 .collect();
598 let _ = tx.send(pages);
599 }
600 HandlerMessage::InsertContext(ctx) => {
601 pin.default_browser_context = ctx.clone();
602 pin.browser_contexts.insert(ctx);
603 }
604 HandlerMessage::DisposeContext(ctx) => {
605 pin.browser_contexts.remove(&ctx);
606 pin.closing = true;
607 dispose = true;
608 }
609 HandlerMessage::GetPage(target_id, tx) => {
610 let page = pin
611 .targets
612 .get_mut(&target_id)
613 .and_then(|target| target.get_or_create_page())
614 .map(|page| Page::from(page.clone()));
615 let _ = tx.send(page);
616 }
617 HandlerMessage::AddEventListener(req) => {
618 pin.event_listeners.add_listener(req);
619 }
620 }
621 }
622
623 for n in (0..pin.target_ids.len()).rev() {
624 let target_id = pin.target_ids.swap_remove(n);
625
626 if let Some((id, mut target)) = pin.targets.remove_entry(&target_id) {
627 while let Some(event) = target.poll(cx, now) {
628 match event {
629 TargetEvent::Request(req) => {
630 let _ = pin.submit_internal_command(
631 target.target_id().clone(),
632 req,
633 now,
634 );
635 }
636 TargetEvent::Command(msg) => {
637 pin.on_target_message(&mut target, msg, now);
638 }
639 TargetEvent::NavigationRequest(id, req) => {
640 pin.submit_navigation(id, req, now);
641 }
642 TargetEvent::NavigationResult(res) => {
643 pin.on_navigation_lifecycle_completed(res)
644 }
645 }
646 }
647
648 target.event_listeners_mut().poll(cx);
650 pin.event_listeners_mut().poll(cx);
652
653 pin.targets.insert(id, target);
654 pin.target_ids.push(target_id);
655 }
656 }
657
658 let mut done = true;
659
660 while let Poll::Ready(Some(ev)) = Pin::new(&mut pin.conn).poll_next(cx) {
661 match ev {
662 Ok(Message::Response(resp)) => {
663 pin.on_response(resp);
664 if pin.closing {
665 return Poll::Ready(None);
667 }
668 }
669 Ok(Message::Event(ev)) => {
670 pin.on_event(ev);
671 }
672 Err(err) => {
673 tracing::error!("WS Connection error: {:?}", err);
674 match err {
675 CdpError::Ws(ref ws_error) => match ws_error {
676 Error::AlreadyClosed => {
677 pin.closing = true;
678 dispose = true;
679 break;
680 }
681 Error::Protocol(detail)
682 if detail == &ProtocolError::ResetWithoutClosingHandshake =>
683 {
684 pin.closing = true;
685 dispose = true;
686 break;
687 }
688 _ => {}
689 },
690 _ => {}
691 };
692 return Poll::Ready(Some(Err(err)));
693 }
694 }
695 done = false;
696 }
697
698 if pin.evict_command_timeout.poll_ready(cx) {
699 pin.evict_timed_out_commands(now);
701 }
702
703 if dispose {
704 return Poll::Ready(None);
705 }
706
707 if done {
708 return Poll::Pending;
710 }
711 }
712 }
713}
714
715#[derive(Debug, Clone)]
717pub struct HandlerConfig {
718 pub ignore_https_errors: bool,
720 pub viewport: Option<Viewport>,
722 pub context_ids: Vec<BrowserContextId>,
724 pub request_timeout: Duration,
726 pub request_intercept: bool,
728 pub cache_enabled: bool,
730 pub service_worker_enabled: bool,
732 pub ignore_visuals: bool,
734 pub ignore_stylesheets: bool,
736 pub ignore_javascript: bool,
738 pub ignore_analytics: bool,
740 pub ignore_ads: bool,
742 pub extra_headers: Option<std::collections::HashMap<String, String>>,
744 pub only_html: bool,
746 pub created_first_target: bool,
748 pub intercept_manager: NetworkInterceptManager,
750}
751
752impl Default for HandlerConfig {
753 fn default() -> Self {
754 Self {
755 ignore_https_errors: true,
756 viewport: Default::default(),
757 context_ids: Vec::new(),
758 request_timeout: Duration::from_millis(REQUEST_TIMEOUT),
759 request_intercept: false,
760 cache_enabled: true,
761 service_worker_enabled: true,
762 ignore_visuals: false,
763 ignore_stylesheets: false,
764 ignore_ads: false,
765 ignore_javascript: false,
766 ignore_analytics: true,
767 only_html: false,
768 extra_headers: Default::default(),
769 created_first_target: false,
770 intercept_manager: NetworkInterceptManager::Unknown,
771 }
772 }
773}
774
775#[derive(Debug)]
777pub struct NavigationInProgress<T> {
778 navigated: bool,
780 response: Option<Response>,
782 tx: OneshotSender<T>,
784}
785
786impl<T> NavigationInProgress<T> {
787 fn new(tx: OneshotSender<T>) -> Self {
788 Self {
789 navigated: false,
790 response: None,
791 tx,
792 }
793 }
794
795 fn set_response(&mut self, resp: Response) {
797 self.response = Some(resp);
798 }
799
800 fn set_navigated(&mut self) {
802 self.navigated = true;
803 }
804}
805
806#[derive(Debug)]
808enum NavigationRequest {
809 Navigate(NavigationInProgress<Result<Response>>),
811 }
813
814#[derive(Debug)]
817enum PendingRequest {
818 CreateTarget(OneshotSender<Result<Page>>),
821 GetTargets(OneshotSender<Result<Vec<TargetInfo>>>),
823 Navigate(NavigationId),
830 ExternalCommand(OneshotSender<Result<Response>>),
832 InternalCommand(TargetId),
835 CloseBrowser(OneshotSender<Result<CloseReturns>>),
837}
838
839#[derive(Debug)]
843pub(crate) enum HandlerMessage {
844 CreatePage(CreateTargetParams, OneshotSender<Result<Page>>),
845 FetchTargets(OneshotSender<Result<Vec<TargetInfo>>>),
846 InsertContext(BrowserContext),
847 DisposeContext(BrowserContext),
848 GetPages(OneshotSender<Vec<Page>>),
849 Command(CommandMessage),
850 GetPage(TargetId, OneshotSender<Option<Page>>),
851 AddEventListener(EventListenerRequest),
852 CloseBrowser(OneshotSender<Result<CloseReturns>>),
853}