1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::{mpsc, oneshot};
use crate::handler::target::TargetMessage;
use crate::{error::Result, ArcHttpRequest};
type SendFut = Pin<
Box<dyn Future<Output = std::result::Result<(), mpsc::error::SendError<TargetMessage>>> + Send>,
>;
/// Convenience alias for sending messages to a Target task/actor.
///
/// This channel is typically owned by the Target event loop and accepts
/// `TargetMessage` commands to be processed serially.
type TargetSender = mpsc::Sender<TargetMessage>;
pin_project! {
pub struct TargetMessageFuture<T> {
#[pin]
rx_request: oneshot::Receiver<T>,
target_sender: mpsc::Sender<TargetMessage>,
#[pin]
delay: tokio::time::Sleep,
request_timeout: std::time::Duration,
message: Option<TargetMessage>,
send_fut: Option<SendFut>,
}
}
impl<T> TargetMessageFuture<T> {
pub fn new(
target_sender: TargetSender,
message: TargetMessage,
rx_request: oneshot::Receiver<T>,
request_timeout: std::time::Duration,
) -> Self {
Self {
target_sender,
rx_request,
delay: tokio::time::sleep(request_timeout),
request_timeout,
message: Some(message),
send_fut: None,
}
}
/// Helper to build a `TargetMessageFuture<ArcHttpRequest>` for any
/// "wait" style target message (navigation, network idle, etc.).
///
/// The `make_msg` closure receives the `oneshot::Sender<ArcHttpRequest>` and
/// must wrap it into the appropriate `TargetMessage` variant
/// (e.g. `TargetMessage::WaitForNavigation(tx)`).
pub(crate) fn wait(
target_sender: TargetSender,
request_timeout: std::time::Duration,
make_msg: impl FnOnce(oneshot::Sender<ArcHttpRequest>) -> TargetMessage,
) -> TargetMessageFuture<ArcHttpRequest> {
let (tx, rx_request) = oneshot::channel();
let message = make_msg(tx);
TargetMessageFuture::new(target_sender, message, rx_request, request_timeout)
}
/// Wait for the main-frame navigation to finish.
///
/// This triggers a `TargetMessage::WaitForNavigation` and resolves with
/// the final `ArcHttpRequest` associated with that navigation (if any).
pub fn wait_for_navigation(
target_sender: TargetSender,
request_timeout: std::time::Duration,
) -> TargetMessageFuture<ArcHttpRequest> {
Self::wait(
target_sender,
request_timeout,
TargetMessage::WaitForNavigation,
)
}
/// Wait until the main frame reaches `networkIdle`.
///
/// This triggers a `TargetMessage::WaitForNetworkIdle` and resolves with
/// the `ArcHttpRequest` associated with the navigation that led to the
/// idle state (if any).
pub fn wait_for_network_idle(
target_sender: TargetSender,
request_timeout: std::time::Duration,
) -> TargetMessageFuture<ArcHttpRequest> {
Self::wait(
target_sender,
request_timeout,
TargetMessage::WaitForNetworkIdle,
)
}
/// Reset the internal timer deadline to `now + request_timeout`.
/// Used by `HttpFuture` to start the navigation timeout only after
/// the command phase completes, not from future creation time.
pub fn reset_deadline(self: Pin<&mut Self>) {
let this = self.project();
let deadline = tokio::time::Instant::now() + *this.request_timeout;
this.delay.reset(deadline);
}
/// Wait until the main frame reaches `networkAlmostIdle`.
///
/// This triggers a `TargetMessage::WaitForNetworkAlmostIdle` and resolves
/// with the `ArcHttpRequest` associated with that navigation (if any).
pub fn wait_for_network_almost_idle(
target_sender: TargetSender,
request_timeout: std::time::Duration,
) -> TargetMessageFuture<ArcHttpRequest> {
Self::wait(
target_sender,
request_timeout,
TargetMessage::WaitForNetworkAlmostIdle,
)
}
}
impl<T> Future for TargetMessageFuture<T> {
type Output = Result<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
// Phase 1: deliver the message to the target channel.
if let Some(message) = this.message.take() {
match this.target_sender.try_send(message) {
Ok(()) => {
// Sent — fall through to phase 2.
}
Err(mpsc::error::TrySendError::Full(msg)) => {
// Channel full — park via async send with timeout enforcement.
// The send_fut path (below) polls both the send and the delay,
// so we fall through instead of returning Pending with a spurious wake.
let sender = this.target_sender.clone();
*this.send_fut = Some(Box::pin(async move { sender.send(msg).await }));
}
Err(e) => return Poll::Ready(Err(e.into())),
}
}
if let Some(fut) = this.send_fut.as_mut() {
match fut.as_mut().poll(cx) {
Poll::Ready(Ok(())) => {
*this.send_fut = None;
// Sent — fall through to phase 2.
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
Poll::Pending => {
// Enforce timeout while waiting for channel capacity.
if this.delay.as_mut().poll(cx).is_ready() {
return Poll::Ready(Err(crate::error::CdpError::Timeout));
}
return Poll::Pending;
}
}
}
// Phase 2: wait for the result on the oneshot.
if this.delay.as_mut().poll(cx).is_ready() {
Poll::Ready(Err(crate::error::CdpError::Timeout))
} else {
this.rx_request.as_mut().poll(cx).map_err(Into::into)
}
}
}