Skip to main content

truss/adapters/server/
mod.rs

1//! HTTP image-transform server.
2//!
3//! # Threading model
4//!
5//! The server uses **synchronous, blocking I/O** with one OS thread per TCP
6//! connection. This is a deliberate design choice, not a limitation:
7//!
8//! - **Simplicity:** No async runtime (tokio/async-std) dependency for the core
9//!   server. This reduces binary size, compile time, and cognitive overhead.
10//! - **Predictable resource usage:** Each connection consumes a fixed stack
11//!   allocation. There is no task queue, no hidden buffering, and no executor
12//!   scheduling overhead.
13//! - **Bounded concurrency:** `TRUSS_MAX_CONCURRENT_TRANSFORMS` (default 64)
14//!   caps the number of simultaneous image transforms via a semaphore-like
15//!   `TransformSlot` guard. Excess requests receive 503 Service Unavailable.
16//!
17//! **Trade-off:** Slow clients (slow uploads, slow TLS handshakes) block their
18//! thread for the duration of the connection. In production deployments, a
19//! reverse proxy (nginx, envoy, CloudFront) should handle slow-client buffering.
20//!
21//! This design may be reconsidered if the server needs to handle thousands of
22//! concurrent idle connections (e.g., WebSocket or SSE), but for a
23//! request-response image API the current model is sufficient.
24
25mod auth;
26#[cfg(feature = "azure")]
27pub mod azure;
28mod cache;
29mod config;
30#[cfg(feature = "gcs")]
31pub mod gcs;
32mod http_parse;
33mod metrics;
34mod multipart;
35mod negotiate;
36mod rate_limit;
37mod remote;
38mod response;
39#[cfg(feature = "s3")]
40pub mod s3;
41
42#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
43pub use config::StorageBackend;
44use config::StorageBackendLabel;
45pub use config::{DEFAULT_BIND_ADDR, DEFAULT_STORAGE_ROOT, LogHandler, LogLevel, ServerConfig};
46
47use auth::{
48    authorize_request, authorize_request_headers, authorize_signed_request,
49    canonical_query_without_signature, extend_transform_query, parse_optional_bool_query,
50    parse_optional_float_query, parse_optional_integer_query, parse_optional_u8_query,
51    parse_query_params, required_query_param, signed_source_query, url_authority,
52    validate_public_query_names,
53};
54use cache::{
55    CacheLookup, TransformCache, compute_cache_key, compute_watermark_identity,
56    try_versioned_cache_lookup,
57};
58use http_parse::{
59    HttpRequest, parse_named, parse_optional_named, read_request_body, read_request_headers,
60    request_has_json_content_type,
61};
62use metrics::{
63    CACHE_HITS_TOTAL, CACHE_MISSES_TOTAL, RouteMetric, record_http_metrics,
64    record_http_request_duration, record_storage_duration, record_transform_duration,
65    record_transform_error, record_watermark_transform, render_metrics_text, status_code,
66    storage_backend_index_from_config, uptime_seconds,
67};
68use multipart::{parse_multipart_boundary, parse_upload_request};
69use negotiate::{
70    CacheHitStatus, ImageResponsePolicy, PublicSourceKind, build_image_etag,
71    build_image_response_headers, if_none_match_matches, negotiate_output_format,
72};
73use remote::{read_remote_watermark_bytes, resolve_source_bytes};
74use response::{
75    HttpResponse, NOT_FOUND_BODY, bad_request_response, service_unavailable_response,
76    too_many_requests_response, transform_error_response, unsupported_media_type_response,
77    write_response, write_response_compressed,
78};
79
80use crate::{
81    CropRegion, Fit, MediaType, Position, RawArtifact, Rgba8, Rotation, TransformOptions,
82    TransformRequest, WatermarkInput, sniff_artifact, transform_raster, transform_svg,
83};
84use hmac::{Hmac, Mac};
85use serde::Deserialize;
86use serde_json::json;
87use sha2::{Digest, Sha256};
88use std::collections::BTreeMap;
89use std::env;
90use std::io;
91use std::net::{TcpListener, TcpStream};
92use std::str::FromStr;
93use std::sync::Arc;
94use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU8, AtomicU64, Ordering};
95use std::time::{Duration, Instant};
96use subtle::ConstantTimeEq;
97use url::Url;
98use uuid::Uuid;
99
100/// Writes a line to stderr using a raw file-descriptor/handle write, bypassing
101/// Rust's `std::io::Stderr` type whose internal `ReentrantLock` can interfere
102/// with `MutexGuard` drop ordering in Rust 2024 edition, breaking HTTP
103/// keep-alive.
104pub(crate) fn stderr_write(msg: &str) {
105    use std::io::Write;
106
107    let bytes = msg.as_bytes();
108    let mut buf = Vec::with_capacity(bytes.len() + 1);
109    buf.extend_from_slice(bytes);
110    buf.push(b'\n');
111
112    #[cfg(unix)]
113    {
114        use std::os::fd::FromRawFd;
115        // SAFETY: fd 2 (stderr) is always valid for the lifetime of the process.
116        let mut f = unsafe { std::fs::File::from_raw_fd(2) };
117        let _ = f.write_all(&buf);
118        // Do not drop `f` — that would close fd 2 (stderr).
119        std::mem::forget(f);
120    }
121
122    #[cfg(windows)]
123    {
124        use std::os::windows::io::FromRawHandle;
125
126        unsafe extern "system" {
127            fn GetStdHandle(nStdHandle: u32) -> *mut std::ffi::c_void;
128        }
129
130        const STD_ERROR_HANDLE: u32 = (-12_i32) as u32;
131        // SAFETY: GetStdHandle(STD_ERROR_HANDLE) returns the stderr handle
132        // which is always valid for the lifetime of the process.
133        let handle = unsafe { GetStdHandle(STD_ERROR_HANDLE) };
134        let mut f = unsafe { std::fs::File::from_raw_handle(handle) };
135        let _ = f.write_all(&buf);
136        // Do not drop `f` — that would close the stderr handle.
137        std::mem::forget(f);
138    }
139}
140
141const SOCKET_READ_TIMEOUT: Duration = Duration::from_secs(60);
142const SOCKET_WRITE_TIMEOUT: Duration = Duration::from_secs(60);
143/// Number of worker threads for handling incoming connections concurrently.
144const WORKER_THREADS: usize = 8;
145type HmacSha256 = Hmac<Sha256>;
146
147#[derive(Clone, Copy)]
148struct PublicCacheControl {
149    max_age: u32,
150    stale_while_revalidate: u32,
151}
152
153#[derive(Clone, Copy)]
154struct ImageResponseConfig {
155    disable_accept_negotiation: bool,
156    public_cache_control: PublicCacheControl,
157    transform_deadline: Duration,
158}
159
160/// RAII guard that holds a concurrency slot for an in-flight image transform.
161///
162/// The counter is incremented on successful acquisition and decremented when
163/// the guard is dropped, ensuring the slot is always released even if the
164/// caller returns early or panics.
165struct TransformSlot {
166    counter: Arc<AtomicU64>,
167}
168
169impl TransformSlot {
170    fn try_acquire(counter: &Arc<AtomicU64>, limit: u64) -> Option<Self> {
171        let prev = counter.fetch_add(1, Ordering::Relaxed);
172        if prev >= limit {
173            counter.fetch_sub(1, Ordering::Relaxed);
174            None
175        } else {
176            Some(Self {
177                counter: Arc::clone(counter),
178            })
179        }
180    }
181}
182
183impl Drop for TransformSlot {
184    fn drop(&mut self) {
185        self.counter.fetch_sub(1, Ordering::Relaxed);
186    }
187}
188
189/// Source selector used when generating a signed public transform URL.
190#[derive(Debug, Clone, PartialEq, Eq)]
191pub enum SignedUrlSource {
192    /// Generates a signed `GET /images/by-path` URL.
193    Path {
194        /// The storage-relative source path.
195        path: String,
196        /// An optional source version token.
197        version: Option<String>,
198    },
199    /// Generates a signed `GET /images/by-url` URL.
200    Url {
201        /// The remote source URL.
202        url: String,
203        /// An optional source version token.
204        version: Option<String>,
205    },
206}
207
208/// Builds a signed public transform URL for the server adapter.
209///
210/// The resulting URL targets either `GET /images/by-path` or `GET /images/by-url` depending on
211/// `source`. `base_url` must be an absolute `http` or `https` URL that points at the externally
212/// visible server origin. The helper applies the same canonical query and HMAC-SHA256 signature
213/// scheme that the server adapter verifies at request time.
214///
215/// The helper serializes only explicitly requested transform options and omits fields that would
216/// resolve to the documented defaults on the server side.
217///
218/// # Errors
219///
220/// Returns an error string when `base_url` is not an absolute `http` or `https` URL, when the
221/// visible authority cannot be determined, or when the HMAC state cannot be initialized.
222///
223/// # Examples
224///
225/// ```
226/// use truss::adapters::server::{sign_public_url, SignedUrlSource};
227/// use truss::{MediaType, TransformOptions};
228///
229/// let url = sign_public_url(
230///     "https://cdn.example.com",
231///     SignedUrlSource::Path {
232///         path: "/image.png".to_string(),
233///         version: None,
234///     },
235///     &TransformOptions {
236///         format: Some(MediaType::Jpeg),
237///         ..TransformOptions::default()
238///     },
239///     "public-dev",
240///     "secret-value",
241///     4_102_444_800,
242///     None,
243///     None,
244/// )
245/// .unwrap();
246///
247/// assert!(url.starts_with("https://cdn.example.com/images/by-path?"));
248/// assert!(url.contains("keyId=public-dev"));
249/// assert!(url.contains("signature="));
250/// ```
251/// Optional watermark parameters for signed URL generation.
252#[derive(Debug, Default)]
253pub struct SignedWatermarkParams {
254    pub url: String,
255    pub position: Option<String>,
256    pub opacity: Option<u8>,
257    pub margin: Option<u32>,
258}
259
260#[allow(clippy::too_many_arguments)]
261pub fn sign_public_url(
262    base_url: &str,
263    source: SignedUrlSource,
264    options: &TransformOptions,
265    key_id: &str,
266    secret: &str,
267    expires: u64,
268    watermark: Option<&SignedWatermarkParams>,
269    preset: Option<&str>,
270) -> Result<String, String> {
271    let base_url = Url::parse(base_url).map_err(|error| format!("base URL is invalid: {error}"))?;
272    match base_url.scheme() {
273        "http" | "https" => {}
274        _ => return Err("base URL must use the http or https scheme".to_string()),
275    }
276
277    let route_path = match source {
278        SignedUrlSource::Path { .. } => "/images/by-path",
279        SignedUrlSource::Url { .. } => "/images/by-url",
280    };
281    let mut endpoint = base_url
282        .join(route_path)
283        .map_err(|error| format!("failed to resolve the public endpoint URL: {error}"))?;
284    let authority = url_authority(&endpoint)?;
285    let mut query = signed_source_query(source);
286    if let Some(name) = preset {
287        query.insert("preset".to_string(), name.to_string());
288    }
289    extend_transform_query(&mut query, options);
290    if let Some(wm) = watermark {
291        query.insert("watermarkUrl".to_string(), wm.url.clone());
292        if let Some(ref pos) = wm.position {
293            query.insert("watermarkPosition".to_string(), pos.clone());
294        }
295        if let Some(opacity) = wm.opacity {
296            query.insert("watermarkOpacity".to_string(), opacity.to_string());
297        }
298        if let Some(margin) = wm.margin {
299            query.insert("watermarkMargin".to_string(), margin.to_string());
300        }
301    }
302    query.insert("keyId".to_string(), key_id.to_string());
303    query.insert("expires".to_string(), expires.to_string());
304
305    let canonical = format!(
306        "GET\n{}\n{}\n{}",
307        authority,
308        endpoint.path(),
309        canonical_query_without_signature(&query)
310    );
311    let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
312        .map_err(|error| format!("failed to initialize signed URL HMAC: {error}"))?;
313    mac.update(canonical.as_bytes());
314    query.insert(
315        "signature".to_string(),
316        hex::encode(mac.finalize().into_bytes()),
317    );
318
319    let mut serializer = url::form_urlencoded::Serializer::new(String::new());
320    for (name, value) in query {
321        serializer.append_pair(&name, &value);
322    }
323    endpoint.set_query(Some(&serializer.finish()));
324    Ok(endpoint.into())
325}
326
327/// Returns the bind address for the HTTP server adapter.
328///
329/// The adapter reads `TRUSS_BIND_ADDR` when it is present. Otherwise it falls back to
330/// [`DEFAULT_BIND_ADDR`].
331pub fn bind_addr() -> String {
332    env::var("TRUSS_BIND_ADDR").unwrap_or_else(|_| DEFAULT_BIND_ADDR.to_string())
333}
334
335/// Serves requests until the listener stops producing connections.
336///
337/// This helper loads [`ServerConfig`] from the process environment and then delegates to
338/// [`serve_with_config`]. Health endpoints remain available even when the private API is not
339/// configured, but authenticated transform requests will return `503 Service Unavailable`
340/// unless `TRUSS_BEARER_TOKEN` is set.
341///
342/// # Errors
343///
344/// Returns an [`io::Error`] when the storage root cannot be resolved, when accepting the next
345/// connection fails, or when a response cannot be written to the socket.
346pub fn serve(listener: TcpListener) -> io::Result<()> {
347    let config = ServerConfig::from_env()?;
348
349    // Fail fast: verify the storage backend is reachable before accepting
350    // connections so that configuration errors are surfaced immediately.
351    for (ok, name) in storage_health_check(&config) {
352        if !ok {
353            return Err(io::Error::new(
354                io::ErrorKind::ConnectionRefused,
355                format!(
356                    "storage connectivity check failed for `{name}` — verify the backend \
357                     endpoint, credentials, and container/bucket configuration"
358                ),
359            ));
360        }
361    }
362
363    serve_with_config(listener, config)
364}
365
366/// Serves requests with an explicit server configuration.
367///
368/// This is the adapter entry point for tests and embedding scenarios that want deterministic
369/// configuration instead of environment-variable lookup.
370///
371/// # Errors
372///
373/// Returns an [`io::Error`] when accepting the next connection fails or when a response cannot
374/// be written to the socket.
375pub fn serve_with_config(listener: TcpListener, config: ServerConfig) -> io::Result<()> {
376    let config = Arc::new(config);
377    let (sender, receiver) = std::sync::mpsc::channel::<TcpStream>();
378
379    // Spawn a pool of worker threads sized to the configured concurrency limit
380    // (with a minimum of WORKER_THREADS to leave headroom for non-transform
381    // requests such as health checks and metrics).  Each thread pulls connections
382    // from the shared channel and handles them independently, so a slow request
383    // no longer blocks all other clients.
384    let receiver = Arc::new(std::sync::Mutex::new(receiver));
385    let pool_size = (config.max_concurrent_transforms as usize).max(WORKER_THREADS);
386    let mut workers = Vec::with_capacity(pool_size);
387    for _ in 0..pool_size {
388        let rx = Arc::clone(&receiver);
389        let cfg = Arc::clone(&config);
390        workers.push(std::thread::spawn(move || {
391            loop {
392                let stream = {
393                    let guard = rx.lock().expect("worker lock poisoned");
394                    match guard.recv() {
395                        Ok(stream) => stream,
396                        Err(_) => break,
397                    }
398                }; // MutexGuard dropped here — before handle_stream runs.
399                if let Err(err) = handle_stream(stream, &cfg) {
400                    cfg.log_warn(&format!("failed to handle connection: {err}"));
401                }
402            }
403        }));
404    }
405
406    // Install signal handler for graceful shutdown.  The handler sets the
407    // shared `draining` flag (so /health/ready returns 503 immediately) and
408    // writes a byte to a self-pipe to wake the accept loop.
409    let (shutdown_read_fd, shutdown_write_fd) = create_shutdown_pipe()?;
410    install_signal_handler(
411        Arc::clone(&config.draining),
412        shutdown_write_fd,
413        Arc::clone(&config.log_level),
414    );
415
416    // Spawn a background thread to hot-reload presets when TRUSS_PRESETS_FILE changes.
417    if let Some(ref path) = config.presets_file_path {
418        let presets = Arc::clone(&config.presets);
419        let draining = Arc::clone(&config.draining);
420        let cfg = Arc::clone(&config);
421        let path = path.clone();
422        std::thread::Builder::new()
423            .name("preset-watcher".into())
424            .spawn(move || preset_watcher(presets, path, draining, cfg))
425            .expect("failed to spawn preset watcher thread");
426    }
427
428    // Set the listener to non-blocking so we can multiplex between incoming
429    // connections and the shutdown pipe.
430    listener.set_nonblocking(true)?;
431
432    loop {
433        // Wait for activity on the listener or shutdown pipe. On Unix we use
434        // poll(2) to block efficiently; on Windows we fall back to polling the
435        // draining flag with a short sleep.
436        wait_for_accept_or_shutdown(&listener, shutdown_read_fd, &config.draining);
437
438        // Check the shutdown pipe first.
439        if poll_shutdown_pipe(shutdown_read_fd) {
440            break;
441        }
442
443        // Also check the draining flag directly (needed on Windows where the
444        // shutdown pipe is not available).
445        if config.draining.load(Ordering::SeqCst) {
446            break;
447        }
448
449        match listener.accept() {
450            Ok((stream, _addr)) => {
451                // Accepted connections are always blocking for the workers.
452                let _ = stream.set_nonblocking(false);
453                if sender.send(stream).is_err() {
454                    break;
455                }
456            }
457            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
458                // Spurious wakeup — retry.
459            }
460            Err(err) => return Err(err),
461        }
462    }
463
464    // --- Drain phase ---
465    let drain_secs = config.shutdown_drain_secs;
466    config.log(&format!(
467        "shutdown: drain started, waiting {drain_secs}s for load balancers"
468    ));
469    if drain_secs > 0 {
470        std::thread::sleep(Duration::from_secs(drain_secs));
471    }
472    config.log("shutdown: drain complete, closing listener");
473
474    // Stop dispatching new connections to workers.
475    drop(sender);
476    // Worker drain deadline: 15s so that total shutdown (drain + worker drain)
477    // fits within Kubernetes default terminationGracePeriodSeconds of 30s.
478    let deadline = std::time::Instant::now() + Duration::from_secs(15);
479    for worker in workers {
480        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
481        if remaining.is_zero() {
482            stderr_write("shutdown: timed out waiting for worker threads");
483            break;
484        }
485        // Park the main thread until the worker finishes or the deadline
486        // elapses. We cannot interrupt a blocked worker, but the socket
487        // read/write timeouts ensure workers do not block forever.
488        let worker_done =
489            std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
490        let wd = std::sync::Arc::clone(&worker_done);
491        std::thread::spawn(move || {
492            let _ = worker.join();
493            let (lock, cvar) = &*wd;
494            *lock.lock().expect("shutdown notify lock") = true;
495            cvar.notify_one();
496        });
497        let (lock, cvar) = &*worker_done;
498        let mut done = lock.lock().expect("shutdown wait lock");
499        while !*done {
500            let (guard, timeout) = cvar
501                .wait_timeout(done, remaining)
502                .expect("shutdown condvar wait");
503            done = guard;
504            if timeout.timed_out() {
505                stderr_write("shutdown: timed out waiting for a worker thread");
506                break;
507            }
508        }
509    }
510
511    config.log("shutdown: complete");
512    close_shutdown_pipe(shutdown_read_fd, shutdown_write_fd);
513    Ok(())
514}
515
516// ---------------------------------------------------------------------------
517// Shutdown pipe helpers — a minimal self-pipe for waking the accept loop from
518// a signal handler without requiring async I/O.
519// ---------------------------------------------------------------------------
520
521#[cfg(unix)]
522fn create_shutdown_pipe() -> io::Result<(i32, i32)> {
523    let mut fds = [0i32; 2];
524    if unsafe { libc::pipe(fds.as_mut_ptr()) } != 0 {
525        return Err(io::Error::last_os_error());
526    }
527    // Make both ends non-blocking: the read end so `poll_shutdown_pipe` never
528    // stalls, and the write end so the signal handler never blocks.
529    unsafe {
530        libc::fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK);
531        libc::fcntl(fds[1], libc::F_SETFL, libc::O_NONBLOCK);
532    }
533    Ok((fds[0], fds[1]))
534}
535
536#[cfg(windows)]
537fn create_shutdown_pipe() -> io::Result<(i32, i32)> {
538    // On Windows we fall back to a polling approach using the draining flag.
539    Ok((-1, -1))
540}
541
542#[cfg(unix)]
543fn poll_shutdown_pipe(read_fd: i32) -> bool {
544    let mut buf = [0u8; 1];
545    let n = unsafe { libc::read(read_fd, buf.as_mut_ptr().cast(), 1) };
546    n > 0
547}
548
549#[cfg(windows)]
550fn poll_shutdown_pipe(_read_fd: i32) -> bool {
551    false
552}
553
554/// Block until the listener socket or the shutdown pipe has data ready.
555/// On Unix this uses `poll(2)` for zero-CPU-cost waiting; on Windows it falls
556/// back to a short sleep since the shutdown pipe is not available.
557#[cfg(unix)]
558fn wait_for_accept_or_shutdown(
559    listener: &std::net::TcpListener,
560    shutdown_read_fd: i32,
561    _draining: &AtomicBool,
562) {
563    use std::os::unix::io::AsRawFd;
564    let listener_fd = listener.as_raw_fd();
565    let mut fds = [
566        libc::pollfd {
567            fd: listener_fd,
568            events: libc::POLLIN,
569            revents: 0,
570        },
571        libc::pollfd {
572            fd: shutdown_read_fd,
573            events: libc::POLLIN,
574            revents: 0,
575        },
576    ];
577    // Block indefinitely (-1 timeout). Signal delivery will interrupt with
578    // EINTR, which is fine — we just re-check the shutdown conditions.
579    unsafe { libc::poll(fds.as_mut_ptr(), 2, -1) };
580}
581
582#[cfg(windows)]
583fn wait_for_accept_or_shutdown(
584    _listener: &std::net::TcpListener,
585    _shutdown_read_fd: i32,
586    draining: &AtomicBool,
587) {
588    // On Windows, poll(2) is not available for the listener socket. Sleep
589    // briefly and let the caller check the draining flag.
590    if !draining.load(Ordering::SeqCst) {
591        std::thread::sleep(Duration::from_millis(10));
592    }
593}
594
595#[cfg(unix)]
596fn close_shutdown_pipe(read_fd: i32, write_fd: i32) {
597    unsafe {
598        libc::close(read_fd);
599        libc::close(write_fd);
600    }
601}
602
603#[cfg(windows)]
604fn close_shutdown_pipe(_read_fd: i32, _write_fd: i32) {}
605
606/// Global write-end of the shutdown pipe, written to from the signal handler.
607static SHUTDOWN_PIPE_WR: AtomicI32 = AtomicI32::new(-1);
608/// Global draining flag set by the signal handler.
609static GLOBAL_DRAINING: std::sync::atomic::AtomicPtr<AtomicBool> =
610    std::sync::atomic::AtomicPtr::new(std::ptr::null_mut());
611/// Global log level, cycled by SIGUSR1 (Unix only).
612static GLOBAL_LOG_LEVEL: std::sync::atomic::AtomicPtr<AtomicU8> =
613    std::sync::atomic::AtomicPtr::new(std::ptr::null_mut());
614
615#[cfg(unix)]
616fn install_signal_handler(draining: Arc<AtomicBool>, write_fd: i32, log_level: Arc<AtomicU8>) {
617    // Store the write fd and draining pointer in globals accessible from the
618    // async-signal-safe handler.
619    SHUTDOWN_PIPE_WR.store(write_fd, Ordering::SeqCst);
620    // SAFETY: `Arc::into_raw` leaks intentionally — the pointer remains valid
621    // for the process lifetime.  The signal handler only calls `AtomicBool::store`
622    // and `libc::write`, both of which are async-signal-safe.
623    let ptr = Arc::into_raw(draining).cast_mut();
624    GLOBAL_DRAINING.store(ptr, Ordering::SeqCst);
625    // SAFETY: same as above — leaked intentionally for the process lifetime.
626    let lvl_ptr = Arc::into_raw(log_level).cast_mut();
627    GLOBAL_LOG_LEVEL.store(lvl_ptr, Ordering::SeqCst);
628
629    // Use sigaction instead of signal to avoid SysV semantics where the handler
630    // is reset to SIG_DFL after the first invocation. SA_RESTART ensures that
631    // interrupted syscalls are automatically restarted.
632    unsafe {
633        let mut sa: libc::sigaction = std::mem::zeroed();
634        sa.sa_sigaction = signal_handler as *const () as libc::sighandler_t;
635        libc::sigemptyset(&mut sa.sa_mask);
636        sa.sa_flags = libc::SA_RESTART;
637        libc::sigaction(libc::SIGTERM, &sa, std::ptr::null_mut());
638        libc::sigaction(libc::SIGINT, &sa, std::ptr::null_mut());
639
640        let mut sa_usr1: libc::sigaction = std::mem::zeroed();
641        sa_usr1.sa_sigaction = sigusr1_handler as *const () as libc::sighandler_t;
642        libc::sigemptyset(&mut sa_usr1.sa_mask);
643        sa_usr1.sa_flags = libc::SA_RESTART;
644        libc::sigaction(libc::SIGUSR1, &sa_usr1, std::ptr::null_mut());
645    }
646}
647
648/// SIGUSR1 handler: cycles the log level.
649///
650/// This is async-signal-safe because it only performs atomic load/store
651/// operations and a raw `libc::write` to stderr.
652#[cfg(unix)]
653extern "C" fn sigusr1_handler(_sig: libc::c_int) {
654    let ptr = GLOBAL_LOG_LEVEL.load(Ordering::SeqCst);
655    if ptr.is_null() {
656        return;
657    }
658    let level_atomic = unsafe { &*ptr };
659    let current = level_atomic.load(Ordering::SeqCst);
660    let next = LogLevel::from_u8(current).cycle();
661    level_atomic.store(next as u8, Ordering::SeqCst);
662
663    // Write a log message directly to stderr (async-signal-safe).
664    let msg = match next {
665        LogLevel::Error => b"[log] level changed to error\n" as &[u8],
666        LogLevel::Warn => b"[log] level changed to warn\n",
667        LogLevel::Info => b"[log] level changed to info\n",
668        LogLevel::Debug => b"[log] level changed to debug\n",
669    };
670    unsafe { libc::write(2, msg.as_ptr().cast(), msg.len()) };
671}
672
673#[cfg(unix)]
674extern "C" fn signal_handler(_sig: libc::c_int) {
675    // Set the draining flag — async-signal-safe (atomic store).
676    let ptr = GLOBAL_DRAINING.load(Ordering::SeqCst);
677    if !ptr.is_null() {
678        unsafe { (*ptr).store(true, Ordering::SeqCst) };
679    }
680    // Wake the accept loop by writing to the self-pipe.
681    let fd = SHUTDOWN_PIPE_WR.load(Ordering::SeqCst);
682    if fd >= 0 {
683        let byte: u8 = 1;
684        unsafe { libc::write(fd, (&byte as *const u8).cast(), 1) };
685    }
686}
687
688#[cfg(windows)]
689fn install_signal_handler(draining: Arc<AtomicBool>, _write_fd: i32, _log_level: Arc<AtomicU8>) {
690    // Store the draining pointer in the global so the signal handler can set it.
691    let ptr = Arc::into_raw(draining).cast_mut();
692    GLOBAL_DRAINING.store(ptr, Ordering::SeqCst);
693
694    // On Windows, register a SIGINT handler (Ctrl+C) via the C runtime.
695    // The accept loop checks `draining` in the WouldBlock branch.
696    unsafe {
697        libc::signal(libc::SIGINT, windows_signal_handler as libc::sighandler_t);
698    }
699}
700
701#[cfg(windows)]
702extern "C" fn windows_signal_handler(_sig: libc::c_int) {
703    let ptr = GLOBAL_DRAINING.load(Ordering::SeqCst);
704    if !ptr.is_null() {
705        unsafe { (*ptr).store(true, Ordering::SeqCst) };
706    }
707    // Re-register the handler since Windows resets to SIG_DFL after each signal.
708    unsafe {
709        libc::signal(libc::SIGINT, windows_signal_handler as libc::sighandler_t);
710    }
711}
712
713// ---------------------------------------------------------------------------
714// Preset hot-reload watcher
715// ---------------------------------------------------------------------------
716
717/// Polling interval for the preset file watcher.
718const PRESET_WATCH_INTERVAL: Duration = Duration::from_secs(5);
719
720/// Background thread that watches `TRUSS_PRESETS_FILE` for changes and reloads
721/// presets atomically. On parse failure, the previous valid presets are kept.
722fn preset_watcher(
723    presets: Arc<std::sync::RwLock<std::collections::HashMap<String, TransformOptionsPayload>>>,
724    path: std::path::PathBuf,
725    draining: Arc<AtomicBool>,
726    config: Arc<ServerConfig>,
727) {
728    use config::parse_presets_file;
729    use std::fs;
730
731    let mut last_modified = fs::metadata(&path).and_then(|m| m.modified()).ok();
732
733    loop {
734        std::thread::sleep(PRESET_WATCH_INTERVAL);
735
736        if draining.load(Ordering::Relaxed) {
737            break;
738        }
739
740        let current_modified = match fs::metadata(&path).and_then(|m| m.modified()) {
741            Ok(mtime) => Some(mtime),
742            Err(err) => {
743                config.log_warn(&format!(
744                    "[presets] failed to stat `{}`: {err}",
745                    path.display()
746                ));
747                continue;
748            }
749        };
750
751        if current_modified == last_modified {
752            continue;
753        }
754
755        match parse_presets_file(&path) {
756            Ok(new_presets) => {
757                let count = new_presets.len();
758                *presets.write().expect("presets lock poisoned") = new_presets;
759                last_modified = current_modified;
760                config.log(&format!(
761                    "[presets] reloaded {count} presets from `{}`",
762                    path.display()
763                ));
764            }
765            Err(err) => {
766                config.log_warn(&format!(
767                    "[presets] reload failed for `{}`: {err} (keeping previous presets)",
768                    path.display()
769                ));
770                // Do NOT update last_modified here — the file may have been read
771                // mid-write (torn read). By keeping the old mtime, the watcher
772                // will retry on the next poll cycle and pick up the completed file.
773            }
774        }
775    }
776}
777
778/// Serves exactly one request using configuration loaded from the environment.
779///
780/// This helper is primarily useful in tests that want to drive the server over a real TCP
781/// socket but do not need a long-running loop.
782///
783/// # Errors
784///
785/// Returns an [`io::Error`] when the storage root cannot be resolved, when accepting the next
786/// connection fails, or when a response cannot be written to the socket.
787pub fn serve_once(listener: TcpListener) -> io::Result<()> {
788    let config = ServerConfig::from_env()?;
789    serve_once_with_config(listener, config)
790}
791
792/// Serves exactly one request with an explicit server configuration.
793///
794/// # Errors
795///
796/// Returns an [`io::Error`] when accepting the next connection fails or when a response cannot
797/// be written to the socket.
798pub fn serve_once_with_config(listener: TcpListener, config: ServerConfig) -> io::Result<()> {
799    let (stream, _) = listener.accept()?;
800    handle_stream(stream, &config)
801}
802
803#[derive(Debug, Deserialize)]
804#[serde(deny_unknown_fields)]
805struct TransformImageRequestPayload {
806    source: TransformSourcePayload,
807    #[serde(default)]
808    options: TransformOptionsPayload,
809    #[serde(default)]
810    watermark: Option<WatermarkPayload>,
811}
812
813#[derive(Debug, Deserialize)]
814#[serde(tag = "kind", rename_all = "lowercase")]
815enum TransformSourcePayload {
816    Path {
817        path: String,
818        version: Option<String>,
819    },
820    Url {
821        url: String,
822        version: Option<String>,
823    },
824    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
825    Storage {
826        bucket: Option<String>,
827        key: String,
828        version: Option<String>,
829    },
830}
831
832impl TransformSourcePayload {
833    /// Computes a stable source hash from the reference and version, avoiding the
834    /// need to read the full source bytes when a version tag is present. Returns
835    /// `None` when no version is available, in which case the caller must fall back
836    /// to the content-hash approach.
837    /// Computes a stable source hash that includes the instance configuration
838    /// boundaries (storage root, allow_insecure_url_sources) so that cache entries
839    /// cannot be reused across instances with different security settings sharing
840    /// the same cache directory.
841    fn versioned_source_hash(&self, config: &ServerConfig) -> Option<String> {
842        let (kind, reference, version): (&str, std::borrow::Cow<'_, str>, Option<&str>) = match self
843        {
844            Self::Path { path, version } => ("path", path.as_str().into(), version.as_deref()),
845            Self::Url { url, version } => ("url", url.as_str().into(), version.as_deref()),
846            #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
847            Self::Storage {
848                bucket,
849                key,
850                version,
851            } => {
852                let (scheme, effective_bucket) =
853                    storage_scheme_and_bucket(bucket.as_deref(), config);
854                let effective_bucket = effective_bucket?;
855                (
856                    "storage",
857                    format!("{scheme}://{effective_bucket}/{key}").into(),
858                    version.as_deref(),
859                )
860            }
861        };
862        let version = version?;
863        // Use newline separators so that values containing colons cannot collide
864        // with different (reference, version) pairs. Include configuration boundaries
865        // to prevent cross-instance cache poisoning.
866        let mut id = String::new();
867        id.push_str(kind);
868        id.push('\n');
869        id.push_str(&reference);
870        id.push('\n');
871        id.push_str(version);
872        id.push('\n');
873        id.push_str(config.storage_root.to_string_lossy().as_ref());
874        id.push('\n');
875        id.push_str(if config.allow_insecure_url_sources {
876            "insecure"
877        } else {
878            "strict"
879        });
880        #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
881        {
882            id.push('\n');
883            id.push_str(storage_backend_label(config));
884            #[cfg(feature = "s3")]
885            if let Some(ref ctx) = config.s3_context
886                && let Some(ref endpoint) = ctx.endpoint_url
887            {
888                id.push('\n');
889                id.push_str(endpoint);
890            }
891            #[cfg(feature = "gcs")]
892            if let Some(ref ctx) = config.gcs_context
893                && let Some(ref endpoint) = ctx.endpoint_url
894            {
895                id.push('\n');
896                id.push_str(endpoint);
897            }
898            #[cfg(feature = "azure")]
899            if let Some(ref ctx) = config.azure_context {
900                id.push('\n');
901                id.push_str(&ctx.endpoint_url);
902            }
903        }
904        Some(hex::encode(Sha256::digest(id.as_bytes())))
905    }
906
907    /// Returns the storage backend label for metrics based on the source kind,
908    /// rather than the server config default.  Path → Filesystem, Storage →
909    /// whatever the config backend is, Url → None (no storage backend).
910    fn metrics_backend_label(&self, _config: &ServerConfig) -> Option<StorageBackendLabel> {
911        match self {
912            Self::Path { .. } => Some(StorageBackendLabel::Filesystem),
913            Self::Url { .. } => None,
914            #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
915            Self::Storage { .. } => Some(_config.storage_backend_label()),
916        }
917    }
918}
919
920#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
921fn storage_scheme_and_bucket<'a>(
922    explicit_bucket: Option<&'a str>,
923    config: &'a ServerConfig,
924) -> (&'static str, Option<&'a str>) {
925    match config.storage_backend {
926        #[cfg(feature = "s3")]
927        StorageBackend::S3 => {
928            let bucket = explicit_bucket.or(config
929                .s3_context
930                .as_ref()
931                .map(|ctx| ctx.default_bucket.as_str()));
932            ("s3", bucket)
933        }
934        #[cfg(feature = "gcs")]
935        StorageBackend::Gcs => {
936            let bucket = explicit_bucket.or(config
937                .gcs_context
938                .as_ref()
939                .map(|ctx| ctx.default_bucket.as_str()));
940            ("gcs", bucket)
941        }
942        StorageBackend::Filesystem => ("fs", explicit_bucket),
943        #[cfg(feature = "azure")]
944        StorageBackend::Azure => {
945            let bucket = explicit_bucket.or(config
946                .azure_context
947                .as_ref()
948                .map(|ctx| ctx.default_container.as_str()));
949            ("azure", bucket)
950        }
951    }
952}
953
954#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
955fn is_object_storage_backend(config: &ServerConfig) -> bool {
956    match config.storage_backend {
957        StorageBackend::Filesystem => false,
958        #[cfg(feature = "s3")]
959        StorageBackend::S3 => true,
960        #[cfg(feature = "gcs")]
961        StorageBackend::Gcs => true,
962        #[cfg(feature = "azure")]
963        StorageBackend::Azure => true,
964    }
965}
966
967#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
968fn storage_backend_label(config: &ServerConfig) -> &'static str {
969    match config.storage_backend {
970        StorageBackend::Filesystem => "fs-backend",
971        #[cfg(feature = "s3")]
972        StorageBackend::S3 => "s3-backend",
973        #[cfg(feature = "gcs")]
974        StorageBackend::Gcs => "gcs-backend",
975        #[cfg(feature = "azure")]
976        StorageBackend::Azure => "azure-backend",
977    }
978}
979
980#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
981#[serde(default, rename_all = "camelCase", deny_unknown_fields)]
982pub struct TransformOptionsPayload {
983    pub width: Option<u32>,
984    pub height: Option<u32>,
985    pub fit: Option<String>,
986    pub position: Option<String>,
987    pub format: Option<String>,
988    pub quality: Option<u8>,
989    pub background: Option<String>,
990    pub rotate: Option<u16>,
991    pub auto_orient: Option<bool>,
992    pub strip_metadata: Option<bool>,
993    pub preserve_exif: Option<bool>,
994    pub crop: Option<String>,
995    pub blur: Option<f32>,
996    pub sharpen: Option<f32>,
997}
998
999impl TransformOptionsPayload {
1000    /// Merges per-request overrides on top of preset defaults.
1001    /// Each field in `overrides` takes precedence when set (`Some`).
1002    fn with_overrides(self, overrides: &TransformOptionsPayload) -> Self {
1003        Self {
1004            width: overrides.width.or(self.width),
1005            height: overrides.height.or(self.height),
1006            fit: overrides.fit.clone().or(self.fit),
1007            position: overrides.position.clone().or(self.position),
1008            format: overrides.format.clone().or(self.format),
1009            quality: overrides.quality.or(self.quality),
1010            background: overrides.background.clone().or(self.background),
1011            rotate: overrides.rotate.or(self.rotate),
1012            auto_orient: overrides.auto_orient.or(self.auto_orient),
1013            strip_metadata: overrides.strip_metadata.or(self.strip_metadata),
1014            preserve_exif: overrides.preserve_exif.or(self.preserve_exif),
1015            crop: overrides.crop.clone().or(self.crop),
1016            blur: overrides.blur.or(self.blur),
1017            sharpen: overrides.sharpen.or(self.sharpen),
1018        }
1019    }
1020
1021    fn into_options(self) -> Result<TransformOptions, HttpResponse> {
1022        let defaults = TransformOptions::default();
1023
1024        Ok(TransformOptions {
1025            width: self.width,
1026            height: self.height,
1027            fit: parse_optional_named(self.fit.as_deref(), "fit", Fit::from_str)?,
1028            position: parse_optional_named(
1029                self.position.as_deref(),
1030                "position",
1031                Position::from_str,
1032            )?,
1033            format: parse_optional_named(self.format.as_deref(), "format", MediaType::from_str)?,
1034            quality: self.quality,
1035            background: parse_optional_named(
1036                self.background.as_deref(),
1037                "background",
1038                Rgba8::from_hex,
1039            )?,
1040            rotate: match self.rotate {
1041                Some(value) => parse_named(&value.to_string(), "rotate", Rotation::from_str)?,
1042                None => defaults.rotate,
1043            },
1044            auto_orient: self.auto_orient.unwrap_or(defaults.auto_orient),
1045            strip_metadata: self.strip_metadata.unwrap_or(defaults.strip_metadata),
1046            preserve_exif: self.preserve_exif.unwrap_or(defaults.preserve_exif),
1047            crop: parse_optional_named(self.crop.as_deref(), "crop", CropRegion::from_str)?,
1048            blur: self.blur,
1049            sharpen: self.sharpen,
1050            deadline: defaults.deadline,
1051        })
1052    }
1053}
1054
1055/// Overall request deadline for outbound fetches (source + watermark combined).
1056const REQUEST_DEADLINE_SECS: u64 = 60;
1057
1058const WATERMARK_DEFAULT_POSITION: Position = Position::BottomRight;
1059const WATERMARK_DEFAULT_OPACITY: u8 = 50;
1060const WATERMARK_DEFAULT_MARGIN: u32 = 10;
1061const WATERMARK_MAX_MARGIN: u32 = 9999;
1062
1063#[derive(Debug, Default, Deserialize)]
1064#[serde(default, rename_all = "camelCase", deny_unknown_fields)]
1065struct WatermarkPayload {
1066    url: Option<String>,
1067    position: Option<String>,
1068    opacity: Option<u8>,
1069    margin: Option<u32>,
1070}
1071
1072/// Validated watermark parameters ready for fetching. No network I/O performed.
1073struct ValidatedWatermarkPayload {
1074    url: String,
1075    position: Position,
1076    opacity: u8,
1077    margin: u32,
1078}
1079
1080impl ValidatedWatermarkPayload {
1081    fn cache_identity(&self) -> String {
1082        compute_watermark_identity(
1083            &self.url,
1084            self.position.as_name(),
1085            self.opacity,
1086            self.margin,
1087        )
1088    }
1089}
1090
1091/// Validates watermark payload fields without performing network I/O.
1092fn validate_watermark_payload(
1093    payload: Option<&WatermarkPayload>,
1094) -> Result<Option<ValidatedWatermarkPayload>, HttpResponse> {
1095    let Some(wm) = payload else {
1096        return Ok(None);
1097    };
1098    let url = wm.url.as_deref().filter(|u| !u.is_empty()).ok_or_else(|| {
1099        bad_request_response("watermark.url is required when watermark is present")
1100    })?;
1101
1102    let position = parse_optional_named(
1103        wm.position.as_deref(),
1104        "watermark.position",
1105        Position::from_str,
1106    )?
1107    .unwrap_or(WATERMARK_DEFAULT_POSITION);
1108
1109    let opacity = wm.opacity.unwrap_or(WATERMARK_DEFAULT_OPACITY);
1110    if opacity == 0 || opacity > 100 {
1111        return Err(bad_request_response(
1112            "watermark.opacity must be between 1 and 100",
1113        ));
1114    }
1115    let margin = wm.margin.unwrap_or(WATERMARK_DEFAULT_MARGIN);
1116    if margin > WATERMARK_MAX_MARGIN {
1117        return Err(bad_request_response(
1118            "watermark.margin must be at most 9999",
1119        ));
1120    }
1121
1122    Ok(Some(ValidatedWatermarkPayload {
1123        url: url.to_string(),
1124        position,
1125        opacity,
1126        margin,
1127    }))
1128}
1129
1130/// Fetches watermark image and builds WatermarkInput. Called after try_acquire.
1131fn fetch_watermark(
1132    validated: ValidatedWatermarkPayload,
1133    config: &ServerConfig,
1134    deadline: Option<Instant>,
1135) -> Result<WatermarkInput, HttpResponse> {
1136    let bytes = read_remote_watermark_bytes(&validated.url, config, deadline)?;
1137    let artifact = sniff_artifact(RawArtifact::new(bytes, None))
1138        .map_err(|error| bad_request_response(&format!("watermark image is invalid: {error}")))?;
1139    if !artifact.media_type.is_raster() {
1140        return Err(bad_request_response(
1141            "watermark image must be a raster format (not SVG)",
1142        ));
1143    }
1144    Ok(WatermarkInput {
1145        image: artifact,
1146        position: validated.position,
1147        opacity: validated.opacity,
1148        margin: validated.margin,
1149    })
1150}
1151
1152fn resolve_multipart_watermark(
1153    bytes: Vec<u8>,
1154    position: Option<String>,
1155    opacity: Option<u8>,
1156    margin: Option<u32>,
1157) -> Result<WatermarkInput, HttpResponse> {
1158    let artifact = sniff_artifact(RawArtifact::new(bytes, None))
1159        .map_err(|error| bad_request_response(&format!("watermark image is invalid: {error}")))?;
1160    if !artifact.media_type.is_raster() {
1161        return Err(bad_request_response(
1162            "watermark image must be a raster format (not SVG)",
1163        ));
1164    }
1165    let position = parse_optional_named(
1166        position.as_deref(),
1167        "watermark_position",
1168        Position::from_str,
1169    )?
1170    .unwrap_or(WATERMARK_DEFAULT_POSITION);
1171    let opacity = opacity.unwrap_or(WATERMARK_DEFAULT_OPACITY);
1172    if opacity == 0 || opacity > 100 {
1173        return Err(bad_request_response(
1174            "watermark_opacity must be between 1 and 100",
1175        ));
1176    }
1177    let margin = margin.unwrap_or(WATERMARK_DEFAULT_MARGIN);
1178    if margin > WATERMARK_MAX_MARGIN {
1179        return Err(bad_request_response(
1180            "watermark_margin must be at most 9999",
1181        ));
1182    }
1183    Ok(WatermarkInput {
1184        image: artifact,
1185        position,
1186        opacity,
1187        margin,
1188    })
1189}
1190
1191struct AccessLogEntry<'a> {
1192    request_id: &'a str,
1193    method: &'a str,
1194    path: &'a str,
1195    route: &'a str,
1196    status: &'a str,
1197    start: Instant,
1198    cache_status: Option<&'a str>,
1199    watermark: bool,
1200}
1201
1202/// Extracts the `X-Request-Id` header value from request headers.
1203/// Returns `None` if the header is absent, empty, or contains
1204/// characters unsafe for HTTP headers (CR, LF, NUL).
1205fn extract_request_id(headers: &[(String, String)]) -> Option<String> {
1206    headers.iter().find_map(|(name, value)| {
1207        if name != "x-request-id" || value.is_empty() {
1208            return None;
1209        }
1210        if value
1211            .bytes()
1212            .any(|b| b == b'\r' || b == b'\n' || b == b'\0')
1213        {
1214            return None;
1215        }
1216        Some(value.clone())
1217    })
1218}
1219
1220/// Classifies the `Cache-Status` response header as `"hit"` or `"miss"`.
1221/// Returns `None` when the header is absent.
1222fn extract_cache_status(headers: &[(String, String)]) -> Option<&'static str> {
1223    headers
1224        .iter()
1225        .find_map(|(name, value)| (name == "Cache-Status").then_some(value.as_str()))
1226        .map(|v| if v.contains("hit") { "hit" } else { "miss" })
1227}
1228
1229/// Extracts and removes the internal `X-Truss-Watermark` header, returning whether it was set.
1230fn extract_watermark_flag(headers: &mut Vec<(String, String)>) -> bool {
1231    let pos = headers
1232        .iter()
1233        .position(|(name, _)| name == "X-Truss-Watermark");
1234    if let Some(idx) = pos {
1235        headers.swap_remove(idx);
1236        true
1237    } else {
1238        false
1239    }
1240}
1241
1242fn emit_access_log(config: &ServerConfig, entry: &AccessLogEntry<'_>) {
1243    config.log(
1244        &json!({
1245            "kind": "access_log",
1246            "request_id": entry.request_id,
1247            "method": entry.method,
1248            "path": entry.path,
1249            "route": entry.route,
1250            "status": entry.status,
1251            "latency_ms": entry.start.elapsed().as_millis() as u64,
1252            "cache_status": entry.cache_status,
1253            "watermark": entry.watermark,
1254        })
1255        .to_string(),
1256    );
1257}
1258
1259fn handle_stream(mut stream: TcpStream, config: &ServerConfig) -> io::Result<()> {
1260    // Prevent slow or stalled clients from blocking the accept loop indefinitely.
1261    if let Err(err) = stream.set_read_timeout(Some(SOCKET_READ_TIMEOUT)) {
1262        config.log_warn(&format!("failed to set socket read timeout: {err}"));
1263    }
1264    if let Err(err) = stream.set_write_timeout(Some(SOCKET_WRITE_TIMEOUT)) {
1265        config.log_warn(&format!("failed to set socket write timeout: {err}"));
1266    }
1267
1268    // Extract the peer IP once for rate limiting. If peer_addr fails
1269    // (e.g. the socket was already closed), skip rate limiting for this
1270    // connection rather than rejecting it.
1271    let peer_ip = stream.peer_addr().ok().map(|addr| addr.ip());
1272
1273    let mut requests_served: u64 = 0;
1274
1275    loop {
1276        let partial = match read_request_headers(&mut stream, config.max_upload_bytes) {
1277            Ok(partial) => partial,
1278            Err(response) => {
1279                if requests_served > 0 {
1280                    return Ok(());
1281                }
1282                let _ = write_response(&mut stream, response, true);
1283                return Ok(());
1284            }
1285        };
1286
1287        // Start timing after headers are read so latency reflects server
1288        // processing time, not client send / socket-wait time.
1289        let start = Instant::now();
1290
1291        let request_id =
1292            extract_request_id(&partial.headers).unwrap_or_else(|| Uuid::new_v4().to_string());
1293
1294        // --- Per-IP rate limiting ---
1295        if let (Some(limiter), Some(ip)) = (&config.rate_limiter, peer_ip)
1296            && !limiter.check(ip)
1297        {
1298            let mut response = too_many_requests_response("rate limit exceeded — try again later");
1299            response
1300                .headers
1301                .push(("X-Request-Id".to_string(), request_id.clone()));
1302            record_http_metrics(RouteMetric::Unknown, response.status);
1303            let sc = status_code(response.status).unwrap_or("unknown");
1304            let method_log = partial.method.clone();
1305            let path_log = partial.path().to_string();
1306            let _ = write_response(&mut stream, response, true);
1307            record_http_request_duration(RouteMetric::Unknown, start);
1308            emit_access_log(
1309                config,
1310                &AccessLogEntry {
1311                    request_id: &request_id,
1312                    method: &method_log,
1313                    path: &path_log,
1314                    route: &path_log,
1315                    status: sc,
1316                    start,
1317                    cache_status: None,
1318                    watermark: false,
1319                },
1320            );
1321            return Ok(());
1322        }
1323
1324        let client_wants_close = partial
1325            .headers
1326            .iter()
1327            .any(|(name, value)| name == "connection" && value.eq_ignore_ascii_case("close"));
1328
1329        let accepts_gzip = config.enable_compression
1330            && http_parse::header_value(&partial.headers, "accept-encoding")
1331                .is_some_and(|v| http_parse::accepts_encoding(v, "gzip"));
1332
1333        let is_head = partial.method == "HEAD";
1334
1335        let requires_auth = matches!(
1336            (partial.method.as_str(), partial.path()),
1337            ("POST", "/images:transform") | ("POST", "/images")
1338        );
1339        if requires_auth
1340            && let Err(mut response) = authorize_request_headers(&partial.headers, config)
1341        {
1342            response
1343                .headers
1344                .push(("X-Request-Id".to_string(), request_id.clone()));
1345            record_http_metrics(RouteMetric::Unknown, response.status);
1346            let sc = status_code(response.status).unwrap_or("unknown");
1347            let method_log = partial.method.clone();
1348            let path_log = partial.path().to_string();
1349            let _ = write_response_compressed(
1350                &mut stream,
1351                response,
1352                true,
1353                accepts_gzip,
1354                config.compression_level,
1355            );
1356            record_http_request_duration(RouteMetric::Unknown, start);
1357            emit_access_log(
1358                config,
1359                &AccessLogEntry {
1360                    request_id: &request_id,
1361                    method: &method_log,
1362                    path: &path_log,
1363                    route: &path_log,
1364                    status: sc,
1365                    start,
1366                    cache_status: None,
1367                    watermark: false,
1368                },
1369            );
1370            return Ok(());
1371        }
1372
1373        // Early-reject /metrics requests before draining the body so that
1374        // unauthenticated or disabled-metrics requests do not force a body read.
1375        if matches!(
1376            (partial.method.as_str(), partial.path()),
1377            ("GET" | "HEAD", "/metrics")
1378        ) {
1379            let early_response = if config.disable_metrics {
1380                Some(HttpResponse::problem(
1381                    "404 Not Found",
1382                    NOT_FOUND_BODY.as_bytes().to_vec(),
1383                ))
1384            } else if let Some(expected) = &config.metrics_token {
1385                let provided = http_parse::header_value(&partial.headers, "authorization")
1386                    .and_then(|value| {
1387                        let (scheme, token) = value.split_once(|c: char| c.is_whitespace())?;
1388                        scheme.eq_ignore_ascii_case("Bearer").then(|| token.trim())
1389                    });
1390                match provided {
1391                    Some(token) if token.as_bytes().ct_eq(expected.as_bytes()).into() => None,
1392                    _ => Some(response::auth_required_response(
1393                        "metrics endpoint requires authentication",
1394                    )),
1395                }
1396            } else {
1397                None
1398            };
1399
1400            if let Some(mut response) = early_response {
1401                response
1402                    .headers
1403                    .push(("X-Request-Id".to_string(), request_id.clone()));
1404                record_http_metrics(RouteMetric::Metrics, response.status);
1405                let sc = status_code(response.status).unwrap_or("unknown");
1406                let method_log = partial.method.clone();
1407                let path_log = partial.path().to_string();
1408                let _ = write_response_compressed(
1409                    &mut stream,
1410                    response,
1411                    true,
1412                    accepts_gzip,
1413                    config.compression_level,
1414                );
1415                record_http_request_duration(RouteMetric::Metrics, start);
1416                emit_access_log(
1417                    config,
1418                    &AccessLogEntry {
1419                        request_id: &request_id,
1420                        method: &method_log,
1421                        path: &path_log,
1422                        route: "/metrics",
1423                        status: sc,
1424                        start,
1425                        cache_status: None,
1426                        watermark: false,
1427                    },
1428                );
1429                return Ok(());
1430            }
1431        }
1432
1433        // Clone method/path before `read_request_body` consumes `partial`.
1434        let method = partial.method.clone();
1435        let path = partial.path().to_string();
1436
1437        let request = match read_request_body(&mut stream, partial) {
1438            Ok(request) => request,
1439            Err(mut response) => {
1440                response
1441                    .headers
1442                    .push(("X-Request-Id".to_string(), request_id.clone()));
1443                record_http_metrics(RouteMetric::Unknown, response.status);
1444                let sc = status_code(response.status).unwrap_or("unknown");
1445                let _ = write_response_compressed(
1446                    &mut stream,
1447                    response,
1448                    true,
1449                    accepts_gzip,
1450                    config.compression_level,
1451                );
1452                record_http_request_duration(RouteMetric::Unknown, start);
1453                emit_access_log(
1454                    config,
1455                    &AccessLogEntry {
1456                        request_id: &request_id,
1457                        method: &method,
1458                        path: &path,
1459                        route: &path,
1460                        status: sc,
1461                        start,
1462                        cache_status: None,
1463                        watermark: false,
1464                    },
1465                );
1466                return Ok(());
1467            }
1468        };
1469        let route = classify_route(&request);
1470        let mut response = route_request(request, config);
1471        record_http_metrics(route, response.status);
1472
1473        response
1474            .headers
1475            .push(("X-Request-Id".to_string(), request_id.clone()));
1476
1477        let cache_status = extract_cache_status(&response.headers);
1478        let had_watermark = extract_watermark_flag(&mut response.headers);
1479
1480        let sc = status_code(response.status).unwrap_or("unknown");
1481
1482        if is_head {
1483            response.body = Vec::new();
1484        }
1485
1486        requests_served += 1;
1487        let close_after = client_wants_close || requests_served >= config.keep_alive_max_requests;
1488
1489        write_response_compressed(
1490            &mut stream,
1491            response,
1492            close_after,
1493            accepts_gzip,
1494            config.compression_level,
1495        )?;
1496        record_http_request_duration(route, start);
1497
1498        emit_access_log(
1499            config,
1500            &AccessLogEntry {
1501                request_id: &request_id,
1502                method: &method,
1503                path: &path,
1504                route: route.as_label(),
1505                status: sc,
1506                start,
1507                cache_status,
1508                watermark: had_watermark,
1509            },
1510        );
1511
1512        if close_after {
1513            return Ok(());
1514        }
1515    }
1516}
1517
1518fn route_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1519    let method = request.method.clone();
1520    let path = request.path().to_string();
1521
1522    match (method.as_str(), path.as_str()) {
1523        ("GET" | "HEAD", "/health") => handle_health(config),
1524        ("GET" | "HEAD", "/health/live") => handle_health_live(),
1525        ("GET" | "HEAD", "/health/ready") => handle_health_ready(config),
1526        ("GET" | "HEAD", "/images/by-path") => handle_public_path_request(request, config),
1527        ("GET" | "HEAD", "/images/by-url") => handle_public_url_request(request, config),
1528        ("POST", "/images:transform") => handle_transform_request(request, config),
1529        ("POST", "/images") => handle_upload_request(request, config),
1530        ("GET" | "HEAD", "/metrics") => handle_metrics_request(request, config),
1531        _ => HttpResponse::problem("404 Not Found", NOT_FOUND_BODY.as_bytes().to_vec()),
1532    }
1533}
1534
1535fn classify_route(request: &HttpRequest) -> RouteMetric {
1536    match (request.method.as_str(), request.path()) {
1537        ("GET" | "HEAD", "/health") => RouteMetric::Health,
1538        ("GET" | "HEAD", "/health/live") => RouteMetric::HealthLive,
1539        ("GET" | "HEAD", "/health/ready") => RouteMetric::HealthReady,
1540        ("GET" | "HEAD", "/images/by-path") => RouteMetric::PublicByPath,
1541        ("GET" | "HEAD", "/images/by-url") => RouteMetric::PublicByUrl,
1542        ("POST", "/images:transform") => RouteMetric::Transform,
1543        ("POST", "/images") => RouteMetric::Upload,
1544        ("GET" | "HEAD", "/metrics") => RouteMetric::Metrics,
1545        _ => RouteMetric::Unknown,
1546    }
1547}
1548
1549fn handle_transform_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1550    let request_deadline = Some(Instant::now() + Duration::from_secs(REQUEST_DEADLINE_SECS));
1551
1552    if let Err(response) = authorize_request(&request, config) {
1553        return response;
1554    }
1555
1556    if !request_has_json_content_type(&request) {
1557        return unsupported_media_type_response("content-type must be application/json");
1558    }
1559
1560    let payload: TransformImageRequestPayload = match serde_json::from_slice(&request.body) {
1561        Ok(payload) => payload,
1562        Err(error) => {
1563            return bad_request_response(&format!("request body must be valid JSON: {error}"));
1564        }
1565    };
1566    let options = match payload.options.into_options() {
1567        Ok(options) => options,
1568        Err(response) => return response,
1569    };
1570
1571    let versioned_hash = payload.source.versioned_source_hash(config);
1572    let validated_wm = match validate_watermark_payload(payload.watermark.as_ref()) {
1573        Ok(wm) => wm,
1574        Err(response) => return response,
1575    };
1576    let watermark_id = validated_wm.as_ref().map(|v| v.cache_identity());
1577
1578    if let Some(response) = try_versioned_cache_lookup(
1579        versioned_hash.as_deref(),
1580        &options,
1581        &request,
1582        ImageResponsePolicy::PrivateTransform,
1583        config,
1584        watermark_id.as_deref(),
1585    ) {
1586        return response;
1587    }
1588
1589    let storage_start = Instant::now();
1590    let backend_label = payload.source.metrics_backend_label(config);
1591    let backend_idx = backend_label.map(|l| storage_backend_index_from_config(&l));
1592    let source_bytes = match resolve_source_bytes(payload.source, config, request_deadline) {
1593        Ok(bytes) => {
1594            if let Some(idx) = backend_idx {
1595                record_storage_duration(idx, storage_start);
1596            }
1597            bytes
1598        }
1599        Err(response) => {
1600            if let Some(idx) = backend_idx {
1601                record_storage_duration(idx, storage_start);
1602            }
1603            return response;
1604        }
1605    };
1606    transform_source_bytes(
1607        source_bytes,
1608        options,
1609        versioned_hash.as_deref(),
1610        &request,
1611        ImageResponsePolicy::PrivateTransform,
1612        config,
1613        WatermarkSource::from_validated(validated_wm),
1614        watermark_id.as_deref(),
1615        request_deadline,
1616    )
1617}
1618
1619fn handle_public_path_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1620    handle_public_get_request(request, config, PublicSourceKind::Path)
1621}
1622
1623fn handle_public_url_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1624    handle_public_get_request(request, config, PublicSourceKind::Url)
1625}
1626
1627fn handle_public_get_request(
1628    request: HttpRequest,
1629    config: &ServerConfig,
1630    source_kind: PublicSourceKind,
1631) -> HttpResponse {
1632    let request_deadline = Some(Instant::now() + Duration::from_secs(REQUEST_DEADLINE_SECS));
1633    let query = match parse_query_params(&request) {
1634        Ok(query) => query,
1635        Err(response) => return response,
1636    };
1637    if let Err(response) = authorize_signed_request(&request, &query, config) {
1638        return response;
1639    }
1640    let (source, options, watermark_payload) =
1641        match parse_public_get_request(&query, source_kind, config) {
1642            Ok(parsed) => parsed,
1643            Err(response) => return response,
1644        };
1645
1646    let validated_wm = match validate_watermark_payload(watermark_payload.as_ref()) {
1647        Ok(wm) => wm,
1648        Err(response) => return response,
1649    };
1650    let watermark_id = validated_wm.as_ref().map(|v| v.cache_identity());
1651
1652    // When the storage backend is object storage (S3 or GCS), convert Path
1653    // sources to Storage sources so that the `path` query parameter is
1654    // resolved as an object key.
1655    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1656    let source = if is_object_storage_backend(config) {
1657        match source {
1658            TransformSourcePayload::Path { path, version } => TransformSourcePayload::Storage {
1659                bucket: None,
1660                key: path.trim_start_matches('/').to_string(),
1661                version,
1662            },
1663            other => other,
1664        }
1665    } else {
1666        source
1667    };
1668
1669    let versioned_hash = source.versioned_source_hash(config);
1670    if let Some(response) = try_versioned_cache_lookup(
1671        versioned_hash.as_deref(),
1672        &options,
1673        &request,
1674        ImageResponsePolicy::PublicGet,
1675        config,
1676        watermark_id.as_deref(),
1677    ) {
1678        return response;
1679    }
1680
1681    let storage_start = Instant::now();
1682    let backend_label = source.metrics_backend_label(config);
1683    let backend_idx = backend_label.map(|l| storage_backend_index_from_config(&l));
1684    let source_bytes = match resolve_source_bytes(source, config, request_deadline) {
1685        Ok(bytes) => {
1686            if let Some(idx) = backend_idx {
1687                record_storage_duration(idx, storage_start);
1688            }
1689            bytes
1690        }
1691        Err(response) => {
1692            if let Some(idx) = backend_idx {
1693                record_storage_duration(idx, storage_start);
1694            }
1695            return response;
1696        }
1697    };
1698
1699    transform_source_bytes(
1700        source_bytes,
1701        options,
1702        versioned_hash.as_deref(),
1703        &request,
1704        ImageResponsePolicy::PublicGet,
1705        config,
1706        WatermarkSource::from_validated(validated_wm),
1707        watermark_id.as_deref(),
1708        request_deadline,
1709    )
1710}
1711
1712fn handle_upload_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1713    if let Err(response) = authorize_request(&request, config) {
1714        return response;
1715    }
1716
1717    let boundary = match parse_multipart_boundary(&request) {
1718        Ok(boundary) => boundary,
1719        Err(response) => return response,
1720    };
1721    let (file_bytes, options, watermark) = match parse_upload_request(&request.body, &boundary) {
1722        Ok(parts) => parts,
1723        Err(response) => return response,
1724    };
1725    let watermark_identity = watermark.as_ref().map(|wm| {
1726        let content_hash = hex::encode(sha2::Sha256::digest(&wm.image.bytes));
1727        cache::compute_watermark_content_identity(
1728            &content_hash,
1729            wm.position.as_name(),
1730            wm.opacity,
1731            wm.margin,
1732        )
1733    });
1734    transform_source_bytes(
1735        file_bytes,
1736        options,
1737        None,
1738        &request,
1739        ImageResponsePolicy::PrivateTransform,
1740        config,
1741        WatermarkSource::from_ready(watermark),
1742        watermark_identity.as_deref(),
1743        None,
1744    )
1745}
1746
1747/// Returns the number of free bytes on the filesystem containing `path`,
1748/// or `None` if the query fails.
1749#[cfg(target_os = "linux")]
1750fn disk_free_bytes(path: &std::path::Path) -> Option<u64> {
1751    use std::ffi::CString;
1752
1753    let c_path = CString::new(path.to_str()?).ok()?;
1754    let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
1755    let ret = unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) };
1756    if ret == 0 {
1757        stat.f_bavail.checked_mul(stat.f_frsize)
1758    } else {
1759        None
1760    }
1761}
1762
1763#[cfg(not(target_os = "linux"))]
1764fn disk_free_bytes(_path: &std::path::Path) -> Option<u64> {
1765    None
1766}
1767
1768/// Returns the current process RSS (Resident Set Size) in bytes by reading
1769/// `/proc/self/status`. Returns `None` on non-Linux platforms or on read failure.
1770#[cfg(target_os = "linux")]
1771fn process_rss_bytes() -> Option<u64> {
1772    let status = std::fs::read_to_string("/proc/self/status").ok()?;
1773    for line in status.lines() {
1774        if let Some(value) = line.strip_prefix("VmRSS:") {
1775            let value = value.trim();
1776            // Format: "123456 kB"
1777            let kb_str = value.strip_suffix(" kB")?.trim();
1778            let kb: u64 = kb_str.parse().ok()?;
1779            return kb.checked_mul(1024);
1780        }
1781    }
1782    None
1783}
1784
1785#[cfg(not(target_os = "linux"))]
1786fn process_rss_bytes() -> Option<u64> {
1787    None
1788}
1789
1790/// Returns a minimal liveness response confirming the process is running.
1791fn handle_health_live() -> HttpResponse {
1792    let body = serde_json::to_vec(&json!({
1793        "status": "ok",
1794        "service": "truss",
1795        "version": env!("CARGO_PKG_VERSION"),
1796    }))
1797    .expect("serialize liveness");
1798    let mut body = body;
1799    body.push(b'\n');
1800    HttpResponse::json("200 OK", body)
1801}
1802
1803/// Returns a readiness response after checking that critical infrastructure
1804/// dependencies are available (storage root, cache root if configured, S3
1805/// reachability) and configurable resource thresholds.
1806fn handle_health_ready(config: &ServerConfig) -> HttpResponse {
1807    let mut checks: Vec<serde_json::Value> = Vec::new();
1808    let mut all_ok = true;
1809
1810    // When the server is draining (shutdown signal received), immediately
1811    // report not-ready so that load balancers stop routing traffic.
1812    // Skip expensive probes (storage, disk, memory) — they are irrelevant
1813    // once the process is shutting down.
1814    if config.draining.load(Ordering::Relaxed) {
1815        let mut body = serde_json::to_vec(&json!({
1816            "status": "fail",
1817            "checks": [{ "name": "draining", "status": "fail" }],
1818        }))
1819        .expect("serialize readiness");
1820        body.push(b'\n');
1821        return HttpResponse::problem("503 Service Unavailable", body);
1822    }
1823
1824    for (ok, name) in storage_health_check(config) {
1825        checks.push(json!({
1826            "name": name,
1827            "status": if ok { "ok" } else { "fail" },
1828        }));
1829        if !ok {
1830            all_ok = false;
1831        }
1832    }
1833
1834    if let Some(cache_root) = &config.cache_root {
1835        let cache_ok = cache_root.is_dir();
1836        checks.push(json!({
1837            "name": "cacheRoot",
1838            "status": if cache_ok { "ok" } else { "fail" },
1839        }));
1840        if !cache_ok {
1841            all_ok = false;
1842        }
1843    }
1844
1845    if let Some(cache_root) = &config.cache_root {
1846        let free = disk_free_bytes(cache_root);
1847        let threshold = config.health_cache_min_free_bytes;
1848        let disk_ok = match (free, threshold) {
1849            (Some(f), Some(min)) => f >= min,
1850            _ => true,
1851        };
1852        let mut check = json!({
1853            "name": "cacheDiskFree",
1854            "status": if disk_ok { "ok" } else { "fail" },
1855        });
1856        if let Some(f) = free {
1857            check["freeBytes"] = json!(f);
1858        }
1859        if let Some(min) = threshold {
1860            check["thresholdBytes"] = json!(min);
1861        }
1862        checks.push(check);
1863        if !disk_ok {
1864            all_ok = false;
1865        }
1866    }
1867
1868    // Concurrency utilization
1869    let in_flight = config.transforms_in_flight.load(Ordering::Relaxed);
1870    let overloaded = in_flight >= config.max_concurrent_transforms;
1871    checks.push(json!({
1872        "name": "transformCapacity",
1873        "status": if overloaded { "fail" } else { "ok" },
1874        "current": in_flight,
1875        "max": config.max_concurrent_transforms,
1876    }));
1877    if overloaded {
1878        all_ok = false;
1879    }
1880
1881    // Memory usage (Linux only) — skip entirely when RSS is unavailable
1882    if let Some(rss_bytes) = process_rss_bytes() {
1883        let threshold = config.health_max_memory_bytes;
1884        let mem_ok = threshold.is_none_or(|max| rss_bytes <= max);
1885        let mut check = json!({
1886            "name": "memoryUsage",
1887            "status": if mem_ok { "ok" } else { "fail" },
1888            "rssBytes": rss_bytes,
1889        });
1890        if let Some(max) = threshold {
1891            check["thresholdBytes"] = json!(max);
1892        }
1893        checks.push(check);
1894        if !mem_ok {
1895            all_ok = false;
1896        }
1897    }
1898
1899    let status_str = if all_ok { "ok" } else { "fail" };
1900    let mut body = serde_json::to_vec(&json!({
1901        "status": status_str,
1902        "checks": checks,
1903    }))
1904    .expect("serialize readiness");
1905    body.push(b'\n');
1906
1907    if all_ok {
1908        HttpResponse::json("200 OK", body)
1909    } else {
1910        HttpResponse::json("503 Service Unavailable", body)
1911    }
1912}
1913
1914/// Returns a comprehensive diagnostic health response.
1915fn storage_health_check(config: &ServerConfig) -> Vec<(bool, &'static str)> {
1916    #[allow(unused_mut)]
1917    let mut checks = vec![(config.storage_root.is_dir(), "storageRoot")];
1918    #[cfg(feature = "s3")]
1919    if config.storage_backend == StorageBackend::S3 {
1920        let reachable = config
1921            .s3_context
1922            .as_ref()
1923            .is_some_and(|ctx| ctx.check_reachable());
1924        checks.push((reachable, "storageBackend"));
1925    }
1926    #[cfg(feature = "gcs")]
1927    if config.storage_backend == StorageBackend::Gcs {
1928        let reachable = config
1929            .gcs_context
1930            .as_ref()
1931            .is_some_and(|ctx| ctx.check_reachable());
1932        checks.push((reachable, "storageBackend"));
1933    }
1934    #[cfg(feature = "azure")]
1935    if config.storage_backend == StorageBackend::Azure {
1936        let reachable = config
1937            .azure_context
1938            .as_ref()
1939            .is_some_and(|ctx| ctx.check_reachable());
1940        checks.push((reachable, "storageBackend"));
1941    }
1942    checks
1943}
1944
1945fn handle_health(config: &ServerConfig) -> HttpResponse {
1946    let mut checks: Vec<serde_json::Value> = Vec::new();
1947    let mut all_ok = true;
1948
1949    for (ok, name) in storage_health_check(config) {
1950        checks.push(json!({
1951            "name": name,
1952            "status": if ok { "ok" } else { "fail" },
1953        }));
1954        if !ok {
1955            all_ok = false;
1956        }
1957    }
1958
1959    if let Some(cache_root) = &config.cache_root {
1960        let cache_ok = cache_root.is_dir();
1961        checks.push(json!({
1962            "name": "cacheRoot",
1963            "status": if cache_ok { "ok" } else { "fail" },
1964        }));
1965        if !cache_ok {
1966            all_ok = false;
1967        }
1968    }
1969
1970    // Cache disk free space
1971    if let Some(cache_root) = &config.cache_root {
1972        let free = disk_free_bytes(cache_root);
1973        let threshold = config.health_cache_min_free_bytes;
1974        let disk_ok = match (free, threshold) {
1975            (Some(f), Some(min)) => f >= min,
1976            _ => true,
1977        };
1978        let mut check = json!({
1979            "name": "cacheDiskFree",
1980            "status": if disk_ok { "ok" } else { "fail" },
1981        });
1982        if let Some(f) = free {
1983            check["freeBytes"] = json!(f);
1984        }
1985        if let Some(min) = threshold {
1986            check["thresholdBytes"] = json!(min);
1987        }
1988        checks.push(check);
1989        if !disk_ok {
1990            all_ok = false;
1991        }
1992    }
1993
1994    // Concurrency utilization
1995    let in_flight = config.transforms_in_flight.load(Ordering::Relaxed);
1996    let overloaded = in_flight >= config.max_concurrent_transforms;
1997    checks.push(json!({
1998        "name": "transformCapacity",
1999        "status": if overloaded { "fail" } else { "ok" },
2000        "current": in_flight,
2001        "max": config.max_concurrent_transforms,
2002    }));
2003    if overloaded {
2004        all_ok = false;
2005    }
2006
2007    // Memory usage (Linux only)
2008    let rss = process_rss_bytes();
2009    if let Some(rss_bytes) = rss {
2010        let threshold = config.health_max_memory_bytes;
2011        let mem_ok = threshold.is_none_or(|max| rss_bytes <= max);
2012        let mut check = json!({
2013            "name": "memoryUsage",
2014            "status": if mem_ok { "ok" } else { "fail" },
2015            "rssBytes": rss_bytes,
2016        });
2017        if let Some(max) = threshold {
2018            check["thresholdBytes"] = json!(max);
2019        }
2020        checks.push(check);
2021        if !mem_ok {
2022            all_ok = false;
2023        }
2024    }
2025
2026    let status_str = if all_ok { "ok" } else { "fail" };
2027    let mut body = serde_json::to_vec(&json!({
2028        "status": status_str,
2029        "service": "truss",
2030        "version": env!("CARGO_PKG_VERSION"),
2031        "uptimeSeconds": uptime_seconds(),
2032        "checks": checks,
2033        "maxInputPixels": config.max_input_pixels,
2034    }))
2035    .expect("serialize health");
2036    body.push(b'\n');
2037
2038    HttpResponse::json("200 OK", body)
2039}
2040
2041fn handle_metrics_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
2042    if config.disable_metrics {
2043        return HttpResponse::problem("404 Not Found", NOT_FOUND_BODY.as_bytes().to_vec());
2044    }
2045
2046    if let Some(expected) = &config.metrics_token {
2047        let provided = request.header("authorization").and_then(|value| {
2048            let (scheme, token) = value.split_once(|c: char| c.is_whitespace())?;
2049            scheme.eq_ignore_ascii_case("Bearer").then(|| token.trim())
2050        });
2051        match provided {
2052            Some(token) if token.as_bytes().ct_eq(expected.as_bytes()).into() => {}
2053            _ => {
2054                return response::auth_required_response(
2055                    "metrics endpoint requires authentication",
2056                );
2057            }
2058        }
2059    }
2060
2061    HttpResponse::text(
2062        "200 OK",
2063        "text/plain; version=0.0.4; charset=utf-8",
2064        render_metrics_text(
2065            config.max_concurrent_transforms,
2066            &config.transforms_in_flight,
2067        )
2068        .into_bytes(),
2069    )
2070}
2071
2072fn parse_public_get_request(
2073    query: &BTreeMap<String, String>,
2074    source_kind: PublicSourceKind,
2075    config: &ServerConfig,
2076) -> Result<
2077    (
2078        TransformSourcePayload,
2079        TransformOptions,
2080        Option<WatermarkPayload>,
2081    ),
2082    HttpResponse,
2083> {
2084    validate_public_query_names(query, source_kind)?;
2085
2086    let source = match source_kind {
2087        PublicSourceKind::Path => TransformSourcePayload::Path {
2088            path: required_query_param(query, "path")?.to_string(),
2089            version: query.get("version").cloned(),
2090        },
2091        PublicSourceKind::Url => TransformSourcePayload::Url {
2092            url: required_query_param(query, "url")?.to_string(),
2093            version: query.get("version").cloned(),
2094        },
2095    };
2096
2097    let has_orphaned_watermark_params = query.contains_key("watermarkPosition")
2098        || query.contains_key("watermarkOpacity")
2099        || query.contains_key("watermarkMargin");
2100    let watermark = if query.contains_key("watermarkUrl") {
2101        Some(WatermarkPayload {
2102            url: query.get("watermarkUrl").cloned(),
2103            position: query.get("watermarkPosition").cloned(),
2104            opacity: parse_optional_u8_query(query, "watermarkOpacity")?,
2105            margin: parse_optional_integer_query(query, "watermarkMargin")?,
2106        })
2107    } else if has_orphaned_watermark_params {
2108        return Err(bad_request_response(
2109            "watermarkPosition, watermarkOpacity, and watermarkMargin require watermarkUrl",
2110        ));
2111    } else {
2112        None
2113    };
2114
2115    // Build per-request overrides from query parameters.
2116    let per_request = TransformOptionsPayload {
2117        width: parse_optional_integer_query(query, "width")?,
2118        height: parse_optional_integer_query(query, "height")?,
2119        fit: query.get("fit").cloned(),
2120        position: query.get("position").cloned(),
2121        format: query.get("format").cloned(),
2122        quality: parse_optional_u8_query(query, "quality")?,
2123        background: query.get("background").cloned(),
2124        rotate: query
2125            .get("rotate")
2126            .map(|v| v.parse::<u16>())
2127            .transpose()
2128            .map_err(|_| bad_request_response("rotate must be 0, 90, 180, or 270"))?,
2129        auto_orient: parse_optional_bool_query(query, "autoOrient")?,
2130        strip_metadata: parse_optional_bool_query(query, "stripMetadata")?,
2131        preserve_exif: parse_optional_bool_query(query, "preserveExif")?,
2132        crop: query.get("crop").cloned(),
2133        blur: parse_optional_float_query(query, "blur")?,
2134        sharpen: parse_optional_float_query(query, "sharpen")?,
2135    };
2136
2137    // Resolve preset and merge with per-request overrides.
2138    let merged = if let Some(preset_name) = query.get("preset") {
2139        let presets = config.presets.read().expect("presets lock poisoned");
2140        let preset = presets
2141            .get(preset_name)
2142            .ok_or_else(|| bad_request_response(&format!("unknown preset `{preset_name}`")))?;
2143        preset.clone().with_overrides(&per_request)
2144    } else {
2145        per_request
2146    };
2147
2148    let options = merged.into_options()?;
2149
2150    Ok((source, options, watermark))
2151}
2152
2153/// Watermark source: either already resolved (multipart upload) or deferred (URL fetch).
2154enum WatermarkSource {
2155    Deferred(ValidatedWatermarkPayload),
2156    Ready(WatermarkInput),
2157    None,
2158}
2159
2160impl WatermarkSource {
2161    fn from_validated(validated: Option<ValidatedWatermarkPayload>) -> Self {
2162        match validated {
2163            Some(v) => Self::Deferred(v),
2164            None => Self::None,
2165        }
2166    }
2167
2168    fn from_ready(input: Option<WatermarkInput>) -> Self {
2169        match input {
2170            Some(w) => Self::Ready(w),
2171            None => Self::None,
2172        }
2173    }
2174
2175    fn is_some(&self) -> bool {
2176        !matches!(self, Self::None)
2177    }
2178}
2179
2180#[allow(clippy::too_many_arguments)]
2181fn transform_source_bytes(
2182    source_bytes: Vec<u8>,
2183    options: TransformOptions,
2184    versioned_hash: Option<&str>,
2185    request: &HttpRequest,
2186    response_policy: ImageResponsePolicy,
2187    config: &ServerConfig,
2188    watermark: WatermarkSource,
2189    watermark_identity: Option<&str>,
2190    request_deadline: Option<Instant>,
2191) -> HttpResponse {
2192    let content_hash;
2193    let source_hash = match versioned_hash {
2194        Some(hash) => hash,
2195        None => {
2196            content_hash = hex::encode(Sha256::digest(&source_bytes));
2197            &content_hash
2198        }
2199    };
2200
2201    let cache = config.cache_root.as_ref().map(|root| {
2202        TransformCache::new(root.clone())
2203            .with_log_handler(config.log_handler.clone())
2204            .with_max_bytes(config.cache_max_bytes)
2205    });
2206
2207    if let Some(ref cache) = cache
2208        && options.format.is_some()
2209    {
2210        let cache_key = compute_cache_key(source_hash, &options, None, watermark_identity);
2211        if let CacheLookup::Hit {
2212            media_type,
2213            body,
2214            age,
2215        } = cache.get(&cache_key)
2216        {
2217            CACHE_HITS_TOTAL.fetch_add(1, Ordering::Relaxed);
2218            let etag = build_image_etag(&body);
2219            let mut headers = build_image_response_headers(
2220                media_type,
2221                &etag,
2222                response_policy,
2223                false,
2224                CacheHitStatus::Hit,
2225                config.public_max_age_seconds,
2226                config.public_stale_while_revalidate_seconds,
2227                &config.custom_response_headers,
2228            );
2229            headers.push(("Age".to_string(), age.as_secs().to_string()));
2230            if matches!(response_policy, ImageResponsePolicy::PublicGet)
2231                && if_none_match_matches(request.header("if-none-match"), &etag)
2232            {
2233                return HttpResponse::empty("304 Not Modified", headers);
2234            }
2235            return HttpResponse::binary_with_headers(
2236                "200 OK",
2237                media_type.as_mime(),
2238                headers,
2239                body,
2240            );
2241        }
2242    }
2243
2244    let _slot = match TransformSlot::try_acquire(
2245        &config.transforms_in_flight,
2246        config.max_concurrent_transforms,
2247    ) {
2248        Some(slot) => slot,
2249        None => return service_unavailable_response("too many concurrent transforms; retry later"),
2250    };
2251    transform_source_bytes_inner(
2252        source_bytes,
2253        options,
2254        request,
2255        response_policy,
2256        cache.as_ref(),
2257        source_hash,
2258        ImageResponseConfig {
2259            disable_accept_negotiation: config.disable_accept_negotiation,
2260            public_cache_control: PublicCacheControl {
2261                max_age: config.public_max_age_seconds,
2262                stale_while_revalidate: config.public_stale_while_revalidate_seconds,
2263            },
2264            transform_deadline: Duration::from_secs(config.transform_deadline_secs),
2265        },
2266        watermark,
2267        watermark_identity,
2268        config,
2269        request_deadline,
2270    )
2271}
2272
2273#[allow(clippy::too_many_arguments)]
2274fn transform_source_bytes_inner(
2275    source_bytes: Vec<u8>,
2276    mut options: TransformOptions,
2277    request: &HttpRequest,
2278    response_policy: ImageResponsePolicy,
2279    cache: Option<&TransformCache>,
2280    source_hash: &str,
2281    response_config: ImageResponseConfig,
2282    watermark_source: WatermarkSource,
2283    watermark_identity: Option<&str>,
2284    config: &ServerConfig,
2285    request_deadline: Option<Instant>,
2286) -> HttpResponse {
2287    if options.deadline.is_none() {
2288        options.deadline = Some(response_config.transform_deadline);
2289    }
2290    let artifact = match sniff_artifact(RawArtifact::new(source_bytes, None)) {
2291        Ok(artifact) => artifact,
2292        Err(error) => {
2293            record_transform_error(&error);
2294            return transform_error_response(error);
2295        }
2296    };
2297    let negotiation_used =
2298        if options.format.is_none() && !response_config.disable_accept_negotiation {
2299            match negotiate_output_format(
2300                request.header("accept"),
2301                &artifact,
2302                &config.format_preference,
2303            ) {
2304                Ok(Some(format)) => {
2305                    options.format = Some(format);
2306                    true
2307                }
2308                Ok(None) => false,
2309                Err(response) => return response,
2310            }
2311        } else {
2312            false
2313        };
2314
2315    // Check input pixel count against the server-level limit before decode.
2316    // This runs before the cache lookup so that a policy change (lowering the
2317    // limit) takes effect immediately, even for previously-cached images.
2318    if let (Some(w), Some(h)) = (artifact.metadata.width, artifact.metadata.height) {
2319        let pixels = u64::from(w) * u64::from(h);
2320        if pixels > config.max_input_pixels {
2321            return response::unprocessable_entity_response(&format!(
2322                "input image has {pixels} pixels, server limit is {}",
2323                config.max_input_pixels
2324            ));
2325        }
2326    }
2327
2328    let negotiated_accept = if negotiation_used {
2329        request.header("accept")
2330    } else {
2331        None
2332    };
2333    let cache_key = compute_cache_key(source_hash, &options, negotiated_accept, watermark_identity);
2334
2335    if let Some(cache) = cache
2336        && let CacheLookup::Hit {
2337            media_type,
2338            body,
2339            age,
2340        } = cache.get(&cache_key)
2341    {
2342        CACHE_HITS_TOTAL.fetch_add(1, Ordering::Relaxed);
2343        let etag = build_image_etag(&body);
2344        let mut headers = build_image_response_headers(
2345            media_type,
2346            &etag,
2347            response_policy,
2348            negotiation_used,
2349            CacheHitStatus::Hit,
2350            response_config.public_cache_control.max_age,
2351            response_config.public_cache_control.stale_while_revalidate,
2352            &config.custom_response_headers,
2353        );
2354        headers.push(("Age".to_string(), age.as_secs().to_string()));
2355        if matches!(response_policy, ImageResponsePolicy::PublicGet)
2356            && if_none_match_matches(request.header("if-none-match"), &etag)
2357        {
2358            return HttpResponse::empty("304 Not Modified", headers);
2359        }
2360        return HttpResponse::binary_with_headers("200 OK", media_type.as_mime(), headers, body);
2361    }
2362
2363    if cache.is_some() {
2364        CACHE_MISSES_TOTAL.fetch_add(1, Ordering::Relaxed);
2365    }
2366
2367    let is_svg = artifact.media_type == MediaType::Svg;
2368
2369    // Resolve watermark: reject SVG+watermark early (before fetch), then fetch if deferred.
2370    let watermark = if is_svg && watermark_source.is_some() {
2371        return bad_request_response("watermark is not supported for SVG source images");
2372    } else {
2373        match watermark_source {
2374            WatermarkSource::Deferred(validated) => {
2375                match fetch_watermark(validated, config, request_deadline) {
2376                    Ok(wm) => {
2377                        record_watermark_transform();
2378                        Some(wm)
2379                    }
2380                    Err(response) => return response,
2381                }
2382            }
2383            WatermarkSource::Ready(wm) => {
2384                record_watermark_transform();
2385                Some(wm)
2386            }
2387            WatermarkSource::None => None,
2388        }
2389    };
2390
2391    let had_watermark = watermark.is_some();
2392
2393    let transform_start = Instant::now();
2394    let mut request_obj = TransformRequest::new(artifact, options);
2395    request_obj.watermark = watermark;
2396    let result = if is_svg {
2397        match transform_svg(request_obj) {
2398            Ok(result) => result,
2399            Err(error) => {
2400                record_transform_error(&error);
2401                return transform_error_response(error);
2402            }
2403        }
2404    } else {
2405        match transform_raster(request_obj) {
2406            Ok(result) => result,
2407            Err(error) => {
2408                record_transform_error(&error);
2409                return transform_error_response(error);
2410            }
2411        }
2412    };
2413    record_transform_duration(result.artifact.media_type, transform_start);
2414
2415    for warning in &result.warnings {
2416        let msg = format!("truss: {warning}");
2417        if let Some(c) = cache
2418            && let Some(handler) = &c.log_handler
2419        {
2420            handler(&msg);
2421        } else {
2422            stderr_write(&msg);
2423        }
2424    }
2425
2426    let output = result.artifact;
2427
2428    if let Some(cache) = cache {
2429        cache.put(&cache_key, output.media_type, &output.bytes);
2430    }
2431
2432    let cache_hit_status = if cache.is_some() {
2433        CacheHitStatus::Miss
2434    } else {
2435        CacheHitStatus::Disabled
2436    };
2437
2438    let etag = build_image_etag(&output.bytes);
2439    let headers = build_image_response_headers(
2440        output.media_type,
2441        &etag,
2442        response_policy,
2443        negotiation_used,
2444        cache_hit_status,
2445        response_config.public_cache_control.max_age,
2446        response_config.public_cache_control.stale_while_revalidate,
2447        &config.custom_response_headers,
2448    );
2449
2450    if matches!(response_policy, ImageResponsePolicy::PublicGet)
2451        && if_none_match_matches(request.header("if-none-match"), &etag)
2452    {
2453        return HttpResponse::empty("304 Not Modified", headers);
2454    }
2455
2456    let mut response = HttpResponse::binary_with_headers(
2457        "200 OK",
2458        output.media_type.as_mime(),
2459        headers,
2460        output.bytes,
2461    );
2462    if had_watermark {
2463        response
2464            .headers
2465            .push(("X-Truss-Watermark".to_string(), "true".to_string()));
2466    }
2467    response
2468}
2469
2470#[cfg(test)]
2471mod tests {
2472    use serial_test::serial;
2473
2474    use super::config::DEFAULT_MAX_CONCURRENT_TRANSFORMS;
2475    use super::config::{
2476        DEFAULT_PUBLIC_MAX_AGE_SECONDS, DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2477        parse_presets_from_env,
2478    };
2479    use super::http_parse::{
2480        DEFAULT_MAX_UPLOAD_BODY_BYTES, HttpRequest, find_header_terminator, read_request_body,
2481        read_request_headers, resolve_storage_path,
2482    };
2483    use super::multipart::parse_multipart_form_data;
2484    use super::remote::{PinnedResolver, prepare_remote_fetch_target};
2485    use super::response::auth_required_response;
2486    use super::response::{HttpResponse, bad_request_response};
2487    use super::{
2488        CacheHitStatus, DEFAULT_BIND_ADDR, ImageResponsePolicy, PublicSourceKind, ServerConfig,
2489        SignedUrlSource, TransformOptionsPayload, TransformSourcePayload, WatermarkSource,
2490        authorize_signed_request, bind_addr, build_image_etag, build_image_response_headers,
2491        canonical_query_without_signature, negotiate_output_format, parse_public_get_request,
2492        route_request, serve_once_with_config, sign_public_url, transform_source_bytes,
2493    };
2494    use crate::{
2495        Artifact, ArtifactMetadata, Fit, MediaType, RawArtifact, TransformOptions, sniff_artifact,
2496    };
2497    use hmac::{Hmac, Mac};
2498    use image::codecs::png::PngEncoder;
2499    use image::{ColorType, ImageEncoder, Rgba, RgbaImage};
2500    use sha2::Sha256;
2501    use std::collections::{BTreeMap, HashMap};
2502    use std::env;
2503    use std::fs;
2504    use std::io::{Cursor, Read, Write};
2505    use std::net::{SocketAddr, TcpListener, TcpStream};
2506    use std::path::{Path, PathBuf};
2507    use std::sync::atomic::Ordering;
2508    use std::thread;
2509    use std::time::{Duration, SystemTime, UNIX_EPOCH};
2510
2511    /// Test-only convenience wrapper that reads headers + body in one shot,
2512    /// preserving the original `read_request` semantics for existing tests.
2513    fn read_request<R: Read>(stream: &mut R) -> Result<HttpRequest, HttpResponse> {
2514        let partial = read_request_headers(stream, DEFAULT_MAX_UPLOAD_BODY_BYTES)?;
2515        read_request_body(stream, partial)
2516    }
2517
2518    fn png_bytes() -> Vec<u8> {
2519        let image = RgbaImage::from_pixel(4, 3, Rgba([10, 20, 30, 255]));
2520        let mut bytes = Vec::new();
2521        PngEncoder::new(&mut bytes)
2522            .write_image(&image, 4, 3, ColorType::Rgba8.into())
2523            .expect("encode png");
2524        bytes
2525    }
2526
2527    fn temp_dir(name: &str) -> PathBuf {
2528        let unique = SystemTime::now()
2529            .duration_since(UNIX_EPOCH)
2530            .expect("current time")
2531            .as_nanos();
2532        let path = std::env::temp_dir().join(format!("truss-server-{name}-{unique}"));
2533        fs::create_dir_all(&path).expect("create temp dir");
2534        path
2535    }
2536
2537    fn write_png(path: &Path) {
2538        fs::write(path, png_bytes()).expect("write png fixture");
2539    }
2540
2541    fn artifact_with_alpha(has_alpha: bool) -> Artifact {
2542        Artifact::new(
2543            png_bytes(),
2544            MediaType::Png,
2545            ArtifactMetadata {
2546                width: Some(4),
2547                height: Some(3),
2548                frame_count: 1,
2549                duration: None,
2550                has_alpha: Some(has_alpha),
2551            },
2552        )
2553    }
2554
2555    fn sign_public_query(
2556        method: &str,
2557        authority: &str,
2558        path: &str,
2559        query: &BTreeMap<String, String>,
2560        secret: &str,
2561    ) -> String {
2562        let canonical = format!(
2563            "{method}\n{authority}\n{path}\n{}",
2564            canonical_query_without_signature(query)
2565        );
2566        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).expect("create hmac");
2567        mac.update(canonical.as_bytes());
2568        hex::encode(mac.finalize().into_bytes())
2569    }
2570
2571    type FixtureResponse = (String, Vec<(String, String)>, Vec<u8>);
2572
2573    fn read_fixture_request(stream: &mut TcpStream) {
2574        stream
2575            .set_nonblocking(false)
2576            .expect("configure fixture stream blocking mode");
2577        stream
2578            .set_read_timeout(Some(Duration::from_millis(100)))
2579            .expect("configure fixture stream timeout");
2580
2581        let deadline = std::time::Instant::now() + Duration::from_secs(2);
2582        let mut buffer = Vec::new();
2583        let mut chunk = [0_u8; 1024];
2584        let header_end = loop {
2585            let read = match stream.read(&mut chunk) {
2586                Ok(read) => read,
2587                Err(error)
2588                    if matches!(
2589                        error.kind(),
2590                        std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
2591                    ) && std::time::Instant::now() < deadline =>
2592                {
2593                    thread::sleep(Duration::from_millis(10));
2594                    continue;
2595                }
2596                Err(error) => panic!("read fixture request headers: {error}"),
2597            };
2598            if read == 0 {
2599                panic!("fixture request ended before headers were complete");
2600            }
2601            buffer.extend_from_slice(&chunk[..read]);
2602            if let Some(index) = find_header_terminator(&buffer) {
2603                break index;
2604            }
2605        };
2606
2607        let header_text = std::str::from_utf8(&buffer[..header_end]).expect("fixture request utf8");
2608        let content_length = header_text
2609            .split("\r\n")
2610            .filter_map(|line| line.split_once(':'))
2611            .find_map(|(name, value)| {
2612                name.trim()
2613                    .eq_ignore_ascii_case("content-length")
2614                    .then_some(value.trim())
2615            })
2616            .map(|value| {
2617                value
2618                    .parse::<usize>()
2619                    .expect("fixture content-length should be numeric")
2620            })
2621            .unwrap_or(0);
2622
2623        let mut body = buffer.len().saturating_sub(header_end + 4);
2624        while body < content_length {
2625            let read = match stream.read(&mut chunk) {
2626                Ok(read) => read,
2627                Err(error)
2628                    if matches!(
2629                        error.kind(),
2630                        std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
2631                    ) && std::time::Instant::now() < deadline =>
2632                {
2633                    thread::sleep(Duration::from_millis(10));
2634                    continue;
2635                }
2636                Err(error) => panic!("read fixture request body: {error}"),
2637            };
2638            if read == 0 {
2639                panic!("fixture request body was truncated");
2640            }
2641            body += read;
2642        }
2643    }
2644
2645    fn spawn_http_server(responses: Vec<FixtureResponse>) -> (String, thread::JoinHandle<()>) {
2646        let listener = TcpListener::bind("127.0.0.1:0").expect("bind fixture server");
2647        listener
2648            .set_nonblocking(true)
2649            .expect("configure fixture server");
2650        let addr = listener.local_addr().expect("fixture server addr");
2651        let url = format!("http://{addr}/image");
2652
2653        let handle = thread::spawn(move || {
2654            for (status, headers, body) in responses {
2655                let deadline = std::time::Instant::now() + Duration::from_secs(10);
2656                let mut accepted = None;
2657                while std::time::Instant::now() < deadline {
2658                    match listener.accept() {
2659                        Ok(stream) => {
2660                            accepted = Some(stream);
2661                            break;
2662                        }
2663                        Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
2664                            thread::sleep(Duration::from_millis(10));
2665                        }
2666                        Err(error) => panic!("accept fixture request: {error}"),
2667                    }
2668                }
2669
2670                let Some((mut stream, _)) = accepted else {
2671                    break;
2672                };
2673                read_fixture_request(&mut stream);
2674                let mut header = format!(
2675                    "HTTP/1.1 {status}\r\nContent-Length: {}\r\nConnection: close\r\n",
2676                    body.len()
2677                );
2678                for (name, value) in headers {
2679                    header.push_str(&format!("{name}: {value}\r\n"));
2680                }
2681                header.push_str("\r\n");
2682                stream
2683                    .write_all(header.as_bytes())
2684                    .expect("write fixture headers");
2685                stream.write_all(&body).expect("write fixture body");
2686                stream.flush().expect("flush fixture response");
2687            }
2688        });
2689
2690        (url, handle)
2691    }
2692
2693    fn transform_request(path: &str) -> HttpRequest {
2694        HttpRequest {
2695            method: "POST".to_string(),
2696            target: "/images:transform".to_string(),
2697            version: "HTTP/1.1".to_string(),
2698            headers: vec![
2699                ("authorization".to_string(), "Bearer secret".to_string()),
2700                ("content-type".to_string(), "application/json".to_string()),
2701            ],
2702            body: format!(
2703                "{{\"source\":{{\"kind\":\"path\",\"path\":\"{path}\"}},\"options\":{{\"format\":\"jpeg\"}}}}"
2704            )
2705            .into_bytes(),
2706        }
2707    }
2708
2709    fn transform_url_request(url: &str) -> HttpRequest {
2710        HttpRequest {
2711            method: "POST".to_string(),
2712            target: "/images:transform".to_string(),
2713            version: "HTTP/1.1".to_string(),
2714            headers: vec![
2715                ("authorization".to_string(), "Bearer secret".to_string()),
2716                ("content-type".to_string(), "application/json".to_string()),
2717            ],
2718            body: format!(
2719                "{{\"source\":{{\"kind\":\"url\",\"url\":\"{url}\"}},\"options\":{{\"format\":\"jpeg\"}}}}"
2720            )
2721            .into_bytes(),
2722        }
2723    }
2724
2725    fn upload_request(file_bytes: &[u8], options_json: Option<&str>) -> HttpRequest {
2726        let boundary = "truss-test-boundary";
2727        let mut body = Vec::new();
2728        body.extend_from_slice(
2729            format!(
2730                "--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"image.png\"\r\nContent-Type: image/png\r\n\r\n"
2731            )
2732            .as_bytes(),
2733        );
2734        body.extend_from_slice(file_bytes);
2735        body.extend_from_slice(b"\r\n");
2736
2737        if let Some(options_json) = options_json {
2738            body.extend_from_slice(
2739                format!(
2740                    "--{boundary}\r\nContent-Disposition: form-data; name=\"options\"\r\nContent-Type: application/json\r\n\r\n{options_json}\r\n"
2741                )
2742                .as_bytes(),
2743            );
2744        }
2745
2746        body.extend_from_slice(format!("--{boundary}--\r\n").as_bytes());
2747
2748        HttpRequest {
2749            method: "POST".to_string(),
2750            target: "/images".to_string(),
2751            version: "HTTP/1.1".to_string(),
2752            headers: vec![
2753                ("authorization".to_string(), "Bearer secret".to_string()),
2754                (
2755                    "content-type".to_string(),
2756                    format!("multipart/form-data; boundary={boundary}"),
2757                ),
2758            ],
2759            body,
2760        }
2761    }
2762
2763    fn metrics_request(with_auth: bool) -> HttpRequest {
2764        let mut headers = Vec::new();
2765        if with_auth {
2766            headers.push(("authorization".to_string(), "Bearer secret".to_string()));
2767        }
2768
2769        HttpRequest {
2770            method: "GET".to_string(),
2771            target: "/metrics".to_string(),
2772            version: "HTTP/1.1".to_string(),
2773            headers,
2774            body: Vec::new(),
2775        }
2776    }
2777
2778    fn response_body(response: &HttpResponse) -> &str {
2779        std::str::from_utf8(&response.body).expect("utf8 response body")
2780    }
2781
2782    fn signed_public_request(target: &str, host: &str, secret: &str) -> HttpRequest {
2783        let (path, query) = target.split_once('?').expect("target has query");
2784        let mut query = url::form_urlencoded::parse(query.as_bytes())
2785            .into_owned()
2786            .collect::<BTreeMap<_, _>>();
2787        let signature = sign_public_query("GET", host, path, &query, secret);
2788        query.insert("signature".to_string(), signature);
2789        let final_query = url::form_urlencoded::Serializer::new(String::new())
2790            .extend_pairs(
2791                query
2792                    .iter()
2793                    .map(|(name, value)| (name.as_str(), value.as_str())),
2794            )
2795            .finish();
2796
2797        HttpRequest {
2798            method: "GET".to_string(),
2799            target: format!("{path}?{final_query}"),
2800            version: "HTTP/1.1".to_string(),
2801            headers: vec![("host".to_string(), host.to_string())],
2802            body: Vec::new(),
2803        }
2804    }
2805
2806    #[test]
2807    fn uses_default_bind_addr_when_env_is_missing() {
2808        unsafe { std::env::remove_var("TRUSS_BIND_ADDR") };
2809        assert_eq!(bind_addr(), DEFAULT_BIND_ADDR);
2810    }
2811
2812    #[test]
2813    fn authorize_signed_request_accepts_a_valid_signature() {
2814        let request = signed_public_request(
2815            "/images/by-path?path=%2Fimage.png&keyId=public-dev&expires=4102444800&format=jpeg",
2816            "assets.example.com",
2817            "secret-value",
2818        );
2819        let query = super::auth::parse_query_params(&request).expect("parse query");
2820        let config = ServerConfig::new(temp_dir("public-auth"), None)
2821            .with_signed_url_credentials("public-dev", "secret-value");
2822
2823        authorize_signed_request(&request, &query, &config).expect("signed auth should pass");
2824    }
2825
2826    #[test]
2827    fn authorize_signed_request_uses_public_base_url_authority() {
2828        let request = signed_public_request(
2829            "/images/by-path?path=%2Fimage.png&keyId=public-dev&expires=4102444800&format=jpeg",
2830            "cdn.example.com",
2831            "secret-value",
2832        );
2833        let query = super::auth::parse_query_params(&request).expect("parse query");
2834        let mut config = ServerConfig::new(temp_dir("public-authority"), None)
2835            .with_signed_url_credentials("public-dev", "secret-value");
2836        config.public_base_url = Some("https://cdn.example.com".to_string());
2837
2838        authorize_signed_request(&request, &query, &config).expect("signed auth should pass");
2839    }
2840
2841    #[test]
2842    fn negotiate_output_format_prefers_alpha_safe_formats_for_transparent_inputs() {
2843        let format = negotiate_output_format(
2844            Some("image/jpeg,image/png"),
2845            &artifact_with_alpha(true),
2846            &[],
2847        )
2848        .expect("negotiate output format")
2849        .expect("resolved output format");
2850
2851        assert_eq!(format, MediaType::Png);
2852    }
2853
2854    #[test]
2855    fn negotiate_output_format_prefers_avif_for_wildcard_accept() {
2856        let format = negotiate_output_format(Some("image/*"), &artifact_with_alpha(false), &[])
2857            .expect("negotiate output format")
2858            .expect("resolved output format");
2859
2860        assert_eq!(format, MediaType::Avif);
2861    }
2862
2863    #[test]
2864    fn build_image_response_headers_include_cache_and_safety_metadata() {
2865        let headers = build_image_response_headers(
2866            MediaType::Webp,
2867            &build_image_etag(b"demo"),
2868            ImageResponsePolicy::PublicGet,
2869            true,
2870            CacheHitStatus::Disabled,
2871            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2872            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2873            &[],
2874        );
2875
2876        assert!(headers.contains(&(
2877            "Cache-Control".to_string(),
2878            "public, max-age=3600, stale-while-revalidate=60".to_string()
2879        )));
2880        assert!(headers.contains(&("Vary".to_string(), "Accept".to_string())));
2881        assert!(headers.contains(&("X-Content-Type-Options".to_string(), "nosniff".to_string())));
2882        assert!(headers.contains(&(
2883            "Content-Disposition".to_string(),
2884            "inline; filename=\"truss.webp\"".to_string()
2885        )));
2886        assert!(headers.contains(&(
2887            "Cache-Status".to_string(),
2888            "\"truss\"; fwd=miss".to_string()
2889        )));
2890    }
2891
2892    #[test]
2893    fn build_image_response_headers_include_csp_sandbox_for_svg() {
2894        let headers = build_image_response_headers(
2895            MediaType::Svg,
2896            &build_image_etag(b"svg-data"),
2897            ImageResponsePolicy::PublicGet,
2898            true,
2899            CacheHitStatus::Disabled,
2900            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2901            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2902            &[],
2903        );
2904
2905        assert!(headers.contains(&("Content-Security-Policy".to_string(), "sandbox".to_string())));
2906    }
2907
2908    #[test]
2909    fn build_image_response_headers_omit_csp_sandbox_for_raster() {
2910        let headers = build_image_response_headers(
2911            MediaType::Png,
2912            &build_image_etag(b"png-data"),
2913            ImageResponsePolicy::PublicGet,
2914            true,
2915            CacheHitStatus::Disabled,
2916            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2917            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2918            &[],
2919        );
2920
2921        assert!(!headers.iter().any(|(k, _)| *k == "Content-Security-Policy"));
2922    }
2923
2924    #[test]
2925    fn backpressure_rejects_when_at_capacity() {
2926        let config = ServerConfig::new(std::env::temp_dir(), None);
2927        config
2928            .transforms_in_flight
2929            .store(DEFAULT_MAX_CONCURRENT_TRANSFORMS, Ordering::Relaxed);
2930
2931        let request = HttpRequest {
2932            method: "POST".to_string(),
2933            target: "/transform".to_string(),
2934            version: "HTTP/1.1".to_string(),
2935            headers: Vec::new(),
2936            body: Vec::new(),
2937        };
2938
2939        let png_bytes = {
2940            let mut buf = Vec::new();
2941            let encoder = image::codecs::png::PngEncoder::new(&mut buf);
2942            encoder
2943                .write_image(&[255, 0, 0, 255], 1, 1, image::ExtendedColorType::Rgba8)
2944                .unwrap();
2945            buf
2946        };
2947
2948        let response = transform_source_bytes(
2949            png_bytes,
2950            TransformOptions::default(),
2951            None,
2952            &request,
2953            ImageResponsePolicy::PrivateTransform,
2954            &config,
2955            WatermarkSource::None,
2956            None,
2957            None,
2958        );
2959
2960        assert!(response.status.contains("503"));
2961
2962        assert_eq!(
2963            config.transforms_in_flight.load(Ordering::Relaxed),
2964            DEFAULT_MAX_CONCURRENT_TRANSFORMS
2965        );
2966    }
2967
2968    #[test]
2969    fn backpressure_rejects_with_custom_concurrency_limit() {
2970        let custom_limit = 2u64;
2971        let mut config = ServerConfig::new(std::env::temp_dir(), None);
2972        config.max_concurrent_transforms = custom_limit;
2973        config
2974            .transforms_in_flight
2975            .store(custom_limit, Ordering::Relaxed);
2976
2977        let request = HttpRequest {
2978            method: "POST".to_string(),
2979            target: "/transform".to_string(),
2980            version: "HTTP/1.1".to_string(),
2981            headers: Vec::new(),
2982            body: Vec::new(),
2983        };
2984
2985        let png_bytes = {
2986            let mut buf = Vec::new();
2987            let encoder = image::codecs::png::PngEncoder::new(&mut buf);
2988            encoder
2989                .write_image(&[255, 0, 0, 255], 1, 1, image::ExtendedColorType::Rgba8)
2990                .unwrap();
2991            buf
2992        };
2993
2994        let response = transform_source_bytes(
2995            png_bytes,
2996            TransformOptions::default(),
2997            None,
2998            &request,
2999            ImageResponsePolicy::PrivateTransform,
3000            &config,
3001            WatermarkSource::None,
3002            None,
3003            None,
3004        );
3005
3006        assert!(response.status.contains("503"));
3007    }
3008
3009    #[test]
3010    fn compute_cache_key_is_deterministic() {
3011        let opts = TransformOptions {
3012            width: Some(300),
3013            height: Some(200),
3014            format: Some(MediaType::Webp),
3015            ..TransformOptions::default()
3016        };
3017        let key1 = super::cache::compute_cache_key("source-abc", &opts, None, None);
3018        let key2 = super::cache::compute_cache_key("source-abc", &opts, None, None);
3019        assert_eq!(key1, key2);
3020        assert_eq!(key1.len(), 64);
3021    }
3022
3023    #[test]
3024    fn compute_cache_key_differs_for_different_options() {
3025        let opts1 = TransformOptions {
3026            width: Some(300),
3027            format: Some(MediaType::Webp),
3028            ..TransformOptions::default()
3029        };
3030        let opts2 = TransformOptions {
3031            width: Some(400),
3032            format: Some(MediaType::Webp),
3033            ..TransformOptions::default()
3034        };
3035        let key1 = super::cache::compute_cache_key("same-source", &opts1, None, None);
3036        let key2 = super::cache::compute_cache_key("same-source", &opts2, None, None);
3037        assert_ne!(key1, key2);
3038    }
3039
3040    #[test]
3041    fn compute_cache_key_includes_accept_when_present() {
3042        let opts = TransformOptions::default();
3043        let key_no_accept = super::cache::compute_cache_key("src", &opts, None, None);
3044        let key_with_accept =
3045            super::cache::compute_cache_key("src", &opts, Some("image/webp"), None);
3046        assert_ne!(key_no_accept, key_with_accept);
3047    }
3048
3049    #[test]
3050    fn transform_cache_put_and_get_round_trips() {
3051        let dir = tempfile::tempdir().expect("create tempdir");
3052        let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
3053
3054        cache.put(
3055            "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
3056            MediaType::Png,
3057            b"png-data",
3058        );
3059        let result = cache.get("abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890");
3060
3061        match result {
3062            super::cache::CacheLookup::Hit {
3063                media_type, body, ..
3064            } => {
3065                assert_eq!(media_type, MediaType::Png);
3066                assert_eq!(body, b"png-data");
3067            }
3068            super::cache::CacheLookup::Miss => panic!("expected cache hit"),
3069        }
3070    }
3071
3072    #[test]
3073    fn transform_cache_miss_for_unknown_key() {
3074        let dir = tempfile::tempdir().expect("create tempdir");
3075        let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
3076
3077        let result = cache.get("0000001234567890abcdef1234567890abcdef1234567890abcdef1234567890");
3078        assert!(matches!(result, super::cache::CacheLookup::Miss));
3079    }
3080
3081    #[test]
3082    fn transform_cache_uses_sharded_layout() {
3083        let dir = tempfile::tempdir().expect("create tempdir");
3084        let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
3085
3086        let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
3087        cache.put(key, MediaType::Jpeg, b"jpeg-data");
3088
3089        let expected = dir.path().join("ab").join("cd").join("ef").join(key);
3090        assert!(
3091            expected.exists(),
3092            "sharded file should exist at {expected:?}"
3093        );
3094    }
3095
3096    #[test]
3097    fn transform_cache_expired_entry_is_miss() {
3098        let dir = tempfile::tempdir().expect("create tempdir");
3099        let mut cache = super::cache::TransformCache::new(dir.path().to_path_buf());
3100        cache.ttl = Duration::from_secs(0);
3101
3102        let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
3103        cache.put(key, MediaType::Png, b"data");
3104
3105        std::thread::sleep(Duration::from_millis(10));
3106
3107        let result = cache.get(key);
3108        assert!(matches!(result, super::cache::CacheLookup::Miss));
3109    }
3110
3111    #[test]
3112    fn transform_cache_handles_corrupted_entry_as_miss() {
3113        let dir = tempfile::tempdir().expect("create tempdir");
3114        let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
3115
3116        let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
3117        let path = cache.entry_path(key);
3118        fs::create_dir_all(path.parent().unwrap()).unwrap();
3119        fs::write(&path, b"corrupted-data-without-header").unwrap();
3120
3121        let result = cache.get(key);
3122        assert!(matches!(result, super::cache::CacheLookup::Miss));
3123    }
3124
3125    #[test]
3126    fn cache_status_header_reflects_hit() {
3127        let headers = build_image_response_headers(
3128            MediaType::Png,
3129            &build_image_etag(b"data"),
3130            ImageResponsePolicy::PublicGet,
3131            false,
3132            CacheHitStatus::Hit,
3133            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
3134            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
3135            &[],
3136        );
3137        assert!(headers.contains(&("Cache-Status".to_string(), "\"truss\"; hit".to_string())));
3138    }
3139
3140    #[test]
3141    fn cache_status_header_reflects_miss() {
3142        let headers = build_image_response_headers(
3143            MediaType::Png,
3144            &build_image_etag(b"data"),
3145            ImageResponsePolicy::PublicGet,
3146            false,
3147            CacheHitStatus::Miss,
3148            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
3149            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
3150            &[],
3151        );
3152        assert!(headers.contains(&(
3153            "Cache-Status".to_string(),
3154            "\"truss\"; fwd=miss".to_string()
3155        )));
3156    }
3157
3158    #[test]
3159    fn origin_cache_put_and_get_round_trips() {
3160        let dir = tempfile::tempdir().expect("create tempdir");
3161        let cache = super::cache::OriginCache::new(dir.path());
3162
3163        cache.put("src", "https://example.com/image.png", b"raw-source-bytes");
3164        let result = cache.get("src", "https://example.com/image.png");
3165
3166        assert_eq!(result.as_deref(), Some(b"raw-source-bytes".as_ref()));
3167    }
3168
3169    #[test]
3170    fn origin_cache_miss_for_unknown_url() {
3171        let dir = tempfile::tempdir().expect("create tempdir");
3172        let cache = super::cache::OriginCache::new(dir.path());
3173
3174        assert!(
3175            cache
3176                .get("src", "https://unknown.example.com/missing.png")
3177                .is_none()
3178        );
3179    }
3180
3181    #[test]
3182    fn origin_cache_expired_entry_is_none() {
3183        let dir = tempfile::tempdir().expect("create tempdir");
3184        let mut cache = super::cache::OriginCache::new(dir.path());
3185        cache.ttl = Duration::from_secs(0);
3186
3187        cache.put("src", "https://example.com/img.png", b"data");
3188        std::thread::sleep(Duration::from_millis(10));
3189
3190        assert!(cache.get("src", "https://example.com/img.png").is_none());
3191    }
3192
3193    #[test]
3194    fn origin_cache_uses_origin_subdirectory() {
3195        let dir = tempfile::tempdir().expect("create tempdir");
3196        let cache = super::cache::OriginCache::new(dir.path());
3197
3198        cache.put("src", "https://example.com/test.png", b"bytes");
3199
3200        let origin_dir = dir.path().join("origin");
3201        assert!(origin_dir.exists(), "origin subdirectory should exist");
3202    }
3203
3204    #[test]
3205    fn sign_public_url_builds_a_signed_path_url() {
3206        let url = sign_public_url(
3207            "https://cdn.example.com",
3208            SignedUrlSource::Path {
3209                path: "/image.png".to_string(),
3210                version: Some("v1".to_string()),
3211            },
3212            &crate::TransformOptions {
3213                format: Some(MediaType::Jpeg),
3214                width: Some(320),
3215                ..crate::TransformOptions::default()
3216            },
3217            "public-dev",
3218            "secret-value",
3219            4_102_444_800,
3220            None,
3221            None,
3222        )
3223        .expect("sign public URL");
3224
3225        assert!(url.starts_with("https://cdn.example.com/images/by-path?"));
3226        assert!(url.contains("path=%2Fimage.png"));
3227        assert!(url.contains("version=v1"));
3228        assert!(url.contains("width=320"));
3229        assert!(url.contains("format=jpeg"));
3230        assert!(url.contains("keyId=public-dev"));
3231        assert!(url.contains("expires=4102444800"));
3232        assert!(url.contains("signature="));
3233    }
3234
3235    #[test]
3236    fn parse_public_get_request_rejects_unknown_query_parameters() {
3237        let query = BTreeMap::from([
3238            ("path".to_string(), "/image.png".to_string()),
3239            ("keyId".to_string(), "public-dev".to_string()),
3240            ("expires".to_string(), "4102444800".to_string()),
3241            ("signature".to_string(), "deadbeef".to_string()),
3242            ("unexpected".to_string(), "value".to_string()),
3243        ]);
3244
3245        let config = ServerConfig::new(temp_dir("parse-query"), None);
3246        let response = parse_public_get_request(&query, PublicSourceKind::Path, &config)
3247            .expect_err("unknown query should fail");
3248
3249        assert_eq!(response.status, "400 Bad Request");
3250        assert!(response_body(&response).contains("is not supported"));
3251    }
3252
3253    #[test]
3254    fn parse_public_get_request_resolves_preset() {
3255        let mut presets = HashMap::new();
3256        presets.insert(
3257            "thumbnail".to_string(),
3258            TransformOptionsPayload {
3259                width: Some(150),
3260                height: Some(150),
3261                fit: Some("cover".to_string()),
3262                ..TransformOptionsPayload::default()
3263            },
3264        );
3265        let config = ServerConfig::new(temp_dir("preset"), None).with_presets(presets);
3266
3267        let query = BTreeMap::from([
3268            ("path".to_string(), "/image.png".to_string()),
3269            ("preset".to_string(), "thumbnail".to_string()),
3270        ]);
3271        let (_, options, _) =
3272            parse_public_get_request(&query, PublicSourceKind::Path, &config).unwrap();
3273
3274        assert_eq!(options.width, Some(150));
3275        assert_eq!(options.height, Some(150));
3276        assert_eq!(options.fit, Some(Fit::Cover));
3277    }
3278
3279    #[test]
3280    fn parse_public_get_request_preset_with_override() {
3281        let mut presets = HashMap::new();
3282        presets.insert(
3283            "thumbnail".to_string(),
3284            TransformOptionsPayload {
3285                width: Some(150),
3286                height: Some(150),
3287                fit: Some("cover".to_string()),
3288                format: Some("webp".to_string()),
3289                ..TransformOptionsPayload::default()
3290            },
3291        );
3292        let config = ServerConfig::new(temp_dir("preset-override"), None).with_presets(presets);
3293
3294        let query = BTreeMap::from([
3295            ("path".to_string(), "/image.png".to_string()),
3296            ("preset".to_string(), "thumbnail".to_string()),
3297            ("width".to_string(), "200".to_string()),
3298            ("format".to_string(), "jpeg".to_string()),
3299        ]);
3300        let (_, options, _) =
3301            parse_public_get_request(&query, PublicSourceKind::Path, &config).unwrap();
3302
3303        assert_eq!(options.width, Some(200));
3304        assert_eq!(options.height, Some(150));
3305        assert_eq!(options.format, Some(MediaType::Jpeg));
3306    }
3307
3308    #[test]
3309    fn parse_public_get_request_rejects_unknown_preset() {
3310        let config = ServerConfig::new(temp_dir("preset-unknown"), None);
3311
3312        let query = BTreeMap::from([
3313            ("path".to_string(), "/image.png".to_string()),
3314            ("preset".to_string(), "nonexistent".to_string()),
3315        ]);
3316        let response = parse_public_get_request(&query, PublicSourceKind::Path, &config)
3317            .expect_err("unknown preset should fail");
3318
3319        assert_eq!(response.status, "400 Bad Request");
3320        assert!(response_body(&response).contains("unknown preset"));
3321    }
3322
3323    #[test]
3324    fn sign_public_url_includes_preset_in_signed_url() {
3325        let url = sign_public_url(
3326            "https://cdn.example.com",
3327            SignedUrlSource::Path {
3328                path: "/image.png".to_string(),
3329                version: None,
3330            },
3331            &crate::TransformOptions::default(),
3332            "public-dev",
3333            "secret-value",
3334            4_102_444_800,
3335            None,
3336            Some("thumbnail"),
3337        )
3338        .expect("sign public URL with preset");
3339
3340        assert!(url.contains("preset=thumbnail"));
3341        assert!(url.contains("signature="));
3342    }
3343
3344    #[test]
3345    #[serial]
3346    fn parse_presets_from_env_parses_json() {
3347        unsafe {
3348            env::set_var(
3349                "TRUSS_PRESETS",
3350                r#"{"thumb":{"width":100,"height":100,"fit":"cover"}}"#,
3351            );
3352            env::remove_var("TRUSS_PRESETS_FILE");
3353        }
3354        let (presets, file_path) = parse_presets_from_env().unwrap();
3355        unsafe {
3356            env::remove_var("TRUSS_PRESETS");
3357        }
3358
3359        assert!(file_path.is_none());
3360        assert_eq!(presets.len(), 1);
3361        let thumb = presets.get("thumb").unwrap();
3362        assert_eq!(thumb.width, Some(100));
3363        assert_eq!(thumb.height, Some(100));
3364        assert_eq!(thumb.fit.as_deref(), Some("cover"));
3365    }
3366
3367    #[test]
3368    fn prepare_remote_fetch_target_pins_the_validated_netloc() {
3369        let target = prepare_remote_fetch_target(
3370            "http://1.1.1.1/image.png",
3371            &ServerConfig::new(temp_dir("pin"), Some("secret".to_string())),
3372        )
3373        .expect("prepare remote target");
3374
3375        assert_eq!(target.netloc, "1.1.1.1:80");
3376        assert_eq!(target.addrs, vec![SocketAddr::from(([1, 1, 1, 1], 80))]);
3377    }
3378
3379    #[test]
3380    fn pinned_resolver_rejects_unexpected_netlocs() {
3381        use ureq::unversioned::resolver::Resolver;
3382
3383        let resolver = PinnedResolver {
3384            expected_netloc: "example.com:443".to_string(),
3385            addrs: vec![SocketAddr::from(([93, 184, 216, 34], 443))],
3386        };
3387
3388        let config = ureq::config::Config::builder().build();
3389        let timeout = ureq::unversioned::transport::NextTimeout {
3390            after: ureq::unversioned::transport::time::Duration::Exact(
3391                std::time::Duration::from_secs(30),
3392            ),
3393            reason: ureq::Timeout::Resolve,
3394        };
3395
3396        let uri: ureq::http::Uri = "https://example.com/path".parse().unwrap();
3397        let result = resolver
3398            .resolve(&uri, &config, timeout)
3399            .expect("resolve expected netloc");
3400        assert_eq!(&result[..], &[SocketAddr::from(([93, 184, 216, 34], 443))]);
3401
3402        let bad_uri: ureq::http::Uri = "https://proxy.example:8080/path".parse().unwrap();
3403        let timeout2 = ureq::unversioned::transport::NextTimeout {
3404            after: ureq::unversioned::transport::time::Duration::Exact(
3405                std::time::Duration::from_secs(30),
3406            ),
3407            reason: ureq::Timeout::Resolve,
3408        };
3409        let error = resolver
3410            .resolve(&bad_uri, &config, timeout2)
3411            .expect_err("unexpected netloc should fail");
3412        assert!(matches!(error, ureq::Error::HostNotFound));
3413    }
3414
3415    #[test]
3416    fn health_live_returns_status_service_version() {
3417        let request = HttpRequest {
3418            method: "GET".to_string(),
3419            target: "/health/live".to_string(),
3420            version: "HTTP/1.1".to_string(),
3421            headers: Vec::new(),
3422            body: Vec::new(),
3423        };
3424
3425        let response = route_request(request, &ServerConfig::new(temp_dir("live"), None));
3426
3427        assert_eq!(response.status, "200 OK");
3428        let body: serde_json::Value =
3429            serde_json::from_slice(&response.body).expect("parse live body");
3430        assert_eq!(body["status"], "ok");
3431        assert_eq!(body["service"], "truss");
3432        assert_eq!(body["version"], env!("CARGO_PKG_VERSION"));
3433    }
3434
3435    #[test]
3436    fn health_ready_returns_ok_when_storage_exists() {
3437        let storage = temp_dir("ready-ok");
3438        let request = HttpRequest {
3439            method: "GET".to_string(),
3440            target: "/health/ready".to_string(),
3441            version: "HTTP/1.1".to_string(),
3442            headers: Vec::new(),
3443            body: Vec::new(),
3444        };
3445
3446        let response = route_request(request, &ServerConfig::new(storage, None));
3447
3448        assert_eq!(response.status, "200 OK");
3449        let body: serde_json::Value =
3450            serde_json::from_slice(&response.body).expect("parse ready body");
3451        assert_eq!(body["status"], "ok");
3452        let checks = body["checks"].as_array().expect("checks array");
3453        assert!(
3454            checks
3455                .iter()
3456                .any(|c| c["name"] == "storageRoot" && c["status"] == "ok")
3457        );
3458    }
3459
3460    #[test]
3461    fn health_ready_returns_503_when_storage_missing() {
3462        let request = HttpRequest {
3463            method: "GET".to_string(),
3464            target: "/health/ready".to_string(),
3465            version: "HTTP/1.1".to_string(),
3466            headers: Vec::new(),
3467            body: Vec::new(),
3468        };
3469
3470        let config = ServerConfig::new(PathBuf::from("/nonexistent-truss-test-dir"), None);
3471        let response = route_request(request, &config);
3472
3473        assert_eq!(response.status, "503 Service Unavailable");
3474        let body: serde_json::Value =
3475            serde_json::from_slice(&response.body).expect("parse ready fail body");
3476        assert_eq!(body["status"], "fail");
3477        let checks = body["checks"].as_array().expect("checks array");
3478        assert!(
3479            checks
3480                .iter()
3481                .any(|c| c["name"] == "storageRoot" && c["status"] == "fail")
3482        );
3483    }
3484
3485    #[test]
3486    fn health_ready_returns_503_when_cache_root_missing() {
3487        let storage = temp_dir("ready-cache-fail");
3488        let mut config = ServerConfig::new(storage, None);
3489        config.cache_root = Some(PathBuf::from("/nonexistent-truss-cache-dir"));
3490
3491        let request = HttpRequest {
3492            method: "GET".to_string(),
3493            target: "/health/ready".to_string(),
3494            version: "HTTP/1.1".to_string(),
3495            headers: Vec::new(),
3496            body: Vec::new(),
3497        };
3498
3499        let response = route_request(request, &config);
3500
3501        assert_eq!(response.status, "503 Service Unavailable");
3502        let body: serde_json::Value =
3503            serde_json::from_slice(&response.body).expect("parse ready cache body");
3504        assert_eq!(body["status"], "fail");
3505        let checks = body["checks"].as_array().expect("checks array");
3506        assert!(
3507            checks
3508                .iter()
3509                .any(|c| c["name"] == "cacheRoot" && c["status"] == "fail")
3510        );
3511    }
3512
3513    #[test]
3514    fn health_returns_comprehensive_diagnostic() {
3515        let storage = temp_dir("health-diag");
3516        let request = HttpRequest {
3517            method: "GET".to_string(),
3518            target: "/health".to_string(),
3519            version: "HTTP/1.1".to_string(),
3520            headers: Vec::new(),
3521            body: Vec::new(),
3522        };
3523
3524        let response = route_request(request, &ServerConfig::new(storage, None));
3525
3526        assert_eq!(response.status, "200 OK");
3527        let body: serde_json::Value =
3528            serde_json::from_slice(&response.body).expect("parse health body");
3529        assert_eq!(body["status"], "ok");
3530        assert_eq!(body["service"], "truss");
3531        assert_eq!(body["version"], env!("CARGO_PKG_VERSION"));
3532        assert!(body["uptimeSeconds"].is_u64());
3533        assert!(body["checks"].is_array());
3534    }
3535
3536    #[test]
3537    fn unknown_path_returns_not_found() {
3538        let request = HttpRequest {
3539            method: "GET".to_string(),
3540            target: "/unknown".to_string(),
3541            version: "HTTP/1.1".to_string(),
3542            headers: Vec::new(),
3543            body: Vec::new(),
3544        };
3545
3546        let response = route_request(request, &ServerConfig::new(temp_dir("not-found"), None));
3547
3548        assert_eq!(response.status, "404 Not Found");
3549        assert_eq!(response.content_type, Some("application/problem+json"));
3550        let body = response_body(&response);
3551        assert!(body.contains("\"type\":\"about:blank\""));
3552        assert!(body.contains("\"title\":\"Not Found\""));
3553        assert!(body.contains("\"status\":404"));
3554        assert!(body.contains("not found"));
3555    }
3556
3557    #[test]
3558    fn transform_endpoint_requires_authentication() {
3559        let storage_root = temp_dir("auth");
3560        write_png(&storage_root.join("image.png"));
3561        let mut request = transform_request("/image.png");
3562        request.headers.retain(|(name, _)| name != "authorization");
3563
3564        let response = route_request(
3565            request,
3566            &ServerConfig::new(storage_root, Some("secret".to_string())),
3567        );
3568
3569        assert_eq!(response.status, "401 Unauthorized");
3570        assert!(response_body(&response).contains("authorization required"));
3571    }
3572
3573    #[test]
3574    fn transform_endpoint_returns_service_unavailable_without_configured_token() {
3575        let storage_root = temp_dir("token");
3576        write_png(&storage_root.join("image.png"));
3577
3578        let response = route_request(
3579            transform_request("/image.png"),
3580            &ServerConfig::new(storage_root, None),
3581        );
3582
3583        assert_eq!(response.status, "503 Service Unavailable");
3584        assert!(response_body(&response).contains("bearer token is not configured"));
3585    }
3586
3587    #[test]
3588    fn transform_endpoint_transforms_a_path_source() {
3589        let storage_root = temp_dir("transform");
3590        write_png(&storage_root.join("image.png"));
3591
3592        let response = route_request(
3593            transform_request("/image.png"),
3594            &ServerConfig::new(storage_root, Some("secret".to_string())),
3595        );
3596
3597        assert_eq!(response.status, "200 OK");
3598        assert_eq!(response.content_type, Some("image/jpeg"));
3599
3600        let artifact = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
3601        assert_eq!(artifact.media_type, MediaType::Jpeg);
3602        assert_eq!(artifact.metadata.width, Some(4));
3603        assert_eq!(artifact.metadata.height, Some(3));
3604    }
3605
3606    #[test]
3607    fn transform_endpoint_rejects_private_url_sources_by_default() {
3608        let response = route_request(
3609            transform_url_request("http://127.0.0.1:8080/image.png"),
3610            &ServerConfig::new(temp_dir("url-blocked"), Some("secret".to_string())),
3611        );
3612
3613        assert_eq!(response.status, "403 Forbidden");
3614        assert!(response_body(&response).contains("port is not allowed"));
3615    }
3616
3617    #[test]
3618    fn transform_endpoint_transforms_a_url_source_when_insecure_allowance_is_enabled() {
3619        let (url, handle) = spawn_http_server(vec![(
3620            "200 OK".to_string(),
3621            vec![("Content-Type".to_string(), "image/png".to_string())],
3622            png_bytes(),
3623        )]);
3624
3625        let response = route_request(
3626            transform_url_request(&url),
3627            &ServerConfig::new(temp_dir("url"), Some("secret".to_string()))
3628                .with_insecure_url_sources(true),
3629        );
3630
3631        handle.join().expect("join fixture server");
3632
3633        assert_eq!(response.status, "200 OK");
3634        assert_eq!(response.content_type, Some("image/jpeg"));
3635
3636        let artifact = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
3637        assert_eq!(artifact.media_type, MediaType::Jpeg);
3638    }
3639
3640    #[test]
3641    fn transform_endpoint_follows_remote_redirects() {
3642        let (redirect_url, handle) = spawn_http_server(vec![
3643            (
3644                "302 Found".to_string(),
3645                vec![("Location".to_string(), "/final-image".to_string())],
3646                Vec::new(),
3647            ),
3648            (
3649                "200 OK".to_string(),
3650                vec![("Content-Type".to_string(), "image/png".to_string())],
3651                png_bytes(),
3652            ),
3653        ]);
3654
3655        let response = route_request(
3656            transform_url_request(&redirect_url),
3657            &ServerConfig::new(temp_dir("redirect"), Some("secret".to_string()))
3658                .with_insecure_url_sources(true),
3659        );
3660
3661        handle.join().expect("join fixture server");
3662
3663        assert_eq!(response.status, "200 OK");
3664        let artifact = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
3665        assert_eq!(artifact.media_type, MediaType::Jpeg);
3666    }
3667
3668    #[test]
3669    fn upload_endpoint_transforms_uploaded_file() {
3670        let response = route_request(
3671            upload_request(&png_bytes(), Some(r#"{"format":"jpeg"}"#)),
3672            &ServerConfig::new(temp_dir("upload"), Some("secret".to_string())),
3673        );
3674
3675        assert_eq!(response.status, "200 OK");
3676        assert_eq!(response.content_type, Some("image/jpeg"));
3677
3678        let artifact = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
3679        assert_eq!(artifact.media_type, MediaType::Jpeg);
3680    }
3681
3682    #[test]
3683    fn upload_endpoint_requires_a_file_field() {
3684        let boundary = "truss-test-boundary";
3685        let request = HttpRequest {
3686            method: "POST".to_string(),
3687            target: "/images".to_string(),
3688            version: "HTTP/1.1".to_string(),
3689            headers: vec![
3690                ("authorization".to_string(), "Bearer secret".to_string()),
3691                (
3692                    "content-type".to_string(),
3693                    format!("multipart/form-data; boundary={boundary}"),
3694                ),
3695            ],
3696            body: format!(
3697                "--{boundary}\r\nContent-Disposition: form-data; name=\"options\"\r\nContent-Type: application/json\r\n\r\n{{\"format\":\"jpeg\"}}\r\n--{boundary}--\r\n"
3698            )
3699            .into_bytes(),
3700        };
3701
3702        let response = route_request(
3703            request,
3704            &ServerConfig::new(temp_dir("upload-missing-file"), Some("secret".to_string())),
3705        );
3706
3707        assert_eq!(response.status, "400 Bad Request");
3708        assert!(response_body(&response).contains("requires a `file` field"));
3709    }
3710
3711    #[test]
3712    fn upload_endpoint_rejects_non_multipart_content_type() {
3713        let request = HttpRequest {
3714            method: "POST".to_string(),
3715            target: "/images".to_string(),
3716            version: "HTTP/1.1".to_string(),
3717            headers: vec![
3718                ("authorization".to_string(), "Bearer secret".to_string()),
3719                ("content-type".to_string(), "application/json".to_string()),
3720            ],
3721            body: br#"{"file":"not-really-json"}"#.to_vec(),
3722        };
3723
3724        let response = route_request(
3725            request,
3726            &ServerConfig::new(temp_dir("upload-content-type"), Some("secret".to_string())),
3727        );
3728
3729        assert_eq!(response.status, "415 Unsupported Media Type");
3730        assert!(response_body(&response).contains("multipart/form-data"));
3731    }
3732
3733    #[test]
3734    fn parse_upload_request_extracts_file_and_options() {
3735        let request = upload_request(&png_bytes(), Some(r#"{"width":8,"format":"jpeg"}"#));
3736        let boundary =
3737            super::multipart::parse_multipart_boundary(&request).expect("parse boundary");
3738        let (file_bytes, options, _watermark) =
3739            super::multipart::parse_upload_request(&request.body, &boundary)
3740                .expect("parse upload body");
3741
3742        assert_eq!(file_bytes, png_bytes());
3743        assert_eq!(options.width, Some(8));
3744        assert_eq!(options.format, Some(MediaType::Jpeg));
3745    }
3746
3747    #[test]
3748    fn metrics_endpoint_does_not_require_authentication() {
3749        let response = route_request(
3750            metrics_request(false),
3751            &ServerConfig::new(temp_dir("metrics-no-auth"), Some("secret".to_string())),
3752        );
3753
3754        assert_eq!(response.status, "200 OK");
3755    }
3756
3757    #[test]
3758    fn metrics_endpoint_returns_prometheus_text() {
3759        super::metrics::record_http_metrics(super::metrics::RouteMetric::Health, "200 OK");
3760        let response = route_request(
3761            metrics_request(true),
3762            &ServerConfig::new(temp_dir("metrics"), Some("secret".to_string())),
3763        );
3764        let body = response_body(&response);
3765
3766        assert_eq!(response.status, "200 OK");
3767        assert_eq!(
3768            response.content_type,
3769            Some("text/plain; version=0.0.4; charset=utf-8")
3770        );
3771        assert!(body.contains("truss_http_requests_total"));
3772        assert!(body.contains("truss_http_requests_by_route_total{route=\"/health\"}"));
3773        assert!(body.contains("truss_http_responses_total{status=\"200\"}"));
3774        // Histogram metrics
3775        assert!(body.contains("# TYPE truss_http_request_duration_seconds histogram"));
3776        assert!(
3777            body.contains(
3778                "truss_http_request_duration_seconds_bucket{route=\"/health\",le=\"+Inf\"}"
3779            )
3780        );
3781        assert!(body.contains("# TYPE truss_transform_duration_seconds histogram"));
3782        assert!(body.contains("# TYPE truss_storage_request_duration_seconds histogram"));
3783        // Transform error counter
3784        assert!(body.contains("# TYPE truss_transform_errors_total counter"));
3785        assert!(body.contains("truss_transform_errors_total{error_type=\"decode_failed\"}"));
3786    }
3787
3788    #[test]
3789    fn metrics_endpoint_returns_401_when_token_required() {
3790        let mut config = ServerConfig::new(temp_dir("metrics-auth"), None);
3791        config.metrics_token = Some("my-secret-token".to_string());
3792
3793        // No auth header → 401
3794        let response = route_request(metrics_request(false), &config);
3795        assert_eq!(response.status, "401 Unauthorized");
3796    }
3797
3798    #[test]
3799    fn metrics_endpoint_accepts_valid_token() {
3800        let mut config = ServerConfig::new(temp_dir("metrics-auth-ok"), None);
3801        config.metrics_token = Some("secret".to_string());
3802
3803        // Bearer secret matches
3804        let response = route_request(metrics_request(true), &config);
3805        assert_eq!(response.status, "200 OK");
3806    }
3807
3808    #[test]
3809    fn metrics_endpoint_rejects_wrong_token() {
3810        let mut config = ServerConfig::new(temp_dir("metrics-auth-bad"), None);
3811        config.metrics_token = Some("correct-token".to_string());
3812
3813        // Bearer secret ≠ correct-token
3814        let response = route_request(metrics_request(true), &config);
3815        assert_eq!(response.status, "401 Unauthorized");
3816    }
3817
3818    #[test]
3819    fn metrics_endpoint_returns_404_when_disabled() {
3820        let mut config = ServerConfig::new(temp_dir("metrics-disabled"), None);
3821        config.disable_metrics = true;
3822
3823        let response = route_request(metrics_request(false), &config);
3824        assert_eq!(response.status, "404 Not Found");
3825    }
3826
3827    #[test]
3828    fn transform_endpoint_rejects_unsupported_remote_content_encoding() {
3829        let (url, handle) = spawn_http_server(vec![(
3830            "200 OK".to_string(),
3831            vec![
3832                ("Content-Type".to_string(), "image/png".to_string()),
3833                ("Content-Encoding".to_string(), "compress".to_string()),
3834            ],
3835            png_bytes(),
3836        )]);
3837
3838        let response = route_request(
3839            transform_url_request(&url),
3840            &ServerConfig::new(temp_dir("encoding"), Some("secret".to_string()))
3841                .with_insecure_url_sources(true),
3842        );
3843
3844        handle.join().expect("join fixture server");
3845
3846        assert_eq!(response.status, "502 Bad Gateway");
3847        assert!(response_body(&response).contains("unsupported content-encoding"));
3848    }
3849
3850    #[test]
3851    fn resolve_storage_path_rejects_parent_segments() {
3852        let storage_root = temp_dir("resolve");
3853        let response = resolve_storage_path(&storage_root, "../escape.png")
3854            .expect_err("parent segments should be rejected");
3855
3856        assert_eq!(response.status, "400 Bad Request");
3857        assert!(response_body(&response).contains("must not contain root"));
3858    }
3859
3860    #[test]
3861    fn read_request_parses_headers_and_body() {
3862        let request_bytes = b"POST /images:transform HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: 2\r\n\r\n{}";
3863        let mut cursor = Cursor::new(request_bytes);
3864        let request = read_request(&mut cursor).expect("parse request");
3865
3866        assert_eq!(request.method, "POST");
3867        assert_eq!(request.target, "/images:transform");
3868        assert_eq!(request.version, "HTTP/1.1");
3869        assert_eq!(request.header("host"), Some("localhost"));
3870        assert_eq!(request.body, b"{}");
3871    }
3872
3873    #[test]
3874    fn read_request_rejects_duplicate_content_length() {
3875        let request_bytes =
3876            b"POST /images:transform HTTP/1.1\r\nContent-Length: 2\r\nContent-Length: 2\r\n\r\n{}";
3877        let mut cursor = Cursor::new(request_bytes);
3878        let response = read_request(&mut cursor).expect_err("duplicate headers should fail");
3879
3880        assert_eq!(response.status, "400 Bad Request");
3881        assert!(response_body(&response).contains("content-length"));
3882    }
3883
3884    #[test]
3885    fn serve_once_handles_a_tcp_request() {
3886        let storage_root = temp_dir("serve-once");
3887        let config = ServerConfig::new(storage_root, None);
3888        let listener = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
3889        let addr = listener.local_addr().expect("read local addr");
3890
3891        let server = thread::spawn(move || serve_once_with_config(listener, config));
3892
3893        let mut stream = TcpStream::connect(addr).expect("connect to test server");
3894        stream
3895            .write_all(b"GET /health/live HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
3896            .expect("write request");
3897
3898        let mut response = String::new();
3899        stream.read_to_string(&mut response).expect("read response");
3900
3901        server
3902            .join()
3903            .expect("join test server thread")
3904            .expect("serve one request");
3905
3906        assert!(response.starts_with("HTTP/1.1 200 OK"));
3907        assert!(response.contains("Content-Type: application/json"));
3908        assert!(response.contains("\"status\":\"ok\""));
3909        assert!(response.contains("\"service\":\"truss\""));
3910        assert!(response.contains("\"version\":"));
3911    }
3912
3913    #[test]
3914    fn helper_error_responses_use_rfc7807_problem_details() {
3915        let response = auth_required_response("authorization required");
3916        let bad_request = bad_request_response("bad input");
3917
3918        assert_eq!(
3919            response.content_type,
3920            Some("application/problem+json"),
3921            "error responses must use application/problem+json"
3922        );
3923        assert_eq!(bad_request.content_type, Some("application/problem+json"),);
3924
3925        let auth_body = response_body(&response);
3926        assert!(auth_body.contains("authorization required"));
3927        assert!(auth_body.contains("\"type\":\"about:blank\""));
3928        assert!(auth_body.contains("\"title\":\"Unauthorized\""));
3929        assert!(auth_body.contains("\"status\":401"));
3930
3931        let bad_body = response_body(&bad_request);
3932        assert!(bad_body.contains("bad input"));
3933        assert!(bad_body.contains("\"type\":\"about:blank\""));
3934        assert!(bad_body.contains("\"title\":\"Bad Request\""));
3935        assert!(bad_body.contains("\"status\":400"));
3936    }
3937
3938    #[test]
3939    fn parse_headers_rejects_duplicate_host() {
3940        let lines = "Host: example.com\r\nHost: evil.com\r\n";
3941        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3942        assert!(result.is_err());
3943    }
3944
3945    #[test]
3946    fn parse_headers_rejects_duplicate_authorization() {
3947        let lines = "Authorization: Bearer a\r\nAuthorization: Bearer b\r\n";
3948        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3949        assert!(result.is_err());
3950    }
3951
3952    #[test]
3953    fn parse_headers_rejects_duplicate_content_type() {
3954        let lines = "Content-Type: application/json\r\nContent-Type: text/plain\r\n";
3955        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3956        assert!(result.is_err());
3957    }
3958
3959    #[test]
3960    fn parse_headers_rejects_duplicate_transfer_encoding() {
3961        let lines = "Transfer-Encoding: chunked\r\nTransfer-Encoding: gzip\r\n";
3962        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3963        assert!(result.is_err());
3964    }
3965
3966    #[test]
3967    fn parse_headers_rejects_single_transfer_encoding() {
3968        let lines = "Host: example.com\r\nTransfer-Encoding: chunked\r\n";
3969        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3970        let err = result.unwrap_err();
3971        assert!(
3972            err.status.starts_with("501"),
3973            "expected 501 status, got: {}",
3974            err.status
3975        );
3976        assert!(
3977            String::from_utf8_lossy(&err.body).contains("Transfer-Encoding"),
3978            "error response should mention Transfer-Encoding"
3979        );
3980    }
3981
3982    #[test]
3983    fn parse_headers_rejects_transfer_encoding_identity() {
3984        let lines = "Transfer-Encoding: identity\r\n";
3985        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3986        assert!(result.is_err());
3987    }
3988
3989    #[test]
3990    fn parse_headers_allows_single_instances_of_singleton_headers() {
3991        let lines =
3992            "Host: example.com\r\nAuthorization: Bearer tok\r\nContent-Type: application/json\r\n";
3993        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3994        assert!(result.is_ok());
3995        assert_eq!(result.unwrap().len(), 3);
3996    }
3997
3998    #[test]
3999    fn max_body_for_multipart_uses_upload_limit() {
4000        let headers = vec![(
4001            "content-type".to_string(),
4002            "multipart/form-data; boundary=abc".to_string(),
4003        )];
4004        assert_eq!(
4005            super::http_parse::max_body_for_headers(
4006                &headers,
4007                super::http_parse::DEFAULT_MAX_UPLOAD_BODY_BYTES
4008            ),
4009            super::http_parse::DEFAULT_MAX_UPLOAD_BODY_BYTES
4010        );
4011    }
4012
4013    #[test]
4014    fn max_body_for_json_uses_default_limit() {
4015        let headers = vec![("content-type".to_string(), "application/json".to_string())];
4016        assert_eq!(
4017            super::http_parse::max_body_for_headers(
4018                &headers,
4019                super::http_parse::DEFAULT_MAX_UPLOAD_BODY_BYTES
4020            ),
4021            super::http_parse::MAX_REQUEST_BODY_BYTES
4022        );
4023    }
4024
4025    #[test]
4026    fn max_body_for_no_content_type_uses_default_limit() {
4027        let headers: Vec<(String, String)> = vec![];
4028        assert_eq!(
4029            super::http_parse::max_body_for_headers(
4030                &headers,
4031                super::http_parse::DEFAULT_MAX_UPLOAD_BODY_BYTES
4032            ),
4033            super::http_parse::MAX_REQUEST_BODY_BYTES
4034        );
4035    }
4036
4037    fn make_test_config() -> ServerConfig {
4038        ServerConfig::new(std::env::temp_dir(), None)
4039    }
4040
4041    #[test]
4042    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4043    fn storage_backend_parse_filesystem_aliases() {
4044        assert_eq!(
4045            super::StorageBackend::parse("filesystem").unwrap(),
4046            super::StorageBackend::Filesystem
4047        );
4048        assert_eq!(
4049            super::StorageBackend::parse("fs").unwrap(),
4050            super::StorageBackend::Filesystem
4051        );
4052        assert_eq!(
4053            super::StorageBackend::parse("local").unwrap(),
4054            super::StorageBackend::Filesystem
4055        );
4056    }
4057
4058    #[test]
4059    #[cfg(feature = "s3")]
4060    fn storage_backend_parse_s3() {
4061        assert_eq!(
4062            super::StorageBackend::parse("s3").unwrap(),
4063            super::StorageBackend::S3
4064        );
4065        assert_eq!(
4066            super::StorageBackend::parse("S3").unwrap(),
4067            super::StorageBackend::S3
4068        );
4069    }
4070
4071    #[test]
4072    #[cfg(feature = "gcs")]
4073    fn storage_backend_parse_gcs() {
4074        assert_eq!(
4075            super::StorageBackend::parse("gcs").unwrap(),
4076            super::StorageBackend::Gcs
4077        );
4078        assert_eq!(
4079            super::StorageBackend::parse("GCS").unwrap(),
4080            super::StorageBackend::Gcs
4081        );
4082    }
4083
4084    #[test]
4085    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4086    fn storage_backend_parse_rejects_unknown() {
4087        assert!(super::StorageBackend::parse("").is_err());
4088        #[cfg(not(feature = "azure"))]
4089        assert!(super::StorageBackend::parse("azure").is_err());
4090        #[cfg(feature = "azure")]
4091        assert!(super::StorageBackend::parse("azure").is_ok());
4092    }
4093
4094    #[test]
4095    fn versioned_source_hash_returns_none_without_version() {
4096        let source = TransformSourcePayload::Path {
4097            path: "/photos/hero.jpg".to_string(),
4098            version: None,
4099        };
4100        assert!(source.versioned_source_hash(&make_test_config()).is_none());
4101    }
4102
4103    #[test]
4104    fn versioned_source_hash_is_deterministic() {
4105        let cfg = make_test_config();
4106        let source = TransformSourcePayload::Path {
4107            path: "/photos/hero.jpg".to_string(),
4108            version: Some("v1".to_string()),
4109        };
4110        let hash1 = source.versioned_source_hash(&cfg).unwrap();
4111        let hash2 = source.versioned_source_hash(&cfg).unwrap();
4112        assert_eq!(hash1, hash2);
4113        assert_eq!(hash1.len(), 64);
4114    }
4115
4116    #[test]
4117    fn versioned_source_hash_differs_by_version() {
4118        let cfg = make_test_config();
4119        let v1 = TransformSourcePayload::Path {
4120            path: "/photos/hero.jpg".to_string(),
4121            version: Some("v1".to_string()),
4122        };
4123        let v2 = TransformSourcePayload::Path {
4124            path: "/photos/hero.jpg".to_string(),
4125            version: Some("v2".to_string()),
4126        };
4127        assert_ne!(
4128            v1.versioned_source_hash(&cfg).unwrap(),
4129            v2.versioned_source_hash(&cfg).unwrap()
4130        );
4131    }
4132
4133    #[test]
4134    fn versioned_source_hash_differs_by_kind() {
4135        let cfg = make_test_config();
4136        let path = TransformSourcePayload::Path {
4137            path: "example.com/image.jpg".to_string(),
4138            version: Some("v1".to_string()),
4139        };
4140        let url = TransformSourcePayload::Url {
4141            url: "example.com/image.jpg".to_string(),
4142            version: Some("v1".to_string()),
4143        };
4144        assert_ne!(
4145            path.versioned_source_hash(&cfg).unwrap(),
4146            url.versioned_source_hash(&cfg).unwrap()
4147        );
4148    }
4149
4150    #[test]
4151    fn versioned_source_hash_differs_by_storage_root() {
4152        let cfg1 = ServerConfig::new(PathBuf::from("/data/images"), None);
4153        let cfg2 = ServerConfig::new(PathBuf::from("/other/images"), None);
4154        let source = TransformSourcePayload::Path {
4155            path: "/photos/hero.jpg".to_string(),
4156            version: Some("v1".to_string()),
4157        };
4158        assert_ne!(
4159            source.versioned_source_hash(&cfg1).unwrap(),
4160            source.versioned_source_hash(&cfg2).unwrap()
4161        );
4162    }
4163
4164    #[test]
4165    fn versioned_source_hash_differs_by_insecure_flag() {
4166        let mut cfg1 = make_test_config();
4167        cfg1.allow_insecure_url_sources = false;
4168        let mut cfg2 = make_test_config();
4169        cfg2.allow_insecure_url_sources = true;
4170        let source = TransformSourcePayload::Url {
4171            url: "http://example.com/img.jpg".to_string(),
4172            version: Some("v1".to_string()),
4173        };
4174        assert_ne!(
4175            source.versioned_source_hash(&cfg1).unwrap(),
4176            source.versioned_source_hash(&cfg2).unwrap()
4177        );
4178    }
4179
4180    #[test]
4181    #[cfg(feature = "s3")]
4182    fn versioned_source_hash_storage_variant_is_deterministic() {
4183        let cfg = make_test_config();
4184        let source = TransformSourcePayload::Storage {
4185            bucket: Some("my-bucket".to_string()),
4186            key: "photos/hero.jpg".to_string(),
4187            version: Some("v1".to_string()),
4188        };
4189        let hash1 = source.versioned_source_hash(&cfg).unwrap();
4190        let hash2 = source.versioned_source_hash(&cfg).unwrap();
4191        assert_eq!(hash1, hash2);
4192        assert_eq!(hash1.len(), 64);
4193    }
4194
4195    #[test]
4196    #[cfg(feature = "s3")]
4197    fn versioned_source_hash_storage_differs_from_path() {
4198        let cfg = make_test_config();
4199        let path_source = TransformSourcePayload::Path {
4200            path: "photos/hero.jpg".to_string(),
4201            version: Some("v1".to_string()),
4202        };
4203        let storage_source = TransformSourcePayload::Storage {
4204            bucket: Some("my-bucket".to_string()),
4205            key: "photos/hero.jpg".to_string(),
4206            version: Some("v1".to_string()),
4207        };
4208        assert_ne!(
4209            path_source.versioned_source_hash(&cfg).unwrap(),
4210            storage_source.versioned_source_hash(&cfg).unwrap()
4211        );
4212    }
4213
4214    #[test]
4215    #[cfg(feature = "s3")]
4216    fn versioned_source_hash_storage_differs_by_bucket() {
4217        let cfg = make_test_config();
4218        let s1 = TransformSourcePayload::Storage {
4219            bucket: Some("bucket-a".to_string()),
4220            key: "image.jpg".to_string(),
4221            version: Some("v1".to_string()),
4222        };
4223        let s2 = TransformSourcePayload::Storage {
4224            bucket: Some("bucket-b".to_string()),
4225            key: "image.jpg".to_string(),
4226            version: Some("v1".to_string()),
4227        };
4228        assert_ne!(
4229            s1.versioned_source_hash(&cfg).unwrap(),
4230            s2.versioned_source_hash(&cfg).unwrap()
4231        );
4232    }
4233
4234    #[test]
4235    #[cfg(feature = "s3")]
4236    fn versioned_source_hash_differs_by_backend() {
4237        let cfg_fs = make_test_config();
4238        let mut cfg_s3 = make_test_config();
4239        cfg_s3.storage_backend = super::StorageBackend::S3;
4240
4241        let source = TransformSourcePayload::Path {
4242            path: "photos/hero.jpg".to_string(),
4243            version: Some("v1".to_string()),
4244        };
4245        assert_ne!(
4246            source.versioned_source_hash(&cfg_fs).unwrap(),
4247            source.versioned_source_hash(&cfg_s3).unwrap()
4248        );
4249    }
4250
4251    #[test]
4252    #[cfg(feature = "s3")]
4253    fn versioned_source_hash_storage_differs_by_endpoint() {
4254        let mut cfg_a = make_test_config();
4255        cfg_a.storage_backend = super::StorageBackend::S3;
4256        cfg_a.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4257            "shared",
4258            Some("http://minio-a:9000"),
4259        )));
4260
4261        let mut cfg_b = make_test_config();
4262        cfg_b.storage_backend = super::StorageBackend::S3;
4263        cfg_b.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4264            "shared",
4265            Some("http://minio-b:9000"),
4266        )));
4267
4268        let source = TransformSourcePayload::Storage {
4269            bucket: None,
4270            key: "image.jpg".to_string(),
4271            version: Some("v1".to_string()),
4272        };
4273        assert_ne!(
4274            source.versioned_source_hash(&cfg_a).unwrap(),
4275            source.versioned_source_hash(&cfg_b).unwrap(),
4276        );
4277        assert_ne!(cfg_a, cfg_b);
4278    }
4279
4280    #[test]
4281    #[cfg(feature = "s3")]
4282    fn storage_backend_default_is_filesystem() {
4283        let cfg = make_test_config();
4284        assert_eq!(cfg.storage_backend, super::StorageBackend::Filesystem);
4285        assert!(cfg.s3_context.is_none());
4286    }
4287
4288    #[test]
4289    #[cfg(feature = "s3")]
4290    fn storage_payload_deserializes_storage_variant() {
4291        let json = r#"{"source":{"kind":"storage","key":"photos/hero.jpg"},"options":{}}"#;
4292        let payload: super::TransformImageRequestPayload = serde_json::from_str(json).unwrap();
4293        match payload.source {
4294            TransformSourcePayload::Storage {
4295                bucket,
4296                key,
4297                version,
4298            } => {
4299                assert!(bucket.is_none());
4300                assert_eq!(key, "photos/hero.jpg");
4301                assert!(version.is_none());
4302            }
4303            _ => panic!("expected Storage variant"),
4304        }
4305    }
4306
4307    #[test]
4308    #[cfg(feature = "s3")]
4309    fn storage_payload_deserializes_with_bucket() {
4310        let json = r#"{"source":{"kind":"storage","bucket":"my-bucket","key":"img.png","version":"v2"},"options":{}}"#;
4311        let payload: super::TransformImageRequestPayload = serde_json::from_str(json).unwrap();
4312        match payload.source {
4313            TransformSourcePayload::Storage {
4314                bucket,
4315                key,
4316                version,
4317            } => {
4318                assert_eq!(bucket.as_deref(), Some("my-bucket"));
4319                assert_eq!(key, "img.png");
4320                assert_eq!(version.as_deref(), Some("v2"));
4321            }
4322            _ => panic!("expected Storage variant"),
4323        }
4324    }
4325
4326    // -----------------------------------------------------------------------
4327    // S3: default_bucket fallback with bucket: None
4328    // -----------------------------------------------------------------------
4329
4330    #[test]
4331    #[cfg(feature = "s3")]
4332    fn versioned_source_hash_uses_default_bucket_when_bucket_is_none() {
4333        let mut cfg_a = make_test_config();
4334        cfg_a.storage_backend = super::StorageBackend::S3;
4335        cfg_a.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4336            "bucket-a", None,
4337        )));
4338
4339        let mut cfg_b = make_test_config();
4340        cfg_b.storage_backend = super::StorageBackend::S3;
4341        cfg_b.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4342            "bucket-b", None,
4343        )));
4344
4345        let source = TransformSourcePayload::Storage {
4346            bucket: None,
4347            key: "image.jpg".to_string(),
4348            version: Some("v1".to_string()),
4349        };
4350        // Different default_bucket ⇒ different hash
4351        assert_ne!(
4352            source.versioned_source_hash(&cfg_a).unwrap(),
4353            source.versioned_source_hash(&cfg_b).unwrap(),
4354        );
4355        // PartialEq also distinguishes them
4356        assert_ne!(cfg_a, cfg_b);
4357    }
4358
4359    #[test]
4360    #[cfg(feature = "s3")]
4361    fn versioned_source_hash_returns_none_without_bucket_or_context() {
4362        let mut cfg = make_test_config();
4363        cfg.storage_backend = super::StorageBackend::S3;
4364        cfg.s3_context = None;
4365
4366        let source = TransformSourcePayload::Storage {
4367            bucket: None,
4368            key: "image.jpg".to_string(),
4369            version: Some("v1".to_string()),
4370        };
4371        // No bucket available ⇒ None (falls back to content-hash)
4372        assert!(source.versioned_source_hash(&cfg).is_none());
4373    }
4374
4375    // -----------------------------------------------------------------------
4376    // S3: from_env branches
4377    //
4378    // These tests mutate process-global environment variables. A mutex
4379    // serializes them so that parallel test threads cannot interfere, and
4380    // each test saves/restores the variables it touches.
4381    // -----------------------------------------------------------------------
4382
4383    #[cfg(feature = "s3")]
4384    static FROM_ENV_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
4385
4386    #[cfg(feature = "s3")]
4387    const S3_ENV_VARS: &[&str] = &[
4388        "TRUSS_STORAGE_ROOT",
4389        "TRUSS_STORAGE_BACKEND",
4390        "TRUSS_S3_BUCKET",
4391    ];
4392
4393    /// Save current values, run `f`, then restore originals regardless of
4394    /// panics. Holds `FROM_ENV_MUTEX` for the duration.
4395    #[cfg(feature = "s3")]
4396    fn with_s3_env<F: FnOnce()>(vars: &[(&str, Option<&str>)], f: F) {
4397        let _guard = FROM_ENV_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
4398        let saved: Vec<(&str, Option<String>)> = S3_ENV_VARS
4399            .iter()
4400            .map(|k| (*k, std::env::var(k).ok()))
4401            .collect();
4402        // Apply requested overrides
4403        for &(key, value) in vars {
4404            match value {
4405                Some(v) => unsafe { std::env::set_var(key, v) },
4406                None => unsafe { std::env::remove_var(key) },
4407            }
4408        }
4409        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
4410        // Restore originals
4411        for (key, original) in saved {
4412            match original {
4413                Some(v) => unsafe { std::env::set_var(key, v) },
4414                None => unsafe { std::env::remove_var(key) },
4415            }
4416        }
4417        if let Err(payload) = result {
4418            std::panic::resume_unwind(payload);
4419        }
4420    }
4421
4422    #[test]
4423    #[cfg(feature = "s3")]
4424    fn from_env_rejects_invalid_storage_backend() {
4425        let storage = temp_dir("env-bad-backend");
4426        let storage_str = storage.to_str().unwrap().to_string();
4427        with_s3_env(
4428            &[
4429                ("TRUSS_STORAGE_ROOT", Some(&storage_str)),
4430                ("TRUSS_STORAGE_BACKEND", Some("nosuchbackend")),
4431                ("TRUSS_S3_BUCKET", None),
4432            ],
4433            || {
4434                let result = ServerConfig::from_env();
4435                assert!(result.is_err());
4436                let msg = result.unwrap_err().to_string();
4437                assert!(msg.contains("unknown storage backend"), "got: {msg}");
4438            },
4439        );
4440        let _ = std::fs::remove_dir_all(storage);
4441    }
4442
4443    #[test]
4444    #[cfg(feature = "s3")]
4445    fn from_env_rejects_s3_without_bucket() {
4446        let storage = temp_dir("env-no-bucket");
4447        let storage_str = storage.to_str().unwrap().to_string();
4448        with_s3_env(
4449            &[
4450                ("TRUSS_STORAGE_ROOT", Some(&storage_str)),
4451                ("TRUSS_STORAGE_BACKEND", Some("s3")),
4452                ("TRUSS_S3_BUCKET", None),
4453            ],
4454            || {
4455                let result = ServerConfig::from_env();
4456                assert!(result.is_err());
4457                let msg = result.unwrap_err().to_string();
4458                assert!(msg.contains("TRUSS_S3_BUCKET"), "got: {msg}");
4459            },
4460        );
4461        let _ = std::fs::remove_dir_all(storage);
4462    }
4463
4464    #[test]
4465    #[cfg(feature = "s3")]
4466    fn from_env_accepts_s3_with_bucket() {
4467        let storage = temp_dir("env-s3-ok");
4468        let storage_str = storage.to_str().unwrap().to_string();
4469        with_s3_env(
4470            &[
4471                ("TRUSS_STORAGE_ROOT", Some(&storage_str)),
4472                ("TRUSS_STORAGE_BACKEND", Some("s3")),
4473                ("TRUSS_S3_BUCKET", Some("my-images")),
4474            ],
4475            || {
4476                let cfg =
4477                    ServerConfig::from_env().expect("from_env should succeed with s3 + bucket");
4478                assert_eq!(cfg.storage_backend, super::StorageBackend::S3);
4479                let ctx = cfg.s3_context.expect("s3_context should be Some");
4480                assert_eq!(ctx.default_bucket, "my-images");
4481            },
4482        );
4483        let _ = std::fs::remove_dir_all(storage);
4484    }
4485
4486    // -----------------------------------------------------------------------
4487    // S3: health endpoint
4488    // -----------------------------------------------------------------------
4489
4490    #[test]
4491    #[cfg(feature = "s3")]
4492    fn health_ready_s3_returns_503_when_context_missing() {
4493        let storage = temp_dir("health-s3-no-ctx");
4494        let mut config = ServerConfig::new(storage.clone(), None);
4495        config.storage_backend = super::StorageBackend::S3;
4496        config.s3_context = None;
4497
4498        let request = super::http_parse::HttpRequest {
4499            method: "GET".to_string(),
4500            target: "/health/ready".to_string(),
4501            version: "HTTP/1.1".to_string(),
4502            headers: Vec::new(),
4503            body: Vec::new(),
4504        };
4505        let response = route_request(request, &config);
4506        let _ = std::fs::remove_dir_all(storage);
4507
4508        assert_eq!(response.status, "503 Service Unavailable");
4509        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4510        let checks = body["checks"].as_array().expect("checks array");
4511        assert!(
4512            checks
4513                .iter()
4514                .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
4515            "expected s3Client fail check in {body}",
4516        );
4517    }
4518
4519    #[test]
4520    #[cfg(feature = "s3")]
4521    fn health_ready_s3_includes_s3_client_check() {
4522        let storage = temp_dir("health-s3-ok");
4523        let mut config = ServerConfig::new(storage.clone(), None);
4524        config.storage_backend = super::StorageBackend::S3;
4525        config.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4526            "test-bucket",
4527            None,
4528        )));
4529
4530        let request = super::http_parse::HttpRequest {
4531            method: "GET".to_string(),
4532            target: "/health/ready".to_string(),
4533            version: "HTTP/1.1".to_string(),
4534            headers: Vec::new(),
4535            body: Vec::new(),
4536        };
4537        let response = route_request(request, &config);
4538        let _ = std::fs::remove_dir_all(storage);
4539
4540        // The s3Client check will report "fail" because there is no real S3
4541        // endpoint, but the important thing is that the check is present.
4542        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4543        let checks = body["checks"].as_array().expect("checks array");
4544        assert!(
4545            checks.iter().any(|c| c["name"] == "storageBackend"),
4546            "expected s3Client check in {body}",
4547        );
4548    }
4549
4550    // -----------------------------------------------------------------------
4551    // S3: public by-path remap (leading slash trimmed, Storage variant used)
4552    // -----------------------------------------------------------------------
4553
4554    /// Replicates the Path→Storage remap that `handle_public_get_request`
4555    /// performs when `storage_backend == S3`, so we can inspect the resulting
4556    /// key without issuing a real S3 request.
4557    #[cfg(feature = "s3")]
4558    fn remap_path_to_storage(path: &str, version: Option<&str>) -> TransformSourcePayload {
4559        let source = TransformSourcePayload::Path {
4560            path: path.to_string(),
4561            version: version.map(|v| v.to_string()),
4562        };
4563        match source {
4564            TransformSourcePayload::Path { path, version } => TransformSourcePayload::Storage {
4565                bucket: None,
4566                key: path.trim_start_matches('/').to_string(),
4567                version,
4568            },
4569            other => other,
4570        }
4571    }
4572
4573    #[test]
4574    #[cfg(feature = "s3")]
4575    fn public_by_path_s3_remap_trims_leading_slash() {
4576        // Paths with a leading slash (the common case from signed URLs like
4577        // `path=/image.png`) must have the slash stripped so that the S3 key
4578        // is `image.png`, not `/image.png`.
4579        let source = remap_path_to_storage("/photos/hero.jpg", Some("v1"));
4580        match &source {
4581            TransformSourcePayload::Storage { key, .. } => {
4582                assert_eq!(key, "photos/hero.jpg", "leading / must be trimmed");
4583            }
4584            _ => panic!("expected Storage variant after remap"),
4585        }
4586
4587        // Without a leading slash the key must be unchanged.
4588        let source2 = remap_path_to_storage("photos/hero.jpg", Some("v1"));
4589        match &source2 {
4590            TransformSourcePayload::Storage { key, .. } => {
4591                assert_eq!(key, "photos/hero.jpg");
4592            }
4593            _ => panic!("expected Storage variant after remap"),
4594        }
4595
4596        // Both must produce the same versioned hash (same effective key).
4597        let mut cfg = make_test_config();
4598        cfg.storage_backend = super::StorageBackend::S3;
4599        cfg.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4600            "my-bucket",
4601            None,
4602        )));
4603        assert_eq!(
4604            source.versioned_source_hash(&cfg),
4605            source2.versioned_source_hash(&cfg),
4606            "leading-slash and no-leading-slash paths must hash identically after trim",
4607        );
4608    }
4609
4610    #[test]
4611    #[cfg(feature = "s3")]
4612    fn public_by_path_s3_remap_produces_storage_variant() {
4613        // Verify the remap converts Path to Storage with bucket: None.
4614        let source = remap_path_to_storage("/image.png", None);
4615        match source {
4616            TransformSourcePayload::Storage {
4617                bucket,
4618                key,
4619                version,
4620            } => {
4621                assert!(bucket.is_none(), "bucket must be None (use default)");
4622                assert_eq!(key, "image.png");
4623                assert!(version.is_none());
4624            }
4625            _ => panic!("expected Storage variant"),
4626        }
4627    }
4628
4629    // -----------------------------------------------------------------------
4630    // GCS: health endpoint
4631    // -----------------------------------------------------------------------
4632
4633    #[test]
4634    #[cfg(feature = "gcs")]
4635    fn health_ready_gcs_returns_503_when_context_missing() {
4636        let storage = temp_dir("health-gcs-no-ctx");
4637        let mut config = ServerConfig::new(storage.clone(), None);
4638        config.storage_backend = super::StorageBackend::Gcs;
4639        config.gcs_context = None;
4640
4641        let request = super::http_parse::HttpRequest {
4642            method: "GET".to_string(),
4643            target: "/health/ready".to_string(),
4644            version: "HTTP/1.1".to_string(),
4645            headers: Vec::new(),
4646            body: Vec::new(),
4647        };
4648        let response = route_request(request, &config);
4649        let _ = std::fs::remove_dir_all(storage);
4650
4651        assert_eq!(response.status, "503 Service Unavailable");
4652        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4653        let checks = body["checks"].as_array().expect("checks array");
4654        assert!(
4655            checks
4656                .iter()
4657                .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
4658            "expected gcsClient fail check in {body}",
4659        );
4660    }
4661
4662    #[test]
4663    #[cfg(feature = "gcs")]
4664    fn health_ready_gcs_includes_gcs_client_check() {
4665        let storage = temp_dir("health-gcs-ok");
4666        let mut config = ServerConfig::new(storage.clone(), None);
4667        config.storage_backend = super::StorageBackend::Gcs;
4668        config.gcs_context = Some(std::sync::Arc::new(super::gcs::GcsContext::for_test(
4669            "test-bucket",
4670            None,
4671        )));
4672
4673        let request = super::http_parse::HttpRequest {
4674            method: "GET".to_string(),
4675            target: "/health/ready".to_string(),
4676            version: "HTTP/1.1".to_string(),
4677            headers: Vec::new(),
4678            body: Vec::new(),
4679        };
4680        let response = route_request(request, &config);
4681        let _ = std::fs::remove_dir_all(storage);
4682
4683        // The gcsClient check will report "fail" because there is no real GCS
4684        // endpoint, but the important thing is that the check is present.
4685        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4686        let checks = body["checks"].as_array().expect("checks array");
4687        assert!(
4688            checks.iter().any(|c| c["name"] == "storageBackend"),
4689            "expected gcsClient check in {body}",
4690        );
4691    }
4692
4693    // -----------------------------------------------------------------------
4694    // GCS: public by-path remap (leading slash trimmed, Storage variant used)
4695    // -----------------------------------------------------------------------
4696
4697    #[cfg(feature = "gcs")]
4698    fn remap_path_to_storage_gcs(path: &str, version: Option<&str>) -> TransformSourcePayload {
4699        let source = TransformSourcePayload::Path {
4700            path: path.to_string(),
4701            version: version.map(|v| v.to_string()),
4702        };
4703        match source {
4704            TransformSourcePayload::Path { path, version } => TransformSourcePayload::Storage {
4705                bucket: None,
4706                key: path.trim_start_matches('/').to_string(),
4707                version,
4708            },
4709            other => other,
4710        }
4711    }
4712
4713    #[test]
4714    #[cfg(feature = "gcs")]
4715    fn public_by_path_gcs_remap_trims_leading_slash() {
4716        let source = remap_path_to_storage_gcs("/photos/hero.jpg", Some("v1"));
4717        match &source {
4718            TransformSourcePayload::Storage { key, .. } => {
4719                assert_eq!(key, "photos/hero.jpg", "leading / must be trimmed");
4720            }
4721            _ => panic!("expected Storage variant after remap"),
4722        }
4723
4724        let source2 = remap_path_to_storage_gcs("photos/hero.jpg", Some("v1"));
4725        match &source2 {
4726            TransformSourcePayload::Storage { key, .. } => {
4727                assert_eq!(key, "photos/hero.jpg");
4728            }
4729            _ => panic!("expected Storage variant after remap"),
4730        }
4731
4732        let mut cfg = make_test_config();
4733        cfg.storage_backend = super::StorageBackend::Gcs;
4734        cfg.gcs_context = Some(std::sync::Arc::new(super::gcs::GcsContext::for_test(
4735            "my-bucket",
4736            None,
4737        )));
4738        assert_eq!(
4739            source.versioned_source_hash(&cfg),
4740            source2.versioned_source_hash(&cfg),
4741            "leading-slash and no-leading-slash paths must hash identically after trim",
4742        );
4743    }
4744
4745    #[test]
4746    #[cfg(feature = "gcs")]
4747    fn public_by_path_gcs_remap_produces_storage_variant() {
4748        let source = remap_path_to_storage_gcs("/image.png", None);
4749        match source {
4750            TransformSourcePayload::Storage {
4751                bucket,
4752                key,
4753                version,
4754            } => {
4755                assert!(bucket.is_none(), "bucket must be None (use default)");
4756                assert_eq!(key, "image.png");
4757                assert!(version.is_none());
4758            }
4759            _ => panic!("expected Storage variant"),
4760        }
4761    }
4762
4763    // -----------------------------------------------------------------------
4764    // Azure: health endpoint
4765    // -----------------------------------------------------------------------
4766
4767    #[test]
4768    #[cfg(feature = "azure")]
4769    fn health_ready_azure_returns_503_when_context_missing() {
4770        let storage = temp_dir("health-azure-no-ctx");
4771        let mut config = ServerConfig::new(storage.clone(), None);
4772        config.storage_backend = super::StorageBackend::Azure;
4773        config.azure_context = None;
4774
4775        let request = super::http_parse::HttpRequest {
4776            method: "GET".to_string(),
4777            target: "/health/ready".to_string(),
4778            version: "HTTP/1.1".to_string(),
4779            headers: Vec::new(),
4780            body: Vec::new(),
4781        };
4782        let response = route_request(request, &config);
4783        let _ = std::fs::remove_dir_all(storage);
4784
4785        assert_eq!(response.status, "503 Service Unavailable");
4786        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4787        let checks = body["checks"].as_array().expect("checks array");
4788        assert!(
4789            checks
4790                .iter()
4791                .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
4792            "expected azureClient fail check in {body}",
4793        );
4794    }
4795
4796    #[test]
4797    #[cfg(feature = "azure")]
4798    fn health_ready_azure_includes_azure_client_check() {
4799        let storage = temp_dir("health-azure-ok");
4800        let mut config = ServerConfig::new(storage.clone(), None);
4801        config.storage_backend = super::StorageBackend::Azure;
4802        config.azure_context = Some(std::sync::Arc::new(super::azure::AzureContext::for_test(
4803            "test-bucket",
4804            "http://localhost:10000/devstoreaccount1",
4805        )));
4806
4807        let request = super::http_parse::HttpRequest {
4808            method: "GET".to_string(),
4809            target: "/health/ready".to_string(),
4810            version: "HTTP/1.1".to_string(),
4811            headers: Vec::new(),
4812            body: Vec::new(),
4813        };
4814        let response = route_request(request, &config);
4815        let _ = std::fs::remove_dir_all(storage);
4816
4817        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4818        let checks = body["checks"].as_array().expect("checks array");
4819        assert!(
4820            checks.iter().any(|c| c["name"] == "storageBackend"),
4821            "expected azureClient check in {body}",
4822        );
4823    }
4824
4825    #[test]
4826    fn read_request_rejects_json_body_over_1mib() {
4827        let body = vec![b'x'; super::http_parse::MAX_REQUEST_BODY_BYTES + 1];
4828        let content_length = body.len();
4829        let raw = format!(
4830            "POST /images:transform HTTP/1.1\r\n\
4831             Content-Type: application/json\r\n\
4832             Content-Length: {content_length}\r\n\r\n"
4833        );
4834        let mut data = raw.into_bytes();
4835        data.extend_from_slice(&body);
4836        let result = read_request(&mut data.as_slice());
4837        assert!(result.is_err());
4838    }
4839
4840    #[test]
4841    fn read_request_accepts_multipart_body_over_1mib() {
4842        let payload_size = super::http_parse::MAX_REQUEST_BODY_BYTES + 100;
4843        let body_content = vec![b'A'; payload_size];
4844        let boundary = "test-boundary-123";
4845        let mut body = Vec::new();
4846        body.extend_from_slice(format!("--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"big.jpg\"\r\n\r\n").as_bytes());
4847        body.extend_from_slice(&body_content);
4848        body.extend_from_slice(format!("\r\n--{boundary}--\r\n").as_bytes());
4849        let content_length = body.len();
4850        let raw = format!(
4851            "POST /images HTTP/1.1\r\n\
4852             Content-Type: multipart/form-data; boundary={boundary}\r\n\
4853             Content-Length: {content_length}\r\n\r\n"
4854        );
4855        let mut data = raw.into_bytes();
4856        data.extend_from_slice(&body);
4857        let result = read_request(&mut data.as_slice());
4858        assert!(
4859            result.is_ok(),
4860            "multipart upload over 1 MiB should be accepted"
4861        );
4862    }
4863
4864    #[test]
4865    fn multipart_boundary_in_payload_does_not_split_part() {
4866        let boundary = "abc123";
4867        let fake_boundary_in_payload = format!("\r\n--{boundary}NOTREAL");
4868        let part_body = format!("before{fake_boundary_in_payload}after");
4869        let body = format!(
4870            "--{boundary}\r\n\
4871             Content-Disposition: form-data; name=\"file\"\r\n\
4872             Content-Type: application/octet-stream\r\n\r\n\
4873             {part_body}\r\n\
4874             --{boundary}--\r\n"
4875        );
4876
4877        let parts = parse_multipart_form_data(body.as_bytes(), boundary)
4878            .expect("should parse despite boundary-like string in payload");
4879        assert_eq!(parts.len(), 1, "should have exactly one part");
4880
4881        let part_data = &body.as_bytes()[parts[0].body_range.clone()];
4882        let part_text = std::str::from_utf8(part_data).unwrap();
4883        assert!(
4884            part_text.contains("NOTREAL"),
4885            "part body should contain the full fake boundary string"
4886        );
4887    }
4888
4889    #[test]
4890    fn multipart_normal_two_parts_still_works() {
4891        let boundary = "testboundary";
4892        let body = format!(
4893            "--{boundary}\r\n\
4894             Content-Disposition: form-data; name=\"field1\"\r\n\r\n\
4895             value1\r\n\
4896             --{boundary}\r\n\
4897             Content-Disposition: form-data; name=\"field2\"\r\n\r\n\
4898             value2\r\n\
4899             --{boundary}--\r\n"
4900        );
4901
4902        let parts = parse_multipart_form_data(body.as_bytes(), boundary)
4903            .expect("should parse two normal parts");
4904        assert_eq!(parts.len(), 2);
4905        assert_eq!(parts[0].name, "field1");
4906        assert_eq!(parts[1].name, "field2");
4907    }
4908
4909    #[test]
4910    #[serial]
4911    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4912    fn test_storage_timeout_default() {
4913        unsafe {
4914            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4915        }
4916        let config = ServerConfig::from_env().unwrap();
4917        assert_eq!(config.storage_timeout_secs, 30);
4918    }
4919
4920    #[test]
4921    #[serial]
4922    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4923    fn test_storage_timeout_custom() {
4924        unsafe {
4925            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "60");
4926        }
4927        let config = ServerConfig::from_env().unwrap();
4928        assert_eq!(config.storage_timeout_secs, 60);
4929        unsafe {
4930            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4931        }
4932    }
4933
4934    #[test]
4935    #[serial]
4936    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4937    fn test_storage_timeout_min_boundary() {
4938        unsafe {
4939            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "1");
4940        }
4941        let config = ServerConfig::from_env().unwrap();
4942        assert_eq!(config.storage_timeout_secs, 1);
4943        unsafe {
4944            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4945        }
4946    }
4947
4948    #[test]
4949    #[serial]
4950    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4951    fn test_storage_timeout_max_boundary() {
4952        unsafe {
4953            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "300");
4954        }
4955        let config = ServerConfig::from_env().unwrap();
4956        assert_eq!(config.storage_timeout_secs, 300);
4957        unsafe {
4958            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4959        }
4960    }
4961
4962    #[test]
4963    #[serial]
4964    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4965    fn test_storage_timeout_empty_string_uses_default() {
4966        unsafe {
4967            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "");
4968        }
4969        let config = ServerConfig::from_env().unwrap();
4970        assert_eq!(config.storage_timeout_secs, 30);
4971        unsafe {
4972            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4973        }
4974    }
4975
4976    #[test]
4977    #[serial]
4978    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4979    fn test_storage_timeout_zero_rejected() {
4980        unsafe {
4981            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "0");
4982        }
4983        let err = ServerConfig::from_env().unwrap_err();
4984        assert!(
4985            err.to_string().contains("between 1 and 300"),
4986            "error should mention valid range: {err}"
4987        );
4988        unsafe {
4989            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4990        }
4991    }
4992
4993    #[test]
4994    #[serial]
4995    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4996    fn test_storage_timeout_over_max_rejected() {
4997        unsafe {
4998            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "301");
4999        }
5000        let err = ServerConfig::from_env().unwrap_err();
5001        assert!(
5002            err.to_string().contains("between 1 and 300"),
5003            "error should mention valid range: {err}"
5004        );
5005        unsafe {
5006            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
5007        }
5008    }
5009
5010    #[test]
5011    #[serial]
5012    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
5013    fn test_storage_timeout_non_numeric_rejected() {
5014        unsafe {
5015            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "abc");
5016        }
5017        let err = ServerConfig::from_env().unwrap_err();
5018        assert!(
5019            err.to_string().contains("positive integer"),
5020            "error should mention positive integer: {err}"
5021        );
5022        unsafe {
5023            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
5024        }
5025    }
5026
5027    #[test]
5028    #[serial]
5029    fn test_max_concurrent_transforms_default() {
5030        unsafe {
5031            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
5032        }
5033        let config = ServerConfig::from_env().unwrap();
5034        assert_eq!(config.max_concurrent_transforms, 64);
5035    }
5036
5037    #[test]
5038    #[serial]
5039    fn test_max_concurrent_transforms_custom() {
5040        unsafe {
5041            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "128");
5042        }
5043        let config = ServerConfig::from_env().unwrap();
5044        assert_eq!(config.max_concurrent_transforms, 128);
5045        unsafe {
5046            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
5047        }
5048    }
5049
5050    #[test]
5051    #[serial]
5052    fn test_max_concurrent_transforms_min_boundary() {
5053        unsafe {
5054            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1");
5055        }
5056        let config = ServerConfig::from_env().unwrap();
5057        assert_eq!(config.max_concurrent_transforms, 1);
5058        unsafe {
5059            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
5060        }
5061    }
5062
5063    #[test]
5064    #[serial]
5065    fn test_max_concurrent_transforms_max_boundary() {
5066        unsafe {
5067            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1024");
5068        }
5069        let config = ServerConfig::from_env().unwrap();
5070        assert_eq!(config.max_concurrent_transforms, 1024);
5071        unsafe {
5072            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
5073        }
5074    }
5075
5076    #[test]
5077    #[serial]
5078    fn test_max_concurrent_transforms_empty_uses_default() {
5079        unsafe {
5080            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "");
5081        }
5082        let config = ServerConfig::from_env().unwrap();
5083        assert_eq!(config.max_concurrent_transforms, 64);
5084        unsafe {
5085            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
5086        }
5087    }
5088
5089    #[test]
5090    #[serial]
5091    fn test_max_concurrent_transforms_zero_rejected() {
5092        unsafe {
5093            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "0");
5094        }
5095        let err = ServerConfig::from_env().unwrap_err();
5096        assert!(
5097            err.to_string().contains("between 1 and 1024"),
5098            "error should mention valid range: {err}"
5099        );
5100        unsafe {
5101            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
5102        }
5103    }
5104
5105    #[test]
5106    #[serial]
5107    fn test_max_concurrent_transforms_over_max_rejected() {
5108        unsafe {
5109            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1025");
5110        }
5111        let err = ServerConfig::from_env().unwrap_err();
5112        assert!(
5113            err.to_string().contains("between 1 and 1024"),
5114            "error should mention valid range: {err}"
5115        );
5116        unsafe {
5117            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
5118        }
5119    }
5120
5121    #[test]
5122    #[serial]
5123    fn test_max_concurrent_transforms_non_numeric_rejected() {
5124        unsafe {
5125            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "abc");
5126        }
5127        let err = ServerConfig::from_env().unwrap_err();
5128        assert!(
5129            err.to_string().contains("positive integer"),
5130            "error should mention positive integer: {err}"
5131        );
5132        unsafe {
5133            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
5134        }
5135    }
5136
5137    #[test]
5138    #[serial]
5139    fn test_transform_deadline_default() {
5140        unsafe {
5141            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
5142        }
5143        let config = ServerConfig::from_env().unwrap();
5144        assert_eq!(config.transform_deadline_secs, 30);
5145    }
5146
5147    #[test]
5148    #[serial]
5149    fn test_transform_deadline_custom() {
5150        unsafe {
5151            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "60");
5152        }
5153        let config = ServerConfig::from_env().unwrap();
5154        assert_eq!(config.transform_deadline_secs, 60);
5155        unsafe {
5156            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
5157        }
5158    }
5159
5160    #[test]
5161    #[serial]
5162    fn test_transform_deadline_min_boundary() {
5163        unsafe {
5164            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "1");
5165        }
5166        let config = ServerConfig::from_env().unwrap();
5167        assert_eq!(config.transform_deadline_secs, 1);
5168        unsafe {
5169            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
5170        }
5171    }
5172
5173    #[test]
5174    #[serial]
5175    fn test_transform_deadline_max_boundary() {
5176        unsafe {
5177            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "300");
5178        }
5179        let config = ServerConfig::from_env().unwrap();
5180        assert_eq!(config.transform_deadline_secs, 300);
5181        unsafe {
5182            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
5183        }
5184    }
5185
5186    #[test]
5187    #[serial]
5188    fn test_transform_deadline_empty_uses_default() {
5189        unsafe {
5190            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "");
5191        }
5192        let config = ServerConfig::from_env().unwrap();
5193        assert_eq!(config.transform_deadline_secs, 30);
5194        unsafe {
5195            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
5196        }
5197    }
5198
5199    #[test]
5200    #[serial]
5201    fn test_transform_deadline_zero_rejected() {
5202        unsafe {
5203            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "0");
5204        }
5205        let err = ServerConfig::from_env().unwrap_err();
5206        assert!(
5207            err.to_string().contains("between 1 and 300"),
5208            "error should mention valid range: {err}"
5209        );
5210        unsafe {
5211            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
5212        }
5213    }
5214
5215    #[test]
5216    #[serial]
5217    fn test_transform_deadline_over_max_rejected() {
5218        unsafe {
5219            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "301");
5220        }
5221        let err = ServerConfig::from_env().unwrap_err();
5222        assert!(
5223            err.to_string().contains("between 1 and 300"),
5224            "error should mention valid range: {err}"
5225        );
5226        unsafe {
5227            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
5228        }
5229    }
5230
5231    #[test]
5232    #[serial]
5233    fn test_transform_deadline_non_numeric_rejected() {
5234        unsafe {
5235            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "abc");
5236        }
5237        let err = ServerConfig::from_env().unwrap_err();
5238        assert!(
5239            err.to_string().contains("positive integer"),
5240            "error should mention positive integer: {err}"
5241        );
5242        unsafe {
5243            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
5244        }
5245    }
5246
5247    #[test]
5248    #[serial]
5249    #[cfg(feature = "azure")]
5250    fn test_azure_container_env_var_required() {
5251        unsafe {
5252            std::env::set_var("TRUSS_STORAGE_BACKEND", "azure");
5253            std::env::remove_var("TRUSS_AZURE_CONTAINER");
5254        }
5255        let err = ServerConfig::from_env().unwrap_err();
5256        assert!(
5257            err.to_string().contains("TRUSS_AZURE_CONTAINER"),
5258            "error should mention TRUSS_AZURE_CONTAINER: {err}"
5259        );
5260        unsafe {
5261            std::env::remove_var("TRUSS_STORAGE_BACKEND");
5262        }
5263    }
5264
5265    #[test]
5266    fn server_config_debug_redacts_bearer_token_and_signed_url_secret() {
5267        let mut config = ServerConfig::new(
5268            temp_dir("debug-redact"),
5269            Some("super-secret-token-12345".to_string()),
5270        );
5271        config.signed_url_key_id = Some("visible-key-id".to_string());
5272        config.signed_url_secret = Some("super-secret-hmac-key".to_string());
5273        let debug = format!("{config:?}");
5274        assert!(
5275            !debug.contains("super-secret-token-12345"),
5276            "bearer_token leaked in Debug output: {debug}"
5277        );
5278        assert!(
5279            !debug.contains("super-secret-hmac-key"),
5280            "signed_url_secret leaked in Debug output: {debug}"
5281        );
5282        assert!(
5283            debug.contains("[REDACTED]"),
5284            "expected [REDACTED] in Debug output: {debug}"
5285        );
5286        assert!(
5287            debug.contains("visible-key-id"),
5288            "signed_url_key_id should be visible: {debug}"
5289        );
5290    }
5291
5292    #[test]
5293    fn authorize_headers_accepts_correct_bearer_token() {
5294        let config = ServerConfig::new(temp_dir("auth-ok"), Some("correct-token".to_string()));
5295        let headers = vec![(
5296            "authorization".to_string(),
5297            "Bearer correct-token".to_string(),
5298        )];
5299        assert!(super::authorize_request_headers(&headers, &config).is_ok());
5300    }
5301
5302    #[test]
5303    fn authorize_headers_rejects_wrong_bearer_token() {
5304        let config = ServerConfig::new(temp_dir("auth-wrong"), Some("correct-token".to_string()));
5305        let headers = vec![(
5306            "authorization".to_string(),
5307            "Bearer wrong-token".to_string(),
5308        )];
5309        let err = super::authorize_request_headers(&headers, &config).unwrap_err();
5310        assert_eq!(err.status, "401 Unauthorized");
5311    }
5312
5313    #[test]
5314    fn authorize_headers_rejects_missing_header() {
5315        let config = ServerConfig::new(temp_dir("auth-missing"), Some("correct-token".to_string()));
5316        let headers: Vec<(String, String)> = vec![];
5317        let err = super::authorize_request_headers(&headers, &config).unwrap_err();
5318        assert_eq!(err.status, "401 Unauthorized");
5319    }
5320
5321    // ── TransformSlot RAII guard ──────────────────────────────────────
5322
5323    #[test]
5324    fn transform_slot_acquire_succeeds_under_limit() {
5325        use std::sync::Arc;
5326        use std::sync::atomic::AtomicU64;
5327
5328        let counter = Arc::new(AtomicU64::new(0));
5329        let slot = super::TransformSlot::try_acquire(&counter, 2);
5330        assert!(slot.is_some());
5331        assert_eq!(counter.load(Ordering::Relaxed), 1);
5332    }
5333
5334    #[test]
5335    fn transform_slot_acquire_returns_none_at_limit() {
5336        use std::sync::Arc;
5337        use std::sync::atomic::AtomicU64;
5338
5339        let counter = Arc::new(AtomicU64::new(0));
5340        let _s1 = super::TransformSlot::try_acquire(&counter, 1).unwrap();
5341        let s2 = super::TransformSlot::try_acquire(&counter, 1);
5342        assert!(s2.is_none());
5343        // Counter must still be 1 (failed acquire must not leak).
5344        assert_eq!(counter.load(Ordering::Relaxed), 1);
5345    }
5346
5347    #[test]
5348    fn transform_slot_drop_decrements_counter() {
5349        use std::sync::Arc;
5350        use std::sync::atomic::AtomicU64;
5351
5352        let counter = Arc::new(AtomicU64::new(0));
5353        {
5354            let _slot = super::TransformSlot::try_acquire(&counter, 4).unwrap();
5355            assert_eq!(counter.load(Ordering::Relaxed), 1);
5356        }
5357        // After drop the counter must return to zero.
5358        assert_eq!(counter.load(Ordering::Relaxed), 0);
5359    }
5360
5361    #[test]
5362    fn transform_slot_multiple_acquires_up_to_limit() {
5363        use std::sync::Arc;
5364        use std::sync::atomic::AtomicU64;
5365
5366        let counter = Arc::new(AtomicU64::new(0));
5367        let limit = 3u64;
5368        let mut slots = Vec::new();
5369        for _ in 0..limit {
5370            slots.push(super::TransformSlot::try_acquire(&counter, limit).unwrap());
5371        }
5372        assert_eq!(counter.load(Ordering::Relaxed), limit);
5373        // One more must fail.
5374        assert!(super::TransformSlot::try_acquire(&counter, limit).is_none());
5375        assert_eq!(counter.load(Ordering::Relaxed), limit);
5376        // Drop all slots.
5377        slots.clear();
5378        assert_eq!(counter.load(Ordering::Relaxed), 0);
5379    }
5380
5381    // ── Access log via emit_access_log ────────────────────────────────
5382
5383    #[test]
5384    fn emit_access_log_produces_json_with_expected_fields() {
5385        use std::sync::{Arc, Mutex};
5386        use std::time::Instant;
5387
5388        let captured = Arc::new(Mutex::new(String::new()));
5389        let captured_clone = Arc::clone(&captured);
5390        let handler: super::LogHandler =
5391            Arc::new(move |msg: &str| *captured_clone.lock().unwrap() = msg.to_owned());
5392
5393        let mut config = ServerConfig::new(temp_dir("access-log"), None);
5394        config.log_handler = Some(handler);
5395
5396        let start = Instant::now();
5397        super::emit_access_log(
5398            &config,
5399            &super::AccessLogEntry {
5400                request_id: "req-123",
5401                method: "GET",
5402                path: "/image.png",
5403                route: "transform",
5404                status: "200",
5405                start,
5406                cache_status: Some("hit"),
5407                watermark: false,
5408            },
5409        );
5410
5411        let output = captured.lock().unwrap().clone();
5412        let parsed: serde_json::Value = serde_json::from_str(&output).expect("valid JSON");
5413        assert_eq!(parsed["kind"], "access_log");
5414        assert_eq!(parsed["request_id"], "req-123");
5415        assert_eq!(parsed["method"], "GET");
5416        assert_eq!(parsed["path"], "/image.png");
5417        assert_eq!(parsed["route"], "transform");
5418        assert_eq!(parsed["status"], "200");
5419        assert_eq!(parsed["cache_status"], "hit");
5420        assert!(parsed["latency_ms"].is_u64());
5421    }
5422
5423    #[test]
5424    fn emit_access_log_null_cache_status_when_none() {
5425        use std::sync::{Arc, Mutex};
5426        use std::time::Instant;
5427
5428        let captured = Arc::new(Mutex::new(String::new()));
5429        let captured_clone = Arc::clone(&captured);
5430        let handler: super::LogHandler =
5431            Arc::new(move |msg: &str| *captured_clone.lock().unwrap() = msg.to_owned());
5432
5433        let mut config = ServerConfig::new(temp_dir("access-log-none"), None);
5434        config.log_handler = Some(handler);
5435
5436        super::emit_access_log(
5437            &config,
5438            &super::AccessLogEntry {
5439                request_id: "req-456",
5440                method: "POST",
5441                path: "/upload",
5442                route: "upload",
5443                status: "201",
5444                start: Instant::now(),
5445                cache_status: None,
5446                watermark: false,
5447            },
5448        );
5449
5450        let output = captured.lock().unwrap().clone();
5451        let parsed: serde_json::Value = serde_json::from_str(&output).expect("valid JSON");
5452        assert!(parsed["cache_status"].is_null());
5453    }
5454
5455    // ── X-Request-Id header ───────────────────────────────────────────
5456
5457    #[test]
5458    fn x_request_id_is_extracted_from_incoming_headers() {
5459        let headers = vec![
5460            ("host".to_string(), "localhost".to_string()),
5461            ("x-request-id".to_string(), "custom-id-abc".to_string()),
5462        ];
5463        assert_eq!(
5464            super::extract_request_id(&headers),
5465            Some("custom-id-abc".to_string())
5466        );
5467    }
5468
5469    #[test]
5470    fn x_request_id_not_extracted_when_empty() {
5471        let headers = vec![("x-request-id".to_string(), "".to_string())];
5472        assert!(super::extract_request_id(&headers).is_none());
5473    }
5474
5475    #[test]
5476    fn x_request_id_not_extracted_when_absent() {
5477        let headers = vec![("host".to_string(), "localhost".to_string())];
5478        assert!(super::extract_request_id(&headers).is_none());
5479    }
5480
5481    // ── Cache status extraction ───────────────────────────────────────
5482
5483    #[test]
5484    fn cache_status_hit_detected() {
5485        let headers: Vec<(String, String)> =
5486            vec![("Cache-Status".to_string(), "\"truss\"; hit".to_string())];
5487        assert_eq!(super::extract_cache_status(&headers), Some("hit"));
5488    }
5489
5490    #[test]
5491    fn cache_status_miss_detected() {
5492        let headers: Vec<(String, String)> = vec![(
5493            "Cache-Status".to_string(),
5494            "\"truss\"; fwd=miss".to_string(),
5495        )];
5496        assert_eq!(super::extract_cache_status(&headers), Some("miss"));
5497    }
5498
5499    #[test]
5500    fn cache_status_none_when_header_absent() {
5501        let headers: Vec<(String, String)> =
5502            vec![("Content-Type".to_string(), "image/png".to_string())];
5503        assert!(super::extract_cache_status(&headers).is_none());
5504    }
5505
5506    #[test]
5507    fn signing_keys_populated_by_with_signed_url_credentials() {
5508        let config = ServerConfig::new(temp_dir("signing-keys-populated"), None)
5509            .with_signed_url_credentials("key-alpha", "secret-alpha");
5510
5511        assert_eq!(
5512            config.signing_keys.get("key-alpha").map(String::as_str),
5513            Some("secret-alpha")
5514        );
5515    }
5516
5517    #[test]
5518    fn authorize_signed_request_accepts_multiple_keys() {
5519        let mut extra = HashMap::new();
5520        extra.insert("key-beta".to_string(), "secret-beta".to_string());
5521        let config = ServerConfig::new(temp_dir("multi-key-accept"), None)
5522            .with_signed_url_credentials("key-alpha", "secret-alpha")
5523            .with_signing_keys(extra);
5524
5525        // Sign with key-alpha
5526        let request_alpha = signed_public_request(
5527            "/images/by-path?path=%2Fimage.png&keyId=key-alpha&expires=4102444800&format=jpeg",
5528            "assets.example.com",
5529            "secret-alpha",
5530        );
5531        let query_alpha =
5532            super::auth::parse_query_params(&request_alpha).expect("parse query alpha");
5533        authorize_signed_request(&request_alpha, &query_alpha, &config)
5534            .expect("key-alpha should be accepted");
5535
5536        // Sign with key-beta
5537        let request_beta = signed_public_request(
5538            "/images/by-path?path=%2Fimage.png&keyId=key-beta&expires=4102444800&format=jpeg",
5539            "assets.example.com",
5540            "secret-beta",
5541        );
5542        let query_beta = super::auth::parse_query_params(&request_beta).expect("parse query beta");
5543        authorize_signed_request(&request_beta, &query_beta, &config)
5544            .expect("key-beta should be accepted");
5545    }
5546
5547    #[test]
5548    fn authorize_signed_request_rejects_unknown_key() {
5549        let config = ServerConfig::new(temp_dir("unknown-key-reject"), None)
5550            .with_signed_url_credentials("key-alpha", "secret-alpha");
5551
5552        let request = signed_public_request(
5553            "/images/by-path?path=%2Fimage.png&keyId=key-unknown&expires=4102444800&format=jpeg",
5554            "assets.example.com",
5555            "secret-unknown",
5556        );
5557        let query = super::auth::parse_query_params(&request).expect("parse query");
5558        authorize_signed_request(&request, &query, &config)
5559            .expect_err("unknown key should be rejected");
5560    }
5561
5562    // ── Security: X-Request-Id CRLF injection prevention ─────────────
5563
5564    #[test]
5565    fn x_request_id_rejects_crlf_injection() {
5566        let headers = vec![(
5567            "x-request-id".to_string(),
5568            "evil\r\nX-Injected: true".to_string(),
5569        )];
5570        assert!(
5571            super::extract_request_id(&headers).is_none(),
5572            "CRLF in request ID must be rejected"
5573        );
5574    }
5575
5576    #[test]
5577    fn x_request_id_rejects_lone_cr() {
5578        let headers = vec![("x-request-id".to_string(), "evil\rid".to_string())];
5579        assert!(super::extract_request_id(&headers).is_none());
5580    }
5581
5582    #[test]
5583    fn x_request_id_rejects_lone_lf() {
5584        let headers = vec![("x-request-id".to_string(), "evil\nid".to_string())];
5585        assert!(super::extract_request_id(&headers).is_none());
5586    }
5587
5588    #[test]
5589    fn x_request_id_rejects_nul_byte() {
5590        let headers = vec![("x-request-id".to_string(), "evil\0id".to_string())];
5591        assert!(super::extract_request_id(&headers).is_none());
5592    }
5593
5594    #[test]
5595    fn x_request_id_accepts_normal_uuid() {
5596        let headers = vec![(
5597            "x-request-id".to_string(),
5598            "550e8400-e29b-41d4-a716-446655440000".to_string(),
5599        )];
5600        assert_eq!(
5601            super::extract_request_id(&headers),
5602            Some("550e8400-e29b-41d4-a716-446655440000".to_string())
5603        );
5604    }
5605
5606    // ── Characterization: ServerConfig defaults ──────────────────────
5607
5608    #[test]
5609    fn server_config_new_has_expected_defaults() {
5610        let root = temp_dir("cfg-defaults");
5611        let config = ServerConfig::new(root.clone(), None);
5612        assert_eq!(config.storage_root, root);
5613        assert!(config.bearer_token.is_none());
5614        assert!(config.signed_url_secret.is_none());
5615        assert!(config.signing_keys.is_empty());
5616        assert!(config.presets.read().unwrap().is_empty());
5617        assert_eq!(
5618            config.max_concurrent_transforms,
5619            DEFAULT_MAX_CONCURRENT_TRANSFORMS
5620        );
5621        assert_eq!(
5622            config.public_max_age_seconds,
5623            DEFAULT_PUBLIC_MAX_AGE_SECONDS
5624        );
5625        assert_eq!(
5626            config.public_stale_while_revalidate_seconds,
5627            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS
5628        );
5629        assert!(!config.allow_insecure_url_sources);
5630    }
5631
5632    #[test]
5633    fn server_config_builder_with_signed_url_credentials_overwrites() {
5634        let root = temp_dir("cfg-builder");
5635        let config = ServerConfig::new(root, None)
5636            .with_signed_url_credentials("key1", "secret1")
5637            .with_signed_url_credentials("key2", "secret2");
5638        assert!(config.signing_keys.contains_key("key1"));
5639        assert!(config.signing_keys.contains_key("key2"));
5640    }
5641
5642    // ── Characterization: route_request classification ───────────────
5643
5644    #[test]
5645    fn route_request_returns_not_found_for_unknown_path() {
5646        let root = temp_dir("route-unknown");
5647        let config = ServerConfig::new(root, None);
5648        let request = HttpRequest {
5649            method: "GET".to_string(),
5650            target: "/nonexistent".to_string(),
5651            version: "HTTP/1.1".to_string(),
5652            headers: vec![("host".to_string(), "localhost".to_string())],
5653            body: Vec::new(),
5654        };
5655        let response = route_request(request, &config);
5656        assert_eq!(response.status, "404 Not Found");
5657    }
5658
5659    #[test]
5660    fn route_request_health_returns_200() {
5661        let root = temp_dir("route-health");
5662        let config = ServerConfig::new(root, None);
5663        let request = HttpRequest {
5664            method: "GET".to_string(),
5665            target: "/health".to_string(),
5666            version: "HTTP/1.1".to_string(),
5667            headers: vec![("host".to_string(), "localhost".to_string())],
5668            body: Vec::new(),
5669        };
5670        let response = route_request(request, &config);
5671        assert_eq!(response.status, "200 OK");
5672    }
5673
5674    // ── Characterization: TransformSlot thread safety ────────────────
5675
5676    #[test]
5677    fn transform_slot_concurrent_acquire_respects_limit() {
5678        use std::sync::Arc;
5679        use std::sync::Barrier;
5680        use std::sync::atomic::AtomicU64;
5681
5682        let counter = Arc::new(AtomicU64::new(0));
5683        let limit = 4u64;
5684        let num_threads = 16;
5685        let barrier = Arc::new(Barrier::new(num_threads));
5686        let acquired = Arc::new(AtomicU64::new(0));
5687
5688        let handles: Vec<_> = (0..num_threads)
5689            .map(|_| {
5690                let counter = Arc::clone(&counter);
5691                let barrier = Arc::clone(&barrier);
5692                let acquired = Arc::clone(&acquired);
5693                thread::spawn(move || {
5694                    barrier.wait();
5695                    if let Some(_slot) = super::TransformSlot::try_acquire(&counter, limit) {
5696                        acquired.fetch_add(1, Ordering::Relaxed);
5697                        thread::sleep(Duration::from_millis(10));
5698                    }
5699                })
5700            })
5701            .collect();
5702
5703        for h in handles {
5704            h.join().unwrap();
5705        }
5706        assert_eq!(counter.load(Ordering::Relaxed), 0);
5707    }
5708
5709    #[test]
5710    #[serial]
5711    fn test_max_input_pixels_default() {
5712        unsafe {
5713            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5714        }
5715        let config = ServerConfig::from_env().unwrap();
5716        assert_eq!(config.max_input_pixels, 40_000_000);
5717    }
5718
5719    #[test]
5720    #[serial]
5721    fn test_max_input_pixels_custom() {
5722        unsafe {
5723            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "10000000");
5724        }
5725        let config = ServerConfig::from_env().unwrap();
5726        assert_eq!(config.max_input_pixels, 10_000_000);
5727        unsafe {
5728            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5729        }
5730    }
5731
5732    #[test]
5733    #[serial]
5734    fn test_max_input_pixels_min_boundary() {
5735        unsafe {
5736            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "1");
5737        }
5738        let config = ServerConfig::from_env().unwrap();
5739        assert_eq!(config.max_input_pixels, 1);
5740        unsafe {
5741            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5742        }
5743    }
5744
5745    #[test]
5746    #[serial]
5747    fn test_max_input_pixels_max_boundary() {
5748        unsafe {
5749            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "100000000");
5750        }
5751        let config = ServerConfig::from_env().unwrap();
5752        assert_eq!(config.max_input_pixels, 100_000_000);
5753        unsafe {
5754            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5755        }
5756    }
5757
5758    #[test]
5759    #[serial]
5760    fn test_max_input_pixels_empty_uses_default() {
5761        unsafe {
5762            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "");
5763        }
5764        let config = ServerConfig::from_env().unwrap();
5765        assert_eq!(config.max_input_pixels, 40_000_000);
5766        unsafe {
5767            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5768        }
5769    }
5770
5771    #[test]
5772    #[serial]
5773    fn test_max_input_pixels_zero_rejected() {
5774        unsafe {
5775            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "0");
5776        }
5777        let err = ServerConfig::from_env().unwrap_err();
5778        assert!(err.to_string().contains("TRUSS_MAX_INPUT_PIXELS"));
5779        unsafe {
5780            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5781        }
5782    }
5783
5784    #[test]
5785    #[serial]
5786    fn test_max_input_pixels_over_max_rejected() {
5787        unsafe {
5788            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "100000001");
5789        }
5790        let err = ServerConfig::from_env().unwrap_err();
5791        assert!(err.to_string().contains("TRUSS_MAX_INPUT_PIXELS"));
5792        unsafe {
5793            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5794        }
5795    }
5796
5797    #[test]
5798    #[serial]
5799    fn test_max_input_pixels_non_numeric_rejected() {
5800        unsafe {
5801            std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "abc");
5802        }
5803        let err = ServerConfig::from_env().unwrap_err();
5804        assert!(err.to_string().contains("TRUSS_MAX_INPUT_PIXELS"));
5805        unsafe {
5806            std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
5807        }
5808    }
5809
5810    #[test]
5811    #[serial]
5812    fn test_max_upload_bytes_default() {
5813        unsafe {
5814            std::env::remove_var("TRUSS_MAX_UPLOAD_BYTES");
5815        }
5816        let config = ServerConfig::from_env().unwrap();
5817        assert_eq!(config.max_upload_bytes, 100 * 1024 * 1024);
5818    }
5819
5820    #[test]
5821    #[serial]
5822    fn test_max_upload_bytes_custom() {
5823        unsafe {
5824            std::env::set_var("TRUSS_MAX_UPLOAD_BYTES", "5242880");
5825        }
5826        let config = ServerConfig::from_env().unwrap();
5827        assert_eq!(config.max_upload_bytes, 5 * 1024 * 1024);
5828        unsafe {
5829            std::env::remove_var("TRUSS_MAX_UPLOAD_BYTES");
5830        }
5831    }
5832
5833    #[test]
5834    #[serial]
5835    fn test_max_upload_bytes_zero_rejected() {
5836        unsafe {
5837            std::env::set_var("TRUSS_MAX_UPLOAD_BYTES", "0");
5838        }
5839        let err = ServerConfig::from_env().unwrap_err();
5840        assert!(err.to_string().contains("TRUSS_MAX_UPLOAD_BYTES"));
5841        unsafe {
5842            std::env::remove_var("TRUSS_MAX_UPLOAD_BYTES");
5843        }
5844    }
5845
5846    #[test]
5847    #[serial]
5848    fn test_max_upload_bytes_non_numeric_rejected() {
5849        unsafe {
5850            std::env::set_var("TRUSS_MAX_UPLOAD_BYTES", "abc");
5851        }
5852        let err = ServerConfig::from_env().unwrap_err();
5853        assert!(err.to_string().contains("TRUSS_MAX_UPLOAD_BYTES"));
5854        unsafe {
5855            std::env::remove_var("TRUSS_MAX_UPLOAD_BYTES");
5856        }
5857    }
5858
5859    #[test]
5860    fn max_body_for_multipart_uses_custom_upload_limit() {
5861        let headers = vec![(
5862            "content-type".to_string(),
5863            "multipart/form-data; boundary=abc".to_string(),
5864        )];
5865        let custom_limit = 5 * 1024 * 1024;
5866        assert_eq!(
5867            super::http_parse::max_body_for_headers(&headers, custom_limit),
5868            custom_limit
5869        );
5870    }
5871
5872    #[test]
5873    fn health_includes_max_input_pixels() {
5874        let storage = temp_dir("health-pixels");
5875        let request = HttpRequest {
5876            method: "GET".to_string(),
5877            target: "/health".to_string(),
5878            version: "HTTP/1.1".to_string(),
5879            headers: Vec::new(),
5880            body: Vec::new(),
5881        };
5882
5883        let config = ServerConfig::new(storage, None);
5884        let response = route_request(request, &config);
5885
5886        assert_eq!(response.status, "200 OK");
5887        let body: serde_json::Value =
5888            serde_json::from_slice(&response.body).expect("parse health body");
5889        assert_eq!(body["maxInputPixels"], 40_000_000);
5890    }
5891
5892    #[test]
5893    fn health_includes_transform_capacity_details() {
5894        let storage = temp_dir("health-capacity");
5895        let config = ServerConfig::new(storage, None);
5896        let response = route_request(
5897            HttpRequest {
5898                method: "GET".to_string(),
5899                target: "/health".to_string(),
5900                version: "HTTP/1.1".to_string(),
5901                headers: Vec::new(),
5902                body: Vec::new(),
5903            },
5904            &config,
5905        );
5906        let body: serde_json::Value =
5907            serde_json::from_slice(&response.body).expect("parse health body");
5908        let checks = body["checks"].as_array().expect("checks array");
5909        let capacity = checks
5910            .iter()
5911            .find(|c| c["name"] == "transformCapacity")
5912            .expect("transformCapacity check");
5913        assert_eq!(capacity["current"], 0);
5914        assert_eq!(capacity["max"], 64);
5915    }
5916
5917    #[cfg(target_os = "linux")]
5918    #[test]
5919    fn process_rss_bytes_returns_some() {
5920        let rss = super::process_rss_bytes();
5921        assert!(rss.is_some());
5922        assert!(rss.unwrap() > 0);
5923    }
5924
5925    #[cfg(target_os = "linux")]
5926    #[test]
5927    fn disk_free_bytes_returns_some_for_existing_dir() {
5928        let dir = temp_dir("disk-free");
5929        let free = super::disk_free_bytes(&dir);
5930        assert!(free.is_some());
5931        assert!(free.unwrap() > 0);
5932    }
5933
5934    #[test]
5935    fn health_ready_returns_503_when_memory_exceeded() {
5936        let storage = temp_dir("health-mem");
5937        let mut config = ServerConfig::new(storage, None);
5938        // Set threshold to 1 byte — guaranteed to be exceeded.
5939        config.health_max_memory_bytes = Some(1);
5940        let response = route_request(
5941            HttpRequest {
5942                method: "GET".to_string(),
5943                target: "/health/ready".to_string(),
5944                version: "HTTP/1.1".to_string(),
5945                headers: Vec::new(),
5946                body: Vec::new(),
5947            },
5948            &config,
5949        );
5950        // On Linux, RSS > 1 byte → 503. On other platforms, memory check
5951        // is skipped so the response is 200.
5952        if cfg!(target_os = "linux") {
5953            assert_eq!(response.status, "503 Service Unavailable");
5954        }
5955    }
5956
5957    #[test]
5958    fn health_includes_memory_usage_on_linux() {
5959        let storage = temp_dir("health-mem-report");
5960        let mut config = ServerConfig::new(storage, None);
5961        config.health_max_memory_bytes = Some(u64::MAX);
5962        let response = route_request(
5963            HttpRequest {
5964                method: "GET".to_string(),
5965                target: "/health".to_string(),
5966                version: "HTTP/1.1".to_string(),
5967                headers: Vec::new(),
5968                body: Vec::new(),
5969            },
5970            &config,
5971        );
5972        let body: serde_json::Value =
5973            serde_json::from_slice(&response.body).expect("parse health body");
5974        if cfg!(target_os = "linux") {
5975            let checks = body["checks"].as_array().expect("checks array");
5976            let mem = checks
5977                .iter()
5978                .find(|c| c["name"] == "memoryUsage")
5979                .expect("memoryUsage check");
5980            assert_eq!(mem["status"], "ok");
5981            assert!(mem["rssBytes"].as_u64().unwrap() > 0);
5982        }
5983    }
5984
5985    #[cfg(target_os = "linux")]
5986    #[test]
5987    fn disk_free_bytes_returns_none_for_nonexistent_path() {
5988        let free = super::disk_free_bytes(std::path::Path::new("/nonexistent/path/xyz"));
5989        assert!(free.is_none());
5990    }
5991
5992    #[test]
5993    fn health_ready_503_body_contains_fail_status() {
5994        let storage = temp_dir("health-ready-body");
5995        std::fs::remove_dir_all(&storage).ok();
5996        let config = ServerConfig::new(storage, None);
5997        let response = route_request(
5998            HttpRequest {
5999                method: "GET".to_string(),
6000                target: "/health/ready".to_string(),
6001                version: "HTTP/1.1".to_string(),
6002                headers: Vec::new(),
6003                body: Vec::new(),
6004            },
6005            &config,
6006        );
6007        assert_eq!(response.status, "503 Service Unavailable");
6008        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
6009        assert_eq!(body["status"], "fail");
6010        let checks = body["checks"].as_array().expect("checks array");
6011        let storage_check = checks
6012            .iter()
6013            .find(|c| c["name"] == "storageRoot")
6014            .expect("storageRoot check");
6015        assert_eq!(storage_check["status"], "fail");
6016    }
6017
6018    #[test]
6019    fn health_ready_cache_disk_free_shown_when_cache_root_set() {
6020        let storage = temp_dir("health-ready-cache-disk");
6021        let cache = temp_dir("health-ready-cache-disk-cache");
6022        let mut config = ServerConfig::new(storage, None).with_cache_root(cache);
6023        config.health_cache_min_free_bytes = Some(1);
6024        let response = route_request(
6025            HttpRequest {
6026                method: "GET".to_string(),
6027                target: "/health/ready".to_string(),
6028                version: "HTTP/1.1".to_string(),
6029                headers: Vec::new(),
6030                body: Vec::new(),
6031            },
6032            &config,
6033        );
6034        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
6035        let checks = body["checks"].as_array().expect("checks array");
6036        let disk_check = checks
6037            .iter()
6038            .find(|c| c["name"] == "cacheDiskFree")
6039            .expect("cacheDiskFree check");
6040        assert_eq!(disk_check["status"], "ok");
6041        if cfg!(target_os = "linux") {
6042            assert!(disk_check["freeBytes"].as_u64().is_some());
6043        }
6044        assert_eq!(disk_check["thresholdBytes"], 1);
6045    }
6046
6047    #[test]
6048    fn health_ready_no_cache_disk_free_without_cache_root() {
6049        let storage = temp_dir("health-ready-no-cache");
6050        let config = ServerConfig::new(storage, None);
6051        let response = route_request(
6052            HttpRequest {
6053                method: "GET".to_string(),
6054                target: "/health/ready".to_string(),
6055                version: "HTTP/1.1".to_string(),
6056                headers: Vec::new(),
6057                body: Vec::new(),
6058            },
6059            &config,
6060        );
6061        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
6062        let checks = body["checks"].as_array().expect("checks array");
6063        assert!(
6064            checks.iter().all(|c| c["name"] != "cacheDiskFree"),
6065            "cacheDiskFree should not appear without cache_root"
6066        );
6067    }
6068
6069    #[test]
6070    fn health_ready_memory_check_includes_details() {
6071        let storage = temp_dir("health-ready-mem-detail");
6072        let mut config = ServerConfig::new(storage, None);
6073        config.health_max_memory_bytes = Some(u64::MAX);
6074        let response = route_request(
6075            HttpRequest {
6076                method: "GET".to_string(),
6077                target: "/health/ready".to_string(),
6078                version: "HTTP/1.1".to_string(),
6079                headers: Vec::new(),
6080                body: Vec::new(),
6081            },
6082            &config,
6083        );
6084        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
6085        let checks = body["checks"].as_array().expect("checks array");
6086        let mem = checks.iter().find(|c| c["name"] == "memoryUsage");
6087        if cfg!(target_os = "linux") {
6088            let mem = mem.expect("memoryUsage check present on Linux");
6089            assert_eq!(mem["status"], "ok");
6090            assert_eq!(mem["thresholdBytes"], u64::MAX);
6091            assert!(mem["rssBytes"].as_u64().is_some());
6092        } else {
6093            assert!(mem.is_none(), "memoryUsage should be absent on non-Linux");
6094        }
6095    }
6096
6097    // ── graceful shutdown: draining flag ─────────────────────────────
6098
6099    #[test]
6100    fn health_ready_returns_503_when_draining() {
6101        let storage = temp_dir("health-ready-draining");
6102        let config = ServerConfig::new(storage, None);
6103        config.draining.store(true, Ordering::Relaxed);
6104
6105        let response = route_request(
6106            HttpRequest {
6107                method: "GET".to_string(),
6108                target: "/health/ready".to_string(),
6109                version: "HTTP/1.1".to_string(),
6110                headers: Vec::new(),
6111                body: Vec::new(),
6112            },
6113            &config,
6114        );
6115
6116        assert_eq!(response.status, "503 Service Unavailable");
6117        let body: serde_json::Value =
6118            serde_json::from_slice(&response.body).expect("parse ready body");
6119        assert_eq!(body["status"], "fail");
6120        let checks = body["checks"].as_array().expect("checks array");
6121        assert!(
6122            checks
6123                .iter()
6124                .any(|c| c["name"] == "draining" && c["status"] == "fail")
6125        );
6126    }
6127
6128    #[test]
6129    fn health_ready_returns_ok_when_not_draining() {
6130        let storage = temp_dir("health-ready-not-draining");
6131        let config = ServerConfig::new(storage, None);
6132        // draining is false by default.
6133        let response = route_request(
6134            HttpRequest {
6135                method: "GET".to_string(),
6136                target: "/health/ready".to_string(),
6137                version: "HTTP/1.1".to_string(),
6138                headers: Vec::new(),
6139                body: Vec::new(),
6140            },
6141            &config,
6142        );
6143
6144        assert_eq!(response.status, "200 OK");
6145        let body: serde_json::Value =
6146            serde_json::from_slice(&response.body).expect("parse ready body");
6147        assert_eq!(body["status"], "ok");
6148        // Should not have a draining check entry.
6149        let checks = body["checks"].as_array().expect("checks array");
6150        assert!(!checks.iter().any(|c| c["name"] == "draining"));
6151    }
6152
6153    // ── Drain during normal request processing (m10) ─────────────
6154
6155    #[test]
6156    fn health_live_returns_200_while_draining() {
6157        let storage = temp_dir("live-draining");
6158        let config = ServerConfig::new(storage, None);
6159        config.draining.store(true, Ordering::Relaxed);
6160
6161        let response = route_request(
6162            HttpRequest {
6163                method: "GET".to_string(),
6164                target: "/health/live".to_string(),
6165                version: "HTTP/1.1".to_string(),
6166                headers: Vec::new(),
6167                body: Vec::new(),
6168            },
6169            &config,
6170        );
6171
6172        // Liveness should always return 200 even when draining — only
6173        // readiness returns 503.
6174        assert_eq!(response.status, "200 OK");
6175    }
6176
6177    #[test]
6178    fn normal_request_processed_while_draining() {
6179        let storage = temp_dir("normal-draining");
6180        let config = ServerConfig::new(storage, None);
6181        config.draining.store(true, Ordering::Relaxed);
6182
6183        // A non-health, non-image request should still be routed (e.g. 404
6184        // because the path doesn't match any route) — it should NOT get a
6185        // 503 just because the server is draining.
6186        let response = route_request(
6187            HttpRequest {
6188                method: "GET".to_string(),
6189                target: "/nonexistent".to_string(),
6190                version: "HTTP/1.1".to_string(),
6191                headers: Vec::new(),
6192                body: Vec::new(),
6193            },
6194            &config,
6195        );
6196
6197        // The path doesn't match any route, so we get 404 — NOT 503.
6198        assert_eq!(response.status, "404 Not Found");
6199    }
6200
6201    // ── preset hot-reload watcher ────────────────────────────────────
6202
6203    #[test]
6204    fn preset_watcher_reloads_on_file_change() {
6205        use std::sync::Arc;
6206        use std::sync::atomic::{AtomicBool, Ordering};
6207
6208        let dir = std::env::temp_dir().join(format!(
6209            "truss_test_watcher_{}",
6210            std::time::SystemTime::UNIX_EPOCH
6211                .elapsed()
6212                .unwrap()
6213                .as_nanos()
6214        ));
6215        std::fs::create_dir_all(&dir).unwrap();
6216        let path = dir.join("presets.json");
6217        std::fs::write(&path, r#"{"thumb":{"width":100}}"#).unwrap();
6218
6219        let presets = Arc::new(std::sync::RwLock::new({
6220            let mut m = std::collections::HashMap::new();
6221            m.insert(
6222                "thumb".to_string(),
6223                TransformOptionsPayload {
6224                    width: Some(100),
6225                    height: None,
6226                    fit: None,
6227                    position: None,
6228                    format: None,
6229                    quality: None,
6230                    background: None,
6231                    rotate: None,
6232                    auto_orient: None,
6233                    strip_metadata: None,
6234                    preserve_exif: None,
6235                    crop: None,
6236                    blur: None,
6237                    sharpen: None,
6238                },
6239            );
6240            m
6241        }));
6242        let draining = Arc::new(AtomicBool::new(false));
6243        let config = Arc::new(ServerConfig::new(dir.clone(), None));
6244
6245        let presets_clone = Arc::clone(&presets);
6246        let draining_clone = Arc::clone(&draining);
6247        let config_clone = Arc::clone(&config);
6248        let path_clone = path.clone();
6249
6250        let handle = std::thread::spawn(move || {
6251            super::preset_watcher(presets_clone, path_clone, draining_clone, config_clone);
6252        });
6253
6254        // Wait a moment, then update the file with a new mtime.
6255        std::thread::sleep(std::time::Duration::from_millis(100));
6256        // Ensure a different mtime by sleeping briefly.
6257        std::thread::sleep(std::time::Duration::from_secs(1));
6258        std::fs::write(&path, r#"{"thumb":{"width":200},"banner":{"width":800}}"#).unwrap();
6259
6260        // Wait for the watcher to pick up the change (poll interval is 5s).
6261        std::thread::sleep(std::time::Duration::from_secs(6));
6262
6263        // Verify updated presets.
6264        {
6265            let p = presets.read().unwrap();
6266            assert_eq!(p.len(), 2, "expected 2 presets after reload");
6267            assert_eq!(p["thumb"].width, Some(200));
6268            assert_eq!(p["banner"].width, Some(800));
6269        }
6270
6271        // Stop the watcher.
6272        draining.store(true, Ordering::Relaxed);
6273        handle.join().unwrap();
6274
6275        std::fs::remove_dir_all(&dir).unwrap();
6276    }
6277
6278    #[test]
6279    fn preset_watcher_keeps_old_presets_on_invalid_file() {
6280        use std::sync::Arc;
6281        use std::sync::atomic::{AtomicBool, Ordering};
6282
6283        let dir = std::env::temp_dir().join(format!(
6284            "truss_test_watcher_invalid_{}",
6285            std::time::SystemTime::UNIX_EPOCH
6286                .elapsed()
6287                .unwrap()
6288                .as_nanos()
6289        ));
6290        std::fs::create_dir_all(&dir).unwrap();
6291        let path = dir.join("presets.json");
6292        std::fs::write(&path, r#"{"thumb":{"width":100}}"#).unwrap();
6293
6294        let presets = Arc::new(std::sync::RwLock::new({
6295            let mut m = std::collections::HashMap::new();
6296            m.insert(
6297                "thumb".to_string(),
6298                TransformOptionsPayload {
6299                    width: Some(100),
6300                    height: None,
6301                    fit: None,
6302                    position: None,
6303                    format: None,
6304                    quality: None,
6305                    background: None,
6306                    rotate: None,
6307                    auto_orient: None,
6308                    strip_metadata: None,
6309                    preserve_exif: None,
6310                    crop: None,
6311                    blur: None,
6312                    sharpen: None,
6313                },
6314            );
6315            m
6316        }));
6317        let draining = Arc::new(AtomicBool::new(false));
6318        let config = Arc::new(ServerConfig::new(dir.clone(), None));
6319
6320        let presets_clone = Arc::clone(&presets);
6321        let draining_clone = Arc::clone(&draining);
6322        let config_clone = Arc::clone(&config);
6323        let path_clone = path.clone();
6324
6325        let handle = std::thread::spawn(move || {
6326            super::preset_watcher(presets_clone, path_clone, draining_clone, config_clone);
6327        });
6328
6329        // Write invalid JSON after a brief delay.
6330        std::thread::sleep(std::time::Duration::from_millis(100));
6331        std::thread::sleep(std::time::Duration::from_secs(1));
6332        std::fs::write(&path, "invalid json!!!").unwrap();
6333
6334        // Wait for the watcher to process.
6335        std::thread::sleep(std::time::Duration::from_secs(6));
6336
6337        // Original presets should still be in place.
6338        {
6339            let p = presets.read().unwrap();
6340            assert_eq!(p.len(), 1, "presets should not change on invalid file");
6341            assert_eq!(p["thumb"].width, Some(100));
6342        }
6343
6344        draining.store(true, Ordering::Relaxed);
6345        handle.join().unwrap();
6346
6347        std::fs::remove_dir_all(&dir).unwrap();
6348    }
6349}