a2a_protocol_server/push/
config_store.rs1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9
10use a2a_protocol_types::error::A2aResult;
11use a2a_protocol_types::push::TaskPushNotificationConfig;
12use tokio::sync::RwLock;
13
14pub trait PushConfigStore: Send + Sync + 'static {
18 fn set<'a>(
24 &'a self,
25 config: TaskPushNotificationConfig,
26 ) -> Pin<Box<dyn Future<Output = A2aResult<TaskPushNotificationConfig>> + Send + 'a>>;
27
28 fn get<'a>(
34 &'a self,
35 task_id: &'a str,
36 id: &'a str,
37 ) -> Pin<Box<dyn Future<Output = A2aResult<Option<TaskPushNotificationConfig>>> + Send + 'a>>;
38
39 fn list<'a>(
45 &'a self,
46 task_id: &'a str,
47 ) -> Pin<Box<dyn Future<Output = A2aResult<Vec<TaskPushNotificationConfig>>> + Send + 'a>>;
48
49 fn delete<'a>(
55 &'a self,
56 task_id: &'a str,
57 id: &'a str,
58 ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>>;
59}
60
/// Upper bound on the number of push configurations a single task may hold
/// in [`InMemoryPushConfigStore`]; `set` rejects new entries beyond it.
const MAX_PUSH_CONFIGS_PER_TASK: usize = 100;
65
66#[derive(Debug, Default)]
68pub struct InMemoryPushConfigStore {
69 configs: RwLock<HashMap<(String, String), TaskPushNotificationConfig>>,
70}
71
72impl InMemoryPushConfigStore {
73 #[must_use]
75 pub fn new() -> Self {
76 Self::default()
77 }
78}
79
80#[allow(clippy::manual_async_fn)]
81impl PushConfigStore for InMemoryPushConfigStore {
82 fn set<'a>(
83 &'a self,
84 mut config: TaskPushNotificationConfig,
85 ) -> Pin<Box<dyn Future<Output = A2aResult<TaskPushNotificationConfig>> + Send + 'a>> {
86 Box::pin(async move {
87 let id = config
89 .id
90 .clone()
91 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
92 config.id = Some(id.clone());
93
94 let key = (config.task_id.clone(), id);
95 let mut store = self.configs.write().await;
96
97 if !store.contains_key(&key) {
99 let task_id = &config.task_id;
100 let count = store.keys().filter(|(tid, _)| tid == task_id).count();
101 if count >= MAX_PUSH_CONFIGS_PER_TASK {
102 drop(store);
103 return Err(a2a_protocol_types::error::A2aError::invalid_params(format!(
104 "push config limit exceeded: task {task_id} already has {count} configs (max {MAX_PUSH_CONFIGS_PER_TASK})"
105 )));
106 }
107 }
108
109 store.insert(key, config.clone());
110 drop(store);
111 Ok(config)
112 })
113 }
114
115 fn get<'a>(
116 &'a self,
117 task_id: &'a str,
118 id: &'a str,
119 ) -> Pin<Box<dyn Future<Output = A2aResult<Option<TaskPushNotificationConfig>>> + Send + 'a>>
120 {
121 Box::pin(async move {
122 let store = self.configs.read().await;
123 let key = (task_id.to_owned(), id.to_owned());
124 let result = store.get(&key).cloned();
125 drop(store);
126 Ok(result)
127 })
128 }
129
130 fn list<'a>(
131 &'a self,
132 task_id: &'a str,
133 ) -> Pin<Box<dyn Future<Output = A2aResult<Vec<TaskPushNotificationConfig>>> + Send + 'a>> {
134 Box::pin(async move {
135 let store = self.configs.read().await;
136 let configs: Vec<_> = store
137 .iter()
138 .filter(|((tid, _), _)| tid == task_id)
139 .map(|(_, v)| v.clone())
140 .collect();
141 drop(store);
142 Ok(configs)
143 })
144 }
145
146 fn delete<'a>(
147 &'a self,
148 task_id: &'a str,
149 id: &'a str,
150 ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
151 Box::pin(async move {
152 let mut store = self.configs.write().await;
153 let key = (task_id.to_owned(), id.to_owned());
154 store.remove(&key);
155 drop(store);
156 Ok(())
157 })
158 }
159}