use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use clap::{Args, Subcommand};
use serde::Serialize;
use crate::commands::aggregator::RemoteAttachArgs;
use crate::context::{resolve_profile, CliContext, RemoteAttach};
use crate::error::{generic, invalid_args, sdk, CliError};
use crate::parsers::parse_u64_flexible;
use crate::prelude::{emit_value, OutputFormat};
use net_sdk::dataforts::{MeshBlobAdapter, Redex};
use net_sdk::transport::{self, BlobRef, Encoding};
// Subcommands of the transfer command group; `run` dispatches each variant
// to its `run_*` handler. Plain `//` comments are used here because clap
// turns `///` doc comments into help text.
#[derive(Subcommand, Debug)]
pub enum TransferCommand {
// Stream a blob from a peer into a local file (`run_recv_blob`).
RecvBlob(RecvBlobArgs),
// Store a file or stdin as a blob and print its reference (`run_send_blob`).
SendBlob(SendBlobArgs),
// Fetch a directory from a peer into a local path (`run_recv_dir`).
RecvDir(RecvDirArgs),
// Store a directory and print its manifest reference (`run_send_dir`).
SendDir(SendDirArgs),
// List transfers reported by the attach target (`run_ls`).
Ls(LsArgs),
// Look up one transfer by id on the attach target (`run_status`).
Status(StatusArgs),
// Ask the attach target to cancel one transfer by id (`run_cancel`).
Cancel(CancelArgs),
}
// Arguments for the `recv-blob` subcommand, consumed by `run_recv_blob`.
#[derive(Args, Debug)]
pub struct RecvBlobArgs {
// Peer to fetch from; when omitted the handler falls back to the attach
// target's node id.
#[arg(long, value_parser = crate::parsers::parse_u64_flexible)]
pub from: Option<u64>,
// Hex reference handed to `parse_content_ref`: a bare 32-byte hash or an
// encoded `BlobRef`, with an optional `0x` / `0X` prefix.
#[arg(long = "blob-ref")]
pub blob_ref: String,
// Destination file; written through `AtomicFileWriter`.
#[arg(long)]
pub out: PathBuf,
// Optional identity path forwarded to `CliContext::build_with_remote`.
#[arg(long)]
pub identity: Option<PathBuf>,
// Node id forwarded to `CliContext::build_with_remote`; defaults to
// `DEFAULT_SUPERVISOR_NODE`.
#[arg(long, default_value_t = crate::prelude::DEFAULT_SUPERVISOR_NODE)]
pub node: u64,
// Attach-target flags, resolved by `require_remote_attach`.
#[command(flatten)]
pub attach: RemoteAttachArgs,
}
// Arguments for the `send-blob` subcommand, consumed by `run_send_blob`.
#[derive(Args, Debug)]
pub struct SendBlobArgs {
// File to store; the literal path `-` makes the handler read stdin instead.
pub path: PathBuf,
// Optional directory for a persistent store; when given, the handler
// creates it if needed and reports it as `staged_to`.
#[arg(long)]
pub store: Option<PathBuf>,
}
// Arguments for the `recv-dir` subcommand, consumed by `run_recv_dir`.
#[derive(Args, Debug)]
pub struct RecvDirArgs {
// Peer to fetch from; when omitted the handler falls back to the attach
// target's node id.
#[arg(long, value_parser = crate::parsers::parse_u64_flexible)]
pub from: Option<u64>,
// Hex reference handed to `parse_content_ref`: a bare 32-byte hash or an
// encoded `BlobRef`, with an optional `0x` / `0X` prefix.
#[arg(long = "remote-ref")]
pub remote_ref: String,
// Destination path passed to `transport::fetch_dir`.
#[arg(long)]
pub out: PathBuf,
// Concurrency value passed straight to `transport::fetch_dir`.
// NOTE(review): the meaning of the default `0` is defined by the SDK — confirm.
#[arg(long, default_value_t = 0)]
pub concurrency: usize,
// Optional identity path forwarded to `CliContext::build_with_remote`.
#[arg(long)]
pub identity: Option<PathBuf>,
// Node id forwarded to `CliContext::build_with_remote`; defaults to
// `DEFAULT_SUPERVISOR_NODE`.
#[arg(long, default_value_t = crate::prelude::DEFAULT_SUPERVISOR_NODE)]
pub node: u64,
// Attach-target flags, resolved by `require_remote_attach`.
#[command(flatten)]
pub attach: RemoteAttachArgs,
}
// Arguments for the `send-dir` subcommand, consumed by `run_send_dir`.
#[derive(Args, Debug)]
pub struct SendDirArgs {
// Directory to store; the handler rejects anything that is not a directory.
pub path: PathBuf,
// Optional directory for a persistent store; without it the handler uses
// a freshly constructed `Redex` with no persistent directory.
#[arg(long)]
pub store: Option<PathBuf>,
}
// Arguments for the `ls` subcommand, consumed by `run_ls`.
#[derive(Args, Debug)]
pub struct LsArgs {
// Optional identity path forwarded to `CliContext::build_with_remote`.
#[arg(long)]
pub identity: Option<PathBuf>,
// Node id forwarded to `CliContext::build_with_remote`; defaults to
// `DEFAULT_SUPERVISOR_NODE`.
#[arg(long, default_value_t = crate::prelude::DEFAULT_SUPERVISOR_NODE)]
pub node: u64,
// Attach-target flags, resolved by `require_remote_attach`; the resolved
// node id is the peer that gets queried.
#[command(flatten)]
pub attach: RemoteAttachArgs,
}
// Arguments for the `status` subcommand, consumed by `run_status`.
#[derive(Args, Debug)]
pub struct StatusArgs {
// Transfer id as text; the handler parses it with `parse_u64_flexible`.
pub transfer_id: String,
// Optional identity path forwarded to `CliContext::build_with_remote`.
#[arg(long)]
pub identity: Option<PathBuf>,
// Node id forwarded to `CliContext::build_with_remote`; defaults to
// `DEFAULT_SUPERVISOR_NODE`.
#[arg(long, default_value_t = crate::prelude::DEFAULT_SUPERVISOR_NODE)]
pub node: u64,
// Attach-target flags, resolved by `require_remote_attach`; the resolved
// node id is the peer that gets queried.
#[command(flatten)]
pub attach: RemoteAttachArgs,
}
// Arguments for the `cancel` subcommand, consumed by `run_cancel`.
#[derive(Args, Debug)]
pub struct CancelArgs {
// Transfer id as text; the handler parses it with `parse_u64_flexible`.
pub transfer_id: String,
// Optional identity path forwarded to `CliContext::build_with_remote`.
#[arg(long)]
pub identity: Option<PathBuf>,
// Node id forwarded to `CliContext::build_with_remote`; defaults to
// `DEFAULT_SUPERVISOR_NODE`.
#[arg(long, default_value_t = crate::prelude::DEFAULT_SUPERVISOR_NODE)]
pub node: u64,
// Attach-target flags, resolved by `require_remote_attach`; the resolved
// node id is the peer that receives the cancel request.
#[command(flatten)]
pub attach: RemoteAttachArgs,
}
/// Entry point for the transfer command group: routes the parsed subcommand
/// to its handler and returns that handler's result.
///
/// The send handlers only need `output`. The query handlers (`ls`, `status`,
/// `cancel`) additionally need the profile location. Only the two receive
/// handlers are given `quiet`, which they use to decide whether progress is
/// shown.
pub async fn run(
    cmd: TransferCommand,
    output: Option<OutputFormat>,
    config_path: Option<&std::path::Path>,
    profile_name: &str,
    quiet: bool,
) -> Result<(), CliError> {
    match cmd {
        // Local staging: no profile needed.
        TransferCommand::SendBlob(send) => run_send_blob(send, output).await,
        TransferCommand::SendDir(send) => run_send_dir(send, output).await,

        // Fetches from a peer: profile plus progress control.
        TransferCommand::RecvBlob(recv) => {
            run_recv_blob(recv, output, config_path, profile_name, quiet).await
        }
        TransferCommand::RecvDir(recv) => {
            run_recv_dir(recv, output, config_path, profile_name, quiet).await
        }

        // Queries against the attach target.
        TransferCommand::Ls(query) => run_ls(query, output, config_path, profile_name).await,
        TransferCommand::Status(query) => {
            run_status(query, output, config_path, profile_name).await
        }
        TransferCommand::Cancel(query) => {
            run_cancel(query, output, config_path, profile_name).await
        }
    }
}
/// Handles `recv-blob`: streams a blob from a peer into `args.out`.
///
/// Steps, in order:
/// 1. parse `--blob-ref` (fails before any profile or network work),
/// 2. resolve the profile and the attach target, then build the CLI context,
/// 3. register a blob-transfer service backed by a fresh `Redex`,
/// 4. pull chunks from `fetch_blob_stream`, appending each one to an
///    `AtomicFileWriter`, which is committed once the stream ends,
/// 5. emit a `RecvBlobView` with byte count, duration and throughput.
///
/// The source peer is `--from` when given, otherwise the attach target's node
/// id. A byte-counting bar is used when the reference declares a non-zero
/// size; otherwise a spinner is used.
///
/// # Errors
/// Returns the first failure from parsing, profile/attach resolution, context
/// construction, the chunk stream (with a relay hint appended when the source
/// differs from the attach node), file writing/committing, or result output.
async fn run_recv_blob(
    args: RecvBlobArgs,
    output: Option<OutputFormat>,
    config_path: Option<&std::path::Path>,
    profile_name: &str,
    quiet: bool,
) -> Result<(), CliError> {
    use futures::StreamExt as _;

    let wanted = parse_content_ref(&args.blob_ref, "--blob-ref")?;

    let profile = resolve_profile(config_path, profile_name).await?;
    let remote = require_remote_attach(&profile, &args.attach, "recv-blob")?;
    let attached = remote.node_id;
    let source = match args.from {
        Some(peer) => peer,
        None => attached,
    };

    let ctx =
        CliContext::build_with_remote(&profile, args.identity.as_deref(), args.node, false, remote)
            .await?;
    let mesh = ctx.require_mesh()?;
    transport::serve_blob_transfer(
        mesh,
        Arc::new(MeshBlobAdapter::new("recv", Arc::new(Redex::new()))),
    );

    // A declared size of zero means the total is unknown, so fall back to a
    // spinner instead of a byte bar.
    let label = format!("fetching blob from peer {source}");
    let show_progress = progress_enabled(output, quiet);
    let progress = match wanted.size() {
        0 => Progress::start(&label, show_progress),
        declared => Progress::start_bytes(&label, declared, show_progress),
    };

    let started = Instant::now();
    let mut chunks = std::pin::pin!(transport::fetch_blob_stream(mesh, source, &wanted));
    let mut writer = AtomicFileWriter::create(&args.out).await?;
    let mut received: u64 = 0;
    while let Some(next) = chunks.next().await {
        let chunk = match next {
            Ok(chunk) => chunk,
            Err(e) => {
                return Err(sdk(format!(
                    "fetch_blob from peer {source} failed: {e}{}",
                    relay_hint(source, attached)
                )));
            }
        };
        let chunk_len = chunk.len() as u64;
        received += chunk_len;
        progress.inc(chunk_len);
        writer.write_chunk(&chunk).await?;
    }
    writer.commit().await?;
    let elapsed = started.elapsed();
    progress.finish();

    let view = RecvBlobView {
        peer: source,
        out: args.out.display().to_string(),
        bytes: received,
        duration_secs: elapsed.as_secs_f64(),
        throughput_mib_s: throughput_mib_s(received, elapsed),
    };
    emit_value(OutputFormat::resolve_oneshot(output), &view)
        .map_err(|e| generic(format!("write recv-blob result: {e}")))?;
    Ok(())
}
/// Handles `send-blob`: stores a file (or stdin when the path is `-`) through
/// `transport::store_blob_reader` and emits the resulting reference.
///
/// When `--store` is given, a persistent adapter rooted at that directory is
/// created first and passed to the store call; otherwise no adapter is
/// passed. The emitted `SendBlobView` carries the hex-encoded `BlobRef`, its
/// size, a chunk count (1 for `Small`, the manifest length for `Manifest`),
/// and the hex hash for `Small` references only.
///
/// # Errors
/// Fails if the store directory cannot be prepared, the input file cannot be
/// opened, the store call fails, the call unexpectedly yields a `Tree`
/// reference, or the result cannot be written.
async fn run_send_blob(args: SendBlobArgs, output: Option<OutputFormat>) -> Result<(), CliError> {
    let adapter = if let Some(dir) = args.store.as_deref() {
        Some(persistent_adapter(dir, "send-blob").await?)
    } else {
        None
    };

    // The two branches hand different reader types to the same store call.
    let from_stdin = args.path.as_os_str() == "-";
    let stored = if from_stdin {
        transport::store_blob_reader(
            adapter.as_deref(),
            tokio::io::stdin(),
            "mesh://transfer",
            Encoding::Replicated,
        )
        .await
    } else {
        let file = match tokio::fs::File::open(&args.path).await {
            Ok(file) => file,
            Err(e) => return Err(generic(format!("open {}: {e}", args.path.display()))),
        };
        transport::store_blob_reader(
            adapter.as_deref(),
            file,
            "mesh://transfer",
            Encoding::Replicated,
        )
        .await
    };
    let blob_ref =
        stored.map_err(|e| sdk(format!("send blob from {}: {e}", args.path.display())))?;

    let (small_hash, chunk_count) = match &blob_ref {
        BlobRef::Tree { .. } => {
            return Err(sdk("internal: store_blob_reader yielded a Tree BlobRef \
(it only produces Small / Manifest) — please report"));
        }
        BlobRef::Manifest { chunks, .. } => (None, chunks.len() as u64),
        BlobRef::Small { hash, .. } => (Some(hex::encode(hash)), 1u64),
    };

    let staged_to = args.store.as_deref().map(|dir| dir.display().to_string());
    let view = SendBlobView {
        blob_ref: hex::encode(blob_ref.encode()),
        hash: small_hash,
        size: blob_ref.size(),
        chunks: chunk_count,
        staged_to,
    };
    emit_value(OutputFormat::resolve_oneshot(output), &view)
        .map_err(|e| generic(format!("write send-blob result: {e}")))?;
    Ok(())
}
/// Handles `recv-dir`: fetches a directory from a peer into `args.out` via
/// `transport::fetch_dir` and emits a `RecvDirView` with the returned
/// counters, the elapsed time and the derived throughput.
///
/// The `--remote-ref` value is parsed before any profile or network work.
/// The source peer is `--from` when given, otherwise the attach target's node
/// id. A spinner is shown while the fetch runs, when progress is enabled.
///
/// # Errors
/// Returns the first failure from parsing, profile/attach resolution, context
/// construction, the fetch itself (with a relay hint appended when the source
/// differs from the attach node), or result output.
async fn run_recv_dir(
    args: RecvDirArgs,
    output: Option<OutputFormat>,
    config_path: Option<&std::path::Path>,
    profile_name: &str,
    quiet: bool,
) -> Result<(), CliError> {
    let wanted = parse_content_ref(&args.remote_ref, "--remote-ref")?;

    let profile = resolve_profile(config_path, profile_name).await?;
    let remote = require_remote_attach(&profile, &args.attach, "recv-dir")?;
    let attached = remote.node_id;
    let source = match args.from {
        Some(peer) => peer,
        None => attached,
    };

    let ctx =
        CliContext::build_with_remote(&profile, args.identity.as_deref(), args.node, false, remote)
            .await?;
    let mesh = ctx.require_mesh()?;
    transport::serve_blob_transfer(
        mesh,
        Arc::new(MeshBlobAdapter::new("recv", Arc::new(Redex::new()))),
    );

    let label = format!("reconstructing directory from peer {source}");
    let spinner = Progress::start(&label, progress_enabled(output, quiet));

    let started = Instant::now();
    let fetched =
        transport::fetch_dir(mesh, source, &wanted, &args.out, args.concurrency).await;
    let stats = match fetched {
        Ok(stats) => stats,
        Err(e) => {
            return Err(sdk(format!(
                "fetch_dir from peer {source} failed: {e}{}",
                relay_hint(source, attached)
            )));
        }
    };
    let elapsed = started.elapsed();
    spinner.finish();

    let view = RecvDirView {
        peer: source,
        out: args.out.display().to_string(),
        files: stats.files as u64,
        dirs: stats.dirs as u64,
        symlinks: stats.symlinks as u64,
        bytes: stats.bytes,
        duration_secs: elapsed.as_secs_f64(),
        throughput_mib_s: throughput_mib_s(stats.bytes, elapsed),
        atomic: true,
    };
    emit_value(OutputFormat::resolve_oneshot(output), &view)
        .map_err(|e| generic(format!("write recv-dir result: {e}")))?;
    Ok(())
}
async fn run_send_dir(args: SendDirArgs, output: Option<OutputFormat>) -> Result<(), CliError> {
if !args.path.is_dir() {
return Err(invalid_args(format!(
"send-dir source `{}` is not a directory",
args.path.display()
)));
}
let (adapter, staged) = match args.store.as_deref() {
Some(dir) => (
persistent_adapter(dir, "send-dir").await?,
Some(dir.display().to_string()),
),
None => (
Arc::new(MeshBlobAdapter::new("send-dir", Arc::new(Redex::new()))),
None,
),
};
let manifest_ref = transport::store_dir(&adapter, &args.path)
.await
.map_err(|e| sdk(format!("store_dir `{}`: {e}", args.path.display())))?;
let view = SendDirView {
remote_ref: hex::encode(manifest_ref.encode()),
manifest_size: manifest_ref.size(),
staged_to: staged,
};
emit_value(OutputFormat::resolve_oneshot(output), &view)
.map_err(|e| generic(format!("write send-dir result: {e}")))?;
Ok(())
}
/// Shared setup for the query subcommands (`ls`, `status`, `cancel`).
///
/// Resolves the profile, requires a complete attach target (using `verb` in
/// the error message when one is missing) and builds the CLI context against
/// it.
///
/// Returns the context together with the attach target's node id, which the
/// callers use as the peer to query.
async fn transfer_client(
    attach: &RemoteAttachArgs,
    identity: Option<&std::path::Path>,
    node: u64,
    config_path: Option<&std::path::Path>,
    profile_name: &str,
    verb: &str,
) -> Result<(CliContext, u64), CliError> {
    let profile = resolve_profile(config_path, profile_name).await?;
    let remote = require_remote_attach(&profile, attach, verb)?;

    // Read the id first: `remote` is moved into the context builder below.
    let holder_id = remote.node_id;
    let context = CliContext::build_with_remote(&profile, identity, node, false, remote).await?;

    Ok((context, holder_id))
}
async fn run_ls(
args: LsArgs,
output: Option<OutputFormat>,
config_path: Option<&std::path::Path>,
profile_name: &str,
) -> Result<(), CliError> {
let (ctx, target) = transfer_client(
&args.attach,
args.identity.as_deref(),
args.node,
config_path,
profile_name,
"ls",
)
.await?;
let client = transport::BlobTransferClient::new(ctx.require_mesh_node()?);
let transfers = client
.list(target)
.await
.map_err(|e| sdk(format!("blob.transfers list on peer {target} failed: {e}")))?;
let view = LsView {
transfer_count: transfers.len() as u64,
transfers: transfers.iter().map(TransferRow::from).collect(),
};
emit_value(OutputFormat::resolve_oneshot(output), &view)
.map_err(|e| generic(format!("write transfer ls: {e}")))?;
Ok(())
}
/// Handles `status`: looks up one transfer on the attach target and emits a
/// `StatusView`.
///
/// The transfer id is parsed before any profile or network work. A lookup
/// that succeeds but finds nothing is not an error: the view is emitted with
/// `found: false` and no `transfer` entry.
///
/// # Errors
/// Fails if the id does not parse, the context cannot be set up, the mesh
/// node is unavailable, the lookup request fails, or the result cannot be
/// written.
async fn run_status(
    args: StatusArgs,
    output: Option<OutputFormat>,
    config_path: Option<&std::path::Path>,
    profile_name: &str,
) -> Result<(), CliError> {
    let stream_id = match parse_u64_flexible(&args.transfer_id) {
        Ok(id) => id,
        Err(e) => {
            return Err(invalid_args(format!(
                "transfer-id `{}`: {e}",
                args.transfer_id
            )));
        }
    };

    let (ctx, target) = transfer_client(
        &args.attach,
        args.identity.as_deref(),
        args.node,
        config_path,
        profile_name,
        "status",
    )
    .await?;

    let client = transport::BlobTransferClient::new(ctx.require_mesh_node()?);
    let lookup = client.get(target, stream_id).await;
    let found = lookup.map_err(|e| {
        sdk(format!(
            "blob.transfers status on peer {target} failed: {e}"
        ))
    })?;

    let view = StatusView {
        transfer_id: stream_id,
        found: found.is_some(),
        transfer: found.as_ref().map(TransferRow::from),
    };
    emit_value(OutputFormat::resolve_oneshot(output), &view)
        .map_err(|e| generic(format!("write transfer status: {e}")))?;
    Ok(())
}
/// Handles `cancel`: asks the attach target to cancel one transfer and emits
/// a `CancelView` carrying the id and the value the peer returned.
///
/// The transfer id is parsed before any profile or network work.
///
/// # Errors
/// Fails if the id does not parse, the context cannot be set up, the mesh
/// node is unavailable, the cancel request fails, or the result cannot be
/// written.
async fn run_cancel(
    args: CancelArgs,
    output: Option<OutputFormat>,
    config_path: Option<&std::path::Path>,
    profile_name: &str,
) -> Result<(), CliError> {
    let stream_id = match parse_u64_flexible(&args.transfer_id) {
        Ok(id) => id,
        Err(e) => {
            return Err(invalid_args(format!(
                "transfer-id `{}`: {e}",
                args.transfer_id
            )));
        }
    };

    let (ctx, target) = transfer_client(
        &args.attach,
        args.identity.as_deref(),
        args.node,
        config_path,
        profile_name,
        "cancel",
    )
    .await?;

    let client = transport::BlobTransferClient::new(ctx.require_mesh_node()?);
    let outcome = client.cancel(target, stream_id).await;
    let cancelled = outcome.map_err(|e| {
        sdk(format!(
            "blob.transfers cancel on peer {target} failed: {e}"
        ))
    })?;

    let view = CancelView {
        transfer_id: stream_id,
        cancelled,
    };
    emit_value(OutputFormat::resolve_oneshot(output), &view)
        .map_err(|e| generic(format!("write transfer cancel: {e}")))?;
    Ok(())
}
/// Parses a hex content reference supplied on the command line.
///
/// Surrounding whitespace is trimmed and at most one leading `0x` / `0X` is
/// removed before hex-decoding. Two shapes are accepted:
///
/// * exactly 32 bytes: treated as a bare hash and wrapped in a small
///   `BlobRef` with URI `mesh://<hex as typed>` and size `0`;
/// * anything else: handed to `BlobRef::decode`.
///
/// `flag` is the option name used in error messages.
///
/// # Errors
/// Returns an invalid-arguments error when the text is not valid hex, when
/// `BlobRef::decode` rejects the bytes, or when it decodes to no reference.
fn parse_content_ref(s: &str, flag: &str) -> Result<BlobRef, CliError> {
    let body = s.trim();
    // `find_map` stops at the first prefix that matches, so a doubled prefix
    // such as `0x0x…` keeps its second `0x` and fails hex decoding.
    let digits = ["0x", "0X"]
        .iter()
        .find_map(|prefix| body.strip_prefix(*prefix))
        .unwrap_or(body);

    let bytes = match hex::decode(digits) {
        Ok(bytes) => bytes,
        Err(e) => return Err(invalid_args(format!("{flag} `{s}` is not valid hex: {e}"))),
    };

    // The array conversion succeeds only for exactly 32 bytes.
    if let Ok(hash) = <[u8; 32]>::try_from(bytes.as_slice()) {
        return Ok(BlobRef::small(format!("mesh://{digits}"), hash, 0));
    }

    let decoded = BlobRef::decode(&bytes).map_err(|e| {
        invalid_args(format!(
            "{flag} `{s}` is neither a 32-byte hash nor a valid encoded BlobRef: {e}"
        ))
    })?;
    decoded.ok_or_else(|| {
        invalid_args(format!(
            "{flag} `{s}` decoded to an empty BlobRef"
        ))
    })
}
/// Builds the suffix appended to fetch errors when the fetch was relayed.
///
/// Returns an empty string when `source` is the attach node itself.
/// Otherwise returns a parenthesised note, with a leading space, naming both
/// nodes and suggesting a route check.
fn relay_hint(source: u64, attached: u64) -> String {
    if source != attached {
        return format!(
            " (fetching from peer {source} via attach node {attached}; \
             ensure {attached} has a route to {source})"
        );
    }
    String::new()
}
/// Resolves the attach target for a transfer subcommand.
///
/// Thin wrapper over `crate::context::require_remote_attach` that supplies
/// the transfer-specific error used when no complete target is available.
/// `verb` is the subcommand name shown in that message.
fn require_remote_attach(
    profile: &crate::config::Profile,
    args: &RemoteAttachArgs,
    verb: &str,
) -> Result<RemoteAttach, CliError> {
    // Built lazily: the message is only formatted if the callback is invoked.
    let missing_target = || {
        invalid_args(format!(
            "net transfer {verb} needs a holder target: pass --node-addr <IP:PORT> \
             --node-pubkey <HEX> --node-id <N> --psk-hex <HEX> (each can be defaulted \
             in the profile as `node_addr` / `node_pubkey` / `node_id` / `psk_hex`)."
        ))
    };
    crate::context::require_remote_attach(profile, args, missing_target)
}
/// Creates a blob adapter whose `Redex` is pointed at `dir`.
///
/// `dir` (and any missing parents) is created first. The adapter is built
/// with identifier `id`, switched to persistent mode and returned in an
/// `Arc`.
///
/// # Errors
/// Fails when the directory cannot be created.
async fn persistent_adapter(
    dir: &std::path::Path,
    id: &str,
) -> Result<Arc<MeshBlobAdapter>, CliError> {
    if let Err(e) = tokio::fs::create_dir_all(dir).await {
        return Err(generic(format!("create store dir {}: {e}", dir.display())));
    }

    let store = Redex::new().with_persistent_dir(dir);
    let adapter = MeshBlobAdapter::new(id, Arc::new(store)).with_persistent(true);
    Ok(Arc::new(adapter))
}
/// Writes a file under a temporary `.partial` name and renames it to its
/// final name on `commit`.
struct AtomicFileWriter {
/// Temporary path that receives the bytes (see `partial_path`).
partial: PathBuf,
/// Final destination the partial file is renamed to.
out: PathBuf,
/// Open handle to the partial file.
file: tokio::fs::File,
}
impl AtomicFileWriter {
    /// Opens a writer for `out`.
    ///
    /// Creates the parent directory of `out` when it has a non-empty one,
    /// then creates the `.partial` sibling that will receive the bytes.
    ///
    /// # Errors
    /// Fails when the parent directory or the partial file cannot be created.
    async fn create(out: &std::path::Path) -> Result<Self, CliError> {
        let partial = partial_path(out);

        match out.parent() {
            // A bare file name has an empty parent: nothing to create.
            Some(parent) if !parent.as_os_str().is_empty() => {
                if let Err(e) = tokio::fs::create_dir_all(parent).await {
                    return Err(generic(format!(
                        "create out dir {}: {e}",
                        parent.display()
                    )));
                }
            }
            _ => {}
        }

        match tokio::fs::File::create(&partial).await {
            Ok(file) => Ok(Self {
                partial,
                out: out.to_path_buf(),
                file,
            }),
            Err(e) => Err(generic(format!("create {}: {e}", partial.display()))),
        }
    }

