Skip to main content

chromiumoxide/handler/
target_message_future.rs

1use pin_project_lite::pin_project;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::sync::{mpsc, oneshot};
6
7use crate::handler::sender::PageSender;
8use crate::handler::target::TargetMessage;
9use crate::{error::Result, ArcHttpRequest};
10
11type SendFut = Pin<
12    Box<dyn Future<Output = std::result::Result<(), mpsc::error::SendError<TargetMessage>>> + Send>,
13>;
14
15pin_project! {
16    pub struct TargetMessageFuture<T> {
17        #[pin]
18        rx_request: oneshot::Receiver<T>,
19        target_sender: PageSender,
20        #[pin]
21        delay: tokio::time::Sleep,
22        request_timeout: std::time::Duration,
23        message: Option<TargetMessage>,
24        send_fut: Option<SendFut>,
25    }
26}
27
28impl<T> TargetMessageFuture<T> {
29    pub fn new(
30        target_sender: PageSender,
31        message: TargetMessage,
32        rx_request: oneshot::Receiver<T>,
33        request_timeout: std::time::Duration,
34    ) -> Self {
35        Self {
36            target_sender,
37            rx_request,
38            delay: tokio::time::sleep(request_timeout),
39            request_timeout,
40            message: Some(message),
41            send_fut: None,
42        }
43    }
44
45    /// Helper to build a `TargetMessageFuture<ArcHttpRequest>` for any
46    /// "wait" style target message (navigation, network idle, etc.).
47    ///
48    /// The `make_msg` closure receives the `oneshot::Sender<ArcHttpRequest>` and
49    /// must wrap it into the appropriate `TargetMessage` variant
50    /// (e.g. `TargetMessage::WaitForNavigation(tx)`).
51    pub(crate) fn wait(
52        target_sender: PageSender,
53        request_timeout: std::time::Duration,
54        make_msg: impl FnOnce(oneshot::Sender<ArcHttpRequest>) -> TargetMessage,
55    ) -> TargetMessageFuture<ArcHttpRequest> {
56        let (tx, rx_request) = oneshot::channel();
57        let message = make_msg(tx);
58        TargetMessageFuture::new(target_sender, message, rx_request, request_timeout)
59    }
60
61    /// Wait for the main-frame navigation to finish.
62    ///
63    /// This triggers a `TargetMessage::WaitForNavigation` and resolves with
64    /// the final `ArcHttpRequest` associated with that navigation (if any).
65    pub fn wait_for_navigation(
66        target_sender: PageSender,
67        request_timeout: std::time::Duration,
68    ) -> TargetMessageFuture<ArcHttpRequest> {
69        Self::wait(
70            target_sender,
71            request_timeout,
72            TargetMessage::WaitForNavigation,
73        )
74    }
75
76    /// Wait for `DOMContentLoaded` — fires once the HTML is fully parsed and
77    /// synchronous scripts have executed, but before subresources (images,
78    /// fonts, late XHRs) finish loading.  Much faster than `wait_for_navigation`
79    /// through slow proxies.
80    pub fn wait_for_dom_content_loaded(
81        target_sender: PageSender,
82        request_timeout: std::time::Duration,
83    ) -> TargetMessageFuture<ArcHttpRequest> {
84        Self::wait(
85            target_sender,
86            request_timeout,
87            TargetMessage::WaitForDomContentLoaded,
88        )
89    }
90
91    /// Wait for the `load` event — all subresources (images, fonts, XHRs)
92    /// have finished loading. Slower than `wait_for_navigation` /
93    /// `wait_for_dom_content_loaded` through proxies.
94    pub fn wait_for_load(
95        target_sender: PageSender,
96        request_timeout: std::time::Duration,
97    ) -> TargetMessageFuture<ArcHttpRequest> {
98        Self::wait(target_sender, request_timeout, TargetMessage::WaitForLoad)
99    }
100
101    /// Wait until the main frame reaches `networkIdle`.
102    ///
103    /// This triggers a `TargetMessage::WaitForNetworkIdle` and resolves with
104    /// the `ArcHttpRequest` associated with the navigation that led to the
105    /// idle state (if any).
106    pub fn wait_for_network_idle(
107        target_sender: PageSender,
108        request_timeout: std::time::Duration,
109    ) -> TargetMessageFuture<ArcHttpRequest> {
110        Self::wait(
111            target_sender,
112            request_timeout,
113            TargetMessage::WaitForNetworkIdle,
114        )
115    }
116
117    /// Reset the internal timer deadline to `now + request_timeout`.
118    /// Used by `HttpFuture` to start the navigation timeout only after
119    /// the command phase completes, not from future creation time.
120    pub fn reset_deadline(self: Pin<&mut Self>) {
121        let this = self.project();
122        let deadline = tokio::time::Instant::now() + *this.request_timeout;
123        this.delay.reset(deadline);
124    }
125
126    /// Wait until the main frame reaches `networkAlmostIdle`.
127    ///
128    /// This triggers a `TargetMessage::WaitForNetworkAlmostIdle` and resolves
129    /// with the `ArcHttpRequest` associated with that navigation (if any).
130    pub fn wait_for_network_almost_idle(
131        target_sender: PageSender,
132        request_timeout: std::time::Duration,
133    ) -> TargetMessageFuture<ArcHttpRequest> {
134        Self::wait(
135            target_sender,
136            request_timeout,
137            TargetMessage::WaitForNetworkAlmostIdle,
138        )
139    }
140}
141
142impl<T> Future for TargetMessageFuture<T> {
143    type Output = Result<T>;
144
145    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146        let mut this = self.project();
147
148        // Phase 1: deliver the message to the target channel.
149        if let Some(message) = this.message.take() {
150            match this.target_sender.try_send(message) {
151                Ok(()) => {
152                    // Sent — fall through to phase 2.
153                }
154                Err(mpsc::error::TrySendError::Full(msg)) => {
155                    // Channel full — park via async send with timeout enforcement.
156                    // The send_fut path (below) polls both the send and the delay,
157                    // so we fall through instead of returning Pending with a spurious wake.
158                    let sender = this.target_sender.clone();
159                    *this.send_fut = Some(Box::pin(async move { sender.send(msg).await }));
160                }
161                Err(e) => return Poll::Ready(Err(e.into())),
162            }
163        }
164
165        if let Some(fut) = this.send_fut.as_mut() {
166            match fut.as_mut().poll(cx) {
167                Poll::Ready(Ok(())) => {
168                    *this.send_fut = None;
169                    // Sent — fall through to phase 2.
170                }
171                Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
172                Poll::Pending => {
173                    // Enforce timeout while waiting for channel capacity.
174                    if this.delay.as_mut().poll(cx).is_ready() {
175                        return Poll::Ready(Err(crate::error::CdpError::Timeout));
176                    }
177                    return Poll::Pending;
178                }
179            }
180        }
181
182        // Phase 2: wait for the result on the oneshot.
183        if this.delay.as_mut().poll(cx).is_ready() {
184            Poll::Ready(Err(crate::error::CdpError::Timeout))
185        } else {
186            this.rx_request.as_mut().poll(cx).map_err(Into::into)
187        }
188    }
189}