1mod auth;
26#[cfg(feature = "azure")]
27pub mod azure;
28mod cache;
29mod config;
30#[cfg(feature = "gcs")]
31pub mod gcs;
32mod http_parse;
33mod metrics;
34mod multipart;
35mod negotiate;
36mod rate_limit;
37mod remote;
38mod response;
39#[cfg(feature = "s3")]
40pub mod s3;
41
42#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
43pub use config::StorageBackend;
44use config::StorageBackendLabel;
45pub use config::{DEFAULT_BIND_ADDR, DEFAULT_STORAGE_ROOT, LogHandler, LogLevel, ServerConfig};
46
47use auth::{
48 authorize_request, authorize_request_headers, authorize_signed_request,
49 canonical_query_without_signature, extend_transform_query, parse_optional_bool_query,
50 parse_optional_float_query, parse_optional_integer_query, parse_optional_u8_query,
51 parse_query_params, required_query_param, signed_source_query, url_authority,
52 validate_public_query_names,
53};
54use cache::{
55 CacheLookup, TransformCache, compute_cache_key, compute_watermark_identity,
56 try_versioned_cache_lookup,
57};
58use http_parse::{
59 HttpRequest, parse_named, parse_optional_named, read_request_body, read_request_headers,
60 request_has_json_content_type,
61};
62use metrics::{
63 CACHE_HITS_TOTAL, CACHE_MISSES_TOTAL, RouteMetric, record_http_metrics,
64 record_http_request_duration, record_storage_duration, record_transform_duration,
65 record_transform_error, record_watermark_transform, render_metrics_text, status_code,
66 storage_backend_index_from_config, uptime_seconds,
67};
68use multipart::{parse_multipart_boundary, parse_upload_request};
69use negotiate::{
70 CacheHitStatus, ImageResponsePolicy, PublicSourceKind, build_image_etag,
71 build_image_response_headers, if_none_match_matches, negotiate_output_format,
72};
73use remote::{read_remote_watermark_bytes, resolve_source_bytes};
74use response::{
75 HttpResponse, NOT_FOUND_BODY, bad_request_response, service_unavailable_response,
76 too_many_requests_response, transform_error_response, unsupported_media_type_response,
77 write_response, write_response_compressed,
78};
79
80use crate::{
81 CropRegion, Fit, MediaType, Position, RawArtifact, Rgba8, Rotation, TransformOptions,
82 TransformRequest, WatermarkInput, sniff_artifact, transform_raster, transform_svg,
83};
84use hmac::{Hmac, Mac};
85use serde::Deserialize;
86use serde_json::json;
87use sha2::{Digest, Sha256};
88use std::collections::BTreeMap;
89use std::env;
90use std::io;
91use std::net::{TcpListener, TcpStream};
92use std::str::FromStr;
93use std::sync::Arc;
94use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU8, AtomicU64, Ordering};
95use std::time::{Duration, Instant};
96use subtle::ConstantTimeEq;
97use url::Url;
98use uuid::Uuid;
99
/// Writes `msg` followed by a newline directly to the process's standard
/// error descriptor/handle, ignoring any write error.
///
/// The message and the trailing `\n` are assembled into one buffer and handed
/// to a single `write_all` call. On platforms that are neither `unix` nor
/// `windows` nothing is written.
pub(crate) fn stderr_write(msg: &str) {
    use std::io::Write;

    let line = [msg.as_bytes(), b"\n"].concat();

    #[cfg(unix)]
    {
        use std::os::fd::FromRawFd;
        // SAFETY: the temporary `File` only borrows descriptor 2; wrapping it
        // in `ManuallyDrop` guarantees it is never closed by this function.
        let mut stderr =
            std::mem::ManuallyDrop::new(unsafe { std::fs::File::from_raw_fd(2) });
        let _ = stderr.write_all(&line);
    }

    #[cfg(windows)]
    {
        use std::os::windows::io::FromRawHandle;

        unsafe extern "system" {
            fn GetStdHandle(nStdHandle: u32) -> *mut std::ffi::c_void;
        }

        const STD_ERROR_HANDLE: u32 = (-12_i32) as u32;
        // SAFETY: `GetStdHandle` has no preconditions; the returned handle is
        // only borrowed because the wrapping `File` is never dropped.
        let raw = unsafe { GetStdHandle(STD_ERROR_HANDLE) };
        let mut stderr =
            std::mem::ManuallyDrop::new(unsafe { std::fs::File::from_raw_handle(raw) });
        let _ = stderr.write_all(&line);
    }
}
140
/// Read timeout applied to every accepted client socket in `handle_stream`.
const SOCKET_READ_TIMEOUT: Duration = Duration::from_secs(60);
/// Write timeout applied to every accepted client socket in `handle_stream`.
const SOCKET_WRITE_TIMEOUT: Duration = Duration::from_secs(60);
/// Minimum number of worker threads; `serve_with_config` uses the larger of
/// this value and `max_concurrent_transforms`.
const WORKER_THREADS: usize = 8;
/// HMAC-SHA256 instance used to sign public URLs.
type HmacSha256 = Hmac<Sha256>;
146
/// Cache-control parameters for publicly served responses.
#[derive(Clone, Copy)]
struct PublicCacheControl {
    // NOTE(review): presumably the `max-age` directive in seconds — confirm at the use site.
    max_age: u32,
    // NOTE(review): presumably the `stale-while-revalidate` directive in seconds — confirm.
    stale_while_revalidate: u32,
}
152
/// Copyable bundle of settings that shape how an image response is produced.
#[derive(Clone, Copy)]
struct ImageResponseConfig {
    // NOTE(review): name suggests this turns off `Accept`-based output format
    // negotiation — confirm against the code that reads it.
    disable_accept_negotiation: bool,
    /// Cache-control parameters for public responses.
    public_cache_control: PublicCacheControl,
    // NOTE(review): presumably the time budget for a single transform — confirm.
    transform_deadline: Duration,
}
159
/// Guard representing one occupied slot in a shared concurrency counter.
///
/// A slot is obtained with [`TransformSlot::try_acquire`] and released
/// automatically when the guard is dropped.
struct TransformSlot {
    /// Shared counter of currently held slots.
    counter: Arc<AtomicU64>,
}

impl TransformSlot {
    /// Tries to reserve one slot on `counter`, allowing at most `limit`
    /// concurrently held slots.
    ///
    /// Returns `Some(guard)` when the counter was below `limit` and has been
    /// incremented, or `None` when the limit is already reached (including
    /// `limit == 0`).
    ///
    /// The increment is performed with a compare-and-swap loop, so the counter
    /// never exceeds `limit`, not even transiently. An increment-then-rollback
    /// scheme would briefly over-count and could make a concurrent caller see
    /// a full counter even though a slot had just been freed.
    fn try_acquire(counter: &Arc<AtomicU64>, limit: u64) -> Option<Self> {
        counter
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |held| {
                // `held < limit <= u64::MAX`, so `held + 1` cannot overflow.
                (held < limit).then(|| held + 1)
            })
            .ok()
            .map(|_| Self {
                counter: Arc::clone(counter),
            })
    }
}

impl Drop for TransformSlot {
    /// Releases the slot by decrementing the shared counter.
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::Relaxed);
    }
}
188
/// Identifies the source image for a signed public URL.
///
/// `sign_public_url` routes `Path` sources to `/images/by-path` and `Url`
/// sources to `/images/by-url`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SignedUrlSource {
    /// Source addressed by a path.
    Path {
        /// Path of the source image.
        path: String,
        /// Optional version string for the source.
        version: Option<String>,
    },
    /// Source addressed by a URL.
    Url {
        /// URL of the source image.
        url: String,
        /// Optional version string for the source.
        version: Option<String>,
    },
}
207
/// Watermark parameters that `sign_public_url` adds to the signed query.
#[derive(Debug, Default)]
pub struct SignedWatermarkParams {
    /// Emitted as the `watermarkUrl` query parameter.
    pub url: String,
    /// Emitted as `watermarkPosition` when present.
    pub position: Option<String>,
    /// Emitted as `watermarkOpacity` when present.
    pub opacity: Option<u8>,
    /// Emitted as `watermarkMargin` when present.
    pub margin: Option<u32>,
}
259
260#[allow(clippy::too_many_arguments)]
261pub fn sign_public_url(
262 base_url: &str,
263 source: SignedUrlSource,
264 options: &TransformOptions,
265 key_id: &str,
266 secret: &str,
267 expires: u64,
268 watermark: Option<&SignedWatermarkParams>,
269 preset: Option<&str>,
270) -> Result<String, String> {
271 let base_url = Url::parse(base_url).map_err(|error| format!("base URL is invalid: {error}"))?;
272 match base_url.scheme() {
273 "http" | "https" => {}
274 _ => return Err("base URL must use the http or https scheme".to_string()),
275 }
276
277 let route_path = match source {
278 SignedUrlSource::Path { .. } => "/images/by-path",
279 SignedUrlSource::Url { .. } => "/images/by-url",
280 };
281 let mut endpoint = base_url
282 .join(route_path)
283 .map_err(|error| format!("failed to resolve the public endpoint URL: {error}"))?;
284 let authority = url_authority(&endpoint)?;
285 let mut query = signed_source_query(source);
286 if let Some(name) = preset {
287 query.insert("preset".to_string(), name.to_string());
288 }
289 extend_transform_query(&mut query, options);
290 if let Some(wm) = watermark {
291 query.insert("watermarkUrl".to_string(), wm.url.clone());
292 if let Some(ref pos) = wm.position {
293 query.insert("watermarkPosition".to_string(), pos.clone());
294 }
295 if let Some(opacity) = wm.opacity {
296 query.insert("watermarkOpacity".to_string(), opacity.to_string());
297 }
298 if let Some(margin) = wm.margin {
299 query.insert("watermarkMargin".to_string(), margin.to_string());
300 }
301 }
302 query.insert("keyId".to_string(), key_id.to_string());
303 query.insert("expires".to_string(), expires.to_string());
304
305 let canonical = format!(
306 "GET\n{}\n{}\n{}",
307 authority,
308 endpoint.path(),
309 canonical_query_without_signature(&query)
310 );
311 let mut mac = HmacSha256::new_from_slice(secret.as_bytes())
312 .map_err(|error| format!("failed to initialize signed URL HMAC: {error}"))?;
313 mac.update(canonical.as_bytes());
314 query.insert(
315 "signature".to_string(),
316 hex::encode(mac.finalize().into_bytes()),
317 );
318
319 let mut serializer = url::form_urlencoded::Serializer::new(String::new());
320 for (name, value) in query {
321 serializer.append_pair(&name, &value);
322 }
323 endpoint.set_query(Some(&serializer.finish()));
324 Ok(endpoint.into())
325}
326
327pub fn bind_addr() -> String {
332 env::var("TRUSS_BIND_ADDR").unwrap_or_else(|_| DEFAULT_BIND_ADDR.to_string())
333}
334
335pub fn serve(listener: TcpListener) -> io::Result<()> {
347 let config = ServerConfig::from_env()?;
348
349 for (ok, name) in storage_health_check(&config) {
352 if !ok {
353 return Err(io::Error::new(
354 io::ErrorKind::ConnectionRefused,
355 format!(
356 "storage connectivity check failed for `{name}` — verify the backend \
357 endpoint, credentials, and container/bucket configuration"
358 ),
359 ));
360 }
361 }
362
363 serve_with_config(listener, config)
364}
365
/// Runs the accept loop on `listener` with an explicit configuration until a
/// shutdown is requested, then drains and stops the worker pool.
///
/// Accepted connections are sent over a channel to a fixed pool of worker
/// threads, each of which calls `handle_stream`. Shutdown is triggered either
/// by a byte arriving on the shutdown pipe or by `config.draining` becoming
/// true.
///
/// # Errors
///
/// Returns an error when the shutdown pipe cannot be created, the listener
/// cannot be switched to non-blocking mode, or `accept` fails with anything
/// other than `WouldBlock`.
///
/// # Panics
///
/// Panics if the preset watcher thread cannot be spawned or one of the
/// internal shutdown locks is poisoned.
pub fn serve_with_config(listener: TcpListener, config: ServerConfig) -> io::Result<()> {
    let config = Arc::new(config);
    let (sender, receiver) = std::sync::mpsc::channel::<TcpStream>();

    // The single receiver is shared by all workers behind a mutex.
    let receiver = Arc::new(std::sync::Mutex::new(receiver));
    // At least WORKER_THREADS workers, more if the transform limit is higher.
    let pool_size = (config.max_concurrent_transforms as usize).max(WORKER_THREADS);
    let mut workers = Vec::with_capacity(pool_size);
    for _ in 0..pool_size {
        let rx = Arc::clone(&receiver);
        let cfg = Arc::clone(&config);
        workers.push(std::thread::spawn(move || {
            loop {
                // The lock guard is scoped to this block so it is released
                // before the connection is handled.
                let stream = {
                    let guard = rx.lock().expect("worker lock poisoned");
                    match guard.recv() {
                        Ok(stream) => stream,
                        // The sender was dropped: no more work will arrive.
                        Err(_) => break,
                    }
                };
                if let Err(err) = handle_stream(stream, &cfg) {
                    cfg.log_warn(&format!("failed to handle connection: {err}"));
                }
            }
        }));
    }

    let (shutdown_read_fd, shutdown_write_fd) = create_shutdown_pipe()?;
    // Hand the draining flag, the pipe's write end and the log level to the
    // process-wide signal handlers.
    install_signal_handler(
        Arc::clone(&config.draining),
        shutdown_write_fd,
        Arc::clone(&config.log_level),
    );

    // Only start the preset watcher when a presets file is configured.
    if let Some(ref path) = config.presets_file_path {
        let presets = Arc::clone(&config.presets);
        let draining = Arc::clone(&config.draining);
        let cfg = Arc::clone(&config);
        let path = path.clone();
        std::thread::Builder::new()
            .name("preset-watcher".into())
            .spawn(move || preset_watcher(presets, path, draining, cfg))
            .expect("failed to spawn preset watcher thread");
    }

    // Non-blocking accept lets the loop re-check the shutdown conditions.
    listener.set_nonblocking(true)?;

    loop {
        wait_for_accept_or_shutdown(&listener, shutdown_read_fd, &config.draining);

        // A byte on the shutdown pipe means a termination signal arrived.
        if poll_shutdown_pipe(shutdown_read_fd) {
            break;
        }

        if config.draining.load(Ordering::SeqCst) {
            break;
        }

        match listener.accept() {
            Ok((stream, _addr)) => {
                // Workers do blocking I/O on the accepted stream.
                let _ = stream.set_nonblocking(false);
                if sender.send(stream).is_err() {
                    break;
                }
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                // Nothing to accept right now; go back to waiting.
            }
            Err(err) => return Err(err),
        }
    }

    let drain_secs = config.shutdown_drain_secs;
    config.log(&format!(
        "shutdown: drain started, waiting {drain_secs}s for load balancers"
    ));
    if drain_secs > 0 {
        std::thread::sleep(Duration::from_secs(drain_secs));
    }
    config.log("shutdown: drain complete, closing listener");

    // Dropping the sender makes every worker's `recv` fail, ending its loop.
    drop(sender);
    // All workers together get at most 15 seconds to finish.
    let deadline = std::time::Instant::now() + Duration::from_secs(15);
    for worker in workers {
        let remaining = deadline.saturating_duration_since(std::time::Instant::now());
        if remaining.is_zero() {
            stderr_write("shutdown: timed out waiting for worker threads");
            break;
        }
        // `JoinHandle::join` has no timeout, so a helper thread performs the
        // join and signals completion through a condition variable.
        let worker_done =
            std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
        let wd = std::sync::Arc::clone(&worker_done);
        std::thread::spawn(move || {
            let _ = worker.join();
            let (lock, cvar) = &*wd;
            *lock.lock().expect("shutdown notify lock") = true;
            cvar.notify_one();
        });
        let (lock, cvar) = &*worker_done;
        let mut done = lock.lock().expect("shutdown wait lock");
        while !*done {
            let (guard, timeout) = cvar
                .wait_timeout(done, remaining)
                .expect("shutdown condvar wait");
            done = guard;
            if timeout.timed_out() {
                stderr_write("shutdown: timed out waiting for a worker thread");
                break;
            }
        }
    }

    config.log("shutdown: complete");
    close_shutdown_pipe(shutdown_read_fd, shutdown_write_fd);
    Ok(())
}
515
516#[cfg(unix)]
522fn create_shutdown_pipe() -> io::Result<(i32, i32)> {
523 let mut fds = [0i32; 2];
524 if unsafe { libc::pipe(fds.as_mut_ptr()) } != 0 {
525 return Err(io::Error::last_os_error());
526 }
527 unsafe {
530 libc::fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK);
531 libc::fcntl(fds[1], libc::F_SETFL, libc::O_NONBLOCK);
532 }
533 Ok((fds[0], fds[1]))
534}
535
536#[cfg(windows)]
537fn create_shutdown_pipe() -> io::Result<(i32, i32)> {
538 Ok((-1, -1))
540}
541
542#[cfg(unix)]
543fn poll_shutdown_pipe(read_fd: i32) -> bool {
544 let mut buf = [0u8; 1];
545 let n = unsafe { libc::read(read_fd, buf.as_mut_ptr().cast(), 1) };
546 n > 0
547}
548
549#[cfg(windows)]
550fn poll_shutdown_pipe(_read_fd: i32) -> bool {
551 false
552}
553
554#[cfg(unix)]
558fn wait_for_accept_or_shutdown(
559 listener: &std::net::TcpListener,
560 shutdown_read_fd: i32,
561 _draining: &AtomicBool,
562) {
563 use std::os::unix::io::AsRawFd;
564 let listener_fd = listener.as_raw_fd();
565 let mut fds = [
566 libc::pollfd {
567 fd: listener_fd,
568 events: libc::POLLIN,
569 revents: 0,
570 },
571 libc::pollfd {
572 fd: shutdown_read_fd,
573 events: libc::POLLIN,
574 revents: 0,
575 },
576 ];
577 unsafe { libc::poll(fds.as_mut_ptr(), 2, -1) };
580}
581
582#[cfg(windows)]
583fn wait_for_accept_or_shutdown(
584 _listener: &std::net::TcpListener,
585 _shutdown_read_fd: i32,
586 draining: &AtomicBool,
587) {
588 if !draining.load(Ordering::SeqCst) {
591 std::thread::sleep(Duration::from_millis(10));
592 }
593}
594
595#[cfg(unix)]
596fn close_shutdown_pipe(read_fd: i32, write_fd: i32) {
597 unsafe {
598 libc::close(read_fd);
599 libc::close(write_fd);
600 }
601}
602
603#[cfg(windows)]
604fn close_shutdown_pipe(_read_fd: i32, _write_fd: i32) {}
605
/// Write end of the shutdown pipe as seen by the signal handler; `-1` means
/// no pipe has been registered.
static SHUTDOWN_PIPE_WR: AtomicI32 = AtomicI32::new(-1);
/// Raw pointer to the draining flag, produced by `Arc::into_raw` in
/// `install_signal_handler` and dereferenced by the signal handlers. Null
/// until a handler is installed.
static GLOBAL_DRAINING: std::sync::atomic::AtomicPtr<AtomicBool> =
    std::sync::atomic::AtomicPtr::new(std::ptr::null_mut());
/// Raw pointer to the log-level atomic, produced by `Arc::into_raw` in the
/// unix `install_signal_handler` and read by `sigusr1_handler`. Null until a
/// handler is installed.
static GLOBAL_LOG_LEVEL: std::sync::atomic::AtomicPtr<AtomicU8> =
    std::sync::atomic::AtomicPtr::new(std::ptr::null_mut());
614
615#[cfg(unix)]
616fn install_signal_handler(draining: Arc<AtomicBool>, write_fd: i32, log_level: Arc<AtomicU8>) {
617 SHUTDOWN_PIPE_WR.store(write_fd, Ordering::SeqCst);
620 let ptr = Arc::into_raw(draining).cast_mut();
624 GLOBAL_DRAINING.store(ptr, Ordering::SeqCst);
625 let lvl_ptr = Arc::into_raw(log_level).cast_mut();
627 GLOBAL_LOG_LEVEL.store(lvl_ptr, Ordering::SeqCst);
628
629 unsafe {
633 let mut sa: libc::sigaction = std::mem::zeroed();
634 sa.sa_sigaction = signal_handler as *const () as libc::sighandler_t;
635 libc::sigemptyset(&mut sa.sa_mask);
636 sa.sa_flags = libc::SA_RESTART;
637 libc::sigaction(libc::SIGTERM, &sa, std::ptr::null_mut());
638 libc::sigaction(libc::SIGINT, &sa, std::ptr::null_mut());
639
640 let mut sa_usr1: libc::sigaction = std::mem::zeroed();
641 sa_usr1.sa_sigaction = sigusr1_handler as *const () as libc::sighandler_t;
642 libc::sigemptyset(&mut sa_usr1.sa_mask);
643 sa_usr1.sa_flags = libc::SA_RESTART;
644 libc::sigaction(libc::SIGUSR1, &sa_usr1, std::ptr::null_mut());
645 }
646}
647
648#[cfg(unix)]
653extern "C" fn sigusr1_handler(_sig: libc::c_int) {
654 let ptr = GLOBAL_LOG_LEVEL.load(Ordering::SeqCst);
655 if ptr.is_null() {
656 return;
657 }
658 let level_atomic = unsafe { &*ptr };
659 let current = level_atomic.load(Ordering::SeqCst);
660 let next = LogLevel::from_u8(current).cycle();
661 level_atomic.store(next as u8, Ordering::SeqCst);
662
663 let msg = match next {
665 LogLevel::Error => b"[log] level changed to error\n" as &[u8],
666 LogLevel::Warn => b"[log] level changed to warn\n",
667 LogLevel::Info => b"[log] level changed to info\n",
668 LogLevel::Debug => b"[log] level changed to debug\n",
669 };
670 unsafe { libc::write(2, msg.as_ptr().cast(), msg.len()) };
671}
672
673#[cfg(unix)]
674extern "C" fn signal_handler(_sig: libc::c_int) {
675 let ptr = GLOBAL_DRAINING.load(Ordering::SeqCst);
677 if !ptr.is_null() {
678 unsafe { (*ptr).store(true, Ordering::SeqCst) };
679 }
680 let fd = SHUTDOWN_PIPE_WR.load(Ordering::SeqCst);
682 if fd >= 0 {
683 let byte: u8 = 1;
684 unsafe { libc::write(fd, (&byte as *const u8).cast(), 1) };
685 }
686}
687
/// Windows variant: publishes the draining flag and installs
/// `windows_signal_handler` for `SIGINT`. The pipe fd and log level are unused.
#[cfg(windows)]
fn install_signal_handler(draining: Arc<AtomicBool>, _write_fd: i32, _log_level: Arc<AtomicU8>) {
    // The Arc is intentionally turned into a raw pointer and never reclaimed,
    // so the handler can dereference it at any time.
    GLOBAL_DRAINING.store(Arc::into_raw(draining).cast_mut(), Ordering::SeqCst);

    let handler = windows_signal_handler as libc::sighandler_t;
    // SAFETY: `handler` is a valid `extern "C"` function taking a `c_int`.
    unsafe {
        libc::signal(libc::SIGINT, handler);
    }
}
700
/// Windows `SIGINT` handler: sets the draining flag (when registered) and
/// re-installs itself for the next `SIGINT`.
#[cfg(windows)]
extern "C" fn windows_signal_handler(_sig: libc::c_int) {
    // SAFETY: a non-null pointer was produced by `Arc::into_raw` in
    // `install_signal_handler` and is never released.
    if let Some(draining) = unsafe { GLOBAL_DRAINING.load(Ordering::SeqCst).as_ref() } {
        draining.store(true, Ordering::SeqCst);
    }

    let handler = windows_signal_handler as libc::sighandler_t;
    // SAFETY: `handler` is a valid `extern "C"` function taking a `c_int`.
    unsafe {
        libc::signal(libc::SIGINT, handler);
    }
}
712
713const PRESET_WATCH_INTERVAL: Duration = Duration::from_secs(5);
719
720fn preset_watcher(
723 presets: Arc<std::sync::RwLock<std::collections::HashMap<String, TransformOptionsPayload>>>,
724 path: std::path::PathBuf,
725 draining: Arc<AtomicBool>,
726 config: Arc<ServerConfig>,
727) {
728 use config::parse_presets_file;
729 use std::fs;
730
731 let mut last_modified = fs::metadata(&path).and_then(|m| m.modified()).ok();
732
733 loop {
734 std::thread::sleep(PRESET_WATCH_INTERVAL);
735
736 if draining.load(Ordering::Relaxed) {
737 break;
738 }
739
740 let current_modified = match fs::metadata(&path).and_then(|m| m.modified()) {
741 Ok(mtime) => Some(mtime),
742 Err(err) => {
743 config.log_warn(&format!(
744 "[presets] failed to stat `{}`: {err}",
745 path.display()
746 ));
747 continue;
748 }
749 };
750
751 if current_modified == last_modified {
752 continue;
753 }
754
755 match parse_presets_file(&path) {
756 Ok(new_presets) => {
757 let count = new_presets.len();
758 *presets.write().expect("presets lock poisoned") = new_presets;
759 last_modified = current_modified;
760 config.log(&format!(
761 "[presets] reloaded {count} presets from `{}`",
762 path.display()
763 ));
764 }
765 Err(err) => {
766 config.log_warn(&format!(
767 "[presets] reload failed for `{}`: {err} (keeping previous presets)",
768 path.display()
769 ));
770 }
774 }
775 }
776}
777
778pub fn serve_once(listener: TcpListener) -> io::Result<()> {
788 let config = ServerConfig::from_env()?;
789 serve_once_with_config(listener, config)
790}
791
792pub fn serve_once_with_config(listener: TcpListener, config: ServerConfig) -> io::Result<()> {
799 let (stream, _) = listener.accept()?;
800 handle_stream(stream, &config)
801}
802
/// Deserialized body of a transform request. Unknown top-level fields are
/// rejected by serde.
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct TransformImageRequestPayload {
    /// Where the source image comes from (required).
    source: TransformSourcePayload,
    /// Transform options; defaults to an all-`None` payload when omitted.
    #[serde(default)]
    options: TransformOptionsPayload,
    /// Optional watermark description.
    #[serde(default)]
    watermark: Option<WatermarkPayload>,
}
812
/// Source of the image to transform, selected by the lowercase `kind` tag
/// (`path`, `url`, or — with a storage feature enabled — `storage`).
#[derive(Debug, Deserialize)]
#[serde(tag = "kind", rename_all = "lowercase")]
enum TransformSourcePayload {
    /// Source addressed by a path.
    Path {
        path: String,
        /// Optional version; required for `versioned_source_hash` to yield a value.
        version: Option<String>,
    },
    /// Source addressed by a URL.
    Url {
        url: String,
        /// Optional version; required for `versioned_source_hash` to yield a value.
        version: Option<String>,
    },
    /// Source addressed by an object key in the configured storage backend.
    #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
    Storage {
        /// Explicit bucket/container; the backend's default is used when absent.
        bucket: Option<String>,
        key: String,
        /// Optional version; required for `versioned_source_hash` to yield a value.
        version: Option<String>,
    },
}
831
832impl TransformSourcePayload {
833 fn versioned_source_hash(&self, config: &ServerConfig) -> Option<String> {
842 let (kind, reference, version): (&str, std::borrow::Cow<'_, str>, Option<&str>) = match self
843 {
844 Self::Path { path, version } => ("path", path.as_str().into(), version.as_deref()),
845 Self::Url { url, version } => ("url", url.as_str().into(), version.as_deref()),
846 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
847 Self::Storage {
848 bucket,
849 key,
850 version,
851 } => {
852 let (scheme, effective_bucket) =
853 storage_scheme_and_bucket(bucket.as_deref(), config);
854 let effective_bucket = effective_bucket?;
855 (
856 "storage",
857 format!("{scheme}://{effective_bucket}/{key}").into(),
858 version.as_deref(),
859 )
860 }
861 };
862 let version = version?;
863 let mut id = String::new();
867 id.push_str(kind);
868 id.push('\n');
869 id.push_str(&reference);
870 id.push('\n');
871 id.push_str(version);
872 id.push('\n');
873 id.push_str(config.storage_root.to_string_lossy().as_ref());
874 id.push('\n');
875 id.push_str(if config.allow_insecure_url_sources {
876 "insecure"
877 } else {
878 "strict"
879 });
880 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
881 {
882 id.push('\n');
883 id.push_str(storage_backend_label(config));
884 #[cfg(feature = "s3")]
885 if let Some(ref ctx) = config.s3_context
886 && let Some(ref endpoint) = ctx.endpoint_url
887 {
888 id.push('\n');
889 id.push_str(endpoint);
890 }
891 #[cfg(feature = "gcs")]
892 if let Some(ref ctx) = config.gcs_context
893 && let Some(ref endpoint) = ctx.endpoint_url
894 {
895 id.push('\n');
896 id.push_str(endpoint);
897 }
898 #[cfg(feature = "azure")]
899 if let Some(ref ctx) = config.azure_context {
900 id.push('\n');
901 id.push_str(&ctx.endpoint_url);
902 }
903 }
904 Some(hex::encode(Sha256::digest(id.as_bytes())))
905 }
906
907 fn metrics_backend_label(&self, _config: &ServerConfig) -> Option<StorageBackendLabel> {
911 match self {
912 Self::Path { .. } => Some(StorageBackendLabel::Filesystem),
913 Self::Url { .. } => None,
914 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
915 Self::Storage { .. } => Some(_config.storage_backend_label()),
916 }
917 }
918}
919
/// Returns the URI scheme for the configured storage backend together with
/// the bucket/container to use.
///
/// An explicit bucket always wins. Otherwise the backend context's default
/// bucket (or container, for Azure) is used when that context exists. The
/// filesystem backend has no default and returns `explicit_bucket` unchanged.
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn storage_scheme_and_bucket<'a>(
    explicit_bucket: Option<&'a str>,
    config: &'a ServerConfig,
) -> (&'static str, Option<&'a str>) {
    match config.storage_backend {
        StorageBackend::Filesystem => ("fs", explicit_bucket),
        #[cfg(feature = "s3")]
        StorageBackend::S3 => {
            let default_bucket = || {
                config
                    .s3_context
                    .as_ref()
                    .map(|ctx| ctx.default_bucket.as_str())
            };
            ("s3", explicit_bucket.or_else(default_bucket))
        }
        #[cfg(feature = "gcs")]
        StorageBackend::Gcs => {
            let default_bucket = || {
                config
                    .gcs_context
                    .as_ref()
                    .map(|ctx| ctx.default_bucket.as_str())
            };
            ("gcs", explicit_bucket.or_else(default_bucket))
        }
        #[cfg(feature = "azure")]
        StorageBackend::Azure => {
            let default_container = || {
                config
                    .azure_context
                    .as_ref()
                    .map(|ctx| ctx.default_container.as_str())
            };
            ("azure", explicit_bucket.or_else(default_container))
        }
    }
}
953
/// Reports whether the configured backend is an object store, i.e. anything
/// other than the local filesystem.
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn is_object_storage_backend(config: &ServerConfig) -> bool {
    !matches!(config.storage_backend, StorageBackend::Filesystem)
}
966
/// Maps the configured storage backend to the fixed label that
/// `versioned_source_hash` mixes into its identity string.
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn storage_backend_label(config: &ServerConfig) -> &'static str {
    let backend = &config.storage_backend;
    match backend {
        #[cfg(feature = "s3")]
        StorageBackend::S3 => "s3-backend",
        #[cfg(feature = "gcs")]
        StorageBackend::Gcs => "gcs-backend",
        #[cfg(feature = "azure")]
        StorageBackend::Azure => "azure-backend",
        StorageBackend::Filesystem => "fs-backend",
    }
}
979
/// Wire representation of transform options (camelCase keys, unknown fields
/// rejected, every field optional).
///
/// String-typed fields are parsed into their typed counterparts by
/// `into_options`.
#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
#[serde(default, rename_all = "camelCase", deny_unknown_fields)]
pub struct TransformOptionsPayload {
    pub width: Option<u32>,
    pub height: Option<u32>,
    /// Parsed with `Fit::from_str`.
    pub fit: Option<String>,
    /// Parsed with `Position::from_str`.
    pub position: Option<String>,
    /// Parsed with `MediaType::from_str`.
    pub format: Option<String>,
    pub quality: Option<u8>,
    /// Parsed with `Rgba8::from_hex`.
    pub background: Option<String>,
    /// Parsed with `Rotation::from_str` after conversion to a string.
    pub rotate: Option<u16>,
    pub auto_orient: Option<bool>,
    pub strip_metadata: Option<bool>,
    pub preserve_exif: Option<bool>,
    /// Parsed with `CropRegion::from_str`.
    pub crop: Option<String>,
    pub blur: Option<f32>,
    pub sharpen: Option<f32>,
}
998
999impl TransformOptionsPayload {
1000 fn with_overrides(self, overrides: &TransformOptionsPayload) -> Self {
1003 Self {
1004 width: overrides.width.or(self.width),
1005 height: overrides.height.or(self.height),
1006 fit: overrides.fit.clone().or(self.fit),
1007 position: overrides.position.clone().or(self.position),
1008 format: overrides.format.clone().or(self.format),
1009 quality: overrides.quality.or(self.quality),
1010 background: overrides.background.clone().or(self.background),
1011 rotate: overrides.rotate.or(self.rotate),
1012 auto_orient: overrides.auto_orient.or(self.auto_orient),
1013 strip_metadata: overrides.strip_metadata.or(self.strip_metadata),
1014 preserve_exif: overrides.preserve_exif.or(self.preserve_exif),
1015 crop: overrides.crop.clone().or(self.crop),
1016 blur: overrides.blur.or(self.blur),
1017 sharpen: overrides.sharpen.or(self.sharpen),
1018 }
1019 }
1020
1021 fn into_options(self) -> Result<TransformOptions, HttpResponse> {
1022 let defaults = TransformOptions::default();
1023
1024 Ok(TransformOptions {
1025 width: self.width,
1026 height: self.height,
1027 fit: parse_optional_named(self.fit.as_deref(), "fit", Fit::from_str)?,
1028 position: parse_optional_named(
1029 self.position.as_deref(),
1030 "position",
1031 Position::from_str,
1032 )?,
1033 format: parse_optional_named(self.format.as_deref(), "format", MediaType::from_str)?,
1034 quality: self.quality,
1035 background: parse_optional_named(
1036 self.background.as_deref(),
1037 "background",
1038 Rgba8::from_hex,
1039 )?,
1040 rotate: match self.rotate {
1041 Some(value) => parse_named(&value.to_string(), "rotate", Rotation::from_str)?,
1042 None => defaults.rotate,
1043 },
1044 auto_orient: self.auto_orient.unwrap_or(defaults.auto_orient),
1045 strip_metadata: self.strip_metadata.unwrap_or(defaults.strip_metadata),
1046 preserve_exif: self.preserve_exif.unwrap_or(defaults.preserve_exif),
1047 crop: parse_optional_named(self.crop.as_deref(), "crop", CropRegion::from_str)?,
1048 blur: self.blur,
1049 sharpen: self.sharpen,
1050 deadline: defaults.deadline,
1051 })
1052 }
1053}
1054
// NOTE(review): presumably the overall per-request time budget in seconds —
// the use site is outside this section; confirm.
const REQUEST_DEADLINE_SECS: u64 = 60;

