// zilliz 1.2.0
//
// TUI and CLI tool for managing Zilliz Cloud clusters and Milvus operations.
// Documentation
use std::io::{self, BufRead, Write};
use std::time::{Duration, Instant};

use anyhow::{bail, Context, Result};
use serde::Deserialize;

use crate::config::manager::ConfigManager;
use crate::model::types::AuthConfig;

// ---------------------------------------------------------------------------
// OAuth Device Code Flow types
// ---------------------------------------------------------------------------

/// Body of the `/oauth/device/code` response that starts the device login flow.
#[derive(Debug, Deserialize)]
struct DeviceCodeResponse {
    /// Opaque code sent back to the token endpoint on every poll.
    device_code: String,
    /// Short code printed for the user as their verification code.
    user_code: String,
    /// URL printed for the user and, unless disabled, opened in the browser.
    verification_uri_complete: String,
    /// Passed to the poller as its overall timeout, in seconds.
    expires_in: u64,
    /// Seconds to wait between polls; falls back to `default_interval()` when
    /// the field is absent from the response.
    #[serde(default = "default_interval")]
    interval: u64,
}

/// Serde fallback for `DeviceCodeResponse::interval`: the number of seconds
/// to wait between token polls when the server does not specify one.
fn default_interval() -> u64 {
    const FALLBACK_POLL_SECS: u64 = 5;
    FALLBACK_POLL_SECS
}

/// Successful response from the `/oauth/token` endpoint.
#[derive(Debug, Deserialize)]
struct TokenResponse {
    /// Token sent as a `Bearer` credential to the CLI login endpoint.
    access_token: String,
}

/// Error body parsed from a non-success `/oauth/token` response while polling.
#[derive(Debug, Deserialize)]
struct TokenErrorResponse {
    /// Machine-readable error code; the poller matches on values such as
    /// `authorization_pending` and `slow_down`.
    error: String,
    /// Human-readable detail shown to the user; empty when the field is absent.
    #[serde(default)]
    error_description: String,
}

// ---------------------------------------------------------------------------
// Login
// ---------------------------------------------------------------------------

/// Entry point for the login command.
///
/// If a user session or an API key is already stored, the user is asked
/// whether to re-authenticate; declining returns `Ok(())` without changes.
/// Otherwise the current context is cleared and one of three paths is taken
/// depending on `api_key_value`:
///
/// * `None` — browser-based OAuth login,
/// * `Some("")` — interactive API key prompt,
/// * `Some(key)` — the given key is stored directly.
///
/// # Errors
///
/// Propagates failures from reading the confirmation answer, from the
/// configuration manager, and from the selected login path.
pub async fn login(
    config_mgr: &ConfigManager,
    auth_config: &AuthConfig,
    no_browser: bool,
    api_key_value: Option<&str>,
) -> Result<()> {
    // Describe any existing credentials; a user session takes precedence
    // over a stored API key.
    let existing_session = if let Some(user) = config_mgr.get_user_info() {
        Some(format!("Already logged in as {}", user.email))
    } else if config_mgr.get_credential("api_key").is_some() {
        Some("Already logged in with API key".to_string())
    } else {
        None
    };

    if let Some(notice) = existing_session {
        println!("{}", notice);
        let proceed = confirm("Do you want to re-authenticate?")?;
        if !proceed {
            return Ok(());
        }
    }

    config_mgr.clear_context()?;

    match api_key_value {
        // No --api-key: browser OAuth
        None => login_with_browser(config_mgr, auth_config, no_browser).await,
        // --api-key with no value: interactive prompt
        Some(key) if key.is_empty() => login_with_api_key(config_mgr).await,
        // --api-key <value>: store directly
        Some(key) => {
            config_mgr.save_api_key_only(key)?;
            println!("Successfully configured API key!");
            println!("  API Key: {}", mask_api_key(key));
            Ok(())
        }
    }
}