    /// Appends `bytes` to the partial file.
    ///
    /// # Errors
    /// Fails when the write fails.
    async fn write_chunk(&mut self, bytes: &[u8]) -> Result<(), CliError> {
        use tokio::io::AsyncWriteExt as _;

        match self.file.write_all(bytes).await {
            Ok(()) => Ok(()),
            Err(e) => Err(generic(format!("write {}: {e}", self.partial.display()))),
        }
    }

    /// Flushes and closes the partial file, then renames it onto the final
    /// path.
    ///
    /// # Errors
    /// Fails when the flush or the rename fails; after a failed rename the
    /// partial file is left where it is.
    async fn commit(self) -> Result<(), CliError> {
        use tokio::io::AsyncWriteExt as _;

        let Self {
            partial,
            out,
            mut file,
        } = self;

        file.flush()
            .await
            .map_err(|e| generic(format!("flush {}: {e}", partial.display())))?;
        // Release the handle before the rename.
        drop(file);

        match tokio::fs::rename(&partial, &out).await {
            Ok(()) => Ok(()),
            Err(e) => Err(generic(format!(
                "rename {} -> {}: {e} (partial left in place)",
                partial.display(),
                out.display()
            ))),
        }
    }
}
/// Returns the sibling path of `out` whose file name has `.partial` appended
/// (for example `dir/blob.bin` becomes `dir/blob.bin.partial`).
///
/// When `out` has no file name component, the name starts from an empty
/// string, so the appended name is just `.partial`.
fn partial_path(out: &std::path::Path) -> PathBuf {
    let mut staged_name = std::ffi::OsString::from(out.file_name().unwrap_or_default());
    staged_name.push(".partial");
    out.with_file_name(staged_name)
}
/// Converts a byte count and an elapsed duration into MiB per second.
///
/// Returns `0.0` when no time has elapsed, so a zero-length duration never
/// produces a division by zero.
fn throughput_mib_s(bytes: u64, elapsed: std::time::Duration) -> f64 {
    const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

    let secs = elapsed.as_secs_f64();
    if secs <= 0.0 {
        return 0.0;
    }
    let mebibytes = bytes as f64 / BYTES_PER_MIB;
    mebibytes / secs
}
/// Decides whether progress indicators may be shown.
///
/// Progress is off when `quiet` is set. Otherwise it is on only when the
/// resolved one-shot output format is `Table` or `Text`.
fn progress_enabled(output: Option<OutputFormat>, quiet: bool) -> bool {
    if quiet {
        return false;
    }
    match OutputFormat::resolve_oneshot(output) {
        OutputFormat::Table | OutputFormat::Text => true,
        _ => false,
    }
}
/// Optional terminal progress indicator. Holds `None` when progress is
/// disabled or stderr is not a terminal, in which case every method is a
/// no-op.
struct Progress(Option<indicatif::ProgressBar>);
impl Progress {
    /// Returns whether an indicator should actually be created: the caller
    /// enabled progress and stderr is a terminal.
    fn should_draw(enabled: bool) -> bool {
        use std::io::IsTerminal as _;
        enabled && std::io::stderr().is_terminal()
    }

