car-a2a 0.18.0

Bridge between Common Agent Runtime and the Linux Foundation Agent2Agent (A2A) v1.0 protocol
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
//! In-memory task store for the A2A bridge.
//!
//! Tracks every task this agent has accepted from peers, plus its
//! current state and accumulated artifacts. Persistent embedders can
//! supply their own implementation of [`TaskStore`] backed by the
//! same storage they use for CAR event journals.
//!
//! `AbortHandle`s for in-flight executor tasks live in a separate
//! [`AbortRegistry`] (an in-process `HashMap<String, AbortHandle>`).
//! They have no place in a persistable storage trait — a sqlite or
//! postgres backend has no way to store one — so the split keeps
//! `TaskStore` truly persistable.

use crate::types::{Artifact, Message, PushNotificationConfig, Task, TaskState, TaskStatus};
use chrono::Utc;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken;

/// One row of the persistable task store.
#[derive(Clone)]
pub struct TaskRecord {
    pub task: Task,
    /// Push-notification subscribers keyed by config id.
    pub push_configs: HashMap<String, PushNotificationConfig>,
    /// True once the caller has issued `tasks/cancel`. Stays sticky
    /// so the executor task can observe it after the fact and skip
    /// publishing a redundant final event.
    pub cancel_requested: bool,
}

impl TaskRecord {
    pub fn new(task: Task) -> Self {
        Self {
            task,
            push_configs: HashMap::new(),
            cancel_requested: false,
        }
    }
}

#[async_trait::async_trait]
pub trait TaskStore: Send + Sync {
    async fn create(&self, task: Task) -> TaskRecord;
    async fn get(&self, task_id: &str) -> Option<TaskRecord>;
    /// Update lifecycle state. **Honours the terminal-state guard**:
    /// once the task is in `Completed`/`Failed`/`Canceled`/`Rejected`,
    /// later writes are silently ignored. This is how cancel races
    /// are closed — without the guard, an in-flight executor can
    /// resume after `tasks/cancel` and clobber `Canceled` back to
    /// `Completed`. Returns the (possibly unchanged) record.
    async fn update_state(&self, task_id: &str, state: TaskState) -> Option<TaskRecord>;
    async fn append_artifacts(&self, task_id: &str, artifacts: Vec<Artifact>)
        -> Option<TaskRecord>;
    async fn append_history(&self, task_id: &str, message: Message) -> Option<TaskRecord>;
    async fn list(&self, filter: ListFilter) -> Vec<TaskRecord>;
    /// Mark the task `Canceled` and set `cancel_requested = true`.
    /// No-op when the task is already terminal. Returns the post-
    /// update record so the caller can decide whether to publish a
    /// status event.
    async fn request_cancel(&self, task_id: &str) -> Option<TaskRecord>;
    async fn add_push_config(
        &self,
        task_id: &str,
        config_id: String,
        config: PushNotificationConfig,
    ) -> Option<TaskRecord>;
    async fn remove_push_config(&self, task_id: &str, config_id: &str) -> Option<TaskRecord>;
    async fn get_push_config(
        &self,
        task_id: &str,
        config_id: &str,
    ) -> Option<PushNotificationConfig>;
    async fn list_push_configs(&self, task_id: &str) -> Vec<(String, PushNotificationConfig)>;
}

#[derive(Debug, Clone, Default)]
pub struct ListFilter {
    pub context_id: Option<String>,
    pub state: Option<TaskState>,
    pub limit: Option<u32>,
}

#[derive(Default)]
pub struct InMemoryTaskStore {
    inner: RwLock<HashMap<String, TaskRecord>>,
}

impl InMemoryTaskStore {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn shared() -> Arc<Self> {
        Arc::new(Self::new())
    }
}

#[async_trait::async_trait]
impl TaskStore for InMemoryTaskStore {
    async fn create(&self, task: Task) -> TaskRecord {
        let record = TaskRecord::new(task);
        self.inner
            .write()
            .await
            .insert(record.task.id.clone(), record.clone());
        record
    }

