chromiumoxide/handler/
commandfuture.rs

1use futures::channel::{
2    mpsc,
3    oneshot::{self, channel as oneshot_channel},
4};
5use pin_project_lite::pin_project;
6use std::future::Future;
7use std::marker::PhantomData;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11use crate::cmd::{to_command_response, CommandMessage};
12use crate::error::Result;
13use crate::handler::target::TargetMessage;
14use chromiumoxide_cdp::cdp::browser_protocol::target::SessionId;
15use chromiumoxide_types::{Command, CommandResponse, MethodId, Response};
16
17pin_project! {
18
19    pub struct CommandFuture<T, M = Result<Response>> {
20        #[pin]
21        rx_command: oneshot::Receiver<M>,
22        #[pin]
23        target_sender: mpsc::Sender<TargetMessage>,
24        // We need delay to be pinned because it's a future
25        // and we need to be able to poll it
26        // it is used to timeout the command if page was closed while waiting for response
27        #[pin]
28        delay: futures_timer::Delay,
29
30        message: Option<TargetMessage>,
31
32        method: MethodId,
33
34        _marker: PhantomData<T>
35    }
36}
37
38impl<T: Command> CommandFuture<T> {
39    /// A new command future.
40    pub fn new(
41        cmd: T,
42        target_sender: mpsc::Sender<TargetMessage>,
43        session: Option<SessionId>,
44    ) -> Result<Self> {
45        let (tx, rx_command) = oneshot_channel::<Result<Response>>();
46        let method = cmd.identifier();
47
48        let message = Some(TargetMessage::Command(CommandMessage::with_session(
49            cmd, tx, session,
50        )?));
51
52        let delay = futures_timer::Delay::new(std::time::Duration::from_millis(
53            crate::handler::REQUEST_TIMEOUT,
54        ));
55
56        Ok(Self {
57            target_sender,
58            rx_command,
59            message,
60            delay,
61            method,
62            _marker: PhantomData,
63        })
64    }
65}
66
67impl<T> Future for CommandFuture<T>
68where
69    T: Command,
70{
71    type Output = Result<CommandResponse<T::Response>>;
72
73    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
74        let mut this = self.project();
75
76        if this.message.is_some() {
77            match this.target_sender.poll_ready(cx) {
78                Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
79                Poll::Ready(Ok(_)) => {
80                    let message = this.message.take().expect("existence checked above");
81                    this.target_sender.start_send(message)?;
82
83                    cx.waker().wake_by_ref();
84                    Poll::Pending
85                }
86                Poll::Pending => Poll::Pending,
87            }
88        } else if this.delay.poll(cx).is_ready() {
89            Poll::Ready(Err(crate::error::CdpError::Timeout))
90        } else {
91            match this.rx_command.as_mut().poll(cx) {
92                Poll::Ready(Ok(Ok(response))) => {
93                    Poll::Ready(to_command_response::<T>(response, this.method.clone()))
94                }
95                Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e)),
96                Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
97                Poll::Pending => Poll::Pending,
98            }
99        }
100    }
101}