use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use viewpoint_cdp::CdpConnection;
use viewpoint_cdp::protocol::fetch::{
ContinueRequestParams, ErrorReason, FailRequestParams, FulfillRequestParams,
GetResponseBodyParams, GetResponseBodyResult, HeaderEntry,
};
use super::request::Request;
use super::route_builders::{ContinueBuilder, FulfillBuilder};
use super::route_fetch::{FetchBuilder, FetchedResponse};
use super::types::AbortError;
use crate::error::NetworkError;
/// Outcome reported for a route handler invocation.
///
/// NOTE(review): nothing in this file constructs or matches on this enum, so the
/// variant descriptions below are inferred from the names only — confirm against
/// the code that dispatches handlers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouteAction {
/// Presumably: the handler resolved the request itself.
Handled,
/// Presumably: the handler declined and the request should pass on.
Fallback,
}
/// Type-erased route handler callback.
///
/// A boxed `Fn` that takes a [`Route`] by value and returns a pinned, boxed, `Send`
/// future resolving to `Result<(), NetworkError>`. The closure itself must be
/// `Send + Sync`.
pub type RouteHandler = Box<
dyn Fn(Route) -> Pin<Box<dyn Future<Output = Result<(), NetworkError>> + Send>> + Send + Sync,
>;
/// A paused network request together with everything needed to resolve it over CDP.
///
/// The type is `Clone`; because `handled` lives behind an `Arc`, every clone shares
/// the same handled flag.
#[derive(Debug, Clone)]
pub struct Route {
// The request this route wraps; exposed through `request()`.
request: Request,
// Identifier sent as `request_id` in the `Fetch.*` command parameters.
request_id: String,
// Connection used to send the `Fetch.*` commands.
connection: Arc<CdpConnection>,
// Session identifier passed along with every command sent on `connection`.
session_id: String,
// Set to `true` by abort / fulfill / continue so a second attempt is rejected with
// `NetworkError::AlreadyHandled`. Shared between clones via the `Arc`.
handled: Arc<Mutex<bool>>,
// Response status, if any; `Some` is what `is_response_stage()` reports as `true`.
response_status: Option<u16>,
// Response headers, if any; exposed through `response_headers()`.
response_headers: Option<Vec<HeaderEntry>>,
}
impl Route {
    /// Creates a route for a paused request.
    ///
    /// The handled flag starts out `false`. `response_status` and `response_headers`
    /// are stored as given; a `Some` status is what [`Route::is_response_stage`]
    /// reports as `true`.
    pub(crate) fn new(
        request: Request,
        request_id: String,
        connection: Arc<CdpConnection>,
        session_id: String,
        response_status: Option<u16>,
        response_headers: Option<Vec<HeaderEntry>>,
    ) -> Self {
        Self {
            request,
            request_id,
            connection,
            session_id,
            handled: Arc::new(Mutex::new(false)),
            response_status,
            response_headers,
        }
    }

    /// Returns the request wrapped by this route.
    pub fn request(&self) -> &Request {
        &self.request
    }

    /// Returns the identifier used as `request_id` in `Fetch.*` commands.
    pub(super) fn request_id(&self) -> &str {
        &self.request_id
    }

    /// Returns the connection used to send commands for this route.
    pub(super) fn connection(&self) -> &Arc<CdpConnection> {
        &self.connection
    }

    /// Returns the session identifier passed along with every command.
    pub(super) fn session_id(&self) -> &str {
        &self.session_id
    }

    /// Returns the current value of the handled flag.
    ///
    /// The flag is set by [`Route::abort_with`], `send_fulfill` and `send_continue`.
    pub async fn is_handled(&self) -> bool {
        *self.handled.lock().await
    }

    /// Returns `true` when a response status is present on this route.
    pub fn is_response_stage(&self) -> bool {
        self.response_status.is_some()
    }

    /// Returns the stored response status, if any.
    pub fn response_status(&self) -> Option<u16> {
        self.response_status
    }

