chromiumoxide/handler/
commandfuture.rs1use futures::channel::{
2 mpsc,
3 oneshot::{self, channel as oneshot_channel},
4};
5use pin_project_lite::pin_project;
6use std::future::Future;
7use std::marker::PhantomData;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11use crate::cmd::{to_command_response, CommandMessage};
12use crate::error::Result;
13use crate::handler::target::TargetMessage;
14use chromiumoxide_cdp::cdp::browser_protocol::target::SessionId;
15use chromiumoxide_types::{Command, CommandResponse, MethodId, Response};
16
17pin_project! {
18 pub struct CommandFuture<T, M = Result<Response>> {
19 #[pin]
20 rx_command: oneshot::Receiver<M>,
21 #[pin]
22 target_sender: mpsc::Sender<TargetMessage>,
23 #[pin]
27 delay: futures_timer::Delay,
28
29 message: Option<TargetMessage>,
30
31 method: MethodId,
32
33 _marker: PhantomData<T>
34 }
35}
36
37impl<T: Command> CommandFuture<T> {
38 pub fn new(
39 cmd: T,
40 target_sender: mpsc::Sender<TargetMessage>,
41 session: Option<SessionId>,
42 ) -> Result<Self> {
43 let (tx, rx_command) = oneshot_channel::<Result<Response>>();
44 let method = cmd.identifier();
45
46 let message = Some(TargetMessage::Command(CommandMessage::with_session(
47 cmd, tx, session,
48 )?));
49
50 let delay = futures_timer::Delay::new(std::time::Duration::from_millis(
51 crate::handler::REQUEST_TIMEOUT,
52 ));
53
54 Ok(Self {
55 target_sender,
56 rx_command,
57 message,
58 delay,
59 method,
60 _marker: PhantomData,
61 })
62 }
63}
64
65impl<T> Future for CommandFuture<T>
66where
67 T: Command,
68{
69 type Output = Result<CommandResponse<T::Response>>;
70
71 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72 let mut this = self.project();
73
74 if this.message.is_some() {
75 match this.target_sender.poll_ready(cx) {
76 Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
77 Poll::Ready(Ok(_)) => {
78 let message = this.message.take().expect("existence checked above");
79 this.target_sender.start_send(message)?;
80
81 cx.waker().wake_by_ref();
82 Poll::Pending
83 }
84 Poll::Pending => Poll::Pending,
85 }
86 } else if this.delay.poll(cx).is_ready() {
87 Poll::Ready(Err(crate::error::CdpError::Timeout))
88 } else {
89 match this.rx_command.as_mut().poll(cx) {
90 Poll::Ready(Ok(Ok(response))) => {
91 Poll::Ready(to_command_response::<T>(response, this.method.clone()))
92 }
93 Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e)),
94 Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
95 Poll::Pending => Poll::Pending,
96 }
97 }
98 }
99}