Skip to main content

public_09_user_scoping_and_isolation/
09_user_scoping_and_isolation.rs

1#[path = "../support/utils.rs"]
2mod _utils;
3
4use _utils::{boxed_error, cleanup_run, create_client, new_run_id, print_summary, require};
5use mubit_sdk::{RecallOptions, RememberOptions, TransportMode};
6use serde_json::json;
7use std::error::Error;
8use std::time::Instant;
9use tokio::time::{sleep, Duration};
10
11#[tokio::main(flavor = "current_thread")]
12async fn main() -> Result<(), Box<dyn Error>> {
13    let name = "public_09_user_scoping_and_isolation";
14    let started = Instant::now();
15    let client = create_client().await?;
16    let user_a = "sdk-user-a".to_string();
17    let user_b = "sdk-user-b".to_string();
18    let seed_run = new_run_id("public_user_scope_seed");
19    let allowed_run = new_run_id("public_user_scope_allowed");
20    let blocked_run = new_run_id("public_user_scope_blocked");
21    client.set_transport(TransportMode::Http);
22    client.set_run_id(Some(seed_run.clone()));
23
24    let mut passed = true;
25    let mut detail = "validated user scoping and same-user cross-run reuse".to_string();
26    let mut metrics = json!({});
27
28    let scenario = async {
29        let mut lesson = RememberOptions::new("When a claim is missing a city, check the original intake note before escalating.");
30        lesson.run_id = Some(seed_run.clone());
31        lesson.user_id = Some(user_a.clone());
32        lesson.agent_id = Some("planner".into());
33        lesson.intent = Some("lesson".into());
34        lesson.lesson_type = Some("success".into());
35        lesson.lesson_scope = Some("global".into());
36        lesson.lesson_importance = Some("high".into());
37        lesson.metadata = Some(json!({"source": "public-example", "family": "user-scope"}));
38        let lesson_write = client.remember(lesson).await?;
39
40        let mut lessons = serde_json::Value::Null;
41        for _ in 0..24 {
42            let current = client
43                .control
44                .lessons(json!({"run_id": seed_run, "user_id": user_a, "limit": 10}))
45                .await?;
46            let has_lesson = current
47                .get("lessons")
48                .and_then(|v| v.as_array())
49                .map(|items| !items.is_empty())
50                .unwrap_or(false);
51            lessons = current;
52            if has_lesson {
53                break;
54            }
55            sleep(Duration::from_millis(500)).await;
56        }
57        let lesson_id = lessons.get("lessons").and_then(|v| v.as_array()).and_then(|items| items.iter().find_map(|item| item.get("id").and_then(|v| v.as_str()))).ok_or_else(|| boxed_error(format!("expected lesson for isolation scenario: {lessons}")))?.to_string();
58
59        let mut allowed = RecallOptions::new(
60            "A claim is missing a city. Before escalating, what note should I check?",
61        );
62        allowed.run_id = Some(allowed_run.clone());
63        allowed.user_id = Some(user_a.clone());
64        allowed.entry_types = vec!["lesson".into()];
65        allowed.limit = 10;
66        let allowed_response = client.recall(allowed).await?;
67
68        let mut blocked = RecallOptions::new(
69            "A claim is missing a city. Before escalating, what note should I check?",
70        );
71        blocked.run_id = Some(blocked_run.clone());
72        blocked.user_id = Some(user_b.clone());
73        blocked.entry_types = vec!["lesson".into()];
74        blocked.limit = 10;
75        let blocked_response = client.recall(blocked).await?;
76
77        let allowed_text = format!("{} {}",
78            allowed_response.get("final_answer").and_then(|v| v.as_str()).unwrap_or(""),
79            allowed_response.get("evidence").and_then(|v| v.as_array()).map(|items| items.iter().filter_map(|item| item.get("content").and_then(|v| v.as_str())).collect::<Vec<_>>().join(" ")).unwrap_or_default()
80        ).to_lowercase();
81        let blocked_text = format!("{} {}",
82            blocked_response.get("final_answer").and_then(|v| v.as_str()).unwrap_or(""),
83            blocked_response.get("evidence").and_then(|v| v.as_array()).map(|items| items.iter().filter_map(|item| item.get("content").and_then(|v| v.as_str())).collect::<Vec<_>>().join(" ")).unwrap_or_default()
84        ).to_lowercase();
85
86        require(lesson_write.get("accepted").and_then(|v| v.as_bool()).unwrap_or(false) || lesson_write.get("job_id").is_some(), format!("lesson write failed: {lesson_write}"))?;
87        require(allowed_text.contains("original intake note"), format!("same-user cross-run lesson reuse failed: {allowed_response}"))?;
88        require(!blocked_text.contains("original intake note"), format!("cross-user leak detected: {blocked_response}"))?;
89
90        metrics = json!({
91            "seed_run": seed_run,
92            "allowed_run": allowed_run,
93            "blocked_run": blocked_run,
94            "lesson_id": lesson_id,
95            "allowed_evidence_count": allowed_response.get("evidence").and_then(|v| v.as_array()).map(|items| items.len()).unwrap_or(0),
96            "blocked_evidence_count": blocked_response.get("evidence").and_then(|v| v.as_array()).map(|items| items.len()).unwrap_or(0),
97        });
98        Ok::<(), Box<dyn Error>>(())
99    }
100    .await;
101
102    if let Err(err) = scenario {
103        passed = false;
104        detail = err.to_string();
105    }
106
107    let mut cleanup_ok = true;
108    for run_id in [&seed_run, &allowed_run, &blocked_run] {
109        cleanup_ok = cleanup_ok && cleanup_run(&client, run_id).await;
110    }
111    if !cleanup_ok {
112        passed = false;
113        detail = format!("{detail} | cleanup failures");
114    }
115
116    print_summary(name, passed, &detail, &metrics, started.elapsed().as_secs_f64(), cleanup_ok);
117    if passed { Ok(()) } else { Err(boxed_error(detail)) }
118}