    /// Returns the stored response headers as a slice, if any.
    pub fn response_headers(&self) -> Option<&[HeaderEntry]> {
        self.response_headers.as_deref()
    }

    /// Fetches the response body for this route.
    ///
    /// Returns `Ok(None)` without contacting the browser when
    /// [`Route::is_response_stage`] is `false`; otherwise delegates to
    /// `get_response_body` with this route's request id.
    ///
    /// # Errors
    ///
    /// Returns the converted connection error if `Fetch.getResponseBody` fails.
    pub async fn response_body(&self) -> Result<Option<Vec<u8>>, NetworkError> {
        if !self.is_response_stage() {
            return Ok(None);
        }
        self.get_response_body(&self.request_id).await
    }

    /// Returns a [`FulfillBuilder`] bound to this route.
    pub fn fulfill(&self) -> FulfillBuilder<'_> {
        FulfillBuilder::new(self)
    }

    /// Returns a [`ContinueBuilder`] bound to this route.
    pub fn continue_(&self) -> ContinueBuilder<'_> {
        ContinueBuilder::new(self)
    }

    /// Aborts the request with [`AbortError::Failed`].
    ///
    /// # Errors
    ///
    /// Same as [`Route::abort_with`].
    pub async fn abort(&self) -> Result<(), NetworkError> {
        self.abort_with(AbortError::Failed).await
    }

    /// Aborts the request by sending `Fetch.failRequest` with the given reason.
    ///
    /// # Errors
    ///
    /// Returns [`NetworkError::AlreadyHandled`] if the handled flag was already set,
    /// or the converted connection error if the command fails. The flag is set before
    /// the command is sent, so it remains set even when sending fails.
    pub async fn abort_with(&self, error: AbortError) -> Result<(), NetworkError> {
        self.try_mark_handled().await?;

        let params = FailRequestParams {
            request_id: self.request_id.clone(),
            error_reason: Self::error_reason(error),
        };
        self.connection
            .send_command::<_, serde_json::Value>(
                "Fetch.failRequest",
                Some(params),
                Some(&self.session_id),
            )
            .await
            .map_err(NetworkError::from)?;
        Ok(())
    }

    /// Sends `Fetch.continueRequest` with no overrides.
    ///
    /// Unlike the abort / fulfill / continue paths, this neither checks nor sets the
    /// handled flag.
    ///
    /// NOTE(review): leaving the flag untouched looks intentional (so a later handler
    /// can still resolve the route) — confirm against the dispatching code.
    ///
    /// # Errors
    ///
    /// Returns the converted connection error if the command fails.
    pub async fn fallback(&self) -> Result<(), NetworkError> {
        let params = ContinueRequestParams {
            request_id: self.request_id.clone(),
            url: None,
            method: None,
            post_data: None,
            headers: None,
            intercept_response: None,
        };
        self.connection
            .send_command::<_, serde_json::Value>(
                "Fetch.continueRequest",
                Some(params),
                Some(&self.session_id),
            )
            .await
            .map_err(NetworkError::from)?;
        Ok(())
    }

    /// Returns a [`FetchBuilder`] bound to this route.
    pub fn fetch(&self) -> FetchBuilder<'_> {
        FetchBuilder::new(self)
    }

    /// Shorthand for `self.fetch().timeout(timeout).send().await`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the builder's `send` returns.
    pub async fn fetch_with_timeout(
        &self,
        timeout: Duration,
    ) -> Result<FetchedResponse<'_>, NetworkError> {
        self.fetch().timeout(timeout).send().await
    }

    /// Marks the route handled and sends `Fetch.fulfillRequest` with `params`.
    ///
    /// # Errors
    ///
    /// Returns [`NetworkError::AlreadyHandled`] if the handled flag was already set,
    /// or the converted connection error if the command fails.
    pub(super) async fn send_fulfill(
        &self,
        params: FulfillRequestParams,
    ) -> Result<(), NetworkError> {
        self.try_mark_handled().await?;

        self.connection
            .send_command::<_, serde_json::Value>(
                "Fetch.fulfillRequest",
                Some(params),
                Some(&self.session_id),
            )
            .await
            .map_err(NetworkError::from)?;
        Ok(())
    }

    /// Marks the route handled and sends `Fetch.continueRequest` with `params`.
    ///
    /// # Errors
    ///
    /// Returns [`NetworkError::AlreadyHandled`] if the handled flag was already set,
    /// or the converted connection error if the command fails.
    pub(super) async fn send_continue(
        &self,
        params: ContinueRequestParams,
    ) -> Result<(), NetworkError> {
        self.try_mark_handled().await?;

        self.connection
            .send_command::<_, serde_json::Value>(
                "Fetch.continueRequest",
                Some(params),
                Some(&self.session_id),
            )
            .await
            .map_err(NetworkError::from)?;
        Ok(())
    }

    /// Sends `Fetch.getResponseBody` for `request_id` and returns the body bytes.
    ///
    /// A body flagged as base64-encoded is decoded with the standard alphabet; any
    /// other body is returned as the bytes of the received string.
    ///
    /// Returns `Ok(None)` when the body is flagged as base64 but fails to decode; the
    /// decode error itself is discarded.
    ///
    /// # Errors
    ///
    /// Returns the converted connection error if the command fails.
    pub(super) async fn get_response_body(
        &self,
        request_id: &str,
    ) -> Result<Option<Vec<u8>>, NetworkError> {
        use base64::Engine;

        let result: GetResponseBodyResult = self
            .connection
            .send_command(
                "Fetch.getResponseBody",
                Some(GetResponseBodyParams {
                    request_id: request_id.to_string(),
                }),
                Some(&self.session_id),
            )
            .await
            .map_err(NetworkError::from)?;

        let body = if result.base64_encoded {
            // Decode failures deliberately collapse to `None` rather than an error.
            base64::engine::general_purpose::STANDARD
                .decode(&result.body)
                .ok()
        } else {
            Some(result.body.into_bytes())
        };
        Ok(body)
    }

    /// Sets the handled flag, failing if it was already set.
    ///
    /// The check and the write happen under a single lock acquisition, and the lock
    /// is released when this function returns, i.e. before the caller sends its
    /// command.
    async fn try_mark_handled(&self) -> Result<(), NetworkError> {
        let mut handled = self.handled.lock().await;
        if *handled {
            return Err(NetworkError::AlreadyHandled);
        }
        *handled = true;
        Ok(())
    }

    /// Maps an [`AbortError`] to the protocol-level [`ErrorReason`] of the same name.
    ///
    /// The match is exhaustive on purpose so that adding an `AbortError` variant is a
    /// compile error here.
    fn error_reason(error: AbortError) -> ErrorReason {
        match error {
            AbortError::Aborted => ErrorReason::Aborted,
            AbortError::AccessDenied => ErrorReason::AccessDenied,
            AbortError::AddressUnreachable => ErrorReason::AddressUnreachable,
            AbortError::BlockedByClient => ErrorReason::BlockedByClient,
            AbortError::BlockedByResponse => ErrorReason::BlockedByResponse,
            AbortError::ConnectionAborted => ErrorReason::ConnectionAborted,
            AbortError::ConnectionClosed => ErrorReason::ConnectionClosed,
            AbortError::ConnectionFailed => ErrorReason::ConnectionFailed,
            AbortError::ConnectionRefused => ErrorReason::ConnectionRefused,
            AbortError::ConnectionReset => ErrorReason::ConnectionReset,
            AbortError::InternetDisconnected => ErrorReason::InternetDisconnected,
            AbortError::NameNotResolved => ErrorReason::NameNotResolved,
            AbortError::TimedOut => ErrorReason::TimedOut,
            AbortError::Failed => ErrorReason::Failed,
        }
    }
}
#[cfg(test)]
mod tests;