use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use viewpoint_cdp::protocol::fetch::{
ContinueRequestParams, FulfillRequestParams, HeaderEntry, RequestPausedEvent,
};
use super::route::Route;
use crate::error::NetworkError;
/// Builder that continues an intercepted request with optional overrides and
/// captures the resulting response.
///
/// Created via `FetchBuilder::new`; configured through the chained setter
/// methods and executed with `send` (or by `.await`ing the builder directly).
#[derive(Debug)]
pub struct FetchBuilder<'a> {
/// Route whose paused request is continued; also supplies the connection,
/// request id and session id used by `send`.
pub(super) route: &'a Route,
/// Replacement URL for the continued request, if any.
pub(super) url: Option<String>,
/// Replacement HTTP method for the continued request, if any.
pub(super) method: Option<String>,
/// Headers to send with the continued request; when empty, no header
/// override is sent at all.
pub(super) headers: Vec<HeaderEntry>,
/// Raw request body override; base64-encoded by `send` before transmission.
pub(super) post_data: Option<Vec<u8>>,
/// Maximum time `send` waits for the response-stage pause event.
pub(super) timeout: Duration,
}
impl<'a> FetchBuilder<'a> {
    /// Creates a builder bound to `route` with no overrides and a 30-second
    /// timeout for the response-stage event.
    pub(super) fn new(route: &'a Route) -> Self {
        Self {
            route,
            url: None,
            method: None,
            headers: Vec::new(),
            post_data: None,
            timeout: Duration::from_secs(30),
        }
    }

    /// Overrides the URL used when the request is continued.
    #[must_use]
    pub fn url(mut self, url: impl Into<String>) -> Self {
        self.url = Some(url.into());
        self
    }

    /// Overrides the HTTP method used when the request is continued.
    #[must_use]
    pub fn method(mut self, method: impl Into<String>) -> Self {
        self.method = Some(method.into());
        self
    }

    /// Appends a single header to the override list.
    ///
    /// Headers accumulate across calls; an existing entry with the same name
    /// is not replaced.
    #[must_use]
    pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.push(HeaderEntry {
            name: name.into(),
            value: value.into(),
        });
        self
    }

    /// Appends every `(name, value)` pair from `headers` to the override list.
    ///
    /// Like [`header`](Self::header), this adds to the headers already set
    /// instead of replacing them.
    #[must_use]
    pub fn headers(
        mut self,
        headers: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        self.headers
            .extend(headers.into_iter().map(|(name, value)| HeaderEntry {
                name: name.into(),
                value: value.into(),
            }));
        self
    }

    /// Overrides the request body; the bytes are base64-encoded when sent.
    #[must_use]
    pub fn post_data(mut self, data: impl Into<Vec<u8>>) -> Self {
        self.post_data = Some(data.into());
        self
    }

    /// Sets how long [`send`](Self::send) waits for the response-stage event
    /// (30 seconds unless changed).
    #[must_use]
    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Continues the paused request with the configured overrides and waits
    /// for its response.
    ///
    /// Issues `Fetch.continueRequest` with `intercept_response` enabled, then
    /// waits for a `Fetch.requestPaused` event in the same session that
    /// carries the same request id and is at the response stage. The status,
    /// headers and body of that response are returned in a
    /// [`FetchedResponse`]. A missing status code is reported as `200`.
    ///
    /// The timeout only bounds the wait for the event; sending the command
    /// and fetching the body are not covered by it.
    ///
    /// # Errors
    ///
    /// - The converted connection error if `Fetch.continueRequest` fails.
    /// - [`NetworkError::Timeout`] if no matching event arrives in time.
    /// - [`NetworkError::Aborted`] if receiving events fails before a
    ///   matching event is seen.
    /// - [`NetworkError::InvalidResponse`] if the reported status code does
    ///   not fit in a `u16`.
    /// - Any error returned while retrieving the response body.
    pub async fn send(self) -> Result<FetchedResponse<'a>, NetworkError> {
        use base64::Engine;

        // The subscription is created before the command is sent; presumably
        // so that a response-stage event emitted right away is not missed.
        let mut events = self.route.connection().subscribe_events();
        let request_id = self.route.request_id().to_string();
        let session_id = self.route.session_id().to_string();

        let post_data = self
            .post_data
            .map(|d| base64::engine::general_purpose::STANDARD.encode(&d));
        let params = ContinueRequestParams {
            request_id: request_id.clone(),
            url: self.url,
            method: self.method,
            post_data,
            // An empty list is sent as "no override" rather than "no headers".
            headers: if self.headers.is_empty() {
                None
            } else {
                Some(self.headers)
            },
            intercept_response: Some(true),
        };
        self.route
            .connection()
            .send_command::<_, serde_json::Value>(
                "Fetch.continueRequest",
                Some(params),
                Some(&session_id),
            )
            .await
            .map_err(NetworkError::from)?;

        let timeout = self.timeout;
        let response_event = tokio::time::timeout(timeout, async {
            // NOTE(review): any `Err` from `recv` ends the wait with `Aborted`.
            // If this receiver is a broadcast channel, a lagged receiver would
            // also land here even though the stream is still alive — confirm
            // against the connection's event channel type.
            while let Ok(event) = events.recv().await {
                if event.session_id.as_deref() != Some(&session_id) {
                    continue;
                }
                if event.method == "Fetch.requestPaused" {
                    if let Some(params) = &event.params {
                        // Events that fail to deserialize are skipped.
                        if let Ok(paused) =
                            serde_json::from_value::<RequestPausedEvent>(params.clone())
                        {
                            if paused.request_id == request_id && paused.is_response_stage() {
                                return Ok(paused);
                            }
                        }
                    }
                }
            }
            Err(NetworkError::Aborted)
        })
        .await
        .map_err(|_| NetworkError::Timeout(timeout))??;

        // Checked narrowing: an out-of-range code is reported instead of being
        // silently truncated by an `as` cast.
        let status = match response_event.response_status_code {
            Some(code) => u16::try_from(code).map_err(|_| {
                NetworkError::InvalidResponse(format!("Invalid response status code: {code}"))
            })?,
            None => 200,
        };

        // Collected into a map, so repeated header names keep only one value.
        let headers: HashMap<String, String> = response_event
            .response_headers
            .as_ref()
            .map(|h| {
                h.iter()
                    .map(|e| (e.name.clone(), e.value.clone()))
                    .collect()
            })
            .unwrap_or_default();

        let body = self
            .route
            .get_response_body(&response_event.request_id)
            .await?;

        Ok(FetchedResponse {
            route: self.route,
            request_id: response_event.request_id,
            status,
            headers,
            body,
        })
    }
}
/// Allows a [`FetchBuilder`] to be `.await`ed directly; awaiting it is the
/// same as calling [`FetchBuilder::send`].
impl<'a> std::future::IntoFuture for FetchBuilder<'a> {
    type Output = Result<FetchedResponse<'a>, NetworkError>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;

