holger-agent-lib 0.6.0

Holger agent library: airgap, export, push operations using connectors
use anyhow::{Result, Context};
use std::path::PathBuf;
use std::sync::Arc;
use holger_traits::{ConnectorTrait, RemoteAsset};
use znippy_compress::stream_packer::{ArchiveEntry, StreamCompressor};

/// Map Nexus/Artifactory format string → znippy pkg_type discriminant.
pub fn pkg_type_from_format(format: &str) -> i8 {
    match format.to_lowercase().as_str() {
        "maven2" | "maven"  => 1,
        "cargo"             => 2,
        "npm"               => 3,
        "pypi" | "pip"      => 4,
        "docker"            => 5,
        "helm"              => 6,
        "raw"               => 7,
        _                   => 0,
    }
}

/// Feed assets from one repo into an already-open compressor sender.
/// Returns (downloaded, failed) counts.
pub async fn feed_repo_to_sender(
    connector: Arc<dyn ConnectorTrait>,
    assets: Vec<RemoteAsset>,
    sender: crossbeam_channel::Sender<ArchiveEntry>,
    pkg_type: i8,
    repo_name: String,
) -> Result<(usize, usize)> {
    let total = assets.len();
    let mut downloaded = 0usize;
    let mut failed = 0usize;

    for (i, asset) in assets.iter().enumerate() {
        print!("   [{}/{}] {} ... ", i + 1, total, asset.path);
        match connector.download_asset(asset).await {
            Ok(bytes) => {
                let n = bytes.len();
                sender.send(ArchiveEntry {
                    relative_path: asset.path.clone(),
                    data: bytes,
                    pkg_type: Some(pkg_type),
                    repo: Some(repo_name.clone()),
                }).context("compressor channel closed")?;
                downloaded += 1;
                println!("✓ ({} bytes)", n);
            }
            Err(e) => {
                failed += 1;
                println!("{}", e);
            }
        }
    }
    Ok((downloaded, failed))
}

/// Dump all repos from a connector into one znippy archive.
///
/// Parallel mode (default): all repos downloaded concurrently — tokio tasks share cloned
/// senders. The compressor's bounded channel provides backpressure so memory stays bounded.
/// Sequential mode: one repo at a time — lower peak memory and network usage.
pub async fn dump_all_repos_to_znippy(
    connector: Arc<dyn ConnectorTrait>,
    repos: Vec<holger_traits::RemoteRepository>,
    output: PathBuf,
    sequential: bool,
) -> Result<()> {
    use znippy_compress::stream_packer::compress_stream;

    println!("📦 Creating znippy archive: {}", output.display());
    let compressor: StreamCompressor = compress_stream(&output, false)?;

    let total_downloaded;
    let total_failed;

    if sequential {
        let sender = compressor.sender().clone();
        let mut dl = 0usize;
        let mut fail = 0usize;
        for repo in &repos {
            let pkg_type = pkg_type_from_format(&repo.format);
            println!("\n{} (format: {}, pkg_type: {})", repo.name, repo.format, pkg_type);
            let assets = connector.list_assets(&repo.name).await?;
            println!("  {} assets", assets.len());
            let (d, f) = feed_repo_to_sender(
                connector.clone(), assets, sender.clone(), pkg_type, repo.name.clone(),
            ).await?;
            dl += d;
            fail += f;
        }
        drop(sender);
        total_downloaded = dl;
        total_failed = fail;
    } else {
        // Parallel: spawn one task per repo, all share cloned senders.
        let mut tasks = Vec::new();
        for repo in &repos {
            let pkg_type = pkg_type_from_format(&repo.format);
            println!("\n{} (format: {}, pkg_type: {})", repo.name, repo.format, pkg_type);
            let assets = connector.list_assets(&repo.name).await?;
            println!("  {} assets queued", assets.len());

            let conn = connector.clone();
            let sender = compressor.sender().clone();
            let repo_name = repo.name.clone();
            tasks.push(tokio::spawn(async move {
                feed_repo_to_sender(conn, assets, sender, pkg_type, repo_name).await
            }));
        }

        let mut dl = 0usize;
        let mut fail = 0usize;
        for task in tasks {
            let (d, f) = task.await
                .context("repo task panicked")?
                .context("repo download failed")?;
            dl += d;
            fail += f;
        }
        total_downloaded = dl;
        total_failed = fail;
    }

    let report = compressor.finish()?;
    println!("\n✅ Archive complete: {} downloaded, {} failed ({} bytes compressed)",
             total_downloaded, total_failed, report.compressed_bytes);
    Ok(())
}

/// Transfer assets from a connector to a znippy archive (single repo, no pkg_type tagging).
pub async fn transfer_to_znippy(
    connector: &dyn ConnectorTrait,
    assets: Vec<RemoteAsset>,
    output: PathBuf,
) -> Result<()> {
    use znippy_compress::stream_packer::compress_stream;

    println!("📦 Creating znippy archive: {}", output.display());
    let compressor = compress_stream(&output, false)?;
    let sender = compressor.sender().clone();

    let total = assets.len();
    let mut downloaded = 0usize;

    for (i, asset) in assets.iter().enumerate() {
        print!("   [{}/{}] {} ... ", i + 1, total, asset.path);
        match connector.download_asset(asset).await {
            Ok(bytes) => {
                let n = bytes.len();
                sender.send(ArchiveEntry {
                    relative_path: asset.path.clone(),
                    data: bytes,
                    pkg_type: None,
                    repo: None,
                }).context("Failed to send to compressor")?;
                downloaded += 1;
                println!("✓ ({} bytes)", n);
            }
            Err(e) => {
                println!("{}", e);
            }
        }
    }

    drop(sender);
    let report = compressor.finish()?;
    println!("\n✅ Znippy archive complete: {} assets packed ({} bytes compressed)",
             downloaded, report.compressed_bytes);
    Ok(())
}

