Skip to main content

verdant_runtime/
remote.rs

1//! `RemoteStore` — `Store` impl that talks to a `verdant-server`
2//! instance over HTTP. Wired so `LiveCache::new(RemoteStore::new(...))`
3//! Just Works at every existing call site (verdant-mcp,
4//! verdant-proxy), letting a binary swap a local on-disk cache for a
5//! shared multi-user cache by configuration alone.
6//!
7//! Trust model: the server revalidates file roots server-side on
8//! every read (see verdant-server's `storage.rs::revalidate_roots`),
9//! and the wire `LookupResponse::Hit` now carries the recorded
10//! `file_roots` so the client can independently revalidate against
11//! its own checkout. This catches the cross-machine drift case where
12//! Bob's tree differs from Alice's: the server's filesystem says the
13//! entry is fresh, but Bob's local copy has diverged, so client-side
14//! revalidation through `LiveCache::lookup_revalidate` must
15//! invalidate. The server's check and the client's check are both
16//! required for a defensible trust model.
17
18use crate::store::{FileRootSerde, Key, Payload, PayloadMeta, Store, StoreError};
19use base64::Engine;
20use std::io;
21use std::time::Duration;
22use verdant_wire::{
23    InvalidateUpstreamRequest, InvalidateUpstreamResponse, LookupRequest, LookupResponse,
24    PersistRequest, PersistResponse, StatsResponse,
25};
26
27#[derive(Debug, Clone)]
28pub struct RemoteStoreConfig {
29    pub base_url: String,
30    pub bearer_token: String,
31    pub timeout: Duration,
32    /// When true every persist sends `promote_to_shared: true`. The
33    /// server still gates promotion on its own root revalidation;
34    /// this just opts the client in to requesting it. Defaults to
35    /// false so a misconfigured client cannot accidentally publish
36    /// per-user bytes into `_shared`.
37    pub auto_promote_shared: bool,
38    /// Mirrors `LookupRequest::allow_shared`. Default true.
39    pub allow_shared_lookup: bool,
40}
41
42impl RemoteStoreConfig {
43    pub fn new(base_url: impl Into<String>, bearer_token: impl Into<String>) -> Self {
44        Self {
45            base_url: base_url.into(),
46            bearer_token: bearer_token.into(),
47            timeout: Duration::from_secs(30),
48            auto_promote_shared: false,
49            allow_shared_lookup: true,
50        }
51    }
52}
53
54#[derive(Debug)]
55pub struct RemoteStore {
56    cfg: RemoteStoreConfig,
57    agent: ureq::Agent,
58}
59
60impl RemoteStore {
61    pub fn new(cfg: RemoteStoreConfig) -> Self {
62        let agent = ureq::AgentBuilder::new().timeout(cfg.timeout).build();
63        Self { cfg, agent }
64    }
65
66    fn url(&self, path: &str) -> String {
67        let base = self.cfg.base_url.trim_end_matches('/');
68        format!("{base}{path}")
69    }
70
71    fn auth_header(&self) -> String {
72        format!("Bearer {}", self.cfg.bearer_token)
73    }
74
75    fn post<R, T>(&self, path: &str, body: &R) -> Result<T, StoreError>
76    where
77        R: serde::Serialize,
78        T: serde::de::DeserializeOwned,
79    {
80        let outcome = self
81            .agent
82            .post(&self.url(path))
83            .set("Authorization", &self.auth_header())
84            .set("Content-Type", "application/json")
85            .send_json(serde_json::to_value(body).map_err(StoreError::Meta)?);
86        let resp = match outcome {
87            Ok(r) => r,
88            // 4xx and 5xx are surfaced by ureq as Status errors but the
89            // server body still carries the structured wire response
90            // (e.g. PersistResponse::QuotaExceeded on 429). Parse it
91            // anyway so callers see the typed variant instead of a
92            // raw HTTP code.
93            Err(ureq::Error::Status(_, r)) => r,
94            Err(e) => return Err(remote_err_to_store(e)),
95        };
96        let parsed: T = resp.into_json().map_err(StoreError::Io)?;
97        Ok(parsed)
98    }
99
100    pub fn get_stats(&self) -> Result<StatsResponse, StoreError> {
101        let resp = self
102            .agent
103            .get(&self.url("/v1/cache/stats"))
104            .set("Authorization", &self.auth_header())
105            .call()
106            .map_err(remote_err_to_store)?;
107        let parsed: StatsResponse = resp.into_json().map_err(StoreError::Io)?;
108        Ok(parsed)
109    }
110}
111
112fn remote_err_to_store(e: ureq::Error) -> StoreError {
113    StoreError::Io(io::Error::other(e.to_string()))
114}
115
116impl Store for RemoteStore {
117    fn persist_with_upstreams(
118        &self,
119        key: &Key,
120        bytes: &[u8],
121        tool_kind: &str,
122        file_roots: Vec<FileRootSerde>,
123        upstream_keys: Vec<String>,
124    ) -> Result<(), StoreError> {
125        let payload_b64 = base64::engine::general_purpose::STANDARD.encode(bytes);
126        let req = PersistRequest {
127            key: key.0.clone(),
128            tool_kind: tool_kind.to_string(),
129            payload: payload_b64,
130            file_roots: file_roots
131                .into_iter()
132                .map(|f| verdant_wire::FileRootSpec {
133                    path: f.path,
134                    expected_hash: f.expected_hash,
135                })
136                .collect(),
137            upstream_keys,
138            promote_to_shared: self.cfg.auto_promote_shared,
139        };
140        let resp: PersistResponse = self.post("/v1/cache/persist", &req)?;
141        match resp {
142            PersistResponse::Stored { .. } => Ok(()),
143            PersistResponse::Rejected { reason } => Err(StoreError::Io(io::Error::new(
144                io::ErrorKind::InvalidData,
145                format!("server rejected persist: {reason}"),
146            ))),
147            PersistResponse::QuotaExceeded {
148                bytes_used,
149                bytes_quota,
150            } => Err(StoreError::Io(io::Error::other(format!(
151                "server quota exceeded: used {bytes_used} of {bytes_quota}"
152            )))),
153        }
154    }
155
156    fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
157        let req = LookupRequest {
158            key: key.0.clone(),
159            allow_shared: self.cfg.allow_shared_lookup,
160        };
161        let resp: LookupResponse = self.post("/v1/cache/lookup", &req)?;
162        match resp {
163            LookupResponse::Hit {
164                payload,
165                tool_kind,
166                bytes,
167                file_roots,
168                upstream_keys,
169                ..
170            } => {
171                let raw = base64::engine::general_purpose::STANDARD
172                    .decode(&payload)
173                    .map_err(|e| {
174                        StoreError::Io(io::Error::new(
175                            io::ErrorKind::InvalidData,
176                            format!("base64 decode: {e}"),
177                        ))
178                    })?;
179                let payload_hash = blake3::hash(&raw).to_hex().to_string();
180                let file_roots = file_roots
181                    .into_iter()
182                    .map(|f| FileRootSerde {
183                        path: f.path,
184                        expected_hash: f.expected_hash,
185                    })
186                    .collect();
187                let meta = PayloadMeta {
188                    payload_hash,
189                    bytes,
190                    tool_kind,
191                    file_roots,
192                    upstream_keys,
193                };
194                Ok(Some(Payload { bytes: raw, meta }))
195            }
196            LookupResponse::Miss | LookupResponse::Invalidated => Ok(None),
197        }
198    }
199
200    fn remove(&self, key: &Key) -> Result<(), StoreError> {
201        // Server has no explicit remove. The closest semantic is the
202        // upstream-invalidation hook, which drops the key plus
203        // anything that declared it as an upstream — matching the
204        // local `Store::remove` contract used by `LiveCache::mark_dirty`.
205        let req = InvalidateUpstreamRequest { key: key.0.clone() };
206        let _: InvalidateUpstreamResponse = self.post("/v1/cache/invalidate-upstream", &req)?;
207        Ok(())
208    }
209
210    fn total_bytes(&self) -> Result<u64, StoreError> {
211        let s = self.get_stats()?;
212        Ok(s.user_bytes_used)
213    }
214
215    fn evict_to_cap(&self, _cap_bytes: u64) -> Result<usize, StoreError> {
216        // Eviction is server-side policy in the multi-user model;
217        // the client can no longer make local decisions about which
218        // entries to drop because the cache is shared. Return 0 to
219        // signal "nothing dropped here" without erroring (callers
220        // that periodically call this in the local-cache code path
221        // should not start failing when swapped to RemoteStore).
222        Ok(0)
223    }
224
225    fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
226        // No bulk iteration over the wire in M4 step 7. LiveCache
227        // rehydration relies on this to rebuild the local registry,
228        // but with a remote store the per-lookup path is the source
229        // of truth and a registry rebuild is not needed: every
230        // `lookup` round-trips to the server. Returning empty is
231        // safe and correct.
232        Ok(Vec::new())
233    }
234
235    fn contains(&self, key: &Key) -> bool {
236        matches!(self.lookup(key), Ok(Some(_)))
237    }
238}