1use std::time::{Duration, Instant};
2
3use clap::Subcommand;
4use serde::Deserialize;
5use serde_json::{Value, json};
6use tokio::time::sleep;
7use uuid::Uuid;
8
9use crate::util::{api_request, client, exit_error, read_json_from_file};
10
11const CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_DEFAULT: u64 = 90_000;
12const CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_MIN: u64 = 1_000;
13const CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_MAX: u64 = 300_000;
14const CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_DEFAULT: u64 = 1_000;
15const CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_MIN: u64 = 100;
16const CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_MAX: u64 = 5_000;
17
18#[derive(Subcommand)]
19pub enum AnalysisCommands {
20 Run {
22 #[arg(long, conflicts_with_all = ["objective", "objective_text", "horizon_days", "focus"])]
24 request_file: Option<String>,
25 #[arg(long, short = 'o', conflicts_with = "objective_text")]
27 objective: Option<String>,
28 #[arg(value_name = "OBJECTIVE", conflicts_with = "objective")]
30 objective_text: Option<String>,
31 #[arg(long = "horizon-days", alias = "days")]
33 horizon_days: Option<i32>,
34 #[arg(long)]
36 focus: Vec<String>,
37 #[arg(long)]
39 wait_timeout_ms: Option<u64>,
40 #[arg(long)]
42 overall_timeout_ms: Option<u64>,
43 #[arg(long)]
45 poll_interval_ms: Option<u64>,
46 },
47 Create {
49 #[arg(long)]
51 request_file: String,
52 },
53 Status {
55 #[arg(long)]
57 job_id: Uuid,
58 },
59}
60
61pub async fn run(api_url: &str, token: Option<&str>, command: AnalysisCommands) -> i32 {
62 match command {
63 AnalysisCommands::Run {
64 request_file,
65 objective,
66 objective_text,
67 horizon_days,
68 focus,
69 wait_timeout_ms,
70 overall_timeout_ms,
71 poll_interval_ms,
72 } => {
73 run_blocking(
74 api_url,
75 token,
76 request_file.as_deref(),
77 objective,
78 objective_text,
79 horizon_days,
80 focus,
81 wait_timeout_ms,
82 overall_timeout_ms,
83 poll_interval_ms,
84 )
85 .await
86 }
87 AnalysisCommands::Create { request_file } => create(api_url, token, &request_file).await,
88 AnalysisCommands::Status { job_id } => status(api_url, token, job_id).await,
89 }
90}
91
92async fn run_blocking(
93 api_url: &str,
94 token: Option<&str>,
95 request_file: Option<&str>,
96 objective_flag: Option<String>,
97 objective_positional: Option<String>,
98 horizon_days: Option<i32>,
99 focus: Vec<String>,
100 wait_timeout_ms: Option<u64>,
101 overall_timeout_ms: Option<u64>,
102 poll_interval_ms: Option<u64>,
103) -> i32 {
104 let base_body = build_run_request_body(
105 request_file,
106 objective_flag,
107 objective_positional,
108 horizon_days,
109 focus,
110 )
111 .unwrap_or_else(|e| {
112 exit_error(
113 &e,
114 Some(
115 "Use `kura analysis run --objective \"...\"` (or positional objective) for user-facing analyses, or `--request-file payload.json` for full JSON requests.",
116 ),
117 )
118 });
119 let body = apply_wait_timeout_override(base_body, wait_timeout_ms).unwrap_or_else(|e| {
120 exit_error(
121 &e,
122 Some("Analysis run request must be a JSON object with objective/horizon/focus fields."),
123 )
124 });
125
126 let overall_timeout_ms = clamp_cli_overall_timeout_ms(overall_timeout_ms);
127 let poll_interval_ms = clamp_cli_poll_interval_ms(poll_interval_ms);
128 let started = Instant::now();
129
130 let (status, run_body) = request_json(
131 api_url,
132 reqwest::Method::POST,
133 "/v1/analysis/jobs/run",
134 token,
135 Some(body),
136 )
137 .await
138 .unwrap_or_else(|e| exit_error(&e, Some("Check API availability/auth and retry.")));
139
140 if !is_success_status(status) {
141 return print_json_response(status, &run_body);
142 }
143
144 let Some(run_response) = parse_cli_run_response(&run_body) else {
145 return print_json_response(status, &run_body);
146 };
147
148 if !run_response_needs_cli_poll_fallback(&run_response) {
149 return print_json_response(status, &run_body);
150 }
151
152 let mut latest_job = run_body.get("job").cloned().unwrap_or(Value::Null);
153 let mut polls = 0u32;
154 let mut last_retry_after_ms =
155 clamp_cli_poll_interval_ms(run_response.retry_after_ms.or(Some(poll_interval_ms)));
156 let job_id = run_response.job.job_id;
157 let path = format!("/v1/analysis/jobs/{job_id}");
158
159 loop {
160 let elapsed_total_ms = elapsed_ms(started);
161 if elapsed_total_ms >= overall_timeout_ms {
162 let timeout_output = build_cli_poll_fallback_output(
163 latest_job,
164 elapsed_total_ms,
165 false,
166 true,
167 Some(last_retry_after_ms),
168 polls,
169 overall_timeout_ms,
170 run_response.mode.as_deref(),
171 );
172 return print_json_response(200, &timeout_output);
173 }
174
175 let remaining_ms = overall_timeout_ms.saturating_sub(elapsed_total_ms);
176 let sleep_ms = min_u64(last_retry_after_ms, remaining_ms);
177 sleep(Duration::from_millis(sleep_ms)).await;
178
179 let (poll_status, poll_body) =
180 request_json(api_url, reqwest::Method::GET, &path, token, None)
181 .await
182 .unwrap_or_else(|e| {
183 exit_error(
184 &e,
185 Some(
186 "Blocking analysis fallback polling failed. Retry `kura analysis status --job-id <id>` in the same session if needed.",
187 ),
188 )
189 });
190
191 if !is_success_status(poll_status) {
192 return print_json_response(poll_status, &poll_body);
193 }
194
195 polls = polls.saturating_add(1);
196 latest_job = poll_body.clone();
197
198 if let Some(job_status) = parse_cli_job_status(&poll_body) {
199 if analysis_job_status_is_terminal(&job_status.status) {
200 let final_output = build_cli_poll_fallback_output(
201 poll_body,
202 elapsed_ms(started),
203 true,
204 false,
205 None,
206 polls,
207 overall_timeout_ms,
208 run_response.mode.as_deref(),
209 );
210 return print_json_response(200, &final_output);
211 }
212 } else {
213 return print_json_response(200, &poll_body);
215 }
216
217 last_retry_after_ms = poll_interval_ms;
218 }
219}
220
221async fn create(api_url: &str, token: Option<&str>, request_file: &str) -> i32 {
222 let body = match read_json_from_file(request_file) {
223 Ok(v) => v,
224 Err(e) => {
225 crate::util::exit_error(&e, Some("Provide a valid JSON analysis request payload."))
226 }
227 };
228
229 api_request(
230 api_url,
231 reqwest::Method::POST,
232 "/v1/analysis/jobs",
233 token,
234 Some(body),
235 &[],
236 &[],
237 false,
238 false,
239 )
240 .await
241}
242
243async fn status(api_url: &str, token: Option<&str>, job_id: Uuid) -> i32 {
244 let path = format!("/v1/analysis/jobs/{job_id}");
245 api_request(
246 api_url,
247 reqwest::Method::GET,
248 &path,
249 token,
250 None,
251 &[],
252 &[],
253 false,
254 false,
255 )
256 .await
257}
258
259fn build_run_request_body(
260 request_file: Option<&str>,
261 objective_flag: Option<String>,
262 objective_positional: Option<String>,
263 horizon_days: Option<i32>,
264 focus: Vec<String>,
265) -> Result<Value, String> {
266 if let Some(path) = request_file {
267 if objective_flag.is_some() || objective_positional.is_some() || horizon_days.is_some() {
268 return Err(
269 "`--request-file` cannot be combined with inline objective/horizon flags"
270 .to_string(),
271 );
272 }
273 return read_json_from_file(path);
274 }
275
276 let objective = choose_inline_objective(objective_flag, objective_positional)?;
277 let objective = objective
278 .ok_or_else(|| "Missing analysis objective. Provide `--objective \"...\"`, positional OBJECTIVE, or `--request-file`.".to_string())?;
279
280 let normalized_focus = normalize_focus_flags(focus);
281 let mut body = json!({ "objective": objective });
282 let obj = body
283 .as_object_mut()
284 .ok_or_else(|| "analysis request body must be a JSON object".to_string())?;
285 if let Some(days) = horizon_days {
286 obj.insert("horizon_days".to_string(), json!(days));
287 }
288 if !normalized_focus.is_empty() {
289 obj.insert("focus".to_string(), json!(normalized_focus));
290 }
291 Ok(body)
292}
293
294fn choose_inline_objective(
295 objective_flag: Option<String>,
296 objective_positional: Option<String>,
297) -> Result<Option<String>, String> {
298 if objective_flag.is_some() && objective_positional.is_some() {
299 return Err(
300 "Provide the objective either as positional text or via `--objective`, not both."
301 .to_string(),
302 );
303 }
304 Ok(objective_flag
305 .or(objective_positional)
306 .map(|s| s.trim().to_string())
307 .filter(|s| !s.is_empty()))
308}
309
310fn normalize_focus_flags(values: Vec<String>) -> Vec<String> {
311 let mut out = Vec::new();
312 for value in values {
313 let normalized = value.trim();
314 if normalized.is_empty() {
315 continue;
316 }
317 let normalized = normalized.to_string();
318 if !out.contains(&normalized) {
319 out.push(normalized);
320 }
321 }
322 out
323}
324
325fn apply_wait_timeout_override(
326 mut body: Value,
327 wait_timeout_ms: Option<u64>,
328) -> Result<Value, String> {
329 if let Some(timeout_ms) = wait_timeout_ms {
330 let obj = body
331 .as_object_mut()
332 .ok_or_else(|| "analysis request body must be a JSON object".to_string())?;
333 obj.insert("wait_timeout_ms".to_string(), json!(timeout_ms));
334 }
335 Ok(body)
336}
337
338#[derive(Debug, Deserialize)]
339struct CliRunAnalysisResponse {
340 #[allow(dead_code)]
341 mode: Option<String>,
342 terminal: bool,
343 timed_out: bool,
344 #[serde(default)]
345 retry_after_ms: Option<u64>,
346 job: CliJobStatusRef,
347}
348
349#[derive(Debug, Deserialize)]
350struct CliJobStatusRef {
351 job_id: Uuid,
352 status: String,
353}
354
355#[derive(Debug, Deserialize)]
356struct CliJobStatusEnvelope {
357 status: String,
358}
359
360fn parse_cli_run_response(value: &Value) -> Option<CliRunAnalysisResponse> {
361 serde_json::from_value(value.clone()).ok()
362}
363
364fn parse_cli_job_status(value: &Value) -> Option<CliJobStatusEnvelope> {
365 serde_json::from_value(value.clone()).ok()
366}
367
368fn run_response_needs_cli_poll_fallback(response: &CliRunAnalysisResponse) -> bool {
369 response.timed_out
370 && !response.terminal
371 && !analysis_job_status_is_terminal(&response.job.status)
372}
373
374fn analysis_job_status_is_terminal(status: &str) -> bool {
375 matches!(status, "completed" | "failed")
376}
377
378fn clamp_cli_overall_timeout_ms(timeout_ms: Option<u64>) -> u64 {
379 timeout_ms
380 .unwrap_or(CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_DEFAULT)
381 .clamp(
382 CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_MIN,
383 CLI_ANALYSIS_RUN_OVERALL_TIMEOUT_MS_MAX,
384 )
385}
386
387fn clamp_cli_poll_interval_ms(timeout_ms: Option<u64>) -> u64 {
388 timeout_ms
389 .unwrap_or(CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_DEFAULT)
390 .clamp(
391 CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_MIN,
392 CLI_ANALYSIS_RUN_POLL_INTERVAL_MS_MAX,
393 )
394}
395
396fn elapsed_ms(started: Instant) -> u64 {
397 let ms = started.elapsed().as_millis();
398 if ms > u128::from(u64::MAX) {
399 u64::MAX
400 } else {
401 ms as u64
402 }
403}
404
405fn min_u64(a: u64, b: u64) -> u64 {
406 if a < b { a } else { b }
407}
408
409fn build_cli_poll_fallback_output(
410 job: Value,
411 waited_ms: u64,
412 terminal: bool,
413 timed_out: bool,
414 retry_after_ms: Option<u64>,
415 polls: u32,
416 overall_timeout_ms: u64,
417 initial_mode: Option<&str>,
418) -> Value {
419 let mut out = json!({
420 "mode": if terminal { "blocking_cli_poll_fallback_completed" } else { "blocking_cli_poll_fallback_timeout" },
421 "terminal": terminal,
422 "timed_out": timed_out,
423 "waited_ms": waited_ms,
424 "retry_after_ms": retry_after_ms,
425 "job": job,
426 "cli_fallback": {
427 "used": true,
428 "polls": polls,
429 "overall_timeout_ms": overall_timeout_ms,
430 "initial_mode": initial_mode,
431 }
432 });
433 if retry_after_ms.is_none() {
434 if let Some(obj) = out.as_object_mut() {
435 obj.insert("retry_after_ms".to_string(), Value::Null);
436 }
437 }
438 out
439}
440
441async fn request_json(
442 api_url: &str,
443 method: reqwest::Method,
444 path: &str,
445 token: Option<&str>,
446 body: Option<Value>,
447) -> Result<(u16, Value), String> {
448 let url = reqwest::Url::parse(&format!("{api_url}{path}"))
449 .map_err(|e| format!("Invalid URL: {api_url}{path}: {e}"))?;
450
451 let mut req = client().request(method, url);
452 if let Some(t) = token {
453 req = req.header("Authorization", format!("Bearer {t}"));
454 }
455 if let Some(b) = body {
456 req = req.json(&b);
457 }
458
459 let resp = req.send().await.map_err(|e| format!("{e}"))?;
460 let status = resp.status().as_u16();
461 let body: Value = match resp.bytes().await {
462 Ok(bytes) => {
463 if bytes.is_empty() {
464 Value::Null
465 } else {
466 serde_json::from_slice(&bytes)
467 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()))
468 }
469 }
470 Err(e) => json!({"raw_error": format!("Failed to read response body: {e}")}),
471 };
472 Ok((status, body))
473}
474
475fn is_success_status(status: u16) -> bool {
476 (200..=299).contains(&status)
477}
478
479fn http_status_exit_code(status: u16) -> i32 {
480 match status {
481 200..=299 => 0,
482 400..=499 => 1,
483 _ => 2,
484 }
485}
486
487fn print_json_response(status: u16, body: &Value) -> i32 {
488 let exit_code = http_status_exit_code(status);
489 let formatted = serde_json::to_string_pretty(body).unwrap_or_else(|_| body.to_string());
490 if exit_code == 0 {
491 println!("{formatted}");
492 } else {
493 eprintln!("{formatted}");
494 }
495 exit_code
496}
497
498#[cfg(test)]
499mod tests {
500 use super::{
501 analysis_job_status_is_terminal, apply_wait_timeout_override, build_run_request_body,
502 clamp_cli_overall_timeout_ms, clamp_cli_poll_interval_ms, parse_cli_job_status,
503 run_response_needs_cli_poll_fallback,
504 };
505 use serde_json::json;
506
507 #[test]
508 fn apply_wait_timeout_override_merges_field_into_request_object() {
509 let body = json!({
510 "objective": "trend of readiness",
511 "horizon_days": 90
512 });
513 let patched = apply_wait_timeout_override(body, Some(2500)).unwrap();
514 assert_eq!(patched["wait_timeout_ms"], json!(2500));
515 assert_eq!(patched["objective"], json!("trend of readiness"));
516 }
517
518 #[test]
519 fn apply_wait_timeout_override_rejects_non_object_when_override_present() {
520 let err = apply_wait_timeout_override(json!(["bad"]), Some(1000)).unwrap_err();
521 assert!(err.contains("JSON object"));
522 }
523
524 #[test]
525 fn build_run_request_body_accepts_inline_objective_and_flags() {
526 let body = build_run_request_body(
527 None,
528 Some("trend of plyometric quality".to_string()),
529 None,
530 Some(90),
531 vec![
532 "plyo".to_string(),
533 " lower_body ".to_string(),
534 "".to_string(),
535 ],
536 )
537 .unwrap();
538 assert_eq!(body["objective"], json!("trend of plyometric quality"));
539 assert_eq!(body["horizon_days"], json!(90));
540 assert_eq!(body["focus"], json!(["plyo", "lower_body"]));
541 }
542
543 #[test]
544 fn build_run_request_body_supports_positional_objective() {
545 let body = build_run_request_body(
546 None,
547 None,
548 Some("trend of sleep quality".to_string()),
549 None,
550 vec![],
551 )
552 .unwrap();
553 assert_eq!(body["objective"], json!("trend of sleep quality"));
554 assert!(body.get("horizon_days").is_none());
555 }
556
557 #[test]
558 fn build_run_request_body_rejects_missing_input() {
559 let err = build_run_request_body(None, None, None, None, vec![]).unwrap_err();
560 assert!(err.contains("Missing analysis objective"));
561 }
562
563 #[test]
564 fn build_run_request_body_rejects_duplicate_inline_objective_sources() {
565 let err = build_run_request_body(
566 None,
567 Some("a".to_string()),
568 Some("b".to_string()),
569 None,
570 vec![],
571 )
572 .unwrap_err();
573 assert!(err.contains("either as positional"));
574 }
575
576 #[test]
577 fn clamp_cli_timeouts_apply_bounds() {
578 assert_eq!(clamp_cli_overall_timeout_ms(None), 90_000);
579 assert_eq!(clamp_cli_overall_timeout_ms(Some(1)), 1_000);
580 assert_eq!(clamp_cli_overall_timeout_ms(Some(999_999)), 300_000);
581 assert_eq!(clamp_cli_poll_interval_ms(None), 1_000);
582 assert_eq!(clamp_cli_poll_interval_ms(Some(1)), 100);
583 assert_eq!(clamp_cli_poll_interval_ms(Some(50_000)), 5_000);
584 }
585
586 #[test]
587 fn analysis_terminal_status_matches_api_contract() {
588 assert!(analysis_job_status_is_terminal("completed"));
589 assert!(analysis_job_status_is_terminal("failed"));
590 assert!(!analysis_job_status_is_terminal("queued"));
591 assert!(!analysis_job_status_is_terminal("processing"));
592 }
593
594 #[test]
595 fn run_response_fallback_detection_requires_timeout_and_non_terminal_job() {
596 let timed_out = serde_json::from_value(json!({
597 "terminal": false,
598 "timed_out": true,
599 "retry_after_ms": 500,
600 "job": { "job_id": "00000000-0000-0000-0000-000000000000", "status": "queued" }
601 }))
602 .unwrap();
603 assert!(run_response_needs_cli_poll_fallback(&timed_out));
604
605 let completed = serde_json::from_value(json!({
606 "terminal": true,
607 "timed_out": false,
608 "job": { "job_id": "00000000-0000-0000-0000-000000000000", "status": "completed" }
609 }))
610 .unwrap();
611 assert!(!run_response_needs_cli_poll_fallback(&completed));
612 }
613
614 #[test]
615 fn parse_cli_job_status_reads_status_field() {
616 let parsed = parse_cli_job_status(&json!({
617 "job_id": "00000000-0000-0000-0000-000000000000",
618 "status": "queued"
619 }))
620 .unwrap();
621 assert_eq!(parsed.status, "queued");
622 }
623}