Skip to main content

inferd_daemon/
admin.rs

//! Admin socket — push-style daemon-lifecycle event broadcast.
//!
//! Specified in `docs/protocol-v1.md` §"Admin endpoint". On connect,
//! the daemon writes the latest capability frame (if one has been
//! published) followed by a snapshot frame describing the current
//! state. Every subsequent state transition pushes another frame. The
//! stream is read-only, daemon → client; anything the client writes is
//! ignored.
//!
//! Wire shape:
//!
//! ```json
//! {"id":"admin","type":"status","status":"loading_model","phase":"download",
//!  "downloaded_bytes":33554432,"total_bytes":5126304928,
//!  "source_url":"https://huggingface.co/..."}
//! ```
//!
//! Architecture:
//!
//! - `StatusBroadcaster` wraps two `tokio::sync::watch` channels (the
//!   "current state" snapshot newcomers see, and the most recent
//!   capability advertisement) plus a `tokio::sync::broadcast` channel
//!   (the live event stream).
//! - `serve_admin_*` are the per-platform accept loops. Each accepted
//!   connection spawns a writer task that (1) writes the stored
//!   capability frame, if any, and then the current snapshot, and
//!   (2) forwards every broadcast event after that. Slow clients
//!   overflow the broadcast buffer and are disconnected with EOF; they
//!   reconnect to resume.
27
28use crate::status::StatusEvent;
29use serde_json::json;
30use std::io;
31use std::sync::Arc;
32use tokio::io::{AsyncWrite, AsyncWriteExt};
33use tokio::sync::{broadcast, watch};
34use tracing::{debug, info, warn};
35
/// Number of not-yet-received events each admin subscriber may have
/// queued on the broadcast channel. A client that falls further behind
/// than this sees `Lagged` and is disconnected; it resumes by
/// reconnecting and receiving a fresh snapshot.
pub const ADMIN_BROADCAST_CAPACITY: usize = 1 << 8;
40
41/// Daemon-wide status broadcaster.
42///
43/// Producers (fetch, lifecycle, etc.) call `publish` to update the
44/// current state. Subscribers (admin clients) connect to the admin
45/// socket; each gets a snapshot of the current state on connect, then
46/// the live event stream.
47#[derive(Clone)]
48pub struct StatusBroadcaster {
49    /// `watch` carries the *most recent* event so newcomers don't
50    /// need to wait for the next state change.
51    snapshot: watch::Sender<StatusEvent>,
52    /// `watch` carries the most recent `Capabilities` event so
53    /// late-connecting one-shot subscribers (e.g. `inferdctl doctor`)
54    /// see capability info even though it was published once at boot.
55    /// `None` until the backend has been constructed.
56    capabilities: watch::Sender<Option<StatusEvent>>,
57    /// `broadcast` carries the live event stream. Subscribers receive
58    /// every event from the moment they subscribe.
59    events: broadcast::Sender<StatusEvent>,
60}
61
62impl StatusBroadcaster {
63    /// Build a fresh broadcaster seeded with `initial`. Typical
64    /// initial value is `StatusEvent::Starting`.
65    pub fn new(initial: StatusEvent) -> Self {
66        let (snapshot, _rx) = watch::channel(initial);
67        let (capabilities, _rx) = watch::channel(None);
68        let (events, _rx) = broadcast::channel(ADMIN_BROADCAST_CAPACITY);
69        Self {
70            snapshot,
71            capabilities,
72            events,
73        }
74    }
75
76    /// Publish a new state. Updates the snapshot (so subsequent
77    /// connects see this) AND fans out to all current subscribers.
78    pub fn publish(&self, event: StatusEvent) {
79        // `send_replace` updates the watch's stored value regardless
80        // of how many receivers are alive — `send` would silently
81        // drop the value if no one is currently subscribed, which
82        // is wrong for snapshot semantics.
83        if matches!(event, StatusEvent::Capabilities { .. }) {
84            let _ = self.capabilities.send_replace(Some(event.clone()));
85        } else {
86            let _ = self.snapshot.send_replace(event.clone());
87        }
88        let _ = self.events.send(event);
89    }
90
91    /// Snapshot of the current state. Used by the admin accept loop
92    /// to write the first frame on connect.
93    pub fn current(&self) -> StatusEvent {
94        self.snapshot.borrow().clone()
95    }
96
97    /// Most recent capability advertisement, if any. The admin accept
98    /// loop writes this *before* the snapshot frame so one-shot
99    /// readers see capabilities even when they connect after Ready.
100    pub fn latest_capabilities(&self) -> Option<StatusEvent> {
101        self.capabilities.borrow().clone()
102    }
103
104    /// Subscribe to the live event stream. The receiver yields every
105    /// event published *after* this call.
106    pub fn subscribe(&self) -> broadcast::Receiver<StatusEvent> {
107        self.events.subscribe()
108    }
109}
110
111/// Serialise a single `StatusEvent` into the admin wire shape:
112/// `{"id":"admin","type":"status",...}` plus a trailing newline.
113fn render_frame(event: &StatusEvent) -> Vec<u8> {
114    // `serde(tag = "status", flatten phase)` already produces the
115    // status + phase + detail structure; we wrap it with the admin
116    // envelope (`id`, `type`).
117    let body = serde_json::to_value(event).unwrap_or_else(|_| json!({"status": "error"}));
118    let mut envelope = serde_json::Map::new();
119    envelope.insert("id".into(), json!("admin"));
120    envelope.insert("type".into(), json!("status"));
121    if let Some(obj) = body.as_object() {
122        for (k, v) in obj {
123            envelope.insert(k.clone(), v.clone());
124        }
125    }
126    let mut bytes = serde_json::to_vec(&serde_json::Value::Object(envelope)).unwrap_or_default();
127    bytes.push(b'\n');
128    bytes
129}
130
131/// Drive one accepted admin connection: write the snapshot frame,
132/// then forward every subsequent broadcast event until the client
133/// disconnects or the broadcast lags out (slow consumer).
134///
135/// The connection is read-only from the client's perspective; this
136/// function never reads from the stream.
137async fn handle_admin_connection<W: AsyncWrite + Unpin>(
138    mut writer: W,
139    snapshot: StatusEvent,
140    capabilities: Option<StatusEvent>,
141    mut rx: broadcast::Receiver<StatusEvent>,
142) -> io::Result<()> {
143    // 1a. Capability frame (if known) — written before the snapshot
144    // so one-shot subscribers see it.
145    if let Some(caps) = capabilities {
146        writer.write_all(&render_frame(&caps)).await?;
147    }
148    // 1b. Snapshot frame.
149    writer.write_all(&render_frame(&snapshot)).await?;
150    writer.flush().await?;
151
152    // 2. Live event stream.
153    loop {
154        match rx.recv().await {
155            Ok(event) => {
156                if writer.write_all(&render_frame(&event)).await.is_err() {
157                    return Ok(()); // peer closed
158                }
159                if writer.flush().await.is_err() {
160                    return Ok(());
161                }
162            }
163            Err(broadcast::error::RecvError::Lagged(n)) => {
164                warn!(skipped = n, "admin client lagged broadcast; closing");
165                return Ok(());
166            }
167            Err(broadcast::error::RecvError::Closed) => return Ok(()),
168        }
169    }
170}
171
172/// Serve an admin Unix domain socket (Unix only). Loops until
173/// `shutdown` resolves.
174#[cfg(unix)]
175pub async fn serve_admin_uds(
176    listener: tokio::net::UnixListener,
177    broadcaster: Arc<StatusBroadcaster>,
178    mut shutdown: tokio::sync::oneshot::Receiver<()>,
179) -> io::Result<()> {
180    info!("admin uds listener accepting");
181    loop {
182        tokio::select! {
183            _ = &mut shutdown => {
184                info!("admin shutdown signalled");
185                return Ok(());
186            }
187            accept = listener.accept() => {
188                let (stream, _) = accept?;
189                let snapshot = broadcaster.current();
190                let capabilities = broadcaster.latest_capabilities();
191                let rx = broadcaster.subscribe();
192                debug!("admin uds accept");
193                tokio::spawn(async move {
194                    if let Err(e) = handle_admin_connection(stream, snapshot, capabilities, rx).await {
195                        debug!(error = ?e, "admin connection ended with error");
196                    }
197                });
198            }
199        }
200    }
201}
202
/// Serve an admin Windows named pipe (Windows only).
///
/// Mirrors the inference-pipe multi-instance accept pattern: the caller
/// pre-binds the first instance, and the loop binds the next one before
/// spawning the handler for the just-connected one. Returns `Ok(())` once
/// `shutdown` resolves, or the error from a failed connect or re-bind.
#[cfg(windows)]
pub async fn serve_admin_pipe(
    path: &str,
    first_instance: tokio::net::windows::named_pipe::NamedPipeServer,
    broadcaster: Arc<StatusBroadcaster>,
    mut shutdown: tokio::sync::oneshot::Receiver<()>,
) -> io::Result<()> {
    use crate::endpoint::bind_admin_pipe;

    info!(path = %path, "admin pipe listener accepting");
    let mut server = first_instance;
    loop {
        tokio::select! {
            _ = &mut shutdown => {
                info!("admin shutdown signalled");
                return Ok(());
            }
            connect_result = server.connect() => {
                connect_result?;
                let connected = server;
                server = bind_admin_pipe(path, false)?;

                // Subscribe BEFORE reading the stored state so an event
                // published concurrently is never lost between the snapshot
                // read and the subscription (see `serve_admin_uds`).
                let rx = broadcaster.subscribe();
                let capabilities = broadcaster.latest_capabilities();
                let snapshot = broadcaster.current();
                debug!("admin pipe accept");
                tokio::spawn(async move {
                    if let Err(e) = handle_admin_connection(connected, snapshot, capabilities, rx).await {
                        debug!(error = ?e, "admin connection ended with error");
                    }
                });
            }
        }
    }
}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::status::LoadPhase;
    use std::path::PathBuf;
    use std::time::Duration;
    use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};

    /// Upper bound on how long a test waits for the handler.
    const WAIT: Duration = Duration::from_secs(1);

    /// Decode one newline-terminated admin frame into JSON.
    fn parse_admin_frame(line: &[u8]) -> serde_json::Value {
        let text = std::str::from_utf8(line).unwrap();
        serde_json::from_str(text.trim_end_matches('\n')).unwrap()
    }

    /// Read and decode the next frame from `reader`, failing on EOF.
    async fn next_frame<R>(reader: &mut R) -> serde_json::Value
    where
        R: AsyncBufRead + Unpin,
    {
        let mut line = Vec::new();
        let read = reader.read_until(b'\n', &mut line).await.unwrap();
        assert!(read > 0);
        parse_admin_frame(&line)
    }

    #[test]
    fn render_frame_wraps_with_admin_envelope() {
        let frame = parse_admin_frame(&render_frame(&StatusEvent::Ready));
        assert_eq!(frame["id"], "admin");
        assert_eq!(frame["type"], "status");
        assert_eq!(frame["status"], "ready");
    }

    #[test]
    fn render_frame_flattens_loading_model_phase() {
        let event = StatusEvent::LoadingModel {
            phase: LoadPhase::Download {
                downloaded_bytes: 33_554_432,
                total_bytes: Some(5_126_304_928),
                source_url: "https://example.com/x.gguf".into(),
            },
        };
        let frame = parse_admin_frame(&render_frame(&event));
        assert_eq!(frame["id"], "admin");
        assert_eq!(frame["type"], "status");
        assert_eq!(frame["status"], "loading_model");
        assert_eq!(frame["phase"], "download");
        assert_eq!(frame["downloaded_bytes"], 33_554_432);
        assert_eq!(frame["total_bytes"], 5_126_304_928u64);
        assert_eq!(frame["source_url"], "https://example.com/x.gguf");
    }

    #[tokio::test]
    async fn broadcaster_snapshot_returns_initial_state() {
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        let current = b.current();
        assert!(
            matches!(current, StatusEvent::Starting),
            "expected Starting, got {current:?}"
        );
    }

    #[tokio::test]
    async fn broadcaster_publish_updates_snapshot_and_fans_out() {
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        let mut subscribers = [b.subscribe(), b.subscribe()];

        b.publish(StatusEvent::Ready);

        // Every subscriber sees the event.
        for (idx, rx) in subscribers.iter_mut().enumerate() {
            let got = rx.recv().await;
            assert!(
                matches!(got, Ok(StatusEvent::Ready)),
                "rx{}: expected Ready, got {got:?}",
                idx + 1
            );
        }

        // The snapshot tracks the latest publish.
        let current = b.current();
        assert!(
            matches!(current, StatusEvent::Ready),
            "expected snapshot Ready, got {current:?}"
        );
    }

    #[tokio::test]
    async fn capabilities_publish_does_not_overwrite_lifecycle_snapshot() {
        // Capabilities has its own slot, so late-connecting subscribers
        // still get the latest lifecycle state (Ready / Restarting /
        // Draining) as their snapshot.
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        b.publish(StatusEvent::Capabilities {
            backend: "llamacpp".into(),
            v2: true,
            vision: true,
            audio: false,
            tools: true,
            thinking: true,
            embed: false,
            accelerator: "cuda".into(),
            gpu_layers: 99,
        });

        let current = b.current();
        assert!(
            matches!(current, StatusEvent::Starting),
            "expected Starting in snapshot, got {current:?}"
        );

        let caps = b.latest_capabilities();
        match caps {
            Some(StatusEvent::Capabilities {
                backend,
                accelerator,
                gpu_layers,
                ..
            }) => {
                assert_eq!(backend, "llamacpp");
                assert_eq!(accelerator, "cuda");
                assert_eq!(gpu_layers, 99);
            }
            other => panic!("expected Capabilities, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn handle_admin_connection_writes_capabilities_then_snapshot() {
        let (server_side, mut client_side) = tokio::io::duplex(64 * 1024);
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        b.publish(StatusEvent::Capabilities {
            backend: "llamacpp".into(),
            v2: true,
            vision: false,
            audio: false,
            tools: true,
            thinking: true,
            embed: false,
            accelerator: "cpu".into(),
            gpu_layers: 0,
        });
        b.publish(StatusEvent::Ready);

        let snapshot = b.current();
        let capabilities = b.latest_capabilities();
        let rx = b.subscribe();
        let handle = tokio::spawn(async move {
            let _ = handle_admin_connection(server_side, snapshot, capabilities, rx).await;
        });

        let mut reader = BufReader::new(&mut client_side);

        // Capabilities comes first...
        let caps = next_frame(&mut reader).await;
        assert_eq!(caps["status"], "capabilities");
        assert_eq!(caps["backend"], "llamacpp");
        assert_eq!(caps["accelerator"], "cpu");
        assert_eq!(caps["gpu_layers"], 0);

        // ...then the lifecycle snapshot.
        let snap = next_frame(&mut reader).await;
        assert_eq!(snap["status"], "ready");

        drop(client_side);
        let _ = tokio::time::timeout(WAIT, handle).await;
    }

    #[tokio::test]
    async fn handle_admin_connection_writes_snapshot_first() {
        // An in-memory duplex stands in for a real socket.
        let (server_side, mut client_side) = tokio::io::duplex(64 * 1024);
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        b.publish(StatusEvent::LoadingModel {
            phase: LoadPhase::CheckingLocal {
                path: PathBuf::from("/tmp/x.gguf"),
            },
        });

        let snapshot = b.current();
        let capabilities = b.latest_capabilities();
        let rx = b.subscribe();
        let handle = tokio::spawn(async move {
            let _ = handle_admin_connection(server_side, snapshot, capabilities, rx).await;
        });

        let mut reader = BufReader::new(&mut client_side);

        // The snapshot frame arrives first.
        let snap = next_frame(&mut reader).await;
        assert_eq!(snap["status"], "loading_model");
        assert_eq!(snap["phase"], "checking_local");

        // A later publish is forwarded live.
        b.publish(StatusEvent::Ready);
        let live = tokio::time::timeout(WAIT, next_frame(&mut reader))
            .await
            .unwrap();
        assert_eq!(live["status"], "ready");

        drop(client_side);
        let _ = tokio::time::timeout(WAIT, handle).await;
    }
}