Skip to main content

control_query_agent_routed/
control_query_agent_routed.rs

1#[path = "support/utils.rs"]
2mod _utils;
3
4use _utils::{boxed_error, cleanup_run, create_client, new_run_id, print_summary, require};
5use mubit_sdk::TransportMode;
6use serde_json::{json, Value};
7use std::error::Error;
8use std::time::Instant;
9use tokio::time::{sleep, Duration};
10
11#[tokio::main(flavor = "current_thread")]
12async fn main() -> Result<(), Box<dyn Error>> {
13    let name = "control_query_agent_routed";
14    let started = Instant::now();
15    let client = create_client().await?;
16    let run_id = new_run_id("control_query_agent_routed");
17    client.set_run_id(Some(run_id.clone()));
18    client.set_transport(TransportMode::Http);
19
20    let mut passed = true;
21    let mut detail = "validated control query agent-routed flow".to_string();
22    let mut metrics = json!({});
23
24    let scenario = async {
25        let accepted = client
26            .control
27            .ingest(json!({
28                "run_id": run_id,
29                "agent_id": "rust-sdk-example",
30                "idempotency_key": format!("query-routed-{}", run_id),
31                "parallel": false,
32                "items": [
33                    {
34                        "item_id": "safety-incident-1",
35                        "content_type": "text",
36                        "text": "Safety briefing: forklift zone B had two near-miss events this week; camera review confirmed blind-spot congestion during shift handoff.",
37                        "payload_json": "",
38                        "hints_json": "{\"priority\":\"high\",\"signal\":\"safety\"}",
39                        "metadata_json": "{\"source\":\"warehouse-safety\",\"zone\":\"B\",\"period\":\"weekly\"}",
40                    },
41                    {
42                        "item_id": "product-telemetry-2",
43                        "content_type": "text",
44                        "text": "Product telemetry: onboarding completion rose from 61% to 74% after removing optional steps, but payment-link retries increased by 9%.",
45                        "payload_json": "",
46                        "hints_json": "{\"priority\":\"medium\",\"signal\":\"product-metrics\"}",
47                        "metadata_json": "{\"source\":\"product-analytics\",\"funnel\":\"onboarding\",\"metric_window\":\"7d\"}",
48                    },
49                    {
50                        "item_id": "support-escalation-3",
51                        "content_type": "text",
52                        "text": "Support escalation summary: three enterprise tickets cited delayed audit log exports; root cause traced to a throttling rule on shared workers.",
53                        "payload_json": "",
54                        "hints_json": "{\"priority\":\"high\",\"signal\":\"customer-impact\"}",
55                        "metadata_json": "{\"source\":\"support-ops\",\"segment\":\"enterprise\",\"topic\":\"audit-logs\"}",
56                    }
57                ],
58            }))
59            .await?;
60
61        let job_id = accepted
62            .get("job_id")
63            .or_else(|| accepted.get("jobId"))
64            .and_then(Value::as_str)
65            .ok_or_else(|| boxed_error(format!("ingest did not return job_id: {accepted}")))?
66            .to_string();
67
68        let mut done = false;
69        for _ in 0..60 {
70            let current = client
71                .control
72                .get_ingest_job(json!({ "run_id": run_id, "job_id": job_id }))
73                .await?;
74            if current
75                .get("done")
76                .and_then(Value::as_bool)
77                .unwrap_or(false)
78            {
79                done = true;
80                break;
81            }
82            sleep(Duration::from_millis(250)).await;
83        }
84        require(done, "ingest job did not complete before query")?;
85
86        let response = client
87            .control
88            .query(json!({
89                "run_id": run_id,
90                "query": "summarize the safety, product, and support signals captured for this run",
91                "schema": "",
92                "mode": "agent_routed",
93                "direct_lane": "semantic_search",
94                "include_linked_runs": false,
95                "limit": 5,
96                "embedding": [],
97            }))
98            .await?;
99
100        require(
101            response.get("mode").and_then(Value::as_str) == Some("agent_routed"),
102            format!("unexpected query mode: {response}"),
103        )?;
104        require(
105            response
106                .get("routing_summary")
107                .and_then(Value::as_str)
108                .map(|value| !value.trim().is_empty())
109                .unwrap_or(false),
110            format!("routing_summary should be non-empty: {response}"),
111        )?;
112
113        let consulted_runs = response
114            .get("consulted_runs")
115            .and_then(Value::as_array)
116            .ok_or_else(|| boxed_error(format!("consulted_runs must be array: {response}")))?;
117        require(
118            consulted_runs
119                .iter()
120                .filter_map(Value::as_str)
121                .any(|value| value == run_id || value.ends_with(&format!("::{run_id}"))),
122            format!("consulted_runs missing primary run_id: {response}"),
123        )?;
124
125        require(
126            response
127                .get("final_answer")
128                .and_then(Value::as_str)
129                .map(|value| !value.trim().is_empty())
130                .unwrap_or(false),
131            format!("final_answer should be non-empty: {response}"),
132        )?;
133
134        let evidence_count = response
135            .get("evidence")
136            .and_then(Value::as_array)
137            .map(std::vec::Vec::len)
138            .unwrap_or(0);
139
140        metrics = json!({
141            "run_id": run_id,
142            "job_id": job_id,
143            "mode": response.get("mode").cloned().unwrap_or(Value::Null),
144            "consulted_runs": consulted_runs,
145            "evidence_count": evidence_count,
146        });
147
148        Ok::<(), Box<dyn Error>>(())
149    }
150    .await;
151
152    if let Err(err) = scenario {
153        passed = false;
154        detail = err.to_string();
155    }
156
157    let cleanup_ok = cleanup_run(&client, &run_id).await;
158    if !cleanup_ok {
159        passed = false;
160        detail = format!("{detail} | cleanup failures");
161    }
162
163    print_summary(
164        name,
165        passed,
166        &detail,
167        &metrics,
168        started.elapsed().as_secs_f64(),
169        cleanup_ok,
170    );
171
172    if passed {
173        Ok(())
174    } else {
175        Err(boxed_error(detail))
176    }
177}