use anyhow::{Result, Context};
use std::path::PathBuf;
use std::sync::Arc;
use holger_traits::{ConnectorTrait, RemoteAsset};
use znippy_compress::stream_packer::{ArchiveEntry, StreamCompressor};
/// Translates a repository format name into its numeric package-type code.
///
/// The name is lower-cased before the lookup, so matching is case-insensitive.
/// Recognised names are `maven2`/`maven` (1), `cargo` (2), `npm` (3),
/// `pypi`/`pip` (4), `docker` (5), `helm` (6) and `raw` (7). Every other
/// input yields `0`.
pub fn pkg_type_from_format(format: &str) -> i8 {
    // Lower-case format name paired with the code it maps to.
    const KNOWN_FORMATS: [(&str, i8); 9] = [
        ("maven2", 1),
        ("maven", 1),
        ("cargo", 2),
        ("npm", 3),
        ("pypi", 4),
        ("pip", 4),
        ("docker", 5),
        ("helm", 6),
        ("raw", 7),
    ];

    let wanted = format.to_lowercase();
    KNOWN_FORMATS
        .iter()
        .find(|entry| entry.0 == wanted)
        .map_or(0, |entry| entry.1)
}
/// Downloads each asset in `assets` and forwards it to the compressor channel.
///
/// Every successfully downloaded asset is sent as an [`ArchiveEntry`] tagged
/// with `pkg_type` and `repo_name`. A failed download is reported on stdout
/// and counted, but does not stop the loop.
///
/// Returns `(downloaded, failed)`.
///
/// # Errors
/// Fails with "compressor channel closed" as soon as an entry cannot be sent
/// on `sender`.
pub async fn feed_repo_to_sender(
    connector: Arc<dyn ConnectorTrait>,
    assets: Vec<RemoteAsset>,
    sender: crossbeam_channel::Sender<ArchiveEntry>,
    pkg_type: i8,
    repo_name: String,
) -> Result<(usize, usize)> {
    let total = assets.len();
    let mut downloaded = 0usize;
    let mut failed = 0usize;
    for (i, asset) in assets.iter().enumerate() {
        print!(" [{}/{}] {} ... ", i + 1, total, asset.path);
        // stdout is line-buffered: without an explicit flush the progress
        // prefix would only show up once the download has already finished.
        let _ = std::io::Write::flush(&mut std::io::stdout());
        match connector.download_asset(asset).await {
            Ok(bytes) => {
                let n = bytes.len();
                // NOTE(review): `send` is a synchronous call; if the channel is
                // bounded it may block this async task's thread — confirm.
                sender
                    .send(ArchiveEntry {
                        relative_path: asset.path.clone(),
                        data: bytes,
                        pkg_type: Some(pkg_type),
                        repo: Some(repo_name.clone()),
                    })
                    .context("compressor channel closed")?;
                downloaded += 1;
                println!("✓ ({} bytes)", n);
            }
            Err(e) => {
                failed += 1;
                println!("✗ {}", e);
            }
        }
    }
    Ok((downloaded, failed))
}
/// Archives the assets of every repository in `repos` into one znippy file
/// at `output`.
///
/// When `sequential` is true, each repository is listed and downloaded in
/// turn on the calling task. Otherwise the asset lists are still fetched one
/// repository at a time, but the downloads for each repository run in their
/// own spawned tokio task; the tasks are awaited in spawn order once all of
/// them have been started.
///
/// # Errors
/// Fails if the archive cannot be created, an asset listing fails, a feeding
/// task cannot be joined or returns an error, or the compressor cannot be
/// finished. Individual download failures are only counted.
pub async fn dump_all_repos_to_znippy(
    connector: Arc<dyn ConnectorTrait>,
    repos: Vec<holger_traits::RemoteRepository>,
    output: PathBuf,
    sequential: bool,
) -> Result<()> {
    use znippy_compress::stream_packer::compress_stream;

    println!("📦 Creating znippy archive: {}", output.display());
    let compressor: StreamCompressor = compress_stream(&output, false)?;

    let (total_downloaded, total_failed) = if sequential {
        // One sender handle for the whole run; each repository gets a clone.
        let shared_sender = compressor.sender().clone();
        let mut ok_count = 0usize;
        let mut err_count = 0usize;
        for repo in repos.iter() {
            let pkg_type = pkg_type_from_format(&repo.format);
            println!("\n→ {} (format: {}, pkg_type: {})", repo.name, repo.format, pkg_type);
            let assets = connector.list_assets(&repo.name).await?;
            println!(" {} assets", assets.len());
            let (ok, err) = feed_repo_to_sender(
                Arc::clone(&connector),
                assets,
                shared_sender.clone(),
                pkg_type,
                repo.name.clone(),
            )
            .await?;
            ok_count += ok;
            err_count += err;
        }
        // Release this handle before the compressor is finished.
        drop(shared_sender);
        (ok_count, err_count)
    } else {
        let mut handles = Vec::new();
        for repo in repos.iter() {
            let pkg_type = pkg_type_from_format(&repo.format);
            println!("\n→ {} (format: {}, pkg_type: {})", repo.name, repo.format, pkg_type);
            let assets = connector.list_assets(&repo.name).await?;
            println!(" {} assets queued", assets.len());
            let task_connector = Arc::clone(&connector);
            let task_sender = compressor.sender().clone();
            let task_repo = repo.name.clone();
            let handle = tokio::spawn(feed_repo_to_sender(
                task_connector,
                assets,
                task_sender,
                pkg_type,
                task_repo,
            ));
            handles.push(handle);
        }

        let mut ok_count = 0usize;
        let mut err_count = 0usize;
        for handle in handles {
            let joined = handle.await.context("repo task panicked")?;
            let (ok, err) = joined.context("repo download failed")?;
            ok_count += ok;
            err_count += err;
        }
        (ok_count, err_count)
    };

    let report = compressor.finish()?;
    println!("\n✅ Archive complete: {} downloaded, {} failed ({} bytes compressed)",
        total_downloaded, total_failed, report.compressed_bytes);
    Ok(())
}
/// Downloads `assets` through `connector` and packs them into a znippy
/// archive at `output`.
///
/// Entries are sent to the compressor without package-type or repository
/// metadata (`pkg_type` and `repo` are `None`). A failed download is reported
/// on stdout and counted, but does not stop the transfer; the final summary
/// lists both packed and failed assets.
///
/// # Errors
/// Fails if the archive cannot be created, an entry cannot be sent to the
/// compressor, or the compressor cannot be finished.
pub async fn transfer_to_znippy(
    connector: &dyn ConnectorTrait,
    assets: Vec<RemoteAsset>,
    output: PathBuf,
) -> Result<()> {
    use znippy_compress::stream_packer::compress_stream;

    println!("📦 Creating znippy archive: {}", output.display());
    let compressor = compress_stream(&output, false)?;
    let sender = compressor.sender().clone();
    let total = assets.len();
    let mut downloaded = 0usize;
    let mut failed = 0usize;
    for (i, asset) in assets.iter().enumerate() {
        print!(" [{}/{}] {} ... ", i + 1, total, asset.path);
        // stdout is line-buffered: flush so the progress prefix is visible
        // while the download is still running.
        let _ = std::io::Write::flush(&mut std::io::stdout());
        match connector.download_asset(asset).await {
            Ok(bytes) => {
                let n = bytes.len();
                sender
                    .send(ArchiveEntry {
                        relative_path: asset.path.clone(),
                        data: bytes,
                        pkg_type: None,
                        repo: None,
                    })
                    .context("Failed to send to compressor")?;
                downloaded += 1;
                println!("✓ ({} bytes)", n);
            }
            Err(e) => {
                // Count failures so the summary does not hide them, matching
                // the other transfer loops in this file.
                failed += 1;
                println!("✗ {}", e);
            }
        }
    }
    // Release this handle before the compressor is finished.
    drop(sender);
    let report = compressor.finish()?;
    println!("\n✅ Znippy archive complete: {} assets packed, {} failed ({} bytes compressed)",
        downloaded, failed, report.compressed_bytes);
    Ok(())
}
/// Downloads `assets` through `connector` into the directory tree rooted at
/// `output`.
///
/// Each asset is written to `output` joined with the asset's path. The asset
/// path comes from the remote side, so it is rebuilt from its normal
/// components first: leading root and `.` components are dropped, and a path
/// containing `..` or a drive/UNC prefix is skipped and counted as failed
/// instead of being allowed to escape `output`. A failed download is likewise
/// reported and counted without stopping the transfer.
///
/// # Errors
/// Fails if `output`, a parent directory of a destination file, or the
/// destination file itself cannot be created or written.
pub async fn transfer_to_directory(
    connector: &dyn ConnectorTrait,
    assets: Vec<RemoteAsset>,
    output: PathBuf,
) -> Result<()> {
    println!("📁 Downloading to directory: {}", output.display());
    std::fs::create_dir_all(&output)
        .with_context(|| format!("Failed to create output directory: {}", output.display()))?;
    let total = assets.len();
    let mut downloaded = 0usize;
    let mut failed = 0usize;
    for (i, asset) in assets.iter().enumerate() {
        print!(" [{}/{}] {} ... ", i + 1, total, asset.path);
        // stdout is line-buffered: flush so the progress prefix is visible
        // while the download is still running.
        let _ = std::io::Write::flush(&mut std::io::stdout());

        // Rebuild the destination from safe components only. `Path::join`
        // with an absolute path replaces the base entirely, and `..` walks
        // out of it, so neither may reach the filesystem call below.
        let mut relative = PathBuf::new();
        let mut escapes_output = false;
        for component in std::path::Path::new(&asset.path).components() {
            match component {
                std::path::Component::Normal(part) => relative.push(part),
                std::path::Component::CurDir | std::path::Component::RootDir => {}
                std::path::Component::ParentDir | std::path::Component::Prefix(_) => {
                    escapes_output = true;
                    break;
                }
            }
        }
        if escapes_output || relative.as_os_str().is_empty() {
            failed += 1;
            println!("✗ unsafe or empty asset path, skipped");
            continue;
        }
        let dest = output.join(&relative);

        match connector.download_asset(asset).await {
            Ok(bytes) => {
                // Create directories only for assets that were actually
                // downloaded, so failures leave no empty directories behind.
                if let Some(parent) = dest.parent() {
                    std::fs::create_dir_all(parent).with_context(|| {
                        format!("Failed to create directory {}", parent.display())
                    })?;
                }
                std::fs::write(&dest, &bytes)
                    .with_context(|| format!("Failed to write {}", dest.display()))?;
                downloaded += 1;
                println!("✓ ({} bytes)", bytes.len());
            }
            Err(e) => {
                failed += 1;
                println!("✗ {}", e);
            }
        }
    }
    println!("\n✅ Done! {} downloaded, {} failed", downloaded, failed);
    Ok(())
}
/// Uploads every `.crate` file found under `directory` to `repository`.
///
/// The upload path of a file is the relative path produced by
/// `extract_crate_relative_path`, with its components joined by `/` so the
/// remote path does not depend on the host's path separator. A failed upload
/// is reported on stdout and counted, but does not stop the push.
///
/// # Errors
/// Fails if the directory cannot be scanned or a crate file cannot be read.
pub async fn push_directory_to_connector(
    connector: &dyn ConnectorTrait,
    directory: PathBuf,
    repository: &str,
) -> Result<()> {
    println!("📦 Pushing from directory: {}", directory.display());
    let crate_files = find_crate_files(&directory)?;
    println!(" Found {} .crate files to push", crate_files.len());
    if crate_files.is_empty() {
        println!(" Nothing to push.");
        return Ok(());
    }
    let total = crate_files.len();
    let mut pushed = 0usize;
    let mut failed = 0usize;
    for (i, crate_path) in crate_files.iter().enumerate() {
        let relative = extract_crate_relative_path(crate_path);
        // Join with '/' explicitly: rendering the PathBuf directly would use
        // backslashes on Windows, which is wrong for a remote asset path.
        let upload_path = relative
            .components()
            .map(|part| part.as_os_str().to_string_lossy())
            .collect::<Vec<_>>()
            .join("/");
        print!(" [{}/{}] {} ... ", i + 1, total, upload_path);
        // stdout is line-buffered: flush so the progress prefix is visible
        // while the upload is still running.
        let _ = std::io::Write::flush(&mut std::io::stdout());
        let data = std::fs::read(crate_path)
            .with_context(|| format!("Failed to read {}", crate_path.display()))?;
        match connector.upload_asset(repository, &upload_path, &data).await {
            Ok(()) => {
                pushed += 1;
                println!("✓");
            }
            Err(e) => {
                failed += 1;
                println!("✗ {}", e);
            }
        }
    }
    println!("\n✅ Push complete: {} succeeded, {} failed", pushed, failed);
    Ok(())
}
/// Uploads every file stored in the znippy archive at `archive` to
/// `repository`, using the archived path as the upload path.
///
/// A file that cannot be extracted or uploaded is reported on stdout and
/// counted as failed; the push continues with the next file.
///
/// # Errors
/// Fails if the archive cannot be opened or its file list cannot be read.
pub async fn push_znippy_to_connector(
    connector: &dyn ConnectorTrait,
    archive: PathBuf,
    repository: &str,
) -> Result<()> {
    use znippy_common::{ZnippyArchive, ZnippyReader};

    println!("📦 Reading znippy archive: {}", archive.display());
    let znippy = ZnippyArchive::open(&archive)?;
    let files = znippy.list_files()?;
    println!(" Found {} files in archive", files.len());
    let total = files.len();
    let mut pushed = 0usize;
    let mut failed = 0usize;
    for (i, path) in files.iter().enumerate() {
        print!(" [{}/{}] {} ... ", i + 1, total, path);
        // stdout is line-buffered: flush so the progress prefix is visible
        // while extraction and upload are still running.
        let _ = std::io::Write::flush(&mut std::io::stdout());
        match znippy.extract_file(path) {
            Ok(data) => match connector.upload_asset(repository, path, &data).await {
                Ok(()) => {
                    pushed += 1;
                    println!("✓");
                }
                Err(e) => {
                    failed += 1;
                    println!("✗ {}", e);
                }
            },
            Err(e) => {
                failed += 1;
                println!("✗ extract: {}", e);
            }
        }
    }
    println!("\n✅ Push complete: {} succeeded, {} failed", pushed, failed);
    Ok(())
}
/// Returns the part of `path` that starts at its first `crates` component.
///
/// When no component is exactly `crates`, only the file name is returned
/// (an empty path if `path` has no file name).
fn extract_crate_relative_path(path: &std::path::Path) -> PathBuf {
    // Skip everything in front of the first component named exactly "crates".
    let mut from_crates = path
        .components()
        .skip_while(|part| part.as_os_str() != "crates")
        .peekable();

    if from_crates.peek().is_some() {
        from_crates.collect()
    } else {
        PathBuf::from(path.file_name().unwrap_or_default())
    }
}
/// Collects every `.crate` file below `dir` and returns the paths sorted.
///
/// The directory walk itself is delegated to `walk_dir_recursive`; any error
/// it reports is returned unchanged.
fn find_crate_files(dir: &std::path::Path) -> Result<Vec<PathBuf>> {
    let mut found: Vec<PathBuf> = Vec::new();
    walk_dir_recursive(dir, &mut found).map(|()| {
        // Sort so callers see a stable order regardless of directory listing order.
        found.sort();
        found
    })
}
fn walk_dir_recursive(dir: &std::path::Path, files: &mut Vec<PathBuf>) -> Result<()> {
if !dir.is_dir() {
return Ok(());
}
for entry in std::fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
walk_dir_recursive(&path, files)?;
} else if path.extension().and_then(|e| e.to_str()) == Some("crate") {
files.push(path);
}
}
Ok(())
}