use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
use axum::{
body::Bytes,
http::{HeaderMap, Method, StatusCode},
response::Response,
};
use crate::{
bad_request, canonicalize_path,
engine_types::{AccessTier, ValidatedWorldPath},
method_not_allowed,
server::ServerState,
AuthGate, WORLD_ALLOW,
};
/// Newtype around the numeric identifier assigned to a request.
///
/// Cheap to copy; the wrapped `u64` is reachable as `.0` within the crate.
#[derive(Copy, Clone)]
pub(crate) struct RequestId(pub(crate) u64);
/// One state of the request pipeline driven by `run`.
///
/// Each non-terminal variant owns the request parts that the following
/// stage still needs, so a transition consumes the previous state by value.
#[allow(dead_code)]
pub(crate) enum Phase {
    /// Initial state: the request parts exactly as handed to `run`.
    Received {
        method: Method,
        path: String,
        headers: HeaderMap,
        body: Bytes,
    },
    /// The request parts plus the access tier resolved for the caller.
    Authenticated {
        method: Method,
        path: String,
        headers: HeaderMap,
        body: Bytes,
        tier: AccessTier,
    },
    /// The raw path has been replaced by a validated world path.
    PathValidated {
        method: Method,
        headers: HeaderMap,
        body: Bytes,
        tier: AccessTier,
        world: ValidatedWorldPath,
    },
    /// The HTTP method has been replaced by one of the supported verbs.
    Dispatched {
        verb: Verb,
        headers: HeaderMap,
        body: Bytes,
        tier: AccessTier,
        world: ValidatedWorldPath,
    },
    /// Response of a read; `run` converts it to `Done`.
    ExecutedRead(Response),
    /// Response of a write; `run` converts it to `Done`.
    CommittedWrite(Response),
    /// Terminal state: `run` returns the response.
    Done(Response),
    /// Terminal failure: `run` returns `resp`; `reason` is only traced.
    Error {
        resp: Response,
        reason: ErrorReason,
    },
}
/// The HTTP methods the pipeline dispatches; `dispatch` turns every other
/// method into a `MethodNotAllowed` error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum Verb {
    Get,
    Head,
    Put,
    Post,
    Delete,
}
/// Machine-readable cause attached to `Phase::Error`.
///
/// The value is formatted with `{:?}` into trace lines. Within this module
/// only `PathInvalid` and `MethodNotAllowed` are constructed by pipeline
/// code; the other variants are presumably produced elsewhere in the crate.
#[allow(dead_code)]
#[derive(Debug)]
pub(crate) enum ErrorReason {
    /// Carries the `AuthGate` associated with the failure.
    Auth(AuthGate),
    /// Static message; `validate_path` passes the same text to `bad_request`.
    PathInvalid(&'static str),
    /// Produced by `dispatch` for any method without a matching `Verb`.
    MethodNotAllowed,
    NotFound,
    PreconditionFailed,
    RangeNotSatisfiable,
    PayloadTooLarge,
    QuotaExceeded,
    InsufficientStorage,
    StorageRead,
    StorageWriteAudit,
    // NOTE(review): this per-variant allow duplicates the enum-level one.
    #[allow(dead_code)]
    AuditChainBroken,
}
/// Process-wide switch for pipeline tracing.
///
/// Written by [`init_trace_from_env`] and sampled when a `TraceCtx` is built.
static PIPELINE_TRACE: AtomicBool = AtomicBool::new(false);

/// Returns `true` when `raw` spells one of the accepted "enable" values.
///
/// Accepted values are `1`, `true`, `yes` and `on`. Surrounding whitespace is
/// ignored and the comparison is ASCII-case-insensitive, so `TRUE`, `On` or
/// `" yes "` work as well. Anything else (including the empty string) is
/// treated as "off".
fn is_truthy_flag(raw: &str) -> bool {
    let value = raw.trim();
    ["1", "true", "yes", "on"]
        .iter()
        .any(|accepted| value.eq_ignore_ascii_case(accepted))
}

/// Reads `ELASTIK_TRACE_PIPELINE` and stores the result in the global flag.
///
/// An unset or non-Unicode variable disables tracing. When tracing ends up
/// enabled, a one-line notice is written to stderr.
pub(crate) fn init_trace_from_env() {
    let enabled = std::env::var("ELASTIK_TRACE_PIPELINE")
        .map(|raw| is_truthy_flag(&raw))
        .unwrap_or(false);
    PIPELINE_TRACE.store(enabled, Ordering::Relaxed);
    if enabled {
        eprintln!("elastik-core: pipeline trace ENABLED via ELASTIK_TRACE_PIPELINE");
    }
}
/// Per-request tracing state used to print timestamped pipeline events.
pub(crate) struct TraceCtx {
    /// Identifier printed in the `[req-…]` prefix of every trace line.
    req_id: u64,
    /// Reference instant from which elapsed milliseconds are measured.
    started: Instant,
    /// When `false`, every `emit_*` method returns without printing.
    enabled: bool,
}
impl TraceCtx {
    /// Creates a context for request `req_id` whose clock started at `started`.
    ///
    /// The global trace flag is sampled once here and cached in the context.
    pub(crate) fn new(req_id: u64, started: Instant) -> Self {
        let enabled = PIPELINE_TRACE.load(Ordering::Relaxed);
        Self {
            req_id,
            started,
            enabled,
        }
    }

