1use clap::{Args, Subcommand, ValueEnum};
2use serde_json::{Map, Value, json};
3
4use crate::util::{
5 RawApiResponse, dry_run_enabled, emit_dry_run_request, exit_error, print_json_stderr,
6 print_json_stdout, raw_api_request_json, raw_api_request_with_query,
7};
8
9const READ_SOURCE_CATALOG_SECTION: &str =
10 "system_config.conventions::public_agent_read_source_catalog_v1";
11const READ_SOURCE_CATALOG_ENDPOINT: &str = "/v1/system/config/section";
12const READ_QUERY_ENDPOINT: &str = "/v4/agent/read-query";
13const READ_QUERY_SCHEMA_VERSION: &str = "read_query_request.v1";
14const READ_QUERY_HELP: &str = r#"Examples:
15 kura read_query --source-id activity_session_history --read-kind list --subject day_history --filters-json '{"date_from":"2026-04-02","date_to":"2026-04-08"}' --output-shape collection
16 kura read_query --source-id activity_session_history --read-kind lookup --subject session_detail --filters-json '{"session_id":"activity_session_019d77b001c472c1ae842d2dba567099"}' --limit 8 --output-shape item
17 kura read_query --source-id activity_session_history --read-kind aggregate --subject block_history --filters-json '{"date_from":"2026-04-01"}' --query-json '{"group_by":"exercise_identity","aggregate":"load_kg_max","sort":{"field":"load_kg_max","direction":"desc"}}' --limit 20 --output-shape aggregate
18 kura read_query --source-id performance_tests --read-kind timeline --subject test_timeline --filters-json '{"test_type":"cmj","measured_property":"jump_height_cm","date_from":"2026-01-01"}' --output-shape series
19
20Rules:
21 Use read_query for facts and bounded summaries. Use analyze for trend, stagnation, driver, influence, or training-decision questions.
22 Use source_id=performance_tests for benchmark, jump-test, and performance-test history questions; common aliases such as benchmarks are normalized to performance_tests.
23 Use activity_session_history day_history for compact multi-day recall, session_detail for one exact saved session, and block_history plus --query-json for programmable activity history reads.
24 Use activity_session_history comparison_stream for exact progress; exercise_identity is only the movement-family view when variants are mixed.
25 Continue with the returned cursor only when continuation.has_more=true.
26"#;
27
28pub fn public_read_query_payload_contract() -> Value {
29 json!({
30 "schema_version": "public_read_query_payload_contract.v1",
31 "entrypoint": "kura read_query",
32 "truth_namespace": "canonical_read_query",
33 "principles": [
34 "Use exactly one canonical source_id per call.",
35 "Use cursor only for the continuation returned by a previous read_query response.",
36 "For activity_session_history, use day_history for compact recall, session_detail for exact session detail, and block_history plus query for bounded programmable history reads.",
37 "For activity_session_history, use comparison_stream as the exact progress grouping; exercise_identity is only the movement-family view when variants are mixed."
38 ],
39 "examples": [
40 {
41 "label": "Compact activity history for recent days",
42 "payload": {
43 "source_id": "activity_session_history",
44 "read_kind": "list",
45 "subject": "day_history",
46 "filters": {
47 "date_from": "2026-04-02",
48 "date_to": "2026-04-08"
49 },
50 "output_shape": "collection"
51 }
52 },
53 {
54 "label": "Bounded block-history aggregation",
55 "payload": {
56 "source_id": "activity_session_history",
57 "read_kind": "aggregate",
58 "subject": "block_history",
59 "filters": {
60 "date_from": "2026-04-01"
61 },
62 "query": {
63 "group_by": "exercise_identity",
64 "aggregate": "load_kg_max",
65 "sort": {
66 "field": "load_kg_max",
67 "direction": "desc"
68 }
69 },
70 "limit": 20,
71 "output_shape": "aggregate"
72 }
73 },
74 {
75 "label": "Performance-test timeline",
76 "payload": {
77 "source_id": "performance_tests",
78 "read_kind": "timeline",
79 "subject": "test_timeline",
80 "filters": {
81 "test_type": "cmj",
82 "measured_property": "jump_height_cm",
83 "date_from": "2026-01-01"
84 },
85 "output_shape": "series"
86 }
87 }
88 ]
89 })
90}
91
92#[derive(Subcommand)]
93pub enum ReadCommands {
94 Sources(ReadSourcesArgs),
96 Query(ReadQueryArgs),
98}
99
100#[derive(Args)]
101pub struct ReadSourcesArgs {
102 #[arg(long, alias = "source")]
104 source_id: Option<String>,
105}
106
107#[derive(Args)]
108#[command(after_help = READ_QUERY_HELP)]
109pub struct ReadQueryArgs {
110 #[arg(long)]
112 source_id: String,
113 #[arg(long, value_enum)]
115 read_kind: ReadKindArg,
116 #[arg(long)]
118 subject: Option<String>,
119 #[arg(long)]
121 lookup_key: Option<String>,
122 #[arg(long)]
124 date: Option<String>,
125 #[arg(long)]
127 date_from: Option<String>,
128 #[arg(long)]
130 date_to: Option<String>,
131 #[arg(long = "filters-json", alias = "filters")]
133 filters_json: Option<String>,
134 #[arg(long = "query-json", alias = "query")]
136 query_json: Option<String>,
137 #[arg(long)]
139 limit: Option<u32>,
140 #[arg(long)]
142 cursor: Option<String>,
143 #[arg(long)]
145 result_handle: Option<String>,
146 #[arg(long, value_enum)]
148 output_shape: Option<ReadOutputShapeArg>,
149 #[arg(long, value_enum)]
151 analysis_mode: Option<ReadAnalysisModeArg>,
152}
153
154#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
155pub enum ReadKindArg {
156 Lookup,
157 List,
158 Timeline,
159 Aggregate,
160}
161
162impl ReadKindArg {
163 fn as_contract_str(self) -> &'static str {
164 match self {
165 Self::Lookup => "lookup",
166 Self::List => "list",
167 Self::Timeline => "timeline",
168 Self::Aggregate => "aggregate",
169 }
170 }
171}
172
173#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
174pub enum ReadOutputShapeArg {
175 Item,
176 Collection,
177 Series,
178 Aggregate,
179 #[value(name = "analysis_handoff")]
180 AnalysisHandoff,
181}
182
183impl ReadOutputShapeArg {
184 fn as_contract_str(self) -> &'static str {
185 match self {
186 Self::Item => "item",
187 Self::Collection => "collection",
188 Self::Series => "series",
189 Self::Aggregate => "aggregate",
190 Self::AnalysisHandoff => "analysis_handoff",
191 }
192 }
193}
194
195#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
196pub enum ReadAnalysisModeArg {
197 None,
198 #[value(name = "allow_hybrid")]
199 AllowHybrid,
200 #[value(name = "require_analysis")]
201 RequireAnalysis,
202}
203
204impl ReadAnalysisModeArg {
205 fn as_contract_str(self) -> &'static str {
206 match self {
207 Self::None => "none",
208 Self::AllowHybrid => "allow_hybrid",
209 Self::RequireAnalysis => "require_analysis",
210 }
211 }
212}
213
214pub async fn run(api_url: &str, token: Option<&str>, command: ReadCommands) -> i32 {
215 match command {
216 ReadCommands::Sources(args) => read_sources(api_url, token, args).await,
217 ReadCommands::Query(args) => read_query(api_url, token, args).await,
218 }
219}
220
221pub async fn run_query(api_url: &str, token: Option<&str>, args: ReadQueryArgs) -> i32 {
222 read_query(api_url, token, args).await
223}
224
225async fn read_sources(api_url: &str, token: Option<&str>, args: ReadSourcesArgs) -> i32 {
226 let query = vec![(
227 "section".to_string(),
228 READ_SOURCE_CATALOG_SECTION.to_string(),
229 )];
230
231 if dry_run_enabled() {
232 return emit_dry_run_request(
233 &reqwest::Method::GET,
234 api_url,
235 READ_SOURCE_CATALOG_ENDPOINT,
236 token.is_some(),
237 None,
238 &query,
239 &[],
240 false,
241 Some("Fetch the machine-readable read source catalog for agent routing."),
242 );
243 }
244
245 let response = raw_api_request_with_query(
246 api_url,
247 reqwest::Method::GET,
248 READ_SOURCE_CATALOG_ENDPOINT,
249 token,
250 &query,
251 )
252 .await
253 .unwrap_or_else(|error| {
254 exit_error(
255 &format!("Failed to reach {READ_SOURCE_CATALOG_ENDPOINT}: {error}"),
256 Some("Check API availability/auth and retry."),
257 )
258 });
259
260 let (status, body) = response;
261 if (200..=299).contains(&status) {
262 let output = build_read_sources_output(body, args.source_id.as_deref())
263 .unwrap_or_else(|(message, docs_hint)| exit_error(&message, docs_hint.as_deref()));
264 print_json_stdout(&output);
265 0
266 } else {
267 print_json_stderr(&body);
268 if (400..=499).contains(&status) { 1 } else { 2 }
269 }
270}
271
272async fn read_query(api_url: &str, token: Option<&str>, args: ReadQueryArgs) -> i32 {
273 let body = build_read_query_body(&args);
274 execute_json_read(api_url, token, READ_QUERY_ENDPOINT, body).await
275}
276
277async fn execute_json_read(api_url: &str, token: Option<&str>, path: &str, body: Value) -> i32 {
278 if dry_run_enabled() {
279 return emit_dry_run_request(
280 &reqwest::Method::POST,
281 api_url,
282 path,
283 token.is_some(),
284 Some(&body),
285 &[],
286 &[],
287 false,
288 None,
289 );
290 }
291
292 let response = raw_api_request_json(
293 api_url,
294 reqwest::Method::POST,
295 path,
296 token,
297 Some(body),
298 &[],
299 &[],
300 )
301 .await
302 .unwrap_or_else(|error| {
303 exit_error(
304 &format!("Failed to reach {path}: {error}"),
305 Some("Check API availability/auth and retry."),
306 )
307 });
308
309 emit_json_response(response)
310}
311
312fn emit_json_response(response: RawApiResponse) -> i32 {
313 if (200..=299).contains(&response.status) {
314 print_json_stdout(&response.body);
315 0
316 } else {
317 print_json_stderr(&response.body);
318 if (400..=499).contains(&response.status) {
319 1
320 } else {
321 2
322 }
323 }
324}
325
326fn build_read_query_body(args: &ReadQueryArgs) -> Value {
327 let source_id =
328 canonical_read_source_id(&required_non_empty_string(&args.source_id, "--source-id"));
329 if matches!(
330 source_id.as_str(),
331 "authoritative_training_memory" | "training_timeline"
332 ) {
333 exit_error(
334 "`kura read_query` no longer accepts removed legacy training sources.",
335 Some(
336 "Use --source-id activity_session_history for saved training/activity history, or use `kura analyze` for trend and driver questions.",
337 ),
338 );
339 }
340
341 let mut body = Map::new();
342 body.insert(
343 "schema_version".to_string(),
344 json!(READ_QUERY_SCHEMA_VERSION),
345 );
346 body.insert("source_id".to_string(), json!(source_id));
347 body.insert(
348 "read_kind".to_string(),
349 json!(args.read_kind.as_contract_str()),
350 );
351
352 if let Some(subject) = optional_non_empty_string(args.subject.as_deref()) {
353 body.insert("subject".to_string(), json!(subject));
354 }
355
356 let mut filters =
357 parse_optional_json_object_arg(args.filters_json.as_deref(), "--filters-json");
358 insert_optional_string_checked(
359 &mut filters,
360 "lookup_key",
361 args.lookup_key.as_deref(),
362 "--lookup-key",
363 );
364 insert_optional_string_checked(&mut filters, "date", args.date.as_deref(), "--date");
365 insert_optional_string_checked(
366 &mut filters,
367 "date_from",
368 args.date_from.as_deref(),
369 "--date-from",
370 );
371 insert_optional_string_checked(
372 &mut filters,
373 "date_to",
374 args.date_to.as_deref(),
375 "--date-to",
376 );
377 if !filters.is_empty() {
378 body.insert("filters".to_string(), Value::Object(filters));
379 }
380 let query = parse_optional_json_object_arg(args.query_json.as_deref(), "--query-json");
381 if !query.is_empty() {
382 body.insert("query".to_string(), Value::Object(query));
383 }
384
385 if let Some(limit) = args.limit {
386 body.insert("limit".to_string(), json!(limit));
387 }
388 insert_optional_string(&mut body, "cursor", args.cursor.as_deref());
389 insert_optional_string(&mut body, "result_handle", args.result_handle.as_deref());
390 if let Some(output_shape) = args.output_shape {
391 body.insert(
392 "output_shape".to_string(),
393 json!(output_shape.as_contract_str()),
394 );
395 }
396 if let Some(analysis_mode) = args.analysis_mode {
397 body.insert(
398 "analysis_mode".to_string(),
399 json!(analysis_mode.as_contract_str()),
400 );
401 }
402
403 Value::Object(body)
404}
405
406fn build_read_sources_output(
407 section_body: Value,
408 selected_source_id: Option<&str>,
409) -> Result<Value, (String, Option<String>)> {
410 let catalog = section_body
411 .pointer("/value/contract")
412 .and_then(Value::as_object)
413 .cloned()
414 .ok_or_else(|| {
415 (
416 "Read source catalog section did not expose a contract payload.".to_string(),
417 Some(
418 "Check system_config.conventions::public_agent_read_source_catalog_v1 and retry."
419 .to_string(),
420 ),
421 )
422 })?;
423 let root = section_body.as_object().ok_or_else(|| {
424 (
425 "Read source catalog response must be a JSON object.".to_string(),
426 Some("Retry once the server returns the readable source catalog section.".to_string()),
427 )
428 })?;
429
430 let catalog_ref = READ_SOURCE_CATALOG_SECTION
431 .split("::")
432 .last()
433 .unwrap_or("public_agent_read_source_catalog_v1");
434
435 let section = root
436 .get("section")
437 .and_then(Value::as_str)
438 .unwrap_or(READ_SOURCE_CATALOG_SECTION);
439 let handle = root.get("handle").cloned().unwrap_or(Value::Null);
440 let version = root.get("version").cloned().unwrap_or(Value::Null);
441 let updated_at = root.get("updated_at").cloned().unwrap_or(Value::Null);
442
443 if let Some(source_id) =
444 selected_source_id.and_then(|value| optional_non_empty_string(Some(value)))
445 {
446 let sources = catalog
447 .get("sources")
448 .and_then(Value::as_object)
449 .ok_or_else(|| {
450 (
451 "Read source catalog contract did not expose a sources map.".to_string(),
452 Some("Check the source catalog contract and retry.".to_string()),
453 )
454 })?;
455 let source = sources.get(source_id.as_str()).cloned().ok_or_else(|| {
456 let available = sources.keys().cloned().collect::<Vec<_>>();
457 (
458 format!("Unknown read source_id '{source_id}'."),
459 Some(format!(
460 "Use one declared source_id from the read_query schema, for example: {}.",
461 available.join(", ")
462 )),
463 )
464 })?;
465 return Ok(json!({
466 "catalog_ref": catalog_ref,
467 "catalog_section": section,
468 "catalog_handle": handle,
469 "catalog_version": version,
470 "catalog_updated_at": updated_at,
471 "source_id": source_id,
472 "source": source,
473 }));
474 }
475
476 let mut output = catalog;
477 output.insert("catalog_ref".to_string(), json!(catalog_ref));
478 output.insert("catalog_section".to_string(), json!(section));
479 output.insert("catalog_handle".to_string(), handle);
480 output.insert("catalog_version".to_string(), version);
481 output.insert("catalog_updated_at".to_string(), updated_at);
482 Ok(Value::Object(output))
483}
484
485fn optional_non_empty_string(raw: Option<&str>) -> Option<String> {
486 raw.map(str::trim)
487 .filter(|value| !value.is_empty())
488 .map(str::to_string)
489}
490
491fn required_non_empty_string(raw: &str, flag_name: &str) -> String {
492 optional_non_empty_string(Some(raw)).unwrap_or_else(|| {
493 exit_error(
494 &format!("`kura read` requires a non-empty {flag_name} value."),
495 Some("Provide the exact contract value instead of an empty string."),
496 )
497 })
498}
499
500fn canonical_read_source_id(source_id: &str) -> String {
501 match source_id
502 .trim()
503 .to_ascii_lowercase()
504 .replace('-', "_")
505 .replace(' ', "_")
506 .as_str()
507 {
508 "benchmark"
509 | "benchmarks"
510 | "benchmark_results"
511 | "performance_test"
512 | "performance_test_truth"
513 | "jump_tests" => "performance_tests".to_string(),
514 _ => source_id.trim().to_string(),
515 }
516}
517
518fn parse_optional_json_object_arg(raw: Option<&str>, flag_name: &str) -> Map<String, Value> {
519 let Some(raw) = optional_non_empty_string(raw) else {
520 return Map::new();
521 };
522 let parsed: Value = serde_json::from_str(raw.as_str()).unwrap_or_else(|error| {
523 exit_error(
524 &format!("`kura read_query` could not parse {flag_name} as JSON: {error}"),
525 Some("Pass a JSON object, for example --filters-json '{\"date\":\"2026-04-02\"}'."),
526 )
527 });
528 parsed.as_object().cloned().unwrap_or_else(|| {
529 exit_error(
530 &format!("`kura read_query` requires {flag_name} to be a JSON object."),
531 Some("Use an object like '{\"date_from\":\"2026-04-01\"}', not an array or string."),
532 )
533 })
534}
535
536fn insert_json_field_checked(
537 target: &mut Map<String, Value>,
538 field: &str,
539 value: Value,
540 flag_name: &str,
541) {
542 if let Some(existing) = target.get(field) {
543 if existing == &value {
544 return;
545 }
546 exit_error(
547 &format!("`kura read_query` received {field} twice with different values."),
548 Some(&format!(
549 "Use either {flag_name} or the same field inside the JSON object, not both."
550 )),
551 );
552 }
553 target.insert(field.to_string(), value);
554}
555
556fn insert_optional_string_checked(
557 target: &mut Map<String, Value>,
558 field: &str,
559 raw: Option<&str>,
560 flag_name: &str,
561) {
562 if let Some(value) = optional_non_empty_string(raw) {
563 insert_json_field_checked(target, field, json!(value), flag_name);
564 }
565}
566
567fn insert_optional_string(target: &mut Map<String, Value>, field: &str, raw: Option<&str>) {
568 if let Some(value) = optional_non_empty_string(raw) {
569 target.insert(field.to_string(), json!(value));
570 }
571}
572
573#[cfg(test)]
574mod tests {
575 use super::*;
576
577 #[test]
578 fn build_read_query_body_serializes_live_public_read_contract() {
579 let body = build_read_query_body(&ReadQueryArgs {
580 source_id: "activity_session_history".to_string(),
581 read_kind: ReadKindArg::Aggregate,
582 subject: Some("activity_summary".to_string()),
583 lookup_key: None,
584 date: None,
585 date_from: Some("2026-04-01".to_string()),
586 date_to: Some("2026-04-07".to_string()),
587 filters_json: None,
588 query_json: None,
589 limit: Some(20),
590 cursor: Some("rq1:opaque".to_string()),
591 result_handle: None,
592 output_shape: Some(ReadOutputShapeArg::Aggregate),
593 analysis_mode: Some(ReadAnalysisModeArg::AllowHybrid),
594 });
595
596 assert_eq!(body["schema_version"], json!(READ_QUERY_SCHEMA_VERSION));
597 assert_eq!(body["source_id"], json!("activity_session_history"));
598 assert_eq!(body["read_kind"], json!("aggregate"));
599 assert_eq!(body["subject"], json!("activity_summary"));
600 assert_eq!(body["filters"]["date_from"], json!("2026-04-01"));
601 assert_eq!(body["filters"]["date_to"], json!("2026-04-07"));
602 assert_eq!(body["limit"], json!(20));
603 assert_eq!(body["cursor"], json!("rq1:opaque"));
604 assert_eq!(body["output_shape"], json!("aggregate"));
605 assert_eq!(body["analysis_mode"], json!("allow_hybrid"));
606 }
607
608 #[test]
609 fn build_read_query_body_omits_empty_optional_sections() {
610 let body = build_read_query_body(&ReadQueryArgs {
611 source_id: "recovery".to_string(),
612 read_kind: ReadKindArg::Timeline,
613 subject: None,
614 lookup_key: Some(" ".to_string()),
615 date: None,
616 date_from: None,
617 date_to: None,
618 filters_json: None,
619 query_json: None,
620 limit: None,
621 cursor: None,
622 result_handle: None,
623 output_shape: Some(ReadOutputShapeArg::Series),
624 analysis_mode: None,
625 });
626
627 assert!(body.get("filters").is_none());
628 assert_eq!(body["source_id"], json!("recovery"));
629 assert_eq!(body["read_kind"], json!("timeline"));
630 assert_eq!(body["output_shape"], json!("series"));
631 }
632
633 #[test]
634 fn build_read_query_body_accepts_json_filters() {
635 let body = build_read_query_body(&ReadQueryArgs {
636 source_id: "activity_session_history".to_string(),
637 read_kind: ReadKindArg::Aggregate,
638 subject: None,
639 lookup_key: None,
640 date: None,
641 date_from: None,
642 date_to: None,
643 filters_json: Some(r#"{"date_from":"2026-03-01","date_to":"2026-04-01","block_kind":"repetition_sets"}"#.to_string()),
644 query_json: Some(r#"{"group_by":"exercise_identity","aggregate":"load_kg_max","sort":{"field":"load_kg_max","direction":"desc"}}"#.to_string()),
645 limit: Some(10),
646 cursor: None,
647 result_handle: None,
648 output_shape: Some(ReadOutputShapeArg::Aggregate),
649 analysis_mode: Some(ReadAnalysisModeArg::AllowHybrid),
650 });
651
652 assert_eq!(body["filters"]["date_from"], json!("2026-03-01"));
653 assert_eq!(body["filters"]["date_to"], json!("2026-04-01"));
654 assert_eq!(body["filters"]["block_kind"], json!("repetition_sets"));
655 assert_eq!(body["query"]["group_by"], json!("exercise_identity"));
656 assert_eq!(body["query"]["aggregate"], json!("load_kg_max"));
657 }
658
659 #[test]
660 fn build_read_query_body_normalizes_benchmark_source_aliases() {
661 let body = build_read_query_body(&ReadQueryArgs {
662 source_id: "benchmarks".to_string(),
663 read_kind: ReadKindArg::Timeline,
664 subject: Some("test_timeline".to_string()),
665 lookup_key: None,
666 date: None,
667 date_from: None,
668 date_to: None,
669 filters_json: Some(r#"{"test_type":"cmj"}"#.to_string()),
670 query_json: None,
671 limit: None,
672 cursor: None,
673 result_handle: None,
674 output_shape: Some(ReadOutputShapeArg::Series),
675 analysis_mode: None,
676 });
677
678 assert_eq!(body["source_id"], json!("performance_tests"));
679 assert_eq!(body["filters"]["test_type"], json!("cmj"));
680 }
681
682 #[test]
683 fn build_read_sources_output_returns_single_source_card_when_requested() {
684 let output = build_read_sources_output(
685 json!({
686 "section": READ_SOURCE_CATALOG_SECTION,
687 "handle": "system_config/global@v9",
688 "version": 9,
689 "updated_at": "2026-04-02T10:00:00Z",
690 "value": {
691 "rules": ["x"],
692 "contract": {
693 "schema_version": "public_agent_read_source_catalog.v1",
694 "sources": {
695 "recovery": {
696 "source_id": "recovery",
697 "supported_read_kinds": ["lookup", "list", "timeline", "aggregate"]
698 }
699 }
700 }
701 }
702 }),
703 Some("recovery"),
704 )
705 .expect("single-source output should build");
706
707 assert_eq!(
708 output["catalog_ref"],
709 json!("public_agent_read_source_catalog_v1")
710 );
711 assert_eq!(output["source_id"], json!("recovery"));
712 assert_eq!(output["source"]["source_id"], json!("recovery"));
713 assert_eq!(output["catalog_handle"], json!("system_config/global@v9"));
714 }
715}