1use crate::status::StatusEvent;
29use serde_json::json;
30use std::io;
31use std::sync::Arc;
32use tokio::io::{AsyncWrite, AsyncWriteExt};
33use tokio::sync::{broadcast, watch};
34use tracing::{debug, info, warn};
35
/// Capacity handed to the broadcast channel that fans status events out to
/// admin subscribers (see `StatusBroadcaster::new`).
pub const ADMIN_BROADCAST_CAPACITY: usize = 256;
40
/// Publishes [`StatusEvent`]s to admin subscribers and keeps the most recent
/// state around so it can be read back on demand.
#[derive(Clone)]
pub struct StatusBroadcaster {
    /// Most recently published event that is *not* a `Capabilities` event;
    /// starts out as the value passed to `new`.
    snapshot: watch::Sender<StatusEvent>,
    /// Most recently published `Capabilities` event, or `None` if none has
    /// been published yet.
    capabilities: watch::Sender<Option<StatusEvent>>,
    /// Fan-out channel that carries every published event to subscribers.
    events: broadcast::Sender<StatusEvent>,
}
61
62impl StatusBroadcaster {
63 pub fn new(initial: StatusEvent) -> Self {
66 let (snapshot, _rx) = watch::channel(initial);
67 let (capabilities, _rx) = watch::channel(None);
68 let (events, _rx) = broadcast::channel(ADMIN_BROADCAST_CAPACITY);
69 Self {
70 snapshot,
71 capabilities,
72 events,
73 }
74 }
75
76 pub fn publish(&self, event: StatusEvent) {
79 if matches!(event, StatusEvent::Capabilities { .. }) {
84 let _ = self.capabilities.send_replace(Some(event.clone()));
85 } else {
86 let _ = self.snapshot.send_replace(event.clone());
87 }
88 let _ = self.events.send(event);
89 }
90
91 pub fn current(&self) -> StatusEvent {
94 self.snapshot.borrow().clone()
95 }
96
97 pub fn latest_capabilities(&self) -> Option<StatusEvent> {
101 self.capabilities.borrow().clone()
102 }
103
104 pub fn subscribe(&self) -> broadcast::Receiver<StatusEvent> {
107 self.events.subscribe()
108 }
109}
110
111fn render_frame(event: &StatusEvent) -> Vec<u8> {
114 let body = serde_json::to_value(event).unwrap_or_else(|_| json!({"status": "error"}));
118 let mut envelope = serde_json::Map::new();
119 envelope.insert("id".into(), json!("admin"));
120 envelope.insert("type".into(), json!("status"));
121 if let Some(obj) = body.as_object() {
122 for (k, v) in obj {
123 envelope.insert(k.clone(), v.clone());
124 }
125 }
126 let mut bytes = serde_json::to_vec(&serde_json::Value::Object(envelope)).unwrap_or_default();
127 bytes.push(b'\n');
128 bytes
129}
130
131async fn handle_admin_connection<W: AsyncWrite + Unpin>(
138 mut writer: W,
139 snapshot: StatusEvent,
140 capabilities: Option<StatusEvent>,
141 mut rx: broadcast::Receiver<StatusEvent>,
142) -> io::Result<()> {
143 if let Some(caps) = capabilities {
146 writer.write_all(&render_frame(&caps)).await?;
147 }
148 writer.write_all(&render_frame(&snapshot)).await?;
150 writer.flush().await?;
151
152 loop {
154 match rx.recv().await {
155 Ok(event) => {
156 if writer.write_all(&render_frame(&event)).await.is_err() {
157 return Ok(()); }
159 if writer.flush().await.is_err() {
160 return Ok(());
161 }
162 }
163 Err(broadcast::error::RecvError::Lagged(n)) => {
164 warn!(skipped = n, "admin client lagged broadcast; closing");
165 return Ok(());
166 }
167 Err(broadcast::error::RecvError::Closed) => return Ok(()),
168 }
169 }
170}
171
172#[cfg(unix)]
175pub async fn serve_admin_uds(
176 listener: tokio::net::UnixListener,
177 broadcaster: Arc<StatusBroadcaster>,
178 mut shutdown: tokio::sync::oneshot::Receiver<()>,
179) -> io::Result<()> {
180 info!("admin uds listener accepting");
181 loop {
182 tokio::select! {
183 _ = &mut shutdown => {
184 info!("admin shutdown signalled");
185 return Ok(());
186 }
187 accept = listener.accept() => {
188 let (stream, _) = accept?;
189 let snapshot = broadcaster.current();
190 let capabilities = broadcaster.latest_capabilities();
191 let rx = broadcaster.subscribe();
192 debug!("admin uds accept");
193 tokio::spawn(async move {
194 if let Err(e) = handle_admin_connection(stream, snapshot, capabilities, rx).await {
195 debug!(error = ?e, "admin connection ended with error");
196 }
197 });
198 }
199 }
200 }
201}
202
/// Accepts admin clients on a Windows named pipe until shutdown is signalled.
///
/// `first_instance` is the pipe instance that waits for the first client.
/// Each time a client connects, a fresh instance is bound for the next client
/// and the connected one is served on its own spawned task by
/// `handle_admin_connection`. The loop ends with `Ok(())` as soon as the
/// `shutdown` receiver resolves, whatever its result.
///
/// # Errors
///
/// Returns the error if waiting for a client or binding the next pipe
/// instance fails.
#[cfg(windows)]
pub async fn serve_admin_pipe(
    path: &str,
    first_instance: tokio::net::windows::named_pipe::NamedPipeServer,
    broadcaster: Arc<StatusBroadcaster>,
    mut shutdown: tokio::sync::oneshot::Receiver<()>,
) -> io::Result<()> {
    use crate::endpoint::bind_admin_pipe;

    info!(path = %path, "admin pipe listener accepting");
    let mut server = first_instance;
    loop {
        tokio::select! {
            _ = &mut shutdown => {
                info!("admin shutdown signalled");
                return Ok(());
            }
            connect_result = server.connect() => {
                connect_result?;
                // Hand the connected instance to the client task and bind a
                // new instance for the next client.
                // NOTE(review): `false` presumably means "not the first
                // instance" — confirm against `bind_admin_pipe`.
                let connected = server;
                server = bind_admin_pipe(path, false)?;

                // Subscribe BEFORE reading the cached state. A receiver only
                // sees events sent after `subscribe()`, so reading the
                // snapshot first would let an event published in between be
                // missed by both the greeting and the stream. With this order
                // such an event may be delivered twice, but never lost.
                let rx = broadcaster.subscribe();
                let snapshot = broadcaster.current();
                let capabilities = broadcaster.latest_capabilities();
                debug!("admin pipe accept");
                tokio::spawn(async move {
                    if let Err(e) = handle_admin_connection(connected, snapshot, capabilities, rx).await {
                        debug!(error = ?e, "admin connection ended with error");
                    }
                });
            }
        }
    }
}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::status::LoadPhase;
    use std::path::PathBuf;
    use std::time::Duration;

    /// Parses one newline-terminated admin frame into a JSON value.
    fn parse_admin_frame(line: &[u8]) -> serde_json::Value {
        let trimmed = std::str::from_utf8(line).unwrap().trim_end_matches('\n');
        serde_json::from_str(trimmed).unwrap()
    }

    /// Waits for a spawned connection handler to finish and fails the test if
    /// it does not.
    ///
    /// Callers must drop the broadcaster first: the handler only leaves its
    /// receive loop once the broadcast channel is closed (or a write fails),
    /// so awaiting it while the broadcaster is alive would just burn the
    /// whole timeout.
    async fn expect_handler_exit(handle: tokio::task::JoinHandle<()>) {
        tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .expect("handler should exit once the broadcaster is dropped")
            .expect("handler task panicked");
    }

    #[test]
    fn render_frame_wraps_with_admin_envelope() {
        let bytes = render_frame(&StatusEvent::Ready);
        let v = parse_admin_frame(&bytes);
        assert_eq!(v["id"], "admin");
        assert_eq!(v["type"], "status");
        assert_eq!(v["status"], "ready");
    }

    #[test]
    fn render_frame_flattens_loading_model_phase() {
        let bytes = render_frame(&StatusEvent::LoadingModel {
            phase: LoadPhase::Download {
                downloaded_bytes: 33_554_432,
                total_bytes: Some(5_126_304_928),
                source_url: "https://example.com/x.gguf".into(),
            },
        });
        let v = parse_admin_frame(&bytes);
        assert_eq!(v["id"], "admin");
        assert_eq!(v["type"], "status");
        assert_eq!(v["status"], "loading_model");
        assert_eq!(v["phase"], "download");
        assert_eq!(v["downloaded_bytes"], 33_554_432);
        assert_eq!(v["total_bytes"], 5_126_304_928u64);
        assert_eq!(v["source_url"], "https://example.com/x.gguf");
    }

    #[tokio::test]
    async fn broadcaster_snapshot_returns_initial_state() {
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        match b.current() {
            StatusEvent::Starting => {}
            other => panic!("expected Starting, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn broadcaster_publish_updates_snapshot_and_fans_out() {
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        let mut rx1 = b.subscribe();
        let mut rx2 = b.subscribe();

        b.publish(StatusEvent::Ready);

        // Every subscriber receives the event...
        match rx1.recv().await {
            Ok(StatusEvent::Ready) => {}
            other => panic!("rx1: expected Ready, got {other:?}"),
        }
        match rx2.recv().await {
            Ok(StatusEvent::Ready) => {}
            other => panic!("rx2: expected Ready, got {other:?}"),
        }

        // ...and the snapshot reflects it too.
        match b.current() {
            StatusEvent::Ready => {}
            other => panic!("expected snapshot Ready, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn capabilities_publish_does_not_overwrite_lifecycle_snapshot() {
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        b.publish(StatusEvent::Capabilities {
            backend: "llamacpp".into(),
            v2: true,
            vision: true,
            audio: false,
            tools: true,
            thinking: true,
            embed: false,
            accelerator: "cuda".into(),
            gpu_layers: 99,
        });
        // The lifecycle snapshot is untouched by a capabilities event.
        match b.current() {
            StatusEvent::Starting => {}
            other => panic!("expected Starting in snapshot, got {other:?}"),
        }
        // The capabilities event is cached separately.
        match b.latest_capabilities() {
            Some(StatusEvent::Capabilities {
                backend,
                accelerator,
                gpu_layers,
                ..
            }) => {
                assert_eq!(backend, "llamacpp");
                assert_eq!(accelerator, "cuda");
                assert_eq!(gpu_layers, 99);
            }
            other => panic!("expected Capabilities, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn handle_admin_connection_writes_capabilities_then_snapshot() {
        let (server_side, mut client_side) = tokio::io::duplex(64 * 1024);
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        b.publish(StatusEvent::Capabilities {
            backend: "llamacpp".into(),
            v2: true,
            vision: false,
            audio: false,
            tools: true,
            thinking: true,
            embed: false,
            accelerator: "cpu".into(),
            gpu_layers: 0,
        });
        b.publish(StatusEvent::Ready);

        let snapshot = b.current();
        let capabilities = b.latest_capabilities();
        let rx = b.subscribe();
        let handle = tokio::spawn(async move {
            let _ = handle_admin_connection(server_side, snapshot, capabilities, rx).await;
        });

        use tokio::io::AsyncBufReadExt;
        let mut reader = tokio::io::BufReader::new(&mut client_side);

        // First frame: cached capabilities.
        let mut line = Vec::new();
        let n = reader.read_until(b'\n', &mut line).await.unwrap();
        assert!(n > 0);
        let v = parse_admin_frame(&line);
        assert_eq!(v["status"], "capabilities");
        assert_eq!(v["backend"], "llamacpp");
        assert_eq!(v["accelerator"], "cpu");
        assert_eq!(v["gpu_layers"], 0);

        // Second frame: lifecycle snapshot.
        let mut line2 = Vec::new();
        let n2 = reader.read_until(b'\n', &mut line2).await.unwrap();
        assert!(n2 > 0);
        let v2 = parse_admin_frame(&line2);
        assert_eq!(v2["status"], "ready");

        drop(client_side);
        // Closing the broadcast channel is what lets the handler return.
        drop(b);
        expect_handler_exit(handle).await;
    }

    #[tokio::test]
    async fn handle_admin_connection_writes_snapshot_first() {
        let (server_side, mut client_side) = tokio::io::duplex(64 * 1024);
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        b.publish(StatusEvent::LoadingModel {
            phase: LoadPhase::CheckingLocal {
                path: PathBuf::from("/tmp/x.gguf"),
            },
        });

        let snapshot = b.current();
        let capabilities = b.latest_capabilities();
        let rx = b.subscribe();
        let handle = tokio::spawn(async move {
            let _ = handle_admin_connection(server_side, snapshot, capabilities, rx).await;
        });

        use tokio::io::AsyncBufReadExt;
        let mut reader = tokio::io::BufReader::new(&mut client_side);

        // No capabilities were published, so the snapshot is the first frame.
        let mut line = Vec::new();
        let n = reader.read_until(b'\n', &mut line).await.unwrap();
        assert!(n > 0);
        let v = parse_admin_frame(&line);
        assert_eq!(v["status"], "loading_model");
        assert_eq!(v["phase"], "checking_local");

        // An event published after connecting is streamed to the client.
        b.publish(StatusEvent::Ready);
        let mut line2 = Vec::new();
        let read =
            tokio::time::timeout(Duration::from_secs(1), reader.read_until(b'\n', &mut line2))
                .await
                .unwrap()
                .unwrap();
        assert!(read > 0);
        let v2 = parse_admin_frame(&line2);
        assert_eq!(v2["status"], "ready");

        drop(client_side);
        // Closing the broadcast channel is what lets the handler return.
        drop(b);
        expect_handler_exit(handle).await;
    }
}