1use crate::store::{FileRootSerde, Key, Payload, PayloadMeta, Store, StoreError};
19use base64::Engine;
20use std::io;
21use std::time::Duration;
22use verdant_wire::{
23 InvalidateUpstreamRequest, InvalidateUpstreamResponse, LookupRequest, LookupResponse,
24 PersistRequest, PersistResponse, StatsResponse,
25};
26
/// Connection settings for a [`RemoteStore`].
///
/// `Debug` is implemented by hand rather than derived so that the bearer
/// token is never written to logs or panic messages.
#[derive(Clone)]
pub struct RemoteStoreConfig {
    /// Base URL of the cache service; trailing `/` characters are ignored when
    /// request URLs are built.
    pub base_url: String,
    /// Secret sent as `Authorization: Bearer <token>` on every request.
    pub bearer_token: String,
    /// Timeout handed to the HTTP agent when the store is constructed.
    pub timeout: Duration,
    /// Sent as `promote_to_shared` on every persist request.
    pub auto_promote_shared: bool,
    /// Sent as `allow_shared` on every lookup request.
    pub allow_shared_lookup: bool,
}

impl std::fmt::Debug for RemoteStoreConfig {
    /// Formats every field except `bearer_token`, which is replaced by a
    /// fixed placeholder so the secret cannot leak through `{:?}` output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RemoteStoreConfig")
            .field("base_url", &self.base_url)
            .field("bearer_token", &"<redacted>")
            .field("timeout", &self.timeout)
            .field("auto_promote_shared", &self.auto_promote_shared)
            .field("allow_shared_lookup", &self.allow_shared_lookup)
            .finish()
    }
}
41
42impl RemoteStoreConfig {
43 pub fn new(base_url: impl Into<String>, bearer_token: impl Into<String>) -> Self {
44 Self {
45 base_url: base_url.into(),
46 bearer_token: bearer_token.into(),
47 timeout: Duration::from_secs(30),
48 auto_promote_shared: false,
49 allow_shared_lookup: true,
50 }
51 }
52}
53
/// A [`Store`] implementation that forwards operations to a cache service
/// over HTTP.
#[derive(Debug)]
pub struct RemoteStore {
    /// Endpoint, credentials and request flags used for every call.
    cfg: RemoteStoreConfig,
    /// HTTP agent built once in `new` with the configured timeout and reused
    /// for all requests.
    agent: ureq::Agent,
}
59
60impl RemoteStore {
61 pub fn new(cfg: RemoteStoreConfig) -> Self {
62 let agent = ureq::AgentBuilder::new().timeout(cfg.timeout).build();
63 Self { cfg, agent }
64 }
65
66 fn url(&self, path: &str) -> String {
67 let base = self.cfg.base_url.trim_end_matches('/');
68 format!("{base}{path}")
69 }
70
71 fn auth_header(&self) -> String {
72 format!("Bearer {}", self.cfg.bearer_token)
73 }
74
75 fn post<R, T>(&self, path: &str, body: &R) -> Result<T, StoreError>
76 where
77 R: serde::Serialize,
78 T: serde::de::DeserializeOwned,
79 {
80 let outcome = self
81 .agent
82 .post(&self.url(path))
83 .set("Authorization", &self.auth_header())
84 .set("Content-Type", "application/json")
85 .send_json(serde_json::to_value(body).map_err(StoreError::Meta)?);
86 let resp = match outcome {
87 Ok(r) => r,
88 Err(ureq::Error::Status(_, r)) => r,
94 Err(e) => return Err(remote_err_to_store(e)),
95 };
96 let parsed: T = resp.into_json().map_err(StoreError::Io)?;
97 Ok(parsed)
98 }
99
100 pub fn get_stats(&self) -> Result<StatsResponse, StoreError> {
101 let resp = self
102 .agent
103 .get(&self.url("/v1/cache/stats"))
104 .set("Authorization", &self.auth_header())
105 .call()
106 .map_err(remote_err_to_store)?;
107 let parsed: StatsResponse = resp.into_json().map_err(StoreError::Io)?;
108 Ok(parsed)
109 }
110}
111
112fn remote_err_to_store(e: ureq::Error) -> StoreError {
113 StoreError::Io(io::Error::other(e.to_string()))
114}
115
116impl Store for RemoteStore {
117 fn persist_with_upstreams(
118 &self,
119 key: &Key,
120 bytes: &[u8],
121 tool_kind: &str,
122 file_roots: Vec<FileRootSerde>,
123 upstream_keys: Vec<String>,
124 ) -> Result<(), StoreError> {
125 let payload_b64 = base64::engine::general_purpose::STANDARD.encode(bytes);
126 let req = PersistRequest {
127 key: key.0.clone(),
128 tool_kind: tool_kind.to_string(),
129 payload: payload_b64,
130 file_roots: file_roots
131 .into_iter()
132 .map(|f| verdant_wire::FileRootSpec {
133 path: f.path,
134 expected_hash: f.expected_hash,
135 })
136 .collect(),
137 upstream_keys,
138 promote_to_shared: self.cfg.auto_promote_shared,
139 };
140 let resp: PersistResponse = self.post("/v1/cache/persist", &req)?;
141 match resp {
142 PersistResponse::Stored { .. } => Ok(()),
143 PersistResponse::Rejected { reason } => Err(StoreError::Io(io::Error::new(
144 io::ErrorKind::InvalidData,
145 format!("server rejected persist: {reason}"),
146 ))),
147 PersistResponse::QuotaExceeded {
148 bytes_used,
149 bytes_quota,
150 } => Err(StoreError::Io(io::Error::other(format!(
151 "server quota exceeded: used {bytes_used} of {bytes_quota}"
152 )))),
153 }
154 }
155
156 fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
157 let req = LookupRequest {
158 key: key.0.clone(),
159 allow_shared: self.cfg.allow_shared_lookup,
160 };
161 let resp: LookupResponse = self.post("/v1/cache/lookup", &req)?;
162 match resp {
163 LookupResponse::Hit {
164 payload,
165 tool_kind,
166 bytes,
167 file_roots,
168 upstream_keys,
169 ..
170 } => {
171 let raw = base64::engine::general_purpose::STANDARD
172 .decode(&payload)
173 .map_err(|e| {
174 StoreError::Io(io::Error::new(
175 io::ErrorKind::InvalidData,
176 format!("base64 decode: {e}"),
177 ))
178 })?;
179 let payload_hash = blake3::hash(&raw).to_hex().to_string();
180 let file_roots = file_roots
181 .into_iter()
182 .map(|f| FileRootSerde {
183 path: f.path,
184 expected_hash: f.expected_hash,
185 })
186 .collect();
187 let meta = PayloadMeta {
188 payload_hash,
189 bytes,
190 tool_kind,
191 file_roots,
192 upstream_keys,
193 };
194 Ok(Some(Payload { bytes: raw, meta }))
195 }
196 LookupResponse::Miss | LookupResponse::Invalidated => Ok(None),
197 }
198 }
199
200 fn remove(&self, key: &Key) -> Result<(), StoreError> {
201 let req = InvalidateUpstreamRequest { key: key.0.clone() };
206 let _: InvalidateUpstreamResponse = self.post("/v1/cache/invalidate-upstream", &req)?;
207 Ok(())
208 }
209
210 fn total_bytes(&self) -> Result<u64, StoreError> {
211 let s = self.get_stats()?;
212 Ok(s.user_bytes_used)
213 }
214
215 fn evict_to_cap(&self, _cap_bytes: u64) -> Result<usize, StoreError> {
216 Ok(0)
223 }
224
225 fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
226 Ok(Vec::new())
233 }
234
235 fn contains(&self, key: &Key) -> bool {
236 matches!(self.lookup(key), Ok(Some(_)))
237 }
238}