chromiumoxide/handler/
target_message_future.rs1use pin_project_lite::pin_project;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::sync::{mpsc, oneshot};
6
7use crate::handler::sender::PageSender;
8use crate::handler::target::TargetMessage;
9use crate::{error::Result, ArcHttpRequest};
10
11type SendFut = Pin<
12 Box<dyn Future<Output = std::result::Result<(), mpsc::error::SendError<TargetMessage>>> + Send>,
13>;
14
15pin_project! {
16 pub struct TargetMessageFuture<T> {
17 #[pin]
18 rx_request: oneshot::Receiver<T>,
19 target_sender: PageSender,
20 #[pin]
21 delay: tokio::time::Sleep,
22 request_timeout: std::time::Duration,
23 message: Option<TargetMessage>,
24 send_fut: Option<SendFut>,
25 }
26}
27
28impl<T> TargetMessageFuture<T> {
29 pub fn new(
30 target_sender: PageSender,
31 message: TargetMessage,
32 rx_request: oneshot::Receiver<T>,
33 request_timeout: std::time::Duration,
34 ) -> Self {
35 Self {
36 target_sender,
37 rx_request,
38 delay: tokio::time::sleep(request_timeout),
39 request_timeout,
40 message: Some(message),
41 send_fut: None,
42 }
43 }
44
45 pub(crate) fn wait(
52 target_sender: PageSender,
53 request_timeout: std::time::Duration,
54 make_msg: impl FnOnce(oneshot::Sender<ArcHttpRequest>) -> TargetMessage,
55 ) -> TargetMessageFuture<ArcHttpRequest> {
56 let (tx, rx_request) = oneshot::channel();
57 let message = make_msg(tx);
58 TargetMessageFuture::new(target_sender, message, rx_request, request_timeout)
59 }
60
61 pub fn wait_for_navigation(
66 target_sender: PageSender,
67 request_timeout: std::time::Duration,
68 ) -> TargetMessageFuture<ArcHttpRequest> {
69 Self::wait(
70 target_sender,
71 request_timeout,
72 TargetMessage::WaitForNavigation,
73 )
74 }
75
76 pub fn wait_for_dom_content_loaded(
81 target_sender: PageSender,
82 request_timeout: std::time::Duration,
83 ) -> TargetMessageFuture<ArcHttpRequest> {
84 Self::wait(
85 target_sender,
86 request_timeout,
87 TargetMessage::WaitForDomContentLoaded,
88 )
89 }
90
91 pub fn wait_for_network_idle(
97 target_sender: PageSender,
98 request_timeout: std::time::Duration,
99 ) -> TargetMessageFuture<ArcHttpRequest> {
100 Self::wait(
101 target_sender,
102 request_timeout,
103 TargetMessage::WaitForNetworkIdle,
104 )
105 }
106
107 pub fn reset_deadline(self: Pin<&mut Self>) {
111 let this = self.project();
112 let deadline = tokio::time::Instant::now() + *this.request_timeout;
113 this.delay.reset(deadline);
114 }
115
116 pub fn wait_for_network_almost_idle(
121 target_sender: PageSender,
122 request_timeout: std::time::Duration,
123 ) -> TargetMessageFuture<ArcHttpRequest> {
124 Self::wait(
125 target_sender,
126 request_timeout,
127 TargetMessage::WaitForNetworkAlmostIdle,
128 )
129 }
130}
131
132impl<T> Future for TargetMessageFuture<T> {
133 type Output = Result<T>;
134
135 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136 let mut this = self.project();
137
138 if let Some(message) = this.message.take() {
140 match this.target_sender.try_send(message) {
141 Ok(()) => {
142 }
144 Err(mpsc::error::TrySendError::Full(msg)) => {
145 let sender = this.target_sender.clone();
149 *this.send_fut = Some(Box::pin(async move { sender.send(msg).await }));
150 }
151 Err(e) => return Poll::Ready(Err(e.into())),
152 }
153 }
154
155 if let Some(fut) = this.send_fut.as_mut() {
156 match fut.as_mut().poll(cx) {
157 Poll::Ready(Ok(())) => {
158 *this.send_fut = None;
159 }
161 Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
162 Poll::Pending => {
163 if this.delay.as_mut().poll(cx).is_ready() {
165 return Poll::Ready(Err(crate::error::CdpError::Timeout));
166 }
167 return Poll::Pending;
168 }
169 }
170 }
171
172 if this.delay.as_mut().poll(cx).is_ready() {
174 Poll::Ready(Err(crate::error::CdpError::Timeout))
175 } else {
176 this.rx_request.as_mut().poll(cx).map_err(Into::into)
177 }
178 }
179}