1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::{mpsc, oneshot};
use crate::handler::sender::PageSender;
use crate::handler::target::TargetMessage;
use crate::{error::Result, ArcHttpRequest};
type SendFut = Pin<
Box<dyn Future<Output = std::result::Result<(), mpsc::error::SendError<TargetMessage>>> + Send>,
>;
pin_project! {
pub struct TargetMessageFuture<T> {
#[pin]
rx_request: oneshot::Receiver<T>,
target_sender: PageSender,
#[pin]
delay: tokio::time::Sleep,
request_timeout: std::time::Duration,
message: Option<TargetMessage>,
send_fut: Option<SendFut>,
}
}
impl<T> TargetMessageFuture<T> {
pub fn new(
target_sender: PageSender,
message: TargetMessage,
rx_request: oneshot::Receiver<T>,
request_timeout: std::time::Duration,
) -> Self {
Self {
target_sender,
rx_request,
delay: tokio::time::sleep(request_timeout),
request_timeout,
message: Some(message),
send_fut: None,
}
}
/// Helper to build a `TargetMessageFuture<ArcHttpRequest>` for any
/// "wait" style target message (navigation, network idle, etc.).
///
/// The `make_msg` closure receives the `oneshot::Sender<ArcHttpRequest>` and
/// must wrap it into the appropriate `TargetMessage` variant
/// (e.g. `TargetMessage::WaitForNavigation(tx)`).
pub(crate) fn wait(
target_sender: PageSender,
request_timeout: std::time::Duration,
make_msg: impl FnOnce(oneshot::Sender<ArcHttpRequest>) -> TargetMessage,
) -> TargetMessageFuture<ArcHttpRequest> {
let (tx, rx_request) = oneshot::channel();
let message = make_msg(tx);
TargetMessageFuture::new(target_sender, message, rx_request, request_timeout)
}
/// Wait for the main-frame navigation to finish.
///
/// This triggers a `TargetMessage::WaitForNavigation` and resolves with
/// the final `ArcHttpRequest` associated with that navigation (if any).
pub fn wait_for_navigation(
target_sender: PageSender,
request_timeout: std::time::Duration,
) -> TargetMessageFuture<ArcHttpRequest> {
Self::wait(
target_sender,
request_timeout,
TargetMessage::WaitForNavigation,
)
}
/// Wait for `DOMContentLoaded` — fires once the HTML is fully parsed and
/// synchronous scripts have executed, but before subresources (images,
/// fonts, late XHRs) finish loading. Much faster than `wait_for_navigation`
/// through slow proxies.
pub fn wait_for_dom_content_loaded(
target_sender: PageSender,
request_timeout: std::time::Duration,
) -> TargetMessageFuture<ArcHttpRequest> {
Self::wait(
target_sender,
request_timeout,
TargetMessage::WaitForDomContentLoaded,
)
}
/// Wait for the `load` event — all subresources (images, fonts, XHRs)
/// have finished loading. Slower than `wait_for_navigation` /
/// `wait_for_dom_content_loaded` through proxies.
pub fn wait_for_load(
target_sender: PageSender,
request_timeout: std::time::Duration,
) -> TargetMessageFuture<ArcHttpRequest> {
Self::wait(target_sender, request_timeout, TargetMessage::WaitForLoad)
}
/// Wait until the main frame reaches `networkIdle`.
///
/// This triggers a `TargetMessage::WaitForNetworkIdle` and resolves with
/// the `ArcHttpRequest` associated with the navigation that led to the
/// idle state (if any).
pub fn wait_for_network_idle(
target_sender: PageSender,
request_timeout: std::time::Duration,
) -> TargetMessageFuture<ArcHttpRequest> {
Self::wait(
target_sender,
request_timeout,
TargetMessage::WaitForNetworkIdle,
)
}
/// Reset the internal timer deadline to `now + request_timeout`.
/// Used by `HttpFuture` to start the navigation timeout only after
/// the command phase completes, not from future creation time.
pub fn reset_deadline(self: Pin<&mut Self>) {
let this = self.project();
let deadline = tokio::time::Instant::now() + *this.request_timeout;
this.delay.reset(deadline);
}
/// Wait until the main frame reaches `networkAlmostIdle`.
///
/// This triggers a `TargetMessage::WaitForNetworkAlmostIdle` and resolves
/// with the `ArcHttpRequest` associated with that navigation (if any).
pub fn wait_for_network_almost_idle(
target_sender: PageSender,
request_timeout: std::time::Duration,
) -> TargetMessageFuture<ArcHttpRequest> {
Self::wait(
target_sender,
request_timeout,
TargetMessage::WaitForNetworkAlmostIdle,
)
}
}
impl<T> Future for TargetMessageFuture<T> {
type Output = Result<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
// Phase 1: deliver the message to the target channel.
if let Some(message) = this.message.take() {
match this.target_sender.try_send(message) {
Ok(()) => {
// Sent — fall through to phase 2.
}
Err(mpsc::error::TrySendError::Full(msg)) => {
// Channel full — park via async send with timeout enforcement.
// The send_fut path (below) polls both the send and the delay,
// so we fall through instead of returning Pending with a spurious wake.
let sender = this.target_sender.clone();
*this.send_fut = Some(Box::pin(async move { sender.send(msg).await }));
}
Err(e) => return Poll::Ready(Err(e.into())),
}
}
if let Some(fut) = this.send_fut.as_mut() {
match fut.as_mut().poll(cx) {
Poll::Ready(Ok(())) => {
*this.send_fut = None;
// Sent — fall through to phase 2.
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
Poll::Pending => {
// Enforce timeout while waiting for channel capacity.
if this.delay.as_mut().poll(cx).is_ready() {
return Poll::Ready(Err(crate::error::CdpError::Timeout));
}
return Poll::Pending;
}
}
}
// Phase 2: wait for the result on the oneshot.
if this.delay.as_mut().poll(cx).is_ready() {
Poll::Ready(Err(crate::error::CdpError::Timeout))
} else {
this.rx_request.as_mut().poll(cx).map_err(Into::into)
}
}
}