use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
use anyhow::{bail, Result};
use indicatif::{ProgressBar, ProgressStyle};
use serde_json::Value;
use crate::api::client::ApiClient;
/// Job `status` values that end polling: once a poll reports one of these,
/// `wait_for_job` stops and returns the job payload to its caller.
/// The comparison is an exact, case-sensitive string match.
const TERMINAL_STATES: &[&str] = &["SUCCESSFUL", "FAILED", "CANCELED"];
pub async fn wait_for_job(
client: &ApiClient,
job_id: &str,
timeout: u64,
interval: u64,
) -> Result<Value> {
let cancelled = Arc::new(AtomicBool::new(false));
let c = cancelled.clone();
let _ = ctrlc::set_handler(move || {
c.store(true, Ordering::SeqCst);
});
let spinner = ProgressBar::new_spinner();
spinner.set_style(
ProgressStyle::default_spinner()
.template("{spinner:.cyan} {msg}")
.unwrap(),
);
spinner.set_message(format!("Job {}: polling...", job_id));
let start = Instant::now();
let mut consecutive_errors = 0u32;
let mut last_result: Option<Value> = None;
loop {
if cancelled.load(Ordering::SeqCst) {
spinner.finish_and_clear();
if let Some(result) = &last_result {
eprintln!("Interrupted. Last known status:");
return Ok(result.clone());
}
std::process::exit(130);
}
if start.elapsed().as_secs() > timeout {
spinner.finish_and_clear();
let status = last_result
.as_ref()
.and_then(|r| r.get("status").and_then(|s| s.as_str()))
.unwrap_or("unknown");
bail!(
"Job {} timed out after {}s. Last status: {}",
job_id,
timeout,
status
);
}
let path = format!("/v2/jobs/{}", urlencoding::encode(job_id));
match client.call("GET", &path, None, None).await {
Ok(data) => {
consecutive_errors = 0;
let status = data
.get("status")
.and_then(|s| s.as_str())
.unwrap_or("UNKNOWN");
let progress = data
.get("progress")
.and_then(|p| p.as_f64())
.map(|p| format!("{:.0}%", p))
.unwrap_or_default();
spinner.set_message(format!("Job {}: {} {}", job_id, status, progress));
if TERMINAL_STATES.contains(&status) {
spinner.finish_and_clear();
return Ok(data);
}
last_result = Some(data);
}
Err(e) => {
consecutive_errors += 1;
if consecutive_errors >= 3 {
spinner.finish_and_clear();
bail!(
"Job {} polling failed after 3 consecutive errors: {}",
job_id,
e
);
}
spinner.set_message(format!(
"Job {}: poll error (attempt {}/3), retrying...",
job_id, consecutive_errors
));
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
}
}