Skip to main content

a2a_protocol_server/store/
task_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Tom F.
3
4//! Task persistence trait and in-memory implementation.
5//!
6//! [`TaskStore`] abstracts task persistence so that the server framework can
7//! be backed by any storage engine. [`InMemoryTaskStore`] provides a simple
8//! `HashMap`-based implementation suitable for testing and single-process
9//! deployments.
10
11use std::collections::HashMap;
12use std::future::Future;
13use std::pin::Pin;
14use std::time::{Duration, Instant};
15
16use a2a_protocol_types::error::A2aResult;
17use a2a_protocol_types::params::ListTasksParams;
18use a2a_protocol_types::responses::TaskListResponse;
19use a2a_protocol_types::task::{Task, TaskId};
20use tokio::sync::RwLock;
21
22/// Trait for persisting and retrieving [`Task`] objects.
23///
24/// All methods return `Pin<Box<dyn Future>>` for object safety — this trait
25/// is used as `Box<dyn TaskStore>`.
26///
27/// # Object safety
28///
29/// Do not add `async fn` methods; use the explicit `Pin<Box<...>>` form.
30pub trait TaskStore: Send + Sync + 'static {
31    /// Saves (creates or updates) a task.
32    ///
33    /// # Errors
34    ///
35    /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the store operation fails.
36    fn save<'a>(&'a self, task: Task) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;
37
38    /// Retrieves a task by its ID, returning `None` if not found.
39    ///
40    /// # Errors
41    ///
42    /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the store operation fails.
43    fn get<'a>(
44        &'a self,
45        id: &'a TaskId,
46    ) -> Pin<Box<dyn Future<Output = A2aResult<Option<Task>>> + Send + 'a>>;
47
48    /// Lists tasks matching the given filter parameters.
49    ///
50    /// # Errors
51    ///
52    /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the store operation fails.
53    fn list<'a>(
54        &'a self,
55        params: &'a ListTasksParams,
56    ) -> Pin<Box<dyn Future<Output = A2aResult<TaskListResponse>> + Send + 'a>>;
57
58    /// Atomically inserts a task only if no task with the same ID exists.
59    ///
60    /// Returns `Ok(true)` if the task was inserted, `Ok(false)` if a task
61    /// with the same ID already exists (no modification made).
62    ///
63    /// # Errors
64    ///
65    /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the store operation fails.
66    fn insert_if_absent<'a>(
67        &'a self,
68        task: Task,
69    ) -> Pin<Box<dyn Future<Output = A2aResult<bool>> + Send + 'a>>;
70
71    /// Deletes a task by its ID.
72    ///
73    /// # Errors
74    ///
75    /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the store operation fails.
76    fn delete<'a>(
77        &'a self,
78        id: &'a TaskId,
79    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;
80}
81
82/// Entry in the in-memory task store, tracking creation time for TTL eviction.
83#[derive(Debug, Clone)]
84struct TaskEntry {
85    /// The stored task.
86    task: Task,
87    /// When this entry was last written (for TTL-based eviction).
88    last_updated: Instant,
89}
90
/// Tuning knobs for [`InMemoryTaskStore`].
#[derive(Debug, Clone)]
pub struct TaskStoreConfig {
    /// Upper bound on the number of stored tasks. When the store holds more
    /// than this many tasks, the least recently written terminal tasks are
    /// evicted first. Non-terminal tasks are never evicted, so the store can
    /// remain above the limit. `None` means no limit.
    pub max_capacity: Option<usize>,

    /// Time-to-live for tasks in a terminal state. A terminal task whose last
    /// write is at least this old is removed during an eviction sweep.
    /// `None` means no TTL-based eviction.
    pub task_ttl: Option<Duration>,
}

