zilliz 1.4.2

TUI and CLI tool for managing Zilliz Cloud clusters and Milvus operations
Documentation
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;

use anyhow::{bail, Result};
use indicatif::{ProgressBar, ProgressStyle};
use serde_json::Value;

use crate::api::client::ApiClient;

/// Job `status` values that end polling: once the API reports one of these,
/// the job is finished and its payload is handed back to the caller.
const TERMINAL_STATES: &[&str] = &[
    "SUCCESSFUL",
    "FAILED",
    "CANCELED",
];

/// Poll `GET /v2/jobs/{jobId}` until a terminal state is reached.
///
/// - `timeout`: max seconds to wait (default 1800 = 30 min)
/// - `interval`: seconds between polls (default 5)
///
/// The job is always polled at least once, even when `timeout` is 0. The
/// pause between polls is shortened so that it never runs past the deadline.
///
/// Returns the job payload as soon as its `status` is one of the terminal
/// states. If Ctrl-C is pressed while waiting, the most recent payload is
/// returned instead (after a notice on stderr); if no payload has been
/// received yet, the process exits with status 130.
///
/// # Errors
///
/// Fails when three polls in a row return an error, or when `timeout`
/// seconds have elapsed without the job reaching a terminal state.
pub async fn wait_for_job(
    client: &ApiClient,
    job_id: &str,
    timeout: u64,
    interval: u64,
) -> Result<Value> {
    /// Number of back-to-back poll failures tolerated before giving up.
    const MAX_CONSECUTIVE_ERRORS: u32 = 3;

    let cancelled = interrupt_flag();
    // Discard an interrupt that was latched by an earlier wait in this process.
    cancelled.store(false, Ordering::SeqCst);

    let spinner = ProgressBar::new_spinner();
    spinner.set_style(
        ProgressStyle::default_spinner()
            .template("{spinner:.cyan} {msg}")
            .expect("spinner template is a valid static string"),
    );
    spinner.set_message(format!("Job {}: polling...", job_id));

    let deadline = std::time::Duration::from_secs(timeout);
    let poll_interval = std::time::Duration::from_secs(interval);
    // The request path does not change between polls, so build it once.
    let path = format!("/v2/jobs/{}", urlencoding::encode(job_id));

    let start = Instant::now();
    let mut consecutive_errors = 0u32;
    let mut last_result: Option<Value> = None;

    loop {
        if cancelled.load(Ordering::SeqCst) {
            spinner.finish_and_clear();
            if let Some(result) = last_result.take() {
                eprintln!("Interrupted. Last known status:");
                return Ok(result);
            }
            // Nothing to report yet: exit with 128 + SIGINT.
            std::process::exit(130);
        }

        match client.call("GET", &path, None, None).await {
            Ok(data) => {
                consecutive_errors = 0;
                let status = data
                    .get("status")
                    .and_then(|s| s.as_str())
                    .unwrap_or("UNKNOWN");
                let progress = data
                    .get("progress")
                    .and_then(|p| p.as_f64())
                    .map(|p| format!("{:.0}%", p))
                    .unwrap_or_default();

                spinner.set_message(format!("Job {}: {} {}", job_id, status, progress));

                if TERMINAL_STATES.contains(&status) {
                    spinner.finish_and_clear();
                    return Ok(data);
                }

                last_result = Some(data);
            }
            Err(e) => {
                consecutive_errors += 1;
                if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
                    spinner.finish_and_clear();
                    bail!(
                        "Job {} polling failed after {} consecutive errors: {}",
                        job_id,
                        MAX_CONSECUTIVE_ERRORS,
                        e
                    );
                }
                spinner.set_message(format!(
                    "Job {}: poll error (attempt {}/{}), retrying...",
                    job_id, consecutive_errors, MAX_CONSECUTIVE_ERRORS
                ));
            }
        }

        // Compare full durations: `as_secs()` truncates and would let the
        // wait run up to a second past the requested timeout.
        let elapsed = start.elapsed();
        if elapsed >= deadline {
            spinner.finish_and_clear();
            let status = last_result
                .as_ref()
                .and_then(|r| r.get("status").and_then(|s| s.as_str()))
                .unwrap_or("unknown");
            bail!(
                "Job {} timed out after {}s. Last status: {}",
                job_id,
                timeout,
                status
            );
        }

        // Never sleep past the deadline; the last poll then lands right on it.
        let pause = poll_interval.min(deadline - elapsed);
        sleep_unless_cancelled(cancelled, pause).await;
    }
}

/// Returns the process-wide Ctrl-C flag, installing the handler on first use.
///
/// `ctrlc::set_handler` can only succeed once per process, so a flag created
/// per call would stop being wired to the handler after the first call.
fn interrupt_flag() -> &'static AtomicBool {
    static FLAG: std::sync::OnceLock<Arc<AtomicBool>> = std::sync::OnceLock::new();
    let flag: &'static Arc<AtomicBool> = FLAG.get_or_init(|| {
        let flag = Arc::new(AtomicBool::new(false));
        let handler_flag = Arc::clone(&flag);
        // Best effort: registration fails if a handler was already installed
        // elsewhere in the process, in which case the flag is never set.
        let _ = ctrlc::set_handler(move || {
            handler_flag.store(true, Ordering::SeqCst);
        });
        flag
    });
    flag
}

/// Sleeps for `total`, waking early once `cancelled` becomes true.
async fn sleep_unless_cancelled(cancelled: &AtomicBool, total: std::time::Duration) {
    /// Upper bound on how long an interrupt can go unnoticed.
    const SLICE: std::time::Duration = std::time::Duration::from_millis(200);

    let mut remaining = total;
    loop {
        let step = remaining.min(SLICE);
        // Awaited even for a zero-length step so the task still yields.
        tokio::time::sleep(step).await;
        remaining -= step;
        if remaining.is_zero() || cancelled.load(Ordering::SeqCst) {
            return;
        }
    }
}