car_multi/
task_context.rs1use car_eventlog::EventLog;
8use car_state::StateStore;
9use std::sync::Arc;
10use tokio::sync::Mutex as TokioMutex;
11
12tokio::task_local! {
13 static AGENT_CTX: AgentContext;
15}
16
17#[derive(Clone)]
20pub struct AgentContext {
21 pub agent_name: String,
23 pub local_state: Arc<StateStore>,
25 pub local_log: Arc<TokioMutex<EventLog>>,
27 parent_state: Arc<StateStore>,
29}
30
31impl AgentContext {
32 pub fn new(agent_name: &str, parent_state: Arc<StateStore>) -> Self {
33 Self {
34 agent_name: agent_name.to_string(),
35 local_state: Arc::new(StateStore::new()),
36 local_log: Arc::new(TokioMutex::new(EventLog::new())),
37 parent_state,
38 }
39 }
40
41 pub fn get(&self, key: &str) -> Option<serde_json::Value> {
43 self.local_state
44 .get(key)
45 .or_else(|| self.parent_state.get(key))
46 }
47
48 pub fn set(&self, key: &str, value: serde_json::Value) {
50 self.local_state.set(key, value, &self.agent_name);
51 }
52
53 pub fn merge_to_parent(&self) {
55 for key in self.local_state.keys() {
56 if let Some(value) = self.local_state.get(&key) {
57 self.parent_state.set(&key, value, &self.agent_name);
58 }
59 }
60 }
61}
62
63pub struct TaskScope;
65
66impl TaskScope {
67 pub async fn run<F, T>(ctx: AgentContext, f: F) -> T
69 where
70 F: std::future::Future<Output = T>,
71 {
72 AGENT_CTX.scope(ctx, f).await
73 }
74
75 pub fn try_with<F, R>(f: F) -> Option<R>
78 where
79 F: FnOnce(&AgentContext) -> R,
80 {
81 AGENT_CTX.try_with(f).ok()
82 }
83
84 pub fn agent_name() -> Option<String> {
86 Self::try_with(|ctx| ctx.agent_name.clone())
87 }
88}
89
90#[cfg(test)]
91mod tests {
92 use super::*;
93
94 #[tokio::test]
95 async fn test_agent_context_isolation() {
96 let parent = Arc::new(StateStore::new());
97 parent.set("shared_key", serde_json::json!("parent_value"), "test");
98
99 let ctx = AgentContext::new("agent_a", Arc::clone(&parent));
100
101 assert_eq!(ctx.get("shared_key"), Some(serde_json::json!("parent_value")));
103
104 ctx.set("local_key", serde_json::json!("local_value"));
106 assert_eq!(ctx.get("local_key"), Some(serde_json::json!("local_value")));
107 assert!(parent.get("local_key").is_none());
108
109 ctx.set("shared_key", serde_json::json!("overridden"));
111 assert_eq!(ctx.get("shared_key"), Some(serde_json::json!("overridden")));
112 assert_eq!(parent.get("shared_key"), Some(serde_json::json!("parent_value")));
113
114 ctx.merge_to_parent();
116 assert_eq!(parent.get("local_key"), Some(serde_json::json!("local_value")));
117 assert_eq!(parent.get("shared_key"), Some(serde_json::json!("overridden")));
118 }
119
120 #[tokio::test]
121 async fn test_task_scope() {
122 let parent = Arc::new(StateStore::new());
123 let ctx = AgentContext::new("scoped_agent", Arc::clone(&parent));
124
125 assert!(TaskScope::agent_name().is_none());
127
128 let name = TaskScope::run(ctx, async {
130 TaskScope::agent_name().unwrap()
131 })
132 .await;
133 assert_eq!(name, "scoped_agent");
134 }
135
136 #[tokio::test]
137 async fn test_parallel_isolation() {
138 let parent = Arc::new(StateStore::new());
139 parent.set("counter", serde_json::json!(0), "init");
140
141 let ctx_a = AgentContext::new("agent_a", Arc::clone(&parent));
142 let ctx_b = AgentContext::new("agent_b", Arc::clone(&parent));
143
144 let handle_a = tokio::spawn({
145 let ctx = ctx_a.clone();
146 async move {
147 TaskScope::run(ctx.clone(), async {
148 ctx.set("counter", serde_json::json!(1));
149 ctx.set("agent_a_only", serde_json::json!("a_data"));
150 })
151 .await;
152 ctx
153 }
154 });
155
156 let handle_b = tokio::spawn({
157 let ctx = ctx_b.clone();
158 async move {
159 TaskScope::run(ctx.clone(), async {
160 ctx.set("counter", serde_json::json!(2));
161 ctx.set("agent_b_only", serde_json::json!("b_data"));
162 })
163 .await;
164 ctx
165 }
166 });
167
168 let ctx_a = handle_a.await.unwrap();
169 let ctx_b = handle_b.await.unwrap();
170
171 assert_eq!(ctx_a.get("counter"), Some(serde_json::json!(1)));
173 assert_eq!(ctx_b.get("counter"), Some(serde_json::json!(2)));
174
175 assert_eq!(parent.get("counter"), Some(serde_json::json!(0)));
177
178 ctx_a.merge_to_parent();
180 ctx_b.merge_to_parent();
181 assert_eq!(parent.get("agent_a_only"), Some(serde_json::json!("a_data")));
182 assert_eq!(parent.get("agent_b_only"), Some(serde_json::json!("b_data")));
183 }
184}