Skip to main content

inferd_client/
admin.rs

1//! Admin-socket subscriber. Read-only stream of lifecycle events
2//! from the daemon. Per `docs/protocol-v1.md` §"Admin endpoint".
3
4use serde::Deserialize;
5use std::io;
6#[cfg(unix)]
7use std::path::Path;
8use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
9
10/// One frame off the admin socket. Fields not relevant to the
11/// current `status`/`phase` are absent (or default) per the spec's
12/// flattened wire shape.
13///
14/// Forward compatibility: clients **must ignore** unknown `status`,
15/// `phase`, and detail keys per `docs/protocol-v1.md`. Unknown
16/// values land in the typed fields verbatim — branch on values you
17/// recognise; default to logging-and-ignoring otherwise.
18#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
19pub struct AdminEvent {
20    /// Always `"admin"` in v1.
21    #[serde(default)]
22    pub id: String,
23    /// Always `"status"` in v1.
24    #[serde(default)]
25    #[serde(rename = "type")]
26    pub kind: String,
27    /// One of `starting`, `loading_model`, `ready`, `restarting`,
28    /// `draining`. Unknown values surface verbatim — ignore them.
29    pub status: String,
30    /// Set on `loading_model` and `restarting`. One of
31    /// `checking_local`, `download`, `verify`, `quarantine`,
32    /// `mmap`, `kv_cache`. Unknown values surface verbatim.
33    #[serde(default)]
34    pub phase: String,
35
36    // --- Phase-specific detail fields (flattened on the wire) ---
37    /// Path being checked / verified / mmapped / quarantined.
38    #[serde(default)]
39    pub path: Option<String>,
40    /// Bytes downloaded so far (download phase).
41    #[serde(default)]
42    pub downloaded_bytes: Option<u64>,
43    /// Total bytes if known (download phase). `None` when the
44    /// server didn't supply Content-Length.
45    #[serde(default)]
46    pub total_bytes: Option<u64>,
47    /// Source URL (download phase). Diagnostic only.
48    #[serde(default)]
49    pub source_url: Option<String>,
50    /// Expected SHA-256 (quarantine phase).
51    #[serde(default)]
52    pub expected_sha256: Option<String>,
53    /// Computed SHA-256 (quarantine phase).
54    #[serde(default)]
55    pub actual_sha256: Option<String>,
56    /// Where the bad bytes were moved (quarantine phase).
57    #[serde(default)]
58    pub quarantine_path: Option<String>,
59    /// Configured context window in tokens (kv_cache phase).
60    #[serde(default)]
61    pub n_ctx: Option<u32>,
62}
63
64/// Errors produced by the admin client.
65#[derive(Debug, thiserror::Error)]
66pub enum AdminError {
67    /// Underlying I/O failure.
68    #[error("io: {0}")]
69    Io(#[from] io::Error),
70    /// JSON decode of an admin frame failed.
71    #[error("decode: {0}")]
72    Decode(#[from] serde_json::Error),
73    /// Daemon closed the admin socket. Reconnect to resume.
74    #[error("admin socket closed")]
75    Closed,
76}
77
78/// Subscriber for the inferd admin socket.
79///
80/// Construct via `dial_admin_uds` (Unix) / `dial_admin_pipe`
81/// (Windows). Read events with `recv()` or wait for a specific
82/// state with `wait_ready()`.
83pub struct AdminClient {
84    reader: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
85}
86
87impl AdminClient {
88    /// Open a Unix domain socket connection to the admin socket
89    /// (Unix only).
90    #[cfg(unix)]
91    pub async fn dial_admin_uds(path: &Path) -> Result<Self, AdminError> {
92        let stream = tokio::net::UnixStream::connect(path).await?;
93        Ok(Self {
94            reader: BufReader::with_capacity(8192, Box::new(stream)),
95        })
96    }
97
98    /// Open a Windows named pipe connection to the admin socket
99    /// (Windows only).
100    #[cfg(windows)]
101    pub async fn dial_admin_pipe(path: &str) -> Result<Self, AdminError> {
102        use tokio::net::windows::named_pipe::ClientOptions;
103        let pipe = ClientOptions::new().open(path)?;
104        Ok(Self {
105            reader: BufReader::with_capacity(8192, Box::new(pipe)),
106        })
107    }
108
109    /// Read the next admin event. Blocks until a frame arrives or
110    /// the daemon closes the connection.
111    pub async fn recv(&mut self) -> Result<AdminEvent, AdminError> {
112        let mut line = Vec::with_capacity(512);
113        let n = self.reader.read_until(b'\n', &mut line).await?;
114        if n == 0 {
115            return Err(AdminError::Closed);
116        }
117        let event: AdminEvent = serde_json::from_slice(&line)?;
118        Ok(event)
119    }
120
121    /// Loop `recv()` until a `ready` event arrives. Returns the
122    /// event that flipped state. Returns `Err(AdminError::Closed)`
123    /// if the daemon closes before reaching `ready` (a daemon that
124    /// crashes mid-load looks like this).
125    pub async fn wait_ready(&mut self) -> Result<AdminEvent, AdminError> {
126        loop {
127            let ev = self.recv().await?;
128            if ev.status == "ready" {
129                return Ok(ev);
130            }
131        }
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::{AsyncWriteExt, DuplexStream};

    /// Decode one frame, panicking if serde rejects it.
    fn decode(raw: &[u8]) -> AdminEvent {
        serde_json::from_slice(raw).unwrap()
    }

    /// Build a client over one half of an in-memory duplex pipe, bypassing
    /// the platform-specific dial functions.
    fn client_over(stream: DuplexStream) -> AdminClient {
        AdminClient {
            reader: BufReader::with_capacity(4096, Box::new(stream)),
        }
    }

    #[test]
    fn decodes_download_frame() {
        let raw = br#"{
            "id":"admin","type":"status","status":"loading_model","phase":"download",
            "downloaded_bytes":33554432,"total_bytes":5126304928,
            "source_url":"https://huggingface.co/example.gguf"
        }"#;
        let frame = decode(raw);
        assert_eq!(frame.id, "admin");
        assert_eq!(frame.kind, "status");
        assert_eq!(frame.status, "loading_model");
        assert_eq!(frame.phase, "download");
        assert_eq!(frame.downloaded_bytes, Some(33_554_432));
        assert_eq!(frame.total_bytes, Some(5_126_304_928));
        assert_eq!(
            frame.source_url.as_deref(),
            Some("https://huggingface.co/example.gguf")
        );
    }

    #[test]
    fn decodes_ready_frame() {
        let frame = decode(br#"{"id":"admin","type":"status","status":"ready"}"#);
        assert_eq!(frame.status, "ready");
        // Keys missing from the frame fall back to their defaults.
        assert!(frame.phase.is_empty());
        assert_eq!(frame.downloaded_bytes, None);
    }

    #[test]
    fn total_bytes_may_be_null() {
        let frame = decode(br#"{"id":"admin","type":"status","status":"loading_model","phase":"download","downloaded_bytes":1024,"total_bytes":null,"source_url":"https://x"}"#);
        assert_eq!(frame.downloaded_bytes, Some(1024));
        assert_eq!(frame.total_bytes, None);
    }

    #[test]
    fn unknown_status_round_trips_verbatim() {
        let frame = decode(br#"{"id":"admin","type":"status","status":"future_state_we_dont_know","extra_key":42}"#);
        assert_eq!(frame.status, "future_state_we_dont_know");
    }

    #[tokio::test]
    async fn recv_decodes_from_a_duplex_pipe() {
        let (mut daemon, local) = tokio::io::duplex(4096);
        daemon
            .write_all(b"{\"id\":\"admin\",\"type\":\"status\",\"status\":\"starting\"}\n")
            .await
            .unwrap();
        daemon.flush().await.unwrap();

        let mut client = client_over(local);
        let frame = client.recv().await.unwrap();
        assert_eq!(frame.status, "starting");
    }

    #[tokio::test]
    async fn wait_ready_skips_loading_frames() {
        let (mut daemon, local) = tokio::io::duplex(4096);
        let writes = b"\
{\"id\":\"admin\",\"type\":\"status\",\"status\":\"starting\"}\n\
{\"id\":\"admin\",\"type\":\"status\",\"status\":\"loading_model\",\"phase\":\"checking_local\",\"path\":\"/x.gguf\"}\n\
{\"id\":\"admin\",\"type\":\"status\",\"status\":\"loading_model\",\"phase\":\"mmap\",\"path\":\"/x.gguf\"}\n\
{\"id\":\"admin\",\"type\":\"status\",\"status\":\"ready\"}\n\
";
        daemon.write_all(writes).await.unwrap();
        daemon.flush().await.unwrap();

        let mut client = client_over(local);
        let frame = client.wait_ready().await.unwrap();
        assert_eq!(frame.status, "ready");
    }

    #[tokio::test]
    async fn recv_reports_closed_on_eof() {
        let (daemon, local) = tokio::io::duplex(4096);
        // Dropping the peer half closes the pipe before anything is written.
        drop(daemon);

        let mut client = client_over(local);
        let outcome = client.recv().await;
        assert!(
            matches!(outcome, Err(AdminError::Closed)),
            "expected Closed, got {outcome:?}"
        );
    }
}