use base64::engine::general_purpose;
use base64::prelude::Engine as _;
use hashbrown::HashMap;
use http_cache_reqwest::CacheManager;
use http_cache_semantics::CachePolicy;
use http_global_cache::CACACHE_MANAGER;
use lazy_static::lazy_static;
use reqwest::header::HeaderValue;
use reqwest::Method;
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use url::Url;
use crate::cache::manager::site_key_for_target_url;
use crate::http::{convert_headers, HttpRequestLike, HttpResponseLike, HttpVersion};
lazy_static! {
pub static ref HYBRID_CACHE_CLIENT: Client = Client::builder()
.pool_idle_timeout(std::time::Duration::from_secs(90))
.build()
.expect("failed to build HYBRID_CACHE_CLIENT");
pub static ref HYBRID_CACHE_ENDPOINT: String = std::env::var("HYBRID_CACHE_ENDPOINT")
.unwrap_or_else(|_| "http://127.0.0.1:8080".to_string());
pub static ref LOCAL_SESSION_CACHE: dashmap::DashMap<String, HashMap<String, (http_cache_reqwest::HttpResponse, CachePolicy)>> = dashmap::DashMap::new();
pub static ref REMOTE_CACHE_DUMP_SEM: Semaphore = Semaphore::new(1000);
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct HybridCachePayload {
#[serde(default)]
website_key: Option<String>,
resource_key: String,
url: String,
method: String,
status: u16,
request_headers: std::collections::HashMap<String, String>,
response_headers: std::collections::HashMap<String, String>,
http_version: HttpVersion,
body_base64: String,
}
pub async fn dump_to_remote_cache_parts(
cache_key: &str,
cache_site: &str,
url_str: &str,
body: &[u8],
method: &str,
status: u16,
http_request_headers: &std::collections::HashMap<String, String>,
response_headers: &std::collections::HashMap<String, String>,
http_version: &HttpVersion,
dump_remote: Option<&str>,
) {
let _permit = match REMOTE_CACHE_DUMP_SEM.acquire().await {
Ok(p) => p,
Err(_) => return,
};
let website_key = url::Url::parse(url_str)
.ok()
.and_then(|u| u.host_str().map(|h| h.to_string()));
let body_base64 = general_purpose::STANDARD.encode(body);
let payload = HybridCachePayload {
website_key,
resource_key: cache_key.to_string(),
url: url_str.to_string(),
method: method.to_string(),
status,
http_version: *http_version,
request_headers: http_request_headers.clone(),
response_headers: response_headers.clone(),
body_base64,
};
let mut base_url = HYBRID_CACHE_ENDPOINT.as_str();
if let Some(remote) = dump_remote {
if remote != "true" {
base_url = remote.trim_ascii();
}
}
let endpoint = format!("{}/cache/index", &*base_url);
let result = HYBRID_CACHE_CLIENT
.post(&endpoint)
.json(&payload)
.header(
"x-cache-site",
HeaderValue::from_str(cache_site).unwrap_or(HeaderValue::from_static("")),
)
.send()
.await;
match result {
Ok(resp) => {
if !resp.status().is_success() {
tracing::warn!(
"remote cache dump: non-success status for {}: {}",
cache_key,
resp.status()
);
} else {
tracing::info!(
"remote cache dump: success status for {}: {}",
cache_key,
resp.status()
);
}
}
Err(err) => {
tracing::warn!(
"remote cache dump: failed to POST {} to {}: {}",
cache_key,
endpoint,
err
);
}
}
}
pub async fn dump_to_remote_cache(
cache_key: &str,
cache_site: &str,
http_response: &crate::http::HttpResponse,
method: &str,
http_request_headers: &std::collections::HashMap<String, String>,
dump_remote: Option<&str>,
) {
dump_to_remote_cache_parts(
cache_key,
cache_site,
http_response.url.as_str(),
&http_response.body,
method,
http_response.status,
http_request_headers,
&http_response.headers,
&http_response.version,
dump_remote,
)
.await
}
pub async fn get_cache_site(
target_url: &str,
auth: Option<&str>,
remote: Option<&str>,
namespace: Option<&str>,
) {
let mut base_url = HYBRID_CACHE_ENDPOINT.as_str();
if let Some(remote) = remote {
if remote != "true" {
base_url = remote.trim_ascii();
}
}
let cache_key = site_key_for_target_url(target_url, auth, namespace);
let endpoint = format!("{}/cache/site/{}", &*base_url, cache_key);
let result = HYBRID_CACHE_CLIENT.get(&endpoint).send().await;
let resp = match result {
Ok(resp) => resp,
Err(err) => {
tracing::warn!(
"remote cache get: failed to GET {} from {}: {}",
cache_key,
endpoint,
err
);
return;
}
};
if !resp.status().is_success() {
tracing::warn!(
"remote cache get: non-success status for {}: {}",
cache_key,
resp.status()
);
return;
}
let payloads: Vec<Box<HybridCachePayload>> = match resp.json().await {
Ok(p) => p,
Err(err) => {
tracing::warn!(
"remote cache get: failed to parse JSON for {} from {}: {}",
cache_key,
endpoint,
err
);
return;
}
};
tracing::debug!(
"remote cache get: seeding {} entries locally for website {}",
payloads.len(),
cache_key
);
for payload in payloads {
if let Err(err) = seed_payload_into_local_cache(&cache_key, &payload, &target_url).await {
tracing::warn!(
"remote cache get: failed to seed resource {} for website {}: {}",
payload.resource_key,
cache_key,
err
);
}
}
}
pub async fn get_cache_resource(
target_url: &str,
auth: Option<&str>,
remote: Option<&str>,
namespace: Option<&str>,
) {
let mut base_url = HYBRID_CACHE_ENDPOINT.as_str();
if let Some(remote) = remote {
if remote != "true" {
base_url = remote.trim_ascii();
}
}
let cache_key = site_key_for_target_url(target_url, auth, namespace);
let endpoint = format!("{}/cache/resource/{}", &*base_url, cache_key);
let result = HYBRID_CACHE_CLIENT.get(&endpoint).send().await;
let resp = match result {
Ok(resp) => resp,
Err(err) => {
tracing::warn!(
"remote cache get: failed to GET {} from {}: {}",
cache_key,
endpoint,
err
);
return;
}
};
if !resp.status().is_success() {
tracing::warn!(
"remote cache get: non-success status for {}: {}",
cache_key,
resp.status()
);
return;
}
let payload: Box<HybridCachePayload> = match resp.json().await {
Ok(p) => p,
Err(err) => {
tracing::warn!(
"remote cache get: failed to parse JSON for {} from {}: {}",
cache_key,
endpoint,
err
);
return;
}
};
tracing::debug!(
"remote cache get: seeding 1 entrie locally for website {}",
cache_key
);
if let Err(err) = seed_payload_into_local_cache(&cache_key, &payload, &target_url).await {
tracing::warn!(
"remote cache get: failed to seed resource {} for website {}: {}",
payload.resource_key,
cache_key,
err
);
}
}
pub async fn clear_local_session_cache(cache_key: &str) {
LOCAL_SESSION_CACHE.remove(cache_key);
}
const SESSION_CACHE_MAX_SITES: usize = 2_000;
const SESSION_CACHE_MAX_PER_SITE: usize = 10_000;
pub fn session_cache_insert(
cache_key: &str,
http_res: http_cache_reqwest::HttpResponse,
cache_policy: CachePolicy,
entry_key: &str,
) {
use dashmap::mapref::entry::Entry;
match LOCAL_SESSION_CACHE.entry(cache_key.to_string()) {
Entry::Occupied(mut occ) => {
let inner = occ.get_mut();
if inner.len() < SESSION_CACHE_MAX_PER_SITE {
inner.insert(entry_key.into(), (http_res, cache_policy));
}
}
Entry::Vacant(vac) => {
if LOCAL_SESSION_CACHE.len() >= SESSION_CACHE_MAX_SITES {
let to_remove: Vec<String> = LOCAL_SESSION_CACHE
.iter()
.take(SESSION_CACHE_MAX_SITES / 4)
.map(|r| r.key().clone())
.collect();
for key in to_remove {
LOCAL_SESSION_CACHE.remove(&key);
}
}
let mut m: HashMap<String, (http_cache_reqwest::HttpResponse, CachePolicy)> =
HashMap::new();
m.insert(entry_key.into(), (http_res, cache_policy));
vac.insert(m);
}
}
}
async fn seed_payload_into_local_cache(
cache_key: &str,
payload: &HybridCachePayload,
target_url: &str,
) -> Result<(), String> {
if payload.body_base64.is_empty() {
return Ok(());
}
let same_document = payload.url == target_url;
let uri = payload
.url
.parse()
.map_err(|e| format!("invalid URI for {}: {e}", payload.url))?;
let body = general_purpose::STANDARD
.decode(&payload.body_base64)
.map_err(|e| format!("invalid base64 body for {}: {e}", payload.resource_key))?;
let req = HttpRequestLike {
uri,
method: Method::from_bytes(payload.method.as_bytes()).unwrap_or(Method::GET),
headers: convert_headers(&payload.request_headers),
};
let res = HttpResponseLike {
status: StatusCode::from_u16(payload.status).unwrap_or(StatusCode::EXPECTATION_FAILED),
headers: convert_headers(&payload.response_headers),
};
let policy = CachePolicy::new(&req, &res);
let url =
Url::parse(&payload.url).map_err(|e| format!("invalid Url for {}: {e}", payload.url))?;
let http_res = http_cache_reqwest::HttpResponse {
url,
headers: http_cache::HttpHeaders::Modern(crate::http::headers_to_multi(
&payload.response_headers,
)),
version: payload.http_version.into(),
status: payload.status,
body,
metadata: None,
};
let key = payload.resource_key.clone();
let session_key = format!("{}:{}", payload.method, http_res.url);
if same_document {
let put_result = CACACHE_MANAGER
.put(key.clone(), http_res.clone(), policy.clone())
.await;
if let Err(e) = put_result {
return Err(format!("CACACHE_MANAGER.put failed for {}: {e}", key));
}
}
session_cache_insert(cache_key, http_res, policy, &session_key);
Ok(())
}
pub fn get_session_cache_item(
cache_key: &str,
target_url: &str,
) -> Option<(http_cache_reqwest::HttpResponse, CachePolicy)> {
LOCAL_SESSION_CACHE
.get(cache_key)
.and_then(|local_cache| local_cache.get(target_url).cloned())
}
pub fn check_session_cache_item(cache_key: &str, target_url: &str) -> bool {
LOCAL_SESSION_CACHE
.get(cache_key)
.map_or(false, |local_cache| local_cache.contains_key(target_url))
}