pub use http_global_cache::CACACHE_MANAGER;
use crate::http::{convert_headers, HttpRequestLike, HttpResponse, HttpResponseLike, HttpVersion};
use crate::{
cdp::browser_protocol::{
fetch::{ContinueRequestParams, EventRequestPaused, FulfillRequestParams, HeaderEntry},
network::{EnableParams, EventResponseReceived, GetResponseBodyParams, ResourceType},
},
page::Page,
utils::is_data_resource,
};
use base64::{engine::general_purpose, Engine as _};
use chromiumoxide_cdp::cdp::browser_protocol::fetch::{RequestPattern, RequestStage};
use http_cache_reqwest::CacheManager;
use reqwest::StatusCode;
use spider_fingerprint::http;
use std::collections::HashMap;
use std::time::SystemTime;
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
lazy_static::lazy_static! {
pub(crate) static ref STREAMING_CHUNK_SIZE: usize = {
let default_streaming_chunk_size: usize = 8192 * 2;
let min_streaming_chunk_size: usize = default_streaming_chunk_size * 2 / 3;
std::env::var("STREAMING_CHUNK_SIZE")
.ok()
.and_then(|val| val.parse::<usize>().ok())
.map(|val| {
if val < min_streaming_chunk_size {
min_streaming_chunk_size
} else {
val
}
})
.unwrap_or(default_streaming_chunk_size)
};
}
pub async fn rewrite_base_tag(html: &[u8], base_url: Option<&str>) -> String {
use lol_html::{element, html_content::ContentType};
use std::sync::{
atomic::{AtomicBool, AtomicU8, Ordering},
Arc,
};
#[inline]
fn bytes_to_string(b: &[u8]) -> String {
match std::str::from_utf8(b) {
Ok(s) => s.to_owned(),
Err(_) => String::from_utf8_lossy(b).into_owned(),
}
}
if html.is_empty() {
return String::new();
}
let base_href = match base_url {
Some(s) if !s.is_empty() => s,
_ => return bytes_to_string(html),
};
const UNSET: u8 = 0;
const INSERTED: u8 = 1;
const PRESENT: u8 = 2;
let state = Arc::new(AtomicU8::new(UNSET));
let saw_head = Arc::new(AtomicBool::new(false));
let base_tag = format!(r#"<base href="{}">"#, base_href);
let head_with_base = format!(r#"<head>{}</head>"#, base_tag);
let mut buffer = Vec::with_capacity(html.len() + base_href.len() + 64);
let state_for_base = state.clone();
let state_for_head = state.clone();
let state_for_body = state.clone();
let saw_head_for_head = saw_head.clone();
let saw_head_for_body = saw_head.clone();
let settings: lol_html::Settings<'_, '_, lol_html::send::SendHandlerTypes> =
lol_html::send::Settings {
element_content_handlers: vec![
element!("base", move |el| {
if state_for_base.load(Ordering::Relaxed) == PRESENT {
el.remove();
return Ok(());
}
match el.get_attribute("href") {
Some(href)
if href.starts_with("http://") || href.starts_with("https://") =>
{
state_for_base.store(PRESENT, Ordering::Relaxed);
}
_ => el.remove(),
}
Ok(())
}),
element!("head", move |el: &mut lol_html::send::Element<'_, '_>| {
saw_head_for_head.store(true, Ordering::Relaxed);
if let Some(handlers) = el.end_tag_handlers() {
let state = state_for_head.clone();
let base_tag = base_tag.clone();
handlers.push(Box::new(move |end| {
if state
.compare_exchange(
UNSET,
INSERTED,
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
{
end.before(&base_tag, ContentType::Html);
}
Ok(())
}));
}
Ok(())
}),
element!("body", move |el: &mut lol_html::send::Element<'_, '_>| {
if !saw_head_for_body.load(Ordering::Relaxed) {
if state_for_body
.compare_exchange(UNSET, INSERTED, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
el.before(&head_with_base, ContentType::Html);
}
}
Ok(())
}),
],
..lol_html::send::Settings::new_for_handler_types()
};
let mut rewriter = lol_html::send::HtmlRewriter::new(settings, |c: &[u8]| {
buffer.extend_from_slice(c);
});
for chunk in html.chunks(*STREAMING_CHUNK_SIZE) {
if state.load(Ordering::Relaxed) == PRESENT {
return bytes_to_string(html);
}
if rewriter.write(chunk).is_err() {
return bytes_to_string(html);
}
}
if rewriter.end().is_err() {
return bytes_to_string(html);
}
auto_encoder::auto_encode_bytes(&buffer)
}
pub fn create_cache_key_raw(
uri: &str,
override_method: Option<&str>,
auth: Option<&str>,
) -> String {
let method = override_method.unwrap_or("GET");
match auth {
Some(a) => format!("{method}:{uri}:{a}"),
None => format!("{method}:{uri}"),
}
}
fn hash_key_v1(s: &str) -> String {
hex::encode(blake3::hash(s.as_bytes()).as_bytes())
}
pub fn create_site_key(target_url: &str, auth: Option<&str>, method: Option<&str>) -> String {
let normalized = url::Url::parse(target_url)
.map(|mut u| {
u.set_fragment(None);
u.to_string()
})
.unwrap_or_else(|_| target_url.to_string());
let method_part = method.unwrap_or("GET");
let raw = match auth {
Some(a) => format!("site|v1|m={method_part}|url={normalized}|auth={a}"),
None => format!("site|v1|m={method_part}|url={normalized}|auth="),
};
hash_key_v1(&raw)
}
pub async fn get_cached_url(target_url: &str, auth_opt: Option<&str>) -> Option<Vec<u8>> {
let cache_url = create_cache_key_raw(target_url, None, auth_opt.as_deref());
let result = tokio::time::timeout(std::time::Duration::from_millis(60), async {
CACACHE_MANAGER.get(&cache_url).await
})
.await;
if let Ok(cached) = result {
if let Ok(Some((http_response, cache_policy))) = cached {
if !cache_policy.is_stale(SystemTime::now()) {
return Some(http_response.body);
}
}
}
None
}
#[derive(Debug, Default, Clone)]
pub enum BasicCachePolicy {
AllowStale,
Period(SystemTime),
#[default]
Normal,
}
impl BasicCachePolicy {
#[inline]
pub fn allows_cached(&self, cache_policy: &http_cache_semantics::CachePolicy) -> bool {
match self {
BasicCachePolicy::AllowStale => true,
BasicCachePolicy::Period(now) => !cache_policy.is_stale(*now),
BasicCachePolicy::Normal => !cache_policy.is_stale(SystemTime::now()),
}
}
}
pub async fn get_cached_url_with_metadata(
target_url: &str,
auth_opt: Option<&str>,
policy: Option<&BasicCachePolicy>,
) -> Option<(Vec<u8>, HashMap<String, String>)> {
let cache_key = create_cache_key_raw(target_url, None, auth_opt.as_deref());
let result = tokio::time::timeout(std::time::Duration::from_millis(250), async {
CACACHE_MANAGER.get(&cache_key).await
})
.await;
if let Ok(cached) = result {
if let Ok(Some((http_response, stored_policy))) = cached {
let allow = match policy {
Some(BasicCachePolicy::AllowStale) => true,
Some(BasicCachePolicy::Period(now)) => !stored_policy.is_stale(*now),
_ => !stored_policy.is_stale(SystemTime::now()),
};
if allow {
let headers = crate::http::headers_from_multi(&http_response.headers);
return Some((http_response.body, headers));
}
}
}
None
}
pub async fn put_hybrid_cache(
cache_key: &str,
cache_site: &str,
http_response: HttpResponse,
method: &str,
http_request_headers: std::collections::HashMap<String, String>,
dump_remote: Option<&str>,
) {
use http_cache_reqwest::CacheManager;
use http_cache_semantics::CachePolicy;
if is_body_empty_for_cache(&http_response.body) {
return;
}
if let Ok(u) = http_response.url.as_str().parse::<http::uri::Uri>() {
let req = HttpRequestLike {
uri: u,
method: http::method::Method::from_bytes(method.as_bytes())
.unwrap_or(http::method::Method::GET),
headers: convert_headers(&http_request_headers),
};
let res = HttpResponseLike {
status: StatusCode::from_u16(http_response.status)
.unwrap_or(StatusCode::EXPECTATION_FAILED),
headers: convert_headers(&http_response.headers),
};
let policy = CachePolicy::new(&req, &res);
tracing::debug!("Storing cache {:?}", http_response.url.as_str());
let multi_headers = crate::http::headers_to_multi(&http_response.headers);
let session_key = format!("{}:{}", method, http_response.url);
let status = http_response.status;
let version = http_response.version;
let mut need_dump = false;
if dump_remote.is_some() {
let result = tokio::time::timeout(std::time::Duration::from_millis(250), async {
CACACHE_MANAGER.get(&cache_key).await
})
.await;
let already_fresh = matches!(
result,
Ok(Ok(Some((_, ref stored_policy)))) if !stored_policy.is_stale(SystemTime::now())
);
if !already_fresh {
let cached =
crate::cache::remote::check_session_cache_item(cache_site, &session_key);
need_dump = !cached;
}
}
let dump_body = if need_dump {
Some(http_response.body.clone())
} else {
None
};
let cached_response = http_cache_reqwest::HttpResponse {
url: http_response.url,
body: http_response.body,
headers: http_cache::HttpHeaders::Modern(multi_headers),
version: version.into(),
status,
metadata: None,
};
if let Some(body) = dump_body {
let job = super::dump_remote::DumpJob {
cache_key: cache_key.to_string(),
cache_site: cache_site.to_string(),
url: cached_response.url.to_string(),
method: method.to_string(),
status,
request_headers: http_request_headers,
response_headers: http_response.headers,
body,
http_version: version,
dump_remote: dump_remote.map(|s| s.to_string()),
};
if super::dump_remote::worke_inited() {
if !super::dump_remote::try_enqueue(job) {
tracing::debug!("remote dump skipped (worker not initialized or queue full)");
}
} else {
if let Err(err) = super::dump_remote::enqueue(job).await {
tracing::debug!(
"remote dump skipped (worker not initialized or queue full) - {:?}",
err
);
}
}
}
crate::cache::remote::session_cache_insert(
cache_site,
cached_response.clone(),
policy.clone(),
&session_key,
);
let _ = CACACHE_MANAGER
.put(cache_key.into(), cached_response, policy)
.await;
}
}
pub async fn spawn_response_cache_listener(
page: Page,
cache_site: String,
auth: Option<String>,
cache_strategy: Option<CacheStrategy>,
dump_remote: Option<String>,
) -> Result<JoinHandle<()>, crate::error::CdpError> {
page.execute(EnableParams::default()).await?;
let mut events = page.event_listener::<EventResponseReceived>().await?;
let handle = tokio::spawn(async move {
while let Some(ev) = events.next().await {
if let Err(err) = handle_single_response(
&page,
&cache_site,
ev,
auth.as_deref(),
cache_strategy,
dump_remote.as_deref(),
)
.await
{
tracing::debug!("failed to cache response: {err:?}");
}
}
});
Ok(handle)
}
fn headers_to_string_map(
headers: &crate::cdp::browser_protocol::network::Headers,
) -> HashMap<String, String> {
let mut out = HashMap::new();
if let Some(obj) = headers.inner().as_object() {
for (k, v) in obj {
let val = if let Some(s) = v.as_str() {
s.to_string()
} else {
v.to_string()
};
out.insert(k.clone(), val);
}
}
out
}
#[derive(Debug, Default, Clone, PartialEq, Copy)]
pub enum CacheStrategy {
#[default]
Scraping,
Screenshots,
}
pub fn allow_cache_response(
resource_type: &ResourceType,
cache_strategy: Option<&CacheStrategy>,
) -> bool {
let is_data = is_data_resource(resource_type);
let strategy = cache_strategy.copied().unwrap_or(CacheStrategy::Scraping);
let is_media_like = matches!(
resource_type,
ResourceType::Image | ResourceType::Media | ResourceType::Font
);
if strategy == CacheStrategy::Scraping {
!is_data && !is_media_like
} else {
!is_data
}
}
pub fn site_key_for_target_url(
target_url: &str,
auth: Option<&str>,
namespace: Option<&str>,
) -> String {
let normalized = match url::Url::parse(target_url) {
Ok(mut u) => {
u.set_fragment(None);
u.to_string()
}
Err(_) => target_url.to_string(),
};
let input = format!(
"v1|url={}|auth={}|ns={}",
normalized,
auth.unwrap_or(""),
namespace.unwrap_or(""),
);
hex::encode(blake3::hash(input.as_bytes()).as_bytes()) }
#[inline]
fn is_body_empty_for_cache(body: &[u8]) -> bool {
if body.is_empty() {
return true;
}
let trimmed = body.trim_ascii();
if trimmed.is_empty() {
return true;
}
if trimmed[0] != b'<' {
return false;
}
if trimmed == b"<html><head></head><body></body></html>" || trimmed == b"<html></html>" {
return true;
}
if trimmed.len() <= 2048 {
let lower: Vec<u8> = trimmed.iter().map(|c| c.to_ascii_lowercase()).collect();
if let Some(body_open) = lower.windows(5).position(|w| w == b"<body") {
if let Some(gt) = lower[body_open..].iter().position(|&c| c == b'>') {
let content_start = body_open + gt + 1;
if let Some(close) = lower[content_start..]
.windows(7)
.position(|w| w == b"</body>")
{
let content_end = content_start + close;
if trimmed[content_start..content_end]
.iter()
.all(|c| c.is_ascii_whitespace())
{
return true;
}
}
}
}
}
false
}
const DEFAULT_METHOD: &str = "GET";
async fn handle_single_response(
page: &Page,
cache_site: &str,
ev: std::sync::Arc<EventResponseReceived>,
auth: Option<&str>,
cache_strategy: Option<CacheStrategy>,
dump_remote: Option<&str>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if !ev.response.url.starts_with("http") {
return Ok(());
}
let document_resource = ev.r#type == ResourceType::Document;
let eligible_for_cache =
document_resource || allow_cache_response(&ev.r#type, cache_strategy.as_ref());
if !eligible_for_cache || ev.response.encoded_data_length == 0.0 {
return Ok(());
}
let method = DEFAULT_METHOD.to_string();
let current_url = format!("{}:{}", &method, &ev.response.url);
if crate::cache::remote::check_session_cache_item(cache_site, ¤t_url) {
return Ok(());
}
if crate::cache::remote::is_stream_pending(¤t_url) {
return Ok(());
}
let body_ret = page
.execute(GetResponseBodyParams::new(ev.request_id.clone()))
.await;
if let Ok(body_ret) = body_ret {
let body_bytes = if body_ret.base64_encoded {
general_purpose::STANDARD.decode(&body_ret.body)?
} else {
body_ret.body.clone().into_bytes()
};
if is_body_empty_for_cache(&body_bytes) {
return Ok(());
}
let resp_headers: HashMap<String, String> = headers_to_string_map(&ev.response.headers);
let req_headers: HashMap<String, String> = ev
.response
.request_headers
.as_ref()
.map(headers_to_string_map)
.unwrap_or_default();
let url = &ev.response.url;
let status = ev.response.status as u16;
let version = match ev.response.protocol.as_deref() {
Some(v) => v.into(),
_ => HttpVersion::Http11,
};
let cache_key = create_cache_key_raw(url.as_str(), Some(DEFAULT_METHOD), auth);
{
let parsed_url = match url::Url::parse(url.as_str()) {
Ok(u) => u,
Err(_) => {
match url::Url::parse("http://localhost") {
Ok(u) => u,
Err(_) => return Ok(()),
}
}
};
let uri: http::uri::Uri = url.as_str().parse().unwrap_or_default();
let req = HttpRequestLike {
uri,
method: http::method::Method::GET,
headers: convert_headers(&req_headers),
};
let res = HttpResponseLike {
status: StatusCode::from_u16(status).unwrap_or(StatusCode::EXPECTATION_FAILED),
headers: convert_headers(&resp_headers),
};
let policy = http_cache_semantics::CachePolicy::new(&req, &res);
let http_res = http_cache_reqwest::HttpResponse {
url: parsed_url,
body: body_bytes.clone(),
headers: http_cache::HttpHeaders::Modern(crate::http::headers_to_multi(
&resp_headers,
)),
version: version.into(),
status,
metadata: None,
};
crate::cache::remote::session_cache_insert(cache_site, http_res, policy, ¤t_url);
}
let job = super::dump_remote::DumpJob {
cache_key: cache_key,
cache_site: cache_site.to_string(),
url: url.to_string(),
method: method.to_string(),
status: status,
request_headers: req_headers,
response_headers: resp_headers,
body: body_bytes,
http_version: version,
dump_remote: dump_remote.map(|s| s.to_string()),
};
if super::dump_remote::worke_inited() {
if !super::dump_remote::try_enqueue(job) {
tracing::debug!("remote dump skipped (worker not initialized or queue full)");
}
} else {
if let Err(err) = super::dump_remote::enqueue(job).await {
tracing::debug!(
"remote dump skipped (worker not initialized or queue full) - {:?}",
err
);
}
}
}
Ok(())
}
pub async fn spawn_fetch_cache_interceptor(
page: Page,
auth: Option<String>,
policy: Option<BasicCachePolicy>,
cache_strategy: Option<CacheStrategy>,
) -> Result<JoinHandle<()>, crate::error::CdpError> {
page.send_command(crate::cdp::browser_protocol::fetch::EnableParams {
handle_auth_requests: Some(false),
patterns: Some(vec![
RequestPattern {
resource_type: Some(ResourceType::Document),
request_stage: Some(RequestStage::Request),
url_pattern: Some("*".into()),
},
RequestPattern {
resource_type: Some(ResourceType::Script),
request_stage: Some(RequestStage::Request),
url_pattern: Some("*".into()),
},
RequestPattern {
resource_type: Some(ResourceType::Document),
request_stage: Some(RequestStage::Response),
url_pattern: Some("*".into()),
},
RequestPattern {
resource_type: Some(ResourceType::Script),
request_stage: Some(RequestStage::Response),
url_pattern: Some("*".into()),
},
]),
})
.await?;
let mut events = page.event_listener::<EventRequestPaused>().await?;
let handle = tokio::spawn(async move {
while let Some(ev) = events.next().await {
let is_response_stage = ev.response_status_code.is_some();
if is_response_stage {
let page = page.clone();
let auth = auth.clone();
let cache_strategy = cache_strategy;
tokio::spawn(async move {
if let Err(err) = handle_fetch_response_stage(
&page,
&ev,
auth.as_deref(),
cache_strategy.as_ref(),
)
.await
{
tracing::debug!(
"cache stream interceptor error: {err:?} - {:?}",
ev.request.url
);
}
});
} else if let Err(err) = handle_fetch_paused(
&page,
&ev,
auth.as_deref(),
policy.as_ref(),
cache_strategy.as_ref(),
)
.await
{
tracing::debug!("cache interceptor error: {err:?} - {:?}", ev.request.url);
}
}
});
Ok(handle)
}
async fn handle_fetch_paused(
page: &Page,
ev: &std::sync::Arc<EventRequestPaused>,
auth: Option<&str>,
policy: Option<&BasicCachePolicy>,
cache_strategy: Option<&CacheStrategy>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let current_url = ev.request.url.as_str();
let eligible_for_cache = allow_cache_response(&ev.resource_type, cache_strategy.as_deref());
if !eligible_for_cache || !current_url.starts_with("http") {
let params = ContinueRequestParams::new(ev.request_id.clone());
page.send_command(params).await?;
return Ok(());
}
if ev.response_error_reason.is_some() {
let params = ContinueRequestParams::new(ev.request_id.clone());
page.send_command(params).await?;
return Ok(());
}
if let Some((body, metadata)) =
get_cached_url_with_metadata(current_url, auth.as_deref(), policy).await
{
tracing::debug!("Cache HIT: {}", current_url);
let mut resp_headers = Vec::<HeaderEntry>::with_capacity(metadata.len());
for (key, val) in metadata.iter() {
resp_headers.push(HeaderEntry {
name: key.into(),
value: val.into(),
});
}
let mut params = FulfillRequestParams::new(ev.request_id.clone(), 200);
params.body = Some(general_purpose::STANDARD.encode(&body).into());
params.response_headers = Some(resp_headers);
page.send_command(params).await?;
} else {
tracing::debug!("Cache MISS: {}, continuing request", current_url);
let params = ContinueRequestParams::new(ev.request_id.clone());
page.send_command(params).await?;
}
Ok(())
}
async fn handle_fetch_response_stage(
page: &Page,
ev: &std::sync::Arc<EventRequestPaused>,
auth: Option<&str>,
cache_strategy: Option<&CacheStrategy>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let current_url = ev.request.url.as_str();
if ev.response_error_reason.is_some() {
let params = ContinueRequestParams::new(ev.request_id.clone());
page.send_command(params).await?;
return Ok(());
}
let eligible_for_cache = allow_cache_response(&ev.resource_type, cache_strategy.as_deref());
if !eligible_for_cache || !current_url.starts_with("http") {
let params = ContinueRequestParams::new(ev.request_id.clone());
page.send_command(params).await?;
return Ok(());
}
let status = ev.response_status_code.unwrap_or(0);
if !(200..400).contains(&status) {
let params = ContinueRequestParams::new(ev.request_id.clone());
page.send_command(params).await?;
return Ok(());
}
let resp_headers = response_headers_from_event(ev);
let content_length = super::stream::content_length_from_headers(&resp_headers);
let threshold = super::stream::streaming_threshold_bytes();
let should_stream = match content_length {
Some(len) => len >= threshold,
None => true,
};
if !should_stream {
let params = ContinueRequestParams::new(ev.request_id.clone());
page.send_command(params).await?;
return Ok(());
}
let dedup_key = format!("{}:{}", DEFAULT_METHOD, current_url);
crate::cache::remote::mark_stream_pending(&dedup_key);
let body_bytes = match super::stream::read_response_body_as_stream(
page,
ev.request_id.clone(),
content_length,
)
.await
{
super::stream::StreamResult::Ok(body) => body,
super::stream::StreamResult::NotStarted(err) => {
tracing::debug!(
"Stream not started for {}, falling back to Fetch.getResponseBody: {err}",
current_url
);
match fetch_response_body_oneshot(page, ev, &dedup_key).await {
Some(body) => body,
None => {
crate::cache::remote::clear_stream_pending(&dedup_key);
let params = ContinueRequestParams::new(ev.request_id.clone());
let _ = page.send_command(params).await;
return Ok(());
}
}
}
super::stream::StreamResult::PartialBody { body, error } => {
tracing::debug!(
"Stream failed mid-read for {} ({} bytes recovered): {error}",
current_url,
body.len(),
);
crate::cache::remote::clear_stream_pending(&dedup_key);
let mut params = FulfillRequestParams::new(ev.request_id.clone(), status);
params.body = Some(general_purpose::STANDARD.encode(&body).into());
params.response_headers = Some(headers_to_header_entries(&resp_headers));
let _ = page.send_command(params).await;
return Ok(());
}
};
crate::cache::remote::clear_stream_pending(&dedup_key);
if is_body_empty_for_cache(&body_bytes) {
let mut params = FulfillRequestParams::new(ev.request_id.clone(), status);
params.body = Some(general_purpose::STANDARD.encode(&body_bytes).into());
params.response_headers = Some(headers_to_header_entries(&resp_headers));
page.send_command(params).await?;
return Ok(());
}
let req_headers = request_headers_from_event(ev);
let cache_key_url = current_url;
let uri: http::uri::Uri = cache_key_url.parse().unwrap_or_default();
let req = crate::http::HttpRequestLike {
uri,
method: http::method::Method::GET,
headers: crate::http::convert_headers(&req_headers),
};
let res = crate::http::HttpResponseLike {
status: StatusCode::from_u16(status as u16)
.unwrap_or(StatusCode::EXPECTATION_FAILED),
headers: crate::http::convert_headers(&resp_headers),
};
let policy = http_cache_semantics::CachePolicy::new(&req, &res);
let parsed_url = url::Url::parse(cache_key_url).unwrap_or_else(|_| {
url::Url::parse("http://localhost").expect("static URL")
});
let http_res = http_cache_reqwest::HttpResponse {
url: parsed_url,
body: body_bytes.clone(),
headers: http_cache::HttpHeaders::Modern(crate::http::headers_to_multi(
&resp_headers,
)),
version: crate::http::HttpVersion::Http11.into(),
status: status as u16,
metadata: None,
};
let cache_site = site_key_for_target_url(cache_key_url, auth, None);
crate::cache::remote::session_cache_insert(
&cache_site,
http_res,
policy,
&dedup_key,
);
let mut params = FulfillRequestParams::new(ev.request_id.clone(), status);
params.body = Some(general_purpose::STANDARD.encode(&body_bytes).into());
params.response_headers = Some(headers_to_header_entries(&resp_headers));
page.send_command(params).await?;
tracing::debug!("Stream cached: {} ({} bytes)", current_url, body_bytes.len());
Ok(())
}
async fn fetch_response_body_oneshot(
page: &Page,
ev: &std::sync::Arc<EventRequestPaused>,
dedup_key: &str,
) -> Option<Vec<u8>> {
use chromiumoxide_cdp::cdp::browser_protocol::fetch::GetResponseBodyParams;
let result = page
.execute(GetResponseBodyParams::new(ev.request_id.clone()))
.await;
match result {
Ok(ret) => {
let body_bytes = if ret.result.base64_encoded {
match general_purpose::STANDARD.decode(&ret.result.body) {
Ok(b) => b,
Err(err) => {
tracing::debug!("Fetch.getResponseBody base64 decode failed: {err}");
crate::cache::remote::clear_stream_pending(dedup_key);
return None;
}
}
} else {
ret.result.body.into_bytes()
};
Some(body_bytes)
}
Err(err) => {
tracing::debug!("Fetch.getResponseBody failed: {err}");
None
}
}
}
fn response_headers_from_event(
ev: &EventRequestPaused,
) -> HashMap<String, String> {
let mut map = HashMap::new();
if let Some(ref headers) = ev.response_headers {
for entry in headers {
map.insert(entry.name.clone(), entry.value.clone());
}
}
map
}
fn request_headers_from_event(
ev: &EventRequestPaused,
) -> HashMap<String, String> {
let mut map = HashMap::new();
if let Some(obj) = ev.request.headers.inner().as_object() {
for (k, v) in obj {
let val = if let Some(s) = v.as_str() {
s.to_string()
} else {
v.to_string()
};
map.insert(k.clone(), val);
}
}
map
}
fn headers_to_header_entries(
headers: &HashMap<String, String>,
) -> Vec<HeaderEntry> {
headers
.iter()
.map(|(k, v)| HeaderEntry {
name: k.clone(),
value: v.clone(),
})
.collect()
}