use oximg::pipeline;
use std::collections::HashMap;
use std::path::PathBuf;
// Route every heap allocation in the process through mimalloc instead of the
// default system allocator.
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use std::sync::{Arc, Mutex};
use axum::Router;
use axum::body::Bytes;
use axum::extract::{Path, State};
use axum::http::{StatusCode, header};
use axum::response::IntoResponse;
use axum::routing::get;
use tokio::sync::{Semaphore, watch};
/// Identity of one resize job: `(width, height, file name)` as taken from the
/// request path.
type FlightKey = (u32, u32, String);
/// Outcome of one resize job: the encoded bytes, or the HTTP status and
/// message to answer with.
type FlightResult = Result<Bytes, (StatusCode, String)>;
/// Jobs currently being processed, keyed by [`FlightKey`]. Each value is a
/// `watch` receiver on which the job's result is published (`None` until then).
type FlightMap = Mutex<HashMap<FlightKey, watch::Receiver<Option<FlightResult>>>>;
/// Shared application state handed to the request handlers.
///
/// The path, semaphore and in-flight map sit behind `Arc`s, so cloning the
/// state does not copy them.
#[derive(Clone)]
struct App {
    /// Directory that requested file names are joined onto.
    images_dir: Arc<PathBuf>,
    /// Semaphore a permit is taken from before each blocking pipeline call.
    cpu_slots: Arc<Semaphore>,
    /// Forwarded to the pipeline as `Params::quality`.
    quality: f32,
    /// Forwarded to the pipeline as `Params::fast_preset`.
    fast_preset: bool,
    /// Forwarded to the pipeline as `Params::parallel`.
    resize_threads: usize,
    /// Resize jobs currently in progress, used to coalesce identical requests.
    inflight: Arc<FlightMap>,
}
/// Reads the environment variable `key` and parses it as `T`.
///
/// Returns `default` when the variable cannot be read (unset or not valid
/// Unicode) or when its value does not parse as `T`.
fn env_or<T: std::str::FromStr>(key: &str, default: T) -> T {
    match std::env::var(key) {
        Ok(raw) => raw.parse().unwrap_or(default),
        Err(_) => default,
    }
}
/// Process entry point: builds a multi-threaded Tokio runtime and runs the
/// server on it until it exits.
///
/// The blocking-thread cap is the detected parallelism plus four.
///
/// # Errors
///
/// Fails if the available parallelism cannot be determined, if the runtime
/// cannot be built, or if `async_main` returns an error.
fn main() -> anyhow::Result<()> {
    let workers = std::thread::available_parallelism()?.get();
    let blocking_cap = workers + 4;

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .max_blocking_threads(blocking_cap)
        .build()?;

    runtime.block_on(async_main(workers))
}
/// Builds the application state from the environment, binds the listener and
/// serves requests until the server stops.
///
/// Environment variables read (each falls back to the default shown):
/// `PORT` (8081), `IMAGES_DIR` (`./images`), `QUALITY` (80.0),
/// `PRESET` (anything other than `small` selects the fast preset) and
/// `OXIMG_PAR` (1).
///
/// `workers` sizes the semaphore that gates blocking pipeline calls.
///
/// # Errors
///
/// Fails if the listener cannot be bound or if serving returns an error.
async fn async_main(workers: usize) -> anyhow::Result<()> {
    let port: u16 = env_or("PORT", 8081);
    // Wrapped in an `Arc` once so the state and the startup log share it.
    let images_dir = Arc::new(
        std::env::var("IMAGES_DIR")
            .map(PathBuf::from)
            .unwrap_or_else(|_| PathBuf::from("./images")),
    );

    let state = App {
        images_dir: Arc::clone(&images_dir),
        cpu_slots: Arc::new(Semaphore::new(workers)),
        quality: env_or("QUALITY", 80.0),
        fast_preset: !matches!(std::env::var("PRESET").as_deref(), Ok("small")),
        resize_threads: env_or("OXIMG_PAR", 1),
        inflight: Arc::new(Mutex::new(HashMap::new())),
    };

    let routes = Router::new()
        .route("/health", get(async || "ok"))
        .route("/resize/{w}/{h}/{file}", get(handle_resize))
        .with_state(state);

    let bind_addr = ("0.0.0.0", port);
    let listener = tokio::net::TcpListener::bind(bind_addr).await?;
    eprintln!(
        "oximg listening on :{port} (images: {}, workers: {workers})",
        images_dir.display()
    );

    axum::serve(listener, routes).await?;
    Ok(())
}
/// Handler for `GET /resize/{w}/{h}/{file}`.
///
/// Validates the request, then obtains the resized image through
/// `singleflight` so identical concurrent requests share one computation.
///
/// # Errors
///
/// * `400 Bad Request` when either dimension is outside `1..=8192`.
/// * `400 Bad Request` when the file name contains `/`, `\` or `..`.
/// * Whatever status `singleflight` reports for the job itself.
async fn handle_resize(
    State(app): State<App>,
    Path((w, h, file)): Path<(u32, u32, String)>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
    const MAX_SIDE: u32 = 8192;

    let dimensions_ok = [w, h]
        .iter()
        .all(|side| (1..=MAX_SIDE).contains(side));
    if !dimensions_ok {
        return Err((StatusCode::BAD_REQUEST, "invalid dimensions".into()));
    }

    // Only a bare file name is accepted: no separators, no parent references.
    let has_separator = file.chars().any(|c| matches!(c, '/' | '\\'));
    if has_separator || file.contains("..") {
        return Err((StatusCode::BAD_REQUEST, "invalid filename".into()));
    }

    let body = singleflight(&app, (w, h, file)).await?;
    let headers = [
        (header::CONTENT_TYPE, "image/jpeg"),
        (header::CACHE_CONTROL, "public, max-age=31536000"),
    ];
    Ok((headers, body))
}
/// Cleanup handle for one in-flight job: when dropped, it removes `key` from
/// `map`.
struct FlightGuard {
    /// The in-flight map the entry was registered in.
    map: Arc<FlightMap>,
    /// Key of the entry to remove on drop.
    key: FlightKey,
}
impl Drop for FlightGuard {
    /// Removes this guard's key from the in-flight map.
    ///
    /// # Panics
    ///
    /// Panics if the map's mutex is poisoned.
    fn drop(&mut self) {
        let mut flights = self.map.lock().unwrap();
        flights.remove(&self.key);
    }
}
/// Runs the resize job for `key`, sharing one computation between all
/// requests that ask for the same key at the same time.
///
/// The first request to register a key becomes the leader: it runs
/// `process_one`, removes the key from the in-flight map and then publishes
/// the result on a `watch` channel. Requests that find the key already
/// registered become followers and wait on that channel.
///
/// If a follower's channel closes without a result ever being published, the
/// follower starts over, up to three attempts in total.
///
/// # Errors
///
/// Returns the job's own error, or `503 Service Unavailable` when all three
/// attempts ended with a closed channel.
///
/// # Panics
///
/// Panics if the in-flight map's mutex is poisoned.
async fn singleflight(app: &App, key: FlightKey) -> FlightResult {
    // What this request turned out to be after consulting the in-flight map.
    enum Role {
        Leader(watch::Sender<Option<FlightResult>>),
        Follower(watch::Receiver<Option<FlightResult>>),
    }

    const MAX_ATTEMPTS: usize = 3;

    for _attempt in 0..MAX_ATTEMPTS {
        // Decide the role under the lock; the lock is released at the end of
        // this block, before anything is awaited.
        let role = {
            let mut flights = app.inflight.lock().unwrap();
            match flights.get(&key).cloned() {
                Some(existing) => Role::Follower(existing),
                None => {
                    let (tx, rx) = watch::channel(None);
                    flights.insert(key.clone(), rx);
                    Role::Leader(tx)
                }
            }
        };

        let mut rx = match role {
            Role::Leader(tx) => {
                // The guard removes the map entry even if this future is
                // dropped while `process_one` is still pending.
                let cleanup = FlightGuard {
                    map: Arc::clone(&app.inflight),
                    key: key.clone(),
                };
                let outcome = process_one(app, &key).await;
                // Unregister first, then publish: requests arriving from here
                // on start a fresh flight instead of joining a finished one.
                drop(cleanup);
                tx.send_replace(Some(outcome.clone()));
                return outcome;
            }
            Role::Follower(rx) => rx,
        };

        // Follower: wait until the leader publishes or goes away.
        loop {
            let published = rx.borrow_and_update().as_ref().cloned();
            if let Some(outcome) = published {
                return outcome;
            }
            if rx.changed().await.is_err() {
                // Sender dropped without a result; try again from the top.
                break;
            }
        }
    }

    Err((
        StatusCode::SERVICE_UNAVAILABLE,
        "request coalescing failed repeatedly".into(),
    ))
}
/// Runs the image pipeline for one `(width, height, file)` key on Tokio's
/// blocking thread pool and returns the encoded bytes.
///
/// A permit is taken from `app.cpu_slots` before the blocking task is spawned
/// and is moved into the task, so it is released when the task finishes.
///
/// # Errors
///
/// * `404 Not Found` when the pipeline error downcasts to an
///   `std::io::Error` of kind `NotFound`.
/// * `422 Unprocessable Entity` for any other pipeline error (the error's
///   text is the message) and when the blocking task fails to join.
///
/// # Panics
///
/// Panics if the semaphore has been closed; nothing in the visible code
/// closes it.
async fn process_one(app: &App, key: &FlightKey) -> FlightResult {
    let (w, h, file) = key;
    let path = app.images_dir.join(file);
    let permit = app
        .cpu_slots
        .clone()
        .acquire_owned()
        .await
        .expect("semaphore closed");
    let params = pipeline::Params {
        max_width: *w,
        max_height: *h,
        quality: app.quality,
        fast_preset: app.fast_preset,
        parallel: app.resize_threads,
    };
    let out = tokio::task::spawn_blocking(move || {
        // Keep the permit alive for the whole blocking call.
        let _permit = permit;
        pipeline::process_path(&path, &params)
    })
    .await
    .map_err(|_| {
        (
            StatusCode::UNPROCESSABLE_ENTITY,
            "image processing panicked (broken image?)".to_string(),
        )
    })?
    .map_err(|e| match e.downcast_ref::<std::io::Error>() {
        Some(io) if io.kind() == std::io::ErrorKind::NotFound => {
            (StatusCode::NOT_FOUND, "image not found".to_string())
        }
        _ => (StatusCode::UNPROCESSABLE_ENTITY, e.to_string()),
    })?;
    Ok(Bytes::from(out))
}