Skip to main content

control_ingest_idempotency/
control_ingest_idempotency.rs

1#[path = "support/utils.rs"]
2mod _utils;
3
4use _utils::{boxed_error, cleanup_run, create_client, new_run_id, print_summary, require};
5use mubit_sdk::TransportMode;
6use serde_json::{json, Value};
7use std::error::Error;
8use std::time::Instant;
9
10#[tokio::main(flavor = "current_thread")]
11async fn main() -> Result<(), Box<dyn Error>> {
12    let name = "control_ingest_idempotency";
13    let started = Instant::now();
14    let client = create_client().await?;
15    let run_id = new_run_id("control_ingest_idempotency");
16    client.set_run_id(Some(run_id.clone()));
17    client.set_transport(TransportMode::Http);
18
19    let mut passed = true;
20    let mut detail = "validated control ingest idempotency".to_string();
21    let mut metrics = json!({});
22
23    let scenario = async {
24        let idempotency_key = format!("idempotency-{run_id}");
25
26        let first = client
27            .control
28            .ingest(json!({
29                "run_id": run_id,
30                "agent_id": "rust-sdk-example",
31                "idempotency_key": idempotency_key,
32                "parallel": false,
33                "items": [
34                    {
35                        "item_id": "release-risk-1",
36                        "content_type": "text",
37                        "text": "Release planning memo: mobile v5.12 RC is blocked by an Android 15 beta crash loop triggered during background sync with low-memory pressure.",
38                        "payload_json": "",
39                        "hints_json": "{\"priority\":\"high\",\"signal\":\"release-risk\"}",
40                        "metadata_json": "{\"source\":\"release-management\",\"platform\":\"android\",\"track\":\"v5.12\"}",
41                    },
42                    {
43                        "item_id": "finance-recon-2",
44                        "content_type": "text",
45                        "text": "Finance operations note: invoice batch B-4402 contained three duplicate vendor IDs; reconciliation script flagged records for manual approval.",
46                        "payload_json": "",
47                        "hints_json": "{\"priority\":\"medium\",\"signal\":\"billing\"}",
48                        "metadata_json": "{\"source\":\"finance-ops\",\"batch\":\"B-4402\",\"workflow\":\"reconciliation\"}",
49                    }
50                ],
51            }))
52            .await?;
53
54        let second = client
55            .control
56            .ingest(json!({
57                "run_id": run_id,
58                "agent_id": "rust-sdk-example",
59                "idempotency_key": idempotency_key,
60                "parallel": false,
61                "items": [{
62                    "item_id": "procurement-followup-3",
63                    "content_type": "text",
64                    "text": "Procurement update: alternate supplier onboarding for lithium cells was escalated after lead-time slipped from 11 to 19 days in Q1 forecasts.",
65                    "payload_json": "",
66                    "hints_json": "{\"priority\":\"medium\",\"signal\":\"supply-chain\"}",
67                    "metadata_json": "{\"source\":\"procurement\",\"component\":\"lithium-cell\",\"quarter\":\"Q1\"}",
68                }],
69            }))
70            .await?;
71
72        let first_job_id = first
73            .get("job_id")
74            .or_else(|| first.get("jobId"))
75            .and_then(Value::as_str)
76            .ok_or_else(|| boxed_error(format!("first ingest missing job_id: {first}")))?;
77        let second_job_id = second
78            .get("job_id")
79            .or_else(|| second.get("jobId"))
80            .and_then(Value::as_str)
81            .ok_or_else(|| boxed_error(format!("second ingest missing job_id: {second}")))?;
82
83        require(
84            first_job_id == second_job_id,
85            format!("idempotency should reuse job_id: {first} vs {second}"),
86        )?;
87        require(
88            second
89                .get("deduplicated")
90                .and_then(Value::as_bool)
91                .unwrap_or(false),
92            format!("second ingest should be deduplicated: {second}"),
93        )?;
94
95        metrics = json!({
96            "run_id": run_id,
97            "job_id": first_job_id,
98            "second_deduplicated": second.get("deduplicated").cloned().unwrap_or(Value::Bool(false)),
99        });
100
101        Ok::<(), Box<dyn Error>>(())
102    }
103    .await;
104
105    if let Err(err) = scenario {
106        passed = false;
107        detail = err.to_string();
108    }
109
110    let cleanup_ok = cleanup_run(&client, &run_id).await;
111    if !cleanup_ok {
112        passed = false;
113        detail = format!("{detail} | cleanup failures");
114    }
115
116    print_summary(
117        name,
118        passed,
119        &detail,
120        &metrics,
121        started.elapsed().as_secs_f64(),
122        cleanup_ok,
123    );
124
125    if passed {
126        Ok(())
127    } else {
128        Err(boxed_error(detail))
129    }
130}