use std::path::PathBuf;
use anyhow::{Context, Result};
use clap::Args;
use colored::Colorize;
use rand::Rng;
use serde::Serialize;
use super::identity::{get_or_create_identity, parse_relays};
use super::spawn::{nostr_spawn_round_trip, NostrSpawnOutcome};
use paygress::nostr::{AccessDetailsContent, ErrorResponseContent, TemplateAccessPort};
#[derive(Args)]
pub struct BatchArgs {
#[arg(long)]
pub provider: String,
#[arg(long, conflicts_with_all = ["tokens_file", "split_token"])]
pub tokens: Option<String>,
#[arg(long, conflicts_with_all = ["tokens", "split_token"])]
pub tokens_file: Option<PathBuf>,
#[arg(long)]
pub split_token: Option<String>,
#[arg(long, requires = "split_token")]
pub shards: Option<usize>,
#[arg(short, long, default_value = "basic")]
pub tier: String,
#[arg(long, default_value = "agent-sandbox")]
pub template: String,
#[arg(long, default_value = "./paygress-batch")]
pub output: PathBuf,
#[arg(long, default_value_t = 120)]
pub timeout_secs: u64,
#[arg(long, default_value = "ubuntu:22.04")]
pub image: String,
#[arg(long)]
pub nostr_key: Option<String>,
#[arg(long)]
pub relays: Option<String>,
#[arg(long, value_parser = parse_isolation_level_arg)]
pub isolation_level: Option<paygress::nostr::IsolationLevel>,
}
fn parse_isolation_level_arg(s: &str) -> Result<paygress::nostr::IsolationLevel, String> {
paygress::nostr::IsolationLevel::from_slug(s).ok_or_else(|| {
format!(
"unknown isolation level `{}` (expected one of: \
shared-kernel, dedicated-host, attested-research-tier)",
s
)
})
}
#[derive(Debug, Clone, Serialize)]
pub struct ShardManifestEntry {
pub index: usize,
pub status: String, pub host: Option<String>,
pub ssh_port: Option<u16>,
pub ssh_user: Option<String>,
pub ssh_pass: Option<String>,
pub pod_id: Option<String>,
pub expires_at: Option<String>,
pub template_ports: Vec<TemplateAccessPort>,
pub error_type: Option<String>,
pub error_message: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
pub struct ShardManifest {
pub provider_npub: String,
pub template: String,
pub tier: String,
pub shard_count: usize,
pub spawned_count: usize,
pub shards: Vec<ShardManifestEntry>,
}
fn generate_password(len: usize) -> String {
const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
let mut rng = rand::thread_rng();
(0..len)
.map(|_| CHARSET[rng.gen_range(0..CHARSET.len())] as char)
.collect()
}
pub fn parse_tokens(args: &BatchArgs) -> Result<Vec<String>> {
if let Some(s) = &args.tokens {
let v: Vec<String> = s
.split(',')
.map(|t| t.trim().to_string())
.filter(|t| !t.is_empty())
.collect();
if v.is_empty() {
anyhow::bail!("--tokens must contain at least one non-empty token");
}
return Ok(v);
}
if let Some(p) = &args.tokens_file {
let content = std::fs::read_to_string(p)
.with_context(|| format!("failed to read tokens file {}", p.display()))?;
let v: Vec<String> = content
.lines()
.map(|l| l.trim().to_string())
.filter(|l| !l.is_empty() && !l.starts_with('#'))
.collect();
if v.is_empty() {
anyhow::bail!(
"token file {} contains no tokens (after stripping comments + blank lines)",
p.display()
);
}
return Ok(v);
}
anyhow::bail!("one of --tokens, --tokens-file, or --split-token is required");
}
pub async fn materialize_tokens(args: &BatchArgs) -> Result<Vec<String>> {
if let Some(big_token) = &args.split_token {
let n = args
.shards
.ok_or_else(|| anyhow::anyhow!("--shards is required when --split-token is set"))?;
if n == 0 {
anyhow::bail!("--shards must be >= 1");
}
let mut db_path = std::env::temp_dir();
db_path.push(format!(
"paygress-batch-split-{}.redb",
uuid::Uuid::new_v4()
));
let result = paygress::cashu::split_token_into_n(big_token, n, &db_path).await;
let _ = std::fs::remove_file(&db_path);
return result;
}
parse_tokens(args)
}
fn manifest_entry_from_success(
index: usize,
host_address_fallback: &str,
ssh_user: &str,
ssh_pass: &str,
access: AccessDetailsContent,
) -> ShardManifestEntry {
let host = if access.host_address.is_empty() {
host_address_fallback.to_string()
} else {
access.host_address
};
ShardManifestEntry {
index,
status: "spawned".to_string(),
host: Some(host),
ssh_port: Some(access.node_port),
ssh_user: Some(ssh_user.to_string()),
ssh_pass: Some(ssh_pass.to_string()),
pod_id: Some(access.pod_npub),
expires_at: Some(access.expires_at),
template_ports: access.template_ports,
error_type: None,
error_message: None,
}
}
fn manifest_entry_from_error(
index: usize,
status: &str,
err: ErrorResponseContent,
) -> ShardManifestEntry {
ShardManifestEntry {
index,
status: status.to_string(),
host: None,
ssh_port: None,
ssh_user: None,
ssh_pass: None,
pod_id: None,
expires_at: None,
template_ports: Vec::new(),
error_type: Some(err.error_type),
error_message: Some(err.message),
}
}
fn manifest_entry_status_only(
index: usize,
status: &str,
message: Option<String>,
) -> ShardManifestEntry {
ShardManifestEntry {
index,
status: status.to_string(),
host: None,
ssh_port: None,
ssh_user: None,
ssh_pass: None,
pod_id: None,
expires_at: None,
template_ports: Vec::new(),
error_type: None,
error_message: message,
}
}
pub async fn execute(args: BatchArgs, _verbose: bool) -> Result<()> {
let tokens = materialize_tokens(&args).await?;
let n = tokens.len();
let relays = parse_relays(args.relays.clone());
let nostr_key = get_or_create_identity(args.nostr_key.clone())?;
println!("{}", "Paygress Batch Coordinator".blue().bold());
println!("{}", "-".repeat(50).blue());
println!(" Provider: {}", args.provider.cyan());
println!(" Template: {}", args.template.cyan());
println!(" Tier: {}", args.tier);
println!(" Shards: {}", n);
println!(" Output dir: {}", args.output.display());
println!();
std::fs::create_dir_all(&args.output)
.with_context(|| format!("failed to create output dir {}", args.output.display()))?;
for i in 0..n {
let p = args.output.join(format!("shard-{}", i));
std::fs::create_dir_all(&p)
.with_context(|| format!("failed to create shard subdir {}", p.display()))?;
}
let mut handles = Vec::with_capacity(n);
for (i, token) in tokens.into_iter().enumerate() {
let provider = args.provider.clone();
let tier = args.tier.clone();
let image = args.image.clone();
let template = Some(args.template.clone());
let relays = relays.clone();
let nostr_key = nostr_key.clone();
let timeout = args.timeout_secs;
let ssh_user = "user".to_string();
let ssh_pass = generate_password(16);
let iso = args.isolation_level;
let handle = tokio::spawn(async move {
let outcome = nostr_spawn_round_trip(
&provider,
&tier,
&token,
image,
ssh_user.clone(),
ssh_pass.clone(),
template,
None,
None,
None,
None,
iso,
relays,
nostr_key,
timeout,
)
.await;
(i, ssh_user, ssh_pass, outcome)
});
handles.push(handle);
}
let mut entries: Vec<ShardManifestEntry> = Vec::with_capacity(n);
for h in handles {
let (i, ssh_user, ssh_pass, outcome) = match h.await {
Ok(v) => v,
Err(e) => {
entries.push(manifest_entry_status_only(
0,
"join_error",
Some(format!("tokio join error: {}", e)),
));
continue;
}
};
let entry = match outcome {
Ok(NostrSpawnOutcome::Success(access)) => {
manifest_entry_from_success(i, &args.provider, &ssh_user, &ssh_pass, access)
}
Ok(NostrSpawnOutcome::ProviderError(err)) => {
manifest_entry_from_error(i, "provider_error", err)
}
Ok(NostrSpawnOutcome::ProviderOffline) => manifest_entry_status_only(
i,
"offline",
Some("provider's heartbeat did not appear within the live window".to_string()),
),
Ok(NostrSpawnOutcome::Timeout) => manifest_entry_status_only(
i,
"timeout",
Some(format!(
"no response within {}s; token may have been spent",
args.timeout_secs
)),
),
Ok(NostrSpawnOutcome::UnknownResponse(s)) => manifest_entry_status_only(
i,
"unknown_response",
Some(format!("body: {}", s.chars().take(200).collect::<String>())),
),
Err(e) => manifest_entry_status_only(i, "transport_error", Some(e.to_string())),
};
entries.push(entry);
}
entries.sort_by_key(|e| e.index);
let spawned_count = entries.iter().filter(|e| e.status == "spawned").count();
let manifest = ShardManifest {
provider_npub: args.provider.clone(),
template: args.template.clone(),
tier: args.tier.clone(),
shard_count: n,
spawned_count,
shards: entries.clone(),
};
let manifest_path = args.output.join("shards.json");
let manifest_json = serde_json::to_string_pretty(&manifest)?;
std::fs::write(&manifest_path, manifest_json)
.with_context(|| format!("failed to write {}", manifest_path.display()))?;
println!();
println!("{}", "-".repeat(50).blue());
println!(
"{}: {}/{} shards spawned",
"Result".bold(),
spawned_count.to_string().green(),
n
);
println!(" Manifest: {}", manifest_path.display());
println!();
println!("{}", "Per-shard summary:".bold());
for e in &entries {
let status_label = match e.status.as_str() {
"spawned" => "spawned".green().to_string(),
"offline" => "offline".red().to_string(),
"timeout" => "timeout".red().to_string(),
"provider_error" | "unknown_response" | "transport_error" | "join_error" => {
e.status.red().to_string()
}
_ => e.status.to_string(),
};
match (&e.host, e.ssh_port) {
(Some(host), Some(port)) => println!(
" shard-{:<3} {:<10} {}:{}",
e.index, status_label, host, port
),
_ => {
let detail = e
.error_message
.as_deref()
.or(e.error_type.as_deref())
.unwrap_or("");
println!(" shard-{:<3} {:<10} {}", e.index, status_label, detail);
}
}
}
if spawned_count < n {
anyhow::bail!(
"{} of {} shards failed to spawn (see manifest for details)",
n - spawned_count,
n
);
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn args_with_tokens(s: &str) -> BatchArgs {
BatchArgs {
provider: "npub1abc".to_string(),
tokens: Some(s.to_string()),
tokens_file: None,
split_token: None,
shards: None,
tier: "basic".to_string(),
template: "agent-sandbox".to_string(),
output: PathBuf::from("/tmp/paygress-batch-test"),
timeout_secs: 120,
image: "ubuntu:22.04".to_string(),
nostr_key: None,
relays: None,
isolation_level: None,
}
}
fn args_with_file(p: PathBuf) -> BatchArgs {
BatchArgs {
provider: "npub1abc".to_string(),
tokens: None,
tokens_file: Some(p),
split_token: None,
shards: None,
tier: "basic".to_string(),
template: "agent-sandbox".to_string(),
output: PathBuf::from("/tmp/paygress-batch-test"),
timeout_secs: 120,
image: "ubuntu:22.04".to_string(),
nostr_key: None,
relays: None,
isolation_level: None,
}
}
fn args_with_split(token: &str, shards: Option<usize>) -> BatchArgs {
BatchArgs {
provider: "npub1abc".to_string(),
tokens: None,
tokens_file: None,
split_token: Some(token.to_string()),
shards,
tier: "basic".to_string(),
template: "agent-sandbox".to_string(),
output: PathBuf::from("/tmp/paygress-batch-test"),
timeout_secs: 120,
image: "ubuntu:22.04".to_string(),
nostr_key: None,
relays: None,
isolation_level: None,
}
}
#[test]
fn parse_tokens_comma_list() {
let args = args_with_tokens("a,b,c");
let v = parse_tokens(&args).unwrap();
assert_eq!(v, vec!["a", "b", "c"]);
}
#[test]
fn parse_tokens_strips_whitespace() {
let args = args_with_tokens(" a , b ,c ");
let v = parse_tokens(&args).unwrap();
assert_eq!(v, vec!["a", "b", "c"]);
}
#[test]
fn parse_tokens_drops_empty_entries() {
let args = args_with_tokens("a,,b,");
let v = parse_tokens(&args).unwrap();
assert_eq!(v, vec!["a", "b"]);
}
#[test]
fn parse_tokens_rejects_empty_input() {
let args = args_with_tokens("");
assert!(parse_tokens(&args).is_err());
let args = args_with_tokens(" , , ");
assert!(parse_tokens(&args).is_err());
}
#[test]
fn parse_tokens_from_file_with_comments() {
let dir = tempfile::tempdir().unwrap();
let p = dir.path().join("tokens.txt");
std::fs::write(
&p,
"# header comment\ntoken-a\n\n token-b \n# trailing comment\ntoken-c\n",
)
.unwrap();
let args = args_with_file(p);
let v = parse_tokens(&args).unwrap();
assert_eq!(v, vec!["token-a", "token-b", "token-c"]);
}
#[test]
fn parse_tokens_rejects_empty_file() {
let dir = tempfile::tempdir().unwrap();
let p = dir.path().join("empty.txt");
std::fs::write(&p, "# only a comment\n\n").unwrap();
let args = args_with_file(p);
assert!(parse_tokens(&args).is_err());
}
#[test]
fn manifest_entry_success_carries_access_fields() {
let access = AccessDetailsContent {
pod_npub: "container-42".to_string(),
node_port: 30042,
expires_at: "2026-04-30T00:00:00Z".to_string(),
cpu_millicores: 1000,
memory_mb: 1024,
pod_spec_name: "Basic".to_string(),
pod_spec_description: "1 vCPU".to_string(),
instructions: vec!["ssh -p 30042 root@host".to_string()],
host_address: "10.0.0.7".to_string(),
template_ports: vec![],
};
let e = manifest_entry_from_success(3, "fallback-host", "user", "pw", access);
assert_eq!(e.index, 3);
assert_eq!(e.status, "spawned");
assert_eq!(e.host.as_deref(), Some("10.0.0.7"));
assert_eq!(e.ssh_port, Some(30042));
assert_eq!(e.ssh_user.as_deref(), Some("user"));
assert_eq!(e.ssh_pass.as_deref(), Some("pw"));
assert!(e.error_type.is_none());
}
#[test]
fn manifest_entry_success_falls_back_when_host_address_empty() {
let access = AccessDetailsContent {
pod_npub: "container-1".to_string(),
node_port: 30001,
expires_at: "2026-04-30T00:00:00Z".to_string(),
cpu_millicores: 500,
memory_mb: 512,
pod_spec_name: "Basic".to_string(),
pod_spec_description: "—".to_string(),
instructions: vec![],
host_address: String::new(),
template_ports: vec![],
};
let e = manifest_entry_from_success(0, "provider-public-ip", "user", "pw", access);
assert_eq!(e.host.as_deref(), Some("provider-public-ip"));
}
#[tokio::test]
async fn materialize_tokens_split_without_shards_errors() {
let args = args_with_split("dummy-token-not-used", None);
let err = materialize_tokens(&args).await.unwrap_err();
assert!(
err.to_string().contains("--shards is required"),
"got: {}",
err
);
}
#[tokio::test]
async fn materialize_tokens_split_zero_shards_errors() {
let args = args_with_split("dummy-token-not-used", Some(0));
let err = materialize_tokens(&args).await.unwrap_err();
assert!(err.to_string().contains(">= 1"), "got: {}", err);
}
#[tokio::test]
async fn materialize_tokens_split_invalid_token_errors_fast() {
let args = args_with_split("not-a-real-cashu-token", Some(3));
let err = materialize_tokens(&args).await.unwrap_err();
assert!(
err.to_string().contains("invalid input token"),
"got: {}",
err
);
}
#[tokio::test]
async fn materialize_tokens_falls_through_to_parse_tokens_for_static_input() {
let args = args_with_tokens("a,b,c");
let v = materialize_tokens(&args).await.unwrap();
assert_eq!(v, vec!["a", "b", "c"]);
}
#[test]
fn manifest_entry_error_carries_error_fields() {
let err = ErrorResponseContent {
error_type: "token_already_spent".to_string(),
message: "this Cashu token was already redeemed".to_string(),
details: None,
};
let e = manifest_entry_from_error(2, "provider_error", err);
assert_eq!(e.index, 2);
assert_eq!(e.status, "provider_error");
assert_eq!(e.error_type.as_deref(), Some("token_already_spent"));
assert!(e.host.is_none());
assert!(e.ssh_port.is_none());
}
}