1use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Instant;
11
12use a2a_protocol_types::events::{StreamResponse, TaskStatusUpdateEvent};
13use a2a_protocol_types::params::MessageSendParams;
14use a2a_protocol_types::responses::SendMessageResponse;
15use a2a_protocol_types::task::{ContextId, Task, TaskId, TaskState, TaskStatus};
16
17use crate::error::{ServerError, ServerResult};
18use crate::request_context::RequestContext;
19use crate::streaming::EventQueueWriter;
20
21use super::helpers::{build_call_context, validate_id};
22use super::{CancellationEntry, RequestHandler, SendMessageResult};
23
24fn json_byte_len(value: &serde_json::Value) -> serde_json::Result<usize> {
26 struct CountWriter(usize);
27 impl std::io::Write for CountWriter {
28 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
29 self.0 += buf.len();
30 Ok(buf.len())
31 }
32 fn flush(&mut self) -> std::io::Result<()> {
33 Ok(())
34 }
35 }
36 let mut w = CountWriter(0);
37 serde_json::to_writer(&mut w, value)?;
38 Ok(w.0)
39}
40
41impl RequestHandler {
42 pub async fn on_send_message(
51 &self,
52 params: MessageSendParams,
53 streaming: bool,
54 headers: Option<&HashMap<String, String>>,
55 ) -> ServerResult<SendMessageResult> {
56 let method_name = if streaming {
57 "SendStreamingMessage"
58 } else {
59 "SendMessage"
60 };
61 let start = Instant::now();
62 trace_info!(method = method_name, streaming, "handling send message");
63 self.metrics.on_request(method_name);
64
65 let tenant = params.tenant.clone().unwrap_or_default();
66 let result = crate::store::tenant::TenantContext::scope(tenant, async {
67 self.send_message_inner(params, streaming, method_name, headers)
68 .await
69 })
70 .await;
71 let elapsed = start.elapsed();
72 match &result {
73 Ok(_) => {
74 self.metrics.on_response(method_name);
75 self.metrics.on_latency(method_name, elapsed);
76 }
77 Err(e) => {
78 self.metrics.on_error(method_name, &e.to_string());
79 self.metrics.on_latency(method_name, elapsed);
80 }
81 }
82 result
83 }
84
85 #[allow(clippy::too_many_lines)]
88 async fn send_message_inner(
89 &self,
90 params: MessageSendParams,
91 streaming: bool,
92 method_name: &str,
93 headers: Option<&HashMap<String, String>>,
94 ) -> ServerResult<SendMessageResult> {
95 let call_ctx = build_call_context(method_name, headers);
96 self.interceptors.run_before(&call_ctx).await?;
97
98 if let Some(ref ctx_id) = params.message.context_id {
100 validate_id(&ctx_id.0, "context_id", self.limits.max_id_length)?;
101 }
102 if let Some(ref task_id) = params.message.task_id {
103 validate_id(&task_id.0, "task_id", self.limits.max_id_length)?;
104 }
105
106 if params.message.parts.is_empty() {
108 return Err(ServerError::InvalidParams(
109 "message must contain at least one part".into(),
110 ));
111 }
112
113 let max_meta = self.limits.max_metadata_size;
116 if let Some(ref meta) = params.message.metadata {
117 let meta_size = json_byte_len(meta).map_err(|_| {
118 ServerError::InvalidParams("message metadata is not serializable".into())
119 })?;
120 if meta_size > max_meta {
121 return Err(ServerError::InvalidParams(format!(
122 "message metadata exceeds maximum size ({meta_size} bytes, max {max_meta})"
123 )));
124 }
125 }
126 if let Some(ref meta) = params.metadata {
127 let meta_size = json_byte_len(meta).map_err(|_| {
128 ServerError::InvalidParams("request metadata is not serializable".into())
129 })?;
130 if meta_size > max_meta {
131 return Err(ServerError::InvalidParams(format!(
132 "request metadata exceeds maximum size ({meta_size} bytes, max {max_meta})"
133 )));
134 }
135 }
136
137 let task_id = TaskId::new(uuid::Uuid::new_v4().to_string());
140 let context_id = params
141 .context_id
142 .as_deref()
143 .or_else(|| params.message.context_id.as_ref().map(|c| c.0.as_str()))
144 .map_or_else(|| uuid::Uuid::new_v4().to_string(), ToString::to_string);
145
146 let context_lock = {
150 let mut locks = self.context_locks.write().await;
151 locks.entry(context_id.clone()).or_default().clone()
152 };
153 let context_guard = context_lock.lock().await;
154
155 let stored_task = self.find_task_by_context(&context_id).await?;
157
158 if let Some(ref msg_task_id) = params.message.task_id {
161 if let Some(ref stored) = stored_task {
162 if msg_task_id != &stored.id {
163 return Err(ServerError::InvalidParams(
164 "message task_id does not match task found for context".into(),
165 ));
166 }
167 } else {
168 let placeholder = Task {
171 id: msg_task_id.clone(),
172 context_id: ContextId::new(&context_id),
173 status: TaskStatus::with_timestamp(TaskState::Submitted),
174 history: None,
175 artifacts: None,
176 metadata: None,
177 };
178 if !self.task_store.insert_if_absent(placeholder).await? {
179 return Err(ServerError::InvalidParams(
180 "task_id already exists; cannot create duplicate".into(),
181 ));
182 }
183 }
184 }
185
186 let return_immediately = params
188 .configuration
189 .as_ref()
190 .and_then(|c| c.return_immediately)
191 .unwrap_or(false);
192
193 trace_debug!(
195 task_id = %task_id,
196 context_id = %context_id,
197 "creating task"
198 );
199 let task = Task {
200 id: task_id.clone(),
201 context_id: ContextId::new(&context_id),
202 status: TaskStatus::with_timestamp(TaskState::Submitted),
203 history: None,
204 artifacts: None,
205 metadata: None,
206 };
207
208 let mut ctx = RequestContext::new(params.message, task_id.clone(), context_id);
211 if let Some(stored) = stored_task {
212 ctx = ctx.with_stored_task(stored);
213 }
214 if let Some(meta) = params.metadata {
215 ctx = ctx.with_metadata(meta);
216 }
217
218 {
223 let stale_ids: Vec<TaskId> = {
227 let tokens = self.cancellation_tokens.read().await;
228 if tokens.len() >= self.limits.max_cancellation_tokens {
229 let now = Instant::now();
230 tokens
231 .iter()
232 .filter(|(_, entry)| {
233 entry.token.is_cancelled()
234 || now.duration_since(entry.created_at) >= self.limits.max_token_age
235 })
236 .map(|(id, _)| id.clone())
237 .collect()
238 } else {
239 Vec::new()
240 }
241 };
242
243 if !stale_ids.is_empty() {
245 let mut tokens = self.cancellation_tokens.write().await;
246 for id in &stale_ids {
247 tokens.remove(id);
248 }
249 }
250
251 let mut tokens = self.cancellation_tokens.write().await;
253 tokens.insert(
254 task_id.clone(),
255 CancellationEntry {
256 token: ctx.cancellation_token.clone(),
257 created_at: Instant::now(),
258 },
259 );
260 }
261
262 self.task_store.save(task.clone()).await?;
263
264 drop(context_guard);
267
268 let (writer, reader, persistence_rx) = if streaming {
272 let (w, r, p) = self
273 .event_queue_manager
274 .get_or_create_with_persistence(&task_id)
275 .await;
276 let r = match r {
277 Some(r) => r,
278 None => {
279 self.event_queue_manager
281 .subscribe(&task_id)
282 .await
283 .ok_or_else(|| {
284 ServerError::Internal("event queue disappeared during subscribe".into())
285 })?
286 }
287 };
288 (w, r, p)
289 } else {
290 let (w, r) = self.event_queue_manager.get_or_create(&task_id).await;
291 let r = match r {
292 Some(r) => r,
293 None => {
294 self.event_queue_manager
296 .subscribe(&task_id)
297 .await
298 .ok_or_else(|| {
299 ServerError::Internal("event queue disappeared during subscribe".into())
300 })?
301 }
302 };
303 (w, r, None)
304 };
305
306 let executor = Arc::clone(&self.executor);
310 let task_id_for_cleanup = task_id.clone();
311 let event_queue_mgr = self.event_queue_manager.clone();
312 let cancel_tokens = Arc::clone(&self.cancellation_tokens);
313 let executor_timeout = self.executor_timeout;
314 let executor_handle = tokio::spawn(async move {
315 trace_debug!(task_id = %ctx.task_id, "executor started");
316
317 #[allow(clippy::items_after_statements)]
322 struct CleanupGuard {
323 task_id: Option<TaskId>,
324 queue_mgr: crate::streaming::EventQueueManager,
325 tokens: std::sync::Arc<tokio::sync::RwLock<HashMap<TaskId, CancellationEntry>>>,
326 }
327 #[allow(clippy::items_after_statements)]
328 impl Drop for CleanupGuard {
329 fn drop(&mut self) {
330 if let Some(tid) = self.task_id.take() {
331 let qmgr = self.queue_mgr.clone();
332 let tokens = std::sync::Arc::clone(&self.tokens);
333 tokio::task::spawn(async move {
334 qmgr.destroy(&tid).await;
335 tokens.write().await.remove(&tid);
336 });
337 }
338 }
339 }
340 let mut cleanup_guard = CleanupGuard {
341 task_id: Some(task_id_for_cleanup.clone()),
342 queue_mgr: event_queue_mgr.clone(),
343 tokens: Arc::clone(&cancel_tokens),
344 };
345
346 let result = {
348 let exec_future = if let Some(timeout) = executor_timeout {
349 tokio::time::timeout(timeout, executor.execute(&ctx, writer.as_ref()))
350 .await
351 .unwrap_or_else(|_| {
352 Err(a2a_protocol_types::error::A2aError::internal(format!(
353 "executor timed out after {}s",
354 timeout.as_secs()
355 )))
356 })
357 } else {
358 executor.execute(&ctx, writer.as_ref()).await
359 };
360 exec_future
361 };
362
363 if let Err(ref e) = result {
364 trace_error!(task_id = %ctx.task_id, error = %e, "executor failed");
365 let fail_event = StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
367 task_id: ctx.task_id.clone(),
368 context_id: ContextId::new(ctx.context_id.clone()),
369 status: TaskStatus::with_timestamp(TaskState::Failed),
370 metadata: Some(serde_json::json!({ "error": e.to_string() })),
371 });
372 if let Err(_write_err) = writer.write(fail_event).await {
373 trace_error!(
374 task_id = %ctx.task_id,
375 error = %_write_err,
376 "failed to write failure event to queue"
377 );
378 }
379 }
380 drop(writer);
382 event_queue_mgr.destroy(&task_id_for_cleanup).await;
385 cancel_tokens.write().await.remove(&task_id_for_cleanup);
386 cleanup_guard.task_id = None;
387 });
388
389 self.interceptors.run_after(&call_ctx).await?;
390
391 if streaming {
392 self.spawn_background_event_processor(task_id.clone(), executor_handle, persistence_rx);
402 Ok(SendMessageResult::Stream(reader))
403 } else if return_immediately {
404 Ok(SendMessageResult::Response(SendMessageResponse::Task(task)))
406 } else {
407 let final_task = self
410 .collect_events(reader, task_id.clone(), executor_handle)
411 .await?;
412 Ok(SendMessageResult::Response(SendMessageResponse::Task(
413 final_task,
414 )))
415 }
416 }
417}
418
419#[cfg(test)]
420mod tests {
421 use super::*;
422 use a2a_protocol_types::message::{Message, MessageId, MessageRole, Part};
423 use a2a_protocol_types::params::{MessageSendParams, SendMessageConfiguration};
424 use a2a_protocol_types::task::ContextId;
425
426 use crate::agent_executor;
427 use crate::builder::RequestHandlerBuilder;
428
429 struct DummyExecutor;
430 agent_executor!(DummyExecutor, |_ctx, _queue| async { Ok(()) });
431
432 fn make_handler() -> RequestHandler {
433 RequestHandlerBuilder::new(DummyExecutor)
434 .build()
435 .expect("default build should succeed")
436 }
437
438 fn make_params(context_id: Option<&str>) -> MessageSendParams {
439 MessageSendParams {
440 context_id: None,
441 message: Message {
442 id: MessageId::new("msg-1"),
443 role: MessageRole::User,
444 parts: vec![Part::text("hello")],
445 context_id: context_id.map(ContextId::new),
446 task_id: None,
447 reference_task_ids: None,
448 extensions: None,
449 metadata: None,
450 },
451 configuration: None,
452 metadata: None,
453 tenant: None,
454 }
455 }
456
457 #[tokio::test]
458 async fn empty_message_parts_returns_invalid_params() {
459 let handler = make_handler();
460 let mut params = make_params(None);
461 params.message.parts = vec![];
462
463 let result = handler.on_send_message(params, false, None).await;
464
465 assert!(
466 matches!(result, Err(ServerError::InvalidParams(_))),
467 "expected InvalidParams for empty parts"
468 );
469 }
470
471 #[tokio::test]
472 async fn oversized_message_metadata_returns_invalid_params() {
473 let handler = make_handler();
474 let mut params = make_params(None);
475 let big_value = "x".repeat(1_100_000);
477 params.message.metadata = Some(serde_json::json!(big_value));
478
479 let result = handler.on_send_message(params, false, None).await;
480
481 assert!(
482 matches!(result, Err(ServerError::InvalidParams(_))),
483 "expected InvalidParams for oversized message metadata"
484 );
485 }
486
487 #[tokio::test]
488 async fn oversized_request_metadata_returns_invalid_params() {
489 let handler = make_handler();
490 let mut params = make_params(None);
491 let big_value = "x".repeat(1_100_000);
493 params.metadata = Some(serde_json::json!(big_value));
494
495 let result = handler.on_send_message(params, false, None).await;
496
497 assert!(
498 matches!(result, Err(ServerError::InvalidParams(_))),
499 "expected InvalidParams for oversized request metadata"
500 );
501 }
502
503 #[tokio::test]
504 async fn valid_message_returns_ok() {
505 let handler = make_handler();
506 let params = make_params(None);
507
508 let result = handler.on_send_message(params, false, None).await;
509
510 let send_result = result.expect("expected Ok for valid message");
511 assert!(
512 matches!(
513 send_result,
514 SendMessageResult::Response(SendMessageResponse::Task(_))
515 ),
516 "expected Response(Task) for non-streaming send"
517 );
518 }
519
520 #[tokio::test]
521 async fn return_immediately_returns_task() {
522 let handler = make_handler();
523 let mut params = make_params(None);
524 params.configuration = Some(SendMessageConfiguration {
525 accepted_output_modes: vec!["text/plain".into()],
526 task_push_notification_config: None,
527 history_length: None,
528 return_immediately: Some(true),
529 });
530
531 let result = handler.on_send_message(params, false, None).await;
532
533 assert!(
534 matches!(
535 result,
536 Ok(SendMessageResult::Response(SendMessageResponse::Task(_)))
537 ),
538 "expected Response(Task) for return_immediately=true"
539 );
540 }
541
542 #[tokio::test]
543 async fn empty_context_id_returns_invalid_params() {
544 let handler = make_handler();
545 let params = make_params(Some(""));
546
547 let result = handler.on_send_message(params, false, None).await;
548
549 assert!(
550 matches!(result, Err(ServerError::InvalidParams(_))),
551 "expected InvalidParams for empty context_id"
552 );
553 }
554
555 #[tokio::test]
556 async fn too_long_context_id_returns_invalid_params() {
557 use crate::handler::limits::HandlerLimits;
559
560 let handler = RequestHandlerBuilder::new(DummyExecutor)
561 .with_handler_limits(HandlerLimits::default().with_max_id_length(10))
562 .build()
563 .unwrap();
564 let long_ctx = "x".repeat(20);
565 let params = make_params(Some(&long_ctx));
566
567 let result = handler.on_send_message(params, false, None).await;
568 assert!(
569 matches!(result, Err(ServerError::InvalidParams(ref msg)) if msg.contains("maximum length")),
570 "expected InvalidParams for too-long context_id"
571 );
572 }
573
574 #[tokio::test]
575 async fn too_long_task_id_returns_invalid_params() {
576 use crate::handler::limits::HandlerLimits;
578 use a2a_protocol_types::task::TaskId;
579
580 let handler = RequestHandlerBuilder::new(DummyExecutor)
581 .with_handler_limits(HandlerLimits::default().with_max_id_length(10))
582 .build()
583 .unwrap();
584 let mut params = make_params(None);
585 params.message.task_id = Some(TaskId::new("a".repeat(20)));
586
587 let result = handler.on_send_message(params, false, None).await;
588 assert!(
589 matches!(result, Err(ServerError::InvalidParams(ref msg)) if msg.contains("maximum length")),
590 "expected InvalidParams for too-long task_id"
591 );
592 }
593
594 #[tokio::test]
595 async fn empty_task_id_returns_invalid_params() {
596 use a2a_protocol_types::task::TaskId;
598
599 let handler = make_handler();
600 let mut params = make_params(None);
601 params.message.task_id = Some(TaskId::new(""));
602
603 let result = handler.on_send_message(params, false, None).await;
604 assert!(
605 matches!(result, Err(ServerError::InvalidParams(ref msg)) if msg.contains("empty")),
606 "expected InvalidParams for empty task_id"
607 );
608 }
609
610 #[tokio::test]
611 async fn task_id_mismatch_returns_invalid_params() {
612 use a2a_protocol_types::task::{Task, TaskId, TaskState, TaskStatus};
614
615 let handler = make_handler();
616
617 let task = Task {
619 id: TaskId::new("stored-task-id"),
620 context_id: ContextId::new("ctx-existing"),
621 status: TaskStatus::new(TaskState::Completed),
622 history: None,
623 artifacts: None,
624 metadata: None,
625 };
626 handler.task_store.save(task).await.unwrap();
627
628 let mut params = make_params(Some("ctx-existing"));
630 params.message.task_id = Some(TaskId::new("different-task-id"));
631
632 let result = handler.on_send_message(params, false, None).await;
633 assert!(
634 matches!(result, Err(ServerError::InvalidParams(ref msg)) if msg.contains("does not match")),
635 "expected InvalidParams for task_id mismatch"
636 );
637 }
638
639 #[tokio::test]
640 async fn send_message_with_request_metadata() {
641 let handler = make_handler();
643 let mut params = make_params(None);
644 params.metadata = Some(serde_json::json!({"key": "value"}));
645
646 let result = handler.on_send_message(params, false, None).await;
647 assert!(
648 result.is_ok(),
649 "send_message with request metadata should succeed"
650 );
651 }
652
653 #[tokio::test]
654 async fn send_message_error_path_records_metrics() {
655 use crate::call_context::CallContext;
657 use crate::interceptor::ServerInterceptor;
658 use std::future::Future;
659 use std::pin::Pin;
660
661 struct FailInterceptor;
662 impl ServerInterceptor for FailInterceptor {
663 fn before<'a>(
664 &'a self,
665 _ctx: &'a CallContext,
666 ) -> Pin<Box<dyn Future<Output = a2a_protocol_types::error::A2aResult<()>> + Send + 'a>>
667 {
668 Box::pin(async {
669 Err(a2a_protocol_types::error::A2aError::internal(
670 "forced failure",
671 ))
672 })
673 }
674 fn after<'a>(
675 &'a self,
676 _ctx: &'a CallContext,
677 ) -> Pin<Box<dyn Future<Output = a2a_protocol_types::error::A2aResult<()>> + Send + 'a>>
678 {
679 Box::pin(async { Ok(()) })
680 }
681 }
682
683 let handler = RequestHandlerBuilder::new(DummyExecutor)
684 .with_interceptor(FailInterceptor)
685 .build()
686 .unwrap();
687
688 let params = make_params(None);
689 let result = handler.on_send_message(params, false, None).await;
690 assert!(
691 result.is_err(),
692 "send_message should fail when interceptor rejects, exercising error metrics path"
693 );
694 }
695
696 #[tokio::test]
697 async fn send_streaming_message_error_path_records_metrics() {
698 use crate::call_context::CallContext;
700 use crate::interceptor::ServerInterceptor;
701 use std::future::Future;
702 use std::pin::Pin;
703
704 struct FailInterceptor;
705 impl ServerInterceptor for FailInterceptor {
706 fn before<'a>(
707 &'a self,
708 _ctx: &'a CallContext,
709 ) -> Pin<Box<dyn Future<Output = a2a_protocol_types::error::A2aResult<()>> + Send + 'a>>
710 {
711 Box::pin(async {
712 Err(a2a_protocol_types::error::A2aError::internal(
713 "forced failure",
714 ))
715 })
716 }
717 fn after<'a>(
718 &'a self,
719 _ctx: &'a CallContext,
720 ) -> Pin<Box<dyn Future<Output = a2a_protocol_types::error::A2aResult<()>> + Send + 'a>>
721 {
722 Box::pin(async { Ok(()) })
723 }
724 }
725
726 let handler = RequestHandlerBuilder::new(DummyExecutor)
727 .with_interceptor(FailInterceptor)
728 .build()
729 .unwrap();
730
731 let params = make_params(None);
732 let result = handler.on_send_message(params, true, None).await;
733 assert!(
734 result.is_err(),
735 "streaming send_message should fail when interceptor rejects"
736 );
737 }
738
739 #[tokio::test]
740 async fn streaming_mode_returns_stream_result() {
741 let handler = make_handler();
743 let params = make_params(None);
744
745 let result = handler.on_send_message(params, true, None).await;
746 assert!(
747 matches!(result, Ok(SendMessageResult::Stream(_))),
748 "expected Stream result in streaming mode"
749 );
750 }
751
752 #[tokio::test]
753 async fn send_message_with_stored_task_continuation() {
754 use a2a_protocol_types::task::{Task, TaskState, TaskStatus};
757
758 let handler = make_handler();
759
760 let task = Task {
762 id: TaskId::new("existing-task"),
763 context_id: ContextId::new("continue-ctx"),
764 status: TaskStatus::new(TaskState::Completed),
765 history: None,
766 artifacts: None,
767 metadata: None,
768 };
769 handler.task_store.save(task).await.unwrap();
770
771 let params = make_params(Some("continue-ctx"));
773 let result = handler.on_send_message(params, false, None).await;
774 assert!(
775 result.is_ok(),
776 "send_message with existing context should succeed"
777 );
778 }
779
780 #[tokio::test]
781 async fn send_message_with_headers() {
782 let handler = make_handler();
784 let params = make_params(None);
785 let mut headers = HashMap::new();
786 headers.insert("authorization".to_string(), "Bearer test-token".to_string());
787
788 let result = handler.on_send_message(params, false, Some(&headers)).await;
789 let send_result = result.expect("send_message with headers should succeed");
790 assert!(
791 matches!(
792 send_result,
793 SendMessageResult::Response(SendMessageResponse::Task(_))
794 ),
795 "expected Response(Task) for send with headers"
796 );
797 }
798
799 #[tokio::test]
800 async fn duplicate_task_id_without_context_match_returns_error() {
801 use a2a_protocol_types::task::{Task, TaskId as TId, TaskState, TaskStatus};
803
804 let handler = make_handler();
805
806 let task = Task {
808 id: TId::new("dup-task"),
809 context_id: ContextId::new("other-ctx"),
810 status: TaskStatus::new(TaskState::Completed),
811 history: None,
812 artifacts: None,
813 metadata: None,
814 };
815 handler.task_store.save(task).await.unwrap();
816
817 let mut params = make_params(Some("brand-new-ctx"));
819 params.message.task_id = Some(TId::new("dup-task"));
820
821 let result = handler.on_send_message(params, false, None).await;
822 assert!(
823 matches!(result, Err(ServerError::InvalidParams(ref msg)) if msg.contains("already exists")),
824 "expected InvalidParams for duplicate task_id"
825 );
826 }
827
828 #[tokio::test]
829 async fn send_message_with_tenant() {
830 let handler = make_handler();
832 let mut params = make_params(None);
833 params.tenant = Some("test-tenant".to_string());
834
835 let result = handler.on_send_message(params, false, None).await;
836 let send_result = result.expect("send_message with tenant should succeed");
837 assert!(
838 matches!(
839 send_result,
840 SendMessageResult::Response(SendMessageResponse::Task(_))
841 ),
842 "expected Response(Task) for send with tenant"
843 );
844 }
845
846 #[tokio::test]
847 async fn executor_timeout_returns_failed_task() {
848 use a2a_protocol_types::error::A2aResult;
850 use std::time::Duration;
851
852 struct SlowExecutor;
853 impl crate::executor::AgentExecutor for SlowExecutor {
854 fn execute<'a>(
855 &'a self,
856 _ctx: &'a crate::request_context::RequestContext,
857 _queue: &'a dyn crate::streaming::EventQueueWriter,
858 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = A2aResult<()>> + Send + 'a>>
859 {
860 Box::pin(async {
861 tokio::time::sleep(Duration::from_secs(60)).await;
862 Ok(())
863 })
864 }
865 }
866
867 let handler = RequestHandlerBuilder::new(SlowExecutor)
868 .with_executor_timeout(Duration::from_millis(50))
869 .build()
870 .unwrap();
871
872 let params = make_params(None);
873 let result = handler.on_send_message(params, false, None).await;
875 assert!(
877 result.is_ok(),
878 "executor timeout should still return a task result"
879 );
880 }
881
882 #[tokio::test]
883 async fn executor_failure_writes_failed_event() {
884 use a2a_protocol_types::error::{A2aError, A2aResult};
886
887 struct FailExecutor;
888 impl crate::executor::AgentExecutor for FailExecutor {
889 fn execute<'a>(
890 &'a self,
891 _ctx: &'a crate::request_context::RequestContext,
892 _queue: &'a dyn crate::streaming::EventQueueWriter,
893 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = A2aResult<()>> + Send + 'a>>
894 {
895 Box::pin(async { Err(A2aError::internal("executor exploded")) })
896 }
897 }
898
899 let handler = RequestHandlerBuilder::new(FailExecutor).build().unwrap();
900 let params = make_params(None);
901
902 let result = handler.on_send_message(params, false, None).await;
903 assert!(
905 result.is_ok(),
906 "executor failure should produce a task result"
907 );
908 }
909
910 #[tokio::test]
911 async fn cancellation_token_sweep_runs_when_map_is_full() {
912 use crate::handler::limits::HandlerLimits;
915
916 struct SlowExec;
918 impl crate::executor::AgentExecutor for SlowExec {
919 fn execute<'a>(
920 &'a self,
921 _ctx: &'a crate::request_context::RequestContext,
922 _queue: &'a dyn crate::streaming::EventQueueWriter,
923 ) -> std::pin::Pin<
924 Box<
925 dyn std::future::Future<Output = a2a_protocol_types::error::A2aResult<()>>
926 + Send
927 + 'a,
928 >,
929 > {
930 Box::pin(async {
931 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
933 Ok(())
934 })
935 }
936 }
937
938 let handler = RequestHandlerBuilder::new(SlowExec)
939 .with_handler_limits(HandlerLimits::default().with_max_cancellation_tokens(2))
940 .build()
941 .unwrap();
942
943 for _ in 0..3 {
946 let params = make_params(None);
947 let _ = handler.on_send_message(params, true, None).await;
948 }
949 handler.shutdown().await;
952 }
953
954 #[tokio::test]
955 async fn stale_cancellation_tokens_cleaned_up() {
956 use crate::handler::limits::HandlerLimits;
958 use std::time::Duration;
959
960 struct SlowExec2;
962 impl crate::executor::AgentExecutor for SlowExec2 {
963 fn execute<'a>(
964 &'a self,
965 _ctx: &'a crate::request_context::RequestContext,
966 _queue: &'a dyn crate::streaming::EventQueueWriter,
967 ) -> std::pin::Pin<
968 Box<
969 dyn std::future::Future<Output = a2a_protocol_types::error::A2aResult<()>>
970 + Send
971 + 'a,
972 >,
973 > {
974 Box::pin(async {
975 tokio::time::sleep(Duration::from_secs(10)).await;
976 Ok(())
977 })
978 }
979 }
980
981 let handler = RequestHandlerBuilder::new(SlowExec2)
982 .with_handler_limits(
983 HandlerLimits::default()
984 .with_max_cancellation_tokens(2)
985 .with_max_token_age(Duration::from_millis(1)),
987 )
988 .build()
989 .unwrap();
990
991 for _ in 0..2 {
993 let params = make_params(None);
994 let _ = handler.on_send_message(params, true, None).await;
995 }
996
997 tokio::time::sleep(Duration::from_millis(50)).await;
999
1000 let params = make_params(None);
1004 let _ = handler.on_send_message(params, true, None).await;
1005
1006 handler.shutdown().await;
1008 }
1009
1010 #[tokio::test]
1011 async fn streaming_executor_failure_writes_error_event() {
1012 use a2a_protocol_types::error::{A2aError, A2aResult};
1014
1015 struct FailExecutor;
1016 impl crate::executor::AgentExecutor for FailExecutor {
1017 fn execute<'a>(
1018 &'a self,
1019 _ctx: &'a crate::request_context::RequestContext,
1020 _queue: &'a dyn crate::streaming::EventQueueWriter,
1021 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = A2aResult<()>> + Send + 'a>>
1022 {
1023 Box::pin(async { Err(A2aError::internal("streaming fail")) })
1024 }
1025 }
1026
1027 let handler = RequestHandlerBuilder::new(FailExecutor).build().unwrap();
1028 let params = make_params(None);
1029
1030 let result = handler.on_send_message(params, true, None).await;
1031 assert!(
1032 matches!(result, Ok(SendMessageResult::Stream(_))),
1033 "streaming executor failure should still return stream"
1034 );
1035 }
1036}