1mod auth;
2#[cfg(feature = "azure")]
3pub mod azure;
4mod cache;
5#[cfg(feature = "gcs")]
6pub mod gcs;
7mod http_parse;
8mod metrics;
9mod multipart;
10mod negotiate;
11mod remote;
12mod response;
13#[cfg(feature = "s3")]
14pub mod s3;
15
16use auth::{
17 authorize_request, authorize_request_headers, authorize_signed_request,
18 canonical_query_without_signature, extend_transform_query, parse_optional_bool_query,
19 parse_optional_float_query, parse_optional_integer_query, parse_optional_u8_query,
20 parse_query_params, required_query_param, signed_source_query, url_authority,
21 validate_public_query_names,
22};
23use cache::{CacheLookup, TransformCache, compute_cache_key, try_versioned_cache_lookup};
24use http_parse::{
25 HttpRequest, parse_named, parse_optional_named, read_request_body, read_request_headers,
26 request_has_json_content_type,
27};
28use metrics::{
29 CACHE_HITS_TOTAL, CACHE_MISSES_TOTAL, DEFAULT_MAX_CONCURRENT_TRANSFORMS, RouteMetric,
30 record_http_metrics, render_metrics_text, status_code, uptime_seconds,
31};
32use multipart::{parse_multipart_boundary, parse_upload_request};
33use negotiate::{
34 CacheHitStatus, ImageResponsePolicy, PublicSourceKind, build_image_etag,
35 build_image_response_headers, if_none_match_matches, negotiate_output_format,
36};
37use remote::resolve_source_bytes;
38use response::{
39 HttpResponse, NOT_FOUND_BODY, bad_request_response, service_unavailable_response,
40 transform_error_response, unsupported_media_type_response, write_response,
41};
42
43use crate::{
44 Fit, MediaType, Position, RawArtifact, Rgba8, Rotation, TransformOptions, TransformRequest,
45 sniff_artifact, transform_raster, transform_svg,
46};
47use hmac::{Hmac, Mac};
48use serde::Deserialize;
49use serde_json::json;
50use sha2::{Digest, Sha256};
51use std::collections::BTreeMap;
52use std::env;
53use std::fmt;
54use std::io;
55use std::net::{TcpListener, TcpStream};
56use std::path::PathBuf;
57use std::str::FromStr;
58use std::sync::Arc;
59use std::sync::atomic::{AtomicU64, Ordering};
60use std::time::{Duration, Instant};
61use url::Url;
62use uuid::Uuid;
63
/// Writes `msg` followed by a newline directly to file descriptor 2.
///
/// The message and the trailing newline are assembled into one buffer and
/// handed to a single `write_all` call. Write errors are ignored on purpose:
/// this is the fallback logging path and there is nowhere else to report
/// them.
fn stderr_write(msg: &str) {
    use std::io::Write;
    use std::mem::ManuallyDrop;
    use std::os::fd::FromRawFd;

    // SAFETY: fd 2 is the process-wide stderr descriptor and is assumed to
    // stay open for the life of the process. `File` would normally close the
    // descriptor on drop; `ManuallyDrop` suppresses that destructor on every
    // exit path (including unwinding), so this function never closes fd 2.
    let mut stderr = ManuallyDrop::new(unsafe { std::fs::File::from_raw_fd(2) });

    let mut line = Vec::with_capacity(msg.len() + 1);
    line.extend_from_slice(msg.as_bytes());
    line.push(b'\n');

    // Best effort: a failed log write must not take the server down.
    let _ = stderr.write_all(&line);
}
84
/// Selects where stored source objects are read from.
///
/// The type only exists when at least one of the `s3`, `gcs`, or `azure`
/// features is enabled; each cloud variant is additionally gated on its own
/// feature.
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageBackend {
    /// Local filesystem. This is the value `ServerConfig::new` and
    /// `ServerConfig::from_env` use when no backend is requested.
    Filesystem,
    /// S3 backend (requires the `s3` feature).
    #[cfg(feature = "s3")]
    S3,
    /// GCS backend (requires the `gcs` feature).
    #[cfg(feature = "gcs")]
    Gcs,
    /// Azure backend (requires the `azure` feature).
    #[cfg(feature = "azure")]
    Azure,
}
102
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
impl StorageBackend {
    /// Parses a backend name, ignoring ASCII case.
    ///
    /// `"filesystem"`, `"fs"` and `"local"` all select
    /// [`StorageBackend::Filesystem`]; `"s3"`, `"gcs"` and `"azure"` are only
    /// recognised when the matching feature is compiled in.
    ///
    /// # Errors
    ///
    /// Returns a message listing the backends this build accepts. When the
    /// value names a backend whose feature is disabled, the message also
    /// carries a rebuild hint.
    pub fn parse(value: &str) -> Result<Self, String> {
        let normalized = value.to_ascii_lowercase();
        match normalized.as_str() {
            "filesystem" | "fs" | "local" => return Ok(Self::Filesystem),
            #[cfg(feature = "s3")]
            "s3" => return Ok(Self::S3),
            #[cfg(feature = "gcs")]
            "gcs" => return Ok(Self::Gcs),
            #[cfg(feature = "azure")]
            "azure" => return Ok(Self::Azure),
            _ => {}
        }

        // Names accepted by this particular build, in a fixed order.
        let mut accepted = vec!["filesystem"];
        #[cfg(feature = "s3")]
        accepted.push("s3");
        #[cfg(feature = "gcs")]
        accepted.push("gcs");
        #[cfg(feature = "azure")]
        accepted.push("azure");

        // Stays empty (and never mutated) when every backend feature is on.
        #[allow(unused_mut)]
        let mut hint = "";
        #[cfg(not(feature = "s3"))]
        if normalized == "s3" {
            hint = " (hint: rebuild with --features s3)";
        }
        #[cfg(not(feature = "gcs"))]
        if normalized == "gcs" {
            hint = " (hint: rebuild with --features gcs)";
        }
        #[cfg(not(feature = "azure"))]
        if normalized == "azure" {
            hint = " (hint: rebuild with --features azure)";
        }

        let accepted = accepted.join(" or ");
        Err(format!(
            "unknown storage backend `{value}` (expected {accepted}){hint}"
        ))
    }
}
147
/// Address returned by [`bind_addr`] when `TRUSS_BIND_ADDR` is not set.
pub const DEFAULT_BIND_ADDR: &str = "127.0.0.1:8080";

/// Storage root used by [`ServerConfig::from_env`] when `TRUSS_STORAGE_ROOT`
/// is not set.
pub const DEFAULT_STORAGE_ROOT: &str = ".";

/// Default for `ServerConfig::public_max_age_seconds`.
const DEFAULT_PUBLIC_MAX_AGE_SECONDS: u32 = 3600;
/// Default for `ServerConfig::public_stale_while_revalidate_seconds`.
const DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS: u32 = 60;
/// Read timeout applied to every accepted socket in `handle_stream`.
const SOCKET_READ_TIMEOUT: Duration = Duration::from_secs(60);
/// Write timeout applied to every accepted socket in `handle_stream`.
const SOCKET_WRITE_TIMEOUT: Duration = Duration::from_secs(60);
/// Lower bound on the worker pool size in `serve_with_config`.
const WORKER_THREADS: usize = 8;
/// HMAC-SHA256, the MAC used by `sign_public_url`.
type HmacSha256 = Hmac<Sha256>;

// NOTE(review): presumably the cap on requests served per keep-alive
// connection; the use site is outside this part of the file — confirm.
const KEEP_ALIVE_MAX_REQUESTS: usize = 100;

/// Default for `ServerConfig::transform_deadline_secs`.
const DEFAULT_TRANSFORM_DEADLINE_SECS: u64 = 30;
170
/// Pair of cache lifetime values for public responses.
///
/// NOTE(review): presumably mirrors `ServerConfig::public_max_age_seconds`
/// and `ServerConfig::public_stale_while_revalidate_seconds` (both in
/// seconds); the construction site is not visible here — confirm.
#[derive(Clone, Copy)]
struct PublicCacheControl {
    max_age: u32,
    stale_while_revalidate: u32,
}
176
/// Copyable bundle of the settings needed while producing an image response.
///
/// NOTE(review): the fields look like per-request snapshots of the matching
/// `ServerConfig` values; the construction site is not visible here —
/// confirm.
#[derive(Clone, Copy)]
struct ImageResponseConfig {
    disable_accept_negotiation: bool,
    public_cache_control: PublicCacheControl,
    transform_deadline: Duration,
}
183
/// Guard representing one reserved unit of a shared in-flight counter.
///
/// Created by [`TransformSlot::try_acquire`]; dropping the guard gives the
/// unit back by decrementing the counter.
struct TransformSlot {
    counter: Arc<AtomicU64>,
}

impl TransformSlot {
    /// Tries to reserve one unit of `counter` without exceeding `limit`.
    ///
    /// The counter is incremented first. If the value it held *before* the
    /// increment was already at or above `limit`, the increment is undone and
    /// `None` is returned; otherwise a guard holding a clone of the `Arc` is
    /// returned.
    fn try_acquire(counter: &Arc<AtomicU64>, limit: u64) -> Option<Self> {
        let in_flight_before = counter.fetch_add(1, Ordering::Relaxed);
        if in_flight_before < limit {
            return Some(Self {
                counter: Arc::clone(counter),
            });
        }

        // Over the limit: roll back the optimistic increment.
        counter.fetch_sub(1, Ordering::Relaxed);
        None
    }
}

