use axum::{
extract::{Path as AxPath, State},
http::{header, HeaderMap, HeaderValue, Method, StatusCode},
response::{IntoResponse, Response},
};
use crate::{
audit_broken, audit_not_applicable, audit_valid, bad_request, decimal_header_value, df_body,
du_body,
engine::{Engine, EngineError},
engine_introspection::{AuditVerify, PoolSnapshot, WorldUsage},
engine_types::ValidatedWorldPath,
insufficient_storage, method_not_allowed, not_found, options_response, proc_text_response,
server::ServerState,
server_error, storage_temporarily_unavailable, to_header_map, unauthorized, world_list_body,
VERSION,
};
/// Method list for the root hint endpoint; `root_hint` hands it to
/// `options_response` and `method_not_allowed`.
/// NOTE(review): presumably surfaced as the `Allow` header value — confirm in those helpers.
pub(crate) const ROOT_ALLOW: &str = "GET, HEAD, OPTIONS";
/// Method list shared by the `proc_*` handlers in this file (version, worlds,
/// du, df, pool and the reserved fallback).
pub(crate) const PROC_ALLOW: &str = "GET, HEAD, OPTIONS";
/// Method list used by `proc_audit_verify`.
pub(crate) const AUDIT_VERIFY_ALLOW: &str = "GET, HEAD, OPTIONS";
/// Serves the plain-text banner for the root endpoint.
///
/// * `GET` answers `200 OK` with the banner as the body.
/// * `HEAD` answers `200 OK` with an empty body, the same content type, and a
///   `Content-Length` equal to the banner's byte length.
/// * `OPTIONS` is delegated to `options_response(ROOT_ALLOW)`.
/// * Every other method is delegated to `method_not_allowed(ROOT_ALLOW)`.
pub(crate) async fn root_hint(method: Method) -> Response {
    if method == Method::OPTIONS {
        return options_response(ROOT_ALLOW);
    }
    let wants_body = method == Method::GET;
    if !wants_body && method != Method::HEAD {
        return method_not_allowed(ROOT_ALLOW);
    }

    let hint = format!("elastik-core {VERSION} (rust)\ntry: curl /proc/worlds\n");
    if wants_body {
        let content_type = [(header::CONTENT_TYPE, "text/plain; charset=utf-8")];
        return (StatusCode::OK, content_type, hint).into_response();
    }

    // HEAD: advertise the length the GET body would have, but send no body.
    let head_headers = to_header_map(vec![
        (
            header::CONTENT_TYPE,
            HeaderValue::from_static("text/plain; charset=utf-8"),
        ),
        (header::CONTENT_LENGTH, decimal_header_value(hint.len())),
    ]);
    (StatusCode::OK, head_headers, "").into_response()
}
/// Reports the server version line as plain text.
///
/// `GET` returns the line as the body; `HEAD` returns an empty body with the
/// matching content type and a `Content-Length` equal to the line's byte
/// length. `OPTIONS` and unsupported methods are delegated to
/// `options_response` / `method_not_allowed` with `PROC_ALLOW`.
pub(crate) async fn proc_version(method: Method) -> Response {
    let is_get = method == Method::GET;
    let is_head = method == Method::HEAD;
    if !is_get && !is_head {
        return if method == Method::OPTIONS {
            options_response(PROC_ALLOW)
        } else {
            method_not_allowed(PROC_ALLOW)
        };
    }

    let version_line = format!("elastik-core {VERSION} (rust)\n");
    if is_head {
        // Headers only: the length describes the body a GET would carry.
        let content_length = decimal_header_value(version_line.len());
        let head_headers = to_header_map(vec![
            (
                header::CONTENT_TYPE,
                HeaderValue::from_static("text/plain; charset=utf-8"),
            ),
            (header::CONTENT_LENGTH, content_length),
        ]);
        return (StatusCode::OK, head_headers, "").into_response();
    }

    (
        StatusCode::OK,
        [(header::CONTENT_TYPE, "text/plain; charset=utf-8")],
        version_line,
    )
        .into_response()
}
/// Lists the worlds visible to the caller as a plain-text document.
///
/// The access tier is derived from the request headers via
/// `state.access_tier_from_headers`, and the listing itself is produced by
/// `Engine::list_worlds` executed through `run_introspection`. Engine or
/// worker failures are returned as the response built by `run_introspection`.
///
/// `HEAD` responses carry an explicit `Content-Length` and an empty body;
/// `GET` responses carry the listing with only `Content-Type` set here.
pub(crate) async fn proc_worlds(
    State(state): State<ServerState>,
    method: Method,
    headers: HeaderMap,
) -> Response {
    match method {
        Method::OPTIONS => return options_response(PROC_ALLOW),
        Method::GET | Method::HEAD => {}
        _ => return method_not_allowed(PROC_ALLOW),
    }

    let tier = state.access_tier_from_headers(&headers);
    let engine = state.engine().clone();
    let listing =
        run_introspection(engine, "proc worlds", move |eng| eng.list_worlds(tier)).await;
    let worlds = match listing {
        Ok(worlds) => worlds,
        Err(failure) => return *failure,
    };

    let text = world_list_body_from_paths(&worlds);
    let content_type = (
        header::CONTENT_TYPE,
        HeaderValue::from_static("text/plain; charset=utf-8"),
    );

    if method == Method::HEAD {
        // HEAD: report the size of the listing without sending it.
        let content_length = (header::CONTENT_LENGTH, decimal_header_value(text.len()));
        return (
            StatusCode::OK,
            to_header_map(vec![content_type, content_length]),
            String::new(),
        )
            .into_response();
    }

    (StatusCode::OK, to_header_map(vec![content_type]), text).into_response()
}
/// Reports per-world usage as plain text.
///
/// Only `GET` and `HEAD` reach the engine; `OPTIONS` and other methods are
/// answered by `options_response` / `method_not_allowed` with `PROC_ALLOW`.
/// The usage figures come from `Engine::du`, run through `run_introspection`
/// with the access tier taken from the request headers, and are rendered by
/// `du_body_from_usage` before being handed to `proc_text_response`.
pub(crate) async fn proc_du(
    State(state): State<ServerState>,
    method: Method,
    headers: HeaderMap,
) -> Response {
    match method {
        Method::OPTIONS => return options_response(PROC_ALLOW),
        Method::GET | Method::HEAD => {}
        _ => return method_not_allowed(PROC_ALLOW),
    }

    let tier = state.access_tier_from_headers(&headers);
    let engine = state.engine().clone();
    match run_introspection(engine, "proc du", move |eng| eng.du(tier)).await {
        Ok(usage) => proc_text_response(method, du_body_from_usage(&usage)),
        // The error side already holds a fully formed HTTP response.
        Err(failure) => *failure,
    }
}
/// Reports storage/memory usage and quota figures as plain text.
///
/// Only `GET` and `HEAD` reach the engine; `OPTIONS` and other methods are
/// answered by `options_response` / `method_not_allowed` with `PROC_ALLOW`.
/// The snapshot comes from `Engine::df`, run through `run_introspection` with
/// the access tier taken from the request headers; its fields are passed to
/// `df_body` and the result to `proc_text_response`.
pub(crate) async fn proc_df(
    State(state): State<ServerState>,
    method: Method,
    headers: HeaderMap,
) -> Response {
    if method == Method::OPTIONS {
        return options_response(PROC_ALLOW);
    }
    if !matches!(method, Method::GET | Method::HEAD) {
        return method_not_allowed(PROC_ALLOW);
    }

    let tier = state.access_tier_from_headers(&headers);
    let engine = state.engine().clone();
    let outcome = run_introspection(engine, "proc df", move |eng| eng.df(tier)).await;
    let snap = match outcome {
        Ok(snap) => snap,
        Err(failure) => return *failure,
    };

    proc_text_response(
        method,
        df_body(
            snap.storage_used,
            snap.storage_quota,
            snap.memory_used,
            snap.memory_quota,
            snap.worlds,
        ),
    )
}
/// Reports the pool counters as plain text.
///
/// Only `GET` and `HEAD` reach the engine; `OPTIONS` and other methods are
/// answered by `options_response` / `method_not_allowed` with `PROC_ALLOW`.
/// The snapshot comes from `Engine::pool`, run through `run_introspection`
/// with the access tier taken from the request headers, and is rendered by
/// `pool_body` before being handed to `proc_text_response`.
pub(crate) async fn proc_pool(
    State(state): State<ServerState>,
    method: Method,
    headers: HeaderMap,
) -> Response {
    match method {
        Method::OPTIONS => return options_response(PROC_ALLOW),
        Method::GET | Method::HEAD => {}
        _ => return method_not_allowed(PROC_ALLOW),
    }

    let tier = state.access_tier_from_headers(&headers);
    let engine = state.engine().clone();
    let outcome = run_introspection(engine, "proc pool", move |eng| eng.pool(tier)).await;
    match outcome {
        Ok(counters) => proc_text_response(method, pool_body(&counters)),
        // The error side already holds a fully formed HTTP response.
        Err(failure) => *failure,
    }
}
/// Verifies the audit data of one world and reports the outcome.
///
/// The captured path must end in `/verify`; otherwise `not_found()` is
/// returned. What precedes that suffix (with trailing slashes removed) is the
/// world path: an empty value or one rejected by `ValidatedWorldPath::new`
/// yields a `bad_request`. The verification itself is `Engine::verify_audit`,
/// run through `run_introspection` with the access tier taken from the
/// request headers.
///
/// Method handling happens before any path parsing: `OPTIONS` and unsupported
/// methods are answered with `AUDIT_VERIFY_ALLOW` regardless of the path.
pub(crate) async fn proc_audit_verify(
    State(state): State<ServerState>,
    method: Method,
    AxPath(audit_path): AxPath<String>,
    headers: HeaderMap,
) -> Response {
    match method {
        Method::OPTIONS => return options_response(AUDIT_VERIFY_ALLOW),
        Method::GET | Method::HEAD => {}
        _ => return method_not_allowed(AUDIT_VERIFY_ALLOW),
    }

    // Split "<world>/verify" and drop any slashes left dangling on the world.
    let world_part = match audit_path.strip_suffix("/verify") {
        Some(prefix) => prefix.trim_end_matches('/'),
        None => return not_found(),
    };
    if world_part.is_empty() {
        return bad_request("audit verify requires a world path");
    }
    let Ok(world) = ValidatedWorldPath::new(crate::canonicalize_path(world_part)) else {
        return bad_request("invalid audit verify world path");
    };

    let tier = state.access_tier_from_headers(&headers);
    let engine = state.engine().clone();
    let outcome = run_introspection(engine, "audit verify", move |eng| {
        eng.verify_audit(&world, tier)
    })
    .await;
    let verdict = match outcome {
        Ok(verdict) => verdict,
        Err(failure) => return *failure,
    };

    match verdict {
        AuditVerify::NotApplicable => audit_not_applicable(),
        AuditVerify::Broken(report) => audit_broken(report),
        AuditVerify::Valid(report) => audit_valid(report),
        #[cfg(not(test))]
        _ => server_error("unknown audit verification result".to_string()),
    }
}
/// Fallback handler that never serves content.
///
/// `OPTIONS` is answered by `options_response(PROC_ALLOW)`, `GET` and `HEAD`
/// by `not_found()`, and every other method by
/// `method_not_allowed(PROC_ALLOW)`.
pub(crate) async fn proc_reserved(method: Method) -> Response {
    if method == Method::OPTIONS {
        return options_response(PROC_ALLOW);
    }
    if method == Method::GET || method == Method::HEAD {
        not_found()
    } else {
        method_not_allowed(PROC_ALLOW)
    }
}
/// Runs an engine query via `tokio::task::spawn_blocking` and converts
/// failures into ready-to-send responses.
///
/// * `engine` — the engine handle, moved into the blocking task.
/// * `scope` — short label interpolated into error messages.
/// * `f` — the query to run against the engine.
///
/// Returns the query's value on success. An `EngineError` is mapped through
/// `proc_engine_error`; a task that fails to join produces a `server_error`
/// reading "`{scope}` worker failed". The error response is boxed.
async fn run_introspection<T, F>(
    engine: Engine,
    scope: &'static str,
    f: F,
) -> Result<T, Box<Response>>
where
    T: Send + 'static,
    F: FnOnce(&Engine) -> Result<T, EngineError> + Send + 'static,
{
    let joined = tokio::task::spawn_blocking(move || f(&engine)).await;
    let Ok(outcome) = joined else {
        // The blocking task did not complete; the join error itself is not reported.
        return Err(Box::new(server_error(format!("{scope} worker failed"))));
    };
    outcome.map_err(|err| Box::new(proc_engine_error(scope, err)))
}
/// Translates an `EngineError` raised by an introspection query into the HTTP
/// response returned to the client.
///
/// `scope` is a short label interpolated into the server-error messages. The
/// arms below are grouped by the kind of response they produce.
fn proc_engine_error(scope: &'static str, err: EngineError) -> Response {
    match err {
        // Problems attributable to the request.
        EngineError::InvalidWorldName => bad_request("invalid world path"),
        EngineError::Auth(_) => unauthorized("read requires read token"),
        EngineError::NotFound => not_found(),

        // Storage conditions that have dedicated responses.
        EngineError::ShuttingDown | EngineError::TransientStorage { .. } => {
            storage_temporarily_unavailable()
        }
        EngineError::InsufficientStorage { .. } => insufficient_storage(),

        // Failures reported as server errors.
        EngineError::Storage { .. } => server_error(format!("{scope} storage failure")),
        EngineError::InternalInvariant(detail) => {
            server_error(format!("{scope} internal invariant: {detail}"))
        }

        // Variants reported as unexpected for these endpoints.
        EngineError::SubscriptionLimit
        | EngineError::QuotaExceeded { .. }
        | EngineError::PreconditionFailed { .. }
        | EngineError::AppendOnly
        | EngineError::PayloadTooLarge { .. } => {
            server_error(format!("unexpected {scope} engine error"))
        }

        #[cfg(not(test))]
        _ => server_error(format!("unknown {scope} engine error")),
    }
}
/// Renders validated world paths with `world_list_body`.
///
/// Each path is copied into an owned `String` first, and the resulting list
/// is what `world_list_body` receives.
fn world_list_body_from_paths(names: &[ValidatedWorldPath]) -> String {
    let mut owned: Vec<String> = Vec::with_capacity(names.len());
    for path in names {
        owned.push(path.as_str().to_owned());
    }
    world_list_body(&owned)
}
/// Renders per-world usage records with `du_body`.
///
/// Each record is flattened into a `(world path, bytes)` pair with the path
/// copied into an owned `String`, and the resulting list is what `du_body`
/// receives.
fn du_body_from_usage(sizes: &[WorldUsage]) -> String {
    let mut rows: Vec<(String, usize)> = Vec::with_capacity(sizes.len());
    for entry in sizes {
        let world = entry.world.as_str().to_owned();
        rows.push((world, entry.bytes));
    }
    du_body(&rows)
}
/// Formats a `PoolSnapshot` as the plain-text pool report.
///
/// The report has one line per field in the form `<name> <value> <kind>`,
/// where `<kind>` is the literal `snapshot` or `counter`, and every line is
/// terminated by `\n`.
fn pool_body(snapshot: &PoolSnapshot) -> String {
    format!(
        concat!(
            "read_cache_entries {entries} snapshot\n",
            "read_cache_tombstones {tombstones} snapshot\n",
            "read_cache_hits {hits} counter\n",
            "read_cache_misses {misses} counter\n",
            "read_cache_capped {capped} counter\n",
            "read_cache_open_fails {open_fails} counter\n",
            "read_cache_max_entries {max_entries} snapshot\n",
            "ledger_writer_inits {writer_inits} counter\n",
        ),
        entries = snapshot.read_cache_entries,
        tombstones = snapshot.read_cache_tombstones,
        hits = snapshot.read_cache_hits,
        misses = snapshot.read_cache_misses,
        capped = snapshot.read_cache_capped,
        open_fails = snapshot.read_cache_open_fails,
        max_entries = snapshot.read_cache_max_entries,
        writer_inits = snapshot.ledger_writer_inits,
    )
}