    /// Starts a spinner on stderr showing `msg`, or an inert `Progress` when
    /// nothing should be drawn.
    fn start(msg: &str, enabled: bool) -> Self {
        if !Self::should_draw(enabled) {
            return Self(None);
        }

        let spinner = indicatif::ProgressBar::new_spinner();
        spinner.set_draw_target(indicatif::ProgressDrawTarget::stderr());
        spinner.enable_steady_tick(std::time::Duration::from_millis(120));
        spinner.set_message(msg.to_string());
        Self(Some(spinner))
    }

    /// Starts a byte-counting bar on stderr for a transfer of `total` bytes,
    /// or an inert `Progress` when nothing should be drawn.
    ///
    /// Falls back to the default bar style if the template is rejected.
    fn start_bytes(msg: &str, total: u64, enabled: bool) -> Self {
        if !Self::should_draw(enabled) {
            return Self(None);
        }

        let bar = indicatif::ProgressBar::new(total);
        bar.set_draw_target(indicatif::ProgressDrawTarget::stderr());
        bar.enable_steady_tick(std::time::Duration::from_millis(120));

        let template = "{msg} [{bar:30}] {bytes}/{total_bytes} ({bytes_per_sec})";
        let style = match indicatif::ProgressStyle::with_template(template) {
            Ok(style) => style,
            Err(_) => indicatif::ProgressStyle::default_bar(),
        }
        .progress_chars("=>-");
        bar.set_style(style);

        bar.set_message(msg.to_string());
        Self(Some(bar))
    }

