a2a_protocol_server/push/
tenant_config_store.rs1use std::collections::HashMap;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use a2a_protocol_types::error::A2aResult;
17use a2a_protocol_types::push::TaskPushNotificationConfig;
18use tokio::sync::RwLock;
19
20use super::config_store::{InMemoryPushConfigStore, PushConfigStore};
21use crate::store::tenant::TenantContext;
22
23#[derive(Debug)]
45pub struct TenantAwareInMemoryPushConfigStore {
46 stores: RwLock<HashMap<String, Arc<InMemoryPushConfigStore>>>,
47 max_tenants: usize,
48 max_configs_per_task: usize,
49}
50
51impl Default for TenantAwareInMemoryPushConfigStore {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57impl TenantAwareInMemoryPushConfigStore {
58 #[must_use]
60 pub fn new() -> Self {
61 Self {
62 stores: RwLock::new(HashMap::new()),
63 max_tenants: 1000,
64 max_configs_per_task: 100,
65 }
66 }
67
68 #[must_use]
70 pub fn with_limits(max_tenants: usize, max_configs_per_task: usize) -> Self {
71 Self {
72 stores: RwLock::new(HashMap::new()),
73 max_tenants,
74 max_configs_per_task,
75 }
76 }
77
78 async fn get_store(&self) -> A2aResult<Arc<InMemoryPushConfigStore>> {
80 let tenant = TenantContext::current();
81
82 {
83 let stores = self.stores.read().await;
84 if let Some(store) = stores.get(&tenant) {
85 return Ok(Arc::clone(store));
86 }
87 }
88
89 let mut stores = self.stores.write().await;
90 if let Some(store) = stores.get(&tenant) {
91 return Ok(Arc::clone(store));
92 }
93
94 if stores.len() >= self.max_tenants {
95 return Err(a2a_protocol_types::error::A2aError::internal(format!(
96 "tenant limit exceeded: max {} tenants",
97 self.max_tenants
98 )));
99 }
100
101 let store = Arc::new(InMemoryPushConfigStore::with_max_configs_per_task(
102 self.max_configs_per_task,
103 ));
104 stores.insert(tenant, Arc::clone(&store));
105 drop(stores);
106 Ok(store)
107 }
108
109 pub async fn tenant_count(&self) -> usize {
111 self.stores.read().await.len()
112 }
113}
114
115#[allow(clippy::manual_async_fn)]
116impl PushConfigStore for TenantAwareInMemoryPushConfigStore {
117 fn set<'a>(
118 &'a self,
119 config: TaskPushNotificationConfig,
120 ) -> Pin<Box<dyn Future<Output = A2aResult<TaskPushNotificationConfig>> + Send + 'a>> {
121 Box::pin(async move {
122 let store = self.get_store().await?;
123 store.set(config).await
124 })
125 }
126
127 fn get<'a>(
128 &'a self,
129 task_id: &'a str,
130 id: &'a str,
131 ) -> Pin<Box<dyn Future<Output = A2aResult<Option<TaskPushNotificationConfig>>> + Send + 'a>>
132 {
133 Box::pin(async move {
134 let store = self.get_store().await?;
135 store.get(task_id, id).await
136 })
137 }
138
139 fn list<'a>(
140 &'a self,
141 task_id: &'a str,
142 ) -> Pin<Box<dyn Future<Output = A2aResult<Vec<TaskPushNotificationConfig>>> + Send + 'a>> {
143 Box::pin(async move {
144 let store = self.get_store().await?;
145 store.list(task_id).await
146 })
147 }
148
149 fn delete<'a>(
150 &'a self,
151 task_id: &'a str,
152 id: &'a str,
153 ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
154 Box::pin(async move {
155 let store = self.get_store().await?;
156 store.delete(task_id, id).await
157 })
158 }
159}
160
161#[cfg(test)]
162mod tests {
163 use super::*;
164 use a2a_protocol_types::push::TaskPushNotificationConfig;
165
166 fn make_config(task_id: &str, id: Option<&str>, url: &str) -> TaskPushNotificationConfig {
167 TaskPushNotificationConfig {
168 tenant: None,
169 id: id.map(String::from),
170 task_id: task_id.to_string(),
171 url: url.to_string(),
172 token: None,
173 authentication: None,
174 }
175 }
176
177 #[tokio::test]
178 async fn new_store_has_zero_tenants() {
179 let store = TenantAwareInMemoryPushConfigStore::new();
180 assert_eq!(
181 store.tenant_count().await,
182 0,
183 "new store should have no tenants"
184 );
185 }
186
187 #[tokio::test]
188 async fn set_and_get_within_tenant_scope() {
189 let store = TenantAwareInMemoryPushConfigStore::new();
190 TenantContext::scope("tenant-a", async {
191 store
192 .set(make_config("task-1", Some("cfg-1"), "https://a.com/hook"))
193 .await
194 .expect("set should succeed");
195
196 let config = store
197 .get("task-1", "cfg-1")
198 .await
199 .expect("get should succeed")
200 .expect("config should exist");
201 assert_eq!(config.url, "https://a.com/hook");
202 })
203 .await;
204 }
205
206 #[tokio::test]
207 async fn tenant_isolation() {
208 let store = TenantAwareInMemoryPushConfigStore::new();
209
210 TenantContext::scope("tenant-a", async {
212 store
213 .set(make_config("task-1", Some("cfg-1"), "https://a.com"))
214 .await
215 .unwrap();
216 })
217 .await;
218
219 TenantContext::scope("tenant-b", async {
221 let result = store.get("task-1", "cfg-1").await.unwrap();
222 assert!(
223 result.is_none(),
224 "tenant-b should not see tenant-a's config"
225 );
226 })
227 .await;
228
229 TenantContext::scope("tenant-a", async {
231 let result = store.get("task-1", "cfg-1").await.unwrap();
232 assert!(result.is_some(), "tenant-a should still see its own config");
233 })
234 .await;
235 }
236
237 #[tokio::test]
238 async fn tenant_count_tracks_distinct_tenants() {
239 let store = TenantAwareInMemoryPushConfigStore::new();
240
241 TenantContext::scope("tenant-a", async {
242 store
243 .set(make_config("t1", Some("c1"), "https://a.com"))
244 .await
245 .unwrap();
246 })
247 .await;
248 assert_eq!(store.tenant_count().await, 1);
249
250 TenantContext::scope("tenant-b", async {
251 store
252 .set(make_config("t1", Some("c1"), "https://b.com"))
253 .await
254 .unwrap();
255 })
256 .await;
257 assert_eq!(store.tenant_count().await, 2);
258
259 TenantContext::scope("tenant-a", async {
261 store
262 .set(make_config("t2", Some("c2"), "https://a2.com"))
263 .await
264 .unwrap();
265 })
266 .await;
267 assert_eq!(
268 store.tenant_count().await,
269 2,
270 "re-using an existing tenant should not increase count"
271 );
272 }
273
274 #[tokio::test]
275 async fn with_limits_enforces_max_tenants() {
276 let store = TenantAwareInMemoryPushConfigStore::with_limits(1, 100);
277
278 TenantContext::scope("tenant-a", async {
279 store
280 .set(make_config("t1", Some("c1"), "https://a.com"))
281 .await
282 .unwrap();
283 })
284 .await;
285
286 let err = TenantContext::scope("tenant-b", async {
287 store
288 .set(make_config("t1", Some("c1"), "https://b.com"))
289 .await
290 })
291 .await
292 .expect_err("second tenant should exceed max_tenants limit");
293
294 let msg = format!("{err}");
295 assert!(
296 msg.contains("tenant limit exceeded"),
297 "error should mention tenant limit, got: {msg}"
298 );
299 }
300
301 #[tokio::test]
302 async fn with_limits_enforces_per_task_config_limit() {
303 let store = TenantAwareInMemoryPushConfigStore::with_limits(100, 1);
304
305 let err = TenantContext::scope("tenant-a", async {
306 store
307 .set(make_config("t1", Some("c1"), "https://a.com"))
308 .await
309 .unwrap();
310 store
311 .set(make_config("t1", Some("c2"), "https://b.com"))
312 .await
313 })
314 .await
315 .expect_err("second config should exceed per-task limit");
316
317 let msg = format!("{err}");
318 assert!(
319 msg.contains("limit exceeded"),
320 "error should mention limit exceeded, got: {msg}"
321 );
322 }
323
324 #[tokio::test]
325 async fn list_scoped_to_tenant() {
326 let store = TenantAwareInMemoryPushConfigStore::new();
327
328 TenantContext::scope("tenant-a", async {
329 store
330 .set(make_config("t1", Some("c1"), "https://a.com/1"))
331 .await
332 .unwrap();
333 store
334 .set(make_config("t1", Some("c2"), "https://a.com/2"))
335 .await
336 .unwrap();
337 })
338 .await;
339
340 TenantContext::scope("tenant-b", async {
341 store
342 .set(make_config("t1", Some("c3"), "https://b.com/1"))
343 .await
344 .unwrap();
345 })
346 .await;
347
348 let a_list =
349 TenantContext::scope("tenant-a", async { store.list("t1").await.unwrap() }).await;
350 assert_eq!(a_list.len(), 2, "tenant-a should see 2 configs for task t1");
351
352 let b_list =
353 TenantContext::scope("tenant-b", async { store.list("t1").await.unwrap() }).await;
354 assert_eq!(b_list.len(), 1, "tenant-b should see 1 config for task t1");
355 }
356
357 #[tokio::test]
358 async fn delete_scoped_to_tenant() {
359 let store = TenantAwareInMemoryPushConfigStore::new();
360
361 TenantContext::scope("tenant-a", async {
363 store
364 .set(make_config("t1", Some("c1"), "https://a.com"))
365 .await
366 .unwrap();
367 })
368 .await;
369 TenantContext::scope("tenant-b", async {
370 store
371 .set(make_config("t1", Some("c1"), "https://b.com"))
372 .await
373 .unwrap();
374 })
375 .await;
376
377 TenantContext::scope("tenant-a", async {
379 store.delete("t1", "c1").await.unwrap();
380 })
381 .await;
382
383 let a_result =
385 TenantContext::scope("tenant-a", async { store.get("t1", "c1").await.unwrap() }).await;
386 assert!(a_result.is_none(), "tenant-a config should be deleted");
387
388 let b_result =
390 TenantContext::scope("tenant-b", async { store.get("t1", "c1").await.unwrap() }).await;
391 assert!(
392 b_result.is_some(),
393 "tenant-b config should be unaffected by tenant-a's delete"
394 );
395 }
396
397 #[test]
400 fn default_impl_creates_empty_store() {
401 let store = TenantAwareInMemoryPushConfigStore::default();
402 assert_eq!(store.max_tenants, 1000);
403 assert_eq!(store.max_configs_per_task, 100);
404 }
405
406 #[tokio::test]
407 async fn default_is_same_as_new() {
408 let store = TenantAwareInMemoryPushConfigStore::default();
409 assert_eq!(store.tenant_count().await, 0);
410 TenantContext::scope("t", async {
412 store
413 .set(make_config("t1", Some("c1"), "https://x.com"))
414 .await
415 .unwrap();
416 })
417 .await;
418 assert_eq!(store.tenant_count().await, 1);
419 }
420}