Skip to main content

vectorizer_sdk/client/
admin.rs

1//! Admin / observability surface.
2//!
3//! Covers statistics, status, logs, indexing progress, collection
4//! maintenance (force-save, empty-collection cleanup), server config,
5//! backup management, admin restart, and workspace management.
6//!
7//! All methods call `self.make_request` via the shared dispatcher in
8//! [`super`] so the transport abstraction is preserved.
9
10use super::VectorizerClient;
11use crate::error::{Result, VectorizerError};
12use crate::models::{
13    AddWorkspaceRequest, BackupInfo, CleanupReport, ConfigPatch, ConfigSnapshot,
14    CreateBackupRequest, IndexingProgress, LogEntry, LogsQuery, RestoreBackupRequest,
15    RuntimeMetrics, ServerStatus, SlowQueryConfig, SlowQueryEntry, Stats, WorkspaceConfig,
16};
17
18impl VectorizerClient {
19    /// Aggregate collection + vector counts.
20    ///
21    /// Calls `GET /stats`.
22    pub async fn get_stats(&self) -> Result<Stats> {
23        let response = self.make_request("GET", "/stats", None).await?;
24        serde_json::from_str(&response).map_err(|e| {
25            VectorizerError::server(format!("Failed to parse get_stats response: {e}"))
26        })
27    }
28
29    /// Runtime metrics snapshot for the dashboard (phase25).
30    ///
31    /// Calls `GET /metrics/runtime`. Returns CPU, memory, active
32    /// connections, rolling 60-second QPS, per-route p50/p99,
33    /// 5xx error rate, and the WAL state. Requires admin auth.
34    pub async fn get_runtime_metrics(&self) -> Result<RuntimeMetrics> {
35        let response = self.make_request("GET", "/metrics/runtime", None).await?;
36        serde_json::from_str(&response).map_err(|e| {
37            VectorizerError::server(format!("Failed to parse get_runtime_metrics response: {e}"))
38        })
39    }
40
41    /// Server liveness / version / uptime.
42    ///
43    /// Calls `GET /status`.
44    pub async fn get_status(&self) -> Result<ServerStatus> {
45        let response = self.make_request("GET", "/status", None).await?;
46        serde_json::from_str(&response).map_err(|e| {
47            VectorizerError::server(format!("Failed to parse get_status response: {e}"))
48        })
49    }
50
51    /// Tail recent log lines.
52    ///
53    /// Calls `GET /logs?lines=N&level=LEVEL`.
54    pub async fn get_logs(&self, params: LogsQuery) -> Result<Vec<LogEntry>> {
55        let mut qs = String::new();
56        if let Some(lines) = params.lines {
57            qs.push_str(&format!("lines={lines}"));
58        }
59        if let Some(level) = &params.level {
60            if !qs.is_empty() {
61                qs.push('&');
62            }
63            qs.push_str(&format!("level={level}"));
64        }
65        let endpoint = if qs.is_empty() {
66            "/logs".to_string()
67        } else {
68            format!("/logs?{qs}")
69        };
70        let response = self.make_request("GET", &endpoint, None).await?;
71        let val: serde_json::Value = serde_json::from_str(&response).map_err(|e| {
72            VectorizerError::server(format!("Failed to parse get_logs response: {e}"))
73        })?;
74        let logs = val
75            .get("logs")
76            .and_then(|l| l.as_array())
77            .cloned()
78            .unwrap_or_default();
79        let entries: Result<Vec<LogEntry>> = logs
80            .into_iter()
81            .map(|v| {
82                serde_json::from_value(v)
83                    .map_err(|e| VectorizerError::server(format!("Failed to parse log entry: {e}")))
84            })
85            .collect();
86        entries
87    }
88
89    /// Per-collection indexing progress.
90    ///
91    /// Calls `GET /indexing/progress`.
92    pub async fn get_indexing_progress(&self) -> Result<IndexingProgress> {
93        let response = self.make_request("GET", "/indexing/progress", None).await?;
94        serde_json::from_str(&response).map_err(|e| {
95            VectorizerError::server(format!(
96                "Failed to parse get_indexing_progress response: {e}"
97            ))
98        })
99    }
100
101    /// Flush one collection to disk immediately.
102    ///
103    /// Calls `POST /collections/{name}/force-save`.
104    pub async fn force_save_collection(&self, collection: &str) -> Result<()> {
105        self.make_request(
106            "POST",
107            &format!("/collections/{collection}/force-save"),
108            None,
109        )
110        .await?;
111        Ok(())
112    }
113
114    /// List collections that contain zero vectors.
115    ///
116    /// Calls `GET /collections/empty`.
117    pub async fn list_empty_collections(&self) -> Result<Vec<String>> {
118        let response = self.make_request("GET", "/collections/empty", None).await?;
119        let val: serde_json::Value = serde_json::from_str(&response).map_err(|e| {
120            VectorizerError::server(format!(
121                "Failed to parse list_empty_collections response: {e}"
122            ))
123        })?;
124        // Server returns either an array directly or {collections: [...]}
125        let arr = if val.is_array() {
126            val.as_array().cloned().unwrap_or_default()
127        } else {
128            val.get("collections")
129                .and_then(|c| c.as_array())
130                .cloned()
131                .unwrap_or_default()
132        };
133        Ok(arr
134            .into_iter()
135            .filter_map(|v| v.as_str().map(str::to_string))
136            .collect())
137    }
138
139    /// Delete all empty collections in one call.
140    ///
141    /// Calls `DELETE /collections/cleanup`.
142    pub async fn cleanup_empty_collections(&self) -> Result<CleanupReport> {
143        let response = self
144            .make_request("DELETE", "/collections/cleanup", None)
145            .await?;
146        serde_json::from_str(&response).map_err(|e| {
147            VectorizerError::server(format!(
148                "Failed to parse cleanup_empty_collections response: {e}"
149            ))
150        })
151    }
152
153    /// Read the server's current `config.yml`.
154    ///
155    /// Calls `GET /config`.
156    pub async fn get_config(&self) -> Result<ConfigSnapshot> {
157        let response = self.make_request("GET", "/config", None).await?;
158        let val: serde_json::Value = serde_json::from_str(&response).map_err(|e| {
159            VectorizerError::server(format!("Failed to parse get_config response: {e}"))
160        })?;
161        Ok(ConfigSnapshot(val))
162    }
163
164    /// Overwrite the server's `config.yml` (admin).
165    ///
166    /// Calls `POST /config` with the full config object.
167    /// Returns the config as echoed back by the server (free-form JSON).
168    pub async fn update_config(&self, patch: ConfigPatch) -> Result<ConfigSnapshot> {
169        let response = self.make_request("POST", "/config", Some(patch.0)).await?;
170        let val: serde_json::Value = serde_json::from_str(&response).map_err(|e| {
171            VectorizerError::server(format!("Failed to parse update_config response: {e}"))
172        })?;
173        Ok(ConfigSnapshot(val))
174    }
175
176    /// List all server-side backup files.
177    ///
178    /// Calls `GET /backups`.
179    pub async fn list_backups(&self) -> Result<Vec<BackupInfo>> {
180        let response = self.make_request("GET", "/backups", None).await?;
181        let val: serde_json::Value = serde_json::from_str(&response).map_err(|e| {
182            VectorizerError::server(format!("Failed to parse list_backups response: {e}"))
183        })?;
184        let arr = val
185            .get("backups")
186            .and_then(|b| b.as_array())
187            .cloned()
188            .unwrap_or_default();
189        arr.into_iter()
190            .map(|v| {
191                serde_json::from_value(v).map_err(|e| {
192                    VectorizerError::server(format!("Failed to parse backup entry: {e}"))
193                })
194            })
195            .collect()
196    }
197
198    /// Create a new backup (admin).
199    ///
200    /// Calls `POST /backups/create` with `{name, collections}`.
201    pub async fn create_backup(&self, request: CreateBackupRequest) -> Result<BackupInfo> {
202        let payload = serde_json::to_value(&request).map_err(|e| {
203            VectorizerError::server(format!("Failed to serialize create_backup request: {e}"))
204        })?;
205        let response = self
206            .make_request("POST", "/backups/create", Some(payload))
207            .await?;
208        serde_json::from_str(&response).map_err(|e| {
209            VectorizerError::server(format!("Failed to parse create_backup response: {e}"))
210        })
211    }
212
213    /// Restore a backup from the server's backup directory (admin).
214    ///
215    /// Calls `POST /backups/restore` with `{backup_id}`.
216    pub async fn restore_backup(&self, request: RestoreBackupRequest) -> Result<()> {
217        let payload = serde_json::to_value(&request).map_err(|e| {
218            VectorizerError::server(format!("Failed to serialize restore_backup request: {e}"))
219        })?;
220        self.make_request("POST", "/backups/restore", Some(payload))
221            .await?;
222        Ok(())
223    }
224
225    /// Initiate a graceful server restart (admin).
226    ///
227    /// Calls `POST /admin/restart`. The server responds before the
228    /// process actually restarts; callers should poll `/health` until
229    /// the server is back.
230    pub async fn restart_server(&self) -> Result<()> {
231        self.make_request("POST", "/admin/restart", None).await?;
232        Ok(())
233    }
234
235    /// List configured workspace directories.
236    ///
237    /// Calls `GET /workspace/list`.
238    pub async fn list_workspaces(&self) -> Result<Vec<WorkspaceConfig>> {
239        let response = self.make_request("GET", "/workspace/list", None).await?;
240        let val: serde_json::Value = serde_json::from_str(&response).map_err(|e| {
241            VectorizerError::server(format!("Failed to parse list_workspaces response: {e}"))
242        })?;
243        let arr = val
244            .get("workspaces")
245            .and_then(|w| w.as_array())
246            .cloned()
247            .unwrap_or_default();
248        Ok(arr.into_iter().map(WorkspaceConfig).collect())
249    }
250
251    /// Read the workspace configuration file.
252    ///
253    /// Calls `GET /workspace/config`.
254    pub async fn get_workspace_config(&self) -> Result<WorkspaceConfig> {
255        let response = self.make_request("GET", "/workspace/config", None).await?;
256        let val: serde_json::Value = serde_json::from_str(&response).map_err(|e| {
257            VectorizerError::server(format!(
258                "Failed to parse get_workspace_config response: {e}"
259            ))
260        })?;
261        Ok(WorkspaceConfig(val))
262    }
263
264    /// Register a new workspace directory (admin).
265    ///
266    /// Calls `POST /workspace/add` with `{path, collection_name}`.
267    pub async fn add_workspace(&self, request: AddWorkspaceRequest) -> Result<()> {
268        let payload = serde_json::to_value(&request).map_err(|e| {
269            VectorizerError::server(format!("Failed to serialize add_workspace request: {e}"))
270        })?;
271        self.make_request("POST", "/workspace/add", Some(payload))
272            .await?;
273        Ok(())
274    }
275
276    /// Remove a registered workspace directory (admin).
277    ///
278    /// Calls `POST /workspace/remove` with `{path}`.
279    pub async fn remove_workspace(&self, name: &str) -> Result<()> {
280        let payload = serde_json::json!({ "path": name });
281        self.make_request("POST", "/workspace/remove", Some(payload))
282            .await?;
283        Ok(())
284    }
285
286    // ── Phase-14: observability ────────────────────────────────────────────────
287
288    /// List slow-query ring-buffer entries (phase14).
289    ///
290    /// Calls `GET /slow_queries`. Returns entries in the order they were
291    /// recorded (oldest first). The response also carries the current
292    /// ring-buffer configuration, but this method returns only the entries.
293    ///
294    /// Use [`set_slow_query_config`] to tune the threshold and capacity.
295    pub async fn list_slow_queries(&self) -> Result<Vec<SlowQueryEntry>> {
296        let response = self.make_request("GET", "/slow_queries", None).await?;
297        let val: serde_json::Value = serde_json::from_str(&response).map_err(|e| {
298            VectorizerError::server(format!("Failed to parse list_slow_queries response: {e}"))
299        })?;
300        let arr = val
301            .get("entries")
302            .and_then(|e| e.as_array())
303            .cloned()
304            .unwrap_or_default();
305        arr.into_iter()
306            .map(|v| {
307                serde_json::from_value(v).map_err(|e| {
308                    VectorizerError::server(format!("Failed to parse slow-query entry: {e}"))
309                })
310            })
311            .collect()
312    }
313
314    /// Reconfigure the slow-query ring buffer (phase14).
315    ///
316    /// Calls `POST /slow_queries/config` with
317    /// `{"threshold_ms": <u64>, "capacity": <usize>}`.
318    ///
319    /// Existing entries are retained. If the new capacity is smaller than
320    /// the current entry count the oldest entries are evicted by the server.
321    pub async fn set_slow_query_config(&self, config: SlowQueryConfig) -> Result<SlowQueryConfig> {
322        let payload = serde_json::json!({
323            "threshold_ms": config.threshold_ms,
324            "capacity": config.capacity,
325        });
326        let response = self
327            .make_request("POST", "/slow_queries/config", Some(payload))
328            .await?;
329        serde_json::from_str(&response).map_err(|e| {
330            VectorizerError::server(format!(
331                "Failed to parse set_slow_query_config response: {e}"
332            ))
333        })
334    }
335}
336
337#[cfg(test)]
338mod tests {
339    #![allow(clippy::unwrap_used)]
340
341    use serde_json::json;
342
343    use crate::models::{
344        AddWorkspaceRequest, BackupInfo, CleanupReport, ConfigPatch, ConfigSnapshot,
345        CreateBackupRequest, IndexingProgress, LogEntry, LogsQuery, RestoreBackupRequest,
346        RuntimeMetrics, ServerStatus, SlowQueryConfig, SlowQueryEntry, Stats, WorkspaceConfig,
347    };
348
349    #[test]
350    fn stats_deserializes() {
351        let raw = json!({
352            "collections": 5,
353            "total_vectors": 1000,
354            "uptime_seconds": 3600,
355            "version": "3.4.0"
356        });
357        let s: Stats = serde_json::from_value(raw).unwrap();
358        assert_eq!(s.collections, 5);
359        assert_eq!(s.total_vectors, 1000);
360        assert_eq!(s.version, "3.4.0");
361        // Older servers without phase25 §5 fields fall back to ("none", 1.0).
362        assert_eq!(s.default_quantization, "none");
363        assert!((s.compression_ratio - 1.0).abs() < f32::EPSILON);
364    }
365
366    #[test]
367    fn stats_deserializes_phase25_quantization_fields() {
368        let raw = json!({
369            "collections": 3,
370            "total_vectors": 12_000,
371            "uptime_seconds": 60,
372            "version": "3.4.0",
373            "default_quantization": "sq-8bit",
374            "compression_ratio": 4.0,
375        });
376        let s: Stats = serde_json::from_value(raw).unwrap();
377        assert_eq!(s.default_quantization, "sq-8bit");
378        assert!((s.compression_ratio - 4.0).abs() < f32::EPSILON);
379    }
380
381    #[test]
382    fn runtime_metrics_deserializes_full_snapshot() {
383        let raw = json!({
384            "cpu_percent": 12.4,
385            "memory_rss_bytes": 124_857_600u64,
386            "memory_total_bytes": 17_179_869_184u64,
387            "memory_percent": 0.73,
388            "active_connections": 8,
389            "uptime_seconds": 3712,
390            "qps_window_60s": 142.3,
391            "error_rate_5xx_60s": 0.001,
392            "throughput_by_route": [
393                {"route": "/insert_texts", "qps": 12.0, "p50_ms": 8.2, "p99_ms": 41.0}
394            ],
395            "wal": {
396                "current_seq": 482919u64,
397                "size_bytes": 12_582_912u64,
398                "last_checkpoint_at": 1_714_828_800u64,
399                "last_checkpoint_seq": 482_800u64,
400            }
401        });
402        let m: RuntimeMetrics = serde_json::from_value(raw).unwrap();
403        assert!((m.cpu_percent - 12.4).abs() < f64::EPSILON);
404        assert_eq!(m.active_connections, 8);
405        assert_eq!(m.throughput_by_route.len(), 1);
406        assert_eq!(m.throughput_by_route[0].route, "/insert_texts");
407        assert!((m.throughput_by_route[0].p99_ms - 41.0).abs() < f64::EPSILON);
408        assert_eq!(m.wal.current_seq, 482919);
409        assert_eq!(m.wal.last_checkpoint_seq, 482_800);
410    }
411
412    #[test]
413    fn runtime_metrics_tolerates_missing_fields() {
414        // Standalone server without WAL or routes: every field is
415        // marked default so partial payloads still deserialize.
416        let raw = json!({
417            "cpu_percent": 1.0,
418            "memory_total_bytes": 8_000_000_000u64,
419        });
420        let m: RuntimeMetrics = serde_json::from_value(raw).unwrap();
421        assert!((m.cpu_percent - 1.0).abs() < f64::EPSILON);
422        assert_eq!(m.active_connections, 0);
423        assert!(m.throughput_by_route.is_empty());
424        assert_eq!(m.wal.current_seq, 0);
425    }
426
427    #[test]
428    fn server_status_deserializes() {
429        let raw = json!({
430            "online": true,
431            "version": "3.4.0",
432            "uptime_seconds": 120,
433            "collections_count": 3
434        });
435        let ss: ServerStatus = serde_json::from_value(raw).unwrap();
436        assert!(ss.online);
437        assert_eq!(ss.collections_count, 3);
438    }
439
440    #[test]
441    fn log_entry_deserializes() {
442        let raw = json!({
443            "timestamp": "2026-05-02T00:00:00Z",
444            "level": "INFO",
445            "message": "Server started",
446            "source": "vectorizer"
447        });
448        let le: LogEntry = serde_json::from_value(raw).unwrap();
449        assert_eq!(le.level, "INFO");
450        assert_eq!(le.source, "vectorizer");
451    }
452
453    #[test]
454    fn logs_query_default_serializes() {
455        let q = LogsQuery::default();
456        let v = serde_json::to_value(&q).unwrap();
457        assert_eq!(v, json!({}));
458    }
459
460    #[test]
461    fn logs_query_with_params_serializes() {
462        let q = LogsQuery {
463            lines: Some(50),
464            level: Some("ERROR".into()),
465        };
466        let v = serde_json::to_value(&q).unwrap();
467        assert_eq!(v["lines"], 50);
468        assert_eq!(v["level"], "ERROR");
469    }
470
471    #[test]
472    fn indexing_progress_deserializes() {
473        let raw = json!({
474            "overall_status": "completed",
475            "collections": [],
476            "is_indexing": false
477        });
478        let ip: IndexingProgress = serde_json::from_value(raw).unwrap();
479        assert_eq!(ip.overall_status, "completed");
480    }
481
482    #[test]
483    fn cleanup_report_deserializes() {
484        let raw = json!({
485            "success": true,
486            "removed": 2,
487            "collections": ["empty1", "empty2"],
488            "message": "Done"
489        });
490        let cr: CleanupReport = serde_json::from_value(raw).unwrap();
491        assert!(cr.success);
492        assert_eq!(cr.removed, 2);
493        assert_eq!(cr.collections.len(), 2);
494    }
495
496    #[test]
497    fn config_snapshot_round_trips() {
498        let val = json!({ "server": { "port": 15002 } });
499        let cs = ConfigSnapshot(val.clone());
500        let serialized = serde_json::to_value(&cs).unwrap();
501        assert_eq!(serialized, val);
502    }
503
504    #[test]
505    fn config_patch_round_trips() {
506        let val = json!({ "embedding": { "provider": "fastembed" } });
507        let cp = ConfigPatch(val.clone());
508        let serialized = serde_json::to_value(&cp).unwrap();
509        assert_eq!(serialized, val);
510    }
511
512    #[test]
513    fn backup_info_deserializes() {
514        let raw = json!({
515            "id": "abc-123",
516            "name": "weekly",
517            "date": "2026-05-02T00:00:00Z",
518            "size": 4096,
519            "collections": ["docs"]
520        });
521        let bi: BackupInfo = serde_json::from_value(raw).unwrap();
522        assert_eq!(bi.id, "abc-123");
523        assert_eq!(bi.collections, vec!["docs"]);
524    }
525
526    #[test]
527    fn create_backup_request_serializes() {
528        let req = CreateBackupRequest {
529            name: "nightly".into(),
530            collections: vec!["code".into()],
531        };
532        let v = serde_json::to_value(&req).unwrap();
533        assert_eq!(v["name"], "nightly");
534        assert_eq!(v["collections"][0], "code");
535    }
536
537    #[test]
538    fn restore_backup_request_serializes() {
539        let req = RestoreBackupRequest {
540            backup_id: "xyz-789".into(),
541        };
542        let v = serde_json::to_value(&req).unwrap();
543        assert_eq!(v["backup_id"], "xyz-789");
544    }
545
546    #[test]
547    fn workspace_config_round_trips() {
548        let val = json!({ "projects": [], "global_settings": {} });
549        let wc = WorkspaceConfig(val.clone());
550        let serialized = serde_json::to_value(&wc).unwrap();
551        assert_eq!(serialized, val);
552    }
553
554    #[test]
555    fn add_workspace_request_serializes() {
556        let req = AddWorkspaceRequest {
557            path: "/home/user/project".into(),
558            collection_name: "project_docs".into(),
559        };
560        let v = serde_json::to_value(&req).unwrap();
561        assert_eq!(v["path"], "/home/user/project");
562        assert_eq!(v["collection_name"], "project_docs");
563    }
564
565    // ── Phase-14 slow-query round-trip tests ──────────────────────────────────
566
567    #[test]
568    fn slow_query_entry_wire_shape() {
569        // Mirror of one item in `GET /slow_queries` → entries[].
570        let raw = json!({
571            "timestamp": "2026-05-02T00:01:00Z",
572            "collection": "docs",
573            "k": 10,
574            "duration_ms": 312.5,
575        });
576        let e: SlowQueryEntry = serde_json::from_value(raw).unwrap();
577        assert_eq!(e.collection, "docs");
578        assert_eq!(e.k, 10);
579        assert!((e.duration_ms - 312.5).abs() < f64::EPSILON);
580    }
581
582    #[test]
583    fn slow_query_config_round_trips() {
584        // Mirror of `POST /slow_queries/config` response.
585        let raw = json!({
586            "threshold_ms": 200u64,
587            "capacity": 500usize,
588            "status": "ok",
589        });
590        let cfg: SlowQueryConfig = serde_json::from_value(raw).unwrap();
591        assert_eq!(cfg.threshold_ms, 200);
592        assert_eq!(cfg.capacity, 500);
593
594        // Serialize back — status is server-only, not echoed by the struct.
595        let v = serde_json::to_value(&cfg).unwrap();
596        assert_eq!(v["threshold_ms"], 200);
597        assert_eq!(v["capacity"], 500);
598    }
599
600    #[test]
601    fn slow_query_config_payload_shape() {
602        // Verify the request body for `POST /slow_queries/config`.
603        let cfg = SlowQueryConfig {
604            threshold_ms: 150,
605            capacity: 1000,
606        };
607        let payload = json!({
608            "threshold_ms": cfg.threshold_ms,
609            "capacity": cfg.capacity,
610        });
611        assert_eq!(payload["threshold_ms"], 150);
612        assert_eq!(payload["capacity"], 1000);
613    }
614
615    #[test]
616    fn list_slow_queries_response_parses_entries() {
617        // Full response shape from `GET /slow_queries`.
618        let raw = json!({
619            "entries": [
620                {
621                    "timestamp": "2026-05-02T00:01:00Z",
622                    "collection": "docs",
623                    "k": 5,
624                    "duration_ms": 450.0,
625                },
626                {
627                    "timestamp": "2026-05-02T00:02:00Z",
628                    "collection": "logs",
629                    "k": 20,
630                    "duration_ms": 800.0,
631                }
632            ],
633            "total": 2,
634            "config": {
635                "threshold_ms": 200,
636                "capacity": 1000,
637            }
638        });
639        let entries = raw["entries"].as_array().unwrap();
640        let parsed: Vec<SlowQueryEntry> = entries
641            .iter()
642            .map(|v| serde_json::from_value(v.clone()).unwrap())
643            .collect();
644        assert_eq!(parsed.len(), 2);
645        assert_eq!(parsed[0].collection, "docs");
646        assert_eq!(parsed[1].k, 20);
647    }
648}