/// Watermark position used when the request does not specify one.
const WATERMARK_DEFAULT_POSITION: Position = Position::BottomRight;
/// Watermark opacity used when the request does not specify one (valid range 1–100).
const WATERMARK_DEFAULT_OPACITY: u8 = 50;
/// Watermark margin used when the request does not specify one.
const WATERMARK_DEFAULT_MARGIN: u32 = 10;
/// Largest accepted watermark margin; larger values are rejected as a bad request.
const WATERMARK_MAX_MARGIN: u32 = 9999;
1062
/// Watermark section of a transform request, before validation (camelCase
/// keys, unknown fields rejected).
#[derive(Debug, Default, Deserialize)]
#[serde(default, rename_all = "camelCase", deny_unknown_fields)]
struct WatermarkPayload {
    /// Must be present and non-empty to pass `validate_watermark_payload`.
    url: Option<String>,
    /// Parsed with `Position::from_str`; defaults to `WATERMARK_DEFAULT_POSITION`.
    position: Option<String>,
    /// Must be 1–100; defaults to `WATERMARK_DEFAULT_OPACITY`.
    opacity: Option<u8>,
    /// Must not exceed `WATERMARK_MAX_MARGIN`; defaults to `WATERMARK_DEFAULT_MARGIN`.
    margin: Option<u32>,
}
1071
1072struct ValidatedWatermarkPayload {
1074 url: String,
1075 position: Position,
1076 opacity: u8,
1077 margin: u32,
1078}
1079
1080impl ValidatedWatermarkPayload {
1081 fn cache_identity(&self) -> String {
1082 compute_watermark_identity(
1083 &self.url,
1084 self.position.as_name(),
1085 self.opacity,
1086 self.margin,
1087 )
1088 }
1089}
1090
1091fn validate_watermark_payload(
1093 payload: Option<&WatermarkPayload>,
1094) -> Result<Option<ValidatedWatermarkPayload>, HttpResponse> {
1095 let Some(wm) = payload else {
1096 return Ok(None);
1097 };
1098 let url = wm.url.as_deref().filter(|u| !u.is_empty()).ok_or_else(|| {
1099 bad_request_response("watermark.url is required when watermark is present")
1100 })?;
1101
1102 let position = parse_optional_named(
1103 wm.position.as_deref(),
1104 "watermark.position",
1105 Position::from_str,
1106 )?
1107 .unwrap_or(WATERMARK_DEFAULT_POSITION);
1108
1109 let opacity = wm.opacity.unwrap_or(WATERMARK_DEFAULT_OPACITY);
1110 if opacity == 0 || opacity > 100 {
1111 return Err(bad_request_response(
1112 "watermark.opacity must be between 1 and 100",
1113 ));
1114 }
1115 let margin = wm.margin.unwrap_or(WATERMARK_DEFAULT_MARGIN);
1116 if margin > WATERMARK_MAX_MARGIN {
1117 return Err(bad_request_response(
1118 "watermark.margin must be at most 9999",
1119 ));
1120 }
1121
1122 Ok(Some(ValidatedWatermarkPayload {
1123 url: url.to_string(),
1124 position,
1125 opacity,
1126 margin,
1127 }))
1128}
1129
1130fn fetch_watermark(
1132 validated: ValidatedWatermarkPayload,
1133 config: &ServerConfig,
1134 deadline: Option<Instant>,
1135) -> Result<WatermarkInput, HttpResponse> {
1136 let bytes = read_remote_watermark_bytes(&validated.url, config, deadline)?;
1137 let artifact = sniff_artifact(RawArtifact::new(bytes, None))
1138 .map_err(|error| bad_request_response(&format!("watermark image is invalid: {error}")))?;
1139 if !artifact.media_type.is_raster() {
1140 return Err(bad_request_response(
1141 "watermark image must be a raster format (not SVG)",
1142 ));
1143 }
1144 Ok(WatermarkInput {
1145 image: artifact,
1146 position: validated.position,
1147 opacity: validated.opacity,
1148 margin: validated.margin,
1149 })
1150}
1151
1152fn resolve_multipart_watermark(
1153 bytes: Vec<u8>,
1154 position: Option<String>,
1155 opacity: Option<u8>,
1156 margin: Option<u32>,
1157) -> Result<WatermarkInput, HttpResponse> {
1158 let artifact = sniff_artifact(RawArtifact::new(bytes, None))
1159 .map_err(|error| bad_request_response(&format!("watermark image is invalid: {error}")))?;
1160 if !artifact.media_type.is_raster() {
1161 return Err(bad_request_response(
1162 "watermark image must be a raster format (not SVG)",
1163 ));
1164 }
1165 let position = parse_optional_named(
1166 position.as_deref(),
1167 "watermark_position",
1168 Position::from_str,
1169 )?
1170 .unwrap_or(WATERMARK_DEFAULT_POSITION);
1171 let opacity = opacity.unwrap_or(WATERMARK_DEFAULT_OPACITY);
1172 if opacity == 0 || opacity > 100 {
1173 return Err(bad_request_response(
1174 "watermark_opacity must be between 1 and 100",
1175 ));
1176 }
1177 let margin = margin.unwrap_or(WATERMARK_DEFAULT_MARGIN);
1178 if margin > WATERMARK_MAX_MARGIN {
1179 return Err(bad_request_response(
1180 "watermark_margin must be at most 9999",
1181 ));
1182 }
1183 Ok(WatermarkInput {
1184 image: artifact,
1185 position,
1186 opacity,
1187 margin,
1188 })
1189}
1190
/// Borrowed data for one access-log record, serialized by `emit_access_log`.
struct AccessLogEntry<'a> {
    /// Request id taken from the `x-request-id` header or freshly generated.
    request_id: &'a str,
    method: &'a str,
    path: &'a str,
    route: &'a str,
    /// Status written to the log as a string.
    status: &'a str,
    /// Start of request handling; latency is measured from this instant.
    start: Instant,
    /// `"hit"`/`"miss"` when the response carried a `Cache-Status` header.
    cache_status: Option<&'a str>,
    /// Whether a watermark was applied.
    watermark: bool,
}
1201
/// Returns the first usable `x-request-id` header value, if any.
///
/// Header names are compared exactly against the lowercase `x-request-id`. A
/// value is usable when it is non-empty and contains no CR, LF or NUL byte;
/// unusable occurrences are skipped and later ones are still considered.
fn extract_request_id(headers: &[(String, String)]) -> Option<String> {
    const FORBIDDEN_BYTES: [u8; 3] = [b'\r', b'\n', b'\0'];

    headers
        .iter()
        .filter(|(name, _)| name == "x-request-id")
        .map(|(_, value)| value)
        .find(|value| {
            !value.is_empty() && !value.bytes().any(|byte| FORBIDDEN_BYTES.contains(&byte))
        })
        .cloned()
}
1219
/// Classifies the first `Cache-Status` header (exact name match) as `"hit"`
/// when its value contains the substring `hit`, otherwise as `"miss"`.
///
/// Returns `None` when no such header is present.
fn extract_cache_status(headers: &[(String, String)]) -> Option<&'static str> {
    let (_, value) = headers.iter().find(|(name, _)| name == "Cache-Status")?;
    let verdict = if value.contains("hit") { "hit" } else { "miss" };
    Some(verdict)
}
1228
/// Removes the first `X-Truss-Watermark` header (exact name match) from
/// `headers` and reports whether one was present.
///
/// Removal uses `swap_remove`, so the last header moves into the removed
/// slot and header order is not preserved.
fn extract_watermark_flag(headers: &mut Vec<(String, String)>) -> bool {
    headers
        .iter()
        .position(|(name, _)| name == "X-Truss-Watermark")
        .map(|index| headers.swap_remove(index))
        .is_some()
}
1241
1242fn emit_access_log(config: &ServerConfig, entry: &AccessLogEntry<'_>) {
1243 config.log(
1244 &json!({
1245 "kind": "access_log",
1246 "request_id": entry.request_id,
1247 "method": entry.method,
1248 "path": entry.path,
1249 "route": entry.route,
1250 "status": entry.status,
1251 "latency_ms": entry.start.elapsed().as_millis() as u64,
1252 "cache_status": entry.cache_status,
1253 "watermark": entry.watermark,
1254 })
1255 .to_string(),
1256 );
1257}
1258
1259fn handle_stream(mut stream: TcpStream, config: &ServerConfig) -> io::Result<()> {
1260 if let Err(err) = stream.set_read_timeout(Some(SOCKET_READ_TIMEOUT)) {
1262 config.log_warn(&format!("failed to set socket read timeout: {err}"));
1263 }
1264 if let Err(err) = stream.set_write_timeout(Some(SOCKET_WRITE_TIMEOUT)) {
1265 config.log_warn(&format!("failed to set socket write timeout: {err}"));
1266 }
1267
1268 let peer_ip = stream.peer_addr().ok().map(|addr| addr.ip());
1272
1273 let mut requests_served: u64 = 0;
1274
1275 loop {
1276 let partial = match read_request_headers(&mut stream, config.max_upload_bytes) {
1277 Ok(partial) => partial,
1278 Err(response) => {
1279 if requests_served > 0 {
1280 return Ok(());
1281 }
1282 let _ = write_response(&mut stream, response, true);
1283 return Ok(());
1284 }
1285 };
1286
1287 let start = Instant::now();
1290
1291 let request_id =
1292 extract_request_id(&partial.headers).unwrap_or_else(|| Uuid::new_v4().to_string());
1293
1294 if let (Some(limiter), Some(ip)) = (&config.rate_limiter, peer_ip)
1296 && !limiter.check(ip)
1297 {
1298 let mut response = too_many_requests_response("rate limit exceeded — try again later");
1299 response
1300 .headers
1301 .push(("X-Request-Id".to_string(), request_id.clone()));
1302 record_http_metrics(RouteMetric::Unknown, response.status);
1303 let sc = status_code(response.status).unwrap_or("unknown");
1304 let method_log = partial.method.clone();
1305 let path_log = partial.path().to_string();
1306 let _ = write_response(&mut stream, response, true);
1307 record_http_request_duration(RouteMetric::Unknown, start);
1308 emit_access_log(
1309 config,
1310 &AccessLogEntry {
1311 request_id: &request_id,
1312 method: &method_log,
1313 path: &path_log,
1314 route: &path_log,
1315 status: sc,
1316 start,
1317 cache_status: None,
1318 watermark: false,
1319 },
1320 );
1321 return Ok(());
1322 }
1323
1324 let client_wants_close = partial
1325 .headers
1326 .iter()
1327 .any(|(name, value)| name == "connection" && value.eq_ignore_ascii_case("close"));
1328
1329 let accepts_gzip = config.enable_compression
1330 && http_parse::header_value(&partial.headers, "accept-encoding")
1331 .is_some_and(|v| http_parse::accepts_encoding(v, "gzip"));
1332
1333 let is_head = partial.method == "HEAD";
1334
1335 let requires_auth = matches!(
1336 (partial.method.as_str(), partial.path()),
1337 ("POST", "/images:transform") | ("POST", "/images")
1338 );
1339 if requires_auth
1340 && let Err(mut response) = authorize_request_headers(&partial.headers, config)
1341 {
1342 response
1343 .headers
1344 .push(("X-Request-Id".to_string(), request_id.clone()));
1345 record_http_metrics(RouteMetric::Unknown, response.status);
1346 let sc = status_code(response.status).unwrap_or("unknown");
1347 let method_log = partial.method.clone();
1348 let path_log = partial.path().to_string();
1349 let _ = write_response_compressed(
1350 &mut stream,
1351 response,
1352 true,
1353 accepts_gzip,
1354 config.compression_level,
1355 );
1356 record_http_request_duration(RouteMetric::Unknown, start);
1357 emit_access_log(
1358 config,
1359 &AccessLogEntry {
1360 request_id: &request_id,
1361 method: &method_log,
1362 path: &path_log,
1363 route: &path_log,
1364 status: sc,
1365 start,
1366 cache_status: None,
1367 watermark: false,
1368 },
1369 );
1370 return Ok(());
1371 }
1372
1373 if matches!(
1376 (partial.method.as_str(), partial.path()),
1377 ("GET" | "HEAD", "/metrics")
1378 ) {
1379 let early_response = if config.disable_metrics {
1380 Some(HttpResponse::problem(
1381 "404 Not Found",
1382 NOT_FOUND_BODY.as_bytes().to_vec(),
1383 ))
1384 } else if let Some(expected) = &config.metrics_token {
1385 let provided = http_parse::header_value(&partial.headers, "authorization")
1386 .and_then(|value| {
1387 let (scheme, token) = value.split_once(|c: char| c.is_whitespace())?;
1388 scheme.eq_ignore_ascii_case("Bearer").then(|| token.trim())
1389 });
1390 match provided {
1391 Some(token) if token.as_bytes().ct_eq(expected.as_bytes()).into() => None,
1392 _ => Some(response::auth_required_response(
1393 "metrics endpoint requires authentication",
1394 )),
1395 }
1396 } else {
1397 None
1398 };
1399
1400 if let Some(mut response) = early_response {
1401 response
1402 .headers
1403 .push(("X-Request-Id".to_string(), request_id.clone()));
1404 record_http_metrics(RouteMetric::Metrics, response.status);
1405 let sc = status_code(response.status).unwrap_or("unknown");
1406 let method_log = partial.method.clone();
1407 let path_log = partial.path().to_string();
1408 let _ = write_response_compressed(
1409 &mut stream,
1410 response,
1411 true,
1412 accepts_gzip,
1413 config.compression_level,
1414 );
1415 record_http_request_duration(RouteMetric::Metrics, start);
1416 emit_access_log(
1417 config,
1418 &AccessLogEntry {
1419 request_id: &request_id,
1420 method: &method_log,
1421 path: &path_log,
1422 route: "/metrics",
1423 status: sc,
1424 start,
1425 cache_status: None,
1426 watermark: false,
1427 },
1428 );
1429 return Ok(());
1430 }
1431 }
1432
1433 let method = partial.method.clone();
1435 let path = partial.path().to_string();
1436
1437 let request = match read_request_body(&mut stream, partial) {
1438 Ok(request) => request,
1439 Err(mut response) => {
1440 response
1441 .headers
1442 .push(("X-Request-Id".to_string(), request_id.clone()));
1443 record_http_metrics(RouteMetric::Unknown, response.status);
1444 let sc = status_code(response.status).unwrap_or("unknown");
1445 let _ = write_response_compressed(
1446 &mut stream,
1447 response,
1448 true,
1449 accepts_gzip,
1450 config.compression_level,
1451 );
1452 record_http_request_duration(RouteMetric::Unknown, start);
1453 emit_access_log(
1454 config,
1455 &AccessLogEntry {
1456 request_id: &request_id,
1457 method: &method,
1458 path: &path,
1459 route: &path,
1460 status: sc,
1461 start,
1462 cache_status: None,
1463 watermark: false,
1464 },
1465 );
1466 return Ok(());
1467 }
1468 };
1469 let route = classify_route(&request);
1470 let mut response = route_request(request, config);
1471 record_http_metrics(route, response.status);
1472
1473 response
1474 .headers
1475 .push(("X-Request-Id".to_string(), request_id.clone()));
1476
1477 let cache_status = extract_cache_status(&response.headers);
1478 let had_watermark = extract_watermark_flag(&mut response.headers);
1479
1480 let sc = status_code(response.status).unwrap_or("unknown");
1481
1482 if is_head {
1483 response.body = Vec::new();
1484 }
1485
1486 requests_served += 1;
1487 let close_after = client_wants_close || requests_served >= config.keep_alive_max_requests;
1488
1489 write_response_compressed(
1490 &mut stream,
1491 response,
1492 close_after,
1493 accepts_gzip,
1494 config.compression_level,
1495 )?;
1496 record_http_request_duration(route, start);
1497
1498 emit_access_log(
1499 config,
1500 &AccessLogEntry {
1501 request_id: &request_id,
1502 method: &method,
1503 path: &path,
1504 route: route.as_label(),
1505 status: sc,
1506 start,
1507 cache_status,
1508 watermark: had_watermark,
1509 },
1510 );
1511
1512 if close_after {
1513 return Ok(());
1514 }
1515 }
1516}
1517
1518fn route_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1519 let method = request.method.clone();
1520 let path = request.path().to_string();
1521
1522 match (method.as_str(), path.as_str()) {
1523 ("GET" | "HEAD", "/health") => handle_health(config),
1524 ("GET" | "HEAD", "/health/live") => handle_health_live(),
1525 ("GET" | "HEAD", "/health/ready") => handle_health_ready(config),
1526 ("GET" | "HEAD", "/images/by-path") => handle_public_path_request(request, config),
1527 ("GET" | "HEAD", "/images/by-url") => handle_public_url_request(request, config),
1528 ("POST", "/images:transform") => handle_transform_request(request, config),
1529 ("POST", "/images") => handle_upload_request(request, config),
1530 ("GET" | "HEAD", "/metrics") => handle_metrics_request(request, config),
1531 _ => HttpResponse::problem("404 Not Found", NOT_FOUND_BODY.as_bytes().to_vec()),
1532 }
1533}
1534
1535fn classify_route(request: &HttpRequest) -> RouteMetric {
1536 match (request.method.as_str(), request.path()) {
1537 ("GET" | "HEAD", "/health") => RouteMetric::Health,
1538 ("GET" | "HEAD", "/health/live") => RouteMetric::HealthLive,
1539 ("GET" | "HEAD", "/health/ready") => RouteMetric::HealthReady,
1540 ("GET" | "HEAD", "/images/by-path") => RouteMetric::PublicByPath,
1541 ("GET" | "HEAD", "/images/by-url") => RouteMetric::PublicByUrl,
1542 ("POST", "/images:transform") => RouteMetric::Transform,
1543 ("POST", "/images") => RouteMetric::Upload,
1544 ("GET" | "HEAD", "/metrics") => RouteMetric::Metrics,
1545 _ => RouteMetric::Unknown,
1546 }
1547}
1548
1549fn handle_transform_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1550 let request_deadline = Some(Instant::now() + Duration::from_secs(REQUEST_DEADLINE_SECS));
1551
1552 if let Err(response) = authorize_request(&request, config) {
1553 return response;
1554 }
1555
1556 if !request_has_json_content_type(&request) {
1557 return unsupported_media_type_response("content-type must be application/json");
1558 }
1559
1560 let payload: TransformImageRequestPayload = match serde_json::from_slice(&request.body) {
1561 Ok(payload) => payload,
1562 Err(error) => {
1563 return bad_request_response(&format!("request body must be valid JSON: {error}"));
1564 }
1565 };
1566 let options = match payload.options.into_options() {
1567 Ok(options) => options,
1568 Err(response) => return response,
1569 };
1570
1571 let versioned_hash = payload.source.versioned_source_hash(config);
1572 let validated_wm = match validate_watermark_payload(payload.watermark.as_ref()) {
1573 Ok(wm) => wm,
1574 Err(response) => return response,
1575 };
1576 let watermark_id = validated_wm.as_ref().map(|v| v.cache_identity());
1577
1578 if let Some(response) = try_versioned_cache_lookup(
1579 versioned_hash.as_deref(),
1580 &options,
1581 &request,
1582 ImageResponsePolicy::PrivateTransform,
1583 config,
1584 watermark_id.as_deref(),
1585 ) {
1586 return response;
1587 }
1588
1589 let storage_start = Instant::now();
1590 let backend_label = payload.source.metrics_backend_label(config);
1591 let backend_idx = backend_label.map(|l| storage_backend_index_from_config(&l));
1592 let source_bytes = match resolve_source_bytes(payload.source, config, request_deadline) {
1593 Ok(bytes) => {
1594 if let Some(idx) = backend_idx {
1595 record_storage_duration(idx, storage_start);
1596 }
1597 bytes
1598 }
1599 Err(response) => {
1600 if let Some(idx) = backend_idx {
1601 record_storage_duration(idx, storage_start);
1602 }
1603 return response;
1604 }
1605 };
1606 transform_source_bytes(
1607 source_bytes,
1608 options,
1609 versioned_hash.as_deref(),
1610 &request,
1611 ImageResponsePolicy::PrivateTransform,
1612 config,
1613 WatermarkSource::from_validated(validated_wm),
1614 watermark_id.as_deref(),
1615 request_deadline,
1616 )
1617}
1618
1619fn handle_public_path_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1620 handle_public_get_request(request, config, PublicSourceKind::Path)
1621}
1622
1623fn handle_public_url_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1624 handle_public_get_request(request, config, PublicSourceKind::Url)
1625}
1626
1627fn handle_public_get_request(
1628 request: HttpRequest,
1629 config: &ServerConfig,
1630 source_kind: PublicSourceKind,
1631) -> HttpResponse {
1632 let request_deadline = Some(Instant::now() + Duration::from_secs(REQUEST_DEADLINE_SECS));
1633 let query = match parse_query_params(&request) {
1634 Ok(query) => query,
1635 Err(response) => return response,
1636 };
1637 if let Err(response) = authorize_signed_request(&request, &query, config) {
1638 return response;
1639 }
1640 let (source, options, watermark_payload) =
1641 match parse_public_get_request(&query, source_kind, config) {
1642 Ok(parsed) => parsed,
1643 Err(response) => return response,
1644 };
1645
1646 let validated_wm = match validate_watermark_payload(watermark_payload.as_ref()) {
1647 Ok(wm) => wm,
1648 Err(response) => return response,
1649 };
1650 let watermark_id = validated_wm.as_ref().map(|v| v.cache_identity());
1651
1652 #[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
1656 let source = if is_object_storage_backend(config) {
1657 match source {
1658 TransformSourcePayload::Path { path, version } => TransformSourcePayload::Storage {
1659 bucket: None,
1660 key: path.trim_start_matches('/').to_string(),
1661 version,
1662 },
1663 other => other,
1664 }
1665 } else {
1666 source
1667 };
1668
1669 let versioned_hash = source.versioned_source_hash(config);
1670 if let Some(response) = try_versioned_cache_lookup(
1671 versioned_hash.as_deref(),
1672 &options,
1673 &request,
1674 ImageResponsePolicy::PublicGet,
1675 config,
1676 watermark_id.as_deref(),
1677 ) {
1678 return response;
1679 }
1680
1681 let storage_start = Instant::now();
1682 let backend_label = source.metrics_backend_label(config);
1683 let backend_idx = backend_label.map(|l| storage_backend_index_from_config(&l));
1684 let source_bytes = match resolve_source_bytes(source, config, request_deadline) {
1685 Ok(bytes) => {
1686 if let Some(idx) = backend_idx {
1687 record_storage_duration(idx, storage_start);
1688 }
1689 bytes
1690 }
1691 Err(response) => {
1692 if let Some(idx) = backend_idx {
1693 record_storage_duration(idx, storage_start);
1694 }
1695 return response;
1696 }
1697 };
1698
1699 transform_source_bytes(
1700 source_bytes,
1701 options,
1702 versioned_hash.as_deref(),
1703 &request,
1704 ImageResponsePolicy::PublicGet,
1705 config,
1706 WatermarkSource::from_validated(validated_wm),
1707 watermark_id.as_deref(),
1708 request_deadline,
1709 )
1710}
1711
1712fn handle_upload_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
1713 if let Err(response) = authorize_request(&request, config) {
1714 return response;
1715 }
1716
1717 let boundary = match parse_multipart_boundary(&request) {
1718 Ok(boundary) => boundary,
1719 Err(response) => return response,
1720 };
1721 let (file_bytes, options, watermark) = match parse_upload_request(&request.body, &boundary) {
1722 Ok(parts) => parts,
1723 Err(response) => return response,
1724 };
1725 let watermark_identity = watermark.as_ref().map(|wm| {
1726 let content_hash = hex::encode(sha2::Sha256::digest(&wm.image.bytes));
1727 cache::compute_watermark_content_identity(
1728 &content_hash,
1729 wm.position.as_name(),
1730 wm.opacity,
1731 wm.margin,
1732 )
1733 });
1734 transform_source_bytes(
1735 file_bytes,
1736 options,
1737 None,
1738 &request,
1739 ImageResponsePolicy::PrivateTransform,
1740 config,
1741 WatermarkSource::from_ready(watermark),
1742 watermark_identity.as_deref(),
1743 None,
1744 )
1745}
1746
1747#[cfg(target_os = "linux")]
1750fn disk_free_bytes(path: &std::path::Path) -> Option<u64> {
1751 use std::ffi::CString;
1752
1753 let c_path = CString::new(path.to_str()?).ok()?;
1754 let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
1755 let ret = unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) };
1756 if ret == 0 {
1757 stat.f_bavail.checked_mul(stat.f_frsize)
1758 } else {
1759 None
1760 }
1761}
1762
/// Fallback for non-Linux targets: free disk space is not measured, so this
/// always returns `None`.
#[cfg(not(target_os = "linux"))]
fn disk_free_bytes(_path: &std::path::Path) -> Option<u64> {
    None
}
1767
/// Returns the resident set size of this process in bytes, read from the `VmRSS`
/// line of `/proc/self/status`.
///
/// Returns `None` when the file cannot be read, when there is no `VmRSS` line,
/// when the first such line is not of the form `<number> kB`, or when converting
/// kilobytes to bytes overflows.
#[cfg(target_os = "linux")]
fn process_rss_bytes() -> Option<u64> {
    let status = std::fs::read_to_string("/proc/self/status").ok()?;
    // Only the first `VmRSS:` line is considered.
    let rss_field = status
        .lines()
        .find_map(|line| line.strip_prefix("VmRSS:"))?;
    let kilobytes = rss_field
        .trim()
        .strip_suffix(" kB")?
        .trim()
        .parse::<u64>()
        .ok()?;
    kilobytes.checked_mul(1024)
}
1784
/// Fallback for non-Linux targets: resident memory is not measured, so this
/// always returns `None`.
#[cfg(not(target_os = "linux"))]
fn process_rss_bytes() -> Option<u64> {
    None
}
1789
1790fn handle_health_live() -> HttpResponse {
1792 let body = serde_json::to_vec(&json!({
1793 "status": "ok",
1794 "service": "truss",
1795 "version": env!("CARGO_PKG_VERSION"),
1796 }))
1797 .expect("serialize liveness");
1798 let mut body = body;
1799 body.push(b'\n');
1800 HttpResponse::json("200 OK", body)
1801}
1802
1803fn handle_health_ready(config: &ServerConfig) -> HttpResponse {
1807 let mut checks: Vec<serde_json::Value> = Vec::new();
1808 let mut all_ok = true;
1809
1810 if config.draining.load(Ordering::Relaxed) {
1815 let mut body = serde_json::to_vec(&json!({
1816 "status": "fail",
1817 "checks": [{ "name": "draining", "status": "fail" }],
1818 }))
1819 .expect("serialize readiness");
1820 body.push(b'\n');
1821 return HttpResponse::problem("503 Service Unavailable", body);
1822 }
1823
1824 for (ok, name) in storage_health_check(config) {
1825 checks.push(json!({
1826 "name": name,
1827 "status": if ok { "ok" } else { "fail" },
1828 }));
1829 if !ok {
1830 all_ok = false;
1831 }
1832 }
1833
1834 if let Some(cache_root) = &config.cache_root {
1835 let cache_ok = cache_root.is_dir();
1836 checks.push(json!({
1837 "name": "cacheRoot",
1838 "status": if cache_ok { "ok" } else { "fail" },
1839 }));
1840 if !cache_ok {
1841 all_ok = false;
1842 }
1843 }
1844
1845 if let Some(cache_root) = &config.cache_root {
1846 let free = disk_free_bytes(cache_root);
1847 let threshold = config.health_cache_min_free_bytes;
1848 let disk_ok = match (free, threshold) {
1849 (Some(f), Some(min)) => f >= min,
1850 _ => true,
1851 };
1852 let mut check = json!({
1853 "name": "cacheDiskFree",
1854 "status": if disk_ok { "ok" } else { "fail" },
1855 });
1856 if let Some(f) = free {
1857 check["freeBytes"] = json!(f);
1858 }
1859 if let Some(min) = threshold {
1860 check["thresholdBytes"] = json!(min);
1861 }
1862 checks.push(check);
1863 if !disk_ok {
1864 all_ok = false;
1865 }
1866 }
1867
1868 let in_flight = config.transforms_in_flight.load(Ordering::Relaxed);
1870 let overloaded = in_flight >= config.max_concurrent_transforms;
1871 checks.push(json!({
1872 "name": "transformCapacity",
1873 "status": if overloaded { "fail" } else { "ok" },
1874 "current": in_flight,
1875 "max": config.max_concurrent_transforms,
1876 }));
1877 if overloaded {
1878 all_ok = false;
1879 }
1880
1881 if let Some(rss_bytes) = process_rss_bytes() {
1883 let threshold = config.health_max_memory_bytes;
1884 let mem_ok = threshold.is_none_or(|max| rss_bytes <= max);
1885 let mut check = json!({
1886 "name": "memoryUsage",
1887 "status": if mem_ok { "ok" } else { "fail" },
1888 "rssBytes": rss_bytes,
1889 });
1890 if let Some(max) = threshold {
1891 check["thresholdBytes"] = json!(max);
1892 }
1893 checks.push(check);
1894 if !mem_ok {
1895 all_ok = false;
1896 }
1897 }
1898
1899 let status_str = if all_ok { "ok" } else { "fail" };
1900 let mut body = serde_json::to_vec(&json!({
1901 "status": status_str,
1902 "checks": checks,
1903 }))
1904 .expect("serialize readiness");
1905 body.push(b'\n');
1906
1907 if all_ok {
1908 HttpResponse::json("200 OK", body)
1909 } else {
1910 HttpResponse::json("503 Service Unavailable", body)
1911 }
1912}
1913
1914fn storage_health_check(config: &ServerConfig) -> Vec<(bool, &'static str)> {
1916 #[allow(unused_mut)]
1917 let mut checks = vec![(config.storage_root.is_dir(), "storageRoot")];
1918 #[cfg(feature = "s3")]
1919 if config.storage_backend == StorageBackend::S3 {
1920 let reachable = config
1921 .s3_context
1922 .as_ref()
1923 .is_some_and(|ctx| ctx.check_reachable());
1924 checks.push((reachable, "storageBackend"));
1925 }
1926 #[cfg(feature = "gcs")]
1927 if config.storage_backend == StorageBackend::Gcs {
1928 let reachable = config
1929 .gcs_context
1930 .as_ref()
1931 .is_some_and(|ctx| ctx.check_reachable());
1932 checks.push((reachable, "storageBackend"));
1933 }
1934 #[cfg(feature = "azure")]
1935 if config.storage_backend == StorageBackend::Azure {
1936 let reachable = config
1937 .azure_context
1938 .as_ref()
1939 .is_some_and(|ctx| ctx.check_reachable());
1940 checks.push((reachable, "storageBackend"));
1941 }
1942 checks
1943}
1944
1945fn handle_health(config: &ServerConfig) -> HttpResponse {
1946 let mut checks: Vec<serde_json::Value> = Vec::new();
1947 let mut all_ok = true;
1948
1949 for (ok, name) in storage_health_check(config) {
1950 checks.push(json!({
1951 "name": name,
1952 "status": if ok { "ok" } else { "fail" },
1953 }));
1954 if !ok {
1955 all_ok = false;
1956 }
1957 }
1958
1959 if let Some(cache_root) = &config.cache_root {
1960 let cache_ok = cache_root.is_dir();
1961 checks.push(json!({
1962 "name": "cacheRoot",
1963 "status": if cache_ok { "ok" } else { "fail" },
1964 }));
1965 if !cache_ok {
1966 all_ok = false;
1967 }
1968 }
1969
1970 if let Some(cache_root) = &config.cache_root {
1972 let free = disk_free_bytes(cache_root);
1973 let threshold = config.health_cache_min_free_bytes;
1974 let disk_ok = match (free, threshold) {
1975 (Some(f), Some(min)) => f >= min,
1976 _ => true,
1977 };
1978 let mut check = json!({
1979 "name": "cacheDiskFree",
1980 "status": if disk_ok { "ok" } else { "fail" },
1981 });
1982 if let Some(f) = free {
1983 check["freeBytes"] = json!(f);
1984 }
1985 if let Some(min) = threshold {
1986 check["thresholdBytes"] = json!(min);
1987 }
1988 checks.push(check);
1989 if !disk_ok {
1990 all_ok = false;
1991 }
1992 }
1993
1994 let in_flight = config.transforms_in_flight.load(Ordering::Relaxed);
1996 let overloaded = in_flight >= config.max_concurrent_transforms;
1997 checks.push(json!({
1998 "name": "transformCapacity",
1999 "status": if overloaded { "fail" } else { "ok" },
2000 "current": in_flight,
2001 "max": config.max_concurrent_transforms,
2002 }));
2003 if overloaded {
2004 all_ok = false;
2005 }
2006
2007 let rss = process_rss_bytes();
2009 if let Some(rss_bytes) = rss {
2010 let threshold = config.health_max_memory_bytes;
2011 let mem_ok = threshold.is_none_or(|max| rss_bytes <= max);
2012 let mut check = json!({
2013 "name": "memoryUsage",
2014 "status": if mem_ok { "ok" } else { "fail" },
2015 "rssBytes": rss_bytes,
2016 });
2017 if let Some(max) = threshold {
2018 check["thresholdBytes"] = json!(max);
2019 }
2020 checks.push(check);
2021 if !mem_ok {
2022 all_ok = false;
2023 }
2024 }
2025
2026 let status_str = if all_ok { "ok" } else { "fail" };
2027 let mut body = serde_json::to_vec(&json!({
2028 "status": status_str,
2029 "service": "truss",
2030 "version": env!("CARGO_PKG_VERSION"),
2031 "uptimeSeconds": uptime_seconds(),
2032 "checks": checks,
2033 "maxInputPixels": config.max_input_pixels,
2034 }))
2035 .expect("serialize health");
2036 body.push(b'\n');
2037
2038 HttpResponse::json("200 OK", body)
2039}
2040
2041fn handle_metrics_request(request: HttpRequest, config: &ServerConfig) -> HttpResponse {
2042 if config.disable_metrics {
2043 return HttpResponse::problem("404 Not Found", NOT_FOUND_BODY.as_bytes().to_vec());
2044 }
2045
2046 if let Some(expected) = &config.metrics_token {
2047 let provided = request.header("authorization").and_then(|value| {
2048 let (scheme, token) = value.split_once(|c: char| c.is_whitespace())?;
2049 scheme.eq_ignore_ascii_case("Bearer").then(|| token.trim())
2050 });
2051 match provided {
2052 Some(token) if token.as_bytes().ct_eq(expected.as_bytes()).into() => {}
2053 _ => {
2054 return response::auth_required_response(
2055 "metrics endpoint requires authentication",
2056 );
2057 }
2058 }
2059 }
2060
2061 HttpResponse::text(
2062 "200 OK",
2063 "text/plain; version=0.0.4; charset=utf-8",
2064 render_metrics_text(
2065 config.max_concurrent_transforms,
2066 &config.transforms_in_flight,
2067 )
2068 .into_bytes(),
2069 )
2070}
2071
2072fn parse_public_get_request(
2073 query: &BTreeMap<String, String>,
2074 source_kind: PublicSourceKind,
2075 config: &ServerConfig,
2076) -> Result<
2077 (
2078 TransformSourcePayload,
2079 TransformOptions,
2080 Option<WatermarkPayload>,
2081 ),
2082 HttpResponse,
2083> {
2084 validate_public_query_names(query, source_kind)?;
2085
2086 let source = match source_kind {
2087 PublicSourceKind::Path => TransformSourcePayload::Path {
2088 path: required_query_param(query, "path")?.to_string(),
2089 version: query.get("version").cloned(),
2090 },
2091 PublicSourceKind::Url => TransformSourcePayload::Url {
2092 url: required_query_param(query, "url")?.to_string(),
2093 version: query.get("version").cloned(),
2094 },
2095 };
2096
2097 let has_orphaned_watermark_params = query.contains_key("watermarkPosition")
2098 || query.contains_key("watermarkOpacity")
2099 || query.contains_key("watermarkMargin");
2100 let watermark = if query.contains_key("watermarkUrl") {
2101 Some(WatermarkPayload {
2102 url: query.get("watermarkUrl").cloned(),
2103 position: query.get("watermarkPosition").cloned(),
2104 opacity: parse_optional_u8_query(query, "watermarkOpacity")?,
2105 margin: parse_optional_integer_query(query, "watermarkMargin")?,
2106 })
2107 } else if has_orphaned_watermark_params {
2108 return Err(bad_request_response(
2109 "watermarkPosition, watermarkOpacity, and watermarkMargin require watermarkUrl",
2110 ));
2111 } else {
2112 None
2113 };
2114
2115 let per_request = TransformOptionsPayload {
2117 width: parse_optional_integer_query(query, "width")?,
2118 height: parse_optional_integer_query(query, "height")?,
2119 fit: query.get("fit").cloned(),
2120 position: query.get("position").cloned(),
2121 format: query.get("format").cloned(),
2122 quality: parse_optional_u8_query(query, "quality")?,
2123 background: query.get("background").cloned(),
2124 rotate: query
2125 .get("rotate")
2126 .map(|v| v.parse::<u16>())
2127 .transpose()
2128 .map_err(|_| bad_request_response("rotate must be 0, 90, 180, or 270"))?,
2129 auto_orient: parse_optional_bool_query(query, "autoOrient")?,
2130 strip_metadata: parse_optional_bool_query(query, "stripMetadata")?,
2131 preserve_exif: parse_optional_bool_query(query, "preserveExif")?,
2132 crop: query.get("crop").cloned(),
2133 blur: parse_optional_float_query(query, "blur")?,
2134 sharpen: parse_optional_float_query(query, "sharpen")?,
2135 };
2136
2137 let merged = if let Some(preset_name) = query.get("preset") {
2139 let presets = config.presets.read().expect("presets lock poisoned");
2140 let preset = presets
2141 .get(preset_name)
2142 .ok_or_else(|| bad_request_response(&format!("unknown preset `{preset_name}`")))?;
2143 preset.clone().with_overrides(&per_request)
2144 } else {
2145 per_request
2146 };
2147
2148 let options = merged.into_options()?;
2149
2150 Ok((source, options, watermark))
2151}
2152
/// Where the watermark for a transform comes from.
enum WatermarkSource {
    /// A validated watermark description whose image has not been fetched yet.
    Deferred(ValidatedWatermarkPayload),
    /// A watermark that is already available as a ready-to-use input.
    Ready(WatermarkInput),
    /// No watermark was requested.
    None,
}
2159
2160impl WatermarkSource {
2161 fn from_validated(validated: Option<ValidatedWatermarkPayload>) -> Self {
2162 match validated {
2163 Some(v) => Self::Deferred(v),
2164 None => Self::None,
2165 }
2166 }
2167
2168 fn from_ready(input: Option<WatermarkInput>) -> Self {
2169 match input {
2170 Some(w) => Self::Ready(w),
2171 None => Self::None,
2172 }
2173 }
2174
2175 fn is_some(&self) -> bool {
2176 !matches!(self, Self::None)
2177 }
2178}
2179
2180#[allow(clippy::too_many_arguments)]
2181fn transform_source_bytes(
2182 source_bytes: Vec<u8>,
2183 options: TransformOptions,
2184 versioned_hash: Option<&str>,
2185 request: &HttpRequest,
2186 response_policy: ImageResponsePolicy,
2187 config: &ServerConfig,
2188 watermark: WatermarkSource,
2189 watermark_identity: Option<&str>,
2190 request_deadline: Option<Instant>,
2191) -> HttpResponse {
2192 let content_hash;
2193 let source_hash = match versioned_hash {
2194 Some(hash) => hash,
2195 None => {
2196 content_hash = hex::encode(Sha256::digest(&source_bytes));
2197 &content_hash
2198 }
2199 };
2200
2201 let cache = config.cache_root.as_ref().map(|root| {
2202 TransformCache::new(root.clone())
2203 .with_log_handler(config.log_handler.clone())
2204 .with_max_bytes(config.cache_max_bytes)
2205 });
2206
2207 if let Some(ref cache) = cache
2208 && options.format.is_some()
2209 {
2210 let cache_key = compute_cache_key(source_hash, &options, None, watermark_identity);
2211 if let CacheLookup::Hit {
2212 media_type,
2213 body,
2214 age,
2215 } = cache.get(&cache_key)
2216 {
2217 CACHE_HITS_TOTAL.fetch_add(1, Ordering::Relaxed);
2218 let etag = build_image_etag(&body);
2219 let mut headers = build_image_response_headers(
2220 media_type,
2221 &etag,
2222 response_policy,
2223 false,
2224 CacheHitStatus::Hit,
2225 config.public_max_age_seconds,
2226 config.public_stale_while_revalidate_seconds,
2227 &config.custom_response_headers,
2228 );
2229 headers.push(("Age".to_string(), age.as_secs().to_string()));
2230 if matches!(response_policy, ImageResponsePolicy::PublicGet)
2231 && if_none_match_matches(request.header("if-none-match"), &etag)
2232 {
2233 return HttpResponse::empty("304 Not Modified", headers);
2234 }
2235 return HttpResponse::binary_with_headers(
2236 "200 OK",
2237 media_type.as_mime(),
2238 headers,
2239 body,
2240 );
2241 }
2242 }
2243
2244 let _slot = match TransformSlot::try_acquire(
2245 &config.transforms_in_flight,
2246 config.max_concurrent_transforms,
2247 ) {
2248 Some(slot) => slot,
2249 None => return service_unavailable_response("too many concurrent transforms; retry later"),
2250 };
2251 transform_source_bytes_inner(
2252 source_bytes,
2253 options,
2254 request,
2255 response_policy,
2256 cache.as_ref(),
2257 source_hash,
2258 ImageResponseConfig {
2259 disable_accept_negotiation: config.disable_accept_negotiation,
2260 public_cache_control: PublicCacheControl {
2261 max_age: config.public_max_age_seconds,
2262 stale_while_revalidate: config.public_stale_while_revalidate_seconds,
2263 },
2264 transform_deadline: Duration::from_secs(config.transform_deadline_secs),
2265 },
2266 watermark,
2267 watermark_identity,
2268 config,
2269 request_deadline,
2270 )
2271}
2272
2273#[allow(clippy::too_many_arguments)]
2274fn transform_source_bytes_inner(
2275 source_bytes: Vec<u8>,
2276 mut options: TransformOptions,
2277 request: &HttpRequest,
2278 response_policy: ImageResponsePolicy,
2279 cache: Option<&TransformCache>,
2280 source_hash: &str,
2281 response_config: ImageResponseConfig,
2282 watermark_source: WatermarkSource,
2283 watermark_identity: Option<&str>,
2284 config: &ServerConfig,
2285 request_deadline: Option<Instant>,
2286) -> HttpResponse {
2287 if options.deadline.is_none() {
2288 options.deadline = Some(response_config.transform_deadline);
2289 }
2290 let artifact = match sniff_artifact(RawArtifact::new(source_bytes, None)) {
2291 Ok(artifact) => artifact,
2292 Err(error) => {
2293 record_transform_error(&error);
2294 return transform_error_response(error);
2295 }
2296 };
2297 let negotiation_used =
2298 if options.format.is_none() && !response_config.disable_accept_negotiation {
2299 match negotiate_output_format(
2300 request.header("accept"),
2301 &artifact,
2302 &config.format_preference,
2303 ) {
2304 Ok(Some(format)) => {
2305 options.format = Some(format);
2306 true
2307 }
2308 Ok(None) => false,
2309 Err(response) => return response,
2310 }
2311 } else {
2312 false
2313 };
2314
2315 if let (Some(w), Some(h)) = (artifact.metadata.width, artifact.metadata.height) {
2319 let pixels = u64::from(w) * u64::from(h);
2320 if pixels > config.max_input_pixels {
2321 return response::unprocessable_entity_response(&format!(
2322 "input image has {pixels} pixels, server limit is {}",
2323 config.max_input_pixels
2324 ));
2325 }
2326 }
2327
2328 let negotiated_accept = if negotiation_used {
2329 request.header("accept")
2330 } else {
2331 None
2332 };
2333 let cache_key = compute_cache_key(source_hash, &options, negotiated_accept, watermark_identity);
2334
2335 if let Some(cache) = cache
2336 && let CacheLookup::Hit {
2337 media_type,
2338 body,
2339 age,
2340 } = cache.get(&cache_key)
2341 {
2342 CACHE_HITS_TOTAL.fetch_add(1, Ordering::Relaxed);
2343 let etag = build_image_etag(&body);
2344 let mut headers = build_image_response_headers(
2345 media_type,
2346 &etag,
2347 response_policy,
2348 negotiation_used,
2349 CacheHitStatus::Hit,
2350 response_config.public_cache_control.max_age,
2351 response_config.public_cache_control.stale_while_revalidate,
2352 &config.custom_response_headers,
2353 );
2354 headers.push(("Age".to_string(), age.as_secs().to_string()));
2355 if matches!(response_policy, ImageResponsePolicy::PublicGet)
2356 && if_none_match_matches(request.header("if-none-match"), &etag)
2357 {
2358 return HttpResponse::empty("304 Not Modified", headers);
2359 }
2360 return HttpResponse::binary_with_headers("200 OK", media_type.as_mime(), headers, body);
2361 }
2362
2363 if cache.is_some() {
2364 CACHE_MISSES_TOTAL.fetch_add(1, Ordering::Relaxed);
2365 }
2366
2367 let is_svg = artifact.media_type == MediaType::Svg;
2368
2369 let watermark = if is_svg && watermark_source.is_some() {
2371 return bad_request_response("watermark is not supported for SVG source images");
2372 } else {
2373 match watermark_source {
2374 WatermarkSource::Deferred(validated) => {
2375 match fetch_watermark(validated, config, request_deadline) {
2376 Ok(wm) => {
2377 record_watermark_transform();
2378 Some(wm)
2379 }
2380 Err(response) => return response,
2381 }
2382 }
2383 WatermarkSource::Ready(wm) => {
2384 record_watermark_transform();
2385 Some(wm)
2386 }
2387 WatermarkSource::None => None,
2388 }
2389 };
2390
2391 let had_watermark = watermark.is_some();
2392
2393 let transform_start = Instant::now();
2394 let mut request_obj = TransformRequest::new(artifact, options);
2395 request_obj.watermark = watermark;
2396 let result = if is_svg {
2397 match transform_svg(request_obj) {
2398 Ok(result) => result,
2399 Err(error) => {
2400 record_transform_error(&error);
2401 return transform_error_response(error);
2402 }
2403 }
2404 } else {
2405 match transform_raster(request_obj) {
2406 Ok(result) => result,
2407 Err(error) => {
2408 record_transform_error(&error);
2409 return transform_error_response(error);
2410 }
2411 }
2412 };
2413 record_transform_duration(result.artifact.media_type, transform_start);
2414
2415 for warning in &result.warnings {
2416 let msg = format!("truss: {warning}");
2417 if let Some(c) = cache
2418 && let Some(handler) = &c.log_handler
2419 {
2420 handler(&msg);
2421 } else {
2422 stderr_write(&msg);
2423 }
2424 }
2425
2426 let output = result.artifact;
2427
2428 if let Some(cache) = cache {
2429 cache.put(&cache_key, output.media_type, &output.bytes);
2430 }
2431
2432 let cache_hit_status = if cache.is_some() {
2433 CacheHitStatus::Miss
2434 } else {
2435 CacheHitStatus::Disabled
2436 };
2437
2438 let etag = build_image_etag(&output.bytes);
2439 let headers = build_image_response_headers(
2440 output.media_type,
2441 &etag,
2442 response_policy,
2443 negotiation_used,
2444 cache_hit_status,
2445 response_config.public_cache_control.max_age,
2446 response_config.public_cache_control.stale_while_revalidate,
2447 &config.custom_response_headers,
2448 );
2449
2450 if matches!(response_policy, ImageResponsePolicy::PublicGet)
2451 && if_none_match_matches(request.header("if-none-match"), &etag)
2452 {
2453 return HttpResponse::empty("304 Not Modified", headers);
2454 }
2455
2456 let mut response = HttpResponse::binary_with_headers(
2457 "200 OK",
2458 output.media_type.as_mime(),
2459 headers,
2460 output.bytes,
2461 );
2462 if had_watermark {
2463 response
2464 .headers
2465 .push(("X-Truss-Watermark".to_string(), "true".to_string()));
2466 }
2467 response
2468}
2469
2470#[cfg(test)]
2471mod tests {
2472 use serial_test::serial;
2473
2474 use super::config::DEFAULT_MAX_CONCURRENT_TRANSFORMS;
2475 use super::config::{
2476 DEFAULT_PUBLIC_MAX_AGE_SECONDS, DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2477 parse_presets_from_env,
2478 };
2479 use super::http_parse::{
2480 DEFAULT_MAX_UPLOAD_BODY_BYTES, HttpRequest, find_header_terminator, read_request_body,
2481 read_request_headers, resolve_storage_path,
2482 };
2483 use super::multipart::parse_multipart_form_data;
2484 use super::remote::{PinnedResolver, prepare_remote_fetch_target};
2485 use super::response::auth_required_response;
2486 use super::response::{HttpResponse, bad_request_response};
2487 use super::{
2488 CacheHitStatus, DEFAULT_BIND_ADDR, ImageResponsePolicy, PublicSourceKind, ServerConfig,
2489 SignedUrlSource, TransformOptionsPayload, TransformSourcePayload, WatermarkSource,
2490 authorize_signed_request, bind_addr, build_image_etag, build_image_response_headers,
2491 canonical_query_without_signature, negotiate_output_format, parse_public_get_request,
2492 route_request, serve_once_with_config, sign_public_url, transform_source_bytes,
2493 };
2494 use crate::{
2495 Artifact, ArtifactMetadata, Fit, MediaType, RawArtifact, TransformOptions, sniff_artifact,
2496 };
2497 use hmac::{Hmac, Mac};
2498 use image::codecs::png::PngEncoder;
2499 use image::{ColorType, ImageEncoder, Rgba, RgbaImage};
2500 use sha2::Sha256;
2501 use std::collections::{BTreeMap, HashMap};
2502 use std::env;
2503 use std::fs;
2504 use std::io::{Cursor, Read, Write};
2505 use std::net::{SocketAddr, TcpListener, TcpStream};
2506 use std::path::{Path, PathBuf};
2507 use std::sync::atomic::Ordering;
2508 use std::thread;
2509 use std::time::{Duration, SystemTime, UNIX_EPOCH};
2510
2511 fn read_request<R: Read>(stream: &mut R) -> Result<HttpRequest, HttpResponse> {
2514 let partial = read_request_headers(stream, DEFAULT_MAX_UPLOAD_BODY_BYTES)?;
2515 read_request_body(stream, partial)
2516 }
2517
2518 fn png_bytes() -> Vec<u8> {
2519 let image = RgbaImage::from_pixel(4, 3, Rgba([10, 20, 30, 255]));
2520 let mut bytes = Vec::new();
2521 PngEncoder::new(&mut bytes)
2522 .write_image(&image, 4, 3, ColorType::Rgba8.into())
2523 .expect("encode png");
2524 bytes
2525 }
2526
2527 fn temp_dir(name: &str) -> PathBuf {
2528 let unique = SystemTime::now()
2529 .duration_since(UNIX_EPOCH)
2530 .expect("current time")
2531 .as_nanos();
2532 let path = std::env::temp_dir().join(format!("truss-server-{name}-{unique}"));
2533 fs::create_dir_all(&path).expect("create temp dir");
2534 path
2535 }
2536
2537 fn write_png(path: &Path) {
2538 fs::write(path, png_bytes()).expect("write png fixture");
2539 }
2540
2541 fn artifact_with_alpha(has_alpha: bool) -> Artifact {
2542 Artifact::new(
2543 png_bytes(),
2544 MediaType::Png,
2545 ArtifactMetadata {
2546 width: Some(4),
2547 height: Some(3),
2548 frame_count: 1,
2549 duration: None,
2550 has_alpha: Some(has_alpha),
2551 },
2552 )
2553 }
2554
2555 fn sign_public_query(
2556 method: &str,
2557 authority: &str,
2558 path: &str,
2559 query: &BTreeMap<String, String>,
2560 secret: &str,
2561 ) -> String {
2562 let canonical = format!(
2563 "{method}\n{authority}\n{path}\n{}",
2564 canonical_query_without_signature(query)
2565 );
2566 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).expect("create hmac");
2567 mac.update(canonical.as_bytes());
2568 hex::encode(mac.finalize().into_bytes())
2569 }
2570
    /// One canned reply for the fixture HTTP server: the status text written
    /// after `HTTP/1.1`, extra `(name, value)` headers, and the body bytes.
    type FixtureResponse = (String, Vec<(String, String)>, Vec<u8>);
