weave-content 0.2.3

Content DSL parser, validator, and builder for OSINT case files
use std::collections::HashMap;
use std::sync::Arc;

use crate::output::CaseOutput;

/// Maximum number of thumbnail tasks allowed to run at the same time.
///
/// Used as the permit count of the semaphore in `process_all`; a task holds
/// its permit for the whole existence-check / download / upload sequence, not
/// just for the download step.
const MAX_CONCURRENT: usize = 8;

/// Settings for the S3-compatible store that thumbnails are uploaded to.
#[derive(Debug, Clone)]
pub struct S3Config {
    /// Endpoint URL of the S3 API (used as the custom region endpoint).
    pub endpoint: String,
    /// Name of the bucket that receives the thumbnail objects.
    pub bucket: String,
    /// Region name handed to the S3 client together with `endpoint`.
    pub region: String,
    /// Public base URL; thumbnail links are built as `{public_url}/{key}`.
    pub public_url: String,
}

impl S3Config {
    /// Assemble a configuration from optional CLI values.
    ///
    /// Every value that is `None` is looked up in the environment instead:
    /// `S3_ENDPOINT`, `S3_FILES_BUCKET`, `S3_REGION` and `S3_FILES_PUBLIC_URL`
    /// (in that order). Returns `None` as soon as one setting can be found in
    /// neither place.
    pub fn from_args_or_env(
        endpoint: Option<&str>,
        bucket: Option<&str>,
        region: Option<&str>,
        public_url: Option<&str>,
    ) -> Option<Self> {
        // An explicit flag always wins; the environment is consulted only
        // when the flag is absent.
        fn resolve(flag: Option<&str>, env_key: &str) -> Option<String> {
            match flag {
                Some(value) => Some(String::from(value)),
                None => std::env::var(env_key).ok(),
            }
        }

        // Field initialisers run top to bottom, so the lookup order (and the
        // point at which a missing value short-circuits) is fixed.
        Some(Self {
            endpoint: resolve(endpoint, "S3_ENDPOINT")?,
            bucket: resolve(bucket, "S3_FILES_BUCKET")?,
            region: resolve(region, "S3_REGION")?,
            public_url: resolve(public_url, "S3_FILES_PUBLIC_URL")?,
        })
    }
}

/// Process all thumbnails in a case output: download, resize, upload to S3,
/// and replace thumbnail URLs with Garage public URLs.
///
/// Each distinct source URL is processed once, even when several nodes share
/// it. The work is driven to completion on `rt` before this function returns.
///
/// Failures are warnings -- the node gets `thumbnail: null` in the output.
/// A source URL for which no result was reported at all is treated as a
/// failure too, so a raw source URL is never left in the output.
///
/// Does nothing when no node carries a thumbnail.
pub fn process_thumbnails(
    output: &mut CaseOutput,
    config: &S3Config,
    rt: &tokio::runtime::Runtime,
) {
    // Collect unique thumbnail source URLs in first-seen order. The set only
    // borrows from `output`, so it is scoped to end before nodes are mutated.
    let source_urls: Vec<String> = {
        let mut seen = std::collections::HashSet::new();
        output
            .nodes
            .iter()
            .filter_map(|node| node.thumbnail.as_deref())
            .filter(|url| seen.insert(*url))
            .map(str::to_owned)
            .collect()
    };

    if source_urls.is_empty() {
        return;
    }

    eprintln!(
        "  processing {} thumbnail(s) (concurrency={})",
        source_urls.len(),
        MAX_CONCURRENT
    );

    // Process all thumbnails concurrently
    let url_map = rt.block_on(process_all(source_urls, config));

    // Replace thumbnail URLs in output. Both "processing failed" (entry is
    // `None`) and "no entry at all" collapse to a null thumbnail.
    for node in &mut output.nodes {
        if let Some(source_url) = node.thumbnail.take() {
            node.thumbnail = url_map.get(&source_url).cloned().flatten();
        }
    }
}

/// Process all thumbnail URLs concurrently. Returns a map of
/// `source_url -> Option<garage_url>` (None = failed).
async fn process_all(
    source_urls: Vec<String>,
    config: &S3Config,
) -> HashMap<String, Option<String>> {
    let bucket = match create_bucket(config) {
        Ok(b) => Arc::new(b),
        Err(e) => {
            eprintln!("  S3 bucket init failed: {e}");
            let mut map = HashMap::new();
            for url in source_urls {
                map.insert(url, None);
            }
            return map;
        }
    };

    let semaphore = Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT));
    let public_url = Arc::new(config.public_url.trim_end_matches('/').to_string());

    let mut handles = Vec::new();

    for url in source_urls {
        let sem = semaphore.clone();
        let bkt = bucket.clone();
        let pub_url = public_url.clone();
        handles.push(tokio::spawn(async move {
            let _permit = sem.acquire().await;
            let result = process_one(&url, &bkt, &pub_url).await;
            (url, result)
        }));
    }

    let mut map = HashMap::new();
    for handle in handles {
        match handle.await {
            Ok((url, result)) => {
                map.insert(url, result);
            }
            Err(e) => {
                eprintln!("  thumbnail task panicked: {e}");
            }
        }
    }

    map
}

