use async_trait::async_trait;
use bytes::Bytes;
use sha2::{Digest, Sha256};
use std::sync::Arc;
use std::time::Duration;
pub mod config;
pub mod data_uri;
pub mod fetcher;
pub mod ip_filter;
pub mod store;
pub mod token;
pub mod walker;
pub use config::{BackendConfig, FetcherConfig, ImageNormalizerConfig, SigningConfig};
pub use store::{ImageStore, MemoryStore, SignedImageUrl, StoreError};
pub use token::{ImageToken, TokenParseError};
pub use walker::Mode;
/// An image reference supplied by a caller, before normalisation.
#[derive(Clone, Debug)]
pub enum ImageInput {
    /// A remote image to download; the string is the URL.
    HttpUrl(String),
    /// An inline image; the string is the complete `data:` URI.
    DataUri(String),
}
#[derive(Debug, thiserror::Error)]
pub enum NormalizeError {
#[error("bad input: {0}")]
BadInput(String),
#[error("fetch failed: {0}")]
FetchFailed(String),
#[error("transient failure: {0}")]
Transient(String),
#[error("store failed: {0}")]
StoreFailed(String),
#[error("token not found in store")]
NotFound,
}
impl From<fetcher::FetchError> for NormalizeError {
    /// Maps each fetcher error onto the normalizer variant of the same name,
    /// carrying the message through unchanged.
    fn from(err: fetcher::FetchError) -> Self {
        match err {
            fetcher::FetchError::BadInput(msg) => Self::BadInput(msg),
            fetcher::FetchError::FetchFailed(msg) => Self::FetchFailed(msg),
            fetcher::FetchError::Transient(msg) => Self::Transient(msg),
        }
    }
}
impl From<data_uri::DataUriError> for NormalizeError {
    /// Any `data:` URI parse failure is the caller's fault, so it becomes
    /// `BadInput` carrying the parser's `Display` text.
    fn from(err: data_uri::DataUriError) -> Self {
        let detail = err.to_string();
        Self::BadInput(detail)
    }
}
impl From<StoreError> for NormalizeError {
    /// `NotFound` is preserved as-is; every other store failure becomes
    /// `StoreFailed` with a descriptive message.
    fn from(err: StoreError) -> Self {
        match err {
            StoreError::NotFound => Self::NotFound,
            StoreError::Backend(msg) => Self::StoreFailed(msg),
            StoreError::Unimplemented => Self::StoreFailed(String::from("backend not implemented")),
        }
    }
}
/// Outcome of a successful [`ImageNormalizer::ingest`] call.
#[derive(Clone, Debug)]
pub struct IngestResult {
    /// Key under which the image bytes are stored.
    pub token: ImageToken,
    /// MIME type of the ingested input (from the fetch, or declared by the
    /// `data:` URI). NOTE(review): in `DefaultImageNormalizer`, when identical
    /// bytes were already stored the write is skipped, so this is the current
    /// input's MIME and may differ from the one the store returns on `read`.
    pub mime: String,
    /// Length of the ingested payload in bytes.
    pub bytes_len: u64,
}
/// Turns caller-supplied image references into [`ImageToken`]s and serves the
/// stored images back.
///
/// Implementations must be `Send + Sync`; [`from_config`] hands them out as
/// `Arc<dyn ImageNormalizer>`.
#[async_trait]
pub trait ImageNormalizer: Send + Sync {
    /// Fetches or decodes `input`, persists its bytes and returns the token
    /// under which they can later be signed or read.
    async fn ingest(&self, input: ImageInput) -> Result<IngestResult, NormalizeError>;

    /// Produces a [`SignedImageUrl`] for `token`; `ttl` is the requested
    /// lifetime of the signature.
    async fn sign(
        &self,
        token: ImageToken,
        ttl: Duration,
    ) -> Result<SignedImageUrl, NormalizeError>;

    /// Returns the stored `(mime, bytes)` pair for `token`.
    async fn read(&self, token: ImageToken) -> Result<(String, Bytes), NormalizeError>;
}
/// An [`ImageNormalizer`] that rejects every call with `NormalizeError::BadInput`.
///
/// [`from_config`] returns it when `image_normalizer.enabled` is false. It is a
/// stateless unit struct, so it derives the common traits: `Debug` (expected
/// on every public type), `Clone`/`Copy`, and `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct DisabledNormalizer;
/// The error returned by every `DisabledNormalizer` operation.
fn disabled_error() -> NormalizeError {
    NormalizeError::BadInput("image normalisation is disabled".into())
}

#[async_trait]
impl ImageNormalizer for DisabledNormalizer {
    /// Always fails: normalisation is turned off.
    async fn ingest(&self, _input: ImageInput) -> Result<IngestResult, NormalizeError> {
        Err(disabled_error())
    }

    /// Always fails: normalisation is turned off.
    async fn sign(&self, _token: ImageToken, _ttl: Duration) -> Result<SignedImageUrl, NormalizeError> {
        Err(disabled_error())
    }

