use std::collections::VecDeque;
use std::iter::FromIterator;
use std::time::{Duration, Instant};
use futures::channel::oneshot::Sender as OneshotSender;
use futures::task::Poll;
use serde::Serialize;
use chromiumoxide_cdp::cdp::browser_protocol::page::NavigateParams;
use chromiumoxide_cdp::cdp::browser_protocol::target::SessionId;
use chromiumoxide_types::{Command, CommandResponse, Method, MethodId, Request, Response};
use crate::error::{CdpError, DeadlineExceeded, Result};
use crate::handler::REQUEST_TIMEOUT;
/// Converts a raw [`Response`] into the typed [`CommandResponse`] of command `T`.
///
/// `method` is copied verbatim into the returned response.
///
/// # Errors
///
/// * If the response carries a `result` that cannot be deserialized into
///   `T::Response`, the `serde_json` error is propagated.
/// * If there is no `result` but an `error` payload is present, that payload
///   is converted into the crate error type and returned.
/// * If neither is present, [`CdpError::NoResponse`] is returned.
pub(crate) fn to_command_response<T: Command>(
    resp: Response,
    method: MethodId,
) -> Result<CommandResponse<T::Response>> {
    // A present `result` always wins; `error` is only consulted without one.
    match (resp.result, resp.error) {
        (Some(raw), _) => {
            let result = serde_json::from_value(raw)?;
            Ok(CommandResponse {
                id: resp.id,
                result,
                method,
            })
        }
        (None, Some(failure)) => Err(failure.into()),
        (None, None) => Err(CdpError::NoResponse),
    }
}
/// A serialized command bundled with the channel used to report its outcome.
///
/// The type parameter `T` is the value sent back through `sender`; it defaults
/// to `Result<Response>`.
#[derive(Debug, Serialize)]
pub struct CommandMessage<T = Result<Response>> {
    /// Identifier of the method this command invokes.
    pub method: MethodId,
    /// Optional session the command is addressed to. Serialized under the key
    /// `sessionId` and left out of the output entirely when `None`.
    #[serde(rename = "sessionId", skip_serializing_if = "Option::is_none")]
    pub session_id: Option<SessionId>,
    /// The command's parameters, already converted to a JSON value.
    pub params: serde_json::Value,
    /// Sending half of the oneshot channel for the command's outcome. Never
    /// serialized.
    #[serde(skip_serializing)]
    pub sender: OneshotSender<T>,
}
impl<T> CommandMessage<T> {
    /// Builds a message for `cmd` that is not bound to any session.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if `cmd` cannot be converted to a JSON
    /// value.
    pub fn new<C: Command>(cmd: C, sender: OneshotSender<T>) -> serde_json::Result<Self> {
        Self::with_session(cmd, sender, None)
    }

    /// Whether this message's method is the identifier of [`NavigateParams`].
    pub fn is_navigation(&self) -> bool {
        self.method.as_ref() == NavigateParams::IDENTIFIER
    }

    /// Builds a message for `cmd`, optionally addressed to `session_id`.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if `cmd` cannot be converted to a JSON
    /// value.
    pub fn with_session<C: Command>(
        cmd: C,
        sender: OneshotSender<T>,
        session_id: Option<SessionId>,
    ) -> serde_json::Result<Self> {
        // Read the identifier first: serializing consumes `cmd`.
        let method = cmd.identifier();
        let params = serde_json::to_value(cmd)?;
        Ok(Self {
            method,
            session_id,
            params,
            sender,
        })
    }