    /// Advances the indicator by `n`; does nothing when inert.
    fn inc(&self, n: u64) {
        if let Some(bar) = self.0.as_ref() {
            bar.inc(n);
        }
    }

    /// Consumes the indicator and removes it from the terminal.
    fn finish(mut self) {
        self.clear();
    }

    /// Finishes and clears the bar if one is still held. The bar is taken
    /// out of the option, so calling this again is a no-op.
    fn clear(&mut self) {
        let Some(bar) = self.0.take() else {
            return;
        };
        bar.finish_and_clear();
    }
}
// Clears any indicator that is still showing when a `Progress` goes out of
// scope, which covers early returns through `?` in the handlers.
impl Drop for Progress {
fn drop(&mut self) {
// `clear` takes the bar out of the option, so this is a no-op after
// `finish` has already run.
self.clear();
}
}
/// Result record emitted by `run_recv_blob`.
#[derive(Serialize)]
struct RecvBlobView {
/// Peer the blob was fetched from.
peer: u64,
/// Destination path, rendered with `Path::display`.
out: String,
/// Sum of the lengths of all received chunks.
bytes: u64,
/// Wall-clock time from just before the stream was created to commit.
duration_secs: f64,
/// `bytes` over `duration_secs`, in MiB/s (see `throughput_mib_s`).
throughput_mib_s: f64,
}
/// Result record emitted by `run_send_blob`.
#[derive(Serialize)]
struct SendBlobView {
/// Hex encoding of the encoded `BlobRef`.
blob_ref: String,
/// Hex hash, present only for `BlobRef::Small`; omitted from output otherwise.
#[serde(skip_serializing_if = "Option::is_none")]
hash: Option<String>,
/// Size reported by the `BlobRef`.
size: u64,
/// 1 for a `Small` reference, the manifest's chunk count for `Manifest`.
chunks: u64,
/// The `--store` directory when one was given; omitted from output otherwise.
#[serde(skip_serializing_if = "Option::is_none")]
staged_to: Option<String>,
}
/// Result record emitted by `run_recv_dir`.
#[derive(Serialize)]
struct RecvDirView {
/// Peer the directory was fetched from.
peer: u64,
/// Destination path, rendered with `Path::display`.
out: String,
/// `files` counter returned by `transport::fetch_dir`.
files: u64,
/// `dirs` counter returned by `transport::fetch_dir`.
dirs: u64,
/// `symlinks` counter returned by `transport::fetch_dir`.
symlinks: u64,
/// `bytes` counter returned by `transport::fetch_dir`.
bytes: u64,
/// Wall-clock time spent in the fetch call.
duration_secs: f64,
/// `bytes` over `duration_secs`, in MiB/s (see `throughput_mib_s`).
throughput_mib_s: f64,
/// Always set to `true` by `run_recv_dir`.
/// NOTE(review): presumably reflects an atomicity guarantee of `fetch_dir` — confirm.
atomic: bool,
}
/// Result record emitted by `run_send_dir`.
#[derive(Serialize)]
struct SendDirView {
/// Hex encoding of the encoded manifest `BlobRef`.
remote_ref: String,
/// Size reported by the manifest `BlobRef`.
manifest_size: u64,
/// The `--store` directory when one was given; omitted from output otherwise.
#[serde(skip_serializing_if = "Option::is_none")]
staged_to: Option<String>,
}
/// Result record emitted by `run_ls`.
#[derive(Serialize)]
struct LsView {
/// Number of entries in `transfers`.
transfer_count: u64,
/// One row per transfer returned by the peer.
transfers: Vec<TransferRow>,
}
/// Serializable view of one `transport::TransferStatus`.
#[derive(Serialize)]
struct TransferRow {
/// The status's `stream_id`.
transfer_id: u64,
/// The status's `holder`.
peer: u64,
/// Hex encoding of the status's `expected_hash`.
hash: String,
/// Bytes received so far, as reported by the status.
bytes_received: u64,
/// Total size when the status carries one; omitted from output otherwise.
#[serde(skip_serializing_if = "Option::is_none")]
total_bytes: Option<u64>,
}
/// Builds the output row for a transfer status: ids and byte counters are
/// copied across and the expected hash is hex-encoded.
impl From<&transport::TransferStatus> for TransferRow {
    fn from(status: &transport::TransferStatus) -> Self {
        let hash = hex::encode(status.expected_hash);
        Self {
            transfer_id: status.stream_id,
            peer: status.holder,
            hash,
            bytes_received: status.bytes_received,
            total_bytes: status.total_bytes,
        }
    }
}
/// Result record emitted by `run_status`.
#[derive(Serialize)]
struct StatusView {
/// The id that was looked up.
transfer_id: u64,
/// Whether the peer returned an entry for that id.
found: bool,
/// The entry when one was returned; omitted from output otherwise.
#[serde(skip_serializing_if = "Option::is_none")]
transfer: Option<TransferRow>,
}
/// Result record emitted by `run_cancel`.
#[derive(Serialize)]
struct CancelView {
/// The id that was asked to be cancelled.
transfer_id: u64,
/// The value returned by the peer's cancel call.
cancelled: bool,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A 64-digit hex string is treated as a bare hash and wrapped in a
    /// small reference with size 0 and a `mesh://` URI.
    #[test]
    fn parse_content_ref_accepts_a_bare_32_byte_hash() {
        let digest_hex = "ab".repeat(32);
        let parsed = parse_content_ref(&digest_hex, "--blob-ref").expect("bare hash parses");

        match parsed {
            BlobRef::Small {
                hash, size, uri, ..
            } => {
                assert_eq!(hash, [0xabu8; 32]);
                assert_eq!(size, 0);
                assert_eq!(uri, format!("mesh://{digest_hex}"));
            }
            other => panic!("expected Small, got {other:?}"),
        }
    }