    async fn get(&self, task_id: &str) -> Option<TaskRecord> {
        self.inner.read().await.get(task_id).cloned()
    }

    async fn update_state(&self, task_id: &str, state: TaskState) -> Option<TaskRecord> {
        let mut guard = self.inner.write().await;
        let record = guard.get_mut(task_id)?;
        if record.task.status.state.is_terminal() {
            return Some(record.clone());
        }
        record.task.status = TaskStatus {
            state,
            message: None,
            timestamp: Utc::now(),
        };
        Some(record.clone())
    }

    async fn append_artifacts(
        &self,
        task_id: &str,
        artifacts: Vec<Artifact>,
    ) -> Option<TaskRecord> {
        let mut guard = self.inner.write().await;
        let record = guard.get_mut(task_id)?;
        record.task.artifacts.extend(artifacts);
        Some(record.clone())
    }

    async fn append_history(&self, task_id: &str, message: Message) -> Option<TaskRecord> {
        let mut guard = self.inner.write().await;
        let record = guard.get_mut(task_id)?;
        record.task.history.push(message);
        Some(record.clone())
    }

    async fn list(&self, filter: ListFilter) -> Vec<TaskRecord> {
        let guard = self.inner.read().await;
        let mut out: Vec<TaskRecord> = guard
            .values()
            .filter(|r| match &filter.context_id {
                Some(ctx) => &r.task.context_id == ctx,
                None => true,
            })
            .filter(|r| match filter.state {
                Some(s) => r.task.status.state == s,
                None => true,
            })
            .cloned()
            .collect();
        out.sort_by(|a, b| b.task.status.timestamp.cmp(&a.task.status.timestamp));
        if let Some(limit) = filter.limit {
            out.truncate(limit as usize);
        }
        out
    }

    async fn request_cancel(&self, task_id: &str) -> Option<TaskRecord> {
        let mut guard = self.inner.write().await;
        let record = guard.get_mut(task_id)?;
        if !record.task.status.state.is_terminal() {
            record.cancel_requested = true;
            record.task.status = TaskStatus {
                state: TaskState::Canceled,
                message: None,
                timestamp: Utc::now(),
            };
        }
        Some(record.clone())
    }

    async fn add_push_config(
        &self,
        task_id: &str,
        config_id: String,
        config: PushNotificationConfig,
    ) -> Option<TaskRecord> {
        let mut guard = self.inner.write().await;
        let record = guard.get_mut(task_id)?;
        record.push_configs.insert(config_id, config);
        Some(record.clone())
    }

    async fn remove_push_config(&self, task_id: &str, config_id: &str) -> Option<TaskRecord> {
        let mut guard = self.inner.write().await;
        let record = guard.get_mut(task_id)?;
        record.push_configs.remove(config_id);
        Some(record.clone())
    }

    async fn get_push_config(
        &self,
        task_id: &str,
        config_id: &str,
    ) -> Option<PushNotificationConfig> {
        self.inner
            .read()
            .await
            .get(task_id)
            .and_then(|r| r.push_configs.get(config_id).cloned())
    }

    async fn list_push_configs(&self, task_id: &str) -> Vec<(String, PushNotificationConfig)> {
        self.inner
            .read()
            .await
            .get(task_id)
            .map(|r| {
                r.push_configs
                    .iter()
                    .map(|(k, v)| (k.clone(), v.clone()))
                    .collect()
            })
            .unwrap_or_default()
    }
}

/// In-process registry of `CancellationToken`s for spawned executor
/// tasks.
///
/// Lives separately from [`TaskStore`] because the tokens are
/// transient runtime state that no persistable backend can store.
/// The dispatcher owns one of these and drops tokens when tasks
/// finish or get cancelled.
///
/// The previous implementation stored `tokio::task::AbortHandle`s
/// and called `handle.abort()` to interrupt mid-await. That left
/// no `ProposalResult` record of which actions actually ran. Using
/// `CancellationToken` lets the engine cooperate: it sees the
/// cancellation at level boundaries and emits canceled action
/// results so the bridge can return a clean partial state.
#[derive(Default)]
pub struct AbortRegistry {
    inner: Mutex<HashMap<String, CancellationToken>>,
}

