1mod audit;
84mod auth;
85#[cfg(test)]
86#[cfg(feature = "coap")]
87#[path = "server/coap.rs"]
88mod coap;
89#[cfg(test)]
90#[cfg(feature = "coap")]
91#[path = "server/coap_errors.rs"]
92mod coap_errors;
93#[cfg(test)]
94mod config;
95mod defaults;
96mod delete_ops;
97mod engine;
98mod engine_introspection;
99mod engine_ops;
100mod engine_trace;
101mod engine_types;
102mod etag;
103mod event;
104#[cfg(test)]
105#[path = "server/handler.rs"]
106mod handler;
107#[cfg(test)]
108mod http_range;
109#[cfg(test)]
110mod http_semantics;
111mod ledger;
112#[cfg(test)]
113#[path = "server/listen.rs"]
114mod listen;
115#[cfg(test)]
116#[path = "server/middleware.rs"]
117mod middleware;
118mod path;
119#[cfg(test)]
120#[path = "server/pipeline.rs"]
121mod pipeline;
122#[cfg(test)]
123#[path = "server/proc.rs"]
124mod proc;
125mod read_cache;
126#[cfg(test)]
127#[path = "server/response.rs"]
128mod response;
129#[cfg(test)]
130#[path = "server/route.rs"]
131mod route;
132#[cfg(test)]
133mod server;
134mod state;
135mod storage_class;
136mod store;
137mod world;
138mod world_ops;
139
140#[cfg(test)]
144pub(crate) use crate::path::*;
145#[cfg(test)]
146pub(crate) use crate::pipeline::*;
147#[cfg(test)]
148pub(crate) use crate::proc::*;
149#[cfg(test)]
150pub(crate) use crate::response::*;
151pub(crate) use crate::state::*;
152pub(crate) use crate::storage_class::*;
153#[cfg(feature = "unstable-engine")]
154pub use auth::AuthGate;
155#[cfg(not(feature = "unstable-engine"))]
156pub(crate) use auth::AuthGate;
157#[cfg(all(feature = "unstable-engine", feature = "coap"))]
158#[doc(hidden)]
159pub use engine::ShutdownToken;
160#[cfg(feature = "unstable-engine")]
161pub use engine::{Engine, EngineBuildError, EngineBuilder, EngineError};
162#[cfg(feature = "unstable-engine")]
163pub use engine_introspection::{
164 AuditBroken, AuditValid, AuditVerify, DfSnapshot, InvalidProcPath, PoolSnapshot, ProcEndpoint,
165 ValidatedProcPath, WorldUsage,
166};
167#[cfg(feature = "unstable-engine")]
168pub use engine_trace::{DeleteMetadata, EngineDeleteTraceHooks, EngineWriteTraceHooks};
169#[cfg(feature = "unstable-engine")]
170pub use engine_types::{
171 AccessTier, ChangeEvent, EmptyKeyError, EngineSubscription, EtagMatcher, InvalidWorldPath,
172 Preconditions, ReadResult, Representation, SecretBytes, SubscribePattern,
173 SubscriptionRecvError, ValidatedWorldPath, WriteKind, WriteResult,
174};
175
176use std::path::Path;
177use std::time::Duration;
178
179#[cfg(test)]
180pub(crate) use crate::defaults::{
181 DEFAULT_LISTEN_REPLAY_MAX, DEFAULT_MAX_LISTEN_CONNECTIONS, DEFAULT_MAX_MEMORY_BYTES,
182 DEFAULT_MAX_WORLD_BYTES,
183};
184
185#[cfg(test)]
188pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION");
189#[cfg(test)]
190pub(crate) const WORLD_ALLOW: &str = "GET, HEAD, PUT, POST, DELETE, OPTIONS";
191
192fn acquire_data_root_writer_lock(data: &Path) -> rusqlite::Result<rusqlite::Connection> {
193 let c = rusqlite::Connection::open(data.join(".elastik-writer-lock.sqlite3"))?;
194 c.busy_timeout(Duration::from_millis(0))?;
195 c.execute_batch(
196 r#"
197 PRAGMA journal_mode=WAL;
198 CREATE TABLE IF NOT EXISTS writer_lock(
199 id INTEGER PRIMARY KEY CHECK(id=1),
200 holder TEXT NOT NULL DEFAULT ''
201 );
202 INSERT OR IGNORE INTO writer_lock(id, holder) VALUES(1, '');
203 BEGIN IMMEDIATE;
204 "#,
205 )?;
206 c.execute(
207 "UPDATE writer_lock SET holder=?1 WHERE id=1",
208 [std::process::id().to_string()],
209 )?;
210 Ok(c)
211}
212
213pub(crate) fn can_write(world_name: &str, tier: auth::Tier) -> bool {
223 let needs_approve = needs_write_approve(world_name);
227 match tier {
228 auth::Tier::Anon => false,
229 auth::Tier::Read => false,
230 auth::Tier::Write => !needs_approve,
231 auth::Tier::Approve => true,
232 }
233}
234
235pub(crate) fn needs_write_approve(world_name: &str) -> bool {
240 exact_or_child(world_name, "lib")
241 || exact_or_child(world_name, "etc")
242 || exact_or_child(world_name, "boot")
243 || exact_or_child(world_name, "usr")
244 || exact_or_child(world_name, "var/log")
245}
246
247pub(crate) fn can_delete(tier: auth::Tier) -> bool {
248 matches!(tier, auth::Tier::Approve)
249}
250
251pub(crate) fn exact_or_child(world_name: &str, prefix: &str) -> bool {
273 world_name == prefix
274 || world_name
275 .strip_prefix(prefix)
276 .is_some_and(|rest| rest.starts_with('/'))
277}
278
279pub(crate) fn can_read(core: &Core, tier: auth::Tier) -> bool {
280 !core.tokens.read_required()
281 || matches!(
282 tier,
283 auth::Tier::Read | auth::Tier::Write | auth::Tier::Approve
284 )
285}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290 #[cfg(feature = "coap")]
291 use crate::config::coap_bind_from_env;
292 use crate::config::{
293 env_nonzero_usize, env_optional_usize, hmac_key_from_env_value, listen_addr,
294 should_warn_public_read,
295 };
296 use crate::etag as et;
297 use crate::handler::{execute_delete, execute_get, execute_head, execute_post, execute_put};
298 use crate::http_semantics as hs;
299 use crate::middleware::{add_server_response_headers, stamp_core_response_headers};
300 use crate::route::world_handler;
301 use axum::body::Bytes;
302 use axum::extract::{Path as AxPath, State};
303 use axum::http::{header, HeaderMap, HeaderValue, Method, StatusCode};
304 use axum::response::Response;
305 use axum::routing::any;
306 use axum::Router;
307 use dashmap::DashMap;
308 use std::collections::VecDeque;
309 use std::net::IpAddr;
310 use std::path::PathBuf;
311 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
312 use std::sync::{Arc, Mutex as StdMutex, Mutex as TestMutex, OnceLock};
313 use tokio::sync::{broadcast, watch, Semaphore};
314
315 fn server_state_for_tests(core: Arc<Core>) -> crate::server::ServerState {
316 crate::server::ServerState::from_core_for_tests(core, DEFAULT_MAX_WORLD_BYTES)
317 }
318
319 fn unwrap_response(phase: Phase) -> Response {
328 match phase {
329 Phase::ExecutedRead(r) | Phase::CommittedWrite(r) | Phase::Done(r) => r,
330 Phase::Error { resp, .. } => resp,
331 Phase::Received { .. }
334 | Phase::Authenticated { .. }
335 | Phase::PathValidated { .. }
336 | Phase::Dispatched { .. } => {
337 panic!("execute_* returned a non-terminal Phase variant")
338 }
339 }
340 }
341
342 fn world_path(world: &str) -> crate::engine_types::ValidatedWorldPath {
343 crate::engine_types::ValidatedWorldPath::new(world).unwrap()
344 }
345
346 fn env_lock() -> &'static TestMutex<()> {
347 static LOCK: OnceLock<TestMutex<()>> = OnceLock::new();
348 LOCK.get_or_init(|| TestMutex::new(()))
349 }
350
351 #[cfg(feature = "coap")]
352 struct CoapEnvGuard {
353 host: Option<String>,
354 port: Option<String>,
355 }
356
357 #[cfg(feature = "coap")]
358 impl CoapEnvGuard {
359 fn capture() -> Self {
360 Self {
361 host: std::env::var("ELASTIK_COAP_HOST").ok(),
362 port: std::env::var("ELASTIK_COAP_PORT").ok(),
363 }
364 }
365 }
366
367 #[cfg(feature = "coap")]
368 impl Drop for CoapEnvGuard {
369 fn drop(&mut self) {
370 match &self.host {
371 Some(v) => std::env::set_var("ELASTIK_COAP_HOST", v),
372 None => std::env::remove_var("ELASTIK_COAP_HOST"),
373 }
374 match &self.port {
375 Some(v) => std::env::set_var("ELASTIK_COAP_PORT", v),
376 None => std::env::remove_var("ELASTIK_COAP_PORT"),
377 }
378 }
379 }
380
381 #[test]
382 fn hmac_key_requires_nonempty_semantic_content() {
383 assert!(hmac_key_from_env_value(None).is_none());
384 assert!(hmac_key_from_env_value(Some(String::new())).is_none());
385 assert!(hmac_key_from_env_value(Some(" \t\n".to_string())).is_none());
386 assert_eq!(
387 hmac_key_from_env_value(Some(" secret ".to_string())).unwrap(),
388 b" secret ".to_vec()
389 );
390 }
391
392 #[test]
393 fn resource_cap_env_zero_falls_back_to_default() {
394 let _guard = env_lock().lock().unwrap();
395 let key = format!("ELASTIK_TEST_ZERO_CAP_{}", std::process::id());
396 std::env::set_var(&key, "0");
397 assert_eq!(env_nonzero_usize(&key, 7), 7);
398 std::env::set_var(&key, "9");
399 assert_eq!(env_nonzero_usize(&key, 7), 9);
400 std::env::remove_var(&key);
401 }
402
403 #[test]
404 fn optional_storage_quota_zero_is_unlimited() {
405 let _guard = env_lock().lock().unwrap();
406 let key = format!("ELASTIK_TEST_STORAGE_CAP_{}", std::process::id());
407 std::env::remove_var(&key);
408 assert_eq!(env_optional_usize(&key), None);
409 std::env::set_var(&key, "");
410 assert_eq!(env_optional_usize(&key), None);
411 std::env::set_var(&key, " \t ");
412 assert_eq!(env_optional_usize(&key), None);
413 std::env::set_var(&key, "0");
414 assert_eq!(env_optional_usize(&key), None);
415 std::env::set_var(&key, "11");
416 assert_eq!(env_optional_usize(&key), Some(11));
417 std::env::set_var(&key, "10GB");
418 assert!(std::panic::catch_unwind(|| env_optional_usize(&key)).is_err());
419 std::env::remove_var(&key);
420 }
421
422 #[test]
423 fn data_root_writer_lock_is_exclusive() {
424 let dir =
425 std::env::temp_dir().join(format!("elastik-data-lock-test-{}", std::process::id()));
426 let _ = std::fs::remove_dir_all(&dir);
427 std::fs::create_dir_all(&dir).unwrap();
428
429 let first = acquire_data_root_writer_lock(&dir).unwrap();
430 assert!(acquire_data_root_writer_lock(&dir).is_err());
431 drop(first);
432 let second = acquire_data_root_writer_lock(&dir).unwrap();
433 drop(second);
434
435 let _ = std::fs::remove_dir_all(dir);
436 }
437
438 #[test]
439 fn sqlite_disk_full_maps_to_507() {
440 let err = rusqlite::Error::SqliteFailure(
441 rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_FULL),
442 None,
443 );
444 assert!(is_insufficient_storage_error(&err));
445
446 let resp = storage_error("test", err);
447 assert_eq!(resp.status(), StatusCode::INSUFFICIENT_STORAGE);
448 }
449
450 #[test]
451 fn sqlite_busy_and_locked_map_to_503_retry_after() {
452 for code in [rusqlite::ffi::SQLITE_BUSY, rusqlite::ffi::SQLITE_LOCKED] {
453 let err = rusqlite::Error::SqliteFailure(rusqlite::ffi::Error::new(code), None);
454 assert!(is_transient_storage_error(&err));
455
456 let resp = storage_error("test", err);
457 assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
458 assert_eq!(
459 resp.headers()
460 .get(header::RETRY_AFTER)
461 .and_then(|v| v.to_str().ok()),
462 Some("1")
463 );
464 }
465 }
466
467 #[test]
468 fn non_storage_sqlite_errors_stay_500() {
469 let err = rusqlite::Error::SqliteFailure(
470 rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CORRUPT),
471 None,
472 );
473 assert!(!is_insufficient_storage_error(&err));
474
475 let resp = storage_error("test", err);
476 assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
477 }
478
479 #[tokio::test]
480 async fn durable_storage_quota_returns_507_without_writing() {
481 let (mut core, dir) = test_core("storage-quota");
482 core.max_storage_bytes = Some(4);
483 let headers = HeaderMap::new();
484
485 let first = unwrap_response(
486 execute_put(
487 headers.clone(),
488 Bytes::from_static(b"1234"),
489 auth::Tier::Write,
490 world_path("home/a"),
491 &core,
492 &TraceCtx::disabled(),
493 )
494 .await,
495 );
496 assert_eq!(first.status(), StatusCode::CREATED);
497
498 let over = unwrap_response(
499 execute_put(
500 headers.clone(),
501 Bytes::from_static(b"5"),
502 auth::Tier::Write,
503 world_path("home/b"),
504 &core,
505 &TraceCtx::disabled(),
506 )
507 .await,
508 );
509 assert_eq!(over.status(), StatusCode::INSUFFICIENT_STORAGE);
510 assert_eq!(over.headers().get("x-storage-usage").unwrap(), "4");
511 assert_eq!(over.headers().get("x-storage-quota").unwrap(), "4");
512 assert_eq!(over.headers().get("x-storage-needed").unwrap(), "1");
513 assert!(core.read_world("home/b").unwrap().is_none());
514
515 let append = unwrap_response(
516 execute_post(
517 headers.clone(),
518 Bytes::from_static(b"5"),
519 auth::Tier::Write,
520 world_path("home/a"),
521 &core,
522 &TraceCtx::disabled(),
523 )
524 .await,
525 );
526 assert_eq!(append.status(), StatusCode::INSUFFICIENT_STORAGE);
527 assert_eq!(append.headers().get("x-storage-usage").unwrap(), "4");
528 assert_eq!(append.headers().get("x-storage-quota").unwrap(), "4");
529 assert_eq!(append.headers().get("x-storage-needed").unwrap(), "1");
530 assert_eq!(
531 core.read_world("home/a").unwrap().unwrap().body,
532 b"1234".to_vec()
533 );
534
535 let _ = std::fs::remove_dir_all(dir);
536 }
537
538 #[tokio::test]
539 async fn concurrent_puts_to_distinct_worlds_do_not_overshoot_quota() {
540 let (mut core, dir) = test_core("quota-race");
549 let quota = 100;
550 core.max_storage_bytes = Some(quota);
551 let core = Arc::new(core);
552
553 let workers = 16;
554 let body_len = 12; let mut handles = Vec::with_capacity(workers);
556 for i in 0..workers {
557 let core = core.clone();
558 handles.push(tokio::spawn(async move {
559 let path = format!("home/race/{i}");
560 let body = Bytes::copy_from_slice(&vec![b'x'; body_len]);
561 unwrap_response(
562 execute_put(
563 HeaderMap::new(),
564 body,
565 auth::Tier::Write,
566 world_path(&path),
567 &core,
568 &TraceCtx::disabled(),
569 )
570 .await,
571 )
572 }));
573 }
574
575 let mut accepted: usize = 0;
576 for handle in handles {
577 let resp = handle.await.unwrap();
578 match resp.status() {
579 StatusCode::CREATED | StatusCode::OK => accepted += 1,
580 StatusCode::INSUFFICIENT_STORAGE => {}
581 other => panic!("unexpected status: {other}"),
582 }
583 }
584
585 let used = core.storage_body_bytes.load(Ordering::Relaxed);
586 let counted = accepted * body_len;
587 assert_eq!(used, counted, "counter must equal sum of accepted bodies");
588 assert!(
589 used <= quota,
590 "counter must never exceed quota: {used} > {quota}"
591 );
592
593 let _ = std::fs::remove_dir_all(dir);
594 }
595
596 #[tokio::test]
597 async fn concurrent_memory_puts_do_not_overshoot_max_memory_bytes() {
598 let (mut core, dir) = test_core("memory-quota-race");
607 let cap = 100;
608 core.max_memory_bytes = cap;
609 let core = Arc::new(core);
610
611 let workers = 16;
612 let body_len = 12; let mut handles = Vec::with_capacity(workers);
614 for i in 0..workers {
615 let core = core.clone();
616 handles.push(tokio::spawn(async move {
617 let path = format!("tmp/race/{i}");
618 let body = Bytes::copy_from_slice(&vec![b'm'; body_len]);
619 unwrap_response(
620 execute_put(
621 HeaderMap::new(),
622 body,
623 auth::Tier::Write,
624 world_path(&path),
625 &core,
626 &TraceCtx::disabled(),
627 )
628 .await,
629 )
630 }));
631 }
632
633 let mut accepted: usize = 0;
634 for handle in handles {
635 let resp = handle.await.unwrap();
636 match resp.status() {
637 StatusCode::CREATED | StatusCode::OK => accepted += 1,
638 StatusCode::PAYLOAD_TOO_LARGE => {}
639 other => panic!("unexpected status: {other}"),
640 }
641 }
642
643 let used = core.mem.total_bytes();
644 let counted = accepted * body_len;
645 assert_eq!(
646 used, counted,
647 "memory total must equal sum of accepted bodies"
648 );
649 assert!(
650 used <= cap,
651 "memory total must never exceed cap: {used} > {cap}"
652 );
653
654 let _ = std::fs::remove_dir_all(dir);
655 }
656
657 #[test]
658 fn var_log_requires_approve_token() {
659 assert!(!can_write("var/log", auth::Tier::Anon));
660 assert!(!can_write("var/log", auth::Tier::Read));
661 assert!(!can_write("var/log", auth::Tier::Write));
662 assert!(can_write("var/log", auth::Tier::Approve));
663 assert!(!can_write("var/log/deletes", auth::Tier::Anon));
664 assert!(!can_write("var/log/deletes", auth::Tier::Read));
665 assert!(!can_write("var/log/deletes", auth::Tier::Write));
666 assert!(can_write("var/log/deletes", auth::Tier::Approve));
667 }
668
669 #[test]
670 fn delete_requires_approve_token() {
671 assert!(!can_delete(auth::Tier::Anon));
672 assert!(!can_delete(auth::Tier::Read));
673 assert!(!can_delete(auth::Tier::Write));
674 assert!(can_delete(auth::Tier::Approve));
675 }
676
677 #[test]
678 fn listen_addr_brackets_ipv6_hosts() {
679 assert_eq!(listen_addr("127.0.0.1", 3105), "127.0.0.1:3105");
680 assert_eq!(listen_addr("0.0.0.0", 3105), "0.0.0.0:3105");
681 assert_eq!(listen_addr("::1", 3105), "[::1]:3105");
682 assert_eq!(listen_addr("localhost", 3105), "localhost:3105");
683 }
684
685 #[cfg(feature = "coap")]
686 #[test]
687 fn coap_bind_is_opt_in_by_port_env() {
688 let _lock = env_lock().lock().unwrap();
689 let _guard = CoapEnvGuard::capture();
690 std::env::remove_var("ELASTIK_COAP_HOST");
691 std::env::remove_var("ELASTIK_COAP_PORT");
692
693 assert_eq!(coap_bind_from_env(), None);
694
695 std::env::set_var("ELASTIK_COAP_HOST", "0.0.0.0");
696 assert_eq!(coap_bind_from_env(), None);
697
698 std::env::set_var("ELASTIK_COAP_PORT", "5683");
699 assert_eq!(coap_bind_from_env(), Some(("0.0.0.0".to_owned(), 5683)));
700
701 std::env::set_var("ELASTIK_COAP_HOST", "127.0.0.1");
702 std::env::set_var("ELASTIK_COAP_PORT", " ");
703 assert_eq!(coap_bind_from_env(), None);
704
705 std::env::set_var("ELASTIK_COAP_PORT", "not-a-port");
706 assert_eq!(coap_bind_from_env(), None);
707 }
708
709 #[test]
710 fn non_loopback_public_read_gets_warning_flag() {
711 let mut tokens = auth::Tokens {
712 read: None,
713 write: None,
714 approve: None,
715 };
716 assert!(!should_warn_public_read(
717 "127.0.0.1".parse::<IpAddr>().unwrap(),
718 tokens.read_required()
719 ));
720 assert!(should_warn_public_read(
721 "0.0.0.0".parse::<IpAddr>().unwrap(),
722 tokens.read_required()
723 ));
724
725 tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
726 assert!(!should_warn_public_read(
727 "0.0.0.0".parse::<IpAddr>().unwrap(),
728 tokens.read_required()
729 ));
730 }
731
732 #[tokio::test]
733 async fn put_and_post_enforce_world_size_cap() {
734 let (mut core, dir) = test_core("world-size-cap");
735 core.max_world_bytes = 4;
736 let headers = HeaderMap::new();
737
738 let too_big = unwrap_response(
739 execute_put(
740 headers.clone(),
741 Bytes::from_static(b"12345"),
742 auth::Tier::Write,
743 world_path("home/too-big"),
744 &core,
745 &TraceCtx::disabled(),
746 )
747 .await,
748 );
749 assert_eq!(too_big.status(), StatusCode::PAYLOAD_TOO_LARGE);
750
751 let ok = unwrap_response(
752 execute_put(
753 headers.clone(),
754 Bytes::from_static(b"1234"),
755 auth::Tier::Write,
756 world_path("home/four"),
757 &core,
758 &TraceCtx::disabled(),
759 )
760 .await,
761 );
762 assert_eq!(ok.status(), StatusCode::CREATED);
763
764 let append = unwrap_response(
765 execute_post(
766 headers.clone(),
767 Bytes::from_static(b"5"),
768 auth::Tier::Write,
769 world_path("home/four"),
770 &core,
771 &TraceCtx::disabled(),
772 )
773 .await,
774 );
775 assert_eq!(append.status(), StatusCode::PAYLOAD_TOO_LARGE);
776
777 let _ = std::fs::remove_dir_all(dir);
778 }
779
780 #[tokio::test]
781 async fn memory_backend_enforces_total_quota() {
782 let (mut core, dir) = test_core("memory-quota");
783 core.max_memory_bytes = 4;
784 let headers = HeaderMap::new();
785
786 let first = unwrap_response(
787 execute_put(
788 headers.clone(),
789 Bytes::from_static(b"12"),
790 auth::Tier::Write,
791 world_path("tmp/a"),
792 &core,
793 &TraceCtx::disabled(),
794 )
795 .await,
796 );
797 assert_eq!(first.status(), StatusCode::CREATED);
798 let second = unwrap_response(
799 execute_put(
800 headers.clone(),
801 Bytes::from_static(b"34"),
802 auth::Tier::Write,
803 world_path("tmp/b"),
804 &core,
805 &TraceCtx::disabled(),
806 )
807 .await,
808 );
809 assert_eq!(second.status(), StatusCode::CREATED);
810 let third = unwrap_response(
811 execute_put(
812 headers.clone(),
813 Bytes::from_static(b"5"),
814 auth::Tier::Write,
815 world_path("tmp/c"),
816 &core,
817 &TraceCtx::disabled(),
818 )
819 .await,
820 );
821 assert_eq!(third.status(), StatusCode::PAYLOAD_TOO_LARGE);
822
823 let _ = std::fs::remove_dir_all(dir);
824 }
825
826 #[test]
827 fn system_namespace_roots_require_approve_even_if_called_directly() {
828 for name in ["lib", "etc", "boot", "usr"] {
829 assert!(!can_write(name, auth::Tier::Anon), "{name}");
830 assert!(!can_write(name, auth::Tier::Read), "{name}");
831 assert!(!can_write(name, auth::Tier::Write), "{name}");
832 assert!(can_write(name, auth::Tier::Approve), "{name}");
833 }
834 }
835
836 #[test]
837 fn non_log_var_still_accepts_auth_token() {
838 assert!(!can_write("var/cache/rag", auth::Tier::Anon));
839 assert!(!can_write("var/cache/rag", auth::Tier::Read));
840 assert!(can_write("var/cache/rag", auth::Tier::Write));
841 assert!(can_write("var/cache/rag", auth::Tier::Approve));
842 }
843
844 #[test]
845 fn read_token_is_optional_but_gates_reads_when_set() {
846 let (mut core, dir) = test_core("read-token");
847 assert!(can_read(&core, auth::Tier::Anon));
848
849 core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
850 assert!(!can_read(&core, auth::Tier::Anon));
851 assert!(can_read(&core, auth::Tier::Read));
852 assert!(can_read(&core, auth::Tier::Write));
853 assert!(can_read(&core, auth::Tier::Approve));
854
855 let _ = std::fs::remove_dir_all(dir);
856 }
857
858 #[tokio::test]
859 async fn get_and_head_require_read_token_when_enabled() {
860 let (mut core, dir) = test_core("read-token-handlers");
861 core.write_world("home/private", b"secret", "text/plain", &[])
862 .unwrap();
863 core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
864
865 let headers = HeaderMap::new();
866 let get_anon = unwrap_response(
867 execute_get(
868 headers.clone(),
869 auth::Tier::Anon,
870 world_path("home/private"),
871 &core,
872 &TraceCtx::disabled(),
873 )
874 .await,
875 );
876 assert_eq!(get_anon.status(), StatusCode::UNAUTHORIZED);
877 let head_reader = unwrap_response(
878 execute_head(
879 headers.clone(),
880 auth::Tier::Read,
881 world_path("home/private"),
882 &core,
883 &TraceCtx::disabled(),
884 )
885 .await,
886 );
887 assert_eq!(head_reader.status(), StatusCode::OK);
888
889 let _ = std::fs::remove_dir_all(dir);
890 }
891
892 #[test]
893 fn unauthorized_responses_advertise_bearer_challenge() {
894 let resp = unauthorized("read requires read token");
895 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
896 assert_eq!(
897 resp.headers().get(header::WWW_AUTHENTICATE).unwrap(),
898 "Bearer realm=\"elastik\""
899 );
900 assert_eq!(
901 resp.headers().get(header::CONTENT_TYPE).unwrap(),
902 "text/plain; charset=utf-8"
903 );
904 }
905
906 #[test]
907 fn core_response_headers_are_core_owned() {
908 let mut headers = HeaderMap::new();
909 headers.insert(header::VARY, HeaderValue::from_static("*"));
910 headers.insert("x-request-id", HeaderValue::from_static("stale"));
911 headers.insert("x-elapsed-us", HeaderValue::from_static("999"));
912 headers.insert("x-content-type-options", HeaderValue::from_static("sniff"));
913
914 stamp_core_response_headers(42, 7, &mut headers);
915
916 assert_eq!(headers.get("x-request-id").unwrap(), "42");
917 assert_eq!(headers.get("x-elapsed-us").unwrap(), "7");
918 assert_eq!(headers.get(header::VARY).unwrap(), "Authorization");
919 assert_eq!(headers.get("x-content-type-options").unwrap(), "nosniff");
920 }
921
922 #[test]
923 fn poisoned_persisted_headers_are_not_replayed() {
924 let mut out = Vec::new();
925 hs::apply_meta_headers(
926 &[
927 (
928 "x-custom".to_owned(),
929 "evil\r\nset-cookie: admin=true".to_owned(),
930 ),
931 ("set-cookie".to_owned(), "sid=admin; Path=/".to_owned()),
932 ("clear-site-data".to_owned(), "\"cookies\"".to_owned()),
933 ("bad name".to_owned(), "ok".to_owned()),
934 ("x-safe".to_owned(), "ok".to_owned()),
935 ],
936 &mut out,
937 );
938
939 assert_eq!(out.len(), 1);
940 assert_eq!(out[0].0.as_str(), "x-safe");
941 assert_eq!(out[0].1, "ok");
942 }
943
944 #[test]
945 fn canonicalize_preserves_explicit_namespaces() {
946 assert_eq!(canonicalize_path("/home/tmp/foo"), "home/tmp/foo");
947 assert_eq!(canonicalize_path("/home/etc/foo"), "home/etc/foo");
948 assert_eq!(canonicalize_path("/tmp/foo"), "tmp/foo");
949 assert_eq!(canonicalize_path("/etc/foo"), "etc/foo");
950 assert_eq!(canonicalize_path("/foo"), "home/foo");
951 }
952
953 #[test]
954 fn control_bytes_are_not_valid_world_names() {
955 assert!(valid_world_name("home/ok"));
956 assert!(!valid_world_name("home/bad\nname"));
957 assert!(!valid_world_name(""));
958 }
959
960 #[test]
961 fn dot_segments_empty_segments_and_backslashes_are_not_valid_world_names() {
962 assert!(!valid_world_name("home/../etc/secret"));
963 assert!(!valid_world_name("home/%2E%2E/etc/secret"));
964 assert!(!valid_world_name("home/./x"));
965 assert!(!valid_world_name("home//x"));
966 assert!(!valid_world_name("home/x/"));
967 assert!(!valid_world_name("home\\x"));
968 assert_eq!(
969 validate_world_name("home/%2E%2E/etc/secret"),
970 Err("world path contains dot or encoded-dot segment")
971 );
972 assert_eq!(
973 validate_world_name("home//x"),
974 Err("world path has empty segment")
975 );
976 assert_eq!(
977 validate_world_name("home\\x"),
978 Err("world path contains backslash")
979 );
980 }
981
982 #[test]
983 fn namespace_roots_and_proc_subtree_are_not_world_names() {
984 for name in [
985 "home",
986 "tmp",
987 "dev",
988 "sys",
989 "proc",
990 "proc/anything",
991 "etc",
992 "lib",
993 "boot",
994 "usr",
995 "var",
996 "var/log",
997 ] {
998 assert!(!valid_world_name(name), "{name}");
999 }
1000 assert!(valid_world_name("home/x"));
1001 assert!(valid_world_name("var/log/deletes"));
1002 }
1003
1004 #[test]
1005 fn byte_ranges_cover_normal_open_and_suffix_forms() {
1006 let mut h = HeaderMap::new();
1007 assert_eq!(hs::parse_range(&h, 10), Ok(None));
1008
1009 h.insert(header::RANGE, HeaderValue::from_static("bytes=2-5"));
1010 assert_eq!(hs::parse_range(&h, 10), Ok(Some((2, 5))));
1011
1012 h.insert(header::RANGE, HeaderValue::from_static("bytes=7-"));
1013 assert_eq!(hs::parse_range(&h, 10), Ok(Some((7, 9))));
1014
1015 h.insert(header::RANGE, HeaderValue::from_static("bytes=-3"));
1016 assert_eq!(hs::parse_range(&h, 10), Ok(Some((7, 9))));
1017
1018 h.insert(header::RANGE, HeaderValue::from_static("bytes=8-99"));
1019 assert_eq!(hs::parse_range(&h, 10), Ok(Some((8, 9))));
1020
1021 h.insert(header::RANGE, HeaderValue::from_static("bytes=11-12"));
1022 assert_eq!(hs::parse_range(&h, 10), Err(()));
1023
1024 h.insert(header::RANGE, HeaderValue::from_static("bytes=0-1,4-5"));
1025 assert_eq!(hs::parse_range(&h, 10), Ok(None));
1026 }
1027
1028 #[tokio::test]
1029 async fn get_and_head_honor_single_byte_range() {
1030 let (core, dir) = test_core("range-handler");
1031 core.write_world("home/range", b"abcdef", "text/plain", &[])
1032 .unwrap();
1033 let mut headers = HeaderMap::new();
1034 headers.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
1035
1036 let get = unwrap_response(
1037 execute_get(
1038 headers.clone(),
1039 auth::Tier::Anon,
1040 world_path("home/range"),
1041 &core,
1042 &TraceCtx::disabled(),
1043 )
1044 .await,
1045 );
1046 assert_eq!(get.status(), StatusCode::PARTIAL_CONTENT);
1047 assert_eq!(
1048 get.headers().get(header::CONTENT_RANGE).unwrap(),
1049 "bytes 1-3/6"
1050 );
1051 assert_eq!(get.headers().get(header::CONTENT_LENGTH).unwrap(), "3");
1052
1053 let head = unwrap_response(
1054 execute_head(
1055 headers.clone(),
1056 auth::Tier::Anon,
1057 world_path("home/range"),
1058 &core,
1059 &TraceCtx::disabled(),
1060 )
1061 .await,
1062 );
1063 assert_eq!(head.status(), StatusCode::PARTIAL_CONTENT);
1064 assert_eq!(
1065 head.headers().get(header::CONTENT_RANGE).unwrap(),
1066 "bytes 1-3/6"
1067 );
1068 assert_eq!(head.headers().get(header::CONTENT_LENGTH).unwrap(), "3");
1069
1070 let _ = std::fs::remove_dir_all(dir);
1071 }
1072
1073 #[tokio::test]
1074 async fn get_and_head_advertise_accept_ranges_on_full_body() {
1075 let (core, dir) = test_core("accept-ranges");
1076 core.write_world("home/ranges", b"abcdef", "text/plain", &[])
1077 .unwrap();
1078 let headers = HeaderMap::new();
1079
1080 let get = unwrap_response(
1081 execute_get(
1082 headers.clone(),
1083 auth::Tier::Anon,
1084 world_path("home/ranges"),
1085 &core,
1086 &TraceCtx::disabled(),
1087 )
1088 .await,
1089 );
1090 assert_eq!(get.status(), StatusCode::OK);
1091 assert_eq!(get.headers().get(header::ACCEPT_RANGES).unwrap(), "bytes");
1092
1093 let head = unwrap_response(
1094 execute_head(
1095 headers.clone(),
1096 auth::Tier::Anon,
1097 world_path("home/ranges"),
1098 &core,
1099 &TraceCtx::disabled(),
1100 )
1101 .await,
1102 );
1103 assert_eq!(head.status(), StatusCode::OK);
1104 assert_eq!(head.headers().get(header::ACCEPT_RANGES).unwrap(), "bytes");
1105
1106 let _ = std::fs::remove_dir_all(dir);
1107 }
1108
1109 #[tokio::test]
1110 async fn unsatisfied_range_returns_416_with_content_range() {
1111 let (core, dir) = test_core("range-416");
1112 core.write_world("home/range", b"abcdef", "text/plain", &[])
1113 .unwrap();
1114 let mut headers = HeaderMap::new();
1115 headers.insert(header::RANGE, HeaderValue::from_static("bytes=99-100"));
1116
1117 let get = unwrap_response(
1118 execute_get(
1119 headers.clone(),
1120 auth::Tier::Anon,
1121 world_path("home/range"),
1122 &core,
1123 &TraceCtx::disabled(),
1124 )
1125 .await,
1126 );
1127 assert_eq!(get.status(), StatusCode::RANGE_NOT_SATISFIABLE);
1128 assert_eq!(
1129 get.headers().get(header::CONTENT_RANGE).unwrap(),
1130 "bytes */6"
1131 );
1132 assert_eq!(get.headers().get(header::ACCEPT_RANGES).unwrap(), "bytes");
1133
1134 let _ = std::fs::remove_dir_all(dir);
1135 }
1136
1137 #[tokio::test]
1138 async fn multi_range_is_ignored_and_returns_full_body() {
1139 let (core, dir) = test_core("multi-range");
1140 core.write_world("home/range", b"abcdef", "text/plain", &[])
1141 .unwrap();
1142 let mut headers = HeaderMap::new();
1143 headers.insert(header::RANGE, HeaderValue::from_static("bytes=0-1,4-5"));
1144
1145 let get = unwrap_response(
1146 execute_get(
1147 headers.clone(),
1148 auth::Tier::Anon,
1149 world_path("home/range"),
1150 &core,
1151 &TraceCtx::disabled(),
1152 )
1153 .await,
1154 );
1155 assert_eq!(get.status(), StatusCode::OK);
1156 assert!(get.headers().get(header::CONTENT_RANGE).is_none());
1157 assert_eq!(get.headers().get(header::CONTENT_LENGTH).unwrap(), "6");
1158
1159 let _ = std::fs::remove_dir_all(dir);
1160 }
1161
1162 #[tokio::test]
1163 async fn world_reads_advertise_monitor_and_collection_links() {
1164 let (core, dir) = test_core("link-headers");
1165 core.write_world("home/links", b"hello", "text/plain", &[])
1166 .unwrap();
1167 let headers = HeaderMap::new();
1168
1169 let get = unwrap_response(
1170 execute_get(
1171 headers.clone(),
1172 auth::Tier::Anon,
1173 world_path("home/links"),
1174 &core,
1175 &TraceCtx::disabled(),
1176 )
1177 .await,
1178 );
1179 let links: Vec<_> = get.headers().get_all(header::LINK).iter().collect();
1180 assert_eq!(links.len(), 2);
1181 assert!(links
1182 .iter()
1183 .any(|v| *v == "</listen/home/links>; rel=\"monitor\""));
1184 assert!(links
1185 .iter()
1186 .any(|v| *v == "</proc/worlds>; rel=\"collection\""));
1187
1188 let head = unwrap_response(
1189 execute_head(
1190 headers.clone(),
1191 auth::Tier::Anon,
1192 world_path("home/links"),
1193 &core,
1194 &TraceCtx::disabled(),
1195 )
1196 .await,
1197 );
1198 assert_eq!(head.headers().get_all(header::LINK).iter().count(), 2);
1199
1200 let _ = std::fs::remove_dir_all(dir);
1201 }
1202
1203 #[test]
1204 fn if_range_controls_whether_range_is_applied() {
1205 let mut h = HeaderMap::new();
1206 h.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
1207 h.insert(
1208 header::IF_RANGE,
1209 HeaderValue::from_static("\"hmac-current\""),
1210 );
1211 assert_eq!(hs::effective_range(&h, 6, "hmac-current"), Ok(Some((1, 3))));
1212
1213 h.insert(header::IF_RANGE, HeaderValue::from_static("\"hmac-stale\""));
1214 assert_eq!(hs::effective_range(&h, 6, "hmac-current"), Ok(None));
1215
1216 h.insert(
1217 header::IF_RANGE,
1218 HeaderValue::from_static("W/\"hmac-current\""),
1219 );
1220 assert_eq!(hs::effective_range(&h, 6, "hmac-current"), Ok(None));
1221 }
1222
1223 #[tokio::test]
1224 async fn stale_if_range_returns_full_body() {
1225 let (core, dir) = test_core("if-range-stale");
1226 core.write_world("home/range", b"abcdef", "text/plain", &[])
1227 .unwrap();
1228 let mut headers = HeaderMap::new();
1229 headers.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
1230 headers.insert(header::IF_RANGE, HeaderValue::from_static("\"hmac-stale\""));
1231
1232 let get = unwrap_response(
1233 execute_get(
1234 headers.clone(),
1235 auth::Tier::Anon,
1236 world_path("home/range"),
1237 &core,
1238 &TraceCtx::disabled(),
1239 )
1240 .await,
1241 );
1242 assert_eq!(get.status(), StatusCode::OK);
1243 assert!(get.headers().get(header::CONTENT_RANGE).is_none());
1244 assert_eq!(get.headers().get(header::CONTENT_LENGTH).unwrap(), "6");
1245
1246 let _ = std::fs::remove_dir_all(dir);
1247 }
1248
1249 #[tokio::test]
1250 async fn get_and_head_honor_if_none_match_cache_revalidation() {
1251 let (core, dir) = test_core("read-if-none-match");
1252 let h = world::write_with_audit(
1253 &core.data,
1254 "home/cache",
1255 b"cached body",
1256 "text/plain",
1257 &[],
1258 &core.hmac_key,
1259 )
1260 .unwrap();
1261 let etag = format!("\"{}\"", et::hmac_etag(&h));
1262 let mut headers = HeaderMap::new();
1263 headers.insert(header::IF_NONE_MATCH, HeaderValue::from_str(&etag).unwrap());
1264
1265 let get = unwrap_response(
1266 execute_get(
1267 headers.clone(),
1268 auth::Tier::Anon,
1269 world_path("home/cache"),
1270 &core,
1271 &TraceCtx::disabled(),
1272 )
1273 .await,
1274 );
1275 assert_eq!(get.status(), StatusCode::NOT_MODIFIED);
1276 assert_eq!(get.headers().get(header::ETAG).unwrap(), etag.as_str());
1277 assert!(get
1278 .headers()
1279 .get_all(header::LINK)
1280 .iter()
1281 .any(|v| v == "</listen/home/cache>; rel=\"monitor\""));
1282
1283 let head = unwrap_response(
1284 execute_head(
1285 headers.clone(),
1286 auth::Tier::Anon,
1287 world_path("home/cache"),
1288 &core,
1289 &TraceCtx::disabled(),
1290 )
1291 .await,
1292 );
1293 assert_eq!(head.status(), StatusCode::NOT_MODIFIED);
1294 assert_eq!(head.headers().get(header::ETAG).unwrap(), etag.as_str());
1295
1296 headers.insert(
1297 header::IF_NONE_MATCH,
1298 HeaderValue::from_static("\"hmac-stale\""),
1299 );
1300 let get = unwrap_response(
1301 execute_get(
1302 headers.clone(),
1303 auth::Tier::Anon,
1304 world_path("home/cache"),
1305 &core,
1306 &TraceCtx::disabled(),
1307 )
1308 .await,
1309 );
1310 assert_eq!(get.status(), StatusCode::OK);
1311
1312 let _ = std::fs::remove_dir_all(dir);
1313 }
1314
1315 #[tokio::test]
1316 async fn options_and_405_advertise_allow_headers() {
1317 let (core, dir) = test_core("allow");
1322 let core = std::sync::Arc::new(core);
1323 let state = server_state_for_tests(core.clone());
1324
1325 let options = options_response(WORLD_ALLOW);
1326 assert_eq!(options.status(), StatusCode::NO_CONTENT);
1327 assert_eq!(options.headers().get(header::ALLOW).unwrap(), WORLD_ALLOW);
1328
1329 let patch = pipeline::run(
1330 Method::PATCH,
1331 "home/allow".to_string(),
1332 HeaderMap::new(),
1333 Bytes::new(),
1334 &state,
1335 0,
1336 )
1337 .await;
1338 assert_eq!(patch.status(), StatusCode::METHOD_NOT_ALLOWED);
1339 assert_eq!(patch.headers().get(header::ALLOW).unwrap(), WORLD_ALLOW);
1340
1341 let _ = std::fs::remove_dir_all(dir);
1342 }
1343
1344 #[tokio::test]
1345 async fn root_and_proc_endpoints_advertise_head_options_and_405() {
1346 let root_head = root_hint(Method::HEAD).await;
1347 assert_eq!(root_head.status(), StatusCode::OK);
1348 assert_eq!(
1349 root_head.headers().get(header::CONTENT_TYPE).unwrap(),
1350 "text/plain; charset=utf-8"
1351 );
1352 assert!(root_head.headers().get(header::CONTENT_LENGTH).is_some());
1353
1354 let root_options = root_hint(Method::OPTIONS).await;
1355 assert_eq!(root_options.status(), StatusCode::NO_CONTENT);
1356 assert_eq!(
1357 root_options.headers().get(header::ALLOW).unwrap(),
1358 ROOT_ALLOW
1359 );
1360
1361 let root_post = root_hint(Method::POST).await;
1362 assert_eq!(root_post.status(), StatusCode::METHOD_NOT_ALLOWED);
1363 assert_eq!(root_post.headers().get(header::ALLOW).unwrap(), ROOT_ALLOW);
1364
1365 let version_head = proc_version(Method::HEAD).await;
1366 assert_eq!(version_head.status(), StatusCode::OK);
1367 assert!(version_head.headers().get(header::CONTENT_LENGTH).is_some());
1368
1369 let version_delete = proc_version(Method::DELETE).await;
1370 assert_eq!(version_delete.status(), StatusCode::METHOD_NOT_ALLOWED);
1371 assert_eq!(
1372 version_delete.headers().get(header::ALLOW).unwrap(),
1373 PROC_ALLOW
1374 );
1375 }
1376
1377 #[tokio::test]
1378 async fn proc_worlds_head_options_and_405_are_plain_http() {
1379 let (core, dir) = test_core("proc-worlds-http");
1380 core.write_world("home/a", b"a", "text/plain", &[]).unwrap();
1381 let state = Arc::new(core);
1382 let headers = HeaderMap::new();
1383
1384 let head = proc_worlds(
1385 State(server_state_for_tests(state.clone())),
1386 Method::HEAD,
1387 headers.clone(),
1388 )
1389 .await;
1390 assert_eq!(head.status(), StatusCode::OK);
1391 assert_eq!(
1392 head.headers().get(header::CONTENT_TYPE).unwrap(),
1393 "text/plain; charset=utf-8"
1394 );
1395 assert!(head.headers().get(header::CONTENT_LENGTH).is_some());
1396
1397 let options = proc_worlds(
1398 State(server_state_for_tests(state.clone())),
1399 Method::OPTIONS,
1400 headers.clone(),
1401 )
1402 .await;
1403 assert_eq!(options.status(), StatusCode::NO_CONTENT);
1404 assert_eq!(options.headers().get(header::ALLOW).unwrap(), PROC_ALLOW);
1405
1406 let delete = proc_worlds(
1407 State(server_state_for_tests(state)),
1408 Method::DELETE,
1409 headers,
1410 )
1411 .await;
1412 assert_eq!(delete.status(), StatusCode::METHOD_NOT_ALLOWED);
1413 assert_eq!(delete.headers().get(header::ALLOW).unwrap(), PROC_ALLOW);
1414
1415 let _ = std::fs::remove_dir_all(dir);
1416 }
1417
1418 #[tokio::test]
1419 async fn proc_namespace_is_reserved_beyond_declared_endpoints() {
1420 let not_found = proc_reserved(Method::GET).await;
1421 assert_eq!(not_found.status(), StatusCode::NOT_FOUND);
1422
1423 let head = proc_reserved(Method::HEAD).await;
1424 assert_eq!(head.status(), StatusCode::NOT_FOUND);
1425
1426 let options = proc_reserved(Method::OPTIONS).await;
1427 assert_eq!(options.status(), StatusCode::NO_CONTENT);
1428 assert_eq!(options.headers().get(header::ALLOW).unwrap(), PROC_ALLOW);
1429
1430 let put = proc_reserved(Method::PUT).await;
1431 assert_eq!(put.status(), StatusCode::METHOD_NOT_ALLOWED);
1432 assert_eq!(put.headers().get(header::ALLOW).unwrap(), PROC_ALLOW);
1433 }
1434
1435 #[tokio::test]
1436 async fn proc_audit_verify_reports_valid_chain_in_headers() {
1437 let (core, dir) = test_core("proc-audit-valid");
1438 let h = world::write_with_audit(
1439 &core.data,
1440 "home/audit-ok",
1441 b"hello",
1442 "text/plain",
1443 &[("x-meta-author".to_owned(), "ranger".to_owned())],
1444 &core.hmac_key,
1445 )
1446 .unwrap();
1447 let state = Arc::new(core);
1448 let resp = proc_audit_verify(
1449 State(server_state_for_tests(state)),
1450 Method::HEAD,
1451 AxPath("home/audit-ok/verify".to_owned()),
1452 HeaderMap::new(),
1453 )
1454 .await;
1455
1456 assert_eq!(resp.status(), StatusCode::OK);
1457 assert_eq!(resp.headers().get("x-audit-valid").unwrap(), "true");
1458 assert_eq!(resp.headers().get("x-audit-events").unwrap(), "1");
1459 assert_eq!(
1460 resp.headers().get("x-audit-latest").unwrap(),
1461 &format!("hmac-{h}")
1462 );
1463 assert_eq!(resp.headers().get(header::CONTENT_LENGTH).unwrap(), "0");
1464
1465 let _ = std::fs::remove_dir_all(dir);
1466 }
1467
1468 #[tokio::test]
1469 async fn proc_audit_verify_reports_broken_chain_in_headers() {
1470 let (core, dir) = test_core("proc-audit-broken");
1471 world::write_with_audit(
1472 &core.data,
1473 "home/audit-broken",
1474 b"hello",
1475 "text/plain",
1476 &[],
1477 &core.hmac_key,
1478 )
1479 .unwrap();
1480 let db = world::world_db(&core.data, "home/audit-broken");
1481 let c = rusqlite::Connection::open(db).unwrap();
1482 c.execute("UPDATE events SET hmac='bad' WHERE id=1", [])
1483 .unwrap();
1484
1485 let state = Arc::new(core);
1486 let resp = proc_audit_verify(
1487 State(server_state_for_tests(state)),
1488 Method::HEAD,
1489 AxPath("home/audit-broken/verify".to_owned()),
1490 HeaderMap::new(),
1491 )
1492 .await;
1493
1494 assert_eq!(resp.status(), StatusCode::CONFLICT);
1495 assert_eq!(resp.headers().get("x-audit-valid").unwrap(), "false");
1496 assert_eq!(resp.headers().get("x-audit-break-at").unwrap(), "0");
1497 assert_eq!(resp.headers().get("x-audit-actual").unwrap(), "hmac-bad");
1498
1499 let _ = std::fs::remove_dir_all(dir);
1500 }
1501
1502 #[tokio::test]
1503 async fn proc_audit_verify_escapes_tampered_header_values() {
1504 let (core, dir) = test_core("proc-audit-header-escape");
1505 world::write_with_audit(
1506 &core.data,
1507 "home/audit-escaped",
1508 b"hello",
1509 "text/plain",
1510 &[],
1511 &core.hmac_key,
1512 )
1513 .unwrap();
1514 let db = world::world_db(&core.data, "home/audit-escaped");
1515 let c = rusqlite::Connection::open(db).unwrap();
1516 c.execute(
1517 "UPDATE events SET hmac=? WHERE id=1",
1518 ["bad\nInjected: yes"],
1519 )
1520 .unwrap();
1521
1522 let state = Arc::new(core);
1523 let resp = proc_audit_verify(
1524 State(server_state_for_tests(state)),
1525 Method::HEAD,
1526 AxPath("home/audit-escaped/verify".to_owned()),
1527 HeaderMap::new(),
1528 )
1529 .await;
1530
1531 assert_eq!(resp.status(), StatusCode::CONFLICT);
1532 assert_eq!(
1533 resp.headers().get("x-audit-actual").unwrap(),
1534 "hmac-bad\\x0aInjected: yes"
1535 );
1536
1537 let _ = std::fs::remove_dir_all(dir);
1538 }
1539
1540 #[tokio::test]
1541 async fn proc_audit_verify_reports_memory_world_not_applicable() {
1542 let (core, dir) = test_core("proc-audit-memory");
1543 core.write_world("tmp/scratch", b"draft", "text/plain", &[])
1544 .unwrap();
1545 let state = Arc::new(core);
1546 let resp = proc_audit_verify(
1547 State(server_state_for_tests(state)),
1548 Method::HEAD,
1549 AxPath("tmp/scratch/verify".to_owned()),
1550 HeaderMap::new(),
1551 )
1552 .await;
1553
1554 assert_eq!(resp.status(), StatusCode::NO_CONTENT);
1555 assert_eq!(resp.headers().get("x-audit-valid").unwrap(), "n/a");
1556
1557 let _ = std::fs::remove_dir_all(dir);
1558 }
1559
1560 #[tokio::test]
1561 async fn proc_audit_verify_missing_disk_world_does_not_create_db() {
1562 let (core, dir) = test_core("proc-audit-missing-no-create");
1563 let db = world::world_db(&core.data, "home/missing-audit");
1564 assert!(!db.exists());
1565
1566 let state = Arc::new(core);
1567 let resp = proc_audit_verify(
1568 State(server_state_for_tests(state)),
1569 Method::HEAD,
1570 AxPath("home/missing-audit/verify".to_owned()),
1571 HeaderMap::new(),
1572 )
1573 .await;
1574
1575 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1576 assert!(!db.exists());
1577
1578 let _ = std::fs::remove_dir_all(dir);
1579 }
1580
1581 #[tokio::test]
1582 async fn put_created_returns_location() {
1583 let (core, dir) = test_core("put-location");
1584 let headers = HeaderMap::new();
1585 let resp = unwrap_response(
1586 execute_put(
1587 headers.clone(),
1588 Bytes::from_static(b"new"),
1589 auth::Tier::Write,
1590 world_path("home/created"),
1591 &core,
1592 &TraceCtx::disabled(),
1593 )
1594 .await,
1595 );
1596
1597 assert_eq!(resp.status(), StatusCode::CREATED);
1598 assert_eq!(
1599 resp.headers().get(header::LOCATION).unwrap(),
1600 "/home/created"
1601 );
1602
1603 let resp = unwrap_response(
1604 execute_put(
1605 headers.clone(),
1606 Bytes::from_static(b"again"),
1607 auth::Tier::Write,
1608 world_path("home/created"),
1609 &core,
1610 &TraceCtx::disabled(),
1611 )
1612 .await,
1613 );
1614 assert_eq!(resp.status(), StatusCode::OK);
1615 assert!(resp.headers().get(header::LOCATION).is_none());
1616
1617 let _ = std::fs::remove_dir_all(dir);
1618 }
1619
1620 #[tokio::test]
1621 async fn location_and_link_headers_percent_encode_world_urls() {
1622 let (core, dir) = test_core("encoded-headers");
1623 let headers = HeaderMap::new();
1624 let resp = unwrap_response(
1625 execute_put(
1626 headers.clone(),
1627 Bytes::from_static(b"new"),
1628 auth::Tier::Write,
1629 world_path("home/café report"),
1630 &core,
1631 &TraceCtx::disabled(),
1632 )
1633 .await,
1634 );
1635 assert_eq!(resp.status(), StatusCode::CREATED);
1636 assert_eq!(
1637 resp.headers().get(header::LOCATION).unwrap(),
1638 "/home/caf%C3%A9%20report"
1639 );
1640
1641 let get = unwrap_response(
1642 execute_get(
1643 headers.clone(),
1644 auth::Tier::Anon,
1645 world_path("home/café report"),
1646 &core,
1647 &TraceCtx::disabled(),
1648 )
1649 .await,
1650 );
1651 let links: Vec<_> = get.headers().get_all(header::LINK).iter().collect();
1652 assert!(links
1653 .iter()
1654 .any(|v| *v == "</listen/home/caf%C3%A9%20report>; rel=\"monitor\""));
1655
1656 let _ = std::fs::remove_dir_all(dir);
1657 }
1658
1659 #[tokio::test]
1660 async fn unicode_worlds_roundtrip_body_headers_and_proc_listing() {
1661 let (core, dir) = test_core("unicode-roundtrip");
1662 let headers = vec![(
1663 "content-disposition".to_string(),
1664 "attachment; filename*=UTF-8''%E6%8A%A5%E5%91%8A.pdf".to_string(),
1665 )];
1666 core.write_world(
1667 "home/销售/报告",
1668 "你好,世界".as_bytes(),
1669 "text/plain; charset=utf-8",
1670 &headers,
1671 )
1672 .unwrap();
1673
1674 let req_headers = HeaderMap::new();
1675 let get = unwrap_response(
1676 execute_get(
1677 req_headers.clone(),
1678 auth::Tier::Anon,
1679 world_path("home/销售/报告"),
1680 &core,
1681 &TraceCtx::disabled(),
1682 )
1683 .await,
1684 );
1685 assert_eq!(get.status(), StatusCode::OK);
1686 assert_eq!(
1687 get.headers().get(header::CONTENT_TYPE).unwrap(),
1688 "text/plain; charset=utf-8"
1689 );
1690 assert_eq!(
1691 get.headers().get(header::CONTENT_DISPOSITION).unwrap(),
1692 "attachment; filename*=UTF-8''%E6%8A%A5%E5%91%8A.pdf"
1693 );
1694 assert!(
1695 get.headers()
1696 .get_all(header::LINK)
1697 .iter()
1698 .any(|v| *v
1699 == "</listen/home/%E9%94%80%E5%94%AE/%E6%8A%A5%E5%91%8A>; rel=\"monitor\"")
1700 );
1701
1702 let names = store::list_all(&core.data, &core.mem);
1703 assert_eq!(world_list_body(&names.unwrap()), "home/销售/报告\n");
1704
1705 let _ = std::fs::remove_dir_all(dir);
1706 }
1707
1708 #[test]
1709 fn etag_lists_match_http_strong_and_weak_rules() {
1710 assert!(et::etag_list_strong_matches("\"hmac-abc\"", "hmac-abc"));
1711 assert!(et::etag_list_strong_matches(
1712 "\"other\", \"hmac-abc\"",
1713 "hmac-abc"
1714 ));
1715 assert!(et::etag_list_strong_matches("*", "hmac-abc"));
1716 assert!(!et::etag_list_strong_matches("W/\"hmac-abc\"", "hmac-abc"));
1717 assert!(!et::etag_list_strong_matches("\"other\"", "hmac-abc"));
1718
1719 assert!(et::etag_list_weak_matches("W/\"hmac-abc\"", "hmac-abc"));
1720 }
1721
1722 #[test]
1723 fn if_none_match_star_blocks_existing_world() {
1724 let (core, dir) = test_core("if-none-match-star");
1725 core.write_world("home/cas", b"one", "text/plain; charset=utf-8", &[])
1726 .unwrap();
1727
1728 let mut headers = HeaderMap::new();
1729 headers.insert(header::IF_NONE_MATCH, HeaderValue::from_static("*"));
1730
1731 assert!(hs::check_write_preconditions(&core, "home/cas", &headers).is_err());
1732 assert!(hs::check_write_preconditions(&core, "home/new", &headers).is_ok());
1733
1734 let _ = std::fs::remove_dir_all(dir);
1735 }
1736
1737 #[tokio::test]
1738 async fn put_and_post_honor_write_preconditions_at_handler_level() {
1739 let (core, dir) = test_core("write-preconditions");
1740 let h = world::write_with_audit(
1741 &core.data,
1742 "home/cas",
1743 b"one",
1744 "text/plain; charset=utf-8",
1745 &[],
1746 &core.hmac_key,
1747 )
1748 .unwrap();
1749
1750 let mut stale = HeaderMap::new();
1751 stale.insert(header::IF_MATCH, HeaderValue::from_static("\"hmac-stale\""));
1752 let put = unwrap_response(
1753 execute_put(
1754 stale.clone(),
1755 Bytes::from_static(b"two"),
1756 auth::Tier::Write,
1757 world_path("home/cas"),
1758 &core,
1759 &TraceCtx::disabled(),
1760 )
1761 .await,
1762 );
1763 assert_eq!(put.status(), StatusCode::PRECONDITION_FAILED);
1764
1765 let post = unwrap_response(
1766 execute_post(
1767 stale.clone(),
1768 Bytes::from_static(b" plus"),
1769 auth::Tier::Write,
1770 world_path("home/cas"),
1771 &core,
1772 &TraceCtx::disabled(),
1773 )
1774 .await,
1775 );
1776 assert_eq!(post.status(), StatusCode::PRECONDITION_FAILED);
1777
1778 let mut good = HeaderMap::new();
1779 good.insert(
1780 header::IF_MATCH,
1781 HeaderValue::from_str(&format!("\"{}\"", et::hmac_etag(&h))).unwrap(),
1782 );
1783 let post = unwrap_response(
1784 execute_post(
1785 good.clone(),
1786 Bytes::from_static(b" plus"),
1787 auth::Tier::Write,
1788 world_path("home/cas"),
1789 &core,
1790 &TraceCtx::disabled(),
1791 )
1792 .await,
1793 );
1794 assert_eq!(post.status(), StatusCode::OK);
1795
1796 let _ = std::fs::remove_dir_all(dir);
1797 }
1798
1799 #[test]
1800 fn if_match_accepts_current_hmac_etag_only() {
1801 let (core, dir) = test_core("if-match-hmac");
1802 core.write_world("home/cas", b"one", "text/plain; charset=utf-8", &[])
1803 .unwrap();
1804 let mut conn = world::open(&core.data, "home/cas").unwrap();
1805 let h = audit::append_with_conn_existing(
1806 &mut conn,
1807 "put",
1808 "home/cas",
1809 &world::sha256_hex(b"one"),
1810 3,
1811 "text/plain; charset=utf-8",
1812 &[],
1813 &core.hmac_key,
1814 )
1815 .unwrap();
1816 drop(conn);
1817 let etag = format!("\"{}\"", et::hmac_etag(&h));
1818
1819 let mut good = HeaderMap::new();
1820 good.insert(header::IF_MATCH, HeaderValue::from_str(&etag).unwrap());
1821 assert!(hs::check_write_preconditions(&core, "home/cas", &good).is_ok());
1822
1823 let mut stale = HeaderMap::new();
1824 stale.insert(header::IF_MATCH, HeaderValue::from_static("\"hmac-stale\""));
1825 assert!(hs::check_write_preconditions(&core, "home/cas", &stale).is_err());
1826
1827 let _ = std::fs::remove_dir_all(dir);
1828 }
1829
1830 #[test]
1831 fn request_content_type_preserves_http_content_type_verbatim() {
1832 let mut headers = HeaderMap::new();
1833 headers.insert(
1834 header::CONTENT_TYPE,
1835 HeaderValue::from_static("application/pdf"),
1836 );
1837 assert_eq!(hs::request_content_type(&headers), "application/pdf");
1838
1839 headers.insert(
1840 header::CONTENT_TYPE,
1841 HeaderValue::from_static("text/html; charset=utf-8"),
1842 );
1843 assert_eq!(
1844 hs::request_content_type(&headers),
1845 "text/html; charset=utf-8"
1846 );
1847
1848 headers.clear();
1849 assert_eq!(
1850 hs::request_content_type(&headers),
1851 "application/octet-stream"
1852 );
1853 }
1854
1855 #[test]
1856 fn request_meta_headers_persist_default_representation_headers_only() {
1857 let mut headers = HeaderMap::new();
1865 headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("image/png"));
1866 headers.insert(header::CONTENT_ENCODING, HeaderValue::from_static("gzip"));
1867 headers.insert(header::CONTENT_LANGUAGE, HeaderValue::from_static("zh-CN"));
1868 headers.insert(
1869 header::CONTENT_DISPOSITION,
1870 HeaderValue::from_static("attachment; filename=\"report.pdf\""),
1871 );
1872 headers.insert(
1873 header::CACHE_CONTROL,
1874 HeaderValue::from_static("max-age=60"),
1875 );
1876 headers.insert("access-control-allow-origin", HeaderValue::from_static("*"));
1877 headers.insert(
1878 "access-control-allow-methods",
1879 HeaderValue::from_static("GET, HEAD"),
1880 );
1881 headers.insert(
1882 "access-control-expose-headers",
1883 HeaderValue::from_static("ETag"),
1884 );
1885 headers.insert(
1886 "content-security-policy",
1887 HeaderValue::from_static("default-src 'self'"),
1888 );
1889 headers.insert("x-frame-options", HeaderValue::from_static("DENY"));
1890 headers.insert("permissions-policy", HeaderValue::from_static("camera=()"));
1891 headers.insert(
1892 "cross-origin-resource-policy",
1893 HeaderValue::from_static("same-origin"),
1894 );
1895 headers.insert(
1896 "x-content-type-options",
1897 HeaderValue::from_static("nosniff"),
1898 );
1899 headers.insert("x-future-http-thing", HeaderValue::from_static("ok"));
1900 headers.insert("x-meta-author", HeaderValue::from_static("ranger"));
1901
1902 headers.insert(
1903 header::AUTHORIZATION,
1904 HeaderValue::from_str(&format!("{} {}", "Bearer", "secret")).unwrap(),
1905 );
1906 headers.insert(
1907 "proxy-authorization",
1908 HeaderValue::from_str(&format!("{} {}", "Bearer", "secret")).unwrap(),
1909 );
1910 headers.insert(header::COOKIE, HeaderValue::from_static("sid=secret"));
1911 headers.insert(header::SET_COOKIE, HeaderValue::from_static("sid=secret"));
1912 headers.insert(header::HOST, HeaderValue::from_static("localhost:3105"));
1913 headers.insert(header::CONNECTION, HeaderValue::from_static("keep-alive"));
1914 headers.insert("keep-alive", HeaderValue::from_static("timeout=5"));
1915 headers.insert(
1916 header::TRANSFER_ENCODING,
1917 HeaderValue::from_static("chunked"),
1918 );
1919 headers.insert(header::TE, HeaderValue::from_static("trailers"));
1920 headers.insert(header::TRAILER, HeaderValue::from_static("expires"));
1921 headers.insert(header::UPGRADE, HeaderValue::from_static("websocket"));
1922 headers.insert("http2-settings", HeaderValue::from_static("abc"));
1923 headers.insert(header::CONTENT_LENGTH, HeaderValue::from_static("999"));
1924 headers.insert(header::ETAG, HeaderValue::from_static("\"fake\""));
1925 headers.insert(header::ALLOW, HeaderValue::from_static("GET"));
1926 headers.insert(header::LOCATION, HeaderValue::from_static("/elsewhere"));
1927 headers.insert(header::LINK, HeaderValue::from_static("</x>; rel=\"next\""));
1928 headers.insert(
1929 header::WWW_AUTHENTICATE,
1930 HeaderValue::from_static("Bearer realm=\"x\""),
1931 );
1932 headers.insert(header::ACCEPT, HeaderValue::from_static("text/html"));
1933 headers.insert(header::ACCEPT_ENCODING, HeaderValue::from_static("gzip"));
1934 headers.insert(header::ACCEPT_LANGUAGE, HeaderValue::from_static("zh-CN"));
1935 headers.insert("accept-charset", HeaderValue::from_static("utf-8"));
1936 headers.insert(header::RANGE, HeaderValue::from_static("bytes=0-1"));
1937 headers.insert(header::IF_MATCH, HeaderValue::from_static("\"abc\""));
1938 headers.insert(header::IF_NONE_MATCH, HeaderValue::from_static("\"abc\""));
1939 headers.insert(header::IF_RANGE, HeaderValue::from_static("\"abc\""));
1940 headers.insert(
1941 header::IF_MODIFIED_SINCE,
1942 HeaderValue::from_static("Wed, 21 Oct 2015 07:28:00 GMT"),
1943 );
1944 headers.insert(
1945 header::IF_UNMODIFIED_SINCE,
1946 HeaderValue::from_static("Wed, 21 Oct 2015 07:28:00 GMT"),
1947 );
1948 headers.insert(header::EXPECT, HeaderValue::from_static("100-continue"));
1949 headers.insert("sec-fetch-mode", HeaderValue::from_static("cors"));
1950 headers.insert("sec-ch-ua", HeaderValue::from_static("\"Chromium\""));
1951 headers.insert("dnt", HeaderValue::from_static("1"));
1952 headers.insert(
1953 header::ORIGIN,
1954 HeaderValue::from_static("https://example.com"),
1955 );
1956 headers.insert(
1957 header::REFERER,
1958 HeaderValue::from_static("https://example.com/"),
1959 );
1960 headers.insert(header::USER_AGENT, HeaderValue::from_static("curl"));
1961 headers.insert(header::SERVER, HeaderValue::from_static("fake"));
1962 headers.insert(
1963 header::DATE,
1964 HeaderValue::from_static("Wed, 21 Oct 2015 07:28:00 GMT"),
1965 );
1966 headers.insert(header::AGE, HeaderValue::from_static("10"));
1967 headers.insert(header::VARY, HeaderValue::from_static("*"));
1968 headers.insert("via", HeaderValue::from_static("1.1 proxy"));
1969 headers.insert("forwarded", HeaderValue::from_static("for=127.0.0.1"));
1970 headers.insert("x-forwarded-for", HeaderValue::from_static("127.0.0.1"));
1971 headers.insert("x-forwarded-host", HeaderValue::from_static("example.com"));
1972 headers.insert("x-forwarded-proto", HeaderValue::from_static("https"));
1973 headers.insert("x-real-ip", HeaderValue::from_static("203.0.113.7"));
1974 headers.insert("true-client-ip", HeaderValue::from_static("203.0.113.7"));
1975 headers.insert("client-ip", HeaderValue::from_static("203.0.113.7"));
1976 headers.insert("clear-site-data", HeaderValue::from_static("\"cookies\""));
1977 headers.insert(
1981 "traceparent",
1982 HeaderValue::from_static("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"),
1983 );
1984 headers.insert(
1985 "tracestate",
1986 HeaderValue::from_static("rojo=00f067aa0ba902b7"),
1987 );
1988 headers.insert(
1989 "baggage",
1990 HeaderValue::from_static("userId=alice,serverNode=DF%2028"),
1991 );
1992 headers.insert(
1993 "b3",
1994 HeaderValue::from_static("80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1"),
1995 );
1996 headers.insert(
1997 "x-b3-traceid",
1998 HeaderValue::from_static("80f198ee56343ba864fe8b2a57d3eff7"),
1999 );
2000 headers.insert("x-b3-spanid", HeaderValue::from_static("e457b5a2e4d86bd1"));
2001 headers.insert("x-b3-sampled", HeaderValue::from_static("1"));
2002 headers.insert(
2003 "x-amzn-trace-id",
2004 HeaderValue::from_static("Root=1-5759e988-bd862e3fe1be46a994272793"),
2005 );
2006 headers.insert("cf-ray", HeaderValue::from_static("8f1234567abcdef0-IAD"));
2007 headers.insert("cf-connecting-ip", HeaderValue::from_static("203.0.113.7"));
2008 headers.insert(
2009 "cf-visitor",
2010 HeaderValue::from_static("{\"scheme\":\"https\"}"),
2011 );
2012 headers.insert(
2014 "http3-settings",
2015 HeaderValue::from_static("AAMAAABkAARAAgAAAAAEAIAAAAA"),
2016 );
2017 headers.insert("pragma", HeaderValue::from_static("no-cache"));
2018
2019 let allowlist = hs::HeaderAllowlist::empty();
2020 let user_deny = hs::HeaderAllowlist::empty();
2021 let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2022 let has = |name: &str| meta.iter().any(|(n, _)| n == name);
2023
2024 assert!(meta.contains(&("content-encoding".to_string(), "gzip".to_string())));
2025 assert!(meta.contains(&("content-language".to_string(), "zh-CN".to_string())));
2026 assert!(meta.contains(&(
2027 "content-disposition".to_string(),
2028 "attachment; filename=\"report.pdf\"".to_string()
2029 )));
2030 assert!(meta.contains(&("cache-control".to_string(), "max-age=60".to_string())));
2031 assert!(meta.contains(&("access-control-allow-origin".to_string(), "*".to_string())));
2032 assert!(meta.contains(&(
2033 "content-security-policy".to_string(),
2034 "default-src 'self'".to_string()
2035 )));
2036 assert!(meta.contains(&("x-frame-options".to_string(), "DENY".to_string())));
2037 assert!(meta.contains(&("permissions-policy".to_string(), "camera=()".to_string())));
2038 assert!(meta.contains(&(
2039 "cross-origin-resource-policy".to_string(),
2040 "same-origin".to_string()
2041 )));
2042 assert!(
2045 !has("x-meta-author"),
2046 "x-meta-* is opt-in via ELASTIK_PERSIST_HEADERS"
2047 );
2048 assert!(
2049 !has("x-future-http-thing"),
2050 "unknown x- headers default-deny"
2051 );
2052
2053 for name in [
2054 "authorization",
2055 "proxy-authorization",
2056 "cookie",
2057 "set-cookie",
2058 "host",
2059 "connection",
2060 "keep-alive",
2061 "transfer-encoding",
2062 "te",
2063 "trailer",
2064 "upgrade",
2065 "http2-settings",
2066 "content-type",
2067 "content-length",
2068 "etag",
2069 "allow",
2070 "location",
2071 "link",
2072 "www-authenticate",
2073 "accept",
2074 "accept-encoding",
2075 "accept-language",
2076 "accept-charset",
2077 "range",
2078 "if-match",
2079 "if-none-match",
2080 "if-range",
2081 "if-modified-since",
2082 "if-unmodified-since",
2083 "expect",
2084 "sec-fetch-mode",
2085 "sec-ch-ua",
2086 "dnt",
2087 "origin",
2088 "referer",
2089 "user-agent",
2090 "server",
2091 "date",
2092 "age",
2093 "vary",
2094 "x-request-id",
2095 "x-elapsed-us",
2096 "x-elapsed-ms",
2097 "x-content-type-options",
2098 "via",
2099 "forwarded",
2100 "x-forwarded-for",
2101 "x-forwarded-host",
2102 "x-forwarded-proto",
2103 "x-real-ip",
2104 "true-client-ip",
2105 "client-ip",
2106 "clear-site-data",
2107 "traceparent",
2109 "tracestate",
2110 "baggage",
2111 "b3",
2112 "x-b3-traceid",
2113 "x-b3-spanid",
2114 "x-b3-sampled",
2115 "x-amzn-trace-id",
2117 "cf-ray",
2118 "cf-connecting-ip",
2119 "cf-visitor",
2120 "http3-settings",
2122 "pragma",
2123 ] {
2124 assert!(!has(name), "{name} should not be persisted");
2125 }
2126 }
2127
2128 #[test]
2129 fn request_meta_headers_deduplicate_repeated_names_last_wins() {
2130 let mut headers = HeaderMap::new();
2131 headers.append("x-meta-author", HeaderValue::from_static("alice"));
2132 headers.append("x-meta-author", HeaderValue::from_static("bob"));
2133
2134 let allowlist = hs::HeaderAllowlist::parse("x-meta-*");
2137 let user_deny = hs::HeaderAllowlist::empty();
2138 let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2139
2140 assert_eq!(meta, vec![("x-meta-author".to_string(), "bob".to_string())]);
2141 }
2142
2143 #[test]
2144 fn request_meta_headers_user_allowlist_persists_custom_headers() {
2145 let mut headers = HeaderMap::new();
2149 headers.insert("x-author", HeaderValue::from_static("ranger"));
2150 headers.insert("x-version", HeaderValue::from_static("7.1.0"));
2151 headers.insert("x-my-tag", HeaderValue::from_static("custom"));
2152 headers.insert("x-my-region", HeaderValue::from_static("ap-east-1"));
2153 headers.insert("x-other", HeaderValue::from_static("not-allowlisted"));
2154 headers.insert(
2156 "traceparent",
2157 HeaderValue::from_static("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"),
2158 );
2159 headers.insert(
2161 header::CACHE_CONTROL,
2162 HeaderValue::from_static("max-age=60"),
2163 );
2164
2165 let allowlist = hs::HeaderAllowlist::parse("x-author,x-version,x-my-*,traceparent");
2166 let user_deny = hs::HeaderAllowlist::empty();
2167 let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2168 let has = |name: &str| meta.iter().any(|(n, _)| n == name);
2169
2170 assert!(has("x-author"));
2172 assert!(has("x-version"));
2173 assert!(has("x-my-tag"));
2175 assert!(has("x-my-region"));
2176 assert!(!has("x-other"));
2178 assert!(
2181 !has("traceparent"),
2182 "L1 hard deny must override user allowlist; tracing context never persists"
2183 );
2184 assert!(has("cache-control"));
2186 }
2187
2188 #[test]
2189 fn request_meta_headers_user_deny_subtracts_from_default_allow() {
2190 let mut headers = HeaderMap::new();
2195 headers.insert(
2197 header::CACHE_CONTROL,
2198 HeaderValue::from_static("max-age=60"),
2199 );
2200 headers.insert(header::CONTENT_ENCODING, HeaderValue::from_static("gzip"));
2201 headers.insert("permissions-policy", HeaderValue::from_static("camera=()"));
2202 headers.insert("x-author", HeaderValue::from_static("ranger"));
2204
2205 let allowlist = hs::HeaderAllowlist::parse("x-author");
2209 let user_deny = hs::HeaderAllowlist::parse("cache-control,permissions-policy,x-author");
2210 let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2211 let has = |name: &str| meta.iter().any(|(n, _)| n == name);
2212
2213 assert!(
2215 !has("cache-control"),
2216 "user-deny must subtract from L2 default-allow"
2217 );
2218 assert!(!has("permissions-policy"));
2219 assert!(has("content-encoding"));
2221 assert!(
2223 !has("x-author"),
2224 "user-deny must override user-allow when both match"
2225 );
2226 }
2227
2228 #[test]
2229 fn request_meta_headers_is_case_insensitive_per_rfc_7230() {
2230 let mut headers = HeaderMap::new();
2236 headers.insert("X-Author", HeaderValue::from_static("ranger"));
2240 headers.insert("CACHE-CONTROL", HeaderValue::from_static("max-age=60"));
2241 headers.insert("Traceparent", HeaderValue::from_static("00-...-01"));
2242 headers.insert("X-Forwarded-For", HeaderValue::from_static("203.0.113.7"));
2243
2244 let allowlist = hs::HeaderAllowlist::parse("X-AUTHOR, X-Version");
2247 let user_deny = hs::HeaderAllowlist::empty();
2248 let meta = hs::request_meta_headers(&headers, &allowlist, &user_deny);
2249 let has = |name: &str| meta.iter().any(|(n, _)| n == name);
2250
2251 assert!(has("x-author"), "X-Author allowlisted as X-AUTHOR persists");
2254 assert!(has("cache-control"));
2256 assert!(!has("traceparent"));
2259 assert!(!has("x-forwarded-for"));
2260
2261 for (name, _) in &meta {
2263 assert_eq!(
2264 name,
2265 &name.to_ascii_lowercase(),
2266 "stored meta keys must be lowercase"
2267 );
2268 }
2269 }
2270
2271 #[test]
2272 fn header_allowlist_parser_handles_whitespace_dedup_and_wildcards() {
2273 let allow = hs::HeaderAllowlist::parse(
2276 " X-Author , x-version, x-my-*, x-version, , *, x-my-* , x-AUTHOR",
2277 );
2278 assert!(allow.matches("x-author"));
2279 assert!(allow.matches("x-version"));
2280 assert!(allow.matches("x-my-tag"));
2281 assert!(allow.matches("x-my-region"));
2282 assert!(!allow.matches("x-other"));
2283 assert!(!allow.matches(""));
2284 assert!(!allow.matches("anything-else"));
2286
2287 assert!(hs::HeaderAllowlist::parse("").is_empty());
2289 assert!(hs::HeaderAllowlist::parse(" , ,").is_empty());
2290 }
2291
2292 #[tokio::test]
2293 async fn get_returns_stored_standard_representation_headers() {
2294 let (core, dir) = test_core("representation-headers");
2295 let headers = vec![
2296 ("content-encoding".to_string(), "gzip".to_string()),
2297 (
2298 "content-disposition".to_string(),
2299 "attachment; filename=\"report.pdf\"".to_string(),
2300 ),
2301 ("access-control-allow-origin".to_string(), "*".to_string()),
2302 (
2303 "content-security-policy".to_string(),
2304 "default-src 'self'".to_string(),
2305 ),
2306 ("x-frame-options".to_string(), "DENY".to_string()),
2307 ];
2308 core.write_world(
2309 "home/gzip",
2310 b"compressed bytes",
2311 "application/pdf",
2312 &headers,
2313 )
2314 .unwrap();
2315
2316 let req_headers = HeaderMap::new();
2317 let resp = unwrap_response(
2318 execute_get(
2319 req_headers.clone(),
2320 auth::Tier::Anon,
2321 world_path("home/gzip"),
2322 &core,
2323 &TraceCtx::disabled(),
2324 )
2325 .await,
2326 );
2327 assert_eq!(resp.status(), StatusCode::OK);
2328 assert_eq!(
2329 resp.headers().get(header::CONTENT_ENCODING).unwrap(),
2330 "gzip"
2331 );
2332 assert_eq!(
2333 resp.headers().get(header::CONTENT_DISPOSITION).unwrap(),
2334 "attachment; filename=\"report.pdf\""
2335 );
2336 assert_eq!(
2337 resp.headers().get("access-control-allow-origin").unwrap(),
2338 "*"
2339 );
2340 assert_eq!(
2341 resp.headers().get("content-security-policy").unwrap(),
2342 "default-src 'self'"
2343 );
2344 assert_eq!(resp.headers().get("x-frame-options").unwrap(), "DENY");
2345
2346 let _ = std::fs::remove_dir_all(dir);
2347 }
2348
2349 #[test]
2350 fn worlds_store_content_type_not_private_extensions() {
2351 let (core, dir) = test_core("content-type");
2352 core.write_world("home/pdf", b"%PDF-1.7", "application/pdf", &[])
2353 .unwrap();
2354
2355 let stage = core.read_world("home/pdf").unwrap().unwrap();
2356 assert_eq!(stage.content_type, "application/pdf");
2357 assert_eq!(stage.body, b"%PDF-1.7");
2358
2359 let _ = std::fs::remove_dir_all(dir);
2360 }
2361
2362 #[test]
2363 fn storage_prefix_routes_memory_and_disk_modes() {
2364 assert!(!store::is_memory_world("home/report"));
2365 assert!(!store::is_memory_world("etc/config"));
2366 assert!(store::is_memory_world("tmp/scratch"));
2367 assert!(store::is_memory_world("dev/fb0"));
2368 assert!(store::is_memory_world("sys/status"));
2369 assert!(store::is_persistent("home/report"));
2370 assert!(!store::is_persistent("tmp/scratch"));
2371 }
2372
2373 #[test]
2374 fn memory_worlds_do_not_create_sqlite_files_or_audit_chain() {
2375 let (core, dir) = test_core("memory-world");
2376 core.write_world(
2377 "tmp/scratch",
2378 b"draft",
2379 "text/plain; charset=utf-8",
2380 &[("x-meta-owner".to_string(), "agent".to_string())],
2381 )
2382 .unwrap();
2383
2384 let stage = core.read_world("tmp/scratch").unwrap().unwrap();
2385 assert_eq!(stage.body, b"draft");
2386 assert_eq!(stage.content_type, "text/plain; charset=utf-8");
2387 assert_eq!(
2388 stage.headers,
2389 vec![("x-meta-owner".to_string(), "agent".to_string())]
2390 );
2391 assert!(!world::world_db(&core.data, "tmp/scratch").exists());
2392 assert!(audit::latest_hmac(&core.data, "tmp/scratch").is_none());
2393
2394 let names = store::list_all(&core.data, &core.mem);
2395 assert_eq!(names.unwrap(), vec!["tmp/scratch".to_string()]);
2396
2397 let _ = std::fs::remove_dir_all(dir);
2398 }
2399
2400 #[test]
2401 fn disk_worlds_create_sqlite_files_and_audit_chain_when_using_audit_path() {
2402 let (core, dir) = test_core("disk-world");
2403 let h = world::write_with_audit(
2404 &core.data,
2405 "home/report",
2406 b"final",
2407 "text/plain; charset=utf-8",
2408 &[],
2409 &core.hmac_key,
2410 )
2411 .unwrap();
2412
2413 let stage = core.read_world("home/report").unwrap().unwrap();
2414 assert_eq!(stage.body, b"final");
2415 assert!(world::world_db(&core.data, "home/report").exists());
2416 assert_eq!(audit::latest_hmac(&core.data, "home/report"), Some(h));
2417
2418 let names = store::list_all(&core.data, &core.mem);
2419 assert_eq!(names.unwrap(), vec!["home/report".to_string()]);
2420
2421 let _ = std::fs::remove_dir_all(dir);
2422 }
2423
2424 #[test]
2425 fn audit_keeps_historical_metadata_without_json_payload() {
2426 let (core, dir) = test_core("audit-meta");
2427 let headers = vec![("x-meta-author".to_string(), "ranger".to_string())];
2428 let h = world::write_with_audit(
2429 &core.data,
2430 "home/audit-meta",
2431 b"hello",
2432 "text/plain; charset=utf-8",
2433 &headers,
2434 &core.hmac_key,
2435 )
2436 .unwrap();
2437
2438 let c = rusqlite::Connection::open(world::world_db(&core.data, "home/audit-meta")).unwrap();
2439 let (content_type, meta_sha256): (String, String) = c
2440 .query_row(
2441 "SELECT content_type, meta_sha256 FROM events WHERE hmac=?",
2442 [h],
2443 |r| Ok((r.get(0)?, r.get(1)?)),
2444 )
2445 .unwrap();
2446 assert_eq!(content_type, "text/plain; charset=utf-8");
2447 assert_eq!(
2448 meta_sha256,
2449 audit::meta_sha256("text/plain; charset=utf-8", &headers)
2450 );
2451
2452 let author: String = c
2453 .query_row(
2454 "SELECT value FROM event_headers WHERE name='x-meta-author'",
2455 [],
2456 |r| r.get(0),
2457 )
2458 .unwrap();
2459 assert_eq!(author, "ranger");
2460
2461 let _ = std::fs::remove_dir_all(dir);
2462 }
2463
2464 #[tokio::test]
2465 async fn post_audit_uses_existing_representation_metadata() {
2466 let (core, dir) = test_core("post-audit-meta");
2467 let headers = vec![
2468 ("content-encoding".to_string(), "gzip".to_string()),
2469 ("x-meta-author".to_string(), "ranger".to_string()),
2470 ];
2471 world::write_with_audit(
2472 &core.data,
2473 "home/post-audit-meta",
2474 b"hello",
2475 "text/plain",
2476 &headers,
2477 &core.hmac_key,
2478 )
2479 .unwrap();
2480
2481 let mut req_headers = HeaderMap::new();
2482 req_headers.insert(
2483 header::CONTENT_TYPE,
2484 HeaderValue::from_static("application/pdf"),
2485 );
2486 req_headers.insert(header::CONTENT_LANGUAGE, HeaderValue::from_static("zh-CN"));
2487 let resp = unwrap_response(
2488 execute_post(
2489 req_headers.clone(),
2490 Bytes::from_static(b" world"),
2491 auth::Tier::Write,
2492 world_path("home/post-audit-meta"),
2493 &core,
2494 &TraceCtx::disabled(),
2495 )
2496 .await,
2497 );
2498 assert_eq!(resp.status(), StatusCode::OK);
2499
2500 let c = rusqlite::Connection::open(world::world_db(&core.data, "home/post-audit-meta"))
2501 .unwrap();
2502 let (content_type, meta_sha256): (String, String) = c
2503 .query_row(
2504 "SELECT content_type, meta_sha256 FROM events WHERE event_type='append'",
2505 [],
2506 |r| Ok((r.get(0)?, r.get(1)?)),
2507 )
2508 .unwrap();
2509 assert_eq!(content_type, "text/plain");
2510 assert_eq!(meta_sha256, audit::meta_sha256("text/plain", &headers));
2511
2512 let language_count: i64 = c
2513 .query_row(
2514 "SELECT COUNT(*) FROM event_headers WHERE name='content-language'",
2515 [],
2516 |r| r.get(0),
2517 )
2518 .unwrap();
2519 assert_eq!(language_count, 0);
2520
2521 let _ = std::fs::remove_dir_all(dir);
2522 }
2523
2524 #[tokio::test]
2525 async fn delete_honors_if_match_before_audit_or_remove() {
2526 let (core, dir) = test_core("delete-if-match");
2527 let core = Arc::new(core);
2528 let state = server_state_for_tests(core.clone());
2529 let h = world::write_with_audit(
2530 &core.data,
2531 "home/delete-cas",
2532 b"alive",
2533 "text/plain; charset=utf-8",
2534 &[],
2535 &core.hmac_key,
2536 )
2537 .unwrap();
2538
2539 let mut stale = HeaderMap::new();
2540 stale.insert(header::IF_MATCH, HeaderValue::from_static("\"hmac-stale\""));
2541 let resp = unwrap_response(
2542 execute_delete(
2543 stale.clone(),
2544 auth::Tier::Approve,
2545 world_path("home/delete-cas"),
2546 &state,
2547 &TraceCtx::disabled(),
2548 )
2549 .await,
2550 );
2551 assert_eq!(resp.status(), StatusCode::PRECONDITION_FAILED);
2552 assert!(core.read_world("home/delete-cas").unwrap().is_some());
2553
2554 let mut good = HeaderMap::new();
2555 good.insert(
2556 header::IF_MATCH,
2557 HeaderValue::from_str(&format!("\"{}\"", et::hmac_etag(&h))).unwrap(),
2558 );
2559 let resp = unwrap_response(
2560 execute_delete(
2561 good.clone(),
2562 auth::Tier::Approve,
2563 world_path("home/delete-cas"),
2564 &state,
2565 &TraceCtx::disabled(),
2566 )
2567 .await,
2568 );
2569 assert_eq!(resp.status(), StatusCode::NO_CONTENT);
2570 assert!(core.read_world("home/delete-cas").unwrap().is_none());
2571 assert!(core.read_world("var/log/deletes").unwrap().is_some());
2572 assert!(matches!(
2573 core.cached_verify_chain("var/log/deletes").unwrap(),
2574 Some(audit::VerifyReport::Valid(_))
2575 ));
2576 let ledger = world::open_existing(&core.data, "var/log/deletes")
2577 .unwrap()
2578 .unwrap();
2579 let mut stmt = ledger
2580 .prepare("SELECT event_type FROM events ORDER BY id")
2581 .unwrap();
2582 let events: Vec<String> = stmt
2583 .query_map([], |r| r.get(0))
2584 .unwrap()
2585 .collect::<Result<_, _>>()
2586 .unwrap();
2587 assert_eq!(events, vec!["delete_intent", "delete_commit"]);
2588
2589 let _ = std::fs::remove_dir_all(dir);
2590 }
2591
2592 #[tokio::test]
2593 async fn delete_returns_500_when_commit_audit_fails_after_physical_delete() {
2594 let (core, dir) = test_core("delete-commit-audit-fail");
2595 let core = Arc::new(core);
2596 let state = server_state_for_tests(core.clone());
2597 core.write_world("home/delete-degraded", b"alive", "text/plain", &[])
2598 .unwrap();
2599 world::write_with_audit(
2600 &core.data,
2601 "var/log/deletes",
2602 b"ledger",
2603 "text/plain",
2604 &[],
2605 &core.hmac_key,
2606 )
2607 .unwrap();
2608 core.delete_ledger_created.store(true, Ordering::Relaxed);
2609 {
2610 let c =
2611 rusqlite::Connection::open(world::world_db(&core.data, "var/log/deletes")).unwrap();
2612 c.execute_batch(
2613 r#"
2614 CREATE TRIGGER fail_delete_commit
2615 BEFORE INSERT ON events
2616 WHEN NEW.event_type='delete_commit'
2617 BEGIN
2618 SELECT RAISE(FAIL, 'delete_commit blocked');
2619 END;
2620 "#,
2621 )
2622 .unwrap();
2623 }
2624
2625 let resp = unwrap_response(
2626 execute_delete(
2627 HeaderMap::new(),
2628 auth::Tier::Approve,
2629 world_path("home/delete-degraded"),
2630 &state,
2631 &TraceCtx::disabled(),
2632 )
2633 .await,
2634 );
2635
2636 assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
2637 assert!(core.read_world("home/delete-degraded").unwrap().is_none());
2638 let ledger = world::open_existing(&core.data, "var/log/deletes")
2639 .unwrap()
2640 .unwrap();
2641 let events = ledger
2642 .prepare("SELECT event_type FROM events ORDER BY id")
2643 .unwrap()
2644 .query_map([], |r| r.get::<_, String>(0))
2645 .unwrap()
2646 .collect::<Result<Vec<_>, _>>()
2647 .unwrap();
2648 assert_eq!(events, vec!["put", "delete_intent", "delete_commit_failed"]);
2649
2650 let _ = std::fs::remove_dir_all(dir);
2651 }
2652
2653 #[tokio::test]
2654 async fn delete_rejects_auth_token_and_append_only_ledger() {
2655 let (core, dir) = test_core("delete-policy");
2656 let core = Arc::new(core);
2657 let state = server_state_for_tests(core.clone());
2658 world::write_with_audit(
2659 &core.data,
2660 "home/delete-policy",
2661 b"alive",
2662 "text/plain; charset=utf-8",
2663 &[],
2664 &core.hmac_key,
2665 )
2666 .unwrap();
2667 world::write_with_audit(
2668 &core.data,
2669 "var/log/deletes",
2670 b"ledger",
2671 "text/plain; charset=utf-8",
2672 &[],
2673 &core.hmac_key,
2674 )
2675 .unwrap();
2676 let headers = HeaderMap::new();
2677
2678 let auth_delete = unwrap_response(
2679 execute_delete(
2680 headers.clone(),
2681 auth::Tier::Write,
2682 world_path("home/delete-policy"),
2683 &state,
2684 &TraceCtx::disabled(),
2685 )
2686 .await,
2687 );
2688 assert_eq!(auth_delete.status(), StatusCode::UNAUTHORIZED);
2689 assert!(core.read_world("home/delete-policy").unwrap().is_some());
2690
2691 let ledger_delete = unwrap_response(
2692 execute_delete(
2693 headers.clone(),
2694 auth::Tier::Approve,
2695 world_path("var/log/deletes"),
2696 &state,
2697 &TraceCtx::disabled(),
2698 )
2699 .await,
2700 );
2701 assert_eq!(ledger_delete.status(), StatusCode::UNAUTHORIZED);
2702 assert_eq!(
2703 ledger_delete
2704 .headers()
2705 .get(header::WWW_AUTHENTICATE)
2706 .unwrap(),
2707 "Bearer realm=\"elastik\""
2708 );
2709 assert_eq!(
2710 ledger_delete.headers().get(header::CONTENT_TYPE).unwrap(),
2711 "text/plain; charset=utf-8"
2712 );
2713 assert_eq!(
2714 response_text(ledger_delete).await,
2715 "auth required: delete ledger is append-only\n"
2716 );
2717 assert!(core.read_world("var/log/deletes").unwrap().is_some());
2718
2719 let _ = std::fs::remove_dir_all(dir);
2720 }
2721
2722 #[tokio::test]
2723 async fn delete_missing_world_does_not_write_delete_ledger() {
2724 let (core, dir) = test_core("delete-missing");
2725 let core = Arc::new(core);
2726 let state = server_state_for_tests(core.clone());
2727 let headers = HeaderMap::new();
2728 let resp = unwrap_response(
2729 execute_delete(
2730 headers.clone(),
2731 auth::Tier::Approve,
2732 world_path("home/missing"),
2733 &state,
2734 &TraceCtx::disabled(),
2735 )
2736 .await,
2737 );
2738 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
2739 assert!(core.read_world("var/log/deletes").unwrap().is_none());
2740
2741 let _ = std::fs::remove_dir_all(dir);
2742 }
2743
2744 #[test]
2745 fn proc_worlds_body_is_plain_lines() {
2746 assert_eq!(world_list_body(&[]), "");
2747 assert_eq!(
2748 world_list_body(&["home/a".to_owned(), "tmp/b".to_owned()]),
2749 "home/a\ntmp/b\n"
2750 );
2751 }
2752
2753 #[tokio::test]
2754 async fn proc_du_and_df_report_resource_usage() {
2755 let (mut core, dir) = test_core("proc-du-df");
2756 core.max_storage_bytes = Some(10);
2757 core.write_world("home/hello", b"hello", "text/plain", &[])
2758 .unwrap();
2759 core.write_world("tmp/scratch", b"data", "text/plain", &[])
2760 .unwrap();
2761 let state = Arc::new(core);
2762 let headers = HeaderMap::new();
2763
2764 let du = proc_du(
2765 State(server_state_for_tests(state.clone())),
2766 Method::GET,
2767 headers.clone(),
2768 )
2769 .await;
2770 assert_eq!(du.status(), StatusCode::OK);
2771 let du_body = response_text(du).await;
2772 assert!(du_body.contains("home/hello\t5\n"));
2773 assert!(du_body.contains("tmp/scratch\t4\n"));
2774
2775 let df = proc_df(
2776 State(server_state_for_tests(state.clone())),
2777 Method::GET,
2778 headers.clone(),
2779 )
2780 .await;
2781 assert_eq!(df.status(), StatusCode::OK);
2782 let df_body = response_text(df).await;
2783 assert!(df_body.contains("storage\t5\t10\t5\n"));
2784 assert!(df_body.contains("memory\t4\t268435456\t268435452\n"));
2785 assert!(df_body.contains("worlds\t2\tunlimited\tunlimited\n"));
2786
2787 let head = proc_du(State(server_state_for_tests(state)), Method::HEAD, headers).await;
2788 assert_eq!(head.status(), StatusCode::OK);
2789 assert_eq!(head.headers().get(header::CONTENT_LENGTH).unwrap(), "27");
2790 assert_eq!(response_text(head).await, "");
2791
2792 let _ = std::fs::remove_dir_all(dir);
2793 }
2794
2795 #[tokio::test]
2796 async fn proc_du_and_df_require_read_token_when_enabled() {
2797 let (mut core, dir) = test_core("proc-du-df-read-token");
2798 core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
2799 let state = Arc::new(core);
2800 let headers = HeaderMap::new();
2801
2802 let du = proc_du(
2803 State(server_state_for_tests(state.clone())),
2804 Method::GET,
2805 headers.clone(),
2806 )
2807 .await;
2808 assert_eq!(du.status(), StatusCode::UNAUTHORIZED);
2809
2810 let df = proc_df(
2811 State(server_state_for_tests(state.clone())),
2812 Method::GET,
2813 headers,
2814 )
2815 .await;
2816 assert_eq!(df.status(), StatusCode::UNAUTHORIZED);
2817
2818 let mut auth_headers = HeaderMap::new();
2819 let auth_value =
2820 HeaderValue::from_str(&format!("{} {}", "Bearer", "reader")).expect("valid test auth");
2821 auth_headers.insert(header::AUTHORIZATION, auth_value);
2822 let authorized = proc_df(
2823 State(server_state_for_tests(state)),
2824 Method::GET,
2825 auth_headers,
2826 )
2827 .await;
2828 assert_eq!(authorized.status(), StatusCode::OK);
2829
2830 let _ = std::fs::remove_dir_all(dir);
2831 }
2832
2833 #[tokio::test]
2834 async fn proc_df_world_count_tracks_durable_put_and_delete() {
2835 let (core, dir) = test_core("proc-df-world-count");
2836 let headers = HeaderMap::new();
2837
2838 let put = unwrap_response(
2839 execute_put(
2840 headers.clone(),
2841 Bytes::from_static(b"x"),
2842 auth::Tier::Write,
2843 world_path("home/count"),
2844 &core,
2845 &TraceCtx::disabled(),
2846 )
2847 .await,
2848 );
2849 assert_eq!(put.status(), StatusCode::CREATED);
2850
2851 let state = Arc::new(core);
2852 let server_state = server_state_for_tests(state.clone());
2853 let before = proc_df(State(server_state.clone()), Method::GET, headers.clone()).await;
2854 assert!(response_text(before)
2855 .await
2856 .contains("worlds\t1\tunlimited\tunlimited\n"));
2857
2858 let delete = unwrap_response(
2859 execute_delete(
2860 headers.clone(),
2861 auth::Tier::Approve,
2862 world_path("home/count"),
2863 &server_state,
2864 &TraceCtx::disabled(),
2865 )
2866 .await,
2867 );
2868 assert_eq!(delete.status(), StatusCode::NO_CONTENT);
2869
2870 let after = proc_df(State(server_state), Method::GET, headers).await;
2871 let after_body = response_text(after).await;
2872 assert!(after_body.contains("storage\t0\tunlimited\tunlimited\n"));
2873 assert!(after_body.contains("worlds\t0\tunlimited\tunlimited\n"));
2874
2875 let _ = std::fs::remove_dir_all(dir);
2876 }
2877
2878 #[tokio::test]
2879 async fn proc_pool_emits_metrics_with_type_labels() {
2880 let (core, dir) = test_core("proc-pool-metrics");
2886 let headers = HeaderMap::new();
2887
2888 let put = unwrap_response(
2889 execute_put(
2890 headers.clone(),
2891 Bytes::from_static(b"hello"),
2892 auth::Tier::Write,
2893 world_path("home/m"),
2894 &core,
2895 &TraceCtx::disabled(),
2896 )
2897 .await,
2898 );
2899 assert_eq!(put.status(), StatusCode::CREATED);
2900
2901 for _ in 0..2 {
2903 let get = unwrap_response(
2904 execute_get(
2905 headers.clone(),
2906 auth::Tier::Read,
2907 world_path("home/m"),
2908 &core,
2909 &TraceCtx::disabled(),
2910 )
2911 .await,
2912 );
2913 assert_eq!(get.status(), StatusCode::OK);
2914 }
2915
2916 let state = Arc::new(core);
2917 let server_state = server_state_for_tests(state.clone());
2918 let resp = proc_pool(State(server_state.clone()), Method::GET, headers.clone()).await;
2919 let body = response_text(resp).await;
2920
2921 assert!(body.contains("read_cache_entries 1 snapshot\n"));
2922 assert!(body.contains("read_cache_tombstones 0 snapshot\n"));
2923 assert!(body.contains("read_cache_hits 1 counter\n"));
2924 assert!(body.contains("read_cache_misses 1 counter\n"));
2925 assert!(body.contains("read_cache_capped 0 counter\n"));
2926 assert!(body.contains("read_cache_open_fails 0 counter\n"));
2927 assert!(body.contains("read_cache_max_entries "));
2928 assert!(body.contains("ledger_writer_inits 0 counter\n"));
2930
2931 let _ = unwrap_response(
2934 execute_delete(
2935 headers.clone(),
2936 auth::Tier::Approve,
2937 world_path("home/m"),
2938 &server_state,
2939 &TraceCtx::disabled(),
2940 )
2941 .await,
2942 );
2943 let resp2 = proc_pool(State(server_state), Method::GET, headers).await;
2944 let body2 = response_text(resp2).await;
2945 assert!(
2946 body2.contains("ledger_writer_inits 1 counter\n"),
2947 "expected counter to bump to 1 after first DELETE; body=\n{body2}"
2948 );
2949
2950 let _ = std::fs::remove_dir_all(dir);
2951 }
2952
2953 #[tokio::test]
2954 async fn proc_pool_requires_read_token_when_enabled() {
2955 let (mut core, dir) = test_core("proc-pool-read-token");
2960 core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
2961 let state = Arc::new(core);
2962 let headers = HeaderMap::new();
2963
2964 let resp = proc_pool(State(server_state_for_tests(state)), Method::GET, headers).await;
2965 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
2966
2967 let _ = std::fs::remove_dir_all(dir);
2968 }
2969
2970 #[cfg(feature = "multi-thread")]
2971 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
2972 async fn concurrent_first_deletes_increment_world_count_exactly_once() {
2973 let (core, dir) = test_core("concurrent-first-deletes");
2991 let headers = HeaderMap::new();
2992 for w in ["home/a", "home/b", "home/c"] {
2993 let put = unwrap_response(
2994 execute_put(
2995 headers.clone(),
2996 Bytes::from_static(b"x"),
2997 auth::Tier::Write,
2998 world_path(w),
2999 &core,
3000 &TraceCtx::disabled(),
3001 )
3002 .await,
3003 );
3004 assert_eq!(put.status(), StatusCode::CREATED);
3005 }
3006 assert_eq!(core.durable_world_count.load(Ordering::Relaxed), 3);
3007 assert!(!core.delete_ledger_created.load(Ordering::Relaxed));
3008
3009 let state = Arc::new(core);
3010 let server_state = server_state_for_tests(state.clone());
3011 let s1 = server_state.clone();
3012 let s2 = server_state.clone();
3013 let s3 = server_state.clone();
3014 let h1 = headers.clone();
3015 let h2 = headers.clone();
3016 let h3 = headers.clone();
3017 let trace1 = TraceCtx::disabled();
3018 let trace2 = TraceCtx::disabled();
3019 let trace3 = TraceCtx::disabled();
3020 let (r1, r2, r3) = tokio::join!(
3021 execute_delete(h1, auth::Tier::Approve, world_path("home/a"), &s1, &trace1),
3022 execute_delete(h2, auth::Tier::Approve, world_path("home/b"), &s2, &trace2),
3023 execute_delete(h3, auth::Tier::Approve, world_path("home/c"), &s3, &trace3),
3024 );
3025 assert_eq!(unwrap_response(r1).status(), StatusCode::NO_CONTENT);
3026 assert_eq!(unwrap_response(r2).status(), StatusCode::NO_CONTENT);
3027 assert_eq!(unwrap_response(r3).status(), StatusCode::NO_CONTENT);
3028
3029 assert_eq!(state.durable_world_count.load(Ordering::Relaxed), 1);
3032 assert!(state.delete_ledger_created.load(Ordering::Relaxed));
3033
3034 let _ = std::fs::remove_dir_all(dir);
3035 }
3036
3037 async fn response_text(resp: Response) -> String {
3038 let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX)
3039 .await
3040 .unwrap();
3041 String::from_utf8(bytes.to_vec()).unwrap()
3042 }
3043
3044 #[tokio::test]
3058 async fn pipeline_get_existing_world_returns_200_with_body() {
3059 let (core, dir) = test_core("pipeline-get-200");
3060 core.write_world("home/hello", b"hello world", "text/plain", &[])
3061 .unwrap();
3062 let core = Arc::new(core);
3063 let state = server_state_for_tests(core.clone());
3064
3065 let resp = pipeline::run(
3066 Method::GET,
3067 "/home/hello".to_string(),
3068 HeaderMap::new(),
3069 Bytes::new(),
3070 &state,
3071 42,
3072 )
3073 .await;
3074
3075 assert_eq!(resp.status(), StatusCode::OK);
3076 assert_eq!(
3077 resp.headers()
3078 .get(header::CONTENT_TYPE)
3079 .and_then(|v| v.to_str().ok()),
3080 Some("text/plain")
3081 );
3082 assert_eq!(response_text(resp).await, "hello world");
3083
3084 let _ = std::fs::remove_dir_all(dir);
3085 }
3086
3087 #[tokio::test]
3088 async fn pipeline_head_existing_world_returns_headers_no_body() {
3089 let (core, dir) = test_core("pipeline-head-200");
3090 core.write_world("home/hello", b"hello world", "text/plain", &[])
3091 .unwrap();
3092 let core = Arc::new(core);
3093 let state = server_state_for_tests(core.clone());
3094
3095 let resp = pipeline::run(
3096 Method::HEAD,
3097 "/home/hello".to_string(),
3098 HeaderMap::new(),
3099 Bytes::new(),
3100 &state,
3101 43,
3102 )
3103 .await;
3104
3105 assert_eq!(resp.status(), StatusCode::OK);
3106 assert_eq!(
3107 resp.headers()
3108 .get(header::CONTENT_LENGTH)
3109 .and_then(|v| v.to_str().ok()),
3110 Some("11")
3111 );
3112 assert_eq!(response_text(resp).await, "");
3114
3115 let _ = std::fs::remove_dir_all(dir);
3116 }
3117
3118 #[tokio::test]
3119 async fn pipeline_get_nonexistent_world_returns_404() {
3120 let (core, dir) = test_core("pipeline-get-404");
3121 let core = Arc::new(core);
3122 let state = server_state_for_tests(core.clone());
3123
3124 let resp = pipeline::run(
3125 Method::GET,
3126 "/home/missing".to_string(),
3127 HeaderMap::new(),
3128 Bytes::new(),
3129 &state,
3130 44,
3131 )
3132 .await;
3133
3134 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
3135
3136 let _ = std::fs::remove_dir_all(dir);
3137 }
3138
3139 #[tokio::test]
3140 async fn pipeline_get_invalid_dot_segment_returns_400() {
3141 let (core, dir) = test_core("pipeline-get-400");
3142 let core = Arc::new(core);
3143 let state = server_state_for_tests(core.clone());
3144
3145 let resp = pipeline::run(
3146 Method::GET,
3147 "/home/../etc/secret".to_string(),
3148 HeaderMap::new(),
3149 Bytes::new(),
3150 &state,
3151 45,
3152 )
3153 .await;
3154
3155 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
3156
3157 let _ = std::fs::remove_dir_all(dir);
3158 }
3159
3160 #[tokio::test]
3161 async fn pipeline_get_with_read_token_required_rejects_anon() {
3162 let (mut core, dir) = test_core("pipeline-get-401");
3163 core.tokens.read = auth::NonEmptyBytes::new(b"reader".to_vec());
3164 core.write_world("home/secret", b"shhh", "text/plain", &[])
3165 .unwrap();
3166 let core = Arc::new(core);
3167 let state = server_state_for_tests(core.clone());
3168
3169 let resp = pipeline::run(
3170 Method::GET,
3171 "/home/secret".to_string(),
3172 HeaderMap::new(), Bytes::new(),
3174 &state,
3175 46,
3176 )
3177 .await;
3178
3179 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
3180
3181 let _ = std::fs::remove_dir_all(dir);
3182 }
3183
3184 #[tokio::test]
3192 async fn world_handler_get_routes_through_pipeline() {
3193 use axum::body::Body;
3194 use axum::http::Request as HttpRequest;
3195 use tower::ServiceExt;
3196
3197 let (core, dir) = test_core("world-handler-get");
3198 core.write_world("home/hello", b"hello world", "text/plain", &[])
3199 .unwrap();
3200 let core = Arc::new(core);
3201 let state = server_state_for_tests(core.clone());
3202
3203 let app = Router::new()
3204 .route("/*world", any(world_handler))
3205 .layer(axum::middleware::from_fn_with_state(
3206 state.clone(),
3207 add_server_response_headers,
3208 ))
3209 .with_state(state);
3210
3211 let req = HttpRequest::builder()
3212 .method("GET")
3213 .uri("/home/hello")
3214 .body(Body::empty())
3215 .unwrap();
3216 let resp = app.oneshot(req).await.unwrap();
3217
3218 assert_eq!(resp.status(), StatusCode::OK);
3219 let req_id_header = resp
3226 .headers()
3227 .get("x-request-id")
3228 .and_then(|v| v.to_str().ok())
3229 .unwrap_or("missing");
3230 assert!(
3231 req_id_header.parse::<u64>().is_ok(),
3232 "x-request-id should be a numeric id, got {req_id_header:?}"
3233 );
3234 assert_eq!(response_text(resp).await, "hello world");
3235
3236 let _ = std::fs::remove_dir_all(dir);
3237 }
3238
3239 #[tokio::test]
3243 async fn world_handler_head_routes_through_pipeline() {
3244 use axum::body::Body;
3245 use axum::http::Request as HttpRequest;
3246 use tower::ServiceExt;
3247
3248 let (core, dir) = test_core("world-handler-head");
3249 core.write_world("home/hello", b"hello world", "text/plain", &[])
3250 .unwrap();
3251 let core = Arc::new(core);
3252 let state = server_state_for_tests(core.clone());
3253
3254 let app = Router::new()
3255 .route("/*world", any(world_handler))
3256 .layer(axum::middleware::from_fn_with_state(
3257 state.clone(),
3258 add_server_response_headers,
3259 ))
3260 .with_state(state);
3261
3262 let req = HttpRequest::builder()
3263 .method("HEAD")
3264 .uri("/home/hello")
3265 .body(Body::empty())
3266 .unwrap();
3267 let resp = app.oneshot(req).await.unwrap();
3268
3269 assert_eq!(resp.status(), StatusCode::OK);
3270 assert_eq!(
3271 resp.headers()
3272 .get(header::CONTENT_LENGTH)
3273 .and_then(|v| v.to_str().ok()),
3274 Some("11"),
3275 );
3276 assert!(resp.headers().get("x-request-id").is_some());
3277 assert_eq!(response_text(resp).await, "");
3279
3280 let _ = std::fs::remove_dir_all(dir);
3281 }
3282
3283 #[tokio::test]
3287 async fn pipeline_get_if_none_match_returns_304() {
3288 let (core, dir) = test_core("pipeline-304");
3289 core.write_world("home/cached", b"cached body", "text/plain", &[])
3290 .unwrap();
3291 let core = Arc::new(core);
3292 let state = server_state_for_tests(core.clone());
3293
3294 let first = pipeline::run(
3296 Method::GET,
3297 "/home/cached".to_string(),
3298 HeaderMap::new(),
3299 Bytes::new(),
3300 &state,
3301 100,
3302 )
3303 .await;
3304 let etag = first
3305 .headers()
3306 .get(header::ETAG)
3307 .and_then(|v| v.to_str().ok())
3308 .expect("first GET must return an ETag header")
3309 .to_string();
3310
3311 let mut headers = HeaderMap::new();
3312 headers.insert(header::IF_NONE_MATCH, HeaderValue::from_str(&etag).unwrap());
3313 let resp = pipeline::run(
3314 Method::GET,
3315 "/home/cached".to_string(),
3316 headers,
3317 Bytes::new(),
3318 &state,
3319 101,
3320 )
3321 .await;
3322
3323 assert_eq!(resp.status(), StatusCode::NOT_MODIFIED);
3324 assert_eq!(response_text(resp).await, "");
3326
3327 let _ = std::fs::remove_dir_all(dir);
3328 }
3329
3330 #[tokio::test]
3341 async fn pipeline_get_out_of_range_returns_416_with_reason() {
3342 let (core, dir) = test_core("pipeline-416");
3343 core.write_world("home/short", b"abc", "text/plain", &[])
3344 .unwrap();
3345 let core = Arc::new(core);
3346 let state = server_state_for_tests(core.clone());
3347
3348 let mut headers = HeaderMap::new();
3350 headers.insert(header::RANGE, HeaderValue::from_static("bytes=999-"));
3351 let resp = pipeline::run(
3352 Method::GET,
3353 "/home/short".to_string(),
3354 headers,
3355 Bytes::new(),
3356 &state,
3357 200,
3358 )
3359 .await;
3360 assert_eq!(resp.status(), StatusCode::RANGE_NOT_SATISFIABLE);
3361
3362 let mut headers2 = HeaderMap::new();
3365 headers2.insert(header::RANGE, HeaderValue::from_static("bytes=999-"));
3366 let phase = crate::handler::execute(
3367 Verb::Get,
3368 headers2,
3369 Bytes::new(),
3370 auth::Tier::Anon,
3371 world_path("home/short"),
3372 &state,
3373 &TraceCtx::disabled(),
3374 )
3375 .await;
3376 match phase {
3377 Phase::Error {
3378 reason: ErrorReason::RangeNotSatisfiable,
3379 resp,
3380 } => {
3381 assert_eq!(resp.status(), StatusCode::RANGE_NOT_SATISFIABLE);
3382 }
3383 _ => panic!("expected Phase::Error{{RangeNotSatisfiable}}"),
3384 }
3385
3386 let _ = std::fs::remove_dir_all(dir);
3387 }
3388
3389 #[tokio::test]
3390 async fn pipeline_get_range_returns_206_with_chunk() {
3391 let (core, dir) = test_core("pipeline-get-range");
3392 core.write_world("home/range", b"abcdef", "text/plain", &[])
3393 .unwrap();
3394 let core = Arc::new(core);
3395 let state = server_state_for_tests(core.clone());
3396
3397 let mut headers = HeaderMap::new();
3398 headers.insert(header::RANGE, HeaderValue::from_static("bytes=1-3"));
3399
3400 let resp = pipeline::run(
3401 Method::GET,
3402 "/home/range".to_string(),
3403 headers,
3404 Bytes::new(),
3405 &state,
3406 47,
3407 )
3408 .await;
3409
3410 assert_eq!(resp.status(), StatusCode::PARTIAL_CONTENT);
3411 assert_eq!(
3412 resp.headers()
3413 .get(header::CONTENT_RANGE)
3414 .and_then(|v| v.to_str().ok()),
3415 Some("bytes 1-3/6")
3416 );
3417 assert_eq!(response_text(resp).await, "bcd");
3418
3419 let _ = std::fs::remove_dir_all(dir);
3420 }
3421
3422 fn test_core(label: &str) -> (Core, PathBuf) {
3423 let mut dir = std::env::temp_dir();
3424 let nanos = std::time::SystemTime::now()
3425 .duration_since(std::time::UNIX_EPOCH)
3426 .unwrap()
3427 .as_nanos();
3428 dir.push(format!(
3429 "elastik-core-{label}-{}-{nanos}",
3430 std::process::id()
3431 ));
3432 std::fs::create_dir_all(&dir).unwrap();
3433 (
3434 {
3435 let (events, _) = broadcast::channel(16);
3436 Core {
3437 data: dir.clone(),
3438 tokens: auth::Tokens {
3439 read: None,
3440 write: None,
3441 approve: None,
3442 },
3443 hmac_key: b"test-key".to_vec(),
3444 mem: Arc::new(store::MemoryStore::new()),
3445 max_world_bytes: DEFAULT_MAX_WORLD_BYTES,
3446 max_memory_bytes: DEFAULT_MAX_MEMORY_BYTES,
3447 max_storage_bytes: None,
3448 storage_body_bytes: Arc::new(AtomicUsize::new(0)),
3449 durable_world_count: Arc::new(AtomicUsize::new(0)),
3450 delete_ledger_created: Arc::new(AtomicBool::new(false)),
3451 events,
3452 listen_slots: Arc::new(Semaphore::new(DEFAULT_MAX_LISTEN_CONNECTIONS)),
3453 listen_replay_max: DEFAULT_LISTEN_REPLAY_MAX,
3454 event_log: Arc::new(StdMutex::new(VecDeque::with_capacity(
3455 DEFAULT_LISTEN_REPLAY_MAX,
3456 ))),
3457 shutdown: watch::channel(false).1,
3458 next_event: crate::state::new_event_counter(),
3459 world_locks: Arc::new(DashMap::new()),
3460 ledger: Arc::new(crate::ledger::LedgerWriter::new()),
3461 read_cache: Arc::new(crate::read_cache::ReadCache::new(
3462 crate::read_cache::DEFAULT_READ_CACHE_MAX_ENTRIES,
3463 )),
3464 }
3465 },
3466 dir,
3467 )
3468 }
3469
3470 #[tokio::test]
3471 async fn write_permit_is_bound_to_one_world() {
3472 struct NoopTrace;
3473 impl crate::world_ops::WriteTraceHooks for NoopTrace {}
3474
3475 let (core, dir) = test_core("permit-bound");
3476 let world = world_path("home/permit-a");
3477 let permit = crate::world_ops::authorize_write(&world, auth::Tier::Write)
3478 .expect("write token tier should authorize home writes");
3479 let req = crate::world_ops::ReplaceRequest {
3480 body: Bytes::from_static(b"right-door"),
3481 content_type: "text/plain; charset=utf-8".to_owned(),
3482 headers: Vec::new(),
3483 preconditions: et::Preconditions::default(),
3484 };
3485
3486 crate::world_ops::replace_write(&core, &permit, req, &NoopTrace)
3487 .await
3488 .expect("permit writes only its bound world");
3489
3490 assert_eq!(
3491 core.read_world("home/permit-a").unwrap().unwrap().body,
3492 b"right-door"
3493 );
3494 assert!(core.read_world("home/permit-b").unwrap().is_none());
3495
3496 let _ = std::fs::remove_dir_all(dir);
3497 }
3498
3499 #[test]
3500 fn write_permit_preserves_path_based_approve_gate() {
3501 assert!(matches!(
3502 crate::world_ops::authorize_write(&world_path("etc/config"), auth::Tier::Write),
3503 Err(crate::world_ops::WriteError::Auth(AuthGate::WriteApprove))
3504 ));
3505 assert!(
3506 crate::world_ops::authorize_write(&world_path("etc/config"), auth::Tier::Approve)
3507 .is_ok()
3508 );
3509 assert!(
3510 crate::world_ops::authorize_write(&world_path("home/config"), auth::Tier::Write)
3511 .is_ok()
3512 );
3513 }
3514}