/// Process a single thumbnail: check S3 existence, download/resize/upload if needed.
async fn process_one(source_url: &str, bucket: &s3::Bucket, public_url: &str) -> Option<String> {
    let key = weave_image::thumbnail_key(source_url);
    let garage_url = format!("{public_url}/{key}");

    // Check if already exists via HEAD
    if let Ok((_, 200)) = bucket.head_object(&key).await {
        eprintln!("  exists {source_url} -> {garage_url}");
        return Some(garage_url);
    }

    // Download and resize
    let thumb = match weave_image::process_thumbnail(source_url) {
        Ok(t) => t,
        Err(e) => {
            eprintln!("  warn  {source_url}: {e}");
            return None;
        }
    };

    // Upload to S3
    match bucket
        .put_object_with_content_type(&key, &thumb.data, "image/webp")
        .await
    {
        Ok(response) => {
            let status = response.status_code();
            if status == 200 || status == 201 {
                eprintln!(
                    "  uploaded {source_url} -> {garage_url} ({} bytes)",
                    thumb.data.len()
                );
                Some(garage_url)
            } else {
                eprintln!("  warn  {source_url}: S3 PUT returned HTTP {status}");
                None
            }
        }
        Err(e) => {
            eprintln!("  warn  {source_url}: S3 upload failed: {e}");
            None
        }
    }
}

/// Build a path-style S3 bucket handle from `config`.
///
/// The endpoint, region and bucket name come from `config`; the credentials
/// are read from the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
/// environment variables.
///
/// # Errors
///
/// Returns a human-readable message when either environment variable is
/// missing, or when the credentials or the bucket handle cannot be built.
fn create_bucket(config: &S3Config) -> Result<s3::Bucket, String> {
    let access_key = match std::env::var("AWS_ACCESS_KEY_ID") {
        Ok(value) => value,
        Err(_) => return Err("AWS_ACCESS_KEY_ID not set".to_string()),
    };
    let secret_key = match std::env::var("AWS_SECRET_ACCESS_KEY") {
        Ok(value) => value,
        Err(_) => return Err("AWS_SECRET_ACCESS_KEY not set".to_string()),
    };

    let credentials = s3::creds::Credentials::new(
        Some(access_key.as_str()),
        Some(secret_key.as_str()),
        None,
        None,
        None,
    )
    .map_err(|err| err.to_string())?;

    // A custom region pairs the configured region name with the endpoint URL.
    let custom_region = s3::Region::Custom {
        region: config.region.clone(),
        endpoint: config.endpoint.clone(),
    };

    let handle = s3::Bucket::new(&config.bucket, custom_region, credentials)
        .map_err(|err| err.to_string())?;
    let path_style = handle.with_path_style();

    // The client hands back a boxed bucket; unbox it for the caller.
    Ok(*path_style)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Explicit arguments alone are enough to build a config, and each value
    /// lands in its own field.
    #[test]
    fn s3_config_from_args() {
        let config = S3Config::from_args_or_env(
            Some("http://localhost:3900"),
            Some("files"),
            Some("garage"),
            Some("http://localhost:3902"),
        );
        assert!(config.is_some());
        let Some(c) = config else { unreachable!() };
        assert_eq!(c.endpoint, "http://localhost:3900");
        assert_eq!(c.bucket, "files");
        assert_eq!(c.region, "garage");
        assert_eq!(c.public_url, "http://localhost:3902");
    }

    /// With no arguments the outcome depends on the environment, so the
    /// assertion only runs when `S3_ENDPOINT` is unset.
    #[test]
    fn s3_config_missing_returns_none() {
        let endpoint_missing = std::env::var("S3_ENDPOINT").is_err();
        let config = S3Config::from_args_or_env(None, None, None, None);
        if endpoint_missing {
            assert!(config.is_none());
        }
    }

    /// A case without any nodes must be left untouched.
    #[test]
    fn process_thumbnails_no_thumbnails() {
        let config = S3Config {
            endpoint: "http://localhost:3900".into(),
            bucket: "files".into(),
            region: "garage".into(),
            public_url: "http://localhost:3902".into(),
        };
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap_or_else(|_| unreachable!());
        let mut output = CaseOutput {
            case_id: "test".into(),
            title: "Test".into(),
            summary: String::new(),
            nodes: vec![],
            relationships: vec![],
            sources: vec![],
        };

        // No thumbnails to collect, so this returns before touching S3.
        process_thumbnails(&mut output, &config, &rt);
        assert!(output.nodes.is_empty());
    }
}