use std::collections::HashMap;
use std::sync::Arc;
use crate::output::CaseOutput;
const MAX_CONCURRENT: usize = 8;
/// Connection settings for the S3-compatible object store that mirrors
/// thumbnails, resolved from CLI arguments with environment fallbacks.
#[derive(Debug, Clone)]
pub struct S3Config {
    pub endpoint: String,
    pub bucket: String,
    pub region: String,
    pub public_url: String,
}

impl S3Config {
    /// Builds a config from explicit arguments, falling back to the
    /// corresponding environment variable for any argument that is `None`.
    ///
    /// Returns `None` as soon as one required value is available from
    /// neither source; values are resolved in declaration order, so later
    /// environment variables are not consulted after the first miss.
    pub fn from_args_or_env(
        endpoint: Option<&str>,
        bucket: Option<&str>,
        region: Option<&str>,
        public_url: Option<&str>,
    ) -> Option<Self> {
        // An explicit argument always wins over the environment.
        fn pick(arg: Option<&str>, var: &str) -> Option<String> {
            match arg {
                Some(value) => Some(value.to_string()),
                None => std::env::var(var).ok(),
            }
        }
        Some(Self {
            endpoint: pick(endpoint, "S3_ENDPOINT")?,
            bucket: pick(bucket, "S3_FILES_BUCKET")?,
            region: pick(region, "S3_REGION")?,
            public_url: pick(public_url, "S3_FILES_PUBLIC_URL")?,
        })
    }
}
/// Mirrors every node thumbnail referenced by `output` into S3 and rewrites
/// each node's `thumbnail` to the mirrored public URL.
///
/// Thumbnails whose processing failed are cleared (`None`); nodes whose URL
/// never produced a map entry (e.g. a panicked task) are left untouched.
pub fn process_thumbnails(
    output: &mut CaseOutput,
    config: &S3Config,
    rt: &tokio::runtime::Runtime,
) {
    // Collect distinct source URLs, preserving first-seen order.
    let mut source_urls: Vec<String> = Vec::new();
    for url in output.nodes.iter().filter_map(|n| n.thumbnail.as_ref()) {
        if !source_urls.contains(url) {
            source_urls.push(url.clone());
        }
    }
    if source_urls.is_empty() {
        return;
    }
    eprintln!(
        " processing {} thumbnail(s) (concurrency={})",
        source_urls.len(),
        MAX_CONCURRENT
    );
    // source URL -> Some(mirrored URL) on success, None on failure.
    let url_map = rt.block_on(process_all(source_urls, config));
    for node in &mut output.nodes {
        let replacement = node
            .thumbnail
            .as_ref()
            .and_then(|src| url_map.get(src))
            .cloned();
        // Absent entry: keep the node's thumbnail exactly as it was.
        if let Some(new_value) = replacement {
            node.thumbnail = new_value;
        }
    }
}
/// Mirrors each source URL into S3 concurrently (bounded by
/// `MAX_CONCURRENT`) and returns a map of source URL -> `Some(public URL)`
/// on success or `None` on failure.
///
/// URLs whose spawned task panicked are omitted from the map entirely, so
/// the caller leaves those thumbnails untouched.
async fn process_all(
    source_urls: Vec<String>,
    config: &S3Config,
) -> HashMap<String, Option<String>> {
    let bucket = match create_bucket(config) {
        Ok(b) => Arc::new(b),
        Err(e) => {
            // Without a bucket nothing can be uploaded: mark every URL as
            // failed so callers drop the thumbnails instead of keeping
            // links that were never mirrored.
            eprintln!(" S3 bucket init failed: {e}");
            return source_urls.into_iter().map(|url| (url, None)).collect();
        }
    };
    let semaphore = Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT));
    let public_url = Arc::new(config.public_url.trim_end_matches('/').to_string());
    let mut handles = Vec::with_capacity(source_urls.len());
    for url in source_urls {
        let sem = semaphore.clone();
        let bkt = bucket.clone();
        let pub_url = public_url.clone();
        handles.push(tokio::spawn(async move {
            // Fix: the original bound the un-inspected `Result` of
            // acquire(), so a closed semaphore would have silently bypassed
            // the concurrency cap. The semaphore is never closed in this
            // module, so acquire() cannot fail — fail loudly if that
            // invariant is ever broken.
            let _permit = sem
                .acquire()
                .await
                .expect("thumbnail semaphore is never closed");
            let result = process_one(&url, &bkt, &pub_url).await;
            (url, result)
        }));
    }
    let mut map = HashMap::with_capacity(handles.len());
    for handle in handles {
        match handle.await {
            Ok((url, result)) => {
                map.insert(url, result);
            }
            Err(e) => {
                // Panicked task: its URL is left out of the map on purpose.
                eprintln!(" thumbnail task panicked: {e}");
            }
        }
    }
    map
}
/// Ensures the thumbnail for `source_url` exists in the bucket and returns
/// its public URL, or `None` when generation or upload failed.
///
/// Already-present objects (HEAD returns 200) are reused without
/// regenerating or re-uploading the thumbnail.
async fn process_one(source_url: &str, bucket: &s3::Bucket, public_url: &str) -> Option<String> {
    let key = weave_image::thumbnail_key(source_url);
    let garage_url = format!("{public_url}/{key}");

    // Fast path: object already mirrored.
    if matches!(bucket.head_object(&key).await, Ok((_, 200))) {
        eprintln!(" exists {source_url} -> {garage_url}");
        return Some(garage_url);
    }

    let thumb = match weave_image::process_thumbnail(source_url) {
        Ok(t) => t,
        Err(e) => {
            eprintln!(" warn {source_url}: {e}");
            return None;
        }
    };

    let response = match bucket
        .put_object_with_content_type(&key, &thumb.data, "image/webp")
        .await
    {
        Ok(r) => r,
        Err(e) => {
            eprintln!(" warn {source_url}: S3 upload failed: {e}");
            return None;
        }
    };

    let status = response.status_code();
    if status == 200 || status == 201 {
        eprintln!(
            " uploaded {source_url} -> {garage_url} ({} bytes)",
            thumb.data.len()
        );
        Some(garage_url)
    } else {
        eprintln!(" warn {source_url}: S3 PUT returned HTTP {status}");
        None
    }
}
/// Builds a path-style `s3::Bucket` handle from `config`, taking the access
/// credentials from `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`.
///
/// Errors are stringified so the caller can log them without depending on
/// the s3 crate's error types.
fn create_bucket(config: &S3Config) -> Result<s3::Bucket, String> {
    let access_key =
        std::env::var("AWS_ACCESS_KEY_ID").map_err(|_| "AWS_ACCESS_KEY_ID not set".to_string())?;
    let secret_key = std::env::var("AWS_SECRET_ACCESS_KEY")
        .map_err(|_| "AWS_SECRET_ACCESS_KEY not set".to_string())?;
    let credentials =
        s3::creds::Credentials::new(Some(&access_key), Some(&secret_key), None, None, None)
            .map_err(|e| e.to_string())?;
    // Custom region so a non-AWS endpoint (e.g. Garage) can be targeted.
    let region = s3::Region::Custom {
        region: config.region.clone(),
        endpoint: config.endpoint.clone(),
    };
    // Bucket::new yields a boxed bucket; path-style is required by most
    // self-hosted S3 implementations.
    let boxed = s3::Bucket::new(&config.bucket, region, credentials)
        .map_err(|e| e.to_string())?
        .with_path_style();
    Ok(*boxed)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Explicit arguments must be used verbatim (no env fallback needed).
    #[test]
    fn s3_config_from_args() {
        let config = S3Config::from_args_or_env(
            Some("http://localhost:3900"),
            Some("files"),
            Some("garage"),
            Some("http://localhost:3902"),
        );
        // expect() states the invariant instead of hiding it behind the
        // opaque unwrap_or_else(|| unreachable!()) pattern.
        let c = config.expect("all four args supplied, so construction must succeed");
        assert_eq!(c.endpoint, "http://localhost:3900");
        assert_eq!(c.bucket, "files");
        assert_eq!(c.region, "garage");
        assert_eq!(c.public_url, "http://localhost:3902");
    }

    /// With no args and no env vars the constructor yields `None`.
    #[test]
    fn s3_config_missing_returns_none() {
        let config = S3Config::from_args_or_env(None, None, None, None);
        // Only assert when the environment truly lacks the fallback, so the
        // test stays green on machines where S3_ENDPOINT is exported.
        if std::env::var("S3_ENDPOINT").is_err() {
            assert!(config.is_none());
        }
    }

    /// An output containing no thumbnails must pass through untouched and
    /// never hit the network.
    #[test]
    fn process_thumbnails_no_thumbnails() {
        let mut output = CaseOutput {
            case_id: "test".into(),
            title: "Test".into(),
            summary: String::new(),
            nodes: vec![],
            relationships: vec![],
            sources: vec![],
        };
        let config = S3Config {
            endpoint: "http://localhost:3900".into(),
            bucket: "files".into(),
            region: "garage".into(),
            public_url: "http://localhost:3902".into(),
        };
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("current-thread runtime construction cannot fail here");
        process_thumbnails(&mut output, &config, &rt);
        assert!(output.nodes.is_empty());
    }
}