1use serde::Deserialize;
5use std::io;
6#[cfg(unix)]
7use std::path::Path;
8use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
9
10#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
19pub struct AdminEvent {
20 #[serde(default)]
22 pub id: String,
23 #[serde(default)]
25 #[serde(rename = "type")]
26 pub kind: String,
27 pub status: String,
30 #[serde(default)]
34 pub phase: String,
35
36 #[serde(default)]
39 pub path: Option<String>,
40 #[serde(default)]
42 pub downloaded_bytes: Option<u64>,
43 #[serde(default)]
46 pub total_bytes: Option<u64>,
47 #[serde(default)]
49 pub source_url: Option<String>,
50 #[serde(default)]
52 pub expected_sha256: Option<String>,
53 #[serde(default)]
55 pub actual_sha256: Option<String>,
56 #[serde(default)]
58 pub quarantine_path: Option<String>,
59 #[serde(default)]
61 pub n_ctx: Option<u32>,
62
63 #[serde(default)]
66 pub backend: Option<String>,
67 #[serde(default)]
69 pub v2: Option<bool>,
70 #[serde(default)]
72 pub vision: Option<bool>,
73 #[serde(default)]
75 pub audio: Option<bool>,
76 #[serde(default)]
79 pub tools: Option<bool>,
80 #[serde(default)]
83 pub thinking: Option<bool>,
84 #[serde(default)]
87 pub embed: Option<bool>,
88 #[serde(default)]
91 pub accelerator: Option<String>,
92 #[serde(default)]
95 pub gpu_layers: Option<u32>,
96}
97
98#[derive(Debug, thiserror::Error)]
100pub enum AdminError {
101 #[error("io: {0}")]
103 Io(#[from] io::Error),
104 #[error("decode: {0}")]
106 Decode(#[from] serde_json::Error),
107 #[error("admin socket closed")]
109 Closed,
110}
111
112pub struct AdminClient {
118 reader: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
119}
120
121impl AdminClient {
122 #[cfg(unix)]
125 pub async fn dial_admin_uds(path: &Path) -> Result<Self, AdminError> {
126 let stream = tokio::net::UnixStream::connect(path).await?;
127 Ok(Self {
128 reader: BufReader::with_capacity(8192, Box::new(stream)),
129 })
130 }
131
132 #[cfg(windows)]
135 pub async fn dial_admin_pipe(path: &str) -> Result<Self, AdminError> {
136 use tokio::net::windows::named_pipe::ClientOptions;
137 let pipe = ClientOptions::new().open(path)?;
138 Ok(Self {
139 reader: BufReader::with_capacity(8192, Box::new(pipe)),
140 })
141 }
142
143 pub async fn recv(&mut self) -> Result<AdminEvent, AdminError> {
146 let mut line = Vec::with_capacity(512);
147 let n = self.reader.read_until(b'\n', &mut line).await?;
148 if n == 0 {
149 return Err(AdminError::Closed);
150 }
151 let event: AdminEvent = serde_json::from_slice(&line)?;
152 Ok(event)
153 }
154
155 pub async fn wait_ready(&mut self) -> Result<AdminEvent, AdminError> {
160 loop {
161 let ev = self.recv().await?;
162 if ev.status == "ready" {
163 return Ok(ev);
164 }
165 }
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use super::*;
172
173 #[test]
174 fn decodes_download_frame() {
175 let raw = br#"{
176 "id":"admin","type":"status","status":"loading_model","phase":"download",
177 "downloaded_bytes":33554432,"total_bytes":5126304928,
178 "source_url":"https://huggingface.co/example.gguf"
179 }"#;
180 let ev: AdminEvent = serde_json::from_slice(raw).unwrap();
181 assert_eq!(ev.id, "admin");
182 assert_eq!(ev.kind, "status");
183 assert_eq!(ev.status, "loading_model");
184 assert_eq!(ev.phase, "download");
185 assert_eq!(ev.downloaded_bytes, Some(33_554_432));
186 assert_eq!(ev.total_bytes, Some(5_126_304_928));
187 assert_eq!(
188 ev.source_url.as_deref(),
189 Some("https://huggingface.co/example.gguf")
190 );
191 }
192
193 #[test]
194 fn decodes_ready_frame() {
195 let raw = br#"{"id":"admin","type":"status","status":"ready"}"#;
196 let ev: AdminEvent = serde_json::from_slice(raw).unwrap();
197 assert_eq!(ev.status, "ready");
198 assert_eq!(ev.phase, "");
199 assert!(ev.downloaded_bytes.is_none());
200 }
201
202 #[test]
203 fn total_bytes_may_be_null() {
204 let raw = br#"{"id":"admin","type":"status","status":"loading_model","phase":"download","downloaded_bytes":1024,"total_bytes":null,"source_url":"https://x"}"#;
205 let ev: AdminEvent = serde_json::from_slice(raw).unwrap();
206 assert_eq!(ev.downloaded_bytes, Some(1024));
207 assert_eq!(ev.total_bytes, None);
208 }
209
210 #[test]
211 fn unknown_status_round_trips_verbatim() {
212 let raw = br#"{"id":"admin","type":"status","status":"future_state_we_dont_know","extra_key":42}"#;
213 let ev: AdminEvent = serde_json::from_slice(raw).unwrap();
214 assert_eq!(ev.status, "future_state_we_dont_know");
215 }
216
217 #[tokio::test]
218 async fn recv_decodes_from_a_duplex_pipe() {
219 let (mut server_side, client_side) = tokio::io::duplex(4096);
220 use tokio::io::AsyncWriteExt;
221 server_side
222 .write_all(b"{\"id\":\"admin\",\"type\":\"status\",\"status\":\"starting\"}\n")
223 .await
224 .unwrap();
225 server_side.flush().await.unwrap();
226
227 let mut client = AdminClient {
228 reader: BufReader::with_capacity(4096, Box::new(client_side)),
229 };
230 let ev = client.recv().await.unwrap();
231 assert_eq!(ev.status, "starting");
232 }
233
234 #[tokio::test]
235 async fn wait_ready_skips_loading_frames() {
236 let (mut server_side, client_side) = tokio::io::duplex(4096);
237 use tokio::io::AsyncWriteExt;
238 let writes = b"\
239{\"id\":\"admin\",\"type\":\"status\",\"status\":\"starting\"}\n\
240{\"id\":\"admin\",\"type\":\"status\",\"status\":\"loading_model\",\"phase\":\"checking_local\",\"path\":\"/x.gguf\"}\n\
241{\"id\":\"admin\",\"type\":\"status\",\"status\":\"loading_model\",\"phase\":\"mmap\",\"path\":\"/x.gguf\"}\n\
242{\"id\":\"admin\",\"type\":\"status\",\"status\":\"ready\"}\n\
243";
244 server_side.write_all(writes).await.unwrap();
245 server_side.flush().await.unwrap();
246
247 let mut client = AdminClient {
248 reader: BufReader::with_capacity(4096, Box::new(client_side)),
249 };
250 let ev = client.wait_ready().await.unwrap();
251 assert_eq!(ev.status, "ready");
252 }
253
254 #[tokio::test]
255 async fn recv_reports_closed_on_eof() {
256 let (server_side, client_side) = tokio::io::duplex(4096);
257 drop(server_side);
258
259 let mut client = AdminClient {
260 reader: BufReader::with_capacity(4096, Box::new(client_side)),
261 };
262 match client.recv().await {
263 Err(AdminError::Closed) => {}
264 other => panic!("expected Closed, got {other:?}"),
265 }
266 }
267}