use crate::common::model::Response;
use crate::common::response_cache::ResponseCacheRemoteClient;
use crate::engine::api::profile_store::ProfileControlPlaneStore;
use async_trait::async_trait;
use log::warn;
use reqwest::Client;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
/// Window, in seconds, handed to `ProfileControlPlaneStore::list_active_nodes` when this
/// module checks whether a same-namespace owner node is still active.
/// NOTE(review): presumably the maximum heartbeat age the store accepts — confirm against
/// `list_active_nodes`.
const ACTIVE_NODE_HEARTBEAT_TTL_SECS: u64 = 30;
/// Per-request timeout, in seconds, applied to each remote cache lookup HTTP call.
const REMOTE_CACHE_LOOKUP_TIMEOUT_SECS: u64 = 3;
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` (the default `Duration`). The
/// `u128` millisecond count is converted with a checked conversion and saturates at
/// `i64::MAX` instead of silently wrapping the way an `as` cast would.
fn response_cache_current_time_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
/// JSON body this client expects from a remote cache lookup endpoint.
///
/// Only the `response` field is deserialized.
#[derive(Debug, serde::Deserialize)]
struct RemoteCachedResponsePayload {
/// The cached response returned by the remote endpoint.
response: Response,
}
/// HTTP-backed `ResponseCacheRemoteClient` that looks up the owner of a cache key in the
/// profile store and fetches the cached response from that owner over HTTP.
#[derive(Clone)]
pub struct HttpResponseCacheRemoteClient {
/// Namespace this client runs in; compared with an owner record's namespace to decide
/// between the same-namespace and foreign-namespace lookup paths.
owner_namespace: String,
/// Id of the local node, if known; an owner record naming this same node is treated as a
/// local cache miss rather than fetched remotely.
owner_node_id: Option<String>,
/// Store used to read owner records, list active nodes, and remove stale owner records.
profile_store: Arc<ProfileControlPlaneStore>,
/// Value sent in the `x-api-key` header of lookup requests; lookups are skipped when `None`.
api_key: Option<String>,
/// Base URLs keyed by foreign namespace name, consulted when the owner is in another
/// namespace. This is the only public field of the struct.
pub federation_endpoints: HashMap<String, String>,
/// Client used to issue the lookup requests.
http_client: Client,
}
/// Construction parameters for `HttpResponseCacheRemoteClient::new`; each field is moved
/// into the client field of the same name.
#[derive(Clone)]
pub struct HttpResponseCacheRemoteClientConfig {
/// Namespace the client runs in.
pub owner_namespace: String,
/// Id of the local node, if known.
pub owner_node_id: Option<String>,
/// Store holding owner records and node information.
pub profile_store: Arc<ProfileControlPlaneStore>,
/// Value for the `x-api-key` request header; `None` disables remote lookups.
pub api_key: Option<String>,
/// Base URLs keyed by foreign namespace name.
pub federation_endpoints: HashMap<String, String>,
/// Client used to issue the lookup requests.
pub http_client: Client,
}
impl HttpResponseCacheRemoteClient {
pub fn new(config: HttpResponseCacheRemoteClientConfig) -> Self {
Self {
owner_namespace: config.owner_namespace,
owner_node_id: config.owner_node_id,
profile_store: config.profile_store,
api_key: config.api_key,
federation_endpoints: config.federation_endpoints,
http_client: config.http_client,
}
}
}
#[async_trait]
impl ResponseCacheRemoteClient for HttpResponseCacheRemoteClient {
    /// Tries to fetch the response cached under `cache_key` from whichever node or
    /// namespace the profile store records as its owner.
    ///
    /// Returns `None` when:
    /// - no owner record exists;
    /// - the record has expired (the record is cleared);
    /// - no API key is configured;
    /// - the owner is in this namespace but has no node id, is this very node (record
    ///   cleared), or is not among the active nodes (record cleared);
    /// - no endpoint could be resolved;
    /// - every endpoint failed, answered non-success, could not be decoded, or returned a
    ///   response that does not match `cache_key`.
    ///
    /// A `404` from an endpoint flagged as authoritative also clears the owner record.
    ///
    /// NOTE(review): the key is encoded with form-urlencoding (`byte_serialize`), which
    /// writes a space as `+`, yet it is placed in a URL path segment — confirm the
    /// receiving route decodes it the same way.
    /// NOTE(review): the API key is sent to every resolved endpoint, including a base URL
    /// taken from the owner record — confirm that source is trusted.
    async fn fetch_remote_cached_response(&self, cache_key: &str) -> Option<Response> {
        let owner = self.profile_store.get_response_cache_owner(cache_key)?;
        let same_namespace = owner.owner_namespace == self.owner_namespace;

        // An expired record is dropped without contacting anyone.
        let record_expired = matches!(
            owner.expires_at,
            Some(expires_at) if expires_at <= response_cache_current_time_ms()
        );
        if record_expired {
            self.clear_stale_owner(
                cache_key,
                &owner.owner_namespace,
                owner.owner_node_id.as_deref(),
                "owner_record_expired",
            )
            .await;
            return None;
        }

        let Some(api_key) = self.api_key.as_deref() else {
            warn!(
                "Remote cached response lookup skipped: cache_key={} owner_namespace={} reason=missing_api_key",
                cache_key, owner.owner_namespace
            );
            return None;
        };

        if same_namespace {
            let Some(owner_node_id) = owner.owner_node_id.as_deref() else {
                warn!(
                    "Remote cached response lookup skipped: cache_key={} owner_namespace={} reason=missing_owner_node_id",
                    cache_key, owner.owner_namespace
                );
                return None;
            };

            // The record points back at this node: nothing to fetch remotely, so the
            // record is cleared and the lookup ends.
            let owned_by_this_node = self.owner_node_id.as_deref() == Some(owner_node_id);
            if owned_by_this_node {
                self.clear_stale_owner(
                    cache_key,
                    &owner.owner_namespace,
                    Some(owner_node_id),
                    "local_owner_cache_miss",
                )
                .await;
                return None;
            }

            // The owner must still appear among the active nodes.
            let owner_inactive = self
                .profile_store
                .list_active_nodes(Duration::from_secs(ACTIVE_NODE_HEARTBEAT_TTL_SECS))
                .into_iter()
                .all(|candidate| candidate.id != owner_node_id);
            if owner_inactive {
                self.clear_stale_owner(
                    cache_key,
                    &owner.owner_namespace,
                    Some(owner_node_id),
                    "same_namespace_owner_not_active",
                )
                .await;
                return None;
            }
        }

        let encoded_cache_key =
            url::form_urlencoded::byte_serialize(cache_key.as_bytes()).collect::<String>();
        let remote_endpoints = self.resolve_endpoints(&owner, &encoded_cache_key).await;
        if remote_endpoints.is_empty() {
            let reason = match same_namespace {
                true => "missing_same_namespace_endpoint",
                false => "missing_foreign_namespace_endpoint_mapping",
            };
            warn!(
                "Remote cached response lookup skipped: cache_key={} owner_namespace={} owner_node_id={:?} reason={}",
                cache_key, owner.owner_namespace, owner.owner_node_id, reason
            );
            return None;
        }

        let lookup_timeout = Duration::from_secs(REMOTE_CACHE_LOOKUP_TIMEOUT_SECS);
        for (endpoint, authoritative_not_found) in remote_endpoints {
            let sent = self
                .http_client
                .get(&endpoint)
                .header("x-api-key", api_key)
                .timeout(lookup_timeout)
                .send()
                .await;
            let response = match sent {
                Ok(response) => response,
                Err(err) => {
                    warn!(
                        "Remote cached response lookup failed: cache_key={} owner_namespace={} endpoint={} reason=request_failed error={:?}",
                        cache_key, owner.owner_namespace, endpoint, err
                    );
                    continue;
                }
            };

            let status = response.status();
            if status == reqwest::StatusCode::NOT_FOUND {
                // Only an authoritative endpoint's 404 invalidates the owner record.
                if authoritative_not_found {
                    self.clear_stale_owner(
                        cache_key,
                        &owner.owner_namespace,
                        owner.owner_node_id.as_deref(),
                        "remote_cache_not_found",
                    )
                    .await;
                }
                continue;
            }
            if !status.is_success() {
                warn!(
                    "Remote cached response lookup returned non-success: cache_key={} owner_namespace={} endpoint={} reason=http_status status={}",
                    cache_key,
                    owner.owner_namespace,
                    endpoint,
                    status
                );
                continue;
            }

            let payload = match response.json::<RemoteCachedResponsePayload>().await {
                Ok(payload) => payload,
                Err(err) => {
                    warn!(
                        "Remote cached response lookup failed: cache_key={} owner_namespace={} endpoint={} reason=decode_failed error={:?}",
                        cache_key, owner.owner_namespace, endpoint, err
                    );
                    continue;
                }
            };
            // A payload stored under a different key is ignored; the next endpoint is tried.
            if self.response_matches_key(cache_key, &payload.response) {
                return Some(payload.response);
            }
        }

        None
    }
}
impl HttpResponseCacheRemoteClient {
    /// Produces the lookup URLs to try for `owner`, each paired with a flag telling the
    /// caller whether a `404` from that URL is authoritative.
    ///
    /// * Same namespace: at most one URL, built from the active owner node's `ip` and
    ///   `api_port`, flagged authoritative. Empty when the record has no node id, the node
    ///   is not in the active list, or the node has no API port.
    /// * Foreign namespace: the owner record's own base URL (not authoritative) followed
    ///   by the `federation_endpoints` entry for that namespace (authoritative). Identical
    ///   URLs are merged and their flags OR-ed.
    ///
    /// The function is declared `async` but performs no `.await` itself.
    /// NOTE(review): `node.ip` is interpolated as-is; confirm IPv6 addresses arrive
    /// bracketed if they are possible here.
    async fn resolve_endpoints(
        &self,
        owner: &crate::engine::api::profile_store::StoredResponseCacheOwnerRecord,
        encoded_cache_key: &str,
    ) -> Vec<(String, bool)> {
        if owner.owner_namespace == self.owner_namespace {
            return self
                .same_namespace_lookup_url(owner, encoded_cache_key)
                .map(|url| (url, true))
                .into_iter()
                .collect();
        }

        let mut resolved: Vec<(String, bool)> = Vec::new();
        if let Some(base_url) = owner.owner_api_base_url.as_deref() {
            let url = Self::foreign_lookup_url(base_url, encoded_cache_key);
            Self::merge_endpoint(&mut resolved, url, false);
        }
        if let Some(base_url) = self.federation_endpoints.get(&owner.owner_namespace) {
            let url = Self::foreign_lookup_url(base_url, encoded_cache_key);
            Self::merge_endpoint(&mut resolved, url, true);
        }
        resolved
    }

    /// Builds the lookup URL on the active node named by `owner`, or `None` when the
    /// record has no node id, the node is not active, or it exposes no API port.
    fn same_namespace_lookup_url(
        &self,
        owner: &crate::engine::api::profile_store::StoredResponseCacheOwnerRecord,
        encoded_cache_key: &str,
    ) -> Option<String> {
        let owner_node_id = owner.owner_node_id.as_deref()?;
        let node = self
            .profile_store
            .list_active_nodes(Duration::from_secs(ACTIVE_NODE_HEARTBEAT_TTL_SECS))
            .into_iter()
            .find(|candidate| candidate.id == owner_node_id)?;
        let api_port = node.api_port?;
        Some(format!(
            "http://{}:{}/debug/cache/response/{}",
            node.ip, api_port, encoded_cache_key
        ))
    }

    /// Joins a base URL (trailing slashes removed) with the cache lookup path.
    fn foreign_lookup_url(base_url: &str, encoded_cache_key: &str) -> String {
        format!(
            "{}/debug/cache/response/{}",
            base_url.trim_end_matches('/'),
            encoded_cache_key
        )
    }

    /// Appends `url` unless it is already listed, in which case the existing entry's
    /// flag is OR-ed with `authoritative_not_found`.
    fn merge_endpoint(
        endpoints: &mut Vec<(String, bool)>,
        url: String,
        authoritative_not_found: bool,
    ) {
        match endpoints.iter_mut().find(|(known, _)| *known == url) {
            Some((_, flag)) => *flag |= authoritative_not_found,
            None => endpoints.push((url, authoritative_not_found)),
        }
    }

    /// Checks that `response` was stored under `cache_key`.
    ///
    /// Returns `false` (after logging a warning) when the response carries a
    /// `request_hash`, or a `RESPONSE_CACHE_LOOKUP_KEY` metadata entry, that differs from
    /// `cache_key`. A response carrying neither value is accepted.
    fn response_matches_key(&self, cache_key: &str, response: &Response) -> bool {
        use crate::common::response_cache::RESPONSE_CACHE_LOOKUP_KEY;

        match response.request_hash.as_deref() {
            Some(response_hash) if response_hash != cache_key => {
                warn!(
                    "Cached response returned mismatched request_hash: cache_key={} response_request_hash={}",
                    cache_key, response_hash
                );
                return false;
            }
            _ => {}
        }

        // The metadata entry is only consulted once the request hash has passed.
        let stored_lookup_key = response
            .metadata
            .get_trait_config::<String>(RESPONSE_CACHE_LOOKUP_KEY);
        match stored_lookup_key.as_deref() {
            Some(lookup_key) if lookup_key != cache_key => {
                warn!(
                    "Cached response returned mismatched lookup_key: cache_key={} response_lookup_key={}",
                    cache_key, lookup_key
                );
                false
            }
            _ => true,
        }
    }

    /// Asks the profile store to remove the owner record for `cache_key` if it still
    /// matches `owner_namespace` / `owner_node_id`.
    ///
    /// Best effort: a failure is logged together with `reason` and otherwise ignored.
    async fn clear_stale_owner(
        &self,
        cache_key: &str,
        owner_namespace: &str,
        owner_node_id: Option<&str>,
        reason: &str,
    ) {
        let removal = self
            .profile_store
            .remove_response_cache_owner_if_matches(cache_key, owner_namespace, owner_node_id)
            .await;
        let Err(err) = removal else {
            return;
        };
        warn!(
            "Failed to clear stale response cache owner: cache_key={} owner_namespace={} owner_node_id={:?} reason={} error={:?}",
            cache_key, owner_namespace, owner_node_id, reason, err
        );
    }
}