Skip to main content

truss/adapters/server/
mod.rs

1mod auth;
2#[cfg(feature = "azure")]
3pub mod azure;
4mod cache;
5mod config;
6#[cfg(feature = "gcs")]
7pub mod gcs;
8mod http_parse;
9mod metrics;
10mod multipart;
11mod negotiate;
12mod remote;
13mod response;
14#[cfg(feature = "s3")]
15pub mod s3;
16
17#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
18pub use config::StorageBackend;
19use config::StorageBackendLabel;
20pub use config::{DEFAULT_BIND_ADDR, DEFAULT_STORAGE_ROOT, LogHandler, ServerConfig};
21
22use auth::{
23    authorize_request, authorize_request_headers, authorize_signed_request,
24    canonical_query_without_signature, extend_transform_query, parse_optional_bool_query,
25    parse_optional_float_query, parse_optional_integer_query, parse_optional_u8_query,
26    parse_query_params, required_query_param, signed_source_query, url_authority,
27    validate_public_query_names,
28};
29use cache::{
30    CacheLookup, TransformCache, compute_cache_key, compute_watermark_identity,
31    try_versioned_cache_lookup,
32};
33use http_parse::{
34    HttpRequest, parse_named, parse_optional_named, read_request_body, read_request_headers,
35    request_has_json_content_type,
36};
37use metrics::{
38    CACHE_HITS_TOTAL, CACHE_MISSES_TOTAL, RouteMetric, record_http_metrics,
39    record_http_request_duration, record_storage_duration, record_transform_duration,
40    record_transform_error, record_watermark_transform, render_metrics_text, status_code,
41    storage_backend_index_from_config, uptime_seconds,
42};
43use multipart::{parse_multipart_boundary, parse_upload_request};
44use negotiate::{
45    CacheHitStatus, ImageResponsePolicy, PublicSourceKind, build_image_etag,
46    build_image_response_headers, if_none_match_matches, negotiate_output_format,
47};
48use remote::{read_remote_watermark_bytes, resolve_source_bytes};
49use response::{
50    HttpResponse, NOT_FOUND_BODY, bad_request_response, service_unavailable_response,
51    transform_error_response, unsupported_media_type_response, write_response,
52    write_response_compressed,
53};
54
55use crate::{
56    CropRegion, Fit, MediaType, Position, RawArtifact, Rgba8, Rotation, TransformOptions,
57    TransformRequest, WatermarkInput, sniff_artifact, transform_raster, transform_svg,
58};
59use hmac::{Hmac, Mac};
60use serde::Deserialize;
61use serde_json::json;
62use sha2::{Digest, Sha256};
63use std::collections::BTreeMap;
64use std::env;
65use std::io;
66use std::net::{TcpListener, TcpStream};
67use std::str::FromStr;
68use std::sync::Arc;
69use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering};
70use std::time::{Duration, Instant};
71use subtle::ConstantTimeEq;
72use url::Url;
73use uuid::Uuid;
74
/// Emits `msg` followed by a newline on the process's standard error stream.
///
/// The write goes straight to the raw file descriptor (Unix) or handle
/// (Windows) rather than through Rust's `std::io::Stderr`, whose internal
/// `ReentrantLock` can interfere with `MutexGuard` drop ordering in Rust 2024
/// edition, breaking HTTP keep-alive.
///
/// Write failures are ignored: this is best-effort diagnostic output.
pub(crate) fn stderr_write(msg: &str) {
    use std::io::Write;
    use std::mem::ManuallyDrop;

    // Assemble the complete line first so it is handed to `write_all` in one piece.
    let mut line = Vec::with_capacity(msg.len() + 1);
    line.extend_from_slice(msg.as_bytes());
    line.push(b'\n');

    #[cfg(unix)]
    {
        use std::os::fd::FromRawFd;
        // SAFETY: fd 2 (stderr) is always valid for the lifetime of the process.
        // `ManuallyDrop` keeps the temporary `File` from ever closing fd 2.
        let mut stderr = ManuallyDrop::new(unsafe { std::fs::File::from_raw_fd(2) });
        let _ = stderr.write_all(&line);
    }

    #[cfg(windows)]
    {
        use std::os::windows::io::FromRawHandle;

        unsafe extern "system" {
            fn GetStdHandle(nStdHandle: u32) -> *mut std::ffi::c_void;
        }

        const STD_ERROR_HANDLE: u32 = (-12_i32) as u32;
        // SAFETY: GetStdHandle(STD_ERROR_HANDLE) returns the stderr handle
        // which is always valid for the lifetime of the process.
        let raw = unsafe { GetStdHandle(STD_ERROR_HANDLE) };
        // `ManuallyDrop` keeps the temporary `File` from ever closing the
        // stderr handle.
        let mut stderr = ManuallyDrop::new(unsafe { std::fs::File::from_raw_handle(raw) });
        let _ = stderr.write_all(&line);
    }
}
115
/// Socket read timeout (60 seconds). The shutdown path in `serve_with_config`
/// relies on socket timeouts so that workers do not block forever.
// NOTE(review): the place where this timeout is applied to a stream is not
// visible in this part of the file — presumably `handle_stream`; confirm.
const SOCKET_READ_TIMEOUT: Duration = Duration::from_secs(60);
/// Socket write timeout (60 seconds); counterpart of [`SOCKET_READ_TIMEOUT`].
const SOCKET_WRITE_TIMEOUT: Duration = Duration::from_secs(60);
/// Minimum number of worker threads for handling incoming connections
/// concurrently. `serve_with_config` sizes its pool as the larger of this value
/// and `config.max_concurrent_transforms`.
const WORKER_THREADS: usize = 8;
/// HMAC keyed with SHA-256, used to sign public URLs in `sign_public_url`.
type HmacSha256 = Hmac<Sha256>;
121
/// Cache-lifetime settings for public image responses.
// NOTE(review): field names mirror the HTTP `Cache-Control` directives
// `max-age` and `stale-while-revalidate`, so the values are presumably
// seconds — confirm against the code that renders the header.
#[derive(Clone, Copy)]
struct PublicCacheControl {
    max_age: u32,
    stale_while_revalidate: u32,
}
127
/// Copyable bundle of per-response settings for image responses.
// NOTE(review): consumers of this struct are outside the visible part of the
// file; the per-field notes below are inferred from names and types only.
#[derive(Clone, Copy)]
struct ImageResponseConfig {
    // Presumably turns off `Accept`-header based output-format negotiation — confirm.
    disable_accept_negotiation: bool,
    // Cache lifetime settings applied to public responses.
    public_cache_control: PublicCacheControl,
    // Presumably the time budget allowed for a single transform — confirm.
    transform_deadline: Duration,
}
134
/// RAII guard that holds a concurrency slot for an in-flight image transform.
///
/// The shared counter is incremented on successful acquisition and decremented
/// when the guard is dropped, so the slot is released even if the caller
/// returns early or unwinds from a panic.
struct TransformSlot {
    /// Shared count of slots currently held across all threads.
    counter: Arc<AtomicU64>,
}

impl TransformSlot {
    /// Attempts to reserve one slot out of `limit`.
    ///
    /// Returns `Some(guard)` when fewer than `limit` slots were in use, and
    /// `None` otherwise (including when `limit` is zero).
    ///
    /// The reservation is a single compare-and-swap loop, so the counter never
    /// rises above `limit`, not even transiently. An increment-then-rollback
    /// scheme would briefly overshoot under contention, which can reject a
    /// caller that arrives just after another slot was released and can expose
    /// inflated values to anything reading the counter.
    fn try_acquire(counter: &Arc<AtomicU64>, limit: u64) -> Option<Self> {
        counter
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |in_flight| {
                // Lazy `then` so `in_flight + 1` is only evaluated when a slot is free.
                (in_flight < limit).then(|| in_flight + 1)
            })
            .ok()
            .map(|_| Self {
                counter: Arc::clone(counter),
            })
    }
}

impl Drop for TransformSlot {
    /// Returns the slot to the pool.
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::Relaxed);
    }
}
163
/// Source selector used when generating a signed public transform URL.
///
/// The variant decides which public route `sign_public_url` targets:
/// `Path` maps to `/images/by-path` and `Url` maps to `/images/by-url`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SignedUrlSource {
    /// Generates a signed `GET /images/by-path` URL.
    Path {
        /// The storage-relative source path.
        path: String,
        /// An optional source version token.
        version: Option<String>,
    },
    /// Generates a signed `GET /images/by-url` URL.
    Url {
        /// The remote source URL.
        url: String,
        /// An optional source version token.
        version: Option<String>,
    },
}
182
183/// Builds a signed public transform URL for the server adapter.
184///
185/// The resulting URL targets either `GET /images/by-path` or `GET /images/by-url` depending on
186/// `source`. `base_url` must be an absolute `http` or `https` URL that points at the externally
187/// visible server origin. The helper applies the same canonical query and HMAC-SHA256 signature
188/// scheme that the server adapter verifies at request time.
189///
190/// The helper serializes only explicitly requested transform options and omits fields that would
191/// resolve to the documented defaults on the server side.
192///
193/// # Errors
194///
195/// Returns an error string when `base_url` is not an absolute `http` or `https` URL, when the
196/// visible authority cannot be determined, or when the HMAC state cannot be initialized.
197///
198/// # Examples
199///
200/// ```
201/// use truss::adapters::server::{sign_public_url, SignedUrlSource};
202/// use truss::{MediaType, TransformOptions};
203///
204/// let url = sign_public_url(
205///     "https://cdn.example.com",
206///     SignedUrlSource::Path {
207///         path: "/image.png".to_string(),
208///         version: None,
209///     },
210///     &TransformOptions {
211///         format: Some(MediaType::Jpeg),
212///         ..TransformOptions::default()
213///     },
214///     "public-dev",
215///     "secret-value",
216///     4_102_444_800,
217///     None,
218///     None,
219/// )
220/// .unwrap();
221///
222/// assert!(url.starts_with("https://cdn.example.com/images/by-path?"));
223/// assert!(url.contains("keyId=public-dev"));
224/// assert!(url.contains("signature="));
225/// ```
226/// Optional watermark parameters for signed URL generation.
227#[derive(Debug, Default)]
228pub struct SignedWatermarkParams {
229    pub url: String,
230    pub position: Option<String>,
231    pub opacity: Option<u8>,
232    pub margin: Option<u32>,
233}
234
235#[allow(clippy::too_many_arguments)]
236pub fn sign_public_url(
237    base_url: &str,
238    source: SignedUrlSource,
239    options: &TransformOptions,
240    key_id: &str,
241    secret: &str,
242    expires: u64,
243    watermark: Option<&SignedWatermarkParams>,
244    preset: Option<&str>,
245) -> Result<String, String> {
246    let base_url = Url::parse(base_url).map_err(|error| format!("base URL is invalid: {error}"))?;
247    match base_url.scheme() {
248        "http" | "https" => {}
249        _ => return Err("base URL must use the http or https scheme".to_string()),
250    }
251
252    let route_path = match source {
253        SignedUrlSource::Path { .. } => "/images/by-path",
254        SignedUrlSource::Url { .. } => "/images/by-url",
255    };
256    let mut endpoint = base_url
257        .join(route_path)
258        .map_err(|error| format!("failed to resolve the public endpoint URL: {error}"))?;
259    let authority = url_authority(&endpoint)?;
260    let mut query = signed_source_query(source);
261    if let Some(name) = preset {
262        query.insert("preset".to_string(), name.to_string());
263    }
264    extend_transform_query(&mut query, options);
265    if let Some(wm) = watermark {
266        query.insert("watermarkUrl".to_string(), wm.url.clone());
267        if let Some(ref pos) = wm.position {
268            query.insert("watermarkPosition".to_string(), pos.clone());
269        }
270        if let Some(opacity) = wm.opacity {
271            query.insert("watermarkOpacity".to_string(), opacity.to_string());
272        }
273        if let Some(margin) = wm.margin {
274            query.insert("watermarkMargin".to_string(), margin.to_string());
275        }
276    }
277    query.insert("keyId".to_string(), key_id.to_string());
278    query.insert("expires".to_string(), expires.to_string());
279
280    let canonical = format!(
281        "GET\n{}\n{}\n{}",
282        authority,
283        endpoint.path(),
284        canonical_query_without_signature(&query)
285    );
286    let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
287        .map_err(|error| format!("failed to initialize signed URL HMAC: {error}"))?;
288    mac.update(canonical.as_bytes());
289    query.insert(
290        "signature".to_string(),
291        hex::encode(mac.finalize().into_bytes()),
292    );
293
294    let mut serializer = url::form_urlencoded::Serializer::new(String::new());
295    for (name, value) in query {
296        serializer.append_pair(&name, &value);
297    }
298    endpoint.set_query(Some(&serializer.finish()));
299    Ok(endpoint.into())
300}
301
302/// Returns the bind address for the HTTP server adapter.
303///
304/// The adapter reads `TRUSS_BIND_ADDR` when it is present. Otherwise it falls back to
305/// [`DEFAULT_BIND_ADDR`].
306pub fn bind_addr() -> String {
307    env::var("TRUSS_BIND_ADDR").unwrap_or_else(|_| DEFAULT_BIND_ADDR.to_string())
308}
309
310/// Serves requests until the listener stops producing connections.
311///
312/// This helper loads [`ServerConfig`] from the process environment and then delegates to
313/// [`serve_with_config`]. Health endpoints remain available even when the private API is not
314/// configured, but authenticated transform requests will return `503 Service Unavailable`
315/// unless `TRUSS_BEARER_TOKEN` is set.
316///
317/// # Errors
318///
319/// Returns an [`io::Error`] when the storage root cannot be resolved, when accepting the next
320/// connection fails, or when a response cannot be written to the socket.
321pub fn serve(listener: TcpListener) -> io::Result<()> {
322    let config = ServerConfig::from_env()?;
323
324    // Fail fast: verify the storage backend is reachable before accepting
325    // connections so that configuration errors are surfaced immediately.
326    for (ok, name) in storage_health_check(&config) {
327        if !ok {
328            return Err(io::Error::new(
329                io::ErrorKind::ConnectionRefused,
330                format!(
331                    "storage connectivity check failed for `{name}` — verify the backend \
332                     endpoint, credentials, and container/bucket configuration"
333                ),
334            ));
335        }
336    }
337
338    serve_with_config(listener, config)
339}
340
/// Serves requests with an explicit server configuration.
///
/// This is the adapter entry point for tests and embedding scenarios that want deterministic
/// configuration instead of environment-variable lookup.
///
/// The function runs in three phases:
///
/// 1. **Setup** — spawns the worker pool, creates the shutdown pipe, installs the signal
///    handler and switches the listener to non-blocking mode.
/// 2. **Accept loop** — hands accepted connections to the workers over a channel until a
///    shutdown is signalled.
/// 3. **Drain** — sleeps for `config.shutdown_drain_secs`, stops dispatching, then waits up to
///    15 seconds in total for worker threads to finish.
///
/// # Errors
///
/// Returns an [`io::Error`] when the shutdown pipe cannot be created, when the listener cannot
/// be switched to non-blocking mode, or when accepting a connection fails with anything other
/// than `WouldBlock`. Errors raised while handling an individual connection are logged through
/// `config.log` and do not stop the server.
pub fn serve_with_config(listener: TcpListener, config: ServerConfig) -> io::Result<()> {
    let config = Arc::new(config);
    let (sender, receiver) = std::sync::mpsc::channel::<TcpStream>();

    // Spawn a pool of worker threads sized to the configured concurrency limit
    // (with a minimum of WORKER_THREADS to leave headroom for non-transform
    // requests such as health checks and metrics).  Each thread pulls connections
    // from the shared channel and handles them independently, so a slow request
    // no longer blocks all other clients.
    let receiver = Arc::new(std::sync::Mutex::new(receiver));
    let pool_size = (config.max_concurrent_transforms as usize).max(WORKER_THREADS);
    let mut workers = Vec::with_capacity(pool_size);
    for _ in 0..pool_size {
        let rx = Arc::clone(&receiver);
        let cfg = Arc::clone(&config);
        workers.push(std::thread::spawn(move || {
            loop {
                // Only one worker at a time waits on the channel; the others
                // queue on the mutex. `recv` fails once the sender is dropped,
                // which is how workers learn that they should exit.
                let stream = {
                    let guard = rx.lock().expect("worker lock poisoned");
                    match guard.recv() {
                        Ok(stream) => stream,
                        Err(_) => break,
                    }
                }; // MutexGuard dropped here — before handle_stream runs.
                if let Err(err) = handle_stream(stream, &cfg) {
                    cfg.log(&format!("failed to handle connection: {err}"));
                }
            }
        }));
    }

    // Install signal handler for graceful shutdown.  The handler sets the
    // shared `draining` flag (so /health/ready returns 503 immediately) and
    // writes a byte to a self-pipe to wake the accept loop.
    let (shutdown_read_fd, shutdown_write_fd) = create_shutdown_pipe()?;
    install_signal_handler(Arc::clone(&config.draining), shutdown_write_fd);

    // Set the listener to non-blocking so we can multiplex between incoming
    // connections and the shutdown pipe.
    listener.set_nonblocking(true)?;

    loop {
        // Wait for activity on the listener or shutdown pipe. On Unix we use
        // poll(2) to block efficiently; on Windows we fall back to polling the
        // draining flag with a short sleep.
        wait_for_accept_or_shutdown(&listener, shutdown_read_fd, &config.draining);

        // Check the shutdown pipe first.
        if poll_shutdown_pipe(shutdown_read_fd) {
            break;
        }

        // Also check the draining flag directly (needed on Windows where the
        // shutdown pipe is not available).
        if config.draining.load(Ordering::SeqCst) {
            break;
        }

        match listener.accept() {
            Ok((stream, _addr)) => {
                // Accepted connections are always blocking for the workers.
                let _ = stream.set_nonblocking(false);
                // A send error means every receiver is gone, i.e. all workers
                // have exited; there is nobody left to serve connections.
                if sender.send(stream).is_err() {
                    break;
                }
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                // Spurious wakeup — retry.
            }
            // NOTE(review): this early return skips the drain phase and
            // `close_shutdown_pipe`, so the pipe descriptors stay open — confirm
            // that this is acceptable for embedders that keep the process alive.
            Err(err) => return Err(err),
        }
    }

    // --- Drain phase ---
    // NOTE(review): the accept loop has already exited, so connections that
    // arrive during this sleep are not accepted here; `listener` itself is only
    // dropped when this function returns.
    let drain_secs = config.shutdown_drain_secs;
    config.log(&format!(
        "shutdown: drain started, waiting {drain_secs}s for load balancers"
    ));
    if drain_secs > 0 {
        std::thread::sleep(Duration::from_secs(drain_secs));
    }
    config.log("shutdown: drain complete, closing listener");

    // Stop dispatching new connections to workers.
    drop(sender);
    // Worker drain deadline: 15s so that total shutdown (drain + worker drain)
    // fits within Kubernetes default terminationGracePeriodSeconds of 30s.
    let deadline = std::time::Instant::now() + Duration::from_secs(15);
    for worker in workers {
        // The deadline is shared by all workers: each join only gets whatever
        // time is left of the overall 15s budget.
        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
        if remaining.is_zero() {
            stderr_write("shutdown: timed out waiting for worker threads");
            break;
        }
        // Park the main thread until the worker finishes or the deadline
        // elapses. We cannot interrupt a blocked worker, but the socket
        // read/write timeouts ensure workers do not block forever.
        //
        // `JoinHandle::join` has no timeout, so a helper thread performs the
        // join and reports completion through a mutex/condvar pair.
        let worker_done =
            std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
        let wd = std::sync::Arc::clone(&worker_done);
        std::thread::spawn(move || {
            let _ = worker.join();
            let (lock, cvar) = &*wd;
            *lock.lock().expect("shutdown notify lock") = true;
            cvar.notify_one();
        });
        let (lock, cvar) = &*worker_done;
        let mut done = lock.lock().expect("shutdown wait lock");
        while !*done {
            let (guard, timeout) = cvar
                .wait_timeout(done, remaining)
                .expect("shutdown condvar wait");
            done = guard;
            if timeout.timed_out() {
                stderr_write("shutdown: timed out waiting for a worker thread");
                break;
            }
        }
    }

    config.log("shutdown: complete");
    close_shutdown_pipe(shutdown_read_fd, shutdown_write_fd);
    Ok(())
}
474
475// ---------------------------------------------------------------------------
476// Shutdown pipe helpers — a minimal self-pipe for waking the accept loop from
477// a signal handler without requiring async I/O.
478// ---------------------------------------------------------------------------
479
480#[cfg(unix)]
481fn create_shutdown_pipe() -> io::Result<(i32, i32)> {
482    let mut fds = [0i32; 2];
483    if unsafe { libc::pipe(fds.as_mut_ptr()) } != 0 {
484        return Err(io::Error::last_os_error());
485    }
486    // Make both ends non-blocking: the read end so `poll_shutdown_pipe` never
487    // stalls, and the write end so the signal handler never blocks.
488    unsafe {
489        libc::fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK);
490        libc::fcntl(fds[1], libc::F_SETFL, libc::O_NONBLOCK);
491    }
492    Ok((fds[0], fds[1]))
493}
494
/// Windows stand-in for the shutdown pipe: no pipe is created.
///
/// Returns `(-1, -1)` as placeholder descriptors. The Windows variants of the
/// other pipe helpers ignore these values.
#[cfg(windows)]
fn create_shutdown_pipe() -> io::Result<(i32, i32)> {
    // On Windows we fall back to a polling approach using the draining flag.
    Ok((-1, -1))
}
500
501#[cfg(unix)]
502fn poll_shutdown_pipe(read_fd: i32) -> bool {
503    let mut buf = [0u8; 1];
504    let n = unsafe { libc::read(read_fd, buf.as_mut_ptr().cast(), 1) };
505    n > 0
506}
507
/// Windows stand-in: there is no shutdown pipe, so this always returns `false`.
/// Shutdown on Windows is detected through the draining flag instead.
#[cfg(windows)]
fn poll_shutdown_pipe(_read_fd: i32) -> bool {
    false
}
512
513/// Block until the listener socket or the shutdown pipe has data ready.
514/// On Unix this uses `poll(2)` for zero-CPU-cost waiting; on Windows it falls
515/// back to a short sleep since the shutdown pipe is not available.
516#[cfg(unix)]
517fn wait_for_accept_or_shutdown(
518    listener: &std::net::TcpListener,
519    shutdown_read_fd: i32,
520    _draining: &AtomicBool,
521) {
522    use std::os::unix::io::AsRawFd;
523    let listener_fd = listener.as_raw_fd();
524    let mut fds = [
525        libc::pollfd {
526            fd: listener_fd,
527            events: libc::POLLIN,
528            revents: 0,
529        },
530        libc::pollfd {
531            fd: shutdown_read_fd,
532            events: libc::POLLIN,
533            revents: 0,
534        },
535    ];
536    // Block indefinitely (-1 timeout). Signal delivery will interrupt with
537    // EINTR, which is fine — we just re-check the shutdown conditions.
538    unsafe { libc::poll(fds.as_mut_ptr(), 2, -1) };
539}
540
/// Windows variant of the accept/shutdown wait.
///
/// There is no shutdown pipe to poll on this platform, so the function sleeps
/// for 10 ms and lets the caller re-check the draining flag. It returns
/// immediately when draining has already been requested.
#[cfg(windows)]
fn wait_for_accept_or_shutdown(
    _listener: &std::net::TcpListener,
    _shutdown_read_fd: i32,
    draining: &AtomicBool,
) {
    if draining.load(Ordering::SeqCst) {
        return;
    }
    std::thread::sleep(Duration::from_millis(10));
}
553
554#[cfg(unix)]
555fn close_shutdown_pipe(read_fd: i32, write_fd: i32) {
556    unsafe {
557        libc::close(read_fd);
558        libc::close(write_fd);
559    }
560}
561
/// Windows stand-in: no pipe was created, so there is nothing to close.
#[cfg(windows)]
fn close_shutdown_pipe(_read_fd: i32, _write_fd: i32) {}
564
/// Global write-end of the shutdown pipe, written to from the signal handler.
/// Starts at `-1`; the Unix signal handler only writes when the stored value
/// is non-negative.
static SHUTDOWN_PIPE_WR: AtomicI32 = AtomicI32::new(-1);
/// Pointer to the shared draining flag, stored by `install_signal_handler` and
/// set to `true` by the signal handler. Null until a handler is installed; the
/// handlers check for null before dereferencing.
static GLOBAL_DRAINING: std::sync::atomic::AtomicPtr<AtomicBool> =
    std::sync::atomic::AtomicPtr::new(std::ptr::null_mut());
