use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::{mpsc, oneshot};
use crate::handler::target::TargetMessage;
use crate::{error::Result, ArcHttpRequest};
type SendFut = Pin<Box<dyn Future<Output = std::result::Result<(), mpsc::error::SendError<TargetMessage>>> + Send>>;
type TargetSender = mpsc::Sender<TargetMessage>;
pin_project! {
pub struct TargetMessageFuture<T> {
#[pin]
rx_request: oneshot::Receiver<T>,
target_sender: mpsc::Sender<TargetMessage>,
#[pin]
delay: tokio::time::Sleep,
message: Option<TargetMessage>,
send_fut: Option<SendFut>,
}
}
impl<T> TargetMessageFuture<T> {
pub fn new(
target_sender: TargetSender,
message: TargetMessage,
rx_request: oneshot::Receiver<T>,
request_timeout: std::time::Duration,
) -> Self {
Self {
target_sender,
rx_request,
delay: tokio::time::sleep(request_timeout),
message: Some(message),
send_fut: None,
}
}
pub(crate) fn wait(
target_sender: TargetSender,
request_timeout: std::time::Duration,
make_msg: impl FnOnce(oneshot::Sender<ArcHttpRequest>) -> TargetMessage,
) -> TargetMessageFuture<ArcHttpRequest> {
let (tx, rx_request) = oneshot::channel();
let message = make_msg(tx);
TargetMessageFuture::new(target_sender, message, rx_request, request_timeout)
}
pub fn wait_for_navigation(
target_sender: TargetSender,
request_timeout: std::time::Duration,
) -> TargetMessageFuture<ArcHttpRequest> {
Self::wait(target_sender, request_timeout, TargetMessage::WaitForNavigation)
}
pub fn wait_for_network_idle(
target_sender: TargetSender,
request_timeout: std::time::Duration,
) -> TargetMessageFuture<ArcHttpRequest> {
Self::wait(target_sender, request_timeout, TargetMessage::WaitForNetworkIdle)
}
pub fn wait_for_network_almost_idle(
target_sender: TargetSender,
request_timeout: std::time::Duration,
) -> TargetMessageFuture<ArcHttpRequest> {
Self::wait(target_sender, request_timeout, TargetMessage::WaitForNetworkAlmostIdle)
}
}
impl<T> Future for TargetMessageFuture<T> {
type Output = Result<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
if let Some(message) = this.message.take() {
match this.target_sender.try_send(message) {
Ok(()) => {
}
Err(mpsc::error::TrySendError::Full(msg)) => {
let sender = this.target_sender.clone();
*this.send_fut =
Some(Box::pin(async move { sender.send(msg).await }));
cx.waker().wake_by_ref();
return Poll::Pending;
}
Err(e) => return Poll::Ready(Err(e.into())),
}
}
if let Some(fut) = this.send_fut.as_mut() {
match fut.as_mut().poll(cx) {
Poll::Ready(Ok(())) => {
*this.send_fut = None;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
Poll::Pending => {
if this.delay.as_mut().poll(cx).is_ready() {
return Poll::Ready(Err(crate::error::CdpError::Timeout));
}
return Poll::Pending;
}
}
}
if this.delay.as_mut().poll(cx).is_ready() {
Poll::Ready(Err(crate::error::CdpError::Timeout))
} else {
this.rx_request.as_mut().poll(cx).map_err(Into::into)
}
}
}