use std::path::PathBuf;
use clap::{Args, Subcommand};
use serde::Serialize;
use crate::context::{resolve_profile, CliContext, RemoteAttach};
use crate::error::{generic, invalid_args, sdk, CliError};
use crate::parsers::{parse_u16_flexible, parse_u64_flexible};
use crate::prelude::{emit_value, OutputFormat};
#[derive(Args, Debug, Default)]
pub struct RemoteAttachArgs {
#[arg(long, value_parser = crate::parsers::parse_socket_addr_string)]
pub node_addr: Option<String>,
#[arg(long, value_parser = crate::parsers::parse_hex32_string)]
pub node_pubkey: Option<String>,
#[arg(long = "node-id", value_parser = crate::parsers::parse_u64_flexible_string)]
pub remote_node_id: Option<String>,
#[arg(long, value_parser = crate::parsers::parse_hex32_string)]
pub psk_hex: Option<String>,
}
#[derive(Subcommand, Debug)]
pub enum AggregatorCommand {
Inspect(InspectArgs),
Query(QueryArgs),
Ls(LsArgs),
Spawn(SpawnArgs),
Scale(ScaleArgs),
}
#[derive(Args, Debug)]
pub struct InspectArgs {
#[arg(long)]
pub identity: Option<PathBuf>,
#[arg(long, default_value_t = crate::prelude::DEFAULT_SUPERVISOR_NODE)]
pub node: u64,
}
#[derive(Args, Debug)]
pub struct LsArgs {
#[arg(long)]
pub identity: Option<PathBuf>,
#[arg(long, default_value_t = crate::prelude::DEFAULT_SUPERVISOR_NODE)]
pub node: u64,
#[arg(long, default_value_t = false)]
pub remote: bool,
#[command(flatten)]
pub attach: RemoteAttachArgs,
}
#[derive(Args, Debug)]
pub struct SpawnArgs {
#[arg(long)]
pub name: String,
#[arg(long)]
pub template: String,
#[arg(long)]
pub replica_count: u8,
#[arg(long)]
pub identity: Option<PathBuf>,
#[arg(long, default_value_t = crate::prelude::DEFAULT_SUPERVISOR_NODE)]
pub node: u64,
#[command(flatten)]
pub attach: RemoteAttachArgs,
}
#[derive(Args, Debug)]
pub struct ScaleArgs {
#[arg(long)]
pub name: String,
#[arg(long)]
pub template: String,
#[arg(long)]
pub replica_count: u8,
#[arg(long)]
pub identity: Option<PathBuf>,
#[arg(long, default_value_t = crate::prelude::DEFAULT_SUPERVISOR_NODE)]
pub node: u64,
#[command(flatten)]
pub attach: RemoteAttachArgs,
}
#[derive(Args, Debug)]
pub struct QueryArgs {
pub target: String,
#[arg(long)]
pub kind: String,
#[arg(long, default_value_t = false)]
pub fresh: bool,
#[arg(long)]
pub identity: Option<PathBuf>,
#[arg(long, default_value_t = crate::prelude::DEFAULT_SUPERVISOR_NODE)]
pub node: u64,
#[command(flatten)]
pub attach: RemoteAttachArgs,
}
pub async fn run(
cmd: AggregatorCommand,
output: Option<OutputFormat>,
config_path: Option<&std::path::Path>,
profile_name: &str,
) -> Result<(), CliError> {
match cmd {
AggregatorCommand::Inspect(args) => {
run_inspect(args, output, config_path, profile_name).await
}
AggregatorCommand::Query(args) => run_query(args, output, config_path, profile_name).await,
AggregatorCommand::Ls(args) => run_ls(args, output, config_path, profile_name).await,
AggregatorCommand::Spawn(args) => run_spawn(args, output, config_path, profile_name).await,
AggregatorCommand::Scale(args) => run_scale(args, output, config_path, profile_name).await,
}
}
async fn run_inspect(
args: InspectArgs,
output: Option<OutputFormat>,
config_path: Option<&std::path::Path>,
profile_name: &str,
) -> Result<(), CliError> {
let profile = resolve_profile(config_path, profile_name).await?;
let ctx = CliContext::build(&profile, args.identity.as_deref(), args.node, false).await?;
let deck = ctx.deck();
let view = match deck.aggregator_snapshot() {
Some(snap) => InspectView {
aggregator_installed: true,
source_subnet: Some(snap.source_subnet.to_string()),
fold_kinds: snap
.fold_kinds
.iter()
.map(|k| format!("{k:#06x}"))
.collect(),
summary_interval_secs: snap.summary_interval.as_secs_f64(),
generation: snap.generation,
summary_count: snap.summaries.len() as u64,
summaries: snap
.summaries
.iter()
.cloned()
.map(SummaryRow::from)
.collect(),
},
None => InspectView {
aggregator_installed: false,
source_subnet: None,
fold_kinds: Vec::new(),
summary_interval_secs: 0.0,
generation: 0,
summary_count: 0,
summaries: Vec::new(),
},
};
emit_value(OutputFormat::resolve_oneshot(output), &view)
.map_err(|e| generic(format!("write aggregator inspect: {e}")))?;
Ok(())
}
async fn run_query(
args: QueryArgs,
output: Option<OutputFormat>,
config_path: Option<&std::path::Path>,
profile_name: &str,
) -> Result<(), CliError> {
let target = parse_u64_flexible(&args.target)
.map_err(|e| invalid_args(format!("target `{}`: {e}", args.target)))?;
let kind = parse_u16_flexible(&args.kind)
.map_err(|e| invalid_args(format!("kind `{}`: {e}", args.kind)))?;
let profile = resolve_profile(config_path, profile_name).await?;
let remote = require_remote_attach(&profile, &args.attach, "query")?;
let ctx =
CliContext::build_with_remote(&profile, args.identity.as_deref(), args.node, false, remote)
.await?;
let mesh = ctx.require_mesh_node()?;
use net_sdk::aggregator::FoldQueryClient;
let client = FoldQueryClient::new(mesh);
let summaries = if args.fresh {
client
.query_summarize_now(target, kind)
.await
.map_err(|e| sdk(format!("fold.query (summarize-now) failed: {e}")))?
} else {
client
.query_latest(target, kind)
.await
.map_err(|e| sdk(format!("fold.query (latest) failed: {e}")))?
};
let view = QueryView {
target_node_id: target,
fold_kind: format!("{kind:#06x}"),
fresh: args.fresh,
summary_count: summaries.len() as u64,
summaries: summaries.into_iter().map(SummaryRow::from).collect(),
};
emit_value(OutputFormat::resolve_oneshot(output), &view)
.map_err(|e| generic(format!("write aggregator query: {e}")))?;
Ok(())
}
fn require_remote_attach(
profile: &crate::config::Profile,
args: &RemoteAttachArgs,
verb: &str,
) -> Result<RemoteAttach, CliError> {
crate::context::require_remote_attach(profile, args, || {
invalid_args(format!(
"net aggregator {verb} needs a remote daemon target: pass \
--node-addr <IP:PORT> --node-pubkey <HEX> --node-id <N> \
--psk-hex <HEX> (each can be defaulted in the profile as \
`node_addr` / `node_pubkey` / `node_id` / `psk_hex`)."
))
})
}
async fn run_ls(
args: LsArgs,
output: Option<OutputFormat>,
config_path: Option<&std::path::Path>,
profile_name: &str,
) -> Result<(), CliError> {
let profile = resolve_profile(config_path, profile_name).await?;
let want_remote = args.remote || args.attach.node_addr.is_some();
if want_remote {
return run_ls_remote(args, output, &profile).await;
}
let ctx = CliContext::build(&profile, args.identity.as_deref(), args.node, false).await?;
let deck = ctx.deck();
let snapshot = deck.aggregator_registry_snapshot().await;
let view = match snapshot {
Some(s) => LsView {
registry_installed: true,
group_count: s.groups.len() as u64,
groups: s.groups.iter().map(LsGroupRow::from).collect(),
},
None => LsView {
registry_installed: false,
group_count: 0,
groups: Vec::new(),
},
};
emit_value(OutputFormat::resolve_oneshot(output), &view)
.map_err(|e| generic(format!("write aggregator ls: {e}")))?;
Ok(())
}
async fn run_ls_remote(
args: LsArgs,
output: Option<OutputFormat>,
profile: &crate::config::Profile,
) -> Result<(), CliError> {
let remote = require_remote_attach(profile, &args.attach, "ls --remote")?;
let target_node_id = remote.node_id;
let ctx =
CliContext::build_with_remote(profile, args.identity.as_deref(), args.node, false, remote)
.await?;
let mesh = ctx.require_mesh_node()?;
use net_sdk::aggregator::RegistryClient;
let client = RegistryClient::new(mesh);
let groups = client
.list(target_node_id)
.await
.map_err(|e| sdk(format!("aggregator.registry list failed: {e}")))?;
let view = RemoteLsView {
target_node_id,
group_count: groups.len() as u64,
groups: groups.iter().map(RemoteLsGroupRow::from).collect(),
};
emit_value(OutputFormat::resolve_oneshot(output), &view)
.map_err(|e| generic(format!("write aggregator ls --remote: {e}")))?;
Ok(())
}
async fn run_spawn(
args: SpawnArgs,
output: Option<OutputFormat>,
config_path: Option<&std::path::Path>,
profile_name: &str,
) -> Result<(), CliError> {
if args.replica_count == 0 {
return Err(invalid_args("replica_count must be > 0"));
}
if args.template.trim().is_empty() {
return Err(invalid_args("--template must not be empty"));
}
if args.name.trim().is_empty() {
return Err(invalid_args("--name must not be empty"));
}
let profile = resolve_profile(config_path, profile_name).await?;
let remote = require_remote_attach(&profile, &args.attach, "spawn")?;
let target_node_id = remote.node_id;
let ctx =
CliContext::build_with_remote(&profile, args.identity.as_deref(), args.node, false, remote)
.await?;
let mesh = ctx.require_mesh_node()?;
use net_sdk::aggregator::RegistryClient;
let client = RegistryClient::new(mesh);
let summary = client
.spawn(target_node_id, args.template, args.name, args.replica_count)
.await
.map_err(|e| sdk(format!("aggregator.registry spawn failed: {e}")))?;
let view = SpawnView::from(&summary);
emit_value(OutputFormat::resolve_oneshot(output), &view)
.map_err(|e| generic(format!("write aggregator spawn: {e}")))?;
Ok(())
}
async fn run_scale(
args: ScaleArgs,
output: Option<OutputFormat>,
config_path: Option<&std::path::Path>,
profile_name: &str,
) -> Result<(), CliError> {
if args.replica_count == 0 {
return Err(invalid_args("replica_count must be > 0"));
}
if args.template.trim().is_empty() {
return Err(invalid_args("--template must not be empty"));
}
if args.name.trim().is_empty() {
return Err(invalid_args("--name must not be empty"));
}
let profile = resolve_profile(config_path, profile_name).await?;
let remote = require_remote_attach(&profile, &args.attach, "scale")?;
let target_node_id = remote.node_id;
let ctx =
CliContext::build_with_remote(&profile, args.identity.as_deref(), args.node, false, remote)
.await?;
let mesh = ctx.require_mesh_node()?;
use net_sdk::aggregator::RegistryClient;
let client = RegistryClient::new(mesh);
let summary = client
.scale(
target_node_id,
args.name.clone(),
args.template.clone(),
args.replica_count,
)
.await
.map_err(|e| sdk(format!("aggregator.registry scale failed: {e}")))?;
let view = SpawnView::from(&summary);
emit_value(OutputFormat::resolve_oneshot(output), &view)
.map_err(|e| generic(format!("write aggregator scale: {e}")))?;
Ok(())
}
#[derive(Serialize)]
struct LsView {
registry_installed: bool,
group_count: u64,
groups: Vec<LsGroupRow>,
}
#[derive(Serialize)]
struct LsGroupRow {
name: String,
group_seed: String,
replicas: Vec<LsReplicaRow>,
healthy_count: u64,
replica_count: u64,
}
impl From<&net_sdk::deck::AggregatorRegistryGroupSnapshot> for LsGroupRow {
fn from(g: &net_sdk::deck::AggregatorRegistryGroupSnapshot) -> Self {
let replicas: Vec<LsReplicaRow> = g.replicas.iter().map(LsReplicaRow::from).collect();
let healthy_count = replicas.iter().filter(|r| r.healthy).count() as u64;
Self {
name: g.name.clone(),
group_seed: g
.group_seed
.iter()
.fold(String::with_capacity(64), |mut acc, b| {
use std::fmt::Write as _;
let _ = write!(&mut acc, "{b:02x}");
acc
}),
replica_count: g.replicas.len() as u64,
healthy_count,
replicas,
}
}
}
#[derive(Serialize)]
struct LsReplicaRow {
generation: u64,
healthy: bool,
#[serde(skip_serializing_if = "Option::is_none")]
diagnostic: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
placement_node_id: Option<u64>,
}
impl From<&net_sdk::deck::AggregatorReplicaRow> for LsReplicaRow {
fn from(r: &net_sdk::deck::AggregatorReplicaRow) -> Self {
Self {
generation: r.generation,
healthy: r.healthy,
diagnostic: r.diagnostic.clone(),
placement_node_id: r.placement_node_id,
}
}
}
#[derive(Serialize)]
struct RemoteLsView {
target_node_id: u64,
group_count: u64,
groups: Vec<RemoteLsGroupRow>,
}
#[derive(Serialize)]
struct RemoteLsGroupRow {
name: String,
group_seed: String,
source_subnet: String,
fold_kinds: Vec<String>,
healthy_count: u64,
replica_count: u64,
replicas: Vec<RemoteReplicaRow>,
}
impl From<&net_sdk::aggregator::RegistryGroupSummary> for RemoteLsGroupRow {
fn from(g: &net_sdk::aggregator::RegistryGroupSummary) -> Self {
let replicas: Vec<RemoteReplicaRow> =
g.replicas.iter().map(RemoteReplicaRow::from).collect();
let healthy_count = replicas.iter().filter(|r| r.healthy).count() as u64;
Self {
name: g.name.clone(),
group_seed: hex::encode(g.group_seed),
source_subnet: g.source_subnet.to_string(),
fold_kinds: g.fold_kinds.iter().map(|k| format!("{k:#06x}")).collect(),
healthy_count,
replica_count: replicas.len() as u64,
replicas,
}
}
}
#[derive(Serialize)]
struct SpawnView {
name: String,
group_seed: String,
source_subnet: String,
fold_kinds: Vec<String>,
replica_count: u64,
replicas: Vec<RemoteReplicaRow>,
}
impl From<&net_sdk::aggregator::RegistryGroupSummary> for SpawnView {
fn from(g: &net_sdk::aggregator::RegistryGroupSummary) -> Self {
Self {
name: g.name.clone(),
group_seed: hex::encode(g.group_seed),
source_subnet: g.source_subnet.to_string(),
fold_kinds: g.fold_kinds.iter().map(|k| format!("{k:#06x}")).collect(),
replica_count: g.replicas.len() as u64,
replicas: g.replicas.iter().map(RemoteReplicaRow::from).collect(),
}
}
}
#[derive(Serialize)]
struct RemoteReplicaRow {
generation: u64,
healthy: bool,
#[serde(skip_serializing_if = "Option::is_none")]
diagnostic: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
placement_node_id: Option<u64>,
}
impl From<&net_sdk::aggregator::RegistryReplicaSummary> for RemoteReplicaRow {
fn from(r: &net_sdk::aggregator::RegistryReplicaSummary) -> Self {
Self {
generation: r.generation,
healthy: r.healthy,
diagnostic: r.diagnostic.clone(),
placement_node_id: r.placement_node_id,
}
}
}
#[derive(Serialize)]
struct QueryView {
target_node_id: u64,
fold_kind: String,
fresh: bool,
summary_count: u64,
summaries: Vec<SummaryRow>,
}
#[derive(Serialize)]
struct InspectView {
aggregator_installed: bool,
source_subnet: Option<String>,
fold_kinds: Vec<String>,
summary_interval_secs: f64,
generation: u64,
summary_count: u64,
summaries: Vec<SummaryRow>,
}
#[derive(Serialize)]
struct SummaryRow {
#[serde(skip_serializing_if = "Option::is_none")]
timestamp: Option<u64>,
fold_kind: String,
source_subnet: String,
buckets: Vec<(String, u64)>,
generation: u64,
}
impl From<net_sdk::deck::SummaryAnnouncement> for SummaryRow {
fn from(s: net_sdk::deck::SummaryAnnouncement) -> Self {
Self {
timestamp: None,
fold_kind: format!("{:#06x}", s.fold_kind),
source_subnet: s.source_subnet.to_string(),
buckets: s.buckets,
generation: s.generation,
}
}
}