use std::collections::HashMap;
use std::io::{self, BufRead, IsTerminal, Write};
use anyhow::{bail, Context, Result};
use serde_json::Value;
use crate::api::client::ApiClient;
use crate::api::error::ApiError;
use crate::config::credentials::resolve_api_key;
use crate::config::manager::ConfigManager;
use crate::model::loader::Models;
use crate::model::types::{Operation, Param};
use crate::service::executor::OperationExecutor;
use super::formatter;
/// Presentation options used when printing the result of an operation.
///
/// All string data is borrowed from the caller, so the value is cheap to
/// build and to clone.
#[derive(Debug, Clone)]
pub struct OutputOpts<'a> {
    /// Name of the output format. `"json"`, `"text"`, `"yaml"` and `"csv"`
    /// select the matching formatter; any other value selects table output.
    pub format: &'a str,
    /// Optional query expression applied to the result before formatting.
    pub query: Option<&'a str>,
    /// Omit the header row in CSV and table output.
    pub no_header: bool,
    /// Wait for the job referenced by a `jobId` field in the response, if any,
    /// and print the job's final result instead of the initial response.
    pub wait: bool,
}

impl<'a> OutputOpts<'a> {
    /// Creates options for `format` with no query, headers enabled and
    /// job waiting disabled.
    pub fn new(format: &'a str) -> Self {
        Self {
            format,
            query: None,
            no_header: false,
            wait: false,
        }
    }
}
/// Operation names that require interactive confirmation before running,
/// unless the caller passes `--yes` / `-y`.
///
/// Matching is an exact, case-sensitive comparison against the operation name
/// given on the command line.
const DANGEROUS_OPERATIONS: &[&str] = &[
"delete", "drop", "suspend", "release", "restore", "clear",
];
pub async fn run(
models: &Models,
config_mgr: &ConfigManager,
resource: &str,
operation: &str,
raw_args: &[String],
output_opts: &OutputOpts<'_>,
fetch_all: bool,
) -> Result<()> {
let (op, is_data_plane) = find_operation(models, resource, operation)?;
if DANGEROUS_OPERATIONS.contains(&operation) {
let has_yes = raw_args.iter().any(|a| a == "--yes" || a == "-y");
if !has_yes {
print!("Are you sure you want to {} {}? [y/N] ", operation, resource);
io::stdout().flush()?;
let mut input = String::new();
io::stdin().lock().read_line(&mut input)?;
if !input.trim().eq_ignore_ascii_case("y") {
println!("Aborted.");
return Ok(());
}
}
}
let raw_args: Vec<String> = raw_args
.iter()
.filter(|a| *a != "--yes" && *a != "-y")
.cloned()
.collect();
if op.dedicated_only {
let ctx = config_mgr.get_context();
let is_dedicated = ctx
.plan
.as_deref()
.map(|p| p.eq_ignore_ascii_case("dedicated"))
.unwrap_or(false);
if !is_dedicated {
bail!(
"Operation '{} {}' is only available on Dedicated clusters.\n\
Your current context plan: {}.\n\
Set a Dedicated cluster context: zilliz context set --cluster-id <id>",
resource,
operation,
ctx.plan.as_deref().unwrap_or("unknown")
);
}
}
let api_key = resolve_api_key(None, config_mgr).ok_or_else(|| ApiError::NoApiKey)?;
let base_url = if is_data_plane {
let ctx = config_mgr.get_context();
ctx.endpoint
.context("No cluster context set. Run: zilliz context set --cluster-id <id>")?
} else {
models
.control_plane
.endpoint
.clone()
.unwrap_or_else(|| "https://api.cloud.zilliz.com".to_string())
};
let mut param_values = parse_args(&raw_args, &op)?;
let missing_params: Vec<&Param> = op
.params
.iter()
.filter(|p| p.required && !param_values.contains_key(&p.name))
.collect();
if !missing_params.is_empty() {
let (promptable, complex): (Vec<&Param>, Vec<&Param>) = missing_params
.into_iter()
.partition(|p| !matches!(p.param_type.as_str(), "array" | "object"));
if std::io::stdin().is_terminal() && !promptable.is_empty() {
let prompted = prompt_missing_params(&promptable)?;
for (name, value) in prompted {
param_values.insert(name, value);
}
} else if !promptable.is_empty() {
let flags: Vec<String> = promptable.iter().chain(complex.iter())
.map(|p| p.cli_flag()).collect();
bail!(
"Missing required option{}: {}",
if flags.len() > 1 { "s" } else { "" },
flags.join(", ")
);
}
if !complex.is_empty() {
let flags: Vec<String> = complex.iter().map(|p| p.cli_flag()).collect();
bail!(
"Missing required option{}: {}",
if flags.len() > 1 { "s" } else { "" },
flags.join(", ")
);
}
}
let client = ApiClient::new(api_key, base_url);
let executor = OperationExecutor::new(&client);
let result = if fetch_all && op.pagination.is_some() {
let items = executor.execute_all_pages(&op, ¶m_values).await?;
Value::Array(items)
} else {
executor.execute(&op, ¶m_values).await?
};
if output_opts.wait {
if let Some(job_id) = result.get("jobId").and_then(|v| v.as_str()) {
let wait_result = super::job_waiter::wait_for_job(
&client,
job_id,
1800,
5,
)
.await?;
print_output_with_opts(&wait_result, output_opts, Some(&op));
return Ok(());
}
}
print_output_with_opts(&result, output_opts, Some(&op));
Ok(())
}
/// Prints `result` to stdout in the format selected by `opts`.
///
/// When `opts.query` is set, the query is applied first; a query failure is
/// reported on stderr as `Error: ...` and nothing is printed to stdout.
///
/// Formats `"json"`, `"text"`, `"yaml"` and `"csv"` use the matching formatter.
/// Any other format renders a table of the rows found by `extract_items`
/// (preferring the operation's configured `data_field`), falling back to JSON
/// when no rows are found.
pub fn print_output_with_opts(result: &Value, opts: &OutputOpts<'_>, op: Option<&Operation>) {
    // Holds the query output, when there is one, so `value` can borrow either
    // it or the untouched input.
    let queried;
    let value: &Value = match opts.query {
        None => result,
        Some(expr) => match formatter::apply_query(result, expr) {
            Ok(v) => {
                queried = v;
                &queried
            }
            Err(e) => {
                eprintln!("Error: {}", e);
                return;
            }
        },
    };

    match opts.format {
        "json" => println!("{}", formatter::format_json(value)),
        "text" => println!("{}", formatter::format_text(value)),
        "yaml" => println!("{}", formatter::format_yaml(value)),
        "csv" => println!("{}", formatter::format_csv(value, opts.no_header)),
        _ => {
            let data_field = op.and_then(|o| o.output.data_field.as_deref());
            let rows = extract_items(value, data_field);
            if !rows.is_empty() {
                let columns = formatter::auto_columns(&rows);
                let headers: Vec<&str> = columns.iter().map(String::as_str).collect();
                println!(
                    "{}",
                    formatter::format_table_with_opts(&rows, &headers, opts.no_header)
                );
            } else {
                println!("{}", formatter::format_json(value));
            }
        }
    }
}
/// Picks the list of row values to render as a table from `result`.
///
/// Candidates are tried in this order, and the first match wins:
/// 1. the array stored under `data_field`, when one is given;
/// 2. `result` itself, when it is an array;
/// 3. the array stored under the first of `data`, `results`, `items`,
///    `clusters`, `collections` that holds one;
/// 4. `result` as a single row, when it is an object.
///
/// Returns an empty vector when none of these apply.
fn extract_items<'a>(result: &'a Value, data_field: Option<&str>) -> Vec<&'a Value> {
    /// Returns the array stored under `key`, if `value` has one.
    fn array_field<'v>(value: &'v Value, key: &str) -> Option<&'v Vec<Value>> {
        value.get(key)?.as_array()
    }

    const FALLBACK_FIELDS: [&str; 5] = ["data", "results", "items", "clusters", "collections"];

    if let Some(rows) = data_field.and_then(|key| array_field(result, key)) {
        return rows.iter().collect();
    }
    if let Some(rows) = result.as_array() {
        return rows.iter().collect();
    }
    if let Some(rows) = FALLBACK_FIELDS
        .iter()
        .find_map(|key| array_field(result, *key))
    {
        return rows.iter().collect();
    }

    if result.is_object() {
        vec![result]
    } else {
        Vec::new()
    }
}
/// Interactively asks for a value for each parameter in `params`.
///
/// Prompts are written to stderr and answers are read from stdin. The prompt
/// style depends on the parameter: `"boolean"` and `"integer"` types get a
/// typed prompt, parameters with a non-empty `choices` list get a numbered
/// menu, and everything else gets a free-form string prompt.
///
/// Returns the collected values keyed by parameter name.
///
/// # Errors
/// Propagates any error from the individual prompt helpers.
fn prompt_missing_params(params: &[&Param]) -> Result<HashMap<String, Value>> {
    let stdin = io::stdin();
    let stderr = io::stderr();
    let mut values = HashMap::with_capacity(params.len());

    eprintln!("Please provide the required parameters:");
    for param in params {
        let label = match param.description.as_deref() {
            Some(description) if !description.is_empty() => {
                format!("{} ({})", param.name, description)
            }
            _ => param.name.clone(),
        };

        let value = match param.param_type.as_str() {
            "boolean" => prompt_boolean(&stdin, &stderr, &label)?,
            "integer" => prompt_integer(&stdin, &stderr, &label)?,
            _ => match &param.choices {
                // A menu with no entries can never be answered, so an empty
                // list is treated like a parameter without choices.
                Some(choices) if !choices.is_empty() => {
                    prompt_choices(&stdin, &stderr, &label, choices)?
                }
                _ => prompt_string(&stdin, &stderr, &label)?,
            },
        };
        values.insert(param.name.clone(), value);
    }

    Ok(values)
}
fn prompt_string(
stdin: &io::Stdin,
stderr: &io::Stderr,
label: &str,
) -> Result<Value> {
loop {
eprint!("{}: ", label);
stderr.lock().flush()?;
let mut input = String::new();
stdin.lock().read_line(&mut input)?;
let trimmed = input.trim();
if !trimmed.is_empty() {
return Ok(Value::String(trimmed.to_string()));
}
eprintln!("Value required, please try again.");
}
}
fn prompt_integer(
stdin: &io::Stdin,
stderr: &io::Stderr,
label: &str,
) -> Result<Value> {
loop {
eprint!("{}: ", label);
stderr.lock().flush()?;
let mut input = String::new();
stdin.lock().read_line(&mut input)?;
let trimmed = input.trim();
if trimmed.is_empty() {
eprintln!("Value required, please try again.");
continue;
}
match trimmed.parse::<i64>() {
Ok(n) => return Ok(Value::Number(n.into())),
Err(_) => eprintln!("Invalid integer, please try again."),
}
}
}
/// Prompts on stderr for a yes/no answer and returns it as a JSON boolean.
///
/// `y` / `yes` mean `true` and `n` / `no` mean `false` (case-insensitive).
/// A blank answer selects the default, `false`. Anything else repeats the
/// prompt.
///
/// # Errors
/// Returns an error if writing the prompt or reading stdin fails.
fn prompt_boolean(
    stdin: &io::Stdin,
    stderr: &io::Stderr,
    label: &str,
) -> Result<Value> {
    let mut line = String::new();
    loop {
        eprint!("{} [y/N]: ", label);
        stderr.lock().flush()?;

        line.clear();
        stdin.lock().read_line(&mut line)?;

        let answer = line.trim().to_ascii_lowercase();
        match answer.as_str() {
            // Blank input takes the default shown as the capital letter in `[y/N]`.
            "" | "n" | "no" => return Ok(Value::Bool(false)),
            "y" | "yes" => return Ok(Value::Bool(true)),
            _ => eprintln!("Please enter y or n."),
        }
    }
}
fn prompt_choices(
stdin: &io::Stdin,
stderr: &io::Stderr,
label: &str,
choices: &[String],
) -> Result<Value> {
eprintln!("{}:", label);
for (i, choice) in choices.iter().enumerate() {
eprintln!(" [{}] {}", i + 1, choice);
}
loop {
eprint!("Select [1-{}]: ", choices.len());
stderr.lock().flush()?;
let mut input = String::new();
stdin.lock().read_line(&mut input)?;
let trimmed = input.trim();
if let Ok(n) = trimmed.parse::<usize>() {
if n >= 1 && n <= choices.len() {
return Ok(Value::String(choices[n - 1].clone()));
}
}
eprintln!("Invalid selection, please try again.");
}
}
/// Looks up `operation` on `resource` in the control plane, then the data plane.
///
/// Returns a clone of the operation together with a flag that is `true` when
/// it was found in the data plane.
///
/// # Errors
/// * Known resource, unknown operation: the error lists the operations
///   available for that resource across both planes.
/// * Unknown resource: the error lists every resource across both planes.
///
/// In both cases a "Did you mean ...?" hint is included when a close match
/// exists.
fn find_operation(
    models: &Models,
    resource: &str,
    operation: &str,
) -> Result<(Operation, bool)> {
    // Control plane is listed first so that it wins when both planes define
    // the same resource/operation pair.
    let planes = [(&models.control_plane, false), (&models.data_plane, true)];

    for (model, is_data_plane) in planes {
        let found = model
            .resources
            .get(resource)
            .and_then(|res| res.operations.get(operation));
        if let Some(op) = found {
            return Ok((op.clone(), is_data_plane));
        }
    }

    // The resource may exist in both planes; gather its operations from every
    // plane so the error message does not hide valid choices.
    let mut resource_known = false;
    let mut ops: Vec<&str> = Vec::new();
    for (model, _) in planes {
        if let Some(res) = model.resources.get(resource) {
            resource_known = true;
            for name in res.operations.keys() {
                let name = name.as_str();
                if !ops.contains(&name) {
                    ops.push(name);
                }
            }
        }
    }
    if resource_known {
        let suggestion = suggest_closest(operation, &ops);
        bail!(
            "Unknown operation '{}' for resource '{}'. {}Available operations: {}",
            operation,
            resource,
            suggestion,
            ops.join(", ")
        );
    }

    let mut all_resources: Vec<&str> = Vec::new();
    for (model, _) in planes {
        for key in model.resources.keys() {
            let key = key.as_str();
            // A resource defined in both planes is listed once.
            if !all_resources.contains(&key) {
                all_resources.push(key);
            }
        }
    }
    let suggestion = suggest_closest(resource, &all_resources);
    bail!(
        "Unknown resource '{}'. {}Available resources: {}",
        resource,
        suggestion,
        all_resources.join(", ")
    );
}
/// Builds a "Did you mean '...'? " hint for the candidate closest to `input`.
///
/// Only candidates within an edit distance of 3 are considered. When several
/// are equally close, the one that appears first in `candidates` wins. Returns
/// an empty string when no candidate is close enough.
fn suggest_closest(input: &str, candidates: &[&str]) -> String {
    const MAX_DISTANCE: usize = 3;

    candidates
        .iter()
        .map(|&candidate| (candidate, edit_distance(input, candidate)))
        .filter(|&(_, distance)| distance <= MAX_DISTANCE)
        // `min_by_key` keeps the first of several equal minima.
        .min_by_key(|&(_, distance)| distance)
        .map(|(name, _)| format!("Did you mean '{}'? ", name))
        .unwrap_or_default()
}
/// Computes the Levenshtein distance between `a` and `b`.
///
/// The distance is the minimum number of single-character insertions,
/// deletions and substitutions needed to turn `a` into `b`. Comparison is per
/// `char`, not per byte.
fn edit_distance(a: &str, b: &str) -> usize {
    let source: Vec<char> = a.chars().collect();
    let target: Vec<char> = b.chars().collect();

    // Only the previous and current rows of the distance table are kept.
    // `previous[j]` is the distance between the source prefix processed so far
    // and the first `j` characters of `target`.
    let mut previous: Vec<usize> = (0..=target.len()).collect();
    let mut current = vec![0usize; target.len() + 1];

    for (row, &source_char) in source.iter().enumerate() {
        // Turning `row + 1` source characters into an empty target takes that
        // many deletions.
        current[0] = row + 1;
        for (col, &target_char) in target.iter().enumerate() {
            let substitution = previous[col] + usize::from(source_char != target_char);
            let deletion = previous[col + 1] + 1;
            let insertion = current[col] + 1;
            current[col + 1] = substitution.min(deletion).min(insertion);
        }
        std::mem::swap(&mut previous, &mut current);
    }

    previous[target.len()]
}
fn parse_args(raw_args: &[String], operation: &Operation) -> Result<HashMap<String, Value>> {
let mut values = HashMap::new();
let mut i = 0;
while i < raw_args.len() {
let arg = &raw_args[i];
if !arg.starts_with("--") {
bail!("Unexpected argument '{}'. Use --flag value format.", arg);
}
let flag = arg.as_str();
if flag == "--body" {
i += 1;
let body_str = raw_args
.get(i)
.context("--body requires a JSON value or file://path")?;
let body_json = parse_json_or_file(body_str)?;
if let Value::Object(map) = body_json {
for (k, v) in map {
values.insert(k, v);
}
} else {
bail!("--body must be a JSON object");
}
i += 1;
continue;
}
let param = operation.params.iter().find(|p| {
p.cli_flag() == flag
|| format!("--{}", p.name) == flag
|| format!("--{}", p.name.replace('_', "-")) == flag
});
match param {
Some(p) => {
if p.param_type == "boolean" {
let next = raw_args.get(i + 1);
match next.map(|s| s.as_str()) {
Some("true") | Some("false") => {
let b: bool = next.unwrap().parse().unwrap();
values.insert(p.name.clone(), Value::Bool(b));
i += 2;
}
_ => {
values.insert(p.name.clone(), Value::Bool(true));
i += 1;
}
}
} else {
i += 1;
let val_str = raw_args
.get(i)
.with_context(|| format!("{} requires a value", flag))?;
let typed_value = match p.param_type.as_str() {
"integer" => {
let n: i64 = val_str
.parse()
.with_context(|| format!("{} must be an integer", flag))?;
Value::Number(n.into())
}
"array" | "object" => parse_json_or_file(val_str)
.with_context(|| format!("{} must be valid JSON", flag))?,
_ => Value::String(val_str.to_string()),
};
values.insert(p.name.clone(), typed_value);
i += 1;
}
}
None => {
bail!(
"Unknown flag '{}'. Available flags: {}",
flag,
operation
.params
.iter()
.map(|p| p.cli_flag())
.collect::<Vec<_>>()
.join(", ")
);
}
}
}
Ok(values)
}
fn parse_json_or_file(input: &str) -> Result<Value> {
if let Some(path) = input.strip_prefix("file://") {
let content =
std::fs::read_to_string(path).with_context(|| format!("Cannot read file: {}", path))?;
serde_json::from_str(&content).with_context(|| format!("Invalid JSON in file: {}", path))
} else {
serde_json::from_str(input)
.with_context(|| format!("Invalid JSON for --body: {}", input))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a parameter with the given name, type and `required` flag; every
    /// other field is left unset.
    fn make_param(name: &str, param_type: &str, required: bool) -> Param {
        Param {
            name: name.to_owned(),
            param_type: param_type.to_owned(),
            cli_name: None,
            required,
            default: None,
            position: None,
            required_unless: None,
            required_when: None,
            description: None,
            choices: None,
            transform: None,
        }
    }

    /// Builds a minimal `GET /test` operation that takes `params`.
    fn make_operation(params: Vec<Param>) -> Operation {
        Operation {
            http: crate::model::types::HttpConfig {
                method: "GET".to_owned(),
                path: "/test".to_owned(),
            },
            params,
            body_param: None,
            output: Default::default(),
            pagination: None,
            description: None,
            examples: vec![],
            dedicated_only: false,
            body_transform: None,
            body_defaults: Default::default(),
        }
    }

    /// Converts string literals into the owned argument list `parse_args` expects.
    fn to_args(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// Parses `raw` for `op` and returns the names of the required parameters
    /// that were not supplied, in declaration order.
    fn missing_required(op: &Operation, raw: &[&str]) -> Vec<String> {
        let provided = parse_args(&to_args(raw), op).unwrap();
        op.params
            .iter()
            .filter(|p| p.required && !provided.contains_key(&p.name))
            .map(|p| p.name.clone())
            .collect()
    }

    #[test]
    fn test_missing_params_non_tty_produces_error() {
        let op = make_operation(vec![make_param("name", "string", true)]);

        assert_eq!(missing_required(&op, &[]), ["name"]);
    }

    #[test]
    fn test_complex_types_not_promptable() {
        let params = [
            make_param("name", "string", true),
            make_param("schema", "array", true),
            make_param("config", "object", true),
        ];

        let (promptable, complex): (Vec<&Param>, Vec<&Param>) = params
            .iter()
            .partition(|p| !matches!(p.param_type.as_str(), "array" | "object"));

        assert_eq!(promptable.len(), 1);
        assert_eq!(promptable[0].name, "name");
        assert_eq!(complex.len(), 2);
    }

    #[test]
    fn test_provided_params_not_prompted() {
        let op = make_operation(vec![
            make_param("name", "string", true),
            make_param("clusterId", "string", true),
        ]);

        assert_eq!(missing_required(&op, &["--name", "test"]), ["clusterId"]);
    }

    #[test]
    fn test_all_params_provided_no_missing() {
        let op = make_operation(vec![make_param("name", "string", true)]);

        assert!(missing_required(&op, &["--name", "test"]).is_empty());
    }

    #[test]
    fn test_boolean_param_default_true() {
        let op = make_operation(vec![make_param("autoIndex", "boolean", false)]);

        let values = parse_args(&to_args(&["--autoIndex"]), &op).unwrap();

        assert_eq!(values.get("autoIndex"), Some(&Value::Bool(true)));
    }
}