570
/// Installs `signal_handler` for `SIGTERM` and `SIGINT`.
///
/// The handler cannot capture state, so the draining flag and the pipe's write
/// descriptor are published through the `GLOBAL_DRAINING` and
/// `SHUTDOWN_PIPE_WR` statics before the handler is registered.
///
/// The return values of `sigaction` are not checked.
#[cfg(unix)]
fn install_signal_handler(draining: Arc<AtomicBool>, write_fd: i32) {
    // Store the write fd and draining pointer in globals accessible from the
    // async-signal-safe handler.
    SHUTDOWN_PIPE_WR.store(write_fd, Ordering::SeqCst);
    // SAFETY: `Arc::into_raw` leaks intentionally — the pointer remains valid
    // for the process lifetime.  The signal handler only calls `AtomicBool::store`
    // and `libc::write`, both of which are async-signal-safe.
    //
    // Each call leaks one strong reference; a previously stored pointer is
    // overwritten without being reclaimed.
    let ptr = Arc::into_raw(draining).cast_mut();
    GLOBAL_DRAINING.store(ptr, Ordering::SeqCst);

    // Use sigaction instead of signal to avoid SysV semantics where the handler
    // is reset to SIG_DFL after the first invocation. SA_RESTART ensures that
    // interrupted syscalls are automatically restarted.
    // NOTE(review): the accept loop's `poll` wait expects to be interrupted
    // with EINTR on signal delivery; whether SA_RESTART affects `poll` is
    // platform-specific — confirm. The byte written to the self-pipe wakes
    // the loop regardless.
    unsafe {
        let mut sa: libc::sigaction = std::mem::zeroed();
        sa.sa_sigaction = signal_handler as *const () as libc::sighandler_t;
        libc::sigemptyset(&mut sa.sa_mask);
        sa.sa_flags = libc::SA_RESTART;
        libc::sigaction(libc::SIGTERM, &sa, std::ptr::null_mut());
        libc::sigaction(libc::SIGINT, &sa, std::ptr::null_mut());
    }
}
594
/// Signal handler for `SIGTERM`/`SIGINT` on Unix.
///
/// Sets the shared draining flag (if one has been published) and writes a
/// single byte to the shutdown pipe (if a write descriptor has been published).
/// The result of the write is ignored.
// NOTE(review): `write` may modify `errno` in the interrupted thread; the
// handler does not save and restore it — confirm whether that matters here.
#[cfg(unix)]
extern "C" fn signal_handler(_sig: libc::c_int) {
    // Set the draining flag — async-signal-safe (atomic store).
    let ptr = GLOBAL_DRAINING.load(Ordering::SeqCst);
    if !ptr.is_null() {
        unsafe { (*ptr).store(true, Ordering::SeqCst) };
    }
    // Wake the accept loop by writing to the self-pipe.
    let fd = SHUTDOWN_PIPE_WR.load(Ordering::SeqCst);
    if fd >= 0 {
        let byte: u8 = 1;
        unsafe { libc::write(fd, (&byte as *const u8).cast(), 1) };
    }
}
609
/// Installs `windows_signal_handler` for `SIGINT` (Ctrl+C) on Windows.
///
/// Only the draining flag is published; `_write_fd` is ignored because no
/// shutdown pipe exists on this platform.
#[cfg(windows)]
fn install_signal_handler(draining: Arc<AtomicBool>, _write_fd: i32) {
    // Store the draining pointer in the global so the signal handler can set it.
    // `Arc::into_raw` leaks one strong reference so the pointer stays valid.
    let ptr = Arc::into_raw(draining).cast_mut();
    GLOBAL_DRAINING.store(ptr, Ordering::SeqCst);

    // On Windows, register a SIGINT handler (Ctrl+C) via the C runtime.
    // The accept loop in `serve_with_config` checks `draining` on every
    // iteration, before it calls `accept`.
    unsafe {
        libc::signal(libc::SIGINT, windows_signal_handler as libc::sighandler_t);
    }
}
622
/// `SIGINT` handler on Windows: sets the shared draining flag (if one has been
/// published) and then re-registers itself.
#[cfg(windows)]
extern "C" fn windows_signal_handler(_sig: libc::c_int) {
    let ptr = GLOBAL_DRAINING.load(Ordering::SeqCst);
    if !ptr.is_null() {
        unsafe { (*ptr).store(true, Ordering::SeqCst) };
    }
    // Re-register the handler since Windows resets to SIG_DFL after each signal.
    unsafe {
        libc::signal(libc::SIGINT, windows_signal_handler as libc::sighandler_t);
    }
}
634
635/// Serves exactly one request using configuration loaded from the environment.
636///
637/// This helper is primarily useful in tests that want to drive the server over a real TCP
638/// socket but do not need a long-running loop.
639///
640/// # Errors
641///
642/// Returns an [`io::Error`] when the storage root cannot be resolved, when accepting the next
643/// connection fails, or when a response cannot be written to the socket.
644pub fn serve_once(listener: TcpListener) -> io::Result<()> {
645    let config = ServerConfig::from_env()?;
646    serve_once_with_config(listener, config)
647}
648
649/// Serves exactly one request with an explicit server configuration.
650///
651/// # Errors
652///
653/// Returns an [`io::Error`] when accepting the next connection fails or when a response cannot
654/// be written to the socket.
655pub fn serve_once_with_config(listener: TcpListener, config: ServerConfig) -> io::Result<()> {
656    let (stream, _) = listener.accept()?;
657    handle_stream(stream, &config)
658}
659
/// Deserialized transform request: a source, transform options and an optional
/// watermark.
///
/// Unknown top-level fields are rejected (`deny_unknown_fields`).
// NOTE(review): presumably the JSON body of the private transform endpoint —
// the handler that parses it is outside the visible part of the file.
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct TransformImageRequestPayload {
    // Where the source image comes from; required.
    source: TransformSourcePayload,
    // Transform options; falls back to `TransformOptionsPayload::default()`
    // when the field is absent.
    #[serde(default)]
    options: TransformOptionsPayload,
    // Optional watermark; `None` when the field is absent.
    #[serde(default)]
    watermark: Option<WatermarkPayload>,
}
669
/// Source selector of a transform request.
///
/// Deserialized as an internally tagged enum: the `kind` field holds the
/// lowercase variant name (`"path"`, `"url"`, or `"storage"` when an object
/// storage feature is compiled in) and the remaining fields belong to that
/// variant.
#[derive(Debug, Deserialize)]
#[serde(tag = "kind", rename_all = "lowercase")]
enum TransformSourcePayload {
    // Source addressed by path, with an optional version token.
    Path {
        path: String,
        version: Option<String>,
    },
    // Source addressed by URL, with an optional version token.
    Url {
        url: String,
        version: Option<String>,
    },
    // Source addressed by object-storage key. When `bucket` is omitted,
    // `storage_scheme_and_bucket` falls back to the configured default
    // bucket/container of the active backend.
    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
    Storage {
        bucket: Option<String>,
        key: String,
        version: Option<String>,
    },
}
688
impl TransformSourcePayload {
    /// Computes a stable source hash from the source reference and its version
    /// token, avoiding the need to read the full source bytes when a version
    /// is present.
    ///
    /// The hash also covers the instance configuration boundaries (storage
    /// root, `allow_insecure_url_sources`, and — when an object storage feature
    /// is compiled in — the backend label and configured endpoints) so that
    /// cache entries cannot be reused across instances with different security
    /// settings sharing the same cache directory.
    ///
    /// Returns `None` when no version is available, in which case the caller
    /// must fall back to the content-hash approach. For `Storage` sources it
    /// also returns `None` when no bucket can be resolved.
    ///
    /// The result is the hex-encoded SHA-256 digest of the assembled identity
    /// string.
    fn versioned_source_hash(&self, config: &ServerConfig) -> Option<String> {
        let (kind, reference, version): (&str, std::borrow::Cow<'_, str>, Option<&str>) = match self
        {
            Self::Path { path, version } => ("path", path.as_str().into(), version.as_deref()),
            Self::Url { url, version } => ("url", url.as_str().into(), version.as_deref()),
            #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
            Self::Storage {
                bucket,
                key,
                version,
            } => {
                let (scheme, effective_bucket) =
                    storage_scheme_and_bucket(bucket.as_deref(), config);
                // No explicit bucket and no configured default: nothing stable to hash.
                let effective_bucket = effective_bucket?;
                (
                    "storage",
                    format!("{scheme}://{effective_bucket}/{key}").into(),
                    version.as_deref(),
                )
            }
        };
        // Unversioned sources cannot be identified without reading their bytes.
        let version = version?;
        // Use newline separators so that values containing colons cannot collide
        // with different (reference, version) pairs. Include configuration boundaries
        // to prevent cross-instance cache poisoning.
        // NOTE(review): a reference or version that itself contains a newline
        // could still produce the same identity string as a different pair —
        // confirm whether such values are rejected upstream.
        let mut id = String::new();
        id.push_str(kind);
        id.push('\n');
        id.push_str(&reference);
        id.push('\n');
        id.push_str(version);
        id.push('\n');
        id.push_str(config.storage_root.to_string_lossy().as_ref());
        id.push('\n');
        id.push_str(if config.allow_insecure_url_sources {
            "insecure"
        } else {
            "strict"
        });
        #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
        {
            id.push('\n');
            id.push_str(storage_backend_label(config));
            // Custom endpoints are part of the identity: the same bucket/key on
            // a different endpoint is a different object.
            #[cfg(feature = "s3")]
            if let Some(ref ctx) = config.s3_context
                && let Some(ref endpoint) = ctx.endpoint_url
            {
                id.push('\n');
                id.push_str(endpoint);
            }
            #[cfg(feature = "gcs")]
            if let Some(ref ctx) = config.gcs_context
                && let Some(ref endpoint) = ctx.endpoint_url
            {
                id.push('\n');
                id.push_str(endpoint);
            }
            #[cfg(feature = "azure")]
            if let Some(ref ctx) = config.azure_context {
                id.push('\n');
                id.push_str(&ctx.endpoint_url);
            }
        }
        Some(hex::encode(Sha256::digest(id.as_bytes())))
    }

    /// Returns the storage backend label for metrics based on the source kind,
    /// rather than the server config default.  Path → Filesystem, Storage →
    /// whatever the config backend is, Url → None (no storage backend).
    ///
    /// `_config` is only read when an object storage feature is compiled in,
    /// hence the underscore prefix.
    fn metrics_backend_label(&self, _config: &ServerConfig) -> Option<StorageBackendLabel> {
        match self {
            Self::Path { .. } => Some(StorageBackendLabel::Filesystem),
            Self::Url { .. } => None,
            #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
            Self::Storage { .. } => Some(_config.storage_backend_label()),
        }
    }
}
776
/// Resolves the URI scheme label and effective bucket for a storage source.
///
/// The scheme is determined by `config.storage_backend` (`"fs"`, `"s3"`,
/// `"gcs"` or `"azure"`). The bucket is `explicit_bucket` when the request
/// supplied one; otherwise the default bucket (or container, for Azure) of the
/// active backend's context is used, if that context is configured. The
/// filesystem backend has no default, so only an explicit bucket is returned.
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn storage_scheme_and_bucket<'a>(
    explicit_bucket: Option<&'a str>,
    config: &'a ServerConfig,
) -> (&'static str, Option<&'a str>) {
    match config.storage_backend {
        StorageBackend::Filesystem => ("fs", explicit_bucket),
        #[cfg(feature = "s3")]
        StorageBackend::S3 => (
            "s3",
            explicit_bucket.or_else(|| {
                config
                    .s3_context
                    .as_ref()
                    .map(|ctx| ctx.default_bucket.as_str())
            }),
        ),
        #[cfg(feature = "gcs")]
        StorageBackend::Gcs => (
            "gcs",
            explicit_bucket.or_else(|| {
                config
                    .gcs_context
                    .as_ref()
                    .map(|ctx| ctx.default_bucket.as_str())
            }),
        ),
        #[cfg(feature = "azure")]
        StorageBackend::Azure => (
            "azure",
            explicit_bucket.or_else(|| {
                config
                    .azure_context
                    .as_ref()
                    .map(|ctx| ctx.default_container.as_str())
            }),
        ),
    }
}
810
/// Reports whether the configured storage backend is an object store (S3, GCS
/// or Azure) rather than the local filesystem.
///
/// The match is kept exhaustive so that adding a backend variant forces a
/// decision here at compile time.
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn is_object_storage_backend(config: &ServerConfig) -> bool {
    match config.storage_backend {
        StorageBackend::Filesystem => false,
        #[cfg(feature = "s3")]
        StorageBackend::S3 => true,
        #[cfg(feature = "gcs")]
        StorageBackend::Gcs => true,
        #[cfg(feature = "azure")]
        StorageBackend::Azure => true,
    }
}
823
/// Maps the configured storage backend to its static `*-backend` label string.
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn storage_backend_label(config: &ServerConfig) -> &'static str {
    let backend = &config.storage_backend;
    match backend {
        #[cfg(feature = "s3")]
        StorageBackend::S3 => "s3-backend",
        #[cfg(feature = "gcs")]
        StorageBackend::Gcs => "gcs-backend",
        #[cfg(feature = "azure")]
        StorageBackend::Azure => "azure-backend",
        StorageBackend::Filesystem => "fs-backend",
    }
}
836
837#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
838#[serde(default, rename_all = "camelCase", deny_unknown_fields)]
839pub struct TransformOptionsPayload {
840    pub width: Option<u32>,
841    pub height: Option<u32>,
842    pub fit: Option<String>,
843    pub position: Option<String>,
844    pub format: Option<String>,
845    pub quality: Option<u8>,
846    pub background: Option<String>,
847    pub rotate: Option<u16>,
848    pub auto_orient: Option<bool>,
849    pub strip_metadata: Option<bool>,
850    pub preserve_exif: Option<bool>,
851    pub crop: Option<String>,
852    pub blur: Option<f32>,
853    pub sharpen: Option<f32>,
854}
855
856impl TransformOptionsPayload {
857    /// Merges per-request overrides on top of preset defaults.
858    /// Each field in `overrides` takes precedence when set (`Some`).
859    fn with_overrides(self, overrides: &TransformOptionsPayload) -> Self {
860        Self {
861            width: overrides.width.or(self.width),
862            height: overrides.height.or(self.height),
863            fit: overrides.fit.clone().or(self.fit),
864            position: overrides.position.clone().or(self.position),
865            format: overrides.format.clone().or(self.format),
866            quality: overrides.quality.or(self.quality),
867            background: overrides.background.clone().or(self.background),
868            rotate: overrides.rotate.or(self.rotate),
869            auto_orient: overrides.auto_orient.or(self.auto_orient),
870            strip_metadata: overrides.strip_metadata.or(self.strip_metadata),
871            preserve_exif: overrides.preserve_exif.or(self.preserve_exif),
872            crop: overrides.crop.clone().or(self.crop),
873            blur: overrides.blur.or(self.blur),
874            sharpen: overrides.sharpen.or(self.sharpen),
875        }
876    }
877
878    fn into_options(self) -> Result<TransformOptions, HttpResponse> {
879        let defaults = TransformOptions::default();
880
881        Ok(TransformOptions {
882            width: self.width,
883            height: self.height,
884            fit: parse_optional_named(self.fit.as_deref(), "fit", Fit::from_str)?,
885            position: parse_optional_named(
886                self.position.as_deref(),
887                "position",
888                Position::from_str,
889            )?,
890            format: parse_optional_named(self.format.as_deref(), "format", MediaType::from_str)?,
891            quality: self.quality,
892            background: parse_optional_named(
893                self.background.as_deref(),
894                "background",
895                Rgba8::from_hex,
896            )?,
897            rotate: match self.rotate {
898                Some(value) => parse_named(&value.to_string(), "rotate", Rotation::from_str)?,
899                None => defaults.rotate,
900            },
901            auto_orient: self.auto_orient.unwrap_or(defaults.auto_orient),
902            strip_metadata: self.strip_metadata.unwrap_or(defaults.strip_metadata),
903            preserve_exif: self.preserve_exif.unwrap_or(defaults.preserve_exif),
904            crop: parse_optional_named(self.crop.as_deref(), "crop", CropRegion::from_str)?,
905            blur: self.blur,
906            sharpen: self.sharpen,
907            deadline: defaults.deadline,
908        })
909    }
910}
911
/// Overall request deadline, in seconds, for outbound fetches (source +
/// watermark combined). Handlers add it to `Instant::now()` when request
/// handling starts and pass the resulting instant down to the fetch and
/// transform helpers.
const REQUEST_DEADLINE_SECS: u64 = 60;
914
/// Watermark placement used when the request does not specify a position.
const WATERMARK_DEFAULT_POSITION: Position = Position::BottomRight;
/// Watermark opacity used when the request omits it. Accepted values are 1–100.
const WATERMARK_DEFAULT_OPACITY: u8 = 50;
/// Watermark margin used when the request omits it.
const WATERMARK_DEFAULT_MARGIN: u32 = 10;
/// Largest accepted watermark margin; larger values are rejected as a bad request.
const WATERMARK_MAX_MARGIN: u32 = 9999;
919
/// Raw, unvalidated watermark parameters as deserialized from a request.
///
/// All fields are optional at this stage (missing fields become `None`),
/// serialized keys are camelCase, and unknown keys are rejected. Use
/// `validate_watermark_payload` to apply defaults and range checks.
#[derive(Debug, Default, Deserialize)]
#[serde(default, rename_all = "camelCase", deny_unknown_fields)]
struct WatermarkPayload {
    /// Location of the watermark image. Validation requires it to be present
    /// and non-empty.
    url: Option<String>,
    /// Placement name, parsed with `Position::from_str` during validation;
    /// defaults to `WATERMARK_DEFAULT_POSITION`.
    position: Option<String>,
    /// Opacity in the range 1–100; defaults to `WATERMARK_DEFAULT_OPACITY`.
    opacity: Option<u8>,
    /// Margin, at most `WATERMARK_MAX_MARGIN`; defaults to
    /// `WATERMARK_DEFAULT_MARGIN`.
    margin: Option<u32>,
}
928
929/// Validated watermark parameters ready for fetching. No network I/O performed.
930struct ValidatedWatermarkPayload {
931    url: String,
932    position: Position,
933    opacity: u8,
934    margin: u32,
935}
936
937impl ValidatedWatermarkPayload {
938    fn cache_identity(&self) -> String {
939        compute_watermark_identity(
940            &self.url,
941            self.position.as_name(),
942            self.opacity,
943            self.margin,
944        )
945    }
946}
947
948/// Validates watermark payload fields without performing network I/O.
949fn validate_watermark_payload(
950    payload: Option<&WatermarkPayload>,
951) -> Result<Option<ValidatedWatermarkPayload>, HttpResponse> {
952    let Some(wm) = payload else {
953        return Ok(None);
954    };
955    let url = wm.url.as_deref().filter(|u| !u.is_empty()).ok_or_else(|| {
956        bad_request_response("watermark.url is required when watermark is present")
957    })?;
958
959    let position = parse_optional_named(
960        wm.position.as_deref(),
961        "watermark.position",
962        Position::from_str,
963    )?
964    .unwrap_or(WATERMARK_DEFAULT_POSITION);
965
966    let opacity = wm.opacity.unwrap_or(WATERMARK_DEFAULT_OPACITY);
967    if opacity == 0 || opacity > 100 {
968        return Err(bad_request_response(
969            "watermark.opacity must be between 1 and 100",
970        ));
971    }
972    let margin = wm.margin.unwrap_or(WATERMARK_DEFAULT_MARGIN);
973    if margin > WATERMARK_MAX_MARGIN {
974        return Err(bad_request_response(
975            "watermark.margin must be at most 9999",
976        ));
977    }
978
979    Ok(Some(ValidatedWatermarkPayload {
980        url: url.to_string(),
981        position,
982        opacity,
983        margin,
984    }))
985}
986
987/// Fetches watermark image and builds WatermarkInput. Called after try_acquire.
988fn fetch_watermark(
989    validated: ValidatedWatermarkPayload,
990    config: &ServerConfig,
991    deadline: Option<Instant>,
992) -> Result<WatermarkInput, HttpResponse> {
993    let bytes = read_remote_watermark_bytes(&validated.url, config, deadline)?;
994    let artifact = sniff_artifact(RawArtifact::new(bytes, None))
995        .map_err(|error| bad_request_response(&format!("watermark image is invalid: {error}")))?;
996    if !artifact.media_type.is_raster() {
997        return Err(bad_request_response(
998            "watermark image must be a raster format (not SVG)",
999        ));
1000    }
1001    Ok(WatermarkInput {
1002        image: artifact,
1003        position: validated.position,
1004        opacity: validated.opacity,
1005        margin: validated.margin,
1006    })
1007}
1008
1009fn resolve_multipart_watermark(
1010    bytes: Vec<u8>,
1011    position: Option<String>,
1012    opacity: Option<u8>,
1013    margin: Option<u32>,
1014) -> Result<WatermarkInput, HttpResponse> {
1015    let artifact = sniff_artifact(RawArtifact::new(bytes, None))
1016        .map_err(|error| bad_request_response(&format!("watermark image is invalid: {error}")))?;
1017    if !artifact.media_type.is_raster() {
1018        return Err(bad_request_response(
1019            "watermark image must be a raster format (not SVG)",
1020        ));
1021    }
1022    let position = parse_optional_named(
1023        position.as_deref(),
1024        "watermark_position",
1025        Position::from_str,
1026    )?
1027    .unwrap_or(WATERMARK_DEFAULT_POSITION);
1028    let opacity = opacity.unwrap_or(WATERMARK_DEFAULT_OPACITY);
1029    if opacity == 0 || opacity > 100 {
1030        return Err(bad_request_response(
1031            "watermark_opacity must be between 1 and 100",
1032        ));
1033    }
1034    let margin = margin.unwrap_or(WATERMARK_DEFAULT_MARGIN);
1035    if margin > WATERMARK_MAX_MARGIN {
1036        return Err(bad_request_response(
1037            "watermark_margin must be at most 9999",
1038        ));
1039    }
1040    Ok(WatermarkInput {
1041        image: artifact,
1042        position,
1043        opacity,
1044        margin,
1045    })
1046}
1047
/// Borrowed view of everything `emit_access_log` writes for one request.
struct AccessLogEntry<'a> {
    /// Request id taken from the client's `x-request-id` header or generated
    /// by the server.
    request_id: &'a str,
    /// HTTP method of the request.
    method: &'a str,
    /// Request path as returned by the request's `path()` accessor.
    path: &'a str,
    /// Route label; the raw path is used when the request was rejected before
    /// routing.
    route: &'a str,
    /// Status string derived from the response status (`"unknown"` when it
    /// cannot be derived).
    status: &'a str,
    /// Instant at which timing started; latency is measured from here when
    /// the entry is emitted.
    start: Instant,
    /// `"hit"` / `"miss"` classification of the response's `Cache-Status`
    /// header, or `None` when the header is absent.
    cache_status: Option<&'a str>,
    /// Whether the response carried the internal watermark marker header.
    watermark: bool,
}
1058
/// Looks up a usable `x-request-id` value among the request headers.
///
/// The first header named exactly `x-request-id` whose value is non-empty and
/// free of CR, LF, and NUL bytes is returned. Headers that fail those checks
/// are skipped, so a later valid occurrence can still be picked up. Returns
/// `None` when no such header exists.
fn extract_request_id(headers: &[(String, String)]) -> Option<String> {
    // Bytes that must never be echoed back into a response header.
    const FORBIDDEN: [u8; 3] = [b'\r', b'\n', b'\0'];

    headers
        .iter()
        .filter(|(name, _)| name == "x-request-id")
        .map(|(_, value)| value)
        .find(|value| !value.is_empty() && !value.bytes().any(|b| FORBIDDEN.contains(&b)))
        .cloned()
}
1076
/// Reduces the response's `Cache-Status` header to `"hit"` or `"miss"`.
///
/// Only the first header named exactly `Cache-Status` is considered; its
/// value counts as a hit when it contains the substring `hit`. Returns `None`
/// when the header is absent.
fn extract_cache_status(headers: &[(String, String)]) -> Option<&'static str> {
    let (_, raw) = headers.iter().find(|(name, _)| name == "Cache-Status")?;
    let verdict = if raw.contains("hit") { "hit" } else { "miss" };
    Some(verdict)
}
1085
/// Strips the internal `X-Truss-Watermark` marker from the response headers
/// and reports whether it was present.
///
/// Every occurrence of the marker is removed so that the internal header can
/// never reach the client, and the relative order of the remaining headers is
/// preserved.
///
/// Returns `true` when at least one marker header was removed.
fn extract_watermark_flag(headers: &mut Vec<(String, String)>) -> bool {
    let before = headers.len();
    // `retain` keeps header order stable, unlike `swap_remove`, and handles
    // duplicates in a single pass.
    headers.retain(|(name, _)| name != "X-Truss-Watermark");
    headers.len() != before
}
1098
1099fn emit_access_log(config: &ServerConfig, entry: &AccessLogEntry<'_>) {
1100    config.log(
1101        &json!({
1102            "kind": "access_log",
1103            "request_id": entry.request_id,
1104            "method": entry.method,
1105            "path": entry.path,
1106            "route": entry.route,
1107            "status": entry.status,
1108            "latency_ms": entry.start.elapsed().as_millis() as u64,
1109            "cache_status": entry.cache_status,
1110            "watermark": entry.watermark,
1111        })
1112        .to_string(),
1113    );
1114}
1115
/// Serves HTTP requests on one TCP connection until it should be closed.
///
/// Requests are handled in a loop so a connection can be reused. The loop
/// ends when the client sends `Connection: close`, when
/// `config.keep_alive_max_requests` requests have been served, when reading
/// the next request fails, or when a request is rejected early (failed
/// header authorization, a rejected `/metrics` request, or a body read error),
/// in which case the response is written with the close flag set.
///
/// For every request that gets a response after the headers were read, HTTP
/// metrics and a request duration are recorded and an access-log entry is
/// emitted.
///
/// # Errors
///
/// Returns `Err` only when writing the response of a fully routed request
/// fails. Write failures on the early-rejection paths are ignored.
fn handle_stream(mut stream: TcpStream, config: &ServerConfig) -> io::Result<()> {
    // Prevent slow or stalled clients from blocking the accept loop indefinitely.
    if let Err(err) = stream.set_read_timeout(Some(SOCKET_READ_TIMEOUT)) {
        config.log(&format!("failed to set socket read timeout: {err}"));
    }
    if let Err(err) = stream.set_write_timeout(Some(SOCKET_WRITE_TIMEOUT)) {
        config.log(&format!("failed to set socket write timeout: {err}"));
    }

    // Number of requests fully answered on this connection so far.
    let mut requests_served: u64 = 0;

    loop {
        let partial = match read_request_headers(&mut stream, config.max_upload_bytes) {
            Ok(partial) => partial,
            Err(response) => {
                // Once at least one request has been served, a failed header
                // read ends the connection without writing an error response.
                if requests_served > 0 {
                    return Ok(());
                }
                let _ = write_response(&mut stream, response, true);
                return Ok(());
            }
        };

        // Start timing after headers are read so latency reflects server
        // processing time, not client send / socket-wait time.
        let start = Instant::now();

        // Reuse the client's request id when it supplied a usable one.
        let request_id =
            extract_request_id(&partial.headers).unwrap_or_else(|| Uuid::new_v4().to_string());

        let client_wants_close = partial
            .headers
            .iter()
            .any(|(name, value)| name == "connection" && value.eq_ignore_ascii_case("close"));

        // Compression is used only when enabled in config and accepted by the client.
        let accepts_gzip = config.enable_compression
            && http_parse::header_value(&partial.headers, "accept-encoding")
                .is_some_and(|v| http_parse::accepts_encoding(v, "gzip"));

        let is_head = partial.method == "HEAD";

        // The two POST endpoints are authorized from headers alone, before
        // the request body is read.
        let requires_auth = matches!(
            (partial.method.as_str(), partial.path()),
            ("POST", "/images:transform") | ("POST", "/images")
        );
        if requires_auth
            && let Err(mut response) = authorize_request_headers(&partial.headers, config)
        {
            response
                .headers
                .push(("X-Request-Id".to_string(), request_id.clone()));
            // NOTE(review): recorded under `RouteMetric::Unknown` although the
            // path is already known here — confirm this is intended.
            record_http_metrics(RouteMetric::Unknown, response.status);
            let sc = status_code(response.status).unwrap_or("unknown");
            let method_log = partial.method.clone();
            let path_log = partial.path().to_string();
            let _ = write_response_compressed(
                &mut stream,
                response,
                true,
                accepts_gzip,
                config.compression_level,
            );
            record_http_request_duration(RouteMetric::Unknown, start);
            emit_access_log(
                config,
                &AccessLogEntry {
                    request_id: &request_id,
                    method: &method_log,
                    path: &path_log,
                    route: &path_log,
                    status: sc,
                    start,
                    cache_status: None,
                    watermark: false,
                },
            );
            return Ok(());
        }

        // Early-reject /metrics requests before draining the body so that
        // unauthenticated or disabled-metrics requests do not force a body read.
        if matches!(
            (partial.method.as_str(), partial.path()),
            ("GET" | "HEAD", "/metrics")
        ) {
            let early_response = if config.disable_metrics {
                Some(HttpResponse::problem(
                    "404 Not Found",
                    NOT_FOUND_BODY.as_bytes().to_vec(),
                ))
            } else if let Some(expected) = &config.metrics_token {
                // Expect `Authorization: Bearer <token>`; the scheme is
                // matched case-insensitively.
                let provided = http_parse::header_value(&partial.headers, "authorization")
                    .and_then(|value| {
                        let (scheme, token) = value.split_once(|c: char| c.is_whitespace())?;
                        scheme.eq_ignore_ascii_case("Bearer").then(|| token.trim())
                    });
                match provided {
                    // `ct_eq` compares the token bytes against the expected token.
                    Some(token) if token.as_bytes().ct_eq(expected.as_bytes()).into() => None,
                    _ => Some(response::auth_required_response(
                        "metrics endpoint requires authentication",
                    )),
                }
            } else {
                None
            };

            if let Some(mut response) = early_response {
                response
                    .headers
                    .push(("X-Request-Id".to_string(), request_id.clone()));
                record_http_metrics(RouteMetric::Metrics, response.status);
                let sc = status_code(response.status).unwrap_or("unknown");
                let method_log = partial.method.clone();
                let path_log = partial.path().to_string();
                let _ = write_response_compressed(
                    &mut stream,
                    response,
                    true,
                    accepts_gzip,
                    config.compression_level,
                );
                record_http_request_duration(RouteMetric::Metrics, start);
                emit_access_log(
                    config,
                    &AccessLogEntry {
                        request_id: &request_id,
                        method: &method_log,
                        path: &path_log,
                        route: "/metrics",
                        status: sc,
                        start,
                        cache_status: None,
                        watermark: false,
                    },
                );
                return Ok(());
            }
        }

        // Clone method/path before `read_request_body` consumes `partial`.
        let method = partial.method.clone();
        let path = partial.path().to_string();

        let request = match read_request_body(&mut stream, partial) {
            Ok(request) => request,
            Err(mut response) => {
                response
                    .headers
                    .push(("X-Request-Id".to_string(), request_id.clone()));
                record_http_metrics(RouteMetric::Unknown, response.status);
                let sc = status_code(response.status).unwrap_or("unknown");
                let _ = write_response_compressed(
                    &mut stream,
                    response,
                    true,
                    accepts_gzip,
                    config.compression_level,
                );
                record_http_request_duration(RouteMetric::Unknown, start);
                emit_access_log(
                    config,
                    &AccessLogEntry {
                        request_id: &request_id,
                        method: &method,
                        path: &path,
                        route: &path,
                        status: sc,
                        start,
                        cache_status: None,
                        watermark: false,
                    },
                );
                return Ok(());
            }
        };
        // Classify before routing because `route_request` consumes the request.
        let route = classify_route(&request);
        let mut response = route_request(request, config);
        record_http_metrics(route, response.status);

        response
            .headers
            .push(("X-Request-Id".to_string(), request_id.clone()));

        // Read the log-only details off the response headers; the internal
        // watermark marker is removed so it is not sent to the client.
        let cache_status = extract_cache_status(&response.headers);
        let had_watermark = extract_watermark_flag(&mut response.headers);

        let sc = status_code(response.status).unwrap_or("unknown");

        // HEAD responses keep their headers but carry no body.
        if is_head {
            response.body = Vec::new();
        }

        requests_served += 1;
        let close_after = client_wants_close || requests_served >= config.keep_alive_max_requests;

        // A failed write propagates as `Err`; the duration metric and access
        // log below are skipped in that case.
        write_response_compressed(
            &mut stream,
            response,
            close_after,
            accepts_gzip,
            config.compression_level,
        )?;
        record_http_request_duration(route, start);

        emit_access_log(
            config,
            &AccessLogEntry {
                request_id: &request_id,
                method: &method,
                path: &path,
                route: route.as_label(),
                status: sc,
                start,
                cache_status,
                watermark: had_watermark,
            },
        );

        if close_after {
            return Ok(());
        }
    }
}
1339
1340fn route_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1341    let method = request.method.clone();
1342    let path = request.path().to_string();
1343
1344    match (method.as_str(), path.as_str()) {
1345        ("GET" | "HEAD", "/health") => handle_health(config),
1346        ("GET" | "HEAD", "/health/live") => handle_health_live(),
1347        ("GET" | "HEAD", "/health/ready") => handle_health_ready(config),
1348        ("GET" | "HEAD", "/images/by-path") => handle_public_path_request(request, config),
1349        ("GET" | "HEAD", "/images/by-url") => handle_public_url_request(request, config),
1350        ("POST", "/images:transform") => handle_transform_request(request, config),
1351        ("POST", "/images") => handle_upload_request(request, config),
1352        ("GET" | "HEAD", "/metrics") => handle_metrics_request(request, config),
1353        _ => HttpResponse::problem("404 Not Found", NOT_FOUND_BODY.as_bytes().to_vec()),
1354    }
1355}
1356
1357fn classify_route(request: &HttpRequest) -> RouteMetric {
1358    match (request.method.as_str(), request.path()) {
1359        ("GET" | "HEAD", "/health") => RouteMetric::Health,
1360        ("GET" | "HEAD", "/health/live") => RouteMetric::HealthLive,
1361        ("GET" | "HEAD", "/health/ready") => RouteMetric::HealthReady,
1362        ("GET" | "HEAD", "/images/by-path") => RouteMetric::PublicByPath,
1363        ("GET" | "HEAD", "/images/by-url") => RouteMetric::PublicByUrl,
1364        ("POST", "/images:transform") => RouteMetric::Transform,
1365        ("POST", "/images") => RouteMetric::Upload,
1366        ("GET" | "HEAD", "/metrics") => RouteMetric::Metrics,
1367        _ => RouteMetric::Unknown,
1368    }
1369}
1370
1371fn handle_transform_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1372    let request_deadline = Some(Instant::now() + Duration::from_secs(REQUEST_DEADLINE_SECS));
1373
1374    if let Err(response) = authorize_request(&request, config) {
1375        return response;
1376    }
1377
1378    if !request_has_json_content_type(&request) {
1379        return unsupported_media_type_response("content-type must be application/json");
1380    }
1381
1382    let payload: TransformImageRequestPayload = match serde_json::from_slice(&request.body) {
1383        Ok(payload) => payload,
1384        Err(error) => {
1385            return bad_request_response(&format!("request body must be valid JSON: {error}"));
1386        }
1387    };
1388    let options = match payload.options.into_options() {
1389        Ok(options) => options,
1390        Err(response) => return response,
1391    };
1392
1393    let versioned_hash = payload.source.versioned_source_hash(config);
1394    let validated_wm = match validate_watermark_payload(payload.watermark.as_ref()) {
1395        Ok(wm) => wm,
1396        Err(response) => return response,
1397    };
1398    let watermark_id = validated_wm.as_ref().map(|v| v.cache_identity());
1399
1400    if let Some(response) = try_versioned_cache_lookup(
1401        versioned_hash.as_deref(),
1402        &options,
1403        &request,
1404        ImageResponsePolicy::PrivateTransform,
1405        config,
1406        watermark_id.as_deref(),
1407    ) {
1408        return response;
1409    }
1410
1411    let storage_start = Instant::now();
1412    let backend_label = payload.source.metrics_backend_label(config);
1413    let backend_idx = backend_label.map(|l| storage_backend_index_from_config(&l));
1414    let source_bytes = match resolve_source_bytes(payload.source, config, request_deadline) {
1415        Ok(bytes) => {
1416            if let Some(idx) = backend_idx {
1417                record_storage_duration(idx, storage_start);
1418            }
1419            bytes
1420        }
1421        Err(response) => {
1422            if let Some(idx) = backend_idx {
1423                record_storage_duration(idx, storage_start);
1424            }
1425            return response;
1426        }
1427    };
1428    transform_source_bytes(
1429        source_bytes,
1430        options,
1431        versioned_hash.as_deref(),
1432        &request,
1433        ImageResponsePolicy::PrivateTransform,
1434        config,
1435        WatermarkSource::from_validated(validated_wm),
1436        watermark_id.as_deref(),
1437        request_deadline,
1438    )
1439}
1440
1441fn handle_public_path_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1442    handle_public_get_request(request, config, PublicSourceKind::Path)
1443}
1444
1445fn handle_public_url_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1446    handle_public_get_request(request, config, PublicSourceKind::Url)
1447}
1448
1449fn handle_public_get_request(
1450    request: HttpRequest,
1451    config: &ServerConfig,
1452    source_kind: PublicSourceKind,
1453) -> HttpResponse {
1454    let request_deadline = Some(Instant::now() + Duration::from_secs(REQUEST_DEADLINE_SECS));
1455    let query = match parse_query_params(&request) {
1456        Ok(query) => query,
1457        Err(response) => return response,
1458    };
1459    if let Err(response) = authorize_signed_request(&request, &query, config) {
1460        return response;
1461    }
1462    let (source, options, watermark_payload) =
1463        match parse_public_get_request(&query, source_kind, config) {
1464            Ok(parsed) => parsed,
1465            Err(response) => return response,
1466        };
1467
1468    let validated_wm = match validate_watermark_payload(watermark_payload.as_ref()) {
1469        Ok(wm) => wm,
1470        Err(response) => return response,
1471    };
1472    let watermark_id = validated_wm.as_ref().map(|v| v.cache_identity());
1473
1474    // When the storage backend is object storage (S3 or GCS), convert Path
1475    // sources to Storage sources so that the `path` query parameter is
1476    // resolved as an object key.
1477    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1478    let source = if is_object_storage_backend(config) {
1479        match source {
1480            TransformSourcePayload::Path { path, version } => TransformSourcePayload::Storage {
1481                bucket: None,
1482                key: path.trim_start_matches('/').to_string(),
1483                version,
1484            },
1485            other => other,
1486        }
1487    } else {
1488        source
1489    };
1490
1491    let versioned_hash = source.versioned_source_hash(config);
1492    if let Some(response) = try_versioned_cache_lookup(
1493        versioned_hash.as_deref(),
1494        &options,
1495        &request,
1496        ImageResponsePolicy::PublicGet,
1497        config,
1498        watermark_id.as_deref(),
1499    ) {
1500        return response;
1501    }
1502
1503    let storage_start = Instant::now();
1504    let backend_label = source.metrics_backend_label(config);
1505    let backend_idx = backend_label.map(|l| storage_backend_index_from_config(&l));
1506    let source_bytes = match resolve_source_bytes(source, config, request_deadline) {
1507        Ok(bytes) => {
1508            if let Some(idx) = backend_idx {
1509                record_storage_duration(idx, storage_start);
1510            }
1511            bytes
1512        }
1513        Err(response) => {
1514            if let Some(idx) = backend_idx {
1515                record_storage_duration(idx, storage_start);
1516            }
1517            return response;
1518        }
1519    };
1520
1521    transform_source_bytes(
1522        source_bytes,
1523        options,
1524        versioned_hash.as_deref(),
1525        &request,
1526        ImageResponsePolicy::PublicGet,
1527        config,
1528        WatermarkSource::from_validated(validated_wm),
1529        watermark_id.as_deref(),
1530        request_deadline,
1531    )
1532}
1533
1534fn handle_upload_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1535    if let Err(response) = authorize_request(&request, config) {
1536        return response;
1537    }
1538
1539    let boundary = match parse_multipart_boundary(&request) {
1540        Ok(boundary) => boundary,
1541        Err(response) => return response,
1542    };
1543    let (file_bytes, options, watermark) = match parse_upload_request(&request.body, &boundary) {
1544        Ok(parts) => parts,
1545        Err(response) => return response,
1546    };
1547    let watermark_identity = watermark.as_ref().map(|wm| {
1548        let content_hash = hex::encode(sha2::Sha256::digest(&wm.image.bytes));
1549        cache::compute_watermark_content_identity(
1550            &content_hash,
1551            wm.position.as_name(),
1552            wm.opacity,
1553            wm.margin,
1554        )
1555    });
1556    transform_source_bytes(
1557        file_bytes,
1558        options,
1559        None,
1560        &request,
1561        ImageResponsePolicy::PrivateTransform,
1562        config,
1563        WatermarkSource::from_ready(watermark),
1564        watermark_identity.as_deref(),
1565        None,
1566    )
1567}
1568
1569/// Returns the number of free bytes on the filesystem containing `path`,
1570/// or `None` if the query fails.
1571#[cfg(target_os = "linux")]
1572fn disk_free_bytes(path: &std::path::Path) -> Option<u64> {
1573    use std::ffi::CString;
1574
1575    let c_path = CString::new(path.to_str()?).ok()?;
1576    let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
1577    let ret = unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) };
1578    if ret == 0 {
1579        stat.f_bavail.checked_mul(stat.f_frsize)
1580    } else {
1581        None
1582    }
1583}
1584
1585#[cfg(not(target_os = "linux"))]
1586fn disk_free_bytes(_path: &std::path::Path) -> Option<u64> {
1587    None
1588}
1589
/// Reports the resident set size (RSS) of the current process in bytes.
///
/// The value comes from the `VmRSS:` line of `/proc/self/status`, which is
/// expected to look like `VmRSS:   123456 kB`. Returns `None` when the file
/// cannot be read, the line is missing or malformed, or the byte count would
/// overflow a `u64`.
#[cfg(target_os = "linux")]
fn process_rss_bytes() -> Option<u64> {
    let status = std::fs::read_to_string("/proc/self/status").ok()?;
    // Only the first `VmRSS:` line is considered.
    let field = status
        .lines()
        .find_map(|line| line.strip_prefix("VmRSS:"))?;
    let kilobytes: u64 = field.trim().strip_suffix(" kB")?.trim().parse().ok()?;
    kilobytes.checked_mul(1024)
}

