Skip to main content

internal_control_ingest_batch_job_lifecycle/
control_ingest_batch_job_lifecycle.rs

1#[path = "../support/utils.rs"]
2mod _utils;
3
4use _utils::{boxed_error, cleanup_run, create_client, new_run_id, print_summary, require};
5use mubit_sdk::TransportMode;
6use serde_json::{json, Value};
7use std::error::Error;
8use std::time::Instant;
9use tokio::time::{sleep, Duration};
10
11#[tokio::main(flavor = "current_thread")]
12async fn main() -> Result<(), Box<dyn Error>> {
13    let name = "internal_control_ingest_batch_job_lifecycle";
14    let started = Instant::now();
15    let client = create_client().await?;
16    let run_id = new_run_id("internal_control_ingest_batch");
17    client.set_run_id(Some(run_id.clone()));
18    client.set_transport(TransportMode::Http);
19    let mut passed = true;
20    let mut detail = "validated raw control ingest and batch lifecycle".to_string();
21    let mut metrics = json!({});
22
23    let scenario = async {
24        let accepted = client.control.ingest(json!({
25            "run_id": run_id,
26            "agent_id": "rust-sdk-example",
27            "parallel": false,
28            "items": [{
29                "item_id": "incident-1",
30                "content_type": "text",
31                "text": "Incident SEV2-7314: cache warmer rollback restored baseline latency.",
32                "payload_json": "",
33                "hints_json": "{\"priority\":\"high\"}",
34                "metadata_json": "{\"source\":\"ops-incident\"}"
35            }]
36        })).await?;
37        let job_id = accepted.get("job_id").and_then(Value::as_str).ok_or_else(|| boxed_error(format!("ingest did not return job_id: {accepted}")))?.to_string();
38
39        let mut job: Option<Value> = None;
40        for _ in 0..60 {
41            let current = client.control.get_ingest_job(json!({"run_id": run_id, "job_id": job_id})).await?;
42            if current.get("done").and_then(Value::as_bool).unwrap_or(false) { job = Some(current); break; }
43            sleep(Duration::from_millis(250)).await;
44        }
45        let job = job.ok_or_else(|| boxed_error("timeout waiting for ingest job completion"))?;
46
47        let batch = client.control.batch_insert(json!({
48            "run_id": run_id,
49            "deduplicate": true,
50            "items": [
51                {"item_id": "batch-1", "text": "Release checkpoint reduced p95 latency.", "metadata_json": "{\"source\":\"release\"}", "source": "rust-sdk-batch", "embedding": []},
52                {"item_id": "batch-2", "text": "Warehouse QA hold can be released after clean cycles.", "metadata_json": "{\"source\":\"warehouse\"}", "source": "rust-sdk-batch", "embedding": []}
53            ]
54        })).await?;
55
56        require(job.get("status").and_then(Value::as_str) == Some("completed"), format!("job should be completed: {job}"))?;
57        require(batch.get("count").and_then(Value::as_u64).unwrap_or(0) >= 2, format!("expected batch inserts: {batch}"))?;
58
59        metrics = json!({"run_id": run_id, "job_id": job_id, "batch_count": batch.get("count").cloned().unwrap_or(Value::Null)});
60        Ok::<(), Box<dyn Error>>(())
61    }.await;
62
63    if let Err(err) = scenario { passed = false; detail = err.to_string(); }
64    let cleanup_ok = cleanup_run(&client, &run_id).await;
65    if !cleanup_ok { passed = false; detail = format!("{detail} | cleanup failures"); }
66    print_summary(name, passed, &detail, &metrics, started.elapsed().as_secs_f64(), cleanup_ok);
67    if passed { Ok(()) } else { Err(boxed_error(detail)) }
68}