Skip to main content

inferd_client/
admin.rs

//! Admin-socket subscriber: a read-only stream of lifecycle events
//! from the daemon, as specified in `docs/protocol-v1.md` §"Admin endpoint".
3
4use serde::Deserialize;
5use std::io;
6#[cfg(unix)]
7use std::path::Path;
8use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
9
10/// One frame off the admin socket. Fields not relevant to the
11/// current `status`/`phase` are absent (or default) per the spec's
12/// flattened wire shape.
13///
14/// Forward compatibility: clients **must ignore** unknown `status`,
15/// `phase`, and detail keys per `docs/protocol-v1.md`. Unknown
16/// values land in the typed fields verbatim — branch on values you
17/// recognise; default to logging-and-ignoring otherwise.
18#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
19pub struct AdminEvent {
20    /// Always `"admin"` in v1.
21    #[serde(default)]
22    pub id: String,
23    /// Always `"status"` in v1.
24    #[serde(default)]
25    #[serde(rename = "type")]
26    pub kind: String,
27    /// One of `starting`, `loading_model`, `ready`, `restarting`,
28    /// `draining`. Unknown values surface verbatim — ignore them.
29    pub status: String,
30    /// Set on `loading_model` and `restarting`. One of
31    /// `checking_local`, `download`, `verify`, `quarantine`,
32    /// `mmap`, `kv_cache`. Unknown values surface verbatim.
33    #[serde(default)]
34    pub phase: String,
35
36    // --- Phase-specific detail fields (flattened on the wire) ---
37    /// Path being checked / verified / mmapped / quarantined.
38    #[serde(default)]
39    pub path: Option<String>,
40    /// Bytes downloaded so far (download phase).
41    #[serde(default)]
42    pub downloaded_bytes: Option<u64>,
43    /// Total bytes if known (download phase). `None` when the
44    /// server didn't supply Content-Length.
45    #[serde(default)]
46    pub total_bytes: Option<u64>,
47    /// Source URL (download phase). Diagnostic only.
48    #[serde(default)]
49    pub source_url: Option<String>,
50    /// Expected SHA-256 (quarantine phase).
51    #[serde(default)]
52    pub expected_sha256: Option<String>,
53    /// Computed SHA-256 (quarantine phase).
54    #[serde(default)]
55    pub actual_sha256: Option<String>,
56    /// Where the bad bytes were moved (quarantine phase).
57    #[serde(default)]
58    pub quarantine_path: Option<String>,
59    /// Configured context window in tokens (kv_cache phase).
60    #[serde(default)]
61    pub n_ctx: Option<u32>,
62
63    // --- capabilities-specific fields (status="capabilities", #77) ---
64    /// Backend identifier (capabilities phase).
65    #[serde(default)]
66    pub backend: Option<String>,
67    /// `true` if the backend implements v2 (capabilities phase).
68    #[serde(default)]
69    pub v2: Option<bool>,
70    /// `true` if the backend can ingest images (capabilities phase).
71    #[serde(default)]
72    pub vision: Option<bool>,
73    /// `true` if the backend can ingest audio (capabilities phase).
74    #[serde(default)]
75    pub audio: Option<bool>,
76    /// `true` if the backend natively supports tool-use
77    /// (capabilities phase).
78    #[serde(default)]
79    pub tools: Option<bool>,
80    /// `true` if the backend separates `<|think|>` reasoning from
81    /// user-visible output (capabilities phase).
82    #[serde(default)]
83    pub thinking: Option<bool>,
84    /// `true` if the backend implements `embed` per ADR 0017
85    /// (capabilities phase).
86    #[serde(default)]
87    pub embed: Option<bool>,
88    /// Compile-time GGML accelerator: `"cpu"` / `"cuda"` / `"metal"`
89    /// / `"vulkan"` / `"rocm"` (capabilities phase).
90    #[serde(default)]
91    pub accelerator: Option<String>,
92    /// Layers offloaded to the accelerator at runtime (capabilities
93    /// phase). 0 means CPU-only at runtime regardless of `accelerator`.
94    #[serde(default)]
95    pub gpu_layers: Option<u32>,
96}
97
98/// Errors produced by the admin client.
99#[derive(Debug, thiserror::Error)]
100pub enum AdminError {
101    /// Underlying I/O failure.
102    #[error("io: {0}")]
103    Io(#[from] io::Error),
104    /// JSON decode of an admin frame failed.
105    #[error("decode: {0}")]
106    Decode(#[from] serde_json::Error),
107    /// Daemon closed the admin socket. Reconnect to resume.
108    #[error("admin socket closed")]
109    Closed,
110}
111
112/// Subscriber for the inferd admin socket.
113///
114/// Construct via `dial_admin_uds` (Unix) / `dial_admin_pipe`
115/// (Windows). Read events with `recv()` or wait for a specific
116/// state with `wait_ready()`.
117pub struct AdminClient {
118    reader: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
119}
120
121impl AdminClient {
122    /// Open a Unix domain socket connection to the admin socket
123    /// (Unix only).
124    #[cfg(unix)]
125    pub async fn dial_admin_uds(path: &Path) -> Result<Self, AdminError> {
126        let stream = tokio::net::UnixStream::connect(path).await?;
127        Ok(Self {
128            reader: BufReader::with_capacity(8192, Box::new(stream)),
129        })
130    }
131
132    /// Open a Windows named pipe connection to the admin socket
133    /// (Windows only).
134    #[cfg(windows)]
135    pub async fn dial_admin_pipe(path: &str) -> Result<Self, AdminError> {
136        use tokio::net::windows::named_pipe::ClientOptions;
137        let pipe = ClientOptions::new().open(path)?;
138        Ok(Self {
139            reader: BufReader::with_capacity(8192, Box::new(pipe)),
140        })
141    }
142
143    /// Read the next admin event. Blocks until a frame arrives or
144    /// the daemon closes the connection.
145    pub async fn recv(&mut self) -> Result<AdminEvent, AdminError> {
146        let mut line = Vec::with_capacity(512);
147        let n = self.reader.read_until(b'\n', &mut line).await?;
148        if n == 0 {
149            return Err(AdminError::Closed);
150        }
151        let event: AdminEvent = serde_json::from_slice(&line)?;
152        Ok(event)
153    }
154
155    /// Loop `recv()` until a `ready` event arrives. Returns the
156    /// event that flipped state. Returns `Err(AdminError::Closed)`
157    /// if the daemon closes before reaching `ready` (a daemon that
158    /// crashes mid-load looks like this).
159    pub async fn wait_ready(&mut self) -> Result<AdminEvent, AdminError> {
160        loop {
161            let ev = self.recv().await?;
162            if ev.status == "ready" {
163                return Ok(ev);
164            }
165        }
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncWriteExt;

    /// Decode a single JSON frame, panicking if it is rejected.
    fn decode(raw: &[u8]) -> AdminEvent {
        serde_json::from_slice(raw).unwrap()
    }

    /// Wrap one end of an in-memory duplex stream in an `AdminClient`.
    fn client_over(stream: tokio::io::DuplexStream) -> AdminClient {
        AdminClient {
            reader: BufReader::with_capacity(4096, Box::new(stream)),
        }
    }

    // A download-phase frame populates the progress and URL fields.
    #[test]
    fn decodes_download_frame() {
        let frame = decode(
            br#"{
            "id":"admin","type":"status","status":"loading_model","phase":"download",
            "downloaded_bytes":33554432,"total_bytes":5126304928,
            "source_url":"https://huggingface.co/example.gguf"
        }"#,
        );
        assert_eq!(frame.id, "admin");
        assert_eq!(frame.kind, "status");
        assert_eq!(frame.status, "loading_model");
        assert_eq!(frame.phase, "download");
        assert_eq!(frame.downloaded_bytes, Some(33_554_432));
        assert_eq!(frame.total_bytes, Some(5_126_304_928));
        assert_eq!(
            frame.source_url.as_deref(),
            Some("https://huggingface.co/example.gguf")
        );
    }

    // A bare `ready` frame leaves phase empty and details unset.
    #[test]
    fn decodes_ready_frame() {
        let frame = decode(br#"{"id":"admin","type":"status","status":"ready"}"#);
        assert_eq!(frame.status, "ready");
        assert_eq!(frame.phase, "");
        assert!(frame.downloaded_bytes.is_none());
    }

    // An explicit JSON `null` is accepted for an optional detail key.
    #[test]
    fn total_bytes_may_be_null() {
        let frame = decode(
            br#"{"id":"admin","type":"status","status":"loading_model","phase":"download","downloaded_bytes":1024,"total_bytes":null,"source_url":"https://x"}"#,
        );
        assert_eq!(frame.downloaded_bytes, Some(1024));
        assert_eq!(frame.total_bytes, None);
    }

    // Unrecognised status values and unknown keys must not fail decoding.
    #[test]
    fn unknown_status_round_trips_verbatim() {
        let frame = decode(
            br#"{"id":"admin","type":"status","status":"future_state_we_dont_know","extra_key":42}"#,
        );
        assert_eq!(frame.status, "future_state_we_dont_know");
    }

    // `recv` reads exactly one newline-terminated frame off the stream.
    #[tokio::test]
    async fn recv_decodes_from_a_duplex_pipe() {
        let (mut daemon, subscriber) = tokio::io::duplex(4096);
        daemon
            .write_all(b"{\"id\":\"admin\",\"type\":\"status\",\"status\":\"starting\"}\n")
            .await
            .unwrap();
        daemon.flush().await.unwrap();

        let mut client = client_over(subscriber);
        let frame = client.recv().await.unwrap();
        assert_eq!(frame.status, "starting");
    }

    // `wait_ready` discards everything that precedes the `ready` frame.
    #[tokio::test]
    async fn wait_ready_skips_loading_frames() {
        let (mut daemon, subscriber) = tokio::io::duplex(4096);
        let frames = b"\
{\"id\":\"admin\",\"type\":\"status\",\"status\":\"starting\"}\n\
{\"id\":\"admin\",\"type\":\"status\",\"status\":\"loading_model\",\"phase\":\"checking_local\",\"path\":\"/x.gguf\"}\n\
{\"id\":\"admin\",\"type\":\"status\",\"status\":\"loading_model\",\"phase\":\"mmap\",\"path\":\"/x.gguf\"}\n\
{\"id\":\"admin\",\"type\":\"status\",\"status\":\"ready\"}\n\
";
        daemon.write_all(frames).await.unwrap();
        daemon.flush().await.unwrap();

        let mut client = client_over(subscriber);
        let frame = client.wait_ready().await.unwrap();
        assert_eq!(frame.status, "ready");
    }

    // Dropping the writer before any bytes are sent reads as `Closed`.
    #[tokio::test]
    async fn recv_reports_closed_on_eof() {
        let (daemon, subscriber) = tokio::io::duplex(4096);
        drop(daemon);

        let mut client = client_over(subscriber);
        let outcome = client.recv().await;
        if !matches!(outcome, Err(AdminError::Closed)) {
            panic!("expected Closed, got {outcome:?}");
        }
    }
}