2572
2573 fn read_fixture_request(stream: &mut TcpStream) {
2574 stream
2575 .set_nonblocking(false)
2576 .expect("configure fixture stream blocking mode");
2577 stream
2578 .set_read_timeout(Some(Duration::from_millis(100)))
2579 .expect("configure fixture stream timeout");
2580
2581 let deadline = std::time::Instant::now() + Duration::from_secs(2);
2582 let mut buffer = Vec::new();
2583 let mut chunk = [0_u8; 1024];
2584 let header_end = loop {
2585 let read = match stream.read(&mut chunk) {
2586 Ok(read) => read,
2587 Err(error)
2588 if matches!(
2589 error.kind(),
2590 std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
2591 ) && std::time::Instant::now() < deadline =>
2592 {
2593 thread::sleep(Duration::from_millis(10));
2594 continue;
2595 }
2596 Err(error) => panic!("read fixture request headers: {error}"),
2597 };
2598 if read == 0 {
2599 panic!("fixture request ended before headers were complete");
2600 }
2601 buffer.extend_from_slice(&chunk[..read]);
2602 if let Some(index) = find_header_terminator(&buffer) {
2603 break index;
2604 }
2605 };
2606
2607 let header_text = std::str::from_utf8(&buffer[..header_end]).expect("fixture request utf8");
2608 let content_length = header_text
2609 .split("\r\n")
2610 .filter_map(|line| line.split_once(':'))
2611 .find_map(|(name, value)| {
2612 name.trim()
2613 .eq_ignore_ascii_case("content-length")
2614 .then_some(value.trim())
2615 })
2616 .map(|value| {
2617 value
2618 .parse::<usize>()
2619 .expect("fixture content-length should be numeric")
2620 })
2621 .unwrap_or(0);
2622
2623 let mut body = buffer.len().saturating_sub(header_end + 4);
2624 while body < content_length {
2625 let read = match stream.read(&mut chunk) {
2626 Ok(read) => read,
2627 Err(error)
2628 if matches!(
2629 error.kind(),
2630 std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
2631 ) && std::time::Instant::now() < deadline =>
2632 {
2633 thread::sleep(Duration::from_millis(10));
2634 continue;
2635 }
2636 Err(error) => panic!("read fixture request body: {error}"),
2637 };
2638 if read == 0 {
2639 panic!("fixture request body was truncated");
2640 }
2641 body += read;
2642 }
2643 }
2644
2645 fn spawn_http_server(responses: Vec<FixtureResponse>) -> (String, thread::JoinHandle<()>) {
2646 let listener = TcpListener::bind("127.0.0.1:0").expect("bind fixture server");
2647 listener
2648 .set_nonblocking(true)
2649 .expect("configure fixture server");
2650 let addr = listener.local_addr().expect("fixture server addr");
2651 let url = format!("http://{addr}/image");
2652
2653 let handle = thread::spawn(move || {
2654 for (status, headers, body) in responses {
2655 let deadline = std::time::Instant::now() + Duration::from_secs(10);
2656 let mut accepted = None;
2657 while std::time::Instant::now() < deadline {
2658 match listener.accept() {
2659 Ok(stream) => {
2660 accepted = Some(stream);
2661 break;
2662 }
2663 Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
2664 thread::sleep(Duration::from_millis(10));
2665 }
2666 Err(error) => panic!("accept fixture request: {error}"),
2667 }
2668 }
2669
2670 let Some((mut stream, _)) = accepted else {
2671 break;
2672 };
2673 read_fixture_request(&mut stream);
2674 let mut header = format!(
2675 "HTTP/1.1 {status}\r\nContent-Length: {}\r\nConnection: close\r\n",
2676 body.len()
2677 );
2678 for (name, value) in headers {
2679 header.push_str(&format!("{name}: {value}\r\n"));
2680 }
2681 header.push_str("\r\n");
2682 stream
2683 .write_all(header.as_bytes())
2684 .expect("write fixture headers");
2685 stream.write_all(&body).expect("write fixture body");
2686 stream.flush().expect("flush fixture response");
2687 }
2688 });
2689
2690 (url, handle)
2691 }
2692
2693 fn transform_request(path: &str) -> HttpRequest {
2694 HttpRequest {
2695 method: "POST".to_string(),
2696 target: "/images:transform".to_string(),
2697 version: "HTTP/1.1".to_string(),
2698 headers: vec![
2699 ("authorization".to_string(), "Bearer secret".to_string()),
2700 ("content-type".to_string(), "application/json".to_string()),
2701 ],
2702 body: format!(
2703 "{{\"source\":{{\"kind\":\"path\",\"path\":\"{path}\"}},\"options\":{{\"format\":\"jpeg\"}}}}"
2704 )
2705 .into_bytes(),
2706 }
2707 }
2708
2709 fn transform_url_request(url: &str) -> HttpRequest {
2710 HttpRequest {
2711 method: "POST".to_string(),
2712 target: "/images:transform".to_string(),
2713 version: "HTTP/1.1".to_string(),
2714 headers: vec![
2715 ("authorization".to_string(), "Bearer secret".to_string()),
2716 ("content-type".to_string(), "application/json".to_string()),
2717 ],
2718 body: format!(
2719 "{{\"source\":{{\"kind\":\"url\",\"url\":\"{url}\"}},\"options\":{{\"format\":\"jpeg\"}}}}"
2720 )
2721 .into_bytes(),
2722 }
2723 }
2724
2725 fn upload_request(file_bytes: &[u8], options_json: Option<&str>) -> HttpRequest {
2726 let boundary = "truss-test-boundary";
2727 let mut body = Vec::new();
2728 body.extend_from_slice(
2729 format!(
2730 "--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"image.png\"\r\nContent-Type: image/png\r\n\r\n"
2731 )
2732 .as_bytes(),
2733 );
2734 body.extend_from_slice(file_bytes);
2735 body.extend_from_slice(b"\r\n");
2736
2737 if let Some(options_json) = options_json {
2738 body.extend_from_slice(
2739 format!(
2740 "--{boundary}\r\nContent-Disposition: form-data; name=\"options\"\r\nContent-Type: application/json\r\n\r\n{options_json}\r\n"
2741 )
2742 .as_bytes(),
2743 );
2744 }
2745
2746 body.extend_from_slice(format!("--{boundary}--\r\n").as_bytes());
2747
2748 HttpRequest {
2749 method: "POST".to_string(),
2750 target: "/images".to_string(),
2751 version: "HTTP/1.1".to_string(),
2752 headers: vec![
2753 ("authorization".to_string(), "Bearer secret".to_string()),
2754 (
2755 "content-type".to_string(),
2756 format!("multipart/form-data; boundary={boundary}"),
2757 ),
2758 ],
2759 body,
2760 }
2761 }
2762
2763 fn metrics_request(with_auth: bool) -> HttpRequest {
2764 let mut headers = Vec::new();
2765 if with_auth {
2766 headers.push(("authorization".to_string(), "Bearer secret".to_string()));
2767 }
2768
2769 HttpRequest {
2770 method: "GET".to_string(),
2771 target: "/metrics".to_string(),
2772 version: "HTTP/1.1".to_string(),
2773 headers,
2774 body: Vec::new(),
2775 }
2776 }
2777
2778 fn response_body(response: &HttpResponse) -> &str {
2779 std::str::from_utf8(&response.body).expect("utf8 response body")
2780 }
2781
2782 fn signed_public_request(target: &str, host: &str, secret: &str) -> HttpRequest {
2783 let (path, query) = target.split_once('?').expect("target has query");
2784 let mut query = url::form_urlencoded::parse(query.as_bytes())
2785 .into_owned()
2786 .collect::<BTreeMap<_, _>>();
2787 let signature = sign_public_query("GET", host, path, &query, secret);
2788 query.insert("signature".to_string(), signature);
2789 let final_query = url::form_urlencoded::Serializer::new(String::new())
2790 .extend_pairs(
2791 query
2792 .iter()
2793 .map(|(name, value)| (name.as_str(), value.as_str())),
2794 )
2795 .finish();
2796
2797 HttpRequest {
2798 method: "GET".to_string(),
2799 target: format!("{path}?{final_query}"),
2800 version: "HTTP/1.1".to_string(),
2801 headers: vec![("host".to_string(), host.to_string())],
2802 body: Vec::new(),
2803 }
2804 }
2805
2806 #[test]
2807 fn uses_default_bind_addr_when_env_is_missing() {
2808 unsafe { std::env::remove_var("TRUSS_BIND_ADDR") };
2809 assert_eq!(bind_addr(), DEFAULT_BIND_ADDR);
2810 }
2811
2812 #[test]
2813 fn authorize_signed_request_accepts_a_valid_signature() {
2814 let request = signed_public_request(
2815 "/images/by-path?path=%2Fimage.png&keyId=public-dev&expires=4102444800&format=jpeg",
2816 "assets.example.com",
2817 "secret-value",
2818 );
2819 let query = super::auth::parse_query_params(&request).expect("parse query");
2820 let config = ServerConfig::new(temp_dir("public-auth"), None)
2821 .with_signed_url_credentials("public-dev", "secret-value");
2822
2823 authorize_signed_request(&request, &query, &config).expect("signed auth should pass");
2824 }
2825
2826 #[test]
2827 fn authorize_signed_request_uses_public_base_url_authority() {
2828 let request = signed_public_request(
2829 "/images/by-path?path=%2Fimage.png&keyId=public-dev&expires=4102444800&format=jpeg",
2830 "cdn.example.com",
2831 "secret-value",
2832 );
2833 let query = super::auth::parse_query_params(&request).expect("parse query");
2834 let mut config = ServerConfig::new(temp_dir("public-authority"), None)
2835 .with_signed_url_credentials("public-dev", "secret-value");
2836 config.public_base_url = Some("https://cdn.example.com".to_string());
2837
2838 authorize_signed_request(&request, &query, &config).expect("signed auth should pass");
2839 }
2840
2841 #[test]
2842 fn negotiate_output_format_prefers_alpha_safe_formats_for_transparent_inputs() {
2843 let format = negotiate_output_format(
2844 Some("image/jpeg,image/png"),
2845 &artifact_with_alpha(true),
2846 &[],
2847 )
2848 .expect("negotiate output format")
2849 .expect("resolved output format");
2850
2851 assert_eq!(format, MediaType::Png);
2852 }
2853
2854 #[test]
2855 fn negotiate_output_format_prefers_avif_for_wildcard_accept() {
2856 let format = negotiate_output_format(Some("image/*"), &artifact_with_alpha(false), &[])
2857 .expect("negotiate output format")
2858 .expect("resolved output format");
2859
2860 assert_eq!(format, MediaType::Avif);
2861 }
2862
2863 #[test]
2864 fn build_image_response_headers_include_cache_and_safety_metadata() {
2865 let headers = build_image_response_headers(
2866 MediaType::Webp,
2867 &build_image_etag(b"demo"),
2868 ImageResponsePolicy::PublicGet,
2869 true,
2870 CacheHitStatus::Disabled,
2871 DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2872 DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2873 &[],
2874 );
2875
2876 assert!(headers.contains(&(
2877 "Cache-Control".to_string(),
2878 "public, max-age=3600, stale-while-revalidate=60".to_string()
2879 )));
2880 assert!(headers.contains(&("Vary".to_string(), "Accept".to_string())));
2881 assert!(headers.contains(&("X-Content-Type-Options".to_string(), "nosniff".to_string())));
2882 assert!(headers.contains(&(
2883 "Content-Disposition".to_string(),
2884 "inline; filename=\"truss.webp\"".to_string()
2885 )));
2886 assert!(headers.contains(&(
2887 "Cache-Status".to_string(),
2888 "\"truss\"; fwd=miss".to_string()
2889 )));
2890 }
2891
2892 #[test]
2893 fn build_image_response_headers_include_csp_sandbox_for_svg() {
2894 let headers = build_image_response_headers(
2895 MediaType::Svg,
2896 &build_image_etag(b"svg-data"),
2897 ImageResponsePolicy::PublicGet,
2898 true,
2899 CacheHitStatus::Disabled,
2900 DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2901 DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2902 &[],
2903 );
2904
2905 assert!(headers.contains(&("Content-Security-Policy".to_string(), "sandbox".to_string())));
2906 }
2907
2908 #[test]
2909 fn build_image_response_headers_omit_csp_sandbox_for_raster() {
2910 let headers = build_image_response_headers(
2911 MediaType::Png,
2912 &build_image_etag(b"png-data"),
2913 ImageResponsePolicy::PublicGet,
2914 true,
2915 CacheHitStatus::Disabled,
2916 DEFAULT_PUBLIC_MAX_AGE_SECONDS,
2917 DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
2918 &[],
2919 );
2920
2921 assert!(!headers.iter().any(|(k, _)| *k == "Content-Security-Policy"));
2922 }
2923
2924 #[test]
2925 fn backpressure_rejects_when_at_capacity() {
2926 let config = ServerConfig::new(std::env::temp_dir(), None);
2927 config
2928 .transforms_in_flight
2929 .store(DEFAULT_MAX_CONCURRENT_TRANSFORMS, Ordering::Relaxed);
2930
2931 let request = HttpRequest {
2932 method: "POST".to_string(),
2933 target: "/transform".to_string(),
2934 version: "HTTP/1.1".to_string(),
2935 headers: Vec::new(),
2936 body: Vec::new(),
2937 };
2938
2939 let png_bytes = {
2940 let mut buf = Vec::new();
2941 let encoder = image::codecs::png::PngEncoder::new(&mut buf);
2942 encoder
2943 .write_image(&[255, 0, 0, 255], 1, 1, image::ExtendedColorType::Rgba8)
2944 .unwrap();
2945 buf
2946 };
2947
2948 let response = transform_source_bytes(
2949 png_bytes,
2950 TransformOptions::default(),
2951 None,
2952 &request,
2953 ImageResponsePolicy::PrivateTransform,
2954 &config,
2955 WatermarkSource::None,
2956 None,
2957 None,
2958 );
2959
2960 assert!(response.status.contains("503"));
2961
2962 assert_eq!(
2963 config.transforms_in_flight.load(Ordering::Relaxed),
2964 DEFAULT_MAX_CONCURRENT_TRANSFORMS
2965 );
2966 }
2967
2968 #[test]
2969 fn backpressure_rejects_with_custom_concurrency_limit() {
2970 let custom_limit = 2u64;
2971 let mut config = ServerConfig::new(std::env::temp_dir(), None);
2972 config.max_concurrent_transforms = custom_limit;
2973 config
2974 .transforms_in_flight
2975 .store(custom_limit, Ordering::Relaxed);
2976
2977 let request = HttpRequest {
2978 method: "POST".to_string(),
2979 target: "/transform".to_string(),
2980 version: "HTTP/1.1".to_string(),
2981 headers: Vec::new(),
2982 body: Vec::new(),
2983 };
2984
2985 let png_bytes = {
2986 let mut buf = Vec::new();
2987 let encoder = image::codecs::png::PngEncoder::new(&mut buf);
2988 encoder
2989 .write_image(&[255, 0, 0, 255], 1, 1, image::ExtendedColorType::Rgba8)
2990 .unwrap();
2991 buf
2992 };
2993
2994 let response = transform_source_bytes(
2995 png_bytes,
2996 TransformOptions::default(),
2997 None,
2998 &request,
2999 ImageResponsePolicy::PrivateTransform,
3000 &config,
3001 WatermarkSource::None,
3002 None,
3003 None,
3004 );
3005
3006 assert!(response.status.contains("503"));
3007 }
3008
3009 #[test]
3010 fn compute_cache_key_is_deterministic() {
3011 let opts = TransformOptions {
3012 width: Some(300),
3013 height: Some(200),
3014 format: Some(MediaType::Webp),
3015 ..TransformOptions::default()
3016 };
3017 let key1 = super::cache::compute_cache_key("source-abc", &opts, None, None);
3018 let key2 = super::cache::compute_cache_key("source-abc", &opts, None, None);
3019 assert_eq!(key1, key2);
3020 assert_eq!(key1.len(), 64);
3021 }
3022
3023 #[test]
3024 fn compute_cache_key_differs_for_different_options() {
3025 let opts1 = TransformOptions {
3026 width: Some(300),
3027 format: Some(MediaType::Webp),
3028 ..TransformOptions::default()
3029 };
3030 let opts2 = TransformOptions {
3031 width: Some(400),
3032 format: Some(MediaType::Webp),
3033 ..TransformOptions::default()
3034 };
3035 let key1 = super::cache::compute_cache_key("same-source", &opts1, None, None);
3036 let key2 = super::cache::compute_cache_key("same-source", &opts2, None, None);
3037 assert_ne!(key1, key2);
3038 }
3039
3040 #[test]
3041 fn compute_cache_key_includes_accept_when_present() {
3042 let opts = TransformOptions::default();
3043 let key_no_accept = super::cache::compute_cache_key("src", &opts, None, None);
3044 let key_with_accept =
3045 super::cache::compute_cache_key("src", &opts, Some("image/webp"), None);
3046 assert_ne!(key_no_accept, key_with_accept);
3047 }
3048
3049 #[test]
3050 fn transform_cache_put_and_get_round_trips() {
3051 let dir = tempfile::tempdir().expect("create tempdir");
3052 let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
3053
3054 cache.put(
3055 "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890",
3056 MediaType::Png,
3057 b"png-data",
3058 );
3059 let result = cache.get("abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890");
3060
3061 match result {
3062 super::cache::CacheLookup::Hit {
3063 media_type, body, ..
3064 } => {
3065 assert_eq!(media_type, MediaType::Png);
3066 assert_eq!(body, b"png-data");
3067 }
3068 super::cache::CacheLookup::Miss => panic!("expected cache hit"),
3069 }
3070 }
3071
3072 #[test]
3073 fn transform_cache_miss_for_unknown_key() {
3074 let dir = tempfile::tempdir().expect("create tempdir");
3075 let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
3076
3077 let result = cache.get("0000001234567890abcdef1234567890abcdef1234567890abcdef1234567890");
3078 assert!(matches!(result, super::cache::CacheLookup::Miss));
3079 }
3080
3081 #[test]
3082 fn transform_cache_uses_sharded_layout() {
3083 let dir = tempfile::tempdir().expect("create tempdir");
3084 let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
3085
3086 let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
3087 cache.put(key, MediaType::Jpeg, b"jpeg-data");
3088
3089 let expected = dir.path().join("ab").join("cd").join("ef").join(key);
3090 assert!(
3091 expected.exists(),
3092 "sharded file should exist at {expected:?}"
3093 );
3094 }
3095
3096 #[test]
3097 fn transform_cache_expired_entry_is_miss() {
3098 let dir = tempfile::tempdir().expect("create tempdir");
3099 let mut cache = super::cache::TransformCache::new(dir.path().to_path_buf());
3100 cache.ttl = Duration::from_secs(0);
3101
3102 let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
3103 cache.put(key, MediaType::Png, b"data");
3104
3105 std::thread::sleep(Duration::from_millis(10));
3106
3107 let result = cache.get(key);
3108 assert!(matches!(result, super::cache::CacheLookup::Miss));
3109 }
3110
3111 #[test]
3112 fn transform_cache_handles_corrupted_entry_as_miss() {
3113 let dir = tempfile::tempdir().expect("create tempdir");
3114 let cache = super::cache::TransformCache::new(dir.path().to_path_buf());
3115
3116 let key = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
3117 let path = cache.entry_path(key);
3118 fs::create_dir_all(path.parent().unwrap()).unwrap();
3119 fs::write(&path, b"corrupted-data-without-header").unwrap();
3120
3121 let result = cache.get(key);
3122 assert!(matches!(result, super::cache::CacheLookup::Miss));
3123 }
3124
3125 #[test]
3126 fn cache_status_header_reflects_hit() {
3127 let headers = build_image_response_headers(
3128 MediaType::Png,
3129 &build_image_etag(b"data"),
3130 ImageResponsePolicy::PublicGet,
3131 false,
3132 CacheHitStatus::Hit,
3133 DEFAULT_PUBLIC_MAX_AGE_SECONDS,
3134 DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
3135 &[],
3136 );
3137 assert!(headers.contains(&("Cache-Status".to_string(), "\"truss\"; hit".to_string())));
3138 }
3139
3140 #[test]
3141 fn cache_status_header_reflects_miss() {
3142 let headers = build_image_response_headers(
3143 MediaType::Png,
3144 &build_image_etag(b"data"),
3145 ImageResponsePolicy::PublicGet,
3146 false,
3147 CacheHitStatus::Miss,
3148 DEFAULT_PUBLIC_MAX_AGE_SECONDS,
3149 DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS,
3150 &[],
3151 );
3152 assert!(headers.contains(&(
3153 "Cache-Status".to_string(),
3154 "\"truss\"; fwd=miss".to_string()
3155 )));
3156 }
3157
3158 #[test]
3159 fn origin_cache_put_and_get_round_trips() {
3160 let dir = tempfile::tempdir().expect("create tempdir");
3161 let cache = super::cache::OriginCache::new(dir.path());
3162
3163 cache.put("src", "https://example.com/image.png", b"raw-source-bytes");
3164 let result = cache.get("src", "https://example.com/image.png");
3165
3166 assert_eq!(result.as_deref(), Some(b"raw-source-bytes".as_ref()));
3167 }
3168
3169 #[test]
3170 fn origin_cache_miss_for_unknown_url() {
3171 let dir = tempfile::tempdir().expect("create tempdir");
3172 let cache = super::cache::OriginCache::new(dir.path());
3173
3174 assert!(
3175 cache
3176 .get("src", "https://unknown.example.com/missing.png")
3177 .is_none()
3178 );
3179 }
3180
3181 #[test]
3182 fn origin_cache_expired_entry_is_none() {
3183 let dir = tempfile::tempdir().expect("create tempdir");
3184 let mut cache = super::cache::OriginCache::new(dir.path());
3185 cache.ttl = Duration::from_secs(0);
3186
3187 cache.put("src", "https://example.com/img.png", b"data");
3188 std::thread::sleep(Duration::from_millis(10));
3189
3190 assert!(cache.get("src", "https://example.com/img.png").is_none());
3191 }
3192
3193 #[test]
3194 fn origin_cache_uses_origin_subdirectory() {
3195 let dir = tempfile::tempdir().expect("create tempdir");
3196 let cache = super::cache::OriginCache::new(dir.path());
3197
3198 cache.put("src", "https://example.com/test.png", b"bytes");
3199
3200 let origin_dir = dir.path().join("origin");
3201 assert!(origin_dir.exists(), "origin subdirectory should exist");
3202 }
3203
3204 #[test]
3205 fn sign_public_url_builds_a_signed_path_url() {
3206 let url = sign_public_url(
3207 "https://cdn.example.com",
3208 SignedUrlSource::Path {
3209 path: "/image.png".to_string(),
3210 version: Some("v1".to_string()),
3211 },
3212 &crate::TransformOptions {
3213 format: Some(MediaType::Jpeg),
3214 width: Some(320),
3215 ..crate::TransformOptions::default()
3216 },
3217 "public-dev",
3218 "secret-value",
3219 4_102_444_800,
3220 None,
3221 None,
3222 )
3223 .expect("sign public URL");
3224
3225 assert!(url.starts_with("https://cdn.example.com/images/by-path?"));
3226 assert!(url.contains("path=%2Fimage.png"));
3227 assert!(url.contains("version=v1"));
3228 assert!(url.contains("width=320"));
3229 assert!(url.contains("format=jpeg"));
3230 assert!(url.contains("keyId=public-dev"));
3231 assert!(url.contains("expires=4102444800"));
3232 assert!(url.contains("signature="));
3233 }
3234
3235 #[test]
3236 fn parse_public_get_request_rejects_unknown_query_parameters() {
3237 let query = BTreeMap::from([
3238 ("path".to_string(), "/image.png".to_string()),
3239 ("keyId".to_string(), "public-dev".to_string()),
3240 ("expires".to_string(), "4102444800".to_string()),
3241 ("signature".to_string(), "deadbeef".to_string()),
3242 ("unexpected".to_string(), "value".to_string()),
3243 ]);
3244
3245 let config = ServerConfig::new(temp_dir("parse-query"), None);
3246 let response = parse_public_get_request(&query, PublicSourceKind::Path, &config)
3247 .expect_err("unknown query should fail");
3248
3249 assert_eq!(response.status, "400 Bad Request");
3250 assert!(response_body(&response).contains("is not supported"));
3251 }
3252
3253 #[test]
3254 fn parse_public_get_request_resolves_preset() {
3255 let mut presets = HashMap::new();
3256 presets.insert(
3257 "thumbnail".to_string(),
3258 TransformOptionsPayload {
3259 width: Some(150),
3260 height: Some(150),
3261 fit: Some("cover".to_string()),
3262 ..TransformOptionsPayload::default()
3263 },
3264 );
3265 let config = ServerConfig::new(temp_dir("preset"), None).with_presets(presets);
3266
3267 let query = BTreeMap::from([
3268 ("path".to_string(), "/image.png".to_string()),
3269 ("preset".to_string(), "thumbnail".to_string()),
3270 ]);
3271 let (_, options, _) =
3272 parse_public_get_request(&query, PublicSourceKind::Path, &config).unwrap();
3273
3274 assert_eq!(options.width, Some(150));
3275 assert_eq!(options.height, Some(150));
3276 assert_eq!(options.fit, Some(Fit::Cover));
3277 }
3278
3279 #[test]
3280 fn parse_public_get_request_preset_with_override() {
3281 let mut presets = HashMap::new();
3282 presets.insert(
3283 "thumbnail".to_string(),
3284 TransformOptionsPayload {
3285 width: Some(150),
3286 height: Some(150),
3287 fit: Some("cover".to_string()),
3288 format: Some("webp".to_string()),
3289 ..TransformOptionsPayload::default()
3290 },
3291 );
3292 let config = ServerConfig::new(temp_dir("preset-override"), None).with_presets(presets);
3293
3294 let query = BTreeMap::from([
3295 ("path".to_string(), "/image.png".to_string()),
3296 ("preset".to_string(), "thumbnail".to_string()),
3297 ("width".to_string(), "200".to_string()),
3298 ("format".to_string(), "jpeg".to_string()),
3299 ]);
3300 let (_, options, _) =
3301 parse_public_get_request(&query, PublicSourceKind::Path, &config).unwrap();
3302
3303 assert_eq!(options.width, Some(200));
3304 assert_eq!(options.height, Some(150));
3305 assert_eq!(options.format, Some(MediaType::Jpeg));
3306 }
3307
3308 #[test]
3309 fn parse_public_get_request_rejects_unknown_preset() {
3310 let config = ServerConfig::new(temp_dir("preset-unknown"), None);
3311
3312 let query = BTreeMap::from([
3313 ("path".to_string(), "/image.png".to_string()),
3314 ("preset".to_string(), "nonexistent".to_string()),
3315 ]);
3316 let response = parse_public_get_request(&query, PublicSourceKind::Path, &config)
3317 .expect_err("unknown preset should fail");
3318
3319 assert_eq!(response.status, "400 Bad Request");
3320 assert!(response_body(&response).contains("unknown preset"));
3321 }
3322
3323 #[test]
3324 fn sign_public_url_includes_preset_in_signed_url() {
3325 let url = sign_public_url(
3326 "https://cdn.example.com",
3327 SignedUrlSource::Path {
3328 path: "/image.png".to_string(),
3329 version: None,
3330 },
3331 &crate::TransformOptions::default(),
3332 "public-dev",
3333 "secret-value",
3334 4_102_444_800,
3335 None,
3336 Some("thumbnail"),
3337 )
3338 .expect("sign public URL with preset");
3339
3340 assert!(url.contains("preset=thumbnail"));
3341 assert!(url.contains("signature="));
3342 }
3343
3344 #[test]
3345 #[serial]
3346 fn parse_presets_from_env_parses_json() {
3347 unsafe {
3348 env::set_var(
3349 "TRUSS_PRESETS",
3350 r#"{"thumb":{"width":100,"height":100,"fit":"cover"}}"#,
3351 );
3352 env::remove_var("TRUSS_PRESETS_FILE");
3353 }
3354 let (presets, file_path) = parse_presets_from_env().unwrap();
3355 unsafe {
3356 env::remove_var("TRUSS_PRESETS");
3357 }
3358
3359 assert!(file_path.is_none());
3360 assert_eq!(presets.len(), 1);
3361 let thumb = presets.get("thumb").unwrap();
3362 assert_eq!(thumb.width, Some(100));
3363 assert_eq!(thumb.height, Some(100));
3364 assert_eq!(thumb.fit.as_deref(), Some("cover"));
3365 }
3366
3367 #[test]
3368 fn prepare_remote_fetch_target_pins_the_validated_netloc() {
3369 let target = prepare_remote_fetch_target(
3370 "http://1.1.1.1/image.png",
3371 &ServerConfig::new(temp_dir("pin"), Some("secret".to_string())),
3372 )
3373 .expect("prepare remote target");
3374
3375 assert_eq!(target.netloc, "1.1.1.1:80");
3376 assert_eq!(target.addrs, vec![SocketAddr::from(([1, 1, 1, 1], 80))]);
3377 }
3378
/// `PinnedResolver` must hand back its pinned addresses for the expected
/// netloc and fail with `HostNotFound` for any other authority.
#[test]
fn pinned_resolver_rejects_unexpected_netlocs() {
    use ureq::unversioned::resolver::Resolver;

    let pinned_addr = SocketAddr::from(([93, 184, 216, 34], 443));
    let resolver = PinnedResolver {
        expected_netloc: String::from("example.com:443"),
        addrs: vec![pinned_addr],
    };
    let config = ureq::config::Config::builder().build();

    // Each `resolve` call consumes a timeout value, so build a fresh one per call.
    let resolve_timeout = || ureq::unversioned::transport::NextTimeout {
        after: ureq::unversioned::transport::time::Duration::Exact(
            std::time::Duration::from_secs(30),
        ),
        reason: ureq::Timeout::Resolve,
    };

    let good_uri: ureq::http::Uri = "https://example.com/path".parse().unwrap();
    let resolved = resolver
        .resolve(&good_uri, &config, resolve_timeout())
        .expect("resolve expected netloc");
    assert_eq!(&resolved[..], &[pinned_addr]);

    let bad_uri: ureq::http::Uri = "https://proxy.example:8080/path".parse().unwrap();
    let error = resolver
        .resolve(&bad_uri, &config, resolve_timeout())
        .expect_err("unexpected netloc should fail");
    assert!(matches!(error, ureq::Error::HostNotFound));
}
3414
/// `GET /health/live` must answer 200 with a JSON body carrying `status`,
/// `service` and the crate version.
#[test]
fn health_live_returns_status_service_version() {
    let config = ServerConfig::new(temp_dir("live"), None);
    let probe = HttpRequest {
        method: String::from("GET"),
        target: String::from("/health/live"),
        version: String::from("HTTP/1.1"),
        headers: vec![],
        body: vec![],
    };

    let response = route_request(probe, &config);
    assert_eq!(response.status, "200 OK");

    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse live body");
    assert_eq!(payload["status"], "ok");
    assert_eq!(payload["service"], "truss");
    assert_eq!(payload["version"], env!("CARGO_PKG_VERSION"));
}
3434
/// `GET /health/ready` must answer 200 and report an `ok` `storageRoot` check
/// when the configured storage directory comes from `temp_dir`.
#[test]
fn health_ready_returns_ok_when_storage_exists() {
    let config = ServerConfig::new(temp_dir("ready-ok"), None);
    let probe = HttpRequest {
        method: String::from("GET"),
        target: String::from("/health/ready"),
        version: String::from("HTTP/1.1"),
        headers: vec![],
        body: vec![],
    };

    let response = route_request(probe, &config);
    assert_eq!(response.status, "200 OK");

    let report: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse ready body");
    assert_eq!(report["status"], "ok");

    let storage_ok = report["checks"]
        .as_array()
        .expect("checks array")
        .iter()
        .any(|check| check["name"] == "storageRoot" && check["status"] == "ok");
    assert!(storage_ok);
}
3459
/// `GET /health/ready` must answer 503 and report a failing `storageRoot`
/// check when the storage root points at a path that is expected not to exist.
#[test]
fn health_ready_returns_503_when_storage_missing() {
    let probe = HttpRequest {
        method: String::from("GET"),
        target: String::from("/health/ready"),
        version: String::from("HTTP/1.1"),
        headers: vec![],
        body: vec![],
    };
    let missing_root = PathBuf::from("/nonexistent-truss-test-dir");
    let config = ServerConfig::new(missing_root, None);

    let response = route_request(probe, &config);
    assert_eq!(response.status, "503 Service Unavailable");

    let report: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse ready fail body");
    assert_eq!(report["status"], "fail");

    let storage_failed = report["checks"]
        .as_array()
        .expect("checks array")
        .iter()
        .any(|check| check["name"] == "storageRoot" && check["status"] == "fail");
    assert!(storage_failed);
}
3484
/// `GET /health/ready` must answer 503 and report a failing `cacheRoot` check
/// when `cache_root` points at a path that is expected not to exist.
#[test]
fn health_ready_returns_503_when_cache_root_missing() {
    let mut config = ServerConfig::new(temp_dir("ready-cache-fail"), None);
    config.cache_root = Some(PathBuf::from("/nonexistent-truss-cache-dir"));

    let probe = HttpRequest {
        method: String::from("GET"),
        target: String::from("/health/ready"),
        version: String::from("HTTP/1.1"),
        headers: vec![],
        body: vec![],
    };

    let response = route_request(probe, &config);
    assert_eq!(response.status, "503 Service Unavailable");

    let report: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse ready cache body");
    assert_eq!(report["status"], "fail");

    let cache_failed = report["checks"]
        .as_array()
        .expect("checks array")
        .iter()
        .any(|check| check["name"] == "cacheRoot" && check["status"] == "fail");
    assert!(cache_failed);
}
3512
/// `GET /health` must answer 200 with status, service name, crate version,
/// an unsigned `uptimeSeconds` value and a `checks` array.
#[test]
fn health_returns_comprehensive_diagnostic() {
    let config = ServerConfig::new(temp_dir("health-diag"), None);
    let probe = HttpRequest {
        method: String::from("GET"),
        target: String::from("/health"),
        version: String::from("HTTP/1.1"),
        headers: vec![],
        body: vec![],
    };

    let response = route_request(probe, &config);
    assert_eq!(response.status, "200 OK");

    let report: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse health body");
    assert_eq!(report["status"], "ok");
    assert_eq!(report["service"], "truss");
    assert_eq!(report["version"], env!("CARGO_PKG_VERSION"));

    let uptime = &report["uptimeSeconds"];
    assert!(uptime.is_u64());
    let checks = &report["checks"];
    assert!(checks.is_array());
}
3535
/// An unrouted path must produce a 404 problem-details response.
#[test]
fn unknown_path_returns_not_found() {
    let config = ServerConfig::new(temp_dir("not-found"), None);
    let stray = HttpRequest {
        method: String::from("GET"),
        target: String::from("/unknown"),
        version: String::from("HTTP/1.1"),
        headers: vec![],
        body: vec![],
    };

    let response = route_request(stray, &config);

    assert_eq!(response.status, "404 Not Found");
    assert_eq!(response.content_type, Some("application/problem+json"));

    let problem = response_body(&response);
    assert!(problem.contains("\"type\":\"about:blank\""));
    assert!(problem.contains("\"title\":\"Not Found\""));
    assert!(problem.contains("\"status\":404"));
    assert!(problem.contains("not found"));
}
3556
/// A transform request with its `authorization` header removed must be
/// rejected with 401 even though a bearer token is configured.
#[test]
fn transform_endpoint_requires_authentication() {
    let storage_root = temp_dir("auth");
    write_png(&storage_root.join("image.png"));

    let mut anonymous = transform_request("/image.png");
    anonymous
        .headers
        .retain(|(header_name, _)| header_name != "authorization");

    let config = ServerConfig::new(storage_root, Some("secret".to_string()));
    let response = route_request(anonymous, &config);

    assert_eq!(response.status, "401 Unauthorized");
    let detail = response_body(&response);
    assert!(detail.contains("authorization required"));
}
3572
/// With no bearer token configured on the server, the transform endpoint must
/// answer 503 and explain that the token is missing.
#[test]
fn transform_endpoint_returns_service_unavailable_without_configured_token() {
    let storage_root = temp_dir("token");
    write_png(&storage_root.join("image.png"));
    let tokenless = ServerConfig::new(storage_root, None);

    let response = route_request(transform_request("/image.png"), &tokenless);

    assert_eq!(response.status, "503 Service Unavailable");
    let detail = response_body(&response);
    assert!(detail.contains("bearer token is not configured"));
}
3586
/// Transforming a PNG stored under the storage root must return a JPEG whose
/// sniffed dimensions are 4x3.
#[test]
fn transform_endpoint_transforms_a_path_source() {
    let storage_root = temp_dir("transform");
    write_png(&storage_root.join("image.png"));
    let config = ServerConfig::new(storage_root, Some("secret".to_string()));

    let response = route_request(transform_request("/image.png"), &config);

    assert_eq!(response.status, "200 OK");
    assert_eq!(response.content_type, Some("image/jpeg"));

    let raw_output = RawArtifact::new(response.body, None);
    let sniffed = sniff_artifact(raw_output).expect("sniff output");
    assert_eq!(sniffed.media_type, MediaType::Jpeg);
    assert_eq!(sniffed.metadata.width, Some(4));
    assert_eq!(sniffed.metadata.height, Some(3));
}
3605
/// Under the default configuration a loopback URL on port 8080 must be
/// refused with 403 and a "port is not allowed" detail.
#[test]
fn transform_endpoint_rejects_private_url_sources_by_default() {
    let config = ServerConfig::new(temp_dir("url-blocked"), Some("secret".to_string()));
    let loopback = transform_url_request("http://127.0.0.1:8080/image.png");

    let response = route_request(loopback, &config);

    assert_eq!(response.status, "403 Forbidden");
    let detail = response_body(&response);
    assert!(detail.contains("port is not allowed"));
}
3616
/// With insecure URL sources enabled, a PNG served by the local fixture
/// server must be fetched and returned as a JPEG.
#[test]
fn transform_endpoint_transforms_a_url_source_when_insecure_allowance_is_enabled() {
    let canned = vec![(
        "200 OK".to_string(),
        vec![("Content-Type".to_string(), "image/png".to_string())],
        png_bytes(),
    )];
    let (url, fixture) = spawn_http_server(canned);

    let config = ServerConfig::new(temp_dir("url"), Some("secret".to_string()))
        .with_insecure_url_sources(true);
    let response = route_request(transform_url_request(&url), &config);

    fixture.join().expect("join fixture server");

    assert_eq!(response.status, "200 OK");
    assert_eq!(response.content_type, Some("image/jpeg"));

    let raw_output = RawArtifact::new(response.body, None);
    let sniffed = sniff_artifact(raw_output).expect("sniff output");
    assert_eq!(sniffed.media_type, MediaType::Jpeg);
}
3639
/// A 302 from the fixture server followed by a 200 PNG must still end in a
/// successful JPEG transform.
#[test]
fn transform_endpoint_follows_remote_redirects() {
    let redirect_hop = (
        "302 Found".to_string(),
        vec![("Location".to_string(), "/final-image".to_string())],
        Vec::new(),
    );
    let final_hop = (
        "200 OK".to_string(),
        vec![("Content-Type".to_string(), "image/png".to_string())],
        png_bytes(),
    );
    let (redirect_url, fixture) = spawn_http_server(vec![redirect_hop, final_hop]);

    let config = ServerConfig::new(temp_dir("redirect"), Some("secret".to_string()))
        .with_insecure_url_sources(true);
    let response = route_request(transform_url_request(&redirect_url), &config);

    fixture.join().expect("join fixture server");

    assert_eq!(response.status, "200 OK");
    let raw_output = RawArtifact::new(response.body, None);
    let sniffed = sniff_artifact(raw_output).expect("sniff output");
    assert_eq!(sniffed.media_type, MediaType::Jpeg);
}
3667
/// Uploading a PNG with `{"format":"jpeg"}` options must return a JPEG.
#[test]
fn upload_endpoint_transforms_uploaded_file() {
    let png = png_bytes();
    let upload = upload_request(&png, Some(r#"{"format":"jpeg"}"#));
    let config = ServerConfig::new(temp_dir("upload"), Some("secret".to_string()));

    let response = route_request(upload, &config);

    assert_eq!(response.status, "200 OK");
    assert_eq!(response.content_type, Some("image/jpeg"));

    let raw_output = RawArtifact::new(response.body, None);
    let sniffed = sniff_artifact(raw_output).expect("sniff output");
    assert_eq!(sniffed.media_type, MediaType::Jpeg);
}
3681
/// A multipart upload that carries only an `options` part must be rejected
/// with 400 because the `file` field is missing.
#[test]
fn upload_endpoint_requires_a_file_field() {
    let boundary = "truss-test-boundary";
    let multipart_body = format!(
        "--{boundary}\r\nContent-Disposition: form-data; name=\"options\"\r\nContent-Type: application/json\r\n\r\n{{\"format\":\"jpeg\"}}\r\n--{boundary}--\r\n"
    )
    .into_bytes();
    let content_type = format!("multipart/form-data; boundary={boundary}");

    let upload = HttpRequest {
        method: String::from("POST"),
        target: String::from("/images"),
        version: String::from("HTTP/1.1"),
        headers: vec![
            (String::from("authorization"), String::from("Bearer secret")),
            (String::from("content-type"), content_type),
        ],
        body: multipart_body,
    };
    let config = ServerConfig::new(temp_dir("upload-missing-file"), Some("secret".to_string()));

    let response = route_request(upload, &config);

    assert_eq!(response.status, "400 Bad Request");
    let detail = response_body(&response);
    assert!(detail.contains("requires a `file` field"));
}
3710
/// Posting JSON to the upload endpoint must be rejected with 415 and a
/// message that names `multipart/form-data`.
#[test]
fn upload_endpoint_rejects_non_multipart_content_type() {
    let json_upload = HttpRequest {
        method: String::from("POST"),
        target: String::from("/images"),
        version: String::from("HTTP/1.1"),
        headers: vec![
            (String::from("authorization"), String::from("Bearer secret")),
            (String::from("content-type"), String::from("application/json")),
        ],
        body: br#"{"file":"not-really-json"}"#.to_vec(),
    };
    let config = ServerConfig::new(temp_dir("upload-content-type"), Some("secret".to_string()));

    let response = route_request(json_upload, &config);

    assert_eq!(response.status, "415 Unsupported Media Type");
    let detail = response_body(&response);
    assert!(detail.contains("multipart/form-data"));
}
3732
/// Parsing a well-formed multipart upload must return the file bytes
/// unchanged together with the decoded `width` and `format` options.
#[test]
fn parse_upload_request_extracts_file_and_options() {
    let upload = upload_request(&png_bytes(), Some(r#"{"width":8,"format":"jpeg"}"#));

    let boundary = super::multipart::parse_multipart_boundary(&upload).expect("parse boundary");
    let parsed = super::multipart::parse_upload_request(&upload.body, &boundary);
    let (uploaded_bytes, parsed_options, _unused_watermark) = parsed.expect("parse upload body");

    assert_eq!(uploaded_bytes, png_bytes());
    assert_eq!(parsed_options.width, Some(8));
    assert_eq!(parsed_options.format, Some(MediaType::Jpeg));
}
3746
/// With no metrics token configured, `metrics_request(false)` must get a 200
/// even though a transform bearer token is set.
#[test]
fn metrics_endpoint_does_not_require_authentication() {
    let config = ServerConfig::new(temp_dir("metrics-no-auth"), Some("secret".to_string()));

    let response = route_request(metrics_request(false), &config);

    assert_eq!(response.status, "200 OK");
}
3756
/// The metrics endpoint must answer with the Prometheus text content type and
/// expose the request, duration, storage and transform-error metric families.
#[test]
fn metrics_endpoint_returns_prometheus_text() {
    // Record one health hit first so the per-route series asserted below exist.
    super::metrics::record_http_metrics(super::metrics::RouteMetric::Health, "200 OK");

    let config = ServerConfig::new(temp_dir("metrics"), Some("secret".to_string()));
    let response = route_request(metrics_request(true), &config);
    let exposition = response_body(&response);

    assert_eq!(response.status, "200 OK");
    let expected_content_type = Some("text/plain; version=0.0.4; charset=utf-8");
    assert_eq!(response.content_type, expected_content_type);

    assert!(exposition.contains("truss_http_requests_total"));
    assert!(exposition.contains("truss_http_requests_by_route_total{route=\"/health\"}"));
    assert!(exposition.contains("truss_http_responses_total{status=\"200\"}"));
    assert!(exposition.contains("# TYPE truss_http_request_duration_seconds histogram"));
    let health_inf_bucket =
        "truss_http_request_duration_seconds_bucket{route=\"/health\",le=\"+Inf\"}";
    assert!(exposition.contains(health_inf_bucket));
    assert!(exposition.contains("# TYPE truss_transform_duration_seconds histogram"));
    assert!(exposition.contains("# TYPE truss_storage_request_duration_seconds histogram"));
    assert!(exposition.contains("# TYPE truss_transform_errors_total counter"));
    assert!(exposition.contains("truss_transform_errors_total{error_type=\"decode_failed\"}"));
}
3787
/// Once a metrics token is configured, `metrics_request(false)` must be
/// answered with 401.
#[test]
fn metrics_endpoint_returns_401_when_token_required() {
    let mut guarded = ServerConfig::new(temp_dir("metrics-auth"), None);
    guarded.metrics_token = Some(String::from("my-secret-token"));

    let response = route_request(metrics_request(false), &guarded);

    assert_eq!(response.status, "401 Unauthorized");
}
3797
/// With the metrics token set to `secret`, `metrics_request(true)` must get
/// a 200 (the helper presumably sends that same token — confirm in helper).
#[test]
fn metrics_endpoint_accepts_valid_token() {
    let mut guarded = ServerConfig::new(temp_dir("metrics-auth-ok"), None);
    guarded.metrics_token = Some(String::from("secret"));

    let response = route_request(metrics_request(true), &guarded);

    assert_eq!(response.status, "200 OK");
}
3807
/// With the metrics token set to `correct-token`, `metrics_request(true)`
/// must be answered with 401 (the helper presumably sends a different token).
#[test]
fn metrics_endpoint_rejects_wrong_token() {
    let mut guarded = ServerConfig::new(temp_dir("metrics-auth-bad"), None);
    guarded.metrics_token = Some(String::from("correct-token"));

    let response = route_request(metrics_request(true), &guarded);

    assert_eq!(response.status, "401 Unauthorized");
}
3817
/// Setting `disable_metrics` must make the metrics endpoint answer 404.
#[test]
fn metrics_endpoint_returns_404_when_disabled() {
    let mut without_metrics = ServerConfig::new(temp_dir("metrics-disabled"), None);
    without_metrics.disable_metrics = true;

    let response = route_request(metrics_request(false), &without_metrics);

    assert_eq!(response.status, "404 Not Found");
}
3826
/// A remote response with `Content-Encoding: compress` must surface as a
/// 502 that mentions the unsupported content-encoding.
#[test]
fn transform_endpoint_rejects_unsupported_remote_content_encoding() {
    let upstream_headers = vec![
        ("Content-Type".to_string(), "image/png".to_string()),
        ("Content-Encoding".to_string(), "compress".to_string()),
    ];
    let canned = vec![("200 OK".to_string(), upstream_headers, png_bytes())];
    let (url, fixture) = spawn_http_server(canned);

    let config = ServerConfig::new(temp_dir("encoding"), Some("secret".to_string()))
        .with_insecure_url_sources(true);
    let response = route_request(transform_url_request(&url), &config);

    fixture.join().expect("join fixture server");

    assert_eq!(response.status, "502 Bad Gateway");
    let detail = response_body(&response);
    assert!(detail.contains("unsupported content-encoding"));
}
3849
/// A storage path containing `..` must be refused with a 400 response.
#[test]
fn resolve_storage_path_rejects_parent_segments() {
    let storage_root = temp_dir("resolve");

    let rejection = resolve_storage_path(&storage_root, "../escape.png")
        .expect_err("parent segments should be rejected");

    assert_eq!(rejection.status, "400 Bad Request");
    let detail = response_body(&rejection);
    assert!(detail.contains("must not contain root"));
}
3859
/// `read_request` must split a raw HTTP/1.1 request into method, target,
/// version, headers and a body of the declared length.
#[test]
fn read_request_parses_headers_and_body() {
    let raw = b"POST /images:transform HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: 2\r\n\r\n{}";
    let mut reader = Cursor::new(raw);

    let parsed = read_request(&mut reader).expect("parse request");

    assert_eq!(parsed.method, "POST");
    assert_eq!(parsed.target, "/images:transform");
    assert_eq!(parsed.version, "HTTP/1.1");
    assert_eq!(parsed.header("host"), Some("localhost"));
    assert_eq!(parsed.body, b"{}");
}
3872
/// Two `Content-Length` headers in one request must be rejected with 400.
#[test]
fn read_request_rejects_duplicate_content_length() {
    let raw =
        b"POST /images:transform HTTP/1.1\r\nContent-Length: 2\r\nContent-Length: 2\r\n\r\n{}";
    let mut reader = Cursor::new(raw);

    let rejection = read_request(&mut reader).expect_err("duplicate headers should fail");

    assert_eq!(rejection.status, "400 Bad Request");
    let detail = response_body(&rejection);
    assert!(detail.contains("content-length"));
}
3883
/// End-to-end check: `serve_once_with_config` must accept one TCP connection
/// on an ephemeral port and answer `/health/live` with a JSON 200.
#[test]
fn serve_once_handles_a_tcp_request() {
    let config = ServerConfig::new(temp_dir("serve-once"), None);
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind test listener");
    let bound_addr = listener.local_addr().expect("read local addr");

    let server_thread = thread::spawn(move || serve_once_with_config(listener, config));

    let mut client = TcpStream::connect(bound_addr).expect("connect to test server");
    let raw_request = b"GET /health/live HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n";
    client.write_all(raw_request).expect("write request");

    let mut reply = String::new();
    client.read_to_string(&mut reply).expect("read response");

    let served = server_thread.join().expect("join test server thread");
    served.expect("serve one request");

    assert!(reply.starts_with("HTTP/1.1 200 OK"));
    assert!(reply.contains("Content-Type: application/json"));
    assert!(reply.contains("\"status\":\"ok\""));
    assert!(reply.contains("\"service\":\"truss\""));
    assert!(reply.contains("\"version\":"));
}
3912
/// The 401 and 400 response helpers must both emit
/// `application/problem+json` bodies with type, title, status and detail.
#[test]
fn helper_error_responses_use_rfc7807_problem_details() {
    let unauthorized = auth_required_response("authorization required");
    let bad_request = bad_request_response("bad input");

    assert_eq!(
        unauthorized.content_type,
        Some("application/problem+json"),
        "error responses must use application/problem+json"
    );
    assert_eq!(bad_request.content_type, Some("application/problem+json"));

    let unauthorized_json = response_body(&unauthorized);
    assert!(unauthorized_json.contains("authorization required"));
    assert!(unauthorized_json.contains("\"type\":\"about:blank\""));
    assert!(unauthorized_json.contains("\"title\":\"Unauthorized\""));
    assert!(unauthorized_json.contains("\"status\":401"));

    let bad_request_json = response_body(&bad_request);
    assert!(bad_request_json.contains("bad input"));
    assert!(bad_request_json.contains("\"type\":\"about:blank\""));
    assert!(bad_request_json.contains("\"title\":\"Bad Request\""));
    assert!(bad_request_json.contains("\"status\":400"));
}
3937
/// Two `Host` headers must make `parse_headers` fail.
#[test]
fn parse_headers_rejects_duplicate_host() {
    let raw = "Host: example.com\r\nHost: evil.com\r\n";
    let outcome = super::http_parse::parse_headers(raw.split("\r\n"));
    assert!(outcome.is_err());
}
3944
/// Two `Authorization` headers must make `parse_headers` fail.
#[test]
fn parse_headers_rejects_duplicate_authorization() {
    let raw = "Authorization: Bearer a\r\nAuthorization: Bearer b\r\n";
    let outcome = super::http_parse::parse_headers(raw.split("\r\n"));
    assert!(outcome.is_err());
}
3951
/// Two `Content-Type` headers must make `parse_headers` fail.
#[test]
fn parse_headers_rejects_duplicate_content_type() {
    let raw = "Content-Type: application/json\r\nContent-Type: text/plain\r\n";
    let outcome = super::http_parse::parse_headers(raw.split("\r\n"));
    assert!(outcome.is_err());
}
3958
/// Two `Transfer-Encoding` headers must make `parse_headers` fail.
#[test]
fn parse_headers_rejects_duplicate_transfer_encoding() {
    let raw = "Transfer-Encoding: chunked\r\nTransfer-Encoding: gzip\r\n";
    let outcome = super::http_parse::parse_headers(raw.split("\r\n"));
    assert!(outcome.is_err());
}
3965
/// Even a single `Transfer-Encoding` header must be refused: the error
/// response carries a 501 status and names the header in its body.
#[test]
fn parse_headers_rejects_single_transfer_encoding() {
    let raw = "Host: example.com\r\nTransfer-Encoding: chunked\r\n";
    let err = super::http_parse::parse_headers(raw.split("\r\n")).unwrap_err();

    let is_not_implemented = err.status.starts_with("501");
    assert!(
        is_not_implemented,
        "expected 501 status, got: {}",
        err.status
    );

    let body_text = String::from_utf8_lossy(&err.body);
    assert!(
        body_text.contains("Transfer-Encoding"),
        "error response should mention Transfer-Encoding"
    );
}
3981
/// `Transfer-Encoding: identity` must make `parse_headers` fail as well.
#[test]
fn parse_headers_rejects_transfer_encoding_identity() {
    let raw = "Transfer-Encoding: identity\r\n";
    let outcome = super::http_parse::parse_headers(raw.split("\r\n"));
    assert!(outcome.is_err());
}
3988
/// One instance each of `Host`, `Authorization` and `Content-Type` must parse
/// successfully into three headers.
#[test]
fn parse_headers_allows_single_instances_of_singleton_headers() {
    let raw =
        "Host: example.com\r\nAuthorization: Bearer tok\r\nContent-Type: application/json\r\n";
    let header_lines = raw.split("\r\n");

    let outcome = super::http_parse::parse_headers(header_lines);

    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap().len(), 3);
}
3997
/// A multipart content type must select the upload body limit that was
/// passed in.
#[test]
fn max_body_for_multipart_uses_upload_limit() {
    let multipart_header = (
        String::from("content-type"),
        String::from("multipart/form-data; boundary=abc"),
    );
    let headers = vec![multipart_header];

    let limit = super::http_parse::max_body_for_headers(
        &headers,
        super::http_parse::DEFAULT_MAX_UPLOAD_BODY_BYTES,
    );

    assert_eq!(limit, super::http_parse::DEFAULT_MAX_UPLOAD_BODY_BYTES);
}
4012
/// A JSON content type must select `MAX_REQUEST_BODY_BYTES`, not the upload
/// limit that was passed in.
#[test]
fn max_body_for_json_uses_default_limit() {
    let json_header = (String::from("content-type"), String::from("application/json"));
    let headers = vec![json_header];

    let limit = super::http_parse::max_body_for_headers(
        &headers,
        super::http_parse::DEFAULT_MAX_UPLOAD_BODY_BYTES,
    );

    assert_eq!(limit, super::http_parse::MAX_REQUEST_BODY_BYTES);
}
4024
/// With no headers at all, the body limit must be `MAX_REQUEST_BODY_BYTES`.
#[test]
fn max_body_for_no_content_type_uses_default_limit() {
    let headers = Vec::<(String, String)>::new();

    let limit = super::http_parse::max_body_for_headers(
        &headers,
        super::http_parse::DEFAULT_MAX_UPLOAD_BODY_BYTES,
    );

    assert_eq!(limit, super::http_parse::MAX_REQUEST_BODY_BYTES);
}
4036
4037 fn make_test_config() -> ServerConfig {
4038 ServerConfig::new(std::env::temp_dir(), None)
4039 }
4040
/// `filesystem`, `fs` and `local` must all parse to
/// `StorageBackend::Filesystem`.
#[test]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn storage_backend_parse_filesystem_aliases() {
    for alias in ["filesystem", "fs", "local"] {
        assert_eq!(
            super::StorageBackend::parse(alias).unwrap(),
            super::StorageBackend::Filesystem
        );
    }
}
4057
/// Both `s3` and `S3` must parse to `StorageBackend::S3`.
#[test]
#[cfg(feature = "s3")]
fn storage_backend_parse_s3() {
    for spelling in ["s3", "S3"] {
        assert_eq!(
            super::StorageBackend::parse(spelling).unwrap(),
            super::StorageBackend::S3
        );
    }
}
4070
/// Both `gcs` and `GCS` must parse to `StorageBackend::Gcs`.
#[test]
#[cfg(feature = "gcs")]
fn storage_backend_parse_gcs() {
    for spelling in ["gcs", "GCS"] {
        assert_eq!(
            super::StorageBackend::parse(spelling).unwrap(),
            super::StorageBackend::Gcs
        );
    }
}
4083
/// The empty string never parses; `azure` parses only when the `azure`
/// feature is compiled in.
#[test]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn storage_backend_parse_rejects_unknown() {
    let empty = super::StorageBackend::parse("");
    assert!(empty.is_err());

    let azure = super::StorageBackend::parse("azure");
    #[cfg(not(feature = "azure"))]
    assert!(azure.is_err());
    #[cfg(feature = "azure")]
    assert!(azure.is_ok());
}
4093
/// A path source with no version must not produce a versioned hash.
#[test]
fn versioned_source_hash_returns_none_without_version() {
    let cfg = make_test_config();
    let unversioned = TransformSourcePayload::Path {
        path: String::from("/photos/hero.jpg"),
        version: None,
    };

    let hash = unversioned.versioned_source_hash(&cfg);

    assert!(hash.is_none());
}
4102
/// Hashing the same versioned path twice must give the same 64-character
/// value.
#[test]
fn versioned_source_hash_is_deterministic() {
    let cfg = make_test_config();
    let source = TransformSourcePayload::Path {
        path: String::from("/photos/hero.jpg"),
        version: Some(String::from("v1")),
    };

    let first = source.versioned_source_hash(&cfg).unwrap();
    let second = source.versioned_source_hash(&cfg).unwrap();

    assert_eq!(first, second);
    assert_eq!(first.len(), 64);
}
4115
/// The same path under two different versions must hash differently.
#[test]
fn versioned_source_hash_differs_by_version() {
    let cfg = make_test_config();
    let source_v1 = TransformSourcePayload::Path {
        path: String::from("/photos/hero.jpg"),
        version: Some(String::from("v1")),
    };
    let source_v2 = TransformSourcePayload::Path {
        path: String::from("/photos/hero.jpg"),
        version: Some(String::from("v2")),
    };

    let hash_v1 = source_v1.versioned_source_hash(&cfg).unwrap();
    let hash_v2 = source_v2.versioned_source_hash(&cfg).unwrap();

    assert_ne!(hash_v1, hash_v2);
}
4132
/// A path source and a URL source with identical text and version must hash
/// differently.
#[test]
fn versioned_source_hash_differs_by_kind() {
    let cfg = make_test_config();
    let as_path = TransformSourcePayload::Path {
        path: String::from("example.com/image.jpg"),
        version: Some(String::from("v1")),
    };
    let as_url = TransformSourcePayload::Url {
        url: String::from("example.com/image.jpg"),
        version: Some(String::from("v1")),
    };

    let path_hash = as_path.versioned_source_hash(&cfg).unwrap();
    let url_hash = as_url.versioned_source_hash(&cfg).unwrap();

    assert_ne!(path_hash, url_hash);
}
4149
/// The same versioned path must hash differently under two different storage
/// roots.
#[test]
fn versioned_source_hash_differs_by_storage_root() {
    let data_cfg = ServerConfig::new(PathBuf::from("/data/images"), None);
    let other_cfg = ServerConfig::new(PathBuf::from("/other/images"), None);
    let source = TransformSourcePayload::Path {
        path: String::from("/photos/hero.jpg"),
        version: Some(String::from("v1")),
    };

    let data_hash = source.versioned_source_hash(&data_cfg).unwrap();
    let other_hash = source.versioned_source_hash(&other_cfg).unwrap();

    assert_ne!(data_hash, other_hash);
}
4163
/// Toggling `allow_insecure_url_sources` must change the hash of a versioned
/// URL source.
#[test]
fn versioned_source_hash_differs_by_insecure_flag() {
    let mut strict_cfg = make_test_config();
    strict_cfg.allow_insecure_url_sources = false;
    let mut insecure_cfg = make_test_config();
    insecure_cfg.allow_insecure_url_sources = true;

    let source = TransformSourcePayload::Url {
        url: String::from("http://example.com/img.jpg"),
        version: Some(String::from("v1")),
    };

    let strict_hash = source.versioned_source_hash(&strict_cfg).unwrap();
    let insecure_hash = source.versioned_source_hash(&insecure_cfg).unwrap();

    assert_ne!(strict_hash, insecure_hash);
}
4179
/// Hashing the same versioned storage source twice must give the same
/// 64-character value.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_storage_variant_is_deterministic() {
    let cfg = make_test_config();
    let source = TransformSourcePayload::Storage {
        bucket: Some(String::from("my-bucket")),
        key: String::from("photos/hero.jpg"),
        version: Some(String::from("v1")),
    };

    let first = source.versioned_source_hash(&cfg).unwrap();
    let second = source.versioned_source_hash(&cfg).unwrap();

    assert_eq!(first, second);
    assert_eq!(first.len(), 64);
}
4194
/// A storage source and a path source that share key text and version must
/// hash differently.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_storage_differs_from_path() {
    let cfg = make_test_config();
    let as_path = TransformSourcePayload::Path {
        path: String::from("photos/hero.jpg"),
        version: Some(String::from("v1")),
    };
    let as_storage = TransformSourcePayload::Storage {
        bucket: Some(String::from("my-bucket")),
        key: String::from("photos/hero.jpg"),
        version: Some(String::from("v1")),
    };

    let path_hash = as_path.versioned_source_hash(&cfg).unwrap();
    let storage_hash = as_storage.versioned_source_hash(&cfg).unwrap();

    assert_ne!(path_hash, storage_hash);
}
4213
/// The same key and version in two different explicit buckets must hash
/// differently.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_storage_differs_by_bucket() {
    let cfg = make_test_config();
    let in_bucket_a = TransformSourcePayload::Storage {
        bucket: Some(String::from("bucket-a")),
        key: String::from("image.jpg"),
        version: Some(String::from("v1")),
    };
    let in_bucket_b = TransformSourcePayload::Storage {
        bucket: Some(String::from("bucket-b")),
        key: String::from("image.jpg"),
        version: Some(String::from("v1")),
    };

    let hash_a = in_bucket_a.versioned_source_hash(&cfg).unwrap();
    let hash_b = in_bucket_b.versioned_source_hash(&cfg).unwrap();

    assert_ne!(hash_a, hash_b);
}
4233
/// The same versioned path must hash differently when the configured backend
/// is S3 rather than the default.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_differs_by_backend() {
    let default_cfg = make_test_config();
    let mut s3_cfg = make_test_config();
    s3_cfg.storage_backend = super::StorageBackend::S3;

    let source = TransformSourcePayload::Path {
        path: String::from("photos/hero.jpg"),
        version: Some(String::from("v1")),
    };

    let default_hash = source.versioned_source_hash(&default_cfg).unwrap();
    let s3_hash = source.versioned_source_hash(&s3_cfg).unwrap();

    assert_ne!(default_hash, s3_hash);
}
4250
/// Two S3 configurations that share a bucket name but differ in endpoint
/// must hash the same storage source differently and compare unequal.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_storage_differs_by_endpoint() {
    // Builds an S3-backed config whose test context targets `endpoint`.
    let s3_config_for = |endpoint: &'static str| {
        let mut cfg = make_test_config();
        cfg.storage_backend = super::StorageBackend::S3;
        cfg.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
            "shared",
            Some(endpoint),
        )));
        cfg
    };
    let cfg_a = s3_config_for("http://minio-a:9000");
    let cfg_b = s3_config_for("http://minio-b:9000");

    let source = TransformSourcePayload::Storage {
        bucket: None,
        key: String::from("image.jpg"),
        version: Some(String::from("v1")),
    };

    let hash_a = source.versioned_source_hash(&cfg_a).unwrap();
    let hash_b = source.versioned_source_hash(&cfg_b).unwrap();
    assert_ne!(hash_a, hash_b);
    assert_ne!(cfg_a, cfg_b);
}
4279
/// A freshly built test config must use the filesystem backend and carry no
/// S3 context.
#[test]
#[cfg(feature = "s3")]
fn storage_backend_default_is_filesystem() {
    let defaults = make_test_config();

    assert_eq!(defaults.storage_backend, super::StorageBackend::Filesystem);
    assert!(defaults.s3_context.is_none());
}
4287
/// A `"kind":"storage"` source with only a key must deserialize to the
/// `Storage` variant with no bucket and no version.
#[test]
#[cfg(feature = "s3")]
fn storage_payload_deserializes_storage_variant() {
    let json = r#"{"source":{"kind":"storage","key":"photos/hero.jpg"},"options":{}}"#;
    let payload: super::TransformImageRequestPayload = serde_json::from_str(json).unwrap();

    let TransformSourcePayload::Storage {
        bucket,
        key,
        version,
    } = payload.source
    else {
        panic!("expected Storage variant")
    };

    assert!(bucket.is_none());
    assert_eq!(key, "photos/hero.jpg");
    assert!(version.is_none());
}
4306
/// A `"kind":"storage"` source with bucket, key and version must deserialize
/// all three fields into the `Storage` variant.
#[test]
#[cfg(feature = "s3")]
fn storage_payload_deserializes_with_bucket() {
    let json = r#"{"source":{"kind":"storage","bucket":"my-bucket","key":"img.png","version":"v2"},"options":{}}"#;
    let payload: super::TransformImageRequestPayload = serde_json::from_str(json).unwrap();

    let TransformSourcePayload::Storage {
        bucket,
        key,
        version,
    } = payload.source
    else {
        panic!("expected Storage variant")
    };

    assert_eq!(bucket.as_deref(), Some("my-bucket"));
    assert_eq!(key, "img.png");
    assert_eq!(version.as_deref(), Some("v2"));
}
4325
/// When the source names no bucket, two S3 contexts built for different
/// bucket names must yield different hashes and unequal configs.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_uses_default_bucket_when_bucket_is_none() {
    // Builds an S3-backed config whose test context is created for `bucket`.
    let s3_config_for = |bucket: &'static str| {
        let mut cfg = make_test_config();
        cfg.storage_backend = super::StorageBackend::S3;
        cfg.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
            bucket, None,
        )));
        cfg
    };
    let cfg_a = s3_config_for("bucket-a");
    let cfg_b = s3_config_for("bucket-b");

    let source = TransformSourcePayload::Storage {
        bucket: None,
        key: String::from("image.jpg"),
        version: Some(String::from("v1")),
    };

    let hash_a = source.versioned_source_hash(&cfg_a).unwrap();
    let hash_b = source.versioned_source_hash(&cfg_b).unwrap();
    assert_ne!(hash_a, hash_b);
    assert_ne!(cfg_a, cfg_b);
}
4358
/// With neither an explicit bucket nor an S3 context, no versioned source hash is produced.
#[test]
#[cfg(feature = "s3")]
fn versioned_source_hash_returns_none_without_bucket_or_context() {
    let mut config = make_test_config();
    config.storage_backend = super::StorageBackend::S3;
    config.s3_context = None;

    let payload = TransformSourcePayload::Storage {
        bucket: None,
        key: "image.jpg".to_string(),
        version: Some("v1".to_string()),
    };

    let hash = payload.versioned_source_hash(&config);
    assert!(hash.is_none());
}
4374
/// Serializes callers of `with_s3_env`: the process environment is global, so only one
/// caller may override it at a time.
#[cfg(feature = "s3")]
static FROM_ENV_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());

