Skip to main content

noether_engine/
registry_client.rs

1//! HTTP client for a hosted `noether-registry` (noether-cloud).
2//!
3//! `RemoteStageStore` implements the sync [`StageStore`] trait by calling the
4//! registry REST API with `reqwest::blocking`.  Because `StageStore::get` and
5//! `list` must return borrowed references, all active stages are prefetched
6//! into a local [`MemoryStore`] at construction time.  Writes go to both the
7//! remote registry AND the local cache so the borrows remain valid.
8//!
9//! # Usage
10//!
11//! ```no_run
12//! use noether_engine::registry_client::RemoteStageStore;
13//!
14//! // Explicit URL:
15//! let store = RemoteStageStore::connect("http://localhost:3000", None).unwrap();
16//!
17//! // From environment (NOETHER_REGISTRY + optional NOETHER_API_KEY):
18//! if let Some(Ok(store)) = RemoteStageStore::from_env() {
19//!     println!("Connected to remote registry");
20//! }
21//! ```
22
23use noether_core::stage::{Stage, StageId, StageLifecycle};
24use noether_store::{MemoryStore, StageStore, StoreError, StoreStats};
25use serde_json::Value;
26use std::collections::BTreeMap;
27
28// ── ACLI response parsing ───────────────────────────────────────────────────
29
30fn extract_result(resp: reqwest::blocking::Response) -> Result<Value, StoreError> {
31    let status = resp.status();
32    let body: Value = resp.json().map_err(|e| StoreError::IoError {
33        message: format!("failed to parse registry response: {e}"),
34    })?;
35
36    if body["ok"].as_bool().unwrap_or(false) {
37        return Ok(body["result"].clone());
38    }
39
40    let code = body["error"]["code"].as_str().unwrap_or("UNKNOWN");
41    let msg = body["error"]["message"].as_str().unwrap_or("unknown error");
42
43    if status == 404 || code == "NOT_FOUND" {
44        Err(StoreError::IoError {
45            message: format!("NOT_FOUND: {msg}"),
46        })
47    } else {
48        Err(StoreError::IoError {
49            message: format!("{code}: {msg}"),
50        })
51    }
52}
53
54// ── RemoteStageStore ────────────────────────────────────────────────────────
55
/// A `StageStore` backed by a remote `noether-registry` over HTTP.
///
/// All *active* stages are prefetched into a local `MemoryStore` on
/// construction, so subsequent reads (`get`, `contains`, `list`) are answered
/// from memory without an HTTP round-trip.
pub struct RemoteStageStore {
    // Blocking HTTP client used for every request to the registry.
    client: reqwest::blocking::Client,
    // Registry root URL; `connect` stores it with trailing `/` removed so
    // request paths can be appended directly.
    base_url: String,
    // When present, sent as the `X-API-Key` header on each request.
    api_key: Option<String>,
    // Local mirror of the registry; serves the borrowed-reference reads.
    cache: MemoryStore,
}
66
67impl RemoteStageStore {
68    /// Connect to a registry at `base_url` and prefetch all active stages.
69    ///
70    /// `api_key` is sent as `X-API-Key` header; pass `None` if the registry
71    /// runs without auth (local dev with `NOETHER_API_KEY=""`).
72    pub fn connect(base_url: &str, api_key: Option<&str>) -> Result<Self, StoreError> {
73        // Send the client version as both a User-Agent (standard) and an
74        // explicit X-Noether-Client-Version header (easier for the server
75        // to surface in logs / metrics / rate-limit decisions without
76        // string-parsing UA).
77        let mut headers = reqwest::header::HeaderMap::new();
78        headers.insert(
79            "X-Noether-Client-Version",
80            reqwest::header::HeaderValue::from_static(env!("CARGO_PKG_VERSION")),
81        );
82        let client = reqwest::blocking::Client::builder()
83            .timeout(std::time::Duration::from_secs(30))
84            .user_agent(concat!("noether-cli/", env!("CARGO_PKG_VERSION")))
85            .default_headers(headers)
86            .build()
87            .map_err(|e| StoreError::IoError {
88                message: e.to_string(),
89            })?;
90
91        let mut store = Self {
92            client,
93            base_url: base_url.trim_end_matches('/').to_string(),
94            api_key: api_key.map(String::from),
95            cache: MemoryStore::new(),
96        };
97        store.refresh()?;
98        Ok(store)
99    }
100
101    /// Build a store from the `NOETHER_REGISTRY` environment variable.
102    /// Also reads `NOETHER_API_KEY` if set.
103    /// Returns `None` if `NOETHER_REGISTRY` is not set.
104    pub fn from_env() -> Option<Result<Self, StoreError>> {
105        let url = std::env::var("NOETHER_REGISTRY").ok()?;
106        let key = std::env::var("NOETHER_API_KEY").ok();
107        Some(Self::connect(&url, key.as_deref()))
108    }
109
110    /// Re-fetch all stages from the registry using offset pagination and
111    /// rebuild the local cache. Each page is 200 stages; iteration stops when
112    /// the server returns an empty page or `offset >= total`.
113    ///
114    /// Call this if you know the registry was mutated externally.
115    pub fn refresh(&mut self) -> Result<usize, StoreError> {
116        const PAGE: usize = 200;
117        let mut offset = 0usize;
118        let mut new_cache = MemoryStore::new();
119
120        loop {
121            let path = format!("/stages?lifecycle=active&limit={PAGE}&offset={offset}");
122            let resp = self
123                .get_req(&path)
124                .send()
125                .map_err(|e| StoreError::IoError {
126                    message: format!("registry unreachable at {}: {e}", self.base_url),
127                })?;
128
129            let result = extract_result(resp)?;
130            let page: Vec<Stage> =
131                serde_json::from_value(result["stages"].clone()).map_err(|e| {
132                    StoreError::IoError {
133                        message: format!("malformed /stages response: {e}"),
134                    }
135                })?;
136
137            let total = result["total"].as_u64().unwrap_or(0) as usize;
138            let fetched = page.len();
139            for stage in page {
140                new_cache.upsert(stage).ok();
141            }
142
143            offset += fetched;
144            if fetched == 0 || offset >= total {
145                break;
146            }
147        }
148
149        let count = new_cache.list(None).len();
150        self.cache = new_cache;
151        Ok(count)
152    }
153
154    /// The base URL this store is connected to.
155    pub fn base_url(&self) -> &str {
156        &self.base_url
157    }
158
159    /// Fetch a single stage directly from the registry, bypassing the cache.
160    /// On success the stage is inserted into the local cache so subsequent
161    /// `get()` calls will find it without another HTTP round-trip.
162    ///
163    /// Returns `Ok(None)` when the server returns 404.
164    pub fn get_live(&mut self, id: &StageId) -> Result<Option<&Stage>, StoreError> {
165        let resp = self
166            .get_req(&format!("/stages/{}", id.0))
167            .send()
168            .map_err(|e| StoreError::IoError {
169                message: e.to_string(),
170            })?;
171
172        match extract_result(resp) {
173            Ok(body) => {
174                let stage: Stage =
175                    serde_json::from_value(body).map_err(|e| StoreError::IoError {
176                        message: format!("malformed /stages/:id response: {e}"),
177                    })?;
178                self.cache.upsert(stage).ok();
179                self.cache.get(id)
180            }
181            Err(StoreError::IoError { message }) if message.contains("NOT_FOUND") => Ok(None),
182            Err(e) => Err(e),
183        }
184    }
185
186    // ── internal request helpers ────────────────────────────────────────────
187
188    fn get_req(&self, path: &str) -> reqwest::blocking::RequestBuilder {
189        self.with_auth(self.client.get(format!("{}{path}", self.base_url)))
190    }
191
192    fn post_req(&self, path: &str) -> reqwest::blocking::RequestBuilder {
193        self.with_auth(self.client.post(format!("{}{path}", self.base_url)))
194    }
195
196    fn patch_req(&self, path: &str) -> reqwest::blocking::RequestBuilder {
197        self.with_auth(self.client.patch(format!("{}{path}", self.base_url)))
198    }
199
200    fn delete_req(&self, path: &str) -> reqwest::blocking::RequestBuilder {
201        self.with_auth(self.client.delete(format!("{}{path}", self.base_url)))
202    }
203
204    fn with_auth(&self, b: reqwest::blocking::RequestBuilder) -> reqwest::blocking::RequestBuilder {
205        match &self.api_key {
206            Some(k) => b.header("X-API-Key", k),
207            None => b,
208        }
209    }
210}
211
212// ── StageStore impl ─────────────────────────────────────────────────────────
213
214impl StageStore for RemoteStageStore {
215    fn put(&mut self, stage: Stage) -> Result<StageId, StoreError> {
216        let resp =
217            self.post_req("/stages")
218                .json(&stage)
219                .send()
220                .map_err(|e| StoreError::IoError {
221                    message: e.to_string(),
222                })?;
223
224        let result = match extract_result(resp) {
225            Ok(r) => r,
226            Err(e) => {
227                // Registry returns VALIDATION_FAILED if it detects AlreadyExists
228                if e.to_string().contains("ALREADY_EXISTS") || e.to_string().contains("already") {
229                    self.cache.upsert(stage.clone()).ok();
230                    return Err(StoreError::AlreadyExists(stage.id));
231                }
232                return Err(e);
233            }
234        };
235
236        let id = StageId(result["id"].as_str().unwrap_or(&stage.id.0).to_string());
237        let is_new = result["is_new"].as_bool().unwrap_or(true);
238
239        // Always cache, even if not new.
240        self.cache.upsert(stage).ok();
241
242        if !is_new {
243            return Err(StoreError::AlreadyExists(id));
244        }
245        Ok(id)
246    }
247
248    fn upsert(&mut self, stage: Stage) -> Result<StageId, StoreError> {
249        let id = stage.id.clone();
250        match self.put(stage.clone()) {
251            Err(StoreError::AlreadyExists(_)) => {
252                self.cache.upsert(stage).ok();
253                Ok(id)
254            }
255            other => other,
256        }
257    }
258
259    /// Removes the stage from the remote registry (DELETE /stages/:id) and
260    /// then from the local cache.
261    fn remove(&mut self, id: &StageId) -> Result<(), StoreError> {
262        let resp = self
263            .delete_req(&format!("/stages/{}", id.0))
264            .send()
265            .map_err(|e| StoreError::IoError {
266                message: e.to_string(),
267            })?;
268
269        let status_str = extract_result(resp)
270            .err()
271            .map(|e| e.to_string())
272            .unwrap_or_default();
273        if !status_str.is_empty() && !status_str.contains("NOT_FOUND") {
274            return Err(StoreError::IoError {
275                message: status_str,
276            });
277        }
278        // Best-effort cache eviction (stage may already be absent from cache).
279        let _ = self.cache.remove(id);
280        Ok(())
281    }
282
283    /// Returns the stage from the local cache.
284    ///
285    /// For a guaranteed-fresh read use [`RemoteStageStore::get_live`] (which
286    /// takes `&mut self` so it can update the cache).
287    fn get(&self, id: &StageId) -> Result<Option<&Stage>, StoreError> {
288        self.cache.get(id)
289    }
290
291    fn contains(&self, id: &StageId) -> bool {
292        self.cache.contains(id)
293    }
294
295    fn list(&self, lifecycle: Option<&StageLifecycle>) -> Vec<&Stage> {
296        self.cache.list(lifecycle)
297    }
298
299    fn update_lifecycle(
300        &mut self,
301        id: &StageId,
302        lifecycle: StageLifecycle,
303    ) -> Result<(), StoreError> {
304        let (lc_str, successor_id) = match &lifecycle {
305            StageLifecycle::Draft => ("draft", None),
306            StageLifecycle::Active => ("active", None),
307            StageLifecycle::Deprecated { successor_id } => {
308                ("deprecated", Some(successor_id.0.clone()))
309            }
310            StageLifecycle::Tombstone => ("tombstone", None),
311        };
312
313        let mut body = serde_json::json!({ "lifecycle": lc_str });
314        if let Some(succ) = successor_id {
315            body["successor_id"] = Value::String(succ);
316        }
317
318        let resp = self
319            .patch_req(&format!("/stages/{}/lifecycle", id.0))
320            .json(&body)
321            .send()
322            .map_err(|e| StoreError::IoError {
323                message: e.to_string(),
324            })?;
325
326        extract_result(resp)?;
327
328        // Mirror in local cache so subsequent list()/get() reflect the change.
329        self.cache.update_lifecycle(id, lifecycle)
330    }
331
332    fn stats(&self) -> StoreStats {
333        // Fetch live stats from /health; fall back to cache if unreachable.
334        if let Ok(resp) = self.get_req("/health").send() {
335            if let Ok(result) = extract_result(resp) {
336                if let Some(store_json) = result.get("store") {
337                    let total = store_json["total_stages"].as_u64().unwrap_or(0) as usize;
338                    let by_lifecycle: BTreeMap<String, usize> = store_json["by_lifecycle"]
339                        .as_object()
340                        .map(|m| {
341                            m.iter()
342                                .filter_map(|(k, v)| Some((k.clone(), v.as_u64()? as usize)))
343                                .collect()
344                        })
345                        .unwrap_or_default();
346                    let by_effect: BTreeMap<String, usize> = store_json["by_effect"]
347                        .as_object()
348                        .map(|m| {
349                            m.iter()
350                                .filter_map(|(k, v)| Some((k.clone(), v.as_u64()? as usize)))
351                                .collect()
352                        })
353                        .unwrap_or_default();
354                    return StoreStats {
355                        total,
356                        by_lifecycle,
357                        by_effect,
358                    };
359                }
360            }
361        }
362        self.cache.stats()
363    }
364}