mod auth;
pub(crate) use auth::{BasicAuth, DEFAULT_ADMIN_PASSWORD, DEFAULT_ADMIN_USER};
use std::collections::HashMap;
use std::sync::Arc;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::{Json, Router, middleware};
use daemon::{IndexName, Phase, Status};
use prometheus::{Registry, TextEncoder};
use tokio::net::TcpListener;
use tokio::sync::{mpsc, oneshot};
/// State handed to the handlers of the public router.
#[derive(Clone, Debug)]
pub(crate) struct PublicState {
/// Shared status handle; handlers read it through `snapshot()`.
pub status: Arc<Status>,
/// Metrics registry to expose; when `None`, the metrics handler replies `404 Not Found`.
pub registry: Option<Registry>,
}
/// Serves `router` on `listener` until the server fails or `shutdown` completes.
///
/// * `surface` - label attached to the log events emitted here.
/// * `listener` - TCP listener on which connections are accepted.
/// * `router` - routes to serve.
/// * `shutdown` - graceful shutdown starts once this receiver resolves; its
///   result is ignored, so any completion of the receiver triggers it.
///
/// A server error is logged rather than returned to the caller.
pub(crate) async fn serve(
    surface: &'static str,
    listener: TcpListener,
    router: Router,
    shutdown: oneshot::Receiver<()>,
) {
    // Best effort: the address is only logged when the listener can report it.
    if let Ok(addr) = listener.local_addr() {
        tracing::info!(%addr, surface, "HTTP surface listening");
    }

    let server = axum::serve(listener, router).with_graceful_shutdown(async move {
        let _ = shutdown.await;
    });

    let outcome = server.await;
    if let Err(error) = outcome {
        tracing::error!(%error, surface, "HTTP server stopped on error");
    }
}
/// Builds the unauthenticated router.
///
/// Routes: `GET /healthz`, `GET /readyz`, `GET /status` and `GET /metrics`,
/// all backed by the supplied `state`.
pub(crate) fn public_router(state: PublicState) -> Router {
    let routes: Router<PublicState> = Router::new()
        .route("/healthz", get(healthz))
        .route("/readyz", get(readyz))
        .route("/status", get(status))
        .route("/metrics", get(metrics));

    routes.with_state(state)
}
/// State handed to the handlers of the private router.
#[derive(Clone, Debug)]
pub(crate) struct PrivateState {
/// Shared status handle; handlers read it through `snapshot()`.
pub status: Arc<Status>,
/// Channel on which the reindex handler enqueues the requested index name.
pub reindex: mpsc::Sender<IndexName>,
}
/// Builds the router wrapped in the basic-auth middleware.
///
/// Routes: `GET /indexes` and `POST /reindex`. The
/// `auth::require_basic_auth` middleware, configured with `basic_auth`, is
/// layered over both routes before `state` is attached.
pub(crate) fn private_router(state: PrivateState, basic_auth: Arc<BasicAuth>) -> Router {
    let routes: Router<PrivateState> = Router::new()
        .route("/indexes", get(indexes))
        .route("/reindex", post(reindex));

    routes
        .layer(middleware::from_fn_with_state(
            basic_auth,
            auth::require_basic_auth,
        ))
        .with_state(state)
}
/// Handler that unconditionally replies `200 OK`.
async fn healthz() -> impl IntoResponse {
StatusCode::OK
}
/// Readiness handler driven by the phase in the current status snapshot.
///
/// Replies `200 OK` while the phase is `Backfilling` or `Live`, and
/// `503 Service Unavailable` while it is `Starting` or `Stopped`.
async fn readyz(State(state): State<PublicState>) -> impl IntoResponse {
    let phase = state.status.snapshot().phase;

    // Kept exhaustive on purpose: a new `Phase` variant must be classified here.
    match phase {
        Phase::Starting | Phase::Stopped => StatusCode::SERVICE_UNAVAILABLE,
        Phase::Backfilling | Phase::Live => StatusCode::OK,
    }
}
/// Returns the current status snapshot serialized as JSON.
async fn status(State(state): State<PublicState>) -> impl IntoResponse {
    let snapshot = state.status.snapshot();
    Json(snapshot)
}
/// Renders the gathered metrics of the configured registry as text.
///
/// Replies:
/// * `404 Not Found` when no registry is configured,
/// * `200 OK` with the encoded metrics on success,
/// * `500 Internal Server Error` with the encoder's message on failure.
async fn metrics(State(state): State<PublicState>) -> impl IntoResponse {
    let Some(registry) = state.registry else {
        return (
            StatusCode::NOT_FOUND,
            "metrics are not enabled\n".to_owned(),
        );
    };
    // The encoder takes the gathered metric families by reference.
    match TextEncoder::new().encode_to_string(&registry.gather()) {
        Ok(text) => (StatusCode::OK, text),
        Err(error) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("failed to encode metrics: {error}\n"),
        ),
    }
}
/// Returns the `indexes` part of the current status snapshot as JSON.
async fn indexes(State(state): State<PrivateState>) -> impl IntoResponse {
    let snapshot = state.status.snapshot();
    Json(snapshot.indexes)
}
/// Queues a reindex of the index named by the `index` query parameter.
///
/// Replies:
/// * `400 Bad Request` when `index` is missing or is not a valid `IndexName`,
/// * `404 Not Found` when the current status snapshot has no such index,
/// * `202 Accepted` once the name has been placed on the reindex channel,
/// * `503 Service Unavailable` when the channel rejects the send.
async fn reindex(
    State(state): State<PrivateState>,
    Query(params): Query<HashMap<String, String>>,
) -> Response {
    let Some(requested) = params.get("index") else {
        let body = "missing query parameter ?index=<name>\n";
        return (StatusCode::BAD_REQUEST, body).into_response();
    };

    let Ok(index) = IndexName::try_new(requested.clone()) else {
        let body = format!("invalid index name {requested:?}\n");
        return (StatusCode::BAD_REQUEST, body).into_response();
    };

    let known = state.status.snapshot().indexes.contains_key(index.as_ref());
    if !known {
        let body = format!("unknown index {}\n", index.as_ref());
        return (StatusCode::NOT_FOUND, body).into_response();
    }

    // Non-blocking send: the handler never waits for room in the queue.
    let reply = match state.reindex.try_send(index.clone()) {
        Ok(()) => (
            StatusCode::ACCEPTED,
            format!("reindex of {} queued\n", index.as_ref()),
        ),
        Err(_) => (
            StatusCode::SERVICE_UNAVAILABLE,
            "reindex queue is full or closed\n".to_owned(),
        ),
    };
    reply.into_response()
}