use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use axum::extract::State;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::Router;
use serde::{Deserialize, Serialize};
use tracing::{error, info, warn};
use crate::api::client::ApiClient;
use crate::api::editor::list_bindings as api_list_bindings;
use crate::cli::commands::shared;
use crate::error::OlError;
use crate::runtime::admin_token;
use crate::runtime::reload::{ReloadInputs, SharedRoutes};
use crate::runtime::server::RuntimeContext;
use crate::update::{
self, ApplyMode, ApplyResult, ApplyStage, CheckResult, UpdateStatusKind, UpdateStatusSnapshot,
};
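
/// State shared by every admin route handler: the runtime context plus the
/// bearer token that callers must present.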
#[derive(Clone)]
pub struct AdminState {
pub ctx: Arc<RuntimeContext>,
pub admin_token: String,
}
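
/// Builds the admin `Router`, mapping the reload, auto-update, and managed-tool
/// routes to their handlers.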
pub fn build_router(state: AdminState) -> Router {
Router::new()
.route("/v1/admin/reload", post(handle_reload))
.route("/admin/update", post(handle_admin_update))
.route("/admin/update/status", get(handle_admin_update_status))
.route("/admin/tools", get(handle_admin_tools_list))
.route(
"/admin/tools/{slug}/restart",
post(handle_admin_tools_restart),
)
.route("/admin/tools/{slug}/probe", post(handle_admin_tools_probe))
.with_state(state)
}
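
/// Binds the admin endpoint on localhost at `port`, serves it until `shutdown`
/// resolves, and then returns the address it was bound to.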
pub async fn serve(
port: u16,
state: AdminState,
shutdown: impl std::future::Future<Output = ()> + Send + 'static,
) -> Result<SocketAddr, OlError> {
let bind: SocketAddr = ([127, 0, 0, 1], port).into();
let listener = tokio::net::TcpListener::bind(bind).await.map_err(|e| {
OlError::new(
crate::error::OL_4272_XDG_DIR_UNWRITABLE,
format!("admin bind {bind}: {e}"),
)
})?;
let local = listener.local_addr().map_err(|e| {
OlError::new(
crate::error::OL_4272_XDG_DIR_UNWRITABLE,
format!("admin local_addr: {e}"),
)
})?;
let app = build_router(state);
info!(addr = %local, "admin endpoint listening");
if let Err(e) = axum::serve(listener, app)
.with_graceful_shutdown(shutdown)
.await
{
error!(error = %e, "admin endpoint serve error");
}
Ok(local)
}
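
/// POST /v1/admin/reload: refreshes live bindings from the API when a client is
/// available, then rebuilds the route table; existing routes are left unchanged
/// if the reload fails.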
async fn handle_reload(State(state): State<AdminState>, headers: HeaderMap) -> Response {
if !verify_bearer(&state.admin_token, &headers) {
return (StatusCode::UNAUTHORIZED, "missing or invalid bearer token").into_response();
}
info!("admin reload requested");
let live = match shared::make_client().await {
Ok(client) => match refresh_live(&client).await {
Ok(rows) => Some(rows),
Err(e) => {
warn!(error = %e, "could not refresh live bindings during reload");
None
}
},
Err(e) => {
warn!(error = %e, "no API client during reload");
None
}
};
if let Some(rows) = live {
let mut guard = state.ctx.live_bindings.lock().await;
*guard = rows;
}
let live_snapshot = state.ctx.live_bindings.lock().await.clone();
let manifest_known = state.ctx.manifest_secret_ids.lock().await.clone();
let inputs = ReloadInputs {
manifest_path: &state.ctx.manifest_path,
live_bindings: &live_snapshot,
secret_store: state.ctx.secrets.as_ref(),
manifest_secret_ids_fallback: Some(manifest_known),
};
match crate::runtime::reload::reload_into(&state.ctx.routes, &inputs) {
Ok(n) => (
StatusCode::OK,
axum::Json(serde_json::json!({ "status": "reloaded", "binding_count": n })),
)
.into_response(),
Err(e) => {
warn!(error = %e, code = %e.code, "admin reload failed; routes unchanged");
(
StatusCode::CONFLICT,
axum::Json(serde_json::json!({
"status": "reload_failed",
"error": { "code": e.code.code, "message": e.message }
})),
)
.into_response()
}
}
}
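
/// Optional JSON body for POST /admin/update; `force_cargo_install` bypasses the
/// refusal for `cargo install` installations.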
#[derive(Debug, Deserialize, Default)]
#[serde(default)]
pub struct AdminUpdateRequest {
pub force_cargo_install: bool,
}
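
/// Returned with 202 Accepted once the background update task has been spawned.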
#[derive(Debug, Serialize)]
pub struct AdminUpdateResponse {
pub started: bool,
pub from: String,
pub to: String,
pub stream_url: &'static str,
}
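
/// POST /admin/update: refuses `cargo install` installations unless forced,
/// checks the registry for a newer version, enforces the minimum supported
/// version, and spawns the apply task if no update is already in flight.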
async fn handle_admin_update(
State(state): State<AdminState>,
headers: HeaderMap,
body: Option<axum::Json<AdminUpdateRequest>>,
) -> Response {
if !verify_bearer(&state.admin_token, &headers) {
return (StatusCode::UNAUTHORIZED, "missing or invalid bearer token").into_response();
}
let req = body.map(|axum::Json(b)| b).unwrap_or_default();
let current = env!("CARGO_PKG_VERSION").to_string();
if !req.force_cargo_install
&& matches!(
update::install_state::detect_install_method(),
update::install_state::InstallMethod::CargoInstall
)
{
return (
StatusCode::CONFLICT,
axum::Json(serde_json::json!({
"error": {
"code": "OL-4258",
"message": "this daemon was installed via `cargo install` — auto-update would not take effect",
"suggestion": "Run: cargo install --force --locked openlatch-provider"
}
})),
)
.into_response();
}
let registry_origin = "https://registry.npmjs.org".to_string();
let download_timeout = Duration::from_secs(60);
let check = update::check(&current, &registry_origin).await;
let (latest, severity, min_supported) = match check {
CheckResult::UpToDate { current } => {
return (
StatusCode::CONFLICT,
axum::Json(serde_json::json!({
"idempotent": true,
"current": current,
"message": "already on the latest version"
})),
)
.into_response();
}
CheckResult::Failed { reason } => {
return (
StatusCode::BAD_GATEWAY,
axum::Json(serde_json::json!({
"error": { "code": "OL-4252", "message": format!("update check failed: {reason}") }
})),
)
.into_response();
}
CheckResult::Available {
latest,
severity,
min_supported,
..
} => (latest, severity, min_supported),
};
if let Some(ref min) = min_supported {
if !update::version_at_least(&current, min) {
return (
StatusCode::PRECONDITION_FAILED,
axum::Json(serde_json::json!({
"error": {
"code": "OL-4259",
"message": format!(
"release {latest} requires provider >= {min}; manual `npm install -g @openlatch/provider@{latest}` required"
),
"latest": latest,
"min_supported": min,
}
})),
)
.into_response();
}
}
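// Allow only one update at a time; the flag is released by the apply task.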
if state
.ctx
.update_in_progress
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return (
StatusCode::SERVICE_UNAVAILABLE,
axum::Json(serde_json::json!({
"error": { "code": "OL-4253", "message": "another auto-update is already in progress" }
})),
)
.into_response();
}
{
let mut snap = state
.ctx
.update_status
.lock()
.expect("status mutex poisoned");
*snap = UpdateStatusSnapshot {
status: UpdateStatusKind::InProgress,
stage: Some(ApplyStage::Check),
from: Some(current.clone()),
to: Some(latest.clone()),
started_at: Some(update::install_state::now_rfc3339()),
ended_at: None,
error: None,
};
}
let state_for_task = state.clone();
let from = current.clone();
let to = latest.clone();
let response_to = to.clone();
tokio::spawn(async move {
let opts = update::ApplyOptions {
current_version: from,
registry_origin,
download_timeout,
force_cargo_install: req.force_cargo_install,
mode: ApplyMode::Rpc,
};
run_apply_in_daemon(state_for_task, opts, severity).await;
});
(
StatusCode::ACCEPTED,
axum::Json(AdminUpdateResponse {
started: true,
from: current,
to: response_to,
stream_url: "/admin/update/status",
}),
)
.into_response()
}
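
/// GET /admin/update/status: returns the current update status snapshot as JSON.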
async fn handle_admin_update_status(
State(state): State<AdminState>,
headers: HeaderMap,
) -> Response {
if !verify_bearer(&state.admin_token, &headers) {
return (StatusCode::UNAUTHORIZED, "missing or invalid bearer token").into_response();
}
let snap = state
.ctx
.update_status
.lock()
.expect("status mutex poisoned")
.clone();
axum::Json(snap).into_response()
}
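
/// Drives an in-daemon update: prepare the staged binary, swap it in, write the
/// update sentinel (rolling back the swap if that fails), drain, then restart
/// into the new binary, recording each stage in the shared status snapshot.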
pub async fn run_apply_in_daemon(
state: AdminState,
opts: update::ApplyOptions,
_severity: update::Severity,
) {
let started = std::time::Instant::now();
let started_at = update::install_state::now_rfc3339();
let ctx = state.ctx.clone();
let stamp_stage = |stage: ApplyStage| {
let mut snap = ctx.update_status.lock().expect("status mutex poisoned");
snap.stage = Some(stage);
};
let mark_failed = |stage: ApplyStage, reason: String| {
let mut snap = ctx.update_status.lock().expect("status mutex poisoned");
snap.status = UpdateStatusKind::Failed;
snap.stage = Some(stage);
snap.error = Some(reason);
snap.ended_at = Some(update::install_state::now_rfc3339());
};
let mark_completed = || {
let mut snap = ctx.update_status.lock().expect("status mutex poisoned");
snap.status = UpdateStatusKind::Completed;
snap.stage = None;
snap.ended_at = Some(update::install_state::now_rfc3339());
snap.error = None;
};
let release_lock = || {
ctx.update_in_progress.store(false, Ordering::Release);
};
stamp_stage(ApplyStage::Check);
let artefacts = match update::prepare_swap_artefacts(&opts).await {
Ok(a) => a,
Err(ApplyResult::UpToDate { current }) => {
tracing::info!(
target: "update",
current = %current,
"concurrent check found us up-to-date — releasing lock"
);
mark_completed();
release_lock();
return;
}
Err(ApplyResult::RefusedCargoInstall { suggestion }) => {
mark_failed(ApplyStage::Check, suggestion);
release_lock();
return;
}
Err(ApplyResult::Failed { stage, reason }) => {
mark_failed(stage, reason);
release_lock();
return;
}
Err(ApplyResult::Applied { .. }) => unreachable!("prepare can't return Applied"),
};
stamp_stage(ApplyStage::Swap);
let _swap_handle = match update::perform_swap(&artefacts.staging_exe) {
Ok(h) => h,
Err(e) => {
mark_failed(ApplyStage::Swap, e.to_string());
release_lock();
return;
}
};
let sentinel = update::UpdateSentinel {
from: artefacts.from.clone(),
to: artefacts.to.clone(),
applied_at: started_at.clone(),
};
if let Err(e) = update::write_sentinel(&sentinel) {
let rollback = update::rollback_from_bak();
tracing::error!(
target: "update",
error = %e,
rollback_error = ?rollback.as_ref().err(),
"sentinel write failed post-swap; rolled back the swap to keep the safety net intact",
);
mark_failed(
ApplyStage::Swap,
format!("sentinel write failed post-swap: {e}"),
);
release_lock();
return;
}
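// Drain: signal shutdown and give in-flight work a short window before restarting.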
stamp_stage(ApplyStage::Drain);
tracing::info!(target: "update", "draining axum prior to restart");
ctx.admin_shutdown_request.notify_waiters();
let drain_deadline = tokio::time::Instant::now() + Duration::from_secs(5);
tokio::time::sleep_until(drain_deadline).await;
tokio::time::sleep(Duration::from_secs(1)).await;
let _duration_ms = started.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
mark_completed();
stamp_stage(ApplyStage::Restart);
if let Err(e) = update::restart_into_new_binary() {
mark_failed(ApplyStage::Restart, e);
release_lock();
}
}
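
/// GET /admin/tools: returns the supervisor's tool snapshot, or 503 when the
/// daemon runs without a supervisor.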
async fn handle_admin_tools_list(State(state): State<AdminState>, headers: HeaderMap) -> Response {
if !verify_bearer(&state.admin_token, &headers) {
return (StatusCode::UNAUTHORIZED, "missing or invalid bearer token").into_response();
}
let Some(sup) = &state.ctx.supervisor else {
return (
StatusCode::SERVICE_UNAVAILABLE,
axum::Json(serde_json::json!({
"error": { "code": "OL-4222", "message": "this daemon is not running with a supervisor" }
})),
)
.into_response();
};
let snap = sup.snapshot().await;
axum::Json(serde_json::json!({ "tools": snap })).into_response()
}
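
/// POST /admin/tools/{slug}/restart: resolves `slug` to a binding id and asks the
/// supervisor to restart that tool.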
async fn handle_admin_tools_restart(
State(state): State<AdminState>,
axum::extract::Path(slug): axum::extract::Path<String>,
headers: HeaderMap,
) -> Response {
if !verify_bearer(&state.admin_token, &headers) {
return (StatusCode::UNAUTHORIZED, "missing or invalid bearer token").into_response();
}
let Some(sup) = &state.ctx.supervisor else {
return (
StatusCode::SERVICE_UNAVAILABLE,
axum::Json(serde_json::json!({
"error": { "code": "OL-4222", "message": "this daemon is not running with a supervisor" }
})),
)
.into_response();
};
let binding_id = match sup.resolve_slug(&slug).await {
Some(id) => id,
None => {
return (
StatusCode::NOT_FOUND,
axum::Json(serde_json::json!({
"error": { "code": "OL-4222", "message": format!("no managed tool with slug `{slug}`") }
})),
)
.into_response();
}
};
match sup.restart(&binding_id).await {
Ok(()) => (
StatusCode::OK,
axum::Json(serde_json::json!({ "status": "restarted", "binding_id": binding_id })),
)
.into_response(),
Err(e) => (
StatusCode::CONFLICT,
axum::Json(serde_json::json!({
"error": { "code": e.code.code, "message": e.message }
})),
)
.into_response(),
}
}
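
/// POST /admin/tools/{slug}/probe: resolves `slug` to a binding id and runs the
/// supervisor's probe against that tool.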
async fn handle_admin_tools_probe(
State(state): State<AdminState>,
axum::extract::Path(slug): axum::extract::Path<String>,
headers: HeaderMap,
) -> Response {
if !verify_bearer(&state.admin_token, &headers) {
return (StatusCode::UNAUTHORIZED, "missing or invalid bearer token").into_response();
}
let Some(sup) = &state.ctx.supervisor else {
return (
StatusCode::SERVICE_UNAVAILABLE,
axum::Json(serde_json::json!({
"error": { "code": "OL-4222", "message": "this daemon is not running with a supervisor" }
})),
)
.into_response();
};
let binding_id = match sup.resolve_slug(&slug).await {
Some(id) => id,
None => {
return (
StatusCode::NOT_FOUND,
axum::Json(serde_json::json!({
"error": { "code": "OL-4222", "message": format!("no managed tool with slug `{slug}`") }
})),
)
.into_response();
}
};
match sup.probe(&binding_id).await {
Ok(()) => (
StatusCode::OK,
axum::Json(serde_json::json!({ "status": "ok", "binding_id": binding_id })),
)
.into_response(),
Err(e) => (
StatusCode::SERVICE_UNAVAILABLE,
axum::Json(serde_json::json!({
"error": { "code": e.code.code, "message": e.message }
})),
)
.into_response(),
}
}
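
/// Returns true when the `Authorization` header carries a bearer token matching
/// the expected admin token.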
fn verify_bearer(expected: &str, headers: &HeaderMap) -> bool {
let Some(auth) = headers.get("authorization").and_then(|v| v.to_str().ok()) else {
return false;
};
let Some(presented) = auth.strip_prefix("Bearer ") else {
return false;
};
admin_token::matches(expected, presented)
}
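
/// Fetches the current binding rows from the API.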
async fn refresh_live(
client: &ApiClient,
) -> Result<Vec<crate::api::editor::EditorBindingRow>, OlError> {
api_list_bindings(client).await
}
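
/// Replaces the shared route table with `table`.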
pub fn swap_routes(shared: &SharedRoutes, table: crate::runtime::multi_tool::RouteTable) {
shared.store(table);
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn verify_bearer_matches_only_with_correct_prefix_and_value() {
let mut h = HeaderMap::new();
h.insert("authorization", "Bearer secret".parse().unwrap());
assert!(verify_bearer("secret", &h));
assert!(!verify_bearer("other", &h));
let mut bad = HeaderMap::new();
bad.insert("authorization", "Token secret".parse().unwrap());
assert!(!verify_bearer("secret", &bad));
}
#[test]
fn verify_bearer_returns_false_when_header_absent() {
let h = HeaderMap::new();
assert!(!verify_bearer("anything", &h));
}
}