Skip to main content

truss/adapters/server/
mod.rs

1mod auth;
2#[cfg(feature = "azure")]
3pub mod azure;
4mod cache;
5#[cfg(feature = "gcs")]
6pub mod gcs;
7mod http_parse;
8mod metrics;
9mod multipart;
10mod negotiate;
11mod remote;
12mod response;
13#[cfg(feature = "s3")]
14pub mod s3;
15
16use auth::{
17    authorize_request, authorize_request_headers, authorize_signed_request,
18    canonical_query_without_signature, extend_transform_query, parse_optional_bool_query,
19    parse_optional_float_query, parse_optional_integer_query, parse_optional_u8_query,
20    parse_query_params, required_query_param, signed_source_query, url_authority,
21    validate_public_query_names,
22};
23use cache::{CacheLookup, TransformCache, compute_cache_key, try_versioned_cache_lookup};
24use http_parse::{
25    HttpRequest, parse_named, parse_optional_named, read_request_body, read_request_headers,
26    request_has_json_content_type,
27};
28use metrics::{
29    CACHE_HITS_TOTAL, CACHE_MISSES_TOTAL, DEFAULT_MAX_CONCURRENT_TRANSFORMS, RouteMetric,
30    record_http_metrics, render_metrics_text, status_code, uptime_seconds,
31};
32use multipart::{parse_multipart_boundary, parse_upload_request};
33use negotiate::{
34    CacheHitStatus, ImageResponsePolicy, PublicSourceKind, build_image_etag,
35    build_image_response_headers, if_none_match_matches, negotiate_output_format,
36};
37use remote::resolve_source_bytes;
38use response::{
39    HttpResponse, NOT_FOUND_BODY, bad_request_response, service_unavailable_response,
40    transform_error_response, unsupported_media_type_response, write_response,
41};
42
43use crate::{
44    Fit, MediaType, Position, RawArtifact, Rgba8, Rotation, TransformOptions, TransformRequest,
45    sniff_artifact, transform_raster, transform_svg,
46};
47use hmac::{Hmac, Mac};
48use serde::Deserialize;
49use serde_json::json;
50use sha2::{Digest, Sha256};
51use std::collections::BTreeMap;
52use std::env;
53use std::fmt;
54use std::io;
55use std::net::{TcpListener, TcpStream};
56use std::path::PathBuf;
57use std::str::FromStr;
58use std::sync::Arc;
59use std::sync::atomic::{AtomicU64, Ordering};
60use std::time::{Duration, Instant};
61use url::Url;
62use uuid::Uuid;
63
64/// Writes a line to stderr using a raw file-descriptor write, bypassing Rust's
65/// `std::io::Stderr` type whose internal `ReentrantLock` can interfere with
66/// `MutexGuard` drop ordering in Rust 2024 edition, breaking HTTP keep-alive.
67fn stderr_write(msg: &str) {
68    use std::io::Write;
69    use std::os::fd::FromRawFd;
70
71    // SAFETY: fd 2 (stderr) is always valid for the lifetime of the process.
72    let mut f = unsafe { std::fs::File::from_raw_fd(2) };
73
74    // Build a single buffer to issue one write(2) syscall instead of two.
75    let bytes = msg.as_bytes();
76    let mut buf = Vec::with_capacity(bytes.len() + 1);
77    buf.extend_from_slice(bytes);
78    buf.push(b'\n');
79    let _ = f.write_all(&buf);
80
81    // Do not drop `f` — that would close fd 2 (stderr).
82    std::mem::forget(f);
83}
84
85/// The storage backend that determines how `Path`-based public GET requests are
86/// resolved.
87#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
88#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub enum StorageBackend {
90    /// Source images live on the local filesystem under `storage_root`.
91    Filesystem,
92    /// Source images live in an S3-compatible bucket.
93    #[cfg(feature = "s3")]
94    S3,
95    /// Source images live in a Google Cloud Storage bucket.
96    #[cfg(feature = "gcs")]
97    Gcs,
98    /// Source images live in an Azure Blob Storage container.
99    #[cfg(feature = "azure")]
100    Azure,
101}
102
103#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
104impl StorageBackend {
105    /// Parses the `TRUSS_STORAGE_BACKEND` environment variable value.
106    pub fn parse(value: &str) -> Result<Self, String> {
107        match value.to_ascii_lowercase().as_str() {
108            "filesystem" | "fs" | "local" => Ok(Self::Filesystem),
109            #[cfg(feature = "s3")]
110            "s3" => Ok(Self::S3),
111            #[cfg(feature = "gcs")]
112            "gcs" => Ok(Self::Gcs),
113            #[cfg(feature = "azure")]
114            "azure" => Ok(Self::Azure),
115            _ => {
116                let mut expected = vec!["filesystem"];
117                #[cfg(feature = "s3")]
118                expected.push("s3");
119                #[cfg(feature = "gcs")]
120                expected.push("gcs");
121                #[cfg(feature = "azure")]
122                expected.push("azure");
123
124                #[allow(unused_mut)]
125                let mut hint = String::new();
126                #[cfg(not(feature = "s3"))]
127                if value.eq_ignore_ascii_case("s3") {
128                    hint = " (hint: rebuild with --features s3)".to_string();
129                }
130                #[cfg(not(feature = "gcs"))]
131                if value.eq_ignore_ascii_case("gcs") {
132                    hint = " (hint: rebuild with --features gcs)".to_string();
133                }
134                #[cfg(not(feature = "azure"))]
135                if value.eq_ignore_ascii_case("azure") {
136                    hint = " (hint: rebuild with --features azure)".to_string();
137                }
138
139                Err(format!(
140                    "unknown storage backend `{value}` (expected {}){hint}",
141                    expected.join(" or ")
142                ))
143            }
144        }
145    }
146}
147
148/// The default bind address for the development HTTP server.
149pub const DEFAULT_BIND_ADDR: &str = "127.0.0.1:8080";
150
151/// The default storage root used by the server adapter.
152pub const DEFAULT_STORAGE_ROOT: &str = ".";
153
154const DEFAULT_PUBLIC_MAX_AGE_SECONDS: u32 = 3600;
155const DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS: u32 = 60;
156const SOCKET_READ_TIMEOUT: Duration = Duration::from_secs(60);
157const SOCKET_WRITE_TIMEOUT: Duration = Duration::from_secs(60);
158/// Number of worker threads for handling incoming connections concurrently.
159const WORKER_THREADS: usize = 8;
160type HmacSha256 = Hmac<Sha256>;
161
162/// Maximum number of requests served over a single keep-alive connection before
163/// the server closes it.  This prevents a single client from monopolising a
164/// worker thread indefinitely.
165const KEEP_ALIVE_MAX_REQUESTS: usize = 100;
166
167/// Default wall-clock deadline (in seconds) for server-side transforms.
168/// Configurable at runtime via `TRUSS_TRANSFORM_DEADLINE_SECS`.
169const DEFAULT_TRANSFORM_DEADLINE_SECS: u64 = 30;
170
171#[derive(Clone, Copy)]
172struct PublicCacheControl {
173    max_age: u32,
174    stale_while_revalidate: u32,
175}
176
177#[derive(Clone, Copy)]
178struct ImageResponseConfig {
179    disable_accept_negotiation: bool,
180    public_cache_control: PublicCacheControl,
181    transform_deadline: Duration,
182}
183
184/// RAII guard that holds a concurrency slot for an in-flight image transform.
185///
186/// The counter is incremented on successful acquisition and decremented when
187/// the guard is dropped, ensuring the slot is always released even if the
188/// caller returns early or panics.
189struct TransformSlot {
190    counter: Arc<AtomicU64>,
191}
192
193impl TransformSlot {
194    fn try_acquire(counter: &Arc<AtomicU64>, limit: u64) -> Option<Self> {
195        let prev = counter.fetch_add(1, Ordering::Relaxed);
196        if prev >= limit {
197            counter.fetch_sub(1, Ordering::Relaxed);
198            None
199        } else {
200            Some(Self {
201                counter: Arc::clone(counter),
202            })
203        }
204    }
205}
206
207impl Drop for TransformSlot {
208    fn drop(&mut self) {
209        self.counter.fetch_sub(1, Ordering::Relaxed);
210    }
211}
212
213/// Runtime configuration for the HTTP server adapter.
214///
215/// The HTTP adapter keeps environment-specific concerns, such as the storage root and
216/// authentication secret, outside the Core transformation API. Tests and embedding runtimes
217/// can construct this value directly, while the CLI entry point typically uses
218/// [`ServerConfig::from_env`] to load the same fields from process environment variables.
219/// A logging callback invoked by the server for diagnostic messages.
220///
221/// Adapters that embed the server can supply a custom handler to route
222/// messages to their preferred logging infrastructure instead of stderr.
223pub type LogHandler = Arc<dyn Fn(&str) + Send + Sync>;
224
225pub struct ServerConfig {
226    /// The storage root used for `source.kind=path` lookups.
227    pub storage_root: PathBuf,
228    /// The expected Bearer token for private endpoints.
229    pub bearer_token: Option<String>,
230    /// The externally visible base URL used for public signed-URL authority.
231    ///
232    /// When this value is set, public signed GET requests use its authority component when
233    /// reconstructing the canonical signature payload. This is primarily useful when the server
234    /// runs behind a reverse proxy and the incoming `Host` header is not the externally visible
235    /// authority that clients sign.
236    pub public_base_url: Option<String>,
237    /// The expected key identifier for public signed GET requests.
238    pub signed_url_key_id: Option<String>,
239    /// The shared secret used to verify public signed GET requests.
240    pub signed_url_secret: Option<String>,
241    /// Whether server-side URL sources may bypass private-network and port restrictions.
242    ///
243    /// This flag is intended for local development and automated tests where fixture servers
244    /// commonly run on loopback addresses and non-standard ports. Production-like configurations
245    /// should keep this disabled.
246    pub allow_insecure_url_sources: bool,
247    /// Optional directory for the on-disk transform cache.
248    ///
249    /// When set, transformed image bytes are cached on disk using a sharded directory layout
250    /// (`ab/cd/ef/<sha256_hex>`). Repeated requests with the same source and transform options
251    /// are served from the cache instead of re-transforming. When `None`, caching is disabled
252    /// and every request performs a fresh transform.
253    pub cache_root: Option<PathBuf>,
254    /// `Cache-Control: max-age` value (in seconds) for public GET image responses.
255    ///
256    /// Defaults to `3600`. Operators can tune this
257    /// via the `TRUSS_PUBLIC_MAX_AGE` environment variable when running behind a CDN.
258    pub public_max_age_seconds: u32,
259    /// `Cache-Control: stale-while-revalidate` value (in seconds) for public GET image responses.
260    ///
261    /// Defaults to `60`. Configurable
262    /// via `TRUSS_PUBLIC_STALE_WHILE_REVALIDATE`.
263    pub public_stale_while_revalidate_seconds: u32,
264    /// Whether Accept-based content negotiation is disabled for public GET endpoints.
265    ///
266    /// When running behind a CDN such as CloudFront, Accept negotiation combined with
267    /// `Vary: Accept` can cause cache key mismatches or mis-served responses if the CDN
268    /// cache policy does not forward the `Accept` header.  Setting this flag to `true`
269    /// disables Accept negotiation entirely: public GET requests that omit the `format`
270    /// query parameter will preserve the input format instead of negotiating via Accept.
271    pub disable_accept_negotiation: bool,
272    /// Optional logging callback for diagnostic messages.
273    ///
274    /// When set, the server routes all diagnostic messages (cache errors, connection
275    /// failures, transform warnings) through this handler. When `None`, messages are
276    /// written to stderr via `eprintln!`.
277    pub log_handler: Option<LogHandler>,
278    /// Maximum number of concurrent image transforms.
279    ///
280    /// Configurable via `TRUSS_MAX_CONCURRENT_TRANSFORMS`. Defaults to 64.
281    pub max_concurrent_transforms: u64,
282    /// Per-transform wall-clock deadline in seconds.
283    ///
284    /// Configurable via `TRUSS_TRANSFORM_DEADLINE_SECS`. Defaults to 30.
285    pub transform_deadline_secs: u64,
286    /// Per-server counter tracking the number of image transforms currently in
287    /// flight.  This is runtime state (not configuration) but lives here so that
288    /// each `serve_with_config` invocation gets an independent counter, avoiding
289    /// cross-server interference when multiple listeners run in the same process
290    /// or during tests.
291    pub transforms_in_flight: Arc<AtomicU64>,
292    /// Download timeout in seconds for object storage backends (S3, GCS, Azure).
293    ///
294    /// Configurable via `TRUSS_STORAGE_TIMEOUT_SECS`. Defaults to 30.
295    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
296    pub storage_timeout_secs: u64,
297    /// The storage backend used to resolve `Path`-based public GET requests.
298    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
299    pub storage_backend: StorageBackend,
300    /// Shared S3 client context, present when `storage_backend` is `S3`.
301    #[cfg(feature = "s3")]
302    pub s3_context: Option<Arc<s3::S3Context>>,
303    /// Shared GCS client context, present when `storage_backend` is `Gcs`.
304    #[cfg(feature = "gcs")]
305    pub gcs_context: Option<Arc<gcs::GcsContext>>,
306    /// Shared Azure Blob Storage client context, present when `storage_backend` is `Azure`.
307    #[cfg(feature = "azure")]
308    pub azure_context: Option<Arc<azure::AzureContext>>,
309}
310
311impl Clone for ServerConfig {
312    fn clone(&self) -> Self {
313        Self {
314            storage_root: self.storage_root.clone(),
315            bearer_token: self.bearer_token.clone(),
316            public_base_url: self.public_base_url.clone(),
317            signed_url_key_id: self.signed_url_key_id.clone(),
318            signed_url_secret: self.signed_url_secret.clone(),
319            allow_insecure_url_sources: self.allow_insecure_url_sources,
320            cache_root: self.cache_root.clone(),
321            public_max_age_seconds: self.public_max_age_seconds,
322            public_stale_while_revalidate_seconds: self.public_stale_while_revalidate_seconds,
323            disable_accept_negotiation: self.disable_accept_negotiation,
324            log_handler: self.log_handler.clone(),
325            max_concurrent_transforms: self.max_concurrent_transforms,
326            transform_deadline_secs: self.transform_deadline_secs,
327            transforms_in_flight: Arc::clone(&self.transforms_in_flight),
328            #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
329            storage_timeout_secs: self.storage_timeout_secs,
330            #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
331            storage_backend: self.storage_backend,
332            #[cfg(feature = "s3")]
333            s3_context: self.s3_context.clone(),
334            #[cfg(feature = "gcs")]
335            gcs_context: self.gcs_context.clone(),
336            #[cfg(feature = "azure")]
337            azure_context: self.azure_context.clone(),
338        }
339    }
340}
341
342impl fmt::Debug for ServerConfig {
343    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344        let mut d = f.debug_struct("ServerConfig");
345        d.field("storage_root", &self.storage_root)
346            .field(
347                "bearer_token",
348                &self.bearer_token.as_ref().map(|_| "[REDACTED]"),
349            )
350            .field("public_base_url", &self.public_base_url)
351            .field("signed_url_key_id", &self.signed_url_key_id)
352            .field(
353                "signed_url_secret",
354                &self.signed_url_secret.as_ref().map(|_| "[REDACTED]"),
355            )
356            .field(
357                "allow_insecure_url_sources",
358                &self.allow_insecure_url_sources,
359            )
360            .field("cache_root", &self.cache_root)
361            .field("public_max_age_seconds", &self.public_max_age_seconds)
362            .field(
363                "public_stale_while_revalidate_seconds",
364                &self.public_stale_while_revalidate_seconds,
365            )
366            .field(
367                "disable_accept_negotiation",
368                &self.disable_accept_negotiation,
369            )
370            .field("log_handler", &self.log_handler.as_ref().map(|_| ".."))
371            .field("max_concurrent_transforms", &self.max_concurrent_transforms)
372            .field("transform_deadline_secs", &self.transform_deadline_secs);
373        #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
374        {
375            d.field("storage_backend", &self.storage_backend);
376        }
377        #[cfg(feature = "s3")]
378        {
379            d.field("s3_context", &self.s3_context.as_ref().map(|_| ".."));
380        }
381        #[cfg(feature = "gcs")]
382        {
383            d.field("gcs_context", &self.gcs_context.as_ref().map(|_| ".."));
384        }
385        #[cfg(feature = "azure")]
386        {
387            d.field("azure_context", &self.azure_context.as_ref().map(|_| ".."));
388        }
389        d.finish()
390    }
391}
392
393impl PartialEq for ServerConfig {
394    fn eq(&self, other: &Self) -> bool {
395        self.storage_root == other.storage_root
396            && self.bearer_token == other.bearer_token
397            && self.public_base_url == other.public_base_url
398            && self.signed_url_key_id == other.signed_url_key_id
399            && self.signed_url_secret == other.signed_url_secret
400            && self.allow_insecure_url_sources == other.allow_insecure_url_sources
401            && self.cache_root == other.cache_root
402            && self.public_max_age_seconds == other.public_max_age_seconds
403            && self.public_stale_while_revalidate_seconds
404                == other.public_stale_while_revalidate_seconds
405            && self.disable_accept_negotiation == other.disable_accept_negotiation
406            && self.max_concurrent_transforms == other.max_concurrent_transforms
407            && self.transform_deadline_secs == other.transform_deadline_secs
408            && cfg_storage_eq(self, other)
409    }
410}
411
412fn cfg_storage_eq(_this: &ServerConfig, _other: &ServerConfig) -> bool {
413    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
414    {
415        if _this.storage_backend != _other.storage_backend {
416            return false;
417        }
418    }
419    #[cfg(feature = "s3")]
420    {
421        if _this
422            .s3_context
423            .as_ref()
424            .map(|c| (&c.default_bucket, &c.endpoint_url))
425            != _other
426                .s3_context
427                .as_ref()
428                .map(|c| (&c.default_bucket, &c.endpoint_url))
429        {
430            return false;
431        }
432    }
433    #[cfg(feature = "gcs")]
434    {
435        if _this
436            .gcs_context
437            .as_ref()
438            .map(|c| (&c.default_bucket, &c.endpoint_url))
439            != _other
440                .gcs_context
441                .as_ref()
442                .map(|c| (&c.default_bucket, &c.endpoint_url))
443        {
444            return false;
445        }
446    }
447    #[cfg(feature = "azure")]
448    {
449        if _this
450            .azure_context
451            .as_ref()
452            .map(|c| (&c.default_container, &c.endpoint_url))
453            != _other
454                .azure_context
455                .as_ref()
456                .map(|c| (&c.default_container, &c.endpoint_url))
457        {
458            return false;
459        }
460    }
461    true
462}
463
464impl Eq for ServerConfig {}
465
466impl ServerConfig {
467    /// Creates a server configuration from explicit values.
468    ///
469    /// This constructor does not canonicalize the storage root. It is primarily intended for
470    /// tests and embedding scenarios where the caller already controls the filesystem layout.
471    ///
472    /// # Examples
473    ///
474    /// ```
475    /// use truss::adapters::server::ServerConfig;
476    ///
477    /// let config = ServerConfig::new(std::env::temp_dir(), Some("secret".to_string()));
478    ///
479    /// assert_eq!(config.bearer_token.as_deref(), Some("secret"));
480    /// ```
481    pub fn new(storage_root: PathBuf, bearer_token: Option<String>) -> Self {
482        Self {
483            storage_root,
484            bearer_token,
485            public_base_url: None,
486            signed_url_key_id: None,
487            signed_url_secret: None,
488            allow_insecure_url_sources: false,
489            cache_root: None,
490            public_max_age_seconds: DEFAULT_PUBLIC_MAX_AGE_SECONDS,
491            public_stale_while_revalidate_seconds: DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
492            disable_accept_negotiation: false,
493            log_handler: None,
494            max_concurrent_transforms: DEFAULT_MAX_CONCURRENT_TRANSFORMS,
495            transform_deadline_secs: DEFAULT_TRANSFORM_DEADLINE_SECS,
496            transforms_in_flight: Arc::new(AtomicU64::new(0)),
497            #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
498            storage_timeout_secs: remote::STORAGE_DOWNLOAD_TIMEOUT_SECS,
499            #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
500            storage_backend: StorageBackend::Filesystem,
501            #[cfg(feature = "s3")]
502            s3_context: None,
503            #[cfg(feature = "gcs")]
504            gcs_context: None,
505            #[cfg(feature = "azure")]
506            azure_context: None,
507        }
508    }
509
510    /// Emits a diagnostic message through the configured log handler, or falls
511    /// back to stderr when no handler is set.
512    fn log(&self, msg: &str) {
513        if let Some(handler) = &self.log_handler {
514            handler(msg);
515        } else {
516            stderr_write(msg);
517        }
518    }
519
520    /// Returns a copy of the configuration with signed-URL verification credentials attached.
521    ///
522    /// Public GET endpoints require both a key identifier and a shared secret. Tests and local
523    /// development setups can use this helper to attach those values directly without going
524    /// through environment variables.
525    ///
526    /// # Examples
527    ///
528    /// ```
529    /// use truss::adapters::server::ServerConfig;
530    ///
531    /// let config = ServerConfig::new(std::env::temp_dir(), None)
532    ///     .with_signed_url_credentials("public-dev", "top-secret");
533    ///
534    /// assert_eq!(config.signed_url_key_id.as_deref(), Some("public-dev"));
535    /// assert_eq!(config.signed_url_secret.as_deref(), Some("top-secret"));
536    /// ```
537    pub fn with_signed_url_credentials(
538        mut self,
539        key_id: impl Into<String>,
540        secret: impl Into<String>,
541    ) -> Self {
542        self.signed_url_key_id = Some(key_id.into());
543        self.signed_url_secret = Some(secret.into());
544        self
545    }
546
547    /// Returns a copy of the configuration with insecure URL source allowances toggled.
548    ///
549    /// Enabling this flag allows URL sources that target loopback or private-network addresses
550    /// and permits non-standard ports. This is useful for local integration tests but weakens
551    /// the default SSRF protections of the server adapter.
552    ///
553    /// # Examples
554    ///
555    /// ```
556    /// use truss::adapters::server::ServerConfig;
557    ///
558    /// let config = ServerConfig::new(std::env::temp_dir(), Some("secret".to_string()))
559    ///     .with_insecure_url_sources(true);
560    ///
561    /// assert!(config.allow_insecure_url_sources);
562    /// ```
563    pub fn with_insecure_url_sources(mut self, allow_insecure_url_sources: bool) -> Self {
564        self.allow_insecure_url_sources = allow_insecure_url_sources;
565        self
566    }
567
568    /// Returns a copy of the configuration with a transform cache directory set.
569    ///
570    /// When a cache root is configured, the server stores transformed images on disk using a
571    /// sharded directory layout and serves subsequent identical requests from the cache.
572    ///
573    /// # Examples
574    ///
575    /// ```
576    /// use truss::adapters::server::ServerConfig;
577    ///
578    /// let config = ServerConfig::new(std::env::temp_dir(), None)
579    ///     .with_cache_root(std::env::temp_dir().join("truss-cache"));
580    ///
581    /// assert!(config.cache_root.is_some());
582    /// ```
583    pub fn with_cache_root(mut self, cache_root: impl Into<PathBuf>) -> Self {
584        self.cache_root = Some(cache_root.into());
585        self
586    }
587
588    /// Returns a copy of the configuration with an S3 storage backend attached.
589    #[cfg(feature = "s3")]
590    pub fn with_s3_context(mut self, context: s3::S3Context) -> Self {
591        self.storage_backend = StorageBackend::S3;
592        self.s3_context = Some(Arc::new(context));
593        self
594    }
595
596    /// Returns a copy of the configuration with a GCS storage backend attached.
597    #[cfg(feature = "gcs")]
598    pub fn with_gcs_context(mut self, context: gcs::GcsContext) -> Self {
599        self.storage_backend = StorageBackend::Gcs;
600        self.gcs_context = Some(Arc::new(context));
601        self
602    }
603
604    /// Returns a copy of the configuration with an Azure Blob Storage backend attached.
605    #[cfg(feature = "azure")]
606    pub fn with_azure_context(mut self, context: azure::AzureContext) -> Self {
607        self.storage_backend = StorageBackend::Azure;
608        self.azure_context = Some(Arc::new(context));
609        self
610    }
611
612    /// Loads server configuration from environment variables.
613    ///
614    /// The adapter currently reads:
615    ///
616    /// - `TRUSS_STORAGE_ROOT`: filesystem root for `source.kind=path` inputs. Defaults to the
617    ///   current directory and is canonicalized before use.
618    /// - `TRUSS_BEARER_TOKEN`: private API Bearer token. When this value is missing, private
619    ///   endpoints remain unavailable and return `503 Service Unavailable`.
620    /// - `TRUSS_PUBLIC_BASE_URL`: externally visible base URL reserved for future public endpoint
621    ///   signing. When set, it must parse as an absolute `http` or `https` URL.
622    /// - `TRUSS_SIGNED_URL_KEY_ID`: key identifier accepted by public signed GET endpoints.
623    /// - `TRUSS_SIGNED_URL_SECRET`: shared secret used to verify public signed GET signatures.
624    /// - `TRUSS_ALLOW_INSECURE_URL_SOURCES`: when set to `1`, `true`, `yes`, or `on`, URL
625    ///   sources may target loopback or private-network addresses and non-standard ports.
626    /// - `TRUSS_CACHE_ROOT`: directory for the on-disk transform cache. When set, transformed
627    ///   images are cached using a sharded `ab/cd/ef/<sha256>` layout. When absent, caching is
628    ///   disabled.
629    /// - `TRUSS_PUBLIC_MAX_AGE`: `Cache-Control: max-age` value (in seconds) for public GET
630    ///   image responses. Defaults to 3600.
631    /// - `TRUSS_PUBLIC_STALE_WHILE_REVALIDATE`: `Cache-Control: stale-while-revalidate` value
632    ///   (in seconds) for public GET image responses. Defaults to 60.
633    /// - `TRUSS_DISABLE_ACCEPT_NEGOTIATION`: when set to `1`, `true`, `yes`, or `on`, disables
634    ///   Accept-based content negotiation on public GET endpoints. This is recommended when running
635    ///   behind a CDN that does not forward the `Accept` header in its cache key.
636    /// - `TRUSS_STORAGE_BACKEND` *(requires the `s3`, `gcs`, or `azure` feature)*: storage backend
637    ///   for resolving `Path`-based public GET requests. Accepts `filesystem` (default), `s3`,
638    ///   `gcs`, or `azure`.
639    /// - `TRUSS_S3_BUCKET` *(requires the `s3` feature)*: default S3 bucket name. Required when
640    ///   the storage backend is `s3`.
641    /// - `TRUSS_S3_FORCE_PATH_STYLE` *(requires the `s3` feature)*: when set to `1`, `true`,
642    ///   `yes`, or `on`, use path-style S3 addressing (`http://endpoint/bucket/key`) instead
643    ///   of virtual-hosted-style. Required for S3-compatible services such as MinIO and
644    ///   adobe/s3mock.
645    /// - `TRUSS_GCS_BUCKET` *(requires the `gcs` feature)*: default GCS bucket name. Required
646    ///   when the storage backend is `gcs`.
647    /// - `TRUSS_GCS_ENDPOINT` *(requires the `gcs` feature)*: custom GCS endpoint URL. Used for
648    ///   emulators such as `fake-gcs-server`. When absent, the default Google Cloud Storage
649    ///   endpoint is used.
650    /// - `GOOGLE_APPLICATION_CREDENTIALS`: path to a GCS service account JSON key file.
651    /// - `GOOGLE_APPLICATION_CREDENTIALS_JSON`: inline GCS service account JSON (alternative to
652    ///   file path).
653    /// - `TRUSS_AZURE_CONTAINER` *(requires the `azure` feature)*: default Azure Blob Storage
654    ///   container name. Required when the storage backend is `azure`.
655    /// - `TRUSS_AZURE_ENDPOINT` *(requires the `azure` feature)*: custom Azure Blob Storage
656    ///   endpoint URL. Used for emulators such as Azurite. When absent, the endpoint is derived
657    ///   from `AZURE_STORAGE_ACCOUNT_NAME`.
658    /// - `AZURE_STORAGE_ACCOUNT_NAME`: Azure storage account name (used to derive the default
659    ///   endpoint when `TRUSS_AZURE_ENDPOINT` is not set).
660    /// - `TRUSS_MAX_CONCURRENT_TRANSFORMS`: maximum number of concurrent image transforms
661    ///   (default: 64, range: 1–1024). Requests exceeding this limit are rejected with 503.
662    /// - `TRUSS_TRANSFORM_DEADLINE_SECS`: per-transform wall-clock deadline in seconds
663    ///   (default: 30, range: 1–300). Transforms exceeding this deadline are cancelled.
664    /// - `TRUSS_STORAGE_TIMEOUT_SECS`: download timeout for storage backends in seconds
665    ///   (default: 30, range: 1–300).
666    ///
667    /// # Errors
668    ///
669    /// Returns an [`io::Error`] when the configured storage root does not exist or cannot be
670    /// canonicalized.
671    ///
672    /// # Examples
673    ///
674    /// ```no_run
675    /// // SAFETY: This example runs single-threaded; no concurrent env access.
676    /// unsafe {
677    ///     std::env::set_var("TRUSS_STORAGE_ROOT", ".");
678    ///     std::env::set_var("TRUSS_ALLOW_INSECURE_URL_SOURCES", "true");
679    /// }
680    ///
681    /// let config = truss::adapters::server::ServerConfig::from_env().unwrap();
682    ///
683    /// assert!(config.storage_root.is_absolute());
684    /// assert!(config.allow_insecure_url_sources);
685    /// ```
686    pub fn from_env() -> io::Result<Self> {
687        #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
688        let storage_backend = match env::var("TRUSS_STORAGE_BACKEND")
689            .ok()
690            .filter(|v| !v.is_empty())
691        {
692            Some(value) => StorageBackend::parse(&value)
693                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?,
694            None => StorageBackend::Filesystem,
695        };
696
697        let storage_root =
698            env::var("TRUSS_STORAGE_ROOT").unwrap_or_else(|_| DEFAULT_STORAGE_ROOT.to_string());
699        let storage_root = PathBuf::from(storage_root).canonicalize()?;
700        let bearer_token = env::var("TRUSS_BEARER_TOKEN")
701            .ok()
702            .filter(|value| !value.is_empty());
703        let public_base_url = env::var("TRUSS_PUBLIC_BASE_URL")
704            .ok()
705            .filter(|value| !value.is_empty())
706            .map(validate_public_base_url)
707            .transpose()?;
708        let signed_url_key_id = env::var("TRUSS_SIGNED_URL_KEY_ID")
709            .ok()
710            .filter(|value| !value.is_empty());
711        let signed_url_secret = env::var("TRUSS_SIGNED_URL_SECRET")
712            .ok()
713            .filter(|value| !value.is_empty());
714
715        if signed_url_key_id.is_some() != signed_url_secret.is_some() {
716            return Err(io::Error::new(
717                io::ErrorKind::InvalidInput,
718                "TRUSS_SIGNED_URL_KEY_ID and TRUSS_SIGNED_URL_SECRET must be set together",
719            ));
720        }
721
722        if signed_url_key_id.is_some() && public_base_url.is_none() {
723            eprintln!(
724                "truss: warning: TRUSS_SIGNED_URL_KEY_ID is set but TRUSS_PUBLIC_BASE_URL is not. \
725                 Behind a reverse proxy or CDN the Host header may differ from the externally \
726                 visible authority, causing signed URL verification to fail. Consider setting \
727                 TRUSS_PUBLIC_BASE_URL to the canonical external origin."
728            );
729        }
730
731        let cache_root = env::var("TRUSS_CACHE_ROOT")
732            .ok()
733            .filter(|value| !value.is_empty())
734            .map(PathBuf::from);
735
736        let public_max_age_seconds = parse_optional_env_u32("TRUSS_PUBLIC_MAX_AGE")?
737            .unwrap_or(DEFAULT_PUBLIC_MAX_AGE_SECONDS);
738        let public_stale_while_revalidate_seconds =
739            parse_optional_env_u32("TRUSS_PUBLIC_STALE_WHILE_REVALIDATE")?
740                .unwrap_or(DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS);
741
742        let allow_insecure_url_sources = env_flag("TRUSS_ALLOW_INSECURE_URL_SOURCES");
743
744        let max_concurrent_transforms = match env::var("TRUSS_MAX_CONCURRENT_TRANSFORMS")
745            .ok()
746            .filter(|v| !v.is_empty())
747        {
748            Some(value) => {
749                let n: u64 = value.parse().map_err(|_| {
750                    io::Error::new(
751                        io::ErrorKind::InvalidInput,
752                        "TRUSS_MAX_CONCURRENT_TRANSFORMS must be a positive integer",
753                    )
754                })?;
755                if n == 0 || n > 1024 {
756                    return Err(io::Error::new(
757                        io::ErrorKind::InvalidInput,
758                        "TRUSS_MAX_CONCURRENT_TRANSFORMS must be between 1 and 1024",
759                    ));
760                }
761                n
762            }
763            None => DEFAULT_MAX_CONCURRENT_TRANSFORMS,
764        };
765
766        let transform_deadline_secs = match env::var("TRUSS_TRANSFORM_DEADLINE_SECS")
767            .ok()
768            .filter(|v| !v.is_empty())
769        {
770            Some(value) => {
771                let secs: u64 = value.parse().map_err(|_| {
772                    io::Error::new(
773                        io::ErrorKind::InvalidInput,
774                        "TRUSS_TRANSFORM_DEADLINE_SECS must be a positive integer",
775                    )
776                })?;
777                if secs == 0 || secs > 300 {
778                    return Err(io::Error::new(
779                        io::ErrorKind::InvalidInput,
780                        "TRUSS_TRANSFORM_DEADLINE_SECS must be between 1 and 300",
781                    ));
782                }
783                secs
784            }
785            None => DEFAULT_TRANSFORM_DEADLINE_SECS,
786        };
787
788        #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
789        let storage_timeout_secs = match env::var("TRUSS_STORAGE_TIMEOUT_SECS")
790            .ok()
791            .filter(|v| !v.is_empty())
792        {
793            Some(value) => {
794                let secs: u64 = value.parse().map_err(|_| {
795                    io::Error::new(
796                        io::ErrorKind::InvalidInput,
797                        "TRUSS_STORAGE_TIMEOUT_SECS must be a positive integer",
798                    )
799                })?;
800                if secs == 0 || secs > 300 {
801                    return Err(io::Error::new(
802                        io::ErrorKind::InvalidInput,
803                        "TRUSS_STORAGE_TIMEOUT_SECS must be between 1 and 300",
804                    ));
805                }
806                secs
807            }
808            None => remote::STORAGE_DOWNLOAD_TIMEOUT_SECS,
809        };
810
811        #[cfg(feature = "s3")]
812        let s3_context = if storage_backend == StorageBackend::S3 {
813            let bucket = env::var("TRUSS_S3_BUCKET")
814                .ok()
815                .filter(|v| !v.is_empty())
816                .ok_or_else(|| {
817                    io::Error::new(
818                        io::ErrorKind::InvalidInput,
819                        "TRUSS_S3_BUCKET is required when TRUSS_STORAGE_BACKEND=s3",
820                    )
821                })?;
822            Some(Arc::new(s3::build_s3_context(
823                bucket,
824                allow_insecure_url_sources,
825            )?))
826        } else {
827            None
828        };
829
830        #[cfg(feature = "gcs")]
831        let gcs_context = if storage_backend == StorageBackend::Gcs {
832            let bucket = env::var("TRUSS_GCS_BUCKET")
833                .ok()
834                .filter(|v| !v.is_empty())
835                .ok_or_else(|| {
836                    io::Error::new(
837                        io::ErrorKind::InvalidInput,
838                        "TRUSS_GCS_BUCKET is required when TRUSS_STORAGE_BACKEND=gcs",
839                    )
840                })?;
841            Some(Arc::new(gcs::build_gcs_context(
842                bucket,
843                allow_insecure_url_sources,
844            )?))
845        } else {
846            if env::var("TRUSS_GCS_BUCKET")
847                .ok()
848                .filter(|v| !v.is_empty())
849                .is_some()
850            {
851                eprintln!(
852                    "truss: warning: TRUSS_GCS_BUCKET is set but TRUSS_STORAGE_BACKEND is not \
853                     `gcs`. The GCS bucket will be ignored. Set TRUSS_STORAGE_BACKEND=gcs to \
854                     enable the GCS backend."
855                );
856            }
857            None
858        };
859
860        #[cfg(feature = "azure")]
861        let azure_context = if storage_backend == StorageBackend::Azure {
862            let container = env::var("TRUSS_AZURE_CONTAINER")
863                .ok()
864                .filter(|v| !v.is_empty())
865                .ok_or_else(|| {
866                    io::Error::new(
867                        io::ErrorKind::InvalidInput,
868                        "TRUSS_AZURE_CONTAINER is required when TRUSS_STORAGE_BACKEND=azure",
869                    )
870                })?;
871            Some(Arc::new(azure::build_azure_context(
872                container,
873                allow_insecure_url_sources,
874            )?))
875        } else {
876            if env::var("TRUSS_AZURE_CONTAINER")
877                .ok()
878                .filter(|v| !v.is_empty())
879                .is_some()
880            {
881                eprintln!(
882                    "truss: warning: TRUSS_AZURE_CONTAINER is set but TRUSS_STORAGE_BACKEND is not \
883                     `azure`. The Azure container will be ignored. Set TRUSS_STORAGE_BACKEND=azure to \
884                     enable the Azure backend."
885                );
886            }
887            None
888        };
889
890        Ok(Self {
891            storage_root,
892            bearer_token,
893            public_base_url,
894            signed_url_key_id,
895            signed_url_secret,
896            allow_insecure_url_sources,
897            cache_root,
898            public_max_age_seconds,
899            public_stale_while_revalidate_seconds,
900            disable_accept_negotiation: env_flag("TRUSS_DISABLE_ACCEPT_NEGOTIATION"),
901            log_handler: None,
902            max_concurrent_transforms,
903            transform_deadline_secs,
904            transforms_in_flight: Arc::new(AtomicU64::new(0)),
905            #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
906            storage_timeout_secs,
907            #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
908            storage_backend,
909            #[cfg(feature = "s3")]
910            s3_context,
911            #[cfg(feature = "gcs")]
912            gcs_context,
913            #[cfg(feature = "azure")]
914            azure_context,
915        })
916    }
917}
918
919/// Source selector used when generating a signed public transform URL.
920#[derive(Debug, Clone, PartialEq, Eq)]
921pub enum SignedUrlSource {
922    /// Generates a signed `GET /images/by-path` URL.
923    Path {
924        /// The storage-relative source path.
925        path: String,
926        /// An optional source version token.
927        version: Option<String>,
928    },
929    /// Generates a signed `GET /images/by-url` URL.
930    Url {
931        /// The remote source URL.
932        url: String,
933        /// An optional source version token.
934        version: Option<String>,
935    },
936}
937
938/// Builds a signed public transform URL for the server adapter.
939///
940/// The resulting URL targets either `GET /images/by-path` or `GET /images/by-url` depending on
941/// `source`. `base_url` must be an absolute `http` or `https` URL that points at the externally
942/// visible server origin. The helper applies the same canonical query and HMAC-SHA256 signature
943/// scheme that the server adapter verifies at request time.
944///
945/// The helper serializes only explicitly requested transform options and omits fields that would
946/// resolve to the documented defaults on the server side.
947///
948/// # Errors
949///
950/// Returns an error string when `base_url` is not an absolute `http` or `https` URL, when the
951/// visible authority cannot be determined, or when the HMAC state cannot be initialized.
952///
953/// # Examples
954///
955/// ```
956/// use truss::adapters::server::{sign_public_url, SignedUrlSource};
957/// use truss::{MediaType, TransformOptions};
958///
959/// let url = sign_public_url(
960///     "https://cdn.example.com",
961///     SignedUrlSource::Path {
962///         path: "/image.png".to_string(),
963///         version: None,
964///     },
965///     &TransformOptions {
966///         format: Some(MediaType::Jpeg),
967///         ..TransformOptions::default()
968///     },
969///     "public-dev",
970///     "secret-value",
971///     4_102_444_800,
972/// )
973/// .unwrap();
974///
975/// assert!(url.starts_with("https://cdn.example.com/images/by-path?"));
976/// assert!(url.contains("keyId=public-dev"));
977/// assert!(url.contains("signature="));
978/// ```
979pub fn sign_public_url(
980    base_url: &str,
981    source: SignedUrlSource,
982    options: &TransformOptions,
983    key_id: &str,
984    secret: &str,
985    expires: u64,
986) -> Result<String, String> {
987    let base_url = Url::parse(base_url).map_err(|error| format!("base URL is invalid: {error}"))?;
988    match base_url.scheme() {
989        "http" | "https" => {}
990        _ => return Err("base URL must use the http or https scheme".to_string()),
991    }
992
993    let route_path = match source {
994        SignedUrlSource::Path { .. } => "/images/by-path",
995        SignedUrlSource::Url { .. } => "/images/by-url",
996    };
997    let mut endpoint = base_url
998        .join(route_path)
999        .map_err(|error| format!("failed to resolve the public endpoint URL: {error}"))?;
1000    let authority = url_authority(&endpoint)?;
1001    let mut query = signed_source_query(source);
1002    extend_transform_query(&mut query, options);
1003    query.insert("keyId".to_string(), key_id.to_string());
1004    query.insert("expires".to_string(), expires.to_string());
1005
1006    let canonical = format!(
1007        "GET\n{}\n{}\n{}",
1008        authority,
1009        endpoint.path(),
1010        canonical_query_without_signature(&query)
1011    );
1012    let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
1013        .map_err(|error| format!("failed to initialize signed URL HMAC: {error}"))?;
1014    mac.update(canonical.as_bytes());
1015    query.insert(
1016        "signature".to_string(),
1017        hex::encode(mac.finalize().into_bytes()),
1018    );
1019
1020    let mut serializer = url::form_urlencoded::Serializer::new(String::new());
1021    for (name, value) in query {
1022        serializer.append_pair(&name, &value);
1023    }
1024    endpoint.set_query(Some(&serializer.finish()));
1025    Ok(endpoint.into())
1026}
1027
1028/// Returns the bind address for the HTTP server adapter.
1029///
1030/// The adapter reads `TRUSS_BIND_ADDR` when it is present. Otherwise it falls back to
1031/// [`DEFAULT_BIND_ADDR`].
1032pub fn bind_addr() -> String {
1033    env::var("TRUSS_BIND_ADDR").unwrap_or_else(|_| DEFAULT_BIND_ADDR.to_string())
1034}
1035
1036/// Serves requests until the listener stops producing connections.
1037///
1038/// This helper loads [`ServerConfig`] from the process environment and then delegates to
1039/// [`serve_with_config`]. Health endpoints remain available even when the private API is not
1040/// configured, but authenticated transform requests will return `503 Service Unavailable`
1041/// unless `TRUSS_BEARER_TOKEN` is set.
1042///
1043/// # Errors
1044///
1045/// Returns an [`io::Error`] when the storage root cannot be resolved, when accepting the next
1046/// connection fails, or when a response cannot be written to the socket.
1047pub fn serve(listener: TcpListener) -> io::Result<()> {
1048    let config = ServerConfig::from_env()?;
1049
1050    // Fail fast: verify the storage backend is reachable before accepting
1051    // connections so that configuration errors are surfaced immediately.
1052    for (ok, name) in storage_health_check(&config) {
1053        if !ok {
1054            return Err(io::Error::new(
1055                io::ErrorKind::ConnectionRefused,
1056                format!(
1057                    "storage connectivity check failed for `{name}` — verify the backend \
1058                     endpoint, credentials, and container/bucket configuration"
1059                ),
1060            ));
1061        }
1062    }
1063
1064    serve_with_config(listener, config)
1065}
1066
1067/// Serves requests with an explicit server configuration.
1068///
1069/// This is the adapter entry point for tests and embedding scenarios that want deterministic
1070/// configuration instead of environment-variable lookup.
1071///
1072/// # Errors
1073///
1074/// Returns an [`io::Error`] when accepting the next connection fails or when a response cannot
1075/// be written to the socket.
1076pub fn serve_with_config(listener: TcpListener, config: ServerConfig) -> io::Result<()> {
1077    let config = Arc::new(config);
1078    let (sender, receiver) = std::sync::mpsc::channel::<TcpStream>();
1079
1080    // Spawn a pool of worker threads sized to the configured concurrency limit
1081    // (with a minimum of WORKER_THREADS to leave headroom for non-transform
1082    // requests such as health checks and metrics).  Each thread pulls connections
1083    // from the shared channel and handles them independently, so a slow request
1084    // no longer blocks all other clients.
1085    let receiver = Arc::new(std::sync::Mutex::new(receiver));
1086    let pool_size = (config.max_concurrent_transforms as usize).max(WORKER_THREADS);
1087    let mut workers = Vec::with_capacity(pool_size);
1088    for _ in 0..pool_size {
1089        let rx = Arc::clone(&receiver);
1090        let cfg = Arc::clone(&config);
1091        workers.push(std::thread::spawn(move || {
1092            loop {
1093                let stream = {
1094                    let guard = rx.lock().expect("worker lock poisoned");
1095                    match guard.recv() {
1096                        Ok(stream) => stream,
1097                        Err(_) => break,
1098                    }
1099                }; // MutexGuard dropped here — before handle_stream runs.
1100                if let Err(err) = handle_stream(stream, &cfg) {
1101                    cfg.log(&format!("failed to handle connection: {err}"));
1102                }
1103            }
1104        }));
1105    }
1106
1107    for stream in listener.incoming() {
1108        match stream {
1109            Ok(stream) => {
1110                if sender.send(stream).is_err() {
1111                    break;
1112                }
1113            }
1114            Err(err) => return Err(err),
1115        }
1116    }
1117
1118    drop(sender);
1119    let deadline = std::time::Instant::now() + Duration::from_secs(30);
1120    for worker in workers {
1121        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
1122        if remaining.is_zero() {
1123            eprintln!("shutdown: timed out waiting for worker threads");
1124            break;
1125        }
1126        // Park the main thread until the worker finishes or the deadline
1127        // elapses. We cannot interrupt a blocked worker, but the socket
1128        // read/write timeouts ensure workers do not block forever.
1129        let worker_done =
1130            std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
1131        let wd = std::sync::Arc::clone(&worker_done);
1132        std::thread::spawn(move || {
1133            let _ = worker.join();
1134            let (lock, cvar) = &*wd;
1135            *lock.lock().expect("shutdown notify lock") = true;
1136            cvar.notify_one();
1137        });
1138        let (lock, cvar) = &*worker_done;
1139        let mut done = lock.lock().expect("shutdown wait lock");
1140        while !*done {
1141            let (guard, timeout) = cvar
1142                .wait_timeout(done, remaining)
1143                .expect("shutdown condvar wait");
1144            done = guard;
1145            if timeout.timed_out() {
1146                eprintln!("shutdown: timed out waiting for a worker thread");
1147                break;
1148            }
1149        }
1150    }
1151
1152    Ok(())
1153}
1154
1155/// Serves exactly one request using configuration loaded from the environment.
1156///
1157/// This helper is primarily useful in tests that want to drive the server over a real TCP
1158/// socket but do not need a long-running loop.
1159///
1160/// # Errors
1161///
1162/// Returns an [`io::Error`] when the storage root cannot be resolved, when accepting the next
1163/// connection fails, or when a response cannot be written to the socket.
1164pub fn serve_once(listener: TcpListener) -> io::Result<()> {
1165    let config = ServerConfig::from_env()?;
1166    serve_once_with_config(listener, config)
1167}
1168
1169/// Serves exactly one request with an explicit server configuration.
1170///
1171/// # Errors
1172///
1173/// Returns an [`io::Error`] when accepting the next connection fails or when a response cannot
1174/// be written to the socket.
1175pub fn serve_once_with_config(listener: TcpListener, config: ServerConfig) -> io::Result<()> {
1176    let (stream, _) = listener.accept()?;
1177    handle_stream(stream, &config)
1178}
1179
1180#[derive(Debug, Deserialize)]
1181#[serde(deny_unknown_fields)]
1182struct TransformImageRequestPayload {
1183    source: TransformSourcePayload,
1184    #[serde(default)]
1185    options: TransformOptionsPayload,
1186}
1187
1188#[derive(Debug, Deserialize)]
1189#[serde(tag = "kind", rename_all = "lowercase")]
1190enum TransformSourcePayload {
1191    Path {
1192        path: String,
1193        version: Option<String>,
1194    },
1195    Url {
1196        url: String,
1197        version: Option<String>,
1198    },
1199    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1200    Storage {
1201        bucket: Option<String>,
1202        key: String,
1203        version: Option<String>,
1204    },
1205}
1206
1207impl TransformSourcePayload {
1208    /// Computes a stable source hash from the reference and version, avoiding the
1209    /// need to read the full source bytes when a version tag is present. Returns
1210    /// `None` when no version is available, in which case the caller must fall back
1211    /// to the content-hash approach.
1212    /// Computes a stable source hash that includes the instance configuration
1213    /// boundaries (storage root, allow_insecure_url_sources) so that cache entries
1214    /// cannot be reused across instances with different security settings sharing
1215    /// the same cache directory.
1216    fn versioned_source_hash(&self, config: &ServerConfig) -> Option<String> {
1217        let (kind, reference, version): (&str, std::borrow::Cow<'_, str>, Option<&str>) = match self
1218        {
1219            Self::Path { path, version } => ("path", path.as_str().into(), version.as_deref()),
1220            Self::Url { url, version } => ("url", url.as_str().into(), version.as_deref()),
1221            #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1222            Self::Storage {
1223                bucket,
1224                key,
1225                version,
1226            } => {
1227                let (scheme, effective_bucket) =
1228                    storage_scheme_and_bucket(bucket.as_deref(), config);
1229                let effective_bucket = effective_bucket?;
1230                (
1231                    "storage",
1232                    format!("{scheme}://{effective_bucket}/{key}").into(),
1233                    version.as_deref(),
1234                )
1235            }
1236        };
1237        let version = version?;
1238        // Use newline separators so that values containing colons cannot collide
1239        // with different (reference, version) pairs. Include configuration boundaries
1240        // to prevent cross-instance cache poisoning.
1241        let mut id = String::new();
1242        id.push_str(kind);
1243        id.push('\n');
1244        id.push_str(&reference);
1245        id.push('\n');
1246        id.push_str(version);
1247        id.push('\n');
1248        id.push_str(config.storage_root.to_string_lossy().as_ref());
1249        id.push('\n');
1250        id.push_str(if config.allow_insecure_url_sources {
1251            "insecure"
1252        } else {
1253            "strict"
1254        });
1255        #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1256        {
1257            id.push('\n');
1258            id.push_str(storage_backend_label(config));
1259            #[cfg(feature = "s3")]
1260            if let Some(ref ctx) = config.s3_context
1261                && let Some(ref endpoint) = ctx.endpoint_url
1262            {
1263                id.push('\n');
1264                id.push_str(endpoint);
1265            }
1266            #[cfg(feature = "gcs")]
1267            if let Some(ref ctx) = config.gcs_context
1268                && let Some(ref endpoint) = ctx.endpoint_url
1269            {
1270                id.push('\n');
1271                id.push_str(endpoint);
1272            }
1273            #[cfg(feature = "azure")]
1274            if let Some(ref ctx) = config.azure_context {
1275                id.push('\n');
1276                id.push_str(&ctx.endpoint_url);
1277            }
1278        }
1279        Some(hex::encode(Sha256::digest(id.as_bytes())))
1280    }
1281}
1282
1283#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1284fn storage_scheme_and_bucket<'a>(
1285    explicit_bucket: Option<&'a str>,
1286    config: &'a ServerConfig,
1287) -> (&'static str, Option<&'a str>) {
1288    match config.storage_backend {
1289        #[cfg(feature = "s3")]
1290        StorageBackend::S3 => {
1291            let bucket = explicit_bucket.or(config
1292                .s3_context
1293                .as_ref()
1294                .map(|ctx| ctx.default_bucket.as_str()));
1295            ("s3", bucket)
1296        }
1297        #[cfg(feature = "gcs")]
1298        StorageBackend::Gcs => {
1299            let bucket = explicit_bucket.or(config
1300                .gcs_context
1301                .as_ref()
1302                .map(|ctx| ctx.default_bucket.as_str()));
1303            ("gcs", bucket)
1304        }
1305        StorageBackend::Filesystem => ("fs", explicit_bucket),
1306        #[cfg(feature = "azure")]
1307        StorageBackend::Azure => {
1308            let bucket = explicit_bucket.or(config
1309                .azure_context
1310                .as_ref()
1311                .map(|ctx| ctx.default_container.as_str()));
1312            ("azure", bucket)
1313        }
1314    }
1315}
1316
1317#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1318fn is_object_storage_backend(config: &ServerConfig) -> bool {
1319    match config.storage_backend {
1320        StorageBackend::Filesystem => false,
1321        #[cfg(feature = "s3")]
1322        StorageBackend::S3 => true,
1323        #[cfg(feature = "gcs")]
1324        StorageBackend::Gcs => true,
1325        #[cfg(feature = "azure")]
1326        StorageBackend::Azure => true,
1327    }
1328}
1329
1330#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1331fn storage_backend_label(config: &ServerConfig) -> &'static str {
1332    match config.storage_backend {
1333        StorageBackend::Filesystem => "fs-backend",
1334        #[cfg(feature = "s3")]
1335        StorageBackend::S3 => "s3-backend",
1336        #[cfg(feature = "gcs")]
1337        StorageBackend::Gcs => "gcs-backend",
1338        #[cfg(feature = "azure")]
1339        StorageBackend::Azure => "azure-backend",
1340    }
1341}
1342
1343#[derive(Debug, Default, Deserialize)]
1344#[serde(default, rename_all = "camelCase", deny_unknown_fields)]
1345struct TransformOptionsPayload {
1346    width: Option<u32>,
1347    height: Option<u32>,
1348    fit: Option<String>,
1349    position: Option<String>,
1350    format: Option<String>,
1351    quality: Option<u8>,
1352    background: Option<String>,
1353    rotate: Option<u16>,
1354    auto_orient: Option<bool>,
1355    strip_metadata: Option<bool>,
1356    preserve_exif: Option<bool>,
1357    blur: Option<f32>,
1358}
1359
1360impl TransformOptionsPayload {
1361    fn into_options(self) -> Result<TransformOptions, HttpResponse> {
1362        let defaults = TransformOptions::default();
1363
1364        Ok(TransformOptions {
1365            width: self.width,
1366            height: self.height,
1367            fit: parse_optional_named(self.fit.as_deref(), "fit", Fit::from_str)?,
1368            position: parse_optional_named(
1369                self.position.as_deref(),
1370                "position",
1371                Position::from_str,
1372            )?,
1373            format: parse_optional_named(self.format.as_deref(), "format", MediaType::from_str)?,
1374            quality: self.quality,
1375            background: parse_optional_named(
1376                self.background.as_deref(),
1377                "background",
1378                Rgba8::from_hex,
1379            )?,
1380            rotate: match self.rotate {
1381                Some(value) => parse_named(&value.to_string(), "rotate", Rotation::from_str)?,
1382                None => defaults.rotate,
1383            },
1384            auto_orient: self.auto_orient.unwrap_or(defaults.auto_orient),
1385            strip_metadata: self.strip_metadata.unwrap_or(defaults.strip_metadata),
1386            preserve_exif: self.preserve_exif.unwrap_or(defaults.preserve_exif),
1387            blur: self.blur,
1388            deadline: defaults.deadline,
1389        })
1390    }
1391}
1392
1393struct AccessLogEntry<'a> {
1394    request_id: &'a str,
1395    method: &'a str,
1396    path: &'a str,
1397    route: &'a str,
1398    status: &'a str,
1399    start: Instant,
1400    cache_status: Option<&'a str>,
1401}
1402
1403/// Extracts the `X-Request-Id` header value from request headers.
1404/// Returns `None` if the header is absent or empty.
1405fn extract_request_id(headers: &[(String, String)]) -> Option<String> {
1406    headers.iter().find_map(|(name, value)| {
1407        (name == "x-request-id" && !value.is_empty()).then_some(value.clone())
1408    })
1409}
1410
1411/// Classifies the `Cache-Status` response header as `"hit"` or `"miss"`.
1412/// Returns `None` when the header is absent.
1413fn extract_cache_status(headers: &[(&'static str, String)]) -> Option<&'static str> {
1414    headers
1415        .iter()
1416        .find_map(|(name, value)| (*name == "Cache-Status").then_some(value.as_str()))
1417        .map(|v| if v.contains("hit") { "hit" } else { "miss" })
1418}
1419
1420fn emit_access_log(config: &ServerConfig, entry: &AccessLogEntry<'_>) {
1421    config.log(
1422        &json!({
1423            "kind": "access_log",
1424            "request_id": entry.request_id,
1425            "method": entry.method,
1426            "path": entry.path,
1427            "route": entry.route,
1428            "status": entry.status,
1429            "latency_ms": entry.start.elapsed().as_millis() as u64,
1430            "cache_status": entry.cache_status,
1431        })
1432        .to_string(),
1433    );
1434}
1435
1436fn handle_stream(mut stream: TcpStream, config: &ServerConfig) -> io::Result<()> {
1437    // Prevent slow or stalled clients from blocking the accept loop indefinitely.
1438    if let Err(err) = stream.set_read_timeout(Some(SOCKET_READ_TIMEOUT)) {
1439        config.log(&format!("failed to set socket read timeout: {err}"));
1440    }
1441    if let Err(err) = stream.set_write_timeout(Some(SOCKET_WRITE_TIMEOUT)) {
1442        config.log(&format!("failed to set socket write timeout: {err}"));
1443    }
1444
1445    let mut requests_served: usize = 0;
1446
1447    loop {
1448        let partial = match read_request_headers(&mut stream) {
1449            Ok(partial) => partial,
1450            Err(response) => {
1451                if requests_served > 0 {
1452                    return Ok(());
1453                }
1454                let _ = write_response(&mut stream, response, true);
1455                return Ok(());
1456            }
1457        };
1458
1459        // Start timing after headers are read so latency reflects server
1460        // processing time, not client send / socket-wait time.
1461        let start = Instant::now();
1462
1463        let request_id =
1464            extract_request_id(&partial.headers).unwrap_or_else(|| Uuid::new_v4().to_string());
1465
1466        let client_wants_close = partial
1467            .headers
1468            .iter()
1469            .any(|(name, value)| name == "connection" && value.eq_ignore_ascii_case("close"));
1470
1471        let is_head = partial.method == "HEAD";
1472
1473        let requires_auth = matches!(
1474            (partial.method.as_str(), partial.path()),
1475            ("POST", "/images:transform") | ("POST", "/images")
1476        );
1477        if requires_auth
1478            && let Err(mut response) = authorize_request_headers(&partial.headers, config)
1479        {
1480            response.headers.push(("X-Request-Id", request_id.clone()));
1481            let sc = status_code(response.status).unwrap_or("unknown");
1482            let method_log = partial.method.clone();
1483            let path_log = partial.path().to_string();
1484            let _ = write_response(&mut stream, response, true);
1485            emit_access_log(
1486                config,
1487                &AccessLogEntry {
1488                    request_id: &request_id,
1489                    method: &method_log,
1490                    path: &path_log,
1491                    route: &path_log,
1492                    status: sc,
1493                    start,
1494                    cache_status: None,
1495                },
1496            );
1497            return Ok(());
1498        }
1499
1500        // Clone method/path before `read_request_body` consumes `partial`.
1501        let method = partial.method.clone();
1502        let path = partial.path().to_string();
1503
1504        let request = match read_request_body(&mut stream, partial) {
1505            Ok(request) => request,
1506            Err(mut response) => {
1507                response.headers.push(("X-Request-Id", request_id.clone()));
1508                let sc = status_code(response.status).unwrap_or("unknown");
1509                let _ = write_response(&mut stream, response, true);
1510                emit_access_log(
1511                    config,
1512                    &AccessLogEntry {
1513                        request_id: &request_id,
1514                        method: &method,
1515                        path: &path,
1516                        route: &path,
1517                        status: sc,
1518                        start,
1519                        cache_status: None,
1520                    },
1521                );
1522                return Ok(());
1523            }
1524        };
1525        let route = classify_route(&request);
1526        let mut response = route_request(request, config);
1527        record_http_metrics(route, response.status);
1528
1529        response.headers.push(("X-Request-Id", request_id.clone()));
1530
1531        let cache_status = extract_cache_status(&response.headers);
1532
1533        let sc = status_code(response.status).unwrap_or("unknown");
1534
1535        if is_head {
1536            response.body = Vec::new();
1537        }
1538
1539        requests_served += 1;
1540        let close_after = client_wants_close || requests_served >= KEEP_ALIVE_MAX_REQUESTS;
1541
1542        write_response(&mut stream, response, close_after)?;
1543
1544        emit_access_log(
1545            config,
1546            &AccessLogEntry {
1547                request_id: &request_id,
1548                method: &method,
1549                path: &path,
1550                route: route.as_label(),
1551                status: sc,
1552                start,
1553                cache_status,
1554            },
1555        );
1556
1557        if close_after {
1558            return Ok(());
1559        }
1560    }
1561}
1562
1563fn route_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1564    let method = request.method.clone();
1565    let path = request.path().to_string();
1566
1567    match (method.as_str(), path.as_str()) {
1568        ("GET" | "HEAD", "/health") => handle_health(config),
1569        ("GET" | "HEAD", "/health/live") => handle_health_live(),
1570        ("GET" | "HEAD", "/health/ready") => handle_health_ready(config),
1571        ("GET" | "HEAD", "/images/by-path") => handle_public_path_request(request, config),
1572        ("GET" | "HEAD", "/images/by-url") => handle_public_url_request(request, config),
1573        ("POST", "/images:transform") => handle_transform_request(request, config),
1574        ("POST", "/images") => handle_upload_request(request, config),
1575        ("GET" | "HEAD", "/metrics") => handle_metrics_request(request, config),
1576        _ => HttpResponse::problem("404 Not Found", NOT_FOUND_BODY.as_bytes().to_vec()),
1577    }
1578}
1579
1580fn classify_route(request: &HttpRequest) -> RouteMetric {
1581    match (request.method.as_str(), request.path()) {
1582        ("GET" | "HEAD", "/health") => RouteMetric::Health,
1583        ("GET" | "HEAD", "/health/live") => RouteMetric::HealthLive,
1584        ("GET" | "HEAD", "/health/ready") => RouteMetric::HealthReady,
1585        ("GET" | "HEAD", "/images/by-path") => RouteMetric::PublicByPath,
1586        ("GET" | "HEAD", "/images/by-url") => RouteMetric::PublicByUrl,
1587        ("POST", "/images:transform") => RouteMetric::Transform,
1588        ("POST", "/images") => RouteMetric::Upload,
1589        ("GET" | "HEAD", "/metrics") => RouteMetric::Metrics,
1590        _ => RouteMetric::Unknown,
1591    }
1592}
1593
1594fn handle_transform_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1595    if let Err(response) = authorize_request(&request, config) {
1596        return response;
1597    }
1598
1599    if !request_has_json_content_type(&request) {
1600        return unsupported_media_type_response("content-type must be application/json");
1601    }
1602
1603    let payload: TransformImageRequestPayload = match serde_json::from_slice(&request.body) {
1604        Ok(payload) => payload,
1605        Err(error) => {
1606            return bad_request_response(&format!("request body must be valid JSON: {error}"));
1607        }
1608    };
1609    let options = match payload.options.into_options() {
1610        Ok(options) => options,
1611        Err(response) => return response,
1612    };
1613
1614    let versioned_hash = payload.source.versioned_source_hash(config);
1615    if let Some(response) = try_versioned_cache_lookup(
1616        versioned_hash.as_deref(),
1617        &options,
1618        &request,
1619        ImageResponsePolicy::PrivateTransform,
1620        config,
1621    ) {
1622        return response;
1623    }
1624
1625    let source_bytes = match resolve_source_bytes(payload.source, config) {
1626        Ok(bytes) => bytes,
1627        Err(response) => return response,
1628    };
1629    transform_source_bytes(
1630        source_bytes,
1631        options,
1632        versioned_hash.as_deref(),
1633        &request,
1634        ImageResponsePolicy::PrivateTransform,
1635        config,
1636    )
1637}
1638
1639fn handle_public_path_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1640    handle_public_get_request(request, config, PublicSourceKind::Path)
1641}
1642
1643fn handle_public_url_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1644    handle_public_get_request(request, config, PublicSourceKind::Url)
1645}
1646
1647fn handle_public_get_request(
1648    request: HttpRequest,
1649    config: &ServerConfig,
1650    source_kind: PublicSourceKind,
1651) -> HttpResponse {
1652    let query = match parse_query_params(&request) {
1653        Ok(query) => query,
1654        Err(response) => return response,
1655    };
1656    if let Err(response) = authorize_signed_request(&request, &query, config) {
1657        return response;
1658    }
1659    let (source, options) = match parse_public_get_request(&query, source_kind) {
1660        Ok(parsed) => parsed,
1661        Err(response) => return response,
1662    };
1663
1664    // When the storage backend is object storage (S3 or GCS), convert Path
1665    // sources to Storage sources so that the `path` query parameter is
1666    // resolved as an object key.
1667    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1668    let source = if is_object_storage_backend(config) {
1669        match source {
1670            TransformSourcePayload::Path { path, version } => TransformSourcePayload::Storage {
1671                bucket: None,
1672                key: path.trim_start_matches('/').to_string(),
1673                version,
1674            },
1675            other => other,
1676        }
1677    } else {
1678        source
1679    };
1680
1681    let versioned_hash = source.versioned_source_hash(config);
1682    if let Some(response) = try_versioned_cache_lookup(
1683        versioned_hash.as_deref(),
1684        &options,
1685        &request,
1686        ImageResponsePolicy::PublicGet,
1687        config,
1688    ) {
1689        return response;
1690    }
1691
1692    let source_bytes = match resolve_source_bytes(source, config) {
1693        Ok(bytes) => bytes,
1694        Err(response) => return response,
1695    };
1696
1697    transform_source_bytes(
1698        source_bytes,
1699        options,
1700        versioned_hash.as_deref(),
1701        &request,
1702        ImageResponsePolicy::PublicGet,
1703        config,
1704    )
1705}
1706
1707fn handle_upload_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1708    if let Err(response) = authorize_request(&request, config) {
1709        return response;
1710    }
1711
1712    let boundary = match parse_multipart_boundary(&request) {
1713        Ok(boundary) => boundary,
1714        Err(response) => return response,
1715    };
1716    let (file_bytes, options) = match parse_upload_request(&request.body, &boundary) {
1717        Ok(parts) => parts,
1718        Err(response) => return response,
1719    };
1720    transform_source_bytes(
1721        file_bytes,
1722        options,
1723        None,
1724        &request,
1725        ImageResponsePolicy::PrivateTransform,
1726        config,
1727    )
1728}
1729
1730/// Returns a minimal liveness response confirming the process is running.
1731fn handle_health_live() -> HttpResponse {
1732    let body = serde_json::to_vec(&json!({
1733        "status": "ok",
1734        "service": "truss",
1735        "version": env!("CARGO_PKG_VERSION"),
1736    }))
1737    .expect("serialize liveness");
1738    let mut body = body;
1739    body.push(b'\n');
1740    HttpResponse::json("200 OK", body)
1741}
1742
1743/// Returns a readiness response after checking that critical infrastructure
1744/// dependencies are available (storage root, cache root if configured, S3
1745/// reachability).  Transform capacity is intentionally excluded — it is a
1746/// runtime signal reported only by the full `/health` diagnostic endpoint.
1747fn handle_health_ready(config: &ServerConfig) -> HttpResponse {
1748    let mut checks: Vec<serde_json::Value> = Vec::new();
1749    let mut all_ok = true;
1750
1751    for (ok, name) in storage_health_check(config) {
1752        checks.push(json!({
1753            "name": name,
1754            "status": if ok { "ok" } else { "fail" },
1755        }));
1756        if !ok {
1757            all_ok = false;
1758        }
1759    }
1760
1761    if let Some(cache_root) = &config.cache_root {
1762        let cache_ok = cache_root.is_dir();
1763        checks.push(json!({
1764            "name": "cacheRoot",
1765            "status": if cache_ok { "ok" } else { "fail" },
1766        }));
1767        if !cache_ok {
1768            all_ok = false;
1769        }
1770    }
1771
1772    let status_str = if all_ok { "ok" } else { "fail" };
1773    let mut body = serde_json::to_vec(&json!({
1774        "status": status_str,
1775        "checks": checks,
1776    }))
1777    .expect("serialize readiness");
1778    body.push(b'\n');
1779
1780    if all_ok {
1781        HttpResponse::json("200 OK", body)
1782    } else {
1783        HttpResponse::json("503 Service Unavailable", body)
1784    }
1785}
1786
1787/// Returns a comprehensive diagnostic health response.
1788fn storage_health_check(config: &ServerConfig) -> Vec<(bool, &'static str)> {
1789    #[allow(unused_mut)]
1790    let mut checks = vec![(config.storage_root.is_dir(), "storageRoot")];
1791    #[cfg(feature = "s3")]
1792    if config.storage_backend == StorageBackend::S3 {
1793        let reachable = config
1794            .s3_context
1795            .as_ref()
1796            .is_some_and(|ctx| ctx.check_reachable());
1797        checks.push((reachable, "storageBackend"));
1798    }
1799    #[cfg(feature = "gcs")]
1800    if config.storage_backend == StorageBackend::Gcs {
1801        let reachable = config
1802            .gcs_context
1803            .as_ref()
1804            .is_some_and(|ctx| ctx.check_reachable());
1805        checks.push((reachable, "storageBackend"));
1806    }
1807    #[cfg(feature = "azure")]
1808    if config.storage_backend == StorageBackend::Azure {
1809        let reachable = config
1810            .azure_context
1811            .as_ref()
1812            .is_some_and(|ctx| ctx.check_reachable());
1813        checks.push((reachable, "storageBackend"));
1814    }
1815    checks
1816}
1817
1818fn handle_health(config: &ServerConfig) -> HttpResponse {
1819    let mut checks: Vec<serde_json::Value> = Vec::new();
1820    let mut all_ok = true;
1821
1822    for (ok, name) in storage_health_check(config) {
1823        checks.push(json!({
1824            "name": name,
1825            "status": if ok { "ok" } else { "fail" },
1826        }));
1827        if !ok {
1828            all_ok = false;
1829        }
1830    }
1831
1832    if let Some(cache_root) = &config.cache_root {
1833        let cache_ok = cache_root.is_dir();
1834        checks.push(json!({
1835            "name": "cacheRoot",
1836            "status": if cache_ok { "ok" } else { "fail" },
1837        }));
1838        if !cache_ok {
1839            all_ok = false;
1840        }
1841    }
1842
1843    let in_flight = config.transforms_in_flight.load(Ordering::Relaxed);
1844    let overloaded = in_flight >= config.max_concurrent_transforms;
1845    checks.push(json!({
1846        "name": "transformCapacity",
1847        "status": if overloaded { "fail" } else { "ok" },
1848    }));
1849    if overloaded {
1850        all_ok = false;
1851    }
1852
1853    let status_str = if all_ok { "ok" } else { "fail" };
1854    let mut body = serde_json::to_vec(&json!({
1855        "status": status_str,
1856        "service": "truss",
1857        "version": env!("CARGO_PKG_VERSION"),
1858        "uptimeSeconds": uptime_seconds(),
1859        "checks": checks,
1860    }))
1861    .expect("serialize health");
1862    body.push(b'\n');
1863
1864    HttpResponse::json("200 OK", body)
1865}
1866
1867fn handle_metrics_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1868    if let Err(response) = authorize_request(&request, config) {
1869        return response;
1870    }
1871
1872    HttpResponse::text(
1873        "200 OK",
1874        "text/plain; version=0.0.4; charset=utf-8",
1875        render_metrics_text(
1876            config.max_concurrent_transforms,
1877            &config.transforms_in_flight,
1878        )
1879        .into_bytes(),
1880    )
1881}
1882
1883fn parse_public_get_request(
1884    query: &BTreeMap<String, String>,
1885    source_kind: PublicSourceKind,
1886) -> Result<(TransformSourcePayload, TransformOptions), HttpResponse> {
1887    validate_public_query_names(query, source_kind)?;
1888
1889    let source = match source_kind {
1890        PublicSourceKind::Path => TransformSourcePayload::Path {
1891            path: required_query_param(query, "path")?.to_string(),
1892            version: query.get("version").cloned(),
1893        },
1894        PublicSourceKind::Url => TransformSourcePayload::Url {
1895            url: required_query_param(query, "url")?.to_string(),
1896            version: query.get("version").cloned(),
1897        },
1898    };
1899
1900    let defaults = TransformOptions::default();
1901    let options = TransformOptions {
1902        width: parse_optional_integer_query(query, "width")?,
1903        height: parse_optional_integer_query(query, "height")?,
1904        fit: parse_optional_named(query.get("fit").map(String::as_str), "fit", Fit::from_str)?,
1905        position: parse_optional_named(
1906            query.get("position").map(String::as_str),
1907            "position",
1908            Position::from_str,
1909        )?,
1910        format: parse_optional_named(
1911            query.get("format").map(String::as_str),
1912            "format",
1913            MediaType::from_str,
1914        )?,
1915        quality: parse_optional_u8_query(query, "quality")?,
1916        background: parse_optional_named(
1917            query.get("background").map(String::as_str),
1918            "background",
1919            Rgba8::from_hex,
1920        )?,
1921        rotate: match query.get("rotate") {
1922            Some(value) => parse_named(value, "rotate", Rotation::from_str)?,
1923            None => defaults.rotate,
1924        },
1925        auto_orient: parse_optional_bool_query(query, "autoOrient")?
1926            .unwrap_or(defaults.auto_orient),
1927        strip_metadata: parse_optional_bool_query(query, "stripMetadata")?
1928            .unwrap_or(defaults.strip_metadata),
1929        preserve_exif: parse_optional_bool_query(query, "preserveExif")?
1930            .unwrap_or(defaults.preserve_exif),
1931        blur: parse_optional_float_query(query, "blur")?,
1932        deadline: defaults.deadline,
1933    };
1934
1935    Ok((source, options))
1936}
1937
1938fn transform_source_bytes(
1939    source_bytes: Vec<u8>,
1940    options: TransformOptions,
1941    versioned_hash: Option<&str>,
1942    request: &HttpRequest,
1943    response_policy: ImageResponsePolicy,
1944    config: &ServerConfig,
1945) -> HttpResponse {
1946    let content_hash;
1947    let source_hash = match versioned_hash {
1948        Some(hash) => hash,
1949        None => {
1950            content_hash = hex::encode(Sha256::digest(&source_bytes));
1951            &content_hash
1952        }
1953    };
1954
1955    let cache = config
1956        .cache_root
1957        .as_ref()
1958        .map(|root| TransformCache::new(root.clone()).with_log_handler(config.log_handler.clone()));
1959
1960    if let Some(ref cache) = cache
1961        && options.format.is_some()
1962    {
1963        let cache_key = compute_cache_key(source_hash, &options, None);
1964        if let CacheLookup::Hit {
1965            media_type,
1966            body,
1967            age,
1968        } = cache.get(&cache_key)
1969        {
1970            CACHE_HITS_TOTAL.fetch_add(1, Ordering::Relaxed);
1971            let etag = build_image_etag(&body);
1972            let mut headers = build_image_response_headers(
1973                media_type,
1974                &etag,
1975                response_policy,
1976                false,
1977                CacheHitStatus::Hit,
1978                config.public_max_age_seconds,
1979                config.public_stale_while_revalidate_seconds,
1980            );
1981            headers.push(("Age", age.as_secs().to_string()));
1982            if matches!(response_policy, ImageResponsePolicy::PublicGet)
1983                && if_none_match_matches(request.header("if-none-match"), &etag)
1984            {
1985                return HttpResponse::empty("304 Not Modified", headers);
1986            }
1987            return HttpResponse::binary_with_headers(
1988                "200 OK",
1989                media_type.as_mime(),
1990                headers,
1991                body,
1992            );
1993        }
1994    }
1995
1996    let _slot = match TransformSlot::try_acquire(
1997        &config.transforms_in_flight,
1998        config.max_concurrent_transforms,
1999    ) {
2000        Some(slot) => slot,
2001        None => return service_unavailable_response("too many concurrent transforms; retry later"),
2002    };
2003    transform_source_bytes_inner(
2004        source_bytes,
2005        options,
2006        request,
2007        response_policy,
2008        cache.as_ref(),
2009        source_hash,
2010        ImageResponseConfig {
2011            disable_accept_negotiation: config.disable_accept_negotiation,
2012            public_cache_control: PublicCacheControl {
2013                max_age: config.public_max_age_seconds,
2014                stale_while_revalidate: config.public_stale_while_revalidate_seconds,
2015            },
2016            transform_deadline: Duration::from_secs(config.transform_deadline_secs),
2017        },
2018    )
2019}
2020
2021fn transform_source_bytes_inner(
2022    source_bytes: Vec<u8>,
2023    mut options: TransformOptions,
2024    request: &HttpRequest,
2025    response_policy: ImageResponsePolicy,
2026    cache: Option<&TransformCache>,
2027    source_hash: &str,
2028    response_config: ImageResponseConfig,
2029) -> HttpResponse {
2030    if options.deadline.is_none() {
2031        options.deadline = Some(response_config.transform_deadline);
2032    }
2033    let artifact = match sniff_artifact(RawArtifact::new(source_bytes, None)) {
2034        Ok(artifact) => artifact,
2035        Err(error) => return transform_error_response(error),
2036    };
2037    let negotiation_used =
2038        if options.format.is_none() && !response_config.disable_accept_negotiation {
2039            match negotiate_output_format(request.header("accept"), &artifact) {
2040                Ok(Some(format)) => {
2041                    options.format = Some(format);
2042                    true
2043                }
2044                Ok(None) => false,
2045                Err(response) => return response,
2046            }
2047        } else {
2048            false
2049        };
2050
2051    let negotiated_accept = if negotiation_used {
2052        request.header("accept")
2053    } else {
2054        None
2055    };
2056    let cache_key = compute_cache_key(source_hash, &options, negotiated_accept);
2057
2058    if let Some(cache) = cache
2059        && let CacheLookup::Hit {
2060            media_type,
2061            body,
2062            age,
2063        } = cache.get(&cache_key)
2064    {
2065        CACHE_HITS_TOTAL.fetch_add(1, Ordering::Relaxed);
2066        let etag = build_image_etag(&body);
2067        let mut headers = build_image_response_headers(
2068            media_type,
2069            &etag,
2070            response_policy,
2071            negotiation_used,
2072            CacheHitStatus::Hit,
2073            response_config.public_cache_control.max_age,
2074            response_config.public_cache_control.stale_while_revalidate,
2075        );
2076        headers.push(("Age", age.as_secs().to_string()));
2077        if matches!(response_policy, ImageResponsePolicy::PublicGet)
2078            && if_none_match_matches(request.header("if-none-match"), &etag)
2079        {
2080            return HttpResponse::empty("304 Not Modified", headers);
2081        }
2082        return HttpResponse::binary_with_headers("200 OK", media_type.as_mime(), headers, body);
2083    }
2084
2085    if cache.is_some() {
2086        CACHE_MISSES_TOTAL.fetch_add(1, Ordering::Relaxed);
2087    }
2088
2089    let is_svg = artifact.media_type == MediaType::Svg;
2090    let result = if is_svg {
2091        match transform_svg(TransformRequest::new(artifact, options)) {
2092            Ok(result) => result,
2093            Err(error) => return transform_error_response(error),
2094        }
2095    } else {
2096        match transform_raster(TransformRequest::new(artifact, options)) {
2097            Ok(result) => result,
2098            Err(error) => return transform_error_response(error),
2099        }
2100    };
2101
2102    for warning in &result.warnings {
2103        let msg = format!("truss: {warning}");
2104        if let Some(c) = cache
2105            && let Some(handler) = &c.log_handler
2106        {
2107            handler(&msg);
2108        } else {
2109            stderr_write(&msg);
2110        }
2111    }
2112
2113    let output = result.artifact;
2114
2115    if let Some(cache) = cache {
2116        cache.put(&cache_key, output.media_type, &output.bytes);
2117    }
2118
2119    let cache_hit_status = if cache.is_some() {
2120        CacheHitStatus::Miss
2121    } else {
2122        CacheHitStatus::Disabled
2123    };
2124
2125    let etag = build_image_etag(&output.bytes);
2126    let headers = build_image_response_headers(
2127        output.media_type,
2128        &etag,
2129        response_policy,
2130        negotiation_used,
2131        cache_hit_status,
2132        response_config.public_cache_control.max_age,
2133        response_config.public_cache_control.stale_while_revalidate,
2134    );
2135
2136    if matches!(response_policy, ImageResponsePolicy::PublicGet)
2137        && if_none_match_matches(request.header("if-none-match"), &etag)
2138    {
2139        return HttpResponse::empty("304 Not Modified", headers);
2140    }
2141
2142    HttpResponse::binary_with_headers("200 OK", output.media_type.as_mime(), headers, output.bytes)
2143}
2144
2145fn env_flag(name: &str) -> bool {
2146    env::var(name)
2147        .map(|value| {
2148            matches!(
2149                value.as_str(),
2150                "1" | "true" | "TRUE" | "yes" | "YES" | "on" | "ON"
2151            )
2152        })
2153        .unwrap_or(false)
2154}
2155
2156fn parse_optional_env_u32(name: &str) -> io::Result<Option<u32>> {
2157    match env::var(name) {
2158        Ok(value) if !value.is_empty() => value.parse::<u32>().map(Some).map_err(|_| {
2159            io::Error::new(
2160                io::ErrorKind::InvalidInput,
2161                format!("{name} must be a non-negative integer"),
2162            )
2163        }),
2164        _ => Ok(None),
2165    }
2166}
2167
2168fn validate_public_base_url(value: String) -> io::Result<String> {
2169    let parsed = Url::parse(&value).map_err(|error| {
2170        io::Error::new(
2171            io::ErrorKind::InvalidInput,
2172            format!("TRUSS_PUBLIC_BASE_URL must be a valid URL: {error}"),
2173        )
2174    })?;
2175
2176    match parsed.scheme() {
2177        "http" | "https" => Ok(parsed.to_string()),
2178        _ => Err(io::Error::new(
2179            io::ErrorKind::InvalidInput,
2180            "TRUSS_PUBLIC_BASE_URL must use http or https",
2181        )),
2182    }
2183}
2184
2185#[cfg(test)]
2186mod tests {
2187    use serial_test::serial;
2188
2189    use super::http_parse::{
2190        HttpRequest, find_header_terminator, read_request_body, read_request_headers,
2191        resolve_storage_path,
2192    };
2193    use super::multipart::parse_multipart_form_data;
2194    use super::remote::{PinnedResolver, prepare_remote_fetch_target};
2195    use super::response::auth_required_response;
2196    use super::response::{HttpResponse, bad_request_response};
2197    use super::{
2198        CacheHitStatus, DEFAULT_BIND_ADDR, DEFAULT_MAX_CONCURRENT_TRANSFORMS,
2199        DEFAULT_PUBLIC_MAX_AGE_SECONDS, DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2200        ImageResponsePolicy, PublicSourceKind, ServerConfig, SignedUrlSource,
2201        TransformSourcePayload, authorize_signed_request, bind_addr, build_image_etag,
2202        build_image_response_headers, canonical_query_without_signature, negotiate_output_format,
2203        parse_public_get_request, route_request, serve_once_with_config, sign_public_url,
2204        transform_source_bytes,
2205    };
2206    use crate::{
2207        Artifact, ArtifactMetadata, MediaType, RawArtifact, TransformOptions, sniff_artifact,
2208    };
2209    use hmac::{Hmac, Mac};
2210    use image::codecs::png::PngEncoder;
2211    use image::{ColorType, ImageEncoder, Rgba, RgbaImage};
2212    use sha2::Sha256;
2213    use std::collections::BTreeMap;
2214    use std::fs;
2215    use std::io::{Cursor, Read, Write};
2216    use std::net::{SocketAddr, TcpListener, TcpStream};
2217    use std::path::{Path, PathBuf};
2218    use std::sync::atomic::Ordering;
2219    use std::thread;
2220    use std::time::{Duration, SystemTime, UNIX_EPOCH};
2221
2222    /// Test-only convenience wrapper that reads headers + body in one shot,
2223    /// preserving the original `read_request` semantics for existing tests.
2224    fn read_request<R: Read>(stream: &mut R) -> Result<HttpRequest, HttpResponse> {
2225        let partial = read_request_headers(stream)?;
2226        read_request_body(stream, partial)
2227    }
2228
2229    fn png_bytes() -> Vec<u8> {
2230        let image = RgbaImage::from_pixel(4, 3, Rgba([10, 20, 30, 255]));
2231        let mut bytes = Vec::new();
2232        PngEncoder::new(&mut bytes)
2233            .write_image(&image, 4, 3, ColorType::Rgba8.into())
2234            .expect("encode png");
2235        bytes
2236    }
2237
2238    fn temp_dir(name: &str) -> PathBuf {
2239        let unique = SystemTime::now()
2240            .duration_since(UNIX_EPOCH)
2241            .expect("current time")
2242            .as_nanos();
2243        let path = std::env::temp_dir().join(format!("truss-server-{name}-{unique}"));
2244        fs::create_dir_all(&path).expect("create temp dir");
2245        path
2246    }
2247
2248    fn write_png(path: &Path) {
2249        fs::write(path, png_bytes()).expect("write png fixture");
2250    }
2251
2252    fn artifact_with_alpha(has_alpha: bool) -> Artifact {
2253        Artifact::new(
2254            png_bytes(),
2255            MediaType::Png,
2256            ArtifactMetadata {
2257                width: Some(4),
2258                height: Some(3),
2259                frame_count: 1,
2260                duration: None,
2261                has_alpha: Some(has_alpha),
2262            },
2263        )
2264    }
2265
2266    fn sign_public_query(
2267        method: &str,
2268        authority: &str,
2269        path: &str,
2270        query: &BTreeMap<String, String>,
2271        secret: &str,
2272    ) -> String {
2273        let canonical = format!(
2274            "{method}\n{authority}\n{path}\n{}",
2275            canonical_query_without_signature(query)
2276        );
2277        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).expect("create hmac");
2278        mac.update(canonical.as_bytes());
2279        hex::encode(mac.finalize().into_bytes())
2280    }
2281
2282    type FixtureResponse = (String, Vec<(String, String)>, Vec<u8>);
2283
2284    fn read_fixture_request(stream: &mut TcpStream) {
2285        stream
2286            .set_nonblocking(false)
2287            .expect("configure fixture stream blocking mode");
2288        stream
2289            .set_read_timeout(Some(Duration::from_millis(100)))
2290            .expect("configure fixture stream timeout");
2291
2292        let deadline = std::time::Instant::now() + Duration::from_secs(2);
2293        let mut buffer = Vec::new();
2294        let mut chunk = [0_u8; 1024];
2295        let header_end = loop {
2296            let read = match stream.read(&mut chunk) {
2297                Ok(read) => read,
2298                Err(error)
2299                    if matches!(
2300                        error.kind(),
2301                        std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
2302                    ) && std::time::Instant::now() < deadline =>
2303                {
2304                    thread::sleep(Duration::from_millis(10));
2305                    continue;
2306                }
2307                Err(error) => panic!("read fixture request headers: {error}"),
2308            };
2309            if read == 0 {
2310                panic!("fixture request ended before headers were complete");
2311            }
2312            buffer.extend_from_slice(&chunk[..read]);
2313            if let Some(index) = find_header_terminator(&buffer) {
2314                break index;
2315            }
2316        };
2317
2318        let header_text = std::str::from_utf8(&buffer[..header_end]).expect("fixture request utf8");
2319        let content_length = header_text
2320            .split("\r\n")
2321            .filter_map(|line| line.split_once(':'))
2322            .find_map(|(name, value)| {
2323                name.trim()
2324                    .eq_ignore_ascii_case("content-length")
2325                    .then_some(value.trim())
2326            })
2327            .map(|value| {
2328                value
2329                    .parse::<usize>()
2330                    .expect("fixture content-length should be numeric")
2331            })
2332            .unwrap_or(0);
2333
2334        let mut body = buffer.len().saturating_sub(header_end + 4);
2335        while body < content_length {
2336            let read = match stream.read(&mut chunk) {
2337                Ok(read) => read,
2338                Err(error)
2339                    if matches!(
2340                        error.kind(),
2341                        std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
2342                    ) && std::time::Instant::now() < deadline =>
2343                {
2344                    thread::sleep(Duration::from_millis(10));
2345                    continue;
2346                }
2347                Err(error) => panic!("read fixture request body: {error}"),
2348            };
2349            if read == 0 {
2350                panic!("fixture request body was truncated");
2351            }
2352            body += read;
2353        }
2354    }
2355
2356    fn spawn_http_server(responses: Vec<FixtureResponse>) -> (String, thread::JoinHandle<()>) {
2357        let listener = TcpListener::bind("127.0.0.1:0").expect("bind fixture server");
2358        listener
2359            .set_nonblocking(true)
2360            .expect("configure fixture server");
2361        let addr = listener.local_addr().expect("fixture server addr");
2362        let url = format!("http://{addr}/image");
2363
2364        let handle = thread::spawn(move || {
2365            for (status, headers, body) in responses {
2366                let deadline = std::time::Instant::now() + Duration::from_secs(10);
2367                let mut accepted = None;
2368                while std::time::Instant::now() < deadline {
2369                    match listener.accept() {
2370                        Ok(stream) => {
2371                            accepted = Some(stream);
2372                            break;
2373                        }
2374                        Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
2375                            thread::sleep(Duration::from_millis(10));
2376                        }
2377                        Err(error) => panic!("accept fixture request: {error}"),
2378                    }
2379                }
2380
2381                let Some((mut stream, _)) = accepted else {
2382                    break;
2383                };
2384                read_fixture_request(&mut stream);
2385                let mut header = format!(
2386                    "HTTP/1.1 {status}\r\nContent-Length: {}\r\nConnection: close\r\n",
2387                    body.len()
2388                );
2389                for (name, value) in headers {
2390                    header.push_str(&format!("{name}: {value}\r\n"));
2391                }
2392                header.push_str("\r\n");
2393                stream
2394                    .write_all(header.as_bytes())
2395                    .expect("write fixture headers");
2396                stream.write_all(&body).expect("write fixture body");
2397                stream.flush().expect("flush fixture response");
2398            }
2399        });
2400
2401        (url, handle)
2402    }
2403
2404    fn transform_request(path: &str) -> HttpRequest {
2405        HttpRequest {
2406            method: "POST".to_string(),
2407            target: "/images:transform".to_string(),
2408            version: "HTTP/1.1".to_string(),
2409            headers: vec![
2410                ("authorization".to_string(), "Bearer secret".to_string()),
2411                ("content-type".to_string(), "application/json".to_string()),
2412            ],
2413            body: format!(
2414                "{{\"source\":{{\"kind\":\"path\",\"path\":\"{path}\"}},\"options\":{{\"format\":\"jpeg\"}}}}"
2415            )
2416            .into_bytes(),
2417        }
2418    }
2419
2420    fn transform_url_request(url: &str) -> HttpRequest {
2421        HttpRequest {
2422            method: "POST".to_string(),
2423            target: "/images:transform".to_string(),
2424            version: "HTTP/1.1".to_string(),
2425            headers: vec![
2426                ("authorization".to_string(), "Bearer secret".to_string()),
2427                ("content-type".to_string(), "application/json".to_string()),
2428            ],
2429            body: format!(
2430                "{{\"source\":{{\"kind\":\"url\",\"url\":\"{url}\"}},\"options\":{{\"format\":\"jpeg\"}}}}"
2431            )
2432            .into_bytes(),
2433        }
2434    }
2435
2436    fn upload_request(file_bytes: &[u8], options_json: Option<&str>) -> HttpRequest {
2437        let boundary = "truss-test-boundary";
2438        let mut body = Vec::new();
2439        body.extend_from_slice(
2440            format!(
2441                "--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"image.png\"\r\nContent-Type: image/png\r\n\r\n"
2442            )
2443            .as_bytes(),
2444        );
2445        body.extend_from_slice(file_bytes);
2446        body.extend_from_slice(b"\r\n");
2447
2448        if let Some(options_json) = options_json {
2449            body.extend_from_slice(
2450                format!(
2451                    "--{boundary}\r\nContent-Disposition: form-data; name=\"options\"\r\nContent-Type: application/json\r\n\r\n{options_json}\r\n"
2452                )
2453                .as_bytes(),
2454            );
2455        }
2456
2457        body.extend_from_slice(format!("--{boundary}--\r\n").as_bytes());
2458
2459        HttpRequest {
2460            method: "POST".to_string(),
2461            target: "/images".to_string(),
2462            version: "HTTP/1.1".to_string(),
2463            headers: vec![
2464                ("authorization".to_string(), "Bearer secret".to_string()),
2465                (
2466                    "content-type".to_string(),
2467                    format!("multipart/form-data; boundary={boundary}"),
2468                ),
2469            ],
2470            body,
2471        }
2472    }
2473
2474    fn metrics_request(with_auth: bool) -> HttpRequest {
2475        let mut headers = Vec::new();
2476        if with_auth {
2477            headers.push(("authorization".to_string(), "Bearer secret".to_string()));
2478        }
2479
2480        HttpRequest {
2481            method: "GET".to_string(),
2482            target: "/metrics".to_string(),
2483            version: "HTTP/1.1".to_string(),
2484            headers,
2485            body: Vec::new(),
2486        }
2487    }
2488
2489    fn response_body(response: &HttpResponse) -> &str {
2490        std::str::from_utf8(&response.body).expect("utf8 response body")
2491    }
2492
2493    fn signed_public_request(target: &str, host: &str, secret: &str) -> HttpRequest {
2494        let (path, query) = target.split_once('?').expect("target has query");
2495        let mut query = url::form_urlencoded::parse(query.as_bytes())
2496            .into_owned()
2497            .collect::<BTreeMap<_, _>>();
2498        let signature = sign_public_query("GET", host, path, &query, secret);
2499        query.insert("signature".to_string(), signature);
2500        let final_query = url::form_urlencoded::Serializer::new(String::new())
2501            .extend_pairs(
2502                query
2503                    .iter()
2504                    .map(|(name, value)| (name.as_str(), value.as_str())),
2505            )
2506            .finish();
2507
2508        HttpRequest {
2509            method: "GET".to_string(),
2510            target: format!("{path}?{final_query}"),
2511            version: "HTTP/1.1".to_string(),
2512            headers: vec![("host".to_string(), host.to_string())],
2513            body: Vec::new(),
2514        }
2515    }
2516
2517    #[test]
2518    fn uses_default_bind_addr_when_env_is_missing() {
2519        unsafe { std::env::remove_var("TRUSS_BIND_ADDR") };
2520        assert_eq!(bind_addr(), DEFAULT_BIND_ADDR);
2521    }
2522
2523    #[test]
2524    fn authorize_signed_request_accepts_a_valid_signature() {
2525        let request = signed_public_request(
2526            "/images/by-path?path=%2Fimage.png&keyId=public-dev&expires=4102444800&format=jpeg",
2527            "assets.example.com",
2528            "secret-value",
2529        );
2530        let query = super::auth::parse_query_params(&request).expect("parse query");
2531        let config = ServerConfig::new(temp_dir("public-auth"), None)
2532            .with_signed_url_credentials("public-dev", "secret-value");
2533
2534        authorize_signed_request(&request, &query, &config).expect("signed auth should pass");
2535    }
2536
2537    #[test]
2538    fn authorize_signed_request_uses_public_base_url_authority() {
2539        let request = signed_public_request(
2540            "/images/by-path?path=%2Fimage.png&keyId=public-dev&expires=4102444800&format=jpeg",
2541            "cdn.example.com",
2542            "secret-value",
2543        );
2544        let query = super::auth::parse_query_params(&request).expect("parse query");
2545        let mut config = ServerConfig::new(temp_dir("public-authority"), None)
2546            .with_signed_url_credentials("public-dev", "secret-value");
2547        config.public_base_url = Some("https://cdn.example.com".to_string());
2548
2549        authorize_signed_request(&request, &query, &config).expect("signed auth should pass");
2550    }
2551
2552    #[test]
2553    fn negotiate_output_format_prefers_alpha_safe_formats_for_transparent_inputs() {
2554        let format =
2555            negotiate_output_format(Some("image/jpeg,image/png"), &artifact_with_alpha(true))
2556                .expect("negotiate output format")
2557                .expect("resolved output format");
2558
2559        assert_eq!(format, MediaType::Png);
2560    }
2561
2562    #[test]
2563    fn negotiate_output_format_prefers_avif_for_wildcard_accept() {
2564        let format = negotiate_output_format(Some("image/*"), &artifact_with_alpha(false))
2565            .expect("negotiate output format")
2566            .expect("resolved output format");
2567
2568        assert_eq!(format, MediaType::Avif);
2569    }
2570
2571    #[test]
2572    fn build_image_response_headers_include_cache_and_safety_metadata() {
2573        let headers = build_image_response_headers(
2574            MediaType::Webp,
2575            &build_image_etag(b"demo"),
2576            ImageResponsePolicy::PublicGet,
2577            true,
2578            CacheHitStatus::Disabled,
2579            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2580            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2581        );
2582
2583        assert!(headers.contains(&(
2584            "Cache-Control",
2585            "public, max-age=3600, stale-while-revalidate=60".to_string()
2586        )));
2587        assert!(headers.contains(&("Vary", "Accept".to_string())));
2588        assert!(headers.contains(&("X-Content-Type-Options", "nosniff".to_string())));
2589        assert!(headers.contains(&(
2590            "Content-Disposition",
2591            "inline; filename=\"truss.webp\"".to_string()
2592        )));
2593        assert!(headers.contains(&("Cache-Status", "\"truss\"; fwd=miss".to_string())));
2594    }
2595
2596    #[test]
2597    fn build_image_response_headers_include_csp_sandbox_for_svg() {
2598        let headers = build_image_response_headers(
2599            MediaType::Svg,
2600            &build_image_etag(b"svg-data"),
2601            ImageResponsePolicy::PublicGet,
2602            true,
2603            CacheHitStatus::Disabled,
2604            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2605            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2606        );
2607
2608        assert!(headers.contains(&("Content-Security-Policy", "sandbox".to_string())));
2609    }
2610
2611    #[test]
2612    fn build_image_response_headers_omit_csp_sandbox_for_raster() {
2613        let headers = build_image_response_headers(
2614            MediaType::Png,
2615            &build_image_etag(b"png-data"),
2616            ImageResponsePolicy::PublicGet,
2617            true,
2618            CacheHitStatus::Disabled,
2619            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2620            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2621        );
2622
2623        assert!(!headers.iter().any(|(k, _)| *k == "Content-Security-Policy"));
2624    }
2625
2626    #[test]
2627    fn backpressure_rejects_when_at_capacity() {
2628        let config = ServerConfig::new(std::env::temp_dir(), None);
2629        config
2630            .transforms_in_flight
2631            .store(DEFAULT_MAX_CONCURRENT_TRANSFORMS, Ordering::Relaxed);
2632
2633        let request = HttpRequest {
2634            method: "POST".to_string(),
2635            target: "/transform".to_string(),
2636            version: "HTTP/1.1".to_string(),
2637            headers: Vec::new(),
2638            body: Vec::new(),
2639        };
2640
2641        let png_bytes = {
2642            let mut buf = Vec::new();
2643            let encoder = image::codecs::png::PngEncoder::new(&mut buf);
2644            encoder
2645                .write_image(&[255, 0, 0, 255], 1, 1, image::ExtendedColorType::Rgba8)
2646                .unwrap();
2647            buf
2648        };
2649
2650        let response = transform_source_bytes(
2651            png_bytes,
2652            TransformOptions::default(),
2653            None,
2654            &request,
2655            ImageResponsePolicy::PrivateTransform,
2656            &config,
2657        );
2658
2659        assert!(response.status.contains("503"));
2660
2661        assert_eq!(
2662            config.transforms_in_flight.load(Ordering::Relaxed),
2663            DEFAULT_MAX_CONCURRENT_TRANSFORMS
2664        );
2665    }
2666
2667    #[test]
2668    fn backpressure_rejects_with_custom_concurrency_limit() {
2669        let custom_limit = 2u64;
2670        let mut config = ServerConfig::new(std::env::temp_dir(), None);
2671        config.max_concurrent_transforms = custom_limit;
2672        config
2673            .transforms_in_flight
2674            .store(custom_limit, Ordering::Relaxed);
2675
2676        let request = HttpRequest {
2677            method: "POST".to_string(),
2678            target: "/transform".to_string(),
2679            version: "HTTP/1.1".to_string(),
2680            headers: Vec::new(),
2681            body: Vec::new(),
2682        };
2683
2684        let png_bytes = {
2685            let mut buf = Vec::new();
2686            let encoder = image::codecs::png::PngEncoder::new(&mut buf);
2687            encoder
2688                .write_image(&[255, 0, 0, 255], 1, 1, image::ExtendedColorType::Rgba8)
2689                .unwrap();
2690            buf
2691        };
2692
2693        let response = transform_source_bytes(
2694            png_bytes,
2695            TransformOptions::default(),
2696            None,
2697            &request,
2698            ImageResponsePolicy::PrivateTransform,
2699            &config,
2700        );
2701
2702        assert!(response.status.contains("503"));
2703    }
2704
2705    #[test]
2706    fn compute_cache_key_is_deterministic() {
2707        let opts = TransformOptions {
2708            width: Some(300),
2709            height: Some(200),
2710            format: Some(MediaType::Webp),
2711            ..TransformOptions::default()
2712        };
2713        let key1 = super::cache::compute_cache_key("source-abc", &opts, None);
2714        let key2 = super::cache::compute_cache_key("source-abc", &opts, None);
2715        assert_eq!(key1, key2);
2716        assert_eq!(key1.len(), 64);
2717    }
2718
2719    #[test]
2720    fn compute_cache_key_differs_for_different_options() {
2721        let opts1 = TransformOptions {
2722            width: Some(300),
2723            format: Some(MediaType::Webp),
2724            ..TransformOptions::default()
2725        };
2726        let opts2 = TransformOptions {
2727            width: Some(400),
2728            format: Some(MediaType::Webp),
2729            ..TransformOptions::default()
2730        };
2731        let key1 = super::cache::compute_cache_key("same-source", &opts1, None);
2732        let key2 = super::cache::compute_cache_key("same-source", &opts2, None);
2733        assert_ne!(key1, key2);
2734    }
2735
2736    #[test]
2737    fn compute_cache_key_includes_accept_when_present() {
2738        let opts = TransformOptions::default();
2739        let key_no_accept = super::cache::compute_cache_key("src", &opts, None);
2740        let key_with_accept = super::cache::compute_cache_key("src", &opts, Some("image/webp"));
2741        assert_ne!(key_no_accept, key_with_accept);
2742    }
2743
2744    #[test]
2745    fn transform_cache_put_and_get_round_trips() {
2746        let dir = tempfile::tempdir().expect("create tempdir");
2747        let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
2748
2749        cache.put(
2750            "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
2751            MediaType::Png,
2752            b"png-data",
2753        );
2754        let result = cache.get("abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890");
2755
2756        match result {
2757            super::cache::CacheLookup::Hit {
2758                media_type, body, ..
2759            } => {
2760                assert_eq!(media_type, MediaType::Png);
2761                assert_eq!(body, b"png-data");
2762            }
2763            super::cache::CacheLookup::Miss => panic!("expected cache hit"),
2764        }
2765    }
2766
2767    #[test]
2768    fn transform_cache_miss_for_unknown_key() {
2769        let dir = tempfile::tempdir().expect("create tempdir");
2770        let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
2771
2772        let result = cache.get("0000001234567890abcdef1234567890abcdef1234567890abcdef1234567890");
2773        assert!(matches!(result, super::cache::CacheLookup::Miss));
2774    }
2775
2776    #[test]
2777    fn transform_cache_uses_sharded_layout() {
2778        let dir = tempfile::tempdir().expect("create tempdir");
2779        let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
2780
2781        let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
2782        cache.put(key, MediaType::Jpeg, b"jpeg-data");
2783
2784        let expected = dir.path().join("ab").join("cd").join("ef").join(key);
2785        assert!(
2786            expected.exists(),
2787            "sharded file should exist at {expected:?}"
2788        );
2789    }
2790
2791    #[test]
2792    fn transform_cache_expired_entry_is_miss() {
2793        let dir = tempfile::tempdir().expect("create tempdir");
2794        let mut cache = super::cache::TransformCache::new(dir.path().to_path_buf());
2795        cache.ttl = Duration::from_secs(0);
2796
2797        let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
2798        cache.put(key, MediaType::Png, b"data");
2799
2800        std::thread::sleep(Duration::from_millis(10));
2801
2802        let result = cache.get(key);
2803        assert!(matches!(result, super::cache::CacheLookup::Miss));
2804    }
2805
2806    #[test]
2807    fn transform_cache_handles_corrupted_entry_as_miss() {
2808        let dir = tempfile::tempdir().expect("create tempdir");
2809        let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
2810
2811        let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
2812        let path = cache.entry_path(key);
2813        fs::create_dir_all(path.parent().unwrap()).unwrap();
2814        fs::write(&path, b"corrupted-data-without-header").unwrap();
2815
2816        let result = cache.get(key);
2817        assert!(matches!(result, super::cache::CacheLookup::Miss));
2818    }
2819
2820    #[test]
2821    fn cache_status_header_reflects_hit() {
2822        let headers = build_image_response_headers(
2823            MediaType::Png,
2824            &build_image_etag(b"data"),
2825            ImageResponsePolicy::PublicGet,
2826            false,
2827            CacheHitStatus::Hit,
2828            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2829            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2830        );
2831        assert!(headers.contains(&("Cache-Status", "\"truss\"; hit".to_string())));
2832    }
2833
2834    #[test]
2835    fn cache_status_header_reflects_miss() {
2836        let headers = build_image_response_headers(
2837            MediaType::Png,
2838            &build_image_etag(b"data"),
2839            ImageResponsePolicy::PublicGet,
2840            false,
2841            CacheHitStatus::Miss,
2842            DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2843            DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2844        );
2845        assert!(headers.contains(&("Cache-Status", "\"truss\"; fwd=miss".to_string())));
2846    }
2847
2848    #[test]
2849    fn origin_cache_put_and_get_round_trips() {
2850        let dir = tempfile::tempdir().expect("create tempdir");
2851        let cache = super::cache::OriginCache::new(dir.path());
2852
2853        cache.put("https://example.com/image.png", b"raw-source-bytes");
2854        let result = cache.get("https://example.com/image.png");
2855
2856        assert_eq!(result.as_deref(), Some(b"raw-source-bytes".as_ref()));
2857    }
2858
2859    #[test]
2860    fn origin_cache_miss_for_unknown_url() {
2861        let dir = tempfile::tempdir().expect("create tempdir");
2862        let cache = super::cache::OriginCache::new(dir.path());
2863
2864        assert!(
2865            cache
2866                .get("https://unknown.example.com/missing.png")
2867                .is_none()
2868        );
2869    }
2870
2871    #[test]
2872    fn origin_cache_expired_entry_is_none() {
2873        let dir = tempfile::tempdir().expect("create tempdir");
2874        let mut cache = super::cache::OriginCache::new(dir.path());
2875        cache.ttl = Duration::from_secs(0);
2876
2877        cache.put("https://example.com/img.png", b"data");
2878        std::thread::sleep(Duration::from_millis(10));
2879
2880        assert!(cache.get("https://example.com/img.png").is_none());
2881    }
2882
2883    #[test]
2884    fn origin_cache_uses_origin_subdirectory() {
2885        let dir = tempfile::tempdir().expect("create tempdir");
2886        let cache = super::cache::OriginCache::new(dir.path());
2887
2888        cache.put("https://example.com/test.png", b"bytes");
2889
2890        let origin_dir = dir.path().join("origin");
2891        assert!(origin_dir.exists(), "origin subdirectory should exist");
2892    }
2893
2894    #[test]
2895    fn sign_public_url_builds_a_signed_path_url() {
2896        let url = sign_public_url(
2897            "https://cdn.example.com",
2898            SignedUrlSource::Path {
2899                path: "/image.png".to_string(),
2900                version: Some("v1".to_string()),
2901            },
2902            &crate::TransformOptions {
2903                format: Some(MediaType::Jpeg),
2904                width: Some(320),
2905                ..crate::TransformOptions::default()
2906            },
2907            "public-dev",
2908            "secret-value",
2909            4_102_444_800,
2910        )
2911        .expect("sign public URL");
2912
2913        assert!(url.starts_with("https://cdn.example.com/images/by-path?"));
2914        assert!(url.contains("path=%2Fimage.png"));
2915        assert!(url.contains("version=v1"));
2916        assert!(url.contains("width=320"));
2917        assert!(url.contains("format=jpeg"));
2918        assert!(url.contains("keyId=public-dev"));
2919        assert!(url.contains("expires=4102444800"));
2920        assert!(url.contains("signature="));
2921    }
2922
2923    #[test]
2924    fn parse_public_get_request_rejects_unknown_query_parameters() {
2925        let query = BTreeMap::from([
2926            ("path".to_string(), "/image.png".to_string()),
2927            ("keyId".to_string(), "public-dev".to_string()),
2928            ("expires".to_string(), "4102444800".to_string()),
2929            ("signature".to_string(), "deadbeef".to_string()),
2930            ("unexpected".to_string(), "value".to_string()),
2931        ]);
2932
2933        let response = parse_public_get_request(&query, PublicSourceKind::Path)
2934            .expect_err("unknown query should fail");
2935
2936        assert_eq!(response.status, "400 Bad Request");
2937        assert!(response_body(&response).contains("is not supported"));
2938    }
2939
2940    #[test]
2941    fn prepare_remote_fetch_target_pins_the_validated_netloc() {
2942        let target = prepare_remote_fetch_target(
2943            "http://1.1.1.1/image.png",
2944            &ServerConfig::new(temp_dir("pin"), Some("secret".to_string())),
2945        )
2946        .expect("prepare remote target");
2947
2948        assert_eq!(target.netloc, "1.1.1.1:80");
2949        assert_eq!(target.addrs, vec![SocketAddr::from(([1, 1, 1, 1], 80))]);
2950    }
2951
2952    #[test]
2953    fn pinned_resolver_rejects_unexpected_netlocs() {
2954        use ureq::unversioned::resolver::Resolver;
2955
2956        let resolver = PinnedResolver {
2957            expected_netloc: "example.com:443".to_string(),
2958            addrs: vec![SocketAddr::from(([93, 184, 216, 34], 443))],
2959        };
2960
2961        let config = ureq::config::Config::builder().build();
2962        let timeout = ureq::unversioned::transport::NextTimeout {
2963            after: ureq::unversioned::transport::time::Duration::Exact(
2964                std::time::Duration::from_secs(30),
2965            ),
2966            reason: ureq::Timeout::Resolve,
2967        };
2968
2969        let uri: ureq::http::Uri = "https://example.com/path".parse().unwrap();
2970        let result = resolver
2971            .resolve(&uri, &config, timeout)
2972            .expect("resolve expected netloc");
2973        assert_eq!(&result[..], &[SocketAddr::from(([93, 184, 216, 34], 443))]);
2974
2975        let bad_uri: ureq::http::Uri = "https://proxy.example:8080/path".parse().unwrap();
2976        let timeout2 = ureq::unversioned::transport::NextTimeout {
2977            after: ureq::unversioned::transport::time::Duration::Exact(
2978                std::time::Duration::from_secs(30),
2979            ),
2980            reason: ureq::Timeout::Resolve,
2981        };
2982        let error = resolver
2983            .resolve(&bad_uri, &config, timeout2)
2984            .expect_err("unexpected netloc should fail");
2985        assert!(matches!(error, ureq::Error::HostNotFound));
2986    }
2987
2988    #[test]
2989    fn health_live_returns_status_service_version() {
2990        let request = HttpRequest {
2991            method: "GET".to_string(),
2992            target: "/health/live".to_string(),
2993            version: "HTTP/1.1".to_string(),
2994            headers: Vec::new(),
2995            body: Vec::new(),
2996        };
2997
2998        let response = route_request(request, &ServerConfig::new(temp_dir("live"), None));
2999
3000        assert_eq!(response.status, "200 OK");
3001        let body: serde_json::Value =
3002            serde_json::from_slice(&response.body).expect("parse live body");
3003        assert_eq!(body["status"], "ok");
3004        assert_eq!(body["service"], "truss");
3005        assert_eq!(body["version"], env!("CARGO_PKG_VERSION"));
3006    }
3007
3008    #[test]
3009    fn health_ready_returns_ok_when_storage_exists() {
3010        let storage = temp_dir("ready-ok");
3011        let request = HttpRequest {
3012            method: "GET".to_string(),
3013            target: "/health/ready".to_string(),
3014            version: "HTTP/1.1".to_string(),
3015            headers: Vec::new(),
3016            body: Vec::new(),
3017        };
3018
3019        let response = route_request(request, &ServerConfig::new(storage, None));
3020
3021        assert_eq!(response.status, "200 OK");
3022        let body: serde_json::Value =
3023            serde_json::from_slice(&response.body).expect("parse ready body");
3024        assert_eq!(body["status"], "ok");
3025        let checks = body["checks"].as_array().expect("checks array");
3026        assert!(
3027            checks
3028                .iter()
3029                .any(|c| c["name"] == "storageRoot" && c["status"] == "ok")
3030        );
3031    }
3032
3033    #[test]
3034    fn health_ready_returns_503_when_storage_missing() {
3035        let request = HttpRequest {
3036            method: "GET".to_string(),
3037            target: "/health/ready".to_string(),
3038            version: "HTTP/1.1".to_string(),
3039            headers: Vec::new(),
3040            body: Vec::new(),
3041        };
3042
3043        let config = ServerConfig::new(PathBuf::from("/nonexistent-truss-test-dir"), None);
3044        let response = route_request(request, &config);
3045
3046        assert_eq!(response.status, "503 Service Unavailable");
3047        let body: serde_json::Value =
3048            serde_json::from_slice(&response.body).expect("parse ready fail body");
3049        assert_eq!(body["status"], "fail");
3050        let checks = body["checks"].as_array().expect("checks array");
3051        assert!(
3052            checks
3053                .iter()
3054                .any(|c| c["name"] == "storageRoot" && c["status"] == "fail")
3055        );
3056    }
3057
3058    #[test]
3059    fn health_ready_returns_503_when_cache_root_missing() {
3060        let storage = temp_dir("ready-cache-fail");
3061        let mut config = ServerConfig::new(storage, None);
3062        config.cache_root = Some(PathBuf::from("/nonexistent-truss-cache-dir"));
3063
3064        let request = HttpRequest {
3065            method: "GET".to_string(),
3066            target: "/health/ready".to_string(),
3067            version: "HTTP/1.1".to_string(),
3068            headers: Vec::new(),
3069            body: Vec::new(),
3070        };
3071
3072        let response = route_request(request, &config);
3073
3074        assert_eq!(response.status, "503 Service Unavailable");
3075        let body: serde_json::Value =
3076            serde_json::from_slice(&response.body).expect("parse ready cache body");
3077        assert_eq!(body["status"], "fail");
3078        let checks = body["checks"].as_array().expect("checks array");
3079        assert!(
3080            checks
3081                .iter()
3082                .any(|c| c["name"] == "cacheRoot" && c["status"] == "fail")
3083        );
3084    }
3085
3086    #[test]
3087    fn health_returns_comprehensive_diagnostic() {
3088        let storage = temp_dir("health-diag");
3089        let request = HttpRequest {
3090            method: "GET".to_string(),
3091            target: "/health".to_string(),
3092            version: "HTTP/1.1".to_string(),
3093            headers: Vec::new(),
3094            body: Vec::new(),
3095        };
3096
3097        let response = route_request(request, &ServerConfig::new(storage, None));
3098
3099        assert_eq!(response.status, "200 OK");
3100        let body: serde_json::Value =
3101            serde_json::from_slice(&response.body).expect("parse health body");
3102        assert_eq!(body["status"], "ok");
3103        assert_eq!(body["service"], "truss");
3104        assert_eq!(body["version"], env!("CARGO_PKG_VERSION"));
3105        assert!(body["uptimeSeconds"].is_u64());
3106        assert!(body["checks"].is_array());
3107    }
3108
3109    #[test]
3110    fn unknown_path_returns_not_found() {
3111        let request = HttpRequest {
3112            method: "GET".to_string(),
3113            target: "/unknown".to_string(),
3114            version: "HTTP/1.1".to_string(),
3115            headers: Vec::new(),
3116            body: Vec::new(),
3117        };
3118
3119        let response = route_request(request, &ServerConfig::new(temp_dir("not-found"), None));
3120
3121        assert_eq!(response.status, "404 Not Found");
3122        assert_eq!(response.content_type, Some("application/problem+json"));
3123        let body = response_body(&response);
3124        assert!(body.contains("\"type\":\"about:blank\""));
3125        assert!(body.contains("\"title\":\"Not Found\""));
3126        assert!(body.contains("\"status\":404"));
3127        assert!(body.contains("not found"));
3128    }
3129
3130    #[test]
3131    fn transform_endpoint_requires_authentication() {
3132        let storage_root = temp_dir("auth");
3133        write_png(&storage_root.join("image.png"));
3134        let mut request = transform_request("/image.png");
3135        request.headers.retain(|(name, _)| name != "authorization");
3136
3137        let response = route_request(
3138            request,
3139            &ServerConfig::new(storage_root, Some("secret".to_string())),
3140        );
3141
3142        assert_eq!(response.status, "401 Unauthorized");
3143        assert!(response_body(&response).contains("authorization required"));
3144    }
3145
3146    #[test]
3147    fn transform_endpoint_returns_service_unavailable_without_configured_token() {
3148        let storage_root = temp_dir("token");
3149        write_png(&storage_root.join("image.png"));
3150
3151        let response = route_request(
3152            transform_request("/image.png"),
3153            &ServerConfig::new(storage_root, None),
3154        );
3155
3156        assert_eq!(response.status, "503 Service Unavailable");
3157        assert!(response_body(&response).contains("bearer token is not configured"));
3158    }
3159
3160    #[test]
3161    fn transform_endpoint_transforms_a_path_source() {
3162        let storage_root = temp_dir("transform");
3163        write_png(&storage_root.join("image.png"));
3164
3165        let response = route_request(
3166            transform_request("/image.png"),
3167            &ServerConfig::new(storage_root, Some("secret".to_string())),
3168        );
3169
3170        assert_eq!(response.status, "200 OK");
3171        assert_eq!(response.content_type, Some("image/jpeg"));
3172
3173        let artifact = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
3174        assert_eq!(artifact.media_type, MediaType::Jpeg);
3175        assert_eq!(artifact.metadata.width, Some(4));
3176        assert_eq!(artifact.metadata.height, Some(3));
3177    }
3178
3179    #[test]
3180    fn transform_endpoint_rejects_private_url_sources_by_default() {
3181        let response = route_request(
3182            transform_url_request("http://127.0.0.1:8080/image.png"),
3183            &ServerConfig::new(temp_dir("url-blocked"), Some("secret".to_string())),
3184        );
3185
3186        assert_eq!(response.status, "403 Forbidden");
3187        assert!(response_body(&response).contains("port is not allowed"));
3188    }
3189
3190    #[test]
3191    fn transform_endpoint_transforms_a_url_source_when_insecure_allowance_is_enabled() {
3192        let (url, handle) = spawn_http_server(vec![(
3193            "200 OK".to_string(),
3194            vec![("Content-Type".to_string(), "image/png".to_string())],
3195            png_bytes(),
3196        )]);
3197
3198        let response = route_request(
3199            transform_url_request(&url),
3200            &ServerConfig::new(temp_dir("url"), Some("secret".to_string()))
3201                .with_insecure_url_sources(true),
3202        );
3203
3204        handle.join().expect("join fixture server");
3205
3206        assert_eq!(response.status, "200 OK");
3207        assert_eq!(response.content_type, Some("image/jpeg"));
3208
3209        let artifact = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
3210        assert_eq!(artifact.media_type, MediaType::Jpeg);
3211    }
3212
3213    #[test]
3214    fn transform_endpoint_follows_remote_redirects() {
3215        let (redirect_url, handle) = spawn_http_server(vec![
3216            (
3217                "302 Found".to_string(),
3218                vec![("Location".to_string(), "/final-image".to_string())],
3219                Vec::new(),
3220            ),
3221            (
3222                "200 OK".to_string(),
3223                vec![("Content-Type".to_string(), "image/png".to_string())],
3224                png_bytes(),
3225            ),
3226        ]);
3227
3228        let response = route_request(
3229            transform_url_request(&redirect_url),
3230            &ServerConfig::new(temp_dir("redirect"), Some("secret".to_string()))
3231                .with_insecure_url_sources(true),
3232        );
3233
3234        handle.join().expect("join fixture server");
3235
3236        assert_eq!(response.status, "200 OK");
3237        let artifact = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
3238        assert_eq!(artifact.media_type, MediaType::Jpeg);
3239    }
3240
3241    #[test]
3242    fn upload_endpoint_transforms_uploaded_file() {
3243        let response = route_request(
3244            upload_request(&png_bytes(), Some(r#"{"format":"jpeg"}"#)),
3245            &ServerConfig::new(temp_dir("upload"), Some("secret".to_string())),
3246        );
3247
3248        assert_eq!(response.status, "200 OK");
3249        assert_eq!(response.content_type, Some("image/jpeg"));
3250
3251        let artifact = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
3252        assert_eq!(artifact.media_type, MediaType::Jpeg);
3253    }
3254
3255    #[test]
3256    fn upload_endpoint_requires_a_file_field() {
3257        let boundary = "truss-test-boundary";
3258        let request = HttpRequest {
3259            method: "POST".to_string(),
3260            target: "/images".to_string(),
3261            version: "HTTP/1.1".to_string(),
3262            headers: vec![
3263                ("authorization".to_string(), "Bearer secret".to_string()),
3264                (
3265                    "content-type".to_string(),
3266                    format!("multipart/form-data; boundary={boundary}"),
3267                ),
3268            ],
3269            body: format!(
3270                "--{boundary}\r\nContent-Disposition: form-data; name=\"options\"\r\nContent-Type: application/json\r\n\r\n{{\"format\":\"jpeg\"}}\r\n--{boundary}--\r\n"
3271            )
3272            .into_bytes(),
3273        };
3274
3275        let response = route_request(
3276            request,
3277            &ServerConfig::new(temp_dir("upload-missing-file"), Some("secret".to_string())),
3278        );
3279
3280        assert_eq!(response.status, "400 Bad Request");
3281        assert!(response_body(&response).contains("requires a `file` field"));
3282    }
3283
3284    #[test]
3285    fn upload_endpoint_rejects_non_multipart_content_type() {
3286        let request = HttpRequest {
3287            method: "POST".to_string(),
3288            target: "/images".to_string(),
3289            version: "HTTP/1.1".to_string(),
3290            headers: vec![
3291                ("authorization".to_string(), "Bearer secret".to_string()),
3292                ("content-type".to_string(), "application/json".to_string()),
3293            ],
3294            body: br#"{"file":"not-really-json"}"#.to_vec(),
3295        };
3296
3297        let response = route_request(
3298            request,
3299            &ServerConfig::new(temp_dir("upload-content-type"), Some("secret".to_string())),
3300        );
3301
3302        assert_eq!(response.status, "415 Unsupported Media Type");
3303        assert!(response_body(&response).contains("multipart/form-data"));
3304    }
3305
3306    #[test]
3307    fn parse_upload_request_extracts_file_and_options() {
3308        let request = upload_request(&png_bytes(), Some(r#"{"width":8,"format":"jpeg"}"#));
3309        let boundary =
3310            super::multipart::parse_multipart_boundary(&request).expect("parse boundary");
3311        let (file_bytes, options) =
3312            super::multipart::parse_upload_request(&request.body, &boundary)
3313                .expect("parse upload body");
3314
3315        assert_eq!(file_bytes, png_bytes());
3316        assert_eq!(options.width, Some(8));
3317        assert_eq!(options.format, Some(MediaType::Jpeg));
3318    }
3319
3320    #[test]
3321    fn metrics_endpoint_requires_authentication() {
3322        let response = route_request(
3323            metrics_request(false),
3324            &ServerConfig::new(temp_dir("metrics-auth"), Some("secret".to_string())),
3325        );
3326
3327        assert_eq!(response.status, "401 Unauthorized");
3328        assert!(response_body(&response).contains("authorization required"));
3329    }
3330
3331    #[test]
3332    fn metrics_endpoint_returns_prometheus_text() {
3333        super::metrics::record_http_metrics(super::metrics::RouteMetric::Health, "200 OK");
3334        let response = route_request(
3335            metrics_request(true),
3336            &ServerConfig::new(temp_dir("metrics"), Some("secret".to_string())),
3337        );
3338        let body = response_body(&response);
3339
3340        assert_eq!(response.status, "200 OK");
3341        assert_eq!(
3342            response.content_type,
3343            Some("text/plain; version=0.0.4; charset=utf-8")
3344        );
3345        assert!(body.contains("truss_http_requests_total"));
3346        assert!(body.contains("truss_http_requests_by_route_total{route=\"/health\"}"));
3347        assert!(body.contains("truss_http_responses_total{status=\"200\"}"));
3348    }
3349
3350    #[test]
3351    fn transform_endpoint_rejects_unsupported_remote_content_encoding() {
3352        let (url, handle) = spawn_http_server(vec![(
3353            "200 OK".to_string(),
3354            vec![
3355                ("Content-Type".to_string(), "image/png".to_string()),
3356                ("Content-Encoding".to_string(), "compress".to_string()),
3357            ],
3358            png_bytes(),
3359        )]);
3360
3361        let response = route_request(
3362            transform_url_request(&url),
3363            &ServerConfig::new(temp_dir("encoding"), Some("secret".to_string()))
3364                .with_insecure_url_sources(true),
3365        );
3366
3367        handle.join().expect("join fixture server");
3368
3369        assert_eq!(response.status, "502 Bad Gateway");
3370        assert!(response_body(&response).contains("unsupported content-encoding"));
3371    }
3372
3373    #[test]
3374    fn resolve_storage_path_rejects_parent_segments() {
3375        let storage_root = temp_dir("resolve");
3376        let response = resolve_storage_path(&storage_root, "../escape.png")
3377            .expect_err("parent segments should be rejected");
3378
3379        assert_eq!(response.status, "400 Bad Request");
3380        assert!(response_body(&response).contains("must not contain root"));
3381    }
3382
3383    #[test]
3384    fn read_request_parses_headers_and_body() {
3385        let request_bytes = b"POST /images:transform HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: 2\r\n\r\n{}";
3386        let mut cursor = Cursor::new(request_bytes);
3387        let request = read_request(&mut cursor).expect("parse request");
3388
3389        assert_eq!(request.method, "POST");
3390        assert_eq!(request.target, "/images:transform");
3391        assert_eq!(request.version, "HTTP/1.1");
3392        assert_eq!(request.header("host"), Some("localhost"));
3393        assert_eq!(request.body, b"{}");
3394    }
3395
3396    #[test]
3397    fn read_request_rejects_duplicate_content_length() {
3398        let request_bytes =
3399            b"POST /images:transform HTTP/1.1\r\nContent-Length: 2\r\nContent-Length: 2\r\n\r\n{}";
3400        let mut cursor = Cursor::new(request_bytes);
3401        let response = read_request(&mut cursor).expect_err("duplicate headers should fail");
3402
3403        assert_eq!(response.status, "400 Bad Request");
3404        assert!(response_body(&response).contains("content-length"));
3405    }
3406
3407    #[test]
3408    fn serve_once_handles_a_tcp_request() {
3409        let storage_root = temp_dir("serve-once");
3410        let config = ServerConfig::new(storage_root, None);
3411        let listener = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
3412        let addr = listener.local_addr().expect("read local addr");
3413
3414        let server = thread::spawn(move || serve_once_with_config(listener, config));
3415
3416        let mut stream = TcpStream::connect(addr).expect("connect to test server");
3417        stream
3418            .write_all(b"GET /health/live HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
3419            .expect("write request");
3420
3421        let mut response = String::new();
3422        stream.read_to_string(&mut response).expect("read response");
3423
3424        server
3425            .join()
3426            .expect("join test server thread")
3427            .expect("serve one request");
3428
3429        assert!(response.starts_with("HTTP/1.1 200 OK"));
3430        assert!(response.contains("Content-Type: application/json"));
3431        assert!(response.contains("\"status\":\"ok\""));
3432        assert!(response.contains("\"service\":\"truss\""));
3433        assert!(response.contains("\"version\":"));
3434    }
3435
3436    #[test]
3437    fn helper_error_responses_use_rfc7807_problem_details() {
3438        let response = auth_required_response("authorization required");
3439        let bad_request = bad_request_response("bad input");
3440
3441        assert_eq!(
3442            response.content_type,
3443            Some("application/problem+json"),
3444            "error responses must use application/problem+json"
3445        );
3446        assert_eq!(bad_request.content_type, Some("application/problem+json"),);
3447
3448        let auth_body = response_body(&response);
3449        assert!(auth_body.contains("authorization required"));
3450        assert!(auth_body.contains("\"type\":\"about:blank\""));
3451        assert!(auth_body.contains("\"title\":\"Unauthorized\""));
3452        assert!(auth_body.contains("\"status\":401"));
3453
3454        let bad_body = response_body(&bad_request);
3455        assert!(bad_body.contains("bad input"));
3456        assert!(bad_body.contains("\"type\":\"about:blank\""));
3457        assert!(bad_body.contains("\"title\":\"Bad Request\""));
3458        assert!(bad_body.contains("\"status\":400"));
3459    }
3460
3461    #[test]
3462    fn parse_headers_rejects_duplicate_host() {
3463        let lines = "Host: example.com\r\nHost: evil.com\r\n";
3464        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3465        assert!(result.is_err());
3466    }
3467
3468    #[test]
3469    fn parse_headers_rejects_duplicate_authorization() {
3470        let lines = "Authorization: Bearer a\r\nAuthorization: Bearer b\r\n";
3471        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3472        assert!(result.is_err());
3473    }
3474
3475    #[test]
3476    fn parse_headers_rejects_duplicate_content_type() {
3477        let lines = "Content-Type: application/json\r\nContent-Type: text/plain\r\n";
3478        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3479        assert!(result.is_err());
3480    }
3481
3482    #[test]
3483    fn parse_headers_rejects_duplicate_transfer_encoding() {
3484        let lines = "Transfer-Encoding: chunked\r\nTransfer-Encoding: gzip\r\n";
3485        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3486        assert!(result.is_err());
3487    }
3488
3489    #[test]
3490    fn parse_headers_rejects_single_transfer_encoding() {
3491        let lines = "Host: example.com\r\nTransfer-Encoding: chunked\r\n";
3492        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3493        let err = result.unwrap_err();
3494        assert!(
3495            err.status.starts_with("501"),
3496            "expected 501 status, got: {}",
3497            err.status
3498        );
3499        assert!(
3500            String::from_utf8_lossy(&err.body).contains("Transfer-Encoding"),
3501            "error response should mention Transfer-Encoding"
3502        );
3503    }
3504
3505    #[test]
3506    fn parse_headers_rejects_transfer_encoding_identity() {
3507        let lines = "Transfer-Encoding: identity\r\n";
3508        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3509        assert!(result.is_err());
3510    }
3511
3512    #[test]
3513    fn parse_headers_allows_single_instances_of_singleton_headers() {
3514        let lines =
3515            "Host: example.com\r\nAuthorization: Bearer tok\r\nContent-Type: application/json\r\n";
3516        let result = super::http_parse::parse_headers(lines.split("\r\n"));
3517        assert!(result.is_ok());
3518        assert_eq!(result.unwrap().len(), 3);
3519    }
3520
3521    #[test]
3522    fn max_body_for_multipart_uses_upload_limit() {
3523        let headers = vec![(
3524            "content-type".to_string(),
3525            "multipart/form-data; boundary=abc".to_string(),
3526        )];
3527        assert_eq!(
3528            super::http_parse::max_body_for_headers(&headers),
3529            super::http_parse::MAX_UPLOAD_BODY_BYTES
3530        );
3531    }
3532
3533    #[test]
3534    fn max_body_for_json_uses_default_limit() {
3535        let headers = vec![("content-type".to_string(), "application/json".to_string())];
3536        assert_eq!(
3537            super::http_parse::max_body_for_headers(&headers),
3538            super::http_parse::MAX_REQUEST_BODY_BYTES
3539        );
3540    }
3541
3542    #[test]
3543    fn max_body_for_no_content_type_uses_default_limit() {
3544        let headers: Vec<(String, String)> = vec![];
3545        assert_eq!(
3546            super::http_parse::max_body_for_headers(&headers),
3547            super::http_parse::MAX_REQUEST_BODY_BYTES
3548        );
3549    }
3550
3551    fn make_test_config() -> ServerConfig {
3552        ServerConfig::new(std::env::temp_dir(), None)
3553    }
3554
3555    #[test]
3556    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
3557    fn storage_backend_parse_filesystem_aliases() {
3558        assert_eq!(
3559            super::StorageBackend::parse("filesystem").unwrap(),
3560            super::StorageBackend::Filesystem
3561        );
3562        assert_eq!(
3563            super::StorageBackend::parse("fs").unwrap(),
3564            super::StorageBackend::Filesystem
3565        );
3566        assert_eq!(
3567            super::StorageBackend::parse("local").unwrap(),
3568            super::StorageBackend::Filesystem
3569        );
3570    }
3571
3572    #[test]
3573    #[cfg(feature = "s3")]
3574    fn storage_backend_parse_s3() {
3575        assert_eq!(
3576            super::StorageBackend::parse("s3").unwrap(),
3577            super::StorageBackend::S3
3578        );
3579        assert_eq!(
3580            super::StorageBackend::parse("S3").unwrap(),
3581            super::StorageBackend::S3
3582        );
3583    }
3584
3585    #[test]
3586    #[cfg(feature = "gcs")]
3587    fn storage_backend_parse_gcs() {
3588        assert_eq!(
3589            super::StorageBackend::parse("gcs").unwrap(),
3590            super::StorageBackend::Gcs
3591        );
3592        assert_eq!(
3593            super::StorageBackend::parse("GCS").unwrap(),
3594            super::StorageBackend::Gcs
3595        );
3596    }
3597
3598    #[test]
3599    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
3600    fn storage_backend_parse_rejects_unknown() {
3601        assert!(super::StorageBackend::parse("").is_err());
3602        #[cfg(not(feature = "azure"))]
3603        assert!(super::StorageBackend::parse("azure").is_err());
3604        #[cfg(feature = "azure")]
3605        assert!(super::StorageBackend::parse("azure").is_ok());
3606    }
3607
3608    #[test]
3609    fn versioned_source_hash_returns_none_without_version() {
3610        let source = TransformSourcePayload::Path {
3611            path: "/photos/hero.jpg".to_string(),
3612            version: None,
3613        };
3614        assert!(source.versioned_source_hash(&make_test_config()).is_none());
3615    }
3616
3617    #[test]
3618    fn versioned_source_hash_is_deterministic() {
3619        let cfg = make_test_config();
3620        let source = TransformSourcePayload::Path {
3621            path: "/photos/hero.jpg".to_string(),
3622            version: Some("v1".to_string()),
3623        };
3624        let hash1 = source.versioned_source_hash(&cfg).unwrap();
3625        let hash2 = source.versioned_source_hash(&cfg).unwrap();
3626        assert_eq!(hash1, hash2);
3627        assert_eq!(hash1.len(), 64);
3628    }
3629
3630    #[test]
3631    fn versioned_source_hash_differs_by_version() {
3632        let cfg = make_test_config();
3633        let v1 = TransformSourcePayload::Path {
3634            path: "/photos/hero.jpg".to_string(),
3635            version: Some("v1".to_string()),
3636        };
3637        let v2 = TransformSourcePayload::Path {
3638            path: "/photos/hero.jpg".to_string(),
3639            version: Some("v2".to_string()),
3640        };
3641        assert_ne!(
3642            v1.versioned_source_hash(&cfg).unwrap(),
3643            v2.versioned_source_hash(&cfg).unwrap()
3644        );
3645    }
3646
3647    #[test]
3648    fn versioned_source_hash_differs_by_kind() {
3649        let cfg = make_test_config();
3650        let path = TransformSourcePayload::Path {
3651            path: "example.com/image.jpg".to_string(),
3652            version: Some("v1".to_string()),
3653        };
3654        let url = TransformSourcePayload::Url {
3655            url: "example.com/image.jpg".to_string(),
3656            version: Some("v1".to_string()),
3657        };
3658        assert_ne!(
3659            path.versioned_source_hash(&cfg).unwrap(),
3660            url.versioned_source_hash(&cfg).unwrap()
3661        );
3662    }
3663
3664    #[test]
3665    fn versioned_source_hash_differs_by_storage_root() {
3666        let cfg1 = ServerConfig::new(PathBuf::from("/data/images"), None);
3667        let cfg2 = ServerConfig::new(PathBuf::from("/other/images"), None);
3668        let source = TransformSourcePayload::Path {
3669            path: "/photos/hero.jpg".to_string(),
3670            version: Some("v1".to_string()),
3671        };
3672        assert_ne!(
3673            source.versioned_source_hash(&cfg1).unwrap(),
3674            source.versioned_source_hash(&cfg2).unwrap()
3675        );
3676    }
3677
3678    #[test]
3679    fn versioned_source_hash_differs_by_insecure_flag() {
3680        let mut cfg1 = make_test_config();
3681        cfg1.allow_insecure_url_sources = false;
3682        let mut cfg2 = make_test_config();
3683        cfg2.allow_insecure_url_sources = true;
3684        let source = TransformSourcePayload::Url {
3685            url: "http://example.com/img.jpg".to_string(),
3686            version: Some("v1".to_string()),
3687        };
3688        assert_ne!(
3689            source.versioned_source_hash(&cfg1).unwrap(),
3690            source.versioned_source_hash(&cfg2).unwrap()
3691        );
3692    }
3693
3694    #[test]
3695    #[cfg(feature = "s3")]
3696    fn versioned_source_hash_storage_variant_is_deterministic() {
3697        let cfg = make_test_config();
3698        let source = TransformSourcePayload::Storage {
3699            bucket: Some("my-bucket".to_string()),
3700            key: "photos/hero.jpg".to_string(),
3701            version: Some("v1".to_string()),
3702        };
3703        let hash1 = source.versioned_source_hash(&cfg).unwrap();
3704        let hash2 = source.versioned_source_hash(&cfg).unwrap();
3705        assert_eq!(hash1, hash2);
3706        assert_eq!(hash1.len(), 64);
3707    }
3708
3709    #[test]
3710    #[cfg(feature = "s3")]
3711    fn versioned_source_hash_storage_differs_from_path() {
3712        let cfg = make_test_config();
3713        let path_source = TransformSourcePayload::Path {
3714            path: "photos/hero.jpg".to_string(),
3715            version: Some("v1".to_string()),
3716        };
3717        let storage_source = TransformSourcePayload::Storage {
3718            bucket: Some("my-bucket".to_string()),
3719            key: "photos/hero.jpg".to_string(),
3720            version: Some("v1".to_string()),
3721        };
3722        assert_ne!(
3723            path_source.versioned_source_hash(&cfg).unwrap(),
3724            storage_source.versioned_source_hash(&cfg).unwrap()
3725        );
3726    }
3727
3728    #[test]
3729    #[cfg(feature = "s3")]
3730    fn versioned_source_hash_storage_differs_by_bucket() {
3731        let cfg = make_test_config();
3732        let s1 = TransformSourcePayload::Storage {
3733            bucket: Some("bucket-a".to_string()),
3734            key: "image.jpg".to_string(),
3735            version: Some("v1".to_string()),
3736        };
3737        let s2 = TransformSourcePayload::Storage {
3738            bucket: Some("bucket-b".to_string()),
3739            key: "image.jpg".to_string(),
3740            version: Some("v1".to_string()),
3741        };
3742        assert_ne!(
3743            s1.versioned_source_hash(&cfg).unwrap(),
3744            s2.versioned_source_hash(&cfg).unwrap()
3745        );
3746    }
3747
3748    #[test]
3749    #[cfg(feature = "s3")]
3750    fn versioned_source_hash_differs_by_backend() {
3751        let cfg_fs = make_test_config();
3752        let mut cfg_s3 = make_test_config();
3753        cfg_s3.storage_backend = super::StorageBackend::S3;
3754
3755        let source = TransformSourcePayload::Path {
3756            path: "photos/hero.jpg".to_string(),
3757            version: Some("v1".to_string()),
3758        };
3759        assert_ne!(
3760            source.versioned_source_hash(&cfg_fs).unwrap(),
3761            source.versioned_source_hash(&cfg_s3).unwrap()
3762        );
3763    }
3764
3765    #[test]
3766    #[cfg(feature = "s3")]
3767    fn versioned_source_hash_storage_differs_by_endpoint() {
3768        let mut cfg_a = make_test_config();
3769        cfg_a.storage_backend = super::StorageBackend::S3;
3770        cfg_a.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
3771            "shared",
3772            Some("http://minio-a:9000"),
3773        )));
3774
3775        let mut cfg_b = make_test_config();
3776        cfg_b.storage_backend = super::StorageBackend::S3;
3777        cfg_b.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
3778            "shared",
3779            Some("http://minio-b:9000"),
3780        )));
3781
3782        let source = TransformSourcePayload::Storage {
3783            bucket: None,
3784            key: "image.jpg".to_string(),
3785            version: Some("v1".to_string()),
3786        };
3787        assert_ne!(
3788            source.versioned_source_hash(&cfg_a).unwrap(),
3789            source.versioned_source_hash(&cfg_b).unwrap(),
3790        );
3791        assert_ne!(cfg_a, cfg_b);
3792    }
3793
3794    #[test]
3795    #[cfg(feature = "s3")]
3796    fn storage_backend_default_is_filesystem() {
3797        let cfg = make_test_config();
3798        assert_eq!(cfg.storage_backend, super::StorageBackend::Filesystem);
3799        assert!(cfg.s3_context.is_none());
3800    }
3801
3802    #[test]
3803    #[cfg(feature = "s3")]
3804    fn storage_payload_deserializes_storage_variant() {
3805        let json = r#"{"source":{"kind":"storage","key":"photos/hero.jpg"},"options":{}}"#;
3806        let payload: super::TransformImageRequestPayload = serde_json::from_str(json).unwrap();
3807        match payload.source {
3808            TransformSourcePayload::Storage {
3809                bucket,
3810                key,
3811                version,
3812            } => {
3813                assert!(bucket.is_none());
3814                assert_eq!(key, "photos/hero.jpg");
3815                assert!(version.is_none());
3816            }
3817            _ => panic!("expected Storage variant"),
3818        }
3819    }
3820
3821    #[test]
3822    #[cfg(feature = "s3")]
3823    fn storage_payload_deserializes_with_bucket() {
3824        let json = r#"{"source":{"kind":"storage","bucket":"my-bucket","key":"img.png","version":"v2"},"options":{}}"#;
3825        let payload: super::TransformImageRequestPayload = serde_json::from_str(json).unwrap();
3826        match payload.source {
3827            TransformSourcePayload::Storage {
3828                bucket,
3829                key,
3830                version,
3831            } => {
3832                assert_eq!(bucket.as_deref(), Some("my-bucket"));
3833                assert_eq!(key, "img.png");
3834                assert_eq!(version.as_deref(), Some("v2"));
3835            }
3836            _ => panic!("expected Storage variant"),
3837        }
3838    }
3839
3840    // -----------------------------------------------------------------------
3841    // S3: default_bucket fallback with bucket: None
3842    // -----------------------------------------------------------------------
3843
3844    #[test]
3845    #[cfg(feature = "s3")]
3846    fn versioned_source_hash_uses_default_bucket_when_bucket_is_none() {
3847        let mut cfg_a = make_test_config();
3848        cfg_a.storage_backend = super::StorageBackend::S3;
3849        cfg_a.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
3850            "bucket-a", None,
3851        )));
3852
3853        let mut cfg_b = make_test_config();
3854        cfg_b.storage_backend = super::StorageBackend::S3;
3855        cfg_b.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
3856            "bucket-b", None,
3857        )));
3858
3859        let source = TransformSourcePayload::Storage {
3860            bucket: None,
3861            key: "image.jpg".to_string(),
3862            version: Some("v1".to_string()),
3863        };
3864        // Different default_bucket ⇒ different hash
3865        assert_ne!(
3866            source.versioned_source_hash(&cfg_a).unwrap(),
3867            source.versioned_source_hash(&cfg_b).unwrap(),
3868        );
3869        // PartialEq also distinguishes them
3870        assert_ne!(cfg_a, cfg_b);
3871    }
3872
3873    #[test]
3874    #[cfg(feature = "s3")]
3875    fn versioned_source_hash_returns_none_without_bucket_or_context() {
3876        let mut cfg = make_test_config();
3877        cfg.storage_backend = super::StorageBackend::S3;
3878        cfg.s3_context = None;
3879
3880        let source = TransformSourcePayload::Storage {
3881            bucket: None,
3882            key: "image.jpg".to_string(),
3883            version: Some("v1".to_string()),
3884        };
3885        // No bucket available ⇒ None (falls back to content-hash)
3886        assert!(source.versioned_source_hash(&cfg).is_none());
3887    }
3888
3889    // -----------------------------------------------------------------------
3890    // S3: from_env branches
3891    //
3892    // These tests mutate process-global environment variables. A mutex
3893    // serializes them so that parallel test threads cannot interfere, and
3894    // each test saves/restores the variables it touches.
3895    // -----------------------------------------------------------------------
3896
3897    #[cfg(feature = "s3")]
3898    static FROM_ENV_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
3899
3900    #[cfg(feature = "s3")]
3901    const S3_ENV_VARS: &[&str] = &[
3902        "TRUSS_STORAGE_ROOT",
3903        "TRUSS_STORAGE_BACKEND",
3904        "TRUSS_S3_BUCKET",
3905    ];
3906
3907    /// Save current values, run `f`, then restore originals regardless of
3908    /// panics. Holds `FROM_ENV_MUTEX` for the duration.
3909    #[cfg(feature = "s3")]
3910    fn with_s3_env<F: FnOnce()>(vars: &[(&str, Option<&str>)], f: F) {
3911        let _guard = FROM_ENV_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
3912        let saved: Vec<(&str, Option<String>)> = S3_ENV_VARS
3913            .iter()
3914            .map(|k| (*k, std::env::var(k).ok()))
3915            .collect();
3916        // Apply requested overrides
3917        for &(key, value) in vars {
3918            match value {
3919                Some(v) => unsafe { std::env::set_var(key, v) },
3920                None => unsafe { std::env::remove_var(key) },
3921            }
3922        }
3923        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
3924        // Restore originals
3925        for (key, original) in saved {
3926            match original {
3927                Some(v) => unsafe { std::env::set_var(key, v) },
3928                None => unsafe { std::env::remove_var(key) },
3929            }
3930        }
3931        if let Err(payload) = result {
3932            std::panic::resume_unwind(payload);
3933        }
3934    }
3935
3936    #[test]
3937    #[cfg(feature = "s3")]
3938    fn from_env_rejects_invalid_storage_backend() {
3939        let storage = temp_dir("env-bad-backend");
3940        let storage_str = storage.to_str().unwrap().to_string();
3941        with_s3_env(
3942            &[
3943                ("TRUSS_STORAGE_ROOT", Some(&storage_str)),
3944                ("TRUSS_STORAGE_BACKEND", Some("nosuchbackend")),
3945                ("TRUSS_S3_BUCKET", None),
3946            ],
3947            || {
3948                let result = ServerConfig::from_env();
3949                assert!(result.is_err());
3950                let msg = result.unwrap_err().to_string();
3951                assert!(msg.contains("unknown storage backend"), "got: {msg}");
3952            },
3953        );
3954        let _ = std::fs::remove_dir_all(storage);
3955    }
3956
3957    #[test]
3958    #[cfg(feature = "s3")]
3959    fn from_env_rejects_s3_without_bucket() {
3960        let storage = temp_dir("env-no-bucket");
3961        let storage_str = storage.to_str().unwrap().to_string();
3962        with_s3_env(
3963            &[
3964                ("TRUSS_STORAGE_ROOT", Some(&storage_str)),
3965                ("TRUSS_STORAGE_BACKEND", Some("s3")),
3966                ("TRUSS_S3_BUCKET", None),
3967            ],
3968            || {
3969                let result = ServerConfig::from_env();
3970                assert!(result.is_err());
3971                let msg = result.unwrap_err().to_string();
3972                assert!(msg.contains("TRUSS_S3_BUCKET"), "got: {msg}");
3973            },
3974        );
3975        let _ = std::fs::remove_dir_all(storage);
3976    }
3977
3978    #[test]
3979    #[cfg(feature = "s3")]
3980    fn from_env_accepts_s3_with_bucket() {
3981        let storage = temp_dir("env-s3-ok");
3982        let storage_str = storage.to_str().unwrap().to_string();
3983        with_s3_env(
3984            &[
3985                ("TRUSS_STORAGE_ROOT", Some(&storage_str)),
3986                ("TRUSS_STORAGE_BACKEND", Some("s3")),
3987                ("TRUSS_S3_BUCKET", Some("my-images")),
3988            ],
3989            || {
3990                let cfg =
3991                    ServerConfig::from_env().expect("from_env should succeed with s3 + bucket");
3992                assert_eq!(cfg.storage_backend, super::StorageBackend::S3);
3993                let ctx = cfg.s3_context.expect("s3_context should be Some");
3994                assert_eq!(ctx.default_bucket, "my-images");
3995            },
3996        );
3997        let _ = std::fs::remove_dir_all(storage);
3998    }
3999
4000    // -----------------------------------------------------------------------
4001    // S3: health endpoint
4002    // -----------------------------------------------------------------------
4003
4004    #[test]
4005    #[cfg(feature = "s3")]
4006    fn health_ready_s3_returns_503_when_context_missing() {
4007        let storage = temp_dir("health-s3-no-ctx");
4008        let mut config = ServerConfig::new(storage.clone(), None);
4009        config.storage_backend = super::StorageBackend::S3;
4010        config.s3_context = None;
4011
4012        let request = super::http_parse::HttpRequest {
4013            method: "GET".to_string(),
4014            target: "/health/ready".to_string(),
4015            version: "HTTP/1.1".to_string(),
4016            headers: Vec::new(),
4017            body: Vec::new(),
4018        };
4019        let response = route_request(request, &config);
4020        let _ = std::fs::remove_dir_all(storage);
4021
4022        assert_eq!(response.status, "503 Service Unavailable");
4023        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4024        let checks = body["checks"].as_array().expect("checks array");
4025        assert!(
4026            checks
4027                .iter()
4028                .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
4029            "expected s3Client fail check in {body}",
4030        );
4031    }
4032
4033    #[test]
4034    #[cfg(feature = "s3")]
4035    fn health_ready_s3_includes_s3_client_check() {
4036        let storage = temp_dir("health-s3-ok");
4037        let mut config = ServerConfig::new(storage.clone(), None);
4038        config.storage_backend = super::StorageBackend::S3;
4039        config.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4040            "test-bucket",
4041            None,
4042        )));
4043
4044        let request = super::http_parse::HttpRequest {
4045            method: "GET".to_string(),
4046            target: "/health/ready".to_string(),
4047            version: "HTTP/1.1".to_string(),
4048            headers: Vec::new(),
4049            body: Vec::new(),
4050        };
4051        let response = route_request(request, &config);
4052        let _ = std::fs::remove_dir_all(storage);
4053
4054        // The s3Client check will report "fail" because there is no real S3
4055        // endpoint, but the important thing is that the check is present.
4056        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4057        let checks = body["checks"].as_array().expect("checks array");
4058        assert!(
4059            checks.iter().any(|c| c["name"] == "storageBackend"),
4060            "expected s3Client check in {body}",
4061        );
4062    }
4063
4064    // -----------------------------------------------------------------------
4065    // S3: public by-path remap (leading slash trimmed, Storage variant used)
4066    // -----------------------------------------------------------------------
4067
4068    /// Replicates the Path→Storage remap that `handle_public_get_request`
4069    /// performs when `storage_backend == S3`, so we can inspect the resulting
4070    /// key without issuing a real S3 request.
4071    #[cfg(feature = "s3")]
4072    fn remap_path_to_storage(path: &str, version: Option<&str>) -> TransformSourcePayload {
4073        let source = TransformSourcePayload::Path {
4074            path: path.to_string(),
4075            version: version.map(|v| v.to_string()),
4076        };
4077        match source {
4078            TransformSourcePayload::Path { path, version } => TransformSourcePayload::Storage {
4079                bucket: None,
4080                key: path.trim_start_matches('/').to_string(),
4081                version,
4082            },
4083            other => other,
4084        }
4085    }
4086
4087    #[test]
4088    #[cfg(feature = "s3")]
4089    fn public_by_path_s3_remap_trims_leading_slash() {
4090        // Paths with a leading slash (the common case from signed URLs like
4091        // `path=/image.png`) must have the slash stripped so that the S3 key
4092        // is `image.png`, not `/image.png`.
4093        let source = remap_path_to_storage("/photos/hero.jpg", Some("v1"));
4094        match &source {
4095            TransformSourcePayload::Storage { key, .. } => {
4096                assert_eq!(key, "photos/hero.jpg", "leading / must be trimmed");
4097            }
4098            _ => panic!("expected Storage variant after remap"),
4099        }
4100
4101        // Without a leading slash the key must be unchanged.
4102        let source2 = remap_path_to_storage("photos/hero.jpg", Some("v1"));
4103        match &source2 {
4104            TransformSourcePayload::Storage { key, .. } => {
4105                assert_eq!(key, "photos/hero.jpg");
4106            }
4107            _ => panic!("expected Storage variant after remap"),
4108        }
4109
4110        // Both must produce the same versioned hash (same effective key).
4111        let mut cfg = make_test_config();
4112        cfg.storage_backend = super::StorageBackend::S3;
4113        cfg.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
4114            "my-bucket",
4115            None,
4116        )));
4117        assert_eq!(
4118            source.versioned_source_hash(&cfg),
4119            source2.versioned_source_hash(&cfg),
4120            "leading-slash and no-leading-slash paths must hash identically after trim",
4121        );
4122    }
4123
4124    #[test]
4125    #[cfg(feature = "s3")]
4126    fn public_by_path_s3_remap_produces_storage_variant() {
4127        // Verify the remap converts Path to Storage with bucket: None.
4128        let source = remap_path_to_storage("/image.png", None);
4129        match source {
4130            TransformSourcePayload::Storage {
4131                bucket,
4132                key,
4133                version,
4134            } => {
4135                assert!(bucket.is_none(), "bucket must be None (use default)");
4136                assert_eq!(key, "image.png");
4137                assert!(version.is_none());
4138            }
4139            _ => panic!("expected Storage variant"),
4140        }
4141    }
4142
4143    // -----------------------------------------------------------------------
4144    // GCS: health endpoint
4145    // -----------------------------------------------------------------------
4146
4147    #[test]
4148    #[cfg(feature = "gcs")]
4149    fn health_ready_gcs_returns_503_when_context_missing() {
4150        let storage = temp_dir("health-gcs-no-ctx");
4151        let mut config = ServerConfig::new(storage.clone(), None);
4152        config.storage_backend = super::StorageBackend::Gcs;
4153        config.gcs_context = None;
4154
4155        let request = super::http_parse::HttpRequest {
4156            method: "GET".to_string(),
4157            target: "/health/ready".to_string(),
4158            version: "HTTP/1.1".to_string(),
4159            headers: Vec::new(),
4160            body: Vec::new(),
4161        };
4162        let response = route_request(request, &config);
4163        let _ = std::fs::remove_dir_all(storage);
4164
4165        assert_eq!(response.status, "503 Service Unavailable");
4166        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4167        let checks = body["checks"].as_array().expect("checks array");
4168        assert!(
4169            checks
4170                .iter()
4171                .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
4172            "expected gcsClient fail check in {body}",
4173        );
4174    }
4175
4176    #[test]
4177    #[cfg(feature = "gcs")]
4178    fn health_ready_gcs_includes_gcs_client_check() {
4179        let storage = temp_dir("health-gcs-ok");
4180        let mut config = ServerConfig::new(storage.clone(), None);
4181        config.storage_backend = super::StorageBackend::Gcs;
4182        config.gcs_context = Some(std::sync::Arc::new(super::gcs::GcsContext::for_test(
4183            "test-bucket",
4184            None,
4185        )));
4186
4187        let request = super::http_parse::HttpRequest {
4188            method: "GET".to_string(),
4189            target: "/health/ready".to_string(),
4190            version: "HTTP/1.1".to_string(),
4191            headers: Vec::new(),
4192            body: Vec::new(),
4193        };
4194        let response = route_request(request, &config);
4195        let _ = std::fs::remove_dir_all(storage);
4196
4197        // The gcsClient check will report "fail" because there is no real GCS
4198        // endpoint, but the important thing is that the check is present.
4199        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4200        let checks = body["checks"].as_array().expect("checks array");
4201        assert!(
4202            checks.iter().any(|c| c["name"] == "storageBackend"),
4203            "expected gcsClient check in {body}",
4204        );
4205    }
4206
4207    // -----------------------------------------------------------------------
4208    // GCS: public by-path remap (leading slash trimmed, Storage variant used)
4209    // -----------------------------------------------------------------------
4210
4211    #[cfg(feature = "gcs")]
4212    fn remap_path_to_storage_gcs(path: &str, version: Option<&str>) -> TransformSourcePayload {
4213        let source = TransformSourcePayload::Path {
4214            path: path.to_string(),
4215            version: version.map(|v| v.to_string()),
4216        };
4217        match source {
4218            TransformSourcePayload::Path { path, version } => TransformSourcePayload::Storage {
4219                bucket: None,
4220                key: path.trim_start_matches('/').to_string(),
4221                version,
4222            },
4223            other => other,
4224        }
4225    }
4226
4227    #[test]
4228    #[cfg(feature = "gcs")]
4229    fn public_by_path_gcs_remap_trims_leading_slash() {
4230        let source = remap_path_to_storage_gcs("/photos/hero.jpg", Some("v1"));
4231        match &source {
4232            TransformSourcePayload::Storage { key, .. } => {
4233                assert_eq!(key, "photos/hero.jpg", "leading / must be trimmed");
4234            }
4235            _ => panic!("expected Storage variant after remap"),
4236        }
4237
4238        let source2 = remap_path_to_storage_gcs("photos/hero.jpg", Some("v1"));
4239        match &source2 {
4240            TransformSourcePayload::Storage { key, .. } => {
4241                assert_eq!(key, "photos/hero.jpg");
4242            }
4243            _ => panic!("expected Storage variant after remap"),
4244        }
4245
4246        let mut cfg = make_test_config();
4247        cfg.storage_backend = super::StorageBackend::Gcs;
4248        cfg.gcs_context = Some(std::sync::Arc::new(super::gcs::GcsContext::for_test(
4249            "my-bucket",
4250            None,
4251        )));
4252        assert_eq!(
4253            source.versioned_source_hash(&cfg),
4254            source2.versioned_source_hash(&cfg),
4255            "leading-slash and no-leading-slash paths must hash identically after trim",
4256        );
4257    }
4258
4259    #[test]
4260    #[cfg(feature = "gcs")]
4261    fn public_by_path_gcs_remap_produces_storage_variant() {
4262        let source = remap_path_to_storage_gcs("/image.png", None);
4263        match source {
4264            TransformSourcePayload::Storage {
4265                bucket,
4266                key,
4267                version,
4268            } => {
4269                assert!(bucket.is_none(), "bucket must be None (use default)");
4270                assert_eq!(key, "image.png");
4271                assert!(version.is_none());
4272            }
4273            _ => panic!("expected Storage variant"),
4274        }
4275    }
4276
4277    // -----------------------------------------------------------------------
4278    // Azure: health endpoint
4279    // -----------------------------------------------------------------------
4280
4281    #[test]
4282    #[cfg(feature = "azure")]
4283    fn health_ready_azure_returns_503_when_context_missing() {
4284        let storage = temp_dir("health-azure-no-ctx");
4285        let mut config = ServerConfig::new(storage.clone(), None);
4286        config.storage_backend = super::StorageBackend::Azure;
4287        config.azure_context = None;
4288
4289        let request = super::http_parse::HttpRequest {
4290            method: "GET".to_string(),
4291            target: "/health/ready".to_string(),
4292            version: "HTTP/1.1".to_string(),
4293            headers: Vec::new(),
4294            body: Vec::new(),
4295        };
4296        let response = route_request(request, &config);
4297        let _ = std::fs::remove_dir_all(storage);
4298
4299        assert_eq!(response.status, "503 Service Unavailable");
4300        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4301        let checks = body["checks"].as_array().expect("checks array");
4302        assert!(
4303            checks
4304                .iter()
4305                .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
4306            "expected azureClient fail check in {body}",
4307        );
4308    }
4309
4310    #[test]
4311    #[cfg(feature = "azure")]
4312    fn health_ready_azure_includes_azure_client_check() {
4313        let storage = temp_dir("health-azure-ok");
4314        let mut config = ServerConfig::new(storage.clone(), None);
4315        config.storage_backend = super::StorageBackend::Azure;
4316        config.azure_context = Some(std::sync::Arc::new(super::azure::AzureContext::for_test(
4317            "test-bucket",
4318            "http://localhost:10000/devstoreaccount1",
4319        )));
4320
4321        let request = super::http_parse::HttpRequest {
4322            method: "GET".to_string(),
4323            target: "/health/ready".to_string(),
4324            version: "HTTP/1.1".to_string(),
4325            headers: Vec::new(),
4326            body: Vec::new(),
4327        };
4328        let response = route_request(request, &config);
4329        let _ = std::fs::remove_dir_all(storage);
4330
4331        let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
4332        let checks = body["checks"].as_array().expect("checks array");
4333        assert!(
4334            checks.iter().any(|c| c["name"] == "storageBackend"),
4335            "expected azureClient check in {body}",
4336        );
4337    }
4338
4339    #[test]
4340    fn read_request_rejects_json_body_over_1mib() {
4341        let body = vec![b'x'; super::http_parse::MAX_REQUEST_BODY_BYTES + 1];
4342        let content_length = body.len();
4343        let raw = format!(
4344            "POST /images:transform HTTP/1.1\r\n\
4345             Content-Type: application/json\r\n\
4346             Content-Length: {content_length}\r\n\r\n"
4347        );
4348        let mut data = raw.into_bytes();
4349        data.extend_from_slice(&body);
4350        let result = read_request(&mut data.as_slice());
4351        assert!(result.is_err());
4352    }
4353
4354    #[test]
4355    fn read_request_accepts_multipart_body_over_1mib() {
4356        let payload_size = super::http_parse::MAX_REQUEST_BODY_BYTES + 100;
4357        let body_content = vec![b'A'; payload_size];
4358        let boundary = "test-boundary-123";
4359        let mut body = Vec::new();
4360        body.extend_from_slice(format!("--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"big.jpg\"\r\n\r\n").as_bytes());
4361        body.extend_from_slice(&body_content);
4362        body.extend_from_slice(format!("\r\n--{boundary}--\r\n").as_bytes());
4363        let content_length = body.len();
4364        let raw = format!(
4365            "POST /images HTTP/1.1\r\n\
4366             Content-Type: multipart/form-data; boundary={boundary}\r\n\
4367             Content-Length: {content_length}\r\n\r\n"
4368        );
4369        let mut data = raw.into_bytes();
4370        data.extend_from_slice(&body);
4371        let result = read_request(&mut data.as_slice());
4372        assert!(
4373            result.is_ok(),
4374            "multipart upload over 1 MiB should be accepted"
4375        );
4376    }
4377
4378    #[test]
4379    fn multipart_boundary_in_payload_does_not_split_part() {
4380        let boundary = "abc123";
4381        let fake_boundary_in_payload = format!("\r\n--{boundary}NOTREAL");
4382        let part_body = format!("before{fake_boundary_in_payload}after");
4383        let body = format!(
4384            "--{boundary}\r\n\
4385             Content-Disposition: form-data; name=\"file\"\r\n\
4386             Content-Type: application/octet-stream\r\n\r\n\
4387             {part_body}\r\n\
4388             --{boundary}--\r\n"
4389        );
4390
4391        let parts = parse_multipart_form_data(body.as_bytes(), boundary)
4392            .expect("should parse despite boundary-like string in payload");
4393        assert_eq!(parts.len(), 1, "should have exactly one part");
4394
4395        let part_data = &body.as_bytes()[parts[0].body_range.clone()];
4396        let part_text = std::str::from_utf8(part_data).unwrap();
4397        assert!(
4398            part_text.contains("NOTREAL"),
4399            "part body should contain the full fake boundary string"
4400        );
4401    }
4402
4403    #[test]
4404    fn multipart_normal_two_parts_still_works() {
4405        let boundary = "testboundary";
4406        let body = format!(
4407            "--{boundary}\r\n\
4408             Content-Disposition: form-data; name=\"field1\"\r\n\r\n\
4409             value1\r\n\
4410             --{boundary}\r\n\
4411             Content-Disposition: form-data; name=\"field2\"\r\n\r\n\
4412             value2\r\n\
4413             --{boundary}--\r\n"
4414        );
4415
4416        let parts = parse_multipart_form_data(body.as_bytes(), boundary)
4417            .expect("should parse two normal parts");
4418        assert_eq!(parts.len(), 2);
4419        assert_eq!(parts[0].name, "field1");
4420        assert_eq!(parts[1].name, "field2");
4421    }
4422
4423    #[test]
4424    #[serial]
4425    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4426    fn test_storage_timeout_default() {
4427        unsafe {
4428            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4429        }
4430        let config = ServerConfig::from_env().unwrap();
4431        assert_eq!(config.storage_timeout_secs, 30);
4432    }
4433
4434    #[test]
4435    #[serial]
4436    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4437    fn test_storage_timeout_custom() {
4438        unsafe {
4439            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "60");
4440        }
4441        let config = ServerConfig::from_env().unwrap();
4442        assert_eq!(config.storage_timeout_secs, 60);
4443        unsafe {
4444            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4445        }
4446    }
4447
4448    #[test]
4449    #[serial]
4450    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4451    fn test_storage_timeout_min_boundary() {
4452        unsafe {
4453            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "1");
4454        }
4455        let config = ServerConfig::from_env().unwrap();
4456        assert_eq!(config.storage_timeout_secs, 1);
4457        unsafe {
4458            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4459        }
4460    }
4461
4462    #[test]
4463    #[serial]
4464    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4465    fn test_storage_timeout_max_boundary() {
4466        unsafe {
4467            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "300");
4468        }
4469        let config = ServerConfig::from_env().unwrap();
4470        assert_eq!(config.storage_timeout_secs, 300);
4471        unsafe {
4472            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4473        }
4474    }
4475
4476    #[test]
4477    #[serial]
4478    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4479    fn test_storage_timeout_empty_string_uses_default() {
4480        unsafe {
4481            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "");
4482        }
4483        let config = ServerConfig::from_env().unwrap();
4484        assert_eq!(config.storage_timeout_secs, 30);
4485        unsafe {
4486            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4487        }
4488    }
4489
4490    #[test]
4491    #[serial]
4492    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4493    fn test_storage_timeout_zero_rejected() {
4494        unsafe {
4495            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "0");
4496        }
4497        let err = ServerConfig::from_env().unwrap_err();
4498        assert!(
4499            err.to_string().contains("between 1 and 300"),
4500            "error should mention valid range: {err}"
4501        );
4502        unsafe {
4503            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4504        }
4505    }
4506
4507    #[test]
4508    #[serial]
4509    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4510    fn test_storage_timeout_over_max_rejected() {
4511        unsafe {
4512            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "301");
4513        }
4514        let err = ServerConfig::from_env().unwrap_err();
4515        assert!(
4516            err.to_string().contains("between 1 and 300"),
4517            "error should mention valid range: {err}"
4518        );
4519        unsafe {
4520            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4521        }
4522    }
4523
4524    #[test]
4525    #[serial]
4526    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
4527    fn test_storage_timeout_non_numeric_rejected() {
4528        unsafe {
4529            std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "abc");
4530        }
4531        let err = ServerConfig::from_env().unwrap_err();
4532        assert!(
4533            err.to_string().contains("positive integer"),
4534            "error should mention positive integer: {err}"
4535        );
4536        unsafe {
4537            std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
4538        }
4539    }
4540
4541    #[test]
4542    #[serial]
4543    fn test_max_concurrent_transforms_default() {
4544        unsafe {
4545            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4546        }
4547        let config = ServerConfig::from_env().unwrap();
4548        assert_eq!(config.max_concurrent_transforms, 64);
4549    }
4550
4551    #[test]
4552    #[serial]
4553    fn test_max_concurrent_transforms_custom() {
4554        unsafe {
4555            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "128");
4556        }
4557        let config = ServerConfig::from_env().unwrap();
4558        assert_eq!(config.max_concurrent_transforms, 128);
4559        unsafe {
4560            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4561        }
4562    }
4563
4564    #[test]
4565    #[serial]
4566    fn test_max_concurrent_transforms_min_boundary() {
4567        unsafe {
4568            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1");
4569        }
4570        let config = ServerConfig::from_env().unwrap();
4571        assert_eq!(config.max_concurrent_transforms, 1);
4572        unsafe {
4573            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4574        }
4575    }
4576
4577    #[test]
4578    #[serial]
4579    fn test_max_concurrent_transforms_max_boundary() {
4580        unsafe {
4581            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1024");
4582        }
4583        let config = ServerConfig::from_env().unwrap();
4584        assert_eq!(config.max_concurrent_transforms, 1024);
4585        unsafe {
4586            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4587        }
4588    }
4589
4590    #[test]
4591    #[serial]
4592    fn test_max_concurrent_transforms_empty_uses_default() {
4593        unsafe {
4594            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "");
4595        }
4596        let config = ServerConfig::from_env().unwrap();
4597        assert_eq!(config.max_concurrent_transforms, 64);
4598        unsafe {
4599            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4600        }
4601    }
4602
4603    #[test]
4604    #[serial]
4605    fn test_max_concurrent_transforms_zero_rejected() {
4606        unsafe {
4607            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "0");
4608        }
4609        let err = ServerConfig::from_env().unwrap_err();
4610        assert!(
4611            err.to_string().contains("between 1 and 1024"),
4612            "error should mention valid range: {err}"
4613        );
4614        unsafe {
4615            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4616        }
4617    }
4618
4619    #[test]
4620    #[serial]
4621    fn test_max_concurrent_transforms_over_max_rejected() {
4622        unsafe {
4623            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1025");
4624        }
4625        let err = ServerConfig::from_env().unwrap_err();
4626        assert!(
4627            err.to_string().contains("between 1 and 1024"),
4628            "error should mention valid range: {err}"
4629        );
4630        unsafe {
4631            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4632        }
4633    }
4634
4635    #[test]
4636    #[serial]
4637    fn test_max_concurrent_transforms_non_numeric_rejected() {
4638        unsafe {
4639            std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "abc");
4640        }
4641        let err = ServerConfig::from_env().unwrap_err();
4642        assert!(
4643            err.to_string().contains("positive integer"),
4644            "error should mention positive integer: {err}"
4645        );
4646        unsafe {
4647            std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
4648        }
4649    }
4650
4651    #[test]
4652    #[serial]
4653    fn test_transform_deadline_default() {
4654        unsafe {
4655            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
4656        }
4657        let config = ServerConfig::from_env().unwrap();
4658        assert_eq!(config.transform_deadline_secs, 30);
4659    }
4660
4661    #[test]
4662    #[serial]
4663    fn test_transform_deadline_custom() {
4664        unsafe {
4665            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "60");
4666        }
4667        let config = ServerConfig::from_env().unwrap();
4668        assert_eq!(config.transform_deadline_secs, 60);
4669        unsafe {
4670            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
4671        }
4672    }
4673
4674    #[test]
4675    #[serial]
4676    fn test_transform_deadline_min_boundary() {
4677        unsafe {
4678            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "1");
4679        }
4680        let config = ServerConfig::from_env().unwrap();
4681        assert_eq!(config.transform_deadline_secs, 1);
4682        unsafe {
4683            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
4684        }
4685    }
4686
4687    #[test]
4688    #[serial]
4689    fn test_transform_deadline_max_boundary() {
4690        unsafe {
4691            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "300");
4692        }
4693        let config = ServerConfig::from_env().unwrap();
4694        assert_eq!(config.transform_deadline_secs, 300);
4695        unsafe {
4696            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
4697        }
4698    }
4699
4700    #[test]
4701    #[serial]
4702    fn test_transform_deadline_empty_uses_default() {
4703        unsafe {
4704            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "");
4705        }
4706        let config = ServerConfig::from_env().unwrap();
4707        assert_eq!(config.transform_deadline_secs, 30);
4708        unsafe {
4709            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
4710        }
4711    }
4712
4713    #[test]
4714    #[serial]
4715    fn test_transform_deadline_zero_rejected() {
4716        unsafe {
4717            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "0");
4718        }
4719        let err = ServerConfig::from_env().unwrap_err();
4720        assert!(
4721            err.to_string().contains("between 1 and 300"),
4722            "error should mention valid range: {err}"
4723        );
4724        unsafe {
4725            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
4726        }
4727    }
4728
4729    #[test]
4730    #[serial]
4731    fn test_transform_deadline_over_max_rejected() {
4732        unsafe {
4733            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "301");
4734        }
4735        let err = ServerConfig::from_env().unwrap_err();
4736        assert!(
4737            err.to_string().contains("between 1 and 300"),
4738            "error should mention valid range: {err}"
4739        );
4740        unsafe {
4741            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
4742        }
4743    }
4744
4745    #[test]
4746    #[serial]
4747    fn test_transform_deadline_non_numeric_rejected() {
4748        unsafe {
4749            std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "abc");
4750        }
4751        let err = ServerConfig::from_env().unwrap_err();
4752        assert!(
4753            err.to_string().contains("positive integer"),
4754            "error should mention positive integer: {err}"
4755        );
4756        unsafe {
4757            std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
4758        }
4759    }
4760
4761    #[test]
4762    #[serial]
4763    #[cfg(feature = "azure")]
4764    fn test_azure_container_env_var_required() {
4765        unsafe {
4766            std::env::set_var("TRUSS_STORAGE_BACKEND", "azure");
4767            std::env::remove_var("TRUSS_AZURE_CONTAINER");
4768        }
4769        let err = ServerConfig::from_env().unwrap_err();
4770        assert!(
4771            err.to_string().contains("TRUSS_AZURE_CONTAINER"),
4772            "error should mention TRUSS_AZURE_CONTAINER: {err}"
4773        );
4774        unsafe {
4775            std::env::remove_var("TRUSS_STORAGE_BACKEND");
4776        }
4777    }
4778
4779    #[test]
4780    fn server_config_debug_redacts_bearer_token_and_signed_url_secret() {
4781        let mut config = ServerConfig::new(
4782            temp_dir("debug-redact"),
4783            Some("super-secret-token-12345".to_string()),
4784        );
4785        config.signed_url_key_id = Some("visible-key-id".to_string());
4786        config.signed_url_secret = Some("super-secret-hmac-key".to_string());
4787        let debug = format!("{config:?}");
4788        assert!(
4789            !debug.contains("super-secret-token-12345"),
4790            "bearer_token leaked in Debug output: {debug}"
4791        );
4792        assert!(
4793            !debug.contains("super-secret-hmac-key"),
4794            "signed_url_secret leaked in Debug output: {debug}"
4795        );
4796        assert!(
4797            debug.contains("[REDACTED]"),
4798            "expected [REDACTED] in Debug output: {debug}"
4799        );
4800        assert!(
4801            debug.contains("visible-key-id"),
4802            "signed_url_key_id should be visible: {debug}"
4803        );
4804    }
4805
4806    #[test]
4807    fn authorize_headers_accepts_correct_bearer_token() {
4808        let config = ServerConfig::new(temp_dir("auth-ok"), Some("correct-token".to_string()));
4809        let headers = vec![(
4810            "authorization".to_string(),
4811            "Bearer correct-token".to_string(),
4812        )];
4813        assert!(super::authorize_request_headers(&headers, &config).is_ok());
4814    }
4815
4816    #[test]
4817    fn authorize_headers_rejects_wrong_bearer_token() {
4818        let config = ServerConfig::new(temp_dir("auth-wrong"), Some("correct-token".to_string()));
4819        let headers = vec![(
4820            "authorization".to_string(),
4821            "Bearer wrong-token".to_string(),
4822        )];
4823        let err = super::authorize_request_headers(&headers, &config).unwrap_err();
4824        assert_eq!(err.status, "401 Unauthorized");
4825    }
4826
4827    #[test]
4828    fn authorize_headers_rejects_missing_header() {
4829        let config = ServerConfig::new(temp_dir("auth-missing"), Some("correct-token".to_string()));
4830        let headers: Vec<(String, String)> = vec![];
4831        let err = super::authorize_request_headers(&headers, &config).unwrap_err();
4832        assert_eq!(err.status, "401 Unauthorized");
4833    }
4834
4835    // ── TransformSlot RAII guard ──────────────────────────────────────
4836
4837    #[test]
4838    fn transform_slot_acquire_succeeds_under_limit() {
4839        use std::sync::Arc;
4840        use std::sync::atomic::AtomicU64;
4841
4842        let counter = Arc::new(AtomicU64::new(0));
4843        let slot = super::TransformSlot::try_acquire(&counter, 2);
4844        assert!(slot.is_some());
4845        assert_eq!(counter.load(Ordering::Relaxed), 1);
4846    }
4847
4848    #[test]
4849    fn transform_slot_acquire_returns_none_at_limit() {
4850        use std::sync::Arc;
4851        use std::sync::atomic::AtomicU64;
4852
4853        let counter = Arc::new(AtomicU64::new(0));
4854        let _s1 = super::TransformSlot::try_acquire(&counter, 1).unwrap();
4855        let s2 = super::TransformSlot::try_acquire(&counter, 1);
4856        assert!(s2.is_none());
4857        // Counter must still be 1 (failed acquire must not leak).
4858        assert_eq!(counter.load(Ordering::Relaxed), 1);
4859    }
4860
4861    #[test]
4862    fn transform_slot_drop_decrements_counter() {
4863        use std::sync::Arc;
4864        use std::sync::atomic::AtomicU64;
4865
4866        let counter = Arc::new(AtomicU64::new(0));
4867        {
4868            let _slot = super::TransformSlot::try_acquire(&counter, 4).unwrap();
4869            assert_eq!(counter.load(Ordering::Relaxed), 1);
4870        }
4871        // After drop the counter must return to zero.
4872        assert_eq!(counter.load(Ordering::Relaxed), 0);
4873    }
4874
4875    #[test]
4876    fn transform_slot_multiple_acquires_up_to_limit() {
4877        use std::sync::Arc;
4878        use std::sync::atomic::AtomicU64;
4879
4880        let counter = Arc::new(AtomicU64::new(0));
4881        let limit = 3u64;
4882        let mut slots = Vec::new();
4883        for _ in 0..limit {
4884            slots.push(super::TransformSlot::try_acquire(&counter, limit).unwrap());
4885        }
4886        assert_eq!(counter.load(Ordering::Relaxed), limit);
4887        // One more must fail.
4888        assert!(super::TransformSlot::try_acquire(&counter, limit).is_none());
4889        assert_eq!(counter.load(Ordering::Relaxed), limit);
4890        // Drop all slots.
4891        slots.clear();
4892        assert_eq!(counter.load(Ordering::Relaxed), 0);
4893    }
4894
4895    // ── Access log via emit_access_log ────────────────────────────────
4896
4897    #[test]
4898    fn emit_access_log_produces_json_with_expected_fields() {
4899        use std::sync::{Arc, Mutex};
4900        use std::time::Instant;
4901
4902        let captured = Arc::new(Mutex::new(String::new()));
4903        let captured_clone = Arc::clone(&captured);
4904        let handler: super::LogHandler =
4905            Arc::new(move |msg: &str| *captured_clone.lock().unwrap() = msg.to_owned());
4906
4907        let mut config = ServerConfig::new(temp_dir("access-log"), None);
4908        config.log_handler = Some(handler);
4909
4910        let start = Instant::now();
4911        super::emit_access_log(
4912            &config,
4913            &super::AccessLogEntry {
4914                request_id: "req-123",
4915                method: "GET",
4916                path: "/image.png",
4917                route: "transform",
4918                status: "200",
4919                start,
4920                cache_status: Some("hit"),
4921            },
4922        );
4923
4924        let output = captured.lock().unwrap().clone();
4925        let parsed: serde_json::Value = serde_json::from_str(&output).expect("valid JSON");
4926        assert_eq!(parsed["kind"], "access_log");
4927        assert_eq!(parsed["request_id"], "req-123");
4928        assert_eq!(parsed["method"], "GET");
4929        assert_eq!(parsed["path"], "/image.png");
4930        assert_eq!(parsed["route"], "transform");
4931        assert_eq!(parsed["status"], "200");
4932        assert_eq!(parsed["cache_status"], "hit");
4933        assert!(parsed["latency_ms"].is_u64());
4934    }
4935
4936    #[test]
4937    fn emit_access_log_null_cache_status_when_none() {
4938        use std::sync::{Arc, Mutex};
4939        use std::time::Instant;
4940
4941        let captured = Arc::new(Mutex::new(String::new()));
4942        let captured_clone = Arc::clone(&captured);
4943        let handler: super::LogHandler =
4944            Arc::new(move |msg: &str| *captured_clone.lock().unwrap() = msg.to_owned());
4945
4946        let mut config = ServerConfig::new(temp_dir("access-log-none"), None);
4947        config.log_handler = Some(handler);
4948
4949        super::emit_access_log(
4950            &config,
4951            &super::AccessLogEntry {
4952                request_id: "req-456",
4953                method: "POST",
4954                path: "/upload",
4955                route: "upload",
4956                status: "201",
4957                start: Instant::now(),
4958                cache_status: None,
4959            },
4960        );
4961
4962        let output = captured.lock().unwrap().clone();
4963        let parsed: serde_json::Value = serde_json::from_str(&output).expect("valid JSON");
4964        assert!(parsed["cache_status"].is_null());
4965    }
4966
4967    // ── X-Request-Id header ───────────────────────────────────────────
4968
4969    #[test]
4970    fn x_request_id_is_extracted_from_incoming_headers() {
4971        let headers = vec![
4972            ("host".to_string(), "localhost".to_string()),
4973            ("x-request-id".to_string(), "custom-id-abc".to_string()),
4974        ];
4975        assert_eq!(
4976            super::extract_request_id(&headers),
4977            Some("custom-id-abc".to_string())
4978        );
4979    }
4980
4981    #[test]
4982    fn x_request_id_not_extracted_when_empty() {
4983        let headers = vec![("x-request-id".to_string(), "".to_string())];
4984        assert!(super::extract_request_id(&headers).is_none());
4985    }
4986
4987    #[test]
4988    fn x_request_id_not_extracted_when_absent() {
4989        let headers = vec![("host".to_string(), "localhost".to_string())];
4990        assert!(super::extract_request_id(&headers).is_none());
4991    }
4992
4993    // ── Cache status extraction ───────────────────────────────────────
4994
4995    #[test]
4996    fn cache_status_hit_detected() {
4997        let headers: Vec<(&str, String)> = vec![("Cache-Status", "\"truss\"; hit".to_string())];
4998        assert_eq!(super::extract_cache_status(&headers), Some("hit"));
4999    }
5000
5001    #[test]
5002    fn cache_status_miss_detected() {
5003        let headers: Vec<(&str, String)> =
5004            vec![("Cache-Status", "\"truss\"; fwd=miss".to_string())];
5005        assert_eq!(super::extract_cache_status(&headers), Some("miss"));
5006    }
5007
5008    #[test]
5009    fn cache_status_none_when_header_absent() {
5010        let headers: Vec<(&str, String)> = vec![("Content-Type", "image/png".to_string())];
5011        assert!(super::extract_cache_status(&headers).is_none());
5012    }
5013}