/// Fallback for non-Linux platforms, where the RSS is not available.
#[cfg(not(target_os = "linux"))]
fn process_rss_bytes() -> Option<u64> {
    None
}
1611
1612/// Returns a minimal liveness response confirming the process is running.
1613fn handle_health_live() -> HttpResponse {
1614    let body = serde_json::to_vec(&json!({
1615        "status": "ok",
1616        "service": "truss",
1617        "version": env!("CARGO_PKG_VERSION"),
1618    }))
1619    .expect("serialize liveness");
1620    let mut body = body;
1621    body.push(b'\n');
1622    HttpResponse::json("200 OK", body)
1623}
1624
1625/// Returns a readiness response after checking that critical infrastructure
1626/// dependencies are available (storage root, cache root if configured, S3
1627/// reachability) and configurable resource thresholds.
1628fn handle_health_ready(config: &ServerConfig) -> HttpResponse {
1629    let mut checks: Vec<serde_json::Value> = Vec::new();
1630    let mut all_ok = true;
1631
1632    // When the server is draining (shutdown signal received), immediately
1633    // report not-ready so that load balancers stop routing traffic.
1634    // Skip expensive probes (storage, disk, memory) — they are irrelevant
1635    // once the process is shutting down.
1636    if config.draining.load(Ordering::Relaxed) {
1637        let mut body = serde_json::to_vec(&json!({
1638            "status": "fail",
1639            "checks": [{ "name": "draining", "status": "fail" }],
1640        }))
1641        .expect("serialize readiness");
1642        body.push(b'\n');
1643        return HttpResponse::problem("503 Service Unavailable", body);
1644    }
1645
1646    for (ok, name) in storage_health_check(config) {
1647        checks.push(json!({
1648            "name": name,
1649            "status": if ok { "ok" } else { "fail" },
1650        }));
1651        if !ok {
1652            all_ok = false;
1653        }
1654    }
1655
1656    if let Some(cache_root) = &config.cache_root {
1657        let cache_ok = cache_root.is_dir();
1658        checks.push(json!({
1659            "name": "cacheRoot",
1660            "status": if cache_ok { "ok" } else { "fail" },
1661        }));
1662        if !cache_ok {
1663            all_ok = false;
1664        }
1665    }
1666
1667    if let Some(cache_root) = &config.cache_root {
1668        let free = disk_free_bytes(cache_root);
1669        let threshold = config.health_cache_min_free_bytes;
1670        let disk_ok = match (free, threshold) {
1671            (Some(f), Some(min)) => f >= min,
1672            _ => true,
1673        };
1674        let mut check = json!({
1675            "name": "cacheDiskFree",
1676            "status": if disk_ok { "ok" } else { "fail" },
1677        });
1678        if let Some(f) = free {
1679            check["freeBytes"] = json!(f);
1680        }
1681        if let Some(min) = threshold {
1682            check["thresholdBytes"] = json!(min);
1683        }
1684        checks.push(check);
1685        if !disk_ok {
1686            all_ok = false;
1687        }
1688    }
1689
1690    // Concurrency utilization
1691    let in_flight = config.transforms_in_flight.load(Ordering::Relaxed);
1692    let overloaded = in_flight >= config.max_concurrent_transforms;
1693    checks.push(json!({
1694        "name": "transformCapacity",
1695        "status": if overloaded { "fail" } else { "ok" },
1696        "current": in_flight,
1697        "max": config.max_concurrent_transforms,
1698    }));
1699    if overloaded {
1700        all_ok = false;
1701    }
1702
1703    // Memory usage (Linux only) — skip entirely when RSS is unavailable
1704    if let Some(rss_bytes) = process_rss_bytes() {
1705        let threshold = config.health_max_memory_bytes;
1706        let mem_ok = threshold.is_none_or(|max| rss_bytes <= max);
1707        let mut check = json!({
1708            "name": "memoryUsage",
1709            "status": if mem_ok { "ok" } else { "fail" },
1710            "rssBytes": rss_bytes,
1711        });
1712        if let Some(max) = threshold {
1713            check["thresholdBytes"] = json!(max);
1714        }
1715        checks.push(check);
1716        if !mem_ok {
1717            all_ok = false;
1718        }
1719    }
1720
1721    let status_str = if all_ok { "ok" } else { "fail" };
1722    let mut body = serde_json::to_vec(&json!({
1723        "status": status_str,
1724        "checks": checks,
1725    }))
1726    .expect("serialize readiness");
1727    body.push(b'\n');
1728
1729    if all_ok {
1730        HttpResponse::json("200 OK", body)
1731    } else {
1732        HttpResponse::json("503 Service Unavailable", body)
1733    }
1734}
1735
1736/// Returns a comprehensive diagnostic health response.
1737fn storage_health_check(config: &ServerConfig) -> Vec<(bool, &'static str)> {
1738    #[allow(unused_mut)]
1739    let mut checks = vec![(config.storage_root.is_dir(), "storageRoot")];
1740    #[cfg(feature = "s3")]
1741    if config.storage_backend == StorageBackend::S3 {
1742        let reachable = config
1743            .s3_context
1744            .as_ref()
1745            .is_some_and(|ctx| ctx.check_reachable());
1746        checks.push((reachable, "storageBackend"));
1747    }
1748    #[cfg(feature = "gcs")]
1749    if config.storage_backend == StorageBackend::Gcs {
1750        let reachable = config
1751            .gcs_context
1752            .as_ref()
1753            .is_some_and(|ctx| ctx.check_reachable());
1754        checks.push((reachable, "storageBackend"));
1755    }
1756    #[cfg(feature = "azure")]
1757    if config.storage_backend == StorageBackend::Azure {
1758        let reachable = config
1759            .azure_context
1760            .as_ref()
1761            .is_some_and(|ctx| ctx.check_reachable());
1762        checks.push((reachable, "storageBackend"));
1763    }
1764    checks
1765}
1766
1767fn handle_health(config: &ServerConfig) -> HttpResponse {
1768    let mut checks: Vec<serde_json::Value> = Vec::new();
1769    let mut all_ok = true;
1770
1771    for (ok, name) in storage_health_check(config) {
1772        checks.push(json!({
1773            "name": name,
1774            "status": if ok { "ok" } else { "fail" },
1775        }));
1776        if !ok {
1777            all_ok = false;
1778        }
1779    }
1780
1781    if let Some(cache_root) = &config.cache_root {
1782        let cache_ok = cache_root.is_dir();
1783        checks.push(json!({
1784            "name": "cacheRoot",
1785            "status": if cache_ok { "ok" } else { "fail" },
1786        }));
1787        if !cache_ok {
1788            all_ok = false;
1789        }
1790    }
1791
1792    // Cache disk free space
1793    if let Some(cache_root) = &config.cache_root {
1794        let free = disk_free_bytes(cache_root);
1795        let threshold = config.health_cache_min_free_bytes;
1796        let disk_ok = match (free, threshold) {
1797            (Some(f), Some(min)) => f >= min,
1798            _ => true,
1799        };
1800        let mut check = json!({
1801            "name": "cacheDiskFree",
1802            "status": if disk_ok { "ok" } else { "fail" },
1803        });
1804        if let Some(f) = free {
1805            check["freeBytes"] = json!(f);
1806        }
1807        if let Some(min) = threshold {
1808            check["thresholdBytes"] = json!(min);
1809        }
1810        checks.push(check);
1811        if !disk_ok {
1812            all_ok = false;
1813        }
1814    }
1815
1816    // Concurrency utilization
1817    let in_flight = config.transforms_in_flight.load(Ordering::Relaxed);
1818    let overloaded = in_flight >= config.max_concurrent_transforms;
1819    checks.push(json!({
1820        "name": "transformCapacity",
1821        "status": if overloaded { "fail" } else { "ok" },
1822        "current": in_flight,
1823        "max": config.max_concurrent_transforms,
1824    }));
1825    if overloaded {
1826        all_ok = false;
1827    }
1828
1829    // Memory usage (Linux only)
1830    let rss = process_rss_bytes();
1831    if let Some(rss_bytes) = rss {
1832        let threshold = config.health_max_memory_bytes;
1833        let mem_ok = threshold.is_none_or(|max| rss_bytes <= max);
1834        let mut check = json!({
1835            "name": "memoryUsage",
1836            "status": if mem_ok { "ok" } else { "fail" },
1837            "rssBytes": rss_bytes,
1838        });
1839        if let Some(max) = threshold {
1840            check["thresholdBytes"] = json!(max);
1841        }
1842        checks.push(check);
1843        if !mem_ok {
1844            all_ok = false;
1845        }
1846    }
1847
1848    let status_str = if all_ok { "ok" } else { "fail" };
1849    let mut body = serde_json::to_vec(&json!({
1850        "status": status_str,
1851        "service": "truss",
1852        "version": env!("CARGO_PKG_VERSION"),
1853        "uptimeSeconds": uptime_seconds(),
1854        "checks": checks,
1855        "maxInputPixels": config.max_input_pixels,
1856    }))
1857    .expect("serialize health");
1858    body.push(b'\n');
1859
1860    HttpResponse::json("200 OK", body)
1861}
1862
1863fn handle_metrics_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1864    if config.disable_metrics {
1865        return HttpResponse::problem("404 Not Found", NOT_FOUND_BODY.as_bytes().to_vec());
1866    }
1867
1868    if let Some(expected) = &config.metrics_token {
1869        let provided = request.header("authorization").and_then(|value| {
1870            let (scheme, token) = value.split_once(|c: char| c.is_whitespace())?;
1871            scheme.eq_ignore_ascii_case("Bearer").then(|| token.trim())
1872        });
1873        match provided {
1874            Some(token) if token.as_bytes().ct_eq(expected.as_bytes()).into() => {}
1875            _ => {
1876                return response::auth_required_response(
1877                    "metrics endpoint requires authentication",
1878                );
1879            }
1880        }
1881    }
1882
1883    HttpResponse::text(
1884        "200 OK",
1885        "text/plain; version=0.0.4; charset=utf-8",
1886        render_metrics_text(
1887            config.max_concurrent_transforms,
1888            &config.transforms_in_flight,
1889        )
1890        .into_bytes(),
1891    )
1892}
1893
1894fn parse_public_get_request(
1895    query: &BTreeMap<String, String>,
1896    source_kind: PublicSourceKind,
1897    config: &ServerConfig,
1898) -> Result<
1899    (
1900        TransformSourcePayload,
1901        TransformOptions,
1902        Option<WatermarkPayload>,
1903    ),
1904    HttpResponse,
1905> {
1906    validate_public_query_names(query, source_kind)?;
1907
1908    let source = match source_kind {
1909        PublicSourceKind::Path => TransformSourcePayload::Path {
1910            path: required_query_param(query, "path")?.to_string(),
1911            version: query.get("version").cloned(),
1912        },
1913        PublicSourceKind::Url => TransformSourcePayload::Url {
1914            url: required_query_param(query, "url")?.to_string(),
1915            version: query.get("version").cloned(),
1916        },
1917    };
1918
1919    let has_orphaned_watermark_params = query.contains_key("watermarkPosition")
1920        || query.contains_key("watermarkOpacity")
1921        || query.contains_key("watermarkMargin");
1922    let watermark = if query.contains_key("watermarkUrl") {
1923        Some(WatermarkPayload {
1924            url: query.get("watermarkUrl").cloned(),
1925            position: query.get("watermarkPosition").cloned(),
1926            opacity: parse_optional_u8_query(query, "watermarkOpacity")?,
1927            margin: parse_optional_integer_query(query, "watermarkMargin")?,
1928        })
1929    } else if has_orphaned_watermark_params {
1930        return Err(bad_request_response(
1931            "watermarkPosition, watermarkOpacity, and watermarkMargin require watermarkUrl",
1932        ));
1933    } else {
1934        None
1935    };
1936
1937    // Build per-request overrides from query parameters.
1938    let per_request = TransformOptionsPayload {
1939        width: parse_optional_integer_query(query, "width")?,
1940        height: parse_optional_integer_query(query, "height")?,
1941        fit: query.get("fit").cloned(),
1942        position: query.get("position").cloned(),
1943        format: query.get("format").cloned(),
1944        quality: parse_optional_u8_query(query, "quality")?,
1945        background: query.get("background").cloned(),
1946        rotate: query
1947            .get("rotate")
1948            .map(|v| v.parse::<u16>())
1949            .transpose()
1950            .map_err(|_| bad_request_response("rotate must be 0, 90, 180, or 270"))?,
1951        auto_orient: parse_optional_bool_query(query, "autoOrient")?,
1952        strip_metadata: parse_optional_bool_query(query, "stripMetadata")?,
1953        preserve_exif: parse_optional_bool_query(query, "preserveExif")?,
1954        crop: query.get("crop").cloned(),
1955        blur: parse_optional_float_query(query, "blur")?,
1956        sharpen: parse_optional_float_query(query, "sharpen")?,
1957    };
1958
1959    // Resolve preset and merge with per-request overrides.
1960    let merged = if let Some(preset_name) = query.get("preset") {
1961        let preset = config
1962            .presets
1963            .get(preset_name)
1964            .ok_or_else(|| bad_request_response(&format!("unknown preset `{preset_name}`")))?;
1965        preset.clone().with_overrides(&per_request)
1966    } else {
1967        per_request
1968    };
1969
1970    let options = merged.into_options()?;
1971
1972    Ok((source, options, watermark))
1973}
1974
1975/// Watermark source: either already resolved (multipart upload) or deferred (URL fetch).
1976enum WatermarkSource {
1977    Deferred(ValidatedWatermarkPayload),
1978    Ready(WatermarkInput),
1979    None,
1980}
1981
1982impl WatermarkSource {
1983    fn from_validated(validated: Option<ValidatedWatermarkPayload>) -> Self {
1984        match validated {
1985            Some(v) => Self::Deferred(v),
1986            None => Self::None,
1987        }
1988    }
1989
1990    fn from_ready(input: Option<WatermarkInput>) -> Self {
1991        match input {
1992            Some(w) => Self::Ready(w),
1993            None => Self::None,
1994        }
1995    }
1996
1997    fn is_some(&self) -> bool {
1998        !matches!(self, Self::None)
1999    }
2000}
2001
2002#[allow(clippy::too_many_arguments)]
2003fn transform_source_bytes(
2004    source_bytes: Vec<u8>,
2005    options: TransformOptions,
2006    versioned_hash: Option<&str>,
2007    request: &HttpRequest,
2008    response_policy: ImageResponsePolicy,
2009    config: &ServerConfig,
2010    watermark: WatermarkSource,
2011    watermark_identity: Option<&str>,
2012    request_deadline: Option<Instant>,
2013) -> HttpResponse {
2014    let content_hash;
2015    let source_hash = match versioned_hash {
2016        Some(hash) => hash,
2017        None => {
2018            content_hash = hex::encode(Sha256::digest(&source_bytes));
2019            &content_hash
2020        }
2021    };
2022
2023    let cache = config
2024        .cache_root
2025        .as_ref()
2026        .map(|root| TransformCache::new(root.clone()).with_log_handler(config.log_handler.clone()));
2027
2028    if let Some(ref cache) = cache
2029        && options.format.is_some()
2030    {
2031        let cache_key = compute_cache_key(source_hash, &options, None, watermark_identity);
2032        if let CacheLookup::Hit {
2033            media_type,
2034            body,
2035            age,
2036        } = cache.get(&cache_key)
2037        {
2038            CACHE_HITS_TOTAL.fetch_add(1, Ordering::Relaxed);
2039            let etag = build_image_etag(&body);
2040            let mut headers = build_image_response_headers(
2041                media_type,
2042                &etag,
2043                response_policy,
2044                false,
2045                CacheHitStatus::Hit,
2046                config.public_max_age_seconds,
2047                config.public_stale_while_revalidate_seconds,
2048                &config.custom_response_headers,
2049            );
2050            headers.push(("Age".to_string(), age.as_secs().to_string()));
2051            if matches!(response_policy, ImageResponsePolicy::PublicGet)
2052                && if_none_match_matches(request.header("if-none-match"), &etag)
2053            {
2054                return HttpResponse::empty("304 Not Modified", headers);
2055            }
2056            return HttpResponse::binary_with_headers(
2057                "200 OK",
2058                media_type.as_mime(),
2059                headers,
2060                body,
2061            );
2062        }
2063    }
2064
2065    let _slot = match TransformSlot::try_acquire(
2066        &config.transforms_in_flight,
2067        config.max_concurrent_transforms,
2068    ) {
2069        Some(slot) => slot,
2070        None => return service_unavailable_response("too many concurrent transforms; retry later"),
2071    };
2072    transform_source_bytes_inner(
2073        source_bytes,
2074        options,
2075        request,
2076        response_policy,
2077        cache.as_ref(),
2078        source_hash,
2079        ImageResponseConfig {
2080            disable_accept_negotiation: config.disable_accept_negotiation,
2081            public_cache_control: PublicCacheControl {
2082                max_age: config.public_max_age_seconds,
2083                stale_while_revalidate: config.public_stale_while_revalidate_seconds,
2084            },
2085            transform_deadline: Duration::from_secs(config.transform_deadline_secs),
2086        },
2087        watermark,
2088        watermark_identity,
2089        config,
2090        request_deadline,
2091    )
2092}
2093
2094#[allow(clippy::too_many_arguments)]
2095fn transform_source_bytes_inner(
2096    source_bytes: Vec<u8>,
2097    mut options: TransformOptions,
2098    request: &HttpRequest,
2099    response_policy: ImageResponsePolicy,
2100    cache: Option<&TransformCache>,
2101    source_hash: &str,
2102    response_config: ImageResponseConfig,
2103    watermark_source: WatermarkSource,
2104    watermark_identity: Option<&str>,
2105    config: &ServerConfig,
2106    request_deadline: Option<Instant>,
2107) -> HttpResponse {
2108    if options.deadline.is_none() {
2109        options.deadline = Some(response_config.transform_deadline);
2110    }
2111    let artifact = match sniff_artifact(RawArtifact::new(source_bytes, None)) {
2112        Ok(artifact) => artifact,
2113        Err(error) => {
2114            record_transform_error(&error);
2115            return transform_error_response(error);
2116        }
2117    };
2118    let negotiation_used =
2119        if options.format.is_none() && !response_config.disable_accept_negotiation {
2120            match negotiate_output_format(request.header("accept"), &artifact) {
2121                Ok(Some(format)) => {
2122                    options.format = Some(format);
2123                    true
2124                }
2125                Ok(None) => false,
2126                Err(response) => return response,
2127            }
2128        } else {
2129            false
2130        };
2131
2132    // Check input pixel count against the server-level limit before decode.
2133    // This runs before the cache lookup so that a policy change (lowering the
2134    // limit) takes effect immediately, even for previously-cached images.
2135    if let (Some(w), Some(h)) = (artifact.metadata.width, artifact.metadata.height) {
2136        let pixels = u64::from(w) * u64::from(h);
2137        if pixels > config.max_input_pixels {
2138            return response::unprocessable_entity_response(&format!(
2139                "input image has {pixels} pixels, server limit is {}",
2140                config.max_input_pixels
2141            ));
2142        }
2143    }
2144
2145    let negotiated_accept = if negotiation_used {
2146        request.header("accept")
2147    } else {
2148        None
2149    };
2150    let cache_key = compute_cache_key(source_hash, &options, negotiated_accept, watermark_identity);
2151
2152    if let Some(cache) = cache
2153        && let CacheLookup::Hit {
2154            media_type,
2155            body,
2156            age,
2157        } = cache.get(&cache_key)
2158    {
2159        CACHE_HITS_TOTAL.fetch_add(1, Ordering::Relaxed);
2160        let etag = build_image_etag(&body);
2161        let mut headers = build_image_response_headers(
2162            media_type,
2163            &etag,
2164            response_policy,
2165            negotiation_used,
2166            CacheHitStatus::Hit,
2167            response_config.public_cache_control.max_age,
2168            response_config.public_cache_control.stale_while_revalidate,
2169            &config.custom_response_headers,
2170        );
2171        headers.push(("Age".to_string(), age.as_secs().to_string()));
2172        if matches!(response_policy, ImageResponsePolicy::PublicGet)
2173            && if_none_match_matches(request.header("if-none-match"), &etag)
2174        {
2175            return HttpResponse::empty("304 Not Modified", headers);
2176        }
2177        return HttpResponse::binary_with_headers("200 OK", media_type.as_mime(), headers, body);
2178    }
2179
2180    if cache.is_some() {
2181        CACHE_MISSES_TOTAL.fetch_add(1, Ordering::Relaxed);
2182    }
2183
2184    let is_svg = artifact.media_type == MediaType::Svg;
2185
2186    // Resolve watermark: reject SVG+watermark early (before fetch), then fetch if deferred.
2187    let watermark = if is_svg && watermark_source.is_some() {
2188        return bad_request_response("watermark is not supported for SVG source images");
2189    } else {
2190        match watermark_source {
2191            WatermarkSource::Deferred(validated) => {
2192                match fetch_watermark(validated, config, request_deadline) {
2193                    Ok(wm) => {
2194                        record_watermark_transform();
2195                        Some(wm)
2196                    }
2197                    Err(response) => return response,
2198                }
2199            }
2200            WatermarkSource::Ready(wm) => {
2201                record_watermark_transform();
2202                Some(wm)
2203            }
2204            WatermarkSource::None => None,
2205        }
2206    };
2207
2208    let had_watermark = watermark.is_some();
2209
2210    let transform_start = Instant::now();
2211    let mut request_obj = TransformRequest::new(artifact, options);
2212    request_obj.watermark = watermark;
2213    let result = if is_svg {
2214        match transform_svg(request_obj) {
2215            Ok(result) => result,
2216            Err(error) => {
2217                record_transform_error(&error);
2218                return transform_error_response(error);
2219            }
2220        }
2221    } else {
2222        match transform_raster(request_obj) {
2223            Ok(result) => result,
2224            Err(error) => {
2225                record_transform_error(&error);
2226                return transform_error_response(error);
2227            }
2228        }
2229    };
2230    record_transform_duration(result.artifact.media_type, transform_start);
2231
2232    for warning in &result.warnings {
2233        let msg = format!("truss: {warning}");
2234        if let Some(c) = cache
2235            && let Some(handler) = &c.log_handler
2236        {
2237            handler(&msg);
2238        } else {
2239            stderr_write(&msg);
2240        }
2241    }
2242
2243    let output = result.artifact;
2244
2245    if let Some(cache) = cache {
2246        cache.put(&cache_key, output.media_type, &output.bytes);
2247    }
2248
2249    let cache_hit_status = if cache.is_some() {
2250        CacheHitStatus::Miss
2251    } else {
2252        CacheHitStatus::Disabled
2253    };
2254
2255    let etag = build_image_etag(&output.bytes);
2256    let headers = build_image_response_headers(
2257        output.media_type,
2258        &etag,
2259        response_policy,
2260        negotiation_used,
2261        cache_hit_status,
2262        response_config.public_cache_control.max_age,
2263        response_config.public_cache_control.stale_while_revalidate,
2264        &config.custom_response_headers,
2265    );
2266
2267    if matches!(response_policy, ImageResponsePolicy::PublicGet)
2268        && if_none_match_matches(request.header("if-none-match"), &etag)
2269    {
2270        return HttpResponse::empty("304 Not Modified", headers);
2271    }
2272
2273    let mut response = HttpResponse::binary_with_headers(
2274        "200 OK",
2275        output.media_type.as_mime(),
2276        headers,
2277        output.bytes,
2278    );
2279    if had_watermark {
2280        response
2281            .headers
2282            .push(("X-Truss-Watermark".to_string(), "true".to_string()));
2283    }
2284    response
2285}
2286
2287#[cfg(test)]
2288mod tests {
2289    use serial_test::serial;
2290
2291    use super::config::DEFAULT_MAX_CONCURRENT_TRANSFORMS;
2292    use super::config::{
2293        DEFAULT_PUBLIC_MAX_AGE_SECONDS, DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2294        parse_presets_from_env,
2295    };
2296    use super::http_parse::{
2297        DEFAULT_MAX_UPLOAD_BODY_BYTES, HttpRequest, find_header_terminator, read_request_body,
2298        read_request_headers, resolve_storage_path,
2299    };
2300    use super::multipart::parse_multipart_form_data;
2301    use super::remote::{PinnedResolver, prepare_remote_fetch_target};
2302    use super::response::auth_required_response;
2303    use super::response::{HttpResponse, bad_request_response};
2304    use super::{
2305        CacheHitStatus, DEFAULT_BIND_ADDR, ImageResponsePolicy, PublicSourceKind, ServerConfig,
2306        SignedUrlSource, TransformOptionsPayload, TransformSourcePayload, WatermarkSource,
2307        authorize_signed_request, bind_addr, build_image_etag, build_image_response_headers,
2308        canonical_query_without_signature, negotiate_output_format, parse_public_get_request,
2309        route_request, serve_once_with_config, sign_public_url, transform_source_bytes,
2310    };
2311    use crate::{
2312        Artifact, ArtifactMetadata, Fit, MediaType, RawArtifact, TransformOptions, sniff_artifact,
2313    };
2314    use hmac::{Hmac, Mac};
2315    use image::codecs::png::PngEncoder;
2316    use image::{ColorType, ImageEncoder, Rgba, RgbaImage};
2317    use sha2::Sha256;
2318    use std::collections::{BTreeMap, HashMap};
2319    use std::env;
2320    use std::fs;
2321    use std::io::{Cursor, Read, Write};
2322    use std::net::{SocketAddr, TcpListener, TcpStream};
2323    use std::path::{Path, PathBuf};
2324    use std::sync::atomic::Ordering;
2325    use std::thread;
2326    use std::time::{Duration, SystemTime, UNIX_EPOCH};
2327
2328    /// Test-only convenience wrapper that reads headers + body in one shot,
2329    /// preserving the original `read_request` semantics for existing tests.
2330    fn read_request<R: Read>(stream: &mut R) -> Result<HttpRequest, HttpResponse> {
2331        let partial = read_request_headers(stream, DEFAULT_MAX_UPLOAD_BODY_BYTES)?;
2332        read_request_body(stream, partial)
2333    }
2334
2335    fn png_bytes() -> Vec<u8> {
2336        let image = RgbaImage::from_pixel(4, 3, Rgba([10, 20, 30, 255]));
2337        let mut bytes = Vec::new();
2338        PngEncoder::new(&mut bytes)
2339            .write_image(&image, 4, 3, ColorType::Rgba8.into())
2340            .expect("encode png");
2341        bytes
2342    }
2343
2344    fn temp_dir(name: &str) -> PathBuf {
2345        let unique = SystemTime::now()
2346            .duration_since(UNIX_EPOCH)
2347            .expect("current time")
2348            .as_nanos();
2349        let path = std::env::temp_dir().join(format!("truss-server-{name}-{unique}"));
2350        fs::create_dir_all(&path).expect("create temp dir");
2351        path
2352    }
2353
2354    fn write_png(path: &Path) {
2355        fs::write(path, png_bytes()).expect("write png fixture");
2356    }
2357
2358    fn artifact_with_alpha(has_alpha: bool) -> Artifact {
2359        Artifact::new(
2360            png_bytes(),
2361            MediaType::Png,
2362            ArtifactMetadata {
2363                width: Some(4),
2364                height: Some(3),
2365                frame_count: 1,
2366                duration: None,
2367                has_alpha: Some(has_alpha),
2368            },
2369        )
2370    }
2371
2372    fn sign_public_query(
2373        method: &str,
2374        authority: &str,
2375        path: &str,
2376        query: &BTreeMap<String, String>,
2377        secret: &str,
2378    ) -> String {
2379        let canonical = format!(
2380            "{method}\n{authority}\n{path}\n{}",
2381            canonical_query_without_signature(query)
2382        );
2383        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).expect("create hmac");
2384        mac.update(canonical.as_bytes());
2385        hex::encode(mac.finalize().into_bytes())
2386    }
2387
    /// One canned response for the fixture HTTP server, as
    /// `(status, headers, body)`. `spawn_http_server` writes `status` after
    /// `HTTP/1.1 ` in the status line, emits each `(name, value)` header pair,
    /// and sends `body` as the payload.
    type FixtureResponse = (String, Vec<(String, String)>, Vec<u8>);
