Skip to main content

control_ingest_job_lifecycle/
control_ingest_job_lifecycle.rs

1#[path = "support/utils.rs"]
2mod _utils;
3
4use _utils::{boxed_error, cleanup_run, create_client, new_run_id, print_summary, require};
5use mubit_sdk::TransportMode;
6use serde_json::{json, Value};
7use std::error::Error;
8use std::time::Instant;
9use tokio::time::{sleep, Duration};
10
11#[tokio::main(flavor = "current_thread")]
12async fn main() -> Result<(), Box<dyn Error>> {
13    let name = "control_ingest_job_lifecycle";
14    let started = Instant::now();
15    let client = create_client().await?;
16    let run_id = new_run_id("control_ingest_job_lifecycle");
17    client.set_run_id(Some(run_id.clone()));
18    client.set_transport(TransportMode::Http);
19
20    let mut passed = true;
21    let mut detail = "validated control ingest job lifecycle".to_string();
22    let mut metrics = json!({});
23
24    let scenario = async {
25        let accepted = client
26            .control
27            .ingest(json!({
28                "run_id": run_id,
29                "agent_id": "rust-sdk-example",
30                "idempotency_key": format!("ingest-job-{}", run_id),
31                "parallel": false,
32                "items": [
33                    {
34                        "item_id": "incident-latency-1",
35                        "content_type": "text",
36                        "text": "Incident SEV2-7314 at 2026-02-14T03:22:00Z: checkout API latency in us-east rose from 180ms to 2.4s after cache warmer v1.8.1 rollout; rollback restored baseline.",
37                        "payload_json": "",
38                        "hints_json": "{\"priority\":\"high\",\"signal\":\"performance\"}",
39                        "metadata_json": "{\"source\":\"ops-incident\",\"region\":\"us-east\",\"service\":\"checkout-api\"}",
40                    },
41                    {
42                        "item_id": "customer-advisory-2",
43                        "content_type": "text",
44                        "text": "Customer advisory draft: EU enterprise tenant requested SLA clarification for batch exports; legal referenced DPA section 4.2 and retention requirement of 30 days.",
45                        "payload_json": "",
46                        "hints_json": "{\"priority\":\"medium\",\"signal\":\"policy\"}",
47                        "metadata_json": "{\"source\":\"customer-success\",\"region\":\"eu-central\",\"team\":\"legal-ops\"}",
48                    },
49                    {
50                        "item_id": "cold-chain-telemetry-3",
51                        "content_type": "text",
52                        "text": "Warehouse telemetry snapshot: cold-chain lane C7 drifted from 2.1C to 6.4C for 14 minutes; sensor recalibrated and shipment placed on QA hold.",
53                        "payload_json": "",
54                        "hints_json": "{\"priority\":\"high\",\"signal\":\"logistics\"}",
55                        "metadata_json": "{\"source\":\"iot-telemetry\",\"facility\":\"atl-03\",\"lane\":\"C7\"}",
56                    }
57                ],
58            }))
59            .await?;
60
61        require(
62            accepted
63                .get("accepted")
64                .and_then(Value::as_bool)
65                .unwrap_or(false),
66            format!("ingest not accepted: {accepted}"),
67        )?;
68
69        let job_id = accepted
70            .get("job_id")
71            .or_else(|| accepted.get("jobId"))
72            .and_then(Value::as_str)
73            .ok_or_else(|| boxed_error(format!("ingest did not return job_id: {accepted}")))?
74            .to_string();
75
76        let mut job: Option<Value> = None;
77        for _ in 0..60 {
78            let current = client
79                .control
80                .get_ingest_job(json!({ "run_id": run_id, "job_id": job_id }))
81                .await?;
82            if current
83                .get("done")
84                .and_then(Value::as_bool)
85                .unwrap_or(false)
86            {
87                job = Some(current);
88                break;
89            }
90            sleep(Duration::from_millis(250)).await;
91        }
92
93        let job = job.ok_or_else(|| boxed_error("timeout waiting for ingest job completion"))?;
94
95        require(
96            job.get("done").and_then(Value::as_bool).unwrap_or(false),
97            format!("job should be done: {job}"),
98        )?;
99        require(
100            job.get("status").and_then(Value::as_str) == Some("completed"),
101            format!("job should be completed: {job}"),
102        )?;
103
104        let traces = job
105            .get("traces")
106            .and_then(Value::as_array)
107            .ok_or_else(|| boxed_error(format!("job traces missing: {job}")))?;
108        require(!traces.is_empty(), format!("job traces missing: {job}"))?;
109
110        let mut write_count = 0usize;
111        let mut successful_writes = 0usize;
112        for trace in traces {
113            let writes = trace
114                .get("writes")
115                .and_then(Value::as_array)
116                .cloned()
117                .unwrap_or_default();
118            write_count += writes.len();
119            successful_writes += writes
120                .iter()
121                .filter(|write| {
122                    write
123                        .get("success")
124                        .and_then(Value::as_bool)
125                        .unwrap_or(false)
126                })
127                .count();
128        }
129
130        require(
131            write_count > 0,
132            format!("expected non-empty ingestion writes: {job}"),
133        )?;
134        require(
135            successful_writes > 0,
136            format!("expected at least one successful write: {job}"),
137        )?;
138
139        metrics = json!({
140            "run_id": run_id,
141            "job_id": job_id,
142            "trace_count": traces.len(),
143            "write_count": write_count,
144            "successful_writes": successful_writes,
145        });
146
147        Ok::<(), Box<dyn Error>>(())
148    }
149    .await;
150
151    if let Err(err) = scenario {
152        passed = false;
153        detail = err.to_string();
154    }
155
156    let cleanup_ok = cleanup_run(&client, &run_id).await;
157    if !cleanup_ok {
158        passed = false;
159        detail = format!("{detail} | cleanup failures");
160    }
161
162    print_summary(
163        name,
164        passed,
165        &detail,
166        &metrics,
167        started.elapsed().as_secs_f64(),
168        cleanup_ok,
169    );
170
171    if passed {
172        Ok(())
173    } else {
174        Err(boxed_error(detail))
175    }
176}