/// Prompts for an API key on stdin and stores it via the configuration manager.
///
/// The entered line is trimmed before use; a masked form of the key is
/// echoed back on success.
///
/// # Errors
///
/// Fails when stdout/stdin cannot be used, when the trimmed input is empty,
/// or when saving the key fails.
async fn login_with_api_key(config_mgr: &ConfigManager) -> Result<()> {
    let intro = [
        "\nAPI Key Authentication",
        "You can find your API key in the Zilliz Cloud console under API Keys.",
        "",
    ];
    for line in intro {
        println!("{}", line);
    }

    // The prompt has no trailing newline, so flush to make it visible.
    print!("Please paste your API key: ");
    io::stdout().flush()?;

    let mut raw_input = String::new();
    io::stdin().lock().read_line(&mut raw_input)?;

    let api_key = raw_input.trim();
    if api_key.is_empty() {
        bail!("API key cannot be empty");
    }

    config_mgr.save_api_key_only(api_key)?;

    println!("\nSuccessfully configured API key!");
    println!("  API Key: {}", mask_api_key(api_key));
    println!("\nNote: You're using API key authentication. Some features may be limited.");

    Ok(())
}

/// Performs the browser-based OAuth device-code login and stores the result.
///
/// The flow is:
/// 1. request a device code from `{auth0_domain}/oauth/device/code`,
/// 2. show the verification code/URL and, unless `no_browser` is set, try to
///    open the URL in a browser,
/// 3. poll the token endpoint until the user finishes (or it fails/expires),
/// 4. exchange the access token at `{login_api}/account/v1/cli/login` and
///    save the returned user and organization data.
///
/// # Errors
///
/// Fails when any HTTP request fails, when the device-code request is
/// rejected with a non-success status, when a response cannot be parsed,
/// when polling fails or times out, when the login endpoint reports an
/// error, or when the login data cannot be saved.
async fn login_with_browser(
    config_mgr: &ConfigManager,
    auth_config: &AuthConfig,
    no_browser: bool,
) -> Result<()> {
    println!("Starting browser-based login...");

    // Step 1: Request device code
    let client = reqwest::Client::builder()
        .user_agent(format!("zilliz-cli/{}", env!("CARGO_PKG_VERSION")))
        .build()
        .context("Failed to build HTTP client")?;
    let device_resp: DeviceCodeResponse = client
        .post(format!("{}/oauth/device/code", auth_config.auth0_domain))
        .form(&[
            ("client_id", auth_config.client_id.as_str()),
            ("scope", "openid email profile"),
        ])
        .send()
        .await
        .context("Failed to request device code")?
        // Surface HTTP-level rejections as such instead of as a confusing
        // "missing field" JSON decoding error.
        .error_for_status()
        .context("Device code request was rejected")?
        .json()
        .await
        .context("Invalid device code response")?;

    println!("\nYour verification code: {}", device_resp.user_code);
    println!("Please visit: {}", device_resp.verification_uri_complete);

    // Step 2: Open browser
    if !no_browser {
        println!("\nOpening browser to complete login...");
        // Best effort: the URL was already printed, so a failure is not fatal.
        if open_browser(&device_resp.verification_uri_complete).is_err() {
            println!("Could not open browser. Please visit the URL above manually.");
        }
    } else {
        println!("\nPlease visit the URL above to complete login.");
    }

    println!("\nWaiting for authentication...");

    // Step 3: Poll for token
    let token = poll_for_token(
        &client,
        auth_config,
        &device_resp.device_code,
        device_resp.interval,
        device_resp.expires_in,
    )
    .await?;

    println!("\nExchanging token for credentials...");

    // Step 4: Exchange token for CLI credentials
    let cli_login_url = format!("{}/account/v1/cli/login", auth_config.login_api);

    let resp = client
        .post(&cli_login_url)
        .header("Authorization", format!("Bearer {}", token.access_token))
        .send()
        .await
        .context("Failed to exchange token")?;

    let status = resp.status();
    let body_text = resp.text().await.context("Failed to read login response")?;
    let body: serde_json::Value = serde_json::from_str(&body_text).with_context(|| {
        format!(
            "Invalid login response (HTTP {}): {}",
            status.as_u16(),
            // Keep the error short, but never cut through a multi-byte
            // character (a raw byte slice would panic there).
            truncate_at_char_boundary(&body_text, 200)
        )
    })?;

    // The envelope keys are looked up in both lower- and upper-case spellings.
    let code = body
        .get("code")
        .or_else(|| body.get("Code"))
        .and_then(|v| v.as_i64())
        .unwrap_or(-1);

    if !status.is_success() || (code != 0 && code != 200) {
        let msg = body
            .get("msg")
            .or_else(|| body.get("Message"))
            .and_then(|v| v.as_str())
            .unwrap_or("Login failed");
        bail!("Login failed ({}): {}", status.as_u16(), msg);
    }

    let result = body
        .get("data")
        .or_else(|| body.get("Data"))
        .cloned()
        .unwrap_or_default();

    let user = result.get("user").cloned().unwrap_or_default();
    let orgs = result
        .get("orgs")
        .and_then(|v| v.as_array())
        .cloned()
        .unwrap_or_default();

    let user_id = json_str_field(&user, "userId");
    let user_email = json_str_field(&user, "email");
    let user_name = json_str_field(&user, "name");

    // Save login data
    config_mgr.save_login_data(user_id, user_email, user_name, &orgs)?;

    println!("\nSuccessfully logged in!");
    println!("  User: {} ({})", user_name, user_email);

    if let Some(first_org) = orgs.first() {
        println!(
            "  Organization: {} ({})",
            json_str_field(first_org, "name"),
            json_str_field(first_org, "orgId")
        );

        if orgs.len() > 1 {
            println!(
                "\n{} organizations available. Use 'zilliz auth switch' to change.",
                orgs.len()
            );
        }
    }

    Ok(())
}

