mod audit;
mod auth;
mod coap;
mod config;
mod handler;
mod http_semantics;
mod ledger;
mod listen;
mod middleware;
mod path;
mod pipeline;
mod proc;
mod read_cache;
mod response;
mod route;
mod state;
mod store;
mod world;
pub(crate) use crate::path::*;
pub(crate) use crate::pipeline::*;
pub(crate) use crate::proc::*;
pub(crate) use crate::response::*;
pub(crate) use crate::state::*;
use std::collections::VecDeque;
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize},
Arc, Mutex as StdMutex,
};
use dashmap::DashMap;
use tokio::sync::{broadcast, watch, Semaphore};
use crate::config::{
coap_bind_from_env, env_nonzero_usize, env_optional_usize, env_usize, hmac_key_from_env_value,
listen_addr, should_warn_public_read, DEFAULT_COAP_MAX_IN_FLIGHT, DEFAULT_LISTEN_REPLAY_MAX,
DEFAULT_MAX_LISTEN_CONNECTIONS, DEFAULT_MAX_MEMORY_BYTES, DEFAULT_MAX_WORLD_BYTES,
};
/// Crate version string, captured from `CARGO_PKG_VERSION` at compile time.
pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION");
/// `Allow` header value for world endpoints; the tests in this file expect it on
/// `OPTIONS` responses and on 405 responses for unsupported methods.
pub(crate) const WORLD_ALLOW: &str = "GET, HEAD, PUT, POST, DELETE, OPTIONS";
/// Process entry point.
///
/// Reads configuration from `ELASTIK_*` environment variables, measures existing durable
/// storage, builds the shared [`Core`] state, binds the HTTP listener, optionally spawns the
/// CoAP listener, and serves HTTP until a shutdown signal arrives.
///
/// # Panics
///
/// Panics when the data directory cannot be created, durable storage usage cannot be read,
/// `ELASTIK_KEY` is rejected by `hmac_key_from_env_value`, the TCP listener cannot bind, or
/// the HTTP server returns an error.
#[tokio::main]
async fn main() {
    // HTTP port used when `ELASTIK_PORT` is unset or unusable.
    const DEFAULT_PORT: u16 = 3105;

    pipeline::init_trace_from_env();

    // --- Configuration from the environment ---
    let host = std::env::var("ELASTIK_HOST").unwrap_or_else(|_| "127.0.0.1".into());
    // A malformed port still falls back to the default, but says so on stderr: a silent
    // fallback would bind a port the operator did not ask for without any hint why.
    let port: u16 = match std::env::var("ELASTIK_PORT") {
        Ok(raw) => raw.parse::<u16>().unwrap_or_else(|err| {
            eprintln!(
                "elastik-core: ignoring invalid ELASTIK_PORT {raw:?} ({err}); using {DEFAULT_PORT}"
            );
            DEFAULT_PORT
        }),
        Err(std::env::VarError::NotPresent) => DEFAULT_PORT,
        Err(std::env::VarError::NotUnicode(raw)) => {
            eprintln!(
                "elastik-core: ignoring non-UTF-8 ELASTIK_PORT {raw:?}; using {DEFAULT_PORT}"
            );
            DEFAULT_PORT
        }
    };
    let coap_bind = coap_bind_from_env();
    let data = PathBuf::from(std::env::var("ELASTIK_DATA").unwrap_or_else(|_| "./data".into()));
    let max_world_bytes = env_usize("ELASTIK_MAX_WORLD_BYTES", DEFAULT_MAX_WORLD_BYTES);
    let max_memory_bytes = env_usize("ELASTIK_MAX_MEMORY_BYTES", DEFAULT_MAX_MEMORY_BYTES);
    let max_storage_bytes = env_optional_usize("ELASTIK_MAX_STORAGE_BYTES");
    let max_listen_connections = env_nonzero_usize(
        "ELASTIK_MAX_LISTEN_CONNECTIONS",
        DEFAULT_MAX_LISTEN_CONNECTIONS,
    );
    let listen_replay_max =
        env_nonzero_usize("ELASTIK_LISTEN_REPLAY_MAX", DEFAULT_LISTEN_REPLAY_MAX);
    let coap_max_in_flight =
        env_nonzero_usize("ELASTIK_COAP_MAX_IN_FLIGHT", DEFAULT_COAP_MAX_IN_FLIGHT);
    let read_cache_max_entries = env_nonzero_usize(
        "ELASTIK_READ_CACHE_MAX_ENTRIES",
        crate::read_cache::DEFAULT_READ_CACHE_MAX_ENTRIES,
    );

    // --- Durable storage bookkeeping ---
    std::fs::create_dir_all(&data).expect("create data dir");
    let durable_sizes = world::sizes(&data).expect("read durable storage usage");
    // Seed the running counters from what is already stored under the data directory.
    let storage_body_bytes = durable_sizes.iter().map(|(_, size)| *size).sum();
    let durable_world_count = durable_sizes.len();
    let delete_ledger_created = durable_sizes
        .iter()
        .any(|(world_name, _)| world_name == "var/log/deletes");

    let hmac_key = hmac_key_from_env_value(std::env::var("ELASTIK_KEY").ok()).expect(
        "ELASTIK_KEY must be a non-empty string; the audit chain has no meaning without it",
    );

    // --- Shared state ---
    // The event channel and the replay log are both sized by `listen_replay_max`.
    let (events, _) = broadcast::channel(listen_replay_max);
    // Flipped to `true` by `shutdown_signal`; receivers are handed to `Core` and to CoAP.
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let state = Arc::new(Core {
        data,
        tokens: auth::Tokens::from_env(),
        hmac_key,
        mem: Arc::new(store::MemoryStore::new()),
        max_world_bytes,
        max_memory_bytes,
        max_storage_bytes,
        storage_body_bytes: Arc::new(AtomicUsize::new(storage_body_bytes)),
        durable_world_count: Arc::new(AtomicUsize::new(durable_world_count)),
        delete_ledger_created: Arc::new(AtomicBool::new(delete_ledger_created)),
        events,
        listen_slots: Arc::new(Semaphore::new(max_listen_connections)),
        listen_replay_max,
        event_log: Arc::new(StdMutex::new(VecDeque::with_capacity(listen_replay_max))),
        shutdown: shutdown_rx.clone(),
        next_event: Arc::new(AtomicU64::new(0)),
        next_request: Arc::new(AtomicU64::new(0)),
        world_locks: Arc::new(DashMap::new()),
        ledger: Arc::new(crate::ledger::LedgerWriter::new()),
        read_cache: Arc::new(crate::read_cache::ReadCache::new(read_cache_max_entries)),
        persist_header_allowlist: Arc::new(crate::config::header_allowlist_from_env()),
        persist_header_user_deny: Arc::new(crate::config::header_user_deny_from_env()),
    });

    // --- HTTP listener ---
    let addr = listen_addr(&host, port);
    let listener = tokio::net::TcpListener::bind(&addr).await.expect("bind");
    // The bound IP drives the public-read warning; if it cannot be queried, loopback is assumed.
    let bind_ip = listener
        .local_addr()
        .map(|addr| addr.ip())
        .unwrap_or_else(|_| IpAddr::from([127, 0, 0, 1]));
    eprintln!("elastik-core v{VERSION} on http://{addr}/");
    print_auth_summary(&state.tokens, bind_ip);

    // --- Optional CoAP listener ---
    if let Some((coap_host, coap_port)) = coap_bind {
        let coap_addr = listen_addr(&coap_host, coap_port);
        let coap_state = state.clone();
        let coap_shutdown = shutdown_rx.clone();
        // Runs as a detached task alongside the HTTP server; it gets its own shutdown receiver.
        tokio::spawn(async move {
            coap::serve(coap_state, coap_addr, coap_shutdown, coap_max_in_flight).await;
        });
    }

    // --- Serve until a shutdown signal is observed ---
    let app = route::build_app(state, max_world_bytes);
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal(shutdown_tx))
        .await
        .expect("serve");
}
/// Prints the startup auth summary to stderr.
///
/// First reports, per tier (read / write / approve), whether a token is required or the tier is
/// public/disabled. Then emits warnings for empty token variables, for public reads on an
/// interface flagged by `should_warn_public_read`, for the deprecated `ELASTIK_TOKEN`
/// variable, and for missing write/approve tokens.
fn print_auth_summary(tokens: &auth::Tokens, bind_ip: IpAddr) {
    eprintln!("auth:");

    let read_line = if tokens.read_required() {
        "token required"
    } else {
        "public (ELASTIK_READ_TOKEN not set)"
    };
    eprintln!(" read: {}", read_line);

    let write_enabled = tokens.write.is_some();
    let write_line = if write_enabled {
        "token required"
    } else {
        "disabled (ELASTIK_WRITE_TOKEN not set)"
    };
    eprintln!(" write: {}", write_line);

    let approve_enabled = tokens.approve.is_some();
    let approve_line = if approve_enabled {
        "token required"
    } else {
        "disabled (ELASTIK_APPROVE_TOKEN not set)"
    };
    eprintln!(" approve: {}", approve_line);

    // Read-side warnings.
    if auth::env_set_but_empty("ELASTIK_READ_TOKEN") {
        eprintln!(" warning: empty ELASTIK_READ_TOKEN treated as unset (reads public)");
    }
    if should_warn_public_read(bind_ip, tokens) {
        eprintln!(
            " WARNING: reads are public on non-loopback interface {bind_ip}; set ELASTIK_READ_TOKEN to gate reads."
        );
    }

    // Write-side warnings, including the deprecated variable name.
    if auth::env_set_but_empty("ELASTIK_WRITE_TOKEN") {
        eprintln!(" warning: empty ELASTIK_WRITE_TOKEN treated as unset (PUT/POST disabled)");
    }
    if std::env::var("ELASTIK_TOKEN").is_ok() {
        eprintln!(" warning: ELASTIK_TOKEN is deprecated; rename it to ELASTIK_WRITE_TOKEN.");
    }

    // Approve-side warning for an empty variable.
    if auth::env_set_but_empty("ELASTIK_APPROVE_TOKEN") {
        eprintln!(
            " warning: empty ELASTIK_APPROVE_TOKEN treated as unset (DELETE/system writes disabled)"
        );
    }

    // Final reminders about tiers that ended up disabled.
    if !write_enabled {
        eprintln!(" warning: ELASTIK_WRITE_TOKEN not set; ordinary PUT/POST are disabled.");
    }
    if !approve_enabled {
        eprintln!(
            " warning: ELASTIK_APPROVE_TOKEN not set; DELETE and system writes are disabled."
        );
    }
}
/// Completes once a shutdown signal has been observed.
///
/// Logs the event to stderr and publishes `true` on `shutdown_tx` so holders of the matching
/// receivers can react. A send error is deliberately ignored.
async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) {
    wait_for_shutdown_signal().await;
    eprintln!("elastik-core: shutdown signal received");
    shutdown_tx.send(true).ok();
}
/// Unix: resolves on Ctrl-C or SIGTERM, whichever arrives first.
///
/// If the SIGTERM handler cannot be installed, the failure is logged and only Ctrl-C is awaited.
#[cfg(unix)]
async fn wait_for_shutdown_signal() {
    use tokio::signal::unix::{signal, SignalKind};

    match signal(SignalKind::terminate()) {
        Ok(mut term_stream) => {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {},
                _ = term_stream.recv() => {},
            }
        }
        Err(e) => {
            eprintln!("elastik-core: failed to install SIGTERM handler: {e}; waiting for Ctrl-C");
            let _ = tokio::signal::ctrl_c().await;
        }
    }
}
/// Non-Unix: resolves when Ctrl-C is received; the result of waiting is ignored.
#[cfg(not(unix))]
async fn wait_for_shutdown_signal() {
    tokio::signal::ctrl_c().await.ok();
}
/// Decides whether `tier` may write to `world_name`.
///
/// `Approve` may write anywhere, `Write` may write everywhere except worlds for which
/// [`needs_write_approve`] is true, and `Anon` / `Read` may never write.
pub(crate) fn can_write(world_name: &str, tier: auth::Tier) -> bool {
    match tier {
        auth::Tier::Approve => true,
        auth::Tier::Write => !needs_write_approve(world_name),
        auth::Tier::Anon | auth::Tier::Read => false,
    }
}
pub(crate) fn needs_write_approve(world_name: &str) -> bool {
exact_or_child(world_name, "lib")
|| exact_or_child(world_name, "etc")
|| exact_or_child(world_name, "boot")
|| exact_or_child(world_name, "usr")
|| exact_or_child(world_name, "var/log")
}
/// Deletion is allowed for the approve tier only.
pub(crate) fn can_delete(tier: auth::Tier) -> bool {
    match tier {
        auth::Tier::Approve => true,
        auth::Tier::Anon | auth::Tier::Read | auth::Tier::Write => false,
    }
}
/// Returns `true` when `world_name` equals `prefix` or continues it with a `/` separator.
///
/// A name that merely shares leading characters (`library` vs. `lib`) does not match.
pub(crate) fn exact_or_child(world_name: &str, prefix: &str) -> bool {
    match world_name.strip_prefix(prefix) {
        // Nothing left over means an exact match; otherwise the remainder must start a new segment.
        Some(remainder) => remainder.is_empty() || remainder.starts_with('/'),
        None => false,
    }
}
/// Decides whether `tier` may read.
///
/// Reads are open to everyone while no read token is required; once one is required, any
/// authenticated tier (`Read`, `Write`, `Approve`) may read and `Anon` may not.
pub(crate) fn can_read(core: &Core, tier: auth::Tier) -> bool {
    if !core.tokens.read_required() {
        return true;
    }
    match tier {
        auth::Tier::Read | auth::Tier::Write | auth::Tier::Approve => true,
        auth::Tier::Anon => false,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::handler::{execute_delete, execute_get, execute_head, execute_post, execute_put};
use crate::http_semantics as hs;
use crate::middleware::{add_core_response_headers, stamp_core_response_headers};
use crate::route::world_handler;
use axum::body::Bytes;
use axum::extract::{Path as AxPath, State};
use axum::http::{header, HeaderMap, HeaderValue, Method, StatusCode};
use axum::response::Response;
use axum::routing::any;
use axum::Router;
use std::sync::atomic::Ordering;
use std::sync::{Mutex as TestMutex, OnceLock};
/// Pulls the HTTP response out of a terminal [`Phase`].
///
/// # Panics
///
/// Panics when handed one of the non-terminal variants (`Received`, `Authenticated`,
/// `PathValidated`, `Dispatched`).
fn unwrap_response(phase: Phase) -> Response {
    match phase {
        Phase::Received { .. }
        | Phase::Authenticated { .. }
        | Phase::PathValidated { .. }
        | Phase::Dispatched { .. } => {
            panic!("execute_* returned a non-terminal Phase variant")
        }
        Phase::Error { resp, .. } => resp,
        Phase::ExecutedRead(response)
        | Phase::CommittedWrite(response)
        | Phase::Done(response) => response,
    }
}
/// Returns the single mutex that tests take before touching environment variables.
///
/// The mutex is created on first use and lives for the rest of the test run.
fn env_lock() -> &'static TestMutex<()> {
    static ENV_MUTEX: OnceLock<TestMutex<()>> = OnceLock::new();
    ENV_MUTEX.get_or_init(TestMutex::default)
}
/// Snapshot of the CoAP environment variables, put back when the guard is dropped.
struct CoapEnvGuard {
    // Value of `ELASTIK_COAP_HOST` at capture time (`None` if it could not be read).
    host: Option<String>,
    // Value of `ELASTIK_COAP_PORT` at capture time (`None` if it could not be read).
    port: Option<String>,
}

impl CoapEnvGuard {
    /// Records the current values of `ELASTIK_COAP_HOST` and `ELASTIK_COAP_PORT`.
    fn capture() -> Self {
        let host = std::env::var("ELASTIK_COAP_HOST").ok();
        let port = std::env::var("ELASTIK_COAP_PORT").ok();
        Self { host, port }
    }
}

impl Drop for CoapEnvGuard {
    /// Restores both variables: re-sets a recorded value, removes the variable otherwise.
    fn drop(&mut self) {
        let snapshot = [
            ("ELASTIK_COAP_HOST", self.host.as_ref()),
            ("ELASTIK_COAP_PORT", self.port.as_ref()),
        ];
        for (key, saved) in snapshot {
            if let Some(value) = saved {
                std::env::set_var(key, value);
            } else {
                std::env::remove_var(key);
            }
        }
    }
}
/// Missing, empty, and whitespace-only key values are rejected; an accepted key keeps its
/// surrounding whitespace byte-for-byte.
#[test]
fn hmac_key_requires_nonempty_semantic_content() {
    let rejected = [None, Some(String::new()), Some(" \t\n".to_string())];
    for candidate in rejected {
        assert!(hmac_key_from_env_value(candidate).is_none());
    }

    let accepted = hmac_key_from_env_value(Some(" secret ".to_string())).unwrap();
    assert_eq!(accepted, b" secret ".to_vec());
}
/// `env_nonzero_usize` treats `0` as "use the default" and passes other values through.
#[test]
fn resource_cap_env_zero_falls_back_to_default() {
    let _env = env_lock().lock().unwrap();
    // Per-process variable name so this test does not collide with a real setting.
    let var = format!("ELASTIK_TEST_ZERO_CAP_{}", std::process::id());

    for (raw, expected) in [("0", 7), ("9", 9)] {
        std::env::set_var(&var, raw);
        assert_eq!(env_nonzero_usize(&var, 7), expected);
    }

    std::env::remove_var(&var);
}
/// `env_optional_usize` yields `None` for unset, blank, and zero values, `Some(n)` for a
/// positive number, and panics on an unparseable value.
#[test]
fn optional_storage_quota_zero_is_unlimited() {
    let _env = env_lock().lock().unwrap();
    let var = format!("ELASTIK_TEST_STORAGE_CAP_{}", std::process::id());

    // Unset.
    std::env::remove_var(&var);
    assert_eq!(env_optional_usize(&var), None);

    // Empty, whitespace-only, and zero all mean "no quota".
    for unlimited in ["", " \t ", "0"] {
        std::env::set_var(&var, unlimited);
        assert_eq!(env_optional_usize(&var), None);
    }

    std::env::set_var(&var, "11");
    assert_eq!(env_optional_usize(&var), Some(11));

    // A value with a unit suffix is not parsed leniently.
    std::env::set_var(&var, "10GB");
    let outcome = std::panic::catch_unwind(|| env_optional_usize(&var));
    assert!(outcome.is_err());

    std::env::remove_var(&var);
}
/// An `SQLITE_FULL` failure is classified as insufficient storage and answered with 507.
#[test]
fn sqlite_disk_full_maps_to_507() {
    let disk_full = rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_FULL);
    let failure = rusqlite::Error::SqliteFailure(disk_full, None);

    assert!(is_insufficient_storage_error(&failure));

    let response = storage_error("test", failure);
    assert_eq!(response.status(), StatusCode::INSUFFICIENT_STORAGE);
}
/// An `SQLITE_CORRUPT` failure is not a storage-exhaustion error and is answered with 500.
#[test]
fn non_storage_sqlite_errors_stay_500() {
    let corrupt = rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CORRUPT);
    let failure = rusqlite::Error::SqliteFailure(corrupt, None);

    assert!(!is_insufficient_storage_error(&failure));

    let response = storage_error("test", failure);
    assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
