Skip to main content

car_multi/
task_context.rs

1//! Task-local agent context using tokio::task_local!
2//!
3//! Provides per-agent isolation without full Runtime overhead.
4//! Each spawned agent task gets its own state overlay and event log
5//! while reading through to the parent's shared state.
6
7use car_eventlog::EventLog;
8use car_state::StateStore;
9use std::sync::Arc;
10use tokio::sync::Mutex as TokioMutex;
11
12tokio::task_local! {
13    /// Per-task agent context. Set via `TaskScope::run()`.
14    static AGENT_CTX: AgentContext;
15}
16
17/// Lightweight per-agent context stored in tokio task-local storage.
18/// Provides an isolated state overlay and event log for each agent.
19#[derive(Clone)]
20pub struct AgentContext {
21    /// Agent name for this task.
22    pub agent_name: String,
23    /// Per-agent state overlay — writes go here, reads fall through to parent.
24    pub local_state: Arc<StateStore>,
25    /// Per-agent event log — isolated from other agents.
26    pub local_log: Arc<TokioMutex<EventLog>>,
27    /// Parent shared state — reads fall through here when local misses.
28    parent_state: Arc<StateStore>,
29}
30
31impl AgentContext {
32    pub fn new(agent_name: &str, parent_state: Arc<StateStore>) -> Self {
33        Self {
34            agent_name: agent_name.to_string(),
35            local_state: Arc::new(StateStore::new()),
36            local_log: Arc::new(TokioMutex::new(EventLog::new())),
37            parent_state,
38        }
39    }
40
41    /// Read a key — checks local overlay first, then parent.
42    pub fn get(&self, key: &str) -> Option<serde_json::Value> {
43        self.local_state
44            .get(key)
45            .or_else(|| self.parent_state.get(key))
46    }
47
48    /// Write a key — always writes to the local overlay.
49    pub fn set(&self, key: &str, value: serde_json::Value) {
50        self.local_state.set(key, value, &self.agent_name);
51    }
52
53    /// Merge local state changes back into the parent.
54    pub fn merge_to_parent(&self) {
55        for key in self.local_state.keys() {
56            if let Some(value) = self.local_state.get(&key) {
57                self.parent_state.set(&key, value, &self.agent_name);
58            }
59        }
60    }
61}
62
/// Namespace type for running futures with an [`AgentContext`] installed in
/// task-local storage and for reading that context back. Carries no data.
pub struct TaskScope;
65
66impl TaskScope {
67    /// Run a future with the given agent context set as task-local.
68    pub async fn run<F, T>(ctx: AgentContext, f: F) -> T
69    where
70        F: std::future::Future<Output = T>,
71    {
72        AGENT_CTX.scope(ctx, f).await
73    }
74
75    /// Try to access the current task's agent context.
76    /// Returns None if not running inside a TaskScope.
77    pub fn try_with<F, R>(f: F) -> Option<R>
78    where
79        F: FnOnce(&AgentContext) -> R,
80    {
81        AGENT_CTX.try_with(f).ok()
82    }
83
84    /// Get the current agent name, if inside a task scope.
85    pub fn agent_name() -> Option<String> {
86        Self::try_with(|ctx| ctx.agent_name.clone())
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Covers the single-context contract: read-through to the parent,
    /// overlay-only writes, shadowing of parent keys, and merge-back.
    #[tokio::test]
    async fn test_agent_context_isolation() {
        let parent = Arc::new(StateStore::new());
        parent.set("shared_key", json!("parent_value"), "test");

        let ctx = AgentContext::new("agent_a", Arc::clone(&parent));

        // Reads fall through to the parent when the overlay has no value.
        assert_eq!(ctx.get("shared_key"), Some(json!("parent_value")));

        // Writes land in the overlay and stay invisible to the parent.
        ctx.set("local_key", json!("local_value"));
        assert_eq!(ctx.get("local_key"), Some(json!("local_value")));
        assert!(parent.get("local_key").is_none());

        // An overlay write shadows the parent's value without changing it.
        ctx.set("shared_key", json!("overridden"));
        assert_eq!(ctx.get("shared_key"), Some(json!("overridden")));
        assert_eq!(parent.get("shared_key"), Some(json!("parent_value")));

        // Merging pushes both the new key and the shadowing value upward.
        ctx.merge_to_parent();
        assert_eq!(parent.get("local_key"), Some(json!("local_value")));
        assert_eq!(parent.get("shared_key"), Some(json!("overridden")));
    }

    /// The task-local context is absent outside `TaskScope::run` and visible
    /// inside it.
    #[tokio::test]
    async fn test_task_scope() {
        let parent = Arc::new(StateStore::new());
        let ctx = AgentContext::new("scoped_agent", Arc::clone(&parent));

        // Outside a scope there is no context to read.
        assert!(TaskScope::agent_name().is_none());

        // Inside a scope the installed context is accessible.
        let name = TaskScope::run(ctx, async { TaskScope::agent_name().unwrap() }).await;
        assert_eq!(name, "scoped_agent");
    }

    /// Two agents on separate spawned tasks each see their own task-local
    /// context and their own overlay; the parent changes only on merge, and
    /// the later merge wins for keys both agents wrote.
    #[tokio::test]
    async fn test_parallel_isolation() {
        let parent = Arc::new(StateStore::new());
        parent.set("counter", json!(0), "init");

        let ctx_a = AgentContext::new("agent_a", Arc::clone(&parent));
        let ctx_b = AgentContext::new("agent_b", Arc::clone(&parent));

        // Each context is moved into its task and handed back on completion;
        // only the copy installed as the task-local needs a clone.
        let handle_a = tokio::spawn(async move {
            TaskScope::run(ctx_a.clone(), async {
                // The task-local must resolve to this task's own agent.
                assert_eq!(TaskScope::agent_name().as_deref(), Some("agent_a"));
                ctx_a.set("counter", json!(1));
                ctx_a.set("agent_a_only", json!("a_data"));
            })
            .await;
            ctx_a
        });

        let handle_b = tokio::spawn(async move {
            TaskScope::run(ctx_b.clone(), async {
                assert_eq!(TaskScope::agent_name().as_deref(), Some("agent_b"));
                ctx_b.set("counter", json!(2));
                ctx_b.set("agent_b_only", json!("b_data"));
            })
            .await;
            ctx_b
        });

        let ctx_a = handle_a.await.unwrap();
        let ctx_b = handle_b.await.unwrap();

        // Each agent sees its own counter value.
        assert_eq!(ctx_a.get("counter"), Some(json!(1)));
        assert_eq!(ctx_b.get("counter"), Some(json!(2)));

        // Parent unchanged until merge.
        assert_eq!(parent.get("counter"), Some(json!(0)));

        // Merge both: agent-specific keys all arrive in the parent...
        ctx_a.merge_to_parent();
        ctx_b.merge_to_parent();
        assert_eq!(parent.get("agent_a_only"), Some(json!("a_data")));
        assert_eq!(parent.get("agent_b_only"), Some(json!("b_data")));

        // ...and for the key both wrote, the last merge (agent_b) wins.
        assert_eq!(parent.get("counter"), Some(json!(2)));
    }
}