    /// Either spelling of the prefix yields the same reference as no prefix.
    #[test]
    fn parse_content_ref_strips_a_single_0x_prefix() {
        let digest_hex = "cd".repeat(32);
        let bare = parse_content_ref(&digest_hex, "--blob-ref").expect("bare");

        for (prefix, what) in [("0x", "0x prefix"), ("0X", "0X prefix")] {
            let prefixed = parse_content_ref(&format!("{prefix}{digest_hex}"), "--blob-ref")
                .expect(what);
            assert_eq!(prefixed, bare);
        }
    }

    /// Only one prefix is removed, so a doubled prefix is rejected.
    #[test]
    fn parse_content_ref_does_not_peel_a_doubled_0x_prefix() {
        let doubled = format!("0x0x{}", "ef".repeat(32));
        let outcome = parse_content_ref(&doubled, "--blob-ref");
        assert!(outcome.is_err());
    }

    /// A fully encoded reference survives encode -> hex -> parse unchanged.
    #[test]
    fn parse_content_ref_round_trips_a_full_encoded_ref() {
        let original = BlobRef::small("mesh://x".to_string(), [7u8; 32], 123);
        let as_hex = hex::encode(original.encode());

        let parsed = parse_content_ref(&as_hex, "--remote-ref").expect("full ref decodes");
        assert_eq!(parsed, original);
    }

    /// Non-hex text and a 16-byte value are both rejected.
    #[test]
    fn parse_content_ref_rejects_non_hex_and_wrong_length() {
        let not_hex = parse_content_ref("not-hex", "--blob-ref");
        assert!(not_hex.is_err());

        let sixteen_bytes = "ab".repeat(16);
        assert!(parse_content_ref(&sixteen_bytes, "--blob-ref").is_err());
    }

    /// The hint is empty for a direct fetch and names both nodes otherwise.
    #[test]
    fn relay_hint_only_fires_for_a_relayed_fetch() {
        assert_eq!(relay_hint(42, 42), "");

        let hint = relay_hint(7, 42);
        assert!(hint.contains('7') && hint.contains("42"), "hint: {hint}");
    }
}