impl Drop for TransformSlot {
    /// Returns the reserved unit to the shared counter.
    fn drop(&mut self) {
        let _previous = self.counter.fetch_sub(1, Ordering::Relaxed);
    }
}
212
/// Shareable, thread-safe callback that receives one log message per call.
///
/// When `ServerConfig::log_handler` holds one, `ServerConfig::log` forwards
/// every message to it instead of writing to stderr.
pub type LogHandler = Arc<dyn Fn(&str) + Send + Sync>;
224
/// Runtime configuration for the HTTP server.
///
/// Build one with [`ServerConfig::new`] plus the `with_*` methods, or from
/// `TRUSS_*` environment variables with [`ServerConfig::from_env`].
pub struct ServerConfig {
    /// Storage root directory. `from_env` reads `TRUSS_STORAGE_ROOT`
    /// (default `DEFAULT_STORAGE_ROOT`) and canonicalizes it.
    pub storage_root: PathBuf,
    /// Optional bearer token (`TRUSS_BEARER_TOKEN`). Printed as
    /// `[REDACTED]` by the `Debug` impl.
    pub bearer_token: Option<String>,
    /// Optional public base URL (`TRUSS_PUBLIC_BASE_URL`), validated by
    /// `validate_public_base_url` in `from_env`.
    pub public_base_url: Option<String>,
    /// Key id for signed URLs (`TRUSS_SIGNED_URL_KEY_ID`). `from_env`
    /// requires it to be set together with `signed_url_secret`.
    pub signed_url_key_id: Option<String>,
    /// Secret for signed URLs (`TRUSS_SIGNED_URL_SECRET`). Printed as
    /// `[REDACTED]` by the `Debug` impl.
    pub signed_url_secret: Option<String>,
    /// Value of the `TRUSS_ALLOW_INSECURE_URL_SOURCES` flag; `from_env` also
    /// forwards it to the object-storage context builders.
    pub allow_insecure_url_sources: bool,
    /// Optional cache directory (`TRUSS_CACHE_ROOT`).
    pub cache_root: Option<PathBuf>,
    /// `TRUSS_PUBLIC_MAX_AGE`, defaulting to `DEFAULT_PUBLIC_MAX_AGE_SECONDS`.
    pub public_max_age_seconds: u32,
    /// `TRUSS_PUBLIC_STALE_WHILE_REVALIDATE`, defaulting to
    /// `DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS`.
    pub public_stale_while_revalidate_seconds: u32,
    /// Value of the `TRUSS_DISABLE_ACCEPT_NEGOTIATION` flag.
    pub disable_accept_negotiation: bool,
    /// Optional log sink; when `None`, messages go to stderr. Ignored by the
    /// `PartialEq` impl.
    pub log_handler: Option<LogHandler>,
    /// `TRUSS_MAX_CONCURRENT_TRANSFORMS` (accepted range 1..=1024 in
    /// `from_env`). `serve_with_config` also uses it when sizing the worker
    /// pool.
    pub max_concurrent_transforms: u64,
    /// `TRUSS_TRANSFORM_DEADLINE_SECS` (accepted range 1..=300 in
    /// `from_env`).
    pub transform_deadline_secs: u64,
    /// Shared counter; clones of a config share the same `Arc`. Ignored by
    /// the `PartialEq` impl.
    pub transforms_in_flight: Arc<AtomicU64>,
    /// `TRUSS_STORAGE_TIMEOUT_SECS` (accepted range 1..=300 in `from_env`),
    /// defaulting to `remote::STORAGE_DOWNLOAD_TIMEOUT_SECS`.
    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
    pub storage_timeout_secs: u64,
    /// Selected storage backend (`TRUSS_STORAGE_BACKEND`).
    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
    pub storage_backend: StorageBackend,
    /// S3 context; `from_env` builds it only when the backend is S3.
    #[cfg(feature = "s3")]
    pub s3_context: Option<Arc<s3::S3Context>>,
    /// GCS context; `from_env` builds it only when the backend is GCS.
    #[cfg(feature = "gcs")]
    pub gcs_context: Option<Arc<gcs::GcsContext>>,
    /// Azure context; `from_env` builds it only when the backend is Azure.
    #[cfg(feature = "azure")]
    pub azure_context: Option<Arc<azure::AzureContext>>,
}
310
311impl Clone for ServerConfig {
312 fn clone(&self) -> Self {
313 Self {
314 storage_root: self.storage_root.clone(),
315 bearer_token: self.bearer_token.clone(),
316 public_base_url: self.public_base_url.clone(),
317 signed_url_key_id: self.signed_url_key_id.clone(),
318 signed_url_secret: self.signed_url_secret.clone(),
319 allow_insecure_url_sources: self.allow_insecure_url_sources,
320 cache_root: self.cache_root.clone(),
321 public_max_age_seconds: self.public_max_age_seconds,
322 public_stale_while_revalidate_seconds: self.public_stale_while_revalidate_seconds,
323 disable_accept_negotiation: self.disable_accept_negotiation,
324 log_handler: self.log_handler.clone(),
325 max_concurrent_transforms: self.max_concurrent_transforms,
326 transform_deadline_secs: self.transform_deadline_secs,
327 transforms_in_flight: Arc::clone(&self.transforms_in_flight),
328 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
329 storage_timeout_secs: self.storage_timeout_secs,
330 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
331 storage_backend: self.storage_backend,
332 #[cfg(feature = "s3")]
333 s3_context: self.s3_context.clone(),
334 #[cfg(feature = "gcs")]
335 gcs_context: self.gcs_context.clone(),
336 #[cfg(feature = "azure")]
337 azure_context: self.azure_context.clone(),
338 }
339 }
340}
341
342impl fmt::Debug for ServerConfig {
343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344 let mut d = f.debug_struct("ServerConfig");
345 d.field("storage_root", &self.storage_root)
346 .field(
347 "bearer_token",
348 &self.bearer_token.as_ref().map(|_| "[REDACTED]"),
349 )
350 .field("public_base_url", &self.public_base_url)
351 .field("signed_url_key_id", &self.signed_url_key_id)
352 .field(
353 "signed_url_secret",
354 &self.signed_url_secret.as_ref().map(|_| "[REDACTED]"),
355 )
356 .field(
357 "allow_insecure_url_sources",
358 &self.allow_insecure_url_sources,
359 )
360 .field("cache_root", &self.cache_root)
361 .field("public_max_age_seconds", &self.public_max_age_seconds)
362 .field(
363 "public_stale_while_revalidate_seconds",
364 &self.public_stale_while_revalidate_seconds,
365 )
366 .field(
367 "disable_accept_negotiation",
368 &self.disable_accept_negotiation,
369 )
370 .field("log_handler", &self.log_handler.as_ref().map(|_| ".."))
371 .field("max_concurrent_transforms", &self.max_concurrent_transforms)
372 .field("transform_deadline_secs", &self.transform_deadline_secs);
373 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
374 {
375 d.field("storage_backend", &self.storage_backend);
376 }
377 #[cfg(feature = "s3")]
378 {
379 d.field("s3_context", &self.s3_context.as_ref().map(|_| ".."));
380 }
381 #[cfg(feature = "gcs")]
382 {
383 d.field("gcs_context", &self.gcs_context.as_ref().map(|_| ".."));
384 }
385 #[cfg(feature = "azure")]
386 {
387 d.field("azure_context", &self.azure_context.as_ref().map(|_| ".."));
388 }
389 d.finish()
390 }
391}
392
393impl PartialEq for ServerConfig {
394 fn eq(&self, other: &Self) -> bool {
395 self.storage_root == other.storage_root
396 && self.bearer_token == other.bearer_token
397 && self.public_base_url == other.public_base_url
398 && self.signed_url_key_id == other.signed_url_key_id
399 && self.signed_url_secret == other.signed_url_secret
400 && self.allow_insecure_url_sources == other.allow_insecure_url_sources
401 && self.cache_root == other.cache_root
402 && self.public_max_age_seconds == other.public_max_age_seconds
403 && self.public_stale_while_revalidate_seconds
404 == other.public_stale_while_revalidate_seconds
405 && self.disable_accept_negotiation == other.disable_accept_negotiation
406 && self.max_concurrent_transforms == other.max_concurrent_transforms
407 && self.transform_deadline_secs == other.transform_deadline_secs
408 && cfg_storage_eq(self, other)
409 }
410}
411
412fn cfg_storage_eq(_this: &ServerConfig, _other: &ServerConfig) -> bool {
413 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
414 {
415 if _this.storage_backend != _other.storage_backend {
416 return false;
417 }
418 }
419 #[cfg(feature = "s3")]
420 {
421 if _this
422 .s3_context
423 .as_ref()
424 .map(|c| (&c.default_bucket, &c.endpoint_url))
425 != _other
426 .s3_context
427 .as_ref()
428 .map(|c| (&c.default_bucket, &c.endpoint_url))
429 {
430 return false;
431 }
432 }
433 #[cfg(feature = "gcs")]
434 {
435 if _this
436 .gcs_context
437 .as_ref()
438 .map(|c| (&c.default_bucket, &c.endpoint_url))
439 != _other
440 .gcs_context
441 .as_ref()
442 .map(|c| (&c.default_bucket, &c.endpoint_url))
443 {
444 return false;
445 }
446 }
447 #[cfg(feature = "azure")]
448 {
449 if _this
450 .azure_context
451 .as_ref()
452 .map(|c| (&c.default_container, &c.endpoint_url))
453 != _other
454 .azure_context
455 .as_ref()
456 .map(|c| (&c.default_container, &c.endpoint_url))
457 {
458 return false;
459 }
460 }
461 true
462}
463
// Marker impl: the hand-written `PartialEq` compares only values with total
// equality, so `ServerConfig` can also be `Eq`.
impl Eq for ServerConfig {}
465
466impl ServerConfig {
467 pub fn new(storage_root: PathBuf, bearer_token: Option<String>) -> Self {
482 Self {
483 storage_root,
484 bearer_token,
485 public_base_url: None,
486 signed_url_key_id: None,
487 signed_url_secret: None,
488 allow_insecure_url_sources: false,
489 cache_root: None,
490 public_max_age_seconds: DEFAULT_PUBLIC_MAX_AGE_SECONDS,
491 public_stale_while_revalidate_seconds: DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
492 disable_accept_negotiation: false,
493 log_handler: None,
494 max_concurrent_transforms: DEFAULT_MAX_CONCURRENT_TRANSFORMS,
495 transform_deadline_secs: DEFAULT_TRANSFORM_DEADLINE_SECS,
496 transforms_in_flight: Arc::new(AtomicU64::new(0)),
497 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
498 storage_timeout_secs: remote::STORAGE_DOWNLOAD_TIMEOUT_SECS,
499 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
500 storage_backend: StorageBackend::Filesystem,
501 #[cfg(feature = "s3")]
502 s3_context: None,
503 #[cfg(feature = "gcs")]
504 gcs_context: None,
505 #[cfg(feature = "azure")]
506 azure_context: None,
507 }
508 }
509
510 fn log(&self, msg: &str) {
513 if let Some(handler) = &self.log_handler {
514 handler(msg);
515 } else {
516 stderr_write(msg);
517 }
518 }
519
520 pub fn with_signed_url_credentials(
538 mut self,
539 key_id: impl Into<String>,
540 secret: impl Into<String>,
541 ) -> Self {
542 self.signed_url_key_id = Some(key_id.into());
543 self.signed_url_secret = Some(secret.into());
544 self
545 }
546
547 pub fn with_insecure_url_sources(mut self, allow_insecure_url_sources: bool) -> Self {
564 self.allow_insecure_url_sources = allow_insecure_url_sources;
565 self
566 }
567
568 pub fn with_cache_root(mut self, cache_root: impl Into<PathBuf>) -> Self {
584 self.cache_root = Some(cache_root.into());
585 self
586 }
587
588 #[cfg(feature = "s3")]
590 pub fn with_s3_context(mut self, context: s3::S3Context) -> Self {
591 self.storage_backend = StorageBackend::S3;
592 self.s3_context = Some(Arc::new(context));
593 self
594 }
595
596 #[cfg(feature = "gcs")]
598 pub fn with_gcs_context(mut self, context: gcs::GcsContext) -> Self {
599 self.storage_backend = StorageBackend::Gcs;
600 self.gcs_context = Some(Arc::new(context));
601 self
602 }
603
604 #[cfg(feature = "azure")]
606 pub fn with_azure_context(mut self, context: azure::AzureContext) -> Self {
607 self.storage_backend = StorageBackend::Azure;
608 self.azure_context = Some(Arc::new(context));
609 self
610 }
611
612 pub fn from_env() -> io::Result<Self> {
687 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
688 let storage_backend = match env::var("TRUSS_STORAGE_BACKEND")
689 .ok()
690 .filter(|v| !v.is_empty())
691 {
692 Some(value) => StorageBackend::parse(&value)
693 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?,
694 None => StorageBackend::Filesystem,
695 };
696
697 let storage_root =
698 env::var("TRUSS_STORAGE_ROOT").unwrap_or_else(|_| DEFAULT_STORAGE_ROOT.to_string());
699 let storage_root = PathBuf::from(storage_root).canonicalize()?;
700 let bearer_token = env::var("TRUSS_BEARER_TOKEN")
701 .ok()
702 .filter(|value| !value.is_empty());
703 let public_base_url = env::var("TRUSS_PUBLIC_BASE_URL")
704 .ok()
705 .filter(|value| !value.is_empty())
706 .map(validate_public_base_url)
707 .transpose()?;
708 let signed_url_key_id = env::var("TRUSS_SIGNED_URL_KEY_ID")
709 .ok()
710 .filter(|value| !value.is_empty());
711 let signed_url_secret = env::var("TRUSS_SIGNED_URL_SECRET")
712 .ok()
713 .filter(|value| !value.is_empty());
714
715 if signed_url_key_id.is_some() != signed_url_secret.is_some() {
716 return Err(io::Error::new(
717 io::ErrorKind::InvalidInput,
718 "TRUSS_SIGNED_URL_KEY_ID and TRUSS_SIGNED_URL_SECRET must be set together",
719 ));
720 }
721
722 if signed_url_key_id.is_some() && public_base_url.is_none() {
723 eprintln!(
724 "truss: warning: TRUSS_SIGNED_URL_KEY_ID is set but TRUSS_PUBLIC_BASE_URL is not. \
725 Behind a reverse proxy or CDN the Host header may differ from the externally \
726 visible authority, causing signed URL verification to fail. Consider setting \
727 TRUSS_PUBLIC_BASE_URL to the canonical external origin."
728 );
729 }
730
731 let cache_root = env::var("TRUSS_CACHE_ROOT")
732 .ok()
733 .filter(|value| !value.is_empty())
734 .map(PathBuf::from);
735
736 let public_max_age_seconds = parse_optional_env_u32("TRUSS_PUBLIC_MAX_AGE")?
737 .unwrap_or(DEFAULT_PUBLIC_MAX_AGE_SECONDS);
738 let public_stale_while_revalidate_seconds =
739 parse_optional_env_u32("TRUSS_PUBLIC_STALE_WHILE_REVALIDATE")?
740 .unwrap_or(DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS);
741
742 let allow_insecure_url_sources = env_flag("TRUSS_ALLOW_INSECURE_URL_SOURCES");
743
744 let max_concurrent_transforms = match env::var("TRUSS_MAX_CONCURRENT_TRANSFORMS")
745 .ok()
746 .filter(|v| !v.is_empty())
747 {
748 Some(value) => {
749 let n: u64 = value.parse().map_err(|_| {
750 io::Error::new(
751 io::ErrorKind::InvalidInput,
752 "TRUSS_MAX_CONCURRENT_TRANSFORMS must be a positive integer",
753 )
754 })?;
755 if n == 0 || n > 1024 {
756 return Err(io::Error::new(
757 io::ErrorKind::InvalidInput,
758 "TRUSS_MAX_CONCURRENT_TRANSFORMS must be between 1 and 1024",
759 ));
760 }
761 n
762 }
763 None => DEFAULT_MAX_CONCURRENT_TRANSFORMS,
764 };
765
766 let transform_deadline_secs = match env::var("TRUSS_TRANSFORM_DEADLINE_SECS")
767 .ok()
768 .filter(|v| !v.is_empty())
769 {
770 Some(value) => {
771 let secs: u64 = value.parse().map_err(|_| {
772 io::Error::new(
773 io::ErrorKind::InvalidInput,
774 "TRUSS_TRANSFORM_DEADLINE_SECS must be a positive integer",
775 )
776 })?;
777 if secs == 0 || secs > 300 {
778 return Err(io::Error::new(
779 io::ErrorKind::InvalidInput,
780 "TRUSS_TRANSFORM_DEADLINE_SECS must be between 1 and 300",
781 ));
782 }
783 secs
784 }
785 None => DEFAULT_TRANSFORM_DEADLINE_SECS,
786 };
787
788 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
789 let storage_timeout_secs = match env::var("TRUSS_STORAGE_TIMEOUT_SECS")
790 .ok()
791 .filter(|v| !v.is_empty())
792 {
793 Some(value) => {
794 let secs: u64 = value.parse().map_err(|_| {
795 io::Error::new(
796 io::ErrorKind::InvalidInput,
797 "TRUSS_STORAGE_TIMEOUT_SECS must be a positive integer",
798 )
799 })?;
800 if secs == 0 || secs > 300 {
801 return Err(io::Error::new(
802 io::ErrorKind::InvalidInput,
803 "TRUSS_STORAGE_TIMEOUT_SECS must be between 1 and 300",
804 ));
805 }
806 secs
807 }
808 None => remote::STORAGE_DOWNLOAD_TIMEOUT_SECS,
809 };
810
811 #[cfg(feature = "s3")]
812 let s3_context = if storage_backend == StorageBackend::S3 {
813 let bucket = env::var("TRUSS_S3_BUCKET")
814 .ok()
815 .filter(|v| !v.is_empty())
816 .ok_or_else(|| {
817 io::Error::new(
818 io::ErrorKind::InvalidInput,
819 "TRUSS_S3_BUCKET is required when TRUSS_STORAGE_BACKEND=s3",
820 )
821 })?;
822 Some(Arc::new(s3::build_s3_context(
823 bucket,
824 allow_insecure_url_sources,
825 )?))
826 } else {
827 None
828 };
829
830 #[cfg(feature = "gcs")]
831 let gcs_context = if storage_backend == StorageBackend::Gcs {
832 let bucket = env::var("TRUSS_GCS_BUCKET")
833 .ok()
834 .filter(|v| !v.is_empty())
835 .ok_or_else(|| {
836 io::Error::new(
837 io::ErrorKind::InvalidInput,
838 "TRUSS_GCS_BUCKET is required when TRUSS_STORAGE_BACKEND=gcs",
839 )
840 })?;
841 Some(Arc::new(gcs::build_gcs_context(
842 bucket,
843 allow_insecure_url_sources,
844 )?))
845 } else {
846 if env::var("TRUSS_GCS_BUCKET")
847 .ok()
848 .filter(|v| !v.is_empty())
849 .is_some()
850 {
851 eprintln!(
852 "truss: warning: TRUSS_GCS_BUCKET is set but TRUSS_STORAGE_BACKEND is not \
853 `gcs`. The GCS bucket will be ignored. Set TRUSS_STORAGE_BACKEND=gcs to \
854 enable the GCS backend."
855 );
856 }
857 None
858 };
859
860 #[cfg(feature = "azure")]
861 let azure_context = if storage_backend == StorageBackend::Azure {
862 let container = env::var("TRUSS_AZURE_CONTAINER")
863 .ok()
864 .filter(|v| !v.is_empty())
865 .ok_or_else(|| {
866 io::Error::new(
867 io::ErrorKind::InvalidInput,
868 "TRUSS_AZURE_CONTAINER is required when TRUSS_STORAGE_BACKEND=azure",
869 )
870 })?;
871 Some(Arc::new(azure::build_azure_context(
872 container,
873 allow_insecure_url_sources,
874 )?))
875 } else {
876 if env::var("TRUSS_AZURE_CONTAINER")
877 .ok()
878 .filter(|v| !v.is_empty())
879 .is_some()
880 {
881 eprintln!(
882 "truss: warning: TRUSS_AZURE_CONTAINER is set but TRUSS_STORAGE_BACKEND is not \
883 `azure`. The Azure container will be ignored. Set TRUSS_STORAGE_BACKEND=azure to \
884 enable the Azure backend."
885 );
886 }
887 None
888 };
889
890 Ok(Self {
891 storage_root,
892 bearer_token,
893 public_base_url,
894 signed_url_key_id,
895 signed_url_secret,
896 allow_insecure_url_sources,
897 cache_root,
898 public_max_age_seconds,
899 public_stale_while_revalidate_seconds,
900 disable_accept_negotiation: env_flag("TRUSS_DISABLE_ACCEPT_NEGOTIATION"),
901 log_handler: None,
902 max_concurrent_transforms,
903 transform_deadline_secs,
904 transforms_in_flight: Arc::new(AtomicU64::new(0)),
905 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
906 storage_timeout_secs,
907 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
908 storage_backend,
909 #[cfg(feature = "s3")]
910 s3_context,
911 #[cfg(feature = "gcs")]
912 gcs_context,
913 #[cfg(feature = "azure")]
914 azure_context,
915 })
916 }
917}
918
/// Source reference embedded in a signed public URL.
///
/// `sign_public_url` picks the route from the variant: `Path` is signed for
/// `/images/by-path` and `Url` for `/images/by-url`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SignedUrlSource {
    /// Source addressed by a path.
    Path {
        /// The source path.
        path: String,
        /// Optional version tag for the source.
        version: Option<String>,
    },
    /// Source addressed by a URL.
    Url {
        /// The source URL.
        url: String,
        /// Optional version tag for the source.
        version: Option<String>,
    },
}
937
938pub fn sign_public_url(
980 base_url: &str,
981 source: SignedUrlSource,
982 options: &TransformOptions,
983 key_id: &str,
984 secret: &str,
985 expires: u64,
986) -> Result<String, String> {
987 let base_url = Url::parse(base_url).map_err(|error| format!("base URL is invalid: {error}"))?;
988 match base_url.scheme() {
989 "http" | "https" => {}
990 _ => return Err("base URL must use the http or https scheme".to_string()),
991 }
992
993 let route_path = match source {
994 SignedUrlSource::Path { .. } => "/images/by-path",
995 SignedUrlSource::Url { .. } => "/images/by-url",
996 };
997 let mut endpoint = base_url
998 .join(route_path)
999 .map_err(|error| format!("failed to resolve the public endpoint URL: {error}"))?;
1000 let authority = url_authority(&endpoint)?;
1001 let mut query = signed_source_query(source);
1002 extend_transform_query(&mut query, options);
1003 query.insert("keyId".to_string(), key_id.to_string());
1004 query.insert("expires".to_string(), expires.to_string());
1005
1006 let canonical = format!(
1007 "GET\n{}\n{}\n{}",
1008 authority,
1009 endpoint.path(),
1010 canonical_query_without_signature(&query)
1011 );
1012 let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
1013 .map_err(|error| format!("failed to initialize signed URL HMAC: {error}"))?;
1014 mac.update(canonical.as_bytes());
1015 query.insert(
1016 "signature".to_string(),
1017 hex::encode(mac.finalize().into_bytes()),
1018 );
1019
1020 let mut serializer = url::form_urlencoded::Serializer::new(String::new());
1021 for (name, value) in query {
1022 serializer.append_pair(&name, &value);
1023 }
1024 endpoint.set_query(Some(&serializer.finish()));
1025 Ok(endpoint.into())
1026}
1027
1028pub fn bind_addr() -> String {
1033 env::var("TRUSS_BIND_ADDR").unwrap_or_else(|_| DEFAULT_BIND_ADDR.to_string())
1034}
1035
1036pub fn serve(listener: TcpListener) -> io::Result<()> {
1048 let config = ServerConfig::from_env()?;
1049
1050 for (ok, name) in storage_health_check(&config) {
1053 if !ok {
1054 return Err(io::Error::new(
1055 io::ErrorKind::ConnectionRefused,
1056 format!(
1057 "storage connectivity check failed for `{name}` — verify the backend \
1058 endpoint, credentials, and container/bucket configuration"
1059 ),
1060 ));
1061 }
1062 }
1063
1064 serve_with_config(listener, config)
1065}
1066
/// Serves connections from `listener` on a fixed pool of worker threads.
///
/// Accepted streams are sent over a channel; every worker pulls one stream
/// at a time and passes it to `handle_stream`. The pool has
/// `max(config.max_concurrent_transforms, WORKER_THREADS)` threads.
///
/// # Errors
///
/// Returns the first error produced by accepting a connection.
pub fn serve_with_config(listener: TcpListener, config: ServerConfig) -> io::Result<()> {
    let config = Arc::new(config);
    let (sender, receiver) = std::sync::mpsc::channel::<TcpStream>();

    // The single receiver is shared by all workers behind a mutex.
    let receiver = Arc::new(std::sync::Mutex::new(receiver));
    let pool_size = (config.max_concurrent_transforms as usize).max(WORKER_THREADS);
    let mut workers = Vec::with_capacity(pool_size);
    for _ in 0..pool_size {
        let rx = Arc::clone(&receiver);
        let cfg = Arc::clone(&config);
        workers.push(std::thread::spawn(move || {
            loop {
                // The lock is held only while waiting for the next stream;
                // the guard is dropped at the end of this block, before the
                // connection is handled.
                let stream = {
                    let guard = rx.lock().expect("worker lock poisoned");
                    match guard.recv() {
                        Ok(stream) => stream,
                        // Sender dropped: no more work will arrive.
                        Err(_) => break,
                    }
                };
                if let Err(err) = handle_stream(stream, &cfg) {
                    cfg.log(&format!("failed to handle connection: {err}"));
                }
            }
        }));
    }

    // Accept loop. An accept error ends the server with that error; a failed
    // send means the receiving side is gone, so accepting stops.
    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                if sender.send(stream).is_err() {
                    break;
                }
            }
            Err(err) => return Err(err),
        }
    }

    // Shutdown: dropping the sender makes idle workers leave their loop.
    // Workers are then awaited one by one against a shared 30-second budget.
    drop(sender);
    let deadline = std::time::Instant::now() + Duration::from_secs(30);
    for worker in workers {
        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
        if remaining.is_zero() {
            eprintln!("shutdown: timed out waiting for worker threads");
            break;
        }
        // `JoinHandle::join` has no timeout, so a helper thread performs the
        // join and signals completion through a mutex/condvar pair that this
        // thread waits on with a timeout.
        let worker_done =
            std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
        let wd = std::sync::Arc::clone(&worker_done);
        std::thread::spawn(move || {
            let _ = worker.join();
            let (lock, cvar) = &*wd;
            *lock.lock().expect("shutdown notify lock") = true;
            cvar.notify_one();
        });
        let (lock, cvar) = &*worker_done;
        let mut done = lock.lock().expect("shutdown wait lock");
        while !*done {
            let (guard, timeout) = cvar
                .wait_timeout(done, remaining)
                .expect("shutdown condvar wait");
            done = guard;
            if timeout.timed_out() {
                // Only leaves the wait loop; the outer loop moves on to the
                // next worker and re-checks the remaining budget.
                eprintln!("shutdown: timed out waiting for a worker thread");
                break;
            }
        }
    }

    Ok(())
}
1154
1155pub fn serve_once(listener: TcpListener) -> io::Result<()> {
1165 let config = ServerConfig::from_env()?;
1166 serve_once_with_config(listener, config)
1167}
1168
1169pub fn serve_once_with_config(listener: TcpListener, config: ServerConfig) -> io::Result<()> {
1176 let (stream, _) = listener.accept()?;
1177 handle_stream(stream, &config)
1178}
1179
/// Deserialized body of a transform request.
///
/// Unknown top-level fields are rejected (`deny_unknown_fields`).
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct TransformImageRequestPayload {
    /// Where the source image comes from.
    source: TransformSourcePayload,
    /// Transform options; falls back to `Default` when the field is absent.
    #[serde(default)]
    options: TransformOptionsPayload,
}
1187
/// Source selector of a transform request.
///
/// Internally tagged by a `kind` field whose value is the lowercase variant
/// name (`path`, `url`, and — with a storage feature — `storage`).
#[derive(Debug, Deserialize)]
#[serde(tag = "kind", rename_all = "lowercase")]
enum TransformSourcePayload {
    /// Source addressed by a path.
    Path {
        path: String,
        /// Optional version tag; without it `versioned_source_hash` yields
        /// `None`.
        version: Option<String>,
    },
    /// Source addressed by a URL.
    Url {
        url: String,
        /// Optional version tag; without it `versioned_source_hash` yields
        /// `None`.
        version: Option<String>,
    },
    /// Source addressed by an object-storage key.
    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
    Storage {
        /// Explicit bucket; when absent the configured backend's default
        /// bucket/container is used by `storage_scheme_and_bucket`.
        bucket: Option<String>,
        key: String,
        /// Optional version tag; without it `versioned_source_hash` yields
        /// `None`.
        version: Option<String>,
    },
}
1206
1207impl TransformSourcePayload {
1208 fn versioned_source_hash(&self, config: &ServerConfig) -> Option<String> {
1217 let (kind, reference, version): (&str, std::borrow::Cow<'_, str>, Option<&str>) = match self
1218 {
1219 Self::Path { path, version } => ("path", path.as_str().into(), version.as_deref()),
1220 Self::Url { url, version } => ("url", url.as_str().into(), version.as_deref()),
1221 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1222 Self::Storage {
1223 bucket,
1224 key,
1225 version,
1226 } => {
1227 let (scheme, effective_bucket) =
1228 storage_scheme_and_bucket(bucket.as_deref(), config);
1229 let effective_bucket = effective_bucket?;
1230 (
1231 "storage",
1232 format!("{scheme}://{effective_bucket}/{key}").into(),
1233 version.as_deref(),
1234 )
1235 }
1236 };
1237 let version = version?;
1238 let mut id = String::new();
1242 id.push_str(kind);
1243 id.push('\n');
1244 id.push_str(&reference);
1245 id.push('\n');
1246 id.push_str(version);
1247 id.push('\n');
1248 id.push_str(config.storage_root.to_string_lossy().as_ref());
1249 id.push('\n');
1250 id.push_str(if config.allow_insecure_url_sources {
1251 "insecure"
1252 } else {
1253 "strict"
1254 });
1255 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1256 {
1257 id.push('\n');
1258 id.push_str(storage_backend_label(config));
1259 #[cfg(feature = "s3")]
1260 if let Some(ref ctx) = config.s3_context
1261 && let Some(ref endpoint) = ctx.endpoint_url
1262 {
1263 id.push('\n');
1264 id.push_str(endpoint);
1265 }
1266 #[cfg(feature = "gcs")]
1267 if let Some(ref ctx) = config.gcs_context
1268 && let Some(ref endpoint) = ctx.endpoint_url
1269 {
1270 id.push('\n');
1271 id.push_str(endpoint);
1272 }
1273 #[cfg(feature = "azure")]
1274 if let Some(ref ctx) = config.azure_context {
1275 id.push('\n');
1276 id.push_str(&ctx.endpoint_url);
1277 }
1278 }
1279 Some(hex::encode(Sha256::digest(id.as_bytes())))
1280 }
1281}
1282
/// Returns the scheme label of the configured backend together with the
/// bucket to use.
///
/// `explicit_bucket` wins when present; otherwise the default bucket (or
/// container) of the active backend's context is used, if that context
/// exists. The filesystem backend has no default and reports the scheme
/// `"fs"`.
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn storage_scheme_and_bucket<'a>(
    explicit_bucket: Option<&'a str>,
    config: &'a ServerConfig,
) -> (&'static str, Option<&'a str>) {
    let (scheme, default_bucket): (&'static str, Option<&'a str>) = match config.storage_backend
    {
        StorageBackend::Filesystem => ("fs", None),
        #[cfg(feature = "s3")]
        StorageBackend::S3 => (
            "s3",
            config
                .s3_context
                .as_deref()
                .map(|ctx| ctx.default_bucket.as_str()),
        ),
        #[cfg(feature = "gcs")]
        StorageBackend::Gcs => (
            "gcs",
            config
                .gcs_context
                .as_deref()
                .map(|ctx| ctx.default_bucket.as_str()),
        ),
        #[cfg(feature = "azure")]
        StorageBackend::Azure => (
            "azure",
            config
                .azure_context
                .as_deref()
                .map(|ctx| ctx.default_container.as_str()),
        ),
    };

    (scheme, explicit_bucket.or(default_bucket))
}
1316
/// Reports whether the configured backend is anything other than the local
/// filesystem.
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn is_object_storage_backend(config: &ServerConfig) -> bool {
    !matches!(config.storage_backend, StorageBackend::Filesystem)
}
1329
/// Returns the fixed label of the configured backend (`fs-backend`,
/// `s3-backend`, `gcs-backend` or `azure-backend`).
///
/// `versioned_source_hash` mixes this label into its digest, so the strings
/// must stay stable.
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn storage_backend_label(config: &ServerConfig) -> &'static str {
    use StorageBackend as Backend;

    let backend = config.storage_backend;
    match backend {
        Backend::Filesystem => "fs-backend",
        #[cfg(feature = "s3")]
        Backend::S3 => "s3-backend",
        #[cfg(feature = "gcs")]
        Backend::Gcs => "gcs-backend",
        #[cfg(feature = "azure")]
        Backend::Azure => "azure-backend",
    }
}
1342
/// Deserialized transform options as they appear in a request body.
///
/// Field names are camelCase on the wire, every field is optional, and
/// unknown fields are rejected. `into_options` converts the textual fields
/// into their typed counterparts.
#[derive(Debug, Default, Deserialize)]
#[serde(default, rename_all = "camelCase", deny_unknown_fields)]
struct TransformOptionsPayload {
    width: Option<u32>,
    height: Option<u32>,
    /// Parsed with `Fit::from_str`.
    fit: Option<String>,
    /// Parsed with `Position::from_str`.
    position: Option<String>,
    /// Parsed with `MediaType::from_str`.
    format: Option<String>,
    quality: Option<u8>,
    /// Parsed with `Rgba8::from_hex`.
    background: Option<String>,
    /// Rendered to a string and parsed with `Rotation::from_str`.
    rotate: Option<u16>,
    auto_orient: Option<bool>,
    strip_metadata: Option<bool>,
    preserve_exif: Option<bool>,
    blur: Option<f32>,
}
1359
1360impl TransformOptionsPayload {
1361 fn into_options(self) -> Result<TransformOptions, HttpResponse> {
1362 let defaults = TransformOptions::default();
1363
1364 Ok(TransformOptions {
1365 width: self.width,
1366 height: self.height,
1367 fit: parse_optional_named(self.fit.as_deref(), "fit", Fit::from_str)?,
1368 position: parse_optional_named(
1369 self.position.as_deref(),
1370 "position",
1371 Position::from_str,
1372 )?,
1373 format: parse_optional_named(self.format.as_deref(), "format", MediaType::from_str)?,
1374 quality: self.quality,
1375 background: parse_optional_named(
1376 self.background.as_deref(),
1377 "background",
1378 Rgba8::from_hex,
1379 )?,
1380 rotate: match self.rotate {
1381 Some(value) => parse_named(&value.to_string(), "rotate", Rotation::from_str)?,
1382 None => defaults.rotate,
1383 },
1384 auto_orient: self.auto_orient.unwrap_or(defaults.auto_orient),
1385 strip_metadata: self.strip_metadata.unwrap_or(defaults.strip_metadata),
1386 preserve_exif: self.preserve_exif.unwrap_or(defaults.preserve_exif),
1387 blur: self.blur,
1388 deadline: defaults.deadline,
1389 })
1390 }
1391}
1392
/// Borrowed data for one access-log line, consumed by `emit_access_log`.
struct AccessLogEntry<'a> {
    request_id: &'a str,
    method: &'a str,
    path: &'a str,
    route: &'a str,
    status: &'a str,
    /// Start of the measured interval; the logged latency is the time
    /// elapsed since this instant.
    start: Instant,
    /// Logged as-is; `None` is emitted as JSON `null`.
    cache_status: Option<&'a str>,
}
1402
/// Returns the value of the first non-empty `x-request-id` header, if any.
///
/// The comparison is an exact match against the lowercase name, so this
/// assumes header names were already lowercased by the parser — TODO confirm.
/// Only the matching value is cloned; headers that do not match are merely
/// inspected.
fn extract_request_id(headers: &[(String, String)]) -> Option<String> {
    headers
        .iter()
        .find(|(name, value)| name == "x-request-id" && !value.is_empty())
        .map(|(_, value)| value.clone())
}
1410
/// Summarises the first `Cache-Status` response header as `"hit"` or `"miss"`.
///
/// Returns `None` when no header with exactly that name is present. Any value
/// containing the substring `hit` counts as a hit; everything else is a miss.
fn extract_cache_status(headers: &[(&'static str, String)]) -> Option<&'static str> {
    for (name, value) in headers {
        if *name == "Cache-Status" {
            let label = if value.contains("hit") { "hit" } else { "miss" };
            return Some(label);
        }
    }
    None
}
1419
1420fn emit_access_log(config: &ServerConfig, entry: &AccessLogEntry<'_>) {
1421 config.log(
1422 &json!({
1423 "kind": "access_log",
1424 "request_id": entry.request_id,
1425 "method": entry.method,
1426 "path": entry.path,
1427 "route": entry.route,
1428 "status": entry.status,
1429 "latency_ms": entry.start.elapsed().as_millis() as u64,
1430 "cache_status": entry.cache_status,
1431 })
1432 .to_string(),
1433 );
1434}
1435
1436fn handle_stream(mut stream: TcpStream, config: &ServerConfig) -> io::Result<()> {
1437 if let Err(err) = stream.set_read_timeout(Some(SOCKET_READ_TIMEOUT)) {
1439 config.log(&format!("failed to set socket read timeout: {err}"));
1440 }
1441 if let Err(err) = stream.set_write_timeout(Some(SOCKET_WRITE_TIMEOUT)) {
1442 config.log(&format!("failed to set socket write timeout: {err}"));
1443 }
1444
1445 let mut requests_served: usize = 0;
1446
1447 loop {
1448 let partial = match read_request_headers(&mut stream) {
1449 Ok(partial) => partial,
1450 Err(response) => {
1451 if requests_served > 0 {
1452 return Ok(());
1453 }
1454 let _ = write_response(&mut stream, response, true);
1455 return Ok(());
1456 }
1457 };
1458
1459 let start = Instant::now();
1462
1463 let request_id =
1464 extract_request_id(&partial.headers).unwrap_or_else(|| Uuid::new_v4().to_string());
1465
1466 let client_wants_close = partial
1467 .headers
1468 .iter()
1469 .any(|(name, value)| name == "connection" && value.eq_ignore_ascii_case("close"));
1470
1471 let is_head = partial.method == "HEAD";
1472
1473 let requires_auth = matches!(
1474 (partial.method.as_str(), partial.path()),
1475 ("POST", "/images:transform") | ("POST", "/images")
1476 );
1477 if requires_auth
1478 && let Err(mut response) = authorize_request_headers(&partial.headers, config)
1479 {
1480 response.headers.push(("X-Request-Id", request_id.clone()));
1481 let sc = status_code(response.status).unwrap_or("unknown");
1482 let method_log = partial.method.clone();
1483 let path_log = partial.path().to_string();
1484 let _ = write_response(&mut stream, response, true);
1485 emit_access_log(
1486 config,
1487 &AccessLogEntry {
1488 request_id: &request_id,
1489 method: &method_log,
1490 path: &path_log,
1491 route: &path_log,
1492 status: sc,
1493 start,
1494 cache_status: None,
1495 },
1496 );
1497 return Ok(());
1498 }
1499
1500 let method = partial.method.clone();
1502 let path = partial.path().to_string();
1503
1504 let request = match read_request_body(&mut stream, partial) {
1505 Ok(request) => request,
1506 Err(mut response) => {
1507 response.headers.push(("X-Request-Id", request_id.clone()));
1508 let sc = status_code(response.status).unwrap_or("unknown");
1509 let _ = write_response(&mut stream, response, true);
1510 emit_access_log(
1511 config,
1512 &AccessLogEntry {
1513 request_id: &request_id,
1514 method: &method,
1515 path: &path,
1516 route: &path,
1517 status: sc,
1518 start,
1519 cache_status: None,
1520 },
1521 );
1522 return Ok(());
1523 }
1524 };
1525 let route = classify_route(&request);
1526 let mut response = route_request(request, config);
1527 record_http_metrics(route, response.status);
1528
1529 response.headers.push(("X-Request-Id", request_id.clone()));
1530
1531 let cache_status = extract_cache_status(&response.headers);
1532
1533 let sc = status_code(response.status).unwrap_or("unknown");
1534
1535 if is_head {
1536 response.body = Vec::new();
1537 }
1538
1539 requests_served += 1;
1540 let close_after = client_wants_close || requests_served >= KEEP_ALIVE_MAX_REQUESTS;
1541
1542 write_response(&mut stream, response, close_after)?;
1543
1544 emit_access_log(
1545 config,
1546 &AccessLogEntry {
1547 request_id: &request_id,
1548 method: &method,
1549 path: &path,
1550 route: route.as_label(),
1551 status: sc,
1552 start,
1553 cache_status,
1554 },
1555 );
1556
1557 if close_after {
1558 return Ok(());
1559 }
1560 }
1561}
1562
1563fn route_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1564 let method = request.method.clone();
1565 let path = request.path().to_string();
1566
1567 match (method.as_str(), path.as_str()) {
1568 ("GET" | "HEAD", "/health") => handle_health(config),
1569 ("GET" | "HEAD", "/health/live") => handle_health_live(),
1570 ("GET" | "HEAD", "/health/ready") => handle_health_ready(config),
1571 ("GET" | "HEAD", "/images/by-path") => handle_public_path_request(request, config),
1572 ("GET" | "HEAD", "/images/by-url") => handle_public_url_request(request, config),
1573 ("POST", "/images:transform") => handle_transform_request(request, config),
1574 ("POST", "/images") => handle_upload_request(request, config),
1575 ("GET" | "HEAD", "/metrics") => handle_metrics_request(request, config),
1576 _ => HttpResponse::problem("404 Not Found", NOT_FOUND_BODY.as_bytes().to_vec()),
1577 }
1578}
1579
1580fn classify_route(request: &HttpRequest) -> RouteMetric {
1581 match (request.method.as_str(), request.path()) {
1582 ("GET" | "HEAD", "/health") => RouteMetric::Health,
1583 ("GET" | "HEAD", "/health/live") => RouteMetric::HealthLive,
1584 ("GET" | "HEAD", "/health/ready") => RouteMetric::HealthReady,
1585 ("GET" | "HEAD", "/images/by-path") => RouteMetric::PublicByPath,
1586 ("GET" | "HEAD", "/images/by-url") => RouteMetric::PublicByUrl,
1587 ("POST", "/images:transform") => RouteMetric::Transform,
1588 ("POST", "/images") => RouteMetric::Upload,
1589 ("GET" | "HEAD", "/metrics") => RouteMetric::Metrics,
1590 _ => RouteMetric::Unknown,
1591 }
1592}
1593
1594fn handle_transform_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1595 if let Err(response) = authorize_request(&request, config) {
1596 return response;
1597 }
1598
1599 if !request_has_json_content_type(&request) {
1600 return unsupported_media_type_response("content-type must be application/json");
1601 }
1602
1603 let payload: TransformImageRequestPayload = match serde_json::from_slice(&request.body) {
1604 Ok(payload) => payload,
1605 Err(error) => {
1606 return bad_request_response(&format!("request body must be valid JSON: {error}"));
1607 }
1608 };
1609 let options = match payload.options.into_options() {
1610 Ok(options) => options,
1611 Err(response) => return response,
1612 };
1613
1614 let versioned_hash = payload.source.versioned_source_hash(config);
1615 if let Some(response) = try_versioned_cache_lookup(
1616 versioned_hash.as_deref(),
1617 &options,
1618 &request,
1619 ImageResponsePolicy::PrivateTransform,
1620 config,
1621 ) {
1622 return response;
1623 }
1624
1625 let source_bytes = match resolve_source_bytes(payload.source, config) {
1626 Ok(bytes) => bytes,
1627 Err(response) => return response,
1628 };
1629 transform_source_bytes(
1630 source_bytes,
1631 options,
1632 versioned_hash.as_deref(),
1633 &request,
1634 ImageResponsePolicy::PrivateTransform,
1635 config,
1636 )
1637}
1638
1639fn handle_public_path_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1640 handle_public_get_request(request, config, PublicSourceKind::Path)
1641}
1642
1643fn handle_public_url_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1644 handle_public_get_request(request, config, PublicSourceKind::Url)
1645}
1646
1647fn handle_public_get_request(
1648 request: HttpRequest,
1649 config: &ServerConfig,
1650 source_kind: PublicSourceKind,
1651) -> HttpResponse {
1652 let query = match parse_query_params(&request) {
1653 Ok(query) => query,
1654 Err(response) => return response,
1655 };
1656 if let Err(response) = authorize_signed_request(&request, &query, config) {
1657 return response;
1658 }
1659 let (source, options) = match parse_public_get_request(&query, source_kind) {
1660 Ok(parsed) => parsed,
1661 Err(response) => return response,
1662 };
1663
1664 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1668 let source = if is_object_storage_backend(config) {
1669 match source {
1670 TransformSourcePayload::Path { path, version } => TransformSourcePayload::Storage {
1671 bucket: None,
1672 key: path.trim_start_matches('/').to_string(),
1673 version,
1674 },
1675 other => other,
1676 }
1677 } else {
1678 source
1679 };
1680
1681 let versioned_hash = source.versioned_source_hash(config);
1682 if let Some(response) = try_versioned_cache_lookup(
1683 versioned_hash.as_deref(),
1684 &options,
1685 &request,
1686 ImageResponsePolicy::PublicGet,
1687 config,
1688 ) {
1689 return response;
1690 }
1691
1692 let source_bytes = match resolve_source_bytes(source, config) {
1693 Ok(bytes) => bytes,
1694 Err(response) => return response,
1695 };
1696
1697 transform_source_bytes(
1698 source_bytes,
1699 options,
1700 versioned_hash.as_deref(),
1701 &request,
1702 ImageResponsePolicy::PublicGet,
1703 config,
1704 )
1705}
1706
1707fn handle_upload_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1708 if let Err(response) = authorize_request(&request, config) {
1709 return response;
1710 }
1711
1712 let boundary = match parse_multipart_boundary(&request) {
1713 Ok(boundary) => boundary,
1714 Err(response) => return response,
1715 };
1716 let (file_bytes, options) = match parse_upload_request(&request.body, &boundary) {
1717 Ok(parts) => parts,
1718 Err(response) => return response,
1719 };
1720 transform_source_bytes(
1721 file_bytes,
1722 options,
1723 None,
1724 &request,
1725 ImageResponsePolicy::PrivateTransform,
1726 config,
1727 )
1728}
1729
1730fn handle_health_live() -> HttpResponse {
1732 let body = serde_json::to_vec(&json!({
1733 "status": "ok",
1734 "service": "truss",
1735 "version": env!("CARGO_PKG_VERSION"),
1736 }))
1737 .expect("serialize liveness");
1738 let mut body = body;
1739 body.push(b'\n');
1740 HttpResponse::json("200 OK", body)
1741}
1742
1743fn handle_health_ready(config: &ServerConfig) -> HttpResponse {
1748 let mut checks: Vec<serde_json::Value> = Vec::new();
1749 let mut all_ok = true;
1750
1751 for (ok, name) in storage_health_check(config) {
1752 checks.push(json!({
1753 "name": name,
1754 "status": if ok { "ok" } else { "fail" },
1755 }));
1756 if !ok {
1757 all_ok = false;
1758 }
1759 }
1760
1761 if let Some(cache_root) = &config.cache_root {
1762 let cache_ok = cache_root.is_dir();
1763 checks.push(json!({
1764 "name": "cacheRoot",
1765 "status": if cache_ok { "ok" } else { "fail" },
1766 }));
1767 if !cache_ok {
1768 all_ok = false;
1769 }
1770 }
1771
1772 let status_str = if all_ok { "ok" } else { "fail" };
1773 let mut body = serde_json::to_vec(&json!({
1774 "status": status_str,
1775 "checks": checks,
1776 }))
1777 .expect("serialize readiness");
1778 body.push(b'\n');
1779
1780 if all_ok {
1781 HttpResponse::json("200 OK", body)
1782 } else {
1783 HttpResponse::json("503 Service Unavailable", body)
1784 }
1785}
1786
1787fn storage_health_check(config: &ServerConfig) -> Vec<(bool, &'static str)> {
1789 #[allow(unused_mut)]
1790 let mut checks = vec![(config.storage_root.is_dir(), "storageRoot")];
1791 #[cfg(feature = "s3")]
1792 if config.storage_backend == StorageBackend::S3 {
1793 let reachable = config
1794 .s3_context
1795 .as_ref()
1796 .is_some_and(|ctx| ctx.check_reachable());
1797 checks.push((reachable, "storageBackend"));
1798 }
1799 #[cfg(feature = "gcs")]
1800 if config.storage_backend == StorageBackend::Gcs {
1801 let reachable = config
1802 .gcs_context
1803 .as_ref()
1804 .is_some_and(|ctx| ctx.check_reachable());
1805 checks.push((reachable, "storageBackend"));
1806 }
1807 #[cfg(feature = "azure")]
1808 if config.storage_backend == StorageBackend::Azure {
1809 let reachable = config
1810 .azure_context
1811 .as_ref()
1812 .is_some_and(|ctx| ctx.check_reachable());
1813 checks.push((reachable, "storageBackend"));
1814 }
1815 checks
1816}
1817
1818fn handle_health(config: &ServerConfig) -> HttpResponse {
1819 let mut checks: Vec<serde_json::Value> = Vec::new();
1820 let mut all_ok = true;
1821
1822 for (ok, name) in storage_health_check(config) {
1823 checks.push(json!({
1824 "name": name,
1825 "status": if ok { "ok" } else { "fail" },
1826 }));
1827 if !ok {
1828 all_ok = false;
1829 }
1830 }
1831
1832 if let Some(cache_root) = &config.cache_root {
1833 let cache_ok = cache_root.is_dir();
1834 checks.push(json!({
1835 "name": "cacheRoot",
1836 "status": if cache_ok { "ok" } else { "fail" },
1837 }));
1838 if !cache_ok {
1839 all_ok = false;
1840 }
1841 }
1842
1843 let in_flight = config.transforms_in_flight.load(Ordering::Relaxed);
1844 let overloaded = in_flight >= config.max_concurrent_transforms;
1845 checks.push(json!({
1846 "name": "transformCapacity",
1847 "status": if overloaded { "fail" } else { "ok" },
1848 }));
1849 if overloaded {
1850 all_ok = false;
1851 }
1852
1853 let status_str = if all_ok { "ok" } else { "fail" };
1854 let mut body = serde_json::to_vec(&json!({
1855 "status": status_str,
1856 "service": "truss",
1857 "version": env!("CARGO_PKG_VERSION"),
1858 "uptimeSeconds": uptime_seconds(),
1859 "checks": checks,
1860 }))
1861 .expect("serialize health");
1862 body.push(b'\n');
1863
1864 HttpResponse::json("200 OK", body)
1865}
1866
1867fn handle_metrics_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1868 if let Err(response) = authorize_request(&request, config) {
1869 return response;
1870 }
1871
1872 HttpResponse::text(
1873 "200 OK",
1874 "text/plain; version=0.0.4; charset=utf-8",
1875 render_metrics_text(
1876 config.max_concurrent_transforms,
1877 &config.transforms_in_flight,
1878 )
1879 .into_bytes(),
1880 )
1881}
1882
1883fn parse_public_get_request(
1884 query: &BTreeMap<String, String>,
1885 source_kind: PublicSourceKind,
1886) -> Result<(TransformSourcePayload, TransformOptions), HttpResponse> {
1887 validate_public_query_names(query, source_kind)?;
1888
1889 let source = match source_kind {
1890 PublicSourceKind::Path => TransformSourcePayload::Path {
1891 path: required_query_param(query, "path")?.to_string(),
1892 version: query.get("version").cloned(),
1893 },
1894 PublicSourceKind::Url => TransformSourcePayload::Url {
1895 url: required_query_param(query, "url")?.to_string(),
1896 version: query.get("version").cloned(),
1897 },
1898 };
1899
1900 let defaults = TransformOptions::default();
1901 let options = TransformOptions {
1902 width: parse_optional_integer_query(query, "width")?,
1903 height: parse_optional_integer_query(query, "height")?,
1904 fit: parse_optional_named(query.get("fit").map(String::as_str), "fit", Fit::from_str)?,
1905 position: parse_optional_named(
1906 query.get("position").map(String::as_str),
1907 "position",
1908 Position::from_str,
1909 )?,
1910 format: parse_optional_named(
1911 query.get("format").map(String::as_str),
1912 "format",
1913 MediaType::from_str,
1914 )?,
1915 quality: parse_optional_u8_query(query, "quality")?,
1916 background: parse_optional_named(
1917 query.get("background").map(String::as_str),
1918 "background",
1919 Rgba8::from_hex,
1920 )?,
1921 rotate: match query.get("rotate") {
1922 Some(value) => parse_named(value, "rotate", Rotation::from_str)?,
1923 None => defaults.rotate,
1924 },
1925 auto_orient: parse_optional_bool_query(query, "autoOrient")?
1926 .unwrap_or(defaults.auto_orient),
1927 strip_metadata: parse_optional_bool_query(query, "stripMetadata")?
1928 .unwrap_or(defaults.strip_metadata),
1929 preserve_exif: parse_optional_bool_query(query, "preserveExif")?
1930 .unwrap_or(defaults.preserve_exif),
1931 blur: parse_optional_float_query(query, "blur")?,
1932 deadline: defaults.deadline,
1933 };
1934
1935 Ok((source, options))
1936}
1937
1938fn transform_source_bytes(
1939 source_bytes: Vec<u8>,
1940 options: TransformOptions,
1941 versioned_hash: Option<&str>,
1942 request: &HttpRequest,
1943 response_policy: ImageResponsePolicy,
1944 config: &ServerConfig,
1945) -> HttpResponse {
1946 let content_hash;
1947 let source_hash = match versioned_hash {
1948 Some(hash) => hash,
1949 None => {
1950 content_hash = hex::encode(Sha256::digest(&source_bytes));
1951 &content_hash
1952 }
1953 };
1954
1955 let cache = config
1956 .cache_root
1957 .as_ref()
1958 .map(|root| TransformCache::new(root.clone()).with_log_handler(config.log_handler.clone()));
1959
1960 if let Some(ref cache) = cache
1961 && options.format.is_some()
1962 {
1963 let cache_key = compute_cache_key(source_hash, &options, None);
1964 if let CacheLookup::Hit {
1965 media_type,
1966 body,
1967 age,
1968 } = cache.get(&cache_key)
1969 {
1970 CACHE_HITS_TOTAL.fetch_add(1, Ordering::Relaxed);
1971 let etag = build_image_etag(&body);
1972 let mut headers = build_image_response_headers(
1973 media_type,
1974 &etag,
1975 response_policy,
1976 false,
1977 CacheHitStatus::Hit,
1978 config.public_max_age_seconds,
1979 config.public_stale_while_revalidate_seconds,
1980 );
1981 headers.push(("Age", age.as_secs().to_string()));
1982 if matches!(response_policy, ImageResponsePolicy::PublicGet)
1983 && if_none_match_matches(request.header("if-none-match"), &etag)
1984 {
1985 return HttpResponse::empty("304 Not Modified", headers);
1986 }
1987 return HttpResponse::binary_with_headers(
1988 "200 OK",
1989 media_type.as_mime(),
1990 headers,
1991 body,
1992 );
1993 }
1994 }
1995
1996 let _slot = match TransformSlot::try_acquire(
1997 &config.transforms_in_flight,
1998 config.max_concurrent_transforms,
1999 ) {
2000 Some(slot) => slot,
2001 None => return service_unavailable_response("too many concurrent transforms; retry later"),
2002 };
2003 transform_source_bytes_inner(
2004 source_bytes,
2005 options,
2006 request,
2007 response_policy,
2008 cache.as_ref(),
2009 source_hash,
2010 ImageResponseConfig {
2011 disable_accept_negotiation: config.disable_accept_negotiation,
2012 public_cache_control: PublicCacheControl {
2013 max_age: config.public_max_age_seconds,
2014 stale_while_revalidate: config.public_stale_while_revalidate_seconds,
2015 },
2016 transform_deadline: Duration::from_secs(config.transform_deadline_secs),
2017 },
2018 )
2019}
2020
2021fn transform_source_bytes_inner(
2022 source_bytes: Vec<u8>,
2023 mut options: TransformOptions,
2024 request: &HttpRequest,
2025 response_policy: ImageResponsePolicy,
2026 cache: Option<&TransformCache>,
2027 source_hash: &str,
2028 response_config: ImageResponseConfig,
2029) -> HttpResponse {
2030 if options.deadline.is_none() {
2031 options.deadline = Some(response_config.transform_deadline);
2032 }
2033 let artifact = match sniff_artifact(RawArtifact::new(source_bytes, None)) {
2034 Ok(artifact) => artifact,
2035 Err(error) => return transform_error_response(error),
2036 };
2037 let negotiation_used =
2038 if options.format.is_none() && !response_config.disable_accept_negotiation {
2039 match negotiate_output_format(request.header("accept"), &artifact) {
2040 Ok(Some(format)) => {
2041 options.format = Some(format);
2042 true
2043 }
2044 Ok(None) => false,
2045 Err(response) => return response,
2046 }
2047 } else {
2048 false
2049 };
2050
2051 let negotiated_accept = if negotiation_used {
2052 request.header("accept")
2053 } else {
2054 None
2055 };
2056 let cache_key = compute_cache_key(source_hash, &options, negotiated_accept);
2057
2058 if let Some(cache) = cache
2059 && let CacheLookup::Hit {
2060 media_type,
2061 body,
2062 age,
2063 } = cache.get(&cache_key)
2064 {
2065 CACHE_HITS_TOTAL.fetch_add(1, Ordering::Relaxed);
2066 let etag = build_image_etag(&body);
2067 let mut headers = build_image_response_headers(
2068 media_type,
2069 &etag,
2070 response_policy,
2071 negotiation_used,
2072 CacheHitStatus::Hit,
2073 response_config.public_cache_control.max_age,
2074 response_config.public_cache_control.stale_while_revalidate,
2075 );
2076 headers.push(("Age", age.as_secs().to_string()));
2077 if matches!(response_policy, ImageResponsePolicy::PublicGet)
2078 && if_none_match_matches(request.header("if-none-match"), &etag)
2079 {
2080 return HttpResponse::empty("304 Not Modified", headers);
2081 }
2082 return HttpResponse::binary_with_headers("200 OK", media_type.as_mime(), headers, body);
2083 }
2084
2085 if cache.is_some() {
2086 CACHE_MISSES_TOTAL.fetch_add(1, Ordering::Relaxed);
2087 }
2088
2089 let is_svg = artifact.media_type == MediaType::Svg;
2090 let result = if is_svg {
2091 match transform_svg(TransformRequest::new(artifact, options)) {
2092 Ok(result) => result,
2093 Err(error) => return transform_error_response(error),
2094 }
2095 } else {
2096 match transform_raster(TransformRequest::new(artifact, options)) {
2097 Ok(result) => result,
2098 Err(error) => return transform_error_response(error),
2099 }
2100 };
2101
2102 for warning in &result.warnings {
2103 let msg = format!("truss: {warning}");
2104 if let Some(c) = cache
2105 && let Some(handler) = &c.log_handler
2106 {
2107 handler(&msg);
2108 } else {
2109 stderr_write(&msg);
2110 }
2111 }
2112
2113 let output = result.artifact;
2114
2115 if let Some(cache) = cache {
2116 cache.put(&cache_key, output.media_type, &output.bytes);
2117 }
2118
2119 let cache_hit_status = if cache.is_some() {
2120 CacheHitStatus::Miss
2121 } else {
2122 CacheHitStatus::Disabled
2123 };
2124
2125 let etag = build_image_etag(&output.bytes);
2126 let headers = build_image_response_headers(
2127 output.media_type,
2128 &etag,
2129 response_policy,
2130 negotiation_used,
2131 cache_hit_status,
2132 response_config.public_cache_control.max_age,
2133 response_config.public_cache_control.stale_while_revalidate,
2134 );
2135
2136 if matches!(response_policy, ImageResponsePolicy::PublicGet)
2137 && if_none_match_matches(request.header("if-none-match"), &etag)
2138 {
2139 return HttpResponse::empty("304 Not Modified", headers);
2140 }
2141
2142 HttpResponse::binary_with_headers("200 OK", output.media_type.as_mime(), headers, output.bytes)
2143}
2144
/// Reads a boolean feature flag from the environment.
///
/// Returns `true` when the variable is set to `1`, `true`, `yes` or `on`.
/// Matching ignores ASCII case and surrounding whitespace, so `True`, `ON` and
/// `" yes "` are all accepted. An unset variable, a non-Unicode value, or any
/// other text yields `false`.
fn env_flag(name: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];

    env::var(name)
        .map(|value| {
            let value = value.trim();
            TRUTHY
                .iter()
                .any(|truthy| value.eq_ignore_ascii_case(truthy))
        })
        .unwrap_or(false)
}
2155
/// Reads an optional `u32` setting from the environment.
///
/// Returns `Ok(None)` when the variable is unset, empty, or cannot be read as
/// Unicode, and `Ok(Some(n))` when it parses as a `u32`.
///
/// # Errors
///
/// Returns an `InvalidInput` error naming the variable when the value is
/// non-empty but not a valid `u32`.
fn parse_optional_env_u32(name: &str) -> io::Result<Option<u32>> {
    let raw = match env::var(name) {
        Ok(raw) if !raw.is_empty() => raw,
        _ => return Ok(None),
    };

    match raw.parse::<u32>() {
        Ok(parsed) => Ok(Some(parsed)),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("{name} must be a non-negative integer"),
        )),
    }
}
2167
2168fn validate_public_base_url(value: String) -> io::Result<String> {
2169 let parsed = Url::parse(&value).map_err(|error| {
2170 io::Error::new(
2171 io::ErrorKind::InvalidInput,
2172 format!("TRUSS_PUBLIC_BASE_URL must be a valid URL: {error}"),
2173 )
2174 })?;
2175
2176 match parsed.scheme() {
2177 "http" | "https" => Ok(parsed.to_string()),
2178 _ => Err(io::Error::new(
2179 io::ErrorKind::InvalidInput,
2180 "TRUSS_PUBLIC_BASE_URL must use http or https",
2181 )),
2182 }
2183}
2184
2185#[cfg(test)]
2186mod tests {
2187 use serial_test::serial;
2188
2189 use super::http_parse::{
2190 HttpRequest, find_header_terminator, read_request_body, read_request_headers,
2191 resolve_storage_path,
2192 };
2193 use super::multipart::parse_multipart_form_data;
2194 use super::remote::{PinnedResolver, prepare_remote_fetch_target};
2195 use super::response::auth_required_response;
2196 use super::response::{HttpResponse, bad_request_response};
2197 use super::{
2198 CacheHitStatus, DEFAULT_BIND_ADDR, DEFAULT_MAX_CONCURRENT_TRANSFORMS,
2199 DEFAULT_PUBLIC_MAX_AGE_SECONDS, DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2200 ImageResponsePolicy, PublicSourceKind, ServerConfig, SignedUrlSource,
2201 TransformSourcePayload, authorize_signed_request, bind_addr, build_image_etag,
2202 build_image_response_headers, canonical_query_without_signature, negotiate_output_format,
2203 parse_public_get_request, route_request, serve_once_with_config, sign_public_url,
2204 transform_source_bytes,
2205 };
2206 use crate::{
2207 Artifact, ArtifactMetadata, MediaType, RawArtifact, TransformOptions, sniff_artifact,
2208 };
2209 use hmac::{Hmac, Mac};
2210 use image::codecs::png::PngEncoder;
2211 use image::{ColorType, ImageEncoder, Rgba, RgbaImage};
2212 use sha2::Sha256;
2213 use std::collections::BTreeMap;
2214 use std::fs;
2215 use std::io::{Cursor, Read, Write};
2216 use std::net::{SocketAddr, TcpListener, TcpStream};
2217 use std::path::{Path, PathBuf};
2218 use std::sync::atomic::Ordering;
2219 use std::thread;
2220 use std::time::{Duration, SystemTime, UNIX_EPOCH};
2221
2222 fn read_request<R: Read>(stream: &mut R) -> Result<HttpRequest, HttpResponse> {
2225 let partial = read_request_headers(stream)?;
2226 read_request_body(stream, partial)
2227 }
2228
2229 fn png_bytes() -> Vec<u8> {
2230 let image = RgbaImage::from_pixel(4, 3, Rgba([10, 20, 30, 255]));
2231 let mut bytes = Vec::new();
2232 PngEncoder::new(&mut bytes)
2233 .write_image(&image, 4, 3, ColorType::Rgba8.into())
2234 .expect("encode png");
2235 bytes
2236 }
2237
2238 fn temp_dir(name: &str) -> PathBuf {
2239 let unique = SystemTime::now()
2240 .duration_since(UNIX_EPOCH)
2241 .expect("current time")
2242 .as_nanos();
2243 let path = std::env::temp_dir().join(format!("truss-server-{name}-{unique}"));
2244 fs::create_dir_all(&path).expect("create temp dir");
2245 path
2246 }
2247
2248 fn write_png(path: &Path) {
2249 fs::write(path, png_bytes()).expect("write png fixture");
2250 }
2251
2252 fn artifact_with_alpha(has_alpha: bool) -> Artifact {
2253 Artifact::new(
2254 png_bytes(),
2255 MediaType::Png,
2256 ArtifactMetadata {
2257 width: Some(4),
2258 height: Some(3),
2259 frame_count: 1,
2260 duration: None,
2261 has_alpha: Some(has_alpha),
2262 },
2263 )
2264 }
2265
2266 fn sign_public_query(
2267 method: &str,
2268 authority: &str,
2269 path: &str,
2270 query: &BTreeMap<String, String>,
2271 secret: &str,
2272 ) -> String {
2273 let canonical = format!(
2274 "{method}\n{authority}\n{path}\n{}",
2275 canonical_query_without_signature(query)
2276 );
2277 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).expect("create hmac");
2278 mac.update(canonical.as_bytes());
2279 hex::encode(mac.finalize().into_bytes())
2280 }
2281
2282 type FixtureResponse = (String, Vec<(String, String)>, Vec<u8>);
2283
2284 fn read_fixture_request(stream: &mut TcpStream) {
2285 stream
2286 .set_nonblocking(false)
2287 .expect("configure fixture stream blocking mode");
2288 stream
2289 .set_read_timeout(Some(Duration::from_millis(100)))
2290 .expect("configure fixture stream timeout");
2291
2292 let deadline = std::time::Instant::now() + Duration::from_secs(2);
2293 let mut buffer = Vec::new();
2294 let mut chunk = [0_u8; 1024];
2295 let header_end = loop {
2296 let read = match stream.read(&mut chunk) {
2297 Ok(read) => read,
2298 Err(error)
2299 if matches!(
2300 error.kind(),
2301 std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
2302 ) && std::time::Instant::now() < deadline =>
2303 {
2304 thread::sleep(Duration::from_millis(10));
2305 continue;
2306 }
2307 Err(error) => panic!("read fixture request headers: {error}"),
2308 };
2309 if read == 0 {
2310 panic!("fixture request ended before headers were complete");
2311 }
2312 buffer.extend_from_slice(&chunk[..read]);
2313 if let Some(index) = find_header_terminator(&buffer) {
2314 break index;
2315 }
2316 };
2317
2318 let header_text = std::str::from_utf8(&buffer[..header_end]).expect("fixture request utf8");
2319 let content_length = header_text
2320 .split("\r\n")
2321 .filter_map(|line| line.split_once(':'))
2322 .find_map(|(name, value)| {
2323 name.trim()
2324 .eq_ignore_ascii_case("content-length")
2325 .then_some(value.trim())
2326 })
2327 .map(|value| {
2328 value
2329 .parse::<usize>()
2330 .expect("fixture content-length should be numeric")
2331 })
2332 .unwrap_or(0);
2333
2334 let mut body = buffer.len().saturating_sub(header_end + 4);
2335 while body < content_length {
2336 let read = match stream.read(&mut chunk) {
2337 Ok(read) => read,
2338 Err(error)
2339 if matches!(
2340 error.kind(),
2341 std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
2342 ) && std::time::Instant::now() < deadline =>
2343 {
2344 thread::sleep(Duration::from_millis(10));
2345 continue;
2346 }
2347 Err(error) => panic!("read fixture request body: {error}"),
2348 };
2349 if read == 0 {
2350 panic!("fixture request body was truncated");
2351 }
2352 body += read;
2353 }
2354 }
2355
2356 fn spawn_http_server(responses: Vec<FixtureResponse>) -> (String, thread::JoinHandle<()>) {
2357 let listener = TcpListener::bind("127.0.0.1:0").expect("bind fixture server");
2358 listener
2359 .set_nonblocking(true)
2360 .expect("configure fixture server");
2361 let addr = listener.local_addr().expect("fixture server addr");
2362 let url = format!("http://{addr}/image");
2363
2364 let handle = thread::spawn(move || {
2365 for (status, headers, body) in responses {
2366 let deadline = std::time::Instant::now() + Duration::from_secs(10);
2367 let mut accepted = None;
2368 while std::time::Instant::now() < deadline {
2369 match listener.accept() {
2370 Ok(stream) => {
2371 accepted = Some(stream);
2372 break;
2373 }
2374 Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
2375 thread::sleep(Duration::from_millis(10));
2376 }
2377 Err(error) => panic!("accept fixture request: {error}"),
2378 }
2379 }
2380
2381 let Some((mut stream, _)) = accepted else {
2382 break;
2383 };
2384 read_fixture_request(&mut stream);
2385 let mut header = format!(
2386 "HTTP/1.1 {status}\r\nContent-Length: {}\r\nConnection: close\r\n",
2387 body.len()
2388 );
2389 for (name, value) in headers {
2390 header.push_str(&format!("{name}: {value}\r\n"));
2391 }
2392 header.push_str("\r\n");
2393 stream
2394 .write_all(header.as_bytes())
2395 .expect("write fixture headers");
2396 stream.write_all(&body).expect("write fixture body");
2397 stream.flush().expect("flush fixture response");
2398 }
2399 });
2400
2401 (url, handle)
2402 }
2403
2404 fn transform_request(path: &str) -> HttpRequest {
2405 HttpRequest {
2406 method: "POST".to_string(),
2407 target: "/images:transform".to_string(),
2408 version: "HTTP/1.1".to_string(),
2409 headers: vec![
2410 ("authorization".to_string(), "Bearer secret".to_string()),
2411 ("content-type".to_string(), "application/json".to_string()),
2412 ],
2413 body: format!(
2414 "{{\"source\":{{\"kind\":\"path\",\"path\":\"{path}\"}},\"options\":{{\"format\":\"jpeg\"}}}}"
2415 )
2416 .into_bytes(),
2417 }
2418 }
2419
2420 fn transform_url_request(url: &str) -> HttpRequest {
2421 HttpRequest {
2422 method: "POST".to_string(),
2423 target: "/images:transform".to_string(),
2424 version: "HTTP/1.1".to_string(),
2425 headers: vec![
2426 ("authorization".to_string(), "Bearer secret".to_string()),
2427 ("content-type".to_string(), "application/json".to_string()),
2428 ],
2429 body: format!(
2430 "{{\"source\":{{\"kind\":\"url\",\"url\":\"{url}\"}},\"options\":{{\"format\":\"jpeg\"}}}}"
2431 )
2432 .into_bytes(),
2433 }
2434 }
2435
2436 fn upload_request(file_bytes: &[u8], options_json: Option<&str>) -> HttpRequest {
2437 let boundary = "truss-test-boundary";
2438 let mut body = Vec::new();
2439 body.extend_from_slice(
2440 format!(
2441 "--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"image.png\"\r\nContent-Type: image/png\r\n\r\n"
2442 )
2443 .as_bytes(),
2444 );
2445 body.extend_from_slice(file_bytes);
2446 body.extend_from_slice(b"\r\n");
2447
2448 if let Some(options_json) = options_json {
2449 body.extend_from_slice(
2450 format!(
2451 "--{boundary}\r\nContent-Disposition: form-data; name=\"options\"\r\nContent-Type: application/json\r\n\r\n{options_json}\r\n"
2452 )
2453 .as_bytes(),
2454 );
2455 }
2456
2457 body.extend_from_slice(format!("--{boundary}--\r\n").as_bytes());
2458
2459 HttpRequest {
2460 method: "POST".to_string(),
2461 target: "/images".to_string(),
2462 version: "HTTP/1.1".to_string(),
2463 headers: vec![
2464 ("authorization".to_string(), "Bearer secret".to_string()),
2465 (
2466 "content-type".to_string(),
2467 format!("multipart/form-data; boundary={boundary}"),
2468 ),
2469 ],
2470 body,
2471 }
2472 }
2473
2474 fn metrics_request(with_auth: bool) -> HttpRequest {
2475 let mut headers = Vec::new();
2476 if with_auth {
2477 headers.push(("authorization".to_string(), "Bearer secret".to_string()));
2478 }
2479
2480 HttpRequest {
2481 method: "GET".to_string(),
2482 target: "/metrics".to_string(),
2483 version: "HTTP/1.1".to_string(),
2484 headers,
2485 body: Vec::new(),
2486 }
2487 }
2488
2489 fn response_body(response: &HttpResponse) -> &str {
2490 std::str::from_utf8(&response.body).expect("utf8 response body")
2491 }
2492
2493 fn signed_public_request(target: &str, host: &str, secret: &str) -> HttpRequest {
2494 let (path, query) = target.split_once('?').expect("target has query");
2495 let mut query = url::form_urlencoded::parse(query.as_bytes())
2496 .into_owned()
2497 .collect::<BTreeMap<_, _>>();
2498 let signature = sign_public_query("GET", host, path, &query, secret);
2499 query.insert("signature".to_string(), signature);
2500 let final_query = url::form_urlencoded::Serializer::new(String::new())
2501 .extend_pairs(
2502 query
2503 .iter()
2504 .map(|(name, value)| (name.as_str(), value.as_str())),
2505 )
2506 .finish();
2507
2508 HttpRequest {
2509 method: "GET".to_string(),
2510 target: format!("{path}?{final_query}"),
2511 version: "HTTP/1.1".to_string(),
2512 headers: vec![("host".to_string(), host.to_string())],
2513 body: Vec::new(),
2514 }
2515 }
2516
/// With `TRUSS_BIND_ADDR` unset, `bind_addr` yields `DEFAULT_BIND_ADDR`.
#[test]
fn uses_default_bind_addr_when_env_is_missing() {
    // NOTE(review): mutating the process environment is `unsafe`; this assumes
    // no other thread reads or writes the environment at the same time —
    // confirm against the rest of the test suite.
    unsafe {
        std::env::remove_var("TRUSS_BIND_ADDR");
    }

    let resolved = bind_addr();
    assert_eq!(resolved, DEFAULT_BIND_ADDR);
}
2522
/// A request signed with the configured key id and secret is authorized.
#[test]
fn authorize_signed_request_accepts_a_valid_signature() {
    let unsigned_target =
        "/images/by-path?path=%2Fimage.png&keyId=public-dev&expires=4102444800&format=jpeg";
    let signed = signed_public_request(unsigned_target, "assets.example.com", "secret-value");
    let params = super::auth::parse_query_params(&signed).expect("parse query");

    let server_config = ServerConfig::new(temp_dir("public-auth"), None)
        .with_signed_url_credentials("public-dev", "secret-value");

    authorize_signed_request(&signed, &params, &server_config).expect("signed auth should pass");
}
2536
/// Authorization succeeds when the request was signed for the same host that
/// `public_base_url` names.
#[test]
fn authorize_signed_request_uses_public_base_url_authority() {
    let unsigned_target =
        "/images/by-path?path=%2Fimage.png&keyId=public-dev&expires=4102444800&format=jpeg";
    let signed = signed_public_request(unsigned_target, "cdn.example.com", "secret-value");
    let params = super::auth::parse_query_params(&signed).expect("parse query");

    let server_config = {
        let mut base = ServerConfig::new(temp_dir("public-authority"), None)
            .with_signed_url_credentials("public-dev", "secret-value");
        base.public_base_url = Some("https://cdn.example.com".to_string());
        base
    };

    authorize_signed_request(&signed, &params, &server_config).expect("signed auth should pass");
}
2551
/// For an artifact with alpha, an `Accept` of JPEG then PNG resolves to PNG.
#[test]
fn negotiate_output_format_prefers_alpha_safe_formats_for_transparent_inputs() {
    let transparent_source = artifact_with_alpha(true);

    let negotiated = negotiate_output_format(Some("image/jpeg,image/png"), &transparent_source)
        .expect("negotiate output format")
        .expect("resolved output format");

    assert_eq!(negotiated, MediaType::Png);
}
2561
/// For an artifact without alpha, `Accept: image/*` resolves to AVIF.
#[test]
fn negotiate_output_format_prefers_avif_for_wildcard_accept() {
    let opaque_source = artifact_with_alpha(false);

    let negotiated = negotiate_output_format(Some("image/*"), &opaque_source)
        .expect("negotiate output format")
        .expect("resolved output format");

    assert_eq!(negotiated, MediaType::Avif);
}
2570
/// Public WebP responses carry the expected caching, content-negotiation,
/// sniffing-protection, disposition, and cache-status headers.
#[test]
fn build_image_response_headers_include_cache_and_safety_metadata() {
    let etag = build_image_etag(b"demo");
    let headers = build_image_response_headers(
        MediaType::Webp,
        &etag,
        ImageResponsePolicy::PublicGet,
        true,
        CacheHitStatus::Disabled,
        DEFAULT_PUBLIC_MAX_AGE_SECONDS,
        DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
    );

    let expected_headers = [
        (
            "Cache-Control",
            "public, max-age=3600, stale-while-revalidate=60",
        ),
        ("Vary", "Accept"),
        ("X-Content-Type-Options", "nosniff"),
        ("Content-Disposition", "inline; filename=\"truss.webp\""),
        ("Cache-Status", "\"truss\"; fwd=miss"),
    ];
    for (name, value) in expected_headers {
        assert!(headers.contains(&(name, value.to_string())));
    }
}
2595
/// SVG responses include a `Content-Security-Policy: sandbox` header.
#[test]
fn build_image_response_headers_include_csp_sandbox_for_svg() {
    let etag = build_image_etag(b"svg-data");
    let headers = build_image_response_headers(
        MediaType::Svg,
        &etag,
        ImageResponsePolicy::PublicGet,
        true,
        CacheHitStatus::Disabled,
        DEFAULT_PUBLIC_MAX_AGE_SECONDS,
        DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
    );

    let sandbox_header = ("Content-Security-Policy", "sandbox".to_string());
    assert!(headers.contains(&sandbox_header));
}
2610
/// PNG responses carry no `Content-Security-Policy` header at all.
#[test]
fn build_image_response_headers_omit_csp_sandbox_for_raster() {
    let etag = build_image_etag(b"png-data");
    let headers = build_image_response_headers(
        MediaType::Png,
        &etag,
        ImageResponsePolicy::PublicGet,
        true,
        CacheHitStatus::Disabled,
        DEFAULT_PUBLIC_MAX_AGE_SECONDS,
        DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
    );

    assert!(
        headers
            .iter()
            .all(|(name, _)| *name != "Content-Security-Policy")
    );
}
2625
/// With the in-flight counter preset to `DEFAULT_MAX_CONCURRENT_TRANSFORMS`,
/// a transform is answered with a 503 and the counter is left unchanged.
#[test]
fn backpressure_rejects_when_at_capacity() {
    // Encode a single opaque red RGBA pixel as the transform input.
    let mut source_png = Vec::new();
    image::codecs::png::PngEncoder::new(&mut source_png)
        .write_image(&[255, 0, 0, 255], 1, 1, image::ExtendedColorType::Rgba8)
        .unwrap();

    let server_config = ServerConfig::new(std::env::temp_dir(), None);
    server_config
        .transforms_in_flight
        .store(DEFAULT_MAX_CONCURRENT_TRANSFORMS, Ordering::Relaxed);

    let incoming = HttpRequest {
        body: Vec::new(),
        headers: Vec::new(),
        version: "HTTP/1.1".to_string(),
        target: "/transform".to_string(),
        method: "POST".to_string(),
    };

    let response = transform_source_bytes(
        source_png,
        TransformOptions::default(),
        None,
        &incoming,
        ImageResponsePolicy::PrivateTransform,
        &server_config,
    );

    assert!(response.status.contains("503"));

    let in_flight_after = server_config.transforms_in_flight.load(Ordering::Relaxed);
    assert_eq!(in_flight_after, DEFAULT_MAX_CONCURRENT_TRANSFORMS);
}
2666
/// With `max_concurrent_transforms` lowered to 2 and the in-flight counter
/// preset to that limit, a transform is answered with a 503.
///
/// Like the default-limit backpressure test, this also checks that the
/// rejected request leaves the in-flight counter at its preset value.
#[test]
fn backpressure_rejects_with_custom_concurrency_limit() {
    let custom_limit = 2u64;
    let mut config = ServerConfig::new(std::env::temp_dir(), None);
    config.max_concurrent_transforms = custom_limit;
    config
        .transforms_in_flight
        .store(custom_limit, Ordering::Relaxed);

    let request = HttpRequest {
        method: "POST".to_string(),
        target: "/transform".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };

    // Encode a single opaque red RGBA pixel as the transform input.
    let png_bytes = {
        let mut buf = Vec::new();
        let encoder = image::codecs::png::PngEncoder::new(&mut buf);
        encoder
            .write_image(&[255, 0, 0, 255], 1, 1, image::ExtendedColorType::Rgba8)
            .unwrap();
        buf
    };

    let response = transform_source_bytes(
        png_bytes,
        TransformOptions::default(),
        None,
        &request,
        ImageResponsePolicy::PrivateTransform,
        &config,
    );

    assert!(response.status.contains("503"));

    // A rejected request must not change the in-flight count.
    assert_eq!(
        config.transforms_in_flight.load(Ordering::Relaxed),
        custom_limit
    );
}
2704
/// Hashing the same source and options twice yields the same 64-character key.
#[test]
fn compute_cache_key_is_deterministic() {
    let opts = TransformOptions {
        format: Some(MediaType::Webp),
        height: Some(200),
        width: Some(300),
        ..TransformOptions::default()
    };
    let derive_key = || super::cache::compute_cache_key("source-abc", &opts, None);

    let first = derive_key();
    let second = derive_key();

    assert_eq!(first, second);
    assert_eq!(first.len(), 64);
}
2718
/// Two option sets that differ only in width produce different keys for the
/// same source.
#[test]
fn compute_cache_key_differs_for_different_options() {
    let webp_at_width = |width| TransformOptions {
        width: Some(width),
        format: Some(MediaType::Webp),
        ..TransformOptions::default()
    };
    let narrow = webp_at_width(300);
    let wide = webp_at_width(400);

    let narrow_key = super::cache::compute_cache_key("same-source", &narrow, None);
    let wide_key = super::cache::compute_cache_key("same-source", &wide, None);

    assert_ne!(narrow_key, wide_key);
}
2735
/// Supplying an `Accept` value changes the key relative to passing `None`.
#[test]
fn compute_cache_key_includes_accept_when_present() {
    let default_options = TransformOptions::default();

    let without_accept = super::cache::compute_cache_key("src", &default_options, None);
    let with_accept =
        super::cache::compute_cache_key("src", &default_options, Some("image/webp"));

    assert_ne!(without_accept, with_accept);
}
2743
/// An entry written with `put` is returned by `get` with the same media type
/// and bytes.
#[test]
fn transform_cache_put_and_get_round_trips() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
    let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";

    cache.put(key, MediaType::Png, b"png-data");

    let super::cache::CacheLookup::Hit {
        media_type, body, ..
    } = cache.get(key)
    else {
        panic!("expected cache hit");
    };
    assert_eq!(media_type, MediaType::Png);
    assert_eq!(body, b"png-data");
}
2766
/// Looking up a key that was never stored in a fresh cache is a miss.
#[test]
fn transform_cache_miss_for_unknown_key() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
    let never_stored = "0000001234567890abcdef1234567890abcdef1234567890abcdef1234567890";

    let lookup = cache.get(never_stored);

    assert!(matches!(lookup, super::cache::CacheLookup::Miss));
}
2775
/// A stored entry lands at `<root>/ab/cd/ef/<key>`: three two-character
/// directories taken from the start of the key, then the full key.
#[test]
fn transform_cache_uses_sharded_layout() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
    let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";

    cache.put(key, MediaType::Jpeg, b"jpeg-data");

    let expected = ["ab", "cd", "ef", key]
        .iter()
        .fold(dir.path().to_path_buf(), |path, segment| path.join(segment));
    assert!(
        expected.exists(),
        "sharded file should exist at {expected:?}"
    );
}
2790
/// With a zero TTL, an entry read back shortly after being written is a miss.
#[test]
fn transform_cache_expired_entry_is_miss() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";

    let cache = {
        let mut zero_ttl = super::cache::TransformCache::new(dir.path().to_path_buf());
        zero_ttl.ttl = Duration::from_secs(0);
        zero_ttl
    };
    cache.put(key, MediaType::Png, b"data");

    // Let a little time pass between the write and the lookup.
    std::thread::sleep(Duration::from_millis(10));

    let lookup = cache.get(key);
    assert!(matches!(lookup, super::cache::CacheLookup::Miss));
}
2805
/// A file written directly at the entry path, bypassing `put`, is reported
/// as a miss.
#[test]
fn transform_cache_handles_corrupted_entry_as_miss() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
    let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";

    // Plant bogus bytes where the cache expects the entry for `key`.
    let entry_file = cache.entry_path(key);
    let shard_dir = entry_file.parent().unwrap();
    fs::create_dir_all(shard_dir).unwrap();
    fs::write(&entry_file, b"corrupted-data-without-header").unwrap();

    let lookup = cache.get(key);
    assert!(matches!(lookup, super::cache::CacheLookup::Miss));
}
2819
/// `CacheHitStatus::Hit` is rendered as `Cache-Status: "truss"; hit`.
#[test]
fn cache_status_header_reflects_hit() {
    let etag = build_image_etag(b"data");
    let headers = build_image_response_headers(
        MediaType::Png,
        &etag,
        ImageResponsePolicy::PublicGet,
        false,
        CacheHitStatus::Hit,
        DEFAULT_PUBLIC_MAX_AGE_SECONDS,
        DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
    );

    let hit_header = ("Cache-Status", "\"truss\"; hit".to_string());
    assert!(headers.contains(&hit_header));
}
2833
/// `CacheHitStatus::Miss` is rendered as `Cache-Status: "truss"; fwd=miss`.
#[test]
fn cache_status_header_reflects_miss() {
    let etag = build_image_etag(b"data");
    let headers = build_image_response_headers(
        MediaType::Png,
        &etag,
        ImageResponsePolicy::PublicGet,
        false,
        CacheHitStatus::Miss,
        DEFAULT_PUBLIC_MAX_AGE_SECONDS,
        DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
    );

    let miss_header = ("Cache-Status", "\"truss\"; fwd=miss".to_string());
    assert!(headers.contains(&miss_header));
}
2847
/// Bytes stored under a URL are returned unchanged when that URL is looked up.
#[test]
fn origin_cache_put_and_get_round_trips() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let origin_cache = super::cache::OriginCache::new(dir.path());
    let source_url = "https://example.com/image.png";

    origin_cache.put(source_url, b"raw-source-bytes");
    let cached = origin_cache.get(source_url);

    assert_eq!(cached.as_deref(), Some(b"raw-source-bytes".as_ref()));
}
2858
/// Looking up a URL that was never stored yields `None`.
#[test]
fn origin_cache_miss_for_unknown_url() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let origin_cache = super::cache::OriginCache::new(dir.path());

    let lookup = origin_cache.get("https://unknown.example.com/missing.png");

    assert!(lookup.is_none());
}
2870
/// With a zero TTL, an entry read back shortly after being written is `None`.
#[test]
fn origin_cache_expired_entry_is_none() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let source_url = "https://example.com/img.png";

    let origin_cache = {
        let mut zero_ttl = super::cache::OriginCache::new(dir.path());
        zero_ttl.ttl = Duration::from_secs(0);
        zero_ttl
    };
    origin_cache.put(source_url, b"data");

    // Let a little time pass between the write and the lookup.
    std::thread::sleep(Duration::from_millis(10));

    assert!(origin_cache.get(source_url).is_none());
}
2882
/// After a `put`, an `origin` directory exists under the cache root.
#[test]
fn origin_cache_uses_origin_subdirectory() {
    let dir = tempfile::tempdir().expect("create tempdir");
    let origin_cache = super::cache::OriginCache::new(dir.path());

    origin_cache.put("https://example.com/test.png", b"bytes");

    let origin_subdir = dir.path().join("origin");
    assert!(origin_subdir.exists(), "origin subdirectory should exist");
}
2893
/// A signed path URL starts with the by-path endpoint and carries the source,
/// transform, credential, expiry, and signature query parameters.
#[test]
fn sign_public_url_builds_a_signed_path_url() {
    let source = SignedUrlSource::Path {
        path: "/image.png".to_string(),
        version: Some("v1".to_string()),
    };
    let options = crate::TransformOptions {
        format: Some(MediaType::Jpeg),
        width: Some(320),
        ..crate::TransformOptions::default()
    };

    let url = sign_public_url(
        "https://cdn.example.com",
        source,
        &options,
        "public-dev",
        "secret-value",
        4_102_444_800,
    )
    .expect("sign public URL");

    assert!(url.starts_with("https://cdn.example.com/images/by-path?"));

    let expected_fragments = [
        "path=%2Fimage.png",
        "version=v1",
        "width=320",
        "format=jpeg",
        "keyId=public-dev",
        "expires=4102444800",
        "signature=",
    ];
    for fragment in expected_fragments {
        assert!(url.contains(fragment));
    }
}
2922
/// A query containing a parameter named `unexpected` is rejected with a
/// 400 whose body says the parameter "is not supported".
#[test]
fn parse_public_get_request_rejects_unknown_query_parameters() {
    let query: BTreeMap<String, String> = [
        ("path", "/image.png"),
        ("keyId", "public-dev"),
        ("expires", "4102444800"),
        ("signature", "deadbeef"),
        ("unexpected", "value"),
    ]
    .into_iter()
    .map(|(name, value)| (name.to_string(), value.to_string()))
    .collect();

    let rejection = parse_public_get_request(&query, PublicSourceKind::Path)
        .expect_err("unknown query should fail");

    assert_eq!(rejection.status, "400 Bad Request");
    assert!(response_body(&rejection).contains("is not supported"));
}
2939
/// For `http://1.1.1.1/image.png`, the prepared target records the netloc
/// `1.1.1.1:80` and the single socket address `1.1.1.1:80`.
#[test]
fn prepare_remote_fetch_target_pins_the_validated_netloc() {
    let server_config = ServerConfig::new(temp_dir("pin"), Some("secret".to_string()));

    let target = prepare_remote_fetch_target("http://1.1.1.1/image.png", &server_config)
        .expect("prepare remote target");

    let expected_addr = SocketAddr::from(([1, 1, 1, 1], 80));
    assert_eq!(target.netloc, "1.1.1.1:80");
    assert_eq!(target.addrs, vec![expected_addr]);
}
2951
/// The pinned resolver returns its stored addresses for the expected netloc
/// and fails with `HostNotFound` for any other netloc.
#[test]
fn pinned_resolver_rejects_unexpected_netlocs() {
    use ureq::unversioned::resolver::Resolver;

    let pinned_addr = SocketAddr::from(([93, 184, 216, 34], 443));
    let resolver = PinnedResolver {
        expected_netloc: "example.com:443".to_string(),
        addrs: vec![pinned_addr],
    };
    let config = ureq::config::Config::builder().build();

    // Each `resolve` call takes its timeout by value, so build a fresh one per call.
    let resolve_timeout = || ureq::unversioned::transport::NextTimeout {
        after: ureq::unversioned::transport::time::Duration::Exact(
            std::time::Duration::from_secs(30),
        ),
        reason: ureq::Timeout::Resolve,
    };

    let pinned_uri: ureq::http::Uri = "https://example.com/path".parse().unwrap();
    let resolved = resolver
        .resolve(&pinned_uri, &config, resolve_timeout())
        .expect("resolve expected netloc");
    assert_eq!(&resolved[..], &[pinned_addr]);

    let foreign_uri: ureq::http::Uri = "https://proxy.example:8080/path".parse().unwrap();
    let error = resolver
        .resolve(&foreign_uri, &config, resolve_timeout())
        .expect_err("unexpected netloc should fail");
    assert!(matches!(error, ureq::Error::HostNotFound));
}
2987
/// `GET /health/live` answers 200 with JSON `status`, `service`, and
/// `version` fields.
#[test]
fn health_live_returns_status_service_version() {
    let server_config = ServerConfig::new(temp_dir("live"), None);
    let liveness_probe = HttpRequest {
        body: Vec::new(),
        headers: Vec::new(),
        version: "HTTP/1.1".to_string(),
        target: "/health/live".to_string(),
        method: "GET".to_string(),
    };

    let response = route_request(liveness_probe, &server_config);

    assert_eq!(response.status, "200 OK");
    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse live body");
    assert_eq!(payload["status"], "ok");
    assert_eq!(payload["service"], "truss");
    assert_eq!(payload["version"], env!("CARGO_PKG_VERSION"));
}
3007
/// `GET /health/ready` answers 200 and reports the `storageRoot` check as
/// `ok` when the storage root comes from `temp_dir`.
#[test]
fn health_ready_returns_ok_when_storage_exists() {
    let server_config = ServerConfig::new(temp_dir("ready-ok"), None);
    let readiness_probe = HttpRequest {
        body: Vec::new(),
        headers: Vec::new(),
        version: "HTTP/1.1".to_string(),
        target: "/health/ready".to_string(),
        method: "GET".to_string(),
    };

    let response = route_request(readiness_probe, &server_config);

    assert_eq!(response.status, "200 OK");
    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse ready body");
    assert_eq!(payload["status"], "ok");

    let checks = payload["checks"].as_array().expect("checks array");
    let storage_is_ok = checks
        .iter()
        .any(|check| check["name"] == "storageRoot" && check["status"] == "ok");
    assert!(storage_is_ok);
}
3032
/// `GET /health/ready` answers 503 and reports the `storageRoot` check as
/// `fail` when the storage root is `/nonexistent-truss-test-dir`.
#[test]
fn health_ready_returns_503_when_storage_missing() {
    let server_config = ServerConfig::new(PathBuf::from("/nonexistent-truss-test-dir"), None);
    let readiness_probe = HttpRequest {
        body: Vec::new(),
        headers: Vec::new(),
        version: "HTTP/1.1".to_string(),
        target: "/health/ready".to_string(),
        method: "GET".to_string(),
    };

    let response = route_request(readiness_probe, &server_config);

    assert_eq!(response.status, "503 Service Unavailable");
    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse ready fail body");
    assert_eq!(payload["status"], "fail");

    let checks = payload["checks"].as_array().expect("checks array");
    let storage_failed = checks
        .iter()
        .any(|check| check["name"] == "storageRoot" && check["status"] == "fail");
    assert!(storage_failed);
}
3057
/// `GET /health/ready` answers 503 and reports the `cacheRoot` check as
/// `fail` when `cache_root` is `/nonexistent-truss-cache-dir`.
#[test]
fn health_ready_returns_503_when_cache_root_missing() {
    let server_config = {
        let mut base = ServerConfig::new(temp_dir("ready-cache-fail"), None);
        base.cache_root = Some(PathBuf::from("/nonexistent-truss-cache-dir"));
        base
    };
    let readiness_probe = HttpRequest {
        body: Vec::new(),
        headers: Vec::new(),
        version: "HTTP/1.1".to_string(),
        target: "/health/ready".to_string(),
        method: "GET".to_string(),
    };

    let response = route_request(readiness_probe, &server_config);

    assert_eq!(response.status, "503 Service Unavailable");
    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse ready cache body");
    assert_eq!(payload["status"], "fail");

    let checks = payload["checks"].as_array().expect("checks array");
    let cache_failed = checks
        .iter()
        .any(|check| check["name"] == "cacheRoot" && check["status"] == "fail");
    assert!(cache_failed);
}
3085
/// `GET /health` answers 200 with status, service, version, an integer
/// `uptimeSeconds`, and a `checks` array.
#[test]
fn health_returns_comprehensive_diagnostic() {
    let server_config = ServerConfig::new(temp_dir("health-diag"), None);
    let health_probe = HttpRequest {
        body: Vec::new(),
        headers: Vec::new(),
        version: "HTTP/1.1".to_string(),
        target: "/health".to_string(),
        method: "GET".to_string(),
    };

    let response = route_request(health_probe, &server_config);

    assert_eq!(response.status, "200 OK");
    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse health body");
    assert_eq!(payload["status"], "ok");
    assert_eq!(payload["service"], "truss");
    assert_eq!(payload["version"], env!("CARGO_PKG_VERSION"));
    assert!(payload["uptimeSeconds"].is_u64());
    assert!(payload["checks"].is_array());
}
3108
/// An unrouted path answers 404 with an `application/problem+json` body
/// containing the expected problem-detail fields.
#[test]
fn unknown_path_returns_not_found() {
    let server_config = ServerConfig::new(temp_dir("not-found"), None);
    let unrouted = HttpRequest {
        body: Vec::new(),
        headers: Vec::new(),
        version: "HTTP/1.1".to_string(),
        target: "/unknown".to_string(),
        method: "GET".to_string(),
    };

    let response = route_request(unrouted, &server_config);

    assert_eq!(response.status, "404 Not Found");
    assert_eq!(response.content_type, Some("application/problem+json"));

    let text = response_body(&response);
    let expected_fragments = [
        "\"type\":\"about:blank\"",
        "\"title\":\"Not Found\"",
        "\"status\":404",
        "not found",
    ];
    for fragment in expected_fragments {
        assert!(text.contains(fragment));
    }
}
3129
/// A transform request stripped of its `authorization` header is answered
/// with 401 when a bearer token is configured.
#[test]
fn transform_endpoint_requires_authentication() {
    let storage_root = temp_dir("auth");
    write_png(&storage_root.join("image.png"));

    let mut anonymous = transform_request("/image.png");
    anonymous
        .headers
        .retain(|(header_name, _)| header_name != "authorization");

    let server_config = ServerConfig::new(storage_root, Some("secret".to_string()));
    let response = route_request(anonymous, &server_config);

    assert_eq!(response.status, "401 Unauthorized");
    assert!(response_body(&response).contains("authorization required"));
}
3145
/// When the server has no bearer token configured, the transform endpoint
/// answers 503 even though the request carries an authorization header.
#[test]
fn transform_endpoint_returns_service_unavailable_without_configured_token() {
    let storage_root = temp_dir("token");
    write_png(&storage_root.join("image.png"));
    let tokenless_config = ServerConfig::new(storage_root, None);

    let response = route_request(transform_request("/image.png"), &tokenless_config);

    assert_eq!(response.status, "503 Service Unavailable");
    assert!(response_body(&response).contains("bearer token is not configured"));
}
3159
/// An authenticated request for a stored PNG returns a JPEG that sniffs as
/// 4 pixels wide and 3 pixels high.
#[test]
fn transform_endpoint_transforms_a_path_source() {
    let storage_root = temp_dir("transform");
    write_png(&storage_root.join("image.png"));
    let server_config = ServerConfig::new(storage_root, Some("secret".to_string()));

    let response = route_request(transform_request("/image.png"), &server_config);

    assert_eq!(response.status, "200 OK");
    assert_eq!(response.content_type, Some("image/jpeg"));

    let output = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
    assert_eq!(output.media_type, MediaType::Jpeg);
    assert_eq!(output.metadata.width, Some(4));
    assert_eq!(output.metadata.height, Some(3));
}
3178
/// Without the insecure-URL allowance, a `http://127.0.0.1:8080` source is
/// answered with 403 and a "port is not allowed" message.
#[test]
fn transform_endpoint_rejects_private_url_sources_by_default() {
    let server_config = ServerConfig::new(temp_dir("url-blocked"), Some("secret".to_string()));
    let loopback_request = transform_url_request("http://127.0.0.1:8080/image.png");

    let response = route_request(loopback_request, &server_config);

    assert_eq!(response.status, "403 Forbidden");
    assert!(response_body(&response).contains("port is not allowed"));
}
3189
/// With insecure URL sources enabled, a PNG served by the fixture server is
/// fetched and returned as a JPEG.
#[test]
fn transform_endpoint_transforms_a_url_source_when_insecure_allowance_is_enabled() {
    let png_reply = (
        "200 OK".to_string(),
        vec![("Content-Type".to_string(), "image/png".to_string())],
        png_bytes(),
    );
    let (url, handle) = spawn_http_server(vec![png_reply]);

    let server_config = ServerConfig::new(temp_dir("url"), Some("secret".to_string()))
        .with_insecure_url_sources(true);
    let response = route_request(transform_url_request(&url), &server_config);

    handle.join().expect("join fixture server");

    assert_eq!(response.status, "200 OK");
    assert_eq!(response.content_type, Some("image/jpeg"));

    let output = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
    assert_eq!(output.media_type, MediaType::Jpeg);
}
3212
/// A `302 Found` reply from the fixture server is followed to the PNG reply,
/// which is then returned as a JPEG.
#[test]
fn transform_endpoint_follows_remote_redirects() {
    let redirect_reply = (
        "302 Found".to_string(),
        vec![("Location".to_string(), "/final-image".to_string())],
        Vec::new(),
    );
    let png_reply = (
        "200 OK".to_string(),
        vec![("Content-Type".to_string(), "image/png".to_string())],
        png_bytes(),
    );
    let (redirect_url, handle) = spawn_http_server(vec![redirect_reply, png_reply]);

    let server_config = ServerConfig::new(temp_dir("redirect"), Some("secret".to_string()))
        .with_insecure_url_sources(true);
    let response = route_request(transform_url_request(&redirect_url), &server_config);

    handle.join().expect("join fixture server");

    assert_eq!(response.status, "200 OK");
    let output = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
    assert_eq!(output.media_type, MediaType::Jpeg);
}
3240
/// Uploading a PNG with `{"format":"jpeg"}` options returns a JPEG.
#[test]
fn upload_endpoint_transforms_uploaded_file() {
    let upload = upload_request(&png_bytes(), Some(r#"{"format":"jpeg"}"#));
    let server_config = ServerConfig::new(temp_dir("upload"), Some("secret".to_string()));

    let response = route_request(upload, &server_config);

    assert_eq!(response.status, "200 OK");
    assert_eq!(response.content_type, Some("image/jpeg"));

    let output = sniff_artifact(RawArtifact::new(response.body, None)).expect("sniff output");
    assert_eq!(output.media_type, MediaType::Jpeg);
}
3254
/// A multipart upload that carries only an `options` part is answered with
/// 400 and a message that a `file` field is required.
#[test]
fn upload_endpoint_requires_a_file_field() {
    let boundary = "truss-test-boundary";

    // The body holds an `options` part and the closing delimiter, but no `file` part.
    let options_only_body = format!(
        "--{boundary}\r\nContent-Disposition: form-data; name=\"options\"\r\nContent-Type: application/json\r\n\r\n{{\"format\":\"jpeg\"}}\r\n--{boundary}--\r\n"
    )
    .into_bytes();
    let headers = vec![
        ("authorization".to_string(), "Bearer secret".to_string()),
        (
            "content-type".to_string(),
            format!("multipart/form-data; boundary={boundary}"),
        ),
    ];
    let upload = HttpRequest {
        method: "POST".to_string(),
        target: "/images".to_string(),
        version: "HTTP/1.1".to_string(),
        headers,
        body: options_only_body,
    };

    let server_config =
        ServerConfig::new(temp_dir("upload-missing-file"), Some("secret".to_string()));
    let response = route_request(upload, &server_config);

    assert_eq!(response.status, "400 Bad Request");
    assert!(response_body(&response).contains("requires a `file` field"));
}
3283
/// Posting `application/json` to the upload endpoint is answered with 415
/// and a body that mentions `multipart/form-data`.
#[test]
fn upload_endpoint_rejects_non_multipart_content_type() {
    let headers = vec![
        ("authorization".to_string(), "Bearer secret".to_string()),
        ("content-type".to_string(), "application/json".to_string()),
    ];
    let json_upload = HttpRequest {
        method: "POST".to_string(),
        target: "/images".to_string(),
        version: "HTTP/1.1".to_string(),
        headers,
        body: br#"{"file":"not-really-json"}"#.to_vec(),
    };

    let server_config =
        ServerConfig::new(temp_dir("upload-content-type"), Some("secret".to_string()));
    let response = route_request(json_upload, &server_config);

    assert_eq!(response.status, "415 Unsupported Media Type");
    assert!(response_body(&response).contains("multipart/form-data"));
}
3305
/// Parsing a multipart upload yields the original file bytes and the
/// width/format options from the `options` part.
#[test]
fn parse_upload_request_extracts_file_and_options() {
    let upload = upload_request(&png_bytes(), Some(r#"{"width":8,"format":"jpeg"}"#));

    let boundary = super::multipart::parse_multipart_boundary(&upload).expect("parse boundary");
    let (file_bytes, options) = super::multipart::parse_upload_request(&upload.body, &boundary)
        .expect("parse upload body");

    assert_eq!(file_bytes, png_bytes());
    assert_eq!(options.width, Some(8));
    assert_eq!(options.format, Some(MediaType::Jpeg));
}
3319
/// A metrics request without an authorization header is answered with 401
/// when a bearer token is configured.
#[test]
fn metrics_endpoint_requires_authentication() {
    let server_config = ServerConfig::new(temp_dir("metrics-auth"), Some("secret".to_string()));

    let response = route_request(metrics_request(false), &server_config);

    assert_eq!(response.status, "401 Unauthorized");
    assert!(response_body(&response).contains("authorization required"));
}
3330
/// An authenticated metrics request returns the Prometheus text content type
/// and includes the request, per-route, and per-status counters.
#[test]
fn metrics_endpoint_returns_prometheus_text() {
    // Record one health hit first so the labelled series asserted below are present.
    super::metrics::record_http_metrics(super::metrics::RouteMetric::Health, "200 OK");

    let server_config = ServerConfig::new(temp_dir("metrics"), Some("secret".to_string()));
    let response = route_request(metrics_request(true), &server_config);
    let exposition = response_body(&response);

    assert_eq!(response.status, "200 OK");
    assert_eq!(
        response.content_type,
        Some("text/plain; version=0.0.4; charset=utf-8")
    );

    let expected_series = [
        "truss_http_requests_total",
        "truss_http_requests_by_route_total{route=\"/health\"}",
        "truss_http_responses_total{status=\"200\"}",
    ];
    for series in expected_series {
        assert!(exposition.contains(series));
    }
}
3349
/// A remote reply labelled `Content-Encoding: compress` is answered with
/// 502 and an "unsupported content-encoding" message.
#[test]
fn transform_endpoint_rejects_unsupported_remote_content_encoding() {
    let compressed_reply = (
        "200 OK".to_string(),
        vec![
            ("Content-Type".to_string(), "image/png".to_string()),
            ("Content-Encoding".to_string(), "compress".to_string()),
        ],
        png_bytes(),
    );
    let (url, handle) = spawn_http_server(vec![compressed_reply]);

    let server_config = ServerConfig::new(temp_dir("encoding"), Some("secret".to_string()))
        .with_insecure_url_sources(true);
    let response = route_request(transform_url_request(&url), &server_config);

    handle.join().expect("join fixture server");

    assert_eq!(response.status, "502 Bad Gateway");
    assert!(response_body(&response).contains("unsupported content-encoding"));
}
3372
/// Resolving `../escape.png` fails with a 400 whose body contains
/// "must not contain root".
#[test]
fn resolve_storage_path_rejects_parent_segments() {
    let storage_root = temp_dir("resolve");

    let rejection = resolve_storage_path(&storage_root, "../escape.png")
        .expect_err("parent segments should be rejected");

    assert_eq!(rejection.status, "400 Bad Request");
    assert!(response_body(&rejection).contains("must not contain root"));
}
3382
/// A raw HTTP/1.1 POST is split into method, target, version, headers, and
/// a body of `Content-Length` bytes.
#[test]
fn read_request_parses_headers_and_body() {
    let wire_bytes = b"POST /images:transform HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: 2\r\n\r\n{}";
    let mut reader = Cursor::new(wire_bytes);

    let parsed = read_request(&mut reader).expect("parse request");

    assert_eq!(parsed.method, "POST");
    assert_eq!(parsed.target, "/images:transform");
    assert_eq!(parsed.version, "HTTP/1.1");
    assert_eq!(parsed.header("host"), Some("localhost"));
    assert_eq!(parsed.body, b"{}");
}
3395
/// A request carrying two `Content-Length` headers is rejected with a 400
/// whose body mentions `content-length`.
#[test]
fn read_request_rejects_duplicate_content_length() {
    let wire_bytes =
        b"POST /images:transform HTTP/1.1\r\nContent-Length: 2\r\nContent-Length: 2\r\n\r\n{}";
    let mut reader = Cursor::new(wire_bytes);

    let rejection = read_request(&mut reader).expect_err("duplicate headers should fail");

    assert_eq!(rejection.status, "400 Bad Request");
    assert!(response_body(&rejection).contains("content-length"));
}
3406
/// End-to-end check over a real loopback socket: one `GET /health/live`
/// request is served by `serve_once_with_config` and answered with a JSON
/// `200 OK` payload.
#[test]
fn serve_once_handles_a_tcp_request() {
    let config = ServerConfig::new(temp_dir("serve-once"), None);
    // Port 0 asks the OS for a free port; the real address is read back below.
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
    let addr = listener.local_addr().expect("read local addr");

    let server = thread::spawn(move || serve_once_with_config(listener, config));

    let mut client = TcpStream::connect(addr).expect("connect to test server");
    client
        .write_all(b"GET /health/live HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
        .expect("write request");

    let mut reply = String::new();
    client.read_to_string(&mut reply).expect("read response");

    server
        .join()
        .expect("join test server thread")
        .expect("serve one request");

    assert!(reply.starts_with("HTTP/1.1 200 OK"));
    for fragment in [
        "Content-Type: application/json",
        "\"status\":\"ok\"",
        "\"service\":\"truss\"",
        "\"version\":",
    ] {
        assert!(reply.contains(fragment));
    }
}
3435
/// The 401 and 400 helper responses must use the
/// `application/problem+json` content type and carry the `type`, `title`,
/// `status` members plus the caller-supplied detail text.
#[test]
fn helper_error_responses_use_rfc7807_problem_details() {
    let unauthorized = auth_required_response("authorization required");
    let bad_request = bad_request_response("bad input");

    assert_eq!(
        unauthorized.content_type,
        Some("application/problem+json"),
        "error responses must use application/problem+json"
    );
    assert_eq!(bad_request.content_type, Some("application/problem+json"));

    let unauthorized_body = response_body(&unauthorized);
    for fragment in [
        "authorization required",
        "\"type\":\"about:blank\"",
        "\"title\":\"Unauthorized\"",
        "\"status\":401",
    ] {
        assert!(unauthorized_body.contains(fragment));
    }

    let bad_request_body = response_body(&bad_request);
    for fragment in [
        "bad input",
        "\"type\":\"about:blank\"",
        "\"title\":\"Bad Request\"",
        "\"status\":400",
    ] {
        assert!(bad_request_body.contains(fragment));
    }
}
3460
/// Two `Host` headers with different values must make header parsing fail.
#[test]
fn parse_headers_rejects_duplicate_host() {
    let raw = "Host: example.com\r\nHost: evil.com\r\n";
    let parsed = super::http_parse::parse_headers(raw.split("\r\n"));
    assert!(parsed.is_err());
}
3467
/// Two `Authorization` headers must make header parsing fail.
#[test]
fn parse_headers_rejects_duplicate_authorization() {
    let raw = "Authorization: Bearer a\r\nAuthorization: Bearer b\r\n";
    let parsed = super::http_parse::parse_headers(raw.split("\r\n"));
    assert!(parsed.is_err());
}
3474
/// Two `Content-Type` headers must make header parsing fail.
#[test]
fn parse_headers_rejects_duplicate_content_type() {
    let raw = "Content-Type: application/json\r\nContent-Type: text/plain\r\n";
    let parsed = super::http_parse::parse_headers(raw.split("\r\n"));
    assert!(parsed.is_err());
}
3481
/// Two `Transfer-Encoding` headers must make header parsing fail.
#[test]
fn parse_headers_rejects_duplicate_transfer_encoding() {
    let raw = "Transfer-Encoding: chunked\r\nTransfer-Encoding: gzip\r\n";
    let parsed = super::http_parse::parse_headers(raw.split("\r\n"));
    assert!(parsed.is_err());
}
3488
/// Even a single `Transfer-Encoding: chunked` header is refused: the error
/// status must start with `501` and the body must name the header.
#[test]
fn parse_headers_rejects_single_transfer_encoding() {
    let raw = "Host: example.com\r\nTransfer-Encoding: chunked\r\n";
    let rejection = super::http_parse::parse_headers(raw.split("\r\n")).unwrap_err();

    assert!(
        rejection.status.starts_with("501"),
        "expected 501 status, got: {}",
        rejection.status
    );

    let body_text = String::from_utf8_lossy(&rejection.body);
    assert!(
        body_text.contains("Transfer-Encoding"),
        "error response should mention Transfer-Encoding"
    );
}
3504
/// `Transfer-Encoding: identity` is rejected as well.
#[test]
fn parse_headers_rejects_transfer_encoding_identity() {
    let raw = "Transfer-Encoding: identity\r\n";
    let parsed = super::http_parse::parse_headers(raw.split("\r\n"));
    assert!(parsed.is_err());
}
3511
/// One occurrence each of `Host`, `Authorization` and `Content-Type` parses
/// successfully and yields exactly three headers.
#[test]
fn parse_headers_allows_single_instances_of_singleton_headers() {
    let raw =
        "Host: example.com\r\nAuthorization: Bearer tok\r\nContent-Type: application/json\r\n";
    let parsed = super::http_parse::parse_headers(raw.split("\r\n"));
    assert!(parsed.is_ok());
    let header_count = parsed.unwrap().len();
    assert_eq!(header_count, 3);
}
3520
/// A `multipart/form-data` content type selects `MAX_UPLOAD_BODY_BYTES`.
#[test]
fn max_body_for_multipart_uses_upload_limit() {
    let content_type = (
        "content-type".to_string(),
        "multipart/form-data; boundary=abc".to_string(),
    );
    let headers = vec![content_type];
    let limit = super::http_parse::max_body_for_headers(&headers);
    assert_eq!(limit, super::http_parse::MAX_UPLOAD_BODY_BYTES);
}
3532
/// An `application/json` content type selects `MAX_REQUEST_BODY_BYTES`.
#[test]
fn max_body_for_json_uses_default_limit() {
    let content_type = ("content-type".to_string(), "application/json".to_string());
    let headers = vec![content_type];
    let limit = super::http_parse::max_body_for_headers(&headers);
    assert_eq!(limit, super::http_parse::MAX_REQUEST_BODY_BYTES);
}
3541
/// With no headers at all the limit is `MAX_REQUEST_BODY_BYTES`.
#[test]
fn max_body_for_no_content_type_uses_default_limit() {
    let headers: Vec<(String, String)> = Vec::new();
    let limit = super::http_parse::max_body_for_headers(&headers);
    assert_eq!(limit, super::http_parse::MAX_REQUEST_BODY_BYTES);
}
3550
3551 fn make_test_config() -> ServerConfig {
3552 ServerConfig::new(std::env::temp_dir(), None)
3553 }
3554
/// `filesystem`, `fs` and `local` all parse to `StorageBackend::Filesystem`.
#[test]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn storage_backend_parse_filesystem_aliases() {
    for alias in ["filesystem", "fs", "local"] {
        assert_eq!(
            super::StorageBackend::parse(alias).unwrap(),
            super::StorageBackend::Filesystem
        );
    }
}
3571
/// Both `s3` and `S3` parse to `StorageBackend::S3`.
#[test]
#[cfg(feature = "s3")]
fn storage_backend_parse_s3() {
    for spelling in ["s3", "S3"] {
        assert_eq!(
            super::StorageBackend::parse(spelling).unwrap(),
            super::StorageBackend::S3
        );
    }
}
3584
/// Both `gcs` and `GCS` parse to `StorageBackend::Gcs`.
#[test]
#[cfg(feature = "gcs")]
fn storage_backend_parse_gcs() {
    for spelling in ["gcs", "GCS"] {
        assert_eq!(
            super::StorageBackend::parse(spelling).unwrap(),
            super::StorageBackend::Gcs
        );
    }
}
3597
/// The empty string never parses; `azure` parses exactly when the `azure`
/// feature is compiled in.
#[test]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn storage_backend_parse_rejects_unknown() {
    assert!(super::StorageBackend::parse("").is_err());

    let azure_accepted = super::StorageBackend::parse("azure").is_ok();
    assert_eq!(azure_accepted, cfg!(feature = "azure"));
}
3607
/// A path source with `version: None` has no versioned hash.
#[test]
fn versioned_source_hash_returns_none_without_version() {
    let config = make_test_config();
    let unversioned = TransformSourcePayload::Path {
        path: "/photos/hero.jpg".to_string(),
        version: None,
    };
    assert!(unversioned.versioned_source_hash(&config).is_none());
}
3616
/// Hashing the same versioned path twice gives the same 64-character value.
#[test]
fn versioned_source_hash_is_deterministic() {
    let config = make_test_config();
    let payload = TransformSourcePayload::Path {
        path: "/photos/hero.jpg".to_string(),
        version: Some("v1".to_string()),
    };

    let first = payload.versioned_source_hash(&config).unwrap();
    let second = payload.versioned_source_hash(&config).unwrap();

    assert_eq!(first, second);
    assert_eq!(first.len(), 64);
}
3629
/// The same path under versions `v1` and `v2` must hash differently.
#[test]
fn versioned_source_hash_differs_by_version() {
    let config = make_test_config();
    let hero_at = |version: &str| TransformSourcePayload::Path {
        path: "/photos/hero.jpg".to_string(),
        version: Some(version.to_string()),
    };

    let first = hero_at("v1");
    let second = hero_at("v2");

    assert_ne!(
        first.versioned_source_hash(&config).unwrap(),
        second.versioned_source_hash(&config).unwrap()
    );
}
3646
/// A `Path` source and a `Url` source with the same string and version
/// must hash differently.
#[test]
fn versioned_source_hash_differs_by_kind() {
    let config = make_test_config();
    let location = "example.com/image.jpg";
    let version = Some("v1".to_string());

    let as_path = TransformSourcePayload::Path {
        path: location.to_string(),
        version: version.clone(),
    };
    let as_url = TransformSourcePayload::Url {
        url: location.to_string(),
        version,
    };

    assert_ne!(
        as_path.versioned_source_hash(&config).unwrap(),
        as_url.versioned_source_hash(&config).unwrap()
    );
}
3663
/// The same versioned path hashed under two different storage roots must
/// give two different hashes.
#[test]
fn versioned_source_hash_differs_by_storage_root() {
    let config_for = |root: &str| ServerConfig::new(PathBuf::from(root), None);
    let data_config = config_for("/data/images");
    let other_config = config_for("/other/images");

    let payload = TransformSourcePayload::Path {
        path: "/photos/hero.jpg".to_string(),
        version: Some("v1".to_string()),
    };

    assert_ne!(
        payload.versioned_source_hash(&data_config).unwrap(),
        payload.versioned_source_hash(&other_config).unwrap()
    );
}
3677
/// Toggling `allow_insecure_url_sources` must change the hash of a
/// versioned URL source.
#[test]
fn versioned_source_hash_differs_by_insecure_flag() {
    let config_with_flag = |allow: bool| {
        let mut config = make_test_config();
        config.allow_insecure_url_sources = allow;
        config
    };
    let strict = config_with_flag(false);
    let permissive = config_with_flag(true);

    let payload = TransformSourcePayload::Url {
        url: "http://example.com/img.jpg".to_string(),
        version: Some("v1".to_string()),
    };

    assert_ne!(
        payload.versioned_source_hash(&strict).unwrap(),
        payload.versioned_source_hash(&permissive).unwrap()
    );
}
3693
/// Hashing the same versioned `Storage` source twice gives the same
/// 64-character value.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_storage_variant_is_deterministic() {
    let config = make_test_config();
    let payload = TransformSourcePayload::Storage {
        bucket: Some("my-bucket".to_string()),
        key: "photos/hero.jpg".to_string(),
        version: Some("v1".to_string()),
    };

    let first = payload.versioned_source_hash(&config).unwrap();
    let second = payload.versioned_source_hash(&config).unwrap();

    assert_eq!(first, second);
    assert_eq!(first.len(), 64);
}
3708
/// A `Storage` source and a `Path` source that share the same key/path and
/// version must hash differently.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_storage_differs_from_path() {
    let config = make_test_config();
    let location = "photos/hero.jpg";
    let version = Some("v1".to_string());

    let as_path = TransformSourcePayload::Path {
        path: location.to_string(),
        version: version.clone(),
    };
    let as_storage = TransformSourcePayload::Storage {
        bucket: Some("my-bucket".to_string()),
        key: location.to_string(),
        version,
    };

    assert_ne!(
        as_path.versioned_source_hash(&config).unwrap(),
        as_storage.versioned_source_hash(&config).unwrap()
    );
}
3727
/// The same key and version in two explicitly named buckets must hash
/// differently.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_storage_differs_by_bucket() {
    let config = make_test_config();
    let image_in = |bucket: &str| TransformSourcePayload::Storage {
        bucket: Some(bucket.to_string()),
        key: "image.jpg".to_string(),
        version: Some("v1".to_string()),
    };

    let in_a = image_in("bucket-a");
    let in_b = image_in("bucket-b");

    assert_ne!(
        in_a.versioned_source_hash(&config).unwrap(),
        in_b.versioned_source_hash(&config).unwrap()
    );
}
3747
/// The same versioned path must hash differently under the default
/// backend and under `StorageBackend::S3`.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_differs_by_backend() {
    let default_backend = make_test_config();
    let s3_backend = {
        let mut config = make_test_config();
        config.storage_backend = super::StorageBackend::S3;
        config
    };

    let payload = TransformSourcePayload::Path {
        path: "photos/hero.jpg".to_string(),
        version: Some("v1".to_string()),
    };

    assert_ne!(
        payload.versioned_source_hash(&default_backend).unwrap(),
        payload.versioned_source_hash(&s3_backend).unwrap()
    );
}
3764
/// Two S3 configurations sharing the default bucket name `shared` but
/// pointing at different endpoints must hash a bucket-less storage source
/// differently, and the configurations themselves must compare unequal.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_storage_differs_by_endpoint() {
    let s3_config = || {
        let mut config = make_test_config();
        config.storage_backend = super::StorageBackend::S3;
        config
    };

    let mut minio_a = s3_config();
    minio_a.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
        "shared",
        Some("http://minio-a:9000"),
    )));

    let mut minio_b = s3_config();
    minio_b.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
        "shared",
        Some("http://minio-b:9000"),
    )));

    let payload = TransformSourcePayload::Storage {
        bucket: None,
        key: "image.jpg".to_string(),
        version: Some("v1".to_string()),
    };

    assert_ne!(
        payload.versioned_source_hash(&minio_a).unwrap(),
        payload.versioned_source_hash(&minio_b).unwrap(),
    );
    assert_ne!(minio_a, minio_b);
}
3793
/// A freshly built config uses the filesystem backend and has no S3 context.
#[test]
#[cfg(feature = "s3")]
fn storage_backend_default_is_filesystem() {
    let defaults = make_test_config();
    assert_eq!(defaults.storage_backend, super::StorageBackend::Filesystem);
    assert!(defaults.s3_context.is_none());
}
3801
/// A `kind: "storage"` source with only a `key` deserializes to the
/// `Storage` variant with `bucket` and `version` left unset.
#[test]
#[cfg(feature = "s3")]
fn storage_payload_deserializes_storage_variant() {
    let raw = r#"{"source":{"kind":"storage","key":"photos/hero.jpg"},"options":{}}"#;
    let request: super::TransformImageRequestPayload = serde_json::from_str(raw).unwrap();

    let TransformSourcePayload::Storage {
        bucket,
        key,
        version,
    } = request.source
    else {
        panic!("expected Storage variant");
    };

    assert!(bucket.is_none());
    assert_eq!(key, "photos/hero.jpg");
    assert!(version.is_none());
}
3820
/// A `kind: "storage"` source carrying `bucket`, `key` and `version`
/// deserializes with all three fields populated.
#[test]
#[cfg(feature = "s3")]
fn storage_payload_deserializes_with_bucket() {
    let raw = r#"{"source":{"kind":"storage","bucket":"my-bucket","key":"img.png","version":"v2"},"options":{}}"#;
    let request: super::TransformImageRequestPayload = serde_json::from_str(raw).unwrap();

    let TransformSourcePayload::Storage {
        bucket,
        key,
        version,
    } = request.source
    else {
        panic!("expected Storage variant");
    };

    assert_eq!(bucket.as_deref(), Some("my-bucket"));
    assert_eq!(key, "img.png");
    assert_eq!(version.as_deref(), Some("v2"));
}
3839
/// With `bucket: None` on the source, two S3 contexts whose default
/// buckets differ (`bucket-a` vs `bucket-b`) must yield different hashes,
/// and the two configurations must compare unequal.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_uses_default_bucket_when_bucket_is_none() {
    let s3_config = || {
        let mut config = make_test_config();
        config.storage_backend = super::StorageBackend::S3;
        config
    };

    let mut with_bucket_a = s3_config();
    with_bucket_a.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
        "bucket-a", None,
    )));

    let mut with_bucket_b = s3_config();
    with_bucket_b.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
        "bucket-b", None,
    )));

    let payload = TransformSourcePayload::Storage {
        bucket: None,
        key: "image.jpg".to_string(),
        version: Some("v1".to_string()),
    };

    assert_ne!(
        payload.versioned_source_hash(&with_bucket_a).unwrap(),
        payload.versioned_source_hash(&with_bucket_b).unwrap(),
    );
    assert_ne!(with_bucket_a, with_bucket_b);
}
3872
/// With the S3 backend selected but neither an S3 context nor an explicit
/// bucket on the source, no hash is produced.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_returns_none_without_bucket_or_context() {
    let mut config = make_test_config();
    config.storage_backend = super::StorageBackend::S3;
    config.s3_context = None;

    let payload = TransformSourcePayload::Storage {
        bucket: None,
        key: "image.jpg".to_string(),
        version: Some("v1".to_string()),
    };

    let hash = payload.versioned_source_hash(&config);
    assert!(hash.is_none());
}
3888
/// Serializes the tests that go through [`with_s3_env`], so their
/// environment overrides do not interleave with each other.
#[cfg(feature = "s3")]
static FROM_ENV_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());