/// With a 4-byte storage quota, a PUT that fills the quota succeeds, while a further PUT to a
/// new world and a POST append to the existing one both get 507 with usage/quota/needed
/// headers and leave the stored data untouched.
#[tokio::test]
async fn durable_storage_quota_returns_507_without_writing() {
    let (mut core, dir) = test_core("storage-quota");
    core.max_storage_bytes = Some(4);
    let expected_quota_headers = [
        ("x-storage-usage", "4"),
        ("x-storage-quota", "4"),
        ("x-storage-needed", "1"),
    ];

    // Four bytes fit exactly.
    let filled = unwrap_response(
        execute_put(
            HeaderMap::new(),
            Bytes::from_static(b"1234"),
            auth::Tier::Write,
            "home/a".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(filled.status(), StatusCode::CREATED);

    // One more byte in a new world is refused and nothing is written.
    let rejected_put = unwrap_response(
        execute_put(
            HeaderMap::new(),
            Bytes::from_static(b"5"),
            auth::Tier::Write,
            "home/b".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(rejected_put.status(), StatusCode::INSUFFICIENT_STORAGE);
    for (name, value) in expected_quota_headers {
        assert_eq!(rejected_put.headers().get(name).unwrap(), value);
    }
    assert!(core.read_world("home/b").unwrap().is_none());

    // Appending one byte to the existing world is refused as well; its body is unchanged.
    let rejected_append = unwrap_response(
        execute_post(
            HeaderMap::new(),
            Bytes::from_static(b"5"),
            auth::Tier::Write,
            "home/a".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(rejected_append.status(), StatusCode::INSUFFICIENT_STORAGE);
    for (name, value) in expected_quota_headers {
        assert_eq!(rejected_append.headers().get(name).unwrap(), value);
    }
    assert_eq!(
        core.read_world("home/a").unwrap().unwrap().body,
        b"1234".to_vec()
    );

    let _ = std::fs::remove_dir_all(dir);
}
/// Sixteen concurrent 12-byte PUTs to distinct `home/race/*` worlds against a 100-byte quota:
/// the storage counter must equal the bytes of the accepted PUTs and never exceed the quota.
#[tokio::test]
async fn concurrent_puts_to_distinct_worlds_do_not_overshoot_quota() {
    let (mut core, dir) = test_core("quota-race");
    let quota = 100;
    core.max_storage_bytes = Some(quota);
    let core = Arc::new(core);

    let workers: usize = 16;
    let body_len: usize = 12;

    // Launch every writer before awaiting any of them.
    let mut tasks = Vec::with_capacity(workers);
    for worker in 0..workers {
        let core = Arc::clone(&core);
        let task = tokio::spawn(async move {
            let world = format!("home/race/{worker}");
            let payload = Bytes::from(vec![b'x'; body_len]);
            let phase = execute_put(
                HeaderMap::new(),
                payload,
                auth::Tier::Write,
                world,
                &core,
                &TraceCtx::disabled(),
            )
            .await;
            unwrap_response(phase)
        });
        tasks.push(task);
    }

    // Every response is either an accepted write or a quota rejection.
    let mut accepted: usize = 0;
    for task in tasks {
        let status = task.await.unwrap().status();
        match status {
            StatusCode::CREATED | StatusCode::OK => accepted += 1,
            StatusCode::INSUFFICIENT_STORAGE => {}
            other => panic!("unexpected status: {other}"),
        }
    }

    let used = core.storage_body_bytes.load(Ordering::Relaxed);
    let counted = accepted * body_len;
    assert_eq!(used, counted, "counter must equal sum of accepted bodies");
    assert!(
        used <= quota,
        "counter must never exceed quota: {used} > {quota}"
    );

    let _ = std::fs::remove_dir_all(dir);
}
/// Sixteen concurrent 12-byte PUTs to distinct `tmp/race/*` worlds against a 100-byte memory
/// cap: the memory store total must equal the bytes of the accepted PUTs and never exceed
/// the cap.
#[tokio::test]
async fn concurrent_memory_puts_do_not_overshoot_max_memory_bytes() {
    let (mut core, dir) = test_core("memory-quota-race");
    let cap = 100;
    core.max_memory_bytes = cap;
    let core = Arc::new(core);

    let workers: usize = 16;
    let body_len: usize = 12;

    // Launch every writer before awaiting any of them.
    let mut tasks = Vec::with_capacity(workers);
    for worker in 0..workers {
        let core = Arc::clone(&core);
        let task = tokio::spawn(async move {
            let world = format!("tmp/race/{worker}");
            let payload = Bytes::from(vec![b'm'; body_len]);
            let phase = execute_put(
                HeaderMap::new(),
                payload,
                auth::Tier::Write,
                world,
                &core,
                &TraceCtx::disabled(),
            )
            .await;
            unwrap_response(phase)
        });
        tasks.push(task);
    }

    // Every response is either an accepted write or a size rejection.
    let mut accepted: usize = 0;
    for task in tasks {
        let status = task.await.unwrap().status();
        match status {
            StatusCode::CREATED | StatusCode::OK => accepted += 1,
            StatusCode::PAYLOAD_TOO_LARGE => {}
            other => panic!("unexpected status: {other}"),
        }
    }

    let used = core.mem.total_bytes();
    let counted = accepted * body_len;
    assert_eq!(
        used, counted,
        "memory total must equal sum of accepted bodies"
    );
    assert!(
        used <= cap,
        "memory total must never exceed cap: {used} > {cap}"
    );

    let _ = std::fs::remove_dir_all(dir);
}
/// `var/log` and its children are writable by the approve tier only.
#[test]
fn var_log_requires_approve_token() {
    for world in ["var/log", "var/log/deletes"] {
        assert!(!can_write(world, auth::Tier::Anon));
        assert!(!can_write(world, auth::Tier::Read));
        assert!(!can_write(world, auth::Tier::Write));
        assert!(can_write(world, auth::Tier::Approve));
    }
}
/// Only the approve tier may delete.
#[test]
fn delete_requires_approve_token() {
    let denied = [auth::Tier::Anon, auth::Tier::Read, auth::Tier::Write];
    for tier in denied {
        assert!(!can_delete(tier));
    }
    assert!(can_delete(auth::Tier::Approve));
}
/// `listen_addr` joins host and port with `:` and wraps an IPv6 literal in brackets.
#[test]
fn listen_addr_brackets_ipv6_hosts() {
    let cases = [
        ("127.0.0.1", "127.0.0.1:3105"),
        ("0.0.0.0", "0.0.0.0:3105"),
        ("::1", "[::1]:3105"),
        ("localhost", "localhost:3105"),
    ];
    for (host, expected) in cases {
        assert_eq!(listen_addr(host, 3105), expected);
    }
}
/// CoAP is only enabled when `ELASTIK_COAP_PORT` holds a parseable port; setting the host
/// alone, a blank port, or a non-numeric port all leave it disabled.
#[test]
fn coap_bind_is_opt_in_by_port_env() {
// Serialize with other env-mutating tests, and restore the CoAP variables on exit.
let _lock = env_lock().lock().unwrap();
let _guard = CoapEnvGuard::capture();
// Neither variable set: disabled.
std::env::remove_var("ELASTIK_COAP_HOST");
std::env::remove_var("ELASTIK_COAP_PORT");
assert_eq!(coap_bind_from_env(), None);
// Host alone does not opt in.
std::env::set_var("ELASTIK_COAP_HOST", "0.0.0.0");
assert_eq!(coap_bind_from_env(), None);
// Host plus a valid port: enabled with exactly those values.
std::env::set_var("ELASTIK_COAP_PORT", "5683");
assert_eq!(coap_bind_from_env(), Some(("0.0.0.0".to_owned(), 5683)));
// A whitespace-only port disables it again.
std::env::set_var("ELASTIK_COAP_HOST", "127.0.0.1");
std::env::set_var("ELASTIK_COAP_PORT", " ");
assert_eq!(coap_bind_from_env(), None);
// So does a port that is not a number.
std::env::set_var("ELASTIK_COAP_PORT", "not-a-port");
assert_eq!(coap_bind_from_env(), None);
}
/// The public-read warning is raised for `0.0.0.0` without a read token, but not for
/// loopback and not once a read token is configured.
#[test]
fn non_loopback_public_read_gets_warning_flag() {
    let loopback: IpAddr = "127.0.0.1".parse().unwrap();
    let wildcard: IpAddr = "0.0.0.0".parse().unwrap();
    let mut tokens = auth::Tokens {
        read: None,
        write: None,
        approve: None,
    };

    assert!(!should_warn_public_read(loopback, &tokens));
    assert!(should_warn_public_read(wildcard, &tokens));

    tokens.read = Some(b"reader".to_vec());
    assert!(!should_warn_public_read(wildcard, &tokens));
}
/// With a 4-byte per-world cap, a 5-byte PUT is rejected with 413, a 4-byte PUT is created,
/// and a 1-byte POST append to that 4-byte world is rejected with 413.
#[tokio::test]
async fn put_and_post_enforce_world_size_cap() {
    let (mut core, dir) = test_core("world-size-cap");
    core.max_world_bytes = 4;

    let oversized_put = unwrap_response(
        execute_put(
            HeaderMap::new(),
            Bytes::from_static(b"12345"),
            auth::Tier::Write,
            "home/too-big".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(oversized_put.status(), StatusCode::PAYLOAD_TOO_LARGE);

    let exact_fit = unwrap_response(
        execute_put(
            HeaderMap::new(),
            Bytes::from_static(b"1234"),
            auth::Tier::Write,
            "home/four".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(exact_fit.status(), StatusCode::CREATED);

    let overflowing_append = unwrap_response(
        execute_post(
            HeaderMap::new(),
            Bytes::from_static(b"5"),
            auth::Tier::Write,
            "home/four".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(overflowing_append.status(), StatusCode::PAYLOAD_TOO_LARGE);

    let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn memory_backend_enforces_total_quota() {
let (mut core, dir) = test_core("memory-quota");
core.max_memory_bytes = 4;
let headers = HeaderMap::new();
let first = unwrap_response(
execute_put(
headers.clone(),
Bytes::from_static(b"12"),
auth::Tier::Write,
"tmp/a".to_string(),
&core,
&TraceCtx::disabled(),
)
.await,
);
assert_eq!(first.status(), StatusCode::CREATED);
let second = unwrap_response(
execute_put(
headers.clone(),
Bytes::from_static(b"34"),
auth::Tier::Write,
"tmp/b".to_string(),
&core,
&TraceCtx::disabled(),
)
.await,
);
assert_eq!(second.status(), StatusCode::CREATED);
let third = unwrap_response(
execute_put(
headers.clone(),
Bytes::from_static(b"5"),
auth::Tier::Write,
"tmp/c".to_string(),
&core,
&TraceCtx::disabled(),
)
.await,
);
assert_eq!(third.status(), StatusCode::PAYLOAD_TOO_LARGE);
let _ = std::fs::remove_dir_all(dir);
}
/// The bare namespace roots `lib`, `etc`, `boot`, and `usr` are writable by the approve tier
/// only.
#[test]
fn system_namespace_roots_require_approve_even_if_called_directly() {
    const ROOTS: [&str; 4] = ["lib", "etc", "boot", "usr"];
    for name in ROOTS.iter().copied() {
        let denied = [auth::Tier::Anon, auth::Tier::Read, auth::Tier::Write];
        for tier in denied {
            assert!(!can_write(name, tier), "{name}");
        }
        assert!(can_write(name, auth::Tier::Approve), "{name}");
    }
}
/// A `var/` world outside `var/log` is writable by the write tier as well as approve.
#[test]
fn non_log_var_still_accepts_auth_token() {
    let world = "var/cache/rag";

    for tier in [auth::Tier::Anon, auth::Tier::Read] {
        assert!(!can_write(world, tier));
    }
    for tier in [auth::Tier::Write, auth::Tier::Approve] {
        assert!(can_write(world, tier));
    }
}
/// Anonymous reads are allowed on a fresh test core; once a read token is set they are
/// refused while every authenticated tier may still read.
#[test]
fn read_token_is_optional_but_gates_reads_when_set() {
    let (mut core, dir) = test_core("read-token");
    assert!(can_read(&core, auth::Tier::Anon));

    core.tokens.read = Some(b"reader".to_vec());
    assert!(!can_read(&core, auth::Tier::Anon));
    for tier in [auth::Tier::Read, auth::Tier::Write, auth::Tier::Approve] {
        assert!(can_read(&core, tier));
    }

    let _ = std::fs::remove_dir_all(dir);
}
/// With a read token configured, an anonymous GET is answered with 401 and a read-tier HEAD
/// with 200.
#[tokio::test]
async fn get_and_head_require_read_token_when_enabled() {
    let (mut core, dir) = test_core("read-token-handlers");
    core.write_world("home/private", b"secret", "text/plain", &[])
        .unwrap();
    core.tokens.read = Some(b"reader".to_vec());

    let anonymous_get = unwrap_response(
        execute_get(
            HeaderMap::new(),
            auth::Tier::Anon,
            "home/private".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(anonymous_get.status(), StatusCode::UNAUTHORIZED);

    let reader_head = unwrap_response(
        execute_head(
            HeaderMap::new(),
            auth::Tier::Read,
            "home/private".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(reader_head.status(), StatusCode::OK);

    let _ = std::fs::remove_dir_all(dir);
}
/// `unauthorized` produces a 401 with a Bearer challenge for realm `elastik` and a UTF-8
/// plain-text content type.
#[test]
fn unauthorized_responses_advertise_bearer_challenge() {
    let response = unauthorized("read requires read token");
    assert_eq!(response.status(), StatusCode::UNAUTHORIZED);

    let expected_headers = [
        (header::WWW_AUTHENTICATE, "Bearer realm=\"elastik\""),
        (header::CONTENT_TYPE, "text/plain; charset=utf-8"),
    ];
    for (name, value) in expected_headers {
        assert_eq!(response.headers().get(name).unwrap(), value);
    }
}
/// `stamp_core_response_headers` overwrites pre-existing values of the request id, elapsed
/// time, `Vary`, and content-type-options headers.
#[test]
fn core_response_headers_are_core_owned() {
    // Start from values the stamping step is expected to replace.
    let mut headers = HeaderMap::new();
    headers.insert(header::VARY, HeaderValue::from_static("*"));
    let stale: [(&'static str, &'static str); 3] = [
        ("x-request-id", "stale"),
        ("x-elapsed-us", "999"),
        ("x-content-type-options", "sniff"),
    ];
    for (name, value) in stale {
        headers.insert(name, HeaderValue::from_static(value));
    }

    stamp_core_response_headers(42, 7, &mut headers);

    assert_eq!(headers.get("x-request-id").unwrap(), "42");
    assert_eq!(headers.get("x-elapsed-us").unwrap(), "7");
    assert_eq!(headers.get(header::VARY).unwrap(), "Authorization");
    assert_eq!(headers.get("x-content-type-options").unwrap(), "nosniff");
}
/// Of five persisted headers (a CRLF-injected value, `set-cookie`, `clear-site-data`, an
/// invalid name, and a harmless `x-safe`), only `x-safe` is replayed.
#[test]
fn poisoned_persisted_headers_are_not_replayed() {
    let persisted = [
        ("x-custom", "evil\r\nset-cookie: admin=true"),
        ("set-cookie", "sid=admin; Path=/"),
        ("clear-site-data", "\"cookies\""),
        ("bad name", "ok"),
        ("x-safe", "ok"),
    ]
    .map(|(name, value)| (name.to_owned(), value.to_owned()));

    let mut out = Vec::new();
    hs::apply_meta_headers(&persisted, &mut out);

    assert_eq!(out.len(), 1);
    assert_eq!(out[0].0.as_str(), "x-safe");
    assert_eq!(out[0].1, "ok");
}
/// Paths that already name a namespace keep it (minus the leading slash); a bare `/foo` is
/// placed under `home/`.
#[test]
fn canonicalize_preserves_explicit_namespaces() {
    let cases = [
        ("/home/tmp/foo", "home/tmp/foo"),
        ("/home/etc/foo", "home/etc/foo"),
        ("/tmp/foo", "tmp/foo"),
        ("/etc/foo", "etc/foo"),
        ("/foo", "home/foo"),
    ];
    for (raw, canonical) in cases {
        assert_eq!(canonicalize_path(raw), canonical);
    }
}
/// A plain `home/ok` is valid; a name containing a newline and the empty name are not.
#[test]
fn control_bytes_are_not_valid_world_names() {
    assert!(valid_world_name("home/ok"));
    for invalid in ["home/bad\nname", ""] {
        assert!(!valid_world_name(invalid));
    }
}
/// Dot segments (plain or percent-encoded), empty segments, trailing slashes, and backslashes
/// are rejected, and `validate_world_name` reports the specific reason.
#[test]
fn dot_segments_empty_segments_and_backslashes_are_not_valid_world_names() {
    let invalid_names = [
        "home/../etc/secret",
        "home/%2E%2E/etc/secret",
        "home/./x",
        "home//x",
        "home/x/",
        "home\\x",
    ];
    for name in invalid_names {
        assert!(!valid_world_name(name));
    }

    let reasons = [
        (
            "home/%2E%2E/etc/secret",
            "world path contains dot or encoded-dot segment",
        ),
        ("home//x", "world path has empty segment"),
        ("home\\x", "world path contains backslash"),
    ];
    for (name, reason) in reasons {
        assert_eq!(validate_world_name(name), Err(reason));
    }
}
/// Bare namespace roots, `var/log`, and anything under `proc` are not world names, while
/// `home/x` and `var/log/deletes` are.
#[test]
fn namespace_roots_and_proc_subtree_are_not_world_names() {
    const RESERVED: [&str; 12] = [
        "home",
        "tmp",
        "dev",
        "sys",
        "proc",
        "proc/anything",
        "etc",
        "lib",
        "boot",
        "usr",
        "var",
        "var/log",
    ];
    for name in RESERVED.iter().copied() {
        assert!(!valid_world_name(name), "{name}");
    }

    for allowed in ["home/x", "var/log/deletes"] {
        assert!(valid_world_name(allowed));
    }
}
/// `parse_range` against a 10-byte body: no header gives no range; closed, open-ended, and
/// suffix forms resolve to inclusive bounds; an end past the body is clamped; a start past
/// the body is an error; a multi-range header is treated as no range.
#[test]
fn byte_ranges_cover_normal_open_and_suffix_forms() {
    let mut h = HeaderMap::new();
    assert_eq!(hs::parse_range(&h, 10), Ok(None));

    let cases = [
        ("bytes=2-5", Ok(Some((2, 5)))),
        ("bytes=7-", Ok(Some((7, 9)))),
        ("bytes=-3", Ok(Some((7, 9)))),
        ("bytes=8-99", Ok(Some((8, 9)))),
        ("bytes=11-12", Err(())),
        ("bytes=0-1,4-5", Ok(None)),
    ];
    for (spec, expected) in cases {
        h.insert(header::RANGE, HeaderValue::from_static(spec));
        assert_eq!(hs::parse_range(&h, 10), expected);
    }
}
/// A `bytes=1-3` request against a 6-byte world yields 206 with `Content-Range: bytes 1-3/6`
/// and `Content-Length: 3` for both GET and HEAD.
#[tokio::test]
async fn get_and_head_honor_single_byte_range() {
    let (core, dir) = test_core("range-handler");
    core.write_world("home/range", b"abcdef", "text/plain", &[])
        .unwrap();

    let mut range_request = HeaderMap::new();
    range_request.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));

    let get_response = unwrap_response(
        execute_get(
            range_request.clone(),
            auth::Tier::Anon,
            "home/range".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(get_response.status(), StatusCode::PARTIAL_CONTENT);
    assert_eq!(
        get_response.headers().get(header::CONTENT_RANGE).unwrap(),
        "bytes 1-3/6"
    );
    assert_eq!(
        get_response.headers().get(header::CONTENT_LENGTH).unwrap(),
        "3"
    );

    let head_response = unwrap_response(
        execute_head(
            range_request,
            auth::Tier::Anon,
            "home/range".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(head_response.status(), StatusCode::PARTIAL_CONTENT);
    assert_eq!(
        head_response.headers().get(header::CONTENT_RANGE).unwrap(),
        "bytes 1-3/6"
    );
    assert_eq!(
        head_response.headers().get(header::CONTENT_LENGTH).unwrap(),
        "3"
    );

    let _ = std::fs::remove_dir_all(dir);
}
/// Full-body GET and HEAD responses are 200 and carry `Accept-Ranges: bytes`.
#[tokio::test]
async fn get_and_head_advertise_accept_ranges_on_full_body() {
    let (core, dir) = test_core("accept-ranges");
    core.write_world("home/ranges", b"abcdef", "text/plain", &[])
        .unwrap();

    let get_response = unwrap_response(
        execute_get(
            HeaderMap::new(),
            auth::Tier::Anon,
            "home/ranges".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(get_response.status(), StatusCode::OK);
    assert_eq!(
        get_response.headers().get(header::ACCEPT_RANGES).unwrap(),
        "bytes"
    );

    let head_response = unwrap_response(
        execute_head(
            HeaderMap::new(),
            auth::Tier::Anon,
            "home/ranges".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(head_response.status(), StatusCode::OK);
    assert_eq!(
        head_response.headers().get(header::ACCEPT_RANGES).unwrap(),
        "bytes"
    );

    let _ = std::fs::remove_dir_all(dir);
}
/// A range starting beyond a 6-byte body yields 416 with `Content-Range: bytes */6` and
/// still advertises `Accept-Ranges: bytes`.
#[tokio::test]
async fn unsatisfied_range_returns_416_with_content_range() {
    let (core, dir) = test_core("range-416");
    core.write_world("home/range", b"abcdef", "text/plain", &[])
        .unwrap();

    let mut out_of_bounds = HeaderMap::new();
    out_of_bounds.insert(header::RANGE, HeaderValue::from_static("bytes=99-100"));

    let response = unwrap_response(
        execute_get(
            out_of_bounds,
            auth::Tier::Anon,
            "home/range".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(response.status(), StatusCode::RANGE_NOT_SATISFIABLE);

    let expected_headers = [
        (header::CONTENT_RANGE, "bytes */6"),
        (header::ACCEPT_RANGES, "bytes"),
    ];
    for (name, value) in expected_headers {
        assert_eq!(response.headers().get(name).unwrap(), value);
    }

    let _ = std::fs::remove_dir_all(dir);
}
/// A multi-range request is answered with the full 6-byte body: 200, no `Content-Range`,
/// `Content-Length: 6`.
#[tokio::test]
async fn multi_range_is_ignored_and_returns_full_body() {
    let (core, dir) = test_core("multi-range");
    core.write_world("home/range", b"abcdef", "text/plain", &[])
        .unwrap();

    let mut two_ranges = HeaderMap::new();
    two_ranges.insert(header::RANGE, HeaderValue::from_static("bytes=0-1,4-5"));

    let response = unwrap_response(
        execute_get(
            two_ranges,
            auth::Tier::Anon,
            "home/range".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    let response_headers = response.headers();
    assert_eq!(response.status(), StatusCode::OK);
    assert!(response_headers.get(header::CONTENT_RANGE).is_none());
    assert_eq!(response_headers.get(header::CONTENT_LENGTH).unwrap(), "6");

    let _ = std::fs::remove_dir_all(dir);
}
/// A world GET carries exactly two `Link` headers, a `monitor` link to the world's listen
/// endpoint and a `collection` link to `/proc/worlds`; HEAD carries two `Link` headers too.
#[tokio::test]
async fn world_reads_advertise_monitor_and_collection_links() {
    let (core, dir) = test_core("link-headers");
    core.write_world("home/links", b"hello", "text/plain", &[])
        .unwrap();

    let get_response = unwrap_response(
        execute_get(
            HeaderMap::new(),
            auth::Tier::Anon,
            "home/links".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    let get_links = get_response.headers().get_all(header::LINK);
    assert_eq!(get_links.iter().count(), 2);
    let expected_links = [
        "</listen/home/links>; rel=\"monitor\"",
        "</proc/worlds>; rel=\"collection\"",
    ];
    for expected in expected_links {
        assert!(get_links.iter().any(|value| value == expected));
    }

    let head_response = unwrap_response(
        execute_head(
            HeaderMap::new(),
            auth::Tier::Anon,
            "home/links".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    let head_link_count = head_response.headers().get_all(header::LINK).iter().count();
    assert_eq!(head_link_count, 2);

    let _ = std::fs::remove_dir_all(dir);
}
/// `effective_range` applies the range only when `If-Range` carries the current strong ETag;
/// a stale tag or a weak form of the current tag disables the range.
#[test]
fn if_range_controls_whether_range_is_applied() {
    let mut h = HeaderMap::new();
    h.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));

    let cases = [
        ("\"hmac-current\"", Ok(Some((1, 3)))),
        ("\"hmac-stale\"", Ok(None)),
        ("W/\"hmac-current\"", Ok(None)),
    ];
    for (if_range, expected) in cases {
        h.insert(header::IF_RANGE, HeaderValue::from_static(if_range));
        assert_eq!(hs::effective_range(&h, 6, "hmac-current"), expected);
    }
}
/// A range request whose `If-Range` tag is stale gets the full 6-byte body: 200, no
/// `Content-Range`, `Content-Length: 6`.
#[tokio::test]
async fn stale_if_range_returns_full_body() {
    let (core, dir) = test_core("if-range-stale");
    core.write_world("home/range", b"abcdef", "text/plain", &[])
        .unwrap();

    let mut conditional_range = HeaderMap::new();
    conditional_range.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
    conditional_range.insert(header::IF_RANGE, HeaderValue::from_static("\"hmac-stale\""));

    let response = unwrap_response(
        execute_get(
            conditional_range,
            auth::Tier::Anon,
            "home/range".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    let response_headers = response.headers();
    assert_eq!(response.status(), StatusCode::OK);
    assert!(response_headers.get(header::CONTENT_RANGE).is_none());
    assert_eq!(response_headers.get(header::CONTENT_LENGTH).unwrap(), "6");

    let _ = std::fs::remove_dir_all(dir);
}
/// GET and HEAD with an `If-None-Match` equal to the current ETag return 304 with that ETag
/// (GET also keeps the `monitor` link); a stale tag makes GET return 200 again.
#[tokio::test]
async fn get_and_head_honor_if_none_match_cache_revalidation() {
    let (core, dir) = test_core("read-if-none-match");

    // Write through the audited path so the ETag can be derived from its return value.
    let written = world::write_with_audit(
        &core.data,
        "home/cache",
        b"cached body",
        "text/plain",
        &[],
        &core.hmac_key,
    )
    .unwrap();
    let current_etag = format!("\"{}\"", hs::hmac_etag(&written));

    let mut conditional = HeaderMap::new();
    conditional.insert(
        header::IF_NONE_MATCH,
        HeaderValue::from_str(&current_etag).unwrap(),
    );

    // Matching tag on GET.
    let fresh_get = unwrap_response(
        execute_get(
            conditional.clone(),
            auth::Tier::Anon,
            "home/cache".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(fresh_get.status(), StatusCode::NOT_MODIFIED);
    assert_eq!(
        fresh_get.headers().get(header::ETAG).unwrap(),
        current_etag.as_str()
    );
    let has_monitor_link = fresh_get
        .headers()
        .get_all(header::LINK)
        .iter()
        .any(|v| v == "</listen/home/cache>; rel=\"monitor\"");
    assert!(has_monitor_link);

    // Matching tag on HEAD.
    let fresh_head = unwrap_response(
        execute_head(
            conditional.clone(),
            auth::Tier::Anon,
            "home/cache".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(fresh_head.status(), StatusCode::NOT_MODIFIED);
    assert_eq!(
        fresh_head.headers().get(header::ETAG).unwrap(),
        current_etag.as_str()
    );

    // Stale tag: the body is served again.
    conditional.insert(
        header::IF_NONE_MATCH,
        HeaderValue::from_static("\"hmac-stale\""),
    );
    let stale_get = unwrap_response(
        execute_get(
            conditional,
            auth::Tier::Anon,
            "home/cache".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(stale_get.status(), StatusCode::OK);

    let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn options_and_405_advertise_allow_headers() {
let (core, dir) = test_core("allow");
let core = std::sync::Arc::new(core);
let options = options_response(WORLD_ALLOW);
assert_eq!(options.status(), StatusCode::NO_CONTENT);
assert_eq!(options.headers().get(header::ALLOW).unwrap(), WORLD_ALLOW);
let patch = pipeline::run(
Method::PATCH,
"home/allow".to_string(),
HeaderMap::new(),
Bytes::new(),
&core,
0,
)
.await;
assert_eq!(patch.status(), StatusCode::METHOD_NOT_ALLOWED);
assert_eq!(patch.headers().get(header::ALLOW).unwrap(), WORLD_ALLOW);
let _ = std::fs::remove_dir_all(dir);
}
/// The root hint answers HEAD (200, plain text, with a length), OPTIONS (204 + `Allow`), and
/// POST (405 + `Allow`); `proc_version` answers HEAD with 200 and DELETE with 405 + `Allow`.
#[tokio::test]
async fn root_and_proc_endpoints_advertise_head_options_and_405() {
    // Root endpoint.
    let head_root = root_hint(Method::HEAD).await;
    assert_eq!(head_root.status(), StatusCode::OK);
    assert_eq!(
        head_root.headers().get(header::CONTENT_TYPE).unwrap(),
        "text/plain; charset=utf-8"
    );
    assert!(head_root.headers().get(header::CONTENT_LENGTH).is_some());

    let options_root = root_hint(Method::OPTIONS).await;
    assert_eq!(options_root.status(), StatusCode::NO_CONTENT);
    assert_eq!(
        options_root.headers().get(header::ALLOW).unwrap(),
        ROOT_ALLOW
    );

    let post_root = root_hint(Method::POST).await;
    assert_eq!(post_root.status(), StatusCode::METHOD_NOT_ALLOWED);
    assert_eq!(post_root.headers().get(header::ALLOW).unwrap(), ROOT_ALLOW);

    // Version endpoint.
    let head_version = proc_version(Method::HEAD).await;
    assert_eq!(head_version.status(), StatusCode::OK);
    assert!(head_version.headers().get(header::CONTENT_LENGTH).is_some());

    let delete_version = proc_version(Method::DELETE).await;
    assert_eq!(delete_version.status(), StatusCode::METHOD_NOT_ALLOWED);
    assert_eq!(
        delete_version.headers().get(header::ALLOW).unwrap(),
        PROC_ALLOW
    );
}
/// The `proc_worlds` handler answers HEAD, OPTIONS and a rejected DELETE
/// with ordinary HTTP status codes and headers.
#[tokio::test]
async fn proc_worlds_head_options_and_405_are_plain_http() {
    let (core, scratch_dir) = test_core("proc-worlds-http");
    core.write_world("home/a", b"a", "text/plain", &[]).unwrap();
    let shared = Arc::new(core);

    let head_resp = proc_worlds(State(Arc::clone(&shared)), Method::HEAD, HeaderMap::new()).await;
    assert_eq!(head_resp.status(), StatusCode::OK);
    let content_type = head_resp.headers().get(header::CONTENT_TYPE).unwrap();
    assert_eq!(content_type, "text/plain; charset=utf-8");
    assert!(head_resp.headers().get(header::CONTENT_LENGTH).is_some());

    let options_resp =
        proc_worlds(State(Arc::clone(&shared)), Method::OPTIONS, HeaderMap::new()).await;
    assert_eq!(options_resp.status(), StatusCode::NO_CONTENT);
    assert_eq!(
        options_resp.headers().get(header::ALLOW).unwrap(),
        PROC_ALLOW
    );

    // The last call takes ownership of the shared state.
    let delete_resp = proc_worlds(State(shared), Method::DELETE, HeaderMap::new()).await;
    assert_eq!(delete_resp.status(), StatusCode::METHOD_NOT_ALLOWED);
    assert_eq!(
        delete_resp.headers().get(header::ALLOW).unwrap(),
        PROC_ALLOW
    );

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// `proc_reserved` answers 404 for reads, 204 with `Allow` for OPTIONS and
/// 405 with `Allow` for PUT.
#[tokio::test]
async fn proc_namespace_is_reserved_beyond_declared_endpoints() {
    // GET first, then HEAD: both read methods report "not found".
    for read_method in [Method::GET, Method::HEAD] {
        let resp = proc_reserved(read_method).await;
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    let options_resp = proc_reserved(Method::OPTIONS).await;
    assert_eq!(options_resp.status(), StatusCode::NO_CONTENT);
    let allow = options_resp.headers().get(header::ALLOW).unwrap();
    assert_eq!(allow, PROC_ALLOW);

    let put_resp = proc_reserved(Method::PUT).await;
    assert_eq!(put_resp.status(), StatusCode::METHOD_NOT_ALLOWED);
    let allow = put_resp.headers().get(header::ALLOW).unwrap();
    assert_eq!(allow, PROC_ALLOW);
}
/// After one audited write, a HEAD on the verify endpoint reports a valid
/// chain, one event and the latest HMAC through `x-audit-*` headers.
#[tokio::test]
async fn proc_audit_verify_reports_valid_chain_in_headers() {
    let (core, scratch_dir) = test_core("proc-audit-valid");
    let stored_meta = [("x-meta-author".to_owned(), "ranger".to_owned())];
    let latest_hmac = world::write_with_audit(
        &core.data,
        "home/audit-ok",
        b"hello",
        "text/plain",
        &stored_meta,
        &core.hmac_key,
    )
    .unwrap();

    let resp = proc_audit_verify(
        State(Arc::new(core)),
        Method::HEAD,
        AxPath("home/audit-ok/verify".to_owned()),
        HeaderMap::new(),
    )
    .await;

    assert_eq!(resp.status(), StatusCode::OK);
    let reported = resp.headers();
    assert_eq!(reported.get("x-audit-valid").unwrap(), "true");
    assert_eq!(reported.get("x-audit-events").unwrap(), "1");
    let expected_latest = format!("hmac-{latest_hmac}");
    assert_eq!(reported.get("x-audit-latest").unwrap(), &expected_latest);
    assert_eq!(reported.get(header::CONTENT_LENGTH).unwrap(), "0");

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// Overwriting the stored HMAC of the first event makes the verify endpoint
/// answer 409 and describe the break in `x-audit-*` headers.
#[tokio::test]
async fn proc_audit_verify_reports_broken_chain_in_headers() {
    const WORLD: &str = "home/audit-broken";
    let (core, scratch_dir) = test_core("proc-audit-broken");
    world::write_with_audit(&core.data, WORLD, b"hello", "text/plain", &[], &core.hmac_key)
        .unwrap();

    // Tamper with the audit row directly in the world's database.
    let db_path = world::world_db(&core.data, WORLD);
    let tamper_conn = rusqlite::Connection::open(db_path).unwrap();
    tamper_conn
        .execute("UPDATE events SET hmac='bad' WHERE id=1", [])
        .unwrap();

    let resp = proc_audit_verify(
        State(Arc::new(core)),
        Method::HEAD,
        AxPath("home/audit-broken/verify".to_owned()),
        HeaderMap::new(),
    )
    .await;

    assert_eq!(resp.status(), StatusCode::CONFLICT);
    let reported = resp.headers();
    assert_eq!(reported.get("x-audit-valid").unwrap(), "false");
    assert_eq!(reported.get("x-audit-break-at").unwrap(), "0");
    assert_eq!(reported.get("x-audit-actual").unwrap(), "hmac-bad");

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// A tampered HMAC containing a newline must come back escaped in the
/// `x-audit-actual` header rather than as a raw line break.
#[tokio::test]
async fn proc_audit_verify_escapes_tampered_header_values() {
    const WORLD: &str = "home/audit-escaped";
    let (core, scratch_dir) = test_core("proc-audit-header-escape");
    world::write_with_audit(&core.data, WORLD, b"hello", "text/plain", &[], &core.hmac_key)
        .unwrap();

    // Plant a value that would inject a second header if echoed verbatim.
    let db_path = world::world_db(&core.data, WORLD);
    let tamper_conn = rusqlite::Connection::open(db_path).unwrap();
    tamper_conn
        .execute(
            "UPDATE events SET hmac=? WHERE id=1",
            ["bad\nInjected: yes"],
        )
        .unwrap();

    let resp = proc_audit_verify(
        State(Arc::new(core)),
        Method::HEAD,
        AxPath("home/audit-escaped/verify".to_owned()),
        HeaderMap::new(),
    )
    .await;

    assert_eq!(resp.status(), StatusCode::CONFLICT);
    let actual = resp.headers().get("x-audit-actual").unwrap();
    assert_eq!(actual, "hmac-bad\\x0aInjected: yes");

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// Verifying a `tmp/` world answers 204 with `x-audit-valid: n/a`.
#[tokio::test]
async fn proc_audit_verify_reports_memory_world_not_applicable() {
    let (core, scratch_dir) = test_core("proc-audit-memory");
    core.write_world("tmp/scratch", b"draft", "text/plain", &[])
        .unwrap();

    let verify_path = AxPath("tmp/scratch/verify".to_owned());
    let resp = proc_audit_verify(
        State(Arc::new(core)),
        Method::HEAD,
        verify_path,
        HeaderMap::new(),
    )
    .await;

    assert_eq!(resp.status(), StatusCode::NO_CONTENT);
    let validity = resp.headers().get("x-audit-valid").unwrap();
    assert_eq!(validity, "n/a");

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// Verifying a world that was never written answers 404 and leaves no
/// database file behind.
#[tokio::test]
async fn proc_audit_verify_missing_disk_world_does_not_create_db() {
    let (core, scratch_dir) = test_core("proc-audit-missing-no-create");
    let db_path = world::world_db(&core.data, "home/missing-audit");
    assert!(!db_path.exists());

    let verify_path = AxPath("home/missing-audit/verify".to_owned());
    let resp = proc_audit_verify(
        State(Arc::new(core)),
        Method::HEAD,
        verify_path,
        HeaderMap::new(),
    )
    .await;

    assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    // The lookup itself must not have created the file.
    assert!(!db_path.exists());

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// The first PUT to a world answers 201 with `Location`; a second PUT to the
/// same world answers 200 without it.
#[tokio::test]
async fn put_created_returns_location() {
    let (core, scratch_dir) = test_core("put-location");

    let created = unwrap_response(
        execute_put(
            HeaderMap::new(),
            Bytes::from_static(b"new"),
            auth::Tier::Write,
            String::from("home/created"),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(created.status(), StatusCode::CREATED);
    let location = created.headers().get(header::LOCATION).unwrap();
    assert_eq!(location, "/home/created");

    let replaced = unwrap_response(
        execute_put(
            HeaderMap::new(),
            Bytes::from_static(b"again"),
            auth::Tier::Write,
            String::from("home/created"),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(replaced.status(), StatusCode::OK);
    assert!(replaced.headers().get(header::LOCATION).is_none());

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// World names containing a non-ASCII letter and a space must appear
/// percent-encoded in the `Location` and monitor `Link` headers.
#[tokio::test]
async fn location_and_link_headers_percent_encode_world_urls() {
    let (core, scratch_dir) = test_core("encoded-headers");

    let created = unwrap_response(
        execute_put(
            HeaderMap::new(),
            Bytes::from_static(b"new"),
            auth::Tier::Write,
            String::from("home/café report"),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(created.status(), StatusCode::CREATED);
    let location = created.headers().get(header::LOCATION).unwrap();
    assert_eq!(location, "/home/caf%C3%A9%20report");

    let fetched = unwrap_response(
        execute_get(
            HeaderMap::new(),
            auth::Tier::Anon,
            String::from("home/café report"),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    // At least one Link value must be the encoded monitor relation.
    let has_monitor_link = fetched
        .headers()
        .get_all(header::LINK)
        .iter()
        .any(|link| link == "</listen/home/caf%C3%A9%20report>; rel=\"monitor\"");
    assert!(has_monitor_link);

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// A world with a CJK name and CJK body must round-trip its body, its stored
/// headers, its percent-encoded monitor link and its name in the world list.
#[tokio::test]
async fn unicode_worlds_roundtrip_body_headers_and_proc_listing() {
    let (core, dir) = test_core("unicode-roundtrip");
    // The payload is real UTF-8 text; the previous literal was the same text
    // after a UTF-8 -> cp1252 mis-decode.
    let body_text = "你好，世界";
    let headers = vec![(
        "content-disposition".to_string(),
        "attachment; filename*=UTF-8''%E6%8A%A5%E5%91%8A.pdf".to_string(),
    )];
    core.write_world(
        "home/销售/报告",
        body_text.as_bytes(),
        "text/plain; charset=utf-8",
        &headers,
    )
    .unwrap();

    // Body: the stored bytes must be exactly what was written.
    let stage = core.read_world("home/销售/报告").unwrap().unwrap();
    assert_eq!(&stage.body[..], body_text.as_bytes());

    let req_headers = HeaderMap::new();
    let get = unwrap_response(
        execute_get(
            req_headers.clone(),
            auth::Tier::Anon,
            "home/销售/报告".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(get.status(), StatusCode::OK);
    assert_eq!(
        get.headers().get(header::CONTENT_TYPE).unwrap(),
        "text/plain; charset=utf-8"
    );
    assert_eq!(
        get.headers().get(header::CONTENT_DISPOSITION).unwrap(),
        "attachment; filename*=UTF-8''%E6%8A%A5%E5%91%8A.pdf"
    );
    // The monitor link carries the world name percent-encoded per path segment.
    assert!(
        get.headers()
            .get_all(header::LINK)
            .iter()
            .any(|v| *v
                == "</listen/home/%E9%94%80%E5%94%AE/%E6%8A%A5%E5%91%8A>; rel=\"monitor\"")
    );

    // The listing shows the name unencoded.
    let names = store::list_all(&core.data, &core.mem);
    assert_eq!(world_list_body(&names.unwrap()), "home/销售/报告\n");
    let _ = std::fs::remove_dir_all(dir);
}
/// Strong comparison accepts an exact tag, a list containing it, and `*`,
/// but rejects weak or different tags; weak comparison accepts a weak tag.
#[test]
fn etag_lists_match_http_strong_and_weak_rules() {
    const CURRENT: &str = "hmac-abc";

    // (header value, expected strong-match result)
    let strong_cases = [
        ("\"hmac-abc\"", true),
        ("\"other\", \"hmac-abc\"", true),
        ("*", true),
        ("W/\"hmac-abc\"", false),
        ("\"other\"", false),
    ];
    for (candidates, expected) in strong_cases {
        assert_eq!(
            hs::etag_list_strong_matches(candidates, CURRENT),
            expected,
            "strong match for {candidates}"
        );
    }

    assert!(hs::etag_list_weak_matches("W/\"hmac-abc\"", CURRENT));
}
/// `If-None-Match: *` fails the write precondition for a world that exists
/// and passes it for one that does not.
#[test]
fn if_none_match_star_blocks_existing_world() {
    let (core, scratch_dir) = test_core("if-none-match-star");
    core.write_world("home/cas", b"one", "text/plain; charset=utf-8", &[])
        .unwrap();

    let mut create_only = HeaderMap::new();
    create_only.insert(header::IF_NONE_MATCH, HeaderValue::from_static("*"));

    let on_existing = hs::check_write_preconditions(&core, "home/cas", &create_only);
    assert!(on_existing.is_err());
    let on_missing = hs::check_write_preconditions(&core, "home/new", &create_only);
    assert!(on_missing.is_ok());

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// PUT and POST with a stale `If-Match` answer 412; POST with the current
/// ETag answers 200.
#[tokio::test]
async fn put_and_post_honor_write_preconditions_at_handler_level() {
    let (core, scratch_dir) = test_core("write-preconditions");
    let current_hmac = world::write_with_audit(
        &core.data,
        "home/cas",
        b"one",
        "text/plain; charset=utf-8",
        &[],
        &core.hmac_key,
    )
    .unwrap();

    let mut stale_headers = HeaderMap::new();
    stale_headers.insert(header::IF_MATCH, HeaderValue::from_static("\"hmac-stale\""));

    let stale_put = unwrap_response(
        execute_put(
            stale_headers.clone(),
            Bytes::from_static(b"two"),
            auth::Tier::Write,
            String::from("home/cas"),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(stale_put.status(), StatusCode::PRECONDITION_FAILED);

    let stale_post = unwrap_response(
        execute_post(
            stale_headers.clone(),
            Bytes::from_static(b" plus"),
            auth::Tier::Write,
            String::from("home/cas"),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(stale_post.status(), StatusCode::PRECONDITION_FAILED);

    // Quote the current ETag the way a client would send it.
    let quoted_etag = format!("\"{}\"", hs::hmac_etag(&current_hmac));
    let mut fresh_headers = HeaderMap::new();
    fresh_headers.insert(
        header::IF_MATCH,
        HeaderValue::from_str(&quoted_etag).unwrap(),
    );

    let fresh_post = unwrap_response(
        execute_post(
            fresh_headers.clone(),
            Bytes::from_static(b" plus"),
            auth::Tier::Write,
            String::from("home/cas"),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(fresh_post.status(), StatusCode::OK);

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// `If-Match` passes the write precondition with the ETag derived from the
/// latest audit HMAC and fails with any other tag.
#[test]
fn if_match_accepts_current_hmac_etag_only() {
    const WORLD: &str = "home/cas";
    const CONTENT_TYPE: &str = "text/plain; charset=utf-8";

    let (core, scratch_dir) = test_core("if-match-hmac");
    core.write_world(WORLD, b"one", CONTENT_TYPE, &[]).unwrap();

    // Append the audit event by hand so the world has a known latest HMAC.
    let mut audit_conn = world::open(&core.data, WORLD).unwrap();
    let body_digest = world::sha256_hex(b"one");
    let latest_hmac = audit::append_with_conn(
        &mut audit_conn,
        "put",
        WORLD,
        &body_digest,
        3,
        CONTENT_TYPE,
        &[],
        &core.hmac_key,
    )
    .unwrap();
    drop(audit_conn);

    let quoted_etag = format!("\"{}\"", hs::hmac_etag(&latest_hmac));
    let mut fresh_headers = HeaderMap::new();
    fresh_headers.insert(
        header::IF_MATCH,
        HeaderValue::from_str(&quoted_etag).unwrap(),
    );
    assert!(hs::check_write_preconditions(&core, WORLD, &fresh_headers).is_ok());

    let mut stale_headers = HeaderMap::new();
    stale_headers.insert(header::IF_MATCH, HeaderValue::from_static("\"hmac-stale\""));
    assert!(hs::check_write_preconditions(&core, WORLD, &stale_headers).is_err());

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// `request_content_type` returns the request's `Content-Type` unchanged and
/// falls back to `application/octet-stream` when the header is absent.
#[test]
fn request_content_type_preserves_http_content_type_verbatim() {
    let mut request = HeaderMap::new();

    // Each value replaces the previous one in the same map.
    for declared in ["application/pdf", "text/html; charset=utf-8"] {
        request.insert(header::CONTENT_TYPE, HeaderValue::from_static(declared));
        assert_eq!(hs::request_content_type(&request), declared);
    }

    request.clear();
    assert_eq!(
        hs::request_content_type(&request),
        "application/octet-stream"
    );
}
/// With empty user allow and deny lists, `request_meta_headers` must keep the
/// default representation/policy headers and drop everything else.
///
/// Every header is sent from one of the tables below, and the denial
/// assertions iterate the same `denied` table that populates the request.
/// This guarantees each "should not be persisted" check covers a header that
/// was actually present: previously `x-request-id`, `x-elapsed-us` and
/// `x-elapsed-ms` were asserted absent without ever being sent.
#[test]
fn request_meta_headers_persist_default_representation_headers_only() {
    // Must be persisted, with exactly these values.
    let persisted: [(&'static str, &'static str); 9] = [
        ("content-encoding", "gzip"),
        ("content-language", "zh-CN"),
        (
            "content-disposition",
            "attachment; filename=\"report.pdf\"",
        ),
        ("cache-control", "max-age=60"),
        ("access-control-allow-origin", "*"),
        ("content-security-policy", "default-src 'self'"),
        ("x-frame-options", "DENY"),
        ("permissions-policy", "camera=()"),
        ("cross-origin-resource-policy", "same-origin"),
    ];
    // Sent as part of the request, but this test asserts nothing about them.
    let unasserted: [(&'static str, &'static str); 2] = [
        ("access-control-allow-methods", "GET, HEAD"),
        ("access-control-expose-headers", "ETag"),
    ];
    // Custom headers that need an explicit allowlist entry to be persisted.
    let opt_in: [(&'static str, &'static str); 2] =
        [("x-future-http-thing", "ok"), ("x-meta-author", "ranger")];
    // Must never be persisted.
    let denied: [(&'static str, &'static str); 66] = [
        ("authorization", "Bearer secret"),
        ("proxy-authorization", "Bearer secret"),
        ("cookie", "sid=secret"),
        ("set-cookie", "sid=secret"),
        ("host", "localhost:3105"),
        ("connection", "keep-alive"),
        ("keep-alive", "timeout=5"),
        ("transfer-encoding", "chunked"),
        ("te", "trailers"),
        ("trailer", "expires"),
        ("upgrade", "websocket"),
        ("http2-settings", "abc"),
        ("content-type", "image/png"),
        ("content-length", "999"),
        ("etag", "\"fake\""),
        ("allow", "GET"),
        ("location", "/elsewhere"),
        ("link", "</x>; rel=\"next\""),
        ("www-authenticate", "Bearer realm=\"x\""),
        ("accept", "text/html"),
        ("accept-encoding", "gzip"),
        ("accept-language", "zh-CN"),
        ("accept-charset", "utf-8"),
        ("range", "bytes=0-1"),
        ("if-match", "\"abc\""),
        ("if-none-match", "\"abc\""),
        ("if-range", "\"abc\""),
        ("if-modified-since", "Wed, 21 Oct 2015 07:28:00 GMT"),
        ("if-unmodified-since", "Wed, 21 Oct 2015 07:28:00 GMT"),
        ("expect", "100-continue"),
        ("sec-fetch-mode", "cors"),
        ("sec-ch-ua", "\"Chromium\""),
        ("dnt", "1"),
        ("origin", "https://example.com"),
        ("referer", "https://example.com/"),
        ("user-agent", "curl"),
        ("server", "fake"),
        ("date", "Wed, 21 Oct 2015 07:28:00 GMT"),
        ("age", "10"),
        ("vary", "*"),
        // These three were asserted absent before but never sent.
        ("x-request-id", "req-123"),
        ("x-elapsed-us", "1500"),
        ("x-elapsed-ms", "2"),
        ("x-content-type-options", "nosniff"),
        ("via", "1.1 proxy"),
        ("forwarded", "for=127.0.0.1"),
        ("x-forwarded-for", "127.0.0.1"),
        ("x-forwarded-host", "example.com"),
        ("x-forwarded-proto", "https"),
        ("x-real-ip", "203.0.113.7"),
        ("true-client-ip", "203.0.113.7"),
        ("client-ip", "203.0.113.7"),
        ("clear-site-data", "\"cookies\""),
        (
            "traceparent",
            "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
        ),
        ("tracestate", "rojo=00f067aa0ba902b7"),
        ("baggage", "userId=alice,serverNode=DF%2028"),
        (
            "b3",
            "80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1",
        ),
        ("x-b3-traceid", "80f198ee56343ba864fe8b2a57d3eff7"),
        ("x-b3-spanid", "e457b5a2e4d86bd1"),
        ("x-b3-sampled", "1"),
        (
            "x-amzn-trace-id",
            "Root=1-5759e988-bd862e3fe1be46a994272793",
        ),
        ("cf-ray", "8f1234567abcdef0-IAD"),
        ("cf-connecting-ip", "203.0.113.7"),
        ("cf-visitor", "{\"scheme\":\"https\"}"),
        ("http3-settings", "AAMAAABkAARAAgAAAAAEAIAAAAA"),
        ("pragma", "no-cache"),
    ];

    // Build one request carrying every header from all four tables.
    let mut headers = HeaderMap::new();
    for &(name, value) in persisted
        .iter()
        .chain(&unasserted)
        .chain(&opt_in)
        .chain(&denied)
    {
        headers.insert(name, HeaderValue::from_static(value));
    }

    let allowlist = hs::HeaderAllowlist::empty();
    let user_deny = hs::HeaderAllowlist::empty();
    let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
    let has = |name: &str| meta.iter().any(|(n, _)| n == name);

    for &(name, value) in persisted.iter() {
        assert!(
            meta.contains(&(name.to_string(), value.to_string())),
            "{name} should be persisted with its request value"
        );
    }
    assert!(
        !has("x-meta-author"),
        "x-meta-* is opt-in via ELASTIK_PERSIST_HEADERS"
    );
    assert!(
        !has("x-future-http-thing"),
        "unknown x- headers default-deny"
    );
    for &(name, _) in denied.iter() {
        assert!(!has(name), "{name} should not be persisted");
    }
}
/// When an allowlisted header is sent twice, only the last value is kept.
#[test]
fn request_meta_headers_deduplicate_repeated_names_last_wins() {
    let mut request = HeaderMap::new();
    for author in ["alice", "bob"] {
        request.append("x-meta-author", HeaderValue::from_static(author));
    }

    let user_allow = hs::HeaderAllowlist::parse("x-meta-*");
    let no_deny = hs::HeaderAllowlist::empty();
    let persisted = hs::request_meta_headers(&request, &user_allow, &no_deny);

    let expected = vec![(String::from("x-meta-author"), String::from("bob"))];
    assert_eq!(persisted, expected);
}
/// A user allowlist (exact names and a `x-my-*` wildcard) persists matching
/// custom headers, cannot re-enable `traceparent`, and leaves the default
/// `cache-control` persistence intact.
#[test]
fn request_meta_headers_user_allowlist_persists_custom_headers() {
    let mut request = HeaderMap::new();
    for (name, value) in [
        ("x-author", "ranger"),
        ("x-version", "7.1.0"),
        ("x-my-tag", "custom"),
        ("x-my-region", "ap-east-1"),
        ("x-other", "not-allowlisted"),
        (
            "traceparent",
            "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
        ),
        ("cache-control", "max-age=60"),
    ] {
        request.insert(name, HeaderValue::from_static(value));
    }

    let user_allow = hs::HeaderAllowlist::parse("x-author,x-version,x-my-*,traceparent");
    let no_deny = hs::HeaderAllowlist::empty();
    let persisted = hs::request_meta_headers(&request, &user_allow, &no_deny);
    let kept: Vec<&str> = persisted.iter().map(|(name, _)| name.as_str()).collect();

    for allowed in ["x-author", "x-version", "x-my-tag", "x-my-region"] {
        assert!(kept.contains(&allowed));
    }
    assert!(!kept.contains(&"x-other"));
    assert!(
        !kept.contains(&"traceparent"),
        "L1 hard deny must override user allowlist; tracing context never persists"
    );
    assert!(kept.contains(&"cache-control"));
}
/// A user deny list removes headers that would otherwise persist by default
/// and also wins over a matching user allow entry.
#[test]
fn request_meta_headers_user_deny_subtracts_from_default_allow() {
    let mut request = HeaderMap::new();
    for (name, value) in [
        ("cache-control", "max-age=60"),
        ("content-encoding", "gzip"),
        ("permissions-policy", "camera=()"),
        ("x-author", "ranger"),
    ] {
        request.insert(name, HeaderValue::from_static(value));
    }

    let user_allow = hs::HeaderAllowlist::parse("x-author");
    let user_deny = hs::HeaderAllowlist::parse("cache-control,permissions-policy,x-author");
    let persisted = hs::request_meta_headers(&request, &user_allow, &user_deny);
    let kept: Vec<&str> = persisted.iter().map(|(name, _)| name.as_str()).collect();

    assert!(
        !kept.contains(&"cache-control"),
        "user-deny must subtract from L2 default-allow"
    );
    assert!(!kept.contains(&"permissions-policy"));
    // Not on the deny list, so the default still applies.
    assert!(kept.contains(&"content-encoding"));
    assert!(
        !kept.contains(&"x-author"),
        "user-deny must override user-allow when both match"
    );
}
/// Header names and allowlist entries match regardless of case, and every
/// persisted key is stored in lowercase.
#[test]
fn request_meta_headers_is_case_insensitive_per_rfc_7230() {
    let mut request = HeaderMap::new();
    for (name, value) in [
        ("X-Author", "ranger"),
        ("CACHE-CONTROL", "max-age=60"),
        ("Traceparent", "00-...-01"),
        ("X-Forwarded-For", "203.0.113.7"),
    ] {
        request.insert(name, HeaderValue::from_static(value));
    }

    let user_allow = hs::HeaderAllowlist::parse("X-AUTHOR, X-Version");
    let no_deny = hs::HeaderAllowlist::empty();
    let persisted = hs::request_meta_headers(&request, &user_allow, &no_deny);
    let kept: Vec<&str> = persisted.iter().map(|(name, _)| name.as_str()).collect();

    assert!(
        kept.contains(&"x-author"),
        "X-Author allowlisted as X-AUTHOR persists"
    );
    assert!(kept.contains(&"cache-control"));
    assert!(!kept.contains(&"traceparent"));
    assert!(!kept.contains(&"x-forwarded-for"));

    for (key, _) in persisted.iter() {
        assert_eq!(
            key,
            &key.to_ascii_lowercase(),
            "stored meta keys must be lowercase"
        );
    }
}
/// The allowlist parser tolerates padding, duplicates, empty entries and
/// mixed case, honours `prefix-*` wildcards, and treats a bare `*` as
/// matching nothing.
#[test]
fn header_allowlist_parser_handles_whitespace_dedup_and_wildcards() {
    let parsed = hs::HeaderAllowlist::parse(
        " X-Author , x-version, x-my-*, x-version, , *, x-my-* , x-AUTHOR",
    );

    for accepted in ["x-author", "x-version", "x-my-tag", "x-my-region"] {
        assert!(parsed.matches(accepted), "{accepted} should match");
    }
    for rejected in ["x-other", "", "anything-else"] {
        assert!(!parsed.matches(rejected), "{rejected:?} should not match");
    }

    // Inputs with no usable entries produce an empty allowlist.
    for blank in ["", " , ,"] {
        assert!(hs::HeaderAllowlist::parse(blank).is_empty());
    }
}
/// Headers stored alongside a world are returned unchanged on GET.
#[tokio::test]
async fn get_returns_stored_standard_representation_headers() {
    let (core, scratch_dir) = test_core("representation-headers");
    let stored: Vec<(String, String)> = [
        ("content-encoding", "gzip"),
        ("content-disposition", "attachment; filename=\"report.pdf\""),
        ("access-control-allow-origin", "*"),
        ("content-security-policy", "default-src 'self'"),
        ("x-frame-options", "DENY"),
    ]
    .into_iter()
    .map(|(name, value)| (name.to_string(), value.to_string()))
    .collect();
    core.write_world(
        "home/gzip",
        b"compressed bytes",
        "application/pdf",
        &stored,
    )
    .unwrap();

    let resp = unwrap_response(
        execute_get(
            HeaderMap::new(),
            auth::Tier::Anon,
            String::from("home/gzip"),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(resp.status(), StatusCode::OK);

    // Every stored header must come back with its stored value.
    for (name, value) in &stored {
        assert_eq!(resp.headers().get(name.as_str()).unwrap(), value.as_str());
    }

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// A written world reads back with the same content type and body.
#[test]
fn worlds_store_content_type_not_private_extensions() {
    let (core, scratch_dir) = test_core("content-type");
    core.write_world("home/pdf", b"%PDF-1.7", "application/pdf", &[])
        .unwrap();

    let read_back = core.read_world("home/pdf").unwrap().unwrap();
    assert_eq!(read_back.content_type, "application/pdf");
    assert_eq!(read_back.body, b"%PDF-1.7");

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// `home/` and `etc/` names are not memory worlds; `tmp/`, `dev/` and `sys/`
/// names are. Persistence is checked for one name of each kind.
#[test]
fn storage_prefix_routes_memory_and_disk_modes() {
    for disk_name in ["home/report", "etc/config"] {
        assert!(!store::is_memory_world(disk_name));
    }
    for memory_name in ["tmp/scratch", "dev/fb0", "sys/status"] {
        assert!(store::is_memory_world(memory_name));
    }

    assert!(store::is_persistent("home/report"));
    assert!(!store::is_persistent("tmp/scratch"));
}
/// A `tmp/` world round-trips body, content type and headers, shows up in
/// the listing, and leaves neither a database file nor an audit HMAC.
#[test]
fn memory_worlds_do_not_create_sqlite_files_or_audit_chain() {
    const WORLD: &str = "tmp/scratch";
    let (core, scratch_dir) = test_core("memory-world");
    let owner_meta = vec![("x-meta-owner".to_string(), "agent".to_string())];
    core.write_world(WORLD, b"draft", "text/plain; charset=utf-8", &owner_meta)
        .unwrap();

    let read_back = core.read_world(WORLD).unwrap().unwrap();
    assert_eq!(read_back.body, b"draft");
    assert_eq!(read_back.content_type, "text/plain; charset=utf-8");
    assert_eq!(read_back.headers, owner_meta);

    // Nothing may have reached the disk-backed store or its audit chain.
    assert!(!world::world_db(&core.data, WORLD).exists());
    assert!(audit::latest_hmac(&core.data, WORLD).is_none());

    let listed = store::list_all(&core.data, &core.mem).unwrap();
    assert_eq!(listed, vec![WORLD.to_string()]);

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// An audited write to a `home/` world creates its database file, records
/// the returned HMAC as the latest one, and lists the world.
#[test]
fn disk_worlds_create_sqlite_files_and_audit_chain_when_using_audit_path() {
    const WORLD: &str = "home/report";
    let (core, scratch_dir) = test_core("disk-world");
    let written_hmac = world::write_with_audit(
        &core.data,
        WORLD,
        b"final",
        "text/plain; charset=utf-8",
        &[],
        &core.hmac_key,
    )
    .unwrap();

    let read_back = core.read_world(WORLD).unwrap().unwrap();
    assert_eq!(read_back.body, b"final");
    assert!(world::world_db(&core.data, WORLD).exists());
    assert_eq!(audit::latest_hmac(&core.data, WORLD), Some(written_hmac));

    let listed = store::list_all(&core.data, &core.mem).unwrap();
    assert_eq!(listed, vec![WORLD.to_string()]);

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// An audited write stores the content type and metadata digest on the event
/// row, and each stored header as its own `event_headers` row.
#[test]
fn audit_keeps_historical_metadata_without_json_payload() {
    const WORLD: &str = "home/audit-meta";
    const CONTENT_TYPE: &str = "text/plain; charset=utf-8";

    let (core, scratch_dir) = test_core("audit-meta");
    let stored_meta = vec![("x-meta-author".to_string(), "ranger".to_string())];
    let event_hmac = world::write_with_audit(
        &core.data,
        WORLD,
        b"hello",
        CONTENT_TYPE,
        &stored_meta,
        &core.hmac_key,
    )
    .unwrap();

    let db_path = world::world_db(&core.data, WORLD);
    let db = rusqlite::Connection::open(db_path).unwrap();

    // Look the event up by the HMAC the write returned.
    let (event_content_type, event_meta_digest): (String, String) = db
        .query_row(
            "SELECT content_type, meta_sha256 FROM events WHERE hmac=?",
            [event_hmac],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .unwrap();
    assert_eq!(event_content_type, CONTENT_TYPE);
    assert_eq!(
        event_meta_digest,
        audit::meta_sha256(CONTENT_TYPE, &stored_meta)
    );

    let recorded_author: String = db
        .query_row(
            "SELECT value FROM event_headers WHERE name='x-meta-author'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(recorded_author, "ranger");

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// A POST records the world's existing content type and metadata digest in
/// its `append` event, ignoring the representation headers on the request.
#[tokio::test]
async fn post_audit_uses_existing_representation_metadata() {
    const WORLD: &str = "home/post-audit-meta";

    let (core, scratch_dir) = test_core("post-audit-meta");
    let stored_meta = vec![
        ("content-encoding".to_string(), "gzip".to_string()),
        ("x-meta-author".to_string(), "ranger".to_string()),
    ];
    world::write_with_audit(
        &core.data,
        WORLD,
        b"hello",
        "text/plain",
        &stored_meta,
        &core.hmac_key,
    )
    .unwrap();

    // The request declares a different type and an extra header.
    let mut append_headers = HeaderMap::new();
    append_headers.insert(
        header::CONTENT_TYPE,
        HeaderValue::from_static("application/pdf"),
    );
    append_headers.insert(header::CONTENT_LANGUAGE, HeaderValue::from_static("zh-CN"));

    let resp = unwrap_response(
        execute_post(
            append_headers.clone(),
            Bytes::from_static(b" world"),
            auth::Tier::Write,
            WORLD.to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(resp.status(), StatusCode::OK);

    let db_path = world::world_db(&core.data, WORLD);
    let db = rusqlite::Connection::open(db_path).unwrap();

    let (event_content_type, event_meta_digest): (String, String) = db
        .query_row(
            "SELECT content_type, meta_sha256 FROM events WHERE event_type='append'",
            [],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .unwrap();
    assert_eq!(event_content_type, "text/plain");
    assert_eq!(
        event_meta_digest,
        audit::meta_sha256("text/plain", &stored_meta)
    );

    // The request-only header must not appear in any event.
    let language_rows: i64 = db
        .query_row(
            "SELECT COUNT(*) FROM event_headers WHERE name='content-language'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(language_rows, 0);

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// DELETE with a stale `If-Match` answers 412 and keeps the world; with the
/// current ETag it answers 204, removes the world, and leaves a valid
/// `var/log/deletes` ledger holding an intent event followed by a commit.
#[tokio::test]
async fn delete_honors_if_match_before_audit_or_remove() {
    const WORLD: &str = "home/delete-cas";
    const LEDGER: &str = "var/log/deletes";

    let (core, scratch_dir) = test_core("delete-if-match");
    let current_hmac = world::write_with_audit(
        &core.data,
        WORLD,
        b"alive",
        "text/plain; charset=utf-8",
        &[],
        &core.hmac_key,
    )
    .unwrap();

    let mut stale_headers = HeaderMap::new();
    stale_headers.insert(header::IF_MATCH, HeaderValue::from_static("\"hmac-stale\""));
    let refused = unwrap_response(
        execute_delete(
            stale_headers.clone(),
            auth::Tier::Approve,
            WORLD.to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(refused.status(), StatusCode::PRECONDITION_FAILED);
    assert!(core.read_world(WORLD).unwrap().is_some());

    let quoted_etag = format!("\"{}\"", hs::hmac_etag(&current_hmac));
    let mut fresh_headers = HeaderMap::new();
    fresh_headers.insert(
        header::IF_MATCH,
        HeaderValue::from_str(&quoted_etag).unwrap(),
    );
    let removed = unwrap_response(
        execute_delete(
            fresh_headers.clone(),
            auth::Tier::Approve,
            WORLD.to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(removed.status(), StatusCode::NO_CONTENT);
    assert!(core.read_world(WORLD).unwrap().is_none());

    // The delete must have been recorded in a ledger whose chain verifies.
    assert!(core.read_world(LEDGER).unwrap().is_some());
    let ledger_report = core.cached_verify_chain(LEDGER).unwrap();
    assert!(matches!(
        ledger_report,
        Some(audit::VerifyReport::Valid(_))
    ));

    let ledger_db = world::open_existing(&core.data, LEDGER)
        .unwrap()
        .unwrap();
    let mut event_query = ledger_db
        .prepare("SELECT event_type FROM events ORDER BY id")
        .unwrap();
    let event_types: Vec<String> = event_query
        .query_map([], |row| row.get(0))
        .unwrap()
        .collect::<Result<_, _>>()
        .unwrap();
    assert_eq!(event_types, vec!["delete_intent", "delete_commit"]);

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// DELETE answers 401 both for a `Write`-tier caller on an ordinary world
/// and for an `Approve`-tier caller on `var/log/deletes`; neither world is
/// removed.
#[tokio::test]
async fn delete_rejects_auth_token_and_append_only_ledger() {
    const WORLD: &str = "home/delete-policy";
    const LEDGER: &str = "var/log/deletes";

    let (core, scratch_dir) = test_core("delete-policy");
    // Seed the ordinary world first, then the ledger.
    for (name, body) in [(WORLD, &b"alive"[..]), (LEDGER, &b"ledger"[..])] {
        world::write_with_audit(
            &core.data,
            name,
            body,
            "text/plain; charset=utf-8",
            &[],
            &core.hmac_key,
        )
        .unwrap();
    }

    let write_tier_delete = unwrap_response(
        execute_delete(
            HeaderMap::new(),
            auth::Tier::Write,
            WORLD.to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(write_tier_delete.status(), StatusCode::UNAUTHORIZED);
    assert!(core.read_world(WORLD).unwrap().is_some());

    let ledger_delete = unwrap_response(
        execute_delete(
            HeaderMap::new(),
            auth::Tier::Approve,
            LEDGER.to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(ledger_delete.status(), StatusCode::UNAUTHORIZED);
    assert!(core.read_world(LEDGER).unwrap().is_some());

    let _ = std::fs::remove_dir_all(scratch_dir);
}
/// Deleting a world that was never written returns 404 and must not create
/// the `var/log/deletes` ledger as a side effect.
#[tokio::test]
async fn delete_missing_world_does_not_write_delete_ledger() {
    let (core, dir) = test_core("delete-missing");

    let outcome = execute_delete(
        HeaderMap::new(),
        auth::Tier::Approve,
        String::from("home/missing"),
        &core,
        &TraceCtx::disabled(),
    )
    .await;
    let resp = unwrap_response(outcome);

    assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    let ledger = core.read_world("var/log/deletes").unwrap();
    assert!(ledger.is_none());

    let _ = std::fs::remove_dir_all(dir);
}
/// `world_list_body` renders one world name per line, each terminated by a
/// newline, and renders an empty list as the empty string.
#[test]
fn proc_worlds_body_is_plain_lines() {
    // No worlds: no output at all.
    assert_eq!(world_list_body(&[]), "");

    // Two worlds: two newline-terminated lines in input order.
    let names = ["home/a", "tmp/b"].map(String::from);
    let rendered = world_list_body(&names);
    assert_eq!(rendered, "home/a\ntmp/b\n");
}
/// `proc_du` lists per-world byte sizes and `proc_df` reports used / limit /
/// free rows for storage, memory and worlds. HEAD on `proc_du` must advertise
/// the GET body length while sending no body.
#[tokio::test]
async fn proc_du_and_df_report_resource_usage() {
    let (mut core, dir) = test_core("proc-du-df");
    core.max_storage_bytes = Some(10);
    core.write_world("home/hello", b"hello", "text/plain", &[])
        .unwrap();
    core.write_world("tmp/scratch", b"data", "text/plain", &[])
        .unwrap();

    // Derive the expected memory row from the configured limit instead of
    // repeating the default as a literal, so the assertion keeps tracking
    // `max_memory_bytes` if the default is ever retuned.
    let memory_limit = core.max_memory_bytes;
    let expected_memory_row = format!("memory\t4\t{}\t{}\n", memory_limit, memory_limit - 4);

    let state = Arc::new(core);
    let headers = HeaderMap::new();

    let du = proc_du(State(state.clone()), Method::GET, headers.clone()).await;
    assert_eq!(du.status(), StatusCode::OK);
    let du_body = response_text(du).await;
    assert!(du_body.contains("home/hello\t5\n"));
    assert!(du_body.contains("tmp/scratch\t4\n"));

    let df = proc_df(State(state.clone()), Method::GET, headers.clone()).await;
    assert_eq!(df.status(), StatusCode::OK);
    let df_body = response_text(df).await;
    // 5 of the 10 allowed storage bytes are used by "home/hello".
    assert!(df_body.contains("storage\t5\t10\t5\n"));
    assert!(
        df_body.contains(expected_memory_row.as_str()),
        "expected memory row {expected_memory_row:?}; body=\n{df_body}"
    );
    assert!(df_body.contains("worlds\t2\tunlimited\tunlimited\n"));

    // 27 = len("home/hello\t5\n") + len("tmp/scratch\t4\n").
    let head = proc_du(State(state), Method::HEAD, headers).await;
    assert_eq!(head.status(), StatusCode::OK);
    assert_eq!(head.headers().get(header::CONTENT_LENGTH).unwrap(), "27");
    assert_eq!(response_text(head).await, "");

    let _ = std::fs::remove_dir_all(dir);
}
/// With a read token configured, requests to `proc_du` and `proc_df` that
/// carry no credentials are answered with 401.
#[tokio::test]
async fn proc_du_and_df_require_read_token_when_enabled() {
    let (mut core, dir) = test_core("proc-du-df-read-token");
    core.tokens.read = Some(b"reader".to_vec());
    let state = Arc::new(core);

    let du_status = proc_du(State(Arc::clone(&state)), Method::GET, HeaderMap::new())
        .await
        .status();
    assert_eq!(du_status, StatusCode::UNAUTHORIZED);

    let df_status = proc_df(State(state), Method::GET, HeaderMap::new())
        .await
        .status();
    assert_eq!(df_status, StatusCode::UNAUTHORIZED);

    let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn proc_df_world_count_tracks_durable_put_and_delete() {
let (core, dir) = test_core("proc-df-world-count");
let headers = HeaderMap::new();
let put = unwrap_response(
execute_put(
headers.clone(),
Bytes::from_static(b"x"),
auth::Tier::Write,
"home/count".to_string(),
&core,
&TraceCtx::disabled(),
)
.await,
);
assert_eq!(put.status(), StatusCode::CREATED);
let state = Arc::new(core);
let before = proc_df(State(state.clone()), Method::GET, headers.clone()).await;
assert!(response_text(before)
.await
.contains("worlds\t1\tunlimited\tunlimited\n"));
let delete = unwrap_response(
execute_delete(
headers.clone(),
auth::Tier::Approve,
"home/count".to_string(),
&state,
&TraceCtx::disabled(),
)
.await,
);
assert_eq!(delete.status(), StatusCode::NO_CONTENT);
let after = proc_df(State(state), Method::GET, headers).await;
let after_body = response_text(after).await;
assert!(after_body.contains("storage\t0\tunlimited\tunlimited\n"));
assert!(after_body.contains("worlds\t0\tunlimited\tunlimited\n"));
let _ = std::fs::remove_dir_all(dir);
}
/// `proc_pool` emits `name value type` lines. After one PUT and two GETs of
/// the same world the read-cache rows show one entry, one miss and one hit;
/// `ledger_writer_inits` is 0 until the first DELETE and 1 afterwards.
#[tokio::test]
async fn proc_pool_emits_metrics_with_type_labels() {
    let (core, dir) = test_core("proc-pool-metrics");
    let headers = HeaderMap::new();

    let put = unwrap_response(
        execute_put(
            headers.clone(),
            Bytes::from_static(b"hello"),
            auth::Tier::Write,
            "home/m".to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await,
    );
    assert_eq!(put.status(), StatusCode::CREATED);

    // Two reads of the same world: the assertions below expect these to be
    // counted as one miss followed by one hit.
    for _ in 0..2 {
        let get = unwrap_response(
            execute_get(
                headers.clone(),
                auth::Tier::Read,
                "home/m".to_string(),
                &core,
                &TraceCtx::disabled(),
            )
            .await,
        );
        assert_eq!(get.status(), StatusCode::OK);
    }

    let state = Arc::new(core);
    let resp = proc_pool(State(state.clone()), Method::GET, headers.clone()).await;
    let body = response_text(resp).await;
    assert!(body.contains("read_cache_entries 1 snapshot\n"));
    assert!(body.contains("read_cache_tombstones 0 snapshot\n"));
    assert!(body.contains("read_cache_hits 1 counter\n"));
    assert!(body.contains("read_cache_misses 1 counter\n"));
    assert!(body.contains("read_cache_capped 0 counter\n"));
    assert!(body.contains("read_cache_open_fails 0 counter\n"));
    // Only the presence of the row is checked; its value is not pinned.
    assert!(body.contains("read_cache_max_entries "));
    assert!(body.contains("ledger_writer_inits 0 counter\n"));

    let delete = unwrap_response(
        execute_delete(
            headers.clone(),
            auth::Tier::Approve,
            "home/m".to_string(),
            &state,
            &TraceCtx::disabled(),
        )
        .await,
    );
    // Check the DELETE itself so that a failed delete is reported as such
    // rather than as a puzzling counter mismatch further down.
    assert_eq!(delete.status(), StatusCode::NO_CONTENT);

    let resp2 = proc_pool(State(state), Method::GET, headers).await;
    let body2 = response_text(resp2).await;
    assert!(
        body2.contains("ledger_writer_inits 1 counter\n"),
        "expected counter to bump to 1 after first DELETE; body=\n{body2}"
    );

    let _ = std::fs::remove_dir_all(dir);
}
/// With a read token configured, a request to `proc_pool` that carries no
/// credentials is answered with 401.
#[tokio::test]
async fn proc_pool_requires_read_token_when_enabled() {
    let (mut core, dir) = test_core("proc-pool-read-token");
    core.tokens.read = Some(b"reader".to_vec());

    let status = proc_pool(State(Arc::new(core)), Method::GET, HeaderMap::new())
        .await
        .status();
    assert_eq!(status, StatusCode::UNAUTHORIZED);

    let _ = std::fs::remove_dir_all(dir);
}
/// Three worlds are created (durable count 3), then deleted through three
/// futures driven together by `tokio::join!`. Afterwards the durable count
/// must be exactly 1 and `delete_ledger_created` must be set.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_first_deletes_increment_world_count_exactly_once() {
    const WORLDS: [&str; 3] = ["home/a", "home/b", "home/c"];

    let (core, dir) = test_core("concurrent-first-deletes");

    for name in WORLDS {
        let outcome = execute_put(
            HeaderMap::new(),
            Bytes::from_static(b"x"),
            auth::Tier::Write,
            name.to_string(),
            &core,
            &TraceCtx::disabled(),
        )
        .await;
        assert_eq!(unwrap_response(outcome).status(), StatusCode::CREATED);
    }
    assert_eq!(core.durable_world_count.load(Ordering::Relaxed), 3);
    assert!(!core.delete_ledger_created.load(Ordering::Relaxed));

    let state = Arc::new(core);
    // One trace context per delete, as each future borrows its own.
    let (trace_a, trace_b, trace_c) = (
        TraceCtx::disabled(),
        TraceCtx::disabled(),
        TraceCtx::disabled(),
    );
    let [world_a, world_b, world_c] = WORLDS.map(str::to_string);

    let (r1, r2, r3) = tokio::join!(
        execute_delete(HeaderMap::new(), auth::Tier::Approve, world_a, &state, &trace_a),
        execute_delete(HeaderMap::new(), auth::Tier::Approve, world_b, &state, &trace_b),
        execute_delete(HeaderMap::new(), auth::Tier::Approve, world_c, &state, &trace_c),
    );
    for outcome in [r1, r2, r3] {
        assert_eq!(unwrap_response(outcome).status(), StatusCode::NO_CONTENT);
    }

    // 3 worlds removed; the remaining 1 is expected by the test name to be
    // counted exactly once despite three first-time deletes.
    assert_eq!(state.durable_world_count.load(Ordering::Relaxed), 1);
    assert!(state.delete_ledger_created.load(Ordering::Relaxed));

    let _ = std::fs::remove_dir_all(dir);
}
/// Test helper: reads the whole response body (no size cap) and returns it
/// as a `String`. Panics if the body cannot be read or is not valid UTF-8.
async fn response_text(resp: Response) -> String {
    let body = resp.into_body();
    let collected = axum::body::to_bytes(body, usize::MAX).await.unwrap();
    let raw = collected.to_vec();
    String::from_utf8(raw).unwrap()
}
/// GET through `pipeline::run` on an existing world returns 200, the stored
/// content type, and the stored body.
#[tokio::test]
async fn pipeline_get_existing_world_returns_200_with_body() {
    let (core, dir) = test_core("pipeline-get-200");
    core.write_world("home/hello", b"hello world", "text/plain", &[])
        .unwrap();
    let shared = Arc::new(core);

    let request_id = 42;
    let response = pipeline::run(
        Method::GET,
        String::from("/home/hello"),
        HeaderMap::new(),
        Bytes::new(),
        &shared,
        request_id,
    )
    .await;

    assert_eq!(response.status(), StatusCode::OK);
    let content_type = response
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok());
    assert_eq!(content_type, Some("text/plain"));
    assert_eq!(response_text(response).await, "hello world");

    let _ = std::fs::remove_dir_all(dir);
}
/// HEAD through `pipeline::run` on an existing world returns 200 with the
/// body's length in `Content-Length` (11 for "hello world") and no body.
#[tokio::test]
async fn pipeline_head_existing_world_returns_headers_no_body() {
    let (core, dir) = test_core("pipeline-head-200");
    core.write_world("home/hello", b"hello world", "text/plain", &[])
        .unwrap();
    let shared = Arc::new(core);

    let request_id = 43;
    let response = pipeline::run(
        Method::HEAD,
        String::from("/home/hello"),
        HeaderMap::new(),
        Bytes::new(),
        &shared,
        request_id,
    )
    .await;

    assert_eq!(response.status(), StatusCode::OK);
    let content_length = response
        .headers()
        .get(header::CONTENT_LENGTH)
        .and_then(|value| value.to_str().ok());
    assert_eq!(content_length, Some("11"));
    assert_eq!(response_text(response).await, "");

    let _ = std::fs::remove_dir_all(dir);
}
/// GET through `pipeline::run` on a world that was never written returns 404.
#[tokio::test]
async fn pipeline_get_nonexistent_world_returns_404() {
    let (core, dir) = test_core("pipeline-get-404");
    let shared = Arc::new(core);

    let request_id = 44;
    let status = pipeline::run(
        Method::GET,
        String::from("/home/missing"),
        HeaderMap::new(),
        Bytes::new(),
        &shared,
        request_id,
    )
    .await
    .status();

    assert_eq!(status, StatusCode::NOT_FOUND);

    let _ = std::fs::remove_dir_all(dir);
}
/// A request path containing a `..` segment is rejected by `pipeline::run`
/// with 400.
#[tokio::test]
async fn pipeline_get_invalid_dot_segment_returns_400() {
    let (core, dir) = test_core("pipeline-get-400");
    let shared = Arc::new(core);

    let request_id = 45;
    let status = pipeline::run(
        Method::GET,
        String::from("/home/../etc/secret"),
        HeaderMap::new(),
        Bytes::new(),
        &shared,
        request_id,
    )
    .await
    .status();

    assert_eq!(status, StatusCode::BAD_REQUEST);

    let _ = std::fs::remove_dir_all(dir);
}
/// With a read token configured, a GET through `pipeline::run` that carries
/// no credentials is answered with 401, even though the world exists.
#[tokio::test]
async fn pipeline_get_with_read_token_required_rejects_anon() {
    let (mut core, dir) = test_core("pipeline-get-401");
    core.tokens.read = Some(b"reader".to_vec());
    core.write_world("home/secret", b"shhh", "text/plain", &[])
        .unwrap();
    let shared = Arc::new(core);

    let request_id = 46;
    let status = pipeline::run(
        Method::GET,
        String::from("/home/secret"),
        HeaderMap::new(),
        Bytes::new(),
        &shared,
        request_id,
    )
    .await
    .status();

    assert_eq!(status, StatusCode::UNAUTHORIZED);

    let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn world_handler_get_routes_through_pipeline() {
use axum::body::Body;
use axum::http::Request as HttpRequest;
use tower::ServiceExt;
let (core, dir) = test_core("world-handler-get");
core.write_world("home/hello", b"hello world", "text/plain", &[])
.unwrap();
let core = Arc::new(core);
let app = Router::new()
.route("/*world", any(world_handler))
.layer(axum::middleware::from_fn_with_state(
core.clone(),
add_core_response_headers,
))
.with_state(core.clone());
let req = HttpRequest::builder()
.method("GET")
.uri("/home/hello")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let req_id_header = resp
.headers()
.get("x-request-id")
.and_then(|v| v.to_str().ok())
.unwrap_or("missing");
assert!(
req_id_header.parse::<u64>().is_ok(),
"x-request-id should be a numeric id, got {req_id_header:?}"
);
assert_eq!(response_text(resp).await, "hello world");
let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn world_handler_head_routes_through_pipeline() {
use axum::body::Body;
use axum::http::Request as HttpRequest;
use tower::ServiceExt;
let (core, dir) = test_core("world-handler-head");
core.write_world("home/hello", b"hello world", "text/plain", &[])
.unwrap();
let core = Arc::new(core);
let app = Router::new()
.route("/*world", any(world_handler))
.layer(axum::middleware::from_fn_with_state(
core.clone(),
add_core_response_headers,
))
.with_state(core.clone());
let req = HttpRequest::builder()
.method("HEAD")
.uri("/home/hello")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers()
.get(header::CONTENT_LENGTH)
.and_then(|v| v.to_str().ok()),
Some("11"),
);
assert!(resp.headers().get("x-request-id").is_some());
assert_eq!(response_text(resp).await, "");
let _ = std::fs::remove_dir_all(dir);
}
/// A GET that replays the ETag from a previous response in `If-None-Match`
/// is answered with 304 and an empty body.
#[tokio::test]
async fn pipeline_get_if_none_match_returns_304() {
    const PATH: &str = "/home/cached";

    let (core, dir) = test_core("pipeline-304");
    core.write_world("home/cached", b"cached body", "text/plain", &[])
        .unwrap();
    let shared = Arc::new(core);

    // Unconditional GET to learn the current ETag.
    let initial = pipeline::run(
        Method::GET,
        PATH.to_string(),
        HeaderMap::new(),
        Bytes::new(),
        &shared,
        100,
    )
    .await;
    let etag = initial
        .headers()
        .get(header::ETAG)
        .and_then(|value| value.to_str().ok())
        .expect("first GET must return an ETag header")
        .to_string();

    // Conditional GET carrying that ETag.
    let mut conditional = HeaderMap::new();
    conditional.insert(header::IF_NONE_MATCH, HeaderValue::from_str(&etag).unwrap());
    let revalidated = pipeline::run(
        Method::GET,
        PATH.to_string(),
        conditional,
        Bytes::new(),
        &shared,
        101,
    )
    .await;

    assert_eq!(revalidated.status(), StatusCode::NOT_MODIFIED);
    assert_eq!(response_text(revalidated).await, "");

    let _ = std::fs::remove_dir_all(dir);
}
#[tokio::test]
async fn pipeline_get_out_of_range_returns_416_with_reason() {
let (core, dir) = test_core("pipeline-416");
core.write_world("home/short", b"abc", "text/plain", &[])
.unwrap();
let core = Arc::new(core);
let mut headers = HeaderMap::new();
headers.insert(header::RANGE, HeaderValue::from_static("bytes=999-"));
let resp = pipeline::run(
Method::GET,
"/home/short".to_string(),
headers,
Bytes::new(),
&core,
200,
)
.await;
assert_eq!(resp.status(), StatusCode::RANGE_NOT_SATISFIABLE);
let mut headers2 = HeaderMap::new();
headers2.insert(header::RANGE, HeaderValue::from_static("bytes=999-"));
let phase = crate::handler::execute(
Verb::Get,
headers2,
Bytes::new(),
auth::Tier::Anon,
"home/short".to_string(),
&core,
&TraceCtx::disabled(),
)
.await;
match phase {
Phase::Error {
reason: ErrorReason::RangeNotSatisfiable,
resp,
} => {
assert_eq!(resp.status(), StatusCode::RANGE_NOT_SATISFIABLE);
}
_ => panic!("expected Phase::Error{{RangeNotSatisfiable}}"),
}
let _ = std::fs::remove_dir_all(dir);
}
/// `Range: bytes=1-3` on the 6-byte body "abcdef" returns 206, a
/// `Content-Range` of `bytes 1-3/6`, and the slice "bcd".
#[tokio::test]
async fn pipeline_get_range_returns_206_with_chunk() {
    let (core, dir) = test_core("pipeline-get-range");
    core.write_world("home/range", b"abcdef", "text/plain", &[])
        .unwrap();
    let shared = Arc::new(core);

    let mut range_headers = HeaderMap::new();
    range_headers.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));

    let request_id = 47;
    let response = pipeline::run(
        Method::GET,
        String::from("/home/range"),
        range_headers,
        Bytes::new(),
        &shared,
        request_id,
    )
    .await;

    assert_eq!(response.status(), StatusCode::PARTIAL_CONTENT);
    let content_range = response
        .headers()
        .get(header::CONTENT_RANGE)
        .and_then(|value| value.to_str().ok());
    assert_eq!(content_range, Some("bytes 1-3/6"));
    assert_eq!(response_text(response).await, "bcd");

    let _ = std::fs::remove_dir_all(dir);
}
/// Builds a `Core` for tests, backed by a freshly created directory under the
/// system temp dir named `elastik-core-{label}-{pid}-{nanos}`.
///
/// No tokens are configured, limits use the `DEFAULT_*` constants, and
/// `max_storage_bytes` is unset. Returns the core together with the directory
/// path; this function does not remove the directory itself.
fn test_core(label: &str) -> (Core, PathBuf) {
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let pid = std::process::id();
    let dir = std::env::temp_dir().join(format!("elastik-core-{label}-{pid}-{nanos}"));
    std::fs::create_dir_all(&dir).unwrap();

    // Only the broadcast sender and the watch receiver are kept; their
    // counterparts are dropped here.
    let (events, _) = broadcast::channel(16);
    let shutdown = watch::channel(false).1;

    let tokens = auth::Tokens {
        read: None,
        write: None,
        approve: None,
    };
    let event_log = Arc::new(StdMutex::new(VecDeque::with_capacity(
        DEFAULT_LISTEN_REPLAY_MAX,
    )));
    let read_cache = Arc::new(crate::read_cache::ReadCache::new(
        crate::read_cache::DEFAULT_READ_CACHE_MAX_ENTRIES,
    ));

    let core = Core {
        data: dir.clone(),
        tokens,
        hmac_key: b"test-key".to_vec(),
        mem: Arc::new(store::MemoryStore::new()),
        max_world_bytes: DEFAULT_MAX_WORLD_BYTES,
        max_memory_bytes: DEFAULT_MAX_MEMORY_BYTES,
        max_storage_bytes: None,
        storage_body_bytes: Arc::new(AtomicUsize::new(0)),
        durable_world_count: Arc::new(AtomicUsize::new(0)),
        delete_ledger_created: Arc::new(AtomicBool::new(false)),
        events,
        listen_slots: Arc::new(Semaphore::new(DEFAULT_MAX_LISTEN_CONNECTIONS)),
        listen_replay_max: DEFAULT_LISTEN_REPLAY_MAX,
        event_log,
        shutdown,
        next_event: Arc::new(AtomicU64::new(0)),
        next_request: Arc::new(AtomicU64::new(0)),
        world_locks: Arc::new(DashMap::new()),
        ledger: Arc::new(crate::ledger::LedgerWriter::new()),
        read_cache,
        persist_header_allowlist: Arc::new(crate::http_semantics::HeaderAllowlist::empty()),
        persist_header_user_deny: Arc::new(crate::http_semantics::HeaderAllowlist::empty()),
    };

    (core, dir)
}
}