1use noether_core::stage::{Stage, StageId, StageLifecycle};
24use noether_store::{MemoryStore, StageStore, StoreError, StoreStats};
25use serde_json::Value;
26use std::collections::BTreeMap;
27
28fn extract_result(resp: reqwest::blocking::Response) -> Result<Value, StoreError> {
31 let status = resp.status();
32 let body: Value = resp.json().map_err(|e| StoreError::IoError {
33 message: format!("failed to parse registry response: {e}"),
34 })?;
35
36 if body["ok"].as_bool().unwrap_or(false) {
37 return Ok(body["result"].clone());
38 }
39
40 let code = body["error"]["code"].as_str().unwrap_or("UNKNOWN");
41 let msg = body["error"]["message"].as_str().unwrap_or("unknown error");
42
43 if status == 404 || code == "NOT_FOUND" {
44 Err(StoreError::IoError {
45 message: format!("NOT_FOUND: {msg}"),
46 })
47 } else {
48 Err(StoreError::IoError {
49 message: format!("{code}: {msg}"),
50 })
51 }
52}
53
/// A stage store that talks to a remote registry over HTTP and keeps a local
/// [`MemoryStore`] copy of the stages it has seen.
pub struct RemoteStageStore {
    /// Blocking HTTP client used for every registry request.
    client: reqwest::blocking::Client,
    /// Registry base URL; `connect` stores it with trailing `/` characters removed.
    base_url: String,
    /// Optional key; when present it is sent in the `X-API-Key` request header.
    api_key: Option<String>,
    /// Local copy of the stages: replaced wholesale by `refresh`, read by the
    /// `get` / `contains` / `list` trait methods.
    cache: MemoryStore,
}
66
67impl RemoteStageStore {
68 pub fn connect(base_url: &str, api_key: Option<&str>) -> Result<Self, StoreError> {
73 let mut headers = reqwest::header::HeaderMap::new();
78 headers.insert(
79 "X-Noether-Client-Version",
80 reqwest::header::HeaderValue::from_static(env!("CARGO_PKG_VERSION")),
81 );
82 let client = reqwest::blocking::Client::builder()
83 .timeout(std::time::Duration::from_secs(30))
84 .user_agent(concat!("noether-cli/", env!("CARGO_PKG_VERSION")))
85 .default_headers(headers)
86 .build()
87 .map_err(|e| StoreError::IoError {
88 message: e.to_string(),
89 })?;
90
91 let mut store = Self {
92 client,
93 base_url: base_url.trim_end_matches('/').to_string(),
94 api_key: api_key.map(String::from),
95 cache: MemoryStore::new(),
96 };
97 store.refresh()?;
98 Ok(store)
99 }
100
101 pub fn from_env() -> Option<Result<Self, StoreError>> {
105 let url = std::env::var("NOETHER_REGISTRY").ok()?;
106 let key = std::env::var("NOETHER_API_KEY").ok();
107 Some(Self::connect(&url, key.as_deref()))
108 }
109
110 pub fn refresh(&mut self) -> Result<usize, StoreError> {
116 const PAGE: usize = 200;
117 let mut offset = 0usize;
118 let mut new_cache = MemoryStore::new();
119
120 loop {
121 let path = format!("/stages?lifecycle=active&limit={PAGE}&offset={offset}");
122 let resp = self
123 .get_req(&path)
124 .send()
125 .map_err(|e| StoreError::IoError {
126 message: format!("registry unreachable at {}: {e}", self.base_url),
127 })?;
128
129 let result = extract_result(resp)?;
130 let page: Vec<Stage> =
131 serde_json::from_value(result["stages"].clone()).map_err(|e| {
132 StoreError::IoError {
133 message: format!("malformed /stages response: {e}"),
134 }
135 })?;
136
137 let total = result["total"].as_u64().unwrap_or(0) as usize;
138 let fetched = page.len();
139 for stage in page {
140 new_cache.upsert(stage).ok();
141 }
142
143 offset += fetched;
144 if fetched == 0 || offset >= total {
145 break;
146 }
147 }
148
149 let count = new_cache.list(None).len();
150 self.cache = new_cache;
151 Ok(count)
152 }
153
154 pub fn base_url(&self) -> &str {
156 &self.base_url
157 }
158
159 pub fn get_live(&mut self, id: &StageId) -> Result<Option<&Stage>, StoreError> {
165 let resp = self
166 .get_req(&format!("/stages/{}", id.0))
167 .send()
168 .map_err(|e| StoreError::IoError {
169 message: e.to_string(),
170 })?;
171
172 match extract_result(resp) {
173 Ok(body) => {
174 let stage: Stage =
175 serde_json::from_value(body).map_err(|e| StoreError::IoError {
176 message: format!("malformed /stages/:id response: {e}"),
177 })?;
178 self.cache.upsert(stage).ok();
179 self.cache.get(id)
180 }
181 Err(StoreError::IoError { message }) if message.contains("NOT_FOUND") => Ok(None),
182 Err(e) => Err(e),
183 }
184 }
185
186 fn get_req(&self, path: &str) -> reqwest::blocking::RequestBuilder {
189 self.with_auth(self.client.get(format!("{}{path}", self.base_url)))
190 }
191
192 fn post_req(&self, path: &str) -> reqwest::blocking::RequestBuilder {
193 self.with_auth(self.client.post(format!("{}{path}", self.base_url)))
194 }
195
196 fn patch_req(&self, path: &str) -> reqwest::blocking::RequestBuilder {
197 self.with_auth(self.client.patch(format!("{}{path}", self.base_url)))
198 }
199
200 fn delete_req(&self, path: &str) -> reqwest::blocking::RequestBuilder {
201 self.with_auth(self.client.delete(format!("{}{path}", self.base_url)))
202 }
203
204 fn with_auth(&self, b: reqwest::blocking::RequestBuilder) -> reqwest::blocking::RequestBuilder {
205 match &self.api_key {
206 Some(k) => b.header("X-API-Key", k),
207 None => b,
208 }
209 }
210}
211
212impl StageStore for RemoteStageStore {
215 fn put(&mut self, stage: Stage) -> Result<StageId, StoreError> {
216 let resp =
217 self.post_req("/stages")
218 .json(&stage)
219 .send()
220 .map_err(|e| StoreError::IoError {
221 message: e.to_string(),
222 })?;
223
224 let result = match extract_result(resp) {
225 Ok(r) => r,
226 Err(e) => {
227 if e.to_string().contains("ALREADY_EXISTS") || e.to_string().contains("already") {
229 self.cache.upsert(stage.clone()).ok();
230 return Err(StoreError::AlreadyExists(stage.id));
231 }
232 return Err(e);
233 }
234 };
235
236 let id = StageId(result["id"].as_str().unwrap_or(&stage.id.0).to_string());
237 let is_new = result["is_new"].as_bool().unwrap_or(true);
238
239 self.cache.upsert(stage).ok();
241
242 if !is_new {
243 return Err(StoreError::AlreadyExists(id));
244 }
245 Ok(id)
246 }
247
248 fn upsert(&mut self, stage: Stage) -> Result<StageId, StoreError> {
249 let id = stage.id.clone();
250 match self.put(stage.clone()) {
251 Err(StoreError::AlreadyExists(_)) => {
252 self.cache.upsert(stage).ok();
253 Ok(id)
254 }
255 other => other,
256 }
257 }
258
259 fn remove(&mut self, id: &StageId) -> Result<(), StoreError> {
262 let resp = self
263 .delete_req(&format!("/stages/{}", id.0))
264 .send()
265 .map_err(|e| StoreError::IoError {
266 message: e.to_string(),
267 })?;
268
269 let status_str = extract_result(resp)
270 .err()
271 .map(|e| e.to_string())
272 .unwrap_or_default();
273 if !status_str.is_empty() && !status_str.contains("NOT_FOUND") {
274 return Err(StoreError::IoError {
275 message: status_str,
276 });
277 }
278 let _ = self.cache.remove(id);
280 Ok(())
281 }
282
/// Looks `id` up in the local cache only; no request is sent to the registry.
/// `get_live` is the variant that queries the registry.
fn get(&self, id: &StageId) -> Result<Option<&Stage>, StoreError> {
    self.cache.get(id)
}
290
/// Reports whether the local cache holds `id`; the registry is not consulted.
fn contains(&self, id: &StageId) -> bool {
    self.cache.contains(id)
}
294
/// Lists stages from the local cache, passing the optional `lifecycle` filter
/// straight through to the cache; the registry is not consulted.
fn list(&self, lifecycle: Option<&StageLifecycle>) -> Vec<&Stage> {
    self.cache.list(lifecycle)
}
298
299 fn update_lifecycle(
300 &mut self,
301 id: &StageId,
302 lifecycle: StageLifecycle,
303 ) -> Result<(), StoreError> {
304 let (lc_str, successor_id) = match &lifecycle {
305 StageLifecycle::Draft => ("draft", None),
306 StageLifecycle::Active => ("active", None),
307 StageLifecycle::Deprecated { successor_id } => {
308 ("deprecated", Some(successor_id.0.clone()))
309 }
310 StageLifecycle::Tombstone => ("tombstone", None),
311 };
312
313 let mut body = serde_json::json!({ "lifecycle": lc_str });
314 if let Some(succ) = successor_id {
315 body["successor_id"] = Value::String(succ);
316 }
317
318 let resp = self
319 .patch_req(&format!("/stages/{}/lifecycle", id.0))
320 .json(&body)
321 .send()
322 .map_err(|e| StoreError::IoError {
323 message: e.to_string(),
324 })?;
325
326 extract_result(resp)?;
327
328 self.cache.update_lifecycle(id, lifecycle)
330 }
331
332 fn stats(&self) -> StoreStats {
333 if let Ok(resp) = self.get_req("/health").send() {
335 if let Ok(result) = extract_result(resp) {
336 if let Some(store_json) = result.get("store") {
337 let total = store_json["total_stages"].as_u64().unwrap_or(0) as usize;
338 let by_lifecycle: BTreeMap<String, usize> = store_json["by_lifecycle"]
339 .as_object()
340 .map(|m| {
341 m.iter()
342 .filter_map(|(k, v)| Some((k.clone(), v.as_u64()? as usize)))
343 .collect()
344 })
345 .unwrap_or_default();
346 let by_effect: BTreeMap<String, usize> = store_json["by_effect"]
347 .as_object()
348 .map(|m| {
349 m.iter()
350 .filter_map(|(k, v)| Some((k.clone(), v.as_u64()? as usize)))
351 .collect()
352 })
353 .unwrap_or_default();
354 return StoreStats {
355 total,
356 by_lifecycle,
357 by_effect,
358 };
359 }
360 }
361 }
362 self.cache.stats()
363 }
364}