/// Environment variables that [`with_s3_env`] always snapshots and
/// restores, whether or not a caller overrides them.
#[cfg(feature = "s3")]
const S3_ENV_VARS: &[&str] = &[
    "TRUSS_STORAGE_ROOT",
    "TRUSS_STORAGE_BACKEND",
    "TRUSS_S3_BUCKET",
];

/// Runs `f` with the given environment overrides applied, then restores the
/// previous environment, even when `f` panics.
///
/// Each entry of `vars` is `(name, value)`: `Some(v)` sets the variable,
/// `None` removes it. Every name in [`S3_ENV_VARS`] and every name that
/// appears in `vars` is snapshotted first, so an override for a variable
/// outside [`S3_ENV_VARS`] is also rolled back. Values are captured as
/// `OsString`, so a pre-existing non-UTF-8 value is restored rather than
/// removed.
///
/// # Panics
///
/// Re-raises any panic from `f` after the environment has been restored.
#[cfg(feature = "s3")]
fn with_s3_env<F: FnOnce()>(vars: &[(&str, Option<&str>)], f: F) {
    // A poisoned mutex only means an earlier holder panicked; the guard is
    // still good for mutual exclusion, so recover it instead of failing.
    let _guard = FROM_ENV_MUTEX.lock().unwrap_or_else(|e| e.into_inner());

    // Snapshot before mutating anything. A name listed twice is captured
    // twice with the same original value, so restoring it twice is harmless.
    let mut saved: Vec<(&str, Option<std::ffi::OsString>)> = S3_ENV_VARS
        .iter()
        .map(|key| (*key, std::env::var_os(key)))
        .collect();
    saved.extend(
        vars.iter()
            .map(|&(key, _)| (key, std::env::var_os(key))),
    );

    for &(key, value) in vars {
        // SAFETY: callers of this helper are serialized by FROM_ENV_MUTEX.
        // NOTE(review): code that touches the environment without taking
        // that mutex is not excluded — confirm no such test runs concurrently.
        match value {
            Some(v) => unsafe { std::env::set_var(key, v) },
            None => unsafe { std::env::remove_var(key) },
        }
    }

    // Catch the panic so the restore loop below always runs.
    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));

    for (key, original) in saved {
        // SAFETY: same reasoning as for the override loop above.
        match original {
            Some(v) => unsafe { std::env::set_var(key, v) },
            None => unsafe { std::env::remove_var(key) },
        }
    }

    if let Err(payload) = result {
        std::panic::resume_unwind(payload);
    }
}
3935
/// `ServerConfig::from_env` must fail with an "unknown storage backend"
/// message when `TRUSS_STORAGE_BACKEND` is `nosuchbackend`.
#[test]
#[cfg(feature = "s3")]
fn from_env_rejects_invalid_storage_backend() {
    let root = temp_dir("env-bad-backend");
    let root_str = root.to_str().unwrap().to_string();
    let overrides: [(&str, Option<&str>); 3] = [
        ("TRUSS_STORAGE_ROOT", Some(root_str.as_str())),
        ("TRUSS_STORAGE_BACKEND", Some("nosuchbackend")),
        ("TRUSS_S3_BUCKET", None),
    ];

    with_s3_env(&overrides, || {
        let outcome = ServerConfig::from_env();
        assert!(outcome.is_err());
        let msg = outcome.unwrap_err().to_string();
        assert!(msg.contains("unknown storage backend"), "got: {msg}");
    });

    // Best-effort cleanup of the scratch directory.
    let _ = std::fs::remove_dir_all(root);
}
3956
/// Selecting the `s3` backend without `TRUSS_S3_BUCKET` must fail with an
/// error that names the missing variable.
#[test]
#[cfg(feature = "s3")]
fn from_env_rejects_s3_without_bucket() {
    let root = temp_dir("env-no-bucket");
    let root_str = root.to_str().unwrap().to_string();
    let overrides: [(&str, Option<&str>); 3] = [
        ("TRUSS_STORAGE_ROOT", Some(root_str.as_str())),
        ("TRUSS_STORAGE_BACKEND", Some("s3")),
        ("TRUSS_S3_BUCKET", None),
    ];

    with_s3_env(&overrides, || {
        let outcome = ServerConfig::from_env();
        assert!(outcome.is_err());
        let msg = outcome.unwrap_err().to_string();
        assert!(msg.contains("TRUSS_S3_BUCKET"), "got: {msg}");
    });

    // Best-effort cleanup of the scratch directory.
    let _ = std::fs::remove_dir_all(root);
}
3977
/// With the `s3` backend and `TRUSS_S3_BUCKET=my-images`, `from_env`
/// succeeds and the S3 context reports that bucket as its default.
#[test]
#[cfg(feature = "s3")]
fn from_env_accepts_s3_with_bucket() {
    let root = temp_dir("env-s3-ok");
    let root_str = root.to_str().unwrap().to_string();
    let overrides: [(&str, Option<&str>); 3] = [
        ("TRUSS_STORAGE_ROOT", Some(root_str.as_str())),
        ("TRUSS_STORAGE_BACKEND", Some("s3")),
        ("TRUSS_S3_BUCKET", Some("my-images")),
    ];

    with_s3_env(&overrides, || {
        let loaded = ServerConfig::from_env().expect("from_env should succeed with s3 + bucket");
        assert_eq!(loaded.storage_backend, super::StorageBackend::S3);
        let context = loaded.s3_context.expect("s3_context should be Some");
        assert_eq!(context.default_bucket, "my-images");
    });

    // Best-effort cleanup of the scratch directory.
    let _ = std::fs::remove_dir_all(root);
}
3999
/// `GET /health/ready` must answer `503 Service Unavailable` and report a
/// failing `storageBackend` check when the S3 backend is selected but no
/// S3 context is configured.
#[test]
#[cfg(feature = "s3")]
fn health_ready_s3_returns_503_when_context_missing() {
    let storage = temp_dir("health-s3-no-ctx");
    let mut config = ServerConfig::new(storage.clone(), None);
    config.storage_backend = super::StorageBackend::S3;
    config.s3_context = None;

    let request = super::http_parse::HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(request, &config);
    // Clean up before asserting so a failure does not leave the directory behind.
    let _ = std::fs::remove_dir_all(storage);

    assert_eq!(response.status, "503 Service Unavailable");
    let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
    let checks = body["checks"].as_array().expect("checks array");
    // The failure message names the check that is actually looked up.
    assert!(
        checks
            .iter()
            .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
        "expected storageBackend fail check in {body}",
    );
}
4032
/// With an S3 context configured, the `/health/ready` payload must contain
/// a check named `storageBackend`. The response status is not asserted here.
#[test]
#[cfg(feature = "s3")]
fn health_ready_s3_includes_s3_client_check() {
    let storage = temp_dir("health-s3-ok");
    let mut config = ServerConfig::new(storage.clone(), None);
    config.storage_backend = super::StorageBackend::S3;
    config.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
        "test-bucket",
        None,
    )));

    let request = super::http_parse::HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(request, &config);
    // Clean up before asserting so a failure does not leave the directory behind.
    let _ = std::fs::remove_dir_all(storage);

    let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
    let checks = body["checks"].as_array().expect("checks array");
    // The failure message names the check that is actually looked up.
    assert!(
        checks.iter().any(|c| c["name"] == "storageBackend"),
        "expected storageBackend check in {body}",
    );
}
4063
/// Test helper: builds the `Storage` payload for a path-style source.
///
/// The bucket is left unset, the key is `path` with every leading `/`
/// stripped, and `version` is carried over unchanged.
#[cfg(feature = "s3")]
fn remap_path_to_storage(path: &str, version: Option<&str>) -> TransformSourcePayload {
    let key = path.to_string().trim_start_matches('/').to_string();
    TransformSourcePayload::Storage {
        bucket: None,
        key,
        version: version.map(str::to_string),
    }
}
4086
/// Remapping `/photos/hero.jpg` and `photos/hero.jpg` must give the same
/// storage key and, under an S3 config, the same versioned hash.
#[test]
#[cfg(feature = "s3")]
fn public_by_path_s3_remap_trims_leading_slash() {
    let with_slash = remap_path_to_storage("/photos/hero.jpg", Some("v1"));
    let TransformSourcePayload::Storage { key, .. } = &with_slash else {
        panic!("expected Storage variant after remap");
    };
    assert_eq!(key, "photos/hero.jpg", "leading / must be trimmed");

    let without_slash = remap_path_to_storage("photos/hero.jpg", Some("v1"));
    let TransformSourcePayload::Storage { key, .. } = &without_slash else {
        panic!("expected Storage variant after remap");
    };
    assert_eq!(key, "photos/hero.jpg");

    let mut config = make_test_config();
    config.storage_backend = super::StorageBackend::S3;
    config.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
        "my-bucket",
        None,
    )));
    assert_eq!(
        with_slash.versioned_source_hash(&config),
        without_slash.versioned_source_hash(&config),
        "leading-slash and no-leading-slash paths must hash identically after trim",
    );
}
4123
/// Remapping `/image.png` without a version gives a `Storage` payload with
/// no bucket, key `image.png`, and no version.
#[test]
#[cfg(feature = "s3")]
fn public_by_path_s3_remap_produces_storage_variant() {
    let TransformSourcePayload::Storage {
        bucket,
        key,
        version,
    } = remap_path_to_storage("/image.png", None)
    else {
        panic!("expected Storage variant");
    };

    assert!(bucket.is_none(), "bucket must be None (use default)");
    assert_eq!(key, "image.png");
    assert!(version.is_none());
}
4142
/// `GET /health/ready` must answer `503 Service Unavailable` and report a
/// failing `storageBackend` check when the GCS backend is selected but no
/// GCS context is configured.
#[test]
#[cfg(feature = "gcs")]
fn health_ready_gcs_returns_503_when_context_missing() {
    let storage = temp_dir("health-gcs-no-ctx");
    let mut config = ServerConfig::new(storage.clone(), None);
    config.storage_backend = super::StorageBackend::Gcs;
    config.gcs_context = None;

    let request = super::http_parse::HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(request, &config);
    // Clean up before asserting so a failure does not leave the directory behind.
    let _ = std::fs::remove_dir_all(storage);

    assert_eq!(response.status, "503 Service Unavailable");
    let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
    let checks = body["checks"].as_array().expect("checks array");
    // The failure message names the check that is actually looked up.
    assert!(
        checks
            .iter()
            .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
        "expected storageBackend fail check in {body}",
    );
}
4175
/// With a GCS context configured, the `/health/ready` payload must contain
/// a check named `storageBackend`. The response status is not asserted here.
#[test]
#[cfg(feature = "gcs")]
fn health_ready_gcs_includes_gcs_client_check() {
    let storage = temp_dir("health-gcs-ok");
    let mut config = ServerConfig::new(storage.clone(), None);
    config.storage_backend = super::StorageBackend::Gcs;
    config.gcs_context = Some(std::sync::Arc::new(super::gcs::GcsContext::for_test(
        "test-bucket",
        None,
    )));

    let request = super::http_parse::HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(request, &config);
    // Clean up before asserting so a failure does not leave the directory behind.
    let _ = std::fs::remove_dir_all(storage);

    let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
    let checks = body["checks"].as_array().expect("checks array");
    // The failure message names the check that is actually looked up.
    assert!(
        checks.iter().any(|c| c["name"] == "storageBackend"),
        "expected storageBackend check in {body}",
    );
}
4206
/// Test helper for the GCS tests: builds the `Storage` payload for a
/// path-style source.
///
/// The bucket is left unset, the key is `path` with every leading `/`
/// stripped, and `version` is carried over unchanged.
#[cfg(feature = "gcs")]
fn remap_path_to_storage_gcs(path: &str, version: Option<&str>) -> TransformSourcePayload {
    let key = path.to_string().trim_start_matches('/').to_string();
    TransformSourcePayload::Storage {
        bucket: None,
        key,
        version: version.map(str::to_string),
    }
}
4226
/// Remapping `/photos/hero.jpg` and `photos/hero.jpg` must give the same
/// storage key and, under a GCS config, the same versioned hash.
#[test]
#[cfg(feature = "gcs")]
fn public_by_path_gcs_remap_trims_leading_slash() {
    let with_slash = remap_path_to_storage_gcs("/photos/hero.jpg", Some("v1"));
    let TransformSourcePayload::Storage { key, .. } = &with_slash else {
        panic!("expected Storage variant after remap");
    };
    assert_eq!(key, "photos/hero.jpg", "leading / must be trimmed");

    let without_slash = remap_path_to_storage_gcs("photos/hero.jpg", Some("v1"));
    let TransformSourcePayload::Storage { key, .. } = &without_slash else {
        panic!("expected Storage variant after remap");
    };
    assert_eq!(key, "photos/hero.jpg");

    let mut config = make_test_config();
    config.storage_backend = super::StorageBackend::Gcs;
    config.gcs_context = Some(std::sync::Arc::new(super::gcs::GcsContext::for_test(
        "my-bucket",
        None,
    )));
    assert_eq!(
        with_slash.versioned_source_hash(&config),
        without_slash.versioned_source_hash(&config),
        "leading-slash and no-leading-slash paths must hash identically after trim",
    );
}
4258
/// Remapping `/image.png` without a version gives a `Storage` payload with
/// no bucket, key `image.png`, and no version.
#[test]
#[cfg(feature = "gcs")]
fn public_by_path_gcs_remap_produces_storage_variant() {
    let TransformSourcePayload::Storage {
        bucket,
        key,
        version,
    } = remap_path_to_storage_gcs("/image.png", None)
    else {
        panic!("expected Storage variant");
    };

    assert!(bucket.is_none(), "bucket must be None (use default)");
    assert_eq!(key, "image.png");
    assert!(version.is_none());
}
4276
/// `GET /health/ready` must answer `503 Service Unavailable` and report a
/// failing `storageBackend` check when the Azure backend is selected but
/// no Azure context is configured.
#[test]
#[cfg(feature = "azure")]
fn health_ready_azure_returns_503_when_context_missing() {
    let storage = temp_dir("health-azure-no-ctx");
    let mut config = ServerConfig::new(storage.clone(), None);
    config.storage_backend = super::StorageBackend::Azure;
    config.azure_context = None;

    let request = super::http_parse::HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(request, &config);
    // Clean up before asserting so a failure does not leave the directory behind.
    let _ = std::fs::remove_dir_all(storage);

    assert_eq!(response.status, "503 Service Unavailable");
    let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
    let checks = body["checks"].as_array().expect("checks array");
    // The failure message names the check that is actually looked up.
    assert!(
        checks
            .iter()
            .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
        "expected storageBackend fail check in {body}",
    );
}
4309
/// With an Azure context configured, the `/health/ready` payload must
/// contain a check named `storageBackend`. The response status is not
/// asserted here.
#[test]
#[cfg(feature = "azure")]
fn health_ready_azure_includes_azure_client_check() {
    let storage = temp_dir("health-azure-ok");
    let mut config = ServerConfig::new(storage.clone(), None);
    config.storage_backend = super::StorageBackend::Azure;
    config.azure_context = Some(std::sync::Arc::new(super::azure::AzureContext::for_test(
        "test-bucket",
        "http://localhost:10000/devstoreaccount1",
    )));

    let request = super::http_parse::HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(request, &config);
    // Clean up before asserting so a failure does not leave the directory behind.
    let _ = std::fs::remove_dir_all(storage);

    let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
    let checks = body["checks"].as_array().expect("checks array");
    // The failure message names the check that is actually looked up.
    assert!(
        checks.iter().any(|c| c["name"] == "storageBackend"),
        "expected storageBackend check in {body}",
    );
}
4338
/// A JSON request whose body is one byte larger than
/// `MAX_REQUEST_BODY_BYTES` must be rejected.
#[test]
fn read_request_rejects_json_body_over_1mib() {
    let content_length = super::http_parse::MAX_REQUEST_BODY_BYTES + 1;
    let mut wire = format!(
        "POST /images:transform HTTP/1.1\r\n\
         Content-Type: application/json\r\n\
         Content-Length: {content_length}\r\n\r\n"
    )
    .into_bytes();
    // Append `content_length` filler bytes directly after the header block.
    wire.resize(wire.len() + content_length, b'x');

    let outcome = read_request(&mut wire.as_slice());
    assert!(outcome.is_err());
}
4353
/// A multipart upload whose file part alone exceeds
/// `MAX_REQUEST_BODY_BYTES` by 100 bytes must still be accepted.
#[test]
fn read_request_accepts_multipart_body_over_1mib() {
    let boundary = "test-boundary-123";
    let filler_len = super::http_parse::MAX_REQUEST_BODY_BYTES + 100;

    let opening = format!("--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"big.jpg\"\r\n\r\n");
    let closing = format!("\r\n--{boundary}--\r\n");

    let mut body = Vec::with_capacity(opening.len() + filler_len + closing.len());
    body.extend_from_slice(opening.as_bytes());
    body.resize(body.len() + filler_len, b'A');
    body.extend_from_slice(closing.as_bytes());

    let content_length = body.len();
    let mut wire = format!(
        "POST /images HTTP/1.1\r\n\
         Content-Type: multipart/form-data; boundary={boundary}\r\n\
         Content-Length: {content_length}\r\n\r\n"
    )
    .into_bytes();
    wire.extend_from_slice(&body);

    let outcome = read_request(&mut wire.as_slice());
    assert!(
        outcome.is_ok(),
        "multipart upload over 1 MiB should be accepted"
    );
}
4377
/// A payload containing `\r\n--<boundary>NOTREAL` must not be split: the
/// parser has to return one part whose body still contains `NOTREAL`.
#[test]
fn multipart_boundary_in_payload_does_not_split_part() {
    let boundary = "abc123";
    // Looks like a delimiter line but continues with extra characters.
    let part_body = format!("before\r\n--{boundary}NOTREALafter");
    let body = format!(
        "--{boundary}\r\n\
         Content-Disposition: form-data; name=\"file\"\r\n\
         Content-Type: application/octet-stream\r\n\r\n\
         {part_body}\r\n\
         --{boundary}--\r\n"
    );
    let raw = body.as_bytes();

    let parts = parse_multipart_form_data(raw, boundary)
        .expect("should parse despite boundary-like string in payload");
    assert_eq!(parts.len(), 1, "should have exactly one part");

    let part_bytes = &raw[parts[0].body_range.clone()];
    let part_text = std::str::from_utf8(part_bytes).unwrap();
    assert!(
        part_text.contains("NOTREAL"),
        "part body should contain the full fake boundary string"
    );
}
4402
/// A plain two-field form parses into two parts named `field1` and
/// `field2`, in that order.
#[test]
fn multipart_normal_two_parts_still_works() {
    let boundary = "testboundary";
    let form = format!(
        "--{boundary}\r\n\
         Content-Disposition: form-data; name=\"field1\"\r\n\r\n\
         value1\r\n\
         --{boundary}\r\n\
         Content-Disposition: form-data; name=\"field2\"\r\n\r\n\
         value2\r\n\
         --{boundary}--\r\n"
    );

    let parsed = parse_multipart_form_data(form.as_bytes(), boundary)
        .expect("should parse two normal parts");

    assert_eq!(parsed.len(), 2);
    assert_eq!(parsed[0].name, "field1");
    assert_eq!(parsed[1].name, "field2");
}
4422
/// With `TRUSS_STORAGE_TIMEOUT_SECS` unset, the storage timeout is 30 seconds.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_default() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe { std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS") };

    let timeout = ServerConfig::from_env().unwrap().storage_timeout_secs;
    assert_eq!(timeout, 30);
}
4433
/// An in-range `TRUSS_STORAGE_TIMEOUT_SECS` value is used as the storage timeout.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_custom() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "60");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the override into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.storage_timeout_secs, 60);
}
4447
/// The lowest accepted storage timeout, `1`, is honored.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_min_boundary() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "1");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the override into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.storage_timeout_secs, 1);
}
4461
/// The highest accepted storage timeout, `300`, is honored.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_max_boundary() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "300");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the override into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.storage_timeout_secs, 300);
}
4475
/// An empty `TRUSS_STORAGE_TIMEOUT_SECS` falls back to the default of 30 seconds.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_empty_string_uses_default() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the override into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.storage_timeout_secs, 30);
}
4489
/// A storage timeout of `0` is rejected with an error naming the valid range.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_zero_rejected() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "0");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the invalid value into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("between 1 and 300"),
        "error should mention valid range: {err}"
    );
}
4506
/// A storage timeout of `301` is rejected with an error naming the valid range.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_over_max_rejected() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "301");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the invalid value into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("between 1 and 300"),
        "error should mention valid range: {err}"
    );
}
4523
/// A non-numeric storage timeout is rejected with a "positive integer" error.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_non_numeric_rejected() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "abc");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the invalid value into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("positive integer"),
        "error should mention positive integer: {err}"
    );
}
4540
/// With `TRUSS_MAX_CONCURRENT_TRANSFORMS` unset, the limit is 64.
#[test]
#[serial]
fn test_max_concurrent_transforms_default() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe { std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS") };

    let limit = ServerConfig::from_env().unwrap().max_concurrent_transforms;
    assert_eq!(limit, 64);
}
4550
/// An in-range `TRUSS_MAX_CONCURRENT_TRANSFORMS` value is used as the limit.
#[test]
#[serial]
fn test_max_concurrent_transforms_custom() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "128");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the override into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let config = result.unwrap();
    assert_eq!(config.max_concurrent_transforms, 128);
}
4563
/// The lowest accepted concurrency limit, `1`, is honored.
#[test]
#[serial]
fn test_max_concurrent_transforms_min_boundary() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the override into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let config = result.unwrap();
    assert_eq!(config.max_concurrent_transforms, 1);
}
4576
/// The highest accepted concurrency limit, `1024`, is honored.
#[test]
#[serial]
fn test_max_concurrent_transforms_max_boundary() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1024");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the override into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let config = result.unwrap();
    assert_eq!(config.max_concurrent_transforms, 1024);
}
4589
/// An empty `TRUSS_MAX_CONCURRENT_TRANSFORMS` falls back to the default of 64.
#[test]
#[serial]
fn test_max_concurrent_transforms_empty_uses_default() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the override into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let config = result.unwrap();
    assert_eq!(config.max_concurrent_transforms, 64);
}
4602
/// A concurrency limit of `0` is rejected with an error naming the valid range.
#[test]
#[serial]
fn test_max_concurrent_transforms_zero_rejected() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "0");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the invalid value into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("between 1 and 1024"),
        "error should mention valid range: {err}"
    );
}
4618
/// A concurrency limit of `1025` is rejected with an error naming the valid range.
#[test]
#[serial]
fn test_max_concurrent_transforms_over_max_rejected() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1025");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the invalid value into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("between 1 and 1024"),
        "error should mention valid range: {err}"
    );
}
4634
/// A non-numeric concurrency limit is rejected with a "positive integer" error.
#[test]
#[serial]
fn test_max_concurrent_transforms_non_numeric_rejected() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "abc");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the invalid value into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("positive integer"),
        "error should mention positive integer: {err}"
    );
}
4650
/// With `TRUSS_TRANSFORM_DEADLINE_SECS` unset, the deadline is 30 seconds.
#[test]
#[serial]
fn test_transform_deadline_default() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe { std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS") };

    let deadline = ServerConfig::from_env().unwrap().transform_deadline_secs;
    assert_eq!(deadline, 30);
}
4660
/// An in-range `TRUSS_TRANSFORM_DEADLINE_SECS` value is used as the deadline.
#[test]
#[serial]
fn test_transform_deadline_custom() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "60");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the override into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.transform_deadline_secs, 60);
}
4673
/// The lowest accepted transform deadline, `1`, is honored.
#[test]
#[serial]
fn test_transform_deadline_min_boundary() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "1");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the override into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.transform_deadline_secs, 1);
}
4686
/// The highest accepted transform deadline, `300`, is honored.
#[test]
#[serial]
fn test_transform_deadline_max_boundary() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "300");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the override into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.transform_deadline_secs, 300);
}
4699
/// An empty `TRUSS_TRANSFORM_DEADLINE_SECS` falls back to the default of 30 seconds.
#[test]
#[serial]
fn test_transform_deadline_empty_uses_default() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the override into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.transform_deadline_secs, 30);
}
4712
/// A transform deadline of `0` is rejected with an error naming the valid range.
#[test]
#[serial]
fn test_transform_deadline_zero_rejected() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "0");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the invalid value into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("between 1 and 300"),
        "error should mention valid range: {err}"
    );
}
4728
/// A transform deadline of `301` is rejected with an error naming the valid range.
#[test]
#[serial]
fn test_transform_deadline_over_max_rejected() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "301");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the invalid value into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("between 1 and 300"),
        "error should mention valid range: {err}"
    );
}
4744
/// A non-numeric transform deadline is rejected with a "positive integer" error.
#[test]
#[serial]
fn test_transform_deadline_non_numeric_rejected() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "abc");
    }
    let result = ServerConfig::from_env();
    // Restore the environment before asserting so a failed assertion cannot
    // leak the invalid value into the tests that run afterwards.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("positive integer"),
        "error should mention positive integer: {err}"
    );
}
4760
/// Selecting the `azure` storage backend without `TRUSS_AZURE_CONTAINER` set
/// fails with an error that names the missing variable.
#[test]
#[serial]
#[cfg(feature = "azure")]
fn test_azure_container_env_var_required() {
    // SAFETY: environment mutation is process-global; this test relies on
    // `#[serial]` to keep it from overlapping with the other serialized
    // environment tests.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_BACKEND", "azure");
        std::env::remove_var("TRUSS_AZURE_CONTAINER");
    }
    let result = ServerConfig::from_env();
    // Clear the backend selection before asserting: if it leaked after a
    // failure, later `from_env` tests would run with the azure backend selected.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_BACKEND");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("TRUSS_AZURE_CONTAINER"),
        "error should mention TRUSS_AZURE_CONTAINER: {err}"
    );
}
4778
/// The `Debug` rendering of `ServerConfig` must hide the bearer token and the
/// signed-URL secret, show a `[REDACTED]` marker, and still show the key id.
#[test]
fn server_config_debug_redacts_bearer_token_and_signed_url_secret() {
    let bearer = "super-secret-token-12345";
    let hmac_secret = "super-secret-hmac-key";
    let key_id = "visible-key-id";

    let mut config = ServerConfig::new(temp_dir("debug-redact"), Some(bearer.to_string()));
    config.signed_url_secret = Some(hmac_secret.to_string());
    config.signed_url_key_id = Some(key_id.to_string());

    let debug = format!("{config:?}");

    // Secrets must not appear anywhere in the rendered text.
    assert!(
        !debug.contains(bearer),
        "bearer_token leaked in Debug output: {debug}"
    );
    assert!(
        !debug.contains(hmac_secret),
        "signed_url_secret leaked in Debug output: {debug}"
    );
    // The placeholder and the non-secret key id must be present.
    assert!(
        debug.contains("[REDACTED]"),
        "expected [REDACTED] in Debug output: {debug}"
    );
    assert!(
        debug.contains(key_id),
        "signed_url_key_id should be visible: {debug}"
    );
}
4805
/// An `authorization` header carrying the configured bearer token is accepted.
#[test]
fn authorize_headers_accepts_correct_bearer_token() {
    let token = "correct-token";
    let config = ServerConfig::new(temp_dir("auth-ok"), Some(token.to_string()));
    let headers = vec![(String::from("authorization"), format!("Bearer {token}"))];

    let outcome = super::authorize_request_headers(&headers, &config);
    assert!(outcome.is_ok());
}
4815
/// A bearer token that differs from the configured one yields `401 Unauthorized`.
#[test]
fn authorize_headers_rejects_wrong_bearer_token() {
    let config = ServerConfig::new(temp_dir("auth-wrong"), Some(String::from("correct-token")));
    let headers = vec![(
        String::from("authorization"),
        String::from("Bearer wrong-token"),
    )];

    let outcome = super::authorize_request_headers(&headers, &config);
    let rejection = outcome.unwrap_err();
    assert_eq!(rejection.status, "401 Unauthorized");
}
4826
/// With a bearer token configured, a request without any headers yields
/// `401 Unauthorized`.
#[test]
fn authorize_headers_rejects_missing_header() {
    let config = ServerConfig::new(temp_dir("auth-missing"), Some(String::from("correct-token")));
    let no_headers: Vec<(String, String)> = Vec::new();

    let outcome = super::authorize_request_headers(&no_headers, &config);
    let rejection = outcome.unwrap_err();
    assert_eq!(rejection.status, "401 Unauthorized");
}
4834
/// Acquiring a slot while the counter is below the limit succeeds and bumps
/// the counter to 1.
#[test]
fn transform_slot_acquire_succeeds_under_limit() {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    let in_flight = Arc::new(AtomicU64::new(0));
    // The guard stays bound until the end of the test, so the counter is read
    // while the slot is still held.
    let guard = super::TransformSlot::try_acquire(&in_flight, 2);
    let held = in_flight.load(Ordering::Relaxed);

    assert!(guard.is_some());
    assert_eq!(held, 1);
}
4847
/// Once the limit is reached, a further acquire returns `None` and leaves the
/// counter unchanged.
#[test]
fn transform_slot_acquire_returns_none_at_limit() {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    let in_flight = Arc::new(AtomicU64::new(0));
    // Hold the only available slot for the rest of the test.
    let _first = super::TransformSlot::try_acquire(&in_flight, 1).unwrap();

    let second = super::TransformSlot::try_acquire(&in_flight, 1);
    let held = in_flight.load(Ordering::Relaxed);

    assert!(second.is_none());
    assert_eq!(held, 1);
}
4860
/// Dropping an acquired slot returns the counter to zero.
#[test]
fn transform_slot_drop_decrements_counter() {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    let in_flight = Arc::new(AtomicU64::new(0));

    let slot = super::TransformSlot::try_acquire(&in_flight, 4).unwrap();
    assert_eq!(in_flight.load(Ordering::Relaxed), 1);

    // Release explicitly instead of relying on the end of a nested scope.
    drop(slot);
    assert_eq!(in_flight.load(Ordering::Relaxed), 0);
}
4874
/// Exactly `limit` slots can be held at once; one more is refused, and
/// releasing all of them brings the counter back to zero.
#[test]
fn transform_slot_multiple_acquires_up_to_limit() {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    let in_flight = Arc::new(AtomicU64::new(0));
    let limit = 3u64;

    // Fill every available slot.
    let held: Vec<_> = (0..limit)
        .map(|_| super::TransformSlot::try_acquire(&in_flight, limit).unwrap())
        .collect();
    assert_eq!(in_flight.load(Ordering::Relaxed), limit);

    // A refused acquire must not change the counter.
    assert!(super::TransformSlot::try_acquire(&in_flight, limit).is_none());
    assert_eq!(in_flight.load(Ordering::Relaxed), limit);

    // Dropping the vector drops every slot it holds.
    drop(held);
    assert_eq!(in_flight.load(Ordering::Relaxed), 0);
}
4894
/// `emit_access_log` hands the configured log handler a JSON document whose
/// fields mirror the `AccessLogEntry`, plus an unsigned `latency_ms`.
#[test]
fn emit_access_log_produces_json_with_expected_fields() {
    use std::sync::{Arc, Mutex};
    use std::time::Instant;

    // The handler stores the most recent message so the test can inspect it.
    let sink = Arc::new(Mutex::new(String::new()));
    let writer = Arc::clone(&sink);
    let handler: super::LogHandler = Arc::new(move |line: &str| {
        *writer.lock().unwrap() = line.to_owned();
    });

    let mut config = ServerConfig::new(temp_dir("access-log"), None);
    config.log_handler = Some(handler);

    let entry = super::AccessLogEntry {
        request_id: "req-123",
        method: "GET",
        path: "/image.png",
        route: "transform",
        status: "200",
        start: Instant::now(),
        cache_status: Some("hit"),
    };
    super::emit_access_log(&config, &entry);

    let logged = sink.lock().unwrap().clone();
    let parsed: serde_json::Value = serde_json::from_str(&logged).expect("valid JSON");

    let expected_fields = [
        ("kind", "access_log"),
        ("request_id", "req-123"),
        ("method", "GET"),
        ("path", "/image.png"),
        ("route", "transform"),
        ("status", "200"),
        ("cache_status", "hit"),
    ];
    for (field, expected) in expected_fields {
        assert_eq!(parsed[field], expected);
    }
    assert!(parsed["latency_ms"].is_u64());
}
4935
/// When the entry has no cache status, the emitted JSON carries a null
/// `cache_status`.
#[test]
fn emit_access_log_null_cache_status_when_none() {
    use std::sync::{Arc, Mutex};
    use std::time::Instant;

    // The handler stores the most recent message so the test can inspect it.
    let sink = Arc::new(Mutex::new(String::new()));
    let writer = Arc::clone(&sink);
    let handler: super::LogHandler = Arc::new(move |line: &str| {
        *writer.lock().unwrap() = line.to_owned();
    });

    let mut config = ServerConfig::new(temp_dir("access-log-none"), None);
    config.log_handler = Some(handler);

    let entry = super::AccessLogEntry {
        request_id: "req-456",
        method: "POST",
        path: "/upload",
        route: "upload",
        status: "201",
        start: Instant::now(),
        cache_status: None,
    };
    super::emit_access_log(&config, &entry);

    let logged = sink.lock().unwrap().clone();
    let parsed: serde_json::Value = serde_json::from_str(&logged).expect("valid JSON");
    assert!(parsed["cache_status"].is_null());
}
4966
/// A non-empty `x-request-id` header is returned as the request id.
#[test]
fn x_request_id_is_extracted_from_incoming_headers() {
    let headers = vec![
        (String::from("host"), String::from("localhost")),
        (String::from("x-request-id"), String::from("custom-id-abc")),
    ];

    let extracted = super::extract_request_id(&headers);
    assert_eq!(extracted, Some(String::from("custom-id-abc")));
}
4980
/// An `x-request-id` header with an empty value yields no request id.
#[test]
fn x_request_id_not_extracted_when_empty() {
    let headers = vec![(String::from("x-request-id"), String::new())];

    let extracted = super::extract_request_id(&headers);
    assert!(extracted.is_none());
}
4986
/// Headers without `x-request-id` yield no request id.
#[test]
fn x_request_id_not_extracted_when_absent() {
    let headers = vec![(String::from("host"), String::from("localhost"))];

    let extracted = super::extract_request_id(&headers);
    assert!(extracted.is_none());
}
4992
/// A `Cache-Status` value of `"truss"; hit` is reported as `"hit"`.
#[test]
fn cache_status_hit_detected() {
    let headers: Vec<(&str, String)> = vec![("Cache-Status", String::from("\"truss\"; hit"))];

    let status = super::extract_cache_status(&headers);
    assert_eq!(status, Some("hit"));
}
5000
/// A `Cache-Status` value of `"truss"; fwd=miss` is reported as `"miss"`.
#[test]
fn cache_status_miss_detected() {
    let headers: Vec<(&str, String)> =
        vec![("Cache-Status", String::from("\"truss\"; fwd=miss"))];

    let status = super::extract_cache_status(&headers);
    assert_eq!(status, Some("miss"));
}
5007
/// Without a `Cache-Status` header there is no cache status to report.
#[test]
fn cache_status_none_when_header_absent() {
    let headers: Vec<(&str, String)> = vec![("Content-Type", String::from("image/png"))];

    let status = super::extract_cache_status(&headers);
    assert!(status.is_none());
}
5013}