mod config;
use config::AeroSyncConfig;
use zeroize::Zeroizing;
use aerosync::core::{
auth::{AuthConfig, AuthManager},
discovery::AeroSyncMdns,
preflight::preflight_check,
resume::ResumeStore,
server::{FileReceiver, ServerConfig, TlsConfig},
transfer::{TransferConfig, TransferEngine, TransferTask},
FileManager,
};
use aerosync::protocols::{http::HttpConfig, quic::QuicConfig, ratelimit::parse_limit, AutoAdapter};
use clap::{Parser, Subcommand};
use futures::stream::{self, StreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
// Top-level CLI definition parsed by clap.
// NOTE: commented with `//` rather than `///` on purpose — clap turns doc
// comments into --help text, which would change the program's output.
#[derive(Parser)]
#[command(
    name = "aerosync",
    about = "High-performance cross-network file transfer for agents",
    version = "0.2.0"
)]
struct Cli {
    // Debug-level logging; applies to every subcommand.
    #[arg(short, long, global = true)]
    verbose: bool,
    // TOML config path; `~` is expanded when the file is loaded.
    #[arg(long, global = true, default_value = "~/.aerosync/config.toml")]
    config: Option<PathBuf>,
    #[command(subcommand)]
    command: Commands,
}
// All top-level subcommands. `//` comments are used intentionally: clap
// would surface `///` doc comments in --help output.
#[derive(Subcommand)]
enum Commands {
    // Upload a file or directory tree to a receiver.
    Send {
        source: PathBuf,
        destination: String,
        // Required to send directories.
        #[arg(short, long)]
        recursive: bool,
        // Currently unused by cmd_send; protocol is auto-negotiated.
        #[arg(long)]
        protocol: Option<String>,
        #[arg(long)]
        token: Option<String>,
        // Currently unused by cmd_send; concurrency comes from config.
        #[arg(long, default_value = "4")]
        parallel: usize,
        // Skip SHA-256 checksum computation.
        #[arg(long)]
        no_verify: bool,
        #[arg(long)]
        dry_run: bool,
        #[arg(long)]
        no_resume: bool,
        #[arg(long)]
        no_preflight: bool,
        // Upload rate limit string, parsed by parse_limit (e.g. "500k").
        #[arg(long)]
        limit: Option<String>,
        // DER certificate(s) to pin for TLS/QUIC connections.
        #[arg(long, value_name = "CERT_DER")]
        pin_cert: Vec<PathBuf>,
        #[arg(long)]
        accept_invalid_certs: bool,
    },
    // Run a receiver and wait for incoming files.
    Receive {
        #[arg(long, default_value = "7788")]
        port: u16,
        #[arg(long, default_value = "7789")]
        quic_port: u16,
        #[arg(long, default_value = "./received")]
        save_to: PathBuf,
        #[arg(long, default_value = "0.0.0.0")]
        bind: String,
        #[arg(long)]
        auth_token: Option<String>,
        // Stop after the first file is received.
        #[arg(long)]
        one_shot: bool,
        #[arg(long)]
        overwrite: bool,
        // Default is 100 GiB, expressed in bytes.
        #[arg(long, default_value = "107374182400")]
        max_size: u64,
        // Disable the QUIC listener.
        #[arg(long)]
        http_only: bool,
        #[arg(long)]
        tls_cert: Option<PathBuf>,
        #[arg(long)]
        tls_key: Option<PathBuf>,
        #[arg(long)]
        https: bool,
        #[arg(long, default_value = "7790")]
        https_port: u16,
        #[arg(long)]
        no_http: bool,
    },
    // Manage authentication tokens.
    Token {
        #[command(subcommand)]
        action: TokenAction,
    },
    // Query a receiver's /health endpoint.
    Status {
        #[arg(default_value = "localhost:7788")]
        host: String,
    },
    // Inspect or clear resumable transfer state.
    Resume {
        #[command(subcommand)]
        action: ResumeAction,
    },
    // Show past transfers from the local history store.
    History {
        #[arg(long, default_value = "20")]
        limit: usize,
        #[arg(long)]
        sent: bool,
        #[arg(long)]
        received: bool,
        #[arg(long)]
        success_only: bool,
    },
    // Stream live transfer events over WebSocket.
    Watch {
        #[arg(default_value = "localhost:7788")]
        host: String,
        // Keep only events whose raw JSON contains this quoted value.
        #[arg(long)]
        filter: Option<String>,
        // "pretty" (default) or "json".
        #[arg(long, default_value = "pretty")]
        format: String,
        #[arg(long)]
        reconnect: bool,
        // 0 means retry forever (only meaningful with --reconnect).
        #[arg(long, default_value = "0")]
        max_retries: u32,
        // Base backoff delay in seconds.
        #[arg(long, default_value = "2")]
        retry_delay: u64,
    },
    // Find receivers on the local network via mDNS.
    Discover {
        #[arg(long, default_value = "3")]
        timeout: u64,
        #[arg(long)]
        json: bool,
    },
}
// Subcommands under `aerosync token`. `//` comments keep clap --help
// output unchanged.
#[derive(Subcommand)]
enum TokenAction {
    // Create a new token, optionally persisting it to the token store.
    Generate {
        // HMAC secret; a random one is generated when omitted.
        #[arg(long)]
        secret: Option<String>,
        // Token lifetime in hours.
        #[arg(long, default_value = "24")]
        hours: u64,
        #[arg(long)]
        save: bool,
        #[arg(long)]
        label: Option<String>,
    },
    // Check a token's validity against a secret.
    Verify {
        token: String,
        #[arg(long)]
        secret: String,
    },
    // List tokens saved in the local store.
    List,
    // Revoke a stored token by prefix (or full token value).
    Revoke {
        token_prefix: String,
    },
}
// Subcommands under `aerosync resume`. `//` comments keep clap --help
// output unchanged.
#[derive(Subcommand)]
enum ResumeAction {
    // Show pending (resumable) transfers in a state directory.
    List {
        #[arg(long, default_value = ".")]
        state_dir: PathBuf,
    },
    // Delete the saved state for a single task.
    Clear {
        task_id: String,
        #[arg(long, default_value = ".")]
        state_dir: PathBuf,
    },
    // Delete all saved resume state in the directory.
    ClearAll {
        #[arg(long, default_value = ".")]
        state_dir: PathBuf,
    },
}
/// Entry point: parse CLI args, initialize tracing, load the optional
/// config file, then dispatch to the matching subcommand handler.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cli = Cli::parse();
    // An env filter (RUST_LOG) wins; otherwise --verbose picks debug vs info.
    let level = if cli.verbose { "debug" } else { "info" };
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level)),
        )
        .init();
    // A missing or unparsable config file silently falls back to defaults.
    let app_config = if let Some(ref cfg_path) = cli.config {
        let expanded = shellexpand::tilde(&cfg_path.to_string_lossy()).to_string();
        AeroSyncConfig::load(std::path::Path::new(&expanded)).unwrap_or_default()
    } else {
        AeroSyncConfig::default()
    };
    // Destructure each variant and forward to its handler verbatim.
    match cli.command {
        Commands::Send {
            source,
            destination,
            recursive,
            protocol,
            token,
            parallel,
            no_verify,
            dry_run,
            no_resume,
            no_preflight,
            limit,
            pin_cert,
            accept_invalid_certs,
        } => {
            cmd_send(
                source,
                destination,
                recursive,
                protocol,
                token,
                parallel,
                no_verify,
                dry_run,
                no_resume,
                no_preflight,
                limit,
                pin_cert,
                accept_invalid_certs,
                &app_config,
            )
            .await?;
        }
        Commands::Receive {
            port,
            quic_port,
            save_to,
            bind,
            auth_token,
            one_shot,
            overwrite,
            max_size,
            http_only,
            tls_cert,
            tls_key,
            https,
            https_port,
            no_http,
        } => {
            // NOTE: argument order differs from the struct field order here
            // (no_http precedes tls_cert in cmd_receive's signature).
            cmd_receive(
                port,
                quic_port,
                save_to,
                bind,
                auth_token,
                one_shot,
                overwrite,
                max_size,
                http_only,
                no_http,
                tls_cert,
                tls_key,
                https,
                https_port,
                &app_config,
                // The config path is forwarded so the receiver can hot-reload it.
                cli.config.clone(),
            )
            .await?;
        }
        Commands::Token { action } => {
            cmd_token(action).await?;
        }
        Commands::Status { host } => {
            cmd_status(host).await?;
        }
        Commands::Resume { action } => {
            cmd_resume(action).await?;
        }
        Commands::History {
            limit,
            sent,
            received,
            success_only,
        } => {
            cmd_history(limit, sent, received, success_only).await?;
        }
        Commands::Watch {
            host,
            filter,
            format,
            reconnect,
            max_retries,
            retry_delay,
        } => {
            cmd_watch(host, filter, format, reconnect, max_retries, retry_delay).await?;
        }
        Commands::Discover { timeout, json } => {
            cmd_discover(timeout, json).await?;
        }
    }
    Ok(())
}
/// Implements `aerosync send`: upload files to a remote receiver.
///
/// Pipeline: collect files → optional dry-run listing → protocol
/// negotiation → optional preflight (free-space/version) check →
/// optional SHA-256 hashing → queue uploads on the transfer engine and
/// render progress bars until completion or a 5-minute timeout.
///
/// `_protocol` and `_parallel` are accepted for CLI compatibility but
/// currently unused (protocol is auto-negotiated; concurrency comes from
/// the transfer config).
#[allow(clippy::too_many_arguments)]
async fn cmd_send(
    source: PathBuf,
    destination: String,
    recursive: bool,
    _protocol: Option<String>,
    token: Option<String>,
    _parallel: usize,
    no_verify: bool,
    dry_run: bool,
    no_resume: bool,
    no_preflight: bool,
    limit: Option<String>,
    pin_cert: Vec<PathBuf>,
    accept_invalid_certs: bool,
    app_config: &AeroSyncConfig,
) -> anyhow::Result<()> {
    // Entries are (absolute path, path relative to source root, size in bytes).
    let files = collect_files(&source, recursive).await?;
    if files.is_empty() {
        eprintln!("No files found at: {}", source.display());
        return Ok(());
    }
    let total_size: u64 = files.iter().map(|f| f.2).sum();
    println!(
        "Sending {} file(s), total {:.2} MB",
        files.len(),
        total_size as f64 / 1_048_576.0
    );
    // --dry-run: list what would be sent, then stop before any network I/O.
    if dry_run {
        println!("\nDry run — files that would be sent:");
        for (path, rel, size) in &files {
            println!(
                " {} → {} ({:.2} KB)",
                path.display(),
                rel.display(),
                *size as f64 / 1024.0
            );
        }
        return Ok(());
    }
    // Probe the destination and decide quic:// vs http:// (see negotiate_protocol).
    let dest_url = negotiate_protocol(&destination).await;
    if !no_preflight {
        // Preflight always talks HTTP, even for QUIC destinations.
        let http_base = extract_http_base(&dest_url, &destination);
        match preflight_check(&http_base, total_size).await {
            Ok(info) => {
                tracing::info!(
                    "Preflight OK: free={:.2} GB, version={:?}",
                    info.free_bytes as f64 / 1_073_741_824.0,
                    info.version
                );
            }
            Err(e) => {
                eprintln!("Preflight check failed: {}", e);
                eprintln!("Use --no-preflight to skip this check.");
                return Err(anyhow::anyhow!("Preflight failed: {}", e));
            }
        }
    }
    // Engine-level transfer settings, mostly sourced from the config file.
    let config = TransferConfig {
        max_concurrent_transfers: app_config.transfer.max_concurrent,
        chunk_size: (app_config.transfer.chunk_size_mb * 1024 * 1024) as usize,
        retry_attempts: app_config.transfer.retry_attempts,
        timeout_seconds: app_config.transfer.timeout_seconds,
        use_quic: !destination.starts_with("http"),
        // CLI --token wins over the config-file token; Zeroizing wipes the
        // secret from memory on drop.
        auth_token: token
            .clone()
            .or_else(|| app_config.auth.token.clone())
            .map(Zeroizing::new),
        enable_resume: !no_resume,
        ..TransferConfig::default()
    };
    let eff_token = token.clone().or_else(|| app_config.auth.token.clone());
    // Parse e.g. "500k" into bytes/sec; 0 means unlimited.
    let upload_limit_bps = limit.as_deref().and_then(parse_limit).unwrap_or(0);
    if upload_limit_bps > 0 {
        println!("Upload limit: {:.1} KB/s", upload_limit_bps as f64 / 1024.0);
    }
    let http_config = HttpConfig {
        timeout_seconds: app_config.transfer.timeout_seconds,
        max_retries: app_config.transfer.retry_attempts,
        chunk_size: (app_config.transfer.chunk_size_mb * 1024 * 1024) as usize,
        auth_token: eff_token.clone().map(Zeroizing::new),
        upload_limit_bps,
        accept_invalid_certs,
        pinned_server_certs: pin_cert.clone(),
        concurrent_chunks: 4,
        max_reconnect_attempts: 5,
        reconnect_base_delay_ms: 3_000,
    };
    let quic_config = QuicConfig {
        auth_token: eff_token.clone().map(Zeroizing::new),
        pinned_server_certs: pin_cert,
        ..QuicConfig::default()
    };
    // AutoAdapter picks HTTP or QUIC per task based on the destination URL.
    let adapter = Arc::new(AutoAdapter::new(http_config, quic_config));
    let engine = TransferEngine::new(config);
    engine.start(adapter).await?;
    // Hash up to 8 files concurrently unless --no-verify was given.
    // A per-file hash failure is logged and skipped, not fatal.
    let sha256_map: HashMap<PathBuf, Option<String>> = if !no_verify {
        println!("Computing SHA-256 checksums ({} file(s))...", files.len());
        stream::iter(files.iter().map(|(path, _, _)| path.clone()))
            .map(|path| async move {
                let hash = match FileManager::compute_sha256(&path).await {
                    Ok(h) => Some(h),
                    Err(e) => {
                        tracing::warn!("Could not compute SHA-256 for {}: {}", path.display(), e);
                        None
                    }
                };
                (path, hash)
            })
            .buffer_unordered(8)
            .collect()
            .await
    } else {
        HashMap::new()
    };
    // One overall progress bar plus one bar per file.
    let mp = MultiProgress::new();
    let summary_style = ProgressStyle::with_template(
        "Overall [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta}) @ {binary_bytes_per_sec}",
    )
    .unwrap()
    .progress_chars("=>-");
    let summary_pb = mp.add(ProgressBar::new(total_size));
    summary_pb.set_style(summary_style);
    let file_style = ProgressStyle::with_template(
        " {spinner} {msg:<30} [{bar:30.green/white}] {bytes}/{total_bytes}",
    )
    .unwrap()
    .progress_chars("=>-");
    // Queue one upload task per file and create its progress bar,
    // keyed by task id so poll updates can find the right bar.
    let mut file_bars: HashMap<Uuid, ProgressBar> = HashMap::new();
    for (path, relative_path, size) in &files {
        let file_name = path
            .file_name()
            .map(|n| n.to_string_lossy().to_string())
            .unwrap_or_default();
        let sha256 = sha256_map.get(path).and_then(|h| h.clone());
        let rel_str = relative_path.to_string_lossy();
        // Destination keeps the source-relative path so trees are recreated.
        let task_dest = format!("{}/{}", dest_url.trim_end_matches('/'), rel_str);
        let mut task = TransferTask::new_upload(path.clone(), task_dest, *size);
        task.sha256 = sha256;
        let task_id = task.id;
        engine.add_transfer(task).await?;
        let pb = mp.add(ProgressBar::new(*size));
        pb.set_style(file_style.clone());
        pb.set_message(file_name);
        file_bars.insert(task_id, pb);
    }
    // Poll the monitor every 100 ms; give up after a hard 5-minute deadline.
    let monitor = engine.get_progress_monitor().await;
    let deadline = tokio::time::Instant::now() + Duration::from_secs(300);
    let mut last_total_bytes: u64 = 0;
    loop {
        let done = {
            let m = monitor.read().await;
            let stats = m.get_stats();
            for tp in m.get_active_transfers() {
                if let Some(pb) = file_bars.get(&tp.task_id) {
                    pb.set_position(tp.bytes_transferred.min(tp.total_bytes));
                    if matches!(
                        tp.status,
                        aerosync::core::progress::TransferStatus::Completed
                    ) {
                        pb.finish_with_message(format!("{} ✓", pb.message()));
                    } else if matches!(
                        tp.status,
                        aerosync::core::progress::TransferStatus::Failed(_)
                    ) {
                        pb.abandon_with_message(format!("{} ✗", pb.message()));
                    }
                }
            }
            // Advance the overall bar by the delta since the last poll.
            let delta = stats.transferred_bytes.saturating_sub(last_total_bytes);
            if delta > 0 {
                summary_pb.inc(delta);
                last_total_bytes = stats.transferred_bytes;
            }
            stats.completed_files + stats.failed_files >= stats.total_files
        };
        if done {
            break;
        }
        if tokio::time::Instant::now() >= deadline {
            eprintln!("Timeout waiting for transfers");
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    summary_pb.finish_with_message("Done");
    // Final summary; on failure, surface actionable advice for the first
    // failed transfer that has any.
    let m = monitor.read().await;
    let stats = m.get_stats();
    let speed_mb = stats.overall_speed / 1_048_576.0;
    println!(
        "\nCompleted: {}/{} files, Failed: {}, Avg speed: {:.2} MB/s",
        stats.completed_files, stats.total_files, stats.failed_files, speed_mb
    );
    if stats.failed_files > 0 {
        let transfers = m.get_active_transfers();
        for t in &transfers {
            if let aerosync::core::progress::TransferStatus::Failed(ref err_msg) = t.status {
                // Wrap the raw message as a Network error to look up advice.
                let fake_err = aerosync::core::AeroSyncError::Network(err_msg.clone());
                if let Some(adv) = aerosync::core::error_advice::advice_for(&fake_err) {
                    eprintln!("\n {}", adv.summary);
                    eprintln!(" Suggestions:");
                    for (i, s) in adv.suggestions.iter().enumerate() {
                        eprintln!(" {}. {}", i + 1, s);
                    }
                    break;
                }
            }
        }
    }
    Ok(())
}
/// Enumerate the files to send as (absolute path, relative path, size).
///
/// A single file yields one entry whose relative path is just its file
/// name. Directories require `recursive` and are walked via
/// `collect_files_recursive`; anything else (e.g. a special file) is an
/// error.
async fn collect_files(
    source: &PathBuf,
    recursive: bool,
) -> anyhow::Result<Vec<(PathBuf, PathBuf, u64)>> {
    let meta = tokio::fs::metadata(source).await?;
    if meta.is_file() {
        // Fall back to the whole path if there is no final component.
        let rel = PathBuf::from(source.file_name().unwrap_or(source.as_os_str()));
        return Ok(vec![(source.clone(), rel, meta.len())]);
    }
    if !meta.is_dir() {
        anyhow::bail!("Source is not a file or directory");
    }
    if !recursive {
        anyhow::bail!(
            "'{}' is a directory. Use --recursive to send directories.",
            source.display()
        );
    }
    let mut collected = Vec::new();
    collect_files_recursive(source, source, &mut collected).await?;
    Ok(collected)
}
/// Recursively walk `dir`, appending every regular file to `out` as
/// (absolute path, path relative to `base`, size in bytes).
///
/// Returns a boxed future because an `async fn` cannot call itself
/// directly (the future type would be infinitely sized).
fn collect_files_recursive<'a>(
    base: &'a PathBuf,
    dir: &'a PathBuf,
    out: &'a mut Vec<(PathBuf, PathBuf, u64)>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + 'a>> {
    Box::pin(async move {
        let mut entries = tokio::fs::read_dir(dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            let meta = entry.metadata().await?;
            if meta.is_file() {
                // strip_prefix only fails if `path` is outside `base`; fall
                // back to the absolute path in that unlikely case.
                let rel = path.strip_prefix(base).unwrap_or(&path).to_path_buf();
                out.push((path, rel, meta.len()));
            } else if meta.is_dir() {
                collect_files_recursive(base, &path, out).await?;
            }
            // Entries that are neither file nor dir (e.g. broken symlinks
            // per these metadata calls) are skipped silently.
        }
        Ok(())
    })
}
/// Derive the HTTP(S) base URL (`scheme://host[:port]`) used for the
/// preflight check from a negotiated destination URL.
///
/// For `quic://` destinations the convention is that the HTTP control
/// port sits one below the QUIC port. Inputs without a recognized scheme
/// fall back to `http://<original destination>`.
fn extract_http_base(dest_url: &str, original_dest: &str) -> String {
    // Keep only the authority (host[:port]) of a scheme-stripped URL.
    fn authority(rest: &str) -> &str {
        rest.split('/').next().unwrap_or(rest)
    }
    if dest_url.starts_with("https://") {
        let rest = dest_url.trim_start_matches("https://");
        return format!("https://{}", authority(rest));
    }
    if dest_url.starts_with("http://") {
        let rest = dest_url.trim_start_matches("http://");
        return format!("http://{}", authority(rest));
    }
    if dest_url.starts_with("quic://") {
        let rest = dest_url.trim_start_matches("quic://");
        let host_port = authority(rest);
        return match host_port.rfind(':') {
            Some(idx) => {
                let host = &host_port[..idx];
                // Unparsable ports default to the standard QUIC port 7789.
                let quic_port: u16 = host_port[idx + 1..].parse().unwrap_or(7789);
                // HTTP control port conventionally sits one below QUIC.
                format!("http://{}:{}", host, quic_port.saturating_sub(1))
            }
            None => format!("http://{}", host_port),
        };
    }
    format!("http://{}", original_dest)
}
/// Resolve a user-supplied destination into a concrete upload URL.
///
/// Explicit scheme prefixes pass through untouched. A bare `host:port`
/// is probed via `GET /health` with a 2-second timeout: when the server
/// advertises the `x-aerosync: true` header, the transfer is upgraded to
/// QUIC on the adjacent port (HTTP port + 1); otherwise — including on
/// any probe failure — a plain HTTP upload URL is returned.
async fn negotiate_protocol(dest: &str) -> String {
    if dest.starts_with("http://")
        || dest.starts_with("https://")
        || dest.starts_with("quic://")
        || dest.starts_with("s3://")
        || dest.starts_with("ftp://")
    {
        return dest.to_string();
    }
    let health_url = format!("http://{}/health", dest);
    let probe = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(2))
        .build();
    if let Ok(client) = probe {
        if let Ok(resp) = client.get(&health_url).send().await {
            let is_aerosync = resp
                .headers()
                .get("x-aerosync")
                .and_then(|v| v.to_str().ok())
                .map(|v| v == "true")
                .unwrap_or(false);
            if is_aerosync {
                let quic_dest = if let Some(colon_pos) = dest.rfind(':') {
                    let host = &dest[..colon_pos];
                    let http_port: u16 = dest[colon_pos + 1..].parse().unwrap_or(7788);
                    // saturating_add: the previous `http_port + 1` panicked in
                    // debug builds for port 65535; this also mirrors the
                    // saturating_sub used in extract_http_base.
                    let quic_port = http_port.saturating_add(1);
                    format!("quic://{}:{}/upload", host, quic_port)
                } else {
                    // No explicit port: assume the default QUIC port.
                    format!("quic://{}:7789/upload", dest)
                };
                tracing::info!("AeroSync peer detected, upgrading to QUIC: {}", quic_dest);
                return quic_dest;
            }
        }
    }
    format!("http://{}/upload", dest)
}
/// Implements `aerosync receive`: start the HTTP/QUIC/HTTPS listeners and
/// poll for incoming files until Ctrl+C (or, with --one-shot, until the
/// first file arrives).
#[allow(clippy::too_many_arguments)]
async fn cmd_receive(
    port: u16,
    quic_port: u16,
    save_to: PathBuf,
    bind: String,
    auth_token: Option<String>,
    one_shot: bool,
    overwrite: bool,
    max_size: u64,
    http_only: bool,
    no_http: bool,
    tls_cert: Option<PathBuf>,
    tls_key: Option<PathBuf>,
    https: bool,
    https_port: u16,
    app_config: &AeroSyncConfig,
    config_path: Option<PathBuf>,
) -> anyhow::Result<()> {
    // --auth-token doubles as the secret key for token validation.
    let auth_cfg = auth_token.map(|token| {
        AuthConfig {
            enable_auth: true,
            secret_key: token,
            token_lifetime_hours: 24,
            allowed_ips: vec![],
        }
    });
    let config = ServerConfig {
        http_port: port,
        quic_port,
        bind_address: bind.clone(),
        receive_directory: save_to.clone(),
        max_file_size: max_size,
        allow_overwrite: overwrite,
        enable_http: !no_http,
        // Either --http-only or --no-http disables the QUIC listener.
        enable_quic: !http_only && !no_http,
        auth: auth_cfg,
        audit_log: None,
        // TLS requires both cert and key; anything less means no TLS.
        tls: match (tls_cert, tls_key) {
            (Some(cert), Some(key)) => Some(TlsConfig {
                cert_path: cert,
                key_path: key,
            }),
            _ => None,
        },
        enable_metrics: app_config.metrics.enabled,
        enable_ws: app_config.ws.enabled,
        ws_event_buffer: app_config.ws.event_buffer,
        routing: app_config.routing.clone(),
        enable_https: https,
        https_port,
    };
    // Startup banner: list only the endpoints that are actually enabled.
    println!("AeroSync receiver starting...");
    if !no_http {
        println!(" HTTP: {}:{}", bind, port);
    }
    if !http_only && !no_http {
        println!(" QUIC: {}:{}", bind, quic_port);
    }
    if https {
        println!(" HTTPS: {}:{}", bind, https_port);
    }
    println!(" Save: {}", save_to.display());
    if overwrite {
        println!(" Mode: overwrite enabled");
    }
    println!("\nReady. Waiting for files... (Ctrl+C to stop)\n");
    let mut receiver = FileReceiver::new(config);
    receiver.start().await?;
    // Hot-reload the config file when it changes, if a path was given.
    if let Some(ref cfg_path) = config_path {
        let expanded = shellexpand::tilde(&cfg_path.to_string_lossy()).to_string();
        receiver.watch_config_reload(std::path::PathBuf::from(expanded));
    }
    if one_shot {
        // Poll every 500 ms until the first file lands, report it, then stop.
        loop {
            tokio::time::sleep(Duration::from_millis(500)).await;
            let files = receiver.get_received_files().await;
            if !files.is_empty() {
                let f = &files[0];
                println!(
                    "Received: {} ({:.2} KB) -> {}",
                    f.original_name,
                    f.size as f64 / 1024.0,
                    f.saved_path.display()
                );
                receiver.stop().await?;
                break;
            }
        }
    } else {
        // Poll once per second and print any newly received files. This loop
        // only ends when the process is killed (Ctrl+C), so the final Ok(())
        // is unreachable on this path.
        let mut last_count = 0;
        loop {
            tokio::time::sleep(Duration::from_secs(1)).await;
            let files = receiver.get_received_files().await;
            if files.len() > last_count {
                for f in &files[last_count..] {
                    println!(
                        " [+] {} ({:.2} KB) sha256={}",
                        f.original_name,
                        f.size as f64 / 1024.0,
                        // NOTE(review): assumes a present sha256 is at least
                        // 8 bytes of ASCII hex — the byte slice would panic
                        // otherwise. Confirm against the server's hashing.
                        f.sha256.as_deref().map(|h| &h[..8]).unwrap_or("none")
                    );
                }
                last_count = files.len();
            }
        }
    }
    Ok(())
}
/// Implements `aerosync token <generate|verify|list|revoke>`.
async fn cmd_token(action: TokenAction) -> anyhow::Result<()> {
    match action {
        TokenAction::Generate {
            secret,
            hours,
            save,
            label,
        } => {
            // Without --secret, derive a random secret from two UUIDs.
            let secret_key = secret
                .unwrap_or_else(|| format!("{}-{}", uuid::Uuid::new_v4(), uuid::Uuid::new_v4()));
            let config = AuthConfig {
                enable_auth: true,
                secret_key: secret_key.clone(),
                token_lifetime_hours: hours,
                allowed_ips: vec![],
            };
            let manager = AuthManager::new(config)
                .map_err(|e| anyhow::anyhow!("Failed to create auth manager: {}", e))?;
            let token = manager
                .generate_token()
                .map_err(|e| anyhow::anyhow!("Failed to generate token: {}", e))?;
            println!("Token: {}", token);
            println!("Secret: {}", secret_key);
            println!("Expires: {} hours", hours);
            if save {
                let store_path = aerosync::core::auth::TokenStore::default_path();
                let store = aerosync::core::auth::TokenStore::new(&store_path);
                // Expiry stored as a Unix timestamp in seconds.
                let expires_at = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs()
                    + hours * 3600;
                store.save(&token, label.as_deref(), expires_at)?;
                println!("Saved to: {}", store_path.display());
            }
            println!("\nUsage:");
            // Show only a 32-char prefix in the example. Clamp the slice:
            // the previous `&token[..32]` panicked for tokens shorter than
            // 32 bytes (same clamp List/Revoke already use).
            println!(
                " aerosync send ./file host:7788 --token {}",
                &token[..token.len().min(32)]
            );
            println!(" aerosync receive --auth-token <same-token>");
        }
        TokenAction::Verify { token, secret } => {
            let config = AuthConfig {
                enable_auth: true,
                secret_key: secret,
                token_lifetime_hours: 24,
                allowed_ips: vec![],
            };
            let manager = AuthManager::new(config)
                .map_err(|e| anyhow::anyhow!("Failed to create auth manager: {}", e))?;
            // Loopback IP: verification here never involves an IP allowlist.
            match manager.authenticate(Some(&token), "127.0.0.1") {
                Ok(true) => println!("Token is VALID"),
                Ok(false) => println!("Token is INVALID or EXPIRED"),
                Err(e) => println!("Verification error: {}", e),
            }
        }
        TokenAction::List => {
            let store_path = aerosync::core::auth::TokenStore::default_path();
            let store = aerosync::core::auth::TokenStore::new(&store_path);
            let tokens = store.list_all()?;
            if tokens.is_empty() {
                println!("No saved tokens. Use: aerosync token generate --save");
            } else {
                println!("{} token(s):\n", tokens.len());
                for t in &tokens {
                    // Revocation takes display priority over expiry.
                    let status = if t.revoked {
                        "revoked"
                    } else if t.is_expired() {
                        "expired"
                    } else {
                        "valid"
                    };
                    println!(
                        " [{}] {}... ({}{})",
                        status,
                        &t.token[..t.token.len().min(32)],
                        t.label.as_deref().unwrap_or(""),
                        if t.label.is_some() { " " } else { "" }
                    );
                }
            }
        }
        TokenAction::Revoke { token_prefix } => {
            let store_path = aerosync::core::auth::TokenStore::default_path();
            let store = aerosync::core::auth::TokenStore::new(&store_path);
            if let Some(found) = store.find_by_prefix(&token_prefix)? {
                store.revoke(&found.token)?;
                println!("Revoked: {}...", &found.token[..found.token.len().min(32)]);
            } else if store.revoke(&token_prefix)? {
                // Prefix lookup missed: fall back to treating the argument
                // as a full token value.
                println!("Revoked token.");
            } else {
                eprintln!(
                    "Token not found: {}",
                    &token_prefix[..token_prefix.len().min(16)]
                );
            }
        }
    }
    Ok(())
}
/// Query a receiver's /health endpoint and print a short status summary.
async fn cmd_status(host: String) -> anyhow::Result<()> {
    // Accept either a bare host:port or a full http(s):// URL.
    let url = match host.starts_with("http") {
        true => format!("{}/health", host),
        false => format!("http://{}/health", host),
    };
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(5))
        .build()?;
    let response = client.get(&url).send().await;
    match response {
        Ok(resp) if resp.status().is_success() => {
            // Pull the two known fields out of the health JSON; anything
            // missing falls back to a placeholder rather than failing.
            let body: serde_json::Value = resp.json().await?;
            println!("Status: {}", body["status"].as_str().unwrap_or("unknown"));
            println!(
                "Received files: {}",
                body["received_files"].as_u64().unwrap_or(0)
            );
        }
        Ok(resp) => eprintln!("Server returned: {}", resp.status()),
        Err(e) => eprintln!("Cannot reach {}: {}", host, e),
    }
    Ok(())
}
/// Implements `aerosync history`: print recent transfer records from the
/// local history store, optionally filtered by direction and success.
async fn cmd_history(
    limit: usize,
    sent: bool,
    received: bool,
    success_only: bool,
) -> anyhow::Result<()> {
    use aerosync::core::{HistoryQuery, HistoryStore};
    let store_path = HistoryStore::default_path();
    if !store_path.exists() {
        println!("No transfer history yet.");
        return Ok(());
    }
    let store = HistoryStore::new(&store_path).await?;
    // --sent / --received map to a direction filter; neither means both.
    let direction = if sent {
        Some("send".to_string())
    } else if received {
        Some("receive".to_string())
    } else {
        None
    };
    let q = HistoryQuery {
        direction,
        success_only,
        limit,
        ..Default::default()
    };
    let entries = store.query(&q).await?;
    if entries.is_empty() {
        println!("No matching history records.");
        return Ok(());
    }
    println!("{} record(s):\n", entries.len());
    for e in &entries {
        let status_marker = if e.success { "✓" } else { "✗" };
        let speed_kb = if e.avg_speed_bps > 0 {
            e.avg_speed_bps as f64 / 1024.0
        } else {
            0.0
        };
        // Truncate by characters, not bytes: the previous byte slice
        // (`&e.filename[..len.min(30)]`) panicked whenever byte 30 fell
        // inside a multi-byte UTF-8 character in the filename.
        let short_name: String = e.filename.chars().take(30).collect();
        println!(
            " {} [{:>7}] {:>6.1} KB/s {:<30} {} → {}",
            status_marker,
            e.protocol,
            speed_kb,
            short_name,
            e.direction,
            e.remote_ip.as_deref().unwrap_or("?")
        );
        if let Some(ref err) = e.error {
            println!(" error: {}", err);
        }
    }
    Ok(())
}
/// Implements `aerosync resume <list|clear|clear-all>` against the
/// on-disk resume state directory.
async fn cmd_resume(action: ResumeAction) -> anyhow::Result<()> {
    match action {
        ResumeAction::List { state_dir } => {
            let pending = ResumeStore::new(&state_dir).list_pending().await?;
            if pending.is_empty() {
                println!("No pending resume tasks.");
                return Ok(());
            }
            println!("{} pending transfer(s):\n", pending.len());
            for state in &pending {
                let completed = state.completed_chunks.len();
                let total = state.total_chunks;
                // Integer percentage; checked_div guards a zero chunk count.
                let pct = completed
                    .saturating_mul(100)
                    .checked_div(total as usize)
                    .unwrap_or(0);
                println!(
                    " [{}] {} → {}",
                    state.task_id,
                    state.source_path.display(),
                    state.destination
                );
                println!(
                    " Progress: {}/{} chunks ({}%), {:.2} MB / {:.2} MB",
                    completed,
                    total,
                    pct,
                    state.bytes_transferred() as f64 / 1_048_576.0,
                    state.total_size as f64 / 1_048_576.0
                );
            }
            println!("\nResume with: aerosync send <source> <destination>");
        }
        ResumeAction::Clear { task_id, state_dir } => {
            // The stored state is keyed by UUID, so the id must parse.
            let uuid = task_id
                .parse::<uuid::Uuid>()
                .map_err(|_| anyhow::anyhow!("Invalid task ID: {}", task_id))?;
            ResumeStore::new(&state_dir).delete(uuid).await?;
            println!("Cleared resume state for task {}", task_id);
        }
        ResumeAction::ClearAll { state_dir } => {
            let store = ResumeStore::new(&state_dir);
            let pending = store.list_pending().await?;
            let count = pending.len();
            for state in pending {
                store.delete(state.task_id).await?;
            }
            println!("Cleared {} resume state(s).", count);
        }
    }
    Ok(())
}
/// Normalize a user-supplied host into a WebSocket URL.
///
/// Hosts already carrying a `ws://` or `wss://` scheme pass through
/// untouched; anything else becomes `ws://<host>/ws`, stripping any
/// trailing slash from the host first.
fn build_ws_url(host: &str) -> String {
    let has_ws_scheme = ["ws://", "wss://"].iter().any(|s| host.starts_with(s));
    match has_ws_scheme {
        true => host.to_string(),
        false => format!("ws://{}/ws", host.trim_end_matches('/')),
    }
}
/// Outcome of a single `watch_once` WebSocket session.
enum WatchResult {
    /// The server sent a Close frame — a deliberate shutdown.
    ServerClosed,
    /// The connection failed or dropped; carries the error description.
    Disconnected(String),
}
/// Run one WebSocket session against `url`, printing events until the
/// stream ends.
///
/// `filter` keeps only messages whose raw JSON text contains the quoted
/// value (a crude substring match, not a structured query). With
/// `format == "json"` the raw text is echoed; otherwise events that
/// parse as JSON are pretty-printed.
async fn watch_once(url: &str, filter: &Option<String>, format: &str) -> WatchResult {
    use futures::StreamExt;
    use tokio_tungstenite::{connect_async, tungstenite::Message};
    let (ws_stream, _) = match connect_async(url).await {
        Ok(s) => s,
        Err(e) => return WatchResult::Disconnected(e.to_string()),
    };
    // The write half is unused: this client only listens.
    let (_write, mut read) = ws_stream.split();
    while let Some(msg) = read.next().await {
        match msg {
            Ok(Message::Text(text)) => {
                if let Some(ref f) = filter {
                    // e.g. --filter completed matches "completed" in the JSON.
                    if !text.contains(&format!("\"{}\"", f)) {
                        continue;
                    }
                }
                if format == "json" {
                    println!("{}", text);
                } else if let Ok(v) = serde_json::from_str::<serde_json::Value>(&text) {
                    print_event_pretty(&v);
                } else {
                    // Not JSON: fall back to the raw text.
                    println!("{}", text);
                }
            }
            Ok(Message::Close(_)) => {
                return WatchResult::ServerClosed;
            }
            Err(e) => {
                return WatchResult::Disconnected(e.to_string());
            }
            // Binary/Ping/Pong frames are ignored.
            _ => {}
        }
    }
    WatchResult::Disconnected("stream ended".to_string())
}
/// Implements `aerosync watch`: stream transfer events over WebSocket,
/// optionally reconnecting with capped exponential backoff.
///
/// `max_retries == 0` means retry forever (with --reconnect). The delay
/// grows as `retry_delay * 2^(attempt-1)` with the shift capped at 5 and
/// the delay capped at 60 s — the formula pinned by
/// `watch_tests::test_retry_backoff_delay`.
async fn cmd_watch(
    host: String,
    filter: Option<String>,
    format: String,
    reconnect: bool,
    max_retries: u32,
    retry_delay: u64,
) -> anyhow::Result<()> {
    let url = build_ws_url(&host);
    let unlimited = max_retries == 0;
    let mut attempt: u32 = 0;
    const MAX_DELAY_SECS: u64 = 60;
    eprintln!("Connecting to {}...", url);
    loop {
        match watch_once(&url, &filter, &format).await {
            WatchResult::ServerClosed => {
                eprintln!("Server closed connection.");
                return Ok(());
            }
            WatchResult::Disconnected(reason) => {
                // A first session that streamed to its natural end is a
                // clean exit, not an error.
                if attempt == 0 && reason.contains("stream ended") {
                    return Ok(());
                }
                if !reconnect {
                    if attempt == 0 {
                        return Err(anyhow::anyhow!("Failed to connect to {}: {}", url, reason));
                    }
                    eprintln!("WebSocket error: {}", reason);
                    return Ok(());
                }
                attempt += 1;
                if !unlimited && attempt > max_retries {
                    eprintln!("Max retries ({}) reached. Giving up.", max_retries);
                    return Err(anyhow::anyhow!(
                        "Failed to connect after {} attempt(s)",
                        attempt
                    ));
                }
                // Exponential backoff; shift capped at 5 to avoid overflow,
                // delay capped at MAX_DELAY_SECS.
                let delay =
                    (retry_delay.saturating_mul(1u64 << (attempt - 1).min(5))).min(MAX_DELAY_SECS);
                eprintln!(
                    "Disconnected ({}). Reconnecting in {}s... (attempt {}/{})",
                    reason,
                    delay,
                    attempt,
                    if unlimited {
                        "∞".to_string()
                    } else {
                        max_retries.to_string()
                    }
                );
                tokio::time::sleep(Duration::from_secs(delay)).await;
                eprintln!("Reconnecting to {}...", url);
            }
        }
        // BUG FIX: an unconditional `attempt = 0;` used to sit here, running
        // after *every* reconnect cycle. That reset meant the backoff never
        // grew past the base delay and --max-retries could never trigger.
        // The counter must persist across consecutive failed attempts.
    }
}
/// Render a single WebSocket event in human-readable form.
///
/// Lifecycle chatter (started/failed) goes to stderr while completions go
/// to stdout so they can be piped; `progress` events are suppressed to
/// keep the output readable; unknown events are dumped verbatim.
fn print_event_pretty(v: &serde_json::Value) {
    let event = v["event"].as_str().unwrap_or("unknown");
    if event == "transfer_started" {
        eprintln!(
            "[--> started ] {} ({} bytes) from {}",
            v["filename"].as_str().unwrap_or("?"),
            v["size"].as_u64().unwrap_or(0),
            v["sender_ip"].as_str().unwrap_or("?"),
        );
    } else if event == "completed" {
        let sha = v["sha256"].as_str().unwrap_or("");
        // SHA-256 digests are ASCII hex, so byte slicing is safe here.
        let sha_short = &sha[..sha.len().min(8)];
        println!(
            "[completed ] {} ({} bytes) sha256={}",
            v["filename"].as_str().unwrap_or("?"),
            v["size"].as_u64().unwrap_or(0),
            sha_short,
        );
    } else if event == "failed" {
        eprintln!(
            "[ failed ] {} reason={}",
            v["filename"].as_str().unwrap_or("?"),
            v["reason"].as_str().unwrap_or("?"),
        );
    } else if event != "progress" {
        println!("[{}] {}", event, v);
    }
}
/// Implements `aerosync discover`: browse mDNS for receivers on the LAN
/// and print them as a table, or as one JSON object per line with --json.
async fn cmd_discover(timeout_secs: u64, json: bool) -> anyhow::Result<()> {
    let timeout = Duration::from_secs(timeout_secs);
    if !json {
        // Status chatter goes to stderr so stdout stays pipeable.
        eprintln!("Scanning for AeroSync receivers on local network ({timeout_secs}s)…");
    }
    let mut peers = AeroSyncMdns::discover(timeout).await;
    // Stable ordering: by host, then port.
    peers.sort_by(|a, b| a.host.cmp(&b.host).then(a.port.cmp(&b.port)));
    if json {
        // NDJSON: one object per line, for easy piping into jq etc.
        for peer in &peers {
            let obj = serde_json::json!({
                "name": peer.name,
                "host": peer.host,
                "port": peer.port,
                "addr": peer.addr(),
                "version": peer.version,
                "ws_enabled": peer.ws_enabled,
                "auth_required": peer.auth_required,
            });
            println!("{}", obj);
        }
    } else if peers.is_empty() {
        println!("No AeroSync receivers found.");
        println!("Tip: make sure receiver is running with: aerosync receive");
    } else {
        println!("\nFound {} receiver(s):\n", peers.len());
        println!(
            "{:<20} {:<22} {:<10} {:<6} {:<6}",
            "NAME", "ADDRESS", "VERSION", "WS", "AUTH"
        );
        println!("{}", "-".repeat(68));
        for peer in &peers {
            println!(
                "{:<20} {:<22} {:<10} {:<6} {:<6}",
                peer.name,
                peer.addr(),
                peer.version.as_deref().unwrap_or("-"),
                if peer.ws_enabled { "yes" } else { "no" },
                if peer.auth_required { "yes" } else { "no" },
            );
        }
        println!();
        println!("Use: aerosync send <file> <ADDRESS> [--token <TOKEN>]");
    }
    Ok(())
}
/// Unit tests for the watch-command helpers: WebSocket URL construction
/// and the reconnect backoff formula.
#[cfg(test)]
mod watch_tests {
    use super::*;
    #[test]
    fn test_build_ws_url_bare_host_port() {
        assert_eq!(build_ws_url("localhost:7788"), "ws://localhost:7788/ws");
    }
    #[test]
    fn test_build_ws_url_already_ws_scheme() {
        assert_eq!(build_ws_url("ws://myhost:9000/ws"), "ws://myhost:9000/ws");
    }
    #[test]
    fn test_build_ws_url_wss_scheme() {
        assert_eq!(build_ws_url("wss://myhost:443/ws"), "wss://myhost:443/ws");
    }
    #[test]
    fn test_build_ws_url_trailing_slash_stripped() {
        assert_eq!(
            build_ws_url("192.168.1.10:7788/"),
            "ws://192.168.1.10:7788/ws"
        );
    }
    #[test]
    fn test_retry_backoff_delay() {
        // Mirrors the delay formula in cmd_watch: base * 2^(attempt-1),
        // shift capped at 5, delay capped at 60 s.
        let base: u64 = 2;
        const MAX: u64 = 60;
        let delays: Vec<u64> = (1u32..=8)
            .map(|attempt| (base.saturating_mul(1u64 << (attempt - 1).min(5))).min(MAX))
            .collect();
        assert_eq!(delays[0], 2);
        assert_eq!(delays[1], 4);
        assert_eq!(delays[2], 8);
        assert_eq!(delays[3], 16);
        assert_eq!(delays[4], 32);
        // Capped at the 60 s ceiling from the sixth attempt onward.
        assert_eq!(delays[5], 60);
        assert_eq!(delays[6], 60);
        assert_eq!(delays[7], 60);
    }
    #[test]
    fn test_unlimited_retries_flag() {
        // cmd_watch treats --max-retries 0 as "retry forever".
        let max_retries: u32 = 0;
        assert!(max_retries == 0, "0 should mean unlimited");
    }
}