/// Returns the string stored under `key` in `value`, or `""` when the key is
/// missing or does not hold a string.
fn json_str_field<'a>(value: &'a serde_json::Value, key: &str) -> &'a str {
    value.get(key).and_then(|v| v.as_str()).unwrap_or("")
}

/// Returns the longest prefix of `text` that is at most `max_bytes` bytes long
/// and ends on a UTF-8 character boundary.
fn truncate_at_char_boundary(text: &str, max_bytes: usize) -> &str {
    if text.len() <= max_bytes {
        return text;
    }
    let mut end = max_bytes;
    // Index 0 is always a boundary, so this loop terminates.
    while !text.is_char_boundary(end) {
        end -= 1;
    }
    &text[..end]
}

/// Polls `{auth0_domain}/oauth/token` until the device login completes.
///
/// * `device_code` — the code obtained from the device-code request.
/// * `interval` — seconds to wait before each poll; raised when the server
///   answers `slow_down`.
/// * `timeout_secs` — overall time budget, checked before each wait.
///
/// Returns the parsed token response on the first successful reply.
///
/// # Errors
///
/// Fails when the time budget is exhausted, when a poll request cannot be
/// sent, when a successful reply cannot be parsed, or when the server reports
/// `expired_token`, `access_denied`, or any other unrecognized error.
async fn poll_for_token(
    client: &reqwest::Client,
    auth_config: &AuthConfig,
    device_code: &str,
    mut interval: u64,
    timeout_secs: u64,
) -> Result<TokenResponse> {
    let url = format!("{}/oauth/token", auth_config.auth0_domain);
    let start = Instant::now();
    let timeout = Duration::from_secs(timeout_secs);

    // A zero interval would turn this loop into a busy poll against the
    // token endpoint, so always wait at least one second.
    interval = interval.max(1);

    loop {
        if start.elapsed() > timeout {
            bail!("Authentication timed out. Please try again.");
        }

        tokio::time::sleep(Duration::from_secs(interval)).await;

        let resp = client
            .post(&url)
            .form(&[
                ("grant_type", "urn:ietf:params:oauth:grant-type:device_code"),
                ("device_code", device_code),
                ("client_id", auth_config.client_id.as_str()),
            ])
            .send()
            .await
            .context("Token poll request failed")?;

        if resp.status().is_success() {
            return resp
                .json::<TokenResponse>()
                .await
                .context("Invalid token response");
        }

        // Non-JSON error bodies are mapped to a generic "unknown" error.
        let error_resp = resp
            .json::<TokenErrorResponse>()
            .await
            .unwrap_or_else(|_| TokenErrorResponse {
                error: "unknown".to_string(),
                error_description: "Unknown error".to_string(),
            });

        match error_resp.error.as_str() {
            "authorization_pending" => continue,
            "slow_down" => {
                // Back off by 5s up to a 60s cap, but never shrink an
                // interval that is already larger than the cap.
                interval = interval.max(interval.saturating_add(5).min(60));
                continue;
            }
            "expired_token" => bail!("Authentication timed out. Please try again."),
            "access_denied" => bail!("Authentication was denied."),
            other => {
                // The description is optional; fall back to the error code so
                // the message is never empty.
                let detail = if error_resp.error_description.is_empty() {
                    other
                } else {
                    error_resp.error_description.as_str()
                };
                bail!("Authentication failed: {}", detail)
            }
        }
    }
}

