chromiumoxide/handler/
target_message_future.rs1use pin_project_lite::pin_project;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::sync::{mpsc, oneshot};
6
7use crate::handler::target::TargetMessage;
8use crate::{error::Result, ArcHttpRequest};
9
10type TargetSender = mpsc::Sender<TargetMessage>;
15
16pin_project! {
17 pub struct TargetMessageFuture<T> {
18 #[pin]
19 rx_request: oneshot::Receiver<T>,
20 target_sender: mpsc::Sender<TargetMessage>,
21 message: Option<TargetMessage>,
22 }
23}
24
25impl<T> TargetMessageFuture<T> {
26 pub fn new(
27 target_sender: TargetSender,
28 message: TargetMessage,
29 rx_request: oneshot::Receiver<T>,
30 ) -> Self {
31 Self {
32 target_sender,
33 rx_request,
34 message: Some(message),
35 }
36 }
37
38 pub(crate) fn wait(
45 target_sender: TargetSender,
46 make_msg: impl FnOnce(oneshot::Sender<ArcHttpRequest>) -> TargetMessage,
47 ) -> TargetMessageFuture<ArcHttpRequest> {
48 let (tx, rx_request) = oneshot::channel();
49 let message = make_msg(tx);
50 TargetMessageFuture::new(target_sender, message, rx_request)
51 }
52
53 pub fn wait_for_navigation(target_sender: TargetSender) -> TargetMessageFuture<ArcHttpRequest> {
58 Self::wait(target_sender, TargetMessage::WaitForNavigation)
59 }
60
61 pub fn wait_for_network_idle(
67 target_sender: TargetSender,
68 ) -> TargetMessageFuture<ArcHttpRequest> {
69 Self::wait(target_sender, TargetMessage::WaitForNetworkIdle)
70 }
71
72 pub fn wait_for_network_almost_idle(
77 target_sender: TargetSender,
78 ) -> TargetMessageFuture<ArcHttpRequest> {
79 Self::wait(target_sender, TargetMessage::WaitForNetworkAlmostIdle)
80 }
81}
82
83impl<T> Future for TargetMessageFuture<T> {
84 type Output = Result<T>;
85
86 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
87 let mut this = self.project();
88
89 if this.message.is_some() {
90 let message = this.message.take().expect("existence checked above");
91 match this.target_sender.try_send(message) {
92 Ok(()) => {
93 cx.waker().wake_by_ref();
94 Poll::Pending
95 }
96 Err(tokio::sync::mpsc::error::TrySendError::Full(msg)) => {
97 *this.message = Some(msg);
98 cx.waker().wake_by_ref();
99 Poll::Pending
100 }
101 Err(e) => Poll::Ready(Err(e.into())),
102 }
103 } else {
104 this.rx_request.as_mut().poll(cx).map_err(Into::into)
105 }
106 }
107}