1use std::collections::VecDeque;
2use std::iter::FromIterator;
3use std::time::{Duration, Instant};
4
5use futures::channel::oneshot::Sender as OneshotSender;
6use futures::task::Poll;
7use serde::Serialize;
8
9use chromiumoxide_cdp::cdp::browser_protocol::page::NavigateParams;
10use chromiumoxide_cdp::cdp::browser_protocol::target::SessionId;
11use chromiumoxide_types::{Command, CommandResponse, Method, MethodId, Request, Response};
12
13use crate::error::{CdpError, DeadlineExceeded, Result};
14use crate::handler::REQUEST_TIMEOUT;
15
16pub(crate) fn to_command_response<T: Command>(
18 resp: Response,
19 method: MethodId,
20) -> Result<CommandResponse<T::Response>> {
21 if let Some(res) = resp.result {
22 let result = serde_json::from_value(res)?;
23 Ok(CommandResponse {
24 id: resp.id,
25 result,
26 method,
27 })
28 } else if let Some(err) = resp.error {
29 Err(err.into())
30 } else {
31 Err(CdpError::NoResponse)
32 }
33}
34
/// A command bundled with the one-shot sender that travels alongside it.
///
/// When serialized, only `method`, `sessionId` (if set) and `params` are
/// emitted; the `sender` is never part of the serialized form.
#[derive(Debug, Serialize)]
pub struct CommandMessage<T = Result<Response>> {
    /// Identifier of the method this command invokes.
    pub method: MethodId,
    /// Session the command is addressed to. Serialized as `sessionId` and
    /// omitted entirely when `None`.
    #[serde(rename = "sessionId", skip_serializing_if = "Option::is_none")]
    pub session_id: Option<SessionId>,
    /// The command's parameters as a JSON value.
    pub params: serde_json::Value,
    /// One-shot sender carried with the command (presumably used to report the
    /// command's outcome back to the issuer — confirm against the handler).
    /// Skipped during serialization.
    #[serde(skip_serializing)]
    pub sender: OneshotSender<T>,
}
46
47impl<T> CommandMessage<T> {
48 pub fn new<C: Command>(cmd: C, sender: OneshotSender<T>) -> serde_json::Result<Self> {
49 Ok(Self {
50 method: cmd.identifier(),
51 session_id: None,
52 params: serde_json::to_value(cmd)?,
53 sender,
54 })
55 }
56
57 pub fn is_navigation(&self) -> bool {
59 self.method.as_ref() == NavigateParams::IDENTIFIER
60 }
61
62 pub fn with_session<C: Command>(
63 cmd: C,
64 sender: OneshotSender<T>,
65 session_id: Option<SessionId>,
66 ) -> serde_json::Result<Self> {
67 Ok(Self {
68 method: cmd.identifier(),
69 session_id,
70 params: serde_json::to_value(cmd)?,
71 sender,
72 })
73 }
74
75 pub fn split(self) -> (Request, OneshotSender<T>) {
76 (
77 Request {
78 method: self.method,
79 session_id: self.session_id.map(Into::into),
80 params: self.params,
81 },
82 self.sender,
83 )
84 }
85}
86
87impl Method for CommandMessage {
88 fn identifier(&self) -> MethodId {
89 self.method.clone()
90 }
91}
92
/// A queue of commands that are handed out one at a time, each with a
/// deadline.
#[derive(Debug, PartialEq)]
pub struct CommandChain {
    /// Commands (method identifier and JSON parameters) that `poll` has not
    /// handed out yet, in the order they will be handed out.
    cmds: VecDeque<(MethodId, serde_json::Value)>,
    /// The command most recently handed out by `poll` together with its
    /// deadline; `None` when no command is outstanding.
    waiting: Option<(MethodId, Instant)>,
    /// Added to the `now` passed to `poll` to compute the deadline of each
    /// command it hands out.
    timeout: Duration,
}
102
/// Outcome of [`CommandChain::poll`]:
///
/// * `Poll::Pending` — a command is outstanding and its deadline has not
///   passed;
/// * `Poll::Ready(Some(Ok(..)))` — the next command (method identifier and
///   parameters) handed out by the chain;
/// * `Poll::Ready(Some(Err(..)))` — the outstanding command's deadline has
///   passed;
/// * `Poll::Ready(None)` — nothing is outstanding and the queue is empty.
pub type NextCommand = Poll<Option<Result<(MethodId, serde_json::Value), DeadlineExceeded>>>;
104
105impl CommandChain {
106 pub fn new<I>(cmds: I, timeout: Duration) -> Self
110 where
111 I: IntoIterator<Item = (MethodId, serde_json::Value)>,
112 {
113 Self {
114 cmds: VecDeque::from_iter(cmds),
115 waiting: None,
116 timeout,
117 }
118 }
119
120 pub fn push_back(&mut self, method: MethodId, params: serde_json::Value) {
122 self.cmds.push_back((method, params))
123 }
124
125 pub fn received_response(&mut self, identifier: &str) -> bool {
128 if self.waiting.as_ref().map(|(c, _)| c.as_ref()) == Some(identifier) {
129 self.waiting.take();
130 true
131 } else {
132 false
133 }
134 }
135
136 pub fn poll(&mut self, now: Instant) -> NextCommand {
139 if let Some((cmd, deadline)) = self.waiting.as_ref() {
140 if now > *deadline {
141 tracing::error!(
142 "Command {:?} exceeded deadline by {:?}",
143 cmd,
144 now - *deadline
145 );
146 Poll::Ready(Some(Err(DeadlineExceeded::new(now, *deadline))))
147 } else {
148 Poll::Pending
149 }
150 } else if let Some((method, val)) = self.cmds.pop_front() {
151 self.waiting = Some((method.clone(), now + self.timeout));
152 Poll::Ready(Some(Ok((method, val))))
153 } else {
154 Poll::Ready(None)
155 }
156 }
157}
158
159impl Default for CommandChain {
160 fn default() -> Self {
161 Self {
162 cmds: Default::default(),
163 waiting: None,
164 timeout: Duration::from_millis(REQUEST_TIMEOUT),
165 }
166 }
167}