use base64::engine::general_purpose;
use base64::prelude::Engine as _;
use hashbrown::HashMap;
use http_cache_reqwest::CacheManager;
use http_cache_semantics::CachePolicy;
use http_global_cache::CACACHE_MANAGER;
use lazy_static::lazy_static;
use reqwest::Method;
use reqwest::StatusCode;
use url::Url;
use crate::cache::manager::site_key_for_target_url;
use crate::http::{convert_headers, HttpRequestLike, HttpResponseLike};
pub use spider_remote_cache::{
build_payload, dump_batch_to_remote as dump_batch_to_remote_cache,
dump_to_remote as dump_to_remote_cache_parts, get_client, get_endpoint,
resolve_base_url as resolve_remote_base_url, set_client, set_endpoint,
HybridCachePayload,
};
lazy_static! {
/// Process-wide in-memory session cache.
///
/// The outer key is the cache key passed to `session_cache_insert`; the inner
/// map is keyed by that call's `entry_key` and stores the cached HTTP response
/// together with its `CachePolicy`.
pub static ref LOCAL_SESSION_CACHE: dashmap::DashMap<String, HashMap<String, (http_cache_reqwest::HttpResponse, CachePolicy)>> = dashmap::DashMap::new();
/// Set of keys currently flagged through `mark_stream_pending` and not yet
/// cleared through `clear_stream_pending`.
// NOTE(review): the name suggests the keys are URLs of in-flight streamed
// responses — confirm against the callers.
pub(crate) static ref PENDING_STREAM_URLS: dashmap::DashSet<String> = dashmap::DashSet::new();
}
/// Maps the remote cache crate's HTTP version enum onto the `http_cache` one.
///
/// `H2`, `H3`, `Http09` and `Http10` map to the variant of the same name.
/// Every other value (including `Http11`) maps to `Http11`.
fn remote_version_to_http_cache(
    v: spider_remote_cache::HttpVersion,
) -> http_cache::HttpVersion {
    match v {
        spider_remote_cache::HttpVersion::H2 => http_cache::HttpVersion::H2,
        spider_remote_cache::HttpVersion::H3 => http_cache::HttpVersion::H3,
        spider_remote_cache::HttpVersion::Http09 => http_cache::HttpVersion::Http09,
        spider_remote_cache::HttpVersion::Http10 => http_cache::HttpVersion::Http10,
        // A lone wildcard already covers `Http11`; naming it inside an
        // or-pattern next to `_` adds nothing.
        _ => http_cache::HttpVersion::Http11,
    }
}
/// Sends one HTTP response to the remote cache.
///
/// Translates the response's HTTP version into the remote cache crate's enum
/// (anything other than `Http09`, `Http10`, `H2` or `H3` becomes `Http11`) and
/// forwards the response's URL, body, status and headers, together with the
/// caller-supplied key, site, method and request headers, to
/// `dump_to_remote_cache_parts`.
///
/// `dump_remote` is passed through unchanged.
pub async fn dump_to_remote_cache(
    cache_key: &str,
    cache_site: &str,
    http_response: &crate::http::HttpResponse,
    method: &str,
    http_request_headers: &std::collections::HashMap<String, String>,
    dump_remote: Option<&str>,
) {
    let remote_version = match http_response.version {
        crate::http::HttpVersion::H2 => spider_remote_cache::HttpVersion::H2,
        crate::http::HttpVersion::H3 => spider_remote_cache::HttpVersion::H3,
        crate::http::HttpVersion::Http09 => spider_remote_cache::HttpVersion::Http09,
        crate::http::HttpVersion::Http10 => spider_remote_cache::HttpVersion::Http10,
        _ => spider_remote_cache::HttpVersion::Http11,
    };

    dump_to_remote_cache_parts(
        cache_key,
        cache_site,
        http_response.url.as_str(),
        &http_response.body,
        method,
        http_response.status,
        http_request_headers,
        &http_response.headers,
        &remote_version,
        dump_remote,
    )
    .await
}
/// Downloads every cached entry the remote cache holds for a site and seeds
/// them into the local caches.
///
/// The site key is derived from `target_url`, `auth` and `namespace` via
/// `site_key_for_target_url`; the request is a `GET` to
/// `{base_url}/cache/site/{key}`, where the base URL is resolved from `remote`.
/// The response body is expected to be a JSON array of `HybridCachePayload`.
///
/// This is best-effort: transport errors, non-success statuses, JSON parse
/// errors and per-entry seeding errors are logged with `tracing::warn!` and
/// never surfaced to the caller.
pub async fn get_cache_site(
    target_url: &str,
    auth: Option<&str>,
    remote: Option<&str>,
    namespace: Option<&str>,
) {
    let base_url = spider_remote_cache::resolve_base_url(remote);
    let cache_key = site_key_for_target_url(target_url, auth, namespace);
    let endpoint = format!("{}/cache/site/{}", base_url, cache_key);

    let sent = get_client().get(&endpoint).send().await;
    let response = match sent {
        Ok(ok) if ok.status().is_success() => ok,
        Ok(rejected) => {
            tracing::warn!(
                "remote cache get: non-success status for {}: {}",
                cache_key,
                rejected.status()
            );
            return;
        }
        Err(send_err) => {
            tracing::warn!(
                "remote cache get: failed to GET {} from {}: {}",
                cache_key,
                endpoint,
                send_err
            );
            return;
        }
    };

    let entries = match response.json::<Vec<Box<HybridCachePayload>>>().await {
        Ok(list) => list,
        Err(parse_err) => {
            tracing::warn!(
                "remote cache get: failed to parse JSON for {} from {}: {}",
                cache_key,
                endpoint,
                parse_err
            );
            return;
        }
    };

    tracing::debug!(
        "remote cache get: seeding {} entries locally for website {}",
        entries.len(),
        cache_key
    );

    for entry in entries {
        // One bad entry must not stop the rest of the batch from being seeded.
        let seeded = seed_payload_into_local_cache(&cache_key, &entry, target_url).await;
        if let Err(seed_err) = seeded {
            tracing::warn!(
                "remote cache get: failed to seed resource {} for website {}: {}",
                entry.resource_key,
                cache_key,
                seed_err
            );
        }
    }
}
/// Downloads a single cached entry from the remote cache and seeds it into the
/// local caches.
///
/// The key is derived from `target_url`, `auth` and `namespace` via
/// `site_key_for_target_url`; the request is a `GET` to
/// `{base_url}/cache/resource/{key}`, where the base URL is resolved from
/// `remote`. The response body is expected to be one JSON `HybridCachePayload`.
///
/// This is best-effort: transport errors, non-success statuses, JSON parse
/// errors and seeding errors are logged with `tracing::warn!` and never
/// surfaced to the caller.
pub async fn get_cache_resource(
    target_url: &str,
    auth: Option<&str>,
    remote: Option<&str>,
    namespace: Option<&str>,
) {
    let base_url = spider_remote_cache::resolve_base_url(remote);
    let cache_key = site_key_for_target_url(target_url, auth, namespace);
    let endpoint = format!("{}/cache/resource/{}", base_url, cache_key);

    let result = get_client().get(&endpoint).send().await;
    let resp = match result {
        Ok(resp) => resp,
        Err(err) => {
            tracing::warn!(
                "remote cache get: failed to GET {} from {}: {}",
                cache_key,
                endpoint,
                err
            );
            return;
        }
    };

    if !resp.status().is_success() {
        tracing::warn!(
            "remote cache get: non-success status for {}: {}",
            cache_key,
            resp.status()
        );
        return;
    }

    let payload: Box<HybridCachePayload> = match resp.json().await {
        Ok(p) => p,
        Err(err) => {
            tracing::warn!(
                "remote cache get: failed to parse JSON for {} from {}: {}",
                cache_key,
                endpoint,
                err
            );
            return;
        }
    };

    tracing::debug!(
        "remote cache get: seeding 1 entry locally for website {}",
        cache_key
    );

    if let Err(err) = seed_payload_into_local_cache(&cache_key, &payload, target_url).await {
        tracing::warn!(
            "remote cache get: failed to seed resource {} for website {}: {}",
            payload.resource_key,
            cache_key,
            err
        );
    }
}
/// Drops every session-cache entry stored under `cache_key`.
///
/// Does nothing when the key is not present.
pub async fn clear_local_session_cache(cache_key: &str) {
    // The removed site map, if any, is not needed by the caller.
    let _ = LOCAL_SESSION_CACHE.remove(cache_key);
}
/// Number of site keys in `LOCAL_SESSION_CACHE` at which `session_cache_insert`
/// evicts a quarter of this many sites before adding a new site key.
const SESSION_CACHE_MAX_SITES: usize = 2_000;
/// Once a single site's map holds this many entries, `session_cache_insert`
/// stops adding further entries to it.
const SESSION_CACHE_MAX_PER_SITE: usize = 10_000;
/// Stores a response and its cache policy in the in-memory session cache.
///
/// * `cache_key` – outer key selecting the per-site map.
/// * `http_res` / `cache_policy` – the value to store.
/// * `entry_key` – key of the entry inside the per-site map.
///
/// Both limits are soft caps:
///
/// * When `cache_key` is new and the cache already holds
///   `SESSION_CACHE_MAX_SITES` sites, `SESSION_CACHE_MAX_SITES / 4` sites are
///   evicted first. Victims are whichever keys the map iterator yields first.
/// * When a site already holds `SESSION_CACHE_MAX_PER_SITE` entries, new entry
///   keys are silently dropped; an entry key that is already present is still
///   replaced, since that does not grow the map.
pub fn session_cache_insert(
    cache_key: &str,
    http_res: http_cache_reqwest::HttpResponse,
    cache_policy: CachePolicy,
    entry_key: &str,
) {
    use dashmap::mapref::entry::Entry;

    // Eviction has to happen BEFORE an entry guard is taken: `len()`, `iter()`
    // and `remove()` each lock shards of the map, and an `Entry` keeps its own
    // shard write-locked, so calling them while holding the entry would block
    // this thread on its own lock. Another thread may insert between this
    // check and the `entry()` call below, so the cap can be exceeded slightly.
    if !LOCAL_SESSION_CACHE.contains_key(cache_key)
        && LOCAL_SESSION_CACHE.len() >= SESSION_CACHE_MAX_SITES
    {
        // Collect first so the iterator's shard guards are released before any
        // `remove()` call tries to lock a shard for writing.
        let to_remove: Vec<String> = LOCAL_SESSION_CACHE
            .iter()
            .take(SESSION_CACHE_MAX_SITES / 4)
            .map(|r| r.key().clone())
            .collect();
        for key in to_remove {
            LOCAL_SESSION_CACHE.remove(&key);
        }
    }

    match LOCAL_SESSION_CACHE.entry(cache_key.to_string()) {
        Entry::Occupied(mut occ) => {
            let inner = occ.get_mut();
            // Overwriting an existing key keeps the map size unchanged, so it
            // is allowed even when the site is at its cap.
            if inner.len() < SESSION_CACHE_MAX_PER_SITE || inner.contains_key(entry_key) {
                inner.insert(entry_key.into(), (http_res, cache_policy));
            }
        }
        Entry::Vacant(vac) => {
            let mut m: HashMap<String, (http_cache_reqwest::HttpResponse, CachePolicy)> =
                HashMap::new();
            m.insert(entry_key.into(), (http_res, cache_policy));
            vac.insert(m);
        }
    }
}
/// Converts one remote cache payload into local cache entries.
///
/// Steps, in order:
/// 1. A payload with an empty `body_base64` is skipped and reported as success.
/// 2. The payload URL is parsed and the body is base64-decoded (standard
///    alphabet); either failure aborts with a descriptive error string.
/// 3. A `CachePolicy` is built from the payload's method, request headers,
///    status and response headers. An unparsable method falls back to `GET`
///    and an invalid status code to `417 Expectation Failed`.
/// 4. If the payload URL equals `target_url`, the response is also written to
///    `CACACHE_MANAGER` under the payload's `resource_key`; a failure there
///    aborts before the session cache is touched.
/// 5. The response is inserted into the session cache for `cache_key` under
///    the entry key `"{method}:{url}"`.
///
/// # Errors
///
/// Returns a human-readable message when the URL or body cannot be decoded or
/// when `CACACHE_MANAGER.put` fails.
async fn seed_payload_into_local_cache(
    cache_key: &str,
    payload: &HybridCachePayload,
    target_url: &str,
) -> Result<(), String> {
    // Nothing to cache without a body.
    if payload.body_base64.is_empty() {
        return Ok(());
    }

    let uri = payload
        .url
        .parse()
        .map_err(|e| format!("invalid URI for {}: {e}", payload.url))?;
    let body = general_purpose::STANDARD
        .decode(&payload.body_base64)
        .map_err(|e| format!("invalid base64 body for {}: {e}", payload.resource_key))?;

    // The request/response views exist only to derive the cache policy.
    let policy = {
        let request_view = HttpRequestLike {
            uri,
            method: Method::from_bytes(payload.method.as_bytes()).unwrap_or(Method::GET),
            headers: convert_headers(&payload.request_headers),
        };
        let response_view = HttpResponseLike {
            status: StatusCode::from_u16(payload.status)
                .unwrap_or(StatusCode::EXPECTATION_FAILED),
            headers: convert_headers(&payload.response_headers),
        };
        CachePolicy::new(&request_view, &response_view)
    };

    let url =
        Url::parse(&payload.url).map_err(|e| format!("invalid Url for {}: {e}", payload.url))?;
    let cached_response = http_cache_reqwest::HttpResponse {
        url,
        headers: http_cache::HttpHeaders::Modern(crate::http::headers_to_multi(
            &payload.response_headers,
        )),
        version: remote_version_to_http_cache(payload.http_version),
        status: payload.status,
        body,
        metadata: None,
    };
    let session_key = format!("{}:{}", payload.method, cached_response.url);

    // Only the document that was actually requested goes to the persistent
    // manager; every payload still lands in the session cache below.
    if payload.url == target_url {
        if let Err(e) = CACACHE_MANAGER
            .put(
                payload.resource_key.clone(),
                cached_response.clone(),
                policy.clone(),
            )
            .await
        {
            return Err(format!(
                "CACACHE_MANAGER.put failed for {}: {e}",
                payload.resource_key
            ));
        }
    }

    session_cache_insert(cache_key, cached_response, policy, &session_key);
    Ok(())
}
/// Looks up one entry in the in-memory session cache.
///
/// `cache_key` selects the per-site map and `target_url` is the entry key
/// inside it. Returns a clone of the stored response and policy, or `None`
/// when either the site or the entry is missing.
pub fn get_session_cache_item(
    cache_key: &str,
    target_url: &str,
) -> Option<(http_cache_reqwest::HttpResponse, CachePolicy)> {
    let site_entries = LOCAL_SESSION_CACHE.get(cache_key)?;
    site_entries.get(target_url).cloned()
}
/// Reports whether the session cache holds an entry keyed by `target_url`
/// under `cache_key`, without cloning the entry.
pub fn check_session_cache_item(cache_key: &str, target_url: &str) -> bool {
    match LOCAL_SESSION_CACHE.get(cache_key) {
        Some(site_entries) => site_entries.contains_key(target_url),
        None => false,
    }
}
/// Adds `key` to the pending-stream set; adding a key that is already present
/// leaves the set unchanged.
pub fn mark_stream_pending(key: &str) {
    let _ = PENDING_STREAM_URLS.insert(key.to_owned());
}
/// Removes `key` from the pending-stream set; a key that was never marked is
/// ignored.
pub fn clear_stream_pending(key: &str) {
    let _ = PENDING_STREAM_URLS.remove(key);
}
/// Returns `true` while `key` is present in the pending-stream set, i.e. it
/// was added by `mark_stream_pending` and not yet removed by
/// `clear_stream_pending`.
pub fn is_stream_pending(key: &str) -> bool {
PENDING_STREAM_URLS.contains(key)
}