impl Default for TaskStoreConfig {
    /// Defaults to at most 10,000 tasks and a one-hour TTL for terminal tasks.
    fn default() -> Self {
        let one_hour = Duration::from_secs(3600);
        Self {
            max_capacity: Some(10_000),
            task_ttl: Some(one_hour),
        }
    }
}
112
/// How many writes elapse between automatic eviction sweeps.
///
/// A sweep costs O(n) in the number of stored tasks, so it is amortized over
/// this many writes (`save()` / `insert_if_absent()`) instead of running on
/// each one.
const EVICTION_INTERVAL: u64 = 64;
117
/// Largest page size a list query may use. Requests asking for more are
/// clamped to this value so a single call cannot demand billions of tasks.
const MAX_PAGE_SIZE: u32 = 1_000;
121
122/// In-memory [`TaskStore`] backed by a [`HashMap`] under a [`RwLock`].
123///
124/// Suitable for testing and single-process deployments. Data is lost when the
125/// process exits.
126///
127/// Supports TTL-based eviction of terminal tasks and a maximum capacity limit
128/// to prevent unbounded memory growth.
129///
130/// # Eviction behavior
131///
132/// Eviction runs automatically every 64 writes (`EVICTION_INTERVAL`) and
133/// whenever the store exceeds `max_capacity`. However, if the system goes
134/// idle (no `save()` calls), completed tasks may persist in memory longer
135/// than their TTL.
136///
137/// **Operators should call [`run_eviction()`](Self::run_eviction) periodically**
138/// (e.g. every 60 seconds via `tokio::time::interval`) to ensure timely
139/// cleanup of terminal tasks during idle periods.
140#[derive(Debug)]
141pub struct InMemoryTaskStore {
142    entries: RwLock<HashMap<TaskId, TaskEntry>>,
143    config: TaskStoreConfig,
144    /// Counter for amortized eviction (only run every `EVICTION_INTERVAL` writes).
145    write_count: std::sync::atomic::AtomicU64,
146}
147
148impl Default for InMemoryTaskStore {
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
154impl InMemoryTaskStore {
155    /// Creates a new empty in-memory task store with default configuration.
156    ///
157    /// Default: max 10,000 tasks, 1-hour TTL for terminal tasks.
158    #[must_use]
159    pub fn new() -> Self {
160        Self {
161            entries: RwLock::new(HashMap::new()),
162            config: TaskStoreConfig::default(),
163            write_count: std::sync::atomic::AtomicU64::new(0),
164        }
165    }
166
167    /// Creates a new in-memory task store with custom configuration.
168    #[must_use]
169    pub fn with_config(config: TaskStoreConfig) -> Self {
170        Self {
171            entries: RwLock::new(HashMap::new()),
172            config,
173            write_count: std::sync::atomic::AtomicU64::new(0),
174        }
175    }
176
177    /// Runs background eviction of expired and over-capacity entries.
178    ///
179    /// Call this periodically (e.g. every 60 seconds) to clean up terminal
180    /// tasks that would otherwise persist until the next `save()` call.
181    pub async fn run_eviction(&self) {
182        let mut store = self.entries.write().await;
183        Self::evict(&mut store, &self.config);
184    }
185
186    /// Evicts expired and over-capacity entries (must be called with write lock held).
187    fn evict(store: &mut HashMap<TaskId, TaskEntry>, config: &TaskStoreConfig) {
188        let now = Instant::now();
189
190        // TTL eviction: remove terminal tasks older than the TTL.
191        if let Some(ttl) = config.task_ttl {
192            store.retain(|_, entry| {
193                if entry.task.status.state.is_terminal() {
194                    now.duration_since(entry.last_updated) < ttl
195                } else {
196                    true
197                }
198            });
199        }
200
201        // Capacity eviction: remove oldest terminal tasks if over capacity.
202        if let Some(max) = config.max_capacity {
203            if store.len() > max {
204                let overflow = store.len() - max;
205                // Collect terminal tasks sorted by age (oldest first).
206                let mut terminal: Vec<(TaskId, Instant)> = store
207                    .iter()
208                    .filter(|(_, e)| e.task.status.state.is_terminal())
209                    .map(|(id, e)| (id.clone(), e.last_updated))
210                    .collect();
211                terminal.sort_by_key(|(_, t)| *t);
212
213                for (id, _) in terminal.into_iter().take(overflow) {
214                    store.remove(&id);
215                }
216            }
217        }
218    }
219}
220
221#[allow(clippy::manual_async_fn)]
222impl TaskStore for InMemoryTaskStore {
223    fn save<'a>(&'a self, task: Task) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
224        Box::pin(async move {
225            trace_debug!(task_id = %task.id, state = ?task.status.state, "saving task");
226            let mut store = self.entries.write().await;
227
228            store.insert(
229                task.id.clone(),
230                TaskEntry {
231                    task,
232                    last_updated: Instant::now(),
233                },
234            );
235
236            // Amortized eviction: only run every EVICTION_INTERVAL writes,
237            // or immediately if the store exceeds max capacity.
238            let count = self
239                .write_count
240                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
241            let over_capacity = self
242                .config
243                .max_capacity
244                .is_some_and(|max| store.len() > max);
245            if count.is_multiple_of(EVICTION_INTERVAL) || over_capacity {
246                Self::evict(&mut store, &self.config);
247            }
248
249            drop(store);
250            Ok(())
251        })
252    }
253
254    fn get<'a>(
255        &'a self,
256        id: &'a TaskId,
257    ) -> Pin<Box<dyn Future<Output = A2aResult<Option<Task>>> + Send + 'a>> {
258        Box::pin(async move {
259            trace_debug!(task_id = %id, "fetching task");
260            let store = self.entries.read().await;
261            let result = store.get(id).map(|e| e.task.clone());
262            drop(store);
263            Ok(result)
264        })
265    }
266
267    fn list<'a>(
268        &'a self,
269        params: &'a ListTasksParams,
270    ) -> Pin<Box<dyn Future<Output = A2aResult<TaskListResponse>> + Send + 'a>> {
271        Box::pin(async move {
272            let store = self.entries.read().await;
273            let mut tasks: Vec<Task> = store
274                .values()
275                .filter(|e| {
276                    if let Some(ref ctx) = params.context_id {
277                        if e.task.context_id.0 != *ctx {
278                            return false;
279                        }
280                    }
281                    if let Some(ref status) = params.status {
282                        if e.task.status.state != *status {
283                            return false;
284                        }
285                    }
286                    true
287                })
288                .map(|e| e.task.clone())
289                .collect();
290            drop(store);
291
292            // Sort by task ID for deterministic output.
293            tasks.sort_by(|a, b| a.id.0.cmp(&b.id.0));
294
295            // Apply cursor-based pagination via page_token.
296            // The page_token is the last task ID from the previous page.
297            if let Some(ref token) = params.page_token {
298                if let Some(pos) = tasks.iter().position(|t| t.id.0 == *token) {
299                    // Skip up to and including the cursor task.
300                    tasks = tasks.split_off(pos + 1);
301                } else {
302                    // Token refers to a non-existent task — return empty page.
303                    tasks.clear();
304                }
305            }
306
307            // Treat page_size of 0 as "use default"; clamp to MAX_PAGE_SIZE.
308            let page_size = match params.page_size {
309                Some(0) | None => 50_usize,
310                Some(n) => (n.min(MAX_PAGE_SIZE)) as usize,
311            };
312            let next_page_token = if tasks.len() > page_size {
313                tasks
314                    .get(page_size.saturating_sub(1))
315                    .map(|t| t.id.0.clone())
316            } else {
317                None
318            };
319            tasks.truncate(page_size);
320
321            let mut response = TaskListResponse::new(tasks);
322            response.next_page_token = next_page_token;
323            Ok(response)
324        })
325    }
326
327    fn insert_if_absent<'a>(
328        &'a self,
329        task: Task,
330    ) -> Pin<Box<dyn Future<Output = A2aResult<bool>> + Send + 'a>> {
331        Box::pin(async move {
332            let mut store = self.entries.write().await;
333            if store.contains_key(&task.id) {
334                return Ok(false);
335            }
336            store.insert(
337                task.id.clone(),
338                TaskEntry {
339                    task,
340                    last_updated: Instant::now(),
341                },
342            );
343
344            // Amortized eviction.
345            let count = self
346                .write_count
347                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
348            let over_capacity = self
349                .config
350                .max_capacity
351                .is_some_and(|max| store.len() > max);
352            if count.is_multiple_of(EVICTION_INTERVAL) || over_capacity {
353                Self::evict(&mut store, &self.config);
354            }
355            drop(store);
356            Ok(true)
357        })
358    }
359
360    fn delete<'a>(
361        &'a self,
362        id: &'a TaskId,
363    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
364        Box::pin(async move {
365            let mut store = self.entries.write().await;
366            store.remove(id);
367            drop(store);
368            Ok(())
369        })
370    }
371}