impl AbortRegistry {
    pub fn new() -> Self {
        Self::default()
    }

    pub async fn register(&self, task_id: String, token: CancellationToken) {
        self.inner.lock().await.insert(task_id, token);
    }

    pub async fn clear(&self, task_id: &str) {
        self.inner.lock().await.remove(task_id);
    }

    /// Cancel the in-flight executor task. Returns `true` if a token
    /// was found and cancelled.
    pub async fn abort(&self, task_id: &str) -> bool {
        match self.inner.lock().await.remove(task_id) {
            Some(token) => {
                token.cancel();
                true
            }
            None => false,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::bridge::task_with_status;

    #[tokio::test]
    async fn terminal_state_guard_blocks_clobber() {
        let store = InMemoryTaskStore::new();
        store
            .create(task_with_status(
                "t-guard".into(),
                "ctx".into(),
                TaskState::Submitted,
                vec![],
                vec![],
            ))
            .await;
        // Set terminal.
        store
            .update_state("t-guard", TaskState::Canceled)
            .await
            .unwrap();
        // Now an attempt to flip to Completed should no-op.
        let after = store
            .update_state("t-guard", TaskState::Completed)
            .await
            .unwrap();
        assert_eq!(after.task.status.state, TaskState::Canceled);
    }

    #[tokio::test]
    async fn abort_registry_signals_token() {
        let registry = AbortRegistry::new();
        let token = CancellationToken::new();
        registry.register("t-1".into(), token.clone()).await;
        let aborted = registry.abort("t-1").await;
        assert!(aborted);
        assert!(token.is_cancelled());
        // Subsequent abort returns false because the token was
        // removed on the first abort.
        assert!(!registry.abort("t-1").await);
    }

    #[tokio::test]
    async fn create_and_update_lifecycle() {
        let store = InMemoryTaskStore::new();
        let task = task_with_status(
            "t-1".into(),
            "ctx-1".into(),
            TaskState::Submitted,
            vec![],
            vec![],
        );
        store.create(task).await;
        let updated = store
            .update_state("t-1", TaskState::Working)
            .await
            .expect("found");
        assert_eq!(updated.task.status.state, TaskState::Working);

        let canceled = store.request_cancel("t-1").await.expect("found");
        assert_eq!(canceled.task.status.state, TaskState::Canceled);
        assert!(canceled.cancel_requested);
    }

    #[tokio::test]
    async fn list_filters_by_state_and_limit() {
        let store = InMemoryTaskStore::new();
        for i in 0..5 {
            let task = task_with_status(
                format!("t-{}", i),
                "ctx-a".into(),
                if i % 2 == 0 {
                    TaskState::Working
                } else {
                    TaskState::Completed
                },
                vec![],
                vec![],
            );
            store.create(task).await;
        }
        let working = store
            .list(ListFilter {
                state: Some(TaskState::Working),
                ..Default::default()
            })
            .await;
        assert_eq!(working.len(), 3);
        let limited = store
            .list(ListFilter {
                limit: Some(2),
                ..Default::default()
            })
            .await;
        assert_eq!(limited.len(), 2);
    }

    #[tokio::test]
    async fn push_config_crud() {
        let store = InMemoryTaskStore::new();
        let task = task_with_status(
            "t-9".into(),
            "ctx-9".into(),
            TaskState::Submitted,
            vec![],
            vec![],
        );
        store.create(task).await;
        store
            .add_push_config(
                "t-9",
                "cfg-1".into(),
                PushNotificationConfig {
                    url: "https://example.com/hook".into(),
                    token: None,
                    authentication: None,
                },
            )
            .await
            .expect("task");
        let cfgs = store.list_push_configs("t-9").await;
        assert_eq!(cfgs.len(), 1);
        assert_eq!(cfgs[0].0, "cfg-1");

        store
            .remove_push_config("t-9", "cfg-1")
            .await
            .expect("task");
        assert!(store.list_push_configs("t-9").await.is_empty());
    }
}