use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum::{Json, Router};
use serde_json::{json, Value};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::{Stream, StreamExt};
use super::ipc::Frame;
use super::state::DaemonState;
use super::Result;
/// Settings for the daemon's optional HTTP listener, consumed by `serve_http`.
#[derive(Debug, Clone, Default)]
pub struct HttpConfig {
/// Address to bind, parsed as a `SocketAddr` by `serve_http`.
/// `None` leaves the HTTP listener disabled.
pub listen: Option<String>,
/// Bearer tokens accepted by the listener. `serve_http` refuses to bind a
/// non-loopback address when this registry is empty.
pub tokens: super::auth::TokenRegistry,
}
/// Per-router state handed to every handler through axum's `State` extractor.
#[derive(Clone)]
struct AppState {
// Shared daemon state that handlers dispatch ops and register sessions against.
daemon: Arc<DaemonState>,
// Token registry consulted by `authorize`; an empty registry disables auth checks.
tokens: super::auth::TokenRegistry,
// Captured in `serve_http` when the state is built; `/health` reports uptime from it.
started_at: std::time::Instant,
}
pub async fn serve_http(cfg: HttpConfig, daemon: Arc<DaemonState>) -> Result<()> {
let Some(listen) = cfg.listen.clone() else {
tracing::info!("http listener disabled (no [http].listen in daemon.toml)");
return Ok(());
};
let addr: SocketAddr = listen
.parse()
.map_err(|e| super::DaemonError::other(format!("[http].listen parse: {e}")))?;
if !addr.ip().is_loopback() && cfg.tokens.is_empty() {
return Err(super::DaemonError::other(format!(
"refusing to bind http listener on non-loopback {addr} without [http.tokens] — \
configure at least one bearer token first"
)));
}
let token_count = cfg.tokens.len();
let app_state = AppState {
daemon,
tokens: cfg.tokens,
started_at: std::time::Instant::now(),
};
let app = Router::new()
.route("/health", get(handler_health))
.route("/ops", get(handler_ops))
.route("/openapi", get(handler_openapi))
.route("/openapi.json", get(handler_openapi))
.route("/metrics", get(handler_metrics))
.route("/op/:name", post(handler_op))
.route("/stream/watch", get(handler_stream_watch))
.route("/stream/events", get(handler_stream_events))
.route("/stream/definitions", get(handler_stream_definitions))
.with_state(app_state);
let listener = tokio::net::TcpListener::bind(addr)
.await
.map_err(|e| super::DaemonError::other(format!("[http] tcp bind {addr}: {e}")))?;
tracing::info!(%addr, tokens = token_count, "http listener up");
tokio::spawn(async move {
if let Err(e) = axum::serve(listener, app).await {
tracing::error!(?e, "http listener exited");
}
});
Ok(())
}
/// Resolves the request's bearer token against `registry`.
///
/// Returns `Ok(None)` when the registry is empty (authentication disabled),
/// `Ok(Some(token))` when the `Authorization` header carries a known bearer
/// secret, and `Err(StatusCode::UNAUTHORIZED)` when the header is missing,
/// not valid text, uses another scheme, has an empty credential, or names an
/// unknown secret.
///
/// The scheme name is compared ASCII-case-insensitively (`Bearer`, `bearer`,
/// `BEARER` are all accepted) because HTTP authentication scheme names are
/// case-insensitive; the credential itself is matched exactly after trimming
/// surrounding whitespace.
fn authorize<'a>(
    headers: &HeaderMap,
    registry: &'a super::auth::TokenRegistry,
) -> std::result::Result<Option<&'a super::auth::Token>, StatusCode> {
    if registry.is_empty() {
        return Ok(None);
    }
    let secret = headers
        .get(axum::http::header::AUTHORIZATION)
        .and_then(|v| v.to_str().ok())
        // Split "<scheme> <credential>" on the first space, as the exact
        // "Bearer " prefix match did, but without pinning the scheme's case.
        .and_then(|raw| raw.split_once(' '))
        .filter(|(scheme, _)| scheme.eq_ignore_ascii_case("Bearer"))
        .map(|(_, credential)| credential.trim())
        .filter(|credential| !credential.is_empty())
        .ok_or(StatusCode::UNAUTHORIZED)?;
    registry
        .lookup(secret)
        .map(Some)
        .ok_or(StatusCode::UNAUTHORIZED)
}
/// `GET /health`: liveness probe reporting the crate version and the time
/// elapsed since the listener state was created, in milliseconds.
async fn handler_health(State(s): State<AppState>) -> impl IntoResponse {
    let since_start = s.started_at.elapsed();
    let body = json!({
        "ok": true,
        "version": env!("CARGO_PKG_VERSION"),
        "uptime_ms": since_start.as_millis() as u64,
    });
    Json(body)
}
/// `GET /ops`: lists every op name from `super::ops::OP_NAMES`.
async fn handler_ops(State(_state): State<AppState>) -> impl IntoResponse {
    let listing = json!({
        "ok": true,
        "ops": super::ops::OP_NAMES,
    });
    Json(listing)
}
/// `GET /openapi` and `GET /openapi.json`: builds the OpenAPI 3.1 document
/// describing this HTTP surface.
///
/// The document covers the meta endpoints, the three SSE streams, and one
/// `POST /op/{name}` entry per name in `super::ops::OP_NAMES`. The bearer
/// security scheme is only advertised when at least one token is configured,
/// mirroring `authorize`, which skips checks for an empty registry.
///
/// The `ErrPayload` schema names its text field `msg`, which is the key the
/// handlers in this file actually emit in error bodies. The per-op responses
/// also list 408 and 409, which `handler_op` returns for the `timeout` and
/// `busy` error codes.
async fn handler_openapi(State(s): State<AppState>) -> impl IntoResponse {
    let mut paths = serde_json::Map::new();
    paths.insert(
        "/health".to_string(),
        json!({
            "get": {
                "summary": "Liveness probe",
                "tags": ["meta"],
                "responses": {
                    "200": {
                        "description": "Daemon alive",
                        "content": {"application/json": {"schema": {
                            "type": "object",
                            "properties": {
                                "ok": {"type": "boolean"},
                                "version": {"type": "string"},
                                "uptime_ms": {"type": "integer", "format": "int64"},
                            },
                        }}},
                    }
                },
            }
        }),
    );
    paths.insert(
        "/ops".to_string(),
        json!({
            "get": {
                "summary": "List every op the daemon accepts",
                "tags": ["meta"],
                "responses": {
                    "200": {
                        "description": "Op list",
                        "content": {"application/json": {"schema": {
                            "type": "object",
                            "properties": {
                                "ok": {"type": "boolean"},
                                "ops": {"type": "array", "items": {"type": "string"}},
                            },
                        }}},
                    }
                },
            }
        }),
    );
    paths.insert(
        "/metrics".to_string(),
        json!({
            "get": {
                "summary": "Prometheus 0.0.4 metrics exposition",
                "tags": ["meta"],
                "responses": {
                    "200": {
                        "description": "Plain-text Prometheus exposition",
                        "content": {"text/plain": {"schema": {"type": "string"}}},
                    }
                },
            }
        }),
    );
    paths.insert(
        "/openapi".to_string(),
        json!({
            "get": {
                "summary": "This document",
                "tags": ["meta"],
                "responses": {
                    "200": {
                        "description": "OpenAPI 3.1 schema",
                        "content": {"application/json": {"schema": {"type": "object"}}},
                    }
                },
            }
        }),
    );

    // The three SSE endpoints share one response shape and differ only in summary.
    for (path, summary) in [
        (
            "/stream/watch",
            "fsnotify SSE: subscribe to filesystem-change events for a path",
        ),
        (
            "/stream/events",
            "Pub/sub SSE: subscribe to fanout events by topic pattern",
        ),
        (
            "/stream/definitions",
            "Definitions SSE: stream every recorder_ingest summary as it lands",
        ),
    ] {
        paths.insert(
            path.to_string(),
            json!({
                "get": {
                    "summary": summary,
                    "tags": ["streams"],
                    "responses": {
                        "200": {
                            "description": "Server-Sent Events stream",
                            "content": {"text/event-stream": {"schema": {"type": "string"}}},
                        }
                    },
                }
            }),
        );
    }

    // Loop-invariant: every error response of an op points at the same schema.
    let err_content = json!({
        "application/json": {"schema": {"$ref": "#/components/schemas/ErrPayload"}}
    });
    for op in super::ops::OP_NAMES {
        let key = format!("/op/{}", op);
        paths.insert(
            key,
            json!({
                "post": {
                    "summary": format!("Invoke op `{}`", op),
                    "tags": ["ops"],
                    "requestBody": {
                        "required": false,
                        "content": {"application/json": {"schema": {"type": "object"}}},
                    },
                    "responses": {
                        "200": {
                            "description": "Op result envelope",
                            "content": {"application/json": {"schema": {"type": "object"}}},
                        },
                        "400": {
                            "description": "Bad arguments",
                            "content": err_content,
                        },
                        "401": {"description": "Bearer token required or invalid"},
                        "403": {"description": "Bearer token lacks scope for this op"},
                        "404": {"description": "Unknown op"},
                        "408": {
                            "description": "Op timed out",
                            "content": err_content,
                        },
                        "409": {
                            "description": "Daemon busy",
                            "content": err_content,
                        },
                        "500": {
                            "description": "Op failed",
                            "content": err_content,
                        },
                    },
                }
            }),
        );
    }

    let security_scheme = json!({
        "type": "http",
        "scheme": "bearer",
        "description": "Bearer token configured under [http.tokens] in zshrs-daemon.toml.",
    });
    // Field names match the error bodies the handlers emit: `ok`, `code`, `msg`.
    let mut components = json!({
        "schemas": {
            "ErrPayload": {
                "type": "object",
                "required": ["ok", "code", "msg"],
                "properties": {
                    "ok": {"type": "boolean", "enum": [false]},
                    "code": {"type": "string"},
                    "msg": {"type": "string"},
                },
            }
        }
    });
    let mut top = json!({
        "openapi": "3.1.0",
        "info": {
            "title": "zshrs-daemon",
            "version": env!("CARGO_PKG_VERSION"),
            "description": "Single-host daemon backing the zshrs shell. Every op accepts JSON in, returns JSON out. See docs/DAEMON.md + docs/DAEMON_AS_SERVICE.md.",
        },
        "paths": paths,
    });
    // Only advertise bearer auth when it is actually enforced.
    if !s.tokens.is_empty() {
        components["securitySchemes"] = json!({"bearerAuth": security_scheme});
        top["security"] = json!([{"bearerAuth": []}]);
    }
    top["components"] = components;
    Json(top)
}
/// `GET /metrics`: counts the scrape itself, then returns the text produced by
/// `super::metrics::prometheus_text` with the Prometheus 0.0.4 content type.
async fn handler_metrics(State(s): State<AppState>) -> impl IntoResponse {
    // Record first so the scrape being served is reflected in its own output.
    s.daemon.metrics.record_http("/metrics", 200);
    let exposition = super::metrics::prometheus_text(&s.daemon);
    let content_type = (
        axum::http::header::CONTENT_TYPE,
        "text/plain; version=0.0.4",
    );
    (StatusCode::OK, [content_type], exposition)
}
/// Query-string parameters accepted by `GET /stream/watch`.
#[derive(serde::Deserialize)]
struct WatchQuery {
// Path to register with the fs watcher; when absent the stream opens without
// registering a watch.
path: Option<String>,
// Passed to `fs_watcher.subscribe`; the handler treats a missing value as `false`.
recursive: Option<bool>,
}
/// `GET /stream/watch`: SSE stream of `shard_updated` frames, re-emitted as `fs` events.
///
/// When a `path` query parameter is supplied, it is first registered with the
/// daemon's fs watcher (optionally recursive); a registration failure yields a
/// 500 JSON error. The watch id is handed to the stream guard, which releases it
/// when the stream is dropped. Authentication failures return the bare status code.
async fn handler_stream_watch(
    State(s): State<AppState>,
    headers: HeaderMap,
    Query(q): Query<WatchQuery>,
) -> impl IntoResponse {
    match authorize(&headers, &s.tokens) {
        Ok(_) => {}
        Err(code) => return code.into_response(),
    }

    let watch_id = match q.path.as_deref() {
        None => None,
        Some(target) => {
            let descriptor = super::fsnotify::WatchedPath {
                path: std::path::PathBuf::from(target),
                shard_slug: format!("http-watch-{}", super::shard::hash8(target)),
                source_root: target.to_string(),
                kind: super::fsnotify::WatchKind::Generic,
            };
            let recursive = q.recursive.unwrap_or(false);
            match s.daemon.fs_watcher.subscribe(descriptor, recursive) {
                Ok(id) => Some(id),
                Err(e) => {
                    tracing::warn!(?e, "stream/watch: registration failed");
                    let body =
                        json!({ "ok": false, "code": "watch_register", "msg": e.to_string() });
                    return (StatusCode::INTERNAL_SERVER_ERROR, Json(body)).into_response();
                }
            }
        }
    };

    // Forward only `shard_updated` frames, relabelled as `fs` events.
    let events = sse_event_stream_with_watch(&s.daemon, watch_id, |frame| match frame {
        Frame::Event { event, payload } if event == "shard_updated" => {
            Some(("fs".to_string(), payload.clone()))
        }
        _ => None,
    });
    let keep_alive = KeepAlive::new().interval(Duration::from_secs(15));
    Sse::new(events).keep_alive(keep_alive).into_response()
}
/// Query-string parameters accepted by `GET /stream/events`.
#[derive(serde::Deserialize)]
struct EventsQuery {
// Pattern forwarded as the `pattern` argument of the `subscribe` op; the
// handler falls back to "*.*" when it is absent.
channel: Option<String>,
}
/// `GET /stream/events`: SSE stream of pub/sub `match` frames, re-emitted as `pub` events.
///
/// Registers an SSE session, dispatches the `subscribe` op with the requested
/// `channel` pattern (default `"*.*"`), and streams matching frames until the
/// client disconnects.
///
/// Authorization matches `POST /op/subscribe`: when tokens are configured, the
/// presented token must also be allowed the scope that `super::auth::op_scope`
/// assigns to `subscribe`, otherwise 403 is returned. Without this check the
/// endpoint would let a token subscribe that `handler_op` would refuse.
///
/// The session is wrapped in its drop guard *before* the `subscribe` dispatch
/// is awaited, so it is unregistered on every exit path, including the request
/// future being dropped mid-await.
async fn handler_stream_events(
    State(s): State<AppState>,
    headers: HeaderMap,
    Query(q): Query<EventsQuery>,
) -> impl IntoResponse {
    match authorize(&headers, &s.tokens) {
        Err(code) => return code.into_response(),
        // Empty registry: authentication (and therefore scoping) is disabled.
        Ok(None) => {}
        Ok(Some(t)) => {
            let required = super::auth::op_scope("subscribe");
            if !t.allows(required) {
                return (
                    StatusCode::FORBIDDEN,
                    Json(json!({
                        "ok": false,
                        "code": "scope_denied",
                        "msg": format!(
                            "token `{}` lacks scope `{required}` for op `subscribe`",
                            t.label
                        ),
                        "required_scope": required,
                        "granted_scopes": t.granted_scopes(),
                    })),
                )
                    .into_response();
            }
        }
    }

    let pattern = q.channel.unwrap_or_else(|| "*.*".to_string());
    let (tx, rx) = mpsc::unbounded_channel::<Frame>();
    let pid = std::process::id() as i32;
    let (client_id, _session_id) = s.daemon.register_session(
        pid,
        Some("http-sse".to_string()),
        None,
        Some("http-sse-events".to_string()),
        tx,
    );

    let stream = UnboundedReceiverStream::new(rx).filter_map(|frame| {
        if let Frame::Event { event, payload } = frame {
            if event == "match" {
                return Some(Ok(Event::default().event("pub").data(payload.to_string())));
            }
        }
        None
    });
    // From here on the guard owns the session: dropping it unregisters.
    let guarded = SseGuardStream {
        inner: Box::pin(stream),
        state: Arc::clone(&s.daemon),
        client_id: Some(client_id),
        watch_id: None,
    };

    let sub_args = json!({ "pattern": pattern });
    if let Err(e) = super::ops::dispatch(&s.daemon, client_id, "subscribe", sub_args).await {
        // `guarded` is dropped on return, which unregisters the session.
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(json!({ "ok": false, "code": e.code, "msg": e.msg })),
        )
            .into_response();
    }

    Sse::new(guarded)
        .keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
        .into_response()
}
/// `GET /stream/definitions`: SSE stream of `recorder_ingested` frames,
/// re-emitted as `defs` events.
///
/// After the session is created, it is flagged via
/// `set_definitions_subscribed(client_id, true)`; the result of that call is
/// deliberately ignored. Authentication failures return the bare status code.
async fn handler_stream_definitions(
    State(s): State<AppState>,
    headers: HeaderMap,
) -> impl IntoResponse {
    match authorize(&headers, &s.tokens) {
        Ok(_) => {}
        Err(code) => return code.into_response(),
    }

    let events = sse_event_stream_with_watch(&s.daemon, None, |frame| match frame {
        Frame::Event { event, payload } if event == "recorder_ingested" => {
            Some(("defs".to_string(), payload.clone()))
        }
        _ => None,
    });

    // Best effort: the outcome of flagging the session is not inspected.
    if let Some(session) = events.client_id() {
        let _ = s.daemon.set_definitions_subscribed(session, true);
    }

    let keep_alive = KeepAlive::new().interval(Duration::from_secs(15));
    Sse::new(events).keep_alive(keep_alive).into_response()
}
/// Registers an `http-sse` session and returns a guarded SSE stream over its frames.
///
/// Every frame delivered to the session is offered to `pick`; when it returns
/// `Some((event_name, payload))` an SSE event with that name and the payload's
/// JSON text is emitted, otherwise the frame is skipped. The returned guard
/// carries the new client id and the optional `watch_id`, both of which its
/// `Drop` impl releases.
fn sse_event_stream_with_watch<F>(
    state: &Arc<DaemonState>,
    watch_id: Option<u64>,
    pick: F,
) -> SseGuardStream<impl Stream<Item = std::result::Result<Event, Infallible>>>
where
    F: Fn(&Frame) -> Option<(String, Value)> + Send + 'static,
{
    let (frame_tx, frame_rx) = mpsc::unbounded_channel::<Frame>();
    let (client_id, _session_id) = state.register_session(
        std::process::id() as i32,
        Some("http-sse".to_string()),
        None,
        Some("http-sse".to_string()),
        frame_tx,
    );

    let events = UnboundedReceiverStream::new(frame_rx).filter_map(move |frame| {
        let (event_name, payload) = pick(&frame)?;
        let sse = Event::default().event(event_name).data(payload.to_string());
        Some(Ok(sse))
    });

    SseGuardStream {
        inner: Box::pin(events),
        state: Arc::clone(state),
        client_id: Some(client_id),
        watch_id,
    }
}
/// Stream wrapper that ties an SSE response to the daemon resources backing it.
/// Polling is forwarded to `inner`; the `Drop` impl releases the fs watch (if
/// any) and unregisters the session.
struct SseGuardStream<S> {
// The wrapped event stream, boxed and pinned so the wrapper can poll it.
inner: std::pin::Pin<Box<S>>,
// Daemon handle used on drop to release the resources below.
state: Arc<DaemonState>,
// Session to unregister on drop; taken (set to `None`) once released.
client_id: Option<u64>,
// fs watcher subscription to release on drop, when one was registered.
watch_id: Option<u64>,
}
impl<S> SseGuardStream<S> {
/// Returns the id of the session this guard will unregister on drop, if any.
fn client_id(&self) -> Option<u64> {
self.client_id
}
}
/// Transparent `Stream` impl: every poll is delegated to the wrapped stream.
impl<S> Stream for SseGuardStream<S>
where
    S: Stream<Item = std::result::Result<Event, Infallible>> + Send,
{
    type Item = std::result::Result<Event, Infallible>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // The wrapper holds only `Unpin` fields (the inner stream is boxed),
        // so a plain mutable reference can be taken out of the pin.
        let guard = self.get_mut();
        guard.inner.as_mut().poll_next(cx)
    }
}
/// Releases the daemon resources tied to the stream: first the fs watcher
/// subscription (when present), then the session registration.
impl<S> Drop for SseGuardStream<S> {
    fn drop(&mut self) {
        let daemon = &self.state;

        // `take()` leaves `None` behind so each resource is released at most once.
        match self.watch_id.take() {
            Some(watch) => {
                let removed = daemon.fs_watcher.unsubscribe(watch);
                tracing::info!(watch_id = watch, removed, "sse fsnotify subscription released");
            }
            None => {}
        }

        match self.client_id.take() {
            Some(session) => {
                daemon.unregister_session(session);
                tracing::info!(client_id = session, "sse session unregistered");
            }
            None => {}
        }
    }
}
async fn handler_op(
State(s): State<AppState>,
Path(name): Path<String>,
headers: HeaderMap,
body: Option<Json<Value>>,
) -> impl IntoResponse {
let token = match authorize(&headers, &s.tokens) {
Ok(t) => t,
Err(code) => {
return (
code,
Json(json!({
"ok": false,
"code": "unauthorized",
"msg": "missing or invalid bearer token",
})),
);
}
};
if let Some(t) = token {
let required = super::auth::op_scope(&name);
if !t.allows(required) {
return (
StatusCode::FORBIDDEN,
Json(json!({
"ok": false,
"code": "scope_denied",
"msg": format!("token `{}` lacks scope `{required}` for op `{name}`", t.label),
"required_scope": required,
"granted_scopes": t.granted_scopes(),
})),
);
}
}
let args = body.map(|Json(v)| v).unwrap_or_else(|| json!({}));
let (tx, _rx) = mpsc::unbounded_channel::<Frame>();
let pid = std::process::id() as i32;
let (client_id, _session_id) = s.daemon.register_session(
pid,
Some("http".to_string()),
None,
Some(format!("http-op:{}", name)),
tx,
);
let dispatch_result = super::ops::dispatch(&s.daemon, client_id, &name, args).await;
s.daemon.unregister_session(client_id);
let path_label = format!("/op/{name}");
match dispatch_result {
Ok(payload) => {
s.daemon.metrics.record_http(&path_label, 200);
let mut out = match payload {
Value::Object(map) => map,
other => {
let mut m = serde_json::Map::new();
m.insert("payload".to_string(), other);
m
}
};
out.insert("ok".to_string(), Value::Bool(true));
(StatusCode::OK, Json(Value::Object(out)))
}
Err(err) => {
let status = match err.code.as_str() {
"bad_args" | "bad_cron" | "bad_format" | "bad_pattern" => StatusCode::BAD_REQUEST,
"unauthorized" => StatusCode::UNAUTHORIZED,
"no_such_file" | "no_such_kind" | "no_such_function" | "unknown_op" => {
StatusCode::NOT_FOUND
}
"busy" => StatusCode::CONFLICT,
"wrong_token" => StatusCode::FORBIDDEN,
"timeout" => StatusCode::REQUEST_TIMEOUT,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
s.daemon.metrics.record_http(&path_label, status.as_u16());
(
status,
Json(json!({
"ok": false,
"code": err.code,
"msg": err.msg,
})),
)
}
}
}