Skip to main content

kura_cli/commands/
analysis.rs

1use std::time::{Duration, Instant};
2
3use clap::Subcommand;
4use serde::Deserialize;
5use serde_json::{Value, json};
6use tokio::time::sleep;
7use uuid::Uuid;
8
9use crate::util::{api_request, client, exit_error, read_json_from_file};
10
11const CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_DEFAULT: u64 = 90_000;
12const CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_MIN: u64 = 1_000;
13const CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_MAX: u64 = 300_000;
14const CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_DEFAULT: u64 = 1_000;
15const CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_MIN: u64 = 100;
16const CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_MAX: u64 = 5_000;
17
18#[derive(Subcommand)]
19pub enum AnalysisCommands {
20    /// Queue and wait (bounded) for deep analysis via /v1/analysis/jobs/run
21    Run {
22        /// Full JSON request payload (use '-' for stdin)
23        #[arg(long, conflicts_with_all = ["objective", "objective_text", "horizon_days", "focus"])]
24        request_file: Option<String>,
25        /// Free-text analysis objective (preferred for one-off agent calls)
26        #[arg(long, short = 'o', conflicts_with = "objective_text")]
27        objective: Option<String>,
28        /// Free-text analysis objective (positional alias for --objective)
29        #[arg(value_name = "OBJECTIVE", conflicts_with = "objective")]
30        objective_text: Option<String>,
31        /// Horizon in days (defaults server-side to 90)
32        #[arg(long = "horizon-days", alias = "days")]
33        horizon_days: Option<i32>,
34        /// Focus hints (repeatable). Example: --focus lower_body --focus power
35        #[arg(long)]
36        focus: Vec<String>,
37        /// Override server-side initial wait timeout in milliseconds (server clamps to safe bounds)
38        #[arg(long)]
39        wait_timeout_ms: Option<u64>,
40        /// Total CLI wait budget including timeout fallback polling
41        #[arg(long)]
42        overall_timeout_ms: Option<u64>,
43        /// Poll interval used after server-side timeout fallback kicks in
44        #[arg(long)]
45        poll_interval_ms: Option<u64>,
46    },
47    /// Queue a new async analysis job via /v1/analysis/jobs
48    Create {
49        /// Full JSON request payload (use '-' for stdin)
50        #[arg(long)]
51        request_file: String,
52    },
53    /// Fetch analysis job status by id
54    Status {
55        /// Analysis job UUID
56        #[arg(long)]
57        job_id: Uuid,
58    },
59}
60
61pub async fn run(api_url: &str, token: Option<&str>, command: AnalysisCommands) -> i32 {
62    match command {
63        AnalysisCommands::Run {
64            request_file,
65            objective,
66            objective_text,
67            horizon_days,
68            focus,
69            wait_timeout_ms,
70            overall_timeout_ms,
71            poll_interval_ms,
72        } => {
73            run_blocking(
74                api_url,
75                token,
76                request_file.as_deref(),
77                objective,
78                objective_text,
79                horizon_days,
80                focus,
81                wait_timeout_ms,
82                overall_timeout_ms,
83                poll_interval_ms,
84            )
85            .await
86        }
87        AnalysisCommands::Create { request_file } => create(api_url, token, &request_file).await,
88        AnalysisCommands::Status { job_id } => status(api_url, token, job_id).await,
89    }
90}
91
92async fn run_blocking(
93    api_url: &str,
94    token: Option<&str>,
95    request_file: Option<&str>,
96    objective_flag: Option<String>,
97    objective_positional: Option<String>,
98    horizon_days: Option<i32>,
99    focus: Vec<String>,
100    wait_timeout_ms: Option<u64>,
101    overall_timeout_ms: Option<u64>,
102    poll_interval_ms: Option<u64>,
103) -> i32 {
104    let base_body = build_run_request_body(
105        request_file,
106        objective_flag,
107        objective_positional,
108        horizon_days,
109        focus,
110    )
111    .unwrap_or_else(|e| {
112        exit_error(
113            &e,
114            Some(
115                "Use `kura analysis run --objective \"...\"` (or positional objective) for user-facing analyses, or `--request-file payload.json` for full JSON requests.",
116            ),
117        )
118    });
119    let body = apply_wait_timeout_override(base_body, wait_timeout_ms).unwrap_or_else(|e| {
120        exit_error(
121            &e,
122            Some("Analysis run request must be a JSON object with objective/horizon/focus fields."),
123        )
124    });
125
126    let overall_timeout_ms = clamp_cli_overall_timeout_ms(overall_timeout_ms);
127    let poll_interval_ms = clamp_cli_poll_interval_ms(poll_interval_ms);
128    let started = Instant::now();
129
130    let (status, run_body) = request_json(
131        api_url,
132        reqwest::Method::POST,
133        "/v1/analysis/jobs/run",
134        token,
135        Some(body),
136    )
137    .await
138    .unwrap_or_else(|e| exit_error(&e, Some("Check API availability/auth and retry.")));
139
140    if !is_success_status(status) {
141        return print_json_response(status, &run_body);
142    }
143
144    let Some(run_response) = parse_cli_run_response(&run_body) else {
145        return print_json_response(status, &run_body);
146    };
147
148    if !run_response_needs_cli_poll_fallback(&run_response) {
149        return print_json_response(status, &run_body);
150    }
151
152    let mut latest_job = run_body.get("job").cloned().unwrap_or(Value::Null);
153    let mut polls = 0u32;
154    let mut last_retry_after_ms =
155        clamp_cli_poll_interval_ms(run_response.retry_after_ms.or(Some(poll_interval_ms)));
156    let job_id = run_response.job.job_id;
157    let path = format!("/v1/analysis/jobs/{job_id}");
158
159    loop {
160        let elapsed_total_ms = elapsed_ms(started);
161        if elapsed_total_ms >= overall_timeout_ms {
162            let timeout_output = build_cli_poll_fallback_output(
163                latest_job,
164                elapsed_total_ms,
165                false,
166                true,
167                Some(last_retry_after_ms),
168                polls,
169                overall_timeout_ms,
170                run_response.mode.as_deref(),
171            );
172            return print_json_response(200, &timeout_output);
173        }
174
175        let remaining_ms = overall_timeout_ms.saturating_sub(elapsed_total_ms);
176        let sleep_ms = min_u64(last_retry_after_ms, remaining_ms);
177        sleep(Duration::from_millis(sleep_ms)).await;
178
179        let (poll_status, poll_body) =
180            request_json(api_url, reqwest::Method::GET, &path, token, None)
181                .await
182                .unwrap_or_else(|e| {
183                    exit_error(
184                        &e,
185                        Some(
186                            "Blocking analysis fallback polling failed. Retry `kura analysis status --job-id <id>` in the same session if needed.",
187                        ),
188                    )
189                });
190
191        if !is_success_status(poll_status) {
192            return print_json_response(poll_status, &poll_body);
193        }
194
195        polls = polls.saturating_add(1);
196        latest_job = poll_body.clone();
197
198        if let Some(job_status) = parse_cli_job_status(&poll_body) {
199            if analysis_job_status_is_terminal(&job_status.status) {
200                let final_output = build_cli_poll_fallback_output(
201                    poll_body,
202                    elapsed_ms(started),
203                    true,
204                    false,
205                    None,
206                    polls,
207                    overall_timeout_ms,
208                    run_response.mode.as_deref(),
209                );
210                return print_json_response(200, &final_output);
211            }
212        } else {
213            // If shape is unexpected, surface the payload instead of masking it.
214            return print_json_response(200, &poll_body);
215        }
216
217        last_retry_after_ms = poll_interval_ms;
218    }
219}
220
221async fn create(api_url: &str, token: Option<&str>, request_file: &str) -> i32 {
222    let body = match read_json_from_file(request_file) {
223        Ok(v) => v,
224        Err(e) => {
225            crate::util::exit_error(&e, Some("Provide a valid JSON analysis request payload."))
226        }
227    };
228
229    api_request(
230        api_url,
231        reqwest::Method::POST,
232        "/v1/analysis/jobs",
233        token,
234        Some(body),
235        &[],
236        &[],
237        false,
238        false,
239    )
240    .await
241}
242
243async fn status(api_url: &str, token: Option<&str>, job_id: Uuid) -> i32 {
244    let path = format!("/v1/analysis/jobs/{job_id}");
245    api_request(
246        api_url,
247        reqwest::Method::GET,
248        &path,
249        token,
250        None,
251        &[],
252        &[],
253        false,
254        false,
255    )
256    .await
257}
258
259fn build_run_request_body(
260    request_file: Option<&str>,
261    objective_flag: Option<String>,
262    objective_positional: Option<String>,
263    horizon_days: Option<i32>,
264    focus: Vec<String>,
265) -> Result<Value, String> {
266    if let Some(path) = request_file {
267        if objective_flag.is_some() || objective_positional.is_some() || horizon_days.is_some() {
268            return Err(
269                "`--request-file` cannot be combined with inline objective/horizon flags"
270                    .to_string(),
271            );
272        }
273        return read_json_from_file(path);
274    }
275
276    let objective = choose_inline_objective(objective_flag, objective_positional)?;
277    let objective = objective
278        .ok_or_else(|| "Missing analysis objective. Provide `--objective \"...\"`, positional OBJECTIVE, or `--request-file`.".to_string())?;
279
280    let normalized_focus = normalize_focus_flags(focus);
281    let mut body = json!({ "objective": objective });
282    let obj = body
283        .as_object_mut()
284        .ok_or_else(|| "analysis request body must be a JSON object".to_string())?;
285    if let Some(days) = horizon_days {
286        obj.insert("horizon_days".to_string(), json!(days));
287    }
288    if !normalized_focus.is_empty() {
289        obj.insert("focus".to_string(), json!(normalized_focus));
290    }
291    Ok(body)
292}
293
294fn choose_inline_objective(
295    objective_flag: Option<String>,
296    objective_positional: Option<String>,
297) -> Result<Option<String>, String> {
298    if objective_flag.is_some() && objective_positional.is_some() {
299        return Err(
300            "Provide the objective either as positional text or via `--objective`, not both."
301                .to_string(),
302        );
303    }
304    Ok(objective_flag
305        .or(objective_positional)
306        .map(|s| s.trim().to_string())
307        .filter(|s| !s.is_empty()))
308}
309
310fn normalize_focus_flags(values: Vec<String>) -> Vec<String> {
311    let mut out = Vec::new();
312    for value in values {
313        let normalized = value.trim();
314        if normalized.is_empty() {
315            continue;
316        }
317        let normalized = normalized.to_string();
318        if !out.contains(&normalized) {
319            out.push(normalized);
320        }
321    }
322    out
323}
324
325fn apply_wait_timeout_override(
326    mut body: Value,
327    wait_timeout_ms: Option<u64>,
328) -> Result<Value, String> {
329    if let Some(timeout_ms) = wait_timeout_ms {
330        let obj = body
331            .as_object_mut()
332            .ok_or_else(|| "analysis request body must be a JSON object".to_string())?;
333        obj.insert("wait_timeout_ms".to_string(), json!(timeout_ms));
334    }
335    Ok(body)
336}
337
338#[derive(Debug, Deserialize)]
339struct CliRunAnalysisResponse {
340    #[allow(dead_code)]
341    mode: Option<String>,
342    terminal: bool,
343    timed_out: bool,
344    #[serde(default)]
345    retry_after_ms: Option<u64>,
346    job: CliJobStatusRef,
347}
348
349#[derive(Debug, Deserialize)]
350struct CliJobStatusRef {
351    job_id: Uuid,
352    status: String,
353}
354
355#[derive(Debug, Deserialize)]
356struct CliJobStatusEnvelope {
357    status: String,
358}
359
360fn parse_cli_run_response(value: &Value) -> Option<CliRunAnalysisResponse> {
361    serde_json::from_value(value.clone()).ok()
362}
363
364fn parse_cli_job_status(value: &Value) -> Option<CliJobStatusEnvelope> {
365    serde_json::from_value(value.clone()).ok()
366}
367
368fn run_response_needs_cli_poll_fallback(response: &CliRunAnalysisResponse) -> bool {
369    response.timed_out
370        && !response.terminal
371        && !analysis_job_status_is_terminal(&response.job.status)
372}
373
374fn analysis_job_status_is_terminal(status: &str) -> bool {
375    matches!(status, "completed" | "failed")
376}
377
378fn clamp_cli_overall_timeout_ms(timeout_ms: Option<u64>) -> u64 {
379    timeout_ms
380        .unwrap_or(CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_DEFAULT)
381        .clamp(
382            CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_MIN,
383            CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_MAX,
384        )
385}
386
387fn clamp_cli_poll_interval_ms(timeout_ms: Option<u64>) -> u64 {
388    timeout_ms
389        .unwrap_or(CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_DEFAULT)
390        .clamp(
391            CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_MIN,
392            CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_MAX,
393        )
394}
395
396fn elapsed_ms(started: Instant) -> u64 {
397    let ms = started.elapsed().as_millis();
398    if ms > u128::from(u64::MAX) {
399        u64::MAX
400    } else {
401        ms as u64
402    }
403}
404
405fn min_u64(a: u64, b: u64) -> u64 {
406    if a < b { a } else { b }
407}
408
409fn build_cli_poll_fallback_output(
410    job: Value,
411    waited_ms: u64,
412    terminal: bool,
413    timed_out: bool,
414    retry_after_ms: Option<u64>,
415    polls: u32,
416    overall_timeout_ms: u64,
417    initial_mode: Option<&str>,
418) -> Value {
419    let mut out = json!({
420        "mode": if terminal { "blocking_cli_poll_fallback_completed" } else { "blocking_cli_poll_fallback_timeout" },
421        "terminal": terminal,
422        "timed_out": timed_out,
423        "waited_ms": waited_ms,
424        "retry_after_ms": retry_after_ms,
425        "job": job,
426        "cli_fallback": {
427            "used": true,
428            "polls": polls,
429            "overall_timeout_ms": overall_timeout_ms,
430            "initial_mode": initial_mode,
431        }
432    });
433    if retry_after_ms.is_none() {
434        if let Some(obj) = out.as_object_mut() {
435            obj.insert("retry_after_ms".to_string(), Value::Null);
436        }
437    }
438    out
439}
440
441async fn request_json(
442    api_url: &str,
443    method: reqwest::Method,
444    path: &str,
445    token: Option<&str>,
446    body: Option<Value>,
447) -> Result<(u16, Value), String> {
448    let url = reqwest::Url::parse(&format!("{api_url}{path}"))
449        .map_err(|e| format!("Invalid URL: {api_url}{path}: {e}"))?;
450
451    let mut req = client().request(method, url);
452    if let Some(t) = token {
453        req = req.header("Authorization", format!("Bearer {t}"));
454    }
455    if let Some(b) = body {
456        req = req.json(&b);
457    }
458
459    let resp = req.send().await.map_err(|e| format!("{e}"))?;
460    let status = resp.status().as_u16();
461    let body: Value = match resp.bytes().await {
462        Ok(bytes) => {
463            if bytes.is_empty() {
464                Value::Null
465            } else {
466                serde_json::from_slice(&bytes)
467                    .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()))
468            }
469        }
470        Err(e) => json!({"raw_error": format!("Failed to read response body: {e}")}),
471    };
472    Ok((status, body))
473}
474
475fn is_success_status(status: u16) -> bool {
476    (200..=299).contains(&status)
477}
478
479fn http_status_exit_code(status: u16) -> i32 {
480    match status {
481        200..=299 => 0,
482        400..=499 => 1,
483        _ => 2,
484    }
485}
486
487fn print_json_response(status: u16, body: &Value) -> i32 {
488    let exit_code = http_status_exit_code(status);
489    let formatted = serde_json::to_string_pretty(body).unwrap_or_else(|_| body.to_string());
490    if exit_code == 0 {
491        println!("{formatted}");
492    } else {
493        eprintln!("{formatted}");
494    }
495    exit_code
496}
497
498#[cfg(test)]
499mod tests {
500    use super::{
501        analysis_job_status_is_terminal, apply_wait_timeout_override, build_run_request_body,
502        clamp_cli_overall_timeout_ms, clamp_cli_poll_interval_ms, parse_cli_job_status,
503        run_response_needs_cli_poll_fallback,
504    };
505    use serde_json::json;
506
507    #[test]
508    fn apply_wait_timeout_override_merges_field_into_request_object() {
509        let body = json!({
510            "objective": "trend of readiness",
511            "horizon_days": 90
512        });
513        let patched = apply_wait_timeout_override(body, Some(2500)).unwrap();
514        assert_eq!(patched["wait_timeout_ms"], json!(2500));
515        assert_eq!(patched["objective"], json!("trend of readiness"));
516    }
517
518    #[test]
519    fn apply_wait_timeout_override_rejects_non_object_when_override_present() {
520        let err = apply_wait_timeout_override(json!(["bad"]), Some(1000)).unwrap_err();
521        assert!(err.contains("JSON object"));
522    }
523
524    #[test]
525    fn build_run_request_body_accepts_inline_objective_and_flags() {
526        let body = build_run_request_body(
527            None,
528            Some("trend of plyometric quality".to_string()),
529            None,
530            Some(90),
531            vec![
532                "plyo".to_string(),
533                "  lower_body ".to_string(),
534                "".to_string(),
535            ],
536        )
537        .unwrap();
538        assert_eq!(body["objective"], json!("trend of plyometric quality"));
539        assert_eq!(body["horizon_days"], json!(90));
540        assert_eq!(body["focus"], json!(["plyo", "lower_body"]));
541    }
542
543    #[test]
544    fn build_run_request_body_supports_positional_objective() {
545        let body = build_run_request_body(
546            None,
547            None,
548            Some("trend of sleep quality".to_string()),
549            None,
550            vec![],
551        )
552        .unwrap();
553        assert_eq!(body["objective"], json!("trend of sleep quality"));
554        assert!(body.get("horizon_days").is_none());
555    }
556
557    #[test]
558    fn build_run_request_body_rejects_missing_input() {
559        let err = build_run_request_body(None, None, None, None, vec![]).unwrap_err();
560        assert!(err.contains("Missing analysis objective"));
561    }
562
563    #[test]
564    fn build_run_request_body_rejects_duplicate_inline_objective_sources() {
565        let err = build_run_request_body(
566            None,
567            Some("a".to_string()),
568            Some("b".to_string()),
569            None,
570            vec![],
571        )
572        .unwrap_err();
573        assert!(err.contains("either as positional"));
574    }
575
576    #[test]
577    fn clamp_cli_timeouts_apply_bounds() {
578        assert_eq!(clamp_cli_overall_timeout_ms(None), 90_000);
579        assert_eq!(clamp_cli_overall_timeout_ms(Some(1)), 1_000);
580        assert_eq!(clamp_cli_overall_timeout_ms(Some(999_999)), 300_000);
581        assert_eq!(clamp_cli_poll_interval_ms(None), 1_000);
582        assert_eq!(clamp_cli_poll_interval_ms(Some(1)), 100);
583        assert_eq!(clamp_cli_poll_interval_ms(Some(50_000)), 5_000);
584    }
585
586    #[test]
587    fn analysis_terminal_status_matches_api_contract() {
588        assert!(analysis_job_status_is_terminal("completed"));
589        assert!(analysis_job_status_is_terminal("failed"));
590        assert!(!analysis_job_status_is_terminal("queued"));
591        assert!(!analysis_job_status_is_terminal("processing"));
592    }
593
594    #[test]
595    fn run_response_fallback_detection_requires_timeout_and_non_terminal_job() {
596        let timed_out = serde_json::from_value(json!({
597            "terminal": false,
598            "timed_out": true,
599            "retry_after_ms": 500,
600            "job": { "job_id": "00000000-0000-0000-0000-000000000000", "status": "queued" }
601        }))
602        .unwrap();
603        assert!(run_response_needs_cli_poll_fallback(&timed_out));
604
605        let completed = serde_json::from_value(json!({
606            "terminal": true,
607            "timed_out": false,
608            "job": { "job_id": "00000000-0000-0000-0000-000000000000", "status": "completed" }
609        }))
610        .unwrap();
611        assert!(!run_response_needs_cli_poll_fallback(&completed));
612    }
613
614    #[test]
615    fn parse_cli_job_status_reads_status_field() {
616        let parsed = parse_cli_job_status(&json!({
617            "job_id": "00000000-0000-0000-0000-000000000000",
618            "status": "queued"
619        }))
620        .unwrap();
621        assert_eq!(parsed.status, "queued");
622    }
623}