    /// Test-only context that never prints, regardless of the global flag.
    #[cfg(test)]
    pub(crate) fn disabled() -> Self {
        Self {
            req_id: 0,
            started: Instant::now(),
            enabled: false,
        }
    }

    /// Milliseconds elapsed since `started`, with fractional part.
    fn elapsed_ms(&self) -> f64 {
        self.started.elapsed().as_secs_f64() * 1000.0
    }

    /// Prints a one-line summary of `phase` to stderr when tracing is on.
    pub(crate) fn emit_phase(&self, phase: &Phase) {
        if self.enabled {
            eprintln!(
                "[req-{:<3} +{:>7.3}ms] {}",
                self.req_id,
                self.elapsed_ms(),
                phase_summary(phase)
            );
        }
    }

    /// Prints an auxiliary event identified only by `label`.
    #[allow(dead_code)]
    pub(crate) fn emit_aux(&self, label: &str) {
        if self.enabled {
            eprintln!(
                "[req-{:<3} +{:>7.3}ms] aux {}",
                self.req_id,
                self.elapsed_ms(),
                label
            );
        }
    }

    /// Prints an auxiliary event with an extra, pre-formatted `kv` suffix.
    #[allow(dead_code)]
    pub(crate) fn emit_aux_kv(&self, label: &str, kv: &str) {
        if self.enabled {
            eprintln!(
                "[req-{:<3} +{:>7.3}ms] aux {} {}",
                self.req_id,
                self.elapsed_ms(),
                label,
                kv
            );
        }
    }

    /// Prints the final line for a request: status plus total duration.
    fn emit_done(&self, resp: &Response) {
        if self.enabled {
            // Measured once so the prefix and the `total=` field agree.
            let total_ms = self.elapsed_ms();
            eprintln!(
                "[req-{:<3} +{:>7.3}ms] Done status={} total={:.3}ms",
                self.req_id,
                total_ms,
                resp.status(),
                total_ms
            );
        }
    }

