Skip to main content

a2a_protocol_server/push/
config_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 Tom F.
3
4//! Push notification configuration storage trait and in-memory implementation.
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9
10use a2a_protocol_types::error::A2aResult;
11use a2a_protocol_types::push::TaskPushNotificationConfig;
12use tokio::sync::RwLock;
13
14/// Trait for storing push notification configurations.
15///
16/// Object-safe; used as `Box<dyn PushConfigStore>`.
17pub trait PushConfigStore: Send + Sync + 'static {
18    /// Stores (creates or updates) a push notification config.
19    ///
20    /// # Errors
21    ///
22    /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the operation fails.
23    fn set<'a>(
24        &'a self,
25        config: TaskPushNotificationConfig,
26    ) -> Pin<Box<dyn Future<Output = A2aResult<TaskPushNotificationConfig>> + Send + 'a>>;
27
28    /// Retrieves a push notification config by task ID and config ID.
29    ///
30    /// # Errors
31    ///
32    /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the operation fails.
33    fn get<'a>(
34        &'a self,
35        task_id: &'a str,
36        id: &'a str,
37    ) -> Pin<Box<dyn Future<Output = A2aResult<Option<TaskPushNotificationConfig>>> + Send + 'a>>;
38
39    /// Lists all push notification configs for a task.
40    ///
41    /// # Errors
42    ///
43    /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the operation fails.
44    fn list<'a>(
45        &'a self,
46        task_id: &'a str,
47    ) -> Pin<Box<dyn Future<Output = A2aResult<Vec<TaskPushNotificationConfig>>> + Send + 'a>>;
48
49    /// Deletes a push notification config by task ID and config ID.
50    ///
51    /// # Errors
52    ///
53    /// Returns an [`A2aError`](a2a_protocol_types::error::A2aError) if the operation fails.
54    fn delete<'a>(
55        &'a self,
56        task_id: &'a str,
57        id: &'a str,
58    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;
59}
60
61/// Maximum number of push notification configs allowed per task.
62///
63/// Prevents a malicious client from creating unbounded push configs.
64const MAX_PUSH_CONFIGS_PER_TASK: usize = 100;
65
66/// In-memory [`PushConfigStore`] backed by a `HashMap`.
67#[derive(Debug, Default)]
68pub struct InMemoryPushConfigStore {
69    configs: RwLock<HashMap<(String, String), TaskPushNotificationConfig>>,
70}
71
72impl InMemoryPushConfigStore {
73    /// Creates a new empty in-memory push config store.
74    #[must_use]
75    pub fn new() -> Self {
76        Self::default()
77    }
78}
79
80#[allow(clippy::manual_async_fn)]
81impl PushConfigStore for InMemoryPushConfigStore {
82    fn set<'a>(
83        &'a self,
84        mut config: TaskPushNotificationConfig,
85    ) -> Pin<Box<dyn Future<Output = A2aResult<TaskPushNotificationConfig>> + Send + 'a>> {
86        Box::pin(async move {
87            // Assign an ID if not present.
88            let id = config
89                .id
90                .clone()
91                .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
92            config.id = Some(id.clone());
93
94            let key = (config.task_id.clone(), id);
95            let mut store = self.configs.write().await;
96
97            // Reject if this is a new config and the per-task limit is reached.
98            if !store.contains_key(&key) {
99                let task_id = &config.task_id;
100                let count = store.keys().filter(|(tid, _)| tid == task_id).count();
101                if count >= MAX_PUSH_CONFIGS_PER_TASK {
102                    drop(store);
103                    return Err(a2a_protocol_types::error::A2aError::invalid_params(format!(
104                        "push config limit exceeded: task {task_id} already has {count} configs (max {MAX_PUSH_CONFIGS_PER_TASK})"
105                    )));
106                }
107            }
108
109            store.insert(key, config.clone());
110            drop(store);
111            Ok(config)
112        })
113    }
114
115    fn get<'a>(
116        &'a self,
117        task_id: &'a str,
118        id: &'a str,
119    ) -> Pin<Box<dyn Future<Output = A2aResult<Option<TaskPushNotificationConfig>>> + Send + 'a>>
120    {
121        Box::pin(async move {
122            let store = self.configs.read().await;
123            let key = (task_id.to_owned(), id.to_owned());
124            let result = store.get(&key).cloned();
125            drop(store);
126            Ok(result)
127        })
128    }
129
130    fn list<'a>(
131        &'a self,
132        task_id: &'a str,
133    ) -> Pin<Box<dyn Future<Output = A2aResult<Vec<TaskPushNotificationConfig>>> + Send + 'a>> {
134        Box::pin(async move {
135            let store = self.configs.read().await;
136            let configs: Vec<_> = store
137                .iter()
138                .filter(|((tid, _), _)| tid == task_id)
139                .map(|(_, v)| v.clone())
140                .collect();
141            drop(store);
142            Ok(configs)
143        })
144    }
145
146    fn delete<'a>(
147        &'a self,
148        task_id: &'a str,
149        id: &'a str,
150    ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
151        Box::pin(async move {
152            let mut store = self.configs.write().await;
153            let key = (task_id.to_owned(), id.to_owned());
154            store.remove(&key);
155            drop(store);
156            Ok(())
157        })
158    }
159}