Skip to main content

elastik_core/
lib.rs

1//! # Elastik — Audited L5 Storage Engine
2//!
3//! `elastik-core` is a protocol-neutral storage engine: canonical paths, opaque
4//! bytes, content-addressed versioning, an HMAC-chained audit log, and a
5//! four-tier access model. **SQLite for files.**
6//!
7//! ## Quick start
8//!
9//! ```no_run
10//! # #[cfg(feature = "unstable-engine")]
11//! # async fn run() {
12//! use elastik_core::{
13//!     AccessTier, Engine, Preconditions, Representation, SecretBytes, ValidatedWorldPath,
14//! };
15//! use bytes::Bytes;
16//!
17//! let engine = Engine::builder()
18//!     .data_root("./data")
19//!     .key(SecretBytes::new(b"shared-hmac-secret".to_vec()).expect("hmac key"))
20//!     .build()
21//!     .expect("engine builds");
22//!
23//! let world = ValidatedWorldPath::new("home/hello").expect("canonical path");
24//!
25//! // Store bytes at a path.
26//! engine
27//!     .replace(
28//!         &world,
29//!         Representation {
30//!             body: Bytes::from_static(b"hi"),
31//!             content_type: "text/plain".into(),
32//!             headers: Vec::new(),
33//!         },
34//!         Preconditions::none(),
35//!         AccessTier::Write,
36//!     )
37//!     .await
38//!     .expect("write succeeds");
39//!
40//! // Retrieve bytes by path.
41//! let read = engine.read(&world, AccessTier::Read).expect("read succeeds");
42//! assert!(read.is_some());
43//! # }
44//! ```
45//!
46//! ## What the library does
47//!
48//! - **Bytes at paths.** Canonical `home/`, `tmp/`, `dev/`, `sys/`, `etc/`,
49//!   `lib/`, `boot/`, `usr/`, `var/` namespaces decide durable-vs-transient
50//!   without per-call configuration.
51//! - **Versions everything.** Every successful write returns an ETag; reads,
52//!   replaces, and appends honour `Preconditions::if_match` / `if_none_match`.
53//! - **Audits everything.** HMAC-chained ledger; `Engine::verify_audit`
54//!   returns a typed [`AuditVerify`] result and refuses to start when an
55//!   existing chain is corrupted.
56//! - **Authenticates everything.** [`AccessTier`] (Anon / Read / Write /
57//!   Approve) plus token-bytes verification via [`Engine::verify_token`].
58//! - **Subscribes to changes.** [`Engine::subscribe`] returns an
59//!   [`EngineSubscription`] with replay-then-live ordering.
60//!
61//! ## What the library does *not* do
62//!
63//! No HTTP, no CoAP, no SSE, no server runtime. Those live in the
64//! `elastik-core` binary and consume this library through the unstable public
65//! [`Engine`] API. The library does not read environment variables, does not
66//! bind sockets, and does not depend on `axum`, `hyper`, `tower`,
67//! `tokio-stream`, `futures-util`, or `base64` in a default-feature build.
68//!
69//! ## Feature flags
70//!
71//! - `bundled-sqlite` *(default)* — link a bundled SQLite via `rusqlite/bundled`.
72//! - `coap` *(default)* — enable the CoAP adapter inside the binary.
73//! - `multi-thread` *(default)* — enable Tokio's multi-thread runtime for the
74//!   binary.
75//! - `unstable-engine` — expose the public [`Engine`] facade. **The API shape
76//!   is allowed to change between minor versions while this gate stays.**
77//! - `unstable-engine-bin` *(default)* — superset that adds `axum`, `base64`,
78//!   `futures-util`, Tokio `net`/`signal`, and `tracing-subscriber`; the
79//!   `elastik-core` binary requires this feature.
80//!
81//! Minimal library-only build: `cargo build --lib --no-default-features
82//! --features bundled-sqlite,unstable-engine`.
83mod audit;
84mod auth;
85#[cfg(test)]
86#[cfg(feature = "coap")]
87#[path = "server/coap.rs"]
88mod coap;
89#[cfg(test)]
90#[cfg(feature = "coap")]
91#[path = "server/coap_errors.rs"]
92mod coap_errors;
93#[cfg(test)]
94mod config;
95mod defaults;
96mod delete_ops;
97mod engine;
98mod engine_introspection;
99mod engine_ops;
100mod engine_trace;
101mod engine_types;
102mod etag;
103mod event;
104#[cfg(test)]
105#[path = "server/handler.rs"]
106mod handler;
107#[cfg(test)]
108mod http_range;
109#[cfg(test)]
110mod http_semantics;
111mod ledger;
112#[cfg(test)]
113#[path = "server/listen.rs"]
114mod listen;
115#[cfg(test)]
116#[path = "server/middleware.rs"]
117mod middleware;
118mod path;
119#[cfg(test)]
120#[path = "server/pipeline.rs"]
121mod pipeline;
122#[cfg(test)]
123#[path = "server/proc.rs"]
124mod proc;
125mod read_cache;
126#[cfg(test)]
127#[path = "server/response.rs"]
128mod response;
129#[cfg(test)]
130#[path = "server/route.rs"]
131mod route;
132#[cfg(test)]
133mod server;
134mod state;
135mod storage_class;
136mod store;
137mod world;
138mod world_ops;
139
140// Re-export protocol-neutral helpers at the crate root. HTTP adapter modules
141// are only compiled here for legacy white-box tests; production binary builds
142// own their adapter helpers from `main.rs`.
143#[cfg(test)]
144pub(crate) use crate::path::*;
145#[cfg(test)]
146pub(crate) use crate::pipeline::*;
147#[cfg(test)]
148pub(crate) use crate::proc::*;
149#[cfg(test)]
150pub(crate) use crate::response::*;
151pub(crate) use crate::state::*;
152pub(crate) use crate::storage_class::*;
153#[cfg(feature = "unstable-engine")]
154pub use auth::AuthGate;
155#[cfg(not(feature = "unstable-engine"))]
156pub(crate) use auth::AuthGate;
157#[cfg(all(feature = "unstable-engine", feature = "coap"))]
158#[doc(hidden)]
159pub use engine::ShutdownToken;
160#[cfg(feature = "unstable-engine")]
161pub use engine::{Engine, EngineBuildError, EngineBuilder, EngineError};
162#[cfg(feature = "unstable-engine")]
163pub use engine_introspection::{
164    AuditBroken, AuditValid, AuditVerify, DfSnapshot, InvalidProcPath, PoolSnapshot, ProcEndpoint,
165    ValidatedProcPath, WorldUsage,
166};
167#[cfg(feature = "unstable-engine")]
168pub use engine_trace::{DeleteMetadata, EngineDeleteTraceHooks, EngineWriteTraceHooks};
169#[cfg(feature = "unstable-engine")]
170pub use engine_types::{
171    AccessTier, ChangeEvent, EmptyKeyError, EngineSubscription, EtagMatcher, InvalidWorldPath,
172    Preconditions, ReadResult, Representation, SecretBytes, SubscribePattern,
173    SubscriptionRecvError, ValidatedWorldPath, WriteKind, WriteResult,
174};
175
176use std::path::Path;
177use std::time::Duration;
178
179#[cfg(test)]
180pub(crate) use crate::defaults::{
181    DEFAULT_LISTEN_REPLAY_MAX, DEFAULT_MAX_LISTEN_CONNECTIONS, DEFAULT_MAX_MEMORY_BYTES,
182    DEFAULT_MAX_WORLD_BYTES,
183};
184
185// Test-only HTTP constants for legacy white-box tests. Production constants
186// live on the binary side.
187#[cfg(test)]
188pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION");
189#[cfg(test)]
190pub(crate) const WORLD_ALLOW: &str = "GET, HEAD, PUT, POST, DELETE, OPTIONS";
191
192fn acquire_data_root_writer_lock(data: &Path) -> rusqlite::Result<rusqlite::Connection> {
193    let c = rusqlite::Connection::open(data.join(".elastik-writer-lock.sqlite3"))?;
194    c.busy_timeout(Duration::from_millis(0))?;
195    c.execute_batch(
196        r#"
197        PRAGMA journal_mode=WAL;
198        CREATE TABLE IF NOT EXISTS writer_lock(
199            id INTEGER PRIMARY KEY CHECK(id=1),
200            holder TEXT NOT NULL DEFAULT ''
201        );
202        INSERT OR IGNORE INTO writer_lock(id, holder) VALUES(1, '');
203        BEGIN IMMEDIATE;
204        "#,
205    )?;
206    c.execute(
207        "UPDATE writer_lock SET holder=?1 WHERE id=1",
208        [std::process::id().to_string()],
209    )?;
210    Ok(c)
211}
212
213// Legacy white-box tests still compile the HTTP route modules through this
214// library crate. Production builds compile those modules only from `main.rs`.
215
216// ─── /<world> all five methods ──────────────────────────────────────
217// Path validation lives in `path.rs`. Adapter-side canonicalization lives in
218// `server/path.rs` and is not part of the production library.
219
220// ─── helpers ────────────────────────────────────────────────────────
221
222pub(crate) fn can_write(world_name: &str, tier: auth::Tier) -> bool {
223    // Harvard gate: /lib/, /etc/, /boot/, /usr/, and audit logs
224    // require approve. /home/, /tmp/, /dev/, /sys/, and non-log
225    // /var/ worlds accept the normal token. Anon refused.
226    let needs_approve = needs_write_approve(world_name);
227    match tier {
228        auth::Tier::Anon => false,
229        auth::Tier::Read => false,
230        auth::Tier::Write => !needs_approve,
231        auth::Tier::Approve => true,
232    }
233}
234
235/// Mirrors the harvard-gate decision in `can_write`. Lifted out so
236/// `handler::execute_put` / `execute_post` can classify a rejected
237/// write as `Auth(Write)` vs `Auth(WriteApprove)` for the FSM trace
238/// without re-deriving the predicate.
239pub(crate) fn needs_write_approve(world_name: &str) -> bool {
240    exact_or_child(world_name, "lib")
241        || exact_or_child(world_name, "etc")
242        || exact_or_child(world_name, "boot")
243        || exact_or_child(world_name, "usr")
244        || exact_or_child(world_name, "var/log")
245}
246
247pub(crate) fn can_delete(tier: auth::Tier) -> bool {
248    matches!(tier, auth::Tier::Approve)
249}
250
251// (memory_write_projected_bytes / memory_append_projected_bytes were
252// removed: the snapshot-based projection they computed could be observed
253// by two concurrent writers before either had committed, letting them
254// both pass and overshoot max_memory_bytes once the global write_lock was
255// gone. Quota is now enforced inside MemoryStore::write_with_quota /
256// append_with_quota, atomically with the write itself.)
257
258// `require_read` (auth gate for proc handlers) lives in proc.rs now,
259// next to its only callers.
260
261// Response constructors (not_found, unauthorized, bad_request,
262// payload_too_large, insufficient_storage,
263// storage_temporarily_unavailable, storage_quota_exceeded,
264// options_response, method_not_allowed, precondition_failed,
265// server_error, storage_error, is_insufficient_storage_error,
266// is_transient_storage_error, audit_valid, audit_broken,
267// audit_header_value, audit_not_applicable, proc_text_response,
268// du_body, df_body, world_list_body,
269// to_header_map) live in `response.rs` and are re-exported at the
270// crate root. Helpers below use them through that re-export.
271
/// Returns `true` when `world_name` equals `prefix` or continues it with a
/// `/` separator (so `lib` and `lib/x` match prefix `lib`, `library` does not).
pub(crate) fn exact_or_child(world_name: &str, prefix: &str) -> bool {
    match world_name.strip_prefix(prefix) {
        // Nothing left over means an exact match; otherwise the remainder
        // must begin a new path segment.
        Some(rest) => rest.is_empty() || rest.starts_with('/'),
        None => false,
    }
}
278
279pub(crate) fn can_read(core: &Core, tier: auth::Tier) -> bool {
280    !core.tokens.read_required()
281        || matches!(
282            tier,
283            auth::Tier::Read | auth::Tier::Write | auth::Tier::Approve
284        )
285}
286
287#[cfg(test)]
288mod tests {
289    use super::*;
290    #[cfg(feature = "coap")]
291    use crate::config::coap_bind_from_env;
292    use crate::config::{
293        env_nonzero_usize, env_optional_usize, hmac_key_from_env_value, listen_addr,
294        should_warn_public_read,
295    };
296    use crate::etag as et;
297    use crate::handler::{execute_delete, execute_get, execute_head, execute_post, execute_put};
298    use crate::http_semantics as hs;
299    use crate::middleware::{add_server_response_headers, stamp_core_response_headers};
300    use crate::route::world_handler;
301    use axum::body::Bytes;
302    use axum::extract::{Path as AxPath, State};
303    use axum::http::{header, HeaderMap, HeaderValue, Method, StatusCode};
304    use axum::response::Response;
305    use axum::routing::any;
306    use axum::Router;
307    use dashmap::DashMap;
308    use std::collections::VecDeque;
309    use std::net::IpAddr;
310    use std::path::PathBuf;
311    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
312    use std::sync::{Arc, Mutex as StdMutex, Mutex as TestMutex, OnceLock};
313    use tokio::sync::{broadcast, watch, Semaphore};
314
315    fn server_state_for_tests(core: Arc<Core>) -> crate::server::ServerState {
316        crate::server::ServerState::from_core_for_tests(core, DEFAULT_MAX_WORLD_BYTES)
317    }
318
319    /// PR 4c: 50 unit tests below were renamed mechanically from
320    /// `handle_*` (sync `&Core -> Response`) to `execute_*` (async
321    /// `&Core, &TraceCtx -> Phase`). The verb-handler entry point
322    /// returns `Phase`, but the assertions all operate on the
323    /// underlying `Response`. This helper unwraps the three terminal
324    /// `Phase` variants `execute_*` can return so tests can keep
325    /// asserting on `.status()` / `.headers()` without scaffolding
326    /// per call site.
327    fn unwrap_response(phase: Phase) -> Response {
328        match phase {
329            Phase::ExecutedRead(r) | Phase::CommittedWrite(r) | Phase::Done(r) => r,
330            Phase::Error { resp, .. } => resp,
331            // execute_* never returns a non-terminal Phase; reaching
332            // any of these would be a bug in the handler module.
333            Phase::Received { .. }
334            | Phase::Authenticated { .. }
335            | Phase::PathValidated { .. }
336            | Phase::Dispatched { .. } => {
337                panic!("execute_* returned a non-terminal Phase variant")
338            }
339        }
340    }
341
342    fn world_path(world: &str) -> crate::engine_types::ValidatedWorldPath {
343        crate::engine_types::ValidatedWorldPath::new(world).unwrap()
344    }
345
346    fn env_lock() -> &'static TestMutex<()> {
347        static LOCK: OnceLock<TestMutex<()>> = OnceLock::new();
348        LOCK.get_or_init(|| TestMutex::new(()))
349    }
350
351    #[cfg(feature = "coap")]
352    struct CoapEnvGuard {
353        host: Option<String>,
354        port: Option<String>,
355    }
356
357    #[cfg(feature = "coap")]
358    impl CoapEnvGuard {
359        fn capture() -> Self {
360            Self {
361                host: std::env::var("ELASTIK_COAP_HOST").ok(),
362                port: std::env::var("ELASTIK_COAP_PORT").ok(),
363            }
364        }
365    }
366
367    #[cfg(feature = "coap")]
368    impl Drop for CoapEnvGuard {
369        fn drop(&mut self) {
370            match &self.host {
371                Some(v) => std::env::set_var("ELASTIK_COAP_HOST", v),
372                None => std::env::remove_var("ELASTIK_COAP_HOST"),
373            }
374            match &self.port {
375                Some(v) => std::env::set_var("ELASTIK_COAP_PORT", v),
376                None => std::env::remove_var("ELASTIK_COAP_PORT"),
377            }
378        }
379    }
380
381    #[test]
382    fn hmac_key_requires_nonempty_semantic_content() {
383        assert!(hmac_key_from_env_value(None).is_none());
384        assert!(hmac_key_from_env_value(Some(String::new())).is_none());
385        assert!(hmac_key_from_env_value(Some(" \t\n".to_string())).is_none());
386        assert_eq!(
387            hmac_key_from_env_value(Some(" secret ".to_string())).unwrap(),
388            b" secret ".to_vec()
389        );
390    }
391
392    #[test]
393    fn resource_cap_env_zero_falls_back_to_default() {
394        let _guard = env_lock().lock().unwrap();
395        let key = format!("ELASTIK_TEST_ZERO_CAP_{}", std::process::id());
396        std::env::set_var(&key, "0");
397        assert_eq!(env_nonzero_usize(&key, 7), 7);
398        std::env::set_var(&key, "9");
399        assert_eq!(env_nonzero_usize(&key, 7), 9);
400        std::env::remove_var(&key);
401    }
402
403    #[test]
404    fn optional_storage_quota_zero_is_unlimited() {
405        let _guard = env_lock().lock().unwrap();
406        let key = format!("ELASTIK_TEST_STORAGE_CAP_{}", std::process::id());
407        std::env::remove_var(&key);
408        assert_eq!(env_optional_usize(&key), None);
409        std::env::set_var(&key, "");
410        assert_eq!(env_optional_usize(&key), None);
411        std::env::set_var(&key, " \t ");
412        assert_eq!(env_optional_usize(&key), None);
413        std::env::set_var(&key, "0");
414        assert_eq!(env_optional_usize(&key), None);
415        std::env::set_var(&key, "11");
416        assert_eq!(env_optional_usize(&key), Some(11));
417        std::env::set_var(&key, "10GB");
418        assert!(std::panic::catch_unwind(|| env_optional_usize(&key)).is_err());
419        std::env::remove_var(&key);
420    }
421
422    #[test]
423    fn data_root_writer_lock_is_exclusive() {
424        let dir =
425            std::env::temp_dir().join(format!("elastik-data-lock-test-{}", std::process::id()));
426        let _ = std::fs::remove_dir_all(&dir);
427        std::fs::create_dir_all(&dir).unwrap();
428
429        let first = acquire_data_root_writer_lock(&dir).unwrap();
430        assert!(acquire_data_root_writer_lock(&dir).is_err());
431        drop(first);
432        let second = acquire_data_root_writer_lock(&dir).unwrap();
433        drop(second);
434
435        let _ = std::fs::remove_dir_all(dir);
436    }
437
438    #[test]
439    fn sqlite_disk_full_maps_to_507() {
440        let err = rusqlite::Error::SqliteFailure(
441            rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_FULL),
442            None,
443        );
444        assert!(is_insufficient_storage_error(&err));
445
446        let resp = storage_error("test", err);
447        assert_eq!(resp.status(), StatusCode::INSUFFICIENT_STORAGE);
448    }
449
450    #[test]
451    fn sqlite_busy_and_locked_map_to_503_retry_after() {
452        for code in [rusqlite::ffi::SQLITE_BUSY, rusqlite::ffi::SQLITE_LOCKED] {
453            let err = rusqlite::Error::SqliteFailure(rusqlite::ffi::Error::new(code), None);
454            assert!(is_transient_storage_error(&err));
455
456            let resp = storage_error("test", err);
457            assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
458            assert_eq!(
459                resp.headers()
460                    .get(header::RETRY_AFTER)
461                    .and_then(|v| v.to_str().ok()),
462                Some("1")
463            );
464        }
465    }
466
467    #[test]
468    fn non_storage_sqlite_errors_stay_500() {
469        let err = rusqlite::Error::SqliteFailure(
470            rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CORRUPT),
471            None,
472        );
473        assert!(!is_insufficient_storage_error(&err));
474
475        let resp = storage_error("test", err);
476        assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
477    }
478
479    #[tokio::test]
480    async fn durable_storage_quota_returns_507_without_writing() {
481        let (mut core, dir) = test_core("storage-quota");
482        core.max_storage_bytes = Some(4);
483        let headers = HeaderMap::new();
484
485        let first = unwrap_response(
486            execute_put(
487                headers.clone(),
488                Bytes::from_static(b"1234"),
489                auth::Tier::Write,
490                world_path("home/a"),
491                &core,
492                &TraceCtx::disabled(),
493            )
494            .await,
495        );
496        assert_eq!(first.status(), StatusCode::CREATED);
497
498        let over = unwrap_response(
499            execute_put(
500                headers.clone(),
501                Bytes::from_static(b"5"),
502                auth::Tier::Write,
503                world_path("home/b"),
504                &core,
505                &TraceCtx::disabled(),
506            )
507            .await,
508        );
509        assert_eq!(over.status(), StatusCode::INSUFFICIENT_STORAGE);
510        assert_eq!(over.headers().get("x-storage-usage").unwrap(), "4");
511        assert_eq!(over.headers().get("x-storage-quota").unwrap(), "4");
512        assert_eq!(over.headers().get("x-storage-needed").unwrap(), "1");
513        assert!(core.read_world("home/b").unwrap().is_none());
514
515        let append = unwrap_response(
516            execute_post(
517                headers.clone(),
518                Bytes::from_static(b"5"),
519                auth::Tier::Write,
520                world_path("home/a"),
521                &core,
522                &TraceCtx::disabled(),
523            )
524            .await,
525        );
526        assert_eq!(append.status(), StatusCode::INSUFFICIENT_STORAGE);
527        assert_eq!(append.headers().get("x-storage-usage").unwrap(), "4");
528        assert_eq!(append.headers().get("x-storage-quota").unwrap(), "4");
529        assert_eq!(append.headers().get("x-storage-needed").unwrap(), "1");
530        assert_eq!(
531            core.read_world("home/a").unwrap().unwrap().body,
532            b"1234".to_vec()
533        );
534
535        let _ = std::fs::remove_dir_all(dir);
536    }
537
538    #[tokio::test]
539    async fn concurrent_puts_to_distinct_worlds_do_not_overshoot_quota() {
540        // Regression: under per-world locks, the previous "snapshot then
541        // write then adjust" pattern let two concurrent PUTs to different
542        // worlds both observe usage below quota, both commit, and only
543        // afterwards push the counter past quota (lost-update race).
544        // Atomic CAS reservation in `Core::reserve_storage` closes that
545        // race. This test fires N concurrent PUTs each just under the
546        // quota and asserts that ANY mix of accept/reject keeps the
547        // counter coherent and never overshoots.
548        let (mut core, dir) = test_core("quota-race");
549        let quota = 100;
550        core.max_storage_bytes = Some(quota);
551        let core = Arc::new(core);
552
553        let workers = 16;
554        let body_len = 12; // 16 * 12 = 192 > quota: definitely contention
555        let mut handles = Vec::with_capacity(workers);
556        for i in 0..workers {
557            let core = core.clone();
558            handles.push(tokio::spawn(async move {
559                let path = format!("home/race/{i}");
560                let body = Bytes::copy_from_slice(&vec![b'x'; body_len]);
561                unwrap_response(
562                    execute_put(
563                        HeaderMap::new(),
564                        body,
565                        auth::Tier::Write,
566                        world_path(&path),
567                        &core,
568                        &TraceCtx::disabled(),
569                    )
570                    .await,
571                )
572            }));
573        }
574
575        let mut accepted: usize = 0;
576        for handle in handles {
577            let resp = handle.await.unwrap();
578            match resp.status() {
579                StatusCode::CREATED | StatusCode::OK => accepted += 1,
580                StatusCode::INSUFFICIENT_STORAGE => {}
581                other => panic!("unexpected status: {other}"),
582            }
583        }
584
585        let used = core.storage_body_bytes.load(Ordering::Relaxed);
586        let counted = accepted * body_len;
587        assert_eq!(used, counted, "counter must equal sum of accepted bodies");
588        assert!(
589            used <= quota,
590            "counter must never exceed quota: {used} > {quota}"
591        );
592
593        let _ = std::fs::remove_dir_all(dir);
594    }
595
596    #[tokio::test]
597    async fn concurrent_memory_puts_do_not_overshoot_max_memory_bytes() {
598        // Mirror of concurrent_puts_to_distinct_worlds_do_not_overshoot_quota
599        // for memory worlds. Per-world locks let writes to /tmp/a and
600        // /tmp/b run concurrently; before this fix each one would read
601        // total_bytes() as a snapshot, both pass the budget check, and
602        // both commit -- overshooting ELASTIK_MAX_MEMORY_BYTES. With the
603        // check + write fused under the MemoryStore HashMap mutex inside
604        // write_with_quota, only the writes whose accept order keeps the
605        // running total under the cap can commit.
606        let (mut core, dir) = test_core("memory-quota-race");
607        let cap = 100;
608        core.max_memory_bytes = cap;
609        let core = Arc::new(core);
610
611        let workers = 16;
612        let body_len = 12; // 16 * 12 = 192 > cap: forced contention
613        let mut handles = Vec::with_capacity(workers);
614        for i in 0..workers {
615            let core = core.clone();
616            handles.push(tokio::spawn(async move {
617                let path = format!("tmp/race/{i}");
618                let body = Bytes::copy_from_slice(&vec![b'm'; body_len]);
619                unwrap_response(
620                    execute_put(
621                        HeaderMap::new(),
622                        body,
623                        auth::Tier::Write,
624                        world_path(&path),
625                        &core,
626                        &TraceCtx::disabled(),
627                    )
628                    .await,
629                )
630            }));
631        }
632
633        let mut accepted: usize = 0;
634        for handle in handles {
635            let resp = handle.await.unwrap();
636            match resp.status() {
637                StatusCode::CREATED | StatusCode::OK => accepted += 1,
638                StatusCode::PAYLOAD_TOO_LARGE => {}
639                other => panic!("unexpected status: {other}"),
640            }
641        }
642
643        let used = core.mem.total_bytes();
644        let counted = accepted * body_len;
645        assert_eq!(
646            used, counted,
647            "memory total must equal sum of accepted bodies"
648        );
649        assert!(
650            used <= cap,
651            "memory total must never exceed cap: {used} > {cap}"
652        );
653
654        let _ = std::fs::remove_dir_all(dir);
655    }
656
657    #[test]
658    fn var_log_requires_approve_token() {
659        assert!(!can_write("var/log", auth::Tier::Anon));
660        assert!(!can_write("var/log", auth::Tier::Read));
661        assert!(!can_write("var/log", auth::Tier::Write));
662        assert!(can_write("var/log", auth::Tier::Approve));
663        assert!(!can_write("var/log/deletes", auth::Tier::Anon));
664        assert!(!can_write("var/log/deletes", auth::Tier::Read));
665        assert!(!can_write("var/log/deletes", auth::Tier::Write));
666        assert!(can_write("var/log/deletes", auth::Tier::Approve));
667    }
668
669    #[test]
670    fn delete_requires_approve_token() {
671        assert!(!can_delete(auth::Tier::Anon));
672        assert!(!can_delete(auth::Tier::Read));
673        assert!(!can_delete(auth::Tier::Write));
674        assert!(can_delete(auth::Tier::Approve));
675    }
676
677    #[test]
678    fn listen_addr_brackets_ipv6_hosts() {
679        assert_eq!(listen_addr("127.0.0.1", 3105), "127.0.0.1:3105");
680        assert_eq!(listen_addr("0.0.0.0", 3105), "0.0.0.0:3105");
681        assert_eq!(listen_addr("::1", 3105), "[::1]:3105");
682        assert_eq!(listen_addr("localhost", 3105), "localhost:3105");
683    }
684
685    #[cfg(feature = "coap")]
686    #[test]
687    fn coap_bind_is_opt_in_by_port_env() {
688        let _lock = env_lock().lock().unwrap();
689        let _guard = CoapEnvGuard::capture();
690        std::env::remove_var("ELASTIK_COAP_HOST");
691        std::env::remove_var("ELASTIK_COAP_PORT");
692
693        assert_eq!(coap_bind_from_env(), None);
694
695        std::env::set_var("ELASTIK_COAP_HOST", "0.0.0.0");
696        assert_eq!(coap_bind_from_env(), None);
697
698        std::env::set_var("ELASTIK_COAP_PORT", "5683");
699        assert_eq!(coap_bind_from_env(), Some(("0.0.0.0".to_owned(), 5683)));
700
701        std::env::set_var("ELASTIK_COAP_HOST", "127.0.0.1");
702        std::env::set_var("ELASTIK_COAP_PORT", " ");
703        assert_eq!(coap_bind_from_env(), None);
704
705        std::env::set_var("ELASTIK_COAP_PORT", "not-a-port");
706        assert_eq!(coap_bind_from_env(), None);
707    }
708
709    #[test]
710    fn non_loopback_public_read_gets_warning_flag() {
711        let mut tokens = auth::Tokens {
712            read: None,
713            write: None,
714            approve: None,
715        };
716        assert!(!should_warn_public_read(
717            "127.0.0.1".parse::<IpAddr>().unwrap(),
718            tokens.read_required()
719        ));
720        assert!(should_warn_public_read(
721            "0.0.0.0".parse::<IpAddr>().unwrap(),
722            tokens.read_required()
723        ));
724
725        tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
726        assert!(!should_warn_public_read(
727            "0.0.0.0".parse::<IpAddr>().unwrap(),
728            tokens.read_required()
729        ));
730    }
731
732    #[tokio::test]
733    async fn put_and_post_enforce_world_size_cap() {
734        let (mut core, dir) = test_core("world-size-cap");
735        core.max_world_bytes = 4;
736        let headers = HeaderMap::new();
737
738        let too_big = unwrap_response(
739            execute_put(
740                headers.clone(),
741                Bytes::from_static(b"12345"),
742                auth::Tier::Write,
743                world_path("home/too-big"),
744                &core,
745                &TraceCtx::disabled(),
746            )
747            .await,
748        );
749        assert_eq!(too_big.status(), StatusCode::PAYLOAD_TOO_LARGE);
750
751        let ok = unwrap_response(
752            execute_put(
753                headers.clone(),
754                Bytes::from_static(b"1234"),
755                auth::Tier::Write,
756                world_path("home/four"),
757                &core,
758                &TraceCtx::disabled(),
759            )
760            .await,
761        );
762        assert_eq!(ok.status(), StatusCode::CREATED);
763
764        let append = unwrap_response(
765            execute_post(
766                headers.clone(),
767                Bytes::from_static(b"5"),
768                auth::Tier::Write,
769                world_path("home/four"),
770                &core,
771                &TraceCtx::disabled(),
772            )
773            .await,
774        );
775        assert_eq!(append.status(), StatusCode::PAYLOAD_TOO_LARGE);
776
777        let _ = std::fs::remove_dir_all(dir);
778    }
779
780    #[tokio::test]
781    async fn memory_backend_enforces_total_quota() {
782        let (mut core, dir) = test_core("memory-quota");
783        core.max_memory_bytes = 4;
784        let headers = HeaderMap::new();
785
786        let first = unwrap_response(
787            execute_put(
788                headers.clone(),
789                Bytes::from_static(b"12"),
790                auth::Tier::Write,
791                world_path("tmp/a"),
792                &core,
793                &TraceCtx::disabled(),
794            )
795            .await,
796        );
797        assert_eq!(first.status(), StatusCode::CREATED);
798        let second = unwrap_response(
799            execute_put(
800                headers.clone(),
801                Bytes::from_static(b"34"),
802                auth::Tier::Write,
803                world_path("tmp/b"),
804                &core,
805                &TraceCtx::disabled(),
806            )
807            .await,
808        );
809        assert_eq!(second.status(), StatusCode::CREATED);
810        let third = unwrap_response(
811            execute_put(
812                headers.clone(),
813                Bytes::from_static(b"5"),
814                auth::Tier::Write,
815                world_path("tmp/c"),
816                &core,
817                &TraceCtx::disabled(),
818            )
819            .await,
820        );
821        assert_eq!(third.status(), StatusCode::PAYLOAD_TOO_LARGE);
822
823        let _ = std::fs::remove_dir_all(dir);
824    }
825
826    #[test]
827    fn system_namespace_roots_require_approve_even_if_called_directly() {
828        for name in ["lib", "etc", "boot", "usr"] {
829            assert!(!can_write(name, auth::Tier::Anon), "{name}");
830            assert!(!can_write(name, auth::Tier::Read), "{name}");
831            assert!(!can_write(name, auth::Tier::Write), "{name}");
832            assert!(can_write(name, auth::Tier::Approve), "{name}");
833        }
834    }
835
836    #[test]
837    fn non_log_var_still_accepts_auth_token() {
838        assert!(!can_write("var/cache/rag", auth::Tier::Anon));
839        assert!(!can_write("var/cache/rag", auth::Tier::Read));
840        assert!(can_write("var/cache/rag", auth::Tier::Write));
841        assert!(can_write("var/cache/rag", auth::Tier::Approve));
842    }
843
844    #[test]
845    fn read_token_is_optional_but_gates_reads_when_set() {
846        let (mut core, dir) = test_core("read-token");
847        assert!(can_read(&core, auth::Tier::Anon));
848
849        core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
850        assert!(!can_read(&core, auth::Tier::Anon));
851        assert!(can_read(&core, auth::Tier::Read));
852        assert!(can_read(&core, auth::Tier::Write));
853        assert!(can_read(&core, auth::Tier::Approve));
854
855        let _ = std::fs::remove_dir_all(dir);
856    }
857
858    #[tokio::test]
859    async fn get_and_head_require_read_token_when_enabled() {
860        let (mut core, dir) = test_core("read-token-handlers");
861        core.write_world("home/private", b"secret", "text/plain", &[])
862            .unwrap();
863        core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
864
865        let headers = HeaderMap::new();
866        let get_anon = unwrap_response(
867            execute_get(
868                headers.clone(),
869                auth::Tier::Anon,
870                world_path("home/private"),
871                &core,
872                &TraceCtx::disabled(),
873            )
874            .await,
875        );
876        assert_eq!(get_anon.status(), StatusCode::UNAUTHORIZED);
877        let head_reader = unwrap_response(
878            execute_head(
879                headers.clone(),
880                auth::Tier::Read,
881                world_path("home/private"),
882                &core,
883                &TraceCtx::disabled(),
884            )
885            .await,
886        );
887        assert_eq!(head_reader.status(), StatusCode::OK);
888
889        let _ = std::fs::remove_dir_all(dir);
890    }
891
892    #[test]
893    fn unauthorized_responses_advertise_bearer_challenge() {
894        let resp = unauthorized("read requires read token");
895        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
896        assert_eq!(
897            resp.headers().get(header::WWW_AUTHENTICATE).unwrap(),
898            "Bearer realm=\"elastik\""
899        );
900        assert_eq!(
901            resp.headers().get(header::CONTENT_TYPE).unwrap(),
902            "text/plain; charset=utf-8"
903        );
904    }
905
906    #[test]
907    fn core_response_headers_are_core_owned() {
908        let mut headers = HeaderMap::new();
909        headers.insert(header::VARY, HeaderValue::from_static("*"));
910        headers.insert("x-request-id", HeaderValue::from_static("stale"));
911        headers.insert("x-elapsed-us", HeaderValue::from_static("999"));
912        headers.insert("x-content-type-options", HeaderValue::from_static("sniff"));
913
914        stamp_core_response_headers(42, 7, &mut headers);
915
916        assert_eq!(headers.get("x-request-id").unwrap(), "42");
917        assert_eq!(headers.get("x-elapsed-us").unwrap(), "7");
918        assert_eq!(headers.get(header::VARY).unwrap(), "Authorization");
919        assert_eq!(headers.get("x-content-type-options").unwrap(), "nosniff");
920    }
921
922    #[test]
923    fn poisoned_persisted_headers_are_not_replayed() {
924        let mut out = Vec::new();
925        hs::apply_meta_headers(
926            &[
927                (
928                    "x-custom".to_owned(),
929                    "evil\r\nset-cookie: admin=true".to_owned(),
930                ),
931                ("set-cookie".to_owned(), "sid=admin; Path=/".to_owned()),
932                ("clear-site-data".to_owned(), "\"cookies\"".to_owned()),
933                ("bad name".to_owned(), "ok".to_owned()),
934                ("x-safe".to_owned(), "ok".to_owned()),
935            ],
936            &mut out,
937        );
938
939        assert_eq!(out.len(), 1);
940        assert_eq!(out[0].0.as_str(), "x-safe");
941        assert_eq!(out[0].1, "ok");
942    }
943
944    #[test]
945    fn canonicalize_preserves_explicit_namespaces() {
946        assert_eq!(canonicalize_path("/home/tmp/foo"), "home/tmp/foo");
947        assert_eq!(canonicalize_path("/home/etc/foo"), "home/etc/foo");
948        assert_eq!(canonicalize_path("/tmp/foo"), "tmp/foo");
949        assert_eq!(canonicalize_path("/etc/foo"), "etc/foo");
950        assert_eq!(canonicalize_path("/foo"), "home/foo");
951    }
952
953    #[test]
954    fn control_bytes_are_not_valid_world_names() {
955        assert!(valid_world_name("home/ok"));
956        assert!(!valid_world_name("home/bad\nname"));
957        assert!(!valid_world_name(""));
958    }
959
960    #[test]
961    fn dot_segments_empty_segments_and_backslashes_are_not_valid_world_names() {
962        assert!(!valid_world_name("home/../etc/secret"));
963        assert!(!valid_world_name("home/%2E%2E/etc/secret"));
964        assert!(!valid_world_name("home/./x"));
965        assert!(!valid_world_name("home//x"));
966        assert!(!valid_world_name("home/x/"));
967        assert!(!valid_world_name("home\\x"));
968        assert_eq!(
969            validate_world_name("home/%2E%2E/etc/secret"),
970            Err("world path contains dot or encoded-dot segment")
971        );
972        assert_eq!(
973            validate_world_name("home//x"),
974            Err("world path has empty segment")
975        );
976        assert_eq!(
977            validate_world_name("home\\x"),
978            Err("world path contains backslash")
979        );
980    }
981
982    #[test]
983    fn namespace_roots_and_proc_subtree_are_not_world_names() {
984        for name in [
985            "home",
986            "tmp",
987            "dev",
988            "sys",
989            "proc",
990            "proc/anything",
991            "etc",
992            "lib",
993            "boot",
994            "usr",
995            "var",
996            "var/log",
997        ] {
998            assert!(!valid_world_name(name), "{name}");
999        }
1000        assert!(valid_world_name("home/x"));
1001        assert!(valid_world_name("var/log/deletes"));
1002    }
1003
1004    #[test]
1005    fn byte_ranges_cover_normal_open_and_suffix_forms() {
1006        let mut h = HeaderMap::new();
1007        assert_eq!(hs::parse_range(&h, 10), Ok(None));
1008
1009        h.insert(header::RANGE, HeaderValue::from_static("bytes=2-5"));
1010        assert_eq!(hs::parse_range(&h, 10), Ok(Some((2, 5))));
1011
1012        h.insert(header::RANGE, HeaderValue::from_static("bytes=7-"));
1013        assert_eq!(hs::parse_range(&h, 10), Ok(Some((7, 9))));
1014
1015        h.insert(header::RANGE, HeaderValue::from_static("bytes=-3"));
1016        assert_eq!(hs::parse_range(&h, 10), Ok(Some((7, 9))));
1017
1018        h.insert(header::RANGE, HeaderValue::from_static("bytes=8-99"));
1019        assert_eq!(hs::parse_range(&h, 10), Ok(Some((8, 9))));
1020
1021        h.insert(header::RANGE, HeaderValue::from_static("bytes=11-12"));
1022        assert_eq!(hs::parse_range(&h, 10), Err(()));
1023
1024        h.insert(header::RANGE, HeaderValue::from_static("bytes=0-1,4-5"));
1025        assert_eq!(hs::parse_range(&h, 10), Ok(None));
1026    }
1027
1028    #[tokio::test]
1029    async fn get_and_head_honor_single_byte_range() {
1030        let (core, dir) = test_core("range-handler");
1031        core.write_world("home/range", b"abcdef", "text/plain", &[])
1032            .unwrap();
1033        let mut headers = HeaderMap::new();
1034        headers.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
1035
1036        let get = unwrap_response(
1037            execute_get(
1038                headers.clone(),
1039                auth::Tier::Anon,
1040                world_path("home/range"),
1041                &core,
1042                &TraceCtx::disabled(),
1043            )
1044            .await,
1045        );
1046        assert_eq!(get.status(), StatusCode::PARTIAL_CONTENT);
1047        assert_eq!(
1048            get.headers().get(header::CONTENT_RANGE).unwrap(),
1049            "bytes 1-3/6"
1050        );
1051        assert_eq!(get.headers().get(header::CONTENT_LENGTH).unwrap(), "3");
1052
1053        let head = unwrap_response(
1054            execute_head(
1055                headers.clone(),
1056                auth::Tier::Anon,
1057                world_path("home/range"),
1058                &core,
1059                &TraceCtx::disabled(),
1060            )
1061            .await,
1062        );
1063        assert_eq!(head.status(), StatusCode::PARTIAL_CONTENT);
1064        assert_eq!(
1065            head.headers().get(header::CONTENT_RANGE).unwrap(),
1066            "bytes 1-3/6"
1067        );
1068        assert_eq!(head.headers().get(header::CONTENT_LENGTH).unwrap(), "3");
1069
1070        let _ = std::fs::remove_dir_all(dir);
1071    }
1072
1073    #[tokio::test]
1074    async fn get_and_head_advertise_accept_ranges_on_full_body() {
1075        let (core, dir) = test_core("accept-ranges");
1076        core.write_world("home/ranges", b"abcdef", "text/plain", &[])
1077            .unwrap();
1078        let headers = HeaderMap::new();
1079
1080        let get = unwrap_response(
1081            execute_get(
1082                headers.clone(),
1083                auth::Tier::Anon,
1084                world_path("home/ranges"),
1085                &core,
1086                &TraceCtx::disabled(),
1087            )
1088            .await,
1089        );
1090        assert_eq!(get.status(), StatusCode::OK);
1091        assert_eq!(get.headers().get(header::ACCEPT_RANGES).unwrap(), "bytes");
1092
1093        let head = unwrap_response(
1094            execute_head(
1095                headers.clone(),
1096                auth::Tier::Anon,
1097                world_path("home/ranges"),
1098                &core,
1099                &TraceCtx::disabled(),
1100            )
1101            .await,
1102        );
1103        assert_eq!(head.status(), StatusCode::OK);
1104        assert_eq!(head.headers().get(header::ACCEPT_RANGES).unwrap(), "bytes");
1105
1106        let _ = std::fs::remove_dir_all(dir);
1107    }
1108
1109    #[tokio::test]
1110    async fn unsatisfied_range_returns_416_with_content_range() {
1111        let (core, dir) = test_core("range-416");
1112        core.write_world("home/range", b"abcdef", "text/plain", &[])
1113            .unwrap();
1114        let mut headers = HeaderMap::new();
1115        headers.insert(header::RANGE, HeaderValue::from_static("bytes=99-100"));
1116
1117        let get = unwrap_response(
1118            execute_get(
1119                headers.clone(),
1120                auth::Tier::Anon,
1121                world_path("home/range"),
1122                &core,
1123                &TraceCtx::disabled(),
1124            )
1125            .await,
1126        );
1127        assert_eq!(get.status(), StatusCode::RANGE_NOT_SATISFIABLE);
1128        assert_eq!(
1129            get.headers().get(header::CONTENT_RANGE).unwrap(),
1130            "bytes */6"
1131        );
1132        assert_eq!(get.headers().get(header::ACCEPT_RANGES).unwrap(), "bytes");
1133
1134        let _ = std::fs::remove_dir_all(dir);
1135    }
1136
1137    #[tokio::test]
1138    async fn multi_range_is_ignored_and_returns_full_body() {
1139        let (core, dir) = test_core("multi-range");
1140        core.write_world("home/range", b"abcdef", "text/plain", &[])
1141            .unwrap();
1142        let mut headers = HeaderMap::new();
1143        headers.insert(header::RANGE, HeaderValue::from_static("bytes=0-1,4-5"));
1144
1145        let get = unwrap_response(
1146            execute_get(
1147                headers.clone(),
1148                auth::Tier::Anon,
1149                world_path("home/range"),
1150                &core,
1151                &TraceCtx::disabled(),
1152            )
1153            .await,
1154        );
1155        assert_eq!(get.status(), StatusCode::OK);
1156        assert!(get.headers().get(header::CONTENT_RANGE).is_none());
1157        assert_eq!(get.headers().get(header::CONTENT_LENGTH).unwrap(), "6");
1158
1159        let _ = std::fs::remove_dir_all(dir);
1160    }
1161
1162    #[tokio::test]
1163    async fn world_reads_advertise_monitor_and_collection_links() {
1164        let (core, dir) = test_core("link-headers");
1165        core.write_world("home/links", b"hello", "text/plain", &[])
1166            .unwrap();
1167        let headers = HeaderMap::new();
1168
1169        let get = unwrap_response(
1170            execute_get(
1171                headers.clone(),
1172                auth::Tier::Anon,
1173                world_path("home/links"),
1174                &core,
1175                &TraceCtx::disabled(),
1176            )
1177            .await,
1178        );
1179        let links: Vec<_> = get.headers().get_all(header::LINK).iter().collect();
1180        assert_eq!(links.len(), 2);
1181        assert!(links
1182            .iter()
1183            .any(|v| *v == "</listen/home/links>; rel=\"monitor\""));
1184        assert!(links
1185            .iter()
1186            .any(|v| *v == "</proc/worlds>; rel=\"collection\""));
1187
1188        let head = unwrap_response(
1189            execute_head(
1190                headers.clone(),
1191                auth::Tier::Anon,
1192                world_path("home/links"),
1193                &core,
1194                &TraceCtx::disabled(),
1195            )
1196            .await,
1197        );
1198        assert_eq!(head.headers().get_all(header::LINK).iter().count(), 2);
1199
1200        let _ = std::fs::remove_dir_all(dir);
1201    }
1202
1203    #[test]
1204    fn if_range_controls_whether_range_is_applied() {
1205        let mut h = HeaderMap::new();
1206        h.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
1207        h.insert(
1208            header::IF_RANGE,
1209            HeaderValue::from_static("\"hmac-current\""),
1210        );
1211        assert_eq!(hs::effective_range(&h, 6, "hmac-current"), Ok(Some((1, 3))));
1212
1213        h.insert(header::IF_RANGE, HeaderValue::from_static("\"hmac-stale\""));
1214        assert_eq!(hs::effective_range(&h, 6, "hmac-current"), Ok(None));
1215
1216        h.insert(
1217            header::IF_RANGE,
1218            HeaderValue::from_static("W/\"hmac-current\""),
1219        );
1220        assert_eq!(hs::effective_range(&h, 6, "hmac-current"), Ok(None));
1221    }
1222
1223    #[tokio::test]
1224    async fn stale_if_range_returns_full_body() {
1225        let (core, dir) = test_core("if-range-stale");
1226        core.write_world("home/range", b"abcdef", "text/plain", &[])
1227            .unwrap();
1228        let mut headers = HeaderMap::new();
1229        headers.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
1230        headers.insert(header::IF_RANGE, HeaderValue::from_static("\"hmac-stale\""));
1231
1232        let get = unwrap_response(
1233            execute_get(
1234                headers.clone(),
1235                auth::Tier::Anon,
1236                world_path("home/range"),
1237                &core,
1238                &TraceCtx::disabled(),
1239            )
1240            .await,
1241        );
1242        assert_eq!(get.status(), StatusCode::OK);
1243        assert!(get.headers().get(header::CONTENT_RANGE).is_none());
1244        assert_eq!(get.headers().get(header::CONTENT_LENGTH).unwrap(), "6");
1245
1246        let _ = std::fs::remove_dir_all(dir);
1247    }
1248
1249    #[tokio::test]
1250    async fn get_and_head_honor_if_none_match_cache_revalidation() {
1251        let (core, dir) = test_core("read-if-none-match");
1252        let h = world::write_with_audit(
1253            &core.data,
1254            "home/cache",
1255            b"cached body",
1256            "text/plain",
1257            &[],
1258            &core.hmac_key,
1259        )
1260        .unwrap();
1261        let etag = format!("\"{}\"", et::hmac_etag(&h));
1262        let mut headers = HeaderMap::new();
1263        headers.insert(header::IF_NONE_MATCH, HeaderValue::from_str(&etag).unwrap());
1264
1265        let get = unwrap_response(
1266            execute_get(
1267                headers.clone(),
1268                auth::Tier::Anon,
1269                world_path("home/cache"),
1270                &core,
1271                &TraceCtx::disabled(),
1272            )
1273            .await,
1274        );
1275        assert_eq!(get.status(), StatusCode::NOT_MODIFIED);
1276        assert_eq!(get.headers().get(header::ETAG).unwrap(), etag.as_str());
1277        assert!(get
1278            .headers()
1279            .get_all(header::LINK)
1280            .iter()
1281            .any(|v| v == "</listen/home/cache>; rel=\"monitor\""));
1282
1283        let head = unwrap_response(
1284            execute_head(
1285                headers.clone(),
1286                auth::Tier::Anon,
1287                world_path("home/cache"),
1288                &core,
1289                &TraceCtx::disabled(),
1290            )
1291            .await,
1292        );
1293        assert_eq!(head.status(), StatusCode::NOT_MODIFIED);
1294        assert_eq!(head.headers().get(header::ETAG).unwrap(), etag.as_str());
1295
1296        headers.insert(
1297            header::IF_NONE_MATCH,
1298            HeaderValue::from_static("\"hmac-stale\""),
1299        );
1300        let get = unwrap_response(
1301            execute_get(
1302                headers.clone(),
1303                auth::Tier::Anon,
1304                world_path("home/cache"),
1305                &core,
1306                &TraceCtx::disabled(),
1307            )
1308            .await,
1309        );
1310        assert_eq!(get.status(), StatusCode::OK);
1311
1312        let _ = std::fs::remove_dir_all(dir);
1313    }
1314
1315    #[tokio::test]
1316    async fn options_and_405_advertise_allow_headers() {
1317        // PR 4c: `handle_world_method` was retired. OPTIONS is now
1318        // answered directly in `world_handler` (policy-free, never
1319        // enters the FSM); PATCH and any other unsupported method
1320        // are rejected by `pipeline::dispatch` with `MethodNotAllowed`.
1321        let (core, dir) = test_core("allow");
1322        let core = std::sync::Arc::new(core);
1323        let state = server_state_for_tests(core.clone());
1324
1325        let options = options_response(WORLD_ALLOW);
1326        assert_eq!(options.status(), StatusCode::NO_CONTENT);
1327        assert_eq!(options.headers().get(header::ALLOW).unwrap(), WORLD_ALLOW);
1328
1329        let patch = pipeline::run(
1330            Method::PATCH,
1331            "home/allow".to_string(),
1332            HeaderMap::new(),
1333            Bytes::new(),
1334            &state,
1335            0,
1336        )
1337        .await;
1338        assert_eq!(patch.status(), StatusCode::METHOD_NOT_ALLOWED);
1339        assert_eq!(patch.headers().get(header::ALLOW).unwrap(), WORLD_ALLOW);
1340
1341        let _ = std::fs::remove_dir_all(dir);
1342    }
1343
1344    #[tokio::test]
1345    async fn root_and_proc_endpoints_advertise_head_options_and_405() {
1346        let root_head = root_hint(Method::HEAD).await;
1347        assert_eq!(root_head.status(), StatusCode::OK);
1348        assert_eq!(
1349            root_head.headers().get(header::CONTENT_TYPE).unwrap(),
1350            "text/plain; charset=utf-8"
1351        );
1352        assert!(root_head.headers().get(header::CONTENT_LENGTH).is_some());
1353
1354        let root_options = root_hint(Method::OPTIONS).await;
1355        assert_eq!(root_options.status(), StatusCode::NO_CONTENT);
1356        assert_eq!(
1357            root_options.headers().get(header::ALLOW).unwrap(),
1358            ROOT_ALLOW
1359        );
1360
1361        let root_post = root_hint(Method::POST).await;
1362        assert_eq!(root_post.status(), StatusCode::METHOD_NOT_ALLOWED);
1363        assert_eq!(root_post.headers().get(header::ALLOW).unwrap(), ROOT_ALLOW);
1364
1365        let version_head = proc_version(Method::HEAD).await;
1366        assert_eq!(version_head.status(), StatusCode::OK);
1367        assert!(version_head.headers().get(header::CONTENT_LENGTH).is_some());
1368
1369        let version_delete = proc_version(Method::DELETE).await;
1370        assert_eq!(version_delete.status(), StatusCode::METHOD_NOT_ALLOWED);
1371        assert_eq!(
1372            version_delete.headers().get(header::ALLOW).unwrap(),
1373            PROC_ALLOW
1374        );
1375    }
1376
1377    #[tokio::test]
1378    async fn proc_worlds_head_options_and_405_are_plain_http() {
1379        let (core, dir) = test_core("proc-worlds-http");
1380        core.write_world("home/a", b"a", "text/plain", &[]).unwrap();
1381        let state = Arc::new(core);
1382        let headers = HeaderMap::new();
1383
1384        let head = proc_worlds(
1385            State(server_state_for_tests(state.clone())),
1386            Method::HEAD,
1387            headers.clone(),
1388        )
1389        .await;
1390        assert_eq!(head.status(), StatusCode::OK);
1391        assert_eq!(
1392            head.headers().get(header::CONTENT_TYPE).unwrap(),
1393            "text/plain; charset=utf-8"
1394        );
1395        assert!(head.headers().get(header::CONTENT_LENGTH).is_some());
1396
1397        let options = proc_worlds(
1398            State(server_state_for_tests(state.clone())),
1399            Method::OPTIONS,
1400            headers.clone(),
1401        )
1402        .await;
1403        assert_eq!(options.status(), StatusCode::NO_CONTENT);
1404        assert_eq!(options.headers().get(header::ALLOW).unwrap(), PROC_ALLOW);
1405
1406        let delete = proc_worlds(
1407            State(server_state_for_tests(state)),
1408            Method::DELETE,
1409            headers,
1410        )
1411        .await;
1412        assert_eq!(delete.status(), StatusCode::METHOD_NOT_ALLOWED);
1413        assert_eq!(delete.headers().get(header::ALLOW).unwrap(), PROC_ALLOW);
1414
1415        let _ = std::fs::remove_dir_all(dir);
1416    }
1417
1418    #[tokio::test]
1419    async fn proc_namespace_is_reserved_beyond_declared_endpoints() {
1420        let not_found = proc_reserved(Method::GET).await;
1421        assert_eq!(not_found.status(), StatusCode::NOT_FOUND);
1422
1423        let head = proc_reserved(Method::HEAD).await;
1424        assert_eq!(head.status(), StatusCode::NOT_FOUND);
1425
1426        let options = proc_reserved(Method::OPTIONS).await;
1427        assert_eq!(options.status(), StatusCode::NO_CONTENT);
1428        assert_eq!(options.headers().get(header::ALLOW).unwrap(), PROC_ALLOW);
1429
1430        let put = proc_reserved(Method::PUT).await;
1431        assert_eq!(put.status(), StatusCode::METHOD_NOT_ALLOWED);
1432        assert_eq!(put.headers().get(header::ALLOW).unwrap(), PROC_ALLOW);
1433    }
1434
1435    #[tokio::test]
1436    async fn proc_audit_verify_reports_valid_chain_in_headers() {
1437        let (core, dir) = test_core("proc-audit-valid");
1438        let h = world::write_with_audit(
1439            &core.data,
1440            "home/audit-ok",
1441            b"hello",
1442            "text/plain",
1443            &[("x-meta-author".to_owned(), "ranger".to_owned())],
1444            &core.hmac_key,
1445        )
1446        .unwrap();
1447        let state = Arc::new(core);
1448        let resp = proc_audit_verify(
1449            State(server_state_for_tests(state)),
1450            Method::HEAD,
1451            AxPath("home/audit-ok/verify".to_owned()),
1452            HeaderMap::new(),
1453        )
1454        .await;
1455
1456        assert_eq!(resp.status(), StatusCode::OK);
1457        assert_eq!(resp.headers().get("x-audit-valid").unwrap(), "true");
1458        assert_eq!(resp.headers().get("x-audit-events").unwrap(), "1");
1459        assert_eq!(
1460            resp.headers().get("x-audit-latest").unwrap(),
1461            &format!("hmac-{h}")
1462        );
1463        assert_eq!(resp.headers().get(header::CONTENT_LENGTH).unwrap(), "0");
1464
1465        let _ = std::fs::remove_dir_all(dir);
1466    }
1467
1468    #[tokio::test]
1469    async fn proc_audit_verify_reports_broken_chain_in_headers() {
1470        let (core, dir) = test_core("proc-audit-broken");
1471        world::write_with_audit(
1472            &core.data,
1473            "home/audit-broken",
1474            b"hello",
1475            "text/plain",
1476            &[],
1477            &core.hmac_key,
1478        )
1479        .unwrap();
1480        let db = world::world_db(&core.data, "home/audit-broken");
1481        let c = rusqlite::Connection::open(db).unwrap();
1482        c.execute("UPDATE events SET hmac='bad' WHERE id=1", [])
1483            .unwrap();
1484
1485        let state = Arc::new(core);
1486        let resp = proc_audit_verify(
1487            State(server_state_for_tests(state)),
1488            Method::HEAD,
1489            AxPath("home/audit-broken/verify".to_owned()),
1490            HeaderMap::new(),
1491        )
1492        .await;
1493
1494        assert_eq!(resp.status(), StatusCode::CONFLICT);
1495        assert_eq!(resp.headers().get("x-audit-valid").unwrap(), "false");
1496        assert_eq!(resp.headers().get("x-audit-break-at").unwrap(), "0");
1497        assert_eq!(resp.headers().get("x-audit-actual").unwrap(), "hmac-bad");
1498
1499        let _ = std::fs::remove_dir_all(dir);
1500    }
1501
1502    #[tokio::test]
1503    async fn proc_audit_verify_escapes_tampered_header_values() {
1504        let (core, dir) = test_core("proc-audit-header-escape");
1505        world::write_with_audit(
1506            &core.data,
1507            "home/audit-escaped",
1508            b"hello",
1509            "text/plain",
1510            &[],
1511            &core.hmac_key,
1512        )
1513        .unwrap();
1514        let db = world::world_db(&core.data, "home/audit-escaped");
1515        let c = rusqlite::Connection::open(db).unwrap();
1516        c.execute(
1517            "UPDATE events SET hmac=? WHERE id=1",
1518            ["bad\nInjected: yes"],
1519        )
1520        .unwrap();
1521
1522        let state = Arc::new(core);
1523        let resp = proc_audit_verify(
1524            State(server_state_for_tests(state)),
1525            Method::HEAD,
1526            AxPath("home/audit-escaped/verify".to_owned()),
1527            HeaderMap::new(),
1528        )
1529        .await;
1530
1531        assert_eq!(resp.status(), StatusCode::CONFLICT);
1532        assert_eq!(
1533            resp.headers().get("x-audit-actual").unwrap(),
1534            "hmac-bad\\x0aInjected: yes"
1535        );
1536
1537        let _ = std::fs::remove_dir_all(dir);
1538    }
1539
1540    #[tokio::test]
1541    async fn proc_audit_verify_reports_memory_world_not_applicable() {
1542        let (core, dir) = test_core("proc-audit-memory");
1543        core.write_world("tmp/scratch", b"draft", "text/plain", &[])
1544            .unwrap();
1545        let state = Arc::new(core);
1546        let resp = proc_audit_verify(
1547            State(server_state_for_tests(state)),
1548            Method::HEAD,
1549            AxPath("tmp/scratch/verify".to_owned()),
1550            HeaderMap::new(),
1551        )
1552        .await;
1553
1554        assert_eq!(resp.status(), StatusCode::NO_CONTENT);
1555        assert_eq!(resp.headers().get("x-audit-valid").unwrap(), "n/a");
1556
1557        let _ = std::fs::remove_dir_all(dir);
1558    }
1559
1560    #[tokio::test]
1561    async fn proc_audit_verify_missing_disk_world_does_not_create_db() {
1562        let (core, dir) = test_core("proc-audit-missing-no-create");
1563        let db = world::world_db(&core.data, "home/missing-audit");
1564        assert!(!db.exists());
1565
1566        let state = Arc::new(core);
1567        let resp = proc_audit_verify(
1568            State(server_state_for_tests(state)),
1569            Method::HEAD,
1570            AxPath("home/missing-audit/verify".to_owned()),
1571            HeaderMap::new(),
1572        )
1573        .await;
1574
1575        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1576        assert!(!db.exists());
1577
1578        let _ = std::fs::remove_dir_all(dir);
1579    }
1580
1581    #[tokio::test]
1582    async fn put_created_returns_location() {
1583        let (core, dir) = test_core("put-location");
1584        let headers = HeaderMap::new();
1585        let resp = unwrap_response(
1586            execute_put(
1587                headers.clone(),
1588                Bytes::from_static(b"new"),
1589                auth::Tier::Write,
1590                world_path("home/created"),
1591                &core,
1592                &TraceCtx::disabled(),
1593            )
1594            .await,
1595        );
1596
1597        assert_eq!(resp.status(), StatusCode::CREATED);
1598        assert_eq!(
1599            resp.headers().get(header::LOCATION).unwrap(),
1600            "/home/created"
1601        );
1602
1603        let resp = unwrap_response(
1604            execute_put(
1605                headers.clone(),
1606                Bytes::from_static(b"again"),
1607                auth::Tier::Write,
1608                world_path("home/created"),
1609                &core,
1610                &TraceCtx::disabled(),
1611            )
1612            .await,
1613        );
1614        assert_eq!(resp.status(), StatusCode::OK);
1615        assert!(resp.headers().get(header::LOCATION).is_none());
1616
1617        let _ = std::fs::remove_dir_all(dir);
1618    }
1619
1620    #[tokio::test]
1621    async fn location_and_link_headers_percent_encode_world_urls() {
1622        let (core, dir) = test_core("encoded-headers");
1623        let headers = HeaderMap::new();
1624        let resp = unwrap_response(
1625            execute_put(
1626                headers.clone(),
1627                Bytes::from_static(b"new"),
1628                auth::Tier::Write,
1629                world_path("home/café report"),
1630                &core,
1631                &TraceCtx::disabled(),
1632            )
1633            .await,
1634        );
1635        assert_eq!(resp.status(), StatusCode::CREATED);
1636        assert_eq!(
1637            resp.headers().get(header::LOCATION).unwrap(),
1638            "/home/caf%C3%A9%20report"
1639        );
1640
1641        let get = unwrap_response(
1642            execute_get(
1643                headers.clone(),
1644                auth::Tier::Anon,
1645                world_path("home/café report"),
1646                &core,
1647                &TraceCtx::disabled(),
1648            )
1649            .await,
1650        );
1651        let links: Vec<_> = get.headers().get_all(header::LINK).iter().collect();
1652        assert!(links
1653            .iter()
1654            .any(|v| *v == "</listen/home/caf%C3%A9%20report>; rel=\"monitor\""));
1655
1656        let _ = std::fs::remove_dir_all(dir);
1657    }
1658
1659    #[tokio::test]
1660    async fn unicode_worlds_roundtrip_body_headers_and_proc_listing() {
1661        let (core, dir) = test_core("unicode-roundtrip");
1662        let headers = vec![(
1663            "content-disposition".to_string(),
1664            "attachment; filename*=UTF-8''%E6%8A%A5%E5%91%8A.pdf".to_string(),
1665        )];
1666        core.write_world(
1667            "home/销售/报告",
1668            "你好,世界".as_bytes(),
1669            "text/plain; charset=utf-8",
1670            &headers,
1671        )
1672        .unwrap();
1673
1674        let req_headers = HeaderMap::new();
1675        let get = unwrap_response(
1676            execute_get(
1677                req_headers.clone(),
1678                auth::Tier::Anon,
1679                world_path("home/销售/报告"),
1680                &core,
1681                &TraceCtx::disabled(),
1682            )
1683            .await,
1684        );
1685        assert_eq!(get.status(), StatusCode::OK);
1686        assert_eq!(
1687            get.headers().get(header::CONTENT_TYPE).unwrap(),
1688            "text/plain; charset=utf-8"
1689        );
1690        assert_eq!(
1691            get.headers().get(header::CONTENT_DISPOSITION).unwrap(),
1692            "attachment; filename*=UTF-8''%E6%8A%A5%E5%91%8A.pdf"
1693        );
1694        assert!(
1695            get.headers()
1696                .get_all(header::LINK)
1697                .iter()
1698                .any(|v| *v
1699                    == "</listen/home/%E9%94%80%E5%94%AE/%E6%8A%A5%E5%91%8A>; rel=\"monitor\"")
1700        );
1701
1702        let names = store::list_all(&core.data, &core.mem);
1703        assert_eq!(world_list_body(&names.unwrap()), "home/销售/报告\n");
1704
1705        let _ = std::fs::remove_dir_all(dir);
1706    }
1707
1708    #[test]
1709    fn etag_lists_match_http_strong_and_weak_rules() {
1710        assert!(et::etag_list_strong_matches("\"hmac-abc\"", "hmac-abc"));
1711        assert!(et::etag_list_strong_matches(
1712            "\"other\", \"hmac-abc\"",
1713            "hmac-abc"
1714        ));
1715        assert!(et::etag_list_strong_matches("*", "hmac-abc"));
1716        assert!(!et::etag_list_strong_matches("W/\"hmac-abc\"", "hmac-abc"));
1717        assert!(!et::etag_list_strong_matches("\"other\"", "hmac-abc"));
1718
1719        assert!(et::etag_list_weak_matches("W/\"hmac-abc\"", "hmac-abc"));
1720    }
1721
1722    #[test]
1723    fn if_none_match_star_blocks_existing_world() {
1724        let (core, dir) = test_core("if-none-match-star");
1725        core.write_world("home/cas", b"one", "text/plain; charset=utf-8", &[])
1726            .unwrap();
1727
1728        let mut headers = HeaderMap::new();
1729        headers.insert(header::IF_NONE_MATCH, HeaderValue::from_static("*"));
1730
1731        assert!(hs::check_write_preconditions(&core, "home/cas", &headers).is_err());
1732        assert!(hs::check_write_preconditions(&core, "home/new", &headers).is_ok());
1733
1734        let _ = std::fs::remove_dir_all(dir);
1735    }
1736
1737    #[tokio::test]
1738    async fn put_and_post_honor_write_preconditions_at_handler_level() {
1739        let (core, dir) = test_core("write-preconditions");
1740        let h = world::write_with_audit(
1741            &core.data,
1742            "home/cas",
1743            b"one",
1744            "text/plain; charset=utf-8",
1745            &[],
1746            &core.hmac_key,
1747        )
1748        .unwrap();
1749
1750        let mut stale = HeaderMap::new();
1751        stale.insert(header::IF_MATCH, HeaderValue::from_static("\"hmac-stale\""));
1752        let put = unwrap_response(
1753            execute_put(
1754                stale.clone(),
1755                Bytes::from_static(b"two"),
1756                auth::Tier::Write,
1757                world_path("home/cas"),
1758                &core,
1759                &TraceCtx::disabled(),
1760            )
1761            .await,
1762        );
1763        assert_eq!(put.status(), StatusCode::PRECONDITION_FAILED);
1764
1765        let post = unwrap_response(
1766            execute_post(
1767                stale.clone(),
1768                Bytes::from_static(b" plus"),
1769                auth::Tier::Write,
1770                world_path("home/cas"),
1771                &core,
1772                &TraceCtx::disabled(),
1773            )
1774            .await,
1775        );
1776        assert_eq!(post.status(), StatusCode::PRECONDITION_FAILED);
1777
1778        let mut good = HeaderMap::new();
1779        good.insert(
1780            header::IF_MATCH,
1781            HeaderValue::from_str(&format!("\"{}\"", et::hmac_etag(&h))).unwrap(),
1782        );
1783        let post = unwrap_response(
1784            execute_post(
1785                good.clone(),
1786                Bytes::from_static(b" plus"),
1787                auth::Tier::Write,
1788                world_path("home/cas"),
1789                &core,
1790                &TraceCtx::disabled(),
1791            )
1792            .await,
1793        );
1794        assert_eq!(post.status(), StatusCode::OK);
1795
1796        let _ = std::fs::remove_dir_all(dir);
1797    }
1798
1799    #[test]
1800    fn if_match_accepts_current_hmac_etag_only() {
1801        let (core, dir) = test_core("if-match-hmac");
1802        core.write_world("home/cas", b"one", "text/plain; charset=utf-8", &[])
1803            .unwrap();
1804        let mut conn = world::open(&core.data, "home/cas").unwrap();
1805        let h = audit::append_with_conn_existing(
1806            &mut conn,
1807            "put",
1808            "home/cas",
1809            &world::sha256_hex(b"one"),
1810            3,
1811            "text/plain; charset=utf-8",
1812            &[],
1813            &core.hmac_key,
1814        )
1815        .unwrap();
1816        drop(conn);
1817        let etag = format!("\"{}\"", et::hmac_etag(&h));
1818
1819        let mut good = HeaderMap::new();
1820        good.insert(header::IF_MATCH, HeaderValue::from_str(&etag).unwrap());
1821        assert!(hs::check_write_preconditions(&core, "home/cas", &good).is_ok());
1822
1823        let mut stale = HeaderMap::new();
1824        stale.insert(header::IF_MATCH, HeaderValue::from_static("\"hmac-stale\""));
1825        assert!(hs::check_write_preconditions(&core, "home/cas", &stale).is_err());
1826
1827        let _ = std::fs::remove_dir_all(dir);
1828    }
1829
1830    #[test]
1831    fn request_content_type_preserves_http_content_type_verbatim() {
1832        let mut headers = HeaderMap::new();
1833        headers.insert(
1834            header::CONTENT_TYPE,
1835            HeaderValue::from_static("application/pdf"),
1836        );
1837        assert_eq!(hs::request_content_type(&headers), "application/pdf");
1838
1839        headers.insert(
1840            header::CONTENT_TYPE,
1841            HeaderValue::from_static("text/html; charset=utf-8"),
1842        );
1843        assert_eq!(
1844            hs::request_content_type(&headers),
1845            "text/html; charset=utf-8"
1846        );
1847
1848        headers.clear();
1849        assert_eq!(
1850            hs::request_content_type(&headers),
1851            "application/octet-stream"
1852        );
1853    }
1854
1855    #[test]
1856    fn request_meta_headers_persist_default_representation_headers_only() {
1857        // L2 (DEFAULT_PERSIST_HEADERS) covers standard representation
1858        // headers that round-trip with the body without the operator
1859        // configuring anything. Custom headers (x-author,
1860        // x-future-http-thing, x-meta-*) are NOT persisted under the
1861        // empty allowlist -- the operator must opt in via
1862        // ELASTIK_PERSIST_HEADERS. Layer 1 hard-deny stays in force
1863        // either way.
1864        let mut headers = HeaderMap::new();
1865        headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("image/png"));
1866        headers.insert(header::CONTENT_ENCODING, HeaderValue::from_static("gzip"));
1867        headers.insert(header::CONTENT_LANGUAGE, HeaderValue::from_static("zh-CN"));
1868        headers.insert(
1869            header::CONTENT_DISPOSITION,
1870            HeaderValue::from_static("attachment; filename=\"report.pdf\""),
1871        );
1872        headers.insert(
1873            header::CACHE_CONTROL,
1874            HeaderValue::from_static("max-age=60"),
1875        );
1876        headers.insert("access-control-allow-origin", HeaderValue::from_static("*"));
1877        headers.insert(
1878            "access-control-allow-methods",
1879            HeaderValue::from_static("GET, HEAD"),
1880        );
1881        headers.insert(
1882            "access-control-expose-headers",
1883            HeaderValue::from_static("ETag"),
1884        );
1885        headers.insert(
1886            "content-security-policy",
1887            HeaderValue::from_static("default-src 'self'"),
1888        );
1889        headers.insert("x-frame-options", HeaderValue::from_static("DENY"));
1890        headers.insert("permissions-policy", HeaderValue::from_static("camera=()"));
1891        headers.insert(
1892            "cross-origin-resource-policy",
1893            HeaderValue::from_static("same-origin"),
1894        );
1895        headers.insert(
1896            "x-content-type-options",
1897            HeaderValue::from_static("nosniff"),
1898        );
1899        headers.insert("x-future-http-thing", HeaderValue::from_static("ok"));
1900        headers.insert("x-meta-author", HeaderValue::from_static("ranger"));
1901
1902        headers.insert(
1903            header::AUTHORIZATION,
1904            HeaderValue::from_str(&format!("{} {}", "Bearer", "secret")).unwrap(),
1905        );
1906        headers.insert(
1907            "proxy-authorization",
1908            HeaderValue::from_str(&format!("{} {}", "Bearer", "secret")).unwrap(),
1909        );
1910        headers.insert(header::COOKIE, HeaderValue::from_static("sid=secret"));
1911        headers.insert(header::SET_COOKIE, HeaderValue::from_static("sid=secret"));
1912        headers.insert(header::HOST, HeaderValue::from_static("localhost:3105"));
1913        headers.insert(header::CONNECTION, HeaderValue::from_static("keep-alive"));
1914        headers.insert("keep-alive", HeaderValue::from_static("timeout=5"));
1915        headers.insert(
1916            header::TRANSFER_ENCODING,
1917            HeaderValue::from_static("chunked"),
1918        );
1919        headers.insert(header::TE, HeaderValue::from_static("trailers"));
1920        headers.insert(header::TRAILER, HeaderValue::from_static("expires"));
1921        headers.insert(header::UPGRADE, HeaderValue::from_static("websocket"));
1922        headers.insert("http2-settings", HeaderValue::from_static("abc"));
1923        headers.insert(header::CONTENT_LENGTH, HeaderValue::from_static("999"));
1924        headers.insert(header::ETAG, HeaderValue::from_static("\"fake\""));
1925        headers.insert(header::ALLOW, HeaderValue::from_static("GET"));
1926        headers.insert(header::LOCATION, HeaderValue::from_static("/elsewhere"));
1927        headers.insert(header::LINK, HeaderValue::from_static("</x>; rel=\"next\""));
1928        headers.insert(
1929            header::WWW_AUTHENTICATE,
1930            HeaderValue::from_static("Bearer realm=\"x\""),
1931        );
1932        headers.insert(header::ACCEPT, HeaderValue::from_static("text/html"));
1933        headers.insert(header::ACCEPT_ENCODING, HeaderValue::from_static("gzip"));
1934        headers.insert(header::ACCEPT_LANGUAGE, HeaderValue::from_static("zh-CN"));
1935        headers.insert("accept-charset", HeaderValue::from_static("utf-8"));
1936        headers.insert(header::RANGE, HeaderValue::from_static("bytes=0-1"));
1937        headers.insert(header::IF_MATCH, HeaderValue::from_static("\"abc\""));
1938        headers.insert(header::IF_NONE_MATCH, HeaderValue::from_static("\"abc\""));
1939        headers.insert(header::IF_RANGE, HeaderValue::from_static("\"abc\""));
1940        headers.insert(
1941            header::IF_MODIFIED_SINCE,
1942            HeaderValue::from_static("Wed, 21 Oct 2015 07:28:00 GMT"),
1943        );
1944        headers.insert(
1945            header::IF_UNMODIFIED_SINCE,
1946            HeaderValue::from_static("Wed, 21 Oct 2015 07:28:00 GMT"),
1947        );
1948        headers.insert(header::EXPECT, HeaderValue::from_static("100-continue"));
1949        headers.insert("sec-fetch-mode", HeaderValue::from_static("cors"));
1950        headers.insert("sec-ch-ua", HeaderValue::from_static("\"Chromium\""));
1951        headers.insert("dnt", HeaderValue::from_static("1"));
1952        headers.insert(
1953            header::ORIGIN,
1954            HeaderValue::from_static("https://example.com"),
1955        );
1956        headers.insert(
1957            header::REFERER,
1958            HeaderValue::from_static("https://example.com/"),
1959        );
1960        headers.insert(header::USER_AGENT, HeaderValue::from_static("curl"));
1961        headers.insert(header::SERVER, HeaderValue::from_static("fake"));
1962        headers.insert(
1963            header::DATE,
1964            HeaderValue::from_static("Wed, 21 Oct 2015 07:28:00 GMT"),
1965        );
1966        headers.insert(header::AGE, HeaderValue::from_static("10"));
1967        headers.insert(header::VARY, HeaderValue::from_static("*"));
1968        headers.insert("via", HeaderValue::from_static("1.1 proxy"));
1969        headers.insert("forwarded", HeaderValue::from_static("for=127.0.0.1"));
1970        headers.insert("x-forwarded-for", HeaderValue::from_static("127.0.0.1"));
1971        headers.insert("x-forwarded-host", HeaderValue::from_static("example.com"));
1972        headers.insert("x-forwarded-proto", HeaderValue::from_static("https"));
1973        headers.insert("x-real-ip", HeaderValue::from_static("203.0.113.7"));
1974        headers.insert("true-client-ip", HeaderValue::from_static("203.0.113.7"));
1975        headers.insert("client-ip", HeaderValue::from_static("203.0.113.7"));
1976        headers.insert("clear-site-data", HeaderValue::from_static("\"cookies\""));
1977        // Distributed tracing pollutants (W3C + Zipkin + AWS X-Ray).
1978        // Auto-injected by APM agents; if persisted, next reader
1979        // replays writer's trace ID and corrupts downstream tracing.
1980        headers.insert(
1981            "traceparent",
1982            HeaderValue::from_static("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"),
1983        );
1984        headers.insert(
1985            "tracestate",
1986            HeaderValue::from_static("rojo=00f067aa0ba902b7"),
1987        );
1988        headers.insert(
1989            "baggage",
1990            HeaderValue::from_static("userId=alice,serverNode=DF%2028"),
1991        );
1992        headers.insert(
1993            "b3",
1994            HeaderValue::from_static("80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1"),
1995        );
1996        headers.insert(
1997            "x-b3-traceid",
1998            HeaderValue::from_static("80f198ee56343ba864fe8b2a57d3eff7"),
1999        );
2000        headers.insert("x-b3-spanid", HeaderValue::from_static("e457b5a2e4d86bd1"));
2001        headers.insert("x-b3-sampled", HeaderValue::from_static("1"));
2002        headers.insert(
2003            "x-amzn-trace-id",
2004            HeaderValue::from_static("Root=1-5759e988-bd862e3fe1be46a994272793"),
2005        );
2006        headers.insert("cf-ray", HeaderValue::from_static("8f1234567abcdef0-IAD"));
2007        headers.insert("cf-connecting-ip", HeaderValue::from_static("203.0.113.7"));
2008        headers.insert(
2009            "cf-visitor",
2010            HeaderValue::from_static("{\"scheme\":\"https\"}"),
2011        );
2012        // HTTP transport version markers.
2013        headers.insert(
2014            "http3-settings",
2015            HeaderValue::from_static("AAMAAABkAARAAgAAAAAEAIAAAAA"),
2016        );
2017        headers.insert("pragma", HeaderValue::from_static("no-cache"));
2018
2019        let allowlist = hs::HeaderAllowlist::empty();
2020        let user_deny = hs::HeaderAllowlist::empty();
2021        let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2022        let has = |name: &str| meta.iter().any(|(n, _)| n == name);
2023
2024        assert!(meta.contains(&("content-encoding".to_string(), "gzip".to_string())));
2025        assert!(meta.contains(&("content-language".to_string(), "zh-CN".to_string())));
2026        assert!(meta.contains(&(
2027            "content-disposition".to_string(),
2028            "attachment; filename=\"report.pdf\"".to_string()
2029        )));
2030        assert!(meta.contains(&("cache-control".to_string(), "max-age=60".to_string())));
2031        assert!(meta.contains(&("access-control-allow-origin".to_string(), "*".to_string())));
2032        assert!(meta.contains(&(
2033            "content-security-policy".to_string(),
2034            "default-src 'self'".to_string()
2035        )));
2036        assert!(meta.contains(&("x-frame-options".to_string(), "DENY".to_string())));
2037        assert!(meta.contains(&("permissions-policy".to_string(), "camera=()".to_string())));
2038        assert!(meta.contains(&(
2039            "cross-origin-resource-policy".to_string(),
2040            "same-origin".to_string()
2041        )));
2042        // Custom headers (no `x-` shortcut, no auto-allowlist for
2043        // x-meta-*) MUST NOT persist under the empty allowlist.
2044        assert!(
2045            !has("x-meta-author"),
2046            "x-meta-* is opt-in via ELASTIK_PERSIST_HEADERS"
2047        );
2048        assert!(
2049            !has("x-future-http-thing"),
2050            "unknown x- headers default-deny"
2051        );
2052
2053        for name in [
2054            "authorization",
2055            "proxy-authorization",
2056            "cookie",
2057            "set-cookie",
2058            "host",
2059            "connection",
2060            "keep-alive",
2061            "transfer-encoding",
2062            "te",
2063            "trailer",
2064            "upgrade",
2065            "http2-settings",
2066            "content-type",
2067            "content-length",
2068            "etag",
2069            "allow",
2070            "location",
2071            "link",
2072            "www-authenticate",
2073            "accept",
2074            "accept-encoding",
2075            "accept-language",
2076            "accept-charset",
2077            "range",
2078            "if-match",
2079            "if-none-match",
2080            "if-range",
2081            "if-modified-since",
2082            "if-unmodified-since",
2083            "expect",
2084            "sec-fetch-mode",
2085            "sec-ch-ua",
2086            "dnt",
2087            "origin",
2088            "referer",
2089            "user-agent",
2090            "server",
2091            "date",
2092            "age",
2093            "vary",
2094            "x-request-id",
2095            "x-elapsed-us",
2096            "x-elapsed-ms",
2097            "x-content-type-options",
2098            "via",
2099            "forwarded",
2100            "x-forwarded-for",
2101            "x-forwarded-host",
2102            "x-forwarded-proto",
2103            "x-real-ip",
2104            "true-client-ip",
2105            "client-ip",
2106            "clear-site-data",
2107            // Distributed tracing pollutants.
2108            "traceparent",
2109            "tracestate",
2110            "baggage",
2111            "b3",
2112            "x-b3-traceid",
2113            "x-b3-spanid",
2114            "x-b3-sampled",
2115            // Cloud-provider runtime injections.
2116            "x-amzn-trace-id",
2117            "cf-ray",
2118            "cf-connecting-ip",
2119            "cf-visitor",
2120            // Transport version markers + HTTP/1.0 living fossil.
2121            "http3-settings",
2122            "pragma",
2123        ] {
2124            assert!(!has(name), "{name} should not be persisted");
2125        }
2126    }
2127
2128    #[test]
2129    fn request_meta_headers_deduplicate_repeated_names_last_wins() {
2130        let mut headers = HeaderMap::new();
2131        headers.append("x-meta-author", HeaderValue::from_static("alice"));
2132        headers.append("x-meta-author", HeaderValue::from_static("bob"));
2133
2134        // Allowlist x-meta-* so the dedup behaviour is observable;
2135        // the empty allowlist would correctly drop both entries.
2136        let allowlist = hs::HeaderAllowlist::parse("x-meta-*");
2137        let user_deny = hs::HeaderAllowlist::empty();
2138        let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2139
2140        assert_eq!(meta, vec![("x-meta-author".to_string(), "bob".to_string())]);
2141    }
2142
2143    #[test]
2144    fn request_meta_headers_user_allowlist_persists_custom_headers() {
2145        // Layer 3 (user allowlist) opens custom-header round-trip
2146        // on top of the default-allow set. Exact match and prefix
2147        // match (`x-my-*`) both work.
2148        let mut headers = HeaderMap::new();
2149        headers.insert("x-author", HeaderValue::from_static("ranger"));
2150        headers.insert("x-version", HeaderValue::from_static("7.1.0"));
2151        headers.insert("x-my-tag", HeaderValue::from_static("custom"));
2152        headers.insert("x-my-region", HeaderValue::from_static("ap-east-1"));
2153        headers.insert("x-other", HeaderValue::from_static("not-allowlisted"));
2154        // L1 hard deny -- must lose to user allowlist below.
2155        headers.insert(
2156            "traceparent",
2157            HeaderValue::from_static("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"),
2158        );
2159        // L2 default-allow stays on top of L3.
2160        headers.insert(
2161            header::CACHE_CONTROL,
2162            HeaderValue::from_static("max-age=60"),
2163        );
2164
2165        let allowlist = hs::HeaderAllowlist::parse("x-author,x-version,x-my-*,traceparent");
2166        let user_deny = hs::HeaderAllowlist::empty();
2167        let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2168        let has = |name: &str| meta.iter().any(|(n, _)| n == name);
2169
2170        // L3 named exact: persisted.
2171        assert!(has("x-author"));
2172        assert!(has("x-version"));
2173        // L3 prefix wildcard: persisted.
2174        assert!(has("x-my-tag"));
2175        assert!(has("x-my-region"));
2176        // Not allowlisted -> default-deny (L3 didn't match, L2 doesn't cover).
2177        assert!(!has("x-other"));
2178        // L1 hard deny ALWAYS wins -- even if traceparent is in the
2179        // user allowlist, it never persists.
2180        assert!(
2181            !has("traceparent"),
2182            "L1 hard deny must override user allowlist; tracing context never persists"
2183        );
2184        // L2 default-allow still applies alongside L3.
2185        assert!(has("cache-control"));
2186    }
2187
2188    #[test]
2189    fn request_meta_headers_user_deny_subtracts_from_default_allow() {
2190        // Layer 1.5 (user deny / ELASTIK_DENY_HEADERS) lets an
2191        // operator subtract a header from the built-in
2192        // DEFAULT_PERSIST_HEADERS without recompiling. Order:
2193        //   L1 hard deny > L1.5 user deny > L2 default > L3 user allow.
2194        let mut headers = HeaderMap::new();
2195        // L2 defaults that should normally persist:
2196        headers.insert(
2197            header::CACHE_CONTROL,
2198            HeaderValue::from_static("max-age=60"),
2199        );
2200        headers.insert(header::CONTENT_ENCODING, HeaderValue::from_static("gzip"));
2201        headers.insert("permissions-policy", HeaderValue::from_static("camera=()"));
2202        // L3-allowlisted custom that user_deny should also win over:
2203        headers.insert("x-author", HeaderValue::from_static("ranger"));
2204
2205        // Operator says: drop cache-control + permissions-policy
2206        // from defaults; also kill any allowlisted x-author (user
2207        // deny beats user allow).
2208        let allowlist = hs::HeaderAllowlist::parse("x-author");
2209        let user_deny = hs::HeaderAllowlist::parse("cache-control,permissions-policy,x-author");
2210        let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2211        let has = |name: &str| meta.iter().any(|(n, _)| n == name);
2212
2213        // L2 defaults the operator denied -> dropped.
2214        assert!(
2215            !has("cache-control"),
2216            "user-deny must subtract from L2 default-allow"
2217        );
2218        assert!(!has("permissions-policy"));
2219        // L2 default not in user_deny -> still persisted.
2220        assert!(has("content-encoding"));
2221        // L3 allowlisted but also user_denied -> deny wins.
2222        assert!(
2223            !has("x-author"),
2224            "user-deny must override user-allow when both match"
2225        );
2226    }
2227
2228    #[test]
2229    fn request_meta_headers_is_case_insensitive_per_rfc_7230() {
2230        // RFC 7230: HTTP header field names are case-insensitive.
2231        // axum's HeaderMap canonicalizes incoming names to
2232        // lowercase via `HeaderName::as_str`; this test pins that
2233        // axum-side normalization PLUS verifies that the
2234        // env-var-side parser lowercases its input.
2235        let mut headers = HeaderMap::new();
2236        // Insert with mixed case -- axum stores keys in lowercase
2237        // canonical form, so the iteration in
2238        // `request_meta_headers` sees lowercase.
2239        headers.insert("X-Author", HeaderValue::from_static("ranger"));
2240        headers.insert("CACHE-CONTROL", HeaderValue::from_static("max-age=60"));
2241        headers.insert("Traceparent", HeaderValue::from_static("00-...-01"));
2242        headers.insert("X-Forwarded-For", HeaderValue::from_static("203.0.113.7"));
2243
2244        // Operator wrote ELASTIK_PERSIST_HEADERS with mixed case --
2245        // parser lowercases.
2246        let allowlist = hs::HeaderAllowlist::parse("X-AUTHOR, X-Version");
2247        let user_deny = hs::HeaderAllowlist::empty();
2248        let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2249        let has = |name: &str| meta.iter().any(|(n, _)| n == name);
2250
2251        // Mixed-case allowlist entry persists the (lowercase-stored)
2252        // input header.
2253        assert!(has("x-author"), "X-Author allowlisted as X-AUTHOR persists");
2254        // L2 default catches CACHE-CONTROL regardless of input case.
2255        assert!(has("cache-control"));
2256        // L1 hard-deny still blocks tracing context regardless of
2257        // input case.
2258        assert!(!has("traceparent"));
2259        assert!(!has("x-forwarded-for"));
2260
2261        // Stored header names are always lowercase canonical form.
2262        for (name, _) in &meta {
2263            assert_eq!(
2264                name,
2265                &name.to_ascii_lowercase(),
2266                "stored meta keys must be lowercase"
2267            );
2268        }
2269    }
2270
2271    #[test]
2272    fn header_allowlist_parser_handles_whitespace_dedup_and_wildcards() {
2273        // Whitespace per entry trimmed; case folded; duplicates de-duped;
2274        // empty / all-* entries skipped.
2275        let allow = hs::HeaderAllowlist::parse(
2276            "  X-Author , x-version, x-my-*, x-version, , *, x-my-* , x-AUTHOR",
2277        );
2278        assert!(allow.matches("x-author"));
2279        assert!(allow.matches("x-version"));
2280        assert!(allow.matches("x-my-tag"));
2281        assert!(allow.matches("x-my-region"));
2282        assert!(!allow.matches("x-other"));
2283        assert!(!allow.matches(""));
2284        // A bare `*` in the env doesn't open the floodgates.
2285        assert!(!allow.matches("anything-else"));
2286
2287        // Empty input == empty allowlist.
2288        assert!(hs::HeaderAllowlist::parse("").is_empty());
2289        assert!(hs::HeaderAllowlist::parse("   ,  ,").is_empty());
2290    }
2291
2292    #[tokio::test]
2293    async fn get_returns_stored_standard_representation_headers() {
2294        let (core, dir) = test_core("representation-headers");
2295        let headers = vec![
2296            ("content-encoding".to_string(), "gzip".to_string()),
2297            (
2298                "content-disposition".to_string(),
2299                "attachment; filename=\"report.pdf\"".to_string(),
2300            ),
2301            ("access-control-allow-origin".to_string(), "*".to_string()),
2302            (
2303                "content-security-policy".to_string(),
2304                "default-src 'self'".to_string(),
2305            ),
2306            ("x-frame-options".to_string(), "DENY".to_string()),
2307        ];
2308        core.write_world(
2309            "home/gzip",
2310            b"compressed bytes",
2311            "application/pdf",
2312            &headers,
2313        )
2314        .unwrap();
2315
2316        let req_headers = HeaderMap::new();
2317        let resp = unwrap_response(
2318            execute_get(
2319                req_headers.clone(),
2320                auth::Tier::Anon,
2321                world_path("home/gzip"),
2322                &core,
2323                &TraceCtx::disabled(),
2324            )
2325            .await,
2326        );
2327        assert_eq!(resp.status(), StatusCode::OK);
2328        assert_eq!(
2329            resp.headers().get(header::CONTENT_ENCODING).unwrap(),
2330            "gzip"
2331        );
2332        assert_eq!(
2333            resp.headers().get(header::CONTENT_DISPOSITION).unwrap(),
2334            "attachment; filename=\"report.pdf\""
2335        );
2336        assert_eq!(
2337            resp.headers().get("access-control-allow-origin").unwrap(),
2338            "*"
2339        );
2340        assert_eq!(
2341            resp.headers().get("content-security-policy").unwrap(),
2342            "default-src 'self'"
2343        );
2344        assert_eq!(resp.headers().get("x-frame-options").unwrap(), "DENY");
2345
2346        let _ = std::fs::remove_dir_all(dir);
2347    }
2348
2349    #[test]
2350    fn worlds_store_content_type_not_private_extensions() {
2351        let (core, dir) = test_core("content-type");
2352        core.write_world("home/pdf", b"%PDF-1.7", "application/pdf", &[])
2353            .unwrap();
2354
2355        let stage = core.read_world("home/pdf").unwrap().unwrap();
2356        assert_eq!(stage.content_type, "application/pdf");
2357        assert_eq!(stage.body, b"%PDF-1.7");
2358
2359        let _ = std::fs::remove_dir_all(dir);
2360    }
2361
2362    #[test]
2363    fn storage_prefix_routes_memory_and_disk_modes() {
2364        assert!(!store::is_memory_world("home/report"));
2365        assert!(!store::is_memory_world("etc/config"));
2366        assert!(store::is_memory_world("tmp/scratch"));
2367        assert!(store::is_memory_world("dev/fb0"));
2368        assert!(store::is_memory_world("sys/status"));
2369        assert!(store::is_persistent("home/report"));
2370        assert!(!store::is_persistent("tmp/scratch"));
2371    }
2372
2373    #[test]
2374    fn memory_worlds_do_not_create_sqlite_files_or_audit_chain() {
2375        let (core, dir) = test_core("memory-world");
2376        core.write_world(
2377            "tmp/scratch",
2378            b"draft",
2379            "text/plain; charset=utf-8",
2380            &[("x-meta-owner".to_string(), "agent".to_string())],
2381        )
2382        .unwrap();
2383
2384        let stage = core.read_world("tmp/scratch").unwrap().unwrap();
2385        assert_eq!(stage.body, b"draft");
2386        assert_eq!(stage.content_type, "text/plain; charset=utf-8");
2387        assert_eq!(
2388            stage.headers,
2389            vec![("x-meta-owner".to_string(), "agent".to_string())]
2390        );
2391        assert!(!world::world_db(&core.data, "tmp/scratch").exists());
2392        assert!(audit::latest_hmac(&core.data, "tmp/scratch").is_none());
2393
2394        let names = store::list_all(&core.data, &core.mem);
2395        assert_eq!(names.unwrap(), vec!["tmp/scratch".to_string()]);
2396
2397        let _ = std::fs::remove_dir_all(dir);
2398    }
2399
2400    #[test]
2401    fn disk_worlds_create_sqlite_files_and_audit_chain_when_using_audit_path() {
2402        let (core, dir) = test_core("disk-world");
2403        let h = world::write_with_audit(
2404            &core.data,
2405            "home/report",
2406            b"final",
2407            "text/plain; charset=utf-8",
2408            &[],
2409            &core.hmac_key,
2410        )
2411        .unwrap();
2412
2413        let stage = core.read_world("home/report").unwrap().unwrap();
2414        assert_eq!(stage.body, b"final");
2415        assert!(world::world_db(&core.data, "home/report").exists());
2416        assert_eq!(audit::latest_hmac(&core.data, "home/report"), Some(h));
2417
2418        let names = store::list_all(&core.data, &core.mem);
2419        assert_eq!(names.unwrap(), vec!["home/report".to_string()]);
2420
2421        let _ = std::fs::remove_dir_all(dir);
2422    }
2423
2424    #[test]
2425    fn audit_keeps_historical_metadata_without_json_payload() {
2426        let (core, dir) = test_core("audit-meta");
2427        let headers = vec![("x-meta-author".to_string(), "ranger".to_string())];
2428        let h = world::write_with_audit(
2429            &core.data,
2430            "home/audit-meta",
2431            b"hello",
2432            "text/plain; charset=utf-8",
2433            &headers,
2434            &core.hmac_key,
2435        )
2436        .unwrap();
2437
2438        let c = rusqlite::Connection::open(world::world_db(&core.data, "home/audit-meta")).unwrap();
2439        let (content_type, meta_sha256): (String, String) = c
2440            .query_row(
2441                "SELECT content_type, meta_sha256 FROM events WHERE hmac=?",
2442                [h],
2443                |r| Ok((r.get(0)?, r.get(1)?)),
2444            )
2445            .unwrap();
2446        assert_eq!(content_type, "text/plain; charset=utf-8");
2447        assert_eq!(
2448            meta_sha256,
2449            audit::meta_sha256("text/plain; charset=utf-8", &headers)
2450        );
2451
2452        let author: String = c
2453            .query_row(
2454                "SELECT value FROM event_headers WHERE name='x-meta-author'",
2455                [],
2456                |r| r.get(0),
2457            )
2458            .unwrap();
2459        assert_eq!(author, "ranger");
2460
2461        let _ = std::fs::remove_dir_all(dir);
2462    }
2463
2464    #[tokio::test]
2465    async fn post_audit_uses_existing_representation_metadata() {
2466        let (core, dir) = test_core("post-audit-meta");
2467        let headers = vec![
2468            ("content-encoding".to_string(), "gzip".to_string()),
2469            ("x-meta-author".to_string(), "ranger".to_string()),
2470        ];
2471        world::write_with_audit(
2472            &core.data,
2473            "home/post-audit-meta",
2474            b"hello",
2475            "text/plain",
2476            &headers,
2477            &core.hmac_key,
2478        )
2479        .unwrap();
2480
2481        let mut req_headers = HeaderMap::new();
2482        req_headers.insert(
2483            header::CONTENT_TYPE,
2484            HeaderValue::from_static("application/pdf"),
2485        );
2486        req_headers.insert(header::CONTENT_LANGUAGE, HeaderValue::from_static("zh-CN"));
2487        let resp = unwrap_response(
2488            execute_post(
2489                req_headers.clone(),
2490                Bytes::from_static(b" world"),
2491                auth::Tier::Write,
2492                world_path("home/post-audit-meta"),
2493                &core,
2494                &TraceCtx::disabled(),
2495            )
2496            .await,
2497        );
2498        assert_eq!(resp.status(), StatusCode::OK);
2499
2500        let c = rusqlite::Connection::open(world::world_db(&core.data, "home/post-audit-meta"))
2501            .unwrap();
2502        let (content_type, meta_sha256): (String, String) = c
2503            .query_row(
2504                "SELECT content_type, meta_sha256 FROM events WHERE event_type='append'",
2505                [],
2506                |r| Ok((r.get(0)?, r.get(1)?)),
2507            )
2508            .unwrap();
2509        assert_eq!(content_type, "text/plain");
2510        assert_eq!(meta_sha256, audit::meta_sha256("text/plain", &headers));
2511
2512        let language_count: i64 = c
2513            .query_row(
2514                "SELECT COUNT(*) FROM event_headers WHERE name='content-language'",
2515                [],
2516                |r| r.get(0),
2517            )
2518            .unwrap();
2519        assert_eq!(language_count, 0);
2520
2521        let _ = std::fs::remove_dir_all(dir);
2522    }
2523
2524    #[tokio::test]
2525    async fn delete_honors_if_match_before_audit_or_remove() {
2526        let (core, dir) = test_core("delete-if-match");
2527        let core = Arc::new(core);
2528        let state = server_state_for_tests(core.clone());
2529        let h = world::write_with_audit(
2530            &core.data,
2531            "home/delete-cas",
2532            b"alive",
2533            "text/plain; charset=utf-8",
2534            &[],
2535            &core.hmac_key,
2536        )
2537        .unwrap();
2538
2539        let mut stale = HeaderMap::new();
2540        stale.insert(header::IF_MATCH, HeaderValue::from_static("\"hmac-stale\""));
2541        let resp = unwrap_response(
2542            execute_delete(
2543                stale.clone(),
2544                auth::Tier::Approve,
2545                world_path("home/delete-cas"),
2546                &state,
2547                &TraceCtx::disabled(),
2548            )
2549            .await,
2550        );
2551        assert_eq!(resp.status(), StatusCode::PRECONDITION_FAILED);
2552        assert!(core.read_world("home/delete-cas").unwrap().is_some());
2553
2554        let mut good = HeaderMap::new();
2555        good.insert(
2556            header::IF_MATCH,
2557            HeaderValue::from_str(&format!("\"{}\"", et::hmac_etag(&h))).unwrap(),
2558        );
2559        let resp = unwrap_response(
2560            execute_delete(
2561                good.clone(),
2562                auth::Tier::Approve,
2563                world_path("home/delete-cas"),
2564                &state,
2565                &TraceCtx::disabled(),
2566            )
2567            .await,
2568        );
2569        assert_eq!(resp.status(), StatusCode::NO_CONTENT);
2570        assert!(core.read_world("home/delete-cas").unwrap().is_none());
2571        assert!(core.read_world("var/log/deletes").unwrap().is_some());
2572        assert!(matches!(
2573            core.cached_verify_chain("var/log/deletes").unwrap(),
2574            Some(audit::VerifyReport::Valid(_))
2575        ));
2576        let ledger = world::open_existing(&core.data, "var/log/deletes")
2577            .unwrap()
2578            .unwrap();
2579        let mut stmt = ledger
2580            .prepare("SELECT event_type FROM events ORDER BY id")
2581            .unwrap();
2582        let events: Vec<String> = stmt
2583            .query_map([], |r| r.get(0))
2584            .unwrap()
2585            .collect::<Result<_, _>>()
2586            .unwrap();
2587        assert_eq!(events, vec!["delete_intent", "delete_commit"]);
2588
2589        let _ = std::fs::remove_dir_all(dir);
2590    }
2591
2592    #[tokio::test]
2593    async fn delete_returns_500_when_commit_audit_fails_after_physical_delete() {
2594        let (core, dir) = test_core("delete-commit-audit-fail");
2595        let core = Arc::new(core);
2596        let state = server_state_for_tests(core.clone());
2597        core.write_world("home/delete-degraded", b"alive", "text/plain", &[])
2598            .unwrap();
2599        world::write_with_audit(
2600            &core.data,
2601            "var/log/deletes",
2602            b"ledger",
2603            "text/plain",
2604            &[],
2605            &core.hmac_key,
2606        )
2607        .unwrap();
2608        core.delete_ledger_created.store(true, Ordering::Relaxed);
2609        {
2610            let c =
2611                rusqlite::Connection::open(world::world_db(&core.data, "var/log/deletes")).unwrap();
2612            c.execute_batch(
2613                r#"
2614                CREATE TRIGGER fail_delete_commit
2615                BEFORE INSERT ON events
2616                WHEN NEW.event_type='delete_commit'
2617                BEGIN
2618                    SELECT RAISE(FAIL, 'delete_commit blocked');
2619                END;
2620                "#,
2621            )
2622            .unwrap();
2623        }
2624
2625        let resp = unwrap_response(
2626            execute_delete(
2627                HeaderMap::new(),
2628                auth::Tier::Approve,
2629                world_path("home/delete-degraded"),
2630                &state,
2631                &TraceCtx::disabled(),
2632            )
2633            .await,
2634        );
2635
2636        assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
2637        assert!(core.read_world("home/delete-degraded").unwrap().is_none());
2638        let ledger = world::open_existing(&core.data, "var/log/deletes")
2639            .unwrap()
2640            .unwrap();
2641        let events = ledger
2642            .prepare("SELECT event_type FROM events ORDER BY id")
2643            .unwrap()
2644            .query_map([], |r| r.get::<_, String>(0))
2645            .unwrap()
2646            .collect::<Result<Vec<_>, _>>()
2647            .unwrap();
2648        assert_eq!(events, vec!["put", "delete_intent", "delete_commit_failed"]);
2649
2650        let _ = std::fs::remove_dir_all(dir);
2651    }
2652
2653    #[tokio::test]
2654    async fn delete_rejects_auth_token_and_append_only_ledger() {
2655        let (core, dir) = test_core("delete-policy");
2656        let core = Arc::new(core);
2657        let state = server_state_for_tests(core.clone());
2658        world::write_with_audit(
2659            &core.data,
2660            "home/delete-policy",
2661            b"alive",
2662            "text/plain; charset=utf-8",
2663            &[],
2664            &core.hmac_key,
2665        )
2666        .unwrap();
2667        world::write_with_audit(
2668            &core.data,
2669            "var/log/deletes",
2670            b"ledger",
2671            "text/plain; charset=utf-8",
2672            &[],
2673            &core.hmac_key,
2674        )
2675        .unwrap();
2676        let headers = HeaderMap::new();
2677
2678        let auth_delete = unwrap_response(
2679            execute_delete(
2680                headers.clone(),
2681                auth::Tier::Write,
2682                world_path("home/delete-policy"),
2683                &state,
2684                &TraceCtx::disabled(),
2685            )
2686            .await,
2687        );
2688        assert_eq!(auth_delete.status(), StatusCode::UNAUTHORIZED);
2689        assert!(core.read_world("home/delete-policy").unwrap().is_some());
2690
2691        let ledger_delete = unwrap_response(
2692            execute_delete(
2693                headers.clone(),
2694                auth::Tier::Approve,
2695                world_path("var/log/deletes"),
2696                &state,
2697                &TraceCtx::disabled(),
2698            )
2699            .await,
2700        );
2701        assert_eq!(ledger_delete.status(), StatusCode::UNAUTHORIZED);
2702        assert_eq!(
2703            ledger_delete
2704                .headers()
2705                .get(header::WWW_AUTHENTICATE)
2706                .unwrap(),
2707            "Bearer realm=\"elastik\""
2708        );
2709        assert_eq!(
2710            ledger_delete.headers().get(header::CONTENT_TYPE).unwrap(),
2711            "text/plain; charset=utf-8"
2712        );
2713        assert_eq!(
2714            response_text(ledger_delete).await,
2715            "auth required: delete ledger is append-only\n"
2716        );
2717        assert!(core.read_world("var/log/deletes").unwrap().is_some());
2718
2719        let _ = std::fs::remove_dir_all(dir);
2720    }
2721
2722    #[tokio::test]
2723    async fn delete_missing_world_does_not_write_delete_ledger() {
2724        let (core, dir) = test_core("delete-missing");
2725        let core = Arc::new(core);
2726        let state = server_state_for_tests(core.clone());
2727        let headers = HeaderMap::new();
2728        let resp = unwrap_response(
2729            execute_delete(
2730                headers.clone(),
2731                auth::Tier::Approve,
2732                world_path("home/missing"),
2733                &state,
2734                &TraceCtx::disabled(),
2735            )
2736            .await,
2737        );
2738        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2739        assert!(core.read_world("var/log/deletes").unwrap().is_none());
2740
2741        let _ = std::fs::remove_dir_all(dir);
2742    }
2743
2744    #[test]
2745    fn proc_worlds_body_is_plain_lines() {
2746        assert_eq!(world_list_body(&[]), "");
2747        assert_eq!(
2748            world_list_body(&["home/a".to_owned(), "tmp/b".to_owned()]),
2749            "home/a\ntmp/b\n"
2750        );
2751    }
2752
2753    #[tokio::test]
2754    async fn proc_du_and_df_report_resource_usage() {
2755        let (mut core, dir) = test_core("proc-du-df");
2756        core.max_storage_bytes = Some(10);
2757        core.write_world("home/hello", b"hello", "text/plain", &[])
2758            .unwrap();
2759        core.write_world("tmp/scratch", b"data", "text/plain", &[])
2760            .unwrap();
2761        let state = Arc::new(core);
2762        let headers = HeaderMap::new();
2763
2764        let du = proc_du(
2765            State(server_state_for_tests(state.clone())),
2766            Method::GET,
2767            headers.clone(),
2768        )
2769        .await;
2770        assert_eq!(du.status(), StatusCode::OK);
2771        let du_body = response_text(du).await;
2772        assert!(du_body.contains("home/hello\t5\n"));
2773        assert!(du_body.contains("tmp/scratch\t4\n"));
2774
2775        let df = proc_df(
2776            State(server_state_for_tests(state.clone())),
2777            Method::GET,
2778            headers.clone(),
2779        )
2780        .await;
2781        assert_eq!(df.status(), StatusCode::OK);
2782        let df_body = response_text(df).await;
2783        assert!(df_body.contains("storage\t5\t10\t5\n"));
2784        assert!(df_body.contains("memory\t4\t268435456\t268435452\n"));
2785        assert!(df_body.contains("worlds\t2\tunlimited\tunlimited\n"));
2786
2787        let head = proc_du(State(server_state_for_tests(state)), Method::HEAD, headers).await;
2788        assert_eq!(head.status(), StatusCode::OK);
2789        assert_eq!(head.headers().get(header::CONTENT_LENGTH).unwrap(), "27");
2790        assert_eq!(response_text(head).await, "");
2791
2792        let _ = std::fs::remove_dir_all(dir);
2793    }
2794
2795    #[tokio::test]
2796    async fn proc_du_and_df_require_read_token_when_enabled() {
2797        let (mut core, dir) = test_core("proc-du-df-read-token");
2798        core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
2799        let state = Arc::new(core);
2800        let headers = HeaderMap::new();
2801
2802        let du = proc_du(
2803            State(server_state_for_tests(state.clone())),
2804            Method::GET,
2805            headers.clone(),
2806        )
2807        .await;
2808        assert_eq!(du.status(), StatusCode::UNAUTHORIZED);
2809
2810        let df = proc_df(
2811            State(server_state_for_tests(state.clone())),
2812            Method::GET,
2813            headers,
2814        )
2815        .await;
2816        assert_eq!(df.status(), StatusCode::UNAUTHORIZED);
2817
2818        let mut auth_headers = HeaderMap::new();
2819        let auth_value =
2820            HeaderValue::from_str(&format!("{} {}", "Bearer", "reader")).expect("valid test auth");
2821        auth_headers.insert(header::AUTHORIZATION, auth_value);
2822        let authorized = proc_df(
2823            State(server_state_for_tests(state)),
2824            Method::GET,
2825            auth_headers,
2826        )
2827        .await;
2828        assert_eq!(authorized.status(), StatusCode::OK);
2829
2830        let _ = std::fs::remove_dir_all(dir);
2831    }
2832
2833    #[tokio::test]
2834    async fn proc_df_world_count_tracks_durable_put_and_delete() {
2835        let (core, dir) = test_core("proc-df-world-count");
2836        let headers = HeaderMap::new();
2837
2838        let put = unwrap_response(
2839            execute_put(
2840                headers.clone(),
2841                Bytes::from_static(b"x"),
2842                auth::Tier::Write,
2843                world_path("home/count"),
2844                &core,
2845                &TraceCtx::disabled(),
2846            )
2847            .await,
2848        );
2849        assert_eq!(put.status(), StatusCode::CREATED);
2850
2851        let state = Arc::new(core);
2852        let server_state = server_state_for_tests(state.clone());
2853        let before = proc_df(State(server_state.clone()), Method::GET, headers.clone()).await;
2854        assert!(response_text(before)
2855            .await
2856            .contains("worlds\t1\tunlimited\tunlimited\n"));
2857
2858        let delete = unwrap_response(
2859            execute_delete(
2860                headers.clone(),
2861                auth::Tier::Approve,
2862                world_path("home/count"),
2863                &server_state,
2864                &TraceCtx::disabled(),
2865            )
2866            .await,
2867        );
2868        assert_eq!(delete.status(), StatusCode::NO_CONTENT);
2869
2870        let after = proc_df(State(server_state), Method::GET, headers).await;
2871        let after_body = response_text(after).await;
2872        assert!(after_body.contains("storage\t0\tunlimited\tunlimited\n"));
2873        assert!(after_body.contains("worlds\t0\tunlimited\tunlimited\n"));
2874
2875        let _ = std::fs::remove_dir_all(dir);
2876    }
2877
2878    #[tokio::test]
2879    async fn proc_pool_emits_metrics_with_type_labels() {
2880        // Warm the cache via a PUT + GET, then assert the metrics
2881        // body has the right counter / snapshot labels and tracks
2882        // hits + misses correctly. After a DELETE the
2883        // `ledger_writer_inits` counter must bump from 0 to 1
2884        // (lazy-init fired exactly once).
2885        let (core, dir) = test_core("proc-pool-metrics");
2886        let headers = HeaderMap::new();
2887
2888        let put = unwrap_response(
2889            execute_put(
2890                headers.clone(),
2891                Bytes::from_static(b"hello"),
2892                auth::Tier::Write,
2893                world_path("home/m"),
2894                &core,
2895                &TraceCtx::disabled(),
2896            )
2897            .await,
2898        );
2899        assert_eq!(put.status(), StatusCode::CREATED);
2900
2901        // First GET = miss (Phase 3); second GET = hit (Phase 1).
2902        for _ in 0..2 {
2903            let get = unwrap_response(
2904                execute_get(
2905                    headers.clone(),
2906                    auth::Tier::Read,
2907                    world_path("home/m"),
2908                    &core,
2909                    &TraceCtx::disabled(),
2910                )
2911                .await,
2912            );
2913            assert_eq!(get.status(), StatusCode::OK);
2914        }
2915
2916        let state = Arc::new(core);
2917        let server_state = server_state_for_tests(state.clone());
2918        let resp = proc_pool(State(server_state.clone()), Method::GET, headers.clone()).await;
2919        let body = response_text(resp).await;
2920
2921        assert!(body.contains("read_cache_entries 1 snapshot\n"));
2922        assert!(body.contains("read_cache_tombstones 0 snapshot\n"));
2923        assert!(body.contains("read_cache_hits 1 counter\n"));
2924        assert!(body.contains("read_cache_misses 1 counter\n"));
2925        assert!(body.contains("read_cache_capped 0 counter\n"));
2926        assert!(body.contains("read_cache_open_fails 0 counter\n"));
2927        assert!(body.contains("read_cache_max_entries "));
2928        // No DELETE issued yet -- ledger writer never lazy-inited.
2929        assert!(body.contains("ledger_writer_inits 0 counter\n"));
2930
2931        // After a DELETE, the lazy-init fires exactly once
2932        // (Codex P3: counter not snapshot).
2933        let _ = unwrap_response(
2934            execute_delete(
2935                headers.clone(),
2936                auth::Tier::Approve,
2937                world_path("home/m"),
2938                &server_state,
2939                &TraceCtx::disabled(),
2940            )
2941            .await,
2942        );
2943        let resp2 = proc_pool(State(server_state), Method::GET, headers).await;
2944        let body2 = response_text(resp2).await;
2945        assert!(
2946            body2.contains("ledger_writer_inits 1 counter\n"),
2947            "expected counter to bump to 1 after first DELETE; body=\n{body2}"
2948        );
2949
2950        let _ = std::fs::remove_dir_all(dir);
2951    }
2952
2953    #[tokio::test]
2954    async fn proc_pool_requires_read_token_when_enabled() {
2955        // Codex P3 sub-finding: /proc/pool exposes read-cache and
2956        // ledger writer internals -- match the auth-deny coverage of
2957        // /proc/du and /proc/df. With a read token configured, an
2958        // unauthenticated GET must return 401, not leak metrics.
2959        let (mut core, dir) = test_core("proc-pool-read-token");
2960        core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
2961        let state = Arc::new(core);
2962        let headers = HeaderMap::new();
2963
2964        let resp = proc_pool(State(server_state_for_tests(state)), Method::GET, headers).await;
2965        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2966
2967        let _ = std::fs::remove_dir_all(dir);
2968    }
2969
2970    #[cfg(feature = "multi-thread")]
2971    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
2972    async fn concurrent_first_deletes_increment_world_count_exactly_once() {
2973        // Bug 16 race coverage. Three concurrent DELETEs on a fresh
2974        // Core (delete_ledger_created starts false). The ledger gets
2975        // created on the first DELETE that wins the
2976        // `swap(true, AcqRel)` edge -- exactly one of them sees
2977        // was_first=true and bumps `durable_world_count`. The other
2978        // two see the swap-already-true and skip the bump.
2979        //
2980        // Setup: PUT three distinct worlds (durable_world_count = 3).
2981        // Then run three concurrent DELETEs in parallel.
2982        // Final state: durable_world_count = 3 + 1[ledger creation,
2983        // exactly once] - 3[three deletes succeed] = 1.
2984        //
2985        // Without Bug 16's `swap` ordering, two or all three of the
2986        // racing DELETEs would each independently observe
2987        // delete_ledger_created==false (via world_exists_blocking)
2988        // and each bump the counter, leading to drift. With the
2989        // atomic swap, only the unique false->true transition bumps.
2990        let (core, dir) = test_core("concurrent-first-deletes");
2991        let headers = HeaderMap::new();
2992        for w in ["home/a", "home/b", "home/c"] {
2993            let put = unwrap_response(
2994                execute_put(
2995                    headers.clone(),
2996                    Bytes::from_static(b"x"),
2997                    auth::Tier::Write,
2998                    world_path(w),
2999                    &core,
3000                    &TraceCtx::disabled(),
3001                )
3002                .await,
3003            );
3004            assert_eq!(put.status(), StatusCode::CREATED);
3005        }
3006        assert_eq!(core.durable_world_count.load(Ordering::Relaxed), 3);
3007        assert!(!core.delete_ledger_created.load(Ordering::Relaxed));
3008
3009        let state = Arc::new(core);
3010        let server_state = server_state_for_tests(state.clone());
3011        let s1 = server_state.clone();
3012        let s2 = server_state.clone();
3013        let s3 = server_state.clone();
3014        let h1 = headers.clone();
3015        let h2 = headers.clone();
3016        let h3 = headers.clone();
3017        let trace1 = TraceCtx::disabled();
3018        let trace2 = TraceCtx::disabled();
3019        let trace3 = TraceCtx::disabled();
3020        let (r1, r2, r3) = tokio::join!(
3021            execute_delete(h1, auth::Tier::Approve, world_path("home/a"), &s1, &trace1),
3022            execute_delete(h2, auth::Tier::Approve, world_path("home/b"), &s2, &trace2),
3023            execute_delete(h3, auth::Tier::Approve, world_path("home/c"), &s3, &trace3),
3024        );
3025        assert_eq!(unwrap_response(r1).status(), StatusCode::NO_CONTENT);
3026        assert_eq!(unwrap_response(r2).status(), StatusCode::NO_CONTENT);
3027        assert_eq!(unwrap_response(r3).status(), StatusCode::NO_CONTENT);
3028
3029        // 3 (original) + 1 (ledger creation, exactly once) - 3 (three
3030        // deletes) = 1.
3031        assert_eq!(state.durable_world_count.load(Ordering::Relaxed), 1);
3032        assert!(state.delete_ledger_created.load(Ordering::Relaxed));
3033
3034        let _ = std::fs::remove_dir_all(dir);
3035    }
3036
3037    async fn response_text(resp: Response) -> String {
3038        let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX)
3039            .await
3040            .unwrap();
3041        String::from_utf8(bytes.to_vec()).unwrap()
3042    }
3043
3044    // ── pipeline route-level coverage (PR 4b/4c) ───────────────
3045    //
3046    // The white-box tests above call `execute_get` / `execute_head`
3047    // / `execute_put` / `execute_post` / `execute_delete` directly
3048    // (renamed mechanically from `handle_*` in PR 4c) -- they cover
3049    // verb-handler logic in isolation. The tests below call
3050    // `pipeline::run` with the same Core fixture, exercising the
3051    // `world_handler -> pipeline::run -> handler::execute` route that
3052    // every real request now takes. Without these, a bug in the
3053    // pipeline wiring (path canonicalization, auth threading,
3054    // dispatch, response construction in the handler) would not be
3055    // caught by white-box tests; only e2e_blackbox would surface it.
3056
3057    #[tokio::test]
3058    async fn pipeline_get_existing_world_returns_200_with_body() {
3059        let (core, dir) = test_core("pipeline-get-200");
3060        core.write_world("home/hello", b"hello world", "text/plain", &[])
3061            .unwrap();
3062        let core = Arc::new(core);
3063        let state = server_state_for_tests(core.clone());
3064
3065        let resp = pipeline::run(
3066            Method::GET,
3067            "/home/hello".to_string(),
3068            HeaderMap::new(),
3069            Bytes::new(),
3070            &state,
3071            42,
3072        )
3073        .await;
3074
3075        assert_eq!(resp.status(), StatusCode::OK);
3076        assert_eq!(
3077            resp.headers()
3078                .get(header::CONTENT_TYPE)
3079                .and_then(|v| v.to_str().ok()),
3080            Some("text/plain")
3081        );
3082        assert_eq!(response_text(resp).await, "hello world");
3083
3084        let _ = std::fs::remove_dir_all(dir);
3085    }
3086
3087    #[tokio::test]
3088    async fn pipeline_head_existing_world_returns_headers_no_body() {
3089        let (core, dir) = test_core("pipeline-head-200");
3090        core.write_world("home/hello", b"hello world", "text/plain", &[])
3091            .unwrap();
3092        let core = Arc::new(core);
3093        let state = server_state_for_tests(core.clone());
3094
3095        let resp = pipeline::run(
3096            Method::HEAD,
3097            "/home/hello".to_string(),
3098            HeaderMap::new(),
3099            Bytes::new(),
3100            &state,
3101            43,
3102        )
3103        .await;
3104
3105        assert_eq!(resp.status(), StatusCode::OK);
3106        assert_eq!(
3107            resp.headers()
3108                .get(header::CONTENT_LENGTH)
3109                .and_then(|v| v.to_str().ok()),
3110            Some("11")
3111        );
3112        // HEAD body must be empty even though Content-Length says 11.
3113        assert_eq!(response_text(resp).await, "");
3114
3115        let _ = std::fs::remove_dir_all(dir);
3116    }
3117
3118    #[tokio::test]
3119    async fn pipeline_get_nonexistent_world_returns_404() {
3120        let (core, dir) = test_core("pipeline-get-404");
3121        let core = Arc::new(core);
3122        let state = server_state_for_tests(core.clone());
3123
3124        let resp = pipeline::run(
3125            Method::GET,
3126            "/home/missing".to_string(),
3127            HeaderMap::new(),
3128            Bytes::new(),
3129            &state,
3130            44,
3131        )
3132        .await;
3133
3134        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
3135
3136        let _ = std::fs::remove_dir_all(dir);
3137    }
3138
3139    #[tokio::test]
3140    async fn pipeline_get_invalid_dot_segment_returns_400() {
3141        let (core, dir) = test_core("pipeline-get-400");
3142        let core = Arc::new(core);
3143        let state = server_state_for_tests(core.clone());
3144
3145        let resp = pipeline::run(
3146            Method::GET,
3147            "/home/../etc/secret".to_string(),
3148            HeaderMap::new(),
3149            Bytes::new(),
3150            &state,
3151            45,
3152        )
3153        .await;
3154
3155        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
3156
3157        let _ = std::fs::remove_dir_all(dir);
3158    }
3159
3160    #[tokio::test]
3161    async fn pipeline_get_with_read_token_required_rejects_anon() {
3162        let (mut core, dir) = test_core("pipeline-get-401");
3163        core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
3164        core.write_world("home/secret", b"shhh", "text/plain", &[])
3165            .unwrap();
3166        let core = Arc::new(core);
3167        let state = server_state_for_tests(core.clone());
3168
3169        let resp = pipeline::run(
3170            Method::GET,
3171            "/home/secret".to_string(),
3172            HeaderMap::new(), // no Authorization header
3173            Bytes::new(),
3174            &state,
3175            46,
3176        )
3177        .await;
3178
3179        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
3180
3181        let _ = std::fs::remove_dir_all(dir);
3182    }
3183
3184    /// World-handler glue test (1/2): the full extractor chain
3185    /// (`State` + `Extension<RequestId>` + `Method` + `AxPath` +
3186    /// `HeaderMap` + `Bytes`) wires up correctly and `world_handler`
3187    /// routes GET to the pipeline. Goes through `tower::oneshot` so
3188    /// the `add_server_response_headers` middleware fires too -- that
3189    /// way we can assert the unified `x-request-id` header lands on
3190    /// the response.
3191    #[tokio::test]
3192    async fn world_handler_get_routes_through_pipeline() {
3193        use axum::body::Body;
3194        use axum::http::Request as HttpRequest;
3195        use tower::ServiceExt;
3196
3197        let (core, dir) = test_core("world-handler-get");
3198        core.write_world("home/hello", b"hello world", "text/plain", &[])
3199            .unwrap();
3200        let core = Arc::new(core);
3201        let state = server_state_for_tests(core.clone());
3202
3203        let app = Router::new()
3204            .route("/*world", any(world_handler))
3205            .layer(axum::middleware::from_fn_with_state(
3206                state.clone(),
3207                add_server_response_headers,
3208            ))
3209            .with_state(state);
3210
3211        let req = HttpRequest::builder()
3212            .method("GET")
3213            .uri("/home/hello")
3214            .body(Body::empty())
3215            .unwrap();
3216        let resp = app.oneshot(req).await.unwrap();
3217
3218        assert_eq!(resp.status(), StatusCode::OK);
3219        // Middleware stamps x-request-id; pipeline trace uses the
3220        // same id (`pipeline::run` reads RequestId from extensions
3221        // rather than allocating its own). The fact that this
3222        // header is present on the response is what the unification
3223        // fix guarantees -- without it, the bug would slip through
3224        // again silently.
3225        let req_id_header = resp
3226            .headers()
3227            .get("x-request-id")
3228            .and_then(|v| v.to_str().ok())
3229            .unwrap_or("missing");
3230        assert!(
3231            req_id_header.parse::<u64>().is_ok(),
3232            "x-request-id should be a numeric id, got {req_id_header:?}"
3233        );
3234        assert_eq!(response_text(resp).await, "hello world");
3235
3236        let _ = std::fs::remove_dir_all(dir);
3237    }
3238
3239    /// World-handler glue test (2/2): the same extractor chain works
3240    /// for HEAD. Validates the `if matches!(method, Method::GET |
3241    /// Method::HEAD)` short-circuit covers HEAD too, not just GET.
3242    #[tokio::test]
3243    async fn world_handler_head_routes_through_pipeline() {
3244        use axum::body::Body;
3245        use axum::http::Request as HttpRequest;
3246        use tower::ServiceExt;
3247
3248        let (core, dir) = test_core("world-handler-head");
3249        core.write_world("home/hello", b"hello world", "text/plain", &[])
3250            .unwrap();
3251        let core = Arc::new(core);
3252        let state = server_state_for_tests(core.clone());
3253
3254        let app = Router::new()
3255            .route("/*world", any(world_handler))
3256            .layer(axum::middleware::from_fn_with_state(
3257                state.clone(),
3258                add_server_response_headers,
3259            ))
3260            .with_state(state);
3261
3262        let req = HttpRequest::builder()
3263            .method("HEAD")
3264            .uri("/home/hello")
3265            .body(Body::empty())
3266            .unwrap();
3267        let resp = app.oneshot(req).await.unwrap();
3268
3269        assert_eq!(resp.status(), StatusCode::OK);
3270        assert_eq!(
3271            resp.headers()
3272                .get(header::CONTENT_LENGTH)
3273                .and_then(|v| v.to_str().ok()),
3274            Some("11"),
3275        );
3276        assert!(resp.headers().get("x-request-id").is_some());
3277        // HEAD body must be empty even though Content-Length says 11.
3278        assert_eq!(response_text(resp).await, "");
3279
3280        let _ = std::fs::remove_dir_all(dir);
3281    }
3282
3283    /// 304 path: `If-None-Match: <current-etag>` short-circuits
3284    /// inside `execute_get` to `hs::not_modified`, returning
3285    /// `Phase::ExecutedRead(304)`.
3286    #[tokio::test]
3287    async fn pipeline_get_if_none_match_returns_304() {
3288        let (core, dir) = test_core("pipeline-304");
3289        core.write_world("home/cached", b"cached body", "text/plain", &[])
3290            .unwrap();
3291        let core = Arc::new(core);
3292        let state = server_state_for_tests(core.clone());
3293
3294        // First GET to discover the current etag.
3295        let first = pipeline::run(
3296            Method::GET,
3297            "/home/cached".to_string(),
3298            HeaderMap::new(),
3299            Bytes::new(),
3300            &state,
3301            100,
3302        )
3303        .await;
3304        let etag = first
3305            .headers()
3306            .get(header::ETAG)
3307            .and_then(|v| v.to_str().ok())
3308            .expect("first GET must return an ETag header")
3309            .to_string();
3310
3311        let mut headers = HeaderMap::new();
3312        headers.insert(header::IF_NONE_MATCH, HeaderValue::from_str(&etag).unwrap());
3313        let resp = pipeline::run(
3314            Method::GET,
3315            "/home/cached".to_string(),
3316            headers,
3317            Bytes::new(),
3318            &state,
3319            101,
3320        )
3321        .await;
3322
3323        assert_eq!(resp.status(), StatusCode::NOT_MODIFIED);
3324        // 304 bodies must be empty.
3325        assert_eq!(response_text(resp).await, "");
3326
3327        let _ = std::fs::remove_dir_all(dir);
3328    }
3329
3330    /// 416 path: `Range: bytes=999-` against a 3-byte world is out
3331    /// of range. `effective_range` returns `Err(())` and
3332    /// `execute_get` produces `Phase::Error { reason:
3333    /// RangeNotSatisfiable }`. We assert BOTH the surfaced HTTP
3334    /// status (via `pipeline::run`) AND the structured reason
3335    /// variant (via direct `handler::execute`) so the 12th
3336    /// `ErrorReason` variant is verified to fire on a real
3337    /// read-path code path. Without the second assertion, a
3338    /// regression that emits the right status but the wrong reason
3339    /// would slip through and quietly poison trace / metrics.
3340    #[tokio::test]
3341    async fn pipeline_get_out_of_range_returns_416_with_reason() {
3342        let (core, dir) = test_core("pipeline-416");
3343        core.write_world("home/short", b"abc", "text/plain", &[])
3344            .unwrap();
3345        let core = Arc::new(core);
3346        let state = server_state_for_tests(core.clone());
3347
3348        // 1) Production path: pipeline::run surfaces 416 to the wire.
3349        let mut headers = HeaderMap::new();
3350        headers.insert(header::RANGE, HeaderValue::from_static("bytes=999-"));
3351        let resp = pipeline::run(
3352            Method::GET,
3353            "/home/short".to_string(),
3354            headers,
3355            Bytes::new(),
3356            &state,
3357            200,
3358        )
3359        .await;
3360        assert_eq!(resp.status(), StatusCode::RANGE_NOT_SATISFIABLE);
3361
3362        // 2) Internal phase: handler::execute returns the explicit
3363        // Phase::Error{RangeNotSatisfiable} variant.
3364        let mut headers2 = HeaderMap::new();
3365        headers2.insert(header::RANGE, HeaderValue::from_static("bytes=999-"));
3366        let phase = crate::handler::execute(
3367            Verb::Get,
3368            headers2,
3369            Bytes::new(),
3370            auth::Tier::Anon,
3371            world_path("home/short"),
3372            &state,
3373            &TraceCtx::disabled(),
3374        )
3375        .await;
3376        match phase {
3377            Phase::Error {
3378                reason: ErrorReason::RangeNotSatisfiable,
3379                resp,
3380            } => {
3381                assert_eq!(resp.status(), StatusCode::RANGE_NOT_SATISFIABLE);
3382            }
3383            _ => panic!("expected Phase::Error{{RangeNotSatisfiable}}"),
3384        }
3385
3386        let _ = std::fs::remove_dir_all(dir);
3387    }
3388
3389    #[tokio::test]
3390    async fn pipeline_get_range_returns_206_with_chunk() {
3391        let (core, dir) = test_core("pipeline-get-range");
3392        core.write_world("home/range", b"abcdef", "text/plain", &[])
3393            .unwrap();
3394        let core = Arc::new(core);
3395        let state = server_state_for_tests(core.clone());
3396
3397        let mut headers = HeaderMap::new();
3398        headers.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
3399
3400        let resp = pipeline::run(
3401            Method::GET,
3402            "/home/range".to_string(),
3403            headers,
3404            Bytes::new(),
3405            &state,
3406            47,
3407        )
3408        .await;
3409
3410        assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT);
3411        assert_eq!(
3412            resp.headers()
3413                .get(header::CONTENT_RANGE)
3414                .and_then(|v| v.to_str().ok()),
3415            Some("bytes 1-3/6")
3416        );
3417        assert_eq!(response_text(resp).await, "bcd");
3418
3419        let _ = std::fs::remove_dir_all(dir);
3420    }
3421
3422    fn test_core(label: &str) -> (Core, PathBuf) {
3423        let mut dir = std::env::temp_dir();
3424        let nanos = std::time::SystemTime::now()
3425            .duration_since(std::time::UNIX_EPOCH)
3426            .unwrap()
3427            .as_nanos();
3428        dir.push(format!(
3429            "elastik-core-{label}-{}-{nanos}",
3430            std::process::id()
3431        ));
3432        std::fs::create_dir_all(&dir).unwrap();
3433        (
3434            {
3435                let (events, _) = broadcast::channel(16);
3436                Core {
3437                    data: dir.clone(),
3438                    tokens: auth::Tokens {
3439                        read: None,
3440                        write: None,
3441                        approve: None,
3442                    },
3443                    hmac_key: b"test-key".to_vec(),
3444                    mem: Arc::new(store::MemoryStore::new()),
3445                    max_world_bytes: DEFAULT_MAX_WORLD_BYTES,
3446                    max_memory_bytes: DEFAULT_MAX_MEMORY_BYTES,
3447                    max_storage_bytes: None,
3448                    storage_body_bytes: Arc::new(AtomicUsize::new(0)),
3449                    durable_world_count: Arc::new(AtomicUsize::new(0)),
3450                    delete_ledger_created: Arc::new(AtomicBool::new(false)),
3451                    events,
3452                    listen_slots: Arc::new(Semaphore::new(DEFAULT_MAX_LISTEN_CONNECTIONS)),
3453                    listen_replay_max: DEFAULT_LISTEN_REPLAY_MAX,
3454                    event_log: Arc::new(StdMutex::new(VecDeque::with_capacity(
3455                        DEFAULT_LISTEN_REPLAY_MAX,
3456                    ))),
3457                    shutdown: watch::channel(false).1,
3458                    next_event: crate::state::new_event_counter(),
3459                    world_locks: Arc::new(DashMap::new()),
3460                    ledger: Arc::new(crate::ledger::LedgerWriter::new()),
3461                    read_cache: Arc::new(crate::read_cache::ReadCache::new(
3462                        crate::read_cache::DEFAULT_READ_CACHE_MAX_ENTRIES,
3463                    )),
3464                }
3465            },
3466            dir,
3467        )
3468    }
3469
3470    #[tokio::test]
3471    async fn write_permit_is_bound_to_one_world() {
3472        struct NoopTrace;
3473        impl crate::world_ops::WriteTraceHooks for NoopTrace {}
3474
3475        let (core, dir) = test_core("permit-bound");
3476        let world = world_path("home/permit-a");
3477        let permit = crate::world_ops::authorize_write(&world, auth::Tier::Write)
3478            .expect("write token tier should authorize home writes");
3479        let req = crate::world_ops::ReplaceRequest {
3480            body: Bytes::from_static(b"right-door"),
3481            content_type: "text/plain; charset=utf-8".to_owned(),
3482            headers: Vec::new(),
3483            preconditions: et::Preconditions::default(),
3484        };
3485
3486        crate::world_ops::replace_write(&core, &permit, req, &NoopTrace)
3487            .await
3488            .expect("permit writes only its bound world");
3489
3490        assert_eq!(
3491            core.read_world("home/permit-a").unwrap().unwrap().body,
3492            b"right-door"
3493        );
3494        assert!(core.read_world("home/permit-b").unwrap().is_none());
3495
3496        let _ = std::fs::remove_dir_all(dir);
3497    }
3498
3499    #[test]
3500    fn write_permit_preserves_path_based_approve_gate() {
3501        assert!(matches!(
3502            crate::world_ops::authorize_write(&world_path("etc/config"), auth::Tier::Write),
3503            Err(crate::world_ops::WriteError::Auth(AuthGate::WriteApprove))
3504        ));
3505        assert!(
3506            crate::world_ops::authorize_write(&world_path("etc/config"), auth::Tier::Approve)
3507                .is_ok()
3508        );
3509        assert!(
3510            crate::world_ops::authorize_write(&world_path("home/config"), auth::Tier::Write)
3511                .is_ok()
3512        );
3513    }
3514}