    /// Prints the status and the debug form of `reason` for a failed request.
    fn emit_error(&self, reason: &ErrorReason, status: StatusCode) {
        if self.enabled {
            eprintln!(
                "[req-{:<3} +{:>7.3}ms] Error status={} reason={:?}",
                self.req_id,
                self.elapsed_ms(),
                status,
                reason
            );
        }
    }
}
fn phase_summary(p: &Phase) -> String {
match p {
Phase::Received {
method, path, body, ..
} => format!("Received {method} {path} {}B", body.len()),
Phase::Authenticated { tier, .. } => format!("Authenticated tier={tier:?}"),
Phase::PathValidated { world, .. } => format!("PathValidated world={world}"),
Phase::Dispatched { verb, .. } => format!("Dispatched verb={verb:?}"),
Phase::ExecutedRead(resp) => format!("ExecutedRead status={}", resp.status()),
Phase::CommittedWrite(resp) => format!("CommittedWrite status={}", resp.status()),
Phase::Done(resp) => format!("Done status={}", resp.status()),
Phase::Error { resp, reason } => {
format!("Error status={} reason={reason:?}", resp.status())
}
}
}
/// Packs the request parts together with an already-resolved `tier` into
/// `Phase::Authenticated`.
///
/// This function does not inspect `headers` itself; the caller decides the
/// tier and passes it in.
fn authenticate(
    method: Method,
    path: String,
    headers: HeaderMap,
    body: Bytes,
    tier: AccessTier,
) -> Phase {
    Phase::Authenticated { method, path, headers, body, tier }
}
fn validate_path(
method: Method,
path: String,
headers: HeaderMap,
body: Bytes,
tier: AccessTier,
) -> Phase {
let world = canonicalize_path(&path);
if let Err(reason) = crate::validate_world_name(&world) {
return Phase::Error {
resp: bad_request(reason),
reason: ErrorReason::PathInvalid(reason),
};
}
let world = match ValidatedWorldPath::new(world) {
Ok(world) => world,
Err(_) => {
let reason = "world path missing canonical namespace prefix";
return Phase::Error {
resp: bad_request(reason),
reason: ErrorReason::PathInvalid(reason),
};
}
};
Phase::PathValidated {
method,
headers,
body,
tier,
world,
}
}
fn dispatch(
method: Method,
headers: HeaderMap,
body: Bytes,
tier: AccessTier,
world: ValidatedWorldPath,
) -> Phase {
let verb = match method {
Method::GET => Verb::Get,
Method::HEAD => Verb::Head,
Method::PUT => Verb::Put,
Method::POST => Verb::Post,
Method::DELETE => Verb::Delete,
_ => {
return Phase::Error {
resp: method_not_allowed(WORLD_ALLOW),
reason: ErrorReason::MethodNotAllowed,
};
}
};
Phase::Dispatched {
verb,
headers,
body,
tier,
world,
}
}
pub(crate) async fn run(
method: Method,
path: String,
headers: HeaderMap,
body: Bytes,
state: &ServerState,
req_id: u64,
) -> Response {
let trace = TraceCtx::new(req_id, Instant::now());
let mut phase = Phase::Received {
method,
path,
headers,
body,
};
trace.emit_phase(&phase);
loop {
phase = match phase {
Phase::Received {
method,
path,
headers,
body,
} => {
let tier = state.access_tier_from_headers(&headers);
authenticate(method, path, headers, body, tier)
}
Phase::Authenticated {
method,
path,
headers,
body,
tier,
} => validate_path(method, path, headers, body, tier),
Phase::PathValidated {
method,
headers,
body,
tier,
world,
} => dispatch(method, headers, body, tier, world),
Phase::Dispatched {
verb,
headers,
body,
tier,
world,
} => crate::handler::execute(verb, headers, body, tier, world, state, &trace).await,
Phase::ExecutedRead(resp) | Phase::CommittedWrite(resp) => Phase::Done(resp),
Phase::Done(resp) => {
trace.emit_done(&resp);
return resp;
}
Phase::Error { resp, reason } => {
trace.emit_error(&reason, resp.status());
trace.emit_done(&resp);
return resp;
}
};
if !matches!(phase, Phase::Done(_) | Phase::Error { .. }) {
trace.emit_phase(&phase);
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use axum::http::{header, HeaderValue};

    /// Builds a header map containing a single `Authorization` header.
    fn header_map_with_auth(value: &str) -> HeaderMap {
        let parsed = HeaderValue::from_str(value).unwrap();
        let mut map = HeaderMap::new();
        map.insert(header::AUTHORIZATION, parsed);
        map
    }

    /// Formats `token` as a bearer credential string.
    fn bearer(token: &str) -> String {
        format!("{} {token}", "Bearer")
    }

    // The `authenticate_*` tests pass the tier in explicitly: `authenticate`
    // only forwards it, so they pin that the tier survives the transition.

    #[test]
    fn authenticate_no_auth_header_yields_anon_tier() {
        let phase = authenticate(
            Method::GET,
            String::from("/home/foo"),
            HeaderMap::new(),
            Bytes::new(),
            AccessTier::Anon,
        );
        let Phase::Authenticated { tier, .. } = phase else {
            panic!("expected Authenticated phase");
        };
        assert_eq!(tier, AccessTier::Anon);
    }

    #[test]
    fn authenticate_valid_bearer_yields_write_tier() {
        let phase = authenticate(
            Method::PUT,
            String::from("/home/foo"),
            header_map_with_auth(&bearer("writer")),
            Bytes::from_static(b"hi"),
            AccessTier::Write,
        );
        let Phase::Authenticated { tier, .. } = phase else {
            panic!("expected Authenticated phase");
        };
        assert_eq!(tier, AccessTier::Write);
    }

    #[test]
    fn authenticate_unrecognized_token_falls_back_to_anon() {
        let phase = authenticate(
            Method::PUT,
            String::from("/home/foo"),
            header_map_with_auth(&bearer("wrong")),
            Bytes::new(),
            AccessTier::Anon,
        );
        let Phase::Authenticated { tier, .. } = phase else {
            panic!("expected Authenticated phase");
        };
        assert_eq!(tier, AccessTier::Anon);
    }

    #[test]
    fn validate_path_canonicalizes_bare_to_home_namespace() {
        let phase = validate_path(
            Method::GET,
            String::from("/foo"),
            HeaderMap::new(),
            Bytes::new(),
            AccessTier::Anon,
        );
        let Phase::PathValidated { world, .. } = phase else {
            panic!("expected PathValidated");
        };
        assert_eq!(world.as_str(), "home/foo");
    }

    #[test]
    fn validate_path_keeps_explicit_namespaces() {
        let phase = validate_path(
            Method::GET,
            String::from("/etc/foo"),
            HeaderMap::new(),
            Bytes::new(),
            AccessTier::Approve,
        );
        let Phase::PathValidated { world, .. } = phase else {
            panic!("expected PathValidated");
        };
        assert_eq!(world.as_str(), "etc/foo");
    }

    #[test]
    fn validate_path_rejects_dot_segments_with_pathinvalid() {
        let phase = validate_path(
            Method::PUT,
            String::from("/home/../etc/secret"),
            HeaderMap::new(),
            Bytes::new(),
            AccessTier::Write,
        );
        let Phase::Error {
            reason: ErrorReason::PathInvalid(_),
            resp,
        } = phase
        else {
            panic!("expected Error::PathInvalid");
        };
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
    }

    #[test]
    fn validate_path_rejects_reserved_namespace_root() {
        let phase = validate_path(
            Method::PUT,
            String::from("/home"),
            HeaderMap::new(),
            Bytes::new(),
            AccessTier::Write,
        );
        let rejected = matches!(
            phase,
            Phase::Error {
                reason: ErrorReason::PathInvalid(_),
                ..
            }
        );
        if !rejected {
            panic!("expected Error::PathInvalid for reserved root");
        }
    }

    #[test]
    fn validate_path_rejects_percent_encoded_dot_segment() {
        let phase = validate_path(
            Method::GET,
            String::from("/home/%2E%2E/etc/secret"),
            HeaderMap::new(),
            Bytes::new(),
            AccessTier::Read,
        );
        assert!(matches!(
            phase,
            Phase::Error {
                reason: ErrorReason::PathInvalid(_),
                ..
            }
        ));
    }

    #[test]
    fn dispatch_maps_get_to_verb_get() {
        let phase = dispatch(
            Method::GET,
            HeaderMap::new(),
            Bytes::new(),
            AccessTier::Anon,
            ValidatedWorldPath::new("home/foo").unwrap(),
        );
        let Phase::Dispatched { verb, .. } = phase else {
            panic!("expected Dispatched");
        };
        assert_eq!(verb, Verb::Get);
    }

    #[test]
    fn dispatch_rejects_patch_with_method_not_allowed() {
        let phase = dispatch(
            Method::PATCH,
            HeaderMap::new(),
            Bytes::new(),
            AccessTier::Write,
            ValidatedWorldPath::new("home/foo").unwrap(),
        );
        let Phase::Error {
            reason: ErrorReason::MethodNotAllowed,
            resp,
        } = phase
        else {
            panic!("expected Error::MethodNotAllowed");
        };
        assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
        // A missing or non-UTF-8 Allow header degrades to "" so the
        // assertions below fail instead of the lookup panicking.
        let allow = resp
            .headers()
            .get(header::ALLOW)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");
        assert!(allow.contains("GET"));
        assert!(allow.contains("PUT"));
        assert!(allow.contains("OPTIONS"));
    }

    #[test]
    fn dispatch_maps_all_five_supported_verbs() {
        let cases = [
            (Method::GET, Verb::Get),
            (Method::HEAD, Verb::Head),
            (Method::PUT, Verb::Put),
            (Method::POST, Verb::Post),
            (Method::DELETE, Verb::Delete),
        ];
        for (method, expected_verb) in cases {
            // Kept separately because `method` is moved into `dispatch`.
            let display = method.clone();
            let phase = dispatch(
                method,
                HeaderMap::new(),
                Bytes::new(),
                AccessTier::Anon,
                ValidatedWorldPath::new("home/x").unwrap(),
            );
            let Phase::Dispatched { verb, .. } = phase else {
                panic!("expected Dispatched for {display}");
            };
            assert_eq!(verb, expected_verb);
        }
    }

    /// Smoke test: the emit methods must be callable on a disabled context.
    #[test]
    fn trace_ctx_disabled_emits_nothing() {
        let ctx = TraceCtx::disabled();
        let phase = Phase::Received {
            method: Method::GET,
            path: String::from("/home/foo"),
            headers: HeaderMap::new(),
            body: Bytes::new(),
        };
        ctx.emit_phase(&phase);
        ctx.emit_aux("noop");
        ctx.emit_aux_kv("noop", "key=value");
    }

    #[test]
    fn phase_summary_distinguishes_terminal_variants() {
        use axum::body::Body;
        let resp_ok = Response::builder().status(200).body(Body::empty()).unwrap();
        let resp_err = Response::builder().status(401).body(Body::empty()).unwrap();
        assert!(phase_summary(&Phase::Done(resp_ok)).starts_with("Done"));
        assert!(phase_summary(&Phase::Error {
            resp: resp_err,
            reason: ErrorReason::Auth(AuthGate::Read),
        })
        .starts_with("Error"));
    }

    /// With the variable unset, initialisation must leave tracing off.
    ///
    /// The previous value of the variable is restored afterwards and the
    /// global flag re-derived from it.
    #[test]
    fn init_trace_from_env_off_by_default() {
        let prior = std::env::var("ELASTIK_TRACE_PIPELINE").ok();
        std::env::remove_var("ELASTIK_TRACE_PIPELINE");
        init_trace_from_env();
        assert!(!PIPELINE_TRACE.load(Ordering::Relaxed));
        if let Some(v) = prior {
            std::env::set_var("ELASTIK_TRACE_PIPELINE", v);
        }
        init_trace_from_env();
    }
}