Skip to main content

elastik_core/
lib.rs

1//! # Elastik — Audi-ted L5 Storage Engine
2//!
3//! `elastik-core` is a protocol-neutral storage engine: canonical paths, opaque
4//! bytes, content-addressed versioning, an HMAC-chained audit log, and a
5//! four-tier access model. **SQLite for files.**
6//!
7//! ## Quick start
8//!
9//! ```no_run
10//! # #[cfg(feature = "unstable-engine")]
11//! # async fn run() {
12//! use elastik_core::{
13//!     AccessTier, Engine, Preconditions, Representation, SecretBytes, ValidatedWorldPath,
14//! };
15//! use bytes::Bytes;
16//!
17//! let engine = Engine::builder()
18//!     .data_root("./data")
19//!     .key(SecretBytes::new(b"shared-hmac-secret".to_vec()).expect("hmac key"))
20//!     .build()
21//!     .expect("engine builds");
22//!
23//! let world = ValidatedWorldPath::new("home/hello").expect("canonical path");
24//!
25//! // Store bytes at a path.
26//! engine
27//!     .replace(
28//!         &world,
29//!         Representation::new(Bytes::from_static(b"hi"), "text/plain", Vec::new()),
30//!         Preconditions::none(),
31//!         AccessTier::Write,
32//!     )
33//!     .await
34//!     .expect("write succeeds");
35//!
36//! // Retrieve bytes by path.
37//! let read = engine.read(&world, AccessTier::Read).expect("read succeeds");
38//! assert!(read.is_some());
39//! # }
40//! ```
41//!
42//! ## What the library does
43//!
44//! - **Bytes at paths.** Canonical `home/`, `tmp/`, `dev/`, `sys/`, `etc/`,
45//!   `lib/`, `boot/`, `usr/`, `var/` namespaces decide durable-vs-transient
46//!   without per-call configuration.
47//! - **Versions everything.** Every successful write returns an ETag; reads,
48//!   replaces, and appends honour `Preconditions::if_match` / `if_none_match`.
49//! - **Audits everything.** HMAC-chained ledger; `Engine::verify_audit`
50//!   returns a typed [`AuditVerify`] result and refuses to start when an
51//!   existing chain is corrupted.
52//! - **Authenticates everything.** [`AccessTier`] (Anon / Read / Write /
53//!   Approve) plus token-bytes verification via [`Engine::verify_token`].
54//! - **Subscribes to changes.** [`Engine::subscribe`] returns an
55//!   [`EngineSubscription`] with replay-then-live ordering.
56//!
57//! ## What the library does *not* do
58//!
59//! No HTTP, no CoAP, no SSE, no server runtime. Those live in the
60//! `elastik-core` binary and consume this library through the unstable public
61//! [`Engine`] API. In a minimal library-only build, the library does not read
62//! environment variables, does not bind sockets, and does not depend on `axum`,
63//! `hyper`, `tower`, `tokio-stream`, `futures-util`, or `base64`.
64//!
65//! ## Feature flags
66//!
67//! - `bundled-sqlite` *(default)* — link a bundled SQLite via `rusqlite/bundled`.
68//! - `coap` *(default)* — enable the CoAP adapter inside the binary.
69//! - `multi-thread` *(default)* — enable Tokio's multi-thread runtime for the
70//!   binary.
71//! - `unstable-engine` — expose the public [`Engine`] facade. **The API shape
72//!   is allowed to change between minor versions while this gate stays.**
73//! - `unstable-engine-bin` *(default)* — superset that adds `axum`, `base64`,
74//!   `futures-util`, Tokio `net`/`signal`, and `tracing-subscriber`; the
75//!   `elastik-core` binary requires this feature.
76//!
77//! Minimal library-only build: `cargo build --lib --no-default-features
78//! --features bundled-sqlite,unstable-engine`.
79mod audit;
80mod auth;
81#[cfg(test)]
82#[cfg(feature = "coap")]
83#[path = "server/coap.rs"]
84mod coap;
85#[cfg(test)]
86#[cfg(feature = "coap")]
87#[path = "server/coap_errors.rs"]
88mod coap_errors;
89#[cfg(test)]
90mod config;
91mod defaults;
92mod delete_ops;
93mod engine;
94mod engine_introspection;
95mod engine_ops;
96mod engine_trace;
97mod engine_types;
98mod etag;
99mod event;
100#[cfg(test)]
101#[path = "server/handler.rs"]
102mod handler;
103#[cfg(test)]
104mod http_range;
105#[cfg(test)]
106mod http_semantics;
107mod ledger;
108#[cfg(test)]
109#[path = "server/listen.rs"]
110mod listen;
111#[cfg(test)]
112#[path = "server/middleware.rs"]
113mod middleware;
114mod path;
115#[cfg(test)]
116#[path = "server/pipeline.rs"]
117mod pipeline;
118#[cfg(test)]
119#[path = "server/proc.rs"]
120mod proc;
121mod read_cache;
122#[cfg(test)]
123#[path = "server/response.rs"]
124mod response;
125#[cfg(test)]
126#[path = "server/route.rs"]
127mod route;
128#[cfg(test)]
129mod server;
130mod state;
131mod storage_class;
132mod store;
133mod world;
134mod world_ops;
135
136// Re-export protocol-neutral helpers at the crate root. HTTP adapter modules
137// are only compiled here for legacy white-box tests; production binary builds
138// own their adapter helpers from `main.rs`.
139#[cfg(test)]
140pub(crate) use crate::path::*;
141#[cfg(test)]
142pub(crate) use crate::pipeline::*;
143#[cfg(test)]
144pub(crate) use crate::proc::*;
145#[cfg(test)]
146pub(crate) use crate::response::*;
147pub(crate) use crate::state::*;
148pub(crate) use crate::storage_class::*;
149#[cfg(feature = "unstable-engine")]
150pub use auth::AuthGate;
151#[cfg(not(feature = "unstable-engine"))]
152pub(crate) use auth::AuthGate;
153#[cfg(feature = "unstable-engine")]
154#[doc(hidden)]
155pub use engine::ShutdownToken;
156#[cfg(feature = "unstable-engine")]
157pub use engine::{Engine, EngineBuildError, EngineBuilder, EngineError};
158#[cfg(feature = "unstable-engine")]
159pub use engine_introspection::{
160    AuditBroken, AuditValid, AuditVerify, DfSnapshot, InvalidProcPath, PoolSnapshot, ProcEndpoint,
161    ValidatedProcPath, WorldUsage,
162};
163#[cfg(feature = "unstable-engine")]
164pub use engine_trace::{DeleteMetadata, EngineDeleteTraceHooks, EngineWriteTraceHooks};
165#[cfg(feature = "unstable-engine")]
166pub use engine_types::{
167    parse_etag_matchers, AccessTier, ChangeEvent, EmptyKeyError, EngineSubscription, EtagMatcher,
168    InvalidWorldPath, Preconditions, ReadResult, Representation, SecretBytes, SubscribePattern,
169    SubscriptionRecvError, ValidatedWorldPath, WriteKind, WriteResult,
170};
171
172use std::path::Path;
173use std::time::Duration;
174
175#[cfg(test)]
176pub(crate) use crate::defaults::{
177    DEFAULT_LISTEN_REPLAY_MAX, DEFAULT_MAX_LISTEN_CONNECTIONS, DEFAULT_MAX_MEMORY_BYTES,
178    DEFAULT_MAX_WORLD_BYTES,
179};
180
181// Test-only HTTP constants for legacy white-box tests. Production constants
182// live on the binary side.
183#[cfg(test)]
184pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION");
185#[cfg(test)]
186pub(crate) const WORLD_ALLOW: &str = "GET, HEAD, PUT, POST, DELETE, OPTIONS";
187
188fn acquire_data_root_writer_lock(data: &Path) -> rusqlite::Result<rusqlite::Connection> {
189    let c = rusqlite::Connection::open(data.join(".elastik-writer-lock.sqlite3"))?;
190    c.busy_timeout(Duration::from_millis(0))?;
191    c.execute_batch(
192        r#"
193        PRAGMA journal_mode=WAL;
194        CREATE TABLE IF NOT EXISTS writer_lock(
195            id INTEGER PRIMARY KEY CHECK(id=1),
196            holder TEXT NOT NULL DEFAULT ''
197        );
198        INSERT OR IGNORE INTO writer_lock(id, holder) VALUES(1, '');
199        BEGIN IMMEDIATE;
200        "#,
201    )?;
202    c.execute(
203        "UPDATE writer_lock SET holder=?1 WHERE id=1",
204        [std::process::id().to_string()],
205    )?;
206    Ok(c)
207}
208
209// Legacy white-box tests still compile the HTTP route modules through this
210// library crate. Production builds compile those modules only from `main.rs`.
211
212// ─── /<world> all five methods ──────────────────────────────────────
213// Path validation lives in `path.rs`. Adapter-side canonicalization lives in
214// `server/path.rs` and is not part of the production library.
215
216// ─── helpers ────────────────────────────────────────────────────────
217
218pub(crate) fn can_write(world_name: &str, tier: auth::Tier) -> bool {
219    // Harvard gate: /lib/, /etc/, /boot/, /usr/, and audit logs
220    // require approve. /home/, /tmp/, /dev/, /sys/, and non-log
221    // /var/ worlds accept the normal token. Anon refused.
222    let needs_approve = needs_write_approve(world_name);
223    match tier {
224        auth::Tier::Anon => false,
225        auth::Tier::Read => false,
226        auth::Tier::Write => !needs_approve,
227        auth::Tier::Approve => true,
228    }
229}
230
231/// Mirrors the harvard-gate decision in `can_write`. Lifted out so
232/// `handler::execute_put` / `execute_post` can classify a rejected
233/// write as `Auth(Write)` vs `Auth(WriteApprove)` for the FSM trace
234/// without re-deriving the predicate.
235pub(crate) fn needs_write_approve(world_name: &str) -> bool {
236    exact_or_child(world_name, "lib")
237        || exact_or_child(world_name, "etc")
238        || exact_or_child(world_name, "boot")
239        || exact_or_child(world_name, "usr")
240        || exact_or_child(world_name, "var/log")
241}
242
243pub(crate) fn can_delete(tier: auth::Tier) -> bool {
244    matches!(tier, auth::Tier::Approve)
245}
246
247// (memory_write_projected_bytes / memory_append_projected_bytes were
248// removed: the snapshot-based projection they computed could be observed
249// by two concurrent writers before either had committed, letting them
250// both pass and overshoot max_memory_bytes once the global write_lock was
251// gone. Quota is now enforced inside MemoryStore::write_with_quota /
252// append_with_quota, atomically with the write itself.)
253
254// `require_read` (auth gate for proc handlers) lives in proc.rs now,
255// next to its only callers.
256
257// Response constructors (not_found, unauthorized, bad_request,
258// payload_too_large, insufficient_storage,
259// storage_temporarily_unavailable, storage_quota_exceeded,
260// options_response, method_not_allowed, precondition_failed,
261// server_error, storage_error, is_insufficient_storage_error,
262// is_transient_storage_error, audit_valid, audit_broken,
263// audit_header_value, audit_not_applicable, proc_text_response,
264// du_body, df_body, world_list_body,
265// to_header_map) live in `response.rs` and are re-exported at the
266// crate root. Helpers below use them through that re-export.
267
268pub(crate) fn exact_or_child(world_name: &str, prefix: &str) -> bool {
269    world_name == prefix
270        || world_name
271            .strip_prefix(prefix)
272            .is_some_and(|rest| rest.starts_with('/'))
273}
274
275pub(crate) fn can_read(core: &Core, tier: auth::Tier) -> bool {
276    !core.tokens.read_required()
277        || matches!(
278            tier,
279            auth::Tier::Read | auth::Tier::Write | auth::Tier::Approve
280        )
281}
282
283#[cfg(test)]
284mod tests {
285    use super::*;
286    #[cfg(feature = "coap")]
287    use crate::config::coap_bind_from_env;
288    use crate::config::{
289        env_nonzero_usize, env_optional_usize, hmac_key_from_env_value, listen_addr,
290        should_warn_public_read,
291    };
292    use crate::etag as et;
293    use crate::handler::{execute_delete, execute_get, execute_head, execute_post, execute_put};
294    use crate::http_semantics as hs;
295    use crate::middleware::{add_server_response_headers, stamp_core_response_headers};
296    use crate::route::world_handler;
297    use axum::body::Bytes;
298    use axum::extract::{Path as AxPath, State};
299    use axum::http::{header, HeaderMap, HeaderValue, Method, StatusCode};
300    use axum::response::Response;
301    use axum::routing::any;
302    use axum::Router;
303    use dashmap::DashMap;
304    use std::collections::VecDeque;
305    use std::net::IpAddr;
306    use std::path::PathBuf;
307    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
308    use std::sync::{Arc, Mutex as StdMutex, Mutex as TestMutex, OnceLock};
309    use tokio::sync::{broadcast, watch, Semaphore};
310
311    fn server_state_for_tests(core: Arc<Core>) -> crate::server::ServerState {
312        crate::server::ServerState::from_core_for_tests(core, DEFAULT_MAX_WORLD_BYTES)
313    }
314
315    /// PR 4c: 50 unit tests below were renamed mechanically from
316    /// `handle_*` (sync `&Core -> Response`) to `execute_*` (async
317    /// `&Core, &TraceCtx -> Phase`). The verb-handler entry point
318    /// returns `Phase`, but the assertions all operate on the
319    /// underlying `Response`. This helper unwraps the three terminal
320    /// `Phase` variants `execute_*` can return so tests can keep
321    /// asserting on `.status()` / `.headers()` without scaffolding
322    /// per call site.
323    fn unwrap_response(phase: Phase) -> Response {
324        match phase {
325            Phase::ExecutedRead(r) | Phase::CommittedWrite(r) | Phase::Done(r) => r,
326            Phase::Error { resp, .. } => resp,
327            // execute_* never returns a non-terminal Phase; reaching
328            // any of these would be a bug in the handler module.
329            Phase::Received { .. }
330            | Phase::Authenticated { .. }
331            | Phase::PathValidated { .. }
332            | Phase::Dispatched { .. } => {
333                panic!("execute_* returned a non-terminal Phase variant")
334            }
335        }
336    }
337
338    fn world_path(world: &str) -> crate::engine_types::ValidatedWorldPath {
339        crate::engine_types::ValidatedWorldPath::new(world).unwrap()
340    }
341
342    fn env_lock() -> &'static TestMutex<()> {
343        static LOCK: OnceLock<TestMutex<()>> = OnceLock::new();
344        LOCK.get_or_init(|| TestMutex::new(()))
345    }
346
347    #[cfg(feature = "coap")]
348    struct CoapEnvGuard {
349        host: Option<String>,
350        port: Option<String>,
351    }
352
353    #[cfg(feature = "coap")]
354    impl CoapEnvGuard {
355        fn capture() -> Self {
356            Self {
357                host: std::env::var("ELASTIK_COAP_HOST").ok(),
358                port: std::env::var("ELASTIK_COAP_PORT").ok(),
359            }
360        }
361    }
362
363    #[cfg(feature = "coap")]
364    impl Drop for CoapEnvGuard {
365        fn drop(&mut self) {
366            match &self.host {
367                Some(v) => std::env::set_var("ELASTIK_COAP_HOST", v),
368                None => std::env::remove_var("ELASTIK_COAP_HOST"),
369            }
370            match &self.port {
371                Some(v) => std::env::set_var("ELASTIK_COAP_PORT", v),
372                None => std::env::remove_var("ELASTIK_COAP_PORT"),
373            }
374        }
375    }
376
377    #[test]
378    fn hmac_key_requires_nonempty_semantic_content() {
379        assert!(hmac_key_from_env_value(None).is_none());
380        assert!(hmac_key_from_env_value(Some(String::new())).is_none());
381        assert!(hmac_key_from_env_value(Some(" \t\n".to_string())).is_none());
382        assert_eq!(
383            hmac_key_from_env_value(Some(" secret ".to_string())).unwrap(),
384            b" secret ".to_vec()
385        );
386    }
387
388    #[test]
389    fn resource_cap_env_zero_falls_back_to_default() {
390        let _guard = env_lock().lock().unwrap();
391        let key = format!("ELASTIK_TEST_ZERO_CAP_{}", std::process::id());
392        std::env::set_var(&key, "0");
393        assert_eq!(env_nonzero_usize(&key, 7), 7);
394        std::env::set_var(&key, "9");
395        assert_eq!(env_nonzero_usize(&key, 7), 9);
396        std::env::remove_var(&key);
397    }
398
399    #[test]
400    fn optional_storage_quota_zero_is_unlimited() {
401        let _guard = env_lock().lock().unwrap();
402        let key = format!("ELASTIK_TEST_STORAGE_CAP_{}", std::process::id());
403        std::env::remove_var(&key);
404        assert_eq!(env_optional_usize(&key), None);
405        std::env::set_var(&key, "");
406        assert_eq!(env_optional_usize(&key), None);
407        std::env::set_var(&key, " \t ");
408        assert_eq!(env_optional_usize(&key), None);
409        std::env::set_var(&key, "0");
410        assert_eq!(env_optional_usize(&key), None);
411        std::env::set_var(&key, "11");
412        assert_eq!(env_optional_usize(&key), Some(11));
413        std::env::set_var(&key, "10GB");
414        assert!(std::panic::catch_unwind(|| env_optional_usize(&key)).is_err());
415        std::env::remove_var(&key);
416    }
417
418    #[test]
419    fn data_root_writer_lock_is_exclusive() {
420        let dir =
421            std::env::temp_dir().join(format!("elastik-data-lock-test-{}", std::process::id()));
422        let _ = std::fs::remove_dir_all(&dir);
423        std::fs::create_dir_all(&dir).unwrap();
424
425        let first = acquire_data_root_writer_lock(&dir).unwrap();
426        assert!(acquire_data_root_writer_lock(&dir).is_err());
427        drop(first);
428        let second = acquire_data_root_writer_lock(&dir).unwrap();
429        drop(second);
430
431        let _ = std::fs::remove_dir_all(dir);
432    }
433
434    #[test]
435    fn sqlite_disk_full_maps_to_507() {
436        let err = rusqlite::Error::SqliteFailure(
437            rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_FULL),
438            None,
439        );
440        assert!(is_insufficient_storage_error(&err));
441
442        let resp = storage_error("test", err);
443        assert_eq!(resp.status(), StatusCode::INSUFFICIENT_STORAGE);
444    }
445
446    #[test]
447    fn sqlite_busy_and_locked_map_to_503_retry_after() {
448        for code in [rusqlite::ffi::SQLITE_BUSY, rusqlite::ffi::SQLITE_LOCKED] {
449            let err = rusqlite::Error::SqliteFailure(rusqlite::ffi::Error::new(code), None);
450            assert!(is_transient_storage_error(&err));
451
452            let resp = storage_error("test", err);
453            assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
454            assert_eq!(
455                resp.headers()
456                    .get(header::RETRY_AFTER)
457                    .and_then(|v| v.to_str().ok()),
458                Some("1")
459            );
460        }
461    }
462
463    #[test]
464    fn non_storage_sqlite_errors_stay_500() {
465        let err = rusqlite::Error::SqliteFailure(
466            rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CORRUPT),
467            None,
468        );
469        assert!(!is_insufficient_storage_error(&err));
470
471        let resp = storage_error("test", err);
472        assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
473    }
474
475    #[tokio::test]
476    async fn durable_storage_quota_returns_507_without_writing() {
477        let (mut core, dir) = test_core("storage-quota");
478        core.max_storage_bytes = Some(4);
479        let headers = HeaderMap::new();
480
481        let first = unwrap_response(
482            execute_put(
483                headers.clone(),
484                Bytes::from_static(b"1234"),
485                auth::Tier::Write,
486                world_path("home/a"),
487                &core,
488                &TraceCtx::disabled(),
489            )
490            .await,
491        );
492        assert_eq!(first.status(), StatusCode::CREATED);
493
494        let over = unwrap_response(
495            execute_put(
496                headers.clone(),
497                Bytes::from_static(b"5"),
498                auth::Tier::Write,
499                world_path("home/b"),
500                &core,
501                &TraceCtx::disabled(),
502            )
503            .await,
504        );
505        assert_eq!(over.status(), StatusCode::INSUFFICIENT_STORAGE);
506        assert_eq!(over.headers().get("x-storage-usage").unwrap(), "4");
507        assert_eq!(over.headers().get("x-storage-quota").unwrap(), "4");
508        assert_eq!(over.headers().get("x-storage-needed").unwrap(), "1");
509        assert!(core.read_world("home/b").unwrap().is_none());
510
511        let append = unwrap_response(
512            execute_post(
513                headers.clone(),
514                Bytes::from_static(b"5"),
515                auth::Tier::Write,
516                world_path("home/a"),
517                &core,
518                &TraceCtx::disabled(),
519            )
520            .await,
521        );
522        assert_eq!(append.status(), StatusCode::INSUFFICIENT_STORAGE);
523        assert_eq!(append.headers().get("x-storage-usage").unwrap(), "4");
524        assert_eq!(append.headers().get("x-storage-quota").unwrap(), "4");
525        assert_eq!(append.headers().get("x-storage-needed").unwrap(), "1");
526        assert_eq!(
527            core.read_world("home/a").unwrap().unwrap().body,
528            b"1234".to_vec()
529        );
530
531        let _ = std::fs::remove_dir_all(dir);
532    }
533
534    #[tokio::test]
535    async fn concurrent_puts_to_distinct_worlds_do_not_overshoot_quota() {
536        // Regression: under per-world locks, the previous "snapshot then
537        // write then adjust" pattern let two concurrent PUTs to different
538        // worlds both observe usage below quota, both commit, and only
539        // afterwards push the counter past quota (lost-update race).
540        // Atomic CAS reservation in `Core::reserve_storage` closes that
541        // race. This test fires N concurrent PUTs each just under the
542        // quota and asserts that ANY mix of accept/reject keeps the
543        // counter coherent and never overshoots.
544        let (mut core, dir) = test_core("quota-race");
545        let quota = 100;
546        core.max_storage_bytes = Some(quota);
547        let core = Arc::new(core);
548
549        let workers = 16;
550        let body_len = 12; // 16 * 12 = 192 > quota: definitely contention
551        let mut handles = Vec::with_capacity(workers);
552        for i in 0..workers {
553            let core = core.clone();
554            handles.push(tokio::spawn(async move {
555                let path = format!("home/race/{i}");
556                let body = Bytes::copy_from_slice(&vec![b'x'; body_len]);
557                unwrap_response(
558                    execute_put(
559                        HeaderMap::new(),
560                        body,
561                        auth::Tier::Write,
562                        world_path(&path),
563                        &core,
564                        &TraceCtx::disabled(),
565                    )
566                    .await,
567                )
568            }));
569        }
570
571        let mut accepted: usize = 0;
572        for handle in handles {
573            let resp = handle.await.unwrap();
574            match resp.status() {
575                StatusCode::CREATED | StatusCode::OK => accepted += 1,
576                StatusCode::INSUFFICIENT_STORAGE => {}
577                other => panic!("unexpected status: {other}"),
578            }
579        }
580
581        let used = core.storage_body_bytes.load(Ordering::Relaxed);
582        let counted = accepted * body_len;
583        assert_eq!(used, counted, "counter must equal sum of accepted bodies");
584        assert!(
585            used <= quota,
586            "counter must never exceed quota: {used} > {quota}"
587        );
588
589        let _ = std::fs::remove_dir_all(dir);
590    }
591
592    #[tokio::test]
593    async fn concurrent_memory_puts_do_not_overshoot_max_memory_bytes() {
594        // Mirror of concurrent_puts_to_distinct_worlds_do_not_overshoot_quota
595        // for memory worlds. Per-world locks let writes to /tmp/a and
596        // /tmp/b run concurrently; before this fix each one would read
597        // total_bytes() as a snapshot, both pass the budget check, and
598        // both commit -- overshooting ELASTIK_MAX_MEMORY_BYTES. With the
599        // check + write fused under the MemoryStore HashMap mutex inside
600        // write_with_quota, only the writes whose accept order keeps the
601        // running total under the cap can commit.
602        let (mut core, dir) = test_core("memory-quota-race");
603        let cap = 100;
604        core.max_memory_bytes = cap;
605        let core = Arc::new(core);
606
607        let workers = 16;
608        let body_len = 12; // 16 * 12 = 192 > cap: forced contention
609        let mut handles = Vec::with_capacity(workers);
610        for i in 0..workers {
611            let core = core.clone();
612            handles.push(tokio::spawn(async move {
613                let path = format!("tmp/race/{i}");
614                let body = Bytes::copy_from_slice(&vec![b'm'; body_len]);
615                unwrap_response(
616                    execute_put(
617                        HeaderMap::new(),
618                        body,
619                        auth::Tier::Write,
620                        world_path(&path),
621                        &core,
622                        &TraceCtx::disabled(),
623                    )
624                    .await,
625                )
626            }));
627        }
628
629        let mut accepted: usize = 0;
630        for handle in handles {
631            let resp = handle.await.unwrap();
632            match resp.status() {
633                StatusCode::CREATED | StatusCode::OK => accepted += 1,
634                StatusCode::PAYLOAD_TOO_LARGE => {}
635                other => panic!("unexpected status: {other}"),
636            }
637        }
638
639        let used = core.mem.total_bytes();
640        let counted = accepted * body_len;
641        assert_eq!(
642            used, counted,
643            "memory total must equal sum of accepted bodies"
644        );
645        assert!(
646            used <= cap,
647            "memory total must never exceed cap: {used} > {cap}"
648        );
649
650        let _ = std::fs::remove_dir_all(dir);
651    }
652
653    #[test]
654    fn var_log_requires_approve_token() {
655        assert!(!can_write("var/log", auth::Tier::Anon));
656        assert!(!can_write("var/log", auth::Tier::Read));
657        assert!(!can_write("var/log", auth::Tier::Write));
658        assert!(can_write("var/log", auth::Tier::Approve));
659        assert!(!can_write("var/log/deletes", auth::Tier::Anon));
660        assert!(!can_write("var/log/deletes", auth::Tier::Read));
661        assert!(!can_write("var/log/deletes", auth::Tier::Write));
662        assert!(can_write("var/log/deletes", auth::Tier::Approve));
663    }
664
665    #[test]
666    fn delete_requires_approve_token() {
667        assert!(!can_delete(auth::Tier::Anon));
668        assert!(!can_delete(auth::Tier::Read));
669        assert!(!can_delete(auth::Tier::Write));
670        assert!(can_delete(auth::Tier::Approve));
671    }
672
673    #[test]
674    fn listen_addr_brackets_ipv6_hosts() {
675        assert_eq!(listen_addr("127.0.0.1", 3105), "127.0.0.1:3105");
676        assert_eq!(listen_addr("0.0.0.0", 3105), "0.0.0.0:3105");
677        assert_eq!(listen_addr("::1", 3105), "[::1]:3105");
678        assert_eq!(listen_addr("localhost", 3105), "localhost:3105");
679    }
680
681    #[cfg(feature = "coap")]
682    #[test]
683    fn coap_bind_is_opt_in_by_port_env() {
684        let _lock = env_lock().lock().unwrap();
685        let _guard = CoapEnvGuard::capture();
686        std::env::remove_var("ELASTIK_COAP_HOST");
687        std::env::remove_var("ELASTIK_COAP_PORT");
688
689        assert_eq!(coap_bind_from_env(), None);
690
691        std::env::set_var("ELASTIK_COAP_HOST", "0.0.0.0");
692        assert_eq!(coap_bind_from_env(), None);
693
694        std::env::set_var("ELASTIK_COAP_PORT", "5683");
695        assert_eq!(coap_bind_from_env(), Some(("0.0.0.0".to_owned(), 5683)));
696
697        std::env::set_var("ELASTIK_COAP_HOST", "127.0.0.1");
698        std::env::set_var("ELASTIK_COAP_PORT", " ");
699        assert_eq!(coap_bind_from_env(), None);
700
701        std::env::set_var("ELASTIK_COAP_PORT", "not-a-port");
702        assert_eq!(coap_bind_from_env(), None);
703    }
704
705    #[test]
706    fn non_loopback_public_read_gets_warning_flag() {
707        let mut tokens = auth::Tokens {
708            read: None,
709            write: None,
710            approve: None,
711        };
712        assert!(!should_warn_public_read(
713            "127.0.0.1".parse::<IpAddr>().unwrap(),
714            tokens.read_required()
715        ));
716        assert!(should_warn_public_read(
717            "0.0.0.0".parse::<IpAddr>().unwrap(),
718            tokens.read_required()
719        ));
720
721        tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
722        assert!(!should_warn_public_read(
723            "0.0.0.0".parse::<IpAddr>().unwrap(),
724            tokens.read_required()
725        ));
726    }
727
728    #[tokio::test]
729    async fn put_and_post_enforce_world_size_cap() {
730        let (mut core, dir) = test_core("world-size-cap");
731        core.max_world_bytes = 4;
732        let headers = HeaderMap::new();
733
734        let too_big = unwrap_response(
735            execute_put(
736                headers.clone(),
737                Bytes::from_static(b"12345"),
738                auth::Tier::Write,
739                world_path("home/too-big"),
740                &core,
741                &TraceCtx::disabled(),
742            )
743            .await,
744        );
745        assert_eq!(too_big.status(), StatusCode::PAYLOAD_TOO_LARGE);
746
747        let ok = unwrap_response(
748            execute_put(
749                headers.clone(),
750                Bytes::from_static(b"1234"),
751                auth::Tier::Write,
752                world_path("home/four"),
753                &core,
754                &TraceCtx::disabled(),
755            )
756            .await,
757        );
758        assert_eq!(ok.status(), StatusCode::CREATED);
759
760        let append = unwrap_response(
761            execute_post(
762                headers.clone(),
763                Bytes::from_static(b"5"),
764                auth::Tier::Write,
765                world_path("home/four"),
766                &core,
767                &TraceCtx::disabled(),
768            )
769            .await,
770        );
771        assert_eq!(append.status(), StatusCode::PAYLOAD_TOO_LARGE);
772
773        let _ = std::fs::remove_dir_all(dir);
774    }
775
776    #[tokio::test]
777    async fn memory_backend_enforces_total_quota() {
778        let (mut core, dir) = test_core("memory-quota");
779        core.max_memory_bytes = 4;
780        let headers = HeaderMap::new();
781
782        let first = unwrap_response(
783            execute_put(
784                headers.clone(),
785                Bytes::from_static(b"12"),
786                auth::Tier::Write,
787                world_path("tmp/a"),
788                &core,
789                &TraceCtx::disabled(),
790            )
791            .await,
792        );
793        assert_eq!(first.status(), StatusCode::CREATED);
794        let second = unwrap_response(
795            execute_put(
796                headers.clone(),
797                Bytes::from_static(b"34"),
798                auth::Tier::Write,
799                world_path("tmp/b"),
800                &core,
801                &TraceCtx::disabled(),
802            )
803            .await,
804        );
805        assert_eq!(second.status(), StatusCode::CREATED);
806        let third = unwrap_response(
807            execute_put(
808                headers.clone(),
809                Bytes::from_static(b"5"),
810                auth::Tier::Write,
811                world_path("tmp/c"),
812                &core,
813                &TraceCtx::disabled(),
814            )
815            .await,
816        );
817        assert_eq!(third.status(), StatusCode::PAYLOAD_TOO_LARGE);
818
819        let _ = std::fs::remove_dir_all(dir);
820    }
821
822    #[test]
823    fn system_namespace_roots_require_approve_even_if_called_directly() {
824        for name in ["lib", "etc", "boot", "usr"] {
825            assert!(!can_write(name, auth::Tier::Anon), "{name}");
826            assert!(!can_write(name, auth::Tier::Read), "{name}");
827            assert!(!can_write(name, auth::Tier::Write), "{name}");
828            assert!(can_write(name, auth::Tier::Approve), "{name}");
829        }
830    }
831
832    #[test]
833    fn non_log_var_still_accepts_auth_token() {
834        assert!(!can_write("var/cache/rag", auth::Tier::Anon));
835        assert!(!can_write("var/cache/rag", auth::Tier::Read));
836        assert!(can_write("var/cache/rag", auth::Tier::Write));
837        assert!(can_write("var/cache/rag", auth::Tier::Approve));
838    }
839
840    #[test]
841    fn read_token_is_optional_but_gates_reads_when_set() {
842        let (mut core, dir) = test_core("read-token");
843        assert!(can_read(&core, auth::Tier::Anon));
844
845        core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
846        assert!(!can_read(&core, auth::Tier::Anon));
847        assert!(can_read(&core, auth::Tier::Read));
848        assert!(can_read(&core, auth::Tier::Write));
849        assert!(can_read(&core, auth::Tier::Approve));
850
851        let _ = std::fs::remove_dir_all(dir);
852    }
853
854    #[tokio::test]
855    async fn get_and_head_require_read_token_when_enabled() {
856        let (mut core, dir) = test_core("read-token-handlers");
857        core.write_world("home/private", b"secret", "text/plain", &[])
858            .unwrap();
859        core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
860
861        let headers = HeaderMap::new();
862        let get_anon = unwrap_response(
863            execute_get(
864                headers.clone(),
865                auth::Tier::Anon,
866                world_path("home/private"),
867                &core,
868                &TraceCtx::disabled(),
869            )
870            .await,
871        );
872        assert_eq!(get_anon.status(), StatusCode::UNAUTHORIZED);
873        let head_reader = unwrap_response(
874            execute_head(
875                headers.clone(),
876                auth::Tier::Read,
877                world_path("home/private"),
878                &core,
879                &TraceCtx::disabled(),
880            )
881            .await,
882        );
883        assert_eq!(head_reader.status(), StatusCode::OK);
884
885        let _ = std::fs::remove_dir_all(dir);
886    }
887
888    #[test]
889    fn unauthorized_responses_advertise_bearer_challenge() {
890        let resp = unauthorized("read requires read token");
891        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
892        assert_eq!(
893            resp.headers().get(header::WWW_AUTHENTICATE).unwrap(),
894            "Bearer realm=\"elastik\""
895        );
896        assert_eq!(
897            resp.headers().get(header::CONTENT_TYPE).unwrap(),
898            "text/plain; charset=utf-8"
899        );
900    }
901
902    #[test]
903    fn core_response_headers_are_core_owned() {
904        let mut headers = HeaderMap::new();
905        headers.insert(header::VARY, HeaderValue::from_static("*"));
906        headers.insert("x-request-id", HeaderValue::from_static("stale"));
907        headers.insert("x-elapsed-us", HeaderValue::from_static("999"));
908        headers.insert("x-content-type-options", HeaderValue::from_static("sniff"));
909
910        stamp_core_response_headers(42, 7, &mut headers);
911
912        assert_eq!(headers.get("x-request-id").unwrap(), "42");
913        assert_eq!(headers.get("x-elapsed-us").unwrap(), "7");
914        assert_eq!(headers.get(header::VARY).unwrap(), "Authorization");
915        assert_eq!(headers.get("x-content-type-options").unwrap(), "nosniff");
916    }
917
918    #[test]
919    fn poisoned_persisted_headers_are_not_replayed() {
920        let mut out = Vec::new();
921        hs::apply_meta_headers(
922            &[
923                (
924                    "x-custom".to_owned(),
925                    "evil\r\nset-cookie: admin=true".to_owned(),
926                ),
927                ("set-cookie".to_owned(), "sid=admin; Path=/".to_owned()),
928                ("clear-site-data".to_owned(), "\"cookies\"".to_owned()),
929                ("bad name".to_owned(), "ok".to_owned()),
930                ("x-safe".to_owned(), "ok".to_owned()),
931            ],
932            &mut out,
933        );
934
935        assert_eq!(out.len(), 1);
936        assert_eq!(out[0].0.as_str(), "x-safe");
937        assert_eq!(out[0].1, "ok");
938    }
939
940    #[test]
941    fn canonicalize_preserves_explicit_namespaces() {
942        assert_eq!(canonicalize_path("/home/tmp/foo"), "home/tmp/foo");
943        assert_eq!(canonicalize_path("/home/etc/foo"), "home/etc/foo");
944        assert_eq!(canonicalize_path("/tmp/foo"), "tmp/foo");
945        assert_eq!(canonicalize_path("/etc/foo"), "etc/foo");
946        assert_eq!(canonicalize_path("/foo"), "home/foo");
947    }
948
949    #[test]
950    fn control_bytes_are_not_valid_world_names() {
951        assert!(valid_world_name("home/ok"));
952        assert!(!valid_world_name("home/bad\nname"));
953        assert!(!valid_world_name(""));
954    }
955
956    #[test]
957    fn dot_segments_empty_segments_and_backslashes_are_not_valid_world_names() {
958        assert!(!valid_world_name("home/../etc/secret"));
959        assert!(!valid_world_name("home/%2E%2E/etc/secret"));
960        assert!(!valid_world_name("home/./x"));
961        assert!(!valid_world_name("home//x"));
962        assert!(!valid_world_name("home/x/"));
963        assert!(!valid_world_name("home\\x"));
964        assert_eq!(
965            validate_world_name("home/%2E%2E/etc/secret"),
966            Err("world path contains dot or encoded-dot segment")
967        );
968        assert_eq!(
969            validate_world_name("home//x"),
970            Err("world path has empty segment")
971        );
972        assert_eq!(
973            validate_world_name("home\\x"),
974            Err("world path contains backslash")
975        );
976    }
977
978    #[test]
979    fn namespace_roots_and_proc_subtree_are_not_world_names() {
980        for name in [
981            "home",
982            "tmp",
983            "dev",
984            "sys",
985            "proc",
986            "proc/anything",
987            "etc",
988            "lib",
989            "boot",
990            "usr",
991            "var",
992            "var/log",
993        ] {
994            assert!(!valid_world_name(name), "{name}");
995        }
996        assert!(valid_world_name("home/x"));
997        assert!(valid_world_name("var/log/deletes"));
998    }
999
1000    #[test]
1001    fn byte_ranges_cover_normal_open_and_suffix_forms() {
1002        let mut h = HeaderMap::new();
1003        assert_eq!(hs::parse_range(&h, 10), Ok(None));
1004
1005        h.insert(header::RANGE, HeaderValue::from_static("bytes=2-5"));
1006        assert_eq!(hs::parse_range(&h, 10), Ok(Some((2, 5))));
1007
1008        h.insert(header::RANGE, HeaderValue::from_static("bytes=7-"));
1009        assert_eq!(hs::parse_range(&h, 10), Ok(Some((7, 9))));
1010
1011        h.insert(header::RANGE, HeaderValue::from_static("bytes=-3"));
1012        assert_eq!(hs::parse_range(&h, 10), Ok(Some((7, 9))));
1013
1014        h.insert(header::RANGE, HeaderValue::from_static("bytes=8-99"));
1015        assert_eq!(hs::parse_range(&h, 10), Ok(Some((8, 9))));
1016
1017        h.insert(header::RANGE, HeaderValue::from_static("bytes=11-12"));
1018        assert_eq!(hs::parse_range(&h, 10), Err(()));
1019
1020        h.insert(header::RANGE, HeaderValue::from_static("bytes=0-1,4-5"));
1021        assert_eq!(hs::parse_range(&h, 10), Ok(None));
1022    }
1023
1024    #[tokio::test]
1025    async fn get_and_head_honor_single_byte_range() {
1026        let (core, dir) = test_core("range-handler");
1027        core.write_world("home/range", b"abcdef", "text/plain", &[])
1028            .unwrap();
1029        let mut headers = HeaderMap::new();
1030        headers.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
1031
1032        let get = unwrap_response(
1033            execute_get(
1034                headers.clone(),
1035                auth::Tier::Anon,
1036                world_path("home/range"),
1037                &core,
1038                &TraceCtx::disabled(),
1039            )
1040            .await,
1041        );
1042        assert_eq!(get.status(), StatusCode::PARTIAL_CONTENT);
1043        assert_eq!(
1044            get.headers().get(header::CONTENT_RANGE).unwrap(),
1045            "bytes 1-3/6"
1046        );
1047        assert_eq!(get.headers().get(header::CONTENT_LENGTH).unwrap(), "3");
1048
1049        let head = unwrap_response(
1050            execute_head(
1051                headers.clone(),
1052                auth::Tier::Anon,
1053                world_path("home/range"),
1054                &core,
1055                &TraceCtx::disabled(),
1056            )
1057            .await,
1058        );
1059        assert_eq!(head.status(), StatusCode::PARTIAL_CONTENT);
1060        assert_eq!(
1061            head.headers().get(header::CONTENT_RANGE).unwrap(),
1062            "bytes 1-3/6"
1063        );
1064        assert_eq!(head.headers().get(header::CONTENT_LENGTH).unwrap(), "3");
1065
1066        let _ = std::fs::remove_dir_all(dir);
1067    }
1068
1069    #[tokio::test]
1070    async fn get_and_head_advertise_accept_ranges_on_full_body() {
1071        let (core, dir) = test_core("accept-ranges");
1072        core.write_world("home/ranges", b"abcdef", "text/plain", &[])
1073            .unwrap();
1074        let headers = HeaderMap::new();
1075
1076        let get = unwrap_response(
1077            execute_get(
1078                headers.clone(),
1079                auth::Tier::Anon,
1080                world_path("home/ranges"),
1081                &core,
1082                &TraceCtx::disabled(),
1083            )
1084            .await,
1085        );
1086        assert_eq!(get.status(), StatusCode::OK);
1087        assert_eq!(get.headers().get(header::ACCEPT_RANGES).unwrap(), "bytes");
1088
1089        let head = unwrap_response(
1090            execute_head(
1091                headers.clone(),
1092                auth::Tier::Anon,
1093                world_path("home/ranges"),
1094                &core,
1095                &TraceCtx::disabled(),
1096            )
1097            .await,
1098        );
1099        assert_eq!(head.status(), StatusCode::OK);
1100        assert_eq!(head.headers().get(header::ACCEPT_RANGES).unwrap(), "bytes");
1101
1102        let _ = std::fs::remove_dir_all(dir);
1103    }
1104
1105    #[tokio::test]
1106    async fn unsatisfied_range_returns_416_with_content_range() {
1107        let (core, dir) = test_core("range-416");
1108        core.write_world("home/range", b"abcdef", "text/plain", &[])
1109            .unwrap();
1110        let mut headers = HeaderMap::new();
1111        headers.insert(header::RANGE, HeaderValue::from_static("bytes=99-100"));
1112
1113        let get = unwrap_response(
1114            execute_get(
1115                headers.clone(),
1116                auth::Tier::Anon,
1117                world_path("home/range"),
1118                &core,
1119                &TraceCtx::disabled(),
1120            )
1121            .await,
1122        );
1123        assert_eq!(get.status(), StatusCode::RANGE_NOT_SATISFIABLE);
1124        assert_eq!(
1125            get.headers().get(header::CONTENT_RANGE).unwrap(),
1126            "bytes */6"
1127        );
1128        assert_eq!(get.headers().get(header::ACCEPT_RANGES).unwrap(), "bytes");
1129
1130        let _ = std::fs::remove_dir_all(dir);
1131    }
1132
1133    #[tokio::test]
1134    async fn multi_range_is_ignored_and_returns_full_body() {
1135        let (core, dir) = test_core("multi-range");
1136        core.write_world("home/range", b"abcdef", "text/plain", &[])
1137            .unwrap();
1138        let mut headers = HeaderMap::new();
1139        headers.insert(header::RANGE, HeaderValue::from_static("bytes=0-1,4-5"));
1140
1141        let get = unwrap_response(
1142            execute_get(
1143                headers.clone(),
1144                auth::Tier::Anon,
1145                world_path("home/range"),
1146                &core,
1147                &TraceCtx::disabled(),
1148            )
1149            .await,
1150        );
1151        assert_eq!(get.status(), StatusCode::OK);
1152        assert!(get.headers().get(header::CONTENT_RANGE).is_none());
1153        assert_eq!(get.headers().get(header::CONTENT_LENGTH).unwrap(), "6");
1154
1155        let _ = std::fs::remove_dir_all(dir);
1156    }
1157
1158    #[tokio::test]
1159    async fn world_reads_advertise_monitor_and_collection_links() {
1160        let (core, dir) = test_core("link-headers");
1161        core.write_world("home/links", b"hello", "text/plain", &[])
1162            .unwrap();
1163        let headers = HeaderMap::new();
1164
1165        let get = unwrap_response(
1166            execute_get(
1167                headers.clone(),
1168                auth::Tier::Anon,
1169                world_path("home/links"),
1170                &core,
1171                &TraceCtx::disabled(),
1172            )
1173            .await,
1174        );
1175        let links: Vec<_> = get.headers().get_all(header::LINK).iter().collect();
1176        assert_eq!(links.len(), 2);
1177        assert!(links
1178            .iter()
1179            .any(|v| *v == "</listen/home/links>; rel=\"monitor\""));
1180        assert!(links
1181            .iter()
1182            .any(|v| *v == "</proc/worlds>; rel=\"collection\""));
1183
1184        let head = unwrap_response(
1185            execute_head(
1186                headers.clone(),
1187                auth::Tier::Anon,
1188                world_path("home/links"),
1189                &core,
1190                &TraceCtx::disabled(),
1191            )
1192            .await,
1193        );
1194        assert_eq!(head.headers().get_all(header::LINK).iter().count(), 2);
1195
1196        let _ = std::fs::remove_dir_all(dir);
1197    }
1198
1199    #[test]
1200    fn if_range_controls_whether_range_is_applied() {
1201        let mut h = HeaderMap::new();
1202        h.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
1203        h.insert(
1204            header::IF_RANGE,
1205            HeaderValue::from_static("\"hmac-current\""),
1206        );
1207        assert_eq!(hs::effective_range(&h, 6, "hmac-current"), Ok(Some((1, 3))));
1208
1209        h.insert(header::IF_RANGE, HeaderValue::from_static("\"hmac-stale\""));
1210        assert_eq!(hs::effective_range(&h, 6, "hmac-current"), Ok(None));
1211
1212        h.insert(
1213            header::IF_RANGE,
1214            HeaderValue::from_static("W/\"hmac-current\""),
1215        );
1216        assert_eq!(hs::effective_range(&h, 6, "hmac-current"), Ok(None));
1217    }
1218
1219    #[tokio::test]
1220    async fn stale_if_range_returns_full_body() {
1221        let (core, dir) = test_core("if-range-stale");
1222        core.write_world("home/range", b"abcdef", "text/plain", &[])
1223            .unwrap();
1224        let mut headers = HeaderMap::new();
1225        headers.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
1226        headers.insert(header::IF_RANGE, HeaderValue::from_static("\"hmac-stale\""));
1227
1228        let get = unwrap_response(
1229            execute_get(
1230                headers.clone(),
1231                auth::Tier::Anon,
1232                world_path("home/range"),
1233                &core,
1234                &TraceCtx::disabled(),
1235            )
1236            .await,
1237        );
1238        assert_eq!(get.status(), StatusCode::OK);
1239        assert!(get.headers().get(header::CONTENT_RANGE).is_none());
1240        assert_eq!(get.headers().get(header::CONTENT_LENGTH).unwrap(), "6");
1241
1242        let _ = std::fs::remove_dir_all(dir);
1243    }
1244
1245    #[tokio::test]
1246    async fn get_and_head_honor_if_none_match_cache_revalidation() {
1247        let (core, dir) = test_core("read-if-none-match");
1248        let h = world::write_with_audit(
1249            &core.data,
1250            "home/cache",
1251            b"cached body",
1252            "text/plain",
1253            &[],
1254            &core.hmac_key,
1255        )
1256        .unwrap();
1257        let etag = format!("\"{}\"", et::hmac_etag(&h));
1258        let mut headers = HeaderMap::new();
1259        headers.insert(header::IF_NONE_MATCH, HeaderValue::from_str(&etag).unwrap());
1260
1261        let get = unwrap_response(
1262            execute_get(
1263                headers.clone(),
1264                auth::Tier::Anon,
1265                world_path("home/cache"),
1266                &core,
1267                &TraceCtx::disabled(),
1268            )
1269            .await,
1270        );
1271        assert_eq!(get.status(), StatusCode::NOT_MODIFIED);
1272        assert_eq!(get.headers().get(header::ETAG).unwrap(), etag.as_str());
1273        assert!(get
1274            .headers()
1275            .get_all(header::LINK)
1276            .iter()
1277            .any(|v| v == "</listen/home/cache>; rel=\"monitor\""));
1278
1279        let head = unwrap_response(
1280            execute_head(
1281                headers.clone(),
1282                auth::Tier::Anon,
1283                world_path("home/cache"),
1284                &core,
1285                &TraceCtx::disabled(),
1286            )
1287            .await,
1288        );
1289        assert_eq!(head.status(), StatusCode::NOT_MODIFIED);
1290        assert_eq!(head.headers().get(header::ETAG).unwrap(), etag.as_str());
1291
1292        headers.insert(
1293            header::IF_NONE_MATCH,
1294            HeaderValue::from_static("\"hmac-stale\""),
1295        );
1296        let get = unwrap_response(
1297            execute_get(
1298                headers.clone(),
1299                auth::Tier::Anon,
1300                world_path("home/cache"),
1301                &core,
1302                &TraceCtx::disabled(),
1303            )
1304            .await,
1305        );
1306        assert_eq!(get.status(), StatusCode::OK);
1307
1308        let _ = std::fs::remove_dir_all(dir);
1309    }
1310
1311    #[tokio::test]
1312    async fn options_and_405_advertise_allow_headers() {
1313        // PR 4c: `handle_world_method` was retired. OPTIONS is now
1314        // answered directly in `world_handler` (policy-free, never
1315        // enters the FSM); PATCH and any other unsupported method
1316        // are rejected by `pipeline::dispatch` with `MethodNotAllowed`.
1317        let (core, dir) = test_core("allow");
1318        let core = std::sync::Arc::new(core);
1319        let state = server_state_for_tests(core.clone());
1320
1321        let options = options_response(WORLD_ALLOW);
1322        assert_eq!(options.status(), StatusCode::NO_CONTENT);
1323        assert_eq!(options.headers().get(header::ALLOW).unwrap(), WORLD_ALLOW);
1324
1325        let patch = pipeline::run(
1326            Method::PATCH,
1327            "home/allow".to_string(),
1328            HeaderMap::new(),
1329            Bytes::new(),
1330            &state,
1331            0,
1332        )
1333        .await;
1334        assert_eq!(patch.status(), StatusCode::METHOD_NOT_ALLOWED);
1335        assert_eq!(patch.headers().get(header::ALLOW).unwrap(), WORLD_ALLOW);
1336
1337        let _ = std::fs::remove_dir_all(dir);
1338    }
1339
1340    #[tokio::test]
1341    async fn root_and_proc_endpoints_advertise_head_options_and_405() {
1342        let root_head = root_hint(Method::HEAD).await;
1343        assert_eq!(root_head.status(), StatusCode::OK);
1344        assert_eq!(
1345            root_head.headers().get(header::CONTENT_TYPE).unwrap(),
1346            "text/plain; charset=utf-8"
1347        );
1348        assert!(root_head.headers().get(header::CONTENT_LENGTH).is_some());
1349
1350        let root_options = root_hint(Method::OPTIONS).await;
1351        assert_eq!(root_options.status(), StatusCode::NO_CONTENT);
1352        assert_eq!(
1353            root_options.headers().get(header::ALLOW).unwrap(),
1354            ROOT_ALLOW
1355        );
1356
1357        let root_post = root_hint(Method::POST).await;
1358        assert_eq!(root_post.status(), StatusCode::METHOD_NOT_ALLOWED);
1359        assert_eq!(root_post.headers().get(header::ALLOW).unwrap(), ROOT_ALLOW);
1360
1361        let version_head = proc_version(Method::HEAD).await;
1362        assert_eq!(version_head.status(), StatusCode::OK);
1363        assert!(version_head.headers().get(header::CONTENT_LENGTH).is_some());
1364
1365        let version_delete = proc_version(Method::DELETE).await;
1366        assert_eq!(version_delete.status(), StatusCode::METHOD_NOT_ALLOWED);
1367        assert_eq!(
1368            version_delete.headers().get(header::ALLOW).unwrap(),
1369            PROC_ALLOW
1370        );
1371    }
1372
1373    #[tokio::test]
1374    async fn proc_worlds_head_options_and_405_are_plain_http() {
1375        let (core, dir) = test_core("proc-worlds-http");
1376        core.write_world("home/a", b"a", "text/plain", &[]).unwrap();
1377        let state = Arc::new(core);
1378        let headers = HeaderMap::new();
1379
1380        let head = proc_worlds(
1381            State(server_state_for_tests(state.clone())),
1382            Method::HEAD,
1383            headers.clone(),
1384        )
1385        .await;
1386        assert_eq!(head.status(), StatusCode::OK);
1387        assert_eq!(
1388            head.headers().get(header::CONTENT_TYPE).unwrap(),
1389            "text/plain; charset=utf-8"
1390        );
1391        assert!(head.headers().get(header::CONTENT_LENGTH).is_some());
1392
1393        let options = proc_worlds(
1394            State(server_state_for_tests(state.clone())),
1395            Method::OPTIONS,
1396            headers.clone(),
1397        )
1398        .await;
1399        assert_eq!(options.status(), StatusCode::NO_CONTENT);
1400        assert_eq!(options.headers().get(header::ALLOW).unwrap(), PROC_ALLOW);
1401
1402        let delete = proc_worlds(
1403            State(server_state_for_tests(state)),
1404            Method::DELETE,
1405            headers,
1406        )
1407        .await;
1408        assert_eq!(delete.status(), StatusCode::METHOD_NOT_ALLOWED);
1409        assert_eq!(delete.headers().get(header::ALLOW).unwrap(), PROC_ALLOW);
1410
1411        let _ = std::fs::remove_dir_all(dir);
1412    }
1413
1414    #[tokio::test]
1415    async fn proc_namespace_is_reserved_beyond_declared_endpoints() {
1416        let not_found = proc_reserved(Method::GET).await;
1417        assert_eq!(not_found.status(), StatusCode::NOT_FOUND);
1418
1419        let head = proc_reserved(Method::HEAD).await;
1420        assert_eq!(head.status(), StatusCode::NOT_FOUND);
1421
1422        let options = proc_reserved(Method::OPTIONS).await;
1423        assert_eq!(options.status(), StatusCode::NO_CONTENT);
1424        assert_eq!(options.headers().get(header::ALLOW).unwrap(), PROC_ALLOW);
1425
1426        let put = proc_reserved(Method::PUT).await;
1427        assert_eq!(put.status(), StatusCode::METHOD_NOT_ALLOWED);
1428        assert_eq!(put.headers().get(header::ALLOW).unwrap(), PROC_ALLOW);
1429    }
1430
1431    #[tokio::test]
1432    async fn proc_audit_verify_reports_valid_chain_in_headers() {
1433        let (core, dir) = test_core("proc-audit-valid");
1434        let h = world::write_with_audit(
1435            &core.data,
1436            "home/audit-ok",
1437            b"hello",
1438            "text/plain",
1439            &[("x-meta-author".to_owned(), "ranger".to_owned())],
1440            &core.hmac_key,
1441        )
1442        .unwrap();
1443        let state = Arc::new(core);
1444        let resp = proc_audit_verify(
1445            State(server_state_for_tests(state)),
1446            Method::HEAD,
1447            AxPath("home/audit-ok/verify".to_owned()),
1448            HeaderMap::new(),
1449        )
1450        .await;
1451
1452        assert_eq!(resp.status(), StatusCode::OK);
1453        assert_eq!(resp.headers().get("x-audit-valid").unwrap(), "true");
1454        assert_eq!(resp.headers().get("x-audit-events").unwrap(), "1");
1455        assert_eq!(
1456            resp.headers().get("x-audit-latest").unwrap(),
1457            &format!("hmac-{h}")
1458        );
1459        assert_eq!(resp.headers().get(header::CONTENT_LENGTH).unwrap(), "0");
1460
1461        let _ = std::fs::remove_dir_all(dir);
1462    }
1463
1464    #[tokio::test]
1465    async fn proc_audit_verify_reports_broken_chain_in_headers() {
1466        let (core, dir) = test_core("proc-audit-broken");
1467        world::write_with_audit(
1468            &core.data,
1469            "home/audit-broken",
1470            b"hello",
1471            "text/plain",
1472            &[],
1473            &core.hmac_key,
1474        )
1475        .unwrap();
1476        let db = world::world_db(&core.data, "home/audit-broken");
1477        let c = rusqlite::Connection::open(db).unwrap();
1478        c.execute("UPDATE events SET hmac='bad' WHERE id=1", [])
1479            .unwrap();
1480
1481        let state = Arc::new(core);
1482        let resp = proc_audit_verify(
1483            State(server_state_for_tests(state)),
1484            Method::HEAD,
1485            AxPath("home/audit-broken/verify".to_owned()),
1486            HeaderMap::new(),
1487        )
1488        .await;
1489
1490        assert_eq!(resp.status(), StatusCode::CONFLICT);
1491        assert_eq!(resp.headers().get("x-audit-valid").unwrap(), "false");
1492        assert_eq!(resp.headers().get("x-audit-break-at").unwrap(), "0");
1493        assert_eq!(resp.headers().get("x-audit-actual").unwrap(), "hmac-bad");
1494
1495        let _ = std::fs::remove_dir_all(dir);
1496    }
1497
1498    #[tokio::test]
1499    async fn proc_audit_verify_escapes_tampered_header_values() {
1500        let (core, dir) = test_core("proc-audit-header-escape");
1501        world::write_with_audit(
1502            &core.data,
1503            "home/audit-escaped",
1504            b"hello",
1505            "text/plain",
1506            &[],
1507            &core.hmac_key,
1508        )
1509        .unwrap();
1510        let db = world::world_db(&core.data, "home/audit-escaped");
1511        let c = rusqlite::Connection::open(db).unwrap();
1512        c.execute(
1513            "UPDATE events SET hmac=? WHERE id=1",
1514            ["bad\nInjected: yes"],
1515        )
1516        .unwrap();
1517
1518        let state = Arc::new(core);
1519        let resp = proc_audit_verify(
1520            State(server_state_for_tests(state)),
1521            Method::HEAD,
1522            AxPath("home/audit-escaped/verify".to_owned()),
1523            HeaderMap::new(),
1524        )
1525        .await;
1526
1527        assert_eq!(resp.status(), StatusCode::CONFLICT);
1528        assert_eq!(
1529            resp.headers().get("x-audit-actual").unwrap(),
1530            "hmac-bad\\x0aInjected: yes"
1531        );
1532
1533        let _ = std::fs::remove_dir_all(dir);
1534    }
1535
1536    #[tokio::test]
1537    async fn proc_audit_verify_reports_memory_world_not_applicable() {
1538        let (core, dir) = test_core("proc-audit-memory");
1539        core.write_world("tmp/scratch", b"draft", "text/plain", &[])
1540            .unwrap();
1541        let state = Arc::new(core);
1542        let resp = proc_audit_verify(
1543            State(server_state_for_tests(state)),
1544            Method::HEAD,
1545            AxPath("tmp/scratch/verify".to_owned()),
1546            HeaderMap::new(),
1547        )
1548        .await;
1549
1550        assert_eq!(resp.status(), StatusCode::NO_CONTENT);
1551        assert_eq!(resp.headers().get("x-audit-valid").unwrap(), "n/a");
1552
1553        let _ = std::fs::remove_dir_all(dir);
1554    }
1555
1556    #[tokio::test]
1557    async fn proc_audit_verify_missing_disk_world_does_not_create_db() {
1558        let (core, dir) = test_core("proc-audit-missing-no-create");
1559        let db = world::world_db(&core.data, "home/missing-audit");
1560        assert!(!db.exists());
1561
1562        let state = Arc::new(core);
1563        let resp = proc_audit_verify(
1564            State(server_state_for_tests(state)),
1565            Method::HEAD,
1566            AxPath("home/missing-audit/verify".to_owned()),
1567            HeaderMap::new(),
1568        )
1569        .await;
1570
1571        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1572        assert!(!db.exists());
1573
1574        let _ = std::fs::remove_dir_all(dir);
1575    }
1576
1577    #[tokio::test]
1578    async fn put_created_returns_location() {
1579        let (core, dir) = test_core("put-location");
1580        let headers = HeaderMap::new();
1581        let resp = unwrap_response(
1582            execute_put(
1583                headers.clone(),
1584                Bytes::from_static(b"new"),
1585                auth::Tier::Write,
1586                world_path("home/created"),
1587                &core,
1588                &TraceCtx::disabled(),
1589            )
1590            .await,
1591        );
1592
1593        assert_eq!(resp.status(), StatusCode::CREATED);
1594        assert_eq!(
1595            resp.headers().get(header::LOCATION).unwrap(),
1596            "/home/created"
1597        );
1598
1599        let resp = unwrap_response(
1600            execute_put(
1601                headers.clone(),
1602                Bytes::from_static(b"again"),
1603                auth::Tier::Write,
1604                world_path("home/created"),
1605                &core,
1606                &TraceCtx::disabled(),
1607            )
1608            .await,
1609        );
1610        assert_eq!(resp.status(), StatusCode::OK);
1611        assert!(resp.headers().get(header::LOCATION).is_none());
1612
1613        let _ = std::fs::remove_dir_all(dir);
1614    }
1615
1616    #[tokio::test]
1617    async fn location_and_link_headers_percent_encode_world_urls() {
1618        let (core, dir) = test_core("encoded-headers");
1619        let headers = HeaderMap::new();
1620        let resp = unwrap_response(
1621            execute_put(
1622                headers.clone(),
1623                Bytes::from_static(b"new"),
1624                auth::Tier::Write,
1625                world_path("home/café report"),
1626                &core,
1627                &TraceCtx::disabled(),
1628            )
1629            .await,
1630        );
1631        assert_eq!(resp.status(), StatusCode::CREATED);
1632        assert_eq!(
1633            resp.headers().get(header::LOCATION).unwrap(),
1634            "/home/caf%C3%A9%20report"
1635        );
1636
1637        let get = unwrap_response(
1638            execute_get(
1639                headers.clone(),
1640                auth::Tier::Anon,
1641                world_path("home/café report"),
1642                &core,
1643                &TraceCtx::disabled(),
1644            )
1645            .await,
1646        );
1647        let links: Vec<_> = get.headers().get_all(header::LINK).iter().collect();
1648        assert!(links
1649            .iter()
1650            .any(|v| *v == "</listen/home/caf%C3%A9%20report>; rel=\"monitor\""));
1651
1652        let _ = std::fs::remove_dir_all(dir);
1653    }
1654
1655    #[tokio::test]
1656    async fn unicode_worlds_roundtrip_body_headers_and_proc_listing() {
1657        let (core, dir) = test_core("unicode-roundtrip");
1658        let headers = vec![(
1659            "content-disposition".to_string(),
1660            "attachment; filename*=UTF-8''%E6%8A%A5%E5%91%8A.pdf".to_string(),
1661        )];
1662        core.write_world(
1663            "home/销售/报告",
1664            "你好,世界".as_bytes(),
1665            "text/plain; charset=utf-8",
1666            &headers,
1667        )
1668        .unwrap();
1669
1670        let req_headers = HeaderMap::new();
1671        let get = unwrap_response(
1672            execute_get(
1673                req_headers.clone(),
1674                auth::Tier::Anon,
1675                world_path("home/销售/报告"),
1676                &core,
1677                &TraceCtx::disabled(),
1678            )
1679            .await,
1680        );
1681        assert_eq!(get.status(), StatusCode::OK);
1682        assert_eq!(
1683            get.headers().get(header::CONTENT_TYPE).unwrap(),
1684            "text/plain; charset=utf-8"
1685        );
1686        assert_eq!(
1687            get.headers().get(header::CONTENT_DISPOSITION).unwrap(),
1688            "attachment; filename*=UTF-8''%E6%8A%A5%E5%91%8A.pdf"
1689        );
1690        assert!(
1691            get.headers()
1692                .get_all(header::LINK)
1693                .iter()
1694                .any(|v| *v
1695                    == "</listen/home/%E9%94%80%E5%94%AE/%E6%8A%A5%E5%91%8A>; rel=\"monitor\"")
1696        );
1697
1698        let names = store::list_all(&core.data, &core.mem);
1699        assert_eq!(world_list_body(&names.unwrap()), "home/销售/报告\n");
1700
1701        let _ = std::fs::remove_dir_all(dir);
1702    }
1703
1704    #[test]
1705    fn etag_lists_match_http_strong_and_weak_rules() {
1706        assert!(et::etag_list_strong_matches("\"hmac-abc\"", "hmac-abc"));
1707        assert!(et::etag_list_strong_matches(
1708            "\"other\", \"hmac-abc\"",
1709            "hmac-abc"
1710        ));
1711        assert!(et::etag_list_strong_matches("*", "hmac-abc"));
1712        assert!(!et::etag_list_strong_matches("W/\"hmac-abc\"", "hmac-abc"));
1713        assert!(!et::etag_list_strong_matches("\"other\"", "hmac-abc"));
1714
1715        assert!(et::etag_list_weak_matches("W/\"hmac-abc\"", "hmac-abc"));
1716    }
1717
1718    #[test]
1719    fn if_none_match_star_blocks_existing_world() {
1720        let (core, dir) = test_core("if-none-match-star");
1721        core.write_world("home/cas", b"one", "text/plain; charset=utf-8", &[])
1722            .unwrap();
1723
1724        let mut headers = HeaderMap::new();
1725        headers.insert(header::IF_NONE_MATCH, HeaderValue::from_static("*"));
1726
1727        assert!(hs::check_write_preconditions(&core, "home/cas", &headers).is_err());
1728        assert!(hs::check_write_preconditions(&core, "home/new", &headers).is_ok());
1729
1730        let _ = std::fs::remove_dir_all(dir);
1731    }
1732
1733    #[tokio::test]
1734    async fn put_and_post_honor_write_preconditions_at_handler_level() {
1735        let (core, dir) = test_core("write-preconditions");
1736        let h = world::write_with_audit(
1737            &core.data,
1738            "home/cas",
1739            b"one",
1740            "text/plain; charset=utf-8",
1741            &[],
1742            &core.hmac_key,
1743        )
1744        .unwrap();
1745
1746        let mut stale = HeaderMap::new();
1747        stale.insert(header::IF_MATCH, HeaderValue::from_static("\"hmac-stale\""));
1748        let put = unwrap_response(
1749            execute_put(
1750                stale.clone(),
1751                Bytes::from_static(b"two"),
1752                auth::Tier::Write,
1753                world_path("home/cas"),
1754                &core,
1755                &TraceCtx::disabled(),
1756            )
1757            .await,
1758        );
1759        assert_eq!(put.status(), StatusCode::PRECONDITION_FAILED);
1760
1761        let post = unwrap_response(
1762            execute_post(
1763                stale.clone(),
1764                Bytes::from_static(b" plus"),
1765                auth::Tier::Write,
1766                world_path("home/cas"),
1767                &core,
1768                &TraceCtx::disabled(),
1769            )
1770            .await,
1771        );
1772        assert_eq!(post.status(), StatusCode::PRECONDITION_FAILED);
1773
1774        let mut good = HeaderMap::new();
1775        good.insert(
1776            header::IF_MATCH,
1777            HeaderValue::from_str(&format!("\"{}\"", et::hmac_etag(&h))).unwrap(),
1778        );
1779        let post = unwrap_response(
1780            execute_post(
1781                good.clone(),
1782                Bytes::from_static(b" plus"),
1783                auth::Tier::Write,
1784                world_path("home/cas"),
1785                &core,
1786                &TraceCtx::disabled(),
1787            )
1788            .await,
1789        );
1790        assert_eq!(post.status(), StatusCode::OK);
1791
1792        let _ = std::fs::remove_dir_all(dir);
1793    }
1794
1795    #[test]
1796    fn if_match_accepts_current_hmac_etag_only() {
1797        let (core, dir) = test_core("if-match-hmac");
1798        core.write_world("home/cas", b"one", "text/plain; charset=utf-8", &[])
1799            .unwrap();
1800        let mut conn = world::open(&core.data, "home/cas").unwrap();
1801        let h = audit::append_with_conn_existing(
1802            &mut conn,
1803            "put",
1804            "home/cas",
1805            &world::sha256_hex(b"one"),
1806            3,
1807            "text/plain; charset=utf-8",
1808            &[],
1809            &core.hmac_key,
1810        )
1811        .unwrap();
1812        drop(conn);
1813        let etag = format!("\"{}\"", et::hmac_etag(&h));
1814
1815        let mut good = HeaderMap::new();
1816        good.insert(header::IF_MATCH, HeaderValue::from_str(&etag).unwrap());
1817        assert!(hs::check_write_preconditions(&core, "home/cas", &good).is_ok());
1818
1819        let mut stale = HeaderMap::new();
1820        stale.insert(header::IF_MATCH, HeaderValue::from_static("\"hmac-stale\""));
1821        assert!(hs::check_write_preconditions(&core, "home/cas", &stale).is_err());
1822
1823        let _ = std::fs::remove_dir_all(dir);
1824    }
1825
1826    #[test]
1827    fn request_content_type_preserves_http_content_type_verbatim() {
1828        let mut headers = HeaderMap::new();
1829        headers.insert(
1830            header::CONTENT_TYPE,
1831            HeaderValue::from_static("application/pdf"),
1832        );
1833        assert_eq!(hs::request_content_type(&headers), "application/pdf");
1834
1835        headers.insert(
1836            header::CONTENT_TYPE,
1837            HeaderValue::from_static("text/html; charset=utf-8"),
1838        );
1839        assert_eq!(
1840            hs::request_content_type(&headers),
1841            "text/html; charset=utf-8"
1842        );
1843
1844        headers.clear();
1845        assert_eq!(
1846            hs::request_content_type(&headers),
1847            "application/octet-stream"
1848        );
1849    }
1850
1851    #[test]
1852    fn request_meta_headers_persist_default_representation_headers_only() {
1853        // L2 (DEFAULT_PERSIST_HEADERS) covers standard representation
1854        // headers that round-trip with the body without the operator
1855        // configuring anything. Custom headers (x-author,
1856        // x-future-http-thing, x-meta-*) are NOT persisted under the
1857        // empty allowlist -- the operator must opt in via
1858        // ELASTIK_PERSIST_HEADERS. Layer 1 hard-deny stays in force
1859        // either way.
1860        let mut headers = HeaderMap::new();
1861        headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("image/png"));
1862        headers.insert(header::CONTENT_ENCODING, HeaderValue::from_static("gzip"));
1863        headers.insert(header::CONTENT_LANGUAGE, HeaderValue::from_static("zh-CN"));
1864        headers.insert(
1865            header::CONTENT_DISPOSITION,
1866            HeaderValue::from_static("attachment; filename=\"report.pdf\""),
1867        );
1868        headers.insert(
1869            header::CACHE_CONTROL,
1870            HeaderValue::from_static("max-age=60"),
1871        );
1872        headers.insert("access-control-allow-origin", HeaderValue::from_static("*"));
1873        headers.insert(
1874            "access-control-allow-methods",
1875            HeaderValue::from_static("GET, HEAD"),
1876        );
1877        headers.insert(
1878            "access-control-expose-headers",
1879            HeaderValue::from_static("ETag"),
1880        );
1881        headers.insert(
1882            "content-security-policy",
1883            HeaderValue::from_static("default-src 'self'"),
1884        );
1885        headers.insert("x-frame-options", HeaderValue::from_static("DENY"));
1886        headers.insert("permissions-policy", HeaderValue::from_static("camera=()"));
1887        headers.insert(
1888            "cross-origin-resource-policy",
1889            HeaderValue::from_static("same-origin"),
1890        );
1891        headers.insert(
1892            "x-content-type-options",
1893            HeaderValue::from_static("nosniff"),
1894        );
1895        headers.insert("x-future-http-thing", HeaderValue::from_static("ok"));
1896        headers.insert("x-meta-author", HeaderValue::from_static("ranger"));
1897
1898        headers.insert(
1899            header::AUTHORIZATION,
1900            HeaderValue::from_str(&format!("{} {}", "Bearer", "secret")).unwrap(),
1901        );
1902        headers.insert(
1903            "proxy-authorization",
1904            HeaderValue::from_str(&format!("{} {}", "Bearer", "secret")).unwrap(),
1905        );
1906        headers.insert(header::COOKIE, HeaderValue::from_static("sid=secret"));
1907        headers.insert(header::SET_COOKIE, HeaderValue::from_static("sid=secret"));
1908        headers.insert(header::HOST, HeaderValue::from_static("localhost:3105"));
1909        headers.insert(header::CONNECTION, HeaderValue::from_static("keep-alive"));
1910        headers.insert("keep-alive", HeaderValue::from_static("timeout=5"));
1911        headers.insert(
1912            header::TRANSFER_ENCODING,
1913            HeaderValue::from_static("chunked"),
1914        );
1915        headers.insert(header::TE, HeaderValue::from_static("trailers"));
1916        headers.insert(header::TRAILER, HeaderValue::from_static("expires"));
1917        headers.insert(header::UPGRADE, HeaderValue::from_static("websocket"));
1918        headers.insert("http2-settings", HeaderValue::from_static("abc"));
1919        headers.insert(header::CONTENT_LENGTH, HeaderValue::from_static("999"));
1920        headers.insert(header::ETAG, HeaderValue::from_static("\"fake\""));
1921        headers.insert(header::ALLOW, HeaderValue::from_static("GET"));
1922        headers.insert(header::LOCATION, HeaderValue::from_static("/elsewhere"));
1923        headers.insert(header::LINK, HeaderValue::from_static("</x>; rel=\"next\""));
1924        headers.insert(
1925            header::WWW_AUTHENTICATE,
1926            HeaderValue::from_static("Bearer realm=\"x\""),
1927        );
1928        headers.insert(header::ACCEPT, HeaderValue::from_static("text/html"));
1929        headers.insert(header::ACCEPT_ENCODING, HeaderValue::from_static("gzip"));
1930        headers.insert(header::ACCEPT_LANGUAGE, HeaderValue::from_static("zh-CN"));
1931        headers.insert("accept-charset", HeaderValue::from_static("utf-8"));
1932        headers.insert(header::RANGE, HeaderValue::from_static("bytes=0-1"));
1933        headers.insert(header::IF_MATCH, HeaderValue::from_static("\"abc\""));
1934        headers.insert(header::IF_NONE_MATCH, HeaderValue::from_static("\"abc\""));
1935        headers.insert(header::IF_RANGE, HeaderValue::from_static("\"abc\""));
1936        headers.insert(
1937            header::IF_MODIFIED_SINCE,
1938            HeaderValue::from_static("Wed, 21 Oct 2015 07:28:00 GMT"),
1939        );
1940        headers.insert(
1941            header::IF_UNMODIFIED_SINCE,
1942            HeaderValue::from_static("Wed, 21 Oct 2015 07:28:00 GMT"),
1943        );
1944        headers.insert(header::EXPECT, HeaderValue::from_static("100-continue"));
1945        headers.insert("sec-fetch-mode", HeaderValue::from_static("cors"));
1946        headers.insert("sec-ch-ua", HeaderValue::from_static("\"Chromium\""));
1947        headers.insert("dnt", HeaderValue::from_static("1"));
1948        headers.insert(
1949            header::ORIGIN,
1950            HeaderValue::from_static("https://example.com"),
1951        );
1952        headers.insert(
1953            header::REFERER,
1954            HeaderValue::from_static("https://example.com/"),
1955        );
1956        headers.insert(header::USER_AGENT, HeaderValue::from_static("curl"));
1957        headers.insert(header::SERVER, HeaderValue::from_static("fake"));
1958        headers.insert(
1959            header::DATE,
1960            HeaderValue::from_static("Wed, 21 Oct 2015 07:28:00 GMT"),
1961        );
1962        headers.insert(header::AGE, HeaderValue::from_static("10"));
1963        headers.insert(header::VARY, HeaderValue::from_static("*"));
1964        headers.insert("via", HeaderValue::from_static("1.1 proxy"));
1965        headers.insert("forwarded", HeaderValue::from_static("for=127.0.0.1"));
1966        headers.insert("x-forwarded-for", HeaderValue::from_static("127.0.0.1"));
1967        headers.insert("x-forwarded-host", HeaderValue::from_static("example.com"));
1968        headers.insert("x-forwarded-proto", HeaderValue::from_static("https"));
1969        headers.insert("x-real-ip", HeaderValue::from_static("203.0.113.7"));
1970        headers.insert("true-client-ip", HeaderValue::from_static("203.0.113.7"));
1971        headers.insert("client-ip", HeaderValue::from_static("203.0.113.7"));
1972        headers.insert("clear-site-data", HeaderValue::from_static("\"cookies\""));
1973        // Distributed tracing pollutants (W3C + Zipkin + AWS X-Ray).
1974        // Auto-injected by APM agents; if persisted, next reader
1975        // replays writer's trace ID and corrupts downstream tracing.
1976        headers.insert(
1977            "traceparent",
1978            HeaderValue::from_static("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"),
1979        );
1980        headers.insert(
1981            "tracestate",
1982            HeaderValue::from_static("rojo=00f067aa0ba902b7"),
1983        );
1984        headers.insert(
1985            "baggage",
1986            HeaderValue::from_static("userId=alice,serverNode=DF%2028"),
1987        );
1988        headers.insert(
1989            "b3",
1990            HeaderValue::from_static("80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1"),
1991        );
1992        headers.insert(
1993            "x-b3-traceid",
1994            HeaderValue::from_static("80f198ee56343ba864fe8b2a57d3eff7"),
1995        );
1996        headers.insert("x-b3-spanid", HeaderValue::from_static("e457b5a2e4d86bd1"));
1997        headers.insert("x-b3-sampled", HeaderValue::from_static("1"));
1998        headers.insert(
1999            "x-amzn-trace-id",
2000            HeaderValue::from_static("Root=1-5759e988-bd862e3fe1be46a994272793"),
2001        );
2002        headers.insert("cf-ray", HeaderValue::from_static("8f1234567abcdef0-IAD"));
2003        headers.insert("cf-connecting-ip", HeaderValue::from_static("203.0.113.7"));
2004        headers.insert(
2005            "cf-visitor",
2006            HeaderValue::from_static("{\"scheme\":\"https\"}"),
2007        );
2008        // HTTP transport version markers.
2009        headers.insert(
2010            "http3-settings",
2011            HeaderValue::from_static("AAMAAABkAARAAgAAAAAEAIAAAAA"),
2012        );
2013        headers.insert("pragma", HeaderValue::from_static("no-cache"));
2014
2015        let allowlist = hs::HeaderAllowlist::empty();
2016        let user_deny = hs::HeaderAllowlist::empty();
2017        let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2018        let has = |name: &str| meta.iter().any(|(n, _)| n == name);
2019
2020        assert!(meta.contains(&("content-encoding".to_string(), "gzip".to_string())));
2021        assert!(meta.contains(&("content-language".to_string(), "zh-CN".to_string())));
2022        assert!(meta.contains(&(
2023            "content-disposition".to_string(),
2024            "attachment; filename=\"report.pdf\"".to_string()
2025        )));
2026        assert!(meta.contains(&("cache-control".to_string(), "max-age=60".to_string())));
2027        assert!(meta.contains(&("access-control-allow-origin".to_string(), "*".to_string())));
2028        assert!(meta.contains(&(
2029            "content-security-policy".to_string(),
2030            "default-src 'self'".to_string()
2031        )));
2032        assert!(meta.contains(&("x-frame-options".to_string(), "DENY".to_string())));
2033        assert!(meta.contains(&("permissions-policy".to_string(), "camera=()".to_string())));
2034        assert!(meta.contains(&(
2035            "cross-origin-resource-policy".to_string(),
2036            "same-origin".to_string()
2037        )));
2038        // Custom headers (no `x-` shortcut, no auto-allowlist for
2039        // x-meta-*) MUST NOT persist under the empty allowlist.
2040        assert!(
2041            !has("x-meta-author"),
2042            "x-meta-* is opt-in via ELASTIK_PERSIST_HEADERS"
2043        );
2044        assert!(
2045            !has("x-future-http-thing"),
2046            "unknown x- headers default-deny"
2047        );
2048
2049        for name in [
2050            "authorization",
2051            "proxy-authorization",
2052            "cookie",
2053            "set-cookie",
2054            "host",
2055            "connection",
2056            "keep-alive",
2057            "transfer-encoding",
2058            "te",
2059            "trailer",
2060            "upgrade",
2061            "http2-settings",
2062            "content-type",
2063            "content-length",
2064            "etag",
2065            "allow",
2066            "location",
2067            "link",
2068            "www-authenticate",
2069            "accept",
2070            "accept-encoding",
2071            "accept-language",
2072            "accept-charset",
2073            "range",
2074            "if-match",
2075            "if-none-match",
2076            "if-range",
2077            "if-modified-since",
2078            "if-unmodified-since",
2079            "expect",
2080            "sec-fetch-mode",
2081            "sec-ch-ua",
2082            "dnt",
2083            "origin",
2084            "referer",
2085            "user-agent",
2086            "server",
2087            "date",
2088            "age",
2089            "vary",
2090            "x-request-id",
2091            "x-elapsed-us",
2092            "x-elapsed-ms",
2093            "x-content-type-options",
2094            "via",
2095            "forwarded",
2096            "x-forwarded-for",
2097            "x-forwarded-host",
2098            "x-forwarded-proto",
2099            "x-real-ip",
2100            "true-client-ip",
2101            "client-ip",
2102            "clear-site-data",
2103            // Distributed tracing pollutants.
2104            "traceparent",
2105            "tracestate",
2106            "baggage",
2107            "b3",
2108            "x-b3-traceid",
2109            "x-b3-spanid",
2110            "x-b3-sampled",
2111            // Cloud-provider runtime injections.
2112            "x-amzn-trace-id",
2113            "cf-ray",
2114            "cf-connecting-ip",
2115            "cf-visitor",
2116            // Transport version markers + HTTP/1.0 living fossil.
2117            "http3-settings",
2118            "pragma",
2119        ] {
2120            assert!(!has(name), "{name} should not be persisted");
2121        }
2122    }
2123
2124    #[test]
2125    fn request_meta_headers_deduplicate_repeated_names_last_wins() {
2126        let mut headers = HeaderMap::new();
2127        headers.append("x-meta-author", HeaderValue::from_static("alice"));
2128        headers.append("x-meta-author", HeaderValue::from_static("bob"));
2129
2130        // Allowlist x-meta-* so the dedup behaviour is observable;
2131        // the empty allowlist would correctly drop both entries.
2132        let allowlist = hs::HeaderAllowlist::parse("x-meta-*");
2133        let user_deny = hs::HeaderAllowlist::empty();
2134        let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2135
2136        assert_eq!(meta, vec![("x-meta-author".to_string(), "bob".to_string())]);
2137    }
2138
2139    #[test]
2140    fn request_meta_headers_user_allowlist_persists_custom_headers() {
2141        // Layer 3 (user allowlist) opens custom-header round-trip
2142        // on top of the default-allow set. Exact match and prefix
2143        // match (`x-my-*`) both work.
2144        let mut headers = HeaderMap::new();
2145        headers.insert("x-author", HeaderValue::from_static("ranger"));
2146        headers.insert("x-version", HeaderValue::from_static("7.1.0"));
2147        headers.insert("x-my-tag", HeaderValue::from_static("custom"));
2148        headers.insert("x-my-region", HeaderValue::from_static("ap-east-1"));
2149        headers.insert("x-other", HeaderValue::from_static("not-allowlisted"));
2150        // L1 hard deny -- must lose to user allowlist below.
2151        headers.insert(
2152            "traceparent",
2153            HeaderValue::from_static("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"),
2154        );
2155        // L2 default-allow stays on top of L3.
2156        headers.insert(
2157            header::CACHE_CONTROL,
2158            HeaderValue::from_static("max-age=60"),
2159        );
2160
2161        let allowlist = hs::HeaderAllowlist::parse("x-author,x-version,x-my-*,traceparent");
2162        let user_deny = hs::HeaderAllowlist::empty();
2163        let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2164        let has = |name: &str| meta.iter().any(|(n, _)| n == name);
2165
2166        // L3 named exact: persisted.
2167        assert!(has("x-author"));
2168        assert!(has("x-version"));
2169        // L3 prefix wildcard: persisted.
2170        assert!(has("x-my-tag"));
2171        assert!(has("x-my-region"));
2172        // Not allowlisted -> default-deny (L3 didn't match, L2 doesn't cover).
2173        assert!(!has("x-other"));
2174        // L1 hard deny ALWAYS wins -- even if traceparent is in the
2175        // user allowlist, it never persists.
2176        assert!(
2177            !has("traceparent"),
2178            "L1 hard deny must override user allowlist; tracing context never persists"
2179        );
2180        // L2 default-allow still applies alongside L3.
2181        assert!(has("cache-control"));
2182    }
2183
2184    #[test]
2185    fn request_meta_headers_user_deny_subtracts_from_default_allow() {
2186        // Layer 1.5 (user deny / ELASTIK_DENY_HEADERS) lets an
2187        // operator subtract a header from the built-in
2188        // DEFAULT_PERSIST_HEADERS without recompiling. Order:
2189        //   L1 hard deny > L1.5 user deny > L2 default > L3 user allow.
2190        let mut headers = HeaderMap::new();
2191        // L2 defaults that should normally persist:
2192        headers.insert(
2193            header::CACHE_CONTROL,
2194            HeaderValue::from_static("max-age=60"),
2195        );
2196        headers.insert(header::CONTENT_ENCODING, HeaderValue::from_static("gzip"));
2197        headers.insert("permissions-policy", HeaderValue::from_static("camera=()"));
2198        // L3-allowlisted custom that user_deny should also win over:
2199        headers.insert("x-author", HeaderValue::from_static("ranger"));
2200
2201        // Operator says: drop cache-control + permissions-policy
2202        // from defaults; also kill any allowlisted x-author (user
2203        // deny beats user allow).
2204        let allowlist = hs::HeaderAllowlist::parse("x-author");
2205        let user_deny = hs::HeaderAllowlist::parse("cache-control,permissions-policy,x-author");
2206        let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2207        let has = |name: &str| meta.iter().any(|(n, _)| n == name);
2208
2209        // L2 defaults the operator denied -> dropped.
2210        assert!(
2211            !has("cache-control"),
2212            "user-deny must subtract from L2 default-allow"
2213        );
2214        assert!(!has("permissions-policy"));
2215        // L2 default not in user_deny -> still persisted.
2216        assert!(has("content-encoding"));
2217        // L3 allowlisted but also user_denied -> deny wins.
2218        assert!(
2219            !has("x-author"),
2220            "user-deny must override user-allow when both match"
2221        );
2222    }
2223
2224    #[test]
2225    fn request_meta_headers_is_case_insensitive_per_rfc_7230() {
2226        // RFC 7230: HTTP header field names are case-insensitive.
2227        // axum's HeaderMap canonicalizes incoming names to
2228        // lowercase via `HeaderName::as_str`; this test pins that
2229        // axum-side normalization PLUS verifies that the
2230        // env-var-side parser lowercases its input.
2231        let mut headers = HeaderMap::new();
2232        // Insert with mixed case -- axum stores keys in lowercase
2233        // canonical form, so the iteration in
2234        // `request_meta_headers` sees lowercase.
2235        headers.insert("X-Author", HeaderValue::from_static("ranger"));
2236        headers.insert("CACHE-CONTROL", HeaderValue::from_static("max-age=60"));
2237        headers.insert("Traceparent", HeaderValue::from_static("00-...-01"));
2238        headers.insert("X-Forwarded-For", HeaderValue::from_static("203.0.113.7"));
2239
2240        // Operator wrote ELASTIK_PERSIST_HEADERS with mixed case --
2241        // parser lowercases.
2242        let allowlist = hs::HeaderAllowlist::parse("X-AUTHOR, X-Version");
2243        let user_deny = hs::HeaderAllowlist::empty();
2244        let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2245        let has = |name: &str| meta.iter().any(|(n, _)| n == name);
2246
2247        // Mixed-case allowlist entry persists the (lowercase-stored)
2248        // input header.
2249        assert!(has("x-author"), "X-Author allowlisted as X-AUTHOR persists");
2250        // L2 default catches CACHE-CONTROL regardless of input case.
2251        assert!(has("cache-control"));
2252        // L1 hard-deny still blocks tracing context regardless of
2253        // input case.
2254        assert!(!has("traceparent"));
2255        assert!(!has("x-forwarded-for"));
2256
2257        // Stored header names are always lowercase canonical form.
2258        for (name, _) in &meta {
2259            assert_eq!(
2260                name,
2261                &name.to_ascii_lowercase(),
2262                "stored meta keys must be lowercase"
2263            );
2264        }
2265    }
2266
2267    #[test]
2268    fn header_allowlist_parser_handles_whitespace_dedup_and_wildcards() {
2269        // Whitespace per entry trimmed; case folded; duplicates de-duped;
2270        // empty / all-* entries skipped.
2271        let allow = hs::HeaderAllowlist::parse(
2272            "  X-Author , x-version, x-my-*, x-version, , *, x-my-* , x-AUTHOR",
2273        );
2274        assert!(allow.matches("x-author"));
2275        assert!(allow.matches("x-version"));
2276        assert!(allow.matches("x-my-tag"));
2277        assert!(allow.matches("x-my-region"));
2278        assert!(!allow.matches("x-other"));
2279        assert!(!allow.matches(""));
2280        // A bare `*` in the env doesn't open the floodgates.
2281        assert!(!allow.matches("anything-else"));
2282
2283        // Empty input == empty allowlist.
2284        assert!(hs::HeaderAllowlist::parse("").is_empty());
2285        assert!(hs::HeaderAllowlist::parse("   ,  ,").is_empty());
2286    }
2287
2288    #[tokio::test]
2289    async fn get_returns_stored_standard_representation_headers() {
2290        let (core, dir) = test_core("representation-headers");
2291        let headers = vec![
2292            ("content-encoding".to_string(), "gzip".to_string()),
2293            (
2294                "content-disposition".to_string(),
2295                "attachment; filename=\"report.pdf\"".to_string(),
2296            ),
2297            ("access-control-allow-origin".to_string(), "*".to_string()),
2298            (
2299                "content-security-policy".to_string(),
2300                "default-src 'self'".to_string(),
2301            ),
2302            ("x-frame-options".to_string(), "DENY".to_string()),
2303        ];
2304        core.write_world(
2305            "home/gzip",
2306            b"compressed bytes",
2307            "application/pdf",
2308            &headers,
2309        )
2310        .unwrap();
2311
2312        let req_headers = HeaderMap::new();
2313        let resp = unwrap_response(
2314            execute_get(
2315                req_headers.clone(),
2316                auth::Tier::Anon,
2317                world_path("home/gzip"),
2318                &core,
2319                &TraceCtx::disabled(),
2320            )
2321            .await,
2322        );
2323        assert_eq!(resp.status(), StatusCode::OK);
2324        assert_eq!(
2325            resp.headers().get(header::CONTENT_ENCODING).unwrap(),
2326            "gzip"
2327        );
2328        assert_eq!(
2329            resp.headers().get(header::CONTENT_DISPOSITION).unwrap(),
2330            "attachment; filename=\"report.pdf\""
2331        );
2332        assert_eq!(
2333            resp.headers().get("access-control-allow-origin").unwrap(),
2334            "*"
2335        );
2336        assert_eq!(
2337            resp.headers().get("content-security-policy").unwrap(),
2338            "default-src 'self'"
2339        );
2340        assert_eq!(resp.headers().get("x-frame-options").unwrap(), "DENY");
2341
2342        let _ = std::fs::remove_dir_all(dir);
2343    }
2344
2345    #[test]
2346    fn worlds_store_content_type_not_private_extensions() {
2347        let (core, dir) = test_core("content-type");
2348        core.write_world("home/pdf", b"%PDF-1.7", "application/pdf", &[])
2349            .unwrap();
2350
2351        let stage = core.read_world("home/pdf").unwrap().unwrap();
2352        assert_eq!(stage.content_type, "application/pdf");
2353        assert_eq!(stage.body, b"%PDF-1.7");
2354
2355        let _ = std::fs::remove_dir_all(dir);
2356    }
2357
2358    #[test]
2359    fn storage_prefix_routes_memory_and_disk_modes() {
2360        assert!(!store::is_memory_world("home/report"));
2361        assert!(!store::is_memory_world("etc/config"));
2362        assert!(store::is_memory_world("tmp/scratch"));
2363        assert!(store::is_memory_world("dev/fb0"));
2364        assert!(store::is_memory_world("sys/status"));
2365        assert!(store::is_persistent("home/report"));
2366        assert!(!store::is_persistent("tmp/scratch"));
2367    }
2368
2369    #[test]
2370    fn memory_worlds_do_not_create_sqlite_files_or_audit_chain() {
2371        let (core, dir) = test_core("memory-world");
2372        core.write_world(
2373            "tmp/scratch",
2374            b"draft",
2375            "text/plain; charset=utf-8",
2376            &[("x-meta-owner".to_string(), "agent".to_string())],
2377        )
2378        .unwrap();
2379
2380        let stage = core.read_world("tmp/scratch").unwrap().unwrap();
2381        assert_eq!(stage.body, b"draft");
2382        assert_eq!(stage.content_type, "text/plain; charset=utf-8");
2383        assert_eq!(
2384            stage.headers,
2385            vec![("x-meta-owner".to_string(), "agent".to_string())]
2386        );
2387        assert!(!world::world_db(&core.data, "tmp/scratch").exists());
2388        assert!(audit::latest_hmac(&core.data, "tmp/scratch").is_none());
2389
2390        let names = store::list_all(&core.data, &core.mem);
2391        assert_eq!(names.unwrap(), vec!["tmp/scratch".to_string()]);
2392
2393        let _ = std::fs::remove_dir_all(dir);
2394    }
2395
2396    #[test]
2397    fn disk_worlds_create_sqlite_files_and_audit_chain_when_using_audit_path() {
2398        let (core, dir) = test_core("disk-world");
2399        let h = world::write_with_audit(
2400            &core.data,
2401            "home/report",
2402            b"final",
2403            "text/plain; charset=utf-8",
2404            &[],
2405            &core.hmac_key,
2406        )
2407        .unwrap();
2408
2409        let stage = core.read_world("home/report").unwrap().unwrap();
2410        assert_eq!(stage.body, b"final");
2411        assert!(world::world_db(&core.data, "home/report").exists());
2412        assert_eq!(audit::latest_hmac(&core.data, "home/report"), Some(h));
2413
2414        let names = store::list_all(&core.data, &core.mem);
2415        assert_eq!(names.unwrap(), vec!["home/report".to_string()]);
2416
2417        let _ = std::fs::remove_dir_all(dir);
2418    }
2419
2420    #[test]
2421    fn audit_keeps_historical_metadata_without_json_payload() {
2422        let (core, dir) = test_core("audit-meta");
2423        let headers = vec![("x-meta-author".to_string(), "ranger".to_string())];
2424        let h = world::write_with_audit(
2425            &core.data,
2426            "home/audit-meta",
2427            b"hello",
2428            "text/plain; charset=utf-8",
2429            &headers,
2430            &core.hmac_key,
2431        )
2432        .unwrap();
2433
2434        let c = rusqlite::Connection::open(world::world_db(&core.data, "home/audit-meta")).unwrap();
2435        let (content_type, meta_sha256): (String, String) = c
2436            .query_row(
2437                "SELECT content_type, meta_sha256 FROM events WHERE hmac=?",
2438                [h],
2439                |r| Ok((r.get(0)?, r.get(1)?)),
2440            )
2441            .unwrap();
2442        assert_eq!(content_type, "text/plain; charset=utf-8");
2443        assert_eq!(
2444            meta_sha256,
2445            audit::meta_sha256("text/plain; charset=utf-8", &headers)
2446        );
2447
2448        let author: String = c
2449            .query_row(
2450                "SELECT value FROM event_headers WHERE name='x-meta-author'",
2451                [],
2452                |r| r.get(0),
2453            )
2454            .unwrap();
2455        assert_eq!(author, "ranger");
2456
2457        let _ = std::fs::remove_dir_all(dir);
2458    }
2459
2460    #[tokio::test]
2461    async fn post_audit_uses_existing_representation_metadata() {
2462        let (core, dir) = test_core("post-audit-meta");
2463        let headers = vec![
2464            ("content-encoding".to_string(), "gzip".to_string()),
2465            ("x-meta-author".to_string(), "ranger".to_string()),
2466        ];
2467        world::write_with_audit(
2468            &core.data,
2469            "home/post-audit-meta",
2470            b"hello",
2471            "text/plain",
2472            &headers,
2473            &core.hmac_key,
2474        )
2475        .unwrap();
2476
2477        let mut req_headers = HeaderMap::new();
2478        req_headers.insert(
2479            header::CONTENT_TYPE,
2480            HeaderValue::from_static("application/pdf"),
2481        );
2482        req_headers.insert(header::CONTENT_LANGUAGE, HeaderValue::from_static("zh-CN"));
2483        let resp = unwrap_response(
2484            execute_post(
2485                req_headers.clone(),
2486                Bytes::from_static(b" world"),
2487                auth::Tier::Write,
2488                world_path("home/post-audit-meta"),
2489                &core,
2490                &TraceCtx::disabled(),
2491            )
2492            .await,
2493        );
2494        assert_eq!(resp.status(), StatusCode::OK);
2495
2496        let c = rusqlite::Connection::open(world::world_db(&core.data, "home/post-audit-meta"))
2497            .unwrap();
2498        let (content_type, meta_sha256): (String, String) = c
2499            .query_row(
2500                "SELECT content_type, meta_sha256 FROM events WHERE event_type='append'",
2501                [],
2502                |r| Ok((r.get(0)?, r.get(1)?)),
2503            )
2504            .unwrap();
2505        assert_eq!(content_type, "text/plain");
2506        assert_eq!(meta_sha256, audit::meta_sha256("text/plain", &headers));
2507
2508        let language_count: i64 = c
2509            .query_row(
2510                "SELECT COUNT(*) FROM event_headers WHERE name='content-language'",
2511                [],
2512                |r| r.get(0),
2513            )
2514            .unwrap();
2515        assert_eq!(language_count, 0);
2516
2517        let _ = std::fs::remove_dir_all(dir);
2518    }
2519
2520    #[tokio::test]
2521    async fn delete_honors_if_match_before_audit_or_remove() {
2522        let (core, dir) = test_core("delete-if-match");
2523        let core = Arc::new(core);
2524        let state = server_state_for_tests(core.clone());
2525        let h = world::write_with_audit(
2526            &core.data,
2527            "home/delete-cas",
2528            b"alive",
2529            "text/plain; charset=utf-8",
2530            &[],
2531            &core.hmac_key,
2532        )
2533        .unwrap();
2534
2535        let mut stale = HeaderMap::new();
2536        stale.insert(header::IF_MATCH, HeaderValue::from_static("\"hmac-stale\""));
2537        let resp = unwrap_response(
2538            execute_delete(
2539                stale.clone(),
2540                auth::Tier::Approve,
2541                world_path("home/delete-cas"),
2542                &state,
2543                &TraceCtx::disabled(),
2544            )
2545            .await,
2546        );
2547        assert_eq!(resp.status(), StatusCode::PRECONDITION_FAILED);
2548        assert!(core.read_world("home/delete-cas").unwrap().is_some());
2549
2550        let mut good = HeaderMap::new();
2551        good.insert(
2552            header::IF_MATCH,
2553            HeaderValue::from_str(&format!("\"{}\"", et::hmac_etag(&h))).unwrap(),
2554        );
2555        let resp = unwrap_response(
2556            execute_delete(
2557                good.clone(),
2558                auth::Tier::Approve,
2559                world_path("home/delete-cas"),
2560                &state,
2561                &TraceCtx::disabled(),
2562            )
2563            .await,
2564        );
2565        assert_eq!(resp.status(), StatusCode::NO_CONTENT);
2566        assert!(core.read_world("home/delete-cas").unwrap().is_none());
2567        assert!(core.read_world("var/log/deletes").unwrap().is_some());
2568        assert!(matches!(
2569            core.cached_verify_chain("var/log/deletes").unwrap(),
2570            Some(audit::VerifyReport::Valid(_))
2571        ));
2572        let ledger = world::open_existing(&core.data, "var/log/deletes")
2573            .unwrap()
2574            .unwrap();
2575        let mut stmt = ledger
2576            .prepare("SELECT event_type FROM events ORDER BY id")
2577            .unwrap();
2578        let events: Vec<String> = stmt
2579            .query_map([], |r| r.get(0))
2580            .unwrap()
2581            .collect::<Result<_, _>>()
2582            .unwrap();
2583        assert_eq!(events, vec!["delete_intent", "delete_commit"]);
2584
2585        let _ = std::fs::remove_dir_all(dir);
2586    }
2587
2588    #[tokio::test]
2589    async fn delete_returns_500_when_commit_audit_fails_after_physical_delete() {
2590        let (core, dir) = test_core("delete-commit-audit-fail");
2591        let core = Arc::new(core);
2592        let state = server_state_for_tests(core.clone());
2593        core.write_world("home/delete-degraded", b"alive", "text/plain", &[])
2594            .unwrap();
2595        world::write_with_audit(
2596            &core.data,
2597            "var/log/deletes",
2598            b"ledger",
2599            "text/plain",
2600            &[],
2601            &core.hmac_key,
2602        )
2603        .unwrap();
2604        core.delete_ledger_created.store(true, Ordering::Relaxed);
2605        {
2606            let c =
2607                rusqlite::Connection::open(world::world_db(&core.data, "var/log/deletes")).unwrap();
2608            c.execute_batch(
2609                r#"
2610                CREATE TRIGGER fail_delete_commit
2611                BEFORE INSERT ON events
2612                WHEN NEW.event_type='delete_commit'
2613                BEGIN
2614                    SELECT RAISE(FAIL, 'delete_commit blocked');
2615                END;
2616                "#,
2617            )
2618            .unwrap();
2619        }
2620
2621        let resp = unwrap_response(
2622            execute_delete(
2623                HeaderMap::new(),
2624                auth::Tier::Approve,
2625                world_path("home/delete-degraded"),
2626                &state,
2627                &TraceCtx::disabled(),
2628            )
2629            .await,
2630        );
2631
2632        assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
2633        assert!(core.read_world("home/delete-degraded").unwrap().is_none());
2634        let ledger = world::open_existing(&core.data, "var/log/deletes")
2635            .unwrap()
2636            .unwrap();
2637        let events = ledger
2638            .prepare("SELECT event_type FROM events ORDER BY id")
2639            .unwrap()
2640            .query_map([], |r| r.get::<_, String>(0))
2641            .unwrap()
2642            .collect::<Result<Vec<_>, _>>()
2643            .unwrap();
2644        assert_eq!(events, vec!["put", "delete_intent", "delete_commit_failed"]);
2645
2646        let _ = std::fs::remove_dir_all(dir);
2647    }
2648
2649    #[tokio::test]
2650    async fn delete_rejects_auth_token_and_append_only_ledger() {
2651        let (core, dir) = test_core("delete-policy");
2652        let core = Arc::new(core);
2653        let state = server_state_for_tests(core.clone());
2654        world::write_with_audit(
2655            &core.data,
2656            "home/delete-policy",
2657            b"alive",
2658            "text/plain; charset=utf-8",
2659            &[],
2660            &core.hmac_key,
2661        )
2662        .unwrap();
2663        world::write_with_audit(
2664            &core.data,
2665            "var/log/deletes",
2666            b"ledger",
2667            "text/plain; charset=utf-8",
2668            &[],
2669            &core.hmac_key,
2670        )
2671        .unwrap();
2672        let headers = HeaderMap::new();
2673
2674        let auth_delete = unwrap_response(
2675            execute_delete(
2676                headers.clone(),
2677                auth::Tier::Write,
2678                world_path("home/delete-policy"),
2679                &state,
2680                &TraceCtx::disabled(),
2681            )
2682            .await,
2683        );
2684        assert_eq!(auth_delete.status(), StatusCode::UNAUTHORIZED);
2685        assert!(core.read_world("home/delete-policy").unwrap().is_some());
2686
2687        let ledger_delete = unwrap_response(
2688            execute_delete(
2689                headers.clone(),
2690                auth::Tier::Approve,
2691                world_path("var/log/deletes"),
2692                &state,
2693                &TraceCtx::disabled(),
2694            )
2695            .await,
2696        );
2697        assert_eq!(ledger_delete.status(), StatusCode::UNAUTHORIZED);
2698        assert_eq!(
2699            ledger_delete
2700                .headers()
2701                .get(header::WWW_AUTHENTICATE)
2702                .unwrap(),
2703            "Bearer realm=\"elastik\""
2704        );
2705        assert_eq!(
2706            ledger_delete.headers().get(header::CONTENT_TYPE).unwrap(),
2707            "text/plain; charset=utf-8"
2708        );
2709        assert_eq!(
2710            response_text(ledger_delete).await,
2711            "auth required: delete ledger is append-only\n"
2712        );
2713        assert!(core.read_world("var/log/deletes").unwrap().is_some());
2714
2715        let _ = std::fs::remove_dir_all(dir);
2716    }
2717
2718    #[tokio::test]
2719    async fn delete_missing_world_does_not_write_delete_ledger() {
2720        let (core, dir) = test_core("delete-missing");
2721        let core = Arc::new(core);
2722        let state = server_state_for_tests(core.clone());
2723        let headers = HeaderMap::new();
2724        let resp = unwrap_response(
2725            execute_delete(
2726                headers.clone(),
2727                auth::Tier::Approve,
2728                world_path("home/missing"),
2729                &state,
2730                &TraceCtx::disabled(),
2731            )
2732            .await,
2733        );
2734        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2735        assert!(core.read_world("var/log/deletes").unwrap().is_none());
2736
2737        let _ = std::fs::remove_dir_all(dir);
2738    }
2739
2740    #[test]
2741    fn proc_worlds_body_is_plain_lines() {
2742        assert_eq!(world_list_body(&[]), "");
2743        assert_eq!(
2744            world_list_body(&["home/a".to_owned(), "tmp/b".to_owned()]),
2745            "home/a\ntmp/b\n"
2746        );
2747    }
2748
2749    #[tokio::test]
2750    async fn proc_du_and_df_report_resource_usage() {
2751        let (mut core, dir) = test_core("proc-du-df");
2752        core.max_storage_bytes = Some(10);
2753        core.write_world("home/hello", b"hello", "text/plain", &[])
2754            .unwrap();
2755        core.write_world("tmp/scratch", b"data", "text/plain", &[])
2756            .unwrap();
2757        let state = Arc::new(core);
2758        let headers = HeaderMap::new();
2759
2760        let du = proc_du(
2761            State(server_state_for_tests(state.clone())),
2762            Method::GET,
2763            headers.clone(),
2764        )
2765        .await;
2766        assert_eq!(du.status(), StatusCode::OK);
2767        let du_body = response_text(du).await;
2768        assert!(du_body.contains("home/hello\t5\n"));
2769        assert!(du_body.contains("tmp/scratch\t4\n"));
2770
2771        let df = proc_df(
2772            State(server_state_for_tests(state.clone())),
2773            Method::GET,
2774            headers.clone(),
2775        )
2776        .await;
2777        assert_eq!(df.status(), StatusCode::OK);
2778        let df_body = response_text(df).await;
2779        assert!(df_body.contains("storage\t5\t10\t5\n"));
2780        assert!(df_body.contains("memory\t4\t268435456\t268435452\n"));
2781        assert!(df_body.contains("worlds\t2\tunlimited\tunlimited\n"));
2782
2783        let head = proc_du(State(server_state_for_tests(state)), Method::HEAD, headers).await;
2784        assert_eq!(head.status(), StatusCode::OK);
2785        assert_eq!(head.headers().get(header::CONTENT_LENGTH).unwrap(), "27");
2786        assert_eq!(response_text(head).await, "");
2787
2788        let _ = std::fs::remove_dir_all(dir);
2789    }
2790
2791    #[tokio::test]
2792    async fn proc_du_and_df_require_read_token_when_enabled() {
2793        let (mut core, dir) = test_core("proc-du-df-read-token");
2794        core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
2795        let state = Arc::new(core);
2796        let headers = HeaderMap::new();
2797
2798        let du = proc_du(
2799            State(server_state_for_tests(state.clone())),
2800            Method::GET,
2801            headers.clone(),
2802        )
2803        .await;
2804        assert_eq!(du.status(), StatusCode::UNAUTHORIZED);
2805
2806        let df = proc_df(
2807            State(server_state_for_tests(state.clone())),
2808            Method::GET,
2809            headers,
2810        )
2811        .await;
2812        assert_eq!(df.status(), StatusCode::UNAUTHORIZED);
2813
2814        let mut auth_headers = HeaderMap::new();
2815        let auth_value =
2816            HeaderValue::from_str(&format!("{} {}", "Bearer", "reader")).expect("valid test auth");
2817        auth_headers.insert(header::AUTHORIZATION, auth_value);
2818        let authorized = proc_df(
2819            State(server_state_for_tests(state)),
2820            Method::GET,
2821            auth_headers,
2822        )
2823        .await;
2824        assert_eq!(authorized.status(), StatusCode::OK);
2825
2826        let _ = std::fs::remove_dir_all(dir);
2827    }
2828
2829    #[tokio::test]
2830    async fn proc_df_world_count_tracks_durable_put_and_delete() {
2831        let (core, dir) = test_core("proc-df-world-count");
2832        let headers = HeaderMap::new();
2833
2834        let put = unwrap_response(
2835            execute_put(
2836                headers.clone(),
2837                Bytes::from_static(b"x"),
2838                auth::Tier::Write,
2839                world_path("home/count"),
2840                &core,
2841                &TraceCtx::disabled(),
2842            )
2843            .await,
2844        );
2845        assert_eq!(put.status(), StatusCode::CREATED);
2846
2847        let state = Arc::new(core);
2848        let server_state = server_state_for_tests(state.clone());
2849        let before = proc_df(State(server_state.clone()), Method::GET, headers.clone()).await;
2850        assert!(response_text(before)
2851            .await
2852            .contains("worlds\t1\tunlimited\tunlimited\n"));
2853
2854        let delete = unwrap_response(
2855            execute_delete(
2856                headers.clone(),
2857                auth::Tier::Approve,
2858                world_path("home/count"),
2859                &server_state,
2860                &TraceCtx::disabled(),
2861            )
2862            .await,
2863        );
2864        assert_eq!(delete.status(), StatusCode::NO_CONTENT);
2865
2866        let after = proc_df(State(server_state), Method::GET, headers).await;
2867        let after_body = response_text(after).await;
2868        assert!(after_body.contains("storage\t0\tunlimited\tunlimited\n"));
2869        assert!(after_body.contains("worlds\t0\tunlimited\tunlimited\n"));
2870
2871        let _ = std::fs::remove_dir_all(dir);
2872    }
2873
2874    #[tokio::test]
2875    async fn proc_pool_emits_metrics_with_type_labels() {
2876        // Warm the cache via a PUT + GET, then assert the metrics
2877        // body has the right counter / snapshot labels and tracks
2878        // hits + misses correctly. After a DELETE the
2879        // `ledger_writer_inits` counter must bump from 0 to 1
2880        // (lazy-init fired exactly once).
2881        let (core, dir) = test_core("proc-pool-metrics");
2882        let headers = HeaderMap::new();
2883
2884        let put = unwrap_response(
2885            execute_put(
2886                headers.clone(),
2887                Bytes::from_static(b"hello"),
2888                auth::Tier::Write,
2889                world_path("home/m"),
2890                &core,
2891                &TraceCtx::disabled(),
2892            )
2893            .await,
2894        );
2895        assert_eq!(put.status(), StatusCode::CREATED);
2896
2897        // First GET = miss (Phase 3); second GET = hit (Phase 1).
2898        for _ in 0..2 {
2899            let get = unwrap_response(
2900                execute_get(
2901                    headers.clone(),
2902                    auth::Tier::Read,
2903                    world_path("home/m"),
2904                    &core,
2905                    &TraceCtx::disabled(),
2906                )
2907                .await,
2908            );
2909            assert_eq!(get.status(), StatusCode::OK);
2910        }
2911
2912        let state = Arc::new(core);
2913        let server_state = server_state_for_tests(state.clone());
2914        let resp = proc_pool(State(server_state.clone()), Method::GET, headers.clone()).await;
2915        let body = response_text(resp).await;
2916
2917        assert!(body.contains("read_cache_entries 1 snapshot\n"));
2918        assert!(body.contains("read_cache_tombstones 0 snapshot\n"));
2919        assert!(body.contains("read_cache_hits 1 counter\n"));
2920        assert!(body.contains("read_cache_misses 1 counter\n"));
2921        assert!(body.contains("read_cache_capped 0 counter\n"));
2922        assert!(body.contains("read_cache_evictions 0 counter\n"));
2923        assert!(body.contains("read_cache_open_fails 0 counter\n"));
2924        assert!(body.contains("read_cache_max_entries "));
2925        // No DELETE issued yet -- ledger writer never lazy-inited.
2926        assert!(body.contains("ledger_writer_inits 0 counter\n"));
2927
2928        // After a DELETE, the lazy-init fires exactly once
2929        // (Codex P3: counter not snapshot).
2930        let _ = unwrap_response(
2931            execute_delete(
2932                headers.clone(),
2933                auth::Tier::Approve,
2934                world_path("home/m"),
2935                &server_state,
2936                &TraceCtx::disabled(),
2937            )
2938            .await,
2939        );
2940        let resp2 = proc_pool(State(server_state), Method::GET, headers).await;
2941        let body2 = response_text(resp2).await;
2942        assert!(
2943            body2.contains("ledger_writer_inits 1 counter\n"),
2944            "expected counter to bump to 1 after first DELETE; body=\n{body2}"
2945        );
2946
2947        let _ = std::fs::remove_dir_all(dir);
2948    }
2949
2950    #[tokio::test]
2951    async fn proc_pool_reports_read_cache_eviction_values() {
2952        let (core, dir) = test_core_with_read_cache_max("proc-pool-eviction-values", 2);
2953        let headers = HeaderMap::new();
2954
2955        for world in ["home/a", "home/b", "home/c"] {
2956            let put = unwrap_response(
2957                execute_put(
2958                    headers.clone(),
2959                    Bytes::from_static(b"x"),
2960                    auth::Tier::Write,
2961                    world_path(world),
2962                    &core,
2963                    &TraceCtx::disabled(),
2964                )
2965                .await,
2966            );
2967            assert_eq!(put.status(), StatusCode::CREATED);
2968        }
2969
2970        for world in ["home/a", "home/b", "home/c"] {
2971            let get = unwrap_response(
2972                execute_get(
2973                    headers.clone(),
2974                    auth::Tier::Read,
2975                    world_path(world),
2976                    &core,
2977                    &TraceCtx::disabled(),
2978                )
2979                .await,
2980            );
2981            assert_eq!(get.status(), StatusCode::OK);
2982        }
2983
2984        let state = Arc::new(core);
2985        let server_state = server_state_for_tests(state);
2986        let resp = proc_pool(State(server_state), Method::GET, headers).await;
2987        let body = response_text(resp).await;
2988
2989        assert!(body.contains("read_cache_entries 2 snapshot\n"));
2990        assert!(body.contains("read_cache_hits 0 counter\n"));
2991        assert!(body.contains("read_cache_misses 3 counter\n"));
2992        assert!(body.contains("read_cache_capped 1 counter\n"));
2993        assert!(body.contains("read_cache_evictions 1 counter\n"));
2994        assert!(body.contains("read_cache_open_fails 0 counter\n"));
2995
2996        let _ = std::fs::remove_dir_all(dir);
2997    }
2998
2999    #[tokio::test]
3000    async fn proc_pool_requires_read_token_when_enabled() {
3001        // Codex P3 sub-finding: /proc/pool exposes read-cache and
3002        // ledger writer internals -- match the auth-deny coverage of
3003        // /proc/du and /proc/df. With a read token configured, an
3004        // unauthenticated GET must return 401, not leak metrics.
3005        let (mut core, dir) = test_core("proc-pool-read-token");
3006        core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
3007        let state = Arc::new(core);
3008        let headers = HeaderMap::new();
3009
3010        let resp = proc_pool(State(server_state_for_tests(state)), Method::GET, headers).await;
3011        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
3012
3013        let _ = std::fs::remove_dir_all(dir);
3014    }
3015
3016    #[cfg(feature = "multi-thread")]
3017    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
3018    async fn concurrent_first_deletes_increment_world_count_exactly_once() {
3019        // Bug 16 race coverage. Three concurrent DELETEs on a fresh
3020        // Core (delete_ledger_created starts false). The ledger gets
3021        // created on the first DELETE that wins the
3022        // `swap(true, AcqRel)` edge -- exactly one of them sees
3023        // was_first=true and bumps `durable_world_count`. The other
3024        // two see the swap-already-true and skip the bump.
3025        //
3026        // Setup: PUT three distinct worlds (durable_world_count = 3).
3027        // Then run three concurrent DELETEs in parallel.
3028        // Final state: durable_world_count = 3 + 1[ledger creation,
3029        // exactly once] - 3[three deletes succeed] = 1.
3030        //
3031        // Without Bug 16's `swap` ordering, two or all three of the
3032        // racing DELETEs would each independently observe
3033        // delete_ledger_created==false (via world_exists_blocking)
3034        // and each bump the counter, leading to drift. With the
3035        // atomic swap, only the unique false->true transition bumps.
3036        let (core, dir) = test_core("concurrent-first-deletes");
3037        let headers = HeaderMap::new();
3038        for w in ["home/a", "home/b", "home/c"] {
3039            let put = unwrap_response(
3040                execute_put(
3041                    headers.clone(),
3042                    Bytes::from_static(b"x"),
3043                    auth::Tier::Write,
3044                    world_path(w),
3045                    &core,
3046                    &TraceCtx::disabled(),
3047                )
3048                .await,
3049            );
3050            assert_eq!(put.status(), StatusCode::CREATED);
3051        }
3052        assert_eq!(core.durable_world_count.load(Ordering::Relaxed), 3);
3053        assert!(!core.delete_ledger_created.load(Ordering::Relaxed));
3054
3055        let state = Arc::new(core);
3056        let server_state = server_state_for_tests(state.clone());
3057        let s1 = server_state.clone();
3058        let s2 = server_state.clone();
3059        let s3 = server_state.clone();
3060        let h1 = headers.clone();
3061        let h2 = headers.clone();
3062        let h3 = headers.clone();
3063        let trace1 = TraceCtx::disabled();
3064        let trace2 = TraceCtx::disabled();
3065        let trace3 = TraceCtx::disabled();
3066        let (r1, r2, r3) = tokio::join!(
3067            execute_delete(h1, auth::Tier::Approve, world_path("home/a"), &s1, &trace1),
3068            execute_delete(h2, auth::Tier::Approve, world_path("home/b"), &s2, &trace2),
3069            execute_delete(h3, auth::Tier::Approve, world_path("home/c"), &s3, &trace3),
3070        );
3071        assert_eq!(unwrap_response(r1).status(), StatusCode::NO_CONTENT);
3072        assert_eq!(unwrap_response(r2).status(), StatusCode::NO_CONTENT);
3073        assert_eq!(unwrap_response(r3).status(), StatusCode::NO_CONTENT);
3074
3075        // 3 (original) + 1 (ledger creation, exactly once) - 3 (three
3076        // deletes) = 1.
3077        assert_eq!(state.durable_world_count.load(Ordering::Relaxed), 1);
3078        assert!(state.delete_ledger_created.load(Ordering::Relaxed));
3079
3080        let _ = std::fs::remove_dir_all(dir);
3081    }
3082
3083    async fn response_text(resp: Response) -> String {
3084        let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX)
3085            .await
3086            .unwrap();
3087        String::from_utf8(bytes.to_vec()).unwrap()
3088    }
3089
3090    // ── pipeline route-level coverage (PR 4b/4c) ───────────────
3091    //
3092    // The white-box tests above call `execute_get` / `execute_head`
3093    // / `execute_put` / `execute_post` / `execute_delete` directly
3094    // (renamed mechanically from `handle_*` in PR 4c) -- they cover
3095    // verb-handler logic in isolation. The tests below call
3096    // `pipeline::run` with the same Core fixture, exercising the
3097    // `world_handler -> pipeline::run -> handler::execute` route that
3098    // every real request now takes. Without these, a bug in the
3099    // pipeline wiring (path canonicalization, auth threading,
3100    // dispatch, response construction in the handler) would not be
3101    // caught by white-box tests; only e2e_blackbox would surface it.
3102
3103    #[tokio::test]
3104    async fn pipeline_get_existing_world_returns_200_with_body() {
3105        let (core, dir) = test_core("pipeline-get-200");
3106        core.write_world("home/hello", b"hello world", "text/plain", &[])
3107            .unwrap();
3108        let core = Arc::new(core);
3109        let state = server_state_for_tests(core.clone());
3110
3111        let resp = pipeline::run(
3112            Method::GET,
3113            "/home/hello".to_string(),
3114            HeaderMap::new(),
3115            Bytes::new(),
3116            &state,
3117            42,
3118        )
3119        .await;
3120
3121        assert_eq!(resp.status(), StatusCode::OK);
3122        assert_eq!(
3123            resp.headers()
3124                .get(header::CONTENT_TYPE)
3125                .and_then(|v| v.to_str().ok()),
3126            Some("text/plain")
3127        );
3128        assert_eq!(response_text(resp).await, "hello world");
3129
3130        let _ = std::fs::remove_dir_all(dir);
3131    }
3132
3133    #[tokio::test]
3134    async fn pipeline_head_existing_world_returns_headers_no_body() {
3135        let (core, dir) = test_core("pipeline-head-200");
3136        core.write_world("home/hello", b"hello world", "text/plain", &[])
3137            .unwrap();
3138        let core = Arc::new(core);
3139        let state = server_state_for_tests(core.clone());
3140
3141        let resp = pipeline::run(
3142            Method::HEAD,
3143            "/home/hello".to_string(),
3144            HeaderMap::new(),
3145            Bytes::new(),
3146            &state,
3147            43,
3148        )
3149        .await;
3150
3151        assert_eq!(resp.status(), StatusCode::OK);
3152        assert_eq!(
3153            resp.headers()
3154                .get(header::CONTENT_LENGTH)
3155                .and_then(|v| v.to_str().ok()),
3156            Some("11")
3157        );
3158        // HEAD body must be empty even though Content-Length says 11.
3159        assert_eq!(response_text(resp).await, "");
3160
3161        let _ = std::fs::remove_dir_all(dir);
3162    }
3163
3164    #[tokio::test]
3165    async fn pipeline_get_nonexistent_world_returns_404() {
3166        let (core, dir) = test_core("pipeline-get-404");
3167        let core = Arc::new(core);
3168        let state = server_state_for_tests(core.clone());
3169
3170        let resp = pipeline::run(
3171            Method::GET,
3172            "/home/missing".to_string(),
3173            HeaderMap::new(),
3174            Bytes::new(),
3175            &state,
3176            44,
3177        )
3178        .await;
3179
3180        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
3181
3182        let _ = std::fs::remove_dir_all(dir);
3183    }
3184
3185    #[tokio::test]
3186    async fn pipeline_get_invalid_dot_segment_returns_400() {
3187        let (core, dir) = test_core("pipeline-get-400");
3188        let core = Arc::new(core);
3189        let state = server_state_for_tests(core.clone());
3190
3191        let resp = pipeline::run(
3192            Method::GET,
3193            "/home/../etc/secret".to_string(),
3194            HeaderMap::new(),
3195            Bytes::new(),
3196            &state,
3197            45,
3198        )
3199        .await;
3200
3201        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
3202
3203        let _ = std::fs::remove_dir_all(dir);
3204    }
3205
3206    #[tokio::test]
3207    async fn pipeline_get_with_read_token_required_rejects_anon() {
3208        let (mut core, dir) = test_core("pipeline-get-401");
3209        core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
3210        core.write_world("home/secret", b"shhh", "text/plain", &[])
3211            .unwrap();
3212        let core = Arc::new(core);
3213        let state = server_state_for_tests(core.clone());
3214
3215        let resp = pipeline::run(
3216            Method::GET,
3217            "/home/secret".to_string(),
3218            HeaderMap::new(), // no Authorization header
3219            Bytes::new(),
3220            &state,
3221            46,
3222        )
3223        .await;
3224
3225        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
3226
3227        let _ = std::fs::remove_dir_all(dir);
3228    }
3229
3230    /// World-handler glue test (1/2): the full extractor chain
3231    /// (`State` + `Extension<RequestId>` + `Method` + `AxPath` +
3232    /// `HeaderMap` + `Bytes`) wires up correctly and `world_handler`
3233    /// routes GET to the pipeline. Goes through `tower::oneshot` so
3234    /// the `add_server_response_headers` middleware fires too -- that
3235    /// way we can assert the unified `x-request-id` header lands on
3236    /// the response.
3237    #[tokio::test]
3238    async fn world_handler_get_routes_through_pipeline() {
3239        use axum::body::Body;
3240        use axum::http::Request as HttpRequest;
3241        use tower::ServiceExt;
3242
3243        let (core, dir) = test_core("world-handler-get");
3244        core.write_world("home/hello", b"hello world", "text/plain", &[])
3245            .unwrap();
3246        let core = Arc::new(core);
3247        let state = server_state_for_tests(core.clone());
3248
3249        let app = Router::new()
3250            .route("/*world", any(world_handler))
3251            .layer(axum::middleware::from_fn_with_state(
3252                state.clone(),
3253                add_server_response_headers,
3254            ))
3255            .with_state(state);
3256
3257        let req = HttpRequest::builder()
3258            .method("GET")
3259            .uri("/home/hello")
3260            .body(Body::empty())
3261            .unwrap();
3262        let resp = app.oneshot(req).await.unwrap();
3263
3264        assert_eq!(resp.status(), StatusCode::OK);
3265        // Middleware stamps x-request-id; pipeline trace uses the
3266        // same id (`pipeline::run` reads RequestId from extensions
3267        // rather than allocating its own). The fact that this
3268        // header is present on the response is what the unification
3269        // fix guarantees -- without it, the bug would slip through
3270        // again silently.
3271        let req_id_header = resp
3272            .headers()
3273            .get("x-request-id")
3274            .and_then(|v| v.to_str().ok())
3275            .unwrap_or("missing");
3276        assert!(
3277            req_id_header.parse::<u64>().is_ok(),
3278            "x-request-id should be a numeric id, got {req_id_header:?}"
3279        );
3280        assert_eq!(response_text(resp).await, "hello world");
3281
3282        let _ = std::fs::remove_dir_all(dir);
3283    }
3284
3285    /// World-handler glue test (2/2): the same extractor chain works
3286    /// for HEAD. Validates the `if matches!(method, Method::GET |
3287    /// Method::HEAD)` short-circuit covers HEAD too, not just GET.
3288    #[tokio::test]
3289    async fn world_handler_head_routes_through_pipeline() {
3290        use axum::body::Body;
3291        use axum::http::Request as HttpRequest;
3292        use tower::ServiceExt;
3293
3294        let (core, dir) = test_core("world-handler-head");
3295        core.write_world("home/hello", b"hello world", "text/plain", &[])
3296            .unwrap();
3297        let core = Arc::new(core);
3298        let state = server_state_for_tests(core.clone());
3299
3300        let app = Router::new()
3301            .route("/*world", any(world_handler))
3302            .layer(axum::middleware::from_fn_with_state(
3303                state.clone(),
3304                add_server_response_headers,
3305            ))
3306            .with_state(state);
3307
3308        let req = HttpRequest::builder()
3309            .method("HEAD")
3310            .uri("/home/hello")
3311            .body(Body::empty())
3312            .unwrap();
3313        let resp = app.oneshot(req).await.unwrap();
3314
3315        assert_eq!(resp.status(), StatusCode::OK);
3316        assert_eq!(
3317            resp.headers()
3318                .get(header::CONTENT_LENGTH)
3319                .and_then(|v| v.to_str().ok()),
3320            Some("11"),
3321        );
3322        assert!(resp.headers().get("x-request-id").is_some());
3323        // HEAD body must be empty even though Content-Length says 11.
3324        assert_eq!(response_text(resp).await, "");
3325
3326        let _ = std::fs::remove_dir_all(dir);
3327    }
3328
3329    /// 304 path: `If-None-Match: <current-etag>` short-circuits
3330    /// inside `execute_get` to `hs::not_modified`, returning
3331    /// `Phase::ExecutedRead(304)`.
3332    #[tokio::test]
3333    async fn pipeline_get_if_none_match_returns_304() {
3334        let (core, dir) = test_core("pipeline-304");
3335        core.write_world("home/cached", b"cached body", "text/plain", &[])
3336            .unwrap();
3337        let core = Arc::new(core);
3338        let state = server_state_for_tests(core.clone());
3339
3340        // First GET to discover the current etag.
3341        let first = pipeline::run(
3342            Method::GET,
3343            "/home/cached".to_string(),
3344            HeaderMap::new(),
3345            Bytes::new(),
3346            &state,
3347            100,
3348        )
3349        .await;
3350        let etag = first
3351            .headers()
3352            .get(header::ETAG)
3353            .and_then(|v| v.to_str().ok())
3354            .expect("first GET must return an ETag header")
3355            .to_string();
3356
3357        let mut headers = HeaderMap::new();
3358        headers.insert(header::IF_NONE_MATCH, HeaderValue::from_str(&etag).unwrap());
3359        let resp = pipeline::run(
3360            Method::GET,
3361            "/home/cached".to_string(),
3362            headers,
3363            Bytes::new(),
3364            &state,
3365            101,
3366        )
3367        .await;
3368
3369        assert_eq!(resp.status(), StatusCode::NOT_MODIFIED);
3370        // 304 bodies must be empty.
3371        assert_eq!(response_text(resp).await, "");
3372
3373        let _ = std::fs::remove_dir_all(dir);
3374    }
3375
3376    /// 416 path: `Range: bytes=999-` against a 3-byte world is out
3377    /// of range. `effective_range` returns `Err(())` and
3378    /// `execute_get` produces `Phase::Error { reason:
3379    /// RangeNotSatisfiable }`. We assert BOTH the surfaced HTTP
3380    /// status (via `pipeline::run`) AND the structured reason
3381    /// variant (via direct `handler::execute`) so the 12th
3382    /// `ErrorReason` variant is verified to fire on a real
3383    /// read-path code path. Without the second assertion, a
3384    /// regression that emits the right status but the wrong reason
3385    /// would slip through and quietly poison trace / metrics.
3386    #[tokio::test]
3387    async fn pipeline_get_out_of_range_returns_416_with_reason() {
3388        let (core, dir) = test_core("pipeline-416");
3389        core.write_world("home/short", b"abc", "text/plain", &[])
3390            .unwrap();
3391        let core = Arc::new(core);
3392        let state = server_state_for_tests(core.clone());
3393
3394        // 1) Production path: pipeline::run surfaces 416 to the wire.
3395        let mut headers = HeaderMap::new();
3396        headers.insert(header::RANGE, HeaderValue::from_static("bytes=999-"));
3397        let resp = pipeline::run(
3398            Method::GET,
3399            "/home/short".to_string(),
3400            headers,
3401            Bytes::new(),
3402            &state,
3403            200,
3404        )
3405        .await;
3406        assert_eq!(resp.status(), StatusCode::RANGE_NOT_SATISFIABLE);
3407
3408        // 2) Internal phase: handler::execute returns the explicit
3409        // Phase::Error{RangeNotSatisfiable} variant.
3410        let mut headers2 = HeaderMap::new();
3411        headers2.insert(header::RANGE, HeaderValue::from_static("bytes=999-"));
3412        let phase = crate::handler::execute(
3413            Verb::Get,
3414            headers2,
3415            Bytes::new(),
3416            auth::Tier::Anon,
3417            world_path("home/short"),
3418            &state,
3419            &TraceCtx::disabled(),
3420        )
3421        .await;
3422        match phase {
3423            Phase::Error {
3424                reason: ErrorReason::RangeNotSatisfiable,
3425                resp,
3426            } => {
3427                assert_eq!(resp.status(), StatusCode::RANGE_NOT_SATISFIABLE);
3428            }
3429            _ => panic!("expected Phase::Error{{RangeNotSatisfiable}}"),
3430        }
3431
3432        let _ = std::fs::remove_dir_all(dir);
3433    }
3434
3435    #[tokio::test]
3436    async fn pipeline_get_range_returns_206_with_chunk() {
3437        let (core, dir) = test_core("pipeline-get-range");
3438        core.write_world("home/range", b"abcdef", "text/plain", &[])
3439            .unwrap();
3440        let core = Arc::new(core);
3441        let state = server_state_for_tests(core.clone());
3442
3443        let mut headers = HeaderMap::new();
3444        headers.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
3445
3446        let resp = pipeline::run(
3447            Method::GET,
3448            "/home/range".to_string(),
3449            headers,
3450            Bytes::new(),
3451            &state,
3452            47,
3453        )
3454        .await;
3455
3456        assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT);
3457        assert_eq!(
3458            resp.headers()
3459                .get(header::CONTENT_RANGE)
3460                .and_then(|v| v.to_str().ok()),
3461            Some("bytes 1-3/6")
3462        );
3463        assert_eq!(response_text(resp).await, "bcd");
3464
3465        let _ = std::fs::remove_dir_all(dir);
3466    }
3467
3468    fn test_core(label: &str) -> (Core, PathBuf) {
3469        test_core_with_read_cache_max(label, crate::read_cache::DEFAULT_READ_CACHE_MAX_ENTRIES)
3470    }
3471
3472    fn test_core_with_read_cache_max(
3473        label: &str,
3474        read_cache_max_entries: usize,
3475    ) -> (Core, PathBuf) {
3476        let mut dir = std::env::temp_dir();
3477        let nanos = std::time::SystemTime::now()
3478            .duration_since(std::time::UNIX_EPOCH)
3479            .unwrap()
3480            .as_nanos();
3481        dir.push(format!(
3482            "elastik-core-{label}-{}-{nanos}",
3483            std::process::id()
3484        ));
3485        std::fs::create_dir_all(&dir).unwrap();
3486        (
3487            {
3488                let (events, _) = broadcast::channel(16);
3489                Core {
3490                    data: dir.clone(),
3491                    tokens: auth::Tokens {
3492                        read: None,
3493                        write: None,
3494                        approve: None,
3495                    },
3496                    hmac_key: b"test-key".to_vec(),
3497                    mem: Arc::new(store::MemoryStore::new()),
3498                    max_world_bytes: DEFAULT_MAX_WORLD_BYTES,
3499                    max_memory_bytes: DEFAULT_MAX_MEMORY_BYTES,
3500                    max_storage_bytes: None,
3501                    storage_body_bytes: Arc::new(AtomicUsize::new(0)),
3502                    durable_world_count: Arc::new(AtomicUsize::new(0)),
3503                    delete_ledger_created: Arc::new(AtomicBool::new(false)),
3504                    events,
3505                    listen_slots: Arc::new(Semaphore::new(DEFAULT_MAX_LISTEN_CONNECTIONS)),
3506                    listen_replay_max: DEFAULT_LISTEN_REPLAY_MAX,
3507                    event_log: Arc::new(StdMutex::new(VecDeque::with_capacity(
3508                        DEFAULT_LISTEN_REPLAY_MAX,
3509                    ))),
3510                    shutdown: watch::channel(false).1,
3511                    next_event: crate::state::new_event_counter(),
3512                    world_locks: Arc::new(DashMap::new()),
3513                    ledger: Arc::new(crate::ledger::LedgerWriter::new()),
3514                    read_cache: Arc::new(crate::read_cache::ReadCache::new(read_cache_max_entries)),
3515                }
3516            },
3517            dir,
3518        )
3519    }
3520
3521    #[tokio::test]
3522    async fn write_permit_is_bound_to_one_world() {
3523        struct NoopTrace;
3524        impl crate::world_ops::WriteTraceHooks for NoopTrace {}
3525
3526        let (core, dir) = test_core("permit-bound");
3527        let world = world_path("home/permit-a");
3528        let permit = crate::world_ops::authorize_write(&world, auth::Tier::Write)
3529            .expect("write token tier should authorize home writes");
3530        let req = crate::world_ops::ReplaceRequest {
3531            body: Bytes::from_static(b"right-door"),
3532            content_type: "text/plain; charset=utf-8".to_owned(),
3533            headers: Vec::new(),
3534            preconditions: et::Preconditions::default(),
3535        };
3536
3537        crate::world_ops::replace_write(&core, &permit, req, &NoopTrace)
3538            .await
3539            .expect("permit writes only its bound world");
3540
3541        assert_eq!(
3542            core.read_world("home/permit-a").unwrap().unwrap().body,
3543            b"right-door"
3544        );
3545        assert!(core.read_world("home/permit-b").unwrap().is_none());
3546
3547        let _ = std::fs::remove_dir_all(dir);
3548    }
3549
3550    #[test]
3551    fn write_permit_preserves_path_based_approve_gate() {
3552        assert!(matches!(
3553            crate::world_ops::authorize_write(&world_path("etc/config"), auth::Tier::Write),
3554            Err(crate::world_ops::WriteError::Auth(AuthGate::WriteApprove))
3555        ));
3556        assert!(
3557            crate::world_ops::authorize_write(&world_path("etc/config"), auth::Tier::Approve)
3558                .is_ok()
3559        );
3560        assert!(
3561            crate::world_ops::authorize_write(&world_path("home/config"), auth::Tier::Write)
3562                .is_ok()
3563        );
3564    }
3565}