use crate::status::StatusEvent;
use serde_json::json;
use std::io;
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::{broadcast, watch};
use tracing::{debug, info, warn};
/// Capacity handed to `broadcast::channel` when [`StatusBroadcaster::new`] creates
/// its event channel.
///
/// A subscriber that falls behind is reported as `RecvError::Lagged` by the
/// channel; `handle_admin_connection` reacts to that by closing the connection.
pub const ADMIN_BROADCAST_CAPACITY: usize = 256;
/// Publishes [`StatusEvent`]s to admin subscribers and remembers the latest state.
///
/// Every published event goes to the `events` broadcast channel. In addition the
/// most recent event is retained in one of two `watch` slots so a newly connected
/// client can be brought up to date: `Capabilities` events go to `capabilities`,
/// every other event goes to `snapshot`.
#[derive(Clone)]
pub struct StatusBroadcaster {
/// Most recently published event that is not a `Capabilities` event
/// (initially the value passed to `new`).
snapshot: watch::Sender<StatusEvent>,
/// Most recently published `Capabilities` event, or `None` if none was published yet.
capabilities: watch::Sender<Option<StatusEvent>>,
/// Live stream of every published event, in publish order.
events: broadcast::Sender<StatusEvent>,
}
impl StatusBroadcaster {
    /// Creates a broadcaster whose snapshot starts at `initial` and whose
    /// capabilities slot starts out empty.
    pub fn new(initial: StatusEvent) -> Self {
        // Only the sending halves are kept; the receivers created alongside them
        // are discarded and new ones are handed out on demand.
        let snapshot = watch::channel::<StatusEvent>(initial).0;
        let capabilities = watch::channel::<Option<StatusEvent>>(None).0;
        let events = broadcast::channel::<StatusEvent>(ADMIN_BROADCAST_CAPACITY).0;
        Self {
            snapshot,
            capabilities,
            events,
        }
    }

    /// Records `event` as the latest state and forwards it to all subscribers.
    ///
    /// `Capabilities` events replace the stored capabilities; any other event
    /// replaces the stored snapshot. The event is then sent on the broadcast
    /// channel; a send failure is ignored.
    pub fn publish(&self, event: StatusEvent) {
        match &event {
            StatusEvent::Capabilities { .. } => {
                let _previous = self.capabilities.send_replace(Some(event.clone()));
            }
            _ => {
                let _previous = self.snapshot.send_replace(event.clone());
            }
        }
        // The send result is deliberately discarded.
        let _ = self.events.send(event);
    }

    /// Returns a copy of the latest stored snapshot event.
    pub fn current(&self) -> StatusEvent {
        let latest = self.snapshot.borrow();
        latest.clone()
    }

    /// Returns a copy of the latest stored `Capabilities` event, if any.
    pub fn latest_capabilities(&self) -> Option<StatusEvent> {
        let latest = self.capabilities.borrow();
        latest.clone()
    }