/// Transfer assets from a connector to a directory
pub async fn transfer_to_directory(
    connector: &dyn ConnectorTrait,
    assets: Vec<RemoteAsset>,
    output: PathBuf,
) -> Result<()> {
    println!("📁 Downloading to directory: {}", output.display());
    std::fs::create_dir_all(&output)
        .with_context(|| format!("Failed to create output directory: {}", output.display()))?;

    let total = assets.len();
    let mut downloaded = 0usize;
    let mut failed = 0usize;

    for (i, asset) in assets.iter().enumerate() {
        let dest = output.join(&asset.path);
        if let Some(parent) = dest.parent() {
            std::fs::create_dir_all(parent)?;
        }

        print!("   [{}/{}] {} ... ", i + 1, total, asset.path);

        match connector.download_asset(asset).await {
            Ok(bytes) => {
                std::fs::write(&dest, &bytes)
                    .with_context(|| format!("Failed to write {}", dest.display()))?;
                downloaded += 1;
                println!("✓ ({} bytes)", bytes.len());
            }
            Err(e) => {
                failed += 1;
                println!("{}", e);
            }
        }
    }

    println!("\n✅ Done! {} downloaded, {} failed", downloaded, failed);
    Ok(())
}

/// Push assets from a directory to a connector
pub async fn push_directory_to_connector(
    connector: &dyn ConnectorTrait,
    directory: PathBuf,
    repository: &str,
) -> Result<()> {
    println!("📦 Pushing from directory: {}", directory.display());

    let crate_files = find_crate_files(&directory)?;
    println!("   Found {} .crate files to push", crate_files.len());

    if crate_files.is_empty() {
        println!("   Nothing to push.");
        return Ok(());
    }

    let total = crate_files.len();
    let mut pushed = 0usize;
    let mut failed = 0usize;

    for (i, crate_path) in crate_files.iter().enumerate() {
        let relative = extract_crate_relative_path(crate_path);
        let upload_path = relative.to_string_lossy();

        print!("   [{}/{}] {} ... ", i + 1, total, upload_path);

        let data = std::fs::read(crate_path)
            .with_context(|| format!("Failed to read {}", crate_path.display()))?;

        match connector.upload_asset(repository, &upload_path, &data).await {
            Ok(()) => {
                pushed += 1;
                println!("");
            }
            Err(e) => {
                failed += 1;
                println!("{}", e);
            }
        }
    }

    println!("\n✅ Push complete: {} succeeded, {} failed", pushed, failed);
    Ok(())
}

/// Push assets from a znippy archive to a connector
pub async fn push_znippy_to_connector(
    connector: &dyn ConnectorTrait,
    archive: PathBuf,
    repository: &str,
) -> Result<()> {
    use znippy_common::{ZnippyArchive, ZnippyReader};

    println!("📦 Reading znippy archive: {}", archive.display());
    let znippy = ZnippyArchive::open(&archive)?;
    let files = znippy.list_files()?;
    println!("   Found {} files in archive", files.len());

    let total = files.len();
    let mut pushed = 0usize;
    let mut failed = 0usize;

    for (i, path) in files.iter().enumerate() {
        print!("   [{}/{}] {} ... ", i + 1, total, path);
        match znippy.extract_file(path) {
            Ok(data) => {
                match connector.upload_asset(repository, path, &data).await {
                    Ok(()) => { pushed += 1; println!(""); }
                    Err(e) => { failed += 1; println!("{}", e); }
                }
            }
            Err(e) => { failed += 1; println!("✗ extract: {}", e); }
        }
    }

    println!("\n✅ Push complete: {} succeeded, {} failed", pushed, failed);
    Ok(())
}

fn extract_crate_relative_path(path: &std::path::Path) -> PathBuf {
    let components: Vec<_> = path.components().collect();
    for (i, comp) in components.iter().enumerate() {
        if comp.as_os_str() == "crates" {
            return components[i..].iter().collect();
        }
    }
    PathBuf::from(path.file_name().unwrap_or_default())
}

fn find_crate_files(dir: &std::path::Path) -> Result<Vec<PathBuf>> {
    let mut files = Vec::new();
    walk_dir_recursive(dir, &mut files)?;
    files.sort();
    Ok(files)
}

fn walk_dir_recursive(dir: &std::path::Path, files: &mut Vec<PathBuf>) -> Result<()> {
    if !dir.is_dir() {
        return Ok(());
    }
    for entry in std::fs::read_dir(dir)? {
        let entry = entry?;
        let path = entry.path();
        if path.is_dir() {
            walk_dir_recursive(&path, files)?;
        } else if path.extension().and_then(|e| e.to_str()) == Some("crate") {
            files.push(path);
        }
    }
    Ok(())
}