/// Environment variables that `with_s3_env` always snapshots and restores.
#[cfg(feature = "s3")]
const S3_ENV_VARS: &[&str] = &[
    "TRUSS_STORAGE_ROOT",
    "TRUSS_STORAGE_BACKEND",
    "TRUSS_S3_BUCKET",
];

/// Runs `f` with the environment overridden as described by `vars`, then restores it.
///
/// Each `(key, Some(value))` entry sets the variable; each `(key, None)` entry removes it.
/// Every name in `S3_ENV_VARS`, plus any other key named in `vars`, is snapshotted before
/// the overrides are applied and put back afterwards. If `f` panics, the environment is
/// restored first and the panic is then re-raised.
///
/// NOTE(review): `FROM_ENV_MUTEX` only excludes other `with_s3_env` callers; tests that
/// touch the environment without this helper are not covered by it.
#[cfg(feature = "s3")]
fn with_s3_env<F: FnOnce()>(vars: &[(&str, Option<&str>)], f: F) {
    // A poisoned lock only means an earlier closure panicked; the guard is still usable.
    let _guard = FROM_ENV_MUTEX.lock().unwrap_or_else(|e| e.into_inner());

    // Snapshot the well-known variables plus any extra key the caller overrides, so that
    // nothing touched below is left behind after the call.
    let mut saved: Vec<(&str, Option<String>)> =
        Vec::with_capacity(S3_ENV_VARS.len() + vars.len());
    for &key in S3_ENV_VARS {
        saved.push((key, std::env::var(key).ok()));
    }
    for &(key, _) in vars {
        if !saved.iter().any(|(known, _)| *known == key) {
            saved.push((key, std::env::var(key).ok()));
        }
    }

    for &(key, value) in vars {
        // SAFETY: `_guard` serializes these environment mutations against every other
        // `with_s3_env` caller.
        match value {
            Some(v) => unsafe { std::env::set_var(key, v) },
            None => unsafe { std::env::remove_var(key) },
        }
    }

    // Catch a panic from `f` so the restore loop below always runs.
    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));

    for (key, original) in saved {
        // SAFETY: still holding `_guard`; see above.
        match original {
            Some(v) => unsafe { std::env::set_var(key, v) },
            None => unsafe { std::env::remove_var(key) },
        }
    }

    if let Err(payload) = result {
        std::panic::resume_unwind(payload);
    }
}
4421
/// `ServerConfig::from_env` must reject an unrecognized `TRUSS_STORAGE_BACKEND` value.
///
/// Marked `#[serial]` because it rewrites process-wide `TRUSS_*` variables and calls
/// `from_env`, just like the other `#[serial]` environment tests in this module; the
/// mutex inside `with_s3_env` alone does not exclude those tests.
#[test]
#[serial]
#[cfg(feature = "s3")]
fn from_env_rejects_invalid_storage_backend() {
    let storage = temp_dir("env-bad-backend");
    let storage_str = storage.to_str().unwrap().to_string();
    with_s3_env(
        &[
            ("TRUSS_STORAGE_ROOT", Some(&storage_str)),
            ("TRUSS_STORAGE_BACKEND", Some("nosuchbackend")),
            ("TRUSS_S3_BUCKET", None),
        ],
        || {
            let result = ServerConfig::from_env();
            assert!(result.is_err());
            let msg = result.unwrap_err().to_string();
            assert!(msg.contains("unknown storage backend"), "got: {msg}");
        },
    );
    // Best-effort removal of the directory returned by `temp_dir`.
    let _ = std::fs::remove_dir_all(storage);
}
4442
/// Selecting the `s3` backend without `TRUSS_S3_BUCKET` must fail, and the error must
/// name the missing variable.
///
/// Marked `#[serial]` because it rewrites process-wide `TRUSS_*` variables and calls
/// `from_env`, just like the other `#[serial]` environment tests in this module; the
/// mutex inside `with_s3_env` alone does not exclude those tests.
#[test]
#[serial]
#[cfg(feature = "s3")]
fn from_env_rejects_s3_without_bucket() {
    let storage = temp_dir("env-no-bucket");
    let storage_str = storage.to_str().unwrap().to_string();
    with_s3_env(
        &[
            ("TRUSS_STORAGE_ROOT", Some(&storage_str)),
            ("TRUSS_STORAGE_BACKEND", Some("s3")),
            ("TRUSS_S3_BUCKET", None),
        ],
        || {
            let result = ServerConfig::from_env();
            assert!(result.is_err());
            let msg = result.unwrap_err().to_string();
            assert!(msg.contains("TRUSS_S3_BUCKET"), "got: {msg}");
        },
    );
    // Best-effort removal of the directory returned by `temp_dir`.
    let _ = std::fs::remove_dir_all(storage);
}
4463
/// With the `s3` backend and a bucket configured, `from_env` succeeds and the resulting
/// S3 context reports that bucket as its default.
///
/// Marked `#[serial]` because it rewrites process-wide `TRUSS_*` variables and calls
/// `from_env`, just like the other `#[serial]` environment tests in this module; the
/// mutex inside `with_s3_env` alone does not exclude those tests.
#[test]
#[serial]
#[cfg(feature = "s3")]
fn from_env_accepts_s3_with_bucket() {
    let storage = temp_dir("env-s3-ok");
    let storage_str = storage.to_str().unwrap().to_string();
    with_s3_env(
        &[
            ("TRUSS_STORAGE_ROOT", Some(&storage_str)),
            ("TRUSS_STORAGE_BACKEND", Some("s3")),
            ("TRUSS_S3_BUCKET", Some("my-images")),
        ],
        || {
            let cfg =
                ServerConfig::from_env().expect("from_env should succeed with s3 + bucket");
            assert_eq!(cfg.storage_backend, super::StorageBackend::S3);
            let ctx = cfg.s3_context.expect("s3_context should be Some");
            assert_eq!(ctx.default_bucket, "my-images");
        },
    );
    // Best-effort removal of the directory returned by `temp_dir`.
    let _ = std::fs::remove_dir_all(storage);
}
4485
/// `GET /health/ready` must answer 503 and report a failing `storageBackend` check when
/// the S3 backend is selected but no S3 context is configured.
#[test]
#[cfg(feature = "s3")]
fn health_ready_s3_returns_503_when_context_missing() {
    let storage = temp_dir("health-s3-no-ctx");
    let mut config = ServerConfig::new(storage.clone(), None);
    config.storage_backend = super::StorageBackend::S3;
    config.s3_context = None;

    let request = super::http_parse::HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(request, &config);
    // Best-effort cleanup before asserting, so a failure does not leave the directory behind.
    let _ = std::fs::remove_dir_all(storage);

    assert_eq!(response.status, "503 Service Unavailable");
    let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
    let checks = body["checks"].as_array().expect("checks array");
    assert!(
        checks
            .iter()
            .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
        // The message names the check this assertion actually searches for.
        "expected storageBackend fail check in {body}",
    );
}
4518
/// With an S3 test context configured, the `/health/ready` body must list a
/// `storageBackend` check. Only its presence is asserted, not the response status.
#[test]
#[cfg(feature = "s3")]
fn health_ready_s3_includes_s3_client_check() {
    let storage = temp_dir("health-s3-ok");
    let mut config = ServerConfig::new(storage.clone(), None);
    config.storage_backend = super::StorageBackend::S3;
    config.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
        "test-bucket",
        None,
    )));

    let request = super::http_parse::HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(request, &config);
    // Best-effort cleanup before asserting, so a failure does not leave the directory behind.
    let _ = std::fs::remove_dir_all(storage);

    let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
    let checks = body["checks"].as_array().expect("checks array");
    assert!(
        checks.iter().any(|c| c["name"] == "storageBackend"),
        // The message names the check this assertion actually searches for.
        "expected storageBackend check in {body}",
    );
}
4549
/// Test helper: builds a `Path` source from the arguments and converts it into a
/// `Storage` source with no explicit bucket, stripping leading `/` characters from the key.
#[cfg(feature = "s3")]
fn remap_path_to_storage(path: &str, version: Option<&str>) -> TransformSourcePayload {
    let original = TransformSourcePayload::Path {
        path: path.to_owned(),
        version: version.map(str::to_owned),
    };
    match original {
        TransformSourcePayload::Path {
            path: raw_path,
            version: pinned,
        } => {
            let key = raw_path.trim_start_matches('/').to_owned();
            TransformSourcePayload::Storage {
                bucket: None,
                key,
                version: pinned,
            }
        }
        // Any non-`Path` source would pass through untouched.
        unchanged => unchanged,
    }
}
4572
/// Remapping a path with or without a leading `/` must give the same storage key and the
/// same versioned source hash under an S3 config.
#[test]
#[cfg(feature = "s3")]
fn public_by_path_s3_remap_trims_leading_slash() {
    let with_slash = remap_path_to_storage("/photos/hero.jpg", Some("v1"));
    let TransformSourcePayload::Storage { key, .. } = &with_slash else {
        panic!("expected Storage variant after remap");
    };
    assert_eq!(key, "photos/hero.jpg", "leading / must be trimmed");

    let without_slash = remap_path_to_storage("photos/hero.jpg", Some("v1"));
    let TransformSourcePayload::Storage { key, .. } = &without_slash else {
        panic!("expected Storage variant after remap");
    };
    assert_eq!(key, "photos/hero.jpg");

    let mut cfg = make_test_config();
    cfg.storage_backend = super::StorageBackend::S3;
    cfg.s3_context = Some(std::sync::Arc::new(super::s3::S3Context::for_test(
        "my-bucket",
        None,
    )));
    assert_eq!(
        with_slash.versioned_source_hash(&cfg),
        without_slash.versioned_source_hash(&cfg),
        "leading-slash and no-leading-slash paths must hash identically after trim",
    );
}
4609
/// The remap helper yields a `Storage` source with no bucket, a trimmed key and no
/// version when none was supplied.
#[test]
#[cfg(feature = "s3")]
fn public_by_path_s3_remap_produces_storage_variant() {
    let remapped = remap_path_to_storage("/image.png", None);
    let TransformSourcePayload::Storage {
        bucket,
        key,
        version,
    } = remapped
    else {
        panic!("expected Storage variant");
    };
    assert!(bucket.is_none(), "bucket must be None (use default)");
    assert_eq!(key, "image.png");
    assert!(version.is_none());
}
4628
/// `GET /health/ready` must answer 503 and report a failing `storageBackend` check when
/// the GCS backend is selected but no GCS context is configured.
#[test]
#[cfg(feature = "gcs")]
fn health_ready_gcs_returns_503_when_context_missing() {
    let storage = temp_dir("health-gcs-no-ctx");
    let mut config = ServerConfig::new(storage.clone(), None);
    config.storage_backend = super::StorageBackend::Gcs;
    config.gcs_context = None;

    let request = super::http_parse::HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(request, &config);
    // Best-effort cleanup before asserting, so a failure does not leave the directory behind.
    let _ = std::fs::remove_dir_all(storage);

    assert_eq!(response.status, "503 Service Unavailable");
    let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
    let checks = body["checks"].as_array().expect("checks array");
    assert!(
        checks
            .iter()
            .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
        // The message names the check this assertion actually searches for.
        "expected storageBackend fail check in {body}",
    );
}
4661
/// With a GCS test context configured, the `/health/ready` body must list a
/// `storageBackend` check. Only its presence is asserted, not the response status.
#[test]
#[cfg(feature = "gcs")]
fn health_ready_gcs_includes_gcs_client_check() {
    let storage = temp_dir("health-gcs-ok");
    let mut config = ServerConfig::new(storage.clone(), None);
    config.storage_backend = super::StorageBackend::Gcs;
    config.gcs_context = Some(std::sync::Arc::new(super::gcs::GcsContext::for_test(
        "test-bucket",
        None,
    )));

    let request = super::http_parse::HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(request, &config);
    // Best-effort cleanup before asserting, so a failure does not leave the directory behind.
    let _ = std::fs::remove_dir_all(storage);

    let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
    let checks = body["checks"].as_array().expect("checks array");
    assert!(
        checks.iter().any(|c| c["name"] == "storageBackend"),
        // The message names the check this assertion actually searches for.
        "expected storageBackend check in {body}",
    );
}
4692
/// Test helper (GCS build): builds a `Path` source from the arguments and converts it into
/// a `Storage` source with no explicit bucket, stripping leading `/` characters from the key.
#[cfg(feature = "gcs")]
fn remap_path_to_storage_gcs(path: &str, version: Option<&str>) -> TransformSourcePayload {
    let original = TransformSourcePayload::Path {
        path: path.to_owned(),
        version: version.map(str::to_owned),
    };
    match original {
        TransformSourcePayload::Path {
            path: raw_path,
            version: pinned,
        } => {
            let key = raw_path.trim_start_matches('/').to_owned();
            TransformSourcePayload::Storage {
                bucket: None,
                key,
                version: pinned,
            }
        }
        // Any non-`Path` source would pass through untouched.
        unchanged => unchanged,
    }
}
4712
/// Remapping a path with or without a leading `/` must give the same storage key and the
/// same versioned source hash under a GCS config.
#[test]
#[cfg(feature = "gcs")]
fn public_by_path_gcs_remap_trims_leading_slash() {
    let with_slash = remap_path_to_storage_gcs("/photos/hero.jpg", Some("v1"));
    let TransformSourcePayload::Storage { key, .. } = &with_slash else {
        panic!("expected Storage variant after remap");
    };
    assert_eq!(key, "photos/hero.jpg", "leading / must be trimmed");

    let without_slash = remap_path_to_storage_gcs("photos/hero.jpg", Some("v1"));
    let TransformSourcePayload::Storage { key, .. } = &without_slash else {
        panic!("expected Storage variant after remap");
    };
    assert_eq!(key, "photos/hero.jpg");

    let mut cfg = make_test_config();
    cfg.storage_backend = super::StorageBackend::Gcs;
    cfg.gcs_context = Some(std::sync::Arc::new(super::gcs::GcsContext::for_test(
        "my-bucket",
        None,
    )));
    assert_eq!(
        with_slash.versioned_source_hash(&cfg),
        without_slash.versioned_source_hash(&cfg),
        "leading-slash and no-leading-slash paths must hash identically after trim",
    );
}
4744
/// The GCS remap helper yields a `Storage` source with no bucket, a trimmed key and no
/// version when none was supplied.
#[test]
#[cfg(feature = "gcs")]
fn public_by_path_gcs_remap_produces_storage_variant() {
    let remapped = remap_path_to_storage_gcs("/image.png", None);
    let TransformSourcePayload::Storage {
        bucket,
        key,
        version,
    } = remapped
    else {
        panic!("expected Storage variant");
    };
    assert!(bucket.is_none(), "bucket must be None (use default)");
    assert_eq!(key, "image.png");
    assert!(version.is_none());
}
4762
/// `GET /health/ready` must answer 503 and report a failing `storageBackend` check when
/// the Azure backend is selected but no Azure context is configured.
#[test]
#[cfg(feature = "azure")]
fn health_ready_azure_returns_503_when_context_missing() {
    let storage = temp_dir("health-azure-no-ctx");
    let mut config = ServerConfig::new(storage.clone(), None);
    config.storage_backend = super::StorageBackend::Azure;
    config.azure_context = None;

    let request = super::http_parse::HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(request, &config);
    // Best-effort cleanup before asserting, so a failure does not leave the directory behind.
    let _ = std::fs::remove_dir_all(storage);

    assert_eq!(response.status, "503 Service Unavailable");
    let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
    let checks = body["checks"].as_array().expect("checks array");
    assert!(
        checks
            .iter()
            .any(|c| c["name"] == "storageBackend" && c["status"] == "fail"),
        // The message names the check this assertion actually searches for.
        "expected storageBackend fail check in {body}",
    );
}
4795
/// With an Azure test context configured, the `/health/ready` body must list a
/// `storageBackend` check. Only its presence is asserted, not the response status.
#[test]
#[cfg(feature = "azure")]
fn health_ready_azure_includes_azure_client_check() {
    let storage = temp_dir("health-azure-ok");
    let mut config = ServerConfig::new(storage.clone(), None);
    config.storage_backend = super::StorageBackend::Azure;
    config.azure_context = Some(std::sync::Arc::new(super::azure::AzureContext::for_test(
        "test-bucket",
        "http://localhost:10000/devstoreaccount1",
    )));

    let request = super::http_parse::HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(request, &config);
    // Best-effort cleanup before asserting, so a failure does not leave the directory behind.
    let _ = std::fs::remove_dir_all(storage);

    let body: serde_json::Value = serde_json::from_slice(&response.body).expect("parse body");
    let checks = body["checks"].as_array().expect("checks array");
    assert!(
        checks.iter().any(|c| c["name"] == "storageBackend"),
        // The message names the check this assertion actually searches for.
        "expected storageBackend check in {body}",
    );
}
4824
/// A JSON request whose body is one byte longer than `MAX_REQUEST_BODY_BYTES` must be
/// rejected by `read_request`.
#[test]
fn read_request_rejects_json_body_over_1mib() {
    let oversized = vec![b'x'; super::http_parse::MAX_REQUEST_BODY_BYTES + 1];
    let content_length = oversized.len();

    // Request head followed by the oversized payload.
    let mut data = format!(
        "POST /images:transform HTTP/1.1\r\n\
         Content-Type: application/json\r\n\
         Content-Length: {content_length}\r\n\r\n"
    )
    .into_bytes();
    data.extend_from_slice(&oversized);

    let outcome = read_request(&mut data.as_slice());
    assert!(outcome.is_err());
}
4839
/// A multipart upload whose file part exceeds `MAX_REQUEST_BODY_BYTES` must still be
/// accepted by `read_request`.
#[test]
fn read_request_accepts_multipart_body_over_1mib() {
    let boundary = "test-boundary-123";
    let file_bytes = vec![b'A'; super::http_parse::MAX_REQUEST_BODY_BYTES + 100];

    // Single file part: opening delimiter with part headers, payload, closing delimiter.
    let part_head = format!("--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"big.jpg\"\r\n\r\n");
    let closing = format!("\r\n--{boundary}--\r\n");
    let mut body = Vec::new();
    body.extend_from_slice(part_head.as_bytes());
    body.extend_from_slice(&file_bytes);
    body.extend_from_slice(closing.as_bytes());

    let content_length = body.len();
    let mut data = format!(
        "POST /images HTTP/1.1\r\n\
         Content-Type: multipart/form-data; boundary={boundary}\r\n\
         Content-Length: {content_length}\r\n\r\n"
    )
    .into_bytes();
    data.extend_from_slice(&body);

    let outcome = read_request(&mut data.as_slice());
    assert!(
        outcome.is_ok(),
        "multipart upload over 1 MiB should be accepted"
    );
}
4863
/// A payload containing text that merely starts like the boundary delimiter must not be
/// split: the parser returns one part whose body still holds the look-alike text.
#[test]
fn multipart_boundary_in_payload_does_not_split_part() {
    let boundary = "abc123";
    let decoy = format!("\r\n--{boundary}NOTREAL");
    let part_body = format!("before{decoy}after");
    let body = format!(
        "--{boundary}\r\n\
         Content-Disposition: form-data; name=\"file\"\r\n\
         Content-Type: application/octet-stream\r\n\r\n\
         {part_body}\r\n\
         --{boundary}--\r\n"
    );
    let raw = body.as_bytes();

    let parts = parse_multipart_form_data(raw, boundary)
        .expect("should parse despite boundary-like string in payload");
    assert_eq!(parts.len(), 1, "should have exactly one part");

    // Slice the part body out of the raw request using the range the parser reported.
    let part_bytes = &raw[parts[0].body_range.clone()];
    let part_text = std::str::from_utf8(part_bytes).unwrap();
    assert!(
        part_text.contains("NOTREAL"),
        "part body should contain the full fake boundary string"
    );
}
4888
/// An ordinary two-field multipart body parses into two parts, named in order of appearance.
#[test]
fn multipart_normal_two_parts_still_works() {
    let boundary = "testboundary";
    let body = format!(
        "--{boundary}\r\n\
         Content-Disposition: form-data; name=\"field1\"\r\n\r\n\
         value1\r\n\
         --{boundary}\r\n\
         Content-Disposition: form-data; name=\"field2\"\r\n\r\n\
         value2\r\n\
         --{boundary}--\r\n"
    );

    let parsed = parse_multipart_form_data(body.as_bytes(), boundary)
        .expect("should parse two normal parts");

    assert_eq!(parsed.len(), 2);
    assert_eq!(parsed[0].name, "field1");
    assert_eq!(parsed[1].name, "field2");
}
4908
/// Without `TRUSS_STORAGE_TIMEOUT_SECS` set, `from_env` yields a storage timeout of 30.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_default() {
    // Make sure no override is present before reading the config.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let timeout = ServerConfig::from_env().unwrap().storage_timeout_secs;
    assert_eq!(timeout, 30);
}
4919
/// `TRUSS_STORAGE_TIMEOUT_SECS=60` is picked up by `from_env`.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_custom() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "60");
    }
    let result = ServerConfig::from_env();
    // Remove the override before asserting so a failure cannot leak it into later tests.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.storage_timeout_secs, 60);
}
4933
/// The lower bound `TRUSS_STORAGE_TIMEOUT_SECS=1` is accepted.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_min_boundary() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "1");
    }
    let result = ServerConfig::from_env();
    // Remove the override before asserting so a failure cannot leak it into later tests.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.storage_timeout_secs, 1);
}
4947
/// The upper bound `TRUSS_STORAGE_TIMEOUT_SECS=300` is accepted.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_max_boundary() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "300");
    }
    let result = ServerConfig::from_env();
    // Remove the override before asserting so a failure cannot leak it into later tests.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.storage_timeout_secs, 300);
}
4961
/// An empty `TRUSS_STORAGE_TIMEOUT_SECS` falls back to the default of 30.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_empty_string_uses_default() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "");
    }
    let result = ServerConfig::from_env();
    // Remove the override before asserting so a failure cannot leak it into later tests.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.storage_timeout_secs, 30);
}
4975
/// `TRUSS_STORAGE_TIMEOUT_SECS=0` is rejected with an error that states the valid range.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_zero_rejected() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "0");
    }
    let result = ServerConfig::from_env();
    // Remove the invalid override before asserting: if `from_env` unexpectedly succeeds,
    // the value must not stay behind and break later `from_env` tests.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("between 1 and 300"),
        "error should mention valid range: {err}"
    );
}
4992
/// `TRUSS_STORAGE_TIMEOUT_SECS=301` is rejected with an error that states the valid range.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_over_max_rejected() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "301");
    }
    let result = ServerConfig::from_env();
    // Remove the invalid override before asserting: if `from_env` unexpectedly succeeds,
    // the value must not stay behind and break later `from_env` tests.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("between 1 and 300"),
        "error should mention valid range: {err}"
    );
}
5009
/// A non-numeric `TRUSS_STORAGE_TIMEOUT_SECS` is rejected with a "positive integer" error.
#[test]
#[serial]
#[cfg(any(feature = "s3", feature = "gcs", feature = "azure"))]
fn test_storage_timeout_non_numeric_rejected() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_TIMEOUT_SECS", "abc");
    }
    let result = ServerConfig::from_env();
    // Remove the invalid override before asserting: if `from_env` unexpectedly succeeds,
    // the value must not stay behind and break later `from_env` tests.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_TIMEOUT_SECS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("positive integer"),
        "error should mention positive integer: {err}"
    );
}
5026
/// Without `TRUSS_MAX_CONCURRENT_TRANSFORMS` set, `from_env` yields a limit of 64.
#[test]
#[serial]
fn test_max_concurrent_transforms_default() {
    // Make sure no override is present before reading the config.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let limit = ServerConfig::from_env().unwrap().max_concurrent_transforms;
    assert_eq!(limit, 64);
}
5036
/// `TRUSS_MAX_CONCURRENT_TRANSFORMS=128` is picked up by `from_env`.
#[test]
#[serial]
fn test_max_concurrent_transforms_custom() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "128");
    }
    let result = ServerConfig::from_env();
    // Remove the override before asserting so a failure cannot leak it into later tests.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let config = result.unwrap();
    assert_eq!(config.max_concurrent_transforms, 128);
}
5049
/// The lower bound `TRUSS_MAX_CONCURRENT_TRANSFORMS=1` is accepted.
#[test]
#[serial]
fn test_max_concurrent_transforms_min_boundary() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1");
    }
    let result = ServerConfig::from_env();
    // Remove the override before asserting so a failure cannot leak it into later tests.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let config = result.unwrap();
    assert_eq!(config.max_concurrent_transforms, 1);
}
5062
/// The upper bound `TRUSS_MAX_CONCURRENT_TRANSFORMS=1024` is accepted.
#[test]
#[serial]
fn test_max_concurrent_transforms_max_boundary() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1024");
    }
    let result = ServerConfig::from_env();
    // Remove the override before asserting so a failure cannot leak it into later tests.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let config = result.unwrap();
    assert_eq!(config.max_concurrent_transforms, 1024);
}
5075
/// An empty `TRUSS_MAX_CONCURRENT_TRANSFORMS` falls back to the default of 64.
#[test]
#[serial]
fn test_max_concurrent_transforms_empty_uses_default() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "");
    }
    let result = ServerConfig::from_env();
    // Remove the override before asserting so a failure cannot leak it into later tests.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let config = result.unwrap();
    assert_eq!(config.max_concurrent_transforms, 64);
}
5088
/// `TRUSS_MAX_CONCURRENT_TRANSFORMS=0` is rejected with an error that states the valid range.
#[test]
#[serial]
fn test_max_concurrent_transforms_zero_rejected() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "0");
    }
    let result = ServerConfig::from_env();
    // Remove the invalid override before asserting: if `from_env` unexpectedly succeeds,
    // the value must not stay behind and break later `from_env` tests.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("between 1 and 1024"),
        "error should mention valid range: {err}"
    );
}
5104
/// `TRUSS_MAX_CONCURRENT_TRANSFORMS=1025` is rejected with an error that states the valid range.
#[test]
#[serial]
fn test_max_concurrent_transforms_over_max_rejected() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "1025");
    }
    let result = ServerConfig::from_env();
    // Remove the invalid override before asserting: if `from_env` unexpectedly succeeds,
    // the value must not stay behind and break later `from_env` tests.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("between 1 and 1024"),
        "error should mention valid range: {err}"
    );
}
5120
/// A non-numeric `TRUSS_MAX_CONCURRENT_TRANSFORMS` is rejected with a "positive integer" error.
#[test]
#[serial]
fn test_max_concurrent_transforms_non_numeric_rejected() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_MAX_CONCURRENT_TRANSFORMS", "abc");
    }
    let result = ServerConfig::from_env();
    // Remove the invalid override before asserting: if `from_env` unexpectedly succeeds,
    // the value must not stay behind and break later `from_env` tests.
    unsafe {
        std::env::remove_var("TRUSS_MAX_CONCURRENT_TRANSFORMS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("positive integer"),
        "error should mention positive integer: {err}"
    );
}
5136
/// Without `TRUSS_TRANSFORM_DEADLINE_SECS` set, `from_env` yields a deadline of 30.
#[test]
#[serial]
fn test_transform_deadline_default() {
    // Make sure no override is present before reading the config.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let deadline = ServerConfig::from_env().unwrap().transform_deadline_secs;
    assert_eq!(deadline, 30);
}
5146
/// `TRUSS_TRANSFORM_DEADLINE_SECS=60` is picked up by `from_env`.
#[test]
#[serial]
fn test_transform_deadline_custom() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "60");
    }
    let result = ServerConfig::from_env();
    // Remove the override before asserting so a failure cannot leak it into later tests.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.transform_deadline_secs, 60);
}
5159
/// The lower bound `TRUSS_TRANSFORM_DEADLINE_SECS=1` is accepted.
#[test]
#[serial]
fn test_transform_deadline_min_boundary() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "1");
    }
    let result = ServerConfig::from_env();
    // Remove the override before asserting so a failure cannot leak it into later tests.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.transform_deadline_secs, 1);
}
5172
/// The upper bound `TRUSS_TRANSFORM_DEADLINE_SECS=300` is accepted.
#[test]
#[serial]
fn test_transform_deadline_max_boundary() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "300");
    }
    let result = ServerConfig::from_env();
    // Remove the override before asserting so a failure cannot leak it into later tests.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.transform_deadline_secs, 300);
}
5185
/// An empty `TRUSS_TRANSFORM_DEADLINE_SECS` falls back to the default of 30.
#[test]
#[serial]
fn test_transform_deadline_empty_uses_default() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "");
    }
    let result = ServerConfig::from_env();
    // Remove the override before asserting so a failure cannot leak it into later tests.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let config = result.unwrap();
    assert_eq!(config.transform_deadline_secs, 30);
}
5198
/// `TRUSS_TRANSFORM_DEADLINE_SECS=0` is rejected with an error that states the valid range.
#[test]
#[serial]
fn test_transform_deadline_zero_rejected() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "0");
    }
    let result = ServerConfig::from_env();
    // Remove the invalid override before asserting: if `from_env` unexpectedly succeeds,
    // the value must not stay behind and break later `from_env` tests.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("between 1 and 300"),
        "error should mention valid range: {err}"
    );
}
5214
/// `TRUSS_TRANSFORM_DEADLINE_SECS=301` is rejected with an error that states the valid range.
#[test]
#[serial]
fn test_transform_deadline_over_max_rejected() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "301");
    }
    let result = ServerConfig::from_env();
    // Remove the invalid override before asserting: if `from_env` unexpectedly succeeds,
    // the value must not stay behind and break later `from_env` tests.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("between 1 and 300"),
        "error should mention valid range: {err}"
    );
}
5230
/// A non-numeric `TRUSS_TRANSFORM_DEADLINE_SECS` is rejected with a "positive integer" error.
#[test]
#[serial]
fn test_transform_deadline_non_numeric_rejected() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_TRANSFORM_DEADLINE_SECS", "abc");
    }
    let result = ServerConfig::from_env();
    // Remove the invalid override before asserting: if `from_env` unexpectedly succeeds,
    // the value must not stay behind and break later `from_env` tests.
    unsafe {
        std::env::remove_var("TRUSS_TRANSFORM_DEADLINE_SECS");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("positive integer"),
        "error should mention positive integer: {err}"
    );
}
5246
/// Selecting the `azure` backend without `TRUSS_AZURE_CONTAINER` must fail with an error
/// that names the missing variable.
#[test]
#[serial]
#[cfg(feature = "azure")]
fn test_azure_container_env_var_required() {
    // SAFETY: the test is `#[serial]`, so other `#[serial]` tests are not touching the
    // environment at the same time.
    unsafe {
        std::env::set_var("TRUSS_STORAGE_BACKEND", "azure");
        std::env::remove_var("TRUSS_AZURE_CONTAINER");
    }
    let result = ServerConfig::from_env();
    // Drop the backend override before asserting: if `from_env` unexpectedly succeeds,
    // `TRUSS_STORAGE_BACKEND=azure` must not stay behind for later `from_env` tests.
    unsafe {
        std::env::remove_var("TRUSS_STORAGE_BACKEND");
    }
    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("TRUSS_AZURE_CONTAINER"),
        "error should mention TRUSS_AZURE_CONTAINER: {err}"
    );
}
5264
/// The `Debug` rendering of `ServerConfig` must hide the bearer token and the signed-URL
/// secret behind `[REDACTED]` while still showing the signed-URL key id.
#[test]
fn server_config_debug_redacts_bearer_token_and_signed_url_secret() {
    let bearer = Some(String::from("super-secret-token-12345"));
    let mut config = ServerConfig::new(temp_dir("debug-redact"), bearer);
    config.signed_url_key_id = Some(String::from("visible-key-id"));
    config.signed_url_secret = Some(String::from("super-secret-hmac-key"));

    let debug = format!("{config:?}");

    // Neither secret may appear verbatim.
    assert!(
        !debug.contains("super-secret-token-12345"),
        "bearer_token leaked in Debug output: {debug}"
    );
    assert!(
        !debug.contains("super-secret-hmac-key"),
        "signed_url_secret leaked in Debug output: {debug}"
    );
    // The placeholder and the non-secret key id must be present.
    assert!(
        debug.contains("[REDACTED]"),
        "expected [REDACTED] in Debug output: {debug}"
    );
    assert!(
        debug.contains("visible-key-id"),
        "signed_url_key_id should be visible: {debug}"
    );
}
5291
/// A matching `Bearer` token in the `authorization` header is accepted.
#[test]
fn authorize_headers_accepts_correct_bearer_token() {
    let config = ServerConfig::new(temp_dir("auth-ok"), Some(String::from("correct-token")));
    let headers = vec![(
        String::from("authorization"),
        String::from("Bearer correct-token"),
    )];

    let outcome = super::authorize_request_headers(&headers, &config);
    assert!(outcome.is_ok());
}
5301
/// A `Bearer` token that differs from the configured one yields a 401 response.
#[test]
fn authorize_headers_rejects_wrong_bearer_token() {
    let config = ServerConfig::new(temp_dir("auth-wrong"), Some(String::from("correct-token")));
    let headers = vec![(
        String::from("authorization"),
        String::from("Bearer wrong-token"),
    )];

    let rejection = super::authorize_request_headers(&headers, &config).unwrap_err();
    assert_eq!(rejection.status, "401 Unauthorized");
}
5312
/// A request without any headers yields a 401 response when a bearer token is configured.
#[test]
fn authorize_headers_rejects_missing_header() {
    let config = ServerConfig::new(temp_dir("auth-missing"), Some(String::from("correct-token")));
    let no_headers: Vec<(String, String)> = Vec::new();

    let rejection = super::authorize_request_headers(&no_headers, &config).unwrap_err();
    assert_eq!(rejection.status, "401 Unauthorized");
}
5320
/// Acquiring a slot while the counter is below the limit succeeds and bumps the counter to 1.
#[test]
fn transform_slot_acquire_succeeds_under_limit() {
    use std::sync::{Arc, atomic::AtomicU64};

    let in_flight = Arc::new(AtomicU64::new(0));
    // The slot stays bound so it is still held when the counter is read.
    let acquired = super::TransformSlot::try_acquire(&in_flight, 2);

    assert!(acquired.is_some());
    assert_eq!(in_flight.load(Ordering::Relaxed), 1);
}
5333
/// Once the limit is reached, a further acquire returns `None` and leaves the counter at
/// the limit.
#[test]
fn transform_slot_acquire_returns_none_at_limit() {
    use std::sync::{Arc, atomic::AtomicU64};

    let in_flight = Arc::new(AtomicU64::new(0));
    // Hold the only available slot for the rest of the test.
    let _held = super::TransformSlot::try_acquire(&in_flight, 1).unwrap();

    let rejected = super::TransformSlot::try_acquire(&in_flight, 1);
    assert!(rejected.is_none());
    assert_eq!(in_flight.load(Ordering::Relaxed), 1);
}
5346
/// Dropping a held slot brings the counter back to zero.
#[test]
fn transform_slot_drop_decrements_counter() {
    use std::sync::{Arc, atomic::AtomicU64};

    let in_flight = Arc::new(AtomicU64::new(0));

    let held = super::TransformSlot::try_acquire(&in_flight, 4).unwrap();
    assert_eq!(in_flight.load(Ordering::Relaxed), 1);
    // Releasing the slot explicitly stands in for it going out of scope.
    drop(held);

    assert_eq!(in_flight.load(Ordering::Relaxed), 0);
}
5360
/// Exactly `limit` slots can be held at once; one more is refused, and releasing them all
/// returns the counter to zero.
#[test]
fn transform_slot_multiple_acquires_up_to_limit() {
    use std::sync::{Arc, atomic::AtomicU64};

    let in_flight = Arc::new(AtomicU64::new(0));
    let limit = 3u64;

    // Fill every available slot.
    let held: Vec<_> = (0..limit)
        .map(|_| super::TransformSlot::try_acquire(&in_flight, limit).unwrap())
        .collect();
    assert_eq!(in_flight.load(Ordering::Relaxed), limit);

    // One more acquire is refused and must not disturb the counter.
    assert!(super::TransformSlot::try_acquire(&in_flight, limit).is_none());
    assert_eq!(in_flight.load(Ordering::Relaxed), limit);

    // Releasing every slot brings the counter back to zero.
    drop(held);
    assert_eq!(in_flight.load(Ordering::Relaxed), 0);
}
5380
/// `emit_access_log` hands the configured log handler one JSON document containing the
/// entry's fields plus an integer `latency_ms`.
#[test]
fn emit_access_log_produces_json_with_expected_fields() {
    use std::sync::{Arc, Mutex};
    use std::time::Instant;

    // Handler that stores the most recent log line in a shared buffer.
    let sink = Arc::new(Mutex::new(String::new()));
    let writer = Arc::clone(&sink);
    let handler: super::LogHandler = Arc::new(move |msg: &str| {
        *writer.lock().unwrap() = msg.to_owned();
    });

    let mut config = ServerConfig::new(temp_dir("access-log"), None);
    config.log_handler = Some(handler);

    let entry = super::AccessLogEntry {
        request_id: "req-123",
        method: "GET",
        path: "/image.png",
        route: "transform",
        status: "200",
        start: Instant::now(),
        cache_status: Some("hit"),
        watermark: false,
    };
    super::emit_access_log(&config, &entry);

    let logged = sink.lock().unwrap().clone();
    let parsed: serde_json::Value = serde_json::from_str(&logged).expect("valid JSON");
    assert_eq!(parsed["kind"], "access_log");
    assert_eq!(parsed["request_id"], "req-123");
    assert_eq!(parsed["method"], "GET");
    assert_eq!(parsed["path"], "/image.png");
    assert_eq!(parsed["route"], "transform");
    assert_eq!(parsed["status"], "200");
    assert_eq!(parsed["cache_status"], "hit");
    assert!(parsed["latency_ms"].is_u64());
}
5422
/// An entry with `cache_status: None` must serialize `cache_status` as JSON null.
#[test]
fn emit_access_log_null_cache_status_when_none() {
    use std::sync::{Arc, Mutex};
    use std::time::Instant;

    let sink = Arc::new(Mutex::new(String::new()));
    let sink_for_handler = Arc::clone(&sink);
    let handler: super::LogHandler = Arc::new(move |line: &str| {
        *sink_for_handler.lock().unwrap() = line.to_owned();
    });

    let mut config = ServerConfig::new(temp_dir("access-log-none"), None);
    config.log_handler = Some(handler);

    let entry = super::AccessLogEntry {
        request_id: "req-456",
        method: "POST",
        path: "/upload",
        route: "upload",
        status: "201",
        start: Instant::now(),
        cache_status: None,
        watermark: false,
    };
    super::emit_access_log(&config, &entry);

    let line = sink.lock().unwrap().clone();
    let json: serde_json::Value = serde_json::from_str(&line).expect("valid JSON");
    assert!(json["cache_status"].is_null());
}
5454
/// A non-empty `x-request-id` header is returned verbatim, regardless of the
/// other headers around it.
#[test]
fn x_request_id_is_extracted_from_incoming_headers() {
    let headers: Vec<(String, String)> = [
        ("host", "localhost"),
        ("x-request-id", "custom-id-abc"),
    ]
    .into_iter()
    .map(|(name, value)| (name.to_string(), value.to_string()))
    .collect();

    let extracted = super::extract_request_id(&headers);
    assert_eq!(extracted, Some("custom-id-abc".to_string()));
}
5468
/// An `x-request-id` header with an empty value yields no request id.
#[test]
fn x_request_id_not_extracted_when_empty() {
    let headers = vec![(String::from("x-request-id"), String::new())];
    assert_eq!(super::extract_request_id(&headers), None);
}
5474
/// Without any `x-request-id` header there is nothing to extract.
#[test]
fn x_request_id_not_extracted_when_absent() {
    let headers = vec![(String::from("host"), String::from("localhost"))];
    assert_eq!(super::extract_request_id(&headers), None);
}
5480
/// A `Cache-Status` header value of `"truss"; hit` is reported as `"hit"`.
#[test]
fn cache_status_hit_detected() {
    let headers = vec![(String::from("Cache-Status"), String::from("\"truss\"; hit"))];
    let status = super::extract_cache_status(&headers);
    assert_eq!(status, Some("hit"));
}
5489
/// A `Cache-Status` header value of `"truss"; fwd=miss` is reported as `"miss"`.
#[test]
fn cache_status_miss_detected() {
    let headers = vec![(
        String::from("Cache-Status"),
        String::from("\"truss\"; fwd=miss"),
    )];
    let status = super::extract_cache_status(&headers);
    assert_eq!(status, Some("miss"));
}
5498
/// Headers without `Cache-Status` produce no cache status at all.
#[test]
fn cache_status_none_when_header_absent() {
    let headers = vec![(String::from("Content-Type"), String::from("image/png"))];
    assert_eq!(super::extract_cache_status(&headers), None);
}
5505
/// `with_signed_url_credentials` must register the key id and secret in
/// `signing_keys`.
#[test]
fn signing_keys_populated_by_with_signed_url_credentials() {
    let base = ServerConfig::new(temp_dir("signing-keys-populated"), None);
    let config = base.with_signed_url_credentials("key-alpha", "secret-alpha");

    let stored_secret = config.signing_keys.get("key-alpha");
    assert_eq!(stored_secret.map(String::as_str), Some("secret-alpha"));
}
5516
/// Requests signed with either configured key — `key-alpha` registered via
/// `with_signed_url_credentials`, `key-beta` via `with_signing_keys` — must be
/// accepted by `authorize_signed_request`.
#[test]
fn authorize_signed_request_accepts_multiple_keys() {
    let additional_keys =
        HashMap::from([("key-beta".to_string(), "secret-beta".to_string())]);
    let config = ServerConfig::new(temp_dir("multi-key-accept"), None)
        .with_signed_url_credentials("key-alpha", "secret-alpha")
        .with_signing_keys(additional_keys);

    // First key.
    let alpha_request = signed_public_request(
        "/images/by-path?path=%2Fimage.png&keyId=key-alpha&expires=4102444800&format=jpeg",
        "assets.example.com",
        "secret-alpha",
    );
    let alpha_query =
        super::auth::parse_query_params(&alpha_request).expect("parse query alpha");
    authorize_signed_request(&alpha_request, &alpha_query, &config)
        .expect("key-alpha should be accepted");

    // Second key, same target apart from the key id and secret.
    let beta_request = signed_public_request(
        "/images/by-path?path=%2Fimage.png&keyId=key-beta&expires=4102444800&format=jpeg",
        "assets.example.com",
        "secret-beta",
    );
    let beta_query = super::auth::parse_query_params(&beta_request).expect("parse query beta");
    authorize_signed_request(&beta_request, &beta_query, &config)
        .expect("key-beta should be accepted");
}
5546
/// A request whose `keyId` is not registered in the config must fail
/// authorization.
#[test]
fn authorize_signed_request_rejects_unknown_key() {
    let config = ServerConfig::new(temp_dir("unknown-key-reject"), None)
        .with_signed_url_credentials("key-alpha", "secret-alpha");

    let request = signed_public_request(
        "/images/by-path?path=%2Fimage.png&keyId=key-unknown&expires=4102444800&format=jpeg",
        "assets.example.com",
        "secret-unknown",
    );
    let query = super::auth::parse_query_params(&request).expect("parse query");

    let outcome = authorize_signed_request(&request, &query, &config);
    assert!(outcome.is_err(), "unknown key should be rejected");
}
5561
/// A request id containing a CRLF sequence followed by a header-like line
/// must be discarded.
#[test]
fn x_request_id_rejects_crlf_injection() {
    let headers = vec![(
        String::from("x-request-id"),
        String::from("evil\r\nX-Injected: true"),
    )];
    let extracted = super::extract_request_id(&headers);
    assert_eq!(extracted, None, "CRLF in request ID must be rejected");
}
5575
/// A bare carriage return inside the request id causes it to be rejected.
#[test]
fn x_request_id_rejects_lone_cr() {
    let headers = vec![(String::from("x-request-id"), String::from("evil\rid"))];
    assert_eq!(super::extract_request_id(&headers), None);
}
5581
/// A bare line feed inside the request id causes it to be rejected.
#[test]
fn x_request_id_rejects_lone_lf() {
    let headers = vec![(String::from("x-request-id"), String::from("evil\nid"))];
    assert_eq!(super::extract_request_id(&headers), None);
}
5587
/// A NUL byte inside the request id causes it to be rejected.
#[test]
fn x_request_id_rejects_nul_byte() {
    let headers = vec![(String::from("x-request-id"), String::from("evil\0id"))];
    assert_eq!(super::extract_request_id(&headers), None);
}
5593
/// A hyphenated UUID is a valid request id and is returned unchanged.
#[test]
fn x_request_id_accepts_normal_uuid() {
    let uuid = "550e8400-e29b-41d4-a716-446655440000";
    let headers = vec![("x-request-id".to_string(), uuid.to_string())];

    let extracted = super::extract_request_id(&headers);
    assert_eq!(extracted, Some(uuid.to_string()));
}
5605
/// `ServerConfig::new` keeps the given storage root, starts with no
/// credentials, keys or presets, and uses the `DEFAULT_*` constants for its
/// tunables.
#[test]
fn server_config_new_has_expected_defaults() {
    let storage_root = temp_dir("cfg-defaults");
    let defaults = ServerConfig::new(storage_root.clone(), None);

    assert_eq!(defaults.storage_root, storage_root);

    // Nothing authentication-related is configured out of the box.
    assert!(defaults.bearer_token.is_none());
    assert!(defaults.signed_url_secret.is_none());
    assert!(defaults.signing_keys.is_empty());
    assert!(!defaults.allow_insecure_url_sources);

    let presets = defaults.presets.read().unwrap();
    assert!(presets.is_empty());
    drop(presets);

    // Tunables fall back to the module-level defaults.
    assert_eq!(
        defaults.max_concurrent_transforms,
        DEFAULT_MAX_CONCURRENT_TRANSFORMS
    );
    assert_eq!(
        defaults.public_max_age_seconds,
        DEFAULT_PUBLIC_MAX_AGE_SECONDS
    );
    assert_eq!(
        defaults.public_stale_while_revalidate_seconds,
        DEFAULT_PUBLIC_STALE_WHILE_REVALIDATE_SECONDS
    );
}
5631
/// Calling `with_signed_url_credentials` twice must leave both key ids
/// present in `signing_keys`.
#[test]
fn server_config_builder_with_signed_url_credentials_overwrites() {
    let config = ServerConfig::new(temp_dir("cfg-builder"), None)
        .with_signed_url_credentials("key1", "secret1")
        .with_signed_url_credentials("key2", "secret2");

    for key_id in ["key1", "key2"] {
        assert!(config.signing_keys.contains_key(key_id));
    }
}
5641
/// A GET for the unrouted path `/nonexistent` must answer `404 Not Found`.
#[test]
fn route_request_returns_not_found_for_unknown_path() {
    let config = ServerConfig::new(temp_dir("route-unknown"), None);

    let response = route_request(
        HttpRequest {
            method: "GET".to_string(),
            target: "/nonexistent".to_string(),
            version: "HTTP/1.1".to_string(),
            headers: vec![("host".to_string(), "localhost".to_string())],
            body: Vec::new(),
        },
        &config,
    );

    assert_eq!(response.status, "404 Not Found");
}
5658
/// A GET for `/health` on a default config must answer `200 OK`.
#[test]
fn route_request_health_returns_200() {
    let config = ServerConfig::new(temp_dir("route-health"), None);

    let response = route_request(
        HttpRequest {
            method: "GET".to_string(),
            target: "/health".to_string(),
            version: "HTTP/1.1".to_string(),
            headers: vec![("host".to_string(), "localhost".to_string())],
            body: Vec::new(),
        },
        &config,
    );

    assert_eq!(response.status, "200 OK");
}
5673
/// Sixteen threads race for four slots after a shared barrier.
///
/// The test checks three things: at least one thread obtained a slot, the
/// number of simultaneous holders never exceeded `limit`, and the shared
/// counter is back at zero once every thread has finished.
#[test]
fn transform_slot_concurrent_acquire_respects_limit() {
    use std::sync::atomic::AtomicU64;
    use std::sync::{Arc, Barrier};

    let counter = Arc::new(AtomicU64::new(0));
    let limit = 4u64;
    let num_threads = 16;
    let barrier = Arc::new(Barrier::new(num_threads));
    let acquired = Arc::new(AtomicU64::new(0));
    // Holders are tracked separately from `counter`, so the check does not depend
    // on how `try_acquire` manipulates the counter internally.
    let holders = Arc::new(AtomicU64::new(0));
    let peak_holders = Arc::new(AtomicU64::new(0));

    let handles: Vec<_> = (0..num_threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            let barrier = Arc::clone(&barrier);
            let acquired = Arc::clone(&acquired);
            let holders = Arc::clone(&holders);
            let peak_holders = Arc::clone(&peak_holders);
            thread::spawn(move || {
                // Release all threads together to maximise contention.
                barrier.wait();
                if let Some(_slot) = super::TransformSlot::try_acquire(&counter, limit) {
                    acquired.fetch_add(1, Ordering::Relaxed);
                    let now_holding = holders.fetch_add(1, Ordering::SeqCst) + 1;
                    peak_holders.fetch_max(now_holding, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(10));
                    // Leave the holder set before `_slot` is dropped at the end of
                    // this block, so `holders` never over-counts live slots.
                    holders.fetch_sub(1, Ordering::SeqCst);
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    assert!(
        acquired.load(Ordering::Relaxed) >= 1,
        "at least one thread should have acquired a slot"
    );
    let peak = peak_holders.load(Ordering::SeqCst);
    assert!(
        peak <= limit,
        "observed {peak} simultaneous slot holders, limit is {limit}"
    );
    assert_eq!(counter.load(Ordering::Relaxed), 0);
}
5708
/// With `TRUSS_MAX_INPUT_PIXELS` unset, `from_env` must report 40,000,000.
#[test]
#[serial]
fn test_max_input_pixels_default() {
    const VAR: &str = "TRUSS_MAX_INPUT_PIXELS";

    // SAFETY: relies on `#[serial]` to keep env-mutating tests from overlapping;
    // assumes no non-serial test touches the process environment concurrently.
    unsafe { std::env::remove_var(VAR) };

    let pixels = ServerConfig::from_env().unwrap().max_input_pixels;
    assert_eq!(pixels, 40_000_000);
}
5718
/// A valid `TRUSS_MAX_INPUT_PIXELS` override must be picked up by `from_env`.
#[test]
#[serial]
fn test_max_input_pixels_custom() {
    // SAFETY: relies on `#[serial]` to keep env-mutating tests from overlapping;
    // assumes no non-serial test touches the process environment concurrently.
    unsafe {
        std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "10000000");
    }
    let loaded = ServerConfig::from_env();
    // Restore the environment before asserting, so a failure below cannot leak
    // the override into later tests.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
    }
    assert_eq!(loaded.unwrap().max_input_pixels, 10_000_000);
}
5731
/// The value `1` for `TRUSS_MAX_INPUT_PIXELS` must be accepted as-is.
#[test]
#[serial]
fn test_max_input_pixels_min_boundary() {
    // SAFETY: relies on `#[serial]` to keep env-mutating tests from overlapping;
    // assumes no non-serial test touches the process environment concurrently.
    unsafe {
        std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "1");
    }
    let loaded = ServerConfig::from_env();
    // Restore the environment before asserting, so a failure below cannot leak
    // the override into later tests.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
    }
    assert_eq!(loaded.unwrap().max_input_pixels, 1);
}
5744
/// The value `100000000` for `TRUSS_MAX_INPUT_PIXELS` must be accepted as-is.
#[test]
#[serial]
fn test_max_input_pixels_max_boundary() {
    // SAFETY: relies on `#[serial]` to keep env-mutating tests from overlapping;
    // assumes no non-serial test touches the process environment concurrently.
    unsafe {
        std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "100000000");
    }
    let loaded = ServerConfig::from_env();
    // Restore the environment before asserting, so a failure below cannot leak
    // the override into later tests.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
    }
    assert_eq!(loaded.unwrap().max_input_pixels, 100_000_000);
}
5757
/// An empty `TRUSS_MAX_INPUT_PIXELS` must behave like an unset one and yield
/// 40,000,000.
#[test]
#[serial]
fn test_max_input_pixels_empty_uses_default() {
    // SAFETY: relies on `#[serial]` to keep env-mutating tests from overlapping;
    // assumes no non-serial test touches the process environment concurrently.
    unsafe {
        std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "");
    }
    let loaded = ServerConfig::from_env();
    // Restore the environment before asserting, so a failure below cannot leak
    // the override into later tests.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
    }
    assert_eq!(loaded.unwrap().max_input_pixels, 40_000_000);
}
5770
/// `TRUSS_MAX_INPUT_PIXELS=0` must make `from_env` fail with an error that
/// names the variable.
#[test]
#[serial]
fn test_max_input_pixels_zero_rejected() {
    // SAFETY: relies on `#[serial]` to keep env-mutating tests from overlapping;
    // assumes no non-serial test touches the process environment concurrently.
    unsafe {
        std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "0");
    }
    let loaded = ServerConfig::from_env();
    // Restore the environment before asserting, so a failure below cannot leak
    // the override into later tests.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
    }
    let err = loaded.unwrap_err();
    assert!(err.to_string().contains("TRUSS_MAX_INPUT_PIXELS"));
}
5783
/// `TRUSS_MAX_INPUT_PIXELS=100000001` must make `from_env` fail with an error
/// that names the variable.
#[test]
#[serial]
fn test_max_input_pixels_over_max_rejected() {
    // SAFETY: relies on `#[serial]` to keep env-mutating tests from overlapping;
    // assumes no non-serial test touches the process environment concurrently.
    unsafe {
        std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "100000001");
    }
    let loaded = ServerConfig::from_env();
    // Restore the environment before asserting, so a failure below cannot leak
    // the override into later tests.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
    }
    let err = loaded.unwrap_err();
    assert!(err.to_string().contains("TRUSS_MAX_INPUT_PIXELS"));
}
5796
/// A non-numeric `TRUSS_MAX_INPUT_PIXELS` must make `from_env` fail with an
/// error that names the variable.
#[test]
#[serial]
fn test_max_input_pixels_non_numeric_rejected() {
    // SAFETY: relies on `#[serial]` to keep env-mutating tests from overlapping;
    // assumes no non-serial test touches the process environment concurrently.
    unsafe {
        std::env::set_var("TRUSS_MAX_INPUT_PIXELS", "abc");
    }
    let loaded = ServerConfig::from_env();
    // Restore the environment before asserting, so a failure below cannot leak
    // the override into later tests.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_INPUT_PIXELS");
    }
    let err = loaded.unwrap_err();
    assert!(err.to_string().contains("TRUSS_MAX_INPUT_PIXELS"));
}
5809
/// With `TRUSS_MAX_UPLOAD_BYTES` unset, `from_env` must report 100 MiB.
#[test]
#[serial]
fn test_max_upload_bytes_default() {
    const VAR: &str = "TRUSS_MAX_UPLOAD_BYTES";

    // SAFETY: relies on `#[serial]` to keep env-mutating tests from overlapping;
    // assumes no non-serial test touches the process environment concurrently.
    unsafe { std::env::remove_var(VAR) };

    let upload_limit = ServerConfig::from_env().unwrap().max_upload_bytes;
    assert_eq!(upload_limit, 100 * 1024 * 1024);
}
5819
/// A valid `TRUSS_MAX_UPLOAD_BYTES` override (5 MiB) must be picked up by
/// `from_env`.
#[test]
#[serial]
fn test_max_upload_bytes_custom() {
    // SAFETY: relies on `#[serial]` to keep env-mutating tests from overlapping;
    // assumes no non-serial test touches the process environment concurrently.
    unsafe {
        std::env::set_var("TRUSS_MAX_UPLOAD_BYTES", "5242880");
    }
    let loaded = ServerConfig::from_env();
    // Restore the environment before asserting, so a failure below cannot leak
    // the override into later tests.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_UPLOAD_BYTES");
    }
    assert_eq!(loaded.unwrap().max_upload_bytes, 5 * 1024 * 1024);
}
5832
/// `TRUSS_MAX_UPLOAD_BYTES=0` must make `from_env` fail with an error that
/// names the variable.
#[test]
#[serial]
fn test_max_upload_bytes_zero_rejected() {
    // SAFETY: relies on `#[serial]` to keep env-mutating tests from overlapping;
    // assumes no non-serial test touches the process environment concurrently.
    unsafe {
        std::env::set_var("TRUSS_MAX_UPLOAD_BYTES", "0");
    }
    let loaded = ServerConfig::from_env();
    // Restore the environment before asserting, so a failure below cannot leak
    // the override into later tests.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_UPLOAD_BYTES");
    }
    let err = loaded.unwrap_err();
    assert!(err.to_string().contains("TRUSS_MAX_UPLOAD_BYTES"));
}
5845
/// A non-numeric `TRUSS_MAX_UPLOAD_BYTES` must make `from_env` fail with an
/// error that names the variable.
#[test]
#[serial]
fn test_max_upload_bytes_non_numeric_rejected() {
    // SAFETY: relies on `#[serial]` to keep env-mutating tests from overlapping;
    // assumes no non-serial test touches the process environment concurrently.
    unsafe {
        std::env::set_var("TRUSS_MAX_UPLOAD_BYTES", "abc");
    }
    let loaded = ServerConfig::from_env();
    // Restore the environment before asserting, so a failure below cannot leak
    // the override into later tests.
    // SAFETY: same serialization argument as above.
    unsafe {
        std::env::remove_var("TRUSS_MAX_UPLOAD_BYTES");
    }
    let err = loaded.unwrap_err();
    assert!(err.to_string().contains("TRUSS_MAX_UPLOAD_BYTES"));
}
5858
/// For a `multipart/form-data` content type, `max_body_for_headers` must
/// return the upload limit it was given.
#[test]
fn max_body_for_multipart_uses_custom_upload_limit() {
    let upload_limit = 5 * 1024 * 1024;
    let multipart_headers = vec![(
        String::from("content-type"),
        String::from("multipart/form-data; boundary=abc"),
    )];

    let effective = super::http_parse::max_body_for_headers(&multipart_headers, upload_limit);
    assert_eq!(effective, upload_limit);
}
5871
/// The `/health` body of a default config must report `maxInputPixels` as
/// 40,000,000.
#[test]
fn health_includes_max_input_pixels() {
    let config = ServerConfig::new(temp_dir("health-pixels"), None);

    let response = route_request(
        HttpRequest {
            method: "GET".to_string(),
            target: "/health".to_string(),
            version: "HTTP/1.1".to_string(),
            headers: Vec::new(),
            body: Vec::new(),
        },
        &config,
    );
    assert_eq!(response.status, "200 OK");

    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse health body");
    assert_eq!(payload["maxInputPixels"], 40_000_000);
}
5891
/// The `/health` body must contain a `transformCapacity` check reporting
/// `current` 0 and `max` 64 for an idle default config.
#[test]
fn health_includes_transform_capacity_details() {
    let config = ServerConfig::new(temp_dir("health-capacity"), None);
    let health_probe = HttpRequest {
        method: "GET".to_string(),
        target: "/health".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };

    let response = route_request(health_probe, &config);
    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse health body");

    let checks = payload["checks"].as_array().expect("checks array");
    let capacity = checks
        .iter()
        .find(|check| check["name"] == "transformCapacity")
        .expect("transformCapacity check");

    assert_eq!(capacity["current"], 0);
    assert_eq!(capacity["max"], 64);
}
5916
/// On Linux, `process_rss_bytes` must report a non-zero value.
#[cfg(target_os = "linux")]
#[test]
fn process_rss_bytes_returns_some() {
    let rss = super::process_rss_bytes();
    assert!(rss.is_some());
    assert!(matches!(rss, Some(bytes) if bytes > 0));
}
5924
/// On Linux, `disk_free_bytes` must report a non-zero value for a directory
/// obtained from `temp_dir`.
#[cfg(target_os = "linux")]
#[test]
fn disk_free_bytes_returns_some_for_existing_dir() {
    let existing = temp_dir("disk-free");
    let free = super::disk_free_bytes(&existing);
    assert!(free.is_some());
    assert!(matches!(free, Some(bytes) if bytes > 0));
}
5933
/// With a 1-byte memory ceiling the readiness probe must fail on Linux.
/// On other platforms the route is exercised but the status is not asserted.
#[test]
fn health_ready_returns_503_when_memory_exceeded() {
    let mut config = ServerConfig::new(temp_dir("health-mem"), None);
    config.health_max_memory_bytes = Some(1);

    let ready_probe = HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(ready_probe, &config);

    if cfg!(target_os = "linux") {
        assert_eq!(response.status, "503 Service Unavailable");
    }
}
5956
/// With an effectively unlimited memory ceiling, `/health` on Linux must
/// include a passing `memoryUsage` check with a non-zero `rssBytes`.
/// The body must parse as JSON on every platform.
#[test]
fn health_includes_memory_usage_on_linux() {
    let mut config = ServerConfig::new(temp_dir("health-mem-report"), None);
    config.health_max_memory_bytes = Some(u64::MAX);

    let health_probe = HttpRequest {
        method: "GET".to_string(),
        target: "/health".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(health_probe, &config);
    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse health body");

    if cfg!(target_os = "linux") {
        let checks = payload["checks"].as_array().expect("checks array");
        let memory = checks
            .iter()
            .find(|check| check["name"] == "memoryUsage")
            .expect("memoryUsage check");
        assert_eq!(memory["status"], "ok");
        assert!(memory["rssBytes"].as_u64().unwrap() > 0);
    }
}
5984
/// On Linux, `disk_free_bytes` must yield `None` for `/nonexistent/path/xyz`.
#[cfg(target_os = "linux")]
#[test]
fn disk_free_bytes_returns_none_for_nonexistent_path() {
    let missing = std::path::Path::new("/nonexistent/path/xyz");
    assert!(super::disk_free_bytes(missing).is_none());
}
5991
/// After the storage root directory has been removed, the readiness probe
/// must answer 503 with an overall `fail` status and a failing `storageRoot`
/// check.
#[test]
fn health_ready_503_body_contains_fail_status() {
    let storage = temp_dir("health-ready-body");
    // Best-effort removal; the result is deliberately ignored.
    let _ = std::fs::remove_dir_all(&storage);
    let config = ServerConfig::new(storage, None);

    let ready_probe = HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(ready_probe, &config);
    assert_eq!(response.status, "503 Service Unavailable");

    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse body");
    assert_eq!(payload["status"], "fail");

    let checks = payload["checks"].as_array().expect("checks array");
    let storage_root_check = checks
        .iter()
        .find(|check| check["name"] == "storageRoot")
        .expect("storageRoot check");
    assert_eq!(storage_root_check["status"], "fail");
}
6017
/// With a cache root and a 1-byte free-space threshold, the readiness body
/// must contain a passing `cacheDiskFree` check echoing that threshold.
/// `freeBytes` is only required on Linux.
#[test]
fn health_ready_cache_disk_free_shown_when_cache_root_set() {
    let storage = temp_dir("health-ready-cache-disk");
    let cache_root = temp_dir("health-ready-cache-disk-cache");
    let mut config = ServerConfig::new(storage, None).with_cache_root(cache_root);
    config.health_cache_min_free_bytes = Some(1);

    let ready_probe = HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(ready_probe, &config);
    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse body");

    let checks = payload["checks"].as_array().expect("checks array");
    let cache_disk = checks
        .iter()
        .find(|check| check["name"] == "cacheDiskFree")
        .expect("cacheDiskFree check");

    assert_eq!(cache_disk["status"], "ok");
    if cfg!(target_os = "linux") {
        assert!(cache_disk["freeBytes"].as_u64().is_some());
    }
    assert_eq!(cache_disk["thresholdBytes"], 1);
}
6046
/// Without a cache root, the readiness body must not list a `cacheDiskFree`
/// check.
#[test]
fn health_ready_no_cache_disk_free_without_cache_root() {
    let config = ServerConfig::new(temp_dir("health-ready-no-cache"), None);

    let ready_probe = HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(ready_probe, &config);
    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse body");

    let checks = payload["checks"].as_array().expect("checks array");
    let lists_cache_disk = checks.iter().any(|check| check["name"] == "cacheDiskFree");
    assert!(
        !lists_cache_disk,
        "cacheDiskFree should not appear without cache_root"
    );
}
6068
/// With a `u64::MAX` memory ceiling, the readiness body on Linux must contain
/// a passing `memoryUsage` check with the threshold and an `rssBytes` figure.
/// On other platforms that check must be absent.
#[test]
fn health_ready_memory_check_includes_details() {
    let mut config = ServerConfig::new(temp_dir("health-ready-mem-detail"), None);
    config.health_max_memory_bytes = Some(u64::MAX);

    let ready_probe = HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(ready_probe, &config);
    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse body");
    let checks = payload["checks"].as_array().expect("checks array");
    let memory = checks.iter().find(|check| check["name"] == "memoryUsage");

    // Non-Linux platforms only need the check to be missing.
    if !cfg!(target_os = "linux") {
        assert!(memory.is_none(), "memoryUsage should be absent on non-Linux");
        return;
    }

    let memory = memory.expect("memoryUsage check present on Linux");
    assert_eq!(memory["status"], "ok");
    assert_eq!(memory["thresholdBytes"], u64::MAX);
    assert!(memory["rssBytes"].as_u64().is_some());
}
6096
/// Once the `draining` flag is set, the readiness probe must answer 503 with
/// an overall `fail` status and a failing `draining` check.
#[test]
fn health_ready_returns_503_when_draining() {
    let config = ServerConfig::new(temp_dir("health-ready-draining"), None);
    config.draining.store(true, Ordering::Relaxed);

    let ready_probe = HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(ready_probe, &config);
    assert_eq!(response.status, "503 Service Unavailable");

    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse ready body");
    assert_eq!(payload["status"], "fail");

    let checks = payload["checks"].as_array().expect("checks array");
    let draining_failed = checks
        .iter()
        .filter(|check| check["name"] == "draining")
        .any(|check| check["status"] == "fail");
    assert!(draining_failed);
}
6127
/// A config that is not draining must pass readiness with status `ok` and no
/// `draining` check listed.
#[test]
fn health_ready_returns_ok_when_not_draining() {
    let config = ServerConfig::new(temp_dir("health-ready-not-draining"), None);

    let ready_probe = HttpRequest {
        method: "GET".to_string(),
        target: "/health/ready".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(ready_probe, &config);
    assert_eq!(response.status, "200 OK");

    let payload: serde_json::Value =
        serde_json::from_slice(&response.body).expect("parse ready body");
    assert_eq!(payload["status"], "ok");

    let checks = payload["checks"].as_array().expect("checks array");
    assert!(checks.iter().all(|check| check["name"] != "draining"));
}
6152
/// The liveness probe must keep answering `200 OK` even when the `draining`
/// flag is set.
#[test]
fn health_live_returns_200_while_draining() {
    let config = ServerConfig::new(temp_dir("live-draining"), None);
    config.draining.store(true, Ordering::Relaxed);

    let live_probe = HttpRequest {
        method: "GET".to_string(),
        target: "/health/live".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(live_probe, &config);

    assert_eq!(response.status, "200 OK");
}
6176
/// `route_request` must still route ordinary requests while the `draining`
/// flag is set: an unknown path gets its usual `404 Not Found`.
#[test]
fn normal_request_processed_while_draining() {
    let config = ServerConfig::new(temp_dir("normal-draining"), None);
    config.draining.store(true, Ordering::Relaxed);

    let unknown_path = HttpRequest {
        method: "GET".to_string(),
        target: "/nonexistent".to_string(),
        version: "HTTP/1.1".to_string(),
        headers: Vec::new(),
        body: Vec::new(),
    };
    let response = route_request(unknown_path, &config);

    assert_eq!(response.status, "404 Not Found");
}
6200
/// Rewriting the presets file while `preset_watcher` is running must replace
/// the shared preset map with the new file contents.
#[test]
fn preset_watcher_reloads_on_file_change() {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, RwLock};
    use std::thread;
    use std::time::{Duration, SystemTime};

    // Scratch directory made unique by the current time in nanoseconds.
    let unique = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_nanos();
    let dir = std::env::temp_dir().join(format!("truss_test_watcher_{}", unique));
    std::fs::create_dir_all(&dir).unwrap();
    let path = dir.join("presets.json");
    std::fs::write(&path, r#"{"thumb":{"width":100}}"#).unwrap();

    // In-memory state mirrors the initial file: one `thumb` preset, width 100.
    let initial_thumb = TransformOptionsPayload {
        width: Some(100),
        height: None,
        fit: None,
        position: None,
        format: None,
        quality: None,
        background: None,
        rotate: None,
        auto_orient: None,
        strip_metadata: None,
        preserve_exif: None,
        crop: None,
        blur: None,
        sharpen: None,
    };
    let presets = Arc::new(RwLock::new(HashMap::from([(
        "thumb".to_string(),
        initial_thumb,
    )])));
    let draining = Arc::new(AtomicBool::new(false));
    let config = Arc::new(ServerConfig::new(dir.clone(), None));

    let watcher = {
        let presets = Arc::clone(&presets);
        let draining = Arc::clone(&draining);
        let config = Arc::clone(&config);
        let watched_path = path.clone();
        thread::spawn(move || {
            super::preset_watcher(presets, watched_path, draining, config);
        })
    };

    // NOTE(review): these two pauses presumably let the watcher start and give
    // the rewrite a later modification time — confirm against `preset_watcher`.
    thread::sleep(Duration::from_millis(100));
    thread::sleep(Duration::from_secs(1));
    std::fs::write(&path, r#"{"thumb":{"width":200},"banner":{"width":800}}"#).unwrap();

    // NOTE(review): presumably longer than the watcher's poll interval — confirm.
    thread::sleep(Duration::from_secs(6));

    let reloaded = presets.read().unwrap();
    assert_eq!(reloaded.len(), 2, "expected 2 presets after reload");
    assert_eq!(reloaded["thumb"].width, Some(200));
    assert_eq!(reloaded["banner"].width, Some(800));
    // Release the read lock before asking the watcher to stop.
    drop(reloaded);

    draining.store(true, Ordering::Relaxed);
    watcher.join().unwrap();

    std::fs::remove_dir_all(&dir).unwrap();
}
6277
/// Overwriting the presets file with invalid JSON while `preset_watcher` is
/// running must leave the shared preset map unchanged.
#[test]
fn preset_watcher_keeps_old_presets_on_invalid_file() {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, RwLock};
    use std::thread;
    use std::time::{Duration, SystemTime};

    // Scratch directory made unique by the current time in nanoseconds.
    let unique = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_nanos();
    let dir = std::env::temp_dir().join(format!("truss_test_watcher_invalid_{}", unique));
    std::fs::create_dir_all(&dir).unwrap();
    let path = dir.join("presets.json");
    std::fs::write(&path, r#"{"thumb":{"width":100}}"#).unwrap();

    // In-memory state mirrors the initial file: one `thumb` preset, width 100.
    let initial_thumb = TransformOptionsPayload {
        width: Some(100),
        height: None,
        fit: None,
        position: None,
        format: None,
        quality: None,
        background: None,
        rotate: None,
        auto_orient: None,
        strip_metadata: None,
        preserve_exif: None,
        crop: None,
        blur: None,
        sharpen: None,
    };
    let presets = Arc::new(RwLock::new(HashMap::from([(
        "thumb".to_string(),
        initial_thumb,
    )])));
    let draining = Arc::new(AtomicBool::new(false));
    let config = Arc::new(ServerConfig::new(dir.clone(), None));

    let watcher = {
        let presets = Arc::clone(&presets);
        let draining = Arc::clone(&draining);
        let config = Arc::clone(&config);
        let watched_path = path.clone();
        thread::spawn(move || {
            super::preset_watcher(presets, watched_path, draining, config);
        })
    };

    // NOTE(review): these two pauses presumably let the watcher start and give
    // the rewrite a later modification time — confirm against `preset_watcher`.
    thread::sleep(Duration::from_millis(100));
    thread::sleep(Duration::from_secs(1));
    std::fs::write(&path, "invalid json!!!").unwrap();

    // NOTE(review): presumably longer than the watcher's poll interval — confirm.
    thread::sleep(Duration::from_secs(6));

    let current = presets.read().unwrap();
    assert_eq!(current.len(), 1, "presets should not change on invalid file");
    assert_eq!(current["thumb"].width, Some(100));
    // Release the read lock before asking the watcher to stop.
    drop(current);

    draining.store(true, Ordering::Relaxed);
    watcher.join().unwrap();

    std::fs::remove_dir_all(&dir).unwrap();
}
6349}