    /// Always fails: normalisation is turned off.
    async fn read(&self, _token: ImageToken) -> Result<(String, Bytes), NormalizeError> {
        Err(disabled_error())
    }
}
pub struct DefaultImageNormalizer<S: ImageStore> {
fetcher: fetcher::ImageFetcher,
store: Arc<S>,
}
impl<S> DefaultImageNormalizer<S>
where
    S: ImageStore,
{
    /// Creates a normalizer whose fetcher is built from `fetcher_cfg` and which
    /// persists ingested images into the shared `store`.
    pub fn new(fetcher_cfg: FetcherConfig, store: Arc<S>) -> Self {
        let image_fetcher = fetcher::ImageFetcher::new(fetcher_cfg);
        Self {
            fetcher: image_fetcher,
            store,
        }
    }
}
#[async_trait]
impl<S: ImageStore + 'static> ImageNormalizer for DefaultImageNormalizer<S> {
    /// Resolves `input` to raw bytes and keys them by their SHA-256 digest.
    /// The bytes are written to the store only if no object with that key
    /// exists yet.
    ///
    /// HTTP inputs go through the fetcher. `data:` URIs are decoded here and
    /// then checked against the fetcher's byte cap and MIME allow-list, in
    /// that order.
    ///
    /// # Errors
    /// `BadInput` for malformed, oversized or disallowed `data:` URIs. Fetch
    /// and store errors are converted through `NormalizeError`'s `From` impls.
    async fn ingest(&self, input: ImageInput) -> Result<IngestResult, NormalizeError> {
        let (mime, bytes) = match input {
            ImageInput::HttpUrl(url) => {
                let fetched = self.fetcher.fetch(&url).await?;
                (fetched.mime, fetched.bytes)
            }
            ImageInput::DataUri(uri) => {
                let decoded = data_uri::parse(&uri)?;

                // Apply the same size limit the fetcher enforces for HTTP.
                let cap = self.fetcher.max_bytes();
                let payload_len = decoded.bytes.len() as u64;
                if payload_len > cap {
                    return Err(NormalizeError::BadInput(format!(
                        "data: URI payload {payload_len} bytes exceeds cap {cap}"
                    )));
                }

                let allowed = self.fetcher.mime_allowed(&decoded.mime);
                if !allowed {
                    let reason = format!("mime not allowed: {}", decoded.mime);
                    return Err(NormalizeError::BadInput(reason));
                }

                (decoded.mime, Bytes::from(decoded.bytes))
            }
        };

        let bytes_len = bytes.len() as u64;

        // Content-addressed key: identical bytes always map to the same token.
        let token = {
            let digest = Sha256::digest(&bytes);
            let mut raw = [0u8; 32];
            raw.copy_from_slice(&digest);
            ImageToken(raw)
        };

        let already_stored = self.store.exists(token).await?;
        if !already_stored {
            self.store.put(token, &mime, bytes).await?;
        }

        Ok(IngestResult {
            token,
            mime,
            bytes_len,
        })
    }

    /// Delegates signing of `token` for `ttl` to the store.
    async fn sign(&self, token: ImageToken, ttl: Duration) -> Result<SignedImageUrl, NormalizeError> {
        self.store.sign(token, ttl).await.map_err(NormalizeError::from)
    }

    /// Delegates retrieval of `token`'s `(mime, bytes)` to the store.
    async fn read(&self, token: ImageToken) -> Result<(String, Bytes), NormalizeError> {
        self.store.read(token).await.map_err(NormalizeError::from)
    }
}
pub fn from_config(cfg: &ImageNormalizerConfig) -> Result<Arc<dyn ImageNormalizer>, anyhow::Error> {
if !cfg.enabled {
return Ok(Arc::new(DisabledNormalizer));
}
let backend = cfg
.backend
.as_ref()
.ok_or_else(|| anyhow::anyhow!("image_normalizer.enabled = true but image_normalizer.backend is not set"))?;
Ok(match backend {
BackendConfig::Memory => {
tracing::warn!(
"image_normalizer enabled with the in-memory backend: stored image bytes are lost on \
restart and are not shared across replicas — use gcs or s3_compatible in production"
);
let store = Arc::new(MemoryStore::new());
Arc::new(DefaultImageNormalizer::new(cfg.fetcher.clone(), store))
}
BackendConfig::Gcs { bucket, region } => {
let store = Arc::new(store::GcsStore::new(bucket.clone(), region.clone()));
Arc::new(DefaultImageNormalizer::new(cfg.fetcher.clone(), store))
}
BackendConfig::S3Compatible {
bucket,
endpoint_url,
region,
force_path_style,
} => {
let access_key_id = std::env::var("IMAGE_NORMALIZER_S3_ACCESS_KEY_ID").map_err(|_| {
anyhow::anyhow!(
"image_normalizer.backend.type = s3_compatible requires the \
IMAGE_NORMALIZER_S3_ACCESS_KEY_ID environment variable"
)
})?;
let secret_access_key = std::env::var("IMAGE_NORMALIZER_S3_SECRET_ACCESS_KEY").map_err(|_| {
anyhow::anyhow!(
"image_normalizer.backend.type = s3_compatible requires the \
IMAGE_NORMALIZER_S3_SECRET_ACCESS_KEY environment variable"
)
})?;
let store = Arc::new(store::S3CompatStore::new(
bucket.clone(),
endpoint_url.clone(),
region.clone(),
*force_path_style,
access_key_id,
secret_access_key,
));
Arc::new(DefaultImageNormalizer::new(cfg.fetcher.clone(), store))
}
})
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A 1x1 PNG encoded as a base64 `data:` URI.
    const TINY_PNG_DATA_URI: &str =
        "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNkYAAAAAYAAjCB0C8AAAAASUVORK5CYII=";

    /// Default normalizer over a fresh in-memory store, using `cfg` for fetch limits.
    fn memory_normalizer(cfg: FetcherConfig) -> DefaultImageNormalizer<MemoryStore> {
        DefaultImageNormalizer::new(cfg, Arc::new(MemoryStore::new()))
    }

    /// Enabled config with the given backend and default fetcher/signing settings.
    fn enabled_config(backend: Option<BackendConfig>) -> ImageNormalizerConfig {
        ImageNormalizerConfig {
            enabled: true,
            backend,
            fetcher: FetcherConfig::default(),
            signing: SigningConfig::default(),
        }
    }

    /// The tiny PNG wrapped as an ingest input.
    fn tiny_png() -> ImageInput {
        ImageInput::DataUri(TINY_PNG_DATA_URI.to_string())
    }

    #[tokio::test]
    async fn ingest_data_uri_then_sign_round_trip() {
        let n = memory_normalizer(FetcherConfig::default());

        let first = n.ingest(tiny_png()).await.unwrap();
        assert_eq!(first.mime, "image/png");
        assert!(first.bytes_len > 0, "bytes_len should be the actual decoded length, got 0");

        // Ingesting identical bytes again must yield the same token and length.
        let second = n.ingest(tiny_png()).await.unwrap();
        assert_eq!(first.token, second.token);
        assert_eq!(first.bytes_len, second.bytes_len);

        let token = first.token;
        let signed = n.sign(token, Duration::from_secs(60)).await.unwrap();
        assert!(signed.url.contains(&token.to_hex()));

        let (mime, bytes) = n.read(token).await.unwrap();
        assert_eq!(mime, "image/png");
        assert_eq!(&bytes[..8], b"\x89PNG\r\n\x1a\n");
    }

    #[tokio::test]
    async fn ingest_bad_data_uri_returns_bad_input() {
        let n = memory_normalizer(FetcherConfig::default());
        let err = n
            .ingest(ImageInput::DataUri("data:image/png,raw".to_string()))
            .await
            .unwrap_err();
        assert!(matches!(err, NormalizeError::BadInput(_)), "got {err:?}");
    }

    #[tokio::test]
    async fn ingest_oversized_data_uri_is_rejected() {
        let n = memory_normalizer(FetcherConfig {
            max_bytes: 8,
            ..FetcherConfig::default()
        });
        let err = n.ingest(tiny_png()).await.unwrap_err();
        assert!(matches!(err, NormalizeError::BadInput(_)), "got {err:?}");
    }

    #[tokio::test]
    async fn ingest_data_uri_with_disallowed_mime_is_rejected() {
        let n = memory_normalizer(FetcherConfig::default());
        let html = ImageInput::DataUri("data:text/html;base64,PGgxPmhpPC9oMT4=".to_string());
        let err = n.ingest(html).await.unwrap_err();
        assert!(matches!(err, NormalizeError::BadInput(_)), "got {err:?}");
    }

    #[tokio::test]
    async fn disabled_normalizer_errors_predictably() {
        let err = DisabledNormalizer.ingest(tiny_png()).await.unwrap_err();
        assert!(matches!(err, NormalizeError::BadInput(_)));
    }

    #[test]
    fn from_config_disabled_returns_disabled_normalizer() {
        let _: Arc<dyn ImageNormalizer> =
            from_config(&ImageNormalizerConfig::default()).expect("disabled config must build cleanly");
    }

    #[test]
    fn from_config_memory_backend_when_enabled() {
        let _: Arc<dyn ImageNormalizer> =
            from_config(&enabled_config(Some(BackendConfig::Memory))).expect("memory backend must build");
    }

    #[test]
    fn from_config_enabled_without_backend_errors() {
        // `Arc<dyn ImageNormalizer>` is not `Debug`, so `unwrap_err` is unavailable.
        let msg = match from_config(&enabled_config(None)) {
            Ok(_) => panic!("enabled + no backend must error"),
            Err(e) => e.to_string(),
        };
        assert!(msg.contains("backend"), "error should mention 'backend': {msg}");
    }
}