mod ai;
mod backup;
mod bench;
mod cell;
mod config;
mod detect;
mod dmr;
mod docker;
mod embed;
mod health;
mod init;
mod metrics;
mod output;
mod paths;
mod update;
mod watch;
use anyhow::Result;
use clap::{Parser, Subcommand, ValueEnum};
use std::env;
use std::path::PathBuf;
use crate::detect::Accel;
// Top-level command-line definition for the `knishio` binary, parsed with clap's derive API.
// Plain `//` comments are used on purpose: clap turns `///` doc comments into help text,
// which would change the program's `--help` output.
#[derive(Parser)]
#[command(
name = "knishio",
about = "KnishIO Validator Orchestration CLI",
version,
propagate_version = true
)]
struct Cli {
// `--url`: accepted on any subcommand (`global = true`). Because of the default value it
// is always populated; `main` hands it to `Config::with_url_override`.
#[arg(long, global = true, default_value = "https://localhost:8080")]
url: String,
// The subcommand to run; `main` matches on it to dispatch.
#[command(subcommand)]
command: Commands,
}
// Values accepted by the `--accel` flag. With `rename_all = "kebab-case"` they are spelled
// `auto`, `cpu`, `cuda`, `dmr`, `metal-native`, `rocm` and `vulkan` on the command line.
// (`//` rather than `///`: clap would expose doc comments as help text for each value.)
#[derive(Copy, Clone, Debug, ValueEnum, PartialEq, Eq)]
#[value(rename_all = "kebab-case")]
enum AccelFlag {
// Default of every `--accel` flag in this file. `resolve_accel_and_files` replaces it with
// the config's `default_accel` or, if that is unset, with the detected accelerator.
Auto,
// Each remaining variant is mapped to the `Accel` variant of the same name by
// `flag_to_accel`.
Cpu,
Cuda,
Dmr,
MetalNative,
Rocm,
Vulkan,
}
// All top-level `knishio` subcommands; `main` matches on this enum to dispatch.
// Plain `//` comments are used instead of `///` because clap would turn doc comments into
// help text and change the `--help` output.
#[derive(Subcommand)]
enum Commands {
// Runs `detect::detect()` and prints its summary.
Detect,
// Forwards `--tls` and the optional `--cors` value to `init::run`.
Init {
#[arg(long)]
tls: bool,
#[arg(long)]
cors: Option<String>,
},
// Resolves the accelerator/compose files, then calls `docker::start`.
Start {
// `--build`: forwarded to `docker::start`; also suppresses the feature-gated accel warning.
#[arg(long)]
build: bool,
// `-d` / `--detach`: forwarded to `docker::start`.
#[arg(short, long)]
detach: bool,
#[arg(long, value_enum, default_value_t = AccelFlag::Auto)]
accel: AccelFlag,
// `--gen-model`: when given, resolved via `docker::resolve_gen_model` and passed on as a
// `GENERATION_MODEL` environment pair.
#[arg(long)]
gen_model: Option<String>,
},
// Calls `docker::stop` on the resolved compose files.
Stop {
#[arg(long, value_enum, default_value_t = AccelFlag::Auto)]
accel: AccelFlag,
},
// Calls `docker::destroy`; `--volumes` is forwarded to it.
Destroy {
#[arg(long)]
volumes: bool,
#[arg(long, value_enum, default_value_t = AccelFlag::Auto)]
accel: AccelFlag,
},
// Calls `docker::rebuild`, with the same optional `GENERATION_MODEL` pair as `Start`.
Rebuild {
#[arg(long, value_enum, default_value_t = AccelFlag::Auto)]
accel: AccelFlag,
#[arg(long)]
gen_model: Option<String>,
},
// Calls `update::update`, or `update::rollback` when `--rollback` is set.
Update {
#[arg(long)]
build: bool,
// `--rollback` and `--build` are mutually exclusive (enforced by clap).
#[arg(long, conflicts_with = "build")]
rollback: bool,
#[arg(long, value_enum, default_value_t = AccelFlag::Auto)]
accel: AccelFlag,
// Only used on the non-rollback path.
#[arg(long)]
gen_model: Option<String>,
},
// Calls `docker::logs` with the follow flag and optional tail count.
Logs {
#[arg(short, long)]
follow: bool,
#[arg(long)]
tail: Option<usize>,
#[arg(long, value_enum, default_value_t = AccelFlag::Auto)]
accel: AccelFlag,
},
// Calls `docker::status` on the resolved compose files.
Status {
#[arg(long, value_enum, default_value_t = AccelFlag::Auto)]
accel: AccelFlag,
},
// Nested subcommand groups; see the respective `*Commands` enums.
Dmr {
#[command(subcommand)]
command: DmrCommands,
},
Ai {
#[command(subcommand)]
command: AiCommands,
},
Cell {
#[command(subcommand)]
command: CellCommands,
},
Backup {
#[command(subcommand)]
command: BackupCommands,
},
// Positional `path` plus `--skip-verify`, both forwarded to `backup::restore`.
Restore {
path: String,
#[arg(long)]
skip_verify: bool,
},
// Optional `-c` / `--command` string forwarded to `docker::psql`.
Psql {
#[arg(short, long)]
command: Option<String>,
},
Bench {
#[command(subcommand)]
command: BenchCommands,
},
Embed {
#[command(subcommand)]
command: EmbedCommands,
},
// `--full` selects `health::health_full`; otherwise `health::healthz` is called.
Health {
#[arg(long)]
full: bool,
},
// Calls `health::readyz` with its boolean argument set to `false`.
Ready,
// Calls `health::readyz` with its boolean argument set to `true`.
Full,
// Calls `health::db_check`.
Db,
// Optional `--filter` string and `--raw` flag, forwarded to `metrics::metrics`.
Metrics {
#[arg(long)]
filter: Option<String>,
#[arg(long)]
raw: bool,
},
// Writes a completion script for the given shell to stdout via `clap_complete::generate`.
Completions {
#[arg(value_enum)]
shell: clap_complete::Shell,
},
// Nested subcommand group; see `WatchSubject`.
Watch {
#[command(subcommand)]
subject: WatchSubject,
},
}
// Subjects of `knishio watch`. (`//` comments: clap would turn `///` into help text.)
#[derive(Subcommand)]
enum WatchSubject {
// Optional `--meta-type` / `--meta-id` values, passed by value to `watch::embeddings`.
Embeddings {
#[arg(long)]
meta_type: Option<String>,
#[arg(long)]
meta_id: Option<String>,
},
// Optional `--cell` value, passed by value to `watch::dag`.
Dag {
#[arg(long)]
cell: Option<String>,
},
}
// Subcommands of `knishio dmr`. (`//` comments: clap would turn `///` into help text.)
#[derive(Subcommand)]
enum DmrCommands {
// Calls `dmr::status()`.
Status,
// Calls `dmr::enable()`.
Enable,
// Calls `dmr::pull` with the optional `--model` value.
Pull {
#[arg(long)]
model: Option<String>,
},
}
// Subcommands of `knishio ai`. (`//` comments: clap would turn `///` into help text.)
#[derive(Subcommand)]
enum AiCommands {
// Calls `ai::status` with the loaded config.
Status,
}
// Subcommands of `knishio backup`. (`//` comments: clap would turn `///` into help text.)
#[derive(Subcommand)]
enum BackupCommands {
// Calls `backup::backup` with the optional `-o` / `--output` value.
Create {
#[arg(short, long)]
output: Option<String>,
},
// Calls `backup::list()`.
List,
}
// Subcommands of `knishio cell`. (`//` comments: clap would turn `///` into help text.)
#[derive(Subcommand)]
enum CellCommands {
// Calls `cell::create` with the positional slug, optional `--name`, and `--status`
// (defaults to "active").
Create {
slug: String,
#[arg(long)]
name: Option<String>,
#[arg(long, default_value = "active")]
status: String,
},
// Calls `cell::list`.
List,
// The next three call `cell::set_status` with "active", "paused" and "archived"
// respectively.
Activate {
slug: String,
},
Pause {
slug: String,
},
Archive {
slug: String,
},
}
// Subcommands of `knishio bench`. (`//` comments: clap would turn `///` into help text.)
#[derive(Subcommand)]
enum BenchCommands {
// Carries both the plan-generation options (same set and defaults as `Generate`, minus
// `--output`) and the execution options (same as `Execute`, minus the positional plan);
// `main` builds a `GenerateArgs` and an `ExecuteArgs` from them and calls `bench::run`.
Run {
#[arg(long, default_value_t = 50)]
identities: usize,
// `--types` accepts a comma-separated list; defaults to the single entry "meta".
#[arg(long, default_value = "meta", value_delimiter = ',')]
types: Vec<String>,
#[arg(long, default_value_t = 100)]
metas_per_identity: usize,
#[arg(long, default_value_t = 10)]
transfers_per_identity: usize,
#[arg(long, default_value_t = 5)]
rules_per_identity: usize,
#[arg(long, default_value_t = 5)]
burns_per_identity: usize,
#[arg(long, default_value_t = 1_000_000.0)]
token_amount: f64,
// NOTE(review): this has its own default and is what `main` passes on as the endpoint;
// the global `--url` is not consulted for it in `main` — confirm that is intended.
#[arg(long, default_value = "https://localhost:8080")]
endpoint: String,
#[arg(long, default_value_t = 5)]
concurrency: usize,
#[arg(long)]
cell_slug: Option<String>,
// `--keep`: forwarded to `bench::run`.
#[arg(long)]
keep: bool,
},
// Builds a `GenerateArgs` and calls `bench::generate`.
Generate {
#[arg(long, default_value_t = 50)]
identities: usize,
#[arg(long, default_value = "meta", value_delimiter = ',')]
types: Vec<String>,
#[arg(long, default_value_t = 100)]
metas_per_identity: usize,
#[arg(long, default_value_t = 10)]
transfers_per_identity: usize,
#[arg(long, default_value_t = 5)]
rules_per_identity: usize,
#[arg(long, default_value_t = 5)]
burns_per_identity: usize,
#[arg(long, default_value_t = 1_000_000.0)]
token_amount: f64,
// `-o` / `--output`: required (no default, not optional).
#[arg(short, long)]
output: String,
},
// Builds an `ExecuteArgs` from the positional `plan` and the options below, then calls
// `bench::execute`.
Execute {
plan: String,
#[arg(long, default_value = "https://localhost:8080")]
endpoint: String,
#[arg(long, default_value_t = 5)]
concurrency: usize,
#[arg(long)]
cell_slug: Option<String>,
#[arg(long)]
keep: bool,
},
// Calls `bench::clean`; `--cell-slug` and `--all` are mutually exclusive (enforced by clap).
Clean {
#[arg(long, conflicts_with = "all")]
cell_slug: Option<String>,
#[arg(long, conflicts_with = "cell_slug")]
all: bool,
},
}
// Subcommands of `knishio embed`. (`//` comments: clap would turn `///` into help text.)
#[derive(Subcommand)]
enum EmbedCommands {
// Calls `embed::status`.
Status,
// Calls `embed::reset`; `--model` and `--all` are mutually exclusive (enforced by clap).
Reset {
#[arg(long, conflicts_with = "all")]
model: Option<String>,
#[arg(long, conflicts_with = "model")]
all: bool,
// `--yes` / `-y`: forwarded to `embed::reset`.
#[arg(long, short = 'y')]
yes: bool,
},
// Calls `embed::search` with the positional query; `--limit` defaults to 10 and
// `--threshold` to 0.7.
Search {
query: String,
#[arg(long, default_value_t = 10)]
limit: i32,
#[arg(long, default_value_t = 0.7)]
threshold: f64,
#[arg(long)]
meta_type: Option<String>,
},
// Calls `embed::ask` with the positional question; `--max-results` defaults to 20 and
// `--threshold` to 0.5.
Ask {
question: String,
#[arg(long, default_value_t = 20)]
max_results: i32,
#[arg(long, default_value_t = 0.5)]
threshold: f64,
#[arg(long)]
meta_type: Option<String>,
},
}
/// Program entry point: parses the command line, loads configuration relative to the
/// current working directory, and dispatches on the chosen subcommand.
///
/// # Errors
/// Returns the first error produced by reading the current directory or by any handler;
/// every fallible call is propagated with `?`.
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
let cwd = env::current_dir()?;
// NOTE(review): `--url` has a default value, so `cli.url` is always populated here.
// Whether a URL from the config file can ever take effect depends on what
// `with_url_override` does with the default — confirm.
let cfg = config::Config::load(&cwd).with_url_override(&cli.url);
match cli.command {
Commands::Detect => {
let env = detect::detect();
detect::print_summary(&env);
}
Commands::Init { tls, cors } => {
init::run(&cwd, tls, cors.as_deref()).await?;
}
Commands::Start {
build,
detach,
accel,
gen_model,
} => {
// `accel` is shadowed with the resolved accelerator, which may differ from the flag
// (config default, detection, or cpu fallback).
let (accel, files) = resolve_accel_and_files(&cwd, &cfg, accel)?;
warn_feature_gated_accel_without_rebuild(accel, build);
// `resolved` lives in its own binding so that `env` can borrow from it.
let resolved = gen_model.as_deref().map(docker::resolve_gen_model);
// The `GENERATION_MODEL` pair is only passed when `--gen-model` was given.
let env: Vec<(&str, &str)> = match &resolved {
Some(m) => vec![("GENERATION_MODEL", m.as_str())],
None => vec![],
};
docker::start(&files, build, detach, &env).await?;
if cfg.accel_is_native(accel) {
docker::print_metal_native_hint(&cwd, &cfg);
}
}
Commands::Stop { accel } => {
let (_accel, files) = resolve_accel_and_files(&cwd, &cfg, accel)?;
docker::stop(&files).await?;
}
Commands::Destroy { volumes, accel } => {
let (_accel, files) = resolve_accel_and_files(&cwd, &cfg, accel)?;
docker::destroy(&files, volumes).await?;
}
Commands::Rebuild { accel, gen_model } => {
let (_accel, files) = resolve_accel_and_files(&cwd, &cfg, accel)?;
// Same optional `GENERATION_MODEL` handling as in `Start`.
let resolved = gen_model.as_deref().map(docker::resolve_gen_model);
let env: Vec<(&str, &str)> = match &resolved {
Some(m) => vec![("GENERATION_MODEL", m.as_str())],
None => vec![],
};
docker::rebuild(&files, &env).await?;
}
Commands::Update {
build,
rollback,
accel,
gen_model,
} => {
let (accel, files) = resolve_accel_and_files(&cwd, &cfg, accel)?;
// The rebuild warning is skipped on the rollback path.
if !rollback {
warn_feature_gated_accel_without_rebuild(accel, build);
}
// NOTE(review): only the first resolved compose file is handed to `update`; any
// further files in the chain are not passed along — confirm this is intended.
let base = files
.first()
.cloned()
.ok_or_else(|| anyhow::anyhow!("resolved accel chain is empty"))?;
if rollback {
update::rollback(&base, &cfg).await?;
} else {
// Same optional `GENERATION_MODEL` handling as in `Start`.
let resolved = gen_model.as_deref().map(docker::resolve_gen_model);
let env: Vec<(&str, &str)> = match &resolved {
Some(m) => vec![("GENERATION_MODEL", m.as_str())],
None => vec![],
};
update::update(&base, &cfg, build, &env).await?;
}
}
Commands::Logs { follow, tail, accel } => {
let (_accel, files) = resolve_accel_and_files(&cwd, &cfg, accel)?;
docker::logs(&files, follow, tail).await?;
}
Commands::Status { accel } => {
let (_accel, files) = resolve_accel_and_files(&cwd, &cfg, accel)?;
docker::status(&files).await?;
}
Commands::Ai { command } => match command {
AiCommands::Status => ai::status(&cfg).await?,
},
Commands::Dmr { command } => match command {
DmrCommands::Status => dmr::status().await?,
DmrCommands::Enable => dmr::enable().await?,
DmrCommands::Pull { model } => dmr::pull(model).await?,
},
Commands::Cell { command } => match command {
CellCommands::Create { slug, name, status } => {
cell::create(&cfg, &slug, name.as_deref(), &status).await?;
}
CellCommands::List => {
cell::list(&cfg).await?;
}
// The three status subcommands differ only in the status string they pass.
CellCommands::Activate { slug } => {
cell::set_status(&cfg, &slug, "active").await?;
}
CellCommands::Pause { slug } => {
cell::set_status(&cfg, &slug, "paused").await?;
}
CellCommands::Archive { slug } => {
cell::set_status(&cfg, &slug, "archived").await?;
}
},
Commands::Backup { command } => match command {
BackupCommands::Create { output } => {
backup::backup(&cfg, output.as_deref()).await?;
}
BackupCommands::List => {
backup::list().await?;
}
},
Commands::Restore { path, skip_verify } => {
backup::restore(&cfg, &path, skip_verify).await?;
}
Commands::Psql { command } => {
docker::psql(&cfg, command.as_deref()).await?;
}
Commands::Bench { command } => match command {
BenchCommands::Run {
identities,
types,
metas_per_identity,
transfers_per_identity,
rules_per_identity,
burns_per_identity,
token_amount,
endpoint,
concurrency,
cell_slug,
keep,
} => {
// `output` (and `plan` below) are left empty for the combined run; presumably
// `bench::run` connects the generated plan to the execute step itself — confirm.
let gen_args = bench::generate::GenerateArgs {
identities,
types,
metas_per_identity,
transfers_per_identity,
rules_per_identity,
burns_per_identity,
token_amount,
output: String::new(), };
// The CLI exposes a single endpoint only: `endpoints`, `csv` and `plot` are always
// `None` and the strategy is fixed to round-robin.
let exec_args = bench::execute::ExecuteArgs {
plan: String::new(), endpoint: Some(endpoint),
endpoints: None,
strategy: bench::execute::Strategy::RoundRobin,
concurrency,
cell_slug,
csv: None,
plot: None,
insecure_tls: cfg.validator.insecure_tls,
};
bench::run(gen_args, exec_args, &cfg, keep).await?;
}
BenchCommands::Generate {
identities,
types,
metas_per_identity,
transfers_per_identity,
rules_per_identity,
burns_per_identity,
token_amount,
output: output_path,
} => {
let gen_args = bench::generate::GenerateArgs {
identities,
types,
metas_per_identity,
transfers_per_identity,
rules_per_identity,
burns_per_identity,
token_amount,
output: output_path,
};
bench::generate(gen_args)?;
output::success("Plan generation complete");
}
BenchCommands::Execute {
plan,
endpoint,
concurrency,
cell_slug,
keep,
} => {
// Same fixed `ExecuteArgs` fields as in `Run`, but with the plan path from the CLI.
let exec_args = bench::execute::ExecuteArgs {
plan,
endpoint: Some(endpoint),
endpoints: None,
strategy: bench::execute::Strategy::RoundRobin,
concurrency,
cell_slug,
csv: None,
plot: None,
insecure_tls: cfg.validator.insecure_tls,
};
bench::execute(exec_args, &cfg, keep).await?;
}
BenchCommands::Clean { cell_slug, all } => {
bench::clean(&cfg, cell_slug.as_deref(), all).await?;
}
},
Commands::Embed { command } => match command {
EmbedCommands::Status => {
embed::status(&cfg).await?;
}
EmbedCommands::Reset { model, all, yes } => {
embed::reset(&cfg, model.as_deref(), all, yes).await?;
}
EmbedCommands::Search {
query,
limit,
threshold,
meta_type,
} => {
embed::search(&cfg, &query, limit, threshold, meta_type.as_deref()).await?;
}
EmbedCommands::Ask {
question,
max_results,
threshold,
meta_type,
} => {
embed::ask(&cfg, &question, max_results, threshold, meta_type.as_deref()).await?;
}
},
// The health-style commands all target `cfg.validator.url` and pass along
// `cfg.validator.insecure_tls`.
Commands::Health { full } => {
if full {
health::health_full(&cfg.validator.url, cfg.validator.insecure_tls).await?;
} else {
health::healthz(&cfg.validator.url, cfg.validator.insecure_tls).await?;
}
}
// `Ready` and `Full` call the same function and differ only in its boolean argument.
Commands::Ready => {
health::readyz(&cfg.validator.url, false, cfg.validator.insecure_tls).await?;
}
Commands::Full => {
health::readyz(&cfg.validator.url, true, cfg.validator.insecure_tls).await?;
}
Commands::Db => {
health::db_check(&cfg.validator.url, cfg.validator.insecure_tls).await?;
}
Commands::Metrics { filter, raw } => {
metrics::metrics(&cfg, filter.as_deref(), raw).await?;
}
Commands::Completions { shell } => {
// `CommandFactory` is imported locally; it provides `Cli::command()`.
use clap::CommandFactory;
let mut cmd = Cli::command();
clap_complete::generate(shell, &mut cmd, "knishio", &mut std::io::stdout());
}
Commands::Watch { subject } => match subject {
WatchSubject::Embeddings { meta_type, meta_id } => {
watch::embeddings(&cfg, meta_type, meta_id).await?;
}
WatchSubject::Dag { cell } => {
watch::dag(&cfg, cell).await?;
}
},
}
Ok(())
}
/// Resolves which accelerator to use and locates the compose files that implement it.
///
/// Selection order:
/// 1. an explicit `--accel` value (anything other than `AccelFlag::Auto`);
/// 2. the config's `default_accel`, parsed with `parse_accel_name`;
/// 3. hardware detection via `detect::detect()`, whose summary is printed.
///
/// For cases 1 and 2 an "Environment" header and a one-line explanation are printed,
/// because detection (and therefore its summary) is skipped.
///
/// If the chosen accelerator has no configured compose files, or they cannot be located
/// under `cwd`, a warning is printed and the `cpu` files are used instead. `cpu` itself is
/// the end of that chain: it is resolved directly and never "falls back" onto itself.
///
/// Returns the accelerator that is actually in effect (possibly `Accel::Cpu` after a
/// fallback) together with the resolved compose file paths.
///
/// # Errors
/// Fails when `default_accel` is not a known accelerator name, or when the compose files
/// of the final candidate (`cpu`) cannot be located.
fn resolve_accel_and_files(
    cwd: &std::path::Path,
    cfg: &config::Config,
    cli_accel: AccelFlag,
) -> Result<(Accel, Vec<PathBuf>)> {
    let (accel, source) = match cli_accel {
        AccelFlag::Auto => match cfg.default_accel.as_deref() {
            Some(name) => {
                let parsed = parse_accel_name(name).ok_or_else(|| {
                    anyhow::anyhow!(
                        "config `default_accel = \"{}\"` is not a known accel name",
                        name
                    )
                })?;
                (parsed, AccelSource::Config)
            }
            None => {
                let env = detect::detect();
                detect::print_summary(&env);
                (env.accel, AccelSource::Auto)
            }
        },
        explicit => (flag_to_accel(explicit), AccelSource::Explicit),
    };

    let arrow = colored::Colorize::bold(colored::Colorize::green("→"));
    match source {
        // The detection summary printed above already covers this case.
        AccelSource::Auto => {}
        AccelSource::Explicit => {
            output::header("Environment");
            println!(
                "{} Accel: {} (explicit --accel; detection skipped)",
                arrow, accel
            );
        }
        AccelSource::Config => {
            output::header("Environment");
            println!(
                "{} Accel: {} (config default_accel; detection skipped)",
                arrow, accel
            );
        }
    }

    let configured = cfg.accel_files(accel).to_vec();

    // `cpu` is the last resort, so there is nothing to fall back to: resolve it directly
    // and let a lookup failure surface as the error instead of warning about a
    // `cpu` -> `cpu` fallback and repeating the identical lookup.
    if accel == Accel::Cpu {
        if configured.is_empty() {
            output::warn(&format!(
                "accel `{}` has no configured compose files",
                accel
            ));
        }
        let resolved = paths::find_compose_files(cwd, &configured)?;
        print_stack_line(&configured);
        return Ok((accel, resolved));
    }

    if configured.is_empty() {
        output::warn(&format!(
            "accel `{}` has no configured compose files; falling back to cpu",
            accel
        ));
        return fall_back_to_cpu(cwd, cfg);
    }

    match paths::find_compose_files(cwd, &configured) {
        Ok(resolved) => {
            print_stack_line(&configured);
            if accel == Accel::Dmr {
                println!(
                    "{} {:8} validator → model-runner.docker.internal:12434/engines/llama.cpp/v1",
                    arrow, "Routing:"
                );
            }
            Ok((accel, resolved))
        }
        Err(e) => {
            output::warn(&e.to_string());
            output::warn(&format!("falling back from `{}` to `cpu`", accel));
            fall_back_to_cpu(cwd, cfg)
        }
    }
}