2389
2390    fn read_fixture_request(stream: &mut TcpStream) {
2391        stream
2392            .set_nonblocking(false)
2393            .expect("configure fixture stream blocking mode");
2394        stream
2395            .set_read_timeout(Some(Duration::from_millis(100)))
2396            .expect("configure fixture stream timeout");
2397
2398        let deadline = std::time::Instant::now() + Duration::from_secs(2);
2399        let mut buffer = Vec::new();
2400        let mut chunk = [0_u8; 1024];
2401        let header_end = loop {
2402            let read = match stream.read(&mut chunk) {
2403                Ok(read) => read,
2404                Err(error)
2405                    if matches!(
2406                        error.kind(),
2407                        std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
2408                    ) && std::time::Instant::now() < deadline =>
2409                {
2410                    thread::sleep(Duration::from_millis(10));
2411                    continue;
2412                }
2413                Err(error) => panic!("read fixture request headers: {error}"),
2414            };
2415            if read == 0 {
2416                panic!("fixture request ended before headers were complete");
2417            }
2418            buffer.extend_from_slice(&chunk[..read]);
2419            if let Some(index) = find_header_terminator(&buffer) {
2420                break index;
2421            }
2422        };
2423
2424        let header_text = std::str::from_utf8(&buffer[..header_end]).expect("fixture request utf8");
2425        let content_length = header_text
2426            .split("\r\n")
2427            .filter_map(|line| line.split_once(':'))
2428            .find_map(|(name, value)| {
2429                name.trim()
2430                    .eq_ignore_ascii_case("content-length")
2431                    .then_some(value.trim())
2432            })
2433            .map(|value| {
2434                value
2435                    .parse::<usize>()
2436                    .expect("fixture content-length should be numeric")
2437            })
2438            .unwrap_or(0);
2439
2440        let mut body = buffer.len().saturating_sub(header_end + 4);
2441        while body < content_length {
2442            let read = match stream.read(&mut chunk) {
2443                Ok(read) => read,
2444                Err(error)
2445                    if matches!(
2446                        error.kind(),
2447                        std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
2448                    ) && std::time::Instant::now() < deadline =>
2449                {
2450                    thread::sleep(Duration::from_millis(10));
2451                    continue;
2452                }
2453                Err(error) => panic!("read fixture request body: {error}"),
2454            };
2455            if read == 0 {
2456                panic!("fixture request body was truncated");
2457            }
2458            body += read;
2459        }
2460    }
2461
2462    fn spawn_http_server(responses: Vec<FixtureResponse>) -> (String, thread::JoinHandle<()>) {
2463        let listener = TcpListener::bind("127.0.0.1:0").expect("bind fixture server");
2464        listener
2465            .set_nonblocking(true)
2466            .expect("configure fixture server");
2467        let addr = listener.local_addr().expect("fixture server addr");
2468        let url = format!("http://{addr}/image");
2469
2470        let handle = thread::spawn(move || {
2471            for (status, headers, body) in responses {
2472                let deadline = std::time::Instant::now() + Duration::from_secs(10);
2473                let mut accepted = None;
2474                while std::time::Instant::now() < deadline {
2475                    match listener.accept() {
2476                        Ok(stream) => {
2477                            accepted = Some(stream);
2478                            break;
2479                        }
2480                        Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
2481                            thread::sleep(Duration::from_millis(10));
2482                        }
2483                        Err(error) => panic!("accept fixture request: {error}"),
2484                    }
2485                }
2486
2487                let Some((mut stream, _)) = accepted else {
2488                    break;
2489                };
2490                read_fixture_request(&mut stream);
2491                let mut header = format!(
2492                    "HTTP/1.1 {status}\r\nContent-Length: {}\r\nConnection: close\r\n",
2493                    body.len()
2494                );
2495                for (name, value) in headers {
2496                    header.push_str(&format!("{name}: {value}\r\n"));
2497                }
2498                header.push_str("\r\n");
2499                stream
2500                    .write_all(header.as_bytes())
2501                    .expect("write fixture headers");
2502                stream.write_all(&body).expect("write fixture body");
2503                stream.flush().expect("flush fixture response");
2504            }
2505        });
2506
2507        (url, handle)
2508    }
2509
2510    fn transform_request(path: &str) -> HttpRequest {
2511        HttpRequest {
2512            method: "POST".to_string(),
2513            target: "/images:transform".to_string(),
2514            version: "HTTP/1.1".to_string(),
2515            headers: vec![
2516                ("authorization".to_string(), "Bearer secret".to_string()),
2517                ("content-type".to_string(), "application/json".to_string()),
2518            ],
2519            body: format!(
2520                "{{\"source\":{{\"kind\":\"path\",\"path\":\"{path}\"}},\"options\":{{\"format\":\"jpeg\"}}}}"
2521            )
2522            .into_bytes(),
2523        }
2524    }
2525
2526    fn transform_url_request(url: &str) -> HttpRequest {
2527        HttpRequest {
2528            method: "POST".to_string(),
2529            target: "/images:transform".to_string(),
2530            version: "HTTP/1.1".to_string(),
2531            headers: vec![
2532                ("authorization".to_string(), "Bearer secret".to_string()),
2533                ("content-type".to_string(), "application/json".to_string()),
2534            ],
2535            body: format!(
2536                "{{\"source\":{{\"kind\":\"url\",\"url\":\"{url}\"}},\"options\":{{\"format\":\"jpeg\"}}}}"
2537            )
2538            .into_bytes(),
2539        }
2540    }
2541
2542    fn upload_request(file_bytes: &[u8], options_json: Option<&str>) -> HttpRequest {
2543        let boundary = "truss-test-boundary";
2544        let mut body = Vec::new();
2545        body.extend_from_slice(
2546            format!(
2547                "--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"image.png\"\r\nContent-Type: image/png\r\n\r\n"
2548            )
2549            .as_bytes(),
2550        );
2551        body.extend_from_slice(file_bytes);
2552        body.extend_from_slice(b"\r\n");
2553
2554        if let Some(options_json) = options_json {
2555            body.extend_from_slice(
2556                format!(
2557                    "--{boundary}\r\nContent-Disposition: form-data; name=\"options\"\r\nContent-Type: application/json\r\n\r\n{options_json}\r\n"
2558                )
2559                .as_bytes(),
2560            );
2561        }
2562
2563        body.extend_from_slice(format!("--{boundary}--\r\n").as_bytes());
2564
2565        HttpRequest {
2566            method: "POST".to_string(),
2567            target: "/images".to_string(),
2568            version: "HTTP/1.1".to_string(),
2569            headers: vec![
2570                ("authorization".to_string(), "Bearer secret".to_string()),
2571                (
2572                    "content-type".to_string(),
2573                    format!("multipart/form-data; boundary={boundary}"),
2574                ),
2575            ],
2576            body,
2577        }
2578    }
2579
2580    fn metrics_request(with_auth: bool) -> HttpRequest {
2581        let mut headers = Vec::new();
2582        if with_auth {
2583            headers.push(("authorization".to_string(), "Bearer secret".to_string()));
2584        }
2585
2586        HttpRequest {
2587            method: "GET".to_string(),
2588            target: "/metrics".to_string(),
2589            version: "HTTP/1.1".to_string(),
2590            headers,
2591            body: Vec::new(),
2592        }
2593    }
2594
2595    fn response_body(response: &HttpResponse) -> &str {
2596        std::str::from_utf8(&response.body).expect("utf8 response body")
2597    }
2598
2599    fn signed_public_request(target: &str, host: &str, secret: &str) -> HttpRequest {
2600        let (path, query) = target.split_once('?').expect("target has query");
2601        let mut query = url::form_urlencoded::parse(query.as_bytes())
2602            .into_owned()
2603            .collect::<BTreeMap<_, _>>();
2604        let signature = sign_public_query("GET", host, path, &query, secret);
2605        query.insert("signature".to_string(), signature);
2606        let final_query = url::form_urlencoded::Serializer::new(String::new())
2607            .extend_pairs(
2608                query
2609                    .iter()
2610                    .map(|(name, value)| (name.as_str(), value.as_str())),
2611            )
2612            .finish();
2613
2614        HttpRequest {
2615            method: "GET".to_string(),
2616            target: format!("{path}?{final_query}"),
2617            version: "HTTP/1.1".to_string(),
2618            headers: vec![("host".to_string(), host.to_string())],
2619            body: Vec::new(),
2620        }
2621    }
2622
2623    #[test]
2624    fn uses_default_bind_addr_when_env_is_missing() {
2625        unsafe { std::env::remove_var("TRUSS_BIND_ADDR") };
2626        assert_eq!(bind_addr(), DEFAULT_BIND_ADDR);
2627    }
2628
2629    #[test]
2630    fn authorize_signed_request_accepts_a_valid_signature() {
2631        let request = signed_public_request(
2632            "/images/by-path?path=%2Fimage.png&keyId=public-dev&expires=4102444800&format=jpeg",
2633            "assets.example.com",
2634            "secret-value",
2635        );
2636        let query = super::auth::parse_query_params(&request).expect("parse query");
2637        let config = ServerConfig::new(temp_dir("public-auth"), None)
2638            .with_signed_url_credentials("public-dev", "secret-value");
2639
2640        authorize_signed_request(&request, &query, &config).expect("signed auth should pass");
2641    }
2642
2643    #[test]
2644    fn authorize_signed_request_uses_public_base_url_authority() {
2645        let request = signed_public_request(
2646            "/images/by-path?path=%2Fimage.png&keyId=public-dev&expires=4102444800&format=jpeg",
2647            "cdn.example.com",
2648            "secret-value",
2649        );
2650        let query = super::auth::parse_query_params(&request).expect("parse query");
2651        let mut config = ServerConfig::new(temp_dir("public-authority"), None)
2652            .with_signed_url_credentials("public-dev", "secret-value");
2653        config.public_base_url = Some("https://cdn.example.com".to_string());
2654
2655        authorize_signed_request(&request, &query, &config).expect("signed auth should pass");
2656    }
2657
2658    #[test]
2659    fn negotiate_output_format_prefers_alpha_safe_formats_for_transparent_inputs() {
2660        let format =
2661            negotiate_output_format(Some("image/jpeg,image/png"), &artifact_with_alpha(true))
2662                .expect("negotiate output format")
2663                .expect("resolved output format");
2664
2665        assert_eq!(format, MediaType::Png);
2666    }
2667
2668    #[test]
2669    fn negotiate_output_format_prefers_avif_for_wildcard_accept() {
2670        let format = negotiate_output_format(Some("image/*"), &artifact_with_alpha(false))
2671            .expect("negotiate output format")
2672            .expect("resolved output format");
2673
2674        assert_eq!(format, MediaType::Avif);
2675    }
2676
2677    #[test]
2678    fn build_image_response_headers_include_cache_and_safety_metadata() {
2679        let headers = build_image_response_headers(
2680            MediaType::Webp,
2681            &build_image_etag(b"demo"),
2682            ImageResponsePolicy::PublicGet,
2683            true,
2684            CacheHitStatus::Disabled,
2685            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2686            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2687            &[],
2688        );
2689
2690        assert!(headers.contains(&(
2691            "Cache-Control".to_string(),
2692            "public, max-age=3600, stale-while-revalidate=60".to_string()
2693        )));
2694        assert!(headers.contains(&("Vary".to_string(), "Accept".to_string())));
2695        assert!(headers.contains(&("X-Content-Type-Options".to_string(), "nosniff".to_string())));
2696        assert!(headers.contains(&(
2697            "Content-Disposition".to_string(),
2698            "inline; filename=\"truss.webp\"".to_string()
2699        )));
2700        assert!(headers.contains(&(
2701            "Cache-Status".to_string(),
2702            "\"truss\"; fwd=miss".to_string()
2703        )));
2704    }
2705
2706    #[test]
2707    fn build_image_response_headers_include_csp_sandbox_for_svg() {
2708        let headers = build_image_response_headers(
2709            MediaType::Svg,
2710            &build_image_etag(b"svg-data"),
2711            ImageResponsePolicy::PublicGet,
2712            true,
2713            CacheHitStatus::Disabled,
2714            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2715            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2716            &[],
2717        );
2718
2719        assert!(headers.contains(&("Content-Security-Policy".to_string(), "sandbox".to_string())));
2720    }
2721
2722    #[test]
2723    fn build_image_response_headers_omit_csp_sandbox_for_raster() {
2724        let headers = build_image_response_headers(
2725            MediaType::Png,
2726            &build_image_etag(b"png-data"),
2727            ImageResponsePolicy::PublicGet,
2728            true,
2729            CacheHitStatus::Disabled,
2730            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2731            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2732            &[],
2733        );
2734
2735        assert!(!headers.iter().any(|(k, _)| *k == "Content-Security-Policy"));
2736    }
2737
2738    #[test]
2739    fn backpressure_rejects_when_at_capacity() {
2740        let config = ServerConfig::new(std::env::temp_dir(), None);
2741        config
2742            .transforms_in_flight
2743            .store(DEFAULT_MAX_CONCURRENT_TRANSFORMS, Ordering::Relaxed);
2744
2745        let request = HttpRequest {
2746            method: "POST".to_string(),
2747            target: "/transform".to_string(),
2748            version: "HTTP/1.1".to_string(),
2749            headers: Vec::new(),
2750            body: Vec::new(),
2751        };
2752
2753        let png_bytes = {
2754            let mut buf = Vec::new();
2755            let encoder = image::codecs::png::PngEncoder::new(&mut buf);
2756            encoder
2757                .write_image(&[255, 0, 0, 255], 1, 1, image::ExtendedColorType::Rgba8)
2758                .unwrap();
2759            buf
2760        };
2761
2762        let response = transform_source_bytes(
2763            png_bytes,
2764            TransformOptions::default(),
2765            None,
2766            &request,
2767            ImageResponsePolicy::PrivateTransform,
2768            &config,
2769            WatermarkSource::None,
2770            None,
2771            None,
2772        );
2773
2774        assert!(response.status.contains("503"));
2775
2776        assert_eq!(
2777            config.transforms_in_flight.load(Ordering::Relaxed),
2778            DEFAULT_MAX_CONCURRENT_TRANSFORMS
2779        );
2780    }
2781
2782    #[test]
2783    fn backpressure_rejects_with_custom_concurrency_limit() {
2784        let custom_limit = 2u64;
2785        let mut config = ServerConfig::new(std::env::temp_dir(), None);
2786        config.max_concurrent_transforms = custom_limit;
2787        config
2788            .transforms_in_flight
2789            .store(custom_limit, Ordering::Relaxed);
2790
2791        let request = HttpRequest {
2792            method: "POST".to_string(),
2793            target: "/transform".to_string(),
2794            version: "HTTP/1.1".to_string(),
2795            headers: Vec::new(),
2796            body: Vec::new(),
2797        };
2798
2799        let png_bytes = {
2800            let mut buf = Vec::new();
2801            let encoder = image::codecs::png::PngEncoder::new(&mut buf);
2802            encoder
2803                .write_image(&[255, 0, 0, 255], 1, 1, image::ExtendedColorType::Rgba8)
2804                .unwrap();
2805            buf
2806        };
2807
2808        let response = transform_source_bytes(
2809            png_bytes,
2810            TransformOptions::default(),
2811            None,
2812            &request,
2813            ImageResponsePolicy::PrivateTransform,
2814            &config,
2815            WatermarkSource::None,
2816            None,
2817            None,
2818        );
2819
2820        assert!(response.status.contains("503"));
2821    }
2822
2823    #[test]
2824    fn compute_cache_key_is_deterministic() {
2825        let opts = TransformOptions {
2826            width: Some(300),
2827            height: Some(200),
2828            format: Some(MediaType::Webp),
2829            ..TransformOptions::default()
2830        };
2831        let key1 = super::cache::compute_cache_key("source-abc", &opts, None, None);
2832        let key2 = super::cache::compute_cache_key("source-abc", &opts, None, None);
2833        assert_eq!(key1, key2);
2834        assert_eq!(key1.len(), 64);
2835    }
2836
2837    #[test]
2838    fn compute_cache_key_differs_for_different_options() {
2839        let opts1 = TransformOptions {
2840            width: Some(300),
2841            format: Some(MediaType::Webp),
2842            ..TransformOptions::default()
2843        };
2844        let opts2 = TransformOptions {
2845            width: Some(400),
2846            format: Some(MediaType::Webp),
2847            ..TransformOptions::default()
2848        };
2849        let key1 = super::cache::compute_cache_key("same-source", &opts1, None, None);
2850        let key2 = super::cache::compute_cache_key("same-source", &opts2, None, None);
2851        assert_ne!(key1, key2);
2852    }
2853
2854    #[test]
2855    fn compute_cache_key_includes_accept_when_present() {
2856        let opts = TransformOptions::default();
2857        let key_no_accept = super::cache::compute_cache_key("src", &opts, None, None);
2858        let key_with_accept =
2859            super::cache::compute_cache_key("src", &opts, Some("image/webp"), None);
2860        assert_ne!(key_no_accept, key_with_accept);
2861    }
2862
2863    #[test]
2864    fn transform_cache_put_and_get_round_trips() {
2865        let dir = tempfile::tempdir().expect("create tempdir");
2866        let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
2867
2868        cache.put(
2869            "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
2870            MediaType::Png,
2871            b"png-data",
2872        );
2873        let result = cache.get("abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890");
2874
2875        match result {
2876            super::cache::CacheLookup::Hit {
2877                media_type, body, ..
2878            } => {
2879                assert_eq!(media_type, MediaType::Png);
2880                assert_eq!(body, b"png-data");
2881            }
2882            super::cache::CacheLookup::Miss => panic!("expected cache hit"),
2883        }
2884    }
2885
2886    #[test]
2887    fn transform_cache_miss_for_unknown_key() {
2888        let dir = tempfile::tempdir().expect("create tempdir");
2889        let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
2890
2891        let result = cache.get("0000001234567890abcdef1234567890abcdef1234567890abcdef1234567890");
2892        assert!(matches!(result, super::cache::CacheLookup::Miss));
2893    }
2894
2895    #[test]
2896    fn transform_cache_uses_sharded_layout() {
2897        let dir = tempfile::tempdir().expect("create tempdir");
2898        let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
2899
2900        let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
2901        cache.put(key, MediaType::Jpeg, b"jpeg-data");
2902
2903        let expected = dir.path().join("ab").join("cd").join("ef").join(key);
2904        assert!(
2905            expected.exists(),
2906            "sharded file should exist at {expected:?}"
2907        );
2908    }
2909
2910    #[test]
2911    fn transform_cache_expired_entry_is_miss() {
2912        let dir = tempfile::tempdir().expect("create tempdir");
2913        let mut cache = super::cache::TransformCache::new(dir.path().to_path_buf());
2914        cache.ttl = Duration::from_secs(0);
2915
2916        let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
2917        cache.put(key, MediaType::Png, b"data");
2918
2919        std::thread::sleep(Duration::from_millis(10));
2920
2921        let result = cache.get(key);
2922        assert!(matches!(result, super::cache::CacheLookup::Miss));
2923    }
2924
2925    #[test]
2926    fn transform_cache_handles_corrupted_entry_as_miss() {
2927        let dir = tempfile::tempdir().expect("create tempdir");
2928        let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
2929
2930        let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
2931        let path = cache.entry_path(key);
2932        fs::create_dir_all(path.parent().unwrap()).unwrap();
2933        fs::write(&path, b"corrupted-data-without-header").unwrap();
2934
2935        let result = cache.get(key);
2936        assert!(matches!(result, super::cache::CacheLookup::Miss));
2937    }
2938
2939    #[test]
2940    fn cache_status_header_reflects_hit() {
2941        let headers = build_image_response_headers(
2942            MediaType::Png,
2943            &build_image_etag(b"data"),
2944            ImageResponsePolicy::PublicGet,
2945            false,
2946            CacheHitStatus::Hit,
2947            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2948            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2949            &[],
2950        );
2951        assert!(headers.contains(&("Cache-Status".to_string(), "\"truss\"; hit".to_string())));
2952    }
2953
2954    #[test]
2955    fn cache_status_header_reflects_miss() {
2956        let headers = build_image_response_headers(
2957            MediaType::Png,
2958            &build_image_etag(b"data"),
2959            ImageResponsePolicy::PublicGet,
2960            false,
2961            CacheHitStatus::Miss,
2962            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2963            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2964            &[],
2965        );
2966        assert!(headers.contains(&(
2967            "Cache-Status".to_string(),
2968            "\"truss\"; fwd=miss".to_string()
2969        )));
2970    }
2971
2972    #[test]
2973    fn origin_cache_put_and_get_round_trips() {
2974        let dir = tempfile::tempdir().expect("create tempdir");
2975        let cache = super::cache::OriginCache::new(dir.path());
2976
2977        cache.put("src", "https://example.com/image.png", b"raw-source-bytes");
2978        let result = cache.get("src", "https://example.com/image.png");
2979
2980        assert_eq!(result.as_deref(), Some(b"raw-source-bytes".as_ref()));
2981    }
2982
2983    #[test]
2984    fn origin_cache_miss_for_unknown_url() {
2985        let dir = tempfile::tempdir().expect("create tempdir");
2986        let cache = super::cache::OriginCache::new(dir.path());
2987
2988        assert!(
2989            cache
2990                .get("src", "https://unknown.example.com/missing.png")
2991                .is_none()
2992        );
2993    }
2994
2995    #[test]
2996    fn origin_cache_expired_entry_is_none() {
2997        let dir = tempfile::tempdir().expect("create tempdir");
2998        let mut cache = super::cache::OriginCache::new(dir.path());
2999        cache.ttl = Duration::from_secs(0);
3000
3001        cache.put("src", "https://example.com/img.png", b"data");
3002        std::thread::sleep(Duration::from_millis(10));
3003
3004        assert!(cache.get("src", "https://example.com/img.png").is_none());
3005    }
3006
3007    #[test]
3008    fn origin_cache_uses_origin_subdirectory() {
3009        let dir = tempfile::tempdir().expect("create tempdir");
3010        let cache = super::cache::OriginCache::new(dir.path());
3011
3012        cache.put("src", "https://example.com/test.png", b"bytes");
3013
3014        let origin_dir = dir.path().join("origin");
3015        assert!(origin_dir.exists(), "origin subdirectory should exist");
3016    }
3017
3018    #[test]
3019    fn sign_public_url_builds_a_signed_path_url() {
3020        let url = sign_public_url(
3021            "https://cdn.example.com",
3022            SignedUrlSource::Path {
3023                path: "/image.png".to_string(),
3024                version: Some("v1".to_string()),
3025            },
3026            &crate::TransformOptions {
3027                format: Some(MediaType::Jpeg),
3028                width: Some(320),
3029                ..crate::TransformOptions::default()
3030            },
3031            "public-dev",
3032            "secret-value",
3033            4_102_444_800,
3034            None,
3035            None,
3036        )
3037        .expect("sign public URL");
3038
3039        assert!(url.starts_with("https://cdn.example.com/images/by-path?"));
3040        assert!(url.contains("path=%2Fimage.png"));
3041        assert!(url.contains("version=v1"));
3042        assert!(url.contains("width=320"));
3043        assert!(url.contains("format=jpeg"));
3044        assert!(url.contains("keyId=public-dev"));
3045        assert!(url.contains("expires=4102444800"));
3046        assert!(url.contains("signature="));
3047    }
3048
3049    #[test]
3050    fn parse_public_get_request_rejects_unknown_query_parameters() {
3051        let query = BTreeMap::from([
3052            ("path".to_string(), "/image.png".to_string()),
3053            ("keyId".to_string(), "public-dev".to_string()),
3054            ("expires".to_string(), "4102444800".to_string()),
3055            ("signature".to_string(), "deadbeef".to_string()),
3056            ("unexpected".to_string(), "value".to_string()),
3057        ]);
3058
3059        let config = ServerConfig::new(temp_dir("parse-query"), None);
3060        let response = parse_public_get_request(&query, PublicSourceKind::Path, &config)
3061            .expect_err("unknown query should fail");
3062
3063        assert_eq!(response.status, "400 Bad Request");
3064        assert!(response_body(&response).contains("is not supported"));
3065    }
3066
3067    #[test]
3068    fn parse_public_get_request_resolves_preset() {
3069        let mut presets = HashMap::new();
3070        presets.insert(
3071            "thumbnail".to_string(),
3072            TransformOptionsPayload {
3073                width: Some(150),
3074                height: Some(150),
3075                fit: Some("cover".to_string()),
3076                ..TransformOptionsPayload::default()
3077            },
3078        );
3079        let config = ServerConfig::new(temp_dir("preset"), None).with_presets(presets);
3080
3081        let query = BTreeMap::from([
3082            ("path".to_string(), "/image.png".to_string()),
3083            ("preset".to_string(), "thumbnail".to_string()),
3084        ]);
3085        let (_, options, _) =
3086            parse_public_get_request(&query, PublicSourceKind::Path, &config).unwrap();
3087
3088        assert_eq!(options.width, Some(150));
3089        assert_eq!(options.height, Some(150));
3090        assert_eq!(options.fit, Some(Fit::Cover));
3091    }
3092
3093    #[test]
3094    fn parse_public_get_request_preset_with_override() {
3095        let mut presets = HashMap::new();
3096        presets.insert(
3097            "thumbnail".to_string(),
3098            TransformOptionsPayload {
3099                width: Some(150),
3100                height: Some(150),
3101                fit: Some("cover".to_string()),
3102                format: Some("webp".to_string()),
3103                ..TransformOptionsPayload::default()
3104            },
3105        );
3106        let config = ServerConfig::new(temp_dir("preset-override"), None).with_presets(presets);
3107
3108        let query = BTreeMap::from([
3109            ("path".to_string(), "/image.png".to_string()),
3110            ("preset".to_string(), "thumbnail".to_string()),
3111            ("width".to_string(), "200".to_string()),
3112            ("format".to_string(), "jpeg".to_string()),
3113        ]);
3114        let (_, options, _) =
3115            parse_public_get_request(&query, PublicSourceKind::Path, &config).unwrap();
3116
3117        assert_eq!(options.width, Some(200));
3118        assert_eq!(options.height, Some(150));
3119        assert_eq!(options.format, Some(MediaType::Jpeg));
3120    }
3121
3122    #[test]
3123    fn parse_public_get_request_rejects_unknown_preset() {
3124        let config = ServerConfig::new(temp_dir("preset-unknown"), None);
3125
3126        let query = BTreeMap::from([
3127            ("path".to_string(), "/image.png".to_string()),
3128            ("preset".to_string(), "nonexistent".to_string()),
3129        ]);
3130        let response = parse_public_get_request(&query, PublicSourceKind::Path, &config)
3131            .expect_err("unknown preset should fail");
3132
3133        assert_eq!(response.status, "400 Bad Request");
3134        assert!(response_body(&response).contains("unknown preset"));
3135    }
3136
3137    #[test]
3138    fn sign_public_url_includes_preset_in_signed_url() {
3139        let url = sign_public_url(
3140            "https://cdn.example.com",
3141            SignedUrlSource::Path {
3142                path: "/image.png".to_string(),
3143                version: None,
3144            },
3145            &crate::TransformOptions::default(),
3146            "public-dev",
3147            "secret-value",
3148            4_102_444_800,
3149            None,
3150            Some("thumbnail"),
3151        )
3152        .expect("sign public URL with preset");
3153
3154        assert!(url.contains("preset=thumbnail"));
3155        assert!(url.contains("signature="));
3156    }
3157
3158    #[test]
3159    #[serial]
3160    fn parse_presets_from_env_parses_json() {
3161        unsafe {
3162            env::set_var(
3163                "TRUSS_PRESETS",
3164                r#"{"thumb":{"width":100,"height":100,"fit":"cover"}}"#,
3165            );
3166            env::remove_var("TRUSS_PRESETS_FILE");
3167        }
3168        let presets = parse_presets_from_env().unwrap();
3169        unsafe {
3170            env::remove_var("TRUSS_PRESETS");
3171        }
3172
3173        assert_eq!(presets.len(), 1);
3174        let thumb = presets.get("thumb").unwrap();
3175        assert_eq!(thumb.width, Some(100));
3176        assert_eq!(thumb.height, Some(100));
3177        assert_eq!(thumb.fit.as_deref(), Some("cover"));
3178    }
3179
3180    #[test]
3181    fn prepare_remote_fetch_target_pins_the_validated_netloc() {
3182        let target = prepare_remote_fetch_target(
3183            "http://1.1.1.1/image.png",
3184            &ServerConfig::new(temp_dir("pin"), Some("secret".to_string())),
3185        )
3186        .expect("prepare remote target");
3187
3188        assert_eq!(target.netloc, "1.1.1.1:80");
3189        assert_eq!(target.addrs, vec![SocketAddr::from(([1, 1, 1, 1], 80))]);
3190    }
3191
3192    #[test]
3193    fn pinned_resolver_rejects_unexpected_netlocs() {
3194        use ureq::unversioned::resolver::Resolver;
3195
3196        let resolver = PinnedResolver {
3197            expected_netloc: "example.com:443".to_string(),
3198            addrs: vec![SocketAddr::from(([93, 184, 216, 34], 443))],
3199        };
3200
3201        let config = ureq::config::Config::builder().build();
3202        let timeout = ureq::unversioned::transport::NextTimeout {
3203            after: ureq::unversioned::transport::time::Duration::Exact(
3204                std::time::Duration::from_secs(30),
3205            ),
3206            reason: ureq::Timeout::Resolve,
3207        };
3208
3209        let uri: ureq::http::Uri = "https://example.com/path".parse().unwrap();
3210        let result = resolver
3211            .resolve(&uri, &config, timeout)
3212            .expect("resolve expected netloc");
3213        assert_eq!(&result[..], &[SocketAddr::from(([93, 184, 216, 34], 443))]);
3214
3215        let bad_uri: ureq::http::Uri = "https://proxy.example:8080/path".parse().unwrap();
3216        let timeout2 = ureq::unversioned::transport::NextTimeout {
3217            after: ureq::unversioned::transport::time::Duration::Exact(
3218                std::time::Duration::from_secs(30),
3219            ),
3220            reason: ureq::Timeout::Resolve,
3221        };
3222        let error = resolver
3223            .resolve(&bad_uri, &config, timeout2)
3224            .expect_err("unexpected netloc should fail");
3225        assert!(matches!(error, ureq::Error::HostNotFound));
3226    }
3227
3228    #[test]
3229    fn health_live_returns_status_service_version() {
3230        let request = HttpRequest {
3231            method: "GET".to_string(),
3232            target: "/health/live".to_string(),
3233            version: "HTTP/1.1".to_string(),
3234            headers: Vec::new(),
3235            body: Vec::new(),
3236        };
3237
3238        let response = route_request(request, &ServerConfig::new(temp_dir("live"), None));
3239
3240        assert_eq!(response.status, "200 OK");
3241        let body: serde_json::Value =
3242            serde_json::from_slice(&response.body).expect("parse live body");
3243        assert_eq!(body["status"], "ok");
3244        assert_eq!(body["service"], "truss");
3245        assert_eq!(body["version"], env!("CARGO_PKG_VERSION"));
3246    }
3247
3248    #[test]
3249    fn health_ready_returns_ok_when_storage_exists() {
3250        let storage = temp_dir("ready-ok");
3251        let request = HttpRequest {
3252            method: "GET".to_string(),
3253            target: "/health/ready".to_string(),
3254            version: "HTTP/1.1".to_string(),
3255            headers: Vec::new(),
3256            body: Vec::new(),
3257        };
3258
3259        let response = route_request(request, &ServerConfig::new(storage, None));
3260
3261        assert_eq!(response.status, "200 OK");
3262        let body: serde_json::Value =
3263            serde_json::from_slice(&response.body).expect("parse ready body");
3264        assert_eq!(body["status"], "ok");
3265        let checks = body["checks"].as_array().expect("checks array");
3266        assert!(
3267            checks
3268                .iter()
3269                .any(|c| c["name"] == "storageRoot" && c["status"] == "ok")
3270        );
3271    }
3272
3273    #[test]
3274    fn health_ready_returns_503_when_storage_missing() {
3275        let request = HttpRequest {
3276            method: "GET".to_string(),
3277            target: "/health/ready".to_string(),
3278            version: "HTTP/1.1".to_string(),
3279            headers: Vec::new(),
3280            body: Vec::new(),
3281        };
3282
3283        let config = ServerConfig::new(PathBuf::from("/nonexistent-truss-test-dir"), None);
3284        let response = route_request(request, &config);
3285
3286        assert_eq!(response.status, "503 Service Unavailable");
3287        let body: serde_json::Value =
3288            serde_json::from_slice(&response.body).expect("parse ready fail body");
3289        assert_eq!(body["status"], "fail");
3290        let checks = body["checks"].as_array().expect("checks array");
3291        assert!(
3292            checks
3293                .iter()
3294                .any(|c| c["name"] == "storageRoot" && c["status"] == "fail")
3295        );
3296    }
3297
3298    #[test]
3299    fn health_ready_returns_503_when_cache_root_missing() {
3300        let storage = temp_dir("ready-cache-fail");
3301        let mut config = ServerConfig::new(storage, None);
3302        config.cache_root = Some(PathBuf::from("/nonexistent-truss-cache-dir"));
3303
3304        let request = HttpRequest {
3305            method: "GET".to_string(),
3306            target: "/health/ready".to_string(),
3307            version: "HTTP/1.1".to_string(),
3308            headers: Vec::new(),
3309            body: Vec::new(),
3310        };
3311
3312        let response = route_request(request, &config);
3313
3314        assert_eq!(response.status, "503 Service Unavailable");
3315        let body: serde_json::Value =
3316            serde_json::from_slice(&response.body).expect("parse ready cache body");
3317        assert_eq!(body["status"], "fail");
3318        let checks = body["checks"].as_array().expect("checks array");
3319        assert!(
3320            checks
3321                .iter()
3322                .any(|c| c["name"] == "cacheRoot" && c["status"] == "fail")
3323        );
3324    }
3325
3326    #[test]
3327    fn health_returns_comprehensive_diagnostic() {
3328        let storage = temp_dir("health-diag");
3329        let request = HttpRequest {
3330            method: "GET".to_string(),
3331            target: "/health".to_string(),
3332            version: "HTTP/1.1".to_string(),
3333            headers: Vec::new(),
3334            body: Vec::new(),
3335        };
3336
3337        let response = route_request(request, &ServerConfig::new(storage, None));
3338
3339        assert_eq!(response.status, "200 OK");
3340        let body: serde_json::Value =
3341            serde_json::from_slice(&response.body).expect("parse health body");
3342        assert_eq!(body["status"], "ok");
3343        assert_eq!(body["service"], "truss");
3344        assert_eq!(body["version"], env!("CARGO_PKG_VERSION"));
3345        assert!(body["uptimeSeconds"].is_u64());
3346        assert!(body["checks"].is_array());
3347    }
3348
3349    #[test]
3350    fn unknown_path_returns_not_found() {
3351        let request = HttpRequest {
3352            method: "GET".to_string(),
3353            target: "/unknown".to_string(),
3354            version: "HTTP/1.1".to_string(),
3355            headers: Vec::new(),
3356            body: Vec::new(),
3357        };
3358
3359        let response = route_request(request, &ServerConfig::new(temp_dir("not-found"), None));
3360
3361        assert_eq!(response.status, "404 Not Found");
3362        assert_eq!(response.content_type, Some("application/problem+json"));
3363        let body = response_body(&response);
3364        assert!(body.contains("\"type\":\"about:blank\""));
3365        assert!(body.contains("\"title\":\"Not Found\""));
3366        assert!(body.contains("\"status\":404"));
3367        assert!(body.contains("not found"));
3368    }
3369
3370    #[test]
3371    fn transform_endpoint_requires_authentication() {
3372        let storage_root = temp_dir("auth");
3373        write_png(&storage_root.join("image.png"));
3374        let mut request = transform_request("/image.png");
3375        request.headers.retain(|(name, _)| name != "authorization");
3376
3377        let response = route_request(
3378            request,
3379            &ServerConfig::new(storage_root, Some("secret".to_string())),
3380        );
3381
3382        assert_eq!(response.status, "401 Unauthorized");
3383        assert!(response_body(&response).contains("authorization required"));
3384    }
3385
3386    #[test]
3387    fn transform_endpoint_returns_service_unavailable_without_configured_token() {
3388        let storage_root = temp_dir("token");
3389        write_png(&storage_root.join("image.png"));
3390
3391        let response = route_request(
3392            transform_request("/image.png"),
3393            &ServerConfig::new(storage_root, None),
3394        );
3395
3396        assert_eq!(response.status, "503 Service Unavailable");
3397        assert!(response_body(&response).contains("bearer token is not configured"));
3398    }
3399
3400    #[test]
3401    fn transform_endpoint_transforms_a_path_source() {
3402        let storage_root = temp_dir("transform");
3403        write_png(&storage_root.join("image.png"));
3404
3405        let response = route_request(
3406            transform_request("/image.png"),
3407            &ServerConfig::new(storage_root, Some("secret".to_string())),
3408        );
3409
3410        assert_eq!(response.status, "200 OK");
3411        assert_eq!(response.content_type, Some("image/jpeg"));
3412
3413        let artifact = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
3414        assert_eq!(artifact.media_type, MediaType::Jpeg);
3415        assert_eq!(artifact.metadata.width, Some(4));
3416        assert_eq!(artifact.metadata.height, Some(3));
3417    }
3418
3419    #[test]
3420    fn transform_endpoint_rejects_private_url_sources_by_default() {
3421        let response = route_request(
3422            transform_url_request("http://127.0.0.1:8080/image.png"),
3423            &ServerConfig::new(temp_dir("url-blocked"), Some("secret".to_string())),
3424        );
3425
3426        assert_eq!(response.status, "403 Forbidden");
3427        assert!(response_body(&response).contains("port is not allowed"));
3428    }
3429
3430    #[test]
3431    fn transform_endpoint_transforms_a_url_source_when_insecure_allowance_is_enabled() {
3432        let (url, handle) = spawn_http_server(vec![(
3433            "200 OK".to_string(),
3434            vec![("Content-Type".to_string(), "image/png".to_string())],
3435            png_bytes(),
3436        )]);
3437
3438        let response = route_request(
3439            transform_url_request(&url),
3440            &ServerConfig::new(temp_dir("url"), Some("secret".to_string()))
3441                .with_insecure_url_sources(true),
3442        );
3443
3444        handle.join().expect("join fixture server");
3445
3446        assert_eq!(response.status, "200 OK");
3447        assert_eq!(response.content_type, Some("image/jpeg"));
3448
3449        let artifact = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
3450        assert_eq!(artifact.media_type, MediaType::Jpeg);
3451    }
3452
3453    #[test]
3454    fn transform_endpoint_follows_remote_redirects() {
3455        let (redirect_url, handle) = spawn_http_server(vec![
3456            (
3457                "302 Found".to_string(),
3458                vec![("Location".to_string(), "/final-image".to_string())],
3459                Vec::new(),
3460            ),
3461            (
3462                "200 OK".to_string(),
3463                vec![("Content-Type".to_string(), "image/png".to_string())],
3464                png_bytes(),
3465            ),
3466        ]);
3467
3468        let response = route_request(
3469            transform_url_request(&redirect_url),
3470            &ServerConfig::new(temp_dir("redirect"), Some("secret".to_string()))
3471                .with_insecure_url_sources(true),
3472        );
3473
3474        handle.join().expect("join fixture server");
3475
3476        assert_eq!(response.status, "200 OK");
3477        let artifact = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
3478        assert_eq!(artifact.media_type, MediaType::Jpeg);
3479    }
3480
3481    #[test]
3482    fn upload_endpoint_transforms_uploaded_file() {
3483        let response = route_request(
3484            upload_request(&png_bytes(), Some(r#"{"format":"jpeg"}"#)),
3485            &ServerConfig::new(temp_dir("upload"), Some("secret".to_string())),
3486        );
3487
3488        assert_eq!(response.status, "200 OK");
3489        assert_eq!(response.content_type, Some("image/jpeg"));
3490
3491        let artifact = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
3492        assert_eq!(artifact.media_type, MediaType::Jpeg);
3493    }
3494
3495    #[test]
3496    fn upload_endpoint_requires_a_file_field() {
3497        let boundary = "truss-test-boundary";
3498        let request = HttpRequest {
3499            method: "POST".to_string(),
3500            target: "/images".to_string(),
3501            version: "HTTP/1.1".to_string(),
3502            headers: vec![
3503                ("authorization".to_string(), "Bearer secret".to_string()),
3504                (
3505                    "content-type".to_string(),
3506                    format!("multipart/form-data; boundary={boundary}"),
3507                ),
3508            ],
3509            body: format!(
3510                "--{boundary}\r\nContent-Disposition: form-data; name=\"options\"\r\nContent-Type: application/json\r\n\r\n{{\"format\":\"jpeg\"}}\r\n--{boundary}--\r\n"
3511            )
3512            .into_bytes(),
3513        };
3514
3515        let response = route_request(
3516            request,
3517            &ServerConfig::new(temp_dir("upload-missing-file"), Some("secret".to_string())),
3518        );
3519
3520        assert_eq!(response.status, "400 Bad Request");
3521        assert!(response_body(&response).contains("requires a `file` field"));
3522    }
3523
3524    #[test]
3525    fn upload_endpoint_rejects_non_multipart_content_type() {
3526        let request = HttpRequest {
3527            method: "POST".to_string(),
3528            target: "/images".to_string(),
3529            version: "HTTP/1.1".to_string(),
3530            headers: vec![
3531                ("authorization".to_string(), "Bearer secret".to_string()),
3532                ("content-type".to_string(), "application/json".to_string()),
3533            ],
3534            body: br#"{"file":"not-really-json"}"#.to_vec(),
3535        };
3536
3537        let response = route_request(
3538            request,
3539            &ServerConfig::new(temp_dir("upload-content-type"), Some("secret".to_string())),
3540        );
3541
3542        assert_eq!(response.status, "415 Unsupported Media Type");
3543        assert!(response_body(&response).contains("multipart/form-data"));
3544    }
3545
3546    #[test]
3547    fn parse_upload_request_extracts_file_and_options() {
3548        let request = upload_request(&png_bytes(), Some(r#"{"width":8,"format":"jpeg"}"#));
3549        let boundary =
3550            super::multipart::parse_multipart_boundary(&request).expect("parse boundary");
3551        let (file_bytes, options, _watermark) =
3552            super::multipart::parse_upload_request(&request.body, &boundary)
3553                .expect("parse upload body");
3554
3555        assert_eq!(file_bytes, png_bytes());
3556        assert_eq!(options.width, Some(8));
3557        assert_eq!(options.format, Some(MediaType::Jpeg));
3558    }
3559
3560    #[test]
3561    fn metrics_endpoint_does_not_require_authentication() {
3562        let response = route_request(
3563            metrics_request(false),
3564            &ServerConfig::new(temp_dir("metrics-no-auth"), Some("secret".to_string())),
3565        );
3566
3567        assert_eq!(response.status, "200 OK");
3568    }
3569
3570    #[test]
3571    fn metrics_endpoint_returns_prometheus_text() {
3572        super::metrics::record_http_metrics(super::metrics::RouteMetric::Health, "200 OK");
3573        let response = route_request(
3574            metrics_request(true),
3575            &ServerConfig::new(temp_dir("metrics"), Some("secret".to_string())),
3576        );
3577        let body = response_body(&response);
3578
3579        assert_eq!(response.status, "200 OK");
3580        assert_eq!(
3581            response.content_type,
3582            Some("text/plain; version=0.0.4; charset=utf-8")
3583        );
3584        assert!(body.contains("truss_http_requests_total"));
3585        assert!(body.contains("truss_http_requests_by_route_total{route=\"/health\"}"));
3586        assert!(body.contains("truss_http_responses_total{status=\"200\"}"));
3587        // Histogram metrics
3588        assert!(body.contains("# TYPE truss_http_request_duration_seconds histogram"));
3589        assert!(
3590            body.contains(
3591                "truss_http_request_duration_seconds_bucket{route=\"/health\",le=\"+Inf\"}"
3592            )
3593        );
3594        assert!(body.contains("# TYPE truss_transform_duration_seconds histogram"));
3595        assert!(body.contains("# TYPE truss_storage_request_duration_seconds histogram"));
3596        // Transform error counter
3597        assert!(body.contains("# TYPE truss_transform_errors_total counter"));
3598        assert!(body.contains("truss_transform_errors_total{error_type=\"decode_failed\"}"));
3599    }
3600
3601    #[test]
3602    fn metrics_endpoint_returns_401_when_token_required() {
3603        let mut config = ServerConfig::new(temp_dir("metrics-auth"), None);
3604        config.metrics_token = Some("my-secret-token".to_string());
3605
3606        // No auth header → 401
3607        let response = route_request(metrics_request(false), &config);
3608        assert_eq!(response.status, "401 Unauthorized");
3609    }
3610
3611    #[test]
3612    fn metrics_endpoint_accepts_valid_token() {
3613        let mut config = ServerConfig::new(temp_dir("metrics-auth-ok"), None);
3614        config.metrics_token = Some("secret".to_string());
3615
3616        // Bearer secret matches
3617        let response = route_request(metrics_request(true), &config);
3618        assert_eq!(response.status, "200 OK");
3619    }
3620
3621    #[test]
3622    fn metrics_endpoint_rejects_wrong_token() {
3623        let mut config = ServerConfig::new(temp_dir("metrics-auth-bad"), None);
3624        config.metrics_token = Some("correct-token".to_string());
3625
3626        // Bearer secret ≠ correct-token
3627        let response = route_request(metrics_request(true), &config);
3628        assert_eq!(response.status, "401 Unauthorized");
3629    }
3630
3631    #[test]
3632    fn metrics_endpoint_returns_404_when_disabled() {
3633        let mut config = ServerConfig::new(temp_dir("metrics-disabled"), None);
3634        config.disable_metrics = true;
3635
3636        let response = route_request(metrics_request(false), &config);
3637        assert_eq!(response.status, "404 Not Found");
3638    }
3639
3640    #[test]
3641    fn transform_endpoint_rejects_unsupported_remote_content_encoding() {
3642        let (url, handle) = spawn_http_server(vec![(
3643            "200 OK".to_string(),
3644            vec![
3645                ("Content-Type".to_string(), "image/png".to_string()),
3646                ("Content-Encoding".to_string(), "compress".to_string()),
3647            ],
3648            png_bytes(),
3649        )]);
3650
3651        let response = route_request(
3652            transform_url_request(&url),
3653            &ServerConfig::new(temp_dir("encoding"), Some("secret".to_string()))
3654                .with_insecure_url_sources(true),
3655        );
3656
3657        handle.join().expect("join fixture server");
3658
3659        assert_eq!(response.status, "502 Bad Gateway");
3660        assert!(response_body(&response).contains("unsupported content-encoding"));
3661    }
3662
3663    #[test]
3664    fn resolve_storage_path_rejects_parent_segments() {
3665        let storage_root = temp_dir("resolve");
3666        let response = resolve_storage_path(&storage_root, "../escape.png")
3667            .expect_err("parent segments should be rejected");
3668
3669        assert_eq!(response.status, "400 Bad Request");
3670        assert!(response_body(&response).contains("must not contain root"));
3671    }
3672
3673    #[test]
3674    fn read_request_parses_headers_and_body() {
3675        let request_bytes = b"POST /images:transform HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: 2\r\n\r\n{}";
3676        let mut cursor = Cursor::new(request_bytes);
3677        let request = read_request(&mut cursor).expect("parse request");
3678
3679        assert_eq!(request.method, "POST");
3680        assert_eq!(request.target, "/images:transform");
3681        assert_eq!(request.version, "HTTP/1.1");
3682        assert_eq!(request.header("host"), Some("localhost"));
3683        assert_eq!(request.body, b"{}");
3684    }
3685
3686    #[test]
3687    fn read_request_rejects_duplicate_content_length() {
3688        let request_bytes =
3689            b"POST /images:transform HTTP/1.1\r\nContent-Length: 2\r\nContent-Length: 2\r\n\r\n{}";
3690        let mut cursor = Cursor::new(request_bytes);
3691        let response = read_request(&mut cursor).expect_err("duplicate headers should fail");
3692
3693        assert_eq!(response.status, "400 Bad Request");
3694        assert!(response_body(&response).contains("content-length"));
3695    }
3696
3697    #[test]
3698    fn serve_once_handles_a_tcp_request() {
3699        let storage_root = temp_dir("serve-once");
3700        let config = ServerConfig::new(storage_root, None);
3701        let listener = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
3702        let addr = listener.local_addr().expect("read local addr");
3703
3704        let server = thread::spawn(move || serve_once_with_config(listener, config));
3705
3706        let mut stream = TcpStream::connect(addr).expect("connect to test server");
3707        stream
3708            .write_all(b"GET /health/live HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
3709            .expect("write request");
3710
3711        let mut response = String::new();
3712        stream.read_to_string(&mut response).expect("read response");
3713
3714        server
3715            .join()
3716            .expect("join test server thread")
3717            .expect("serve one request");
3718
3719        assert!(response.starts_with("HTTP/1.1 200 OK"));
3720        assert!(response.contains("Content-Type: application/json"));
3721        assert!(response.contains("\"status\":\"ok\""));
3722        assert!(response.contains("\"service\":\"truss\""));
3723        assert!(response.contains("\"version\":"));
3724    }
3725
3726    #[test]
3727    fn helper_error_responses_use_rfc7807_problem_details() {
3728        let response = auth_required_response("authorization required");
3729        let bad_request = bad_request_response("bad input");
3730
3731        assert_eq!(
3732            response.content_type,
3733            Some("application/problem+json"),
3734            "error responses must use application/problem+json"
3735        );
3736        assert_eq!(bad_request.content_type, Some("application/problem+json"),);
3737
3738        let auth_body = response_body(&response);
3739        assert!(auth_body.contains("authorization required"));
3740        assert!(auth_body.contains("\"type\":\"about:blank\""));
3741        assert!(auth_body.contains("\"title\":\"Unauthorized\""));
3742        assert!(auth_body.contains("\"status\":401"));
3743
3744        let bad_body = response_body(&bad_request);
3745        assert!(bad_body.contains("bad input"));
3746        assert!(bad_body.contains("\"type\":\"about:blank\""));
3747        assert!(bad_body.contains("\"title\":\"Bad Request\""));
3748        assert!(bad_body.contains("\"status\":400"));
3749    }
3750
3751    #[test]
3752    fn parse_headers_rejects_duplicate_host() {
3753        let lines = "Host: example.com\r\nHost: evil.com\r\n";
3754        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3755        assert!(result.is_err());
3756    }
3757
3758    #[test]
3759    fn parse_headers_rejects_duplicate_authorization() {
3760        let lines = "Authorization: Bearer a\r\nAuthorization: Bearer b\r\n";
3761        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3762        assert!(result.is_err());
3763    }
3764
3765    #[test]
3766    fn parse_headers_rejects_duplicate_content_type() {
3767        let lines = "Content-Type: application/json\r\nContent-Type: text/plain\r\n";
3768        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3769        assert!(result.is_err());
3770    }
3771
3772    #[test]
3773    fn parse_headers_rejects_duplicate_transfer_encoding() {
3774        let lines = "Transfer-Encoding: chunked\r\nTransfer-Encoding: gzip\r\n";
3775        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3776        assert!(result.is_err());
3777    }
3778
3779    #[test]
3780    fn parse_headers_rejects_single_transfer_encoding() {
3781        let lines = "Host: example.com\r\nTransfer-Encoding: chunked\r\n";
3782        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3783        let err = result.unwrap_err();
3784        assert!(
3785            err.status.starts_with("501"),
3786            "expected 501 status, got: {}",
3787            err.status
3788        );
3789        assert!(
3790            String::from_utf8_lossy(&err.body).contains("Transfer-Encoding"),
3791            "error response should mention Transfer-Encoding"
3792        );
3793    }
3794
3795    #[test]
3796    fn parse_headers_rejects_transfer_encoding_identity() {
3797        let lines = "Transfer-Encoding: identity\r\n";
3798        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3799        assert!(result.is_err());
3800    }
3801
3802    #[test]
3803    fn parse_headers_allows_single_instances_of_singleton_headers() {
3804        let lines =
3805            "Host: example.com\r\nAuthorization: Bearer tok\r\nContent-Type: application/json\r\n";
3806        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3807        assert!(result.is_ok());
3808        assert_eq!(result.unwrap().len(), 3);
3809    }
3810
3811    #[test]
3812    fn max_body_for_multipart_uses_upload_limit() {
3813        let headers = vec![(
3814            "content-type".to_string(),
3815            "multipart/form-data; boundary=abc".to_string(),
3816        )];
3817        assert_eq!(
3818            super::http_parse::max_body_for_headers(
3819                &headers,
3820                super::http_parse::DEFAULT_MAX_UPLOAD_BODY_BYTES
3821            ),
3822            super::http_parse::DEFAULT_MAX_UPLOAD_BODY_BYTES
3823        );
3824    }
3825
3826    #[test]
3827    fn max_body_for_json_uses_default_limit() {
3828        let headers = vec![("content-type".to_string(), "application/json".to_string())];
3829        assert_eq!(
3830            super::http_parse::max_body_for_headers(
3831                &headers,
3832                super::http_parse::DEFAULT_MAX_UPLOAD_BODY_BYTES
3833            ),
3834            super::http_parse::MAX_REQUEST_BODY_BYTES
3835        );
3836    }
3837
3838    #[test]
3839    fn max_body_for_no_content_type_uses_default_limit() {
3840        let headers: Vec<(String, String)> = vec![];
3841        assert_eq!(
3842            super::http_parse::max_body_for_headers(
3843                &headers,
3844                super::http_parse::DEFAULT_MAX_UPLOAD_BODY_BYTES
3845            ),
3846            super::http_parse::MAX_REQUEST_BODY_BYTES
3847        );
3848    }
3849
3850    fn make_test_config() -> ServerConfig {
3851        ServerConfig::new(std::env::temp_dir(), None)
3852    }
3853
3854    #[test]
3855    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
3856    fn storage_backend_parse_filesystem_aliases() {
3857        assert_eq!(
3858            super::StorageBackend::parse("filesystem").unwrap(),
3859            super::StorageBackend::Filesystem
3860        );
3861        assert_eq!(
3862            super::StorageBackend::parse("fs").unwrap(),
3863            super::StorageBackend::Filesystem
3864        );
3865        assert_eq!(
3866            super::StorageBackend::parse("local").unwrap(),
3867            super::StorageBackend::Filesystem
3868        );
3869    }
3870
3871    #[test]
3872    #[cfg(feature = "s3")]
3873    fn storage_backend_parse_s3() {
3874        assert_eq!(
3875            super::StorageBackend::parse("s3").unwrap(),
3876            super::StorageBackend::S3
3877        );
3878        assert_eq!(
3879            super::StorageBackend::parse("S3").unwrap(),
3880            super::StorageBackend::S3
3881        );
3882    }
3883
3884    #[test]
3885    #[cfg(feature = "gcs")]
3886    fn storage_backend_parse_gcs() {
3887        assert_eq!(
3888            super::StorageBackend::parse("gcs").unwrap(),
3889            super::StorageBackend::Gcs
3890        );
3891        assert_eq!(
3892            super::StorageBackend::parse("GCS").unwrap(),
3893            super::StorageBackend::Gcs
3894        );
3895    }
3896
3897    #[test]
3898    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
3899    fn storage_backend_parse_rejects_unknown() {
3900        assert!(super::StorageBackend::parse("").is_err());
3901        #[cfg(not(feature = "azure"))]
3902        assert!(super::StorageBackend::parse("azure").is_err());
3903        #[cfg(feature = "azure")]
3904        assert!(super::StorageBackend::parse("azure").is_ok());
3905    }
3906
3907    #[test]
3908    fn versioned_source_hash_returns_none_without_version() {
3909        let source = TransformSourcePayload::Path {
3910            path: "/photos/hero.jpg".to_string(),
3911            version: None,
3912        };
3913        assert!(source.versioned_source_hash(&make_test_config()).is_none());
3914    }
3915
3916    #[test]
3917    fn versioned_source_hash_is_deterministic() {
3918        let cfg = make_test_config();
3919        let source = TransformSourcePayload::Path {
3920            path: "/photos/hero.jpg".to_string(),
3921            version: Some("v1".to_string()),
3922        };
3923        let hash1 = source.versioned_source_hash(&cfg).unwrap();
3924        let hash2 = source.versioned_source_hash(&cfg).unwrap();
3925        assert_eq!(hash1, hash2);
3926        assert_eq!(hash1.len(), 64);
3927    }
3928
3929    #[test]
3930    fn versioned_source_hash_differs_by_version() {
3931        let cfg = make_test_config();
3932        let v1 = TransformSourcePayload::Path {
3933            path: "/photos/hero.jpg".to_string(),
3934            version: Some("v1".to_string()),
3935        };
3936        let v2 = TransformSourcePayload::Path {
3937            path: "/photos/hero.jpg".to_string(),
3938            version: Some("v2".to_string()),
3939        };
3940        assert_ne!(
3941            v1.versioned_source_hash(&cfg).unwrap(),
3942            v2.versioned_source_hash(&cfg).unwrap()
3943        );
3944    }
3945
3946    #[test]
3947    fn versioned_source_hash_differs_by_kind() {
3948        let cfg = make_test_config();
3949        let path = TransformSourcePayload::Path {
3950            path: "example.com/image.jpg".to_string(),
3951            version: Some("v1".to_string()),
3952        };
3953        let url = TransformSourcePayload::Url {
3954            url: "example.com/image.jpg".to_string(),
3955            version: Some("v1".to_string()),
3956        };
3957        assert_ne!(
3958            path.versioned_source_hash(&cfg).unwrap(),
3959            url.versioned_source_hash(&cfg).unwrap()
3960        );
3961    }
3962
3963    #[test]
3964    fn versioned_source_hash_differs_by_storage_root() {
3965        let cfg1 = ServerConfig::new(PathBuf::from("/data/images"), None);
3966        let cfg2 = ServerConfig::new(PathBuf::from("/other/images"), None);
3967        let source = TransformSourcePayload::Path {
3968            path: "/photos/hero.jpg".to_string(),
3969            version: Some("v1".to_string()),
3970        };
3971        assert_ne!(
3972            source.versioned_source_hash(&cfg1).unwrap(),
3973            source.versioned_source_hash(&cfg2).unwrap()
3974        );
3975    }
3976
3977    #[test]
3978    fn versioned_source_hash_differs_by_insecure_flag() {
3979        let mut cfg1 = make_test_config();
3980        cfg1.allow_insecure_url_sources = false;
3981        let mut cfg2 = make_test_config();
3982        cfg2.allow_insecure_url_sources = true;
3983        let source = TransformSourcePayload::Url {
3984            url: "http://example.com/img.jpg".to_string(),
3985            version: Some("v1".to_string()),
3986        };
3987        assert_ne!(
3988            source.versioned_source_hash(&cfg1).unwrap(),
3989            source.versioned_source_hash(&cfg2).unwrap()
3990        );
3991    }
3992
3993    #[test]
3994    #[cfg(feature = "s3")]
3995    fn versioned_source_hash_storage_variant_is_deterministic() {
3996        let cfg = make_test_config();
3997        let source = TransformSourcePayload::Storage {
3998            bucket: Some("my-bucket".to_string()),
3999            key: "photos/hero.jpg".to_string(),
4000            version: Some("v1".to_string()),
4001        };
4002        let hash1 = source.versioned_source_hash(&cfg).unwrap();
4003        let hash2 = source.versioned_source_hash(&cfg).unwrap();
4004        assert_eq!(hash1, hash2);
4005        assert_eq!(hash1.len(), 64);
4006    }
4007
4008    #[test]
4009    #[cfg(feature = "s3")]
4010    fn versioned_source_hash_storage_differs_from_path() {
4011        let cfg = make_test_config();
4012        let path_source = TransformSourcePayload::Path {
4013            path: "photos/hero.jpg".to_string(),
4014            version: Some("v1".to_string()),
4015        };
4016        let storage_source = TransformSourcePayload::Storage {
4017            bucket: Some("my-bucket".to_string()),
4018            key: "photos/hero.jpg".to_string(),
4019            version: Some("v1".to_string()),
4020        };
4021        assert_ne!(
4022            path_source.versioned_source_hash(&cfg).unwrap(),
4023            storage_source.versioned_source_hash(&cfg).unwrap()
4024        );
4025    }
4026
4027    #[test]
4028    #[cfg(feature = "s3")]
4029    fn versioned_source_hash_storage_differs_by_bucket() {
4030        let cfg = make_test_config();
4031        let s1 = TransformSourcePayload::Storage {
4032            bucket: Some("bucket-a".to_string()),
4033            key: "image.jpg".to_string(),
4034            version: Some("v1".to_string()),
4035        };
4036        let s2 = TransformSourcePayload::Storage {
4037            bucket: Some("bucket-b".to_string()),
4038            key: "image.jpg".to_string(),
4039            version: Some("v1".to_string()),
4040        };
4041        assert_ne!(
4042            s1.versioned_source_hash(&cfg).unwrap(),
4043            s2.versioned_source_hash(&cfg).unwrap()
4044        );
4045    }
4046
4047    #[test]
4048    #[cfg(feature = "s3")]
4049    fn versioned_source_hash_differs_by_backend() {
4050        let cfg_fs = make_test_config();
4051        let mut cfg_s3 = make_test_config();
4052        cfg_s3.storage_backend = super::StorageBackend::S3;
4053
4054        let source = TransformSourcePayload::Path {
4055            path: "photos/hero.jpg".to_string(),
4056            version: Some("v1".to_string()),
4057        };
4058        assert_ne!(
4059            source.versioned_source_hash(&cfg_fs).unwrap(),
4060            source.versioned_source_hash(&cfg_s3).unwrap()
4061        );
4062    }
4063
4064    #[test]
4065    #[cfg(feature = "s3")]
4066    fn versioned_source_hash_storage_differs_by_endpoint() {
4067        let mut cfg_a = make_test_config();
4068        cfg_a.storage_backend = super::StorageBackend::S3;
4069        cfg_a.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4070            "shared",
4071            Some("http://minio-a:9000"),
4072        )));
4073
4074        let mut cfg_b = make_test_config();
4075        cfg_b.storage_backend = super::StorageBackend::S3;
4076        cfg_b.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4077            "shared",
4078            Some("http://minio-b:9000"),
4079        )));
4080
4081        let source = TransformSourcePayload::Storage {
4082            bucket: None,
4083            key: "image.jpg".to_string(),
4084            version: Some("v1".to_string()),
4085        };
4086        assert_ne!(
4087            source.versioned_source_hash(&cfg_a).unwrap(),
4088            source.versioned_source_hash(&cfg_b).unwrap(),
4089        );
4090        assert_ne!(cfg_a, cfg_b);
4091    }
4092
4093    #[test]
4094    #[cfg(feature = "s3")]
4095    fn storage_backend_default_is_filesystem() {
4096        let cfg = make_test_config();
4097        assert_eq!(cfg.storage_backend, super::StorageBackend::Filesystem);
4098        assert!(cfg.s3_context.is_none());
4099    }
4100
4101    #[test]
4102    #[cfg(feature = "s3")]
4103    fn storage_payload_deserializes_storage_variant() {
4104        let json = r#"{"source":{"kind":"storage","key":"photos/hero.jpg"},"options":{}}"#;
4105        let payload: super::TransformImageRequestPayload = serde_json::from_str(json).unwrap();
4106        match payload.source {
4107            TransformSourcePayload::Storage {
4108                bucket,
4109                key,
4110                version,
4111            } => {
4112                assert!(bucket.is_none());
4113                assert_eq!(key, "photos/hero.jpg");
4114                assert!(version.is_none());
4115            }
4116            _ => panic!("expected Storage variant"),
4117        }
4118    }
4119
4120    #[test]
4121    #[cfg(feature = "s3")]
4122    fn storage_payload_deserializes_with_bucket() {
4123        let json = r#"{"source":{"kind":"storage","bucket":"my-bucket","key":"img.png","version":"v2"},"options":{}}"#;
4124        let payload: super::TransformImageRequestPayload = serde_json::from_str(json).unwrap();
4125        match payload.source {
4126            TransformSourcePayload::Storage {
4127                bucket,
4128                key,
4129                version,
4130            } => {
4131                assert_eq!(bucket.as_deref(), Some("my-bucket"));
4132                assert_eq!(key, "img.png");
4133                assert_eq!(version.as_deref(), Some("v2"));
4134            }
4135            _ => panic!("expected Storage variant"),
4136        }
4137    }
4138
4139    // -----------------------------------------------------------------------
4140    // S3: default_bucket fallback with bucket: None
4141    // -----------------------------------------------------------------------
4142
4143    #[test]
4144    #[cfg(feature = "s3")]
4145    fn versioned_source_hash_uses_default_bucket_when_bucket_is_none() {
4146        let mut cfg_a = make_test_config();
4147        cfg_a.storage_backend = super::StorageBackend::S3;
4148        cfg_a.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4149            "bucket-a", None,
4150        )));
4151
4152        let mut cfg_b = make_test_config();
4153        cfg_b.storage_backend = super::StorageBackend::S3;
4154        cfg_b.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4155            "bucket-b", None,
4156        )));
4157
4158        let source = TransformSourcePayload::Storage {
4159            bucket: None,
4160            key: "image.jpg".to_string(),
4161            version: Some("v1".to_string()),
4162        };
4163        // Different default_bucket ⇒ different hash
4164        assert_ne!(
4165            source.versioned_source_hash(&cfg_a).unwrap(),
4166            source.versioned_source_hash(&cfg_b).unwrap(),
4167        );
4168        // PartialEq also distinguishes them
4169        assert_ne!(cfg_a, cfg_b);
4170    }
4171
4172    #[test]
4173    #[cfg(feature = "s3")]
4174    fn versioned_source_hash_returns_none_without_bucket_or_context() {
4175        let mut cfg = make_test_config();
4176        cfg.storage_backend = super::StorageBackend::S3;
4177        cfg.s3_context = None;
4178
4179        let source = TransformSourcePayload::Storage {
4180            bucket: None,
4181            key: "image.jpg".to_string(),
4182            version: Some("v1".to_string()),
4183        };
4184        // No bucket available ⇒ None (falls back to content-hash)
4185        assert!(source.versioned_source_hash(&cfg).is_none());
4186    }
4187
4188    // -----------------------------------------------------------------------
4189    // S3: from_env branches
4190    //
4191    // These tests mutate process-global environment variables. A mutex
4192    // serializes them so that parallel test threads cannot interfere, and
4193    // each test saves/restores the variables it touches.
4194    // -----------------------------------------------------------------------
4195
4196    #[cfg(feature = "s3")]
4197    static FROM_ENV_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
4198
4199    #[cfg(feature = "s3")]
4200    const S3_ENV_VARS: &[&str] = &[
4201        "TRUSS_STORAGE_ROOT",
4202        "TRUSS_STORAGE_BACKEND",
4203        "TRUSS_S3_BUCKET",
4204    ];
4205
4206    /// Save current values, run `f`, then restore originals regardless of
4207    /// panics. Holds `FROM_ENV_MUTEX` for the duration.
4208    #[cfg(feature = "s3")]
4209    fn with_s3_env<F: FnOnce()>(vars: &[(&str, Option<&str>)], f: F) {
4210        let _guard = FROM_ENV_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
4211        let saved: Vec<(&str, Option<String>)> = S3_ENV_VARS
4212            .iter()
4213            .map(|k| (*k, std::env::var(k).ok()))
4214            .collect();
4215        // Apply requested overrides
4216        for &(key, value) in vars {
4217            match value {
4218                Some(v) => unsafe { std::env::set_var(key, v) },
4219                None => unsafe { std::env::remove_var(key) },
4220            }
4221        }
4222        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
4223        // Restore originals
4224        for (key, original) in saved {
4225            match original {
4226                Some(v) => unsafe { std::env::set_var(key, v) },
4227                None => unsafe { std::env::remove_var(key) },
4228            }
4229        }
4230        if let Err(payload) = result {
4231            std::panic::resume_unwind(payload);
4232        }
4233    }
4234
4235    #[test]
4236    #[cfg(feature = "s3")]
4237    fn from_env_rejects_invalid_storage_backend() {
4238        let storage = temp_dir("env-bad-backend");
4239        let storage_str = storage.to_str().unwrap().to_string();
4240        with_s3_env(
4241            &[
4242                ("TRUSS_STORAGE_ROOT", Some(&storage_str)),
4243                ("TRUSS_STORAGE_BACKEND", Some("nosuchbackend")),
4244                ("TRUSS_S3_BUCKET", None),
4245            ],
4246            || {
4247                let result = ServerConfig::from_env();
4248                assert!(result.is_err());
4249                let msg = result.unwrap_err().to_string();
4250                assert!(msg.contains("unknown storage backend"), "got: {msg}");
4251            },
4252        );
4253        let _ = std::fs::remove_dir_all(storage);
4254    }
4255
4256    #[test]
4257    #[cfg(feature = "s3")]
4258    fn from_env_rejects_s3_without_bucket() {
4259        let storage = temp_dir("env-no-bucket");
4260        let storage_str = storage.to_str().unwrap().to_string();
4261        with_s3_env(
4262            &[
4263                ("TRUSS_STORAGE_ROOT", Some(&storage_str)),
4264                ("TRUSS_STORAGE_BACKEND", Some("s3")),
4265                ("TRUSS_S3_BUCKET", None),
4266            ],
4267            || {
4268                let result = ServerConfig::from_env();
4269                assert!(result.is_err());
4270                let msg = result.unwrap_err().to_string();
4271                assert!(msg.contains("TRUSS_S3_BUCKET"), "got: {msg}");
4272            },
4273        );
4274        let _ = std::fs::remove_dir_all(storage);
4275    }
4276
4277    #[test]
4278    #[cfg(feature = "s3")]
4279    fn from_env_accepts_s3_with_bucket() {
4280        let storage = temp_dir("env-s3-ok");
4281        let storage_str = storage.to_str().unwrap().to_string();
4282        with_s3_env(
4283            &[
4284                ("TRUSS_STORAGE_ROOT", Some(&storage_str)),
4285                ("TRUSS_STORAGE_BACKEND", Some("s3")),
4286                ("TRUSS_S3_BUCKET", Some("my-images")),
4287            ],
4288            || {
4289                let cfg =
4290                    ServerConfig::from_env().expect("from_env should succeed with s3 + bucket");
4291                assert_eq!(cfg.storage_backend, super::StorageBackend::S3);
4292                let ctx = cfg.s3_context.expect("s3_context should be Some");
4293                assert_eq!(ctx.default_bucket, "my-images");
4294            },
4295        );
4296        let _ = std::fs::remove_dir_all(storage);
4297    }
4298
4299    // -----------------------------------------------------------------------
4300    // S3: health endpoint
4301    // -----------------------------------------------------------------------
4302
4303    #[test]
4304    #[cfg(feature = "s3")]
4305    fn health_ready_s3_returns_503_when_context_missing() {
4306        let storage = temp_dir("health-s3-no-ctx");
4307        let mut config = ServerConfig::new(storage.clone(), None);
4308        config.storage_backend = super::StorageBackend::S3;
4309        config.s3_context = None;
4310
4311        let request = super::http_parse::HttpRequest {
4312            method: "GET".to_string(),
4313            target: "/health/ready".to_string(),
4314            version: "HTTP/1.1".to_string(),
4315            headers: Vec::new(),
4316            body: Vec::new(),
4317        };
4318        let response = route_request(request, &config);
4319        let _ = std::fs::remove_dir_all(storage);
4320
4321        assert_eq!(response.status, "503 Service Unavailable");
4322        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4323        let checks = body["checks"].as_array().expect("checks array");
4324        assert!(
4325            checks
4326                .iter()
4327                .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
4328            "expected s3Client fail check in {body}",
4329        );
4330    }
4331
4332    #[test]
4333    #[cfg(feature = "s3")]
4334    fn health_ready_s3_includes_s3_client_check() {
4335        let storage = temp_dir("health-s3-ok");
4336        let mut config = ServerConfig::new(storage.clone(), None);
4337        config.storage_backend = super::StorageBackend::S3;
4338        config.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4339            "test-bucket",
4340            None,
4341        )));
4342
4343        let request = super::http_parse::HttpRequest {
4344            method: "GET".to_string(),
4345            target: "/health/ready".to_string(),
4346            version: "HTTP/1.1".to_string(),
4347            headers: Vec::new(),
4348            body: Vec::new(),
4349        };
4350        let response = route_request(request, &config);
4351        let _ = std::fs::remove_dir_all(storage);
4352
4353        // The s3Client check will report "fail" because there is no real S3
4354        // endpoint, but the important thing is that the check is present.
4355        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4356        let checks = body["checks"].as_array().expect("checks array");
4357        assert!(
4358            checks.iter().any(|c| c["name"] == "storageBackend"),
4359            "expected s3Client check in {body}",
4360        );
4361    }
4362
4363    // -----------------------------------------------------------------------
4364    // S3: public by-path remap (leading slash trimmed, Storage variant used)
4365    // -----------------------------------------------------------------------
4366
4367    /// Replicates the Path→Storage remap that `handle_public_get_request`
4368    /// performs when `storage_backend == S3`, so we can inspect the resulting
4369    /// key without issuing a real S3 request.
4370    #[cfg(feature = "s3")]
4371    fn remap_path_to_storage(path: &str, version: Option<&str>) -> TransformSourcePayload {
4372        let source = TransformSourcePayload::Path {
4373            path: path.to_string(),
4374            version: version.map(|v| v.to_string()),
4375        };
4376        match source {
4377            TransformSourcePayload::Path { path, version } => TransformSourcePayload::Storage {
4378                bucket: None,
4379                key: path.trim_start_matches('/').to_string(),
4380                version,
4381            },
4382            other => other,
4383        }
4384    }
4385
4386    #[test]
4387    #[cfg(feature = "s3")]
4388    fn public_by_path_s3_remap_trims_leading_slash() {
4389        // Paths with a leading slash (the common case from signed URLs like
4390        // `path=/image.png`) must have the slash stripped so that the S3 key
4391        // is `image.png`, not `/image.png`.
4392        let source = remap_path_to_storage("/photos/hero.jpg", Some("v1"));
4393        match &source {
4394            TransformSourcePayload::Storage { key, .. } => {
4395                assert_eq!(key, "photos/hero.jpg", "leading / must be trimmed");
4396            }
4397            _ => panic!("expected Storage variant after remap"),
4398        }
4399
4400        // Without a leading slash the key must be unchanged.
4401        let source2 = remap_path_to_storage("photos/hero.jpg", Some("v1"));
4402        match &source2 {
4403            TransformSourcePayload::Storage { key, .. } => {
4404                assert_eq!(key, "photos/hero.jpg");
4405            }
4406            _ => panic!("expected Storage variant after remap"),
4407        }
4408
4409        // Both must produce the same versioned hash (same effective key).
4410        let mut cfg = make_test_config();
4411        cfg.storage_backend = super::StorageBackend::S3;
4412        cfg.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4413            "my-bucket",
4414            None,
4415        )));
4416        assert_eq!(
4417            source.versioned_source_hash(&cfg),
4418            source2.versioned_source_hash(&cfg),
4419            "leading-slash and no-leading-slash paths must hash identically after trim",
4420        );
4421    }
4422
4423    #[test]
4424    #[cfg(feature = "s3")]
4425    fn public_by_path_s3_remap_produces_storage_variant() {
4426        // Verify the remap converts Path to Storage with bucket: None.
4427        let source = remap_path_to_storage("/image.png", None);
4428        match source {
4429            TransformSourcePayload::Storage {
4430                bucket,
4431                key,
4432                version,
4433            } => {
4434                assert!(bucket.is_none(), "bucket must be None (use default)");
4435                assert_eq!(key, "image.png");
4436                assert!(version.is_none());
4437            }
4438            _ => panic!("expected Storage variant"),
4439        }
4440    }
4441
4442    // -----------------------------------------------------------------------
4443    // GCS: health endpoint
4444    // -----------------------------------------------------------------------
4445
4446    #[test]
4447    #[cfg(feature = "gcs")]
4448    fn health_ready_gcs_returns_503_when_context_missing() {
4449        let storage = temp_dir("health-gcs-no-ctx");
4450        let mut config = ServerConfig::new(storage.clone(), None);
4451        config.storage_backend = super::StorageBackend::Gcs;
4452        config.gcs_context = None;
4453
4454        let request = super::http_parse::HttpRequest {
4455            method: "GET".to_string(),
4456            target: "/health/ready".to_string(),
4457            version: "HTTP/1.1".to_string(),
4458            headers: Vec::new(),
4459            body: Vec::new(),
4460        };
4461        let response = route_request(request, &config);
4462        let _ = std::fs::remove_dir_all(storage);
4463
4464        assert_eq!(response.status, "503 Service Unavailable");
4465        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4466        let checks = body["checks"].as_array().expect("checks array");
4467        assert!(
4468            checks
4469                .iter()
4470                .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
4471            "expected gcsClient fail check in {body}",
4472        );
4473    }
4474
4475    #[test]
4476    #[cfg(feature = "gcs")]
4477    fn health_ready_gcs_includes_gcs_client_check() {
4478        let storage = temp_dir("health-gcs-ok");
4479        let mut config = ServerConfig::new(storage.clone(), None);
4480        config.storage_backend = super::StorageBackend::Gcs;
4481        config.gcs_context = Some(std::sync::Arc::new(super::gcs::GcsContext::for_test(
4482            "test-bucket",
4483            None,
4484        )));
4485
4486        let request = super::http_parse::HttpRequest {
4487            method: "GET".to_string(),
4488            target: "/health/ready".to_string(),
4489            version: "HTTP/1.1".to_string(),
4490            headers: Vec::new(),
4491            body: Vec::new(),
4492        };
4493        let response = route_request(request, &config);
4494        let _ = std::fs::remove_dir_all(storage);
4495
4496        // The gcsClient check will report "fail" because there is no real GCS
4497        // endpoint, but the important thing is that the check is present.
4498        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4499        let checks = body["checks"].as_array().expect("checks array");
4500        assert!(
4501            checks.iter().any(|c| c["name"] == "storageBackend"),
4502            "expected gcsClient check in {body}",
4503        );
4504    }
4505
4506    // -----------------------------------------------------------------------
4507    // GCS: public by-path remap (leading slash trimmed, Storage variant used)
4508    // -----------------------------------------------------------------------
4509
4510    #[cfg(feature = "gcs")]
4511    fn remap_path_to_storage_gcs(path: &str, version: Option<&str>) -> TransformSourcePayload {
4512        let source = TransformSourcePayload::Path {
4513            path: path.to_string(),
4514            version: version.map(|v| v.to_string()),
4515        };
4516        match source {
4517            TransformSourcePayload::Path { path, version } => TransformSourcePayload::Storage {
4518                bucket: None,
4519                key: path.trim_start_matches('/').to_string(),
4520                version,
4521            },
4522            other => other,
4523        }
4524    }
4525
4526    #[test]
4527    #[cfg(feature = "gcs")]
4528    fn public_by_path_gcs_remap_trims_leading_slash() {
4529        let source = remap_path_to_storage_gcs("/photos/hero.jpg", Some("v1"));
4530        match &source {
4531            TransformSourcePayload::Storage { key, .. } => {
4532                assert_eq!(key, "photos/hero.jpg", "leading / must be trimmed");
4533            }
4534            _ => panic!("expected Storage variant after remap"),
4535        }
4536
4537        let source2 = remap_path_to_storage_gcs("photos/hero.jpg", Some("v1"));
4538        match &source2 {
4539            TransformSourcePayload::Storage { key, .. } => {
4540                assert_eq!(key, "photos/hero.jpg");
4541            }
4542            _ => panic!("expected Storage variant after remap"),
4543        }
4544
4545        let mut cfg = make_test_config();
4546        cfg.storage_backend = super::StorageBackend::Gcs;
4547        cfg.gcs_context = Some(std::sync::Arc::new(super::gcs::GcsContext::for_test(
4548            "my-bucket",
4549            None,
4550        )));
4551        assert_eq!(
4552            source.versioned_source_hash(&cfg),
4553            source2.versioned_source_hash(&cfg),
4554            "leading-slash and no-leading-slash paths must hash identically after trim",
4555        );
4556    }
4557
4558    #[test]
4559    #[cfg(feature = "gcs")]
4560    fn public_by_path_gcs_remap_produces_storage_variant() {
4561        let source = remap_path_to_storage_gcs("/image.png", None);
4562        match source {
4563            TransformSourcePayload::Storage {
4564                bucket,
4565                key,
4566                version,
4567            } => {
4568                assert!(bucket.is_none(), "bucket must be None (use default)");
4569                assert_eq!(key, "image.png");
4570                assert!(version.is_none());
4571            }
4572            _ => panic!("expected Storage variant"),
4573        }
4574    }
4575
4576    // -----------------------------------------------------------------------
4577    // Azure: health endpoint
4578    // -----------------------------------------------------------------------
4579
4580    #[test]
4581    #[cfg(feature = "azure")]
4582    fn health_ready_azure_returns_503_when_context_missing() {
4583        let storage = temp_dir("health-azure-no-ctx");
4584        let mut config = ServerConfig::new(storage.clone(), None);
4585        config.storage_backend = super::StorageBackend::Azure;
4586        config.azure_context = None;
4587
4588        let request = super::http_parse::HttpRequest {
4589            method: "GET".to_string(),
4590            target: "/health/ready".to_string(),
4591            version: "HTTP/1.1".to_string(),
4592            headers: Vec::new(),
4593            body: Vec::new(),
4594        };
4595        let response = route_request(request, &config);
4596        let _ = std::fs::remove_dir_all(storage);
4597
4598        assert_eq!(response.status, "503 Service Unavailable");
4599        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4600        let checks = body["checks"].as_array().expect("checks array");
4601        assert!(
4602            checks
4603                .iter()
4604                .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
4605            "expected azureClient fail check in {body}",
4606        );
4607    }
4608
4609    #[test]
4610    #[cfg(feature = "azure")]
4611    fn health_ready_azure_includes_azure_client_check() {
4612        let storage = temp_dir("health-azure-ok");
4613        let mut config = ServerConfig::new(storage.clone(), None);
4614        config.storage_backend = super::StorageBackend::Azure;
4615        config.azure_context = Some(std::sync::Arc::new(super::azure::AzureContext::for_test(
4616            "test-bucket",
4617            "http://localhost:10000/devstoreaccount1",
4618        )));
4619
4620        let request = super::http_parse::HttpRequest {
4621            method: "GET".to_string(),
4622            target: "/health/ready".to_string(),
4623            version: "HTTP/1.1".to_string(),
4624            headers: Vec::new(),
4625            body: Vec::new(),
4626        };
4627        let response = route_request(request, &config);
4628        let _ = std::fs::remove_dir_all(storage);
4629
4630        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4631        let checks = body["checks"].as_array().expect("checks array");
4632        assert!(
4633            checks.iter().any(|c| c["name"] == "storageBackend"),
4634            "expected azureClient check in {body}",
4635        );
4636    }
4637
4638    #[test]
4639    fn read_request_rejects_json_body_over_1mib() {
4640        let body = vec![b'x'; super::http_parse::MAX_REQUEST_BODY_BYTES + 1];
4641        let content_length = body.len();
4642        let raw = format!(
4643            "POST /images:transform HTTP/1.1\r\n\
4644             Content-Type: application/json\r\n\
4645             Content-Length: {content_length}\r\n\r\n"
4646        );
4647        let mut data = raw.into_bytes();
4648        data.extend_from_slice(&body);
4649        let result = read_request(&mut data.as_slice());
4650        assert!(result.is_err());
4651    }
4652
4653    #[test]
4654    fn read_request_accepts_multipart_body_over_1mib() {
4655        let payload_size = super::http_parse::MAX_REQUEST_BODY_BYTES + 100;
4656        let body_content = vec![b'A'; payload_size];
4657        let boundary = "test-boundary-123";
4658        let mut body = Vec::new();
4659        body.extend_from_slice(format!("--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"big.jpg\"\r\n\r\n").as_bytes());
4660        body.extend_from_slice(&body_content);
4661        body.extend_from_slice(format!("\r\n--{boundary}--\r\n").as_bytes());
4662        let content_length = body.len();
4663        let raw = format!(
4664            "POST /images HTTP/1.1\r\n\
4665             Content-Type: multipart/form-data; boundary={boundary}\r\n\
4666             Content-Length: {content_length}\r\n\r\n"
4667        );
4668        let mut data = raw.into_bytes();
4669        data.extend_from_slice(&body);
4670        let result = read_request(&mut data.as_slice());
4671        assert!(
4672            result.is_ok(),
4673            "multipart upload over 1 MiB should be accepted"
4674        );
4675    }
4676
4677    #[test]
4678    fn multipart_boundary_in_payload_does_not_split_part() {
4679        let boundary = "abc123";
4680        let fake_boundary_in_payload = format!("\r\n--{boundary}NOTREAL");
4681        let part_body = format!("before{fake_boundary_in_payload}after");
4682        let body = format!(
4683            "--{boundary}\r\n\
4684             Content-Disposition: form-data; name=\"file\"\r\n\
4685             Content-Type: application/octet-stream\r\n\r\n\
4686             {part_body}\r\n\
4687             --{boundary}--\r\n"
4688        );
4689
4690        let parts = parse_multipart_form_data(body.as_bytes(), boundary)
4691            .expect("should parse despite boundary-like string in payload");
4692        assert_eq!(parts.len(), 1, "should have exactly one part");
4693
4694        let part_data = &body.as_bytes()[parts[0].body_range.clone()];
4695        let part_text = std::str::from_utf8(part_data).unwrap();
4696        assert!(
4697            part_text.contains("NOTREAL"),
4698            "part body should contain the full fake boundary string"
4699        );
4700    }
4701
4702    #[test]
4703    fn multipart_normal_two_parts_still_works() {
4704        let boundary = "testboundary";
4705        let body = format!(
4706            "--{boundary}\r\n\
4707             Content-Disposition: form-data; name=\"field1\"\r\n\r\n\
4708             value1\r\n\
4709             --{boundary}\r\n\
4710             Content-Disposition: form-data; name=\"field2\"\r\n\r\n\
4711             value2\r\n\
4712             --{boundary}--\r\n"
4713        );
4714
4715        let parts = parse_multipart_form_data(body.as_bytes(), boundary)
4716            .expect("should parse two normal parts");
4717        assert_eq!(parts.len(), 2);
4718        assert_eq!(parts[0].name, "field1");
4719        assert_eq!(parts[1].name, "field2");
4720    }
4721
4722    #[test]
4723    #[serial]
4724    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4725    fn test_storage_timeout_default() {
4726        unsafe {
4727            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4728        }
4729        let config = ServerConfig::from_env().unwrap();
4730        assert_eq!(config.storage_timeout_secs, 30);
4731    }
4732
4733    #[test]
4734    #[serial]
4735    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4736    fn test_storage_timeout_custom() {
4737        unsafe {
4738            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "60");
4739        }
4740        let config = ServerConfig::from_env().unwrap();
4741        assert_eq!(config.storage_timeout_secs, 60);
4742        unsafe {
4743            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4744        }
4745    }
4746
4747    #[test]
4748    #[serial]
4749    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4750    fn test_storage_timeout_min_boundary() {
4751        unsafe {
4752            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "1");
4753        }
4754        let config = ServerConfig::from_env().unwrap();
4755        assert_eq!(config.storage_timeout_secs, 1);
4756        unsafe {
4757            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4758        }
4759    }
4760
4761    #[test]
4762    #[serial]
4763    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4764    fn test_storage_timeout_max_boundary() {
4765        unsafe {
4766            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "300");
4767        }
4768        let config = ServerConfig::from_env().unwrap();
4769        assert_eq!(config.storage_timeout_secs, 300);
4770        unsafe {
4771            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4772        }
4773    }
4774
4775    #[test]
4776    #[serial]
4777    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4778    fn test_storage_timeout_empty_string_uses_default() {
4779        unsafe {
4780            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "");
4781        }
4782        let config = ServerConfig::from_env().unwrap();
4783        assert_eq!(config.storage_timeout_secs, 30);
4784        unsafe {
4785            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4786        }
4787    }
4788
4789    #[test]
4790    #[serial]
4791    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4792    fn test_storage_timeout_zero_rejected() {
4793        unsafe {
4794            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "0");
4795        }
4796        let err = ServerConfig::from_env().unwrap_err();
4797        assert!(
4798            err.to_string().contains("between 1 and 300"),
4799            "error should mention valid range: {err}"
4800        );
4801        unsafe {
4802            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4803        }
4804    }
4805
4806    #[test]
4807    #[serial]
4808    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4809    fn test_storage_timeout_over_max_rejected() {
4810        unsafe {
4811            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "301");
4812        }
4813        let err = ServerConfig::from_env().unwrap_err();
4814        assert!(
4815            err.to_string().contains("between 1 and 300"),
4816            "error should mention valid range: {err}"
4817        );
4818        unsafe {
4819            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4820        }
4821    }
4822
4823    #[test]
4824    #[serial]
4825    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4826    fn test_storage_timeout_non_numeric_rejected() {
4827        unsafe {
4828            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "abc");
4829        }
4830        let err = ServerConfig::from_env().unwrap_err();
4831        assert!(
4832            err.to_string().contains("positive integer"),
4833            "error should mention positive integer: {err}"
4834        );
4835        unsafe {
4836            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4837        }
4838    }
4839
4840    #[test]
4841    #[serial]
4842    fn test_max_concurrent_transforms_default() {
4843        unsafe {
4844            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4845        }
4846        let config = ServerConfig::from_env().unwrap();
4847        assert_eq!(config.max_concurrent_transforms, 64);
4848    }
4849
4850    #[test]
4851    #[serial]
4852    fn test_max_concurrent_transforms_custom() {
4853        unsafe {
4854            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "128");
4855        }
4856        let config = ServerConfig::from_env().unwrap();
4857        assert_eq!(config.max_concurrent_transforms, 128);
4858        unsafe {
4859            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4860        }
4861    }
4862
4863    #[test]
4864    #[serial]
4865    fn test_max_concurrent_transforms_min_boundary() {
4866        unsafe {
4867            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1");
4868        }
4869        let config = ServerConfig::from_env().unwrap();
4870        assert_eq!(config.max_concurrent_transforms, 1);
4871        unsafe {
4872            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4873        }
4874    }
4875
4876    #[test]
4877    #[serial]
4878    fn test_max_concurrent_transforms_max_boundary() {
4879        unsafe {
4880            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1024");
4881        }
4882        let config = ServerConfig::from_env().unwrap();
4883        assert_eq!(config.max_concurrent_transforms, 1024);
4884        unsafe {
4885            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4886        }
4887    }
4888
4889    #[test]
4890    #[serial]
4891    fn test_max_concurrent_transforms_empty_uses_default() {
4892        unsafe {
4893            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "");
4894        }
4895        let config = ServerConfig::from_env().unwrap();
4896        assert_eq!(config.max_concurrent_transforms, 64);
4897        unsafe {
4898            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4899        }
4900    }
4901
4902    #[test]
4903    #[serial]
4904    fn test_max_concurrent_transforms_zero_rejected() {
4905        unsafe {
4906            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "0");
4907        }
4908        let err = ServerConfig::from_env().unwrap_err();
4909        assert!(
4910            err.to_string().contains("between 1 and 1024"),
4911            "error should mention valid range: {err}"
4912        );
4913        unsafe {
4914            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4915        }
4916    }
4917
4918    #[test]
4919    #[serial]
4920    fn test_max_concurrent_transforms_over_max_rejected() {
4921        unsafe {
4922            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1025");
4923        }
4924        let err = ServerConfig::from_env().unwrap_err();
4925        assert!(
4926            err.to_string().contains("between 1 and 1024"),
4927            "error should mention valid range: {err}"
4928        );
4929        unsafe {
4930            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4931        }
4932    }
4933
4934    #[test]
4935    #[serial]
4936    fn test_max_concurrent_transforms_non_numeric_rejected() {
4937        unsafe {
4938            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "abc");
4939        }
4940        let err = ServerConfig::from_env().unwrap_err();
4941        assert!(
4942            err.to_string().contains("positive integer"),
4943            "error should mention positive integer: {err}"
4944        );
4945        unsafe {
4946            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4947        }
4948    }
4949
4950    #[test]
4951    #[serial]
4952    fn test_transform_deadline_default() {
4953        unsafe {
4954            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
4955        }
4956        let config = ServerConfig::from_env().unwrap();
4957        assert_eq!(config.transform_deadline_secs, 30);
4958    }
4959
4960    #[test]
4961    #[serial]
4962    fn test_transform_deadline_custom() {
4963        unsafe {
4964            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "60");
4965        }
4966        let config = ServerConfig::from_env().unwrap();
4967        assert_eq!(config.transform_deadline_secs, 60);
4968        unsafe {
4969            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
4970        }
4971    }
4972
4973    #[test]
4974    #[serial]
4975    fn test_transform_deadline_min_boundary() {
4976        unsafe {
4977            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "1");
4978        }
4979        let config = ServerConfig::from_env().unwrap();
4980        assert_eq!(config.transform_deadline_secs, 1);
4981        unsafe {
4982            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
4983        }
4984    }
4985
4986    #[test]
4987    #[serial]
4988    fn test_transform_deadline_max_boundary() {
4989        unsafe {
4990            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "300");
4991        }
4992        let config = ServerConfig::from_env().unwrap();
4993        assert_eq!(config.transform_deadline_secs, 300);
4994        unsafe {
4995            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
4996        }
4997    }
4998
4999    #[test]
5000    #[serial]
5001    fn test_transform_deadline_empty_uses_default() {
5002        unsafe {
5003            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "");
5004        }
5005        let config = ServerConfig::from_env().unwrap();
5006        assert_eq!(config.transform_deadline_secs, 30);
5007        unsafe {
5008            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
5009        }
5010    }
5011
5012    #[test]
5013    #[serial]
5014    fn test_transform_deadline_zero_rejected() {
5015        unsafe {
5016            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "0");
5017        }
5018        let err = ServerConfig::from_env().unwrap_err();
5019        assert!(
5020            err.to_string().contains("between 1 and 300"),
5021            "error should mention valid range: {err}"
5022        );
5023        unsafe {
5024            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
5025        }
5026    }
5027
5028    #[test]
5029    #[serial]
5030    fn test_transform_deadline_over_max_rejected() {
5031        unsafe {
5032            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "301");
5033        }
5034        let err = ServerConfig::from_env().unwrap_err();
5035        assert!(
5036            err.to_string().contains("between 1 and 300"),
5037            "error should mention valid range: {err}"
5038        );
5039        unsafe {
5040            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
5041        }
5042    }
5043
5044    #[test]
5045    #[serial]
5046    fn test_transform_deadline_non_numeric_rejected() {
5047        unsafe {
5048            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "abc");
5049        }
5050        let err = ServerConfig::from_env().unwrap_err();
5051        assert!(
5052            err.to_string().contains("positive integer"),
5053            "error should mention positive integer: {err}"
5054        );
5055        unsafe {
5056            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
5057        }
5058    }
5059
5060    #[test]
5061    #[serial]
5062    #[cfg(feature = "azure")]
5063    fn test_azure_container_env_var_required() {
5064        unsafe {
5065            std::env::set_var("TRUSS_STORAGE_BACKEND", "azure");
5066            std::env::remove_var("TRUSS_AZURE_CONTAINER");
5067        }
5068        let err = ServerConfig::from_env().unwrap_err();
5069        assert!(
5070            err.to_string().contains("TRUSS_AZURE_CONTAINER"),
5071            "error should mention TRUSS_AZURE_CONTAINER: {err}"
5072        );
5073        unsafe {
5074            std::env::remove_var("TRUSS_STORAGE_BACKEND");
5075        }
5076    }
5077
5078    #[test]
5079    fn server_config_debug_redacts_bearer_token_and_signed_url_secret() {
5080        let mut config = ServerConfig::new(
5081            temp_dir("debug-redact"),
5082            Some("super-secret-token-12345".to_string()),
5083        );
5084        config.signed_url_key_id = Some("visible-key-id".to_string());
5085        config.signed_url_secret = Some("super-secret-hmac-key".to_string());
5086        let debug = format!("{config:?}");
5087        assert!(
5088            !debug.contains("super-secret-token-12345"),
5089            "bearer_token leaked in Debug output: {debug}"
5090        );
5091        assert!(
5092            !debug.contains("super-secret-hmac-key"),
5093            "signed_url_secret leaked in Debug output: {debug}"
5094        );
5095        assert!(
5096            debug.contains("[REDACTED]"),
5097            "expected [REDACTED] in Debug output: {debug}"
5098        );
5099        assert!(
5100            debug.contains("visible-key-id"),
5101            "signed_url_key_id should be visible: {debug}"
5102        );
5103    }
5104
5105    #[test]
5106    fn authorize_headers_accepts_correct_bearer_token() {
5107        let config = ServerConfig::new(temp_dir("auth-ok"), Some("correct-token".to_string()));
5108        let headers = vec![(
5109            "authorization".to_string(),
5110            "Bearer correct-token".to_string(),
5111        )];
5112        assert!(super::authorize_request_headers(&headers, &config).is_ok());
5113    }
5114
5115    #[test]
5116    fn authorize_headers_rejects_wrong_bearer_token() {
5117        let config = ServerConfig::new(temp_dir("auth-wrong"), Some("correct-token".to_string()));
5118        let headers = vec![(
5119            "authorization".to_string(),
5120            "Bearer wrong-token".to_string(),
5121        )];
5122        let err = super::authorize_request_headers(&headers, &config).unwrap_err();
5123        assert_eq!(err.status, "401 Unauthorized");
5124    }
5125
5126    #[test]
5127    fn authorize_headers_rejects_missing_header() {
5128        let config = ServerConfig::new(temp_dir("auth-missing"), Some("correct-token".to_string()));
5129        let headers: Vec<(String, String)> = vec![];
5130        let err = super::authorize_request_headers(&headers, &config).unwrap_err();
5131        assert_eq!(err.status, "401 Unauthorized");
5132    }
5133
5134    // ── TransformSlot RAII guard ──────────────────────────────────────
5135
5136    #[test]
5137    fn transform_slot_acquire_succeeds_under_limit() {
5138        use std::sync::Arc;
5139        use std::sync::atomic::AtomicU64;
5140
5141        let counter = Arc::new(AtomicU64::new(0));
5142        let slot = super::TransformSlot::try_acquire(&counter, 2);
5143        assert!(slot.is_some());
5144        assert_eq!(counter.load(Ordering::Relaxed), 1);
5145    }
5146
5147    #[test]
5148    fn transform_slot_acquire_returns_none_at_limit() {
5149        use std::sync::Arc;
5150        use std::sync::atomic::AtomicU64;
5151
5152        let counter = Arc::new(AtomicU64::new(0));
5153        let _s1 = super::TransformSlot::try_acquire(&counter, 1).unwrap();
5154        let s2 = super::TransformSlot::try_acquire(&counter, 1);
5155        assert!(s2.is_none());
5156        // Counter must still be 1 (failed acquire must not leak).
5157        assert_eq!(counter.load(Ordering::Relaxed), 1);
5158    }
5159
5160    #[test]
5161    fn transform_slot_drop_decrements_counter() {
5162        use std::sync::Arc;
5163        use std::sync::atomic::AtomicU64;
5164
5165        let counter = Arc::new(AtomicU64::new(0));
5166        {
5167            let _slot = super::TransformSlot::try_acquire(&counter, 4).unwrap();
5168            assert_eq!(counter.load(Ordering::Relaxed), 1);
5169        }
5170        // After drop the counter must return to zero.
5171        assert_eq!(counter.load(Ordering::Relaxed), 0);
5172    }
5173
5174    #[test]
5175    fn transform_slot_multiple_acquires_up_to_limit() {
5176        use std::sync::Arc;
5177        use std::sync::atomic::AtomicU64;
5178
5179        let counter = Arc::new(AtomicU64::new(0));
5180        let limit = 3u64;
5181        let mut slots = Vec::new();
5182        for _ in 0..limit {
5183            slots.push(super::TransformSlot::try_acquire(&counter, limit).unwrap());
5184        }
5185        assert_eq!(counter.load(Ordering::Relaxed), limit);
5186        // One more must fail.
5187        assert!(super::TransformSlot::try_acquire(&counter, limit).is_none());
5188        assert_eq!(counter.load(Ordering::Relaxed), limit);
5189        // Drop all slots.
5190        slots.clear();
5191        assert_eq!(counter.load(Ordering::Relaxed), 0);
5192    }
5193
5194    // ── Access log via emit_access_log ────────────────────────────────
5195
5196    #[test]
5197    fn emit_access_log_produces_json_with_expected_fields() {
5198        use std::sync::{Arc, Mutex};
5199        use std::time::Instant;
5200
5201        let captured = Arc::new(Mutex::new(String::new()));
5202        let captured_clone = Arc::clone(&captured);
5203        let handler: super::LogHandler =
5204            Arc::new(move |msg: &str| *captured_clone.lock().unwrap() = msg.to_owned());
5205
5206        let mut config = ServerConfig::new(temp_dir("access-log"), None);
5207        config.log_handler = Some(handler);
5208
5209        let start = Instant::now();
5210        super::emit_access_log(
5211            &config,
5212            &super::AccessLogEntry {
5213                request_id: "req-123",
5214                method: "GET",
5215                path: "/image.png",
5216                route: "transform",
5217                status: "200",
5218                start,
5219                cache_status: Some("hit"),
5220                watermark: false,
5221            },
5222        );
5223
5224        let output = captured.lock().unwrap().clone();
5225        let parsed: serde_json::Value = serde_json::from_str(&output).expect("valid JSON");
5226        assert_eq!(parsed["kind"], "access_log");
5227        assert_eq!(parsed["request_id"], "req-123");
5228        assert_eq!(parsed["method"], "GET");
5229        assert_eq!(parsed["path"], "/image.png");
5230        assert_eq!(parsed["route"], "transform");
5231        assert_eq!(parsed["status"], "200");
5232        assert_eq!(parsed["cache_status"], "hit");
5233        assert!(parsed["latency_ms"].is_u64());
5234    }
5235
5236    #[test]
5237    fn emit_access_log_null_cache_status_when_none() {
5238        use std::sync::{Arc, Mutex};
5239        use std::time::Instant;
5240
5241        let captured = Arc::new(Mutex::new(String::new()));
5242        let captured_clone = Arc::clone(&captured);
5243        let handler: super::LogHandler =
5244            Arc::new(move |msg: &str| *captured_clone.lock().unwrap() = msg.to_owned());
5245
5246        let mut config = ServerConfig::new(temp_dir("access-log-none"), None);
5247        config.log_handler = Some(handler);
5248
5249        super::emit_access_log(
5250            &config,
5251            &super::AccessLogEntry {
5252                request_id: "req-456",
5253                method: "POST",
5254                path: "/upload",
5255                route: "upload",
5256                status: "201",
5257                start: Instant::now(),
5258                cache_status: None,
5259                watermark: false,
5260            },
5261        );
5262
5263        let output = captured.lock().unwrap().clone();
5264        let parsed: serde_json::Value = serde_json::from_str(&output).expect("valid JSON");
5265        assert!(parsed["cache_status"].is_null());
5266    }
5267
5268    // ── X-Request-Id header ───────────────────────────────────────────
5269
5270    #[test]
5271    fn x_request_id_is_extracted_from_incoming_headers() {
5272        let headers = vec![
5273            ("host".to_string(), "localhost".to_string()),
5274            ("x-request-id".to_string(), "custom-id-abc".to_string()),
5275        ];
5276        assert_eq!(
5277            super::extract_request_id(&headers),
5278            Some("custom-id-abc".to_string())
5279        );
5280    }
5281
5282    #[test]
5283    fn x_request_id_not_extracted_when_empty() {
5284        let headers = vec![("x-request-id".to_string(), "".to_string())];
5285        assert!(super::extract_request_id(&headers).is_none());
5286    }
5287
5288    #[test]
5289    fn x_request_id_not_extracted_when_absent() {
5290        let headers = vec![("host".to_string(), "localhost".to_string())];
5291        assert!(super::extract_request_id(&headers).is_none());
5292    }
5293
5294    // ── Cache status extraction ───────────────────────────────────────
5295
5296    #[test]
5297    fn cache_status_hit_detected() {
5298        let headers: Vec<(String, String)> =
5299            vec![("Cache-Status".to_string(), "\"truss\"; hit".to_string())];
5300        assert_eq!(super::extract_cache_status(&headers), Some("hit"));
5301    }
5302
5303    #[test]
5304    fn cache_status_miss_detected() {
5305        let headers: Vec<(String, String)> = vec![(
5306            "Cache-Status".to_string(),
5307            "\"truss\"; fwd=miss".to_string(),
5308        )];
5309        assert_eq!(super::extract_cache_status(&headers), Some("miss"));
5310    }
5311
5312    #[test]
5313    fn cache_status_none_when_header_absent() {
5314        let headers: Vec<(String, String)> =
5315            vec![("Content-Type".to_string(), "image/png".to_string())];
5316        assert!(super::extract_cache_status(&headers).is_none());
5317    }
5318
5319    #[test]
5320    fn signing_keys_populated_by_with_signed_url_credentials() {
5321        let config = ServerConfig::new(temp_dir("signing-keys-populated"), None)
5322            .with_signed_url_credentials("key-alpha", "secret-alpha");
5323
5324        assert_eq!(
5325            config.signing_keys.get("key-alpha").map(String::as_str),
5326            Some("secret-alpha")
5327        );
5328    }
5329
5330    #[test]
5331    fn authorize_signed_request_accepts_multiple_keys() {
5332        let mut extra = HashMap::new();
5333        extra.insert("key-beta".to_string(), "secret-beta".to_string());
5334        let config = ServerConfig::new(temp_dir("multi-key-accept"), None)
5335            .with_signed_url_credentials("key-alpha", "secret-alpha")
5336            .with_signing_keys(extra);
5337
5338        // Sign with key-alpha
5339        let request_alpha = signed_public_request(
5340            "/images/by-path?path=%2Fimage.png&keyId=key-alpha&expires=4102444800&format=jpeg",
5341            "assets.example.com",
5342            "secret-alpha",
5343        );
5344        let query_alpha =
5345            super::auth::parse_query_params(&request_alpha).expect("parse query alpha");
5346        authorize_signed_request(&request_alpha, &query_alpha, &config)
5347            .expect("key-alpha should be accepted");
5348
5349        // Sign with key-beta
5350        let request_beta = signed_public_request(
5351            "/images/by-path?path=%2Fimage.png&keyId=key-beta&expires=4102444800&format=jpeg",
5352            "assets.example.com",
5353            "secret-beta",
5354        );
5355        let query_beta = super::auth::parse_query_params(&request_beta).expect("parse query beta");
5356        authorize_signed_request(&request_beta, &query_beta, &config)
5357            .expect("key-beta should be accepted");
5358    }
5359
5360    #[test]
5361    fn authorize_signed_request_rejects_unknown_key() {
5362        let config = ServerConfig::new(temp_dir("unknown-key-reject"), None)
5363            .with_signed_url_credentials("key-alpha", "secret-alpha");
5364
5365        let request = signed_public_request(
5366            "/images/by-path?path=%2Fimage.png&keyId=key-unknown&expires=4102444800&format=jpeg",
5367            "assets.example.com",
5368            "secret-unknown",
5369        );
5370        let query = super::auth::parse_query_params(&request).expect("parse query");
5371        authorize_signed_request(&request, &query, &config)
5372            .expect_err("unknown key should be rejected");
5373    }
5374
5375    // ── Security: X-Request-Id CRLF injection prevention ─────────────
5376
5377    #[test]
5378    fn x_request_id_rejects_crlf_injection() {
5379        let headers = vec![(
5380            "x-request-id".to_string(),
5381            "evil\r\nX-Injected: true".to_string(),
5382        )];
5383        assert!(
5384            super::extract_request_id(&headers).is_none(),
5385            "CRLF in request ID must be rejected"
5386        );
5387    }
5388
5389    #[test]
5390    fn x_request_id_rejects_lone_cr() {
5391        let headers = vec![("x-request-id".to_string(), "evil\rid".to_string())];
5392        assert!(super::extract_request_id(&headers).is_none());
5393    }
5394
5395    #[test]
5396    fn x_request_id_rejects_lone_lf() {
5397        let headers = vec![("x-request-id".to_string(), "evil\nid".to_string())];
5398        assert!(super::extract_request_id(&headers).is_none());
5399    }
5400
5401    #[test]
5402    fn x_request_id_rejects_nul_byte() {
5403        let headers = vec![("x-request-id".to_string(), "evil\0id".to_string())];
5404        assert!(super::extract_request_id(&headers).is_none());
5405    }
5406
5407    #[test]
5408    fn x_request_id_accepts_normal_uuid() {
5409        let headers = vec![(
5410            "x-request-id".to_string(),
5411            "550e8400-e29b-41d4-a716-446655440000".to_string(),
5412        )];
5413        assert_eq!(
5414            super::extract_request_id(&headers),
5415            Some("550e8400-e29b-41d4-a716-446655440000".to_string())
5416        );
5417    }
5418
5419    // ── Characterization: ServerConfig defaults ──────────────────────
5420
5421    #[test]
5422    fn server_config_new_has_expected_defaults() {
5423        let root = temp_dir("cfg-defaults");
5424        let config = ServerConfig::new(root.clone(), None);
5425        assert_eq!(config.storage_root, root);
5426        assert!(config.bearer_token.is_none());
5427        assert!(config.signed_url_secret.is_none());
5428        assert!(config.signing_keys.is_empty());
5429        assert!(config.presets.is_empty());
5430        assert_eq!(
5431            config.max_concurrent_transforms,
5432            DEFAULT_MAX_CONCURRENT_TRANSFORMS
5433        );
5434        assert_eq!(
5435            config.public_max_age_seconds,
5436            DEFAULT_PUBLIC_MAX_AGE_SECONDS
5437        );
5438        assert_eq!(
5439            config.public_stale_while_revalidate_seconds,
5440            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS
5441        );
5442        assert!(!config.allow_insecure_url_sources);
5443    }
5444
5445    #[test]
5446    fn server_config_builder_with_signed_url_credentials_overwrites() {
5447        let root = temp_dir("cfg-builder");
5448        let config = ServerConfig::new(root, None)
5449            .with_signed_url_credentials("key1", "secret1")
5450            .with_signed_url_credentials("key2", "secret2");
5451        assert!(config.signing_keys.contains_key("key1"));
5452        assert!(config.signing_keys.contains_key("key2"));
5453    }
5454
5455    // ── Characterization: route_request classification ───────────────
5456
5457    #[test]
5458    fn route_request_returns_not_found_for_unknown_path() {
5459        let root = temp_dir("route-unknown");
5460        let config = ServerConfig::new(root, None);
5461        let request = HttpRequest {
5462            method: "GET".to_string(),
5463            target: "/nonexistent".to_string(),
5464            version: "HTTP/1.1".to_string(),
5465            headers: vec![("host".to_string(), "localhost".to_string())],
5466            body: Vec::new(),
5467        };
5468        let response = route_request(request, &config);
5469        assert_eq!(response.status, "404 Not Found");
5470    }
5471
5472    #[test]
5473    fn route_request_health_returns_200() {
5474        let root = temp_dir("route-health");
5475        let config = ServerConfig::new(root, None);
5476        let request = HttpRequest {
5477            method: "GET".to_string(),
5478            target: "/health".to_string(),
5479            version: "HTTP/1.1".to_string(),
5480            headers: vec![("host".to_string(), "localhost".to_string())],
5481            body: Vec::new(),
5482        };
5483        let response = route_request(request, &config);
5484        assert_eq!(response.status, "200 OK");
5485    }
5486
5487    // ── Characterization: TransformSlot thread safety ────────────────
5488
5489    #[test]
5490    fn transform_slot_concurrent_acquire_respects_limit() {
5491        use std::sync::Arc;
5492        use std::sync::Barrier;
5493        use std::sync::atomic::AtomicU64;
5494
5495        let counter = Arc::new(AtomicU64::new(0));
5496        let limit = 4u64;
5497        let num_threads = 16;
5498        let barrier = Arc::new(Barrier::new(num_threads));
5499        let acquired = Arc::new(AtomicU64::new(0));
5500
5501        let handles: Vec<_> = (0..num_threads)
5502            .map(|_| {
5503                let counter = Arc::clone(&counter);
5504                let barrier = Arc::clone(&barrier);
5505                let acquired = Arc::clone(&acquired);
5506                thread::spawn(move || {
5507                    barrier.wait();
5508                    if let Some(_slot) = super::TransformSlot::try_acquire(&counter, limit) {
5509                        acquired.fetch_add(1, Ordering::Relaxed);
5510                        thread::sleep(Duration::from_millis(10));
5511                    }
5512                })
5513            })
5514            .collect();
5515
5516        for h in handles {
5517            h.join().unwrap();
5518        }
5519        assert_eq!(counter.load(Ordering::Relaxed), 0);
5520    }
5521
5522    #[test]
5523    #[serial]
5524    fn test_max_input_pixels_default() {
5525        unsafe {
5526            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5527        }
5528        let config = ServerConfig::from_env().unwrap();
5529        assert_eq!(config.max_input_pixels, 40_000_000);
5530    }
5531
5532    #[test]
5533    #[serial]
5534    fn test_max_input_pixels_custom() {
5535        unsafe {
5536            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "10000000");
5537        }
5538        let config = ServerConfig::from_env().unwrap();
5539        assert_eq!(config.max_input_pixels, 10_000_000);
5540        unsafe {
5541            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5542        }
5543    }
5544
5545    #[test]
5546    #[serial]
5547    fn test_max_input_pixels_min_boundary() {
5548        unsafe {
5549            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "1");
5550        }
5551        let config = ServerConfig::from_env().unwrap();
5552        assert_eq!(config.max_input_pixels, 1);
5553        unsafe {
5554            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5555        }
5556    }
5557
5558    #[test]
5559    #[serial]
5560    fn test_max_input_pixels_max_boundary() {
5561        unsafe {
5562            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "100000000");
5563        }
5564        let config = ServerConfig::from_env().unwrap();
5565        assert_eq!(config.max_input_pixels, 100_000_000);
5566        unsafe {
5567            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5568        }
5569    }
5570
5571    #[test]
5572    #[serial]
5573    fn test_max_input_pixels_empty_uses_default() {
5574        unsafe {
5575            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "");
5576        }
5577        let config = ServerConfig::from_env().unwrap();
5578        assert_eq!(config.max_input_pixels, 40_000_000);
5579        unsafe {
5580            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5581        }
5582    }
5583
5584    #[test]
5585    #[serial]
5586    fn test_max_input_pixels_zero_rejected() {
5587        unsafe {
5588            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "0");
5589        }
5590        let err = ServerConfig::from_env().unwrap_err();
5591        assert!(err.to_string().contains("TRUSS_MAX_INPUT_PIXELS"));
5592        unsafe {
5593            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5594        }
5595    }
5596
5597    #[test]
5598    #[serial]
5599    fn test_max_input_pixels_over_max_rejected() {
5600        unsafe {
5601            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "100000001");
5602        }
5603        let err = ServerConfig::from_env().unwrap_err();
5604        assert!(err.to_string().contains("TRUSS_MAX_INPUT_PIXELS"));
5605        unsafe {
5606            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5607        }
5608    }
5609
5610    #[test]
5611    #[serial]
5612    fn test_max_input_pixels_non_numeric_rejected() {
5613        unsafe {
5614            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "abc");
5615        }
5616        let err = ServerConfig::from_env().unwrap_err();
5617        assert!(err.to_string().contains("TRUSS_MAX_INPUT_PIXELS"));
5618        unsafe {
5619            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5620        }
5621    }
5622
5623    #[test]
5624    #[serial]
5625    fn test_max_upload_bytes_default() {
5626        unsafe {
5627            std::env::remove_var("TRUSS_MAX_UPLOAD_BYTES");
5628        }
5629        let config = ServerConfig::from_env().unwrap();
5630        assert_eq!(config.max_upload_bytes, 100 * 1024 * 1024);
5631    }
5632
5633    #[test]
5634    #[serial]
5635    fn test_max_upload_bytes_custom() {
5636        unsafe {
5637            std::env::set_var("TRUSS_MAX_UPLOAD_BYTES", "5242880");
5638        }
5639        let config = ServerConfig::from_env().unwrap();
5640        assert_eq!(config.max_upload_bytes, 5 * 1024 * 1024);
5641        unsafe {
5642            std::env::remove_var("TRUSS_MAX_UPLOAD_BYTES");
5643        }
5644    }
5645
5646    #[test]
5647    #[serial]
5648    fn test_max_upload_bytes_zero_rejected() {
5649        unsafe {
5650            std::env::set_var("TRUSS_MAX_UPLOAD_BYTES", "0");
5651        }
5652        let err = ServerConfig::from_env().unwrap_err();
5653        assert!(err.to_string().contains("TRUSS_MAX_UPLOAD_BYTES"));
5654        unsafe {
5655            std::env::remove_var("TRUSS_MAX_UPLOAD_BYTES");
5656        }
5657    }
5658
5659    #[test]
5660    #[serial]
5661    fn test_max_upload_bytes_non_numeric_rejected() {
5662        unsafe {
5663            std::env::set_var("TRUSS_MAX_UPLOAD_BYTES", "abc");
5664        }
5665        let err = ServerConfig::from_env().unwrap_err();
5666        assert!(err.to_string().contains("TRUSS_MAX_UPLOAD_BYTES"));
5667        unsafe {
5668            std::env::remove_var("TRUSS_MAX_UPLOAD_BYTES");
5669        }
5670    }
5671
5672    #[test]
5673    fn max_body_for_multipart_uses_custom_upload_limit() {
5674        let headers = vec![(
5675            "content-type".to_string(),
5676            "multipart/form-data; boundary=abc".to_string(),
5677        )];
5678        let custom_limit = 5 * 1024 * 1024;
5679        assert_eq!(
5680            super::http_parse::max_body_for_headers(&headers, custom_limit),
5681            custom_limit
5682        );
5683    }
5684
5685    #[test]
5686    fn health_includes_max_input_pixels() {
5687        let storage = temp_dir("health-pixels");
5688        let request = HttpRequest {
5689            method: "GET".to_string(),
5690            target: "/health".to_string(),
5691            version: "HTTP/1.1".to_string(),
5692            headers: Vec::new(),
5693            body: Vec::new(),
5694        };
5695
5696        let config = ServerConfig::new(storage, None);
5697        let response = route_request(request, &config);
5698
5699        assert_eq!(response.status, "200 OK");
5700        let body: serde_json::Value =
5701            serde_json::from_slice(&response.body).expect("parse health body");
5702        assert_eq!(body["maxInputPixels"], 40_000_000);
5703    }
5704
5705    #[test]
5706    fn health_includes_transform_capacity_details() {
5707        let storage = temp_dir("health-capacity");
5708        let config = ServerConfig::new(storage, None);
5709        let response = route_request(
5710            HttpRequest {
5711                method: "GET".to_string(),
5712                target: "/health".to_string(),
5713                version: "HTTP/1.1".to_string(),
5714                headers: Vec::new(),
5715                body: Vec::new(),
5716            },
5717            &config,
5718        );
5719        let body: serde_json::Value =
5720            serde_json::from_slice(&response.body).expect("parse health body");
5721        let checks = body["checks"].as_array().expect("checks array");
5722        let capacity = checks
5723            .iter()
5724            .find(|c| c["name"] == "transformCapacity")
5725            .expect("transformCapacity check");
5726        assert_eq!(capacity["current"], 0);
5727        assert_eq!(capacity["max"], 64);
5728    }
5729
5730    #[cfg(target_os = "linux")]
5731    #[test]
5732    fn process_rss_bytes_returns_some() {
5733        let rss = super::process_rss_bytes();
5734        assert!(rss.is_some());
5735        assert!(rss.unwrap() > 0);
5736    }
5737
5738    #[cfg(target_os = "linux")]
5739    #[test]
5740    fn disk_free_bytes_returns_some_for_existing_dir() {
5741        let dir = temp_dir("disk-free");
5742        let free = super::disk_free_bytes(&dir);
5743        assert!(free.is_some());
5744        assert!(free.unwrap() > 0);
5745    }
5746
5747    #[test]
5748    fn health_ready_returns_503_when_memory_exceeded() {
5749        let storage = temp_dir("health-mem");
5750        let mut config = ServerConfig::new(storage, None);
5751        // Set threshold to 1 byte — guaranteed to be exceeded.
5752        config.health_max_memory_bytes = Some(1);
5753        let response = route_request(
5754            HttpRequest {
5755                method: "GET".to_string(),
5756                target: "/health/ready".to_string(),
5757                version: "HTTP/1.1".to_string(),
5758                headers: Vec::new(),
5759                body: Vec::new(),
5760            },
5761            &config,
5762        );
5763        // On Linux, RSS > 1 byte → 503. On other platforms, memory check
5764        // is skipped so the response is 200.
5765        if cfg!(target_os = "linux") {
5766            assert_eq!(response.status, "503 Service Unavailable");
5767        }
5768    }
5769
5770    #[test]
5771    fn health_includes_memory_usage_on_linux() {
5772        let storage = temp_dir("health-mem-report");
5773        let mut config = ServerConfig::new(storage, None);
5774        config.health_max_memory_bytes = Some(u64::MAX);
5775        let response = route_request(
5776            HttpRequest {
5777                method: "GET".to_string(),
5778                target: "/health".to_string(),
5779                version: "HTTP/1.1".to_string(),
5780                headers: Vec::new(),
5781                body: Vec::new(),
5782            },
5783            &config,
5784        );
5785        let body: serde_json::Value =
5786            serde_json::from_slice(&response.body).expect("parse health body");
5787        if cfg!(target_os = "linux") {
5788            let checks = body["checks"].as_array().expect("checks array");
5789            let mem = checks
5790                .iter()
5791                .find(|c| c["name"] == "memoryUsage")
5792                .expect("memoryUsage check");
5793            assert_eq!(mem["status"], "ok");
5794            assert!(mem["rssBytes"].as_u64().unwrap() > 0);
5795        }
5796    }
5797
5798    #[cfg(target_os = "linux")]
5799    #[test]
5800    fn disk_free_bytes_returns_none_for_nonexistent_path() {
5801        let free = super::disk_free_bytes(std::path::Path::new("/nonexistent/path/xyz"));
5802        assert!(free.is_none());
5803    }
5804
5805    #[test]
5806    fn health_ready_503_body_contains_fail_status() {
5807        let storage = temp_dir("health-ready-body");
5808        std::fs::remove_dir_all(&storage).ok();
5809        let config = ServerConfig::new(storage, None);
5810        let response = route_request(
5811            HttpRequest {
5812                method: "GET".to_string(),
5813                target: "/health/ready".to_string(),
5814                version: "HTTP/1.1".to_string(),
5815                headers: Vec::new(),
5816                body: Vec::new(),
5817            },
5818            &config,
5819        );
5820        assert_eq!(response.status, "503 Service Unavailable");
5821        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
5822        assert_eq!(body["status"], "fail");
5823        let checks = body["checks"].as_array().expect("checks array");
5824        let storage_check = checks
5825            .iter()
5826            .find(|c| c["name"] == "storageRoot")
5827            .expect("storageRoot check");
5828        assert_eq!(storage_check["status"], "fail");
5829    }
5830
5831    #[test]
5832    fn health_ready_cache_disk_free_shown_when_cache_root_set() {
5833        let storage = temp_dir("health-ready-cache-disk");
5834        let cache = temp_dir("health-ready-cache-disk-cache");
5835        let mut config = ServerConfig::new(storage, None).with_cache_root(cache);
5836        config.health_cache_min_free_bytes = Some(1);
5837        let response = route_request(
5838            HttpRequest {
5839                method: "GET".to_string(),
5840                target: "/health/ready".to_string(),
5841                version: "HTTP/1.1".to_string(),
5842                headers: Vec::new(),
5843                body: Vec::new(),
5844            },
5845            &config,
5846        );
5847        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
5848        let checks = body["checks"].as_array().expect("checks array");
5849        let disk_check = checks
5850            .iter()
5851            .find(|c| c["name"] == "cacheDiskFree")
5852            .expect("cacheDiskFree check");
5853        assert_eq!(disk_check["status"], "ok");
5854        if cfg!(target_os = "linux") {
5855            assert!(disk_check["freeBytes"].as_u64().is_some());
5856        }
5857        assert_eq!(disk_check["thresholdBytes"], 1);
5858    }
5859
5860    #[test]
5861    fn health_ready_no_cache_disk_free_without_cache_root() {
5862        let storage = temp_dir("health-ready-no-cache");
5863        let config = ServerConfig::new(storage, None);
5864        let response = route_request(
5865            HttpRequest {
5866                method: "GET".to_string(),
5867                target: "/health/ready".to_string(),
5868                version: "HTTP/1.1".to_string(),
5869                headers: Vec::new(),
5870                body: Vec::new(),
5871            },
5872            &config,
5873        );
5874        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
5875        let checks = body["checks"].as_array().expect("checks array");
5876        assert!(
5877            checks.iter().all(|c| c["name"] != "cacheDiskFree"),
5878            "cacheDiskFree should not appear without cache_root"
5879        );
5880    }
5881
5882    #[test]
5883    fn health_ready_memory_check_includes_details() {
5884        let storage = temp_dir("health-ready-mem-detail");
5885        let mut config = ServerConfig::new(storage, None);
5886        config.health_max_memory_bytes = Some(u64::MAX);
5887        let response = route_request(
5888            HttpRequest {
5889                method: "GET".to_string(),
5890                target: "/health/ready".to_string(),
5891                version: "HTTP/1.1".to_string(),
5892                headers: Vec::new(),
5893                body: Vec::new(),
5894            },
5895            &config,
5896        );
5897        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
5898        let checks = body["checks"].as_array().expect("checks array");
5899        let mem = checks.iter().find(|c| c["name"] == "memoryUsage");
5900        if cfg!(target_os = "linux") {
5901            let mem = mem.expect("memoryUsage check present on Linux");
5902            assert_eq!(mem["status"], "ok");
5903            assert_eq!(mem["thresholdBytes"], u64::MAX);
5904            assert!(mem["rssBytes"].as_u64().is_some());
5905        } else {
5906            assert!(mem.is_none(), "memoryUsage should be absent on non-Linux");
5907        }
5908    }
5909
5910    // ── graceful shutdown: draining flag ─────────────────────────────
5911
5912    #[test]
5913    fn health_ready_returns_503_when_draining() {
5914        let storage = temp_dir("health-ready-draining");
5915        let config = ServerConfig::new(storage, None);
5916        config.draining.store(true, Ordering::Relaxed);
5917
5918        let response = route_request(
5919            HttpRequest {
5920                method: "GET".to_string(),
5921                target: "/health/ready".to_string(),
5922                version: "HTTP/1.1".to_string(),
5923                headers: Vec::new(),
5924                body: Vec::new(),
5925            },
5926            &config,
5927        );
5928
5929        assert_eq!(response.status, "503 Service Unavailable");
5930        let body: serde_json::Value =
5931            serde_json::from_slice(&response.body).expect("parse ready body");
5932        assert_eq!(body["status"], "fail");
5933        let checks = body["checks"].as_array().expect("checks array");
5934        assert!(
5935            checks
5936                .iter()
5937                .any(|c| c["name"] == "draining" && c["status"] == "fail")
5938        );
5939    }
5940
5941    #[test]
5942    fn health_ready_returns_ok_when_not_draining() {
5943        let storage = temp_dir("health-ready-not-draining");
5944        let config = ServerConfig::new(storage, None);
5945        // draining is false by default.
5946        let response = route_request(
5947            HttpRequest {
5948                method: "GET".to_string(),
5949                target: "/health/ready".to_string(),
5950                version: "HTTP/1.1".to_string(),
5951                headers: Vec::new(),
5952                body: Vec::new(),
5953            },
5954            &config,
5955        );
5956
5957        assert_eq!(response.status, "200 OK");
5958        let body: serde_json::Value =
5959            serde_json::from_slice(&response.body).expect("parse ready body");
5960        assert_eq!(body["status"], "ok");
5961        // Should not have a draining check entry.
5962        let checks = body["checks"].as_array().expect("checks array");
5963        assert!(!checks.iter().any(|c| c["name"] == "draining"));
5964    }
5965
5966    // ── Drain during normal request processing (m10) ─────────────
5967
5968    #[test]
5969    fn health_live_returns_200_while_draining() {
5970        let storage = temp_dir("live-draining");
5971        let config = ServerConfig::new(storage, None);
5972        config.draining.store(true, Ordering::Relaxed);
5973
5974        let response = route_request(
5975            HttpRequest {
5976                method: "GET".to_string(),
5977                target: "/health/live".to_string(),
5978                version: "HTTP/1.1".to_string(),
5979                headers: Vec::new(),
5980                body: Vec::new(),
5981            },
5982            &config,
5983        );
5984
5985        // Liveness should always return 200 even when draining — only
5986        // readiness returns 503.
5987        assert_eq!(response.status, "200 OK");
5988    }
5989
5990    #[test]
5991    fn normal_request_processed_while_draining() {
5992        let storage = temp_dir("normal-draining");
5993        let config = ServerConfig::new(storage, None);
5994        config.draining.store(true, Ordering::Relaxed);
5995
5996        // A non-health, non-image request should still be routed (e.g. 404
5997        // because the path doesn't match any route) — it should NOT get a
5998        // 503 just because the server is draining.
5999        let response = route_request(
6000            HttpRequest {
6001                method: "GET".to_string(),
6002                target: "/nonexistent".to_string(),
6003                version: "HTTP/1.1".to_string(),
6004                headers: Vec::new(),
6005                body: Vec::new(),
6006            },
6007            &config,
6008        );
6009
6010        // The path doesn't match any route, so we get 404 — NOT 503.
6011        assert_eq!(response.status, "404 Not Found");
6012    }
6013}