stormchaser-agent 1.3.2

A robust, distributed workflow engine for event-driven and human-triggered workflows.
use anyhow::Result;
use reqwest::{Body, Client};
use serde_json::{json, Value};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::path::Path;
use std::process::Command;
use tokio::fs::File as AsyncFile;
use tracing::{error, info, warn};

async fn upload_to_s3(
    name: &String,
    artifact_val: &Value,
    path: &str,
    file_size: u64,
    hash: String,
    client: &Client,
    metadata_map: &mut HashMap<String, Value>,
) {
    if let Some(put_url) = artifact_val.get("put_url").and_then(|u| u.as_str()) {
        let file_tokio = match AsyncFile::open(path).await {
            Ok(f) => f,
            Err(e) => {
                error!("Failed to open artifact file '{}': {}", name, e);
                return;
            }
        };
        let body = Body::from(file_tokio);

        let res = match client
            .put(put_url)
            .header("Content-Length", file_size)
            .body(body)
            .send()
            .await
        {
            Ok(r) => r,
            Err(e) => {
                error!("Request failed for artifact '{}': {}", name, e);
                return;
            }
        };

        if !res.status().is_success() {
            let status = res.status();
            let text = res.text().await.unwrap_or_default();
            error!(
                "Failed to upload artifact '{}': {} - {}",
                name, status, text
            );
        } else {
            info!("Successfully parked artifact '{}' (hash: {})", name, hash);
            metadata_map.insert(
                name.clone(),
                json!({
                    "hash": hash,
                    "size": file_size,
                    "content_type": "application/octet-stream",
                }),
            );
        }
    } else {
        warn!("Missing 'put_url' for S3 artifact '{}'", name);
    }
}

async fn upload_to_oci(
    name: &String,
    artifact_val: &Value,
    path: &str,
    file_size: u64,
    hash: String,
    metadata_map: &mut HashMap<String, Value>,
) {
    if let Some(remote_path) = artifact_val.get("remote_path").and_then(|u| u.as_str()) {
        let mut cmd = Command::new("oras");
        cmd.arg("push");

        if let (Some(user), Some(pass)) = (
            artifact_val.get("username").and_then(|u| u.as_str()),
            artifact_val.get("password").and_then(|p| p.as_str()),
        ) {
            cmd.arg("--username").arg(user).arg("--password").arg(pass);
        }

        cmd.arg(remote_path);
        cmd.arg(path);

        let output = match cmd.output() {
            Ok(o) => o,
            Err(e) => {
                error!("Failed to execute oras for artifact '{}': {}", name, e);
                return;
            }
        };

        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            error!("Failed to push OCI artifact '{}': {}", name, stderr);
        } else {
            info!(
                "Successfully parked OCI artifact '{}' (hash: {})",
                name, hash
            );
            metadata_map.insert(
                name.clone(),
                json!({
                    "hash": hash,
                    "size": file_size,
                    "content_type": "application/vnd.oci.image.layer.v1.tar+gzip",
                }),
            );
        }
    } else {
        warn!("Missing 'remote_path' for OCI artifact '{}'", name);
    }
}

/// Uploads artifacts to configured backends and returns metadata maps.
pub async fn park_artifacts(artifacts: Value) -> Result<HashMap<String, Value>> {
    let client = Client::new();
    let mut metadata_map = HashMap::new();

    if let Some(artifact_map) = artifacts.as_object() {
        for (name, artifact_val) in artifact_map {
            let backend_type = artifact_val
                .get("backend_type")
                .and_then(|t| t.as_str())
                .unwrap_or("s3");

            let path = match artifact_val.get("path").and_then(|p| p.as_str()) {
                Some(p) => p,
                None => continue,
            };

            if !Path::new(path).exists() {
                warn!("Artifact '{}' path '{}' not found, skipping", name, path);
                continue;
            }

            info!("Parking artifact '{}' from path '{}'...", name, path);

            let file = File::open(path)?;
            let file_size = file.metadata()?.len();
            let mut hasher = Sha256::new();
            {
                let mut file_read = File::open(path)?;
                let mut buffer = [0; 8192];
                loop {
                    let count = file_read.read(&mut buffer)?;
                    if count == 0 {
                        break;
                    }
                    hasher.update(&buffer[..count]);
                }
            }
            let hash = hex::encode(hasher.finalize());

            if backend_type == "s3" {
                upload_to_s3(
                    name,
                    artifact_val,
                    path,
                    file_size,
                    hash,
                    &client,
                    &mut metadata_map,
                )
                .await;
            } else if backend_type == "oci" {
                upload_to_oci(name, artifact_val, path, file_size, hash, &mut metadata_map).await;
            } else {
                warn!("Unsupported artifact backend_type '{}'", backend_type);
            }
        }
    }

    Ok(metadata_map)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    #[tokio::test]
    async fn test_park_artifacts_success() {
        let mock_server = MockServer::start().await;
        let dir = tempdir().unwrap();
        let artifact_path = dir.path().join("artifact.bin");
        fs::write(&artifact_path, "binary content").unwrap();

        Mock::given(method("PUT"))
            .and(path("/artifacts/artifact.bin"))
            .respond_with(ResponseTemplate::new(200))
            .mount(&mock_server)
            .await;

        let artifacts = json!({
            "my-artifact": {
                "backend_type": "s3",
                "path": artifact_path.to_str().unwrap(),
                "put_url": format!("{}/artifacts/artifact.bin", mock_server.uri())
            }
        });

        let metadata = park_artifacts(artifacts).await.unwrap();
        assert!(metadata.contains_key("my-artifact"));
        assert_eq!(metadata["my-artifact"]["size"], 14);
    }
}