use std::time::{Duration, Instant};
use clap::{Args, Subcommand};
use serde::Deserialize;
use serde_json::{Value, json};
use tokio::time::sleep;
use uuid::Uuid;
use crate::util::{
api_request, client, dry_run_enabled, emit_dry_run_request, exit_error, print_json_stderr,
print_json_stdout, read_json_from_file,
};
const CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_DEFAULT: u64 = 90_000;
const CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_MIN: u64 = 1_000;
const CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_MAX: u64 = 300_000;
const CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_DEFAULT: u64 = 1_000;
const CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_MIN: u64 = 100;
const CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_MAX: u64 = 5_000;
#[derive(Subcommand)]
pub enum AnalysisCommands {
Run {
#[arg(long, conflicts_with_all = ["objective", "objective_text", "horizon_days", "focus"])]
request_file: Option<String>,
#[arg(long, short = 'o', conflicts_with = "objective_text")]
objective: Option<String>,
#[arg(value_name = "OBJECTIVE", conflicts_with = "objective")]
objective_text: Option<String>,
#[arg(long = "horizon-days", alias = "days")]
horizon_days: Option<i32>,
#[arg(long)]
focus: Vec<String>,
#[arg(long)]
wait_timeout_ms: Option<u64>,
#[arg(long)]
overall_timeout_ms: Option<u64>,
#[arg(long)]
poll_interval_ms: Option<u64>,
},
Create {
#[arg(long)]
request_file: String,
},
Status {
#[arg(long)]
job_id: Uuid,
},
ValidateAnswer(ValidateAnswerArgs),
}
#[derive(Args)]
pub struct ValidateAnswerArgs {
#[arg(long)]
task_intent: String,
#[arg(long)]
draft_answer: String,
}
pub async fn run(api_url: &str, token: Option<&str>, command: AnalysisCommands) -> i32 {
match command {
AnalysisCommands::Run {
request_file,
objective,
objective_text,
horizon_days,
focus,
wait_timeout_ms,
overall_timeout_ms,
poll_interval_ms,
} => {
run_blocking(
api_url,
token,
request_file.as_deref(),
objective,
objective_text,
horizon_days,
focus,
wait_timeout_ms,
overall_timeout_ms,
poll_interval_ms,
)
.await
}
AnalysisCommands::Create { request_file } => create(api_url, token, &request_file).await,
AnalysisCommands::Status { job_id } => status(api_url, token, job_id).await,
AnalysisCommands::ValidateAnswer(args) => {
validate_answer(api_url, token, args.task_intent, args.draft_answer).await
}
}
}
async fn run_blocking(
api_url: &str,
token: Option<&str>,
request_file: Option<&str>,
objective_flag: Option<String>,
objective_positional: Option<String>,
horizon_days: Option<i32>,
focus: Vec<String>,
wait_timeout_ms: Option<u64>,
overall_timeout_ms: Option<u64>,
poll_interval_ms: Option<u64>,
) -> i32 {
let base_body = build_run_request_body(
request_file,
objective_flag,
objective_positional,
horizon_days,
focus,
)
.unwrap_or_else(|e| {
exit_error(
&e,
Some(
"Use `kura analysis run --objective \"...\"` (or positional objective) for user-facing analyses, or `--request-file payload.json` for full JSON requests.",
),
)
});
let body = apply_wait_timeout_override(base_body, wait_timeout_ms).unwrap_or_else(|e| {
exit_error(
&e,
Some("Analysis run request must be a JSON object with objective/horizon/focus fields."),
)
});
let overall_timeout_ms = clamp_cli_overall_timeout_ms(overall_timeout_ms);
let poll_interval_ms = clamp_cli_poll_interval_ms(poll_interval_ms);
if dry_run_enabled() {
return emit_dry_run_request(
&reqwest::Method::POST,
api_url,
"/v1/analysis/jobs/run",
token.is_some(),
Some(&body),
&[],
&[],
false,
Some(
"Dry-run skips server execution and fallback polling. Use `kura analysis status --job-id <id>` on a real run.",
),
);
}
let started = Instant::now();
let legacy_create_body = strip_run_only_fields(body.clone());
let (status, run_body) = request_json(
api_url,
reqwest::Method::POST,
"/v1/analysis/jobs/run",
token,
Some(body),
)
.await
.unwrap_or_else(|e| exit_error(&e, Some("Check API availability/auth and retry.")));
if !is_success_status(status) {
if is_run_endpoint_unsupported_status(status) {
return run_blocking_via_legacy_async_fallback(
api_url,
token,
legacy_create_body,
overall_timeout_ms,
poll_interval_ms,
started,
)
.await;
}
return print_json_response(status, &run_body);
}
let Some(run_response) = parse_cli_run_response(&run_body) else {
return print_json_response(status, &run_body);
};
if !run_response_needs_cli_poll_fallback(&run_response) {
return print_json_response(status, &run_body);
}
let mut latest_job = run_body.get("job").cloned().unwrap_or(Value::Null);
let mut polls = 0u32;
let mut last_retry_after_ms =
clamp_cli_poll_interval_ms(run_response.retry_after_ms.or(Some(poll_interval_ms)));
let job_id = run_response.job.job_id;
let path = format!("/v1/analysis/jobs/{job_id}");
loop {
let elapsed_total_ms = elapsed_ms(started);
if elapsed_total_ms >= overall_timeout_ms {
let timeout_output = build_cli_poll_fallback_output(
latest_job,
elapsed_total_ms,
false,
true,
Some(last_retry_after_ms),
polls,
overall_timeout_ms,
run_response.mode.as_deref(),
"server_run_timeout_poll",
);
return print_json_response(200, &timeout_output);
}
let remaining_ms = overall_timeout_ms.saturating_sub(elapsed_total_ms);
let sleep_ms = min_u64(last_retry_after_ms, remaining_ms);
sleep(Duration::from_millis(sleep_ms)).await;
let (poll_status, poll_body) =
request_json(api_url, reqwest::Method::GET, &path, token, None)
.await
.unwrap_or_else(|e| {
exit_error(
&e,
Some(
"Blocking analysis fallback polling failed. Retry `kura analysis status --job-id <id>` in the same session if needed.",
),
)
});
if !is_success_status(poll_status) {
return print_json_response(poll_status, &poll_body);
}
polls = polls.saturating_add(1);
latest_job = poll_body.clone();
if let Some(job_status) = parse_cli_job_status(&poll_body) {
if analysis_job_status_is_terminal(&job_status.status) {
let final_output = build_cli_poll_fallback_output(
poll_body,
elapsed_ms(started),
true,
false,
None,
polls,
overall_timeout_ms,
run_response.mode.as_deref(),
"server_run_timeout_poll",
);
return print_json_response(200, &final_output);
}
} else {
return print_json_response(200, &poll_body);
}
last_retry_after_ms = poll_interval_ms;
}
}
async fn run_blocking_via_legacy_async_fallback(
api_url: &str,
token: Option<&str>,
create_body: Value,
overall_timeout_ms: u64,
poll_interval_ms: u64,
started: Instant,
) -> i32 {
let (create_status, create_resp_body) = request_json(
api_url,
reqwest::Method::POST,
"/v1/analysis/jobs",
token,
Some(create_body),
)
.await
.unwrap_or_else(|e| {
exit_error(
&e,
Some("Legacy analysis fallback failed to create a job. Check API availability/auth."),
)
});
if !is_success_status(create_status) {
return print_json_response(create_status, &create_resp_body);
}
let Some(create_resp) = parse_cli_create_job_response(&create_resp_body) else {
return print_json_response(create_status, &create_resp_body);
};
let mut latest_job = Value::Null;
let mut polls = 0u32;
let path = format!("/v1/analysis/jobs/{}", create_resp.job_id);
loop {
let elapsed_total_ms = elapsed_ms(started);
if elapsed_total_ms >= overall_timeout_ms {
let timeout_output = build_cli_poll_fallback_output(
if latest_job.is_null() {
create_resp_body.clone()
} else {
latest_job
},
elapsed_total_ms,
false,
true,
Some(poll_interval_ms),
polls,
overall_timeout_ms,
Some("legacy_async_create"),
"legacy_server_async_create",
);
return print_json_response(200, &timeout_output);
}
let remaining_ms = overall_timeout_ms.saturating_sub(elapsed_total_ms);
sleep(Duration::from_millis(min_u64(
poll_interval_ms,
remaining_ms,
)))
.await;
let (poll_status, poll_body) =
request_json(api_url, reqwest::Method::GET, &path, token, None)
.await
.unwrap_or_else(|e| {
exit_error(
&e,
Some(
"Legacy analysis fallback polling failed. Retry `kura analysis status --job-id <id>` if needed.",
),
)
});
if !is_success_status(poll_status) {
return print_json_response(poll_status, &poll_body);
}
polls = polls.saturating_add(1);
latest_job = poll_body.clone();
if let Some(job_status) = parse_cli_job_status(&poll_body) {
if analysis_job_status_is_terminal(&job_status.status) {
let final_output = build_cli_poll_fallback_output(
poll_body,
elapsed_ms(started),
true,
false,
None,
polls,
overall_timeout_ms,
Some("legacy_async_create"),
"legacy_server_async_create",
);
return print_json_response(200, &final_output);
}
} else {
return print_json_response(200, &poll_body);
}
}
}
async fn create(api_url: &str, token: Option<&str>, request_file: &str) -> i32 {
let body = match read_json_from_file(request_file) {
Ok(v) => v,
Err(e) => {
crate::util::exit_error(&e, Some("Provide a valid JSON analysis request payload."))
}
};
if dry_run_enabled() {
return emit_dry_run_request(
&reqwest::Method::POST,
api_url,
"/v1/analysis/jobs",
token.is_some(),
Some(&body),
&[],
&[],
false,
None,
);
}
api_request(
api_url,
reqwest::Method::POST,
"/v1/analysis/jobs",
token,
Some(body),
&[],
&[],
false,
false,
)
.await
}
async fn status(api_url: &str, token: Option<&str>, job_id: Uuid) -> i32 {
let path = format!("/v1/analysis/jobs/{job_id}");
api_request(
api_url,
reqwest::Method::GET,
&path,
token,
None,
&[],
&[],
false,
false,
)
.await
}
async fn validate_answer(
api_url: &str,
token: Option<&str>,
task_intent: String,
draft_answer: String,
) -> i32 {
let body = build_validate_answer_body(task_intent, draft_answer);
api_request(
api_url,
reqwest::Method::POST,
"/v1/agent/answer-admissibility",
token,
Some(body),
&[],
&[],
false,
false,
)
.await
}
fn build_run_request_body(
request_file: Option<&str>,
objective_flag: Option<String>,
objective_positional: Option<String>,
horizon_days: Option<i32>,
focus: Vec<String>,
) -> Result<Value, String> {
if let Some(path) = request_file {
if objective_flag.is_some() || objective_positional.is_some() || horizon_days.is_some() {
return Err(
"`--request-file` cannot be combined with inline objective/horizon flags"
.to_string(),
);
}
return read_json_from_file(path);
}
let objective = choose_inline_objective(objective_flag, objective_positional)?;
let objective = objective
.ok_or_else(|| "Missing analysis objective. Provide `--objective \"...\"`, positional OBJECTIVE, or `--request-file`.".to_string())?;
let normalized_focus = normalize_focus_flags(focus);
let mut body = json!({ "objective": objective });
let obj = body
.as_object_mut()
.ok_or_else(|| "analysis request body must be a JSON object".to_string())?;
if let Some(days) = horizon_days {
obj.insert("horizon_days".to_string(), json!(days));
}
if !normalized_focus.is_empty() {
obj.insert("focus".to_string(), json!(normalized_focus));
}
Ok(body)
}
fn build_validate_answer_body(task_intent: String, draft_answer: String) -> Value {
json!({
"task_intent": task_intent,
"draft_answer": draft_answer,
})
}
fn choose_inline_objective(
objective_flag: Option<String>,
objective_positional: Option<String>,
) -> Result<Option<String>, String> {
if objective_flag.is_some() && objective_positional.is_some() {
return Err(
"Provide the objective either as positional text or via `--objective`, not both."
.to_string(),
);
}
Ok(objective_flag
.or(objective_positional)
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty()))
}
fn normalize_focus_flags(values: Vec<String>) -> Vec<String> {
let mut out = Vec::new();
for value in values {
let normalized = value.trim();
if normalized.is_empty() {
continue;
}
let normalized = normalized.to_string();
if !out.contains(&normalized) {
out.push(normalized);
}
}
out
}
fn apply_wait_timeout_override(
mut body: Value,
wait_timeout_ms: Option<u64>,
) -> Result<Value, String> {
if let Some(timeout_ms) = wait_timeout_ms {
let obj = body
.as_object_mut()
.ok_or_else(|| "analysis request body must be a JSON object".to_string())?;
obj.insert("wait_timeout_ms".to_string(), json!(timeout_ms));
}
Ok(body)
}
#[derive(Debug, Deserialize)]
struct CliRunAnalysisResponse {
#[allow(dead_code)]
mode: Option<String>,
terminal: bool,
timed_out: bool,
#[serde(default)]
retry_after_ms: Option<u64>,
job: CliJobStatusRef,
}
#[derive(Debug, Deserialize)]
struct CliJobStatusRef {
job_id: Uuid,
status: String,
}
#[derive(Debug, Deserialize)]
struct CliCreateAnalysisJobResponse {
job_id: Uuid,
}
#[derive(Debug, Deserialize)]
struct CliJobStatusEnvelope {
status: String,
}
fn parse_cli_run_response(value: &Value) -> Option<CliRunAnalysisResponse> {
serde_json::from_value(value.clone()).ok()
}
fn parse_cli_job_status(value: &Value) -> Option<CliJobStatusEnvelope> {
serde_json::from_value(value.clone()).ok()
}
fn parse_cli_create_job_response(value: &Value) -> Option<CliCreateAnalysisJobResponse> {
serde_json::from_value(value.clone()).ok()
}
fn run_response_needs_cli_poll_fallback(response: &CliRunAnalysisResponse) -> bool {
response.timed_out
&& !response.terminal
&& !analysis_job_status_is_terminal(&response.job.status)
}
fn analysis_job_status_is_terminal(status: &str) -> bool {
matches!(status, "completed" | "failed")
}
fn is_run_endpoint_unsupported_status(status: u16) -> bool {
matches!(status, 404 | 405)
}
fn strip_run_only_fields(mut body: Value) -> Value {
if let Some(obj) = body.as_object_mut() {
obj.remove("wait_timeout_ms");
}
body
}
fn clamp_cli_overall_timeout_ms(timeout_ms: Option<u64>) -> u64 {
timeout_ms
.unwrap_or(CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_DEFAULT)
.clamp(
CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_MIN,
CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_MAX,
)
}
fn clamp_cli_poll_interval_ms(timeout_ms: Option<u64>) -> u64 {
timeout_ms
.unwrap_or(CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_DEFAULT)
.clamp(
CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_MIN,
CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_MAX,
)
}
fn elapsed_ms(started: Instant) -> u64 {
let ms = started.elapsed().as_millis();
if ms > u128::from(u64::MAX) {
u64::MAX
} else {
ms as u64
}
}
fn min_u64(a: u64, b: u64) -> u64 {
if a < b { a } else { b }
}
fn build_cli_poll_fallback_output(
job: Value,
waited_ms: u64,
terminal: bool,
timed_out: bool,
retry_after_ms: Option<u64>,
polls: u32,
overall_timeout_ms: u64,
initial_mode: Option<&str>,
fallback_kind: &str,
) -> Value {
let mut out = json!({
"mode": if terminal {
format!("blocking_cli_poll_fallback_completed:{fallback_kind}")
} else {
format!("blocking_cli_poll_fallback_timeout:{fallback_kind}")
},
"terminal": terminal,
"timed_out": timed_out,
"waited_ms": waited_ms,
"retry_after_ms": retry_after_ms,
"job": job,
"cli_fallback": {
"used": true,
"kind": fallback_kind,
"polls": polls,
"overall_timeout_ms": overall_timeout_ms,
"initial_mode": initial_mode,
}
});
if retry_after_ms.is_none() {
if let Some(obj) = out.as_object_mut() {
obj.insert("retry_after_ms".to_string(), Value::Null);
}
}
out
}
async fn request_json(
api_url: &str,
method: reqwest::Method,
path: &str,
token: Option<&str>,
body: Option<Value>,
) -> Result<(u16, Value), String> {
let url = reqwest::Url::parse(&format!("{api_url}{path}"))
.map_err(|e| format!("Invalid URL: {api_url}{path}: {e}"))?;
let mut req = client().request(method, url);
if let Some(t) = token {
req = req.header("Authorization", format!("Bearer {t}"));
}
if let Some(b) = body {
req = req.json(&b);
}
let resp = req.send().await.map_err(|e| format!("{e}"))?;
let status = resp.status().as_u16();
let body: Value = match resp.bytes().await {
Ok(bytes) => {
if bytes.is_empty() {
Value::Null
} else {
serde_json::from_slice(&bytes)
.unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()))
}
}
Err(e) => json!({"raw_error": format!("Failed to read response body: {e}")}),
};
Ok((status, body))
}
fn is_success_status(status: u16) -> bool {
(200..=299).contains(&status)
}
fn http_status_exit_code(status: u16) -> i32 {
match status {
200..=299 => 0,
400..=499 => 1,
_ => 2,
}
}
fn print_json_response(status: u16, body: &Value) -> i32 {
let exit_code = http_status_exit_code(status);
if exit_code == 0 {
print_json_stdout(body);
} else {
print_json_stderr(body);
}
exit_code
}
#[cfg(test)]
mod tests {
use super::{
analysis_job_status_is_terminal, apply_wait_timeout_override, build_run_request_body,
build_validate_answer_body, clamp_cli_overall_timeout_ms, clamp_cli_poll_interval_ms,
is_run_endpoint_unsupported_status, parse_cli_job_status,
run_response_needs_cli_poll_fallback, strip_run_only_fields,
};
use serde_json::json;
#[test]
fn apply_wait_timeout_override_merges_field_into_request_object() {
let body = json!({
"objective": "trend of readiness",
"horizon_days": 90
});
let patched = apply_wait_timeout_override(body, Some(2500)).unwrap();
assert_eq!(patched["wait_timeout_ms"], json!(2500));
assert_eq!(patched["objective"], json!("trend of readiness"));
}
#[test]
fn apply_wait_timeout_override_rejects_non_object_when_override_present() {
let err = apply_wait_timeout_override(json!(["bad"]), Some(1000)).unwrap_err();
assert!(err.contains("JSON object"));
}
#[test]
fn build_run_request_body_accepts_inline_objective_and_flags() {
let body = build_run_request_body(
None,
Some("trend of plyometric quality".to_string()),
None,
Some(90),
vec![
"plyo".to_string(),
" lower_body ".to_string(),
"".to_string(),
],
)
.unwrap();
assert_eq!(body["objective"], json!("trend of plyometric quality"));
assert_eq!(body["horizon_days"], json!(90));
assert_eq!(body["focus"], json!(["plyo", "lower_body"]));
}
#[test]
fn build_run_request_body_supports_positional_objective() {
let body = build_run_request_body(
None,
None,
Some("trend of sleep quality".to_string()),
None,
vec![],
)
.unwrap();
assert_eq!(body["objective"], json!("trend of sleep quality"));
assert!(body.get("horizon_days").is_none());
}
#[test]
fn build_run_request_body_rejects_missing_input() {
let err = build_run_request_body(None, None, None, None, vec![]).unwrap_err();
assert!(err.contains("Missing analysis objective"));
}
#[test]
fn build_run_request_body_rejects_duplicate_inline_objective_sources() {
let err = build_run_request_body(
None,
Some("a".to_string()),
Some("b".to_string()),
None,
vec![],
)
.unwrap_err();
assert!(err.contains("either as positional"));
}
#[test]
fn build_validate_answer_body_serializes_expected_shape() {
let body = build_validate_answer_body(
"How has my squat progressed?".to_string(),
"Your squat is clearly up 15%.".to_string(),
);
assert_eq!(body["task_intent"], json!("How has my squat progressed?"));
assert_eq!(body["draft_answer"], json!("Your squat is clearly up 15%."));
}
#[test]
fn clamp_cli_timeouts_apply_bounds() {
assert_eq!(clamp_cli_overall_timeout_ms(None), 90_000);
assert_eq!(clamp_cli_overall_timeout_ms(Some(1)), 1_000);
assert_eq!(clamp_cli_overall_timeout_ms(Some(999_999)), 300_000);
assert_eq!(clamp_cli_poll_interval_ms(None), 1_000);
assert_eq!(clamp_cli_poll_interval_ms(Some(1)), 100);
assert_eq!(clamp_cli_poll_interval_ms(Some(50_000)), 5_000);
}
#[test]
fn analysis_terminal_status_matches_api_contract() {
assert!(analysis_job_status_is_terminal("completed"));
assert!(analysis_job_status_is_terminal("failed"));
assert!(!analysis_job_status_is_terminal("queued"));
assert!(!analysis_job_status_is_terminal("processing"));
}
#[test]
fn run_response_fallback_detection_requires_timeout_and_non_terminal_job() {
let timed_out = serde_json::from_value(json!({
"terminal": false,
"timed_out": true,
"retry_after_ms": 500,
"job": { "job_id": "00000000-0000-0000-0000-000000000000", "status": "queued" }
}))
.unwrap();
assert!(run_response_needs_cli_poll_fallback(&timed_out));
let completed = serde_json::from_value(json!({
"terminal": true,
"timed_out": false,
"job": { "job_id": "00000000-0000-0000-0000-000000000000", "status": "completed" }
}))
.unwrap();
assert!(!run_response_needs_cli_poll_fallback(&completed));
}
#[test]
fn parse_cli_job_status_reads_status_field() {
let parsed = parse_cli_job_status(&json!({
"job_id": "00000000-0000-0000-0000-000000000000",
"status": "queued"
}))
.unwrap();
assert_eq!(parsed.status, "queued");
}
#[test]
fn run_endpoint_unsupported_status_detection_matches_legacy_fallback_cases() {
assert!(is_run_endpoint_unsupported_status(404));
assert!(is_run_endpoint_unsupported_status(405));
assert!(!is_run_endpoint_unsupported_status(400));
assert!(!is_run_endpoint_unsupported_status(401));
assert!(!is_run_endpoint_unsupported_status(500));
}
#[test]
fn strip_run_only_fields_removes_wait_timeout_for_legacy_create_fallback() {
let body = json!({
"objective": "trend of readiness",
"horizon_days": 90,
"wait_timeout_ms": 2000
});
let stripped = strip_run_only_fields(body);
assert!(stripped.get("wait_timeout_ms").is_none());
assert_eq!(stripped["objective"], json!("trend of readiness"));
}
}