chromiumoxide/
cmd.rs

1use std::collections::VecDeque;
2use std::iter::FromIterator;
3use std::time::{Duration, Instant};
4
5use futures::channel::oneshot::Sender as OneshotSender;
6use futures::task::Poll;
7use serde::Serialize;
8
9use chromiumoxide_cdp::cdp::browser_protocol::page::NavigateParams;
10use chromiumoxide_cdp::cdp::browser_protocol::target::SessionId;
11use chromiumoxide_types::{Command, CommandResponse, Method, MethodId, Request, Response};
12
13use crate::error::{CdpError, DeadlineExceeded, Result};
14use crate::handler::REQUEST_TIMEOUT;
15
16/// Deserialize a response
17pub(crate) fn to_command_response<T: Command>(
18    resp: Response,
19    method: MethodId,
20) -> Result<CommandResponse<T::Response>> {
21    if let Some(res) = resp.result {
22        let result = serde_json::from_value(res)?;
23        Ok(CommandResponse {
24            id: resp.id,
25            result,
26            method,
27        })
28    } else if let Some(err) = resp.error {
29        Err(err.into())
30    } else {
31        Err(CdpError::NoResponse)
32    }
33}
34
35/// Messages used internally to communicate with the connection, which is
36/// executed in the the background task.
37#[derive(Debug, Serialize)]
38pub struct CommandMessage<T = Result<Response>> {
39    pub method: MethodId,
40    #[serde(rename = "sessionId", skip_serializing_if = "Option::is_none")]
41    pub session_id: Option<SessionId>,
42    pub params: serde_json::Value,
43    #[serde(skip_serializing)]
44    pub sender: OneshotSender<T>,
45}
46
47impl<T> CommandMessage<T> {
48    pub fn new<C: Command>(cmd: C, sender: OneshotSender<T>) -> serde_json::Result<Self> {
49        Ok(Self {
50            method: cmd.identifier(),
51            session_id: None,
52            params: serde_json::to_value(cmd)?,
53            sender,
54        })
55    }
56
57    /// Whether this command is a navigation
58    pub fn is_navigation(&self) -> bool {
59        self.method.as_ref() == NavigateParams::IDENTIFIER
60    }
61
62    pub fn with_session<C: Command>(
63        cmd: C,
64        sender: OneshotSender<T>,
65        session_id: Option<SessionId>,
66    ) -> serde_json::Result<Self> {
67        Ok(Self {
68            method: cmd.identifier(),
69            session_id,
70            params: serde_json::to_value(cmd)?,
71            sender,
72        })
73    }
74
75    pub fn split(self) -> (Request, OneshotSender<T>) {
76        (
77            Request {
78                method: self.method,
79                session_id: self.session_id.map(Into::into),
80                params: self.params,
81            },
82            self.sender,
83        )
84    }
85}
86
87impl Method for CommandMessage {
88    fn identifier(&self) -> MethodId {
89        self.method.clone()
90    }
91}
92
93#[derive(Debug, PartialEq)]
94pub struct CommandChain {
95    /// The commands to process: (method identifier, params)
96    cmds: VecDeque<(MethodId, serde_json::Value)>,
97    /// The last issued command we currently waiting for its completion
98    waiting: Option<(MethodId, Instant)>,
99    /// The window a response after issuing a request must arrive
100    timeout: Duration,
101}
102
103pub type NextCommand = Poll<Option<Result<(MethodId, serde_json::Value), DeadlineExceeded>>>;
104
105impl CommandChain {
106    /// Creates a new `CommandChain` from an `Iterator`.
107    ///
108    /// The order of the commands corresponds to the iterator's
109    pub fn new<I>(cmds: I, timeout: Duration) -> Self
110    where
111        I: IntoIterator<Item = (MethodId, serde_json::Value)>,
112    {
113        Self {
114            cmds: VecDeque::from_iter(cmds),
115            waiting: None,
116            timeout,
117        }
118    }
119
120    /// queue in another request
121    pub fn push_back(&mut self, method: MethodId, params: serde_json::Value) {
122        self.cmds.push_back((method, params))
123    }
124
125    /// Removes the waiting state if the identifier matches that of the last
126    /// issued command
127    pub fn received_response(&mut self, identifier: &str) -> bool {
128        if self.waiting.as_ref().map(|(c, _)| c.as_ref()) == Some(identifier) {
129            self.waiting.take();
130            true
131        } else {
132            false
133        }
134    }
135
136    /// Return the next command to process or `None` if done.
137    /// If the response timeout an error is returned instead
138    pub fn poll(&mut self, now: Instant) -> NextCommand {
139        if let Some((cmd, deadline)) = self.waiting.as_ref() {
140            if now > *deadline {
141                tracing::error!(
142                    "Command {:?} exceeded deadline by {:?}",
143                    cmd,
144                    now - *deadline
145                );
146                Poll::Ready(Some(Err(DeadlineExceeded::new(now, *deadline))))
147            } else {
148                Poll::Pending
149            }
150        } else if let Some((method, val)) = self.cmds.pop_front() {
151            self.waiting = Some((method.clone(), now + self.timeout));
152            Poll::Ready(Some(Ok((method, val))))
153        } else {
154            Poll::Ready(None)
155        }
156    }
157}
158
159impl Default for CommandChain {
160    fn default() -> Self {
161        Self {
162            cmds: Default::default(),
163            waiting: None,
164            timeout: Duration::from_millis(REQUEST_TIMEOUT),
165        }
166    }
167}