a2a_protocol_server/handler/
push_config.rs1use std::collections::HashMap;
9use std::time::Instant;
10
11use a2a_protocol_types::params::{DeletePushConfigParams, GetPushConfigParams};
12use a2a_protocol_types::push::TaskPushNotificationConfig;
13
14use crate::error::{ServerError, ServerResult};
15
16use super::helpers::build_call_context;
17use super::RequestHandler;
18
19impl RequestHandler {
20 pub async fn on_set_push_config(
26 &self,
27 config: TaskPushNotificationConfig,
28 headers: Option<&HashMap<String, String>>,
29 ) -> ServerResult<TaskPushNotificationConfig> {
30 let start = Instant::now();
31 self.metrics.on_request("CreateTaskPushNotificationConfig");
32
33 let tenant = config.tenant.clone().unwrap_or_default();
34 let result: ServerResult<_> = crate::store::tenant::TenantContext::scope(tenant, async {
35 let Some(ref sender) = self.push_sender else {
36 return Err(ServerError::PushNotSupported);
37 };
38 if !sender.allows_private_urls() {
43 crate::push::sender::validate_webhook_url(&config.url)?;
44 }
45
46 let call_ctx = build_call_context("CreateTaskPushNotificationConfig", headers);
47 self.interceptors.run_before(&call_ctx).await?;
48 let result = self.push_config_store.set(config).await?;
49 self.interceptors.run_after(&call_ctx).await?;
50 Ok(result)
51 })
52 .await;
53
54 let elapsed = start.elapsed();
55 match &result {
56 Ok(_) => {
57 self.metrics.on_response("CreateTaskPushNotificationConfig");
58 self.metrics
59 .on_latency("CreateTaskPushNotificationConfig", elapsed);
60 }
61 Err(e) => {
62 self.metrics
63 .on_error("CreateTaskPushNotificationConfig", &e.to_string());
64 self.metrics
65 .on_latency("CreateTaskPushNotificationConfig", elapsed);
66 }
67 }
68 result
69 }
70
71 pub async fn on_get_push_config(
77 &self,
78 params: GetPushConfigParams,
79 headers: Option<&HashMap<String, String>>,
80 ) -> ServerResult<TaskPushNotificationConfig> {
81 let start = Instant::now();
82 self.metrics.on_request("GetTaskPushNotificationConfig");
83
84 let tenant = params.tenant.clone().unwrap_or_default();
85 let result: ServerResult<_> = crate::store::tenant::TenantContext::scope(tenant, async {
86 let call_ctx = build_call_context("GetTaskPushNotificationConfig", headers);
87 self.interceptors.run_before(&call_ctx).await?;
88
89 let config = self
90 .push_config_store
91 .get(¶ms.task_id, ¶ms.id)
92 .await?
93 .ok_or_else(|| {
94 ServerError::InvalidParams(format!(
95 "push config not found: task={}, id={}",
96 params.task_id, params.id
97 ))
98 })?;
99
100 self.interceptors.run_after(&call_ctx).await?;
101 Ok(config)
102 })
103 .await;
104
105 let elapsed = start.elapsed();
106 match &result {
107 Ok(_) => {
108 self.metrics.on_response("GetTaskPushNotificationConfig");
109 self.metrics
110 .on_latency("GetTaskPushNotificationConfig", elapsed);
111 }
112 Err(e) => {
113 self.metrics
114 .on_error("GetTaskPushNotificationConfig", &e.to_string());
115 self.metrics
116 .on_latency("GetTaskPushNotificationConfig", elapsed);
117 }
118 }
119 result
120 }
121
122 pub async fn on_list_push_configs(
128 &self,
129 task_id: &str,
130 tenant: Option<&str>,
131 headers: Option<&HashMap<String, String>>,
132 ) -> ServerResult<Vec<TaskPushNotificationConfig>> {
133 let start = Instant::now();
134 self.metrics.on_request("ListTaskPushNotificationConfigs");
135
136 let tenant_owned = tenant.unwrap_or_default().to_owned();
137 let result: ServerResult<_> =
138 crate::store::tenant::TenantContext::scope(tenant_owned, async {
139 let call_ctx = build_call_context("ListTaskPushNotificationConfigs", headers);
140 self.interceptors.run_before(&call_ctx).await?;
141 let configs = self.push_config_store.list(task_id).await?;
142 self.interceptors.run_after(&call_ctx).await?;
143 Ok(configs)
144 })
145 .await;
146
147 let elapsed = start.elapsed();
148 match &result {
149 Ok(_) => {
150 self.metrics.on_response("ListTaskPushNotificationConfigs");
151 self.metrics
152 .on_latency("ListTaskPushNotificationConfigs", elapsed);
153 }
154 Err(e) => {
155 self.metrics
156 .on_error("ListTaskPushNotificationConfigs", &e.to_string());
157 self.metrics
158 .on_latency("ListTaskPushNotificationConfigs", elapsed);
159 }
160 }
161 result
162 }
163
164 pub async fn on_delete_push_config(
170 &self,
171 params: DeletePushConfigParams,
172 headers: Option<&HashMap<String, String>>,
173 ) -> ServerResult<()> {
174 let start = Instant::now();
175 self.metrics.on_request("DeleteTaskPushNotificationConfig");
176
177 let tenant = params.tenant.clone().unwrap_or_default();
178 let result: ServerResult<_> = crate::store::tenant::TenantContext::scope(tenant, async {
179 let call_ctx = build_call_context("DeleteTaskPushNotificationConfig", headers);
180 self.interceptors.run_before(&call_ctx).await?;
181 self.push_config_store
182 .delete(¶ms.task_id, ¶ms.id)
183 .await?;
184 self.interceptors.run_after(&call_ctx).await?;
185 Ok(())
186 })
187 .await;
188
189 let elapsed = start.elapsed();
190 match &result {
191 Ok(()) => {
192 self.metrics.on_response("DeleteTaskPushNotificationConfig");
193 self.metrics
194 .on_latency("DeleteTaskPushNotificationConfig", elapsed);
195 }
196 Err(e) => {
197 self.metrics
198 .on_error("DeleteTaskPushNotificationConfig", &e.to_string());
199 self.metrics
200 .on_latency("DeleteTaskPushNotificationConfig", elapsed);
201 }
202 }
203 result
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210 use crate::agent_executor;
211 use crate::builder::RequestHandlerBuilder;
212
213 struct DummyExecutor;
214 agent_executor!(DummyExecutor, |_ctx, _queue| async { Ok(()) });
215
216 fn make_handler() -> RequestHandler {
217 RequestHandlerBuilder::new(DummyExecutor).build().unwrap()
218 }
219
220 fn make_push_config(task_id: &str) -> TaskPushNotificationConfig {
221 TaskPushNotificationConfig {
222 tenant: None,
223 id: Some("cfg-1".to_owned()),
224 task_id: task_id.to_owned(),
225 url: "https://example.com/webhook".to_owned(),
226 token: None,
227 authentication: None,
228 }
229 }
230
231 #[tokio::test]
234 async fn set_push_config_without_sender_returns_push_not_supported() {
235 let handler = make_handler();
236 let config = make_push_config("task-1");
237 let result = handler.on_set_push_config(config, None).await;
238 assert!(
239 matches!(result, Err(crate::error::ServerError::PushNotSupported)),
240 "expected PushNotSupported, got: {result:?}"
241 );
242 }
243
244 #[tokio::test]
247 async fn get_push_config_not_found_returns_invalid_params() {
248 use a2a_protocol_types::params::GetPushConfigParams;
249
250 let handler = make_handler();
251 let params = GetPushConfigParams {
252 tenant: None,
253 task_id: "no-task".to_owned(),
254 id: "no-id".to_owned(),
255 };
256 let result = handler.on_get_push_config(params, None).await;
257 assert!(
258 matches!(result, Err(crate::error::ServerError::InvalidParams(_))),
259 "expected InvalidParams for missing config, got: {result:?}"
260 );
261 }
262
263 #[tokio::test]
266 async fn list_push_configs_empty_returns_empty_vec() {
267 let handler = make_handler();
268 let result = handler
269 .on_list_push_configs("no-task", None, None)
270 .await
271 .expect("list should succeed on empty store");
272 assert!(
273 result.is_empty(),
274 "listing configs for an unknown task should return an empty vec"
275 );
276 }
277
278 #[tokio::test]
281 async fn delete_push_config_nonexistent_returns_ok() {
282 use a2a_protocol_types::params::DeletePushConfigParams;
283
284 let handler = make_handler();
285 let params = DeletePushConfigParams {
286 tenant: None,
287 task_id: "no-task".to_owned(),
288 id: "no-id".to_owned(),
289 };
290 let result = handler.on_delete_push_config(params, None).await;
293 assert!(
294 result.is_ok(),
295 "deleting a non-existent push config should return Ok, got: {result:?}"
296 );
297 }
298
299 #[tokio::test]
302 async fn list_push_configs_error_path_records_metrics() {
303 use crate::call_context::CallContext;
306 use crate::interceptor::ServerInterceptor;
307 use std::future::Future;
308 use std::pin::Pin;
309
310 struct FailInterceptor;
311 impl ServerInterceptor for FailInterceptor {
312 fn before<'a>(
313 &'a self,
314 _ctx: &'a CallContext,
315 ) -> Pin<Box<dyn Future<Output = a2a_protocol_types::error::A2aResult<()>> + Send + 'a>>
316 {
317 Box::pin(async {
318 Err(a2a_protocol_types::error::A2aError::internal(
319 "forced failure",
320 ))
321 })
322 }
323 fn after<'a>(
324 &'a self,
325 _ctx: &'a CallContext,
326 ) -> Pin<Box<dyn Future<Output = a2a_protocol_types::error::A2aResult<()>> + Send + 'a>>
327 {
328 Box::pin(async { Ok(()) })
329 }
330 }
331
332 let handler = RequestHandlerBuilder::new(DummyExecutor)
333 .with_interceptor(FailInterceptor)
334 .build()
335 .unwrap();
336
337 let result = handler.on_list_push_configs("task-1", None, None).await;
338 assert!(
339 result.is_err(),
340 "list_push_configs should fail when interceptor rejects"
341 );
342 }
343
344 #[tokio::test]
345 async fn delete_push_config_error_path_records_metrics() {
346 use crate::call_context::CallContext;
349 use crate::interceptor::ServerInterceptor;
350 use a2a_protocol_types::params::DeletePushConfigParams;
351 use std::future::Future;
352 use std::pin::Pin;
353
354 struct FailInterceptor;
355 impl ServerInterceptor for FailInterceptor {
356 fn before<'a>(
357 &'a self,
358 _ctx: &'a CallContext,
359 ) -> Pin<Box<dyn Future<Output = a2a_protocol_types::error::A2aResult<()>> + Send + 'a>>
360 {
361 Box::pin(async {
362 Err(a2a_protocol_types::error::A2aError::internal(
363 "forced failure",
364 ))
365 })
366 }
367 fn after<'a>(
368 &'a self,
369 _ctx: &'a CallContext,
370 ) -> Pin<Box<dyn Future<Output = a2a_protocol_types::error::A2aResult<()>> + Send + 'a>>
371 {
372 Box::pin(async { Ok(()) })
373 }
374 }
375
376 let handler = RequestHandlerBuilder::new(DummyExecutor)
377 .with_interceptor(FailInterceptor)
378 .build()
379 .unwrap();
380
381 let params = DeletePushConfigParams {
382 tenant: None,
383 task_id: "task-1".to_owned(),
384 id: "cfg-1".to_owned(),
385 };
386 let result = handler.on_delete_push_config(params, None).await;
387 assert!(
388 result.is_err(),
389 "delete_push_config should fail when interceptor rejects"
390 );
391 }
392
393 #[tokio::test]
394 async fn set_push_config_error_path_records_metrics() {
395 let handler = make_handler();
398 let config = make_push_config("task-err");
399 let result = handler.on_set_push_config(config, None).await;
400 assert!(
401 result.is_err(),
402 "set_push_config without push sender should hit error metrics path"
403 );
404 }
405
406 #[tokio::test]
407 async fn get_push_config_error_path_records_metrics() {
408 use a2a_protocol_types::params::GetPushConfigParams;
411
412 let handler = make_handler();
413 let params = GetPushConfigParams {
414 tenant: None,
415 task_id: "missing-task".to_owned(),
416 id: "missing-id".to_owned(),
417 };
418 let result = handler.on_get_push_config(params, None).await;
419 assert!(
420 result.is_err(),
421 "get_push_config for missing config should hit error metrics path"
422 );
423 }
424}