control_ingest_job_lifecycle/
control_ingest_job_lifecycle.rs1#[path = "support/utils.rs"]
2mod _utils;
3
4use _utils::{boxed_error, cleanup_run, create_client, new_run_id, print_summary, require};
5use mubit_sdk::TransportMode;
6use serde_json::{json, Value};
7use std::error::Error;
8use std::time::Instant;
9use tokio::time::{sleep, Duration};
10
11#[tokio::main(flavor = "current_thread")]
12async fn main() -> Result<(), Box<dyn Error>> {
13 let name = "control_ingest_job_lifecycle";
14 let started = Instant::now();
15 let client = create_client().await?;
16 let run_id = new_run_id("control_ingest_job_lifecycle");
17 client.set_run_id(Some(run_id.clone()));
18 client.set_transport(TransportMode::Http);
19
20 let mut passed = true;
21 let mut detail = "validated control ingest job lifecycle".to_string();
22 let mut metrics = json!({});
23
24 let scenario = async {
25 let accepted = client
26 .control
27 .ingest(json!({
28 "run_id": run_id,
29 "agent_id": "rust-sdk-example",
30 "idempotency_key": format!("ingest-job-{}", run_id),
31 "parallel": false,
32 "items": [
33 {
34 "item_id": "incident-latency-1",
35 "content_type": "text",
36 "text": "Incident SEV2-7314 at 2026-02-14T03:22:00Z: checkout API latency in us-east rose from 180ms to 2.4s after cache warmer v1.8.1 rollout; rollback restored baseline.",
37 "payload_json": "",
38 "hints_json": "{\"priority\":\"high\",\"signal\":\"performance\"}",
39 "metadata_json": "{\"source\":\"ops-incident\",\"region\":\"us-east\",\"service\":\"checkout-api\"}",
40 },
41 {
42 "item_id": "customer-advisory-2",
43 "content_type": "text",
44 "text": "Customer advisory draft: EU enterprise tenant requested SLA clarification for batch exports; legal referenced DPA section 4.2 and retention requirement of 30 days.",
45 "payload_json": "",
46 "hints_json": "{\"priority\":\"medium\",\"signal\":\"policy\"}",
47 "metadata_json": "{\"source\":\"customer-success\",\"region\":\"eu-central\",\"team\":\"legal-ops\"}",
48 },
49 {
50 "item_id": "cold-chain-telemetry-3",
51 "content_type": "text",
52 "text": "Warehouse telemetry snapshot: cold-chain lane C7 drifted from 2.1C to 6.4C for 14 minutes; sensor recalibrated and shipment placed on QA hold.",
53 "payload_json": "",
54 "hints_json": "{\"priority\":\"high\",\"signal\":\"logistics\"}",
55 "metadata_json": "{\"source\":\"iot-telemetry\",\"facility\":\"atl-03\",\"lane\":\"C7\"}",
56 }
57 ],
58 }))
59 .await?;
60
61 require(
62 accepted
63 .get("accepted")
64 .and_then(Value::as_bool)
65 .unwrap_or(false),
66 format!("ingest not accepted: {accepted}"),
67 )?;
68
69 let job_id = accepted
70 .get("job_id")
71 .or_else(|| accepted.get("jobId"))
72 .and_then(Value::as_str)
73 .ok_or_else(|| boxed_error(format!("ingest did not return job_id: {accepted}")))?
74 .to_string();
75
76 let mut job: Option<Value> = None;
77 for _ in 0..60 {
78 let current = client
79 .control
80 .get_ingest_job(json!({ "run_id": run_id, "job_id": job_id }))
81 .await?;
82 if current
83 .get("done")
84 .and_then(Value::as_bool)
85 .unwrap_or(false)
86 {
87 job = Some(current);
88 break;
89 }
90 sleep(Duration::from_millis(250)).await;
91 }
92
93 let job = job.ok_or_else(|| boxed_error("timeout waiting for ingest job completion"))?;
94
95 require(
96 job.get("done").and_then(Value::as_bool).unwrap_or(false),
97 format!("job should be done: {job}"),
98 )?;
99 require(
100 job.get("status").and_then(Value::as_str) == Some("completed"),
101 format!("job should be completed: {job}"),
102 )?;
103
104 let traces = job
105 .get("traces")
106 .and_then(Value::as_array)
107 .ok_or_else(|| boxed_error(format!("job traces missing: {job}")))?;
108 require(!traces.is_empty(), format!("job traces missing: {job}"))?;
109
110 let mut write_count = 0usize;
111 let mut successful_writes = 0usize;
112 for trace in traces {
113 let writes = trace
114 .get("writes")
115 .and_then(Value::as_array)
116 .cloned()
117 .unwrap_or_default();
118 write_count += writes.len();
119 successful_writes += writes
120 .iter()
121 .filter(|write| {
122 write
123 .get("success")
124 .and_then(Value::as_bool)
125 .unwrap_or(false)
126 })
127 .count();
128 }
129
130 require(
131 write_count > 0,
132 format!("expected non-empty ingestion writes: {job}"),
133 )?;
134 require(
135 successful_writes > 0,
136 format!("expected at least one successful write: {job}"),
137 )?;
138
139 metrics = json!({
140 "run_id": run_id,
141 "job_id": job_id,
142 "trace_count": traces.len(),
143 "write_count": write_count,
144 "successful_writes": successful_writes,
145 });
146
147 Ok::<(), Box<dyn Error>>(())
148 }
149 .await;
150
151 if let Err(err) = scenario {
152 passed = false;
153 detail = err.to_string();
154 }
155
156 let cleanup_ok = cleanup_run(&client, &run_id).await;
157 if !cleanup_ok {
158 passed = false;
159 detail = format!("{detail} | cleanup failures");
160 }
161
162 print_summary(
163 name,
164 passed,
165 &detail,
166 &metrics,
167 started.elapsed().as_secs_f64(),
168 cleanup_ok,
169 );
170
171 if passed {
172 Ok(())
173 } else {
174 Err(boxed_error(detail))
175 }
176}