// car_multi/task_context.rs

//! Task-local agent context built on `tokio::task_local!`.
//!
//! Provides per-agent isolation without full `Runtime` overhead.
//! Each spawned agent task gets its own state overlay and event log,
//! while reads fall through to the parent's shared state.
6
7use car_eventlog::EventLog;
8use car_state::StateStore;
9use std::sync::Arc;
10use tokio::sync::Mutex as TokioMutex;
11
tokio::task_local! {
    /// Task-local slot holding the current agent's context.
    ///
    /// Set for the duration of a future passed to `TaskScope::run()`;
    /// `TaskScope::try_with()` reads it and yields `None` when it is unset.
    static AGENT_CTX: AgentContext;
}
16
17/// Lightweight per-agent context stored in tokio task-local storage.
18/// Provides an isolated state overlay and event log for each agent.
19#[derive(Clone)]
20pub struct AgentContext {
21    /// Agent name for this task.
22    pub agent_name: String,
23    /// Per-agent state overlay — writes go here, reads fall through to parent.
24    pub local_state: Arc<StateStore>,
25    /// Per-agent event log — isolated from other agents.
26    pub local_log: Arc<TokioMutex<EventLog>>,
27    /// Parent shared state — reads fall through here when local misses.
28    parent_state: Arc<StateStore>,
29}
30
31impl AgentContext {
32    pub fn new(agent_name: &str, parent_state: Arc<StateStore>) -> Self {
33        Self {
34            agent_name: agent_name.to_string(),
35            local_state: Arc::new(StateStore::new()),
36            local_log: Arc::new(TokioMutex::new(EventLog::new())),
37            parent_state,
38        }
39    }
40
41    /// Read a key — checks local overlay first, then parent.
42    pub fn get(&self, key: &str) -> Option<serde_json::Value> {
43        self.local_state
44            .get(key)
45            .or_else(|| self.parent_state.get(key))
46    }
47
48    /// Write a key — always writes to the local overlay.
49    pub fn set(&self, key: &str, value: serde_json::Value) {
50        self.local_state.set(key, value, &self.agent_name);
51    }
52
53    /// Merge local state changes back into the parent.
54    pub fn merge_to_parent(&self) {
55        for key in self.local_state.keys() {
56            if let Some(value) = self.local_state.get(&key) {
57                self.parent_state.set(&key, value, &self.agent_name);
58            }
59        }
60    }
61}
62
63/// Scoped execution with task-local agent context.
64pub struct TaskScope;
65
66impl TaskScope {
67    /// Run a future with the given agent context set as task-local.
68    pub async fn run<F, T>(ctx: AgentContext, f: F) -> T
69    where
70        F: std::future::Future<Output = T>,
71    {
72        AGENT_CTX.scope(ctx, f).await
73    }
74
75    /// Try to access the current task's agent context.
76    /// Returns None if not running inside a TaskScope.
77    pub fn try_with<F, R>(f: F) -> Option<R>
78    where
79        F: FnOnce(&AgentContext) -> R,
80    {
81        AGENT_CTX.try_with(f).ok()
82    }
83
84    /// Get the current agent name, if inside a task scope.
85    pub fn agent_name() -> Option<String> {
86        Self::try_with(|ctx| ctx.agent_name.clone())
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;

    /// Overlay semantics on a single context: read-through to the parent,
    /// local writes, shadowing, and merge.
    #[tokio::test]
    async fn test_agent_context_isolation() {
        let parent = Arc::new(StateStore::new());
        parent.set("shared_key", serde_json::json!("parent_value"), "test");

        let ctx = AgentContext::new("agent_a", Arc::clone(&parent));

        // Reads fall through to parent
        assert_eq!(ctx.get("shared_key"), Some(serde_json::json!("parent_value")));

        // Writes go to local overlay
        ctx.set("local_key", serde_json::json!("local_value"));
        assert_eq!(ctx.get("local_key"), Some(serde_json::json!("local_value")));
        assert!(parent.get("local_key").is_none());

        // Local overlay shadows parent
        ctx.set("shared_key", serde_json::json!("overridden"));
        assert_eq!(ctx.get("shared_key"), Some(serde_json::json!("overridden")));
        assert_eq!(parent.get("shared_key"), Some(serde_json::json!("parent_value")));

        // Merge propagates local writes to parent
        ctx.merge_to_parent();
        assert_eq!(parent.get("local_key"), Some(serde_json::json!("local_value")));
        assert_eq!(parent.get("shared_key"), Some(serde_json::json!("overridden")));
    }

    /// The task-local context is visible only while the scoped future runs.
    #[tokio::test]
    async fn test_task_scope() {
        let parent = Arc::new(StateStore::new());
        let ctx = AgentContext::new("scoped_agent", Arc::clone(&parent));

        // Outside scope, try_with returns None
        assert!(TaskScope::agent_name().is_none());

        // Inside scope, context is accessible
        let name = TaskScope::run(ctx, async {
            TaskScope::agent_name().unwrap()
        })
        .await;
        assert_eq!(name, "scoped_agent");

        // Once the scoped future has completed, the context is gone again
        assert!(TaskScope::agent_name().is_none());
    }

    /// Two agents on separate spawned tasks each see their own task-local
    /// context and their own overlay until both are merged into the parent.
    #[tokio::test]
    async fn test_parallel_isolation() {
        let parent = Arc::new(StateStore::new());
        parent.set("counter", serde_json::json!(0), "init");

        let ctx_a = AgentContext::new("agent_a", Arc::clone(&parent));
        let ctx_b = AgentContext::new("agent_b", Arc::clone(&parent));

        let handle_a = tokio::spawn({
            let ctx = ctx_a.clone();
            async move {
                TaskScope::run(ctx.clone(), async {
                    // The task-local must resolve to this task's own agent
                    assert_eq!(TaskScope::agent_name().as_deref(), Some("agent_a"));
                    ctx.set("counter", serde_json::json!(1));
                    ctx.set("agent_a_only", serde_json::json!("a_data"));
                })
                .await;
                ctx
            }
        });

        let handle_b = tokio::spawn({
            let ctx = ctx_b.clone();
            async move {
                TaskScope::run(ctx.clone(), async {
                    // The task-local must resolve to this task's own agent
                    assert_eq!(TaskScope::agent_name().as_deref(), Some("agent_b"));
                    ctx.set("counter", serde_json::json!(2));
                    ctx.set("agent_b_only", serde_json::json!("b_data"));
                })
                .await;
                ctx
            }
        });

        let ctx_a = handle_a.await.unwrap();
        let ctx_b = handle_b.await.unwrap();

        // Each agent sees its own counter value
        assert_eq!(ctx_a.get("counter"), Some(serde_json::json!(1)));
        assert_eq!(ctx_b.get("counter"), Some(serde_json::json!(2)));

        // Before any merge, one agent's private keys are invisible to the other
        assert!(ctx_a.get("agent_b_only").is_none());
        assert!(ctx_b.get("agent_a_only").is_none());

        // Parent unchanged until merge
        assert_eq!(parent.get("counter"), Some(serde_json::json!(0)));
        assert!(parent.get("agent_a_only").is_none());
        assert!(parent.get("agent_b_only").is_none());

        // Merge both — last merge wins for shared keys
        ctx_a.merge_to_parent();
        ctx_b.merge_to_parent();
        assert_eq!(parent.get("agent_a_only"), Some(serde_json::json!("a_data")));
        assert_eq!(parent.get("agent_b_only"), Some(serde_json::json!("b_data")));
        // agent_b merged last, so its counter value is the one left in the parent
        assert_eq!(parent.get("counter"), Some(serde_json::json!(2)));
    }
}