use crate::store::{FileRootSerde, Key, Payload, PayloadMeta, Store, StoreError};
use base64::Engine;
use std::io;
use std::time::Duration;
use verdant_wire::{
InvalidateUpstreamRequest, InvalidateUpstreamResponse, LookupRequest, LookupResponse,
PersistRequest, PersistResponse, StatsResponse,
};
#[derive(Debug, Clone)]
pub struct RemoteStoreConfig {
pub base_url: String,
pub bearer_token: String,
pub timeout: Duration,
pub auto_promote_shared: bool,
pub allow_shared_lookup: bool,
}
impl RemoteStoreConfig {
pub fn new(base_url: impl Into<String>, bearer_token: impl Into<String>) -> Self {
Self {
base_url: base_url.into(),
bearer_token: bearer_token.into(),
timeout: Duration::from_secs(30),
auto_promote_shared: false,
allow_shared_lookup: true,
}
}
}
#[derive(Debug)]
pub struct RemoteStore {
cfg: RemoteStoreConfig,
agent: ureq::Agent,
}
impl RemoteStore {
pub fn new(cfg: RemoteStoreConfig) -> Self {
let agent = ureq::AgentBuilder::new().timeout(cfg.timeout).build();
Self { cfg, agent }
}
fn url(&self, path: &str) -> String {
let base = self.cfg.base_url.trim_end_matches('/');
format!("{base}{path}")
}
fn auth_header(&self) -> String {
format!("Bearer {}", self.cfg.bearer_token)
}
fn post<R, T>(&self, path: &str, body: &R) -> Result<T, StoreError>
where
R: serde::Serialize,
T: serde::de::DeserializeOwned,
{
let outcome = self
.agent
.post(&self.url(path))
.set("Authorization", &self.auth_header())
.set("Content-Type", "application/json")
.send_json(serde_json::to_value(body).map_err(StoreError::Meta)?);
let resp = match outcome {
Ok(r) => r,
Err(ureq::Error::Status(_, r)) => r,
Err(e) => return Err(remote_err_to_store(e)),
};
let parsed: T = resp.into_json().map_err(StoreError::Io)?;
Ok(parsed)
}
pub fn get_stats(&self) -> Result<StatsResponse, StoreError> {
let resp = self
.agent
.get(&self.url("/v1/cache/stats"))
.set("Authorization", &self.auth_header())
.call()
.map_err(remote_err_to_store)?;
let parsed: StatsResponse = resp.into_json().map_err(StoreError::Io)?;
Ok(parsed)
}
}
fn remote_err_to_store(e: ureq::Error) -> StoreError {
StoreError::Io(io::Error::new(io::ErrorKind::Other, e.to_string()))
}
impl Store for RemoteStore {
fn persist_with_upstreams(
&self,
key: &Key,
bytes: &[u8],
tool_kind: &str,
file_roots: Vec<FileRootSerde>,
upstream_keys: Vec<String>,
) -> Result<(), StoreError> {
let payload_b64 = base64::engine::general_purpose::STANDARD.encode(bytes);
let req = PersistRequest {
key: key.0.clone(),
tool_kind: tool_kind.to_string(),
payload: payload_b64,
file_roots: file_roots
.into_iter()
.map(|f| verdant_wire::FileRootSpec {
path: f.path,
expected_hash: f.expected_hash,
})
.collect(),
upstream_keys,
promote_to_shared: self.cfg.auto_promote_shared,
};
let resp: PersistResponse = self.post("/v1/cache/persist", &req)?;
match resp {
PersistResponse::Stored { .. } => Ok(()),
PersistResponse::Rejected { reason } => Err(StoreError::Io(io::Error::new(
io::ErrorKind::InvalidData,
format!("server rejected persist: {reason}"),
))),
PersistResponse::QuotaExceeded {
bytes_used,
bytes_quota,
} => Err(StoreError::Io(io::Error::new(
io::ErrorKind::Other,
format!("server quota exceeded: used {bytes_used} of {bytes_quota}"),
))),
}
}
fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
let req = LookupRequest {
key: key.0.clone(),
allow_shared: self.cfg.allow_shared_lookup,
};
let resp: LookupResponse = self.post("/v1/cache/lookup", &req)?;
match resp {
LookupResponse::Hit {
payload,
tool_kind,
bytes,
file_roots,
..
} => {
let raw = base64::engine::general_purpose::STANDARD
.decode(&payload)
.map_err(|e| {
StoreError::Io(io::Error::new(
io::ErrorKind::InvalidData,
format!("base64 decode: {e}"),
))
})?;
let payload_hash = blake3::hash(&raw).to_hex().to_string();
let file_roots = file_roots
.into_iter()
.map(|f| FileRootSerde {
path: f.path,
expected_hash: f.expected_hash,
})
.collect();
let meta = PayloadMeta {
payload_hash,
bytes,
tool_kind,
file_roots,
upstream_keys: Vec::new(),
};
Ok(Some(Payload { bytes: raw, meta }))
}
LookupResponse::Miss | LookupResponse::Invalidated => Ok(None),
}
}
fn remove(&self, key: &Key) -> Result<(), StoreError> {
let req = InvalidateUpstreamRequest { key: key.0.clone() };
let _: InvalidateUpstreamResponse = self.post("/v1/cache/invalidate-upstream", &req)?;
Ok(())
}
fn total_bytes(&self) -> Result<u64, StoreError> {
let s = self.get_stats()?;
Ok(s.user_bytes_used)
}
fn evict_to_cap(&self, _cap_bytes: u64) -> Result<usize, StoreError> {
Ok(0)
}
fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
Ok(Vec::new())
}
fn contains(&self, key: &Key) -> bool {
matches!(self.lookup(key), Ok(Some(_)))
}
}