    /// Returns a new receiver on the broadcast event channel.
    pub fn subscribe(&self) -> broadcast::Receiver<StatusEvent> {
        self.events.subscribe()
    }
}
/// Serializes `event` as one newline-terminated JSON frame for the admin channel.
///
/// The frame is a single JSON object that starts with `"id": "admin"` and
/// `"type": "status"` and then receives every top-level field of the serialized
/// event (a field with the same key replaces the envelope entry). If the event
/// cannot be converted to JSON the body falls back to `{"status": "error"}`; if
/// the serialized event is not a JSON object, no body fields are added.
///
/// Returns the encoded bytes followed by a single `\n`.
fn render_frame(event: &StatusEvent) -> Vec<u8> {
    let mut envelope = serde_json::Map::new();
    envelope.insert("id".into(), json!("admin"));
    envelope.insert("type".into(), json!("status"));

    let body = serde_json::to_value(event).unwrap_or_else(|_| json!({"status": "error"}));
    // `body` is owned, so its fields are moved into the envelope instead of being
    // cloned key by key.
    if let serde_json::Value::Object(fields) = body {
        envelope.extend(fields);
    }

    let mut bytes = serde_json::to_vec(&serde_json::Value::Object(envelope)).unwrap_or_default();
    bytes.push(b'\n');
    bytes
}
async fn handle_admin_connection<W: AsyncWrite + Unpin>(
mut writer: W,
snapshot: StatusEvent,
capabilities: Option<StatusEvent>,
mut rx: broadcast::Receiver<StatusEvent>,
) -> io::Result<()> {
if let Some(caps) = capabilities {
writer.write_all(&render_frame(&caps)).await?;
}
writer.write_all(&render_frame(&snapshot)).await?;
writer.flush().await?;
loop {
match rx.recv().await {
Ok(event) => {
if writer.write_all(&render_frame(&event)).await.is_err() {
return Ok(()); }
if writer.flush().await.is_err() {
return Ok(());
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!(skipped = n, "admin client lagged broadcast; closing");
return Ok(());
}
Err(broadcast::error::RecvError::Closed) => return Ok(()),
}
}
}
/// Accepts admin clients on a Unix domain socket until `shutdown` resolves.
///
/// For every accepted connection a task is spawned that runs
/// `handle_admin_connection` with the latest capabilities, the latest snapshot,
/// and a fresh broadcast receiver. Errors returned by a connection task are
/// logged at debug level and do not affect the listener.
///
/// # Errors
///
/// Returns the error from `listener.accept()` if accepting a connection fails.
/// Returns `Ok(())` once `shutdown` resolves.
#[cfg(unix)]
pub async fn serve_admin_uds(
    listener: tokio::net::UnixListener,
    broadcaster: Arc<StatusBroadcaster>,
    mut shutdown: tokio::sync::oneshot::Receiver<()>,
) -> io::Result<()> {
    info!("admin uds listener accepting");
    loop {
        tokio::select! {
            _ = &mut shutdown => {
                info!("admin shutdown signalled");
                return Ok(());
            }
            accept = listener.accept() => {
                let (stream, _) = accept?;
                // Subscribe BEFORE reading the stored state. A broadcast receiver
                // only sees events sent after it was created, so reading the
                // snapshot first would lose any event published in between: it
                // would be in neither the (stale) snapshot nor the receiver. With
                // this order the worst case is that the client sees an event
                // twice (once as snapshot, once from the stream).
                let rx = broadcaster.subscribe();
                let snapshot = broadcaster.current();
                let capabilities = broadcaster.latest_capabilities();
                debug!("admin uds accept");
                tokio::spawn(async move {
                    if let Err(e) = handle_admin_connection(stream, snapshot, capabilities, rx).await {
                        debug!(error = ?e, "admin connection ended with error");
                    }
                });
            }
        }
    }
}
/// Accepts admin clients on a Windows named pipe until `shutdown` resolves.
///
/// `first_instance` is the pipe instance waited on first. After each client
/// connects, the connected instance is handed to a spawned
/// `handle_admin_connection` task and a replacement instance is created with
/// `bind_admin_pipe(path, false)` for the next client.
///
/// # Errors
///
/// Returns the error from `connect()` or from `bind_admin_pipe` if either
/// fails. Returns `Ok(())` once `shutdown` resolves.
#[cfg(windows)]
pub async fn serve_admin_pipe(
    path: &str,
    first_instance: tokio::net::windows::named_pipe::NamedPipeServer,
    broadcaster: Arc<StatusBroadcaster>,
    mut shutdown: tokio::sync::oneshot::Receiver<()>,
) -> io::Result<()> {
    use crate::endpoint::bind_admin_pipe;
    info!(path = %path, "admin pipe listener accepting");
    let mut server = first_instance;
    loop {
        tokio::select! {
            _ = &mut shutdown => {
                info!("admin shutdown signalled");
                return Ok(());
            }
            connect_result = server.connect() => {
                connect_result?;
                // Keep the connected instance for this client and create the next
                // instance before spawning the connection task.
                // NOTE(review): the `false` argument presumably means "not the
                // first instance" — confirm against `bind_admin_pipe`.
                let connected = server;
                server = bind_admin_pipe(path, false)?;
                // Subscribe BEFORE reading the stored state. A broadcast receiver
                // only sees events sent after it was created, so reading the
                // snapshot first would lose any event published in between. With
                // this order the worst case is a duplicated event.
                let rx = broadcaster.subscribe();
                let snapshot = broadcaster.current();
                let capabilities = broadcaster.latest_capabilities();
                debug!("admin pipe accept");
                tokio::spawn(async move {
                    if let Err(e) = handle_admin_connection(connected, snapshot, capabilities, rx).await {
                        debug!(error = ?e, "admin connection ended with error");
                    }
                });
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::status::LoadPhase;
    use std::path::PathBuf;
    use std::time::Duration;

    /// Parses one newline-terminated admin frame into a JSON value.
    fn parse_admin_frame(line: &[u8]) -> serde_json::Value {
        let trimmed = std::str::from_utf8(line).unwrap().trim_end_matches('\n');
        serde_json::from_str(trimmed).unwrap()
    }

    #[test]
    fn render_frame_wraps_with_admin_envelope() {
        let bytes = render_frame(&StatusEvent::Ready);
        let v = parse_admin_frame(&bytes);
        assert_eq!(v["id"], "admin");
        assert_eq!(v["type"], "status");
        assert_eq!(v["status"], "ready");
    }

    #[test]
    fn render_frame_flattens_loading_model_phase() {
        let bytes = render_frame(&StatusEvent::LoadingModel {
            phase: LoadPhase::Download {
                downloaded_bytes: 33_554_432,
                total_bytes: Some(5_126_304_928),
                source_url: "https://example.com/x.gguf".into(),
            },
        });
        let v = parse_admin_frame(&bytes);
        assert_eq!(v["id"], "admin");
        assert_eq!(v["type"], "status");
        assert_eq!(v["status"], "loading_model");
        assert_eq!(v["phase"], "download");
        assert_eq!(v["downloaded_bytes"], 33_554_432);
        assert_eq!(v["total_bytes"], 5_126_304_928u64);
        assert_eq!(v["source_url"], "https://example.com/x.gguf");
    }

    #[tokio::test]
    async fn broadcaster_snapshot_returns_initial_state() {
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        match b.current() {
            StatusEvent::Starting => {}
            other => panic!("expected Starting, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn broadcaster_publish_updates_snapshot_and_fans_out() {
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        let mut rx1 = b.subscribe();
        let mut rx2 = b.subscribe();
        b.publish(StatusEvent::Ready);
        match rx1.recv().await {
            Ok(StatusEvent::Ready) => {}
            other => panic!("rx1: expected Ready, got {other:?}"),
        }
        match rx2.recv().await {
            Ok(StatusEvent::Ready) => {}
            other => panic!("rx2: expected Ready, got {other:?}"),
        }
        match b.current() {
            StatusEvent::Ready => {}
            other => panic!("expected snapshot Ready, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn capabilities_publish_does_not_overwrite_lifecycle_snapshot() {
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        b.publish(StatusEvent::Capabilities {
            backend: "llamacpp".into(),
            v2: true,
            vision: true,
            audio: false,
            tools: true,
            thinking: true,
            embed: false,
            accelerator: "cuda".into(),
            gpu_layers: 99,
        });
        match b.current() {
            StatusEvent::Starting => {}
            other => panic!("expected Starting in snapshot, got {other:?}"),
        }
        match b.latest_capabilities() {
            Some(StatusEvent::Capabilities {
                backend,
                accelerator,
                gpu_layers,
                ..
            }) => {
                assert_eq!(backend, "llamacpp");
                assert_eq!(accelerator, "cuda");
                assert_eq!(gpu_layers, 99);
            }
            other => panic!("expected Capabilities, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn handle_admin_connection_writes_capabilities_then_snapshot() {
        let (server_side, mut client_side) = tokio::io::duplex(64 * 1024);
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        b.publish(StatusEvent::Capabilities {
            backend: "llamacpp".into(),
            v2: true,
            vision: false,
            audio: false,
            tools: true,
            thinking: true,
            embed: false,
            accelerator: "cpu".into(),
            gpu_layers: 0,
        });
        b.publish(StatusEvent::Ready);
        let snapshot = b.current();
        let capabilities = b.latest_capabilities();
        let rx = b.subscribe();
        let handle = tokio::spawn(async move {
            let _ = handle_admin_connection(server_side, snapshot, capabilities, rx).await;
        });
        use tokio::io::AsyncBufReadExt;
        let mut reader = tokio::io::BufReader::new(&mut client_side);
        let mut line = Vec::new();
        let n = reader.read_until(b'\n', &mut line).await.unwrap();
        assert!(n > 0);
        let v = parse_admin_frame(&line);
        assert_eq!(v["status"], "capabilities");
        assert_eq!(v["backend"], "llamacpp");
        assert_eq!(v["accelerator"], "cpu");
        assert_eq!(v["gpu_layers"], 0);
        let mut line2 = Vec::new();
        let n2 = reader.read_until(b'\n', &mut line2).await.unwrap();
        assert!(n2 > 0);
        let v2 = parse_admin_frame(&line2);
        assert_eq!(v2["status"], "ready");
        drop(client_side);
        // The handler only stops on a receive error or a failed write. Dropping
        // the broadcaster closes the broadcast channel so the task actually
        // finishes; otherwise the join below would always wait out its timeout.
        drop(b);
        tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .expect("handler should exit once the broadcaster is dropped")
            .expect("handler task should not panic");
    }

    #[tokio::test]
    async fn handle_admin_connection_writes_snapshot_first() {
        let (server_side, mut client_side) = tokio::io::duplex(64 * 1024);
        let b = StatusBroadcaster::new(StatusEvent::Starting);
        b.publish(StatusEvent::LoadingModel {
            phase: LoadPhase::CheckingLocal {
                path: PathBuf::from("/tmp/x.gguf"),
            },
        });
        let snapshot = b.current();
        let capabilities = b.latest_capabilities();
        let rx = b.subscribe();
        let handle = tokio::spawn(async move {
            let _ = handle_admin_connection(server_side, snapshot, capabilities, rx).await;
        });
        use tokio::io::AsyncBufReadExt;
        let mut reader = tokio::io::BufReader::new(&mut client_side);
        let mut line = Vec::new();
        let n = reader.read_until(b'\n', &mut line).await.unwrap();
        assert!(n > 0);
        let v = parse_admin_frame(&line);
        assert_eq!(v["status"], "loading_model");
        assert_eq!(v["phase"], "checking_local");
        b.publish(StatusEvent::Ready);
        let mut line2 = Vec::new();
        let read =
            tokio::time::timeout(Duration::from_secs(1), reader.read_until(b'\n', &mut line2))
                .await
                .unwrap()
                .unwrap();
        assert!(read > 0);
        let v2 = parse_admin_frame(&line2);
        assert_eq!(v2["status"], "ready");
        drop(client_side);
        // Close the broadcast channel so the handler task terminates instead of
        // waiting for another event until the timeout below expires.
        drop(b);
        tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .expect("handler should exit once the broadcaster is dropped")
            .expect("handler task should not panic");
    }
}