    /// Consumes the message, returning the [`Request`] built from its method,
    /// session id and params together with the response sender.
    pub fn split(self) -> (Request, OneshotSender<T>) {
        let Self {
            method,
            session_id,
            params,
            sender,
        } = self;
        let request = Request {
            method,
            session_id: session_id.map(Into::into),
            params,
        };
        (request, sender)
    }
}
/// [`Method`] is implemented only for `CommandMessage` with its default type
/// parameter.
impl Method for CommandMessage {
    /// Returns a clone of the stored method identifier.
    fn identifier(&self) -> MethodId {
        Clone::clone(&self.method)
    }
}
/// A queue of commands that are handed out one at a time, each with a
/// deadline for its response.
#[derive(Debug, PartialEq)]
pub struct CommandChain {
    /// Commands (method identifier and JSON params) not yet handed out, in
    /// the order they will be issued.
    cmds: VecDeque<(MethodId, serde_json::Value)>,
    /// The command currently awaiting a response, together with the instant
    /// at which its deadline expires. `None` when nothing is outstanding.
    waiting: Option<(MethodId, Instant)>,
    /// How long to wait for a response after a command is handed out.
    timeout: Duration,
}
/// Outcome of [`CommandChain::poll`]:
///
/// * `Poll::Pending` - a command is outstanding and still within its deadline.
/// * `Poll::Ready(Some(Ok(..)))` - the next command (method and params) to issue.
/// * `Poll::Ready(Some(Err(..)))` - the outstanding command exceeded its deadline.
/// * `Poll::Ready(None)` - nothing is outstanding and the queue is empty.
pub type NextCommand = Poll<Option<Result<(MethodId, serde_json::Value), DeadlineExceeded>>>;
impl CommandChain {
    /// Creates a chain that issues `cmds` in iteration order, allowing
    /// `timeout` for each response.
    pub fn new<I>(cmds: I, timeout: Duration) -> Self
    where
        I: IntoIterator<Item = (MethodId, serde_json::Value)>,
    {
        let queue: VecDeque<(MethodId, serde_json::Value)> = FromIterator::from_iter(cmds);
        Self {
            cmds: queue,
            waiting: None,
            timeout,
        }
    }

    /// Appends a command to the end of the queue.
    pub fn push_back(&mut self, method: MethodId, params: serde_json::Value) {
        self.cmds.push_back((method, params))
    }

    /// Records a response for the method named `identifier`.
    ///
    /// Returns `true` and clears the outstanding command if `identifier`
    /// matches it; otherwise leaves the state untouched and returns `false`.
    pub fn received_response(&mut self, identifier: &str) -> bool {
        let matched = match &self.waiting {
            Some((pending, _)) => {
                let name: &str = pending.as_ref();
                name == identifier
            }
            None => false,
        };
        if matched {
            self.waiting = None;
        }
        matched
    }

    /// Advances the chain as of `now`.
    ///
    /// While a command is outstanding, this yields `Poll::Pending` until `now`
    /// is strictly past its deadline, after which it logs and yields a
    /// [`DeadlineExceeded`] error. The outstanding entry is not cleared in
    /// that case, so later polls keep reporting the error. With nothing
    /// outstanding, the next queued command is popped, marked as outstanding
    /// with deadline `now + timeout`, and returned; an empty queue yields
    /// `Poll::Ready(None)`.
    pub fn poll(&mut self, now: Instant) -> NextCommand {
        if let Some((cmd, deadline)) = &self.waiting {
            let deadline = *deadline;
            if now <= deadline {
                return Poll::Pending;
            }
            tracing::error!(
                "Command {:?} exceeded deadline by {:?}",
                cmd,
                now - deadline
            );
            return Poll::Ready(Some(Err(DeadlineExceeded::new(now, deadline))));
        }

        match self.cmds.pop_front() {
            Some((method, val)) => {
                let expires_at = now + self.timeout;
                self.waiting = Some((method.clone(), expires_at));
                Poll::Ready(Some(Ok((method, val))))
            }
            None => Poll::Ready(None),
        }
    }
}
impl Default for CommandChain {
fn default() -> Self {
Self {
cmds: Default::default(),
waiting: None,
timeout: Duration::from_millis(REQUEST_TIMEOUT),
}
}
}