Skip to main content

chromiumoxide/handler/
commandfuture.rs

1use pin_project_lite::pin_project;
2use std::future::Future;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::sync::{mpsc, oneshot};
7
8use crate::cmd::{to_command_response, CommandMessage};
9use crate::error::Result;
10use crate::handler::sender::PageSender;
11use crate::handler::target::TargetMessage;
12use chromiumoxide_cdp::cdp::browser_protocol::target::SessionId;
13use chromiumoxide_types::{Command, CommandResponse, MethodId, Response};
14
15pin_project! {
16
17    pub struct CommandFuture<T, M = Result<Response>> {
18        #[pin]
19        rx_command: oneshot::Receiver<M>,
20        target_sender: PageSender,
21        #[pin]
22        delay: tokio::time::Sleep,
23
24        message: Option<TargetMessage>,
25        send_fut: Option<Pin<Box<dyn Future<Output = std::result::Result<(), mpsc::error::SendError<TargetMessage>>> + Send>>>,
26
27        method: MethodId,
28
29        _marker: PhantomData<T>
30    }
31}
32
33impl<T: Command> CommandFuture<T> {
34    /// A new command future.
35    pub fn new(
36        cmd: T,
37        target_sender: PageSender,
38        session: Option<SessionId>,
39        request_timeout: std::time::Duration,
40    ) -> Result<Self> {
41        let (tx, rx_command) = oneshot::channel::<Result<Response>>();
42        let method = cmd.identifier();
43
44        let message = Some(TargetMessage::Command(CommandMessage::with_session(
45            cmd, tx, session,
46        )?));
47
48        let delay = tokio::time::sleep(request_timeout);
49
50        Ok(Self {
51            target_sender,
52            rx_command,
53            message,
54            send_fut: None,
55            delay,
56            method,
57            _marker: PhantomData,
58        })
59    }
60}
61
62impl<T> Future for CommandFuture<T>
63where
64    T: Command,
65{
66    type Output = Result<CommandResponse<T::Response>>;
67
68    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
69        let mut this = self.project();
70
71        // Phase 1: deliver the command to the target channel.
72        if let Some(message) = this.message.take() {
73            match this.target_sender.try_send(message) {
74                Ok(()) => {
75                    // Sent — fall through to phase 2.
76                }
77                Err(mpsc::error::TrySendError::Full(msg)) => {
78                    // Channel full — park via async send with timeout enforcement.
79                    // The send_fut path (below) polls both the send and the delay,
80                    // so we fall through instead of returning Pending with a spurious wake.
81                    let sender = this.target_sender.clone();
82                    *this.send_fut = Some(Box::pin(async move { sender.send(msg).await }));
83                }
84                Err(e) => return Poll::Ready(Err(e.into())),
85            }
86        }
87
88        if let Some(fut) = this.send_fut.as_mut() {
89            match fut.as_mut().poll(cx) {
90                Poll::Ready(Ok(())) => {
91                    *this.send_fut = None;
92                    // Sent — fall through to phase 2.
93                }
94                Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
95                Poll::Pending => {
96                    // Enforce timeout while waiting for channel capacity.
97                    if this.delay.as_mut().poll(cx).is_ready() {
98                        return Poll::Ready(Err(crate::error::CdpError::Timeout));
99                    }
100                    return Poll::Pending;
101                }
102            }
103        }
104
105        // Phase 2: wait for the response on the oneshot.
106        if this.delay.as_mut().poll(cx).is_ready() {
107            Poll::Ready(Err(crate::error::CdpError::Timeout))
108        } else {
109            match this.rx_command.as_mut().poll(cx) {
110                Poll::Ready(Ok(Ok(response))) => {
111                    Poll::Ready(to_command_response::<T>(response, this.method.clone()))
112                }
113                Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e)),
114                Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
115                Poll::Pending => Poll::Pending,
116            }
117        }
118    }
119}