1pub mod in_flight;
4pub mod obs;
5pub mod spawn;
6
7use std::net::SocketAddr;
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::error::A2aError;
12use crate::executor::AgentExecutor;
13use crate::middleware::{A2aMiddleware, MiddlewareStack, SecurityContribution};
14use crate::router::{AppState, build_router};
15use crate::storage::{
16 A2aAtomicStore, A2aPushNotificationStorage, A2aTaskStorage, InMemoryA2aStorage,
17};
18use crate::streaming::TaskEventBroker;
19
20#[derive(Debug, Clone)]
40#[non_exhaustive]
41pub struct RuntimeConfig {
42 pub blocking_task_timeout: Duration,
43 pub timeout_abort_grace: Duration,
44 pub cancel_handler_grace: Duration,
45 pub cancel_handler_poll_interval: Duration,
46 pub cross_instance_cancel_poll_interval: Duration,
47
48 pub push_max_attempts: usize,
49 pub push_backoff_base: Duration,
50 pub push_backoff_cap: Duration,
51 pub push_backoff_jitter: f32,
52 pub push_request_timeout: Duration,
53 pub push_connect_timeout: Duration,
54 pub push_read_timeout: Duration,
55 pub push_claim_expiry: Duration,
56 pub push_config_cache_ttl: Duration,
57 pub push_failed_delivery_retention: Duration,
58 pub push_max_payload_bytes: usize,
59 pub allow_insecure_push_urls: bool,
60
61 pub push_reclaim_sweep_interval: Duration,
70
71 pub push_reclaim_sweep_batch: usize,
75
76 pub supports_return_immediately: bool,
114}
115
116impl Default for RuntimeConfig {
117 fn default() -> Self {
118 Self {
119 blocking_task_timeout: Duration::from_secs(30),
120 timeout_abort_grace: Duration::from_secs(5),
121 cancel_handler_grace: Duration::from_secs(5),
122 cancel_handler_poll_interval: Duration::from_millis(100),
123 cross_instance_cancel_poll_interval: Duration::from_secs(1),
124 push_max_attempts: 8,
125 push_backoff_base: Duration::from_secs(2),
126 push_backoff_cap: Duration::from_secs(60),
127 push_backoff_jitter: 0.25,
128 push_request_timeout: Duration::from_secs(30),
129 push_connect_timeout: Duration::from_secs(5),
130 push_read_timeout: Duration::from_secs(30),
131 push_claim_expiry: Duration::from_secs(10 * 60),
132 push_config_cache_ttl: Duration::from_secs(5),
133 push_failed_delivery_retention: Duration::from_secs(7 * 24 * 60 * 60),
134 push_max_payload_bytes: 1024 * 1024,
135 allow_insecure_push_urls: false,
136 push_reclaim_sweep_interval: Duration::from_secs(60),
137 push_reclaim_sweep_batch: 64,
138 supports_return_immediately: true,
139 }
140 }
141}
142
143pub struct A2aServerBuilder {
145 executor: Option<Arc<dyn AgentExecutor>>,
146 task_storage: Option<Arc<dyn A2aTaskStorage>>,
147 push_storage: Option<Arc<dyn A2aPushNotificationStorage>>,
148 event_store: Option<Arc<dyn crate::storage::A2aEventStore>>,
149 atomic_store: Option<Arc<dyn A2aAtomicStore>>,
150 cancellation_supervisor: Option<Arc<dyn crate::storage::A2aCancellationSupervisor>>,
151 push_delivery_store: Option<Arc<dyn crate::push::A2aPushDeliveryStore>>,
152 bind_addr: SocketAddr,
153 middleware: Vec<Arc<dyn A2aMiddleware>>,
154 runtime_config: RuntimeConfig,
155}
156
157impl A2aServerBuilder {
158 pub fn new() -> Self {
159 Self {
160 executor: None,
161 task_storage: None,
162 push_storage: None,
163 event_store: None,
164 atomic_store: None,
165 cancellation_supervisor: None,
166 push_delivery_store: None,
167 bind_addr: ([0, 0, 0, 0], 3000).into(),
168 middleware: vec![],
169 runtime_config: RuntimeConfig::default(),
170 }
171 }
172
173 pub fn blocking_task_timeout(mut self, d: Duration) -> Self {
184 self.runtime_config.blocking_task_timeout = d;
185 self
186 }
187
188 pub fn timeout_abort_grace(mut self, d: Duration) -> Self {
191 self.runtime_config.timeout_abort_grace = d;
192 self
193 }
194
195 pub fn cancel_handler_grace(mut self, d: Duration) -> Self {
198 self.runtime_config.cancel_handler_grace = d;
199 self
200 }
201
202 pub fn cancel_handler_poll_interval(mut self, d: Duration) -> Self {
205 self.runtime_config.cancel_handler_poll_interval = d;
206 self
207 }
208
209 pub fn cross_instance_cancel_poll_interval(mut self, d: Duration) -> Self {
213 self.runtime_config.cross_instance_cancel_poll_interval = d;
214 self
215 }
216
217 pub fn push_max_attempts(mut self, n: usize) -> Self {
220 self.runtime_config.push_max_attempts = n;
221 self
222 }
223
224 pub fn push_backoff_base(mut self, d: Duration) -> Self {
227 self.runtime_config.push_backoff_base = d;
228 self
229 }
230
231 pub fn push_backoff_cap(mut self, d: Duration) -> Self {
233 self.runtime_config.push_backoff_cap = d;
234 self
235 }
236
237 pub fn push_backoff_jitter(mut self, j: f32) -> Self {
239 self.runtime_config.push_backoff_jitter = j;
240 self
241 }
242
243 pub fn push_request_timeout(mut self, d: Duration) -> Self {
245 self.runtime_config.push_request_timeout = d;
246 self
247 }
248
249 pub fn push_connect_timeout(mut self, d: Duration) -> Self {
251 self.runtime_config.push_connect_timeout = d;
252 self
253 }
254
255 pub fn push_read_timeout(mut self, d: Duration) -> Self {
257 self.runtime_config.push_read_timeout = d;
258 self
259 }
260
261 pub fn push_claim_expiry(mut self, d: Duration) -> Self {
265 self.runtime_config.push_claim_expiry = d;
266 self
267 }
268
269 pub fn push_config_cache_ttl(mut self, d: Duration) -> Self {
271 self.runtime_config.push_config_cache_ttl = d;
272 self
273 }
274
275 pub fn push_failed_delivery_retention(mut self, d: Duration) -> Self {
278 self.runtime_config.push_failed_delivery_retention = d;
279 self
280 }
281
282 pub fn push_max_payload_bytes(mut self, bytes: usize) -> Self {
284 self.runtime_config.push_max_payload_bytes = bytes;
285 self
286 }
287
288 pub fn allow_insecure_push_urls(mut self, allow: bool) -> Self {
292 self.runtime_config.allow_insecure_push_urls = allow;
293 self
294 }
295
296 pub fn push_reclaim_sweep_interval(mut self, d: Duration) -> Self {
298 self.runtime_config.push_reclaim_sweep_interval = d;
299 self
300 }
301
302 pub fn push_reclaim_sweep_batch(mut self, n: usize) -> Self {
304 self.runtime_config.push_reclaim_sweep_batch = n;
305 self
306 }
307
308 pub fn executor(mut self, executor: impl AgentExecutor + 'static) -> Self {
309 self.executor = Some(Arc::new(executor));
310 self
311 }
312
313 pub fn storage<S>(mut self, storage: S) -> Self
327 where
328 S: A2aTaskStorage
329 + A2aPushNotificationStorage
330 + crate::storage::A2aEventStore
331 + A2aAtomicStore
332 + crate::storage::A2aCancellationSupervisor
333 + Clone
334 + 'static,
335 {
336 self.task_storage = Some(Arc::new(storage.clone()));
337 self.push_storage = Some(Arc::new(storage.clone()));
338 self.event_store = Some(Arc::new(storage.clone()));
339 self.atomic_store = Some(Arc::new(storage.clone()));
340 self.cancellation_supervisor = Some(Arc::new(storage));
341 self
342 }
343
344 pub fn cancellation_supervisor(
348 mut self,
349 supervisor: impl crate::storage::A2aCancellationSupervisor + 'static,
350 ) -> Self {
351 self.cancellation_supervisor = Some(Arc::new(supervisor));
352 self
353 }
354
355 pub fn task_storage(mut self, storage: impl A2aTaskStorage + 'static) -> Self {
357 self.task_storage = Some(Arc::new(storage));
358 self
359 }
360
361 pub fn push_storage(mut self, storage: impl A2aPushNotificationStorage + 'static) -> Self {
363 self.push_storage = Some(Arc::new(storage));
364 self
365 }
366
367 pub fn event_store(mut self, store: impl crate::storage::A2aEventStore + 'static) -> Self {
369 self.event_store = Some(Arc::new(store));
370 self
371 }
372
373 pub fn atomic_store(mut self, store: impl A2aAtomicStore + 'static) -> Self {
375 self.atomic_store = Some(Arc::new(store));
376 self
377 }
378
379 pub fn push_delivery_store(
387 mut self,
388 store: impl crate::push::A2aPushDeliveryStore + 'static,
389 ) -> Self {
390 self.push_delivery_store = Some(Arc::new(store));
391 self
392 }
393
394 pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
395 self.bind_addr = addr.into();
396 self
397 }
398
399 pub fn middleware(mut self, mw: Arc<dyn A2aMiddleware>) -> Self {
402 self.middleware.push(mw);
403 self
404 }
405
406 pub fn build(self) -> Result<A2aServer, A2aError> {
407 let executor = self
408 .executor
409 .ok_or(A2aError::Internal("executor is required".into()))?;
410
411 let default_storage = InMemoryA2aStorage::new();
412 let task_storage = self
413 .task_storage
414 .unwrap_or_else(|| Arc::new(default_storage.clone()));
415 let push_storage = self
416 .push_storage
417 .unwrap_or_else(|| Arc::new(default_storage.clone()));
418 let event_store: Arc<dyn crate::storage::A2aEventStore> = self
419 .event_store
420 .unwrap_or_else(|| Arc::new(default_storage.clone()));
421 let atomic_store: Arc<dyn A2aAtomicStore> = self
422 .atomic_store
423 .unwrap_or_else(|| Arc::new(default_storage.clone()));
424 let cancellation_supervisor: Arc<dyn crate::storage::A2aCancellationSupervisor> = self
425 .cancellation_supervisor
426 .unwrap_or_else(|| Arc::new(default_storage.clone()));
427 let push_delivery_store: Option<Arc<dyn crate::push::A2aPushDeliveryStore>> =
428 self.push_delivery_store;
429
430 let task_backend = task_storage.backend_name();
432 let push_backend = push_storage.backend_name();
433 let event_backend = event_store.backend_name();
434 let atomic_backend = atomic_store.backend_name();
435 let supervisor_backend = cancellation_supervisor.backend_name();
436 if task_backend != push_backend
437 || task_backend != event_backend
438 || task_backend != atomic_backend
439 || task_backend != supervisor_backend
440 {
441 return Err(A2aError::Internal(format!(
442 "Storage backend mismatch: task={task_backend}, push={push_backend}, \
443 event={event_backend}, atomic={atomic_backend}, \
444 cancellation_supervisor={supervisor_backend}. \
445 ADR-009 requires all storage traits to share the same backend."
446 )));
447 }
448
449 match (
456 push_delivery_store.is_some(),
457 atomic_store.push_dispatch_enabled(),
458 ) {
459 (true, true) | (false, false) => {}
460 (true, false) => {
461 return Err(A2aError::Internal(
462 "push_delivery_store wired but atomic_store.push_dispatch_enabled() \
463 is false. Call .with_push_dispatch_enabled(true) on the backend \
464 storage before passing it to .storage()."
465 .into(),
466 ));
467 }
468 (false, true) => {
469 return Err(A2aError::Internal(
470 "atomic_store.push_dispatch_enabled() is true but no \
471 push_delivery_store is wired. Pending-dispatch markers would be \
472 written with no consumer, imposing load-bearing infra for no \
473 benefit. If you need to populate markers for an external \
474 consumer, open an issue for a distinctly-named opt-in — for now, \
475 this configuration is rejected."
476 .into(),
477 ));
478 }
479 }
480
481 let push_dispatcher: Option<Arc<crate::push::PushDispatcher>> =
482 if let Some(push_delivery) = push_delivery_store.as_ref() {
483 let push_delivery_backend = push_delivery.backend_name();
484 if task_backend != push_delivery_backend {
485 return Err(A2aError::Internal(format!(
486 "Storage backend mismatch: task={task_backend}, \
487 push_delivery={push_delivery_backend}. \
488 ADR-009 requires all storage traits to share the same backend."
489 )));
490 }
491
492 let retry_horizon = self
499 .runtime_config
500 .push_backoff_cap
501 .saturating_mul(self.runtime_config.push_max_attempts as u32);
502 if self.runtime_config.push_claim_expiry <= retry_horizon {
503 return Err(A2aError::Internal(format!(
504 "push_claim_expiry ({:?}) must be greater than retry horizon \
505 (push_max_attempts={} * push_backoff_cap={:?} = {:?}). \
506 Raise push_claim_expiry or lower push_max_attempts/push_backoff_cap.",
507 self.runtime_config.push_claim_expiry,
508 self.runtime_config.push_max_attempts,
509 self.runtime_config.push_backoff_cap,
510 retry_horizon
511 )));
512 }
513
514 let delivery_cfg = crate::push::delivery::PushDeliveryConfig {
519 max_attempts: self.runtime_config.push_max_attempts as u32,
520 backoff_base: self.runtime_config.push_backoff_base,
521 backoff_cap: self.runtime_config.push_backoff_cap,
522 backoff_jitter: self.runtime_config.push_backoff_jitter,
523 request_timeout: self.runtime_config.push_request_timeout,
524 connect_timeout: self.runtime_config.push_connect_timeout,
525 read_timeout: self.runtime_config.push_read_timeout,
526 claim_expiry: self.runtime_config.push_claim_expiry,
527 max_payload_bytes: self.runtime_config.push_max_payload_bytes,
528 allow_insecure_urls: self.runtime_config.allow_insecure_push_urls,
529 ..crate::push::delivery::PushDeliveryConfig::default()
530 };
531
532 let instance_id = format!("a2a-server-{}", uuid::Uuid::now_v7());
533 let worker = crate::push::delivery::PushDeliveryWorker::new(
534 push_delivery.clone(),
535 delivery_cfg,
536 None,
537 instance_id,
538 )
539 .map_err(|e| A2aError::Internal(format!("push worker build failed: {e}")))?;
540
541 Some(Arc::new(crate::push::PushDispatcher::new(
542 Arc::new(worker),
543 push_storage.clone(),
544 task_storage.clone(),
545 )))
546 } else {
547 None
548 };
549
550 let contributions: Vec<SecurityContribution> = self
552 .middleware
553 .iter()
554 .map(|m| m.security_contribution())
555 .collect();
556 let merged = merge_stacked_contributions(&contributions)?;
557
558 let public_materialized = apply_security_merge(executor.agent_card(), &merged);
563 validate_card_security_references(&public_materialized, "agent_card")?;
564 if let Some(extended_raw) = executor.extended_agent_card(None) {
565 let extended_materialized = apply_security_merge(extended_raw, &merged);
566 validate_card_security_references(&extended_materialized, "extended_agent_card")?;
567 }
568
569 Ok(A2aServer {
570 state: AppState {
571 executor,
572 task_storage,
573 push_storage,
574 event_store,
575 atomic_store,
576 event_broker: TaskEventBroker::new(),
577 middleware_stack: Arc::new(MiddlewareStack::new(self.middleware)),
578 runtime_config: self.runtime_config,
579 in_flight: Arc::new(crate::server::in_flight::InFlightRegistry::new()),
580 cancellation_supervisor,
581 push_delivery_store,
582 push_dispatcher,
583 durable_executor_queue: None,
586 },
587 merged_security: merged,
588 bind_addr: self.bind_addr,
589 })
590 }
591}
592
593impl Default for A2aServerBuilder {
594 fn default() -> Self {
595 Self::new()
596 }
597}
598
599fn merge_stacked_contributions(
604 contributions: &[SecurityContribution],
605) -> Result<SecurityContribution, A2aError> {
606 let mut merged = SecurityContribution::new();
607
608 if contributions.is_empty() {
609 return Ok(merged);
610 }
611
612 let mut seen_schemes: std::collections::HashMap<String, turul_a2a_proto::SecurityScheme> =
614 std::collections::HashMap::new();
615
616 for contrib in contributions {
617 for (name, scheme) in &contrib.schemes {
618 if let Some(existing) = seen_schemes.get(name) {
619 if !schemes_equivalent(existing, scheme) {
621 return Err(A2aError::Internal(format!(
622 "Security scheme collision: '{}' has conflicting definitions",
623 name
624 )));
625 }
626 } else {
628 seen_schemes.insert(name.clone(), scheme.clone());
629 merged.schemes.push((name.clone(), scheme.clone()));
630 }
631 }
632 }
633
634 let requirement_sets: Vec<&[turul_a2a_proto::SecurityRequirement]> = contributions
636 .iter()
637 .filter(|c| !c.requirements.is_empty())
638 .map(|c| c.requirements.as_slice())
639 .collect();
640
641 if requirement_sets.is_empty() {
642 return Ok(merged);
643 }
644
645 let mut combined: Vec<turul_a2a_proto::SecurityRequirement> = requirement_sets[0].to_vec();
646
647 for alternatives in &requirement_sets[1..] {
648 let mut new_combined = Vec::new();
649 for existing in &combined {
650 for alt in *alternatives {
651 let mut merged_schemes = existing.schemes.clone();
652 for (name, scopes) in &alt.schemes {
653 merged_schemes
654 .entry(name.clone())
655 .and_modify(|existing_scopes| {
656 for s in &scopes.list {
658 if !existing_scopes.list.contains(s) {
659 existing_scopes.list.push(s.clone());
660 }
661 }
662 existing_scopes.list.sort();
663 existing_scopes.list.dedup();
664 })
665 .or_insert_with(|| scopes.clone());
666 }
667 new_combined.push(turul_a2a_proto::SecurityRequirement {
668 schemes: merged_schemes,
669 });
670 }
671 }
672 combined = new_combined;
673 }
674
675 merged.requirements = combined;
676 Ok(merged)
677}
678
679fn apply_security_merge(
684 mut card: turul_a2a_proto::AgentCard,
685 security: &SecurityContribution,
686) -> turul_a2a_proto::AgentCard {
687 for (name, scheme) in &security.schemes {
688 card.security_schemes
689 .entry(name.clone())
690 .or_insert_with(|| scheme.clone());
691 }
692 for req in &security.requirements {
693 card.security_requirements.push(req.clone());
694 }
695 card
696}
697
698fn validate_card_security_references(
706 card: &turul_a2a_proto::AgentCard,
707 surface: &str,
708) -> Result<(), A2aError> {
709 for req in &card.security_requirements {
710 for scheme_name in req.schemes.keys() {
711 if !card.security_schemes.contains_key(scheme_name) {
712 return Err(A2aError::InvalidRequest {
713 message: format!(
714 "{surface}: agent-level security requirement references \
715 undeclared scheme '{scheme_name}'"
716 ),
717 });
718 }
719 }
720 }
721 for skill in &card.skills {
722 for req in &skill.security_requirements {
723 for scheme_name in req.schemes.keys() {
724 if !card.security_schemes.contains_key(scheme_name) {
725 return Err(A2aError::InvalidRequest {
726 message: format!(
727 "{surface}: skill '{skill_id}' references \
728 undeclared security scheme '{scheme_name}'",
729 skill_id = skill.id
730 ),
731 });
732 }
733 }
734 }
735 }
736 Ok(())
737}
738
739fn schemes_equivalent(
754 a: &turul_a2a_proto::SecurityScheme,
755 b: &turul_a2a_proto::SecurityScheme,
756) -> bool {
757 a == b
758}
759
760pub struct A2aServer {
762 state: AppState,
763 merged_security: SecurityContribution,
764 bind_addr: SocketAddr,
765}
766
767impl A2aServer {
768 pub fn builder() -> A2aServerBuilder {
769 A2aServerBuilder::new()
770 }
771
772 pub fn into_router(self) -> axum::Router {
775 let (state, had_security) = self.into_augmented_state();
776 if !had_security {
777 return build_router(state);
778 }
779 build_router(state)
780 }
781
782 #[cfg(feature = "grpc")]
792 pub fn into_tonic_router(self) -> crate::grpc::LayeredGrpcRouter {
793 let (state, _) = self.into_augmented_state();
794 let middleware_stack = state.middleware_stack.clone();
795 crate::grpc::make_grpc_router(state, middleware_stack)
796 }
797
798 pub(crate) fn into_augmented_state(self) -> (AppState, bool) {
804 if self.merged_security.is_empty() {
805 return (self.state, false);
806 }
807
808 let wrapped = SecurityAugmentedExecutor {
809 inner: self.state.executor.clone(),
810 security: self.merged_security,
811 };
812
813 let augmented = AppState {
814 executor: Arc::new(wrapped),
815 task_storage: self.state.task_storage,
816 push_storage: self.state.push_storage,
817 event_store: self.state.event_store,
818 atomic_store: self.state.atomic_store,
819 event_broker: self.state.event_broker,
820 middleware_stack: self.state.middleware_stack,
821 runtime_config: self.state.runtime_config,
822 in_flight: self.state.in_flight,
823 cancellation_supervisor: self.state.cancellation_supervisor,
824 push_delivery_store: self.state.push_delivery_store,
829 push_dispatcher: self.state.push_dispatcher,
830 durable_executor_queue: self.state.durable_executor_queue,
831 };
832
833 (augmented, true)
834 }
835
836 pub async fn run(self) -> Result<(), A2aError> {
838 let bind_addr = self.bind_addr;
839 let poller_registry = std::sync::Arc::clone(&self.state.in_flight);
845 let poller_supervisor = std::sync::Arc::clone(&self.state.cancellation_supervisor);
846 let poller_interval = self
847 .state
848 .runtime_config
849 .cross_instance_cancel_poll_interval;
850 let push_delivery_store_for_sweep = self.state.push_delivery_store.clone();
851 let self_push_dispatcher_for_sweep = self.state.push_dispatcher.clone();
852 let sweep_interval_for_task = self.state.runtime_config.push_reclaim_sweep_interval;
853 let sweep_batch_for_task = self.state.runtime_config.push_reclaim_sweep_batch;
854 let push_claim_expiry_for_sweep = self.state.runtime_config.push_claim_expiry;
855
856 let app = self.into_router();
857 let listener = tokio::net::TcpListener::bind(bind_addr)
858 .await
859 .map_err(|e| A2aError::Internal(format!("Failed to bind: {e}")))?;
860 tracing::info!("A2A server listening on {}", bind_addr);
861
862 let shutdown = tokio_util::sync::CancellationToken::new();
868 let poller_shutdown = shutdown.clone();
869 let poller_handle =
870 tokio::spawn(crate::server::in_flight::run_cross_instance_cancel_poller(
871 poller_registry,
872 poller_supervisor,
873 poller_interval,
874 poller_shutdown,
875 ));
876
877 let sweep_handle = match (
897 push_delivery_store_for_sweep,
898 self_push_dispatcher_for_sweep,
899 ) {
900 (Some(store), Some(dispatcher)) => {
901 let shutdown = shutdown.clone();
902 let interval = sweep_interval_for_task;
903 let batch = sweep_batch_for_task;
904 let pending_stale_threshold = push_claim_expiry_for_sweep;
905 Some(tokio::spawn(async move {
906 let mut ticker = tokio::time::interval(interval);
907 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
908 loop {
909 tokio::select! {
910 _ = shutdown.cancelled() => break,
911 _ = ticker.tick() => {
912 let cutoff = std::time::SystemTime::now()
913 .checked_sub(pending_stale_threshold)
914 .unwrap_or(std::time::SystemTime::UNIX_EPOCH);
915 match store
916 .list_stale_pending_dispatches(cutoff, batch)
917 .await
918 {
919 Ok(rows) if !rows.is_empty() => {
920 tracing::warn!(
921 target: "turul_a2a::push_pending_dispatches_stale",
922 count = rows.len(),
923 "reclaim sweep found stale pending-dispatch \
924 markers; re-running fan-out"
925 );
926 for row in rows {
927 dispatcher.redispatch_pending(row).await;
928 }
929 }
930 Ok(_) => {}
931 Err(e) => {
932 tracing::error!(
933 target: "turul_a2a::push_pending_sweep_error",
934 error = %e,
935 "pending-dispatch sweep failed"
936 );
937 }
938 }
939 match store.list_reclaimable_claims(batch).await {
940 Ok(rows) if !rows.is_empty() => {
941 tracing::warn!(
942 target: "turul_a2a::push_claims_reclaimed",
943 count = rows.len(),
944 "reclaim sweep found expired non-terminal \
945 push claims; redispatching"
946 );
947 for row in rows {
948 dispatcher.redispatch_one(row).await;
949 }
950 }
951 Ok(_) => {}
952 Err(e) => {
953 tracing::error!(
954 target: "turul_a2a::push_sweep_error",
955 error = %e,
956 "push claim sweep failed"
957 );
958 }
959 }
960 }
961 }
962 }
963 }))
964 }
965 _ => None,
966 };
967
968 let serve_result = axum::serve(listener, app).await;
969
970 shutdown.cancel();
972 let _ = poller_handle.await;
973 if let Some(h) = sweep_handle {
974 let _ = h.await;
975 }
976
977 serve_result.map_err(|e| A2aError::Internal(format!("Server error: {e}")))?;
978 Ok(())
979 }
980}
981
982struct SecurityAugmentedExecutor {
984 inner: Arc<dyn AgentExecutor>,
985 security: SecurityContribution,
986}
987
988#[async_trait::async_trait]
989impl AgentExecutor for SecurityAugmentedExecutor {
990 async fn execute(
991 &self,
992 task: &mut turul_a2a_types::Task,
993 msg: &turul_a2a_types::Message,
994 ctx: &crate::executor::ExecutionContext,
995 ) -> Result<(), A2aError> {
996 self.inner.execute(task, msg, ctx).await
997 }
998
999 fn agent_card(&self) -> turul_a2a_proto::AgentCard {
1000 apply_security_merge(self.inner.agent_card(), &self.security)
1001 }
1002
1003 fn extended_agent_card(
1004 &self,
1005 claims: Option<&serde_json::Value>,
1006 ) -> Option<turul_a2a_proto::AgentCard> {
1007 self.inner
1008 .extended_agent_card(claims)
1009 .map(|card| apply_security_merge(card, &self.security))
1010 }
1011}
1012
1013#[cfg(test)]
1014mod tests {
1015 use super::*;
1016 use crate::error::A2aError;
1017 use crate::executor::AgentExecutor;
1018 use turul_a2a_types::{Message, Task};
1019
1020 struct DummyExecutor;
1021
1022 #[async_trait::async_trait]
1023 impl AgentExecutor for DummyExecutor {
1024 async fn execute(
1025 &self,
1026 _task: &mut Task,
1027 _msg: &Message,
1028 _ctx: &crate::executor::ExecutionContext,
1029 ) -> Result<(), A2aError> {
1030 Ok(())
1031 }
1032 fn agent_card(&self) -> turul_a2a_proto::AgentCard {
1033 turul_a2a_proto::AgentCard::default()
1034 }
1035 }
1036
1037 #[test]
1038 fn builder_requires_executor() {
1039 let result = A2aServer::builder().build();
1040 assert!(result.is_err());
1041 }
1042
1043 #[test]
1044 fn builder_with_executor_defaults_storage() {
1045 let server = A2aServer::builder()
1046 .executor(DummyExecutor)
1047 .build()
1048 .unwrap();
1049 let _ = server.into_router();
1050 }
1051
1052 #[test]
1053 fn builder_with_explicit_storage() {
1054 let storage = InMemoryA2aStorage::new();
1057 let server = A2aServer::builder()
1058 .executor(DummyExecutor)
1059 .storage(storage)
1060 .bind(([127, 0, 0, 1], 8080))
1061 .build()
1062 .unwrap();
1063 let _ = server.into_router();
1064 }
1065
1066 struct FakeEventStore;
1068
1069 #[async_trait::async_trait]
1070 impl crate::storage::A2aEventStore for FakeEventStore {
1071 fn backend_name(&self) -> &'static str {
1072 "fake-backend"
1073 }
1074 async fn append_event(
1075 &self,
1076 _t: &str,
1077 _tid: &str,
1078 _e: crate::streaming::StreamEvent,
1079 ) -> Result<u64, crate::storage::A2aStorageError> {
1080 Ok(0)
1081 }
1082 async fn get_events_after(
1083 &self,
1084 _t: &str,
1085 _tid: &str,
1086 _s: u64,
1087 ) -> Result<Vec<(u64, crate::streaming::StreamEvent)>, crate::storage::A2aStorageError>
1088 {
1089 Ok(vec![])
1090 }
1091 async fn latest_sequence(
1092 &self,
1093 _t: &str,
1094 _tid: &str,
1095 ) -> Result<u64, crate::storage::A2aStorageError> {
1096 Ok(0)
1097 }
1098 async fn cleanup_expired(&self) -> Result<u64, crate::storage::A2aStorageError> {
1099 Ok(0)
1100 }
1101 }
1102
1103 #[test]
1104 fn mixed_backend_rejected_at_build() {
1105 let result = A2aServer::builder()
1107 .executor(DummyExecutor)
1108 .event_store(FakeEventStore)
1109 .build();
1110
1111 match result {
1112 Err(e) => {
1113 let msg = e.to_string();
1114 assert!(
1115 msg.contains("backend mismatch") || msg.contains("Storage backend mismatch"),
1116 "Error should mention backend mismatch: {msg}"
1117 );
1118 }
1119 Ok(_) => panic!("Mixed backends should be rejected"),
1120 }
1121 }
1122
1123 struct FakeAtomicStore;
1125
1126 #[async_trait::async_trait]
1127 impl crate::storage::A2aAtomicStore for FakeAtomicStore {
1128 fn backend_name(&self) -> &'static str {
1129 "fake-atomic"
1130 }
1131 async fn create_task_with_events(
1132 &self,
1133 _t: &str,
1134 _o: &str,
1135 task: turul_a2a_types::Task,
1136 _e: Vec<crate::streaming::StreamEvent>,
1137 ) -> Result<(turul_a2a_types::Task, Vec<u64>), crate::storage::A2aStorageError> {
1138 Ok((task, vec![]))
1139 }
1140 async fn update_task_status_with_events(
1141 &self,
1142 _t: &str,
1143 _tid: &str,
1144 _o: &str,
1145 _s: turul_a2a_types::TaskStatus,
1146 _e: Vec<crate::streaming::StreamEvent>,
1147 ) -> Result<(turul_a2a_types::Task, Vec<u64>), crate::storage::A2aStorageError> {
1148 unimplemented!()
1149 }
1150 async fn update_task_with_events(
1151 &self,
1152 _t: &str,
1153 _o: &str,
1154 _task: turul_a2a_types::Task,
1155 _e: Vec<crate::streaming::StreamEvent>,
1156 ) -> Result<Vec<u64>, crate::storage::A2aStorageError> {
1157 Ok(vec![])
1158 }
1159 }
1160
1161 #[test]
1162 fn mixed_atomic_backend_rejected_at_build() {
1163 let result = A2aServer::builder()
1165 .executor(DummyExecutor)
1166 .atomic_store(FakeAtomicStore)
1167 .build();
1168
1169 match result {
1170 Err(e) => {
1171 let msg = e.to_string();
1172 assert!(
1173 msg.contains("backend mismatch") || msg.contains("Storage backend mismatch"),
1174 "Error should mention backend mismatch: {msg}"
1175 );
1176 }
1177 Ok(_) => panic!("Mixed atomic backend should be rejected"),
1178 }
1179 }
1180
1181 #[test]
1182 fn same_backend_accepted() {
1183 let storage = InMemoryA2aStorage::new();
1185 let result = A2aServer::builder()
1186 .executor(DummyExecutor)
1187 .task_storage(storage.clone())
1188 .push_storage(storage.clone())
1189 .event_store(storage.clone())
1190 .atomic_store(storage)
1191 .build();
1192
1193 assert!(result.is_ok(), "Same backend should be accepted");
1194 }
1195
1196 #[test]
1197 fn unified_storage_accepted() {
1198 let result = A2aServer::builder()
1201 .executor(DummyExecutor)
1202 .storage(InMemoryA2aStorage::new())
1203 .build();
1204
1205 assert!(result.is_ok(), "Unified .storage() should be accepted");
1206 }
1207
1208 #[test]
1209 fn runtime_config_setters_survive_build() {
1210 let server = A2aServer::builder()
1215 .executor(DummyExecutor)
1216 .blocking_task_timeout(Duration::from_secs(42))
1218 .timeout_abort_grace(Duration::from_secs(7))
1219 .cancel_handler_grace(Duration::from_secs(3))
1220 .cancel_handler_poll_interval(Duration::from_millis(50))
1221 .cross_instance_cancel_poll_interval(Duration::from_secs(2))
1222 .push_max_attempts(17)
1223 .push_backoff_base(Duration::from_millis(500))
1224 .push_backoff_cap(Duration::from_secs(90))
1225 .push_backoff_jitter(0.5)
1226 .push_request_timeout(Duration::from_secs(20))
1227 .push_connect_timeout(Duration::from_secs(3))
1228 .push_read_timeout(Duration::from_secs(15))
1229 .push_claim_expiry(Duration::from_secs(20 * 60))
1230 .push_config_cache_ttl(Duration::from_secs(10))
1231 .push_failed_delivery_retention(Duration::from_secs(48 * 60 * 60))
1232 .push_max_payload_bytes(2 * 1024 * 1024)
1233 .allow_insecure_push_urls(true)
1234 .build()
1235 .expect("build must succeed");
1236
1237 let cfg = &server.state.runtime_config;
1238 assert_eq!(cfg.blocking_task_timeout, Duration::from_secs(42));
1239 assert_eq!(cfg.timeout_abort_grace, Duration::from_secs(7));
1240 assert_eq!(cfg.cancel_handler_grace, Duration::from_secs(3));
1241 assert_eq!(cfg.cancel_handler_poll_interval, Duration::from_millis(50));
1242 assert_eq!(
1243 cfg.cross_instance_cancel_poll_interval,
1244 Duration::from_secs(2)
1245 );
1246 assert_eq!(cfg.push_max_attempts, 17);
1247 assert_eq!(cfg.push_backoff_base, Duration::from_millis(500));
1248 assert_eq!(cfg.push_backoff_cap, Duration::from_secs(90));
1249 assert!((cfg.push_backoff_jitter - 0.5).abs() < f32::EPSILON);
1250 assert_eq!(cfg.push_request_timeout, Duration::from_secs(20));
1251 assert_eq!(cfg.push_connect_timeout, Duration::from_secs(3));
1252 assert_eq!(cfg.push_read_timeout, Duration::from_secs(15));
1253 assert_eq!(cfg.push_claim_expiry, Duration::from_secs(20 * 60));
1254 assert_eq!(cfg.push_config_cache_ttl, Duration::from_secs(10));
1255 assert_eq!(
1256 cfg.push_failed_delivery_retention,
1257 Duration::from_secs(48 * 60 * 60)
1258 );
1259 assert_eq!(cfg.push_max_payload_bytes, 2 * 1024 * 1024);
1260 assert!(cfg.allow_insecure_push_urls);
1261 }
1262
1263 #[test]
1264 fn runtime_config_defaults_reach_built_server() {
1265 let server = A2aServer::builder()
1268 .executor(DummyExecutor)
1269 .build()
1270 .expect("build must succeed");
1271
1272 let cfg = &server.state.runtime_config;
1273 let defaults = RuntimeConfig::default();
1274 assert_eq!(cfg.blocking_task_timeout, defaults.blocking_task_timeout);
1275 assert_eq!(cfg.push_max_attempts, defaults.push_max_attempts);
1276 assert_eq!(
1277 cfg.allow_insecure_push_urls,
1278 defaults.allow_insecure_push_urls
1279 );
1280 }
1281
1282 #[test]
1287 fn unified_storage_does_not_auto_wire_push_delivery_store() {
1288 let server = A2aServer::builder()
1293 .executor(DummyExecutor)
1294 .storage(InMemoryA2aStorage::new())
1295 .build()
1296 .expect("build must succeed");
1297 assert!(
1298 server.state.push_delivery_store.is_none(),
1299 ".storage() must NOT auto-wire push_delivery_store — push is opt-in via .push_delivery_store()"
1300 );
1301 }
1302
1303 #[test]
1304 fn default_storage_leaves_push_delivery_store_unset() {
1305 let server = A2aServer::builder()
1308 .executor(DummyExecutor)
1309 .build()
1310 .expect("build must succeed");
1311 assert!(
1312 server.state.push_delivery_store.is_none(),
1313 "default build must leave push_delivery_store unset"
1314 );
1315 }
1316
1317 #[test]
1318 fn explicit_push_delivery_store_setter() {
1319 let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1323 let server = A2aServer::builder()
1324 .executor(DummyExecutor)
1325 .storage(storage.clone())
1326 .push_delivery_store(storage)
1327 .build()
1328 .expect("build must succeed");
1329 assert!(server.state.push_delivery_store.is_some());
1330 }
1331
1332 #[test]
1337 fn builder_rejects_push_consumer_without_dispatch_enabled() {
1338 let storage = InMemoryA2aStorage::new(); let res = A2aServer::builder()
1343 .executor(DummyExecutor)
1344 .task_storage(storage.clone())
1345 .push_storage(storage.clone())
1346 .event_store(storage.clone())
1347 .atomic_store(storage.clone())
1348 .cancellation_supervisor(storage.clone())
1349 .push_delivery_store(storage)
1350 .build();
1351 let err = match res {
1352 Err(e) => e.to_string(),
1353 Ok(_) => panic!("orphan delivery store must be rejected"),
1354 };
1355 assert!(
1356 err.contains("push_delivery_store wired")
1357 && err.contains("push_dispatch_enabled")
1358 && err.contains("with_push_dispatch_enabled(true)"),
1359 "error should name the fix: {err}"
1360 );
1361 }
1362
1363 #[test]
1364 fn builder_rejects_push_dispatch_without_consumer() {
1365 let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1369 let res = A2aServer::builder()
1370 .executor(DummyExecutor)
1371 .task_storage(storage.clone())
1372 .push_storage(storage.clone())
1373 .event_store(storage.clone())
1374 .atomic_store(storage.clone())
1375 .cancellation_supervisor(storage)
1376 .build();
1378 let err = match res {
1379 Err(e) => e.to_string(),
1380 Ok(_) => panic!("orphan dispatch flag must be rejected"),
1381 };
1382 assert!(
1383 err.contains("push_dispatch_enabled() is true") && err.contains("no consumer"),
1384 "error should cite the orphaned-marker rationale: {err}"
1385 );
1386 }
1387
1388 #[test]
1389 fn builder_accepts_push_fully_wired() {
1390 let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1393 let res = A2aServer::builder()
1394 .executor(DummyExecutor)
1395 .storage(storage.clone())
1396 .push_delivery_store(storage)
1397 .build();
1398 assert!(res.is_ok(), "push fully wired must build: {:?}", res.err());
1399 }
1400
1401 #[test]
1402 fn builder_accepts_non_push_deployment() {
1403 let storage = InMemoryA2aStorage::new(); let res = A2aServer::builder()
1406 .executor(DummyExecutor)
1407 .task_storage(storage.clone())
1408 .push_storage(storage.clone())
1409 .event_store(storage.clone())
1410 .atomic_store(storage.clone())
1411 .cancellation_supervisor(storage)
1412 .build();
1413 assert!(
1414 res.is_ok(),
1415 "non-push deployment must build: {:?}",
1416 res.err()
1417 );
1418 }
1419
1420 #[test]
1421 fn retry_horizon_violation_rejected() {
1422 let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1426 let res = A2aServer::builder()
1427 .executor(DummyExecutor)
1428 .storage(storage.clone())
1429 .push_delivery_store(storage)
1430 .push_max_attempts(10)
1431 .push_backoff_cap(Duration::from_secs(60))
1432 .push_claim_expiry(Duration::from_secs(600))
1434 .build();
1435 let err = match res {
1436 Err(e) => e,
1437 Ok(_) => panic!("retry horizon violation must be rejected"),
1438 };
1439 let msg = err.to_string();
1440 assert!(
1441 msg.contains("retry horizon") || msg.contains("push_claim_expiry"),
1442 "error should mention retry horizon: {msg}"
1443 );
1444 }
1445
1446 struct ContribMiddleware;
1450
1451 #[async_trait::async_trait]
1452 impl A2aMiddleware for ContribMiddleware {
1453 async fn before_request(
1454 &self,
1455 _ctx: &mut crate::middleware::RequestContext,
1456 ) -> Result<(), crate::middleware::MiddlewareError> {
1457 Ok(())
1458 }
1459 fn security_contribution(&self) -> SecurityContribution {
1460 SecurityContribution::new().with_scheme(
1461 "TestApiKey",
1462 turul_a2a_proto::SecurityScheme {
1463 scheme: Some(
1464 turul_a2a_proto::security_scheme::Scheme::ApiKeySecurityScheme(
1465 turul_a2a_proto::ApiKeySecurityScheme {
1466 description: "test".into(),
1467 location: "header".into(),
1468 name: "X-Test-Key".into(),
1469 },
1470 ),
1471 ),
1472 },
1473 vec![],
1474 )
1475 }
1476 }
1477
1478 #[test]
1479 fn push_delivery_store_survives_security_augmentation() {
1480 let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1485 let server = A2aServer::builder()
1486 .executor(DummyExecutor)
1487 .storage(storage.clone())
1488 .push_delivery_store(storage)
1489 .middleware(Arc::new(ContribMiddleware))
1490 .build()
1491 .expect("build must succeed");
1492 assert!(server.state.push_delivery_store.is_some());
1494
1495 let (augmented, had_security) = server.into_augmented_state();
1496 assert!(
1497 had_security,
1498 "ContribMiddleware contributed a scheme — augmentation must run"
1499 );
1500 assert!(
1501 augmented.push_delivery_store.is_some(),
1502 "push_delivery_store must survive security augmentation"
1503 );
1504 }
1505
1506 #[test]
1507 fn push_delivery_store_passthrough_without_security() {
1508 let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1511 let server = A2aServer::builder()
1512 .executor(DummyExecutor)
1513 .storage(storage.clone())
1514 .push_delivery_store(storage)
1515 .build()
1516 .expect("build must succeed");
1517 let (state, had_security) = server.into_augmented_state();
1518 assert!(!had_security);
1519 assert!(state.push_delivery_store.is_some());
1520 }
1521
1522 #[test]
1523 fn retry_horizon_satisfied_accepted() {
1524 let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1526 let server = A2aServer::builder()
1527 .executor(DummyExecutor)
1528 .storage(storage.clone())
1529 .push_delivery_store(storage)
1530 .push_max_attempts(5)
1531 .push_backoff_cap(Duration::from_secs(60))
1532 .push_claim_expiry(Duration::from_secs(5 * 60 + 1))
1533 .build()
1534 .expect("horizon-satisfying config must build");
1535 assert!(server.state.push_delivery_store.is_some());
1536 }
1537}