/// Locates the `cpu` compose files under `cwd`, prints the stack line for them and returns
/// them paired with `Accel::Cpu`. Fails if those files cannot be located.
fn fall_back_to_cpu(
    cwd: &std::path::Path,
    cfg: &config::Config,
) -> Result<(Accel, Vec<PathBuf>)> {
    let cpu_files = cfg.accel_files(Accel::Cpu).to_vec();
    let resolved = paths::find_compose_files(cwd, &cpu_files)?;
    print_stack_line(&cpu_files);
    Ok((Accel::Cpu, resolved))
}
/// Warns when a feature-gated accelerator is selected but no image rebuild will happen.
///
/// Only `Accel::Cuda`, `Accel::Rocm` and `Accel::Vulkan` produce the warning. Any other
/// accelerator, or `will_rebuild == true`, returns without emitting anything.
fn warn_feature_gated_accel_without_rebuild(accel: Accel, will_rebuild: bool) {
    // `None` means "nothing to warn about".
    let gated_feature = if will_rebuild {
        None
    } else {
        match accel {
            Accel::Cuda => Some("cuda"),
            Accel::Rocm => Some("rocm"),
            Accel::Vulkan => Some("vulkan"),
            _ => None,
        }
    };

    if let Some(name) = gated_feature {
        output::warn(&format!(
            "Accel '{}' requires a validator image built with --features {}. \
             If this is a fresh setup, run `knishio rebuild --accel {}` first \
             (or pass `--build` to this command); otherwise the --accel flag \
             is a no-op on the existing image and inference stays on CPU.",
            name, name, name
        ));
    }
}
fn print_stack_line(files: &[String]) {
let arrow = colored::Colorize::bold(colored::Colorize::green("→"));
println!(
"{} {:8} {}",
arrow,
"Stack:",
files.join(" + ")
);
}
/// Records how `resolve_accel_and_files` arrived at its accelerator choice, which decides
/// what it prints about the environment.
enum AccelSource {
/// Chosen by `detect::detect()`; the detection summary has already been printed.
Auto,
/// Given on the command line via `--accel`; detection was skipped.
Explicit,
/// Taken from the config's `default_accel`; detection was skipped.
Config,
}
fn flag_to_accel(f: AccelFlag) -> Accel {
match f {
AccelFlag::Auto => unreachable!("Auto handled by caller"),
AccelFlag::Cpu => Accel::Cpu,
AccelFlag::Cuda => Accel::Cuda,
AccelFlag::Dmr => Accel::Dmr,
AccelFlag::MetalNative => Accel::MetalNative,
AccelFlag::Rocm => Accel::Rocm,
AccelFlag::Vulkan => Accel::Vulkan,
}
}
/// Parses an accelerator name (as written in the config's `default_accel`) into an `Accel`.
///
/// Matching is exact and case-sensitive. Returns `None` for any name that is not one of
/// `cpu`, `cuda`, `dmr`, `metal-native`, `rocm` or `vulkan`.
fn parse_accel_name(name: &str) -> Option<Accel> {
    let known = [
        ("cpu", Accel::Cpu),
        ("cuda", Accel::Cuda),
        ("dmr", Accel::Dmr),
        ("metal-native", Accel::MetalNative),
        ("rocm", Accel::Rocm),
        ("vulkan", Accel::Vulkan),
    ];

    known
        .iter()
        .find(|(label, _)| *label == name)
        .map(|&(_, accel)| accel)
}