// ---------------------------------------------------------------------------
// Logout
// ---------------------------------------------------------------------------

/// Removes stored credentials and the current context, if any exist.
///
/// A stored user session takes precedence over a stored API key when
/// choosing the confirmation message. When neither is present, nothing is
/// cleared and "Not logged in." is printed.
///
/// # Errors
///
/// Propagates failures from clearing the login data or the context.
pub fn logout(config_mgr: &ConfigManager) -> Result<()> {
    // Decide on the confirmation message first; it is only printed once both
    // clear operations have succeeded.
    let farewell = match config_mgr.get_user_info() {
        Some(user) => format!("Logged out from {}", user.email),
        None if config_mgr.get_credential("api_key").is_some() => {
            "Cleared API key credentials.".to_string()
        }
        None => {
            println!("Not logged in.");
            return Ok(());
        }
    };

    config_mgr.clear_login_data()?;
    config_mgr.clear_context()?;
    println!("{}", farewell);

    Ok(())
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/// Produces a display-safe form of an API key.
///
/// Keys of at least 16 characters keep their first and last four characters
/// around a `****` filler; anything shorter is replaced entirely by `****`.
/// Lengths are counted in characters, not bytes.
fn mask_api_key(key: &str) -> String {
    let total_chars = key.chars().count();
    if total_chars < 16 {
        return "****".to_string();
    }

    let head: String = key.chars().take(4).collect();
    let tail: String = key.chars().skip(total_chars - 4).collect();
    format!("{}****{}", head, tail)
}

/// Asks a yes/no question on the terminal and reads one line from stdin.
///
/// Returns `true` only when the trimmed answer is `y` or `Y`; any other
/// input, including an empty line, counts as "no".
///
/// # Errors
///
/// Fails when stdout cannot be flushed or stdin cannot be read.
fn confirm(prompt: &str) -> Result<bool> {
    print!("{} [y/N] ", prompt);
    // No newline was printed, so flush to make the prompt visible.
    io::stdout().flush()?;

    let mut answer = String::new();
    let stdin = io::stdin();
    stdin.lock().read_line(&mut answer)?;

    let accepted = matches!(answer.trim(), "y" | "Y");
    Ok(accepted)
}

fn open_browser(url: &str) -> Result<()> {
    #[cfg(target_os = "macos")]
    {
        std::process::Command::new("open").arg(url).spawn()?;
    }
    #[cfg(target_os = "linux")]
    {
        std::process::Command::new("xdg-open").arg(url).spawn()?;
    }
    #[cfg(target_os = "windows")]
    {
        std::process::Command::new("cmd")
            .args(["/c", "start", url])
            .spawn()?;
    }
    Ok(())
}