car_multi/
task_context.rs1use car_eventlog::EventLog;
8use car_state::StateStore;
9use std::sync::Arc;
10use tokio::sync::Mutex as TokioMutex;
11
// Task-local slot holding the `AgentContext` of the agent whose future is
// currently being driven. In this file it is installed by `TaskScope::run`
// (via `scope`) and read by `TaskScope::try_with`.
tokio::task_local! {
    static AGENT_CTX: AgentContext;
}
16
/// Per-agent view over shared state: writes land in a private store, reads
/// consult the private store first and then the parent store.
///
/// Every store and the log are held behind `Arc`, so cloning an
/// `AgentContext` yields a handle to the same underlying stores and log
/// rather than an independent copy.
#[derive(Clone)]
pub struct AgentContext {
    /// Name of the agent. Also passed as the third argument of every
    /// `StateStore::set` call made through this context (presumably a
    /// writer/actor label -- confirm against `StateStore`).
    pub agent_name: String,
    /// Store that `set` writes to and that `get` consults first.
    pub local_state: Arc<StateStore>,
    /// Event log guarded by a Tokio mutex; created empty by `new`.
    pub local_log: Arc<TokioMutex<EventLog>>,
    /// Store used as the fallback for `get` and as the destination of
    /// `merge_to_parent`.
    parent_state: Arc<StateStore>,
}
30
31impl AgentContext {
32 pub fn new(agent_name: &str, parent_state: Arc<StateStore>) -> Self {
33 Self {
34 agent_name: agent_name.to_string(),
35 local_state: Arc::new(StateStore::new()),
36 local_log: Arc::new(TokioMutex::new(EventLog::new())),
37 parent_state,
38 }
39 }
40
41 pub fn get(&self, key: &str) -> Option<serde_json::Value> {
43 self.local_state
44 .get(key)
45 .or_else(|| self.parent_state.get(key))
46 }
47
48 pub fn set(&self, key: &str, value: serde_json::Value) {
50 self.local_state.set(key, value, &self.agent_name);
51 }
52
53 pub fn merge_to_parent(&self) {
55 for key in self.local_state.keys() {
56 if let Some(value) = self.local_state.get(&key) {
57 self.parent_state.set(&key, value, &self.agent_name);
58 }
59 }
60 }
61}
62
/// Namespace type (no fields) for the associated functions that install and
/// read the task-local `AgentContext`.
pub struct TaskScope;
65
66impl TaskScope {
67 pub async fn run<F, T>(ctx: AgentContext, f: F) -> T
69 where
70 F: std::future::Future<Output = T>,
71 {
72 AGENT_CTX.scope(ctx, f).await
73 }
74
75 pub fn try_with<F, R>(f: F) -> Option<R>
78 where
79 F: FnOnce(&AgentContext) -> R,
80 {
81 AGENT_CTX.try_with(f).ok()
82 }
83
84 pub fn agent_name() -> Option<String> {
86 Self::try_with(|ctx| ctx.agent_name.clone())
87 }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Reads fall through to the parent, writes stay local until merged.
    #[tokio::test]
    async fn test_agent_context_isolation() {
        let shared = Arc::new(StateStore::new());
        shared.set("shared_key", json!("parent_value"), "test");

        let agent = AgentContext::new("agent_a", Arc::clone(&shared));

        // Nothing is stored locally yet, so the lookup reaches the parent.
        assert_eq!(agent.get("shared_key"), Some(json!("parent_value")));

        // A local write is visible through the context but not in the parent.
        agent.set("local_key", json!("local_value"));
        assert_eq!(agent.get("local_key"), Some(json!("local_value")));
        assert!(shared.get("local_key").is_none());

        // Overriding a parent key locally leaves the parent's value intact.
        agent.set("shared_key", json!("overridden"));
        assert_eq!(agent.get("shared_key"), Some(json!("overridden")));
        assert_eq!(shared.get("shared_key"), Some(json!("parent_value")));

        // After merging, both local entries are present in the parent.
        agent.merge_to_parent();
        assert_eq!(shared.get("local_key"), Some(json!("local_value")));
        assert_eq!(shared.get("shared_key"), Some(json!("overridden")));
    }

    /// The agent name is only observable inside `TaskScope::run`.
    #[tokio::test]
    async fn test_task_scope() {
        let shared = Arc::new(StateStore::new());
        let agent = AgentContext::new("scoped_agent", Arc::clone(&shared));

        // No scope is active yet.
        assert!(TaskScope::agent_name().is_none());

        let observed = TaskScope::run(agent, async { TaskScope::agent_name().unwrap() }).await;
        assert_eq!(observed, "scoped_agent");
    }

    /// Two agents writing the same key on separate tasks do not see each
    /// other's writes, and the parent is untouched until a merge.
    #[tokio::test]
    async fn test_parallel_isolation() {
        let shared = Arc::new(StateStore::new());
        shared.set("counter", json!(0), "init");

        let ctx_a = AgentContext::new("agent_a", Arc::clone(&shared));
        let ctx_b = AgentContext::new("agent_b", Arc::clone(&shared));

        let task_a = tokio::spawn({
            let agent = ctx_a.clone();
            async move {
                TaskScope::run(agent.clone(), async {
                    agent.set("counter", json!(1));
                    agent.set("agent_a_only", json!("a_data"));
                })
                .await;
                agent
            }
        });

        let task_b = tokio::spawn({
            let agent = ctx_b.clone();
            async move {
                TaskScope::run(agent.clone(), async {
                    agent.set("counter", json!(2));
                    agent.set("agent_b_only", json!("b_data"));
                })
                .await;
                agent
            }
        });

        let done_a = task_a.await.unwrap();
        let done_b = task_b.await.unwrap();

        // Each agent sees its own value for the contested key.
        assert_eq!(done_a.get("counter"), Some(json!(1)));
        assert_eq!(done_b.get("counter"), Some(json!(2)));

        // The parent still holds the initial value.
        assert_eq!(shared.get("counter"), Some(json!(0)));

        // Merging publishes each agent's private keys to the parent.
        done_a.merge_to_parent();
        done_b.merge_to_parent();
        assert_eq!(shared.get("agent_a_only"), Some(json!("a_data")));
        assert_eq!(shared.get("agent_b_only"), Some(json!("b_data")));
    }
}