1use serde::Deserialize;
5use std::io;
6#[cfg(unix)]
7use std::path::Path;
8use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
9
10#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
19pub struct AdminEvent {
20 #[serde(default)]
22 pub id: String,
23 #[serde(default)]
25 #[serde(rename = "type")]
26 pub kind: String,
27 pub status: String,
30 #[serde(default)]
34 pub phase: String,
35
36 #[serde(default)]
39 pub path: Option<String>,
40 #[serde(default)]
42 pub downloaded_bytes: Option<u64>,
43 #[serde(default)]
46 pub total_bytes: Option<u64>,
47 #[serde(default)]
49 pub source_url: Option<String>,
50 #[serde(default)]
52 pub expected_sha256: Option<String>,
53 #[serde(default)]
55 pub actual_sha256: Option<String>,
56 #[serde(default)]
58 pub quarantine_path: Option<String>,
59 #[serde(default)]
61 pub n_ctx: Option<u32>,
62}
63
64#[derive(Debug, thiserror::Error)]
66pub enum AdminError {
67 #[error("io: {0}")]
69 Io(#[from] io::Error),
70 #[error("decode: {0}")]
72 Decode(#[from] serde_json::Error),
73 #[error("admin socket closed")]
75 Closed,
76}
77
78pub struct AdminClient {
84 reader: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
85}
86
87impl AdminClient {
88 #[cfg(unix)]
91 pub async fn dial_admin_uds(path: &Path) -> Result<Self, AdminError> {
92 let stream = tokio::net::UnixStream::connect(path).await?;
93 Ok(Self {
94 reader: BufReader::with_capacity(8192, Box::new(stream)),
95 })
96 }
97
98 #[cfg(windows)]
101 pub async fn dial_admin_pipe(path: &str) -> Result<Self, AdminError> {
102 use tokio::net::windows::named_pipe::ClientOptions;
103 let pipe = ClientOptions::new().open(path)?;
104 Ok(Self {
105 reader: BufReader::with_capacity(8192, Box::new(pipe)),
106 })
107 }
108
109 pub async fn recv(&mut self) -> Result<AdminEvent, AdminError> {
112 let mut line = Vec::with_capacity(512);
113 let n = self.reader.read_until(b'\n', &mut line).await?;
114 if n == 0 {
115 return Err(AdminError::Closed);
116 }
117 let event: AdminEvent = serde_json::from_slice(&line)?;
118 Ok(event)
119 }
120
121 pub async fn wait_ready(&mut self) -> Result<AdminEvent, AdminError> {
126 loop {
127 let ev = self.recv().await?;
128 if ev.status == "ready" {
129 return Ok(ev);
130 }
131 }
132 }
133}
134
135#[cfg(test)]
136mod tests {
137 use super::*;
138
139 #[test]
140 fn decodes_download_frame() {
141 let raw = br#"{
142 "id":"admin","type":"status","status":"loading_model","phase":"download",
143 "downloaded_bytes":33554432,"total_bytes":5126304928,
144 "source_url":"https://huggingface.co/example.gguf"
145 }"#;
146 let ev: AdminEvent = serde_json::from_slice(raw).unwrap();
147 assert_eq!(ev.id, "admin");
148 assert_eq!(ev.kind, "status");
149 assert_eq!(ev.status, "loading_model");
150 assert_eq!(ev.phase, "download");
151 assert_eq!(ev.downloaded_bytes, Some(33_554_432));
152 assert_eq!(ev.total_bytes, Some(5_126_304_928));
153 assert_eq!(
154 ev.source_url.as_deref(),
155 Some("https://huggingface.co/example.gguf")
156 );
157 }
158
159 #[test]
160 fn decodes_ready_frame() {
161 let raw = br#"{"id":"admin","type":"status","status":"ready"}"#;
162 let ev: AdminEvent = serde_json::from_slice(raw).unwrap();
163 assert_eq!(ev.status, "ready");
164 assert_eq!(ev.phase, "");
165 assert!(ev.downloaded_bytes.is_none());
166 }
167
168 #[test]
169 fn total_bytes_may_be_null() {
170 let raw = br#"{"id":"admin","type":"status","status":"loading_model","phase":"download","downloaded_bytes":1024,"total_bytes":null,"source_url":"https://x"}"#;
171 let ev: AdminEvent = serde_json::from_slice(raw).unwrap();
172 assert_eq!(ev.downloaded_bytes, Some(1024));
173 assert_eq!(ev.total_bytes, None);
174 }
175
176 #[test]
177 fn unknown_status_round_trips_verbatim() {
178 let raw = br#"{"id":"admin","type":"status","status":"future_state_we_dont_know","extra_key":42}"#;
179 let ev: AdminEvent = serde_json::from_slice(raw).unwrap();
180 assert_eq!(ev.status, "future_state_we_dont_know");
181 }
182
183 #[tokio::test]
184 async fn recv_decodes_from_a_duplex_pipe() {
185 let (mut server_side, client_side) = tokio::io::duplex(4096);
186 use tokio::io::AsyncWriteExt;
187 server_side
188 .write_all(b"{\"id\":\"admin\",\"type\":\"status\",\"status\":\"starting\"}\n")
189 .await
190 .unwrap();
191 server_side.flush().await.unwrap();
192
193 let mut client = AdminClient {
194 reader: BufReader::with_capacity(4096, Box::new(client_side)),
195 };
196 let ev = client.recv().await.unwrap();
197 assert_eq!(ev.status, "starting");
198 }
199
200 #[tokio::test]
201 async fn wait_ready_skips_loading_frames() {
202 let (mut server_side, client_side) = tokio::io::duplex(4096);
203 use tokio::io::AsyncWriteExt;
204 let writes = b"\
205{\"id\":\"admin\",\"type\":\"status\",\"status\":\"starting\"}\n\
206{\"id\":\"admin\",\"type\":\"status\",\"status\":\"loading_model\",\"phase\":\"checking_local\",\"path\":\"/x.gguf\"}\n\
207{\"id\":\"admin\",\"type\":\"status\",\"status\":\"loading_model\",\"phase\":\"mmap\",\"path\":\"/x.gguf\"}\n\
208{\"id\":\"admin\",\"type\":\"status\",\"status\":\"ready\"}\n\
209";
210 server_side.write_all(writes).await.unwrap();
211 server_side.flush().await.unwrap();
212
213 let mut client = AdminClient {
214 reader: BufReader::with_capacity(4096, Box::new(client_side)),
215 };
216 let ev = client.wait_ready().await.unwrap();
217 assert_eq!(ev.status, "ready");
218 }
219
220 #[tokio::test]
221 async fn recv_reports_closed_on_eof() {
222 let (server_side, client_side) = tokio::io::duplex(4096);
223 drop(server_side);
224
225 let mut client = AdminClient {
226 reader: BufReader::with_capacity(4096, Box::new(client_side)),
227 };
228 match client.recv().await {
229 Err(AdminError::Closed) => {}
230 other => panic!("expected Closed, got {other:?}"),
231 }
232 }
233}