a2a_protocol_server/store/task_store.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Tom F.
3
4//! Task persistence trait and in-memory implementation.
5//!
6//! [`TaskStore`] abstracts task persistence so that the server framework can
7//! be backed by any storage engine. [`InMemoryTaskStore`] provides a simple
8//! `HashMap`-based implementation suitable for testing and single-process
9//! deployments.
10
11use std::collections::HashMap;
12use std::future::Future;
13use std::pin::Pin;
14use std::time::{Duration, Instant};
15
16use a2a_protocol_types::error::A2aResult;
17use a2a_protocol_types::params::ListTasksParams;
18use a2a_protocol_types::responses::TaskListResponse;
19use a2a_protocol_types::task::{Task, TaskId};
20use tokio::sync::RwLock;
21
22/// Trait for persisting and retrieving [`Task`] objects.
23///
24/// All methods return `Pin<Box<dyn Future>>` for object safety — this trait
25/// is used as `Box<dyn TaskStore>`.
26///
27/// # Object safety
28///
29/// Do not add `async fn` methods; use the explicit `Pin<Box<...>>` form.
30pub trait TaskStore: Send + Sync + 'static {
31 /// Saves (creates or updates) a task.
32 ///
33 /// # Errors
34 ///
35 /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the store operation fails.
36 fn save<'a>(&'a self, task: Task) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;
37
38 /// Retrieves a task by its ID, returning `None` if not found.
39 ///
40 /// # Errors
41 ///
42 /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the store operation fails.
43 fn get<'a>(
44 &'a self,
45 id: &'a TaskId,
46 ) -> Pin<Box<dyn Future<Output = A2aResult<Option<Task>>> + Send + 'a>>;
47
48 /// Lists tasks matching the given filter parameters.
49 ///
50 /// # Errors
51 ///
52 /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the store operation fails.
53 fn list<'a>(
54 &'a self,
55 params: &'a ListTasksParams,
56 ) -> Pin<Box<dyn Future<Output = A2aResult<TaskListResponse>> + Send + 'a>>;
57
58 /// Atomically inserts a task only if no task with the same ID exists.
59 ///
60 /// Returns `Ok(true)` if the task was inserted, `Ok(false)` if a task
61 /// with the same ID already exists (no modification made).
62 ///
63 /// # Errors
64 ///
65 /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the store operation fails.
66 fn insert_if_absent<'a>(
67 &'a self,
68 task: Task,
69 ) -> Pin<Box<dyn Future<Output = A2aResult<bool>> + Send + 'a>>;
70
71 /// Deletes a task by its ID.
72 ///
73 /// # Errors
74 ///
75 /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the store operation fails.
76 fn delete<'a>(
77 &'a self,
78 id: &'a TaskId,
79 ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;
80}
81
82/// Entry in the in-memory task store, tracking creation time for TTL eviction.
83#[derive(Debug, Clone)]
84struct TaskEntry {
85 /// The stored task.
86 task: Task,
87 /// When this entry was last written (for TTL-based eviction).
88 last_updated: Instant,
89}
90
91/// Configuration for [`InMemoryTaskStore`].
92#[derive(Debug, Clone)]
93pub struct TaskStoreConfig {
94 /// Maximum number of tasks to keep in the store. Once exceeded, the oldest
95 /// completed/failed tasks are evicted. `None` means no limit.
96 pub max_capacity: Option<usize>,
97
98 /// Time-to-live for completed or failed tasks. Tasks in terminal states
99 /// older than this duration are evicted on the next write operation.
100 /// `None` means no TTL-based eviction.
101 pub task_ttl: Option<Duration>,
102}
103
104impl Default for TaskStoreConfig {
105 fn default() -> Self {
106 Self {
107 max_capacity: Some(10_000),
108 task_ttl: Some(Duration::from_secs(3600)), // 1 hour
109 }
110 }
111}
112
113/// Number of writes between automatic eviction sweeps.
114///
115/// Amortizes the O(n) eviction cost so it doesn't run on every single `save()`.
116const EVICTION_INTERVAL: u64 = 64;
117
118/// Maximum allowed page size for list queries. Larger values are clamped to
119/// this limit to prevent a single request from requesting billions of tasks.
120const MAX_PAGE_SIZE: u32 = 1000;
121
122/// In-memory [`TaskStore`] backed by a [`HashMap`] under a [`RwLock`].
123///
124/// Suitable for testing and single-process deployments. Data is lost when the
125/// process exits.
126///
127/// Supports TTL-based eviction of terminal tasks and a maximum capacity limit
128/// to prevent unbounded memory growth.
129///
130/// # Eviction behavior
131///
132/// Eviction runs automatically every 64 writes (`EVICTION_INTERVAL`) and
133/// whenever the store exceeds `max_capacity`. However, if the system goes
134/// idle (no `save()` calls), completed tasks may persist in memory longer
135/// than their TTL.
136///
137/// **Operators should call [`run_eviction()`](Self::run_eviction) periodically**
138/// (e.g. every 60 seconds via `tokio::time::interval`) to ensure timely
139/// cleanup of terminal tasks during idle periods.
140#[derive(Debug)]
141pub struct InMemoryTaskStore {
142 entries: RwLock<HashMap<TaskId, TaskEntry>>,
143 config: TaskStoreConfig,
144 /// Counter for amortized eviction (only run every `EVICTION_INTERVAL` writes).
145 write_count: std::sync::atomic::AtomicU64,
146}
147
148impl Default for InMemoryTaskStore {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
154impl InMemoryTaskStore {
155 /// Creates a new empty in-memory task store with default configuration.
156 ///
157 /// Default: max 10,000 tasks, 1-hour TTL for terminal tasks.
158 #[must_use]
159 pub fn new() -> Self {
160 Self {
161 entries: RwLock::new(HashMap::new()),
162 config: TaskStoreConfig::default(),
163 write_count: std::sync::atomic::AtomicU64::new(0),
164 }
165 }
166
167 /// Creates a new in-memory task store with custom configuration.
168 #[must_use]
169 pub fn with_config(config: TaskStoreConfig) -> Self {
170 Self {
171 entries: RwLock::new(HashMap::new()),
172 config,
173 write_count: std::sync::atomic::AtomicU64::new(0),
174 }
175 }
176
177 /// Runs background eviction of expired and over-capacity entries.
178 ///
179 /// Call this periodically (e.g. every 60 seconds) to clean up terminal
180 /// tasks that would otherwise persist until the next `save()` call.
181 pub async fn run_eviction(&self) {
182 let mut store = self.entries.write().await;
183 Self::evict(&mut store, &self.config);
184 }
185
186 /// Evicts expired and over-capacity entries (must be called with write lock held).
187 fn evict(store: &mut HashMap<TaskId, TaskEntry>, config: &TaskStoreConfig) {
188 let now = Instant::now();
189
190 // TTL eviction: remove terminal tasks older than the TTL.
191 if let Some(ttl) = config.task_ttl {
192 store.retain(|_, entry| {
193 if entry.task.status.state.is_terminal() {
194 now.duration_since(entry.last_updated) < ttl
195 } else {
196 true
197 }
198 });
199 }
200
201 // Capacity eviction: remove oldest terminal tasks if over capacity.
202 if let Some(max) = config.max_capacity {
203 if store.len() > max {
204 let overflow = store.len() - max;
205 // Collect terminal tasks sorted by age (oldest first).
206 let mut terminal: Vec<(TaskId, Instant)> = store
207 .iter()
208 .filter(|(_, e)| e.task.status.state.is_terminal())
209 .map(|(id, e)| (id.clone(), e.last_updated))
210 .collect();
211 terminal.sort_by_key(|(_, t)| *t);
212
213 for (id, _) in terminal.into_iter().take(overflow) {
214 store.remove(&id);
215 }
216 }
217 }
218 }
219}
220
221#[allow(clippy::manual_async_fn)]
222impl TaskStore for InMemoryTaskStore {
223 fn save<'a>(&'a self, task: Task) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
224 Box::pin(async move {
225 trace_debug!(task_id = %task.id, state = ?task.status.state, "saving task");
226 let mut store = self.entries.write().await;
227
228 store.insert(
229 task.id.clone(),
230 TaskEntry {
231 task,
232 last_updated: Instant::now(),
233 },
234 );
235
236 // Amortized eviction: only run every EVICTION_INTERVAL writes,
237 // or immediately if the store exceeds max capacity.
238 let count = self
239 .write_count
240 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
241 let over_capacity = self
242 .config
243 .max_capacity
244 .is_some_and(|max| store.len() > max);
245 if count.is_multiple_of(EVICTION_INTERVAL) || over_capacity {
246 Self::evict(&mut store, &self.config);
247 }
248
249 drop(store);
250 Ok(())
251 })
252 }
253
254 fn get<'a>(
255 &'a self,
256 id: &'a TaskId,
257 ) -> Pin<Box<dyn Future<Output = A2aResult<Option<Task>>> + Send + 'a>> {
258 Box::pin(async move {
259 trace_debug!(task_id = %id, "fetching task");
260 let store = self.entries.read().await;
261 let result = store.get(id).map(|e| e.task.clone());
262 drop(store);
263 Ok(result)
264 })
265 }
266
267 fn list<'a>(
268 &'a self,
269 params: &'a ListTasksParams,
270 ) -> Pin<Box<dyn Future<Output = A2aResult<TaskListResponse>> + Send + 'a>> {
271 Box::pin(async move {
272 let store = self.entries.read().await;
273 let mut tasks: Vec<Task> = store
274 .values()
275 .filter(|e| {
276 if let Some(ref ctx) = params.context_id {
277 if e.task.context_id.0 != *ctx {
278 return false;
279 }
280 }
281 if let Some(ref status) = params.status {
282 if e.task.status.state != *status {
283 return false;
284 }
285 }
286 true
287 })
288 .map(|e| e.task.clone())
289 .collect();
290 drop(store);
291
292 // Sort by task ID for deterministic output.
293 tasks.sort_by(|a, b| a.id.0.cmp(&b.id.0));
294
295 // Apply cursor-based pagination via page_token.
296 // The page_token is the last task ID from the previous page.
297 if let Some(ref token) = params.page_token {
298 if let Some(pos) = tasks.iter().position(|t| t.id.0 == *token) {
299 // Skip up to and including the cursor task.
300 tasks = tasks.split_off(pos + 1);
301 } else {
302 // Token refers to a non-existent task — return empty page.
303 tasks.clear();
304 }
305 }
306
307 // Treat page_size of 0 as "use default"; clamp to MAX_PAGE_SIZE.
308 let page_size = match params.page_size {
309 Some(0) | None => 50_usize,
310 Some(n) => (n.min(MAX_PAGE_SIZE)) as usize,
311 };
312 let next_page_token = if tasks.len() > page_size {
313 tasks
314 .get(page_size.saturating_sub(1))
315 .map(|t| t.id.0.clone())
316 } else {
317 None
318 };
319 tasks.truncate(page_size);
320
321 let mut response = TaskListResponse::new(tasks);
322 response.next_page_token = next_page_token;
323 Ok(response)
324 })
325 }
326
327 fn insert_if_absent<'a>(
328 &'a self,
329 task: Task,
330 ) -> Pin<Box<dyn Future<Output = A2aResult<bool>> + Send + 'a>> {
331 Box::pin(async move {
332 let mut store = self.entries.write().await;
333 if store.contains_key(&task.id) {
334 return Ok(false);
335 }
336 store.insert(
337 task.id.clone(),
338 TaskEntry {
339 task,
340 last_updated: Instant::now(),
341 },
342 );
343
344 // Amortized eviction.
345 let count = self
346 .write_count
347 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
348 let over_capacity = self
349 .config
350 .max_capacity
351 .is_some_and(|max| store.len() > max);
352 if count.is_multiple_of(EVICTION_INTERVAL) || over_capacity {
353 Self::evict(&mut store, &self.config);
354 }
355 drop(store);
356 Ok(true)
357 })
358 }
359
360 fn delete<'a>(
361 &'a self,
362 id: &'a TaskId,
363 ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
364 Box::pin(async move {
365 let mut store = self.entries.write().await;
366 store.remove(id);
367 drop(store);
368 Ok(())
369 })
370 }
371}