Skip to main content

chromiumoxide/handler/
commandfuture.rs

1use pin_project_lite::pin_project;
2use std::future::Future;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::sync::{mpsc, oneshot};
7
8use crate::cmd::{to_command_response, CommandMessage};
9use crate::error::Result;
10use crate::handler::target::TargetMessage;
11use chromiumoxide_cdp::cdp::browser_protocol::target::SessionId;
12use chromiumoxide_types::{Command, CommandResponse, MethodId, Response};
13
14pin_project! {
15
16    pub struct CommandFuture<T, M = Result<Response>> {
17        #[pin]
18        rx_command: oneshot::Receiver<M>,
19        target_sender: mpsc::Sender<TargetMessage>,
20        #[pin]
21        delay: tokio::time::Sleep,
22
23        message: Option<TargetMessage>,
24
25        method: MethodId,
26
27        _marker: PhantomData<T>
28    }
29}
30
31impl<T: Command> CommandFuture<T> {
32    /// A new command future.
33    pub fn new(
34        cmd: T,
35        target_sender: mpsc::Sender<TargetMessage>,
36        session: Option<SessionId>,
37        request_timeout: std::time::Duration,
38    ) -> Result<Self> {
39        let (tx, rx_command) = oneshot::channel::<Result<Response>>();
40        let method = cmd.identifier();
41
42        let message = Some(TargetMessage::Command(CommandMessage::with_session(
43            cmd, tx, session,
44        )?));
45
46        let delay = tokio::time::sleep(request_timeout);
47
48        Ok(Self {
49            target_sender,
50            rx_command,
51            message,
52            delay,
53            method,
54            _marker: PhantomData,
55        })
56    }
57}
58
59impl<T> Future for CommandFuture<T>
60where
61    T: Command,
62{
63    type Output = Result<CommandResponse<T::Response>>;
64
65    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66        let mut this = self.project();
67
68        if this.message.is_some() {
69            let message = this.message.take().expect("existence checked above");
70            match this.target_sender.try_send(message) {
71                Ok(()) => {
72                    cx.waker().wake_by_ref();
73                    Poll::Pending
74                }
75                Err(tokio::sync::mpsc::error::TrySendError::Full(msg)) => {
76                    *this.message = Some(msg);
77                    cx.waker().wake_by_ref();
78                    Poll::Pending
79                }
80                Err(e) => Poll::Ready(Err(e.into())),
81            }
82        } else if this.delay.poll(cx).is_ready() {
83            Poll::Ready(Err(crate::error::CdpError::Timeout))
84        } else {
85            match this.rx_command.as_mut().poll(cx) {
86                Poll::Ready(Ok(Ok(response))) => {
87                    Poll::Ready(to_command_response::<T>(response, this.method.clone()))
88                }
89                Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e)),
90                Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
91                Poll::Pending => Poll::Pending,
92            }
93        }
94    }
95}