    /// Boxes and pins the future produced by `send`.
    fn into_future(self) -> Self::IntoFuture {
        let pending = Self::send(self);
        Box::pin(pending)
    }
}
/// Response captured for an intercepted request that is paused at the
/// response stage.
///
/// Produced by `FetchBuilder::send`; the response can be inspected and then
/// delivered with `fulfill`.
#[derive(Debug)]
pub struct FetchedResponse<'a> {
/// Route the response belongs to; supplies the connection and session id
/// used by `fulfill`.
route: &'a Route,
/// Request id taken from the response-stage pause event.
request_id: String,
/// HTTP status code of the response (`200` when the event carried none).
pub status: u16,
/// Response headers keyed by name. Because this is a map, a header name
/// that appeared more than once keeps only a single value.
pub headers: HashMap<String, String>,
/// Response body bytes, or `None` when no body was available.
pub(super) body: Option<Vec<u8>>,
}
impl FetchedResponse<'_> {
    /// Borrows the captured body without copying it.
    ///
    /// Shared by the public accessors so they all report a missing body with
    /// the same error.
    fn body_bytes(&self) -> Result<&[u8], NetworkError> {
        self.body
            .as_deref()
            .ok_or_else(|| NetworkError::InvalidResponse("Response body not available".to_string()))
    }

    /// Returns a copy of the response body bytes.
    ///
    /// # Errors
    ///
    /// [`NetworkError::InvalidResponse`] if no body is available.
    pub fn body(&self) -> Result<Vec<u8>, NetworkError> {
        self.body_bytes().map(|bytes| bytes.to_vec())
    }

    /// Returns the response body decoded as UTF-8 text.
    ///
    /// # Errors
    ///
    /// [`NetworkError::InvalidResponse`] if no body is available or the body
    /// is not valid UTF-8.
    pub fn text(&self) -> Result<String, NetworkError> {
        // Validate on the borrowed bytes first so the body is only copied
        // once it is known to be valid UTF-8.
        std::str::from_utf8(self.body_bytes()?)
            .map(str::to_owned)
            .map_err(|e| NetworkError::InvalidResponse(format!("Response is not valid UTF-8: {e}")))
    }

    /// Parses the response body as JSON into `T`.
    ///
    /// # Errors
    ///
    /// [`NetworkError::InvalidResponse`] if no body is available, the body is
    /// not valid UTF-8, or the text cannot be deserialized into `T`.
    pub fn json<T: serde::de::DeserializeOwned>(&self) -> Result<T, NetworkError> {
        // Parse straight from the borrowed body; no intermediate copy needed.
        let text = std::str::from_utf8(self.body_bytes()?).map_err(|e| {
            NetworkError::InvalidResponse(format!("Response is not valid UTF-8: {e}"))
        })?;
        serde_json::from_str(text)
            .map_err(|e| NetworkError::InvalidResponse(format!("Failed to parse JSON: {e}")))
    }

    /// Delivers this response for the paused request via
    /// `Fetch.fulfillRequest`.
    ///
    /// Sends the stored status, headers and body (base64-encoded). An empty
    /// header map is sent as "no headers specified". Header order follows the
    /// map's iteration order and is therefore unspecified.
    ///
    /// # Errors
    ///
    /// The converted connection error if the command fails.
    pub async fn fulfill(self) -> Result<(), NetworkError> {
        use base64::Engine;

        // `self` is consumed, so move the fields out instead of cloning them.
        let Self {
            route,
            request_id,
            status,
            headers,
            body,
        } = self;

        let response_headers: Vec<HeaderEntry> = headers
            .into_iter()
            .map(|(name, value)| HeaderEntry { name, value })
            .collect();
        let body = body.map(|b| base64::engine::general_purpose::STANDARD.encode(&b));

        let params = FulfillRequestParams {
            request_id,
            response_code: i32::from(status),
            response_headers: if response_headers.is_empty() {
                None
            } else {
                Some(response_headers)
            },
            binary_response_headers: None,
            body,
            response_phrase: None,
        };
        route
            .connection()
            .send_command::<_, serde_json::Value>(
                "Fetch.fulfillRequest",
                Some(params),
                Some(route.session_id()),
            )
            .await
            .map_err(NetworkError::from)?;
        Ok(())
    }
}