Skip to main content

turul_a2a/server/
mod.rs

1//! A2aServer builder and runtime.
2
3pub mod in_flight;
4pub mod obs;
5pub mod spawn;
6
7use std::net::SocketAddr;
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::error::A2aError;
12use crate::executor::AgentExecutor;
13use crate::middleware::{A2aMiddleware, MiddlewareStack, SecurityContribution};
14use crate::router::{AppState, build_router};
15use crate::storage::{
16    A2aAtomicStore, A2aPushNotificationStorage, A2aTaskStorage, InMemoryA2aStorage,
17};
18use crate::streaming::TaskEventBroker;
19
20// ---------------------------------------------------------------------------
21// Runtime configuration
22// ---------------------------------------------------------------------------
23
/// Runtime tuning knobs for advanced task lifecycle behaviors.
///
/// Groups: blocking-send two-deadline timeouts
/// (`blocking_task_timeout`, `timeout_abort_grace`); cancellation
/// handler grace / poll intervals and cross-instance cancel-marker
/// poll interval (`cancel_handler_*`,
/// `cross_instance_cancel_poll_interval`); push delivery tuning
/// (`push_*`, `allow_insecure_push_urls`).
///
/// Exposed publicly so tests and advanced consumers can construct
/// [`crate::router::AppState`] directly (e.g., router/transport-level
/// integration tests). The `#[non_exhaustive]` marker reserves the right
/// to add fields in patch releases; callers should use
/// [`RuntimeConfig::default()`] as the construction base and modify
/// specific fields rather than struct-literal construction.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct RuntimeConfig {
    /// Soft timeout for blocking `SendMessage` requests. On expiry the
    /// framework trips the executor's cancellation token and waits up to
    /// `timeout_abort_grace` for cooperative exit. Default 30s.
    pub blocking_task_timeout: Duration,
    /// Grace window between soft cancellation and hard
    /// `JoinHandle::abort()`. Default 5s.
    pub timeout_abort_grace: Duration,
    /// How long the `CancelTask` handler waits for cancellation to
    /// resolve before force-committing CANCELED itself. Default 5s.
    pub cancel_handler_grace: Duration,
    /// Poll interval used inside the `CancelTask` handler's grace window
    /// to re-read task state from storage. Default 100ms.
    pub cancel_handler_poll_interval: Duration,
    /// How often the in-flight supervisor batch-polls the cancel marker
    /// across in-flight tasks for cross-instance cancel propagation.
    /// Default 1s.
    pub cross_instance_cancel_poll_interval: Duration,

    /// Maximum attempts (including the first) per push delivery.
    /// Default 8.
    pub push_max_attempts: usize,
    /// Base delay before the second push attempt; doubles up to
    /// `push_backoff_cap`. Default 2s.
    pub push_backoff_base: Duration,
    /// Maximum single wait in the push retry schedule. Default 60s.
    pub push_backoff_cap: Duration,
    /// Jitter fraction applied to push retry waits. Default 0.25.
    pub push_backoff_jitter: f32,
    /// Total per-request timeout for a push POST. Default 30s.
    pub push_request_timeout: Duration,
    /// Connect-phase timeout for a push POST. Default 5s.
    pub push_connect_timeout: Duration,
    /// Read-phase timeout for a push POST. Default 30s.
    pub push_read_timeout: Duration,
    /// Claim expiry for the push delivery claim table. When a push
    /// delivery store is wired, `A2aServerBuilder::build` rejects values
    /// that do not exceed `push_max_attempts * push_backoff_cap`.
    /// Default 10 minutes.
    pub push_claim_expiry: Duration,
    /// Push-config cache TTL inside the delivery worker. Default 5s.
    pub push_config_cache_ttl: Duration,
    /// Retention for failed push delivery records (operator
    /// inspection). Default 7 days.
    pub push_failed_delivery_retention: Duration,
    /// Maximum serialized Task body size for a push POST.
    /// Default 1 MiB (`1024 * 1024` bytes).
    pub push_max_payload_bytes: usize,
    /// Dev-only escape hatch: permit `http://` webhook URLs and bypass
    /// the private-IP blocklist. Default `false`.
    pub allow_insecure_push_urls: bool,

    /// Interval between reclaim-and-redispatch sweeps.
    /// The server background task enumerates expired-but-non-terminal
    /// claim rows via
    /// [`crate::push::A2aPushDeliveryStore::list_reclaimable_claims`]
    /// and drives each through
    /// [`crate::push::PushDispatcher::redispatch_one`]. Shorter
    /// cadence recovers stuck rows faster but increases steady-state
    /// load; default 60s balances recovery latency against scan cost.
    pub push_reclaim_sweep_interval: Duration,

    /// Max reclaimable rows to pull per sweep tick. The sweeper
    /// paginates: each tick fetches up to this many rows, redispatches
    /// them, and returns. The next tick picks up any remainder.
    /// Default 64.
    pub push_reclaim_sweep_batch: usize,

    /// Whether this runtime can honour
    /// `SendMessageConfiguration.return_immediately = true`.
    ///
    /// `true` (default) — the runtime keeps the process alive after
    /// the HTTP response returns so `tokio::spawn`'d executors run to
    /// completion. Every long-lived host (`A2aServer::run`, ECS,
    /// Fargate, AppRunner, Kubernetes, bare VM) sets this.
    ///
    /// `false` — the runtime cannot guarantee post-return execution;
    /// `core_send_message` rejects `return_immediately = true` with
    /// `A2aError::UnsupportedOperation` before any storage write. The
    /// AWS Lambda adapter (ADR-008, ADR-013 §4.4) sets this.
    ///
    /// # Override policy
    ///
    /// This field is an escape hatch for tests and advanced internal
    /// consumers that construct [`crate::router::AppState`] directly.
    /// **It is not an adopter surface.** Do not set it to `true` on a
    /// runtime that cannot actually guarantee post-return execution —
    /// the guard exists to prevent silent executor orphaning.
    ///
    /// Adopters who need fire-and-forget-style work on Lambda today
    /// should invoke their durable mechanism (Step Functions, SQS,
    /// EventBridge, etc.) from inside their `AgentExecutor::execute`
    /// body. The executor returns synchronously once the workflow is
    /// accepted; the A2A task is Completed for "workflow accepted /
    /// started", not "workflow finished". If the A2A task is meant to
    /// track the full workflow lifecycle, the workflow itself must
    /// later call back into turul-a2a storage to update task state.
    /// See ADR-017 §"Alternatives considered" — Pattern A.
    ///
    /// A future ADR will add a capability-taking
    /// `LambdaA2aServerBuilder` method (shape:
    /// `with_durable_return_immediately(handler)`) that accepts a
    /// durable continuation mechanism as an argument and sets this
    /// flag as a side effect. At that point the flag still SHOULD NOT
    /// be flipped directly — reach for the builder method.
    pub supports_return_immediately: bool,
}
115
116impl Default for RuntimeConfig {
117    fn default() -> Self {
118        Self {
119            blocking_task_timeout: Duration::from_secs(30),
120            timeout_abort_grace: Duration::from_secs(5),
121            cancel_handler_grace: Duration::from_secs(5),
122            cancel_handler_poll_interval: Duration::from_millis(100),
123            cross_instance_cancel_poll_interval: Duration::from_secs(1),
124            push_max_attempts: 8,
125            push_backoff_base: Duration::from_secs(2),
126            push_backoff_cap: Duration::from_secs(60),
127            push_backoff_jitter: 0.25,
128            push_request_timeout: Duration::from_secs(30),
129            push_connect_timeout: Duration::from_secs(5),
130            push_read_timeout: Duration::from_secs(30),
131            push_claim_expiry: Duration::from_secs(10 * 60),
132            push_config_cache_ttl: Duration::from_secs(5),
133            push_failed_delivery_retention: Duration::from_secs(7 * 24 * 60 * 60),
134            push_max_payload_bytes: 1024 * 1024,
135            allow_insecure_push_urls: false,
136            push_reclaim_sweep_interval: Duration::from_secs(60),
137            push_reclaim_sweep_batch: 64,
138            supports_return_immediately: true,
139        }
140    }
141}
142
/// Builder for configuring and running an A2A server.
///
/// Only the executor is mandatory. Any storage slot (other than
/// `push_delivery_store`) left unset is filled with a shared
/// `InMemoryA2aStorage` instance when `build()` runs.
pub struct A2aServerBuilder {
    /// Agent executor; `build()` fails with "executor is required" when unset.
    executor: Option<Arc<dyn AgentExecutor>>,
    /// Task storage override.
    task_storage: Option<Arc<dyn A2aTaskStorage>>,
    /// Push-notification config storage override.
    push_storage: Option<Arc<dyn A2aPushNotificationStorage>>,
    /// Event store override.
    event_store: Option<Arc<dyn crate::storage::A2aEventStore>>,
    /// Atomic store override; its `push_dispatch_enabled()` flag must agree
    /// with the presence of `push_delivery_store` at build time.
    atomic_store: Option<Arc<dyn A2aAtomicStore>>,
    /// Cancellation supervisor override.
    cancellation_supervisor: Option<Arc<dyn crate::storage::A2aCancellationSupervisor>>,
    /// Push delivery claim store. Never defaulted and not set by
    /// `.storage()`; `None` means no push dispatcher is built.
    push_delivery_store: Option<Arc<dyn crate::push::A2aPushDeliveryStore>>,
    /// Address stored on the built server; `new()` sets `0.0.0.0:3000`.
    bind_addr: SocketAddr,
    /// Middleware in registration order; their security contributions are
    /// merged with AND semantics during `build()`.
    middleware: Vec<Arc<dyn A2aMiddleware>>,
    /// Runtime tuning, mutated by the per-knob setters.
    runtime_config: RuntimeConfig,
}
156
157impl A2aServerBuilder {
158    pub fn new() -> Self {
159        Self {
160            executor: None,
161            task_storage: None,
162            push_storage: None,
163            event_store: None,
164            atomic_store: None,
165            cancellation_supervisor: None,
166            push_delivery_store: None,
167            bind_addr: ([0, 0, 0, 0], 3000).into(),
168            middleware: vec![],
169            runtime_config: RuntimeConfig::default(),
170        }
171    }
172
173    // -----------------------------------------------------------------
174    // Runtime-config setters.
175    //
176    // Each setter updates the internal [`RuntimeConfig`] and returns
177    // `self`. Defaults are in [`RuntimeConfig::default`].
178    // -----------------------------------------------------------------
179
180    /// Soft timeout for blocking `SendMessage` requests. On expiry the
181    /// framework trips the executor's cancellation token and waits up to
182    /// [`Self::timeout_abort_grace`] for cooperative exit.
183    pub fn blocking_task_timeout(mut self, d: Duration) -> Self {
184        self.runtime_config.blocking_task_timeout = d;
185        self
186    }
187
188    /// Grace window between soft cancellation and hard `JoinHandle::abort()`.
189    ///
190    pub fn timeout_abort_grace(mut self, d: Duration) -> Self {
191        self.runtime_config.timeout_abort_grace = d;
192        self
193    }
194
195    /// How long the `CancelTask` handler waits for cancellation to resolve
196    /// before force-committing CANCELED itself.
197    pub fn cancel_handler_grace(mut self, d: Duration) -> Self {
198        self.runtime_config.cancel_handler_grace = d;
199        self
200    }
201
202    /// Poll interval used inside the `CancelTask` handler's grace window to
203    /// re-read task state from storage.
204    pub fn cancel_handler_poll_interval(mut self, d: Duration) -> Self {
205        self.runtime_config.cancel_handler_poll_interval = d;
206        self
207    }
208
209    /// How often the in-flight supervisor batch-polls the cancel marker
210    /// across in-flight tasks for cross-instance cancel propagation.
211    ///
212    pub fn cross_instance_cancel_poll_interval(mut self, d: Duration) -> Self {
213        self.runtime_config.cross_instance_cancel_poll_interval = d;
214        self
215    }
216
217    /// Maximum retry attempts (including first) per push delivery.
218    ///
219    pub fn push_max_attempts(mut self, n: usize) -> Self {
220        self.runtime_config.push_max_attempts = n;
221        self
222    }
223
224    /// Base delay before the second push attempt; doubles up to
225    /// [`Self::push_backoff_cap`].
226    pub fn push_backoff_base(mut self, d: Duration) -> Self {
227        self.runtime_config.push_backoff_base = d;
228        self
229    }
230
231    /// Maximum single-wait in the push retry schedule.
232    pub fn push_backoff_cap(mut self, d: Duration) -> Self {
233        self.runtime_config.push_backoff_cap = d;
234        self
235    }
236
237    /// Jitter fraction applied to push retry waits.
238    pub fn push_backoff_jitter(mut self, j: f32) -> Self {
239        self.runtime_config.push_backoff_jitter = j;
240        self
241    }
242
243    /// Total per-request timeout for a push POST.
244    pub fn push_request_timeout(mut self, d: Duration) -> Self {
245        self.runtime_config.push_request_timeout = d;
246        self
247    }
248
249    /// Connect-phase timeout for a push POST.
250    pub fn push_connect_timeout(mut self, d: Duration) -> Self {
251        self.runtime_config.push_connect_timeout = d;
252        self
253    }
254
255    /// Read-phase timeout for a push POST.
256    pub fn push_read_timeout(mut self, d: Duration) -> Self {
257        self.runtime_config.push_read_timeout = d;
258        self
259    }
260
261    /// Claim expiry for the push delivery claim table. Must exceed the
262    /// retry horizon implied by `push_max_attempts` + `push_backoff_cap`;
263    /// the push-delivery builder validates this on `build()`.
264    pub fn push_claim_expiry(mut self, d: Duration) -> Self {
265        self.runtime_config.push_claim_expiry = d;
266        self
267    }
268
269    /// Push-config cache TTL inside the delivery worker.
270    pub fn push_config_cache_ttl(mut self, d: Duration) -> Self {
271        self.runtime_config.push_config_cache_ttl = d;
272        self
273    }
274
275    /// Retention for failed push delivery records (operator inspection).
276    ///
277    pub fn push_failed_delivery_retention(mut self, d: Duration) -> Self {
278        self.runtime_config.push_failed_delivery_retention = d;
279        self
280    }
281
282    /// Maximum serialized Task body size for a push POST.
283    pub fn push_max_payload_bytes(mut self, bytes: usize) -> Self {
284        self.runtime_config.push_max_payload_bytes = bytes;
285        self
286    }
287
288    /// Dev-only escape hatch: permit `http://` webhook URLs AND bypass the
289    /// private-IP blocklist. Required for localhost wiremock testing.
290    /// Default false.
291    pub fn allow_insecure_push_urls(mut self, allow: bool) -> Self {
292        self.runtime_config.allow_insecure_push_urls = allow;
293        self
294    }
295
296    /// Interval between reclaim-and-redispatch sweeps. Default 60s.
297    pub fn push_reclaim_sweep_interval(mut self, d: Duration) -> Self {
298        self.runtime_config.push_reclaim_sweep_interval = d;
299        self
300    }
301
302    /// Max rows pulled per reclaim sweep tick. Default 64.
303    pub fn push_reclaim_sweep_batch(mut self, n: usize) -> Self {
304        self.runtime_config.push_reclaim_sweep_batch = n;
305        self
306    }
307
308    pub fn executor(mut self, executor: impl AgentExecutor + 'static) -> Self {
309        self.executor = Some(Arc::new(executor));
310        self
311    }
312
313    /// Set all storage from a single backend instance.
314    ///
315    /// This is the **preferred** method — a single struct implementing all storage
316    /// traits guarantees the same-backend requirement. Works with any
317    /// backend: `InMemoryA2aStorage`, `SqliteA2aStorage`, `PostgresA2aStorage`,
318    /// `DynamoDbA2aStorage`, etc.
319    ///
320    /// errata: `.storage()` wires storage traits only. It does
321    /// **not** auto-register the storage as a push-delivery store, even if the
322    /// backend happens to implement [`crate::push::A2aPushDeliveryStore`]. To
323    /// opt in to push delivery, call [`Self::push_delivery_store`] explicitly
324    /// and call `with_push_dispatch_enabled(true)` on the storage instance
325    /// before passing it here. Non-push deployments need neither.
326    pub fn storage<S>(mut self, storage: S) -> Self
327    where
328        S: A2aTaskStorage
329            + A2aPushNotificationStorage
330            + crate::storage::A2aEventStore
331            + A2aAtomicStore
332            + crate::storage::A2aCancellationSupervisor
333            + Clone
334            + 'static,
335    {
336        self.task_storage = Some(Arc::new(storage.clone()));
337        self.push_storage = Some(Arc::new(storage.clone()));
338        self.event_store = Some(Arc::new(storage.clone()));
339        self.atomic_store = Some(Arc::new(storage.clone()));
340        self.cancellation_supervisor = Some(Arc::new(storage));
341        self
342    }
343
344    /// Set the cancellation supervisor individually. Prefer `.storage()`
345    /// for ADR-009 same-backend compliance. Consumed by the `:cancel`
346    /// handler for cross-instance marker reads.
347    pub fn cancellation_supervisor(
348        mut self,
349        supervisor: impl crate::storage::A2aCancellationSupervisor + 'static,
350    ) -> Self {
351        self.cancellation_supervisor = Some(Arc::new(supervisor));
352        self
353    }
354
355    /// Set task storage individually. Prefer `.storage()` for ADR-009 compliance.
356    pub fn task_storage(mut self, storage: impl A2aTaskStorage + 'static) -> Self {
357        self.task_storage = Some(Arc::new(storage));
358        self
359    }
360
361    /// Set push notification storage individually. Prefer `.storage()` for ADR-009 compliance.
362    pub fn push_storage(mut self, storage: impl A2aPushNotificationStorage + 'static) -> Self {
363        self.push_storage = Some(Arc::new(storage));
364        self
365    }
366
367    /// Set event store individually. Prefer `.storage()` for ADR-009 compliance.
368    pub fn event_store(mut self, store: impl crate::storage::A2aEventStore + 'static) -> Self {
369        self.event_store = Some(Arc::new(store));
370        self
371    }
372
373    /// Set atomic store individually. Prefer `.storage()` for ADR-009 compliance.
374    pub fn atomic_store(mut self, store: impl A2aAtomicStore + 'static) -> Self {
375        self.atomic_store = Some(Arc::new(store));
376        self
377    }
378
379    /// Set the push delivery claim store individually.
380    ///
381    /// Required when push-notification delivery is enabled in the
382    /// deployment. `.storage()` wires this automatically from a unified
383    /// backend; use this setter only for mixed-backend tests or when
384    /// running against an external push-coordination service. Prefer
385    /// `.storage()` for ADR-009 same-backend compliance.
386    pub fn push_delivery_store(
387        mut self,
388        store: impl crate::push::A2aPushDeliveryStore + 'static,
389    ) -> Self {
390        self.push_delivery_store = Some(Arc::new(store));
391        self
392    }
393
394    pub fn bind(mut self, addr: impl Into<SocketAddr>) -> Self {
395        self.bind_addr = addr.into();
396        self
397    }
398
399    /// Add auth middleware. Multiple calls stack (AND semantics).
400    /// Use `AnyOfMiddleware` for OR semantics.
401    pub fn middleware(mut self, mw: Arc<dyn A2aMiddleware>) -> Self {
402        self.middleware.push(mw);
403        self
404    }
405
406    pub fn build(self) -> Result<A2aServer, A2aError> {
407        let executor = self
408            .executor
409            .ok_or(A2aError::Internal("executor is required".into()))?;
410
411        let default_storage = InMemoryA2aStorage::new();
412        let task_storage = self
413            .task_storage
414            .unwrap_or_else(|| Arc::new(default_storage.clone()));
415        let push_storage = self
416            .push_storage
417            .unwrap_or_else(|| Arc::new(default_storage.clone()));
418        let event_store: Arc<dyn crate::storage::A2aEventStore> = self
419            .event_store
420            .unwrap_or_else(|| Arc::new(default_storage.clone()));
421        let atomic_store: Arc<dyn A2aAtomicStore> = self
422            .atomic_store
423            .unwrap_or_else(|| Arc::new(default_storage.clone()));
424        let cancellation_supervisor: Arc<dyn crate::storage::A2aCancellationSupervisor> = self
425            .cancellation_supervisor
426            .unwrap_or_else(|| Arc::new(default_storage.clone()));
427        let push_delivery_store: Option<Arc<dyn crate::push::A2aPushDeliveryStore>> =
428            self.push_delivery_store;
429
430        // Same-backend enforcement: all storage traits must share the same backend.
431        let task_backend = task_storage.backend_name();
432        let push_backend = push_storage.backend_name();
433        let event_backend = event_store.backend_name();
434        let atomic_backend = atomic_store.backend_name();
435        let supervisor_backend = cancellation_supervisor.backend_name();
436        if task_backend != push_backend
437            || task_backend != event_backend
438            || task_backend != atomic_backend
439            || task_backend != supervisor_backend
440        {
441            return Err(A2aError::Internal(format!(
442                "Storage backend mismatch: task={task_backend}, push={push_backend}, \
443                 event={event_backend}, atomic={atomic_backend}, \
444                 cancellation_supervisor={supervisor_backend}. \
445                 ADR-009 requires all storage traits to share the same backend."
446            )));
447        }
448
449        // Push-dispatch consistency: the atomic store's
450        // opt-in flag and the presence of `push_delivery_store` MUST
451        // agree. Both-true = push fully wired; both-false = non-push
452        // deployment. The mixed cases are build errors — one orphans a
453        // load-bearing marker table without a consumer, the other
454        // wires a dispatcher that will never receive durable markers.
455        match (
456            push_delivery_store.is_some(),
457            atomic_store.push_dispatch_enabled(),
458        ) {
459            (true, true) | (false, false) => {}
460            (true, false) => {
461                return Err(A2aError::Internal(
462                    "push_delivery_store wired but atomic_store.push_dispatch_enabled() \
463                     is false. Call .with_push_dispatch_enabled(true) on the backend \
464                     storage before passing it to .storage()."
465                        .into(),
466                ));
467            }
468            (false, true) => {
469                return Err(A2aError::Internal(
470                    "atomic_store.push_dispatch_enabled() is true but no \
471                     push_delivery_store is wired. Pending-dispatch markers would be \
472                     written with no consumer, imposing load-bearing infra for no \
473                     benefit. If you need to populate markers for an external \
474                     consumer, open an issue for a distinctly-named opt-in — for now, \
475                     this configuration is rejected."
476                        .into(),
477                ));
478            }
479        }
480
481        let push_dispatcher: Option<Arc<crate::push::PushDispatcher>> =
482            if let Some(push_delivery) = push_delivery_store.as_ref() {
483                let push_delivery_backend = push_delivery.backend_name();
484                if task_backend != push_delivery_backend {
485                    return Err(A2aError::Internal(format!(
486                        "Storage backend mismatch: task={task_backend}, \
487                         push_delivery={push_delivery_backend}. \
488                         ADR-009 requires all storage traits to share the same backend."
489                    )));
490                }
491
492                // claim expiry must exceed the worst-case
493                // retry horizon so a claim held by a healthy worker is
494                // never mis-classified as stale mid-retry. Upper bound
495                // for the horizon is `max_attempts * backoff_cap`
496                // (every attempt waits the cap), which is conservative
497                // and independent of jitter.
498                let retry_horizon = self
499                    .runtime_config
500                    .push_backoff_cap
501                    .saturating_mul(self.runtime_config.push_max_attempts as u32);
502                if self.runtime_config.push_claim_expiry <= retry_horizon {
503                    return Err(A2aError::Internal(format!(
504                        "push_claim_expiry ({:?}) must be greater than retry horizon \
505                         (push_max_attempts={} * push_backoff_cap={:?} = {:?}). \
506                         Raise push_claim_expiry or lower push_max_attempts/push_backoff_cap.",
507                        self.runtime_config.push_claim_expiry,
508                        self.runtime_config.push_max_attempts,
509                        self.runtime_config.push_backoff_cap,
510                        retry_horizon
511                    )));
512                }
513
514                // Build the push-delivery worker + dispatcher now that
515                // we've validated the horizon. Runtime config carries
516                // the full worker tuning; the dispatcher closes the
517                // commit-to-POST loop (ADR-011 §2 + §13.13).
518                let delivery_cfg = crate::push::delivery::PushDeliveryConfig {
519                    max_attempts: self.runtime_config.push_max_attempts as u32,
520                    backoff_base: self.runtime_config.push_backoff_base,
521                    backoff_cap: self.runtime_config.push_backoff_cap,
522                    backoff_jitter: self.runtime_config.push_backoff_jitter,
523                    request_timeout: self.runtime_config.push_request_timeout,
524                    connect_timeout: self.runtime_config.push_connect_timeout,
525                    read_timeout: self.runtime_config.push_read_timeout,
526                    claim_expiry: self.runtime_config.push_claim_expiry,
527                    max_payload_bytes: self.runtime_config.push_max_payload_bytes,
528                    allow_insecure_urls: self.runtime_config.allow_insecure_push_urls,
529                    ..crate::push::delivery::PushDeliveryConfig::default()
530                };
531
532                let instance_id = format!("a2a-server-{}", uuid::Uuid::now_v7());
533                let worker = crate::push::delivery::PushDeliveryWorker::new(
534                    push_delivery.clone(),
535                    delivery_cfg,
536                    None,
537                    instance_id,
538                )
539                .map_err(|e| A2aError::Internal(format!("push worker build failed: {e}")))?;
540
541                Some(Arc::new(crate::push::PushDispatcher::new(
542                    Arc::new(worker),
543                    push_storage.clone(),
544                    task_storage.clone(),
545                )))
546            } else {
547                None
548            };
549
550        // Collect and merge security contributions
551        let contributions: Vec<SecurityContribution> = self
552            .middleware
553            .iter()
554            .map(|m| m.security_contribution())
555            .collect();
556        let merged = merge_stacked_contributions(&contributions)?;
557
558        // validate scheme references in every
559        // build-materializable advertised card surface. The merge
560        // applied here mirrors `SecurityAugmentedExecutor::agent_card()`
561        // so validation sees the exact surface clients will receive.
562        let public_materialized = apply_security_merge(executor.agent_card(), &merged);
563        validate_card_security_references(&public_materialized, "agent_card")?;
564        if let Some(extended_raw) = executor.extended_agent_card(None) {
565            let extended_materialized = apply_security_merge(extended_raw, &merged);
566            validate_card_security_references(&extended_materialized, "extended_agent_card")?;
567        }
568
569        Ok(A2aServer {
570            state: AppState {
571                executor,
572                task_storage,
573                push_storage,
574                event_store,
575                atomic_store,
576                event_broker: TaskEventBroker::new(),
577                middleware_stack: Arc::new(MiddlewareStack::new(self.middleware)),
578                runtime_config: self.runtime_config,
579                in_flight: Arc::new(crate::server::in_flight::InFlightRegistry::new()),
580                cancellation_supervisor,
581                push_delivery_store,
582                push_dispatcher,
583                // long-lived binary server never enqueues;
584                // `tokio::spawn` is durable in this runtime.
585                durable_executor_queue: None,
586            },
587            merged_security: merged,
588            bind_addr: self.bind_addr,
589        })
590    }
591}
592
593impl Default for A2aServerBuilder {
594    fn default() -> Self {
595        Self::new()
596    }
597}
598
599/// Merge stacked contributions (AND semantics).
600///
601/// Schemes: union with collision detection (identical = dedup, different = error).
602/// Requirements: Cartesian product (AND).
603fn merge_stacked_contributions(
604    contributions: &[SecurityContribution],
605) -> Result<SecurityContribution, A2aError> {
606    let mut merged = SecurityContribution::new();
607
608    if contributions.is_empty() {
609        return Ok(merged);
610    }
611
612    // 1. Collect schemes with collision detection
613    let mut seen_schemes: std::collections::HashMap<String, turul_a2a_proto::SecurityScheme> =
614        std::collections::HashMap::new();
615
616    for contrib in contributions {
617        for (name, scheme) in &contrib.schemes {
618            if let Some(existing) = seen_schemes.get(name) {
619                // Check semantic equality
620                if !schemes_equivalent(existing, scheme) {
621                    return Err(A2aError::Internal(format!(
622                        "Security scheme collision: '{}' has conflicting definitions",
623                        name
624                    )));
625                }
626                // Identical — skip (dedup)
627            } else {
628                seen_schemes.insert(name.clone(), scheme.clone());
629                merged.schemes.push((name.clone(), scheme.clone()));
630            }
631        }
632    }
633
634    // 2. Compute requirements via Cartesian product (AND)
635    let requirement_sets: Vec<&[turul_a2a_proto::SecurityRequirement]> = contributions
636        .iter()
637        .filter(|c| !c.requirements.is_empty())
638        .map(|c| c.requirements.as_slice())
639        .collect();
640
641    if requirement_sets.is_empty() {
642        return Ok(merged);
643    }
644
645    let mut combined: Vec<turul_a2a_proto::SecurityRequirement> = requirement_sets[0].to_vec();
646
647    for alternatives in &requirement_sets[1..] {
648        let mut new_combined = Vec::new();
649        for existing in &combined {
650            for alt in *alternatives {
651                let mut merged_schemes = existing.schemes.clone();
652                for (name, scopes) in &alt.schemes {
653                    merged_schemes
654                        .entry(name.clone())
655                        .and_modify(|existing_scopes| {
656                            // Union scopes, dedup + sort
657                            for s in &scopes.list {
658                                if !existing_scopes.list.contains(s) {
659                                    existing_scopes.list.push(s.clone());
660                                }
661                            }
662                            existing_scopes.list.sort();
663                            existing_scopes.list.dedup();
664                        })
665                        .or_insert_with(|| scopes.clone());
666                }
667                new_combined.push(turul_a2a_proto::SecurityRequirement {
668                    schemes: merged_schemes,
669                });
670            }
671        }
672        combined = new_combined;
673    }
674
675    merged.requirements = combined;
676    Ok(merged)
677}
678
679/// Apply a `SecurityContribution` to a raw `AgentCard`, producing the
680/// card that clients actually see. Mirrors the per-request merge in
681/// `SecurityAugmentedExecutor::agent_card()` so build-time validation
682/// and runtime serving agree on the final surface.
683fn apply_security_merge(
684    mut card: turul_a2a_proto::AgentCard,
685    security: &SecurityContribution,
686) -> turul_a2a_proto::AgentCard {
687    for (name, scheme) in &security.schemes {
688        card.security_schemes
689            .entry(name.clone())
690            .or_insert_with(|| scheme.clone());
691    }
692    for req in &security.requirements {
693        card.security_requirements.push(req.clone());
694    }
695    card
696}
697
698/// every scheme name referenced by a `SecurityRequirement`
699/// on the card MUST appear in `card.security_schemes`. Runs post-merge
700/// against the final advertised card (public or extended).
701///
702/// `surface` is the human-readable surface label (`agent_card` or
703/// `extended_agent_card`) — included verbatim in the error so the
704/// adopter can tell which card failed.
705fn validate_card_security_references(
706    card: &turul_a2a_proto::AgentCard,
707    surface: &str,
708) -> Result<(), A2aError> {
709    for req in &card.security_requirements {
710        for scheme_name in req.schemes.keys() {
711            if !card.security_schemes.contains_key(scheme_name) {
712                return Err(A2aError::InvalidRequest {
713                    message: format!(
714                        "{surface}: agent-level security requirement references \
715                         undeclared scheme '{scheme_name}'"
716                    ),
717                });
718            }
719        }
720    }
721    for skill in &card.skills {
722        for req in &skill.security_requirements {
723            for scheme_name in req.schemes.keys() {
724                if !card.security_schemes.contains_key(scheme_name) {
725                    return Err(A2aError::InvalidRequest {
726                        message: format!(
727                            "{surface}: skill '{skill_id}' references \
728                             undeclared security scheme '{scheme_name}'",
729                            skill_id = skill.id
730                        ),
731                    });
732                }
733            }
734        }
735    }
736    Ok(())
737}
738
739/// Semantic equality for SecurityScheme.
740///
741/// Uses structural PartialEq on proto types. This is correct for:
742/// - ApiKeySecurityScheme (no maps)
743/// - HttpAuthSecurityScheme (no maps)
744/// - MutualTlsSecurityScheme (no maps)
745///
746/// Limitation: For OAuth2SecurityScheme and OpenIdConnectSecurityScheme,
747/// scope maps (HashMap) have non-deterministic iteration order. PartialEq
748/// on HashMap compares by content (not order), so this is correct for
749/// HashMap but would need explicit normalization if proto ever uses
750/// BTreeMap or sorted structures. Sufficient for the scheme types
751/// this workspace supports (API Key + HTTP Bearer); revisit if new
752/// scheme types introduce ordering-sensitive fields.
753fn schemes_equivalent(
754    a: &turul_a2a_proto::SecurityScheme,
755    b: &turul_a2a_proto::SecurityScheme,
756) -> bool {
757    a == b
758}
759
760/// A configured A2A server ready to run.
761pub struct A2aServer {
762    state: AppState,
763    merged_security: SecurityContribution,
764    bind_addr: SocketAddr,
765}
766
767impl A2aServer {
768    pub fn builder() -> A2aServerBuilder {
769        A2aServerBuilder::new()
770    }
771
772    /// Build the axum router — useful for testing.
773    /// Augments the AgentCard with merged security contributions.
774    pub fn into_router(self) -> axum::Router {
775        let (state, had_security) = self.into_augmented_state();
776        if !had_security {
777            return build_router(state);
778        }
779        build_router(state)
780    }
781
782    /// Build the tonic gRPC router with the Tower auth stack already
783    /// layered (ADR-014 §2.2 / §2.4).
784    ///
785    /// The returned `tonic::transport::server::Router` always applies
786    /// the same [`MiddlewareStack`] as the HTTP path. A raw
787    /// unauthenticated service accessor is
788    /// deliberately not exposed — returning the inner service would
789    /// permit adopters to compose it into a custom tonic server
790    /// without the auth layer and silently bypass authentication.
791    #[cfg(feature = "grpc")]
792    pub fn into_tonic_router(self) -> crate::grpc::LayeredGrpcRouter {
793        let (state, _) = self.into_augmented_state();
794        let middleware_stack = state.middleware_stack.clone();
795        crate::grpc::make_grpc_router(state, middleware_stack)
796    }
797
798    /// Produce the `AppState` that `into_router` would mount, plus a flag
799    /// indicating whether security augmentation was applied. Exposed to
800    /// tests so they can assert post-augmentation invariants (e.g., that
801    /// `push_delivery_store` survives the auth rebuild) without going
802    /// through an HTTP round-trip.
803    pub(crate) fn into_augmented_state(self) -> (AppState, bool) {
804        if self.merged_security.is_empty() {
805            return (self.state, false);
806        }
807
808        let wrapped = SecurityAugmentedExecutor {
809            inner: self.state.executor.clone(),
810            security: self.merged_security,
811        };
812
813        let augmented = AppState {
814            executor: Arc::new(wrapped),
815            task_storage: self.state.task_storage,
816            push_storage: self.state.push_storage,
817            event_store: self.state.event_store,
818            atomic_store: self.state.atomic_store,
819            event_broker: self.state.event_broker,
820            middleware_stack: self.state.middleware_stack,
821            runtime_config: self.state.runtime_config,
822            in_flight: self.state.in_flight,
823            cancellation_supervisor: self.state.cancellation_supervisor,
824            // Preserve both the push claim store and the dispatcher
825            // through security augmentation. Dropping either would
826            // silently disable push delivery for every auth-gated
827            // deployment.
828            push_delivery_store: self.state.push_delivery_store,
829            push_dispatcher: self.state.push_dispatcher,
830            durable_executor_queue: self.state.durable_executor_queue,
831        };
832
833        (augmented, true)
834    }
835
836    /// Run the server.
837    pub async fn run(self) -> Result<(), A2aError> {
838        let bind_addr = self.bind_addr;
839        // Capture substrate refs for the cross-instance cancel poller
840        // before `into_router` consumes `self`. The poller uses the same
841        // Arcs for `in_flight` and `cancellation_supervisor` as the
842        // router's AppState, so a marker write from any instance is
843        // observed here within one `cross_instance_cancel_poll_interval`.
844        let poller_registry = std::sync::Arc::clone(&self.state.in_flight);
845        let poller_supervisor = std::sync::Arc::clone(&self.state.cancellation_supervisor);
846        let poller_interval = self
847            .state
848            .runtime_config
849            .cross_instance_cancel_poll_interval;
850        let push_delivery_store_for_sweep = self.state.push_delivery_store.clone();
851        let self_push_dispatcher_for_sweep = self.state.push_dispatcher.clone();
852        let sweep_interval_for_task = self.state.runtime_config.push_reclaim_sweep_interval;
853        let sweep_batch_for_task = self.state.runtime_config.push_reclaim_sweep_batch;
854        let push_claim_expiry_for_sweep = self.state.runtime_config.push_claim_expiry;
855
856        let app = self.into_router();
857        let listener = tokio::net::TcpListener::bind(bind_addr)
858            .await
859            .map_err(|e| A2aError::Internal(format!("Failed to bind: {e}")))?;
860        tracing::info!("A2A server listening on {}", bind_addr);
861
862        // Spawn the supervisor poll loop. A shutdown token lets us stop
863        // the poller when the server exits; for now axum::serve runs to
864        // completion so the token is never tripped (server shutdown is
865        // driven by process exit). Later: wire the token to a graceful
866        // shutdown signal.
867        let shutdown = tokio_util::sync::CancellationToken::new();
868        let poller_shutdown = shutdown.clone();
869        let poller_handle =
870            tokio::spawn(crate::server::in_flight::run_cross_instance_cancel_poller(
871                poller_registry,
872                poller_supervisor,
873                poller_interval,
874                poller_shutdown,
875            ));
876
877        // Push-delivery reclaim loop. Two
878        // enumerations run per tick:
879        //
880        // 1. `list_stale_pending_dispatches` — events whose initial
881        //    fan-out died (e.g. persistent config-store outage)
882        //    before any claim rows were created. The dispatcher
883        //    reloads the task and re-runs the fan-out; each
884        //    per-config delivery then creates or refreshes its
885        //    claim row.
886        //
887        // 2. `list_reclaimable_claims` — claim rows that reached a
888        //    non-terminal expired state (worker exhausted its
889        //    bounded persist retry). The dispatcher re-invokes
890        //    `deliver()` which re-claims and re-POSTs.
891        //
892        // Staleness threshold for pending dispatches: claim_expiry,
893        // the same budget the builder already validates must exceed
894        // the retry horizon. In-progress dispatches stay under this
895        // threshold; only genuinely stuck markers get picked up.
896        let sweep_handle = match (
897            push_delivery_store_for_sweep,
898            self_push_dispatcher_for_sweep,
899        ) {
900            (Some(store), Some(dispatcher)) => {
901                let shutdown = shutdown.clone();
902                let interval = sweep_interval_for_task;
903                let batch = sweep_batch_for_task;
904                let pending_stale_threshold = push_claim_expiry_for_sweep;
905                Some(tokio::spawn(async move {
906                    let mut ticker = tokio::time::interval(interval);
907                    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
908                    loop {
909                        tokio::select! {
910                            _ = shutdown.cancelled() => break,
911                            _ = ticker.tick() => {
912                                let cutoff = std::time::SystemTime::now()
913                                    .checked_sub(pending_stale_threshold)
914                                    .unwrap_or(std::time::SystemTime::UNIX_EPOCH);
915                                match store
916                                    .list_stale_pending_dispatches(cutoff, batch)
917                                    .await
918                                {
919                                    Ok(rows) if !rows.is_empty() => {
920                                        tracing::warn!(
921                                            target: "turul_a2a::push_pending_dispatches_stale",
922                                            count = rows.len(),
923                                            "reclaim sweep found stale pending-dispatch \
924                                             markers; re-running fan-out"
925                                        );
926                                        for row in rows {
927                                            dispatcher.redispatch_pending(row).await;
928                                        }
929                                    }
930                                    Ok(_) => {}
931                                    Err(e) => {
932                                        tracing::error!(
933                                            target: "turul_a2a::push_pending_sweep_error",
934                                            error = %e,
935                                            "pending-dispatch sweep failed"
936                                        );
937                                    }
938                                }
939                                match store.list_reclaimable_claims(batch).await {
940                                    Ok(rows) if !rows.is_empty() => {
941                                        tracing::warn!(
942                                            target: "turul_a2a::push_claims_reclaimed",
943                                            count = rows.len(),
944                                            "reclaim sweep found expired non-terminal \
945                                             push claims; redispatching"
946                                        );
947                                        for row in rows {
948                                            dispatcher.redispatch_one(row).await;
949                                        }
950                                    }
951                                    Ok(_) => {}
952                                    Err(e) => {
953                                        tracing::error!(
954                                            target: "turul_a2a::push_sweep_error",
955                                            error = %e,
956                                            "push claim sweep failed"
957                                        );
958                                    }
959                                }
960                            }
961                        }
962                    }
963                }))
964            }
965            _ => None,
966        };
967
968        let serve_result = axum::serve(listener, app).await;
969
970        // Gracefully stop background loops — relevant if axum::serve ever returns.
971        shutdown.cancel();
972        let _ = poller_handle.await;
973        if let Some(h) = sweep_handle {
974            let _ = h.await;
975        }
976
977        serve_result.map_err(|e| A2aError::Internal(format!("Server error: {e}")))?;
978        Ok(())
979    }
980}
981
982/// Wraps an executor to augment its agent card with merged security contributions.
983struct SecurityAugmentedExecutor {
984    inner: Arc<dyn AgentExecutor>,
985    security: SecurityContribution,
986}
987
988#[async_trait::async_trait]
989impl AgentExecutor for SecurityAugmentedExecutor {
990    async fn execute(
991        &self,
992        task: &mut turul_a2a_types::Task,
993        msg: &turul_a2a_types::Message,
994        ctx: &crate::executor::ExecutionContext,
995    ) -> Result<(), A2aError> {
996        self.inner.execute(task, msg, ctx).await
997    }
998
999    fn agent_card(&self) -> turul_a2a_proto::AgentCard {
1000        apply_security_merge(self.inner.agent_card(), &self.security)
1001    }
1002
1003    fn extended_agent_card(
1004        &self,
1005        claims: Option<&serde_json::Value>,
1006    ) -> Option<turul_a2a_proto::AgentCard> {
1007        self.inner
1008            .extended_agent_card(claims)
1009            .map(|card| apply_security_merge(card, &self.security))
1010    }
1011}
1012
1013#[cfg(test)]
1014mod tests {
1015    use super::*;
1016    use crate::error::A2aError;
1017    use crate::executor::AgentExecutor;
1018    use turul_a2a_types::{Message, Task};
1019
1020    struct DummyExecutor;
1021
1022    #[async_trait::async_trait]
1023    impl AgentExecutor for DummyExecutor {
1024        async fn execute(
1025            &self,
1026            _task: &mut Task,
1027            _msg: &Message,
1028            _ctx: &crate::executor::ExecutionContext,
1029        ) -> Result<(), A2aError> {
1030            Ok(())
1031        }
1032        fn agent_card(&self) -> turul_a2a_proto::AgentCard {
1033            turul_a2a_proto::AgentCard::default()
1034        }
1035    }
1036
1037    #[test]
1038    fn builder_requires_executor() {
1039        let result = A2aServer::builder().build();
1040        assert!(result.is_err());
1041    }
1042
1043    #[test]
1044    fn builder_with_executor_defaults_storage() {
1045        let server = A2aServer::builder()
1046            .executor(DummyExecutor)
1047            .build()
1048            .unwrap();
1049        let _ = server.into_router();
1050    }
1051
1052    #[test]
1053    fn builder_with_explicit_storage() {
1054        // `.storage()` alone wires storage traits only. No push delivery
1055        // is implied — matches the common non-push-consumer deployment.
1056        let storage = InMemoryA2aStorage::new();
1057        let server = A2aServer::builder()
1058            .executor(DummyExecutor)
1059            .storage(storage)
1060            .bind(([127, 0, 0, 1], 8080))
1061            .build()
1062            .unwrap();
1063        let _ = server.into_router();
1064    }
1065
1066    /// Fake event store with a different backend_name for mismatch testing.
1067    struct FakeEventStore;
1068
1069    #[async_trait::async_trait]
1070    impl crate::storage::A2aEventStore for FakeEventStore {
1071        fn backend_name(&self) -> &'static str {
1072            "fake-backend"
1073        }
1074        async fn append_event(
1075            &self,
1076            _t: &str,
1077            _tid: &str,
1078            _e: crate::streaming::StreamEvent,
1079        ) -> Result<u64, crate::storage::A2aStorageError> {
1080            Ok(0)
1081        }
1082        async fn get_events_after(
1083            &self,
1084            _t: &str,
1085            _tid: &str,
1086            _s: u64,
1087        ) -> Result<Vec<(u64, crate::streaming::StreamEvent)>, crate::storage::A2aStorageError>
1088        {
1089            Ok(vec![])
1090        }
1091        async fn latest_sequence(
1092            &self,
1093            _t: &str,
1094            _tid: &str,
1095        ) -> Result<u64, crate::storage::A2aStorageError> {
1096            Ok(0)
1097        }
1098        async fn cleanup_expired(&self) -> Result<u64, crate::storage::A2aStorageError> {
1099            Ok(0)
1100        }
1101    }
1102
1103    #[test]
1104    fn mixed_backend_rejected_at_build() {
1105        // Task + push = in-memory, event = fake-backend → should fail
1106        let result = A2aServer::builder()
1107            .executor(DummyExecutor)
1108            .event_store(FakeEventStore)
1109            .build();
1110
1111        match result {
1112            Err(e) => {
1113                let msg = e.to_string();
1114                assert!(
1115                    msg.contains("backend mismatch") || msg.contains("Storage backend mismatch"),
1116                    "Error should mention backend mismatch: {msg}"
1117                );
1118            }
1119            Ok(_) => panic!("Mixed backends should be rejected"),
1120        }
1121    }
1122
1123    /// Fake atomic store with a different backend_name for mismatch testing.
1124    struct FakeAtomicStore;
1125
1126    #[async_trait::async_trait]
1127    impl crate::storage::A2aAtomicStore for FakeAtomicStore {
1128        fn backend_name(&self) -> &'static str {
1129            "fake-atomic"
1130        }
1131        async fn create_task_with_events(
1132            &self,
1133            _t: &str,
1134            _o: &str,
1135            task: turul_a2a_types::Task,
1136            _e: Vec<crate::streaming::StreamEvent>,
1137        ) -> Result<(turul_a2a_types::Task, Vec<u64>), crate::storage::A2aStorageError> {
1138            Ok((task, vec![]))
1139        }
1140        async fn update_task_status_with_events(
1141            &self,
1142            _t: &str,
1143            _tid: &str,
1144            _o: &str,
1145            _s: turul_a2a_types::TaskStatus,
1146            _e: Vec<crate::streaming::StreamEvent>,
1147        ) -> Result<(turul_a2a_types::Task, Vec<u64>), crate::storage::A2aStorageError> {
1148            unimplemented!()
1149        }
1150        async fn update_task_with_events(
1151            &self,
1152            _t: &str,
1153            _o: &str,
1154            _task: turul_a2a_types::Task,
1155            _e: Vec<crate::streaming::StreamEvent>,
1156        ) -> Result<Vec<u64>, crate::storage::A2aStorageError> {
1157            Ok(vec![])
1158        }
1159    }
1160
1161    #[test]
1162    fn mixed_atomic_backend_rejected_at_build() {
1163        // Task + push + event = in-memory, atomic = fake-atomic → should fail
1164        let result = A2aServer::builder()
1165            .executor(DummyExecutor)
1166            .atomic_store(FakeAtomicStore)
1167            .build();
1168
1169        match result {
1170            Err(e) => {
1171                let msg = e.to_string();
1172                assert!(
1173                    msg.contains("backend mismatch") || msg.contains("Storage backend mismatch"),
1174                    "Error should mention backend mismatch: {msg}"
1175                );
1176            }
1177            Ok(_) => panic!("Mixed atomic backend should be rejected"),
1178        }
1179    }
1180
1181    #[test]
1182    fn same_backend_accepted() {
1183        // All four from the same InMemoryA2aStorage — should succeed
1184        let storage = InMemoryA2aStorage::new();
1185        let result = A2aServer::builder()
1186            .executor(DummyExecutor)
1187            .task_storage(storage.clone())
1188            .push_storage(storage.clone())
1189            .event_store(storage.clone())
1190            .atomic_store(storage)
1191            .build();
1192
1193        assert!(result.is_ok(), "Same backend should be accepted");
1194    }
1195
1196    #[test]
1197    fn unified_storage_accepted() {
1198        // Single .storage() call — the preferred path. No push-delivery
1199        // wiring implied.
1200        let result = A2aServer::builder()
1201            .executor(DummyExecutor)
1202            .storage(InMemoryA2aStorage::new())
1203            .build();
1204
1205        assert!(result.is_ok(), "Unified .storage() should be accepted");
1206    }
1207
1208    #[test]
1209    fn runtime_config_setters_survive_build() {
1210        // Invariant: builder setters for runtime-config knobs must propagate
1211        // into the built server's AppState so that handlers can read
1212        // them via `state.runtime_config`. This test exists to prevent the
1213        // regression where setters silently become no-ops.
1214        let server = A2aServer::builder()
1215            .executor(DummyExecutor)
1216            // One representative setter per consumer group.
1217            .blocking_task_timeout(Duration::from_secs(42))
1218            .timeout_abort_grace(Duration::from_secs(7))
1219            .cancel_handler_grace(Duration::from_secs(3))
1220            .cancel_handler_poll_interval(Duration::from_millis(50))
1221            .cross_instance_cancel_poll_interval(Duration::from_secs(2))
1222            .push_max_attempts(17)
1223            .push_backoff_base(Duration::from_millis(500))
1224            .push_backoff_cap(Duration::from_secs(90))
1225            .push_backoff_jitter(0.5)
1226            .push_request_timeout(Duration::from_secs(20))
1227            .push_connect_timeout(Duration::from_secs(3))
1228            .push_read_timeout(Duration::from_secs(15))
1229            .push_claim_expiry(Duration::from_secs(20 * 60))
1230            .push_config_cache_ttl(Duration::from_secs(10))
1231            .push_failed_delivery_retention(Duration::from_secs(48 * 60 * 60))
1232            .push_max_payload_bytes(2 * 1024 * 1024)
1233            .allow_insecure_push_urls(true)
1234            .build()
1235            .expect("build must succeed");
1236
1237        let cfg = &server.state.runtime_config;
1238        assert_eq!(cfg.blocking_task_timeout, Duration::from_secs(42));
1239        assert_eq!(cfg.timeout_abort_grace, Duration::from_secs(7));
1240        assert_eq!(cfg.cancel_handler_grace, Duration::from_secs(3));
1241        assert_eq!(cfg.cancel_handler_poll_interval, Duration::from_millis(50));
1242        assert_eq!(
1243            cfg.cross_instance_cancel_poll_interval,
1244            Duration::from_secs(2)
1245        );
1246        assert_eq!(cfg.push_max_attempts, 17);
1247        assert_eq!(cfg.push_backoff_base, Duration::from_millis(500));
1248        assert_eq!(cfg.push_backoff_cap, Duration::from_secs(90));
1249        assert!((cfg.push_backoff_jitter - 0.5).abs() < f32::EPSILON);
1250        assert_eq!(cfg.push_request_timeout, Duration::from_secs(20));
1251        assert_eq!(cfg.push_connect_timeout, Duration::from_secs(3));
1252        assert_eq!(cfg.push_read_timeout, Duration::from_secs(15));
1253        assert_eq!(cfg.push_claim_expiry, Duration::from_secs(20 * 60));
1254        assert_eq!(cfg.push_config_cache_ttl, Duration::from_secs(10));
1255        assert_eq!(
1256            cfg.push_failed_delivery_retention,
1257            Duration::from_secs(48 * 60 * 60)
1258        );
1259        assert_eq!(cfg.push_max_payload_bytes, 2 * 1024 * 1024);
1260        assert!(cfg.allow_insecure_push_urls);
1261    }
1262
1263    #[test]
1264    fn runtime_config_defaults_reach_built_server() {
1265        // Second half of the contract: when no setters are called, the
1266        // documented defaults land in AppState.
1267        let server = A2aServer::builder()
1268            .executor(DummyExecutor)
1269            .build()
1270            .expect("build must succeed");
1271
1272        let cfg = &server.state.runtime_config;
1273        let defaults = RuntimeConfig::default();
1274        assert_eq!(cfg.blocking_task_timeout, defaults.blocking_task_timeout);
1275        assert_eq!(cfg.push_max_attempts, defaults.push_max_attempts);
1276        assert_eq!(
1277            cfg.allow_insecure_push_urls,
1278            defaults.allow_insecure_push_urls
1279        );
1280    }
1281
1282    // -----------------------------------------------------------------
1283    // Push delivery store wiring
1284    // -----------------------------------------------------------------
1285
1286    #[test]
1287    fn unified_storage_does_not_auto_wire_push_delivery_store() {
1288        // `.storage()` wires storage traits only.
1289        // Implementing the push-delivery trait is not intent to deliver —
1290        // a non-push deployment must be able to pass any all-in-one
1291        // backend without opting in to push semantics.
1292        let server = A2aServer::builder()
1293            .executor(DummyExecutor)
1294            .storage(InMemoryA2aStorage::new())
1295            .build()
1296            .expect("build must succeed");
1297        assert!(
1298            server.state.push_delivery_store.is_none(),
1299            ".storage() must NOT auto-wire push_delivery_store — push is opt-in via .push_delivery_store()"
1300        );
1301    }
1302
1303    #[test]
1304    fn default_storage_leaves_push_delivery_store_unset() {
1305        // When no storage is passed at all, push delivery is opt-in —
1306        // push-config CRUD must still work but no worker gets spawned.
1307        let server = A2aServer::builder()
1308            .executor(DummyExecutor)
1309            .build()
1310            .expect("build must succeed");
1311        assert!(
1312            server.state.push_delivery_store.is_none(),
1313            "default build must leave push_delivery_store unset"
1314        );
1315    }
1316
1317    #[test]
1318    fn explicit_push_delivery_store_setter() {
1319        // The individual setter exists for mixed-backend tests. It should
1320        // also satisfy the same-backend check when paired with matching
1321        // storage.
1322        let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1323        let server = A2aServer::builder()
1324            .executor(DummyExecutor)
1325            .storage(storage.clone())
1326            .push_delivery_store(storage)
1327            .build()
1328            .expect("build must succeed");
1329        assert!(server.state.push_delivery_store.is_some());
1330    }
1331
1332    // -----------------------------------------------------------------
1333    // Push-dispatch consistency (ADR-013 §4.3 / §10.2)
1334    // -----------------------------------------------------------------
1335
1336    #[test]
1337    fn builder_rejects_push_consumer_without_dispatch_enabled() {
1338        // `push_delivery_store` wired + atomic store's
1339        // `push_dispatch_enabled=false` ⇒ consumer would never receive
1340        // durable markers. Build must fail and cite `with_push_dispatch_enabled`.
1341        let storage = InMemoryA2aStorage::new(); // flag defaults to false
1342        let res = A2aServer::builder()
1343            .executor(DummyExecutor)
1344            .task_storage(storage.clone())
1345            .push_storage(storage.clone())
1346            .event_store(storage.clone())
1347            .atomic_store(storage.clone())
1348            .cancellation_supervisor(storage.clone())
1349            .push_delivery_store(storage)
1350            .build();
1351        let err = match res {
1352            Err(e) => e.to_string(),
1353            Ok(_) => panic!("orphan delivery store must be rejected"),
1354        };
1355        assert!(
1356            err.contains("push_delivery_store wired")
1357                && err.contains("push_dispatch_enabled")
1358                && err.contains("with_push_dispatch_enabled(true)"),
1359            "error should name the fix: {err}"
1360        );
1361    }
1362
1363    #[test]
1364    fn builder_rejects_push_dispatch_without_consumer() {
1365        // `push_dispatch_enabled=true` + no `push_delivery_store` ⇒
1366        // markers would accumulate with no reader. Build must fail and
1367        // mention the no-consumer rationale.
1368        let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1369        let res = A2aServer::builder()
1370            .executor(DummyExecutor)
1371            .task_storage(storage.clone())
1372            .push_storage(storage.clone())
1373            .event_store(storage.clone())
1374            .atomic_store(storage.clone())
1375            .cancellation_supervisor(storage)
1376            // deliberately no .push_delivery_store(...)
1377            .build();
1378        let err = match res {
1379            Err(e) => e.to_string(),
1380            Ok(_) => panic!("orphan dispatch flag must be rejected"),
1381        };
1382        assert!(
1383            err.contains("push_dispatch_enabled() is true") && err.contains("no consumer"),
1384            "error should cite the orphaned-marker rationale: {err}"
1385        );
1386    }
1387
1388    #[test]
1389    fn builder_accepts_push_fully_wired() {
1390        // Positive case: storage opted in via `with_push_dispatch_enabled(true)`
1391        // AND delivery store explicitly wired via `.push_delivery_store(...)`.
1392        let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1393        let res = A2aServer::builder()
1394            .executor(DummyExecutor)
1395            .storage(storage.clone())
1396            .push_delivery_store(storage)
1397            .build();
1398        assert!(res.is_ok(), "push fully wired must build: {:?}", res.err());
1399    }
1400
1401    #[test]
1402    fn builder_accepts_non_push_deployment() {
1403        // Positive case: both flag off and no delivery store.
1404        let storage = InMemoryA2aStorage::new(); // flag defaults to false
1405        let res = A2aServer::builder()
1406            .executor(DummyExecutor)
1407            .task_storage(storage.clone())
1408            .push_storage(storage.clone())
1409            .event_store(storage.clone())
1410            .atomic_store(storage.clone())
1411            .cancellation_supervisor(storage)
1412            .build();
1413        assert!(
1414            res.is_ok(),
1415            "non-push deployment must build: {:?}",
1416            res.err()
1417        );
1418    }
1419
1420    #[test]
1421    fn retry_horizon_violation_rejected() {
1422        // push_claim_expiry <= max_attempts * backoff_cap must fail fast
1423        //. The in-memory delivery store is required to
1424        // trigger the check — so push must be explicitly wired.
1425        let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1426        let res = A2aServer::builder()
1427            .executor(DummyExecutor)
1428            .storage(storage.clone())
1429            .push_delivery_store(storage)
1430            .push_max_attempts(10)
1431            .push_backoff_cap(Duration::from_secs(60))
1432            // 10 * 60s = 600s — equal to claim expiry, which is <=, so rejected.
1433            .push_claim_expiry(Duration::from_secs(600))
1434            .build();
1435        let err = match res {
1436            Err(e) => e,
1437            Ok(_) => panic!("retry horizon violation must be rejected"),
1438        };
1439        let msg = err.to_string();
1440        assert!(
1441            msg.contains("retry horizon") || msg.contains("push_claim_expiry"),
1442            "error should mention retry horizon: {msg}"
1443        );
1444    }
1445
1446    // Test middleware that contributes a non-empty SecurityContribution
1447    // so `into_augmented_state` must rebuild the state. Accepts every
1448    // request — this test is about wiring, not auth behaviour.
1449    struct ContribMiddleware;
1450
1451    #[async_trait::async_trait]
1452    impl A2aMiddleware for ContribMiddleware {
1453        async fn before_request(
1454            &self,
1455            _ctx: &mut crate::middleware::RequestContext,
1456        ) -> Result<(), crate::middleware::MiddlewareError> {
1457            Ok(())
1458        }
1459        fn security_contribution(&self) -> SecurityContribution {
1460            SecurityContribution::new().with_scheme(
1461                "TestApiKey",
1462                turul_a2a_proto::SecurityScheme {
1463                    scheme: Some(
1464                        turul_a2a_proto::security_scheme::Scheme::ApiKeySecurityScheme(
1465                            turul_a2a_proto::ApiKeySecurityScheme {
1466                                description: "test".into(),
1467                                location: "header".into(),
1468                                name: "X-Test-Key".into(),
1469                            },
1470                        ),
1471                    ),
1472                },
1473                vec![],
1474            )
1475        }
1476    }
1477
1478    #[test]
1479    fn push_delivery_store_survives_security_augmentation() {
1480        // Regression: `into_router`'s rebuilt AppState used to hard-code
1481        // push_delivery_store: None, silently disabling push delivery on
1482        // every auth-gated deployment. The augmented state must carry the
1483        // same claim-store Arc the builder installed.
1484        let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1485        let server = A2aServer::builder()
1486            .executor(DummyExecutor)
1487            .storage(storage.clone())
1488            .push_delivery_store(storage)
1489            .middleware(Arc::new(ContribMiddleware))
1490            .build()
1491            .expect("build must succeed");
1492        // Sanity: push_delivery_store is wired pre-augmentation.
1493        assert!(server.state.push_delivery_store.is_some());
1494
1495        let (augmented, had_security) = server.into_augmented_state();
1496        assert!(
1497            had_security,
1498            "ContribMiddleware contributed a scheme — augmentation must run"
1499        );
1500        assert!(
1501            augmented.push_delivery_store.is_some(),
1502            "push_delivery_store must survive security augmentation"
1503        );
1504    }
1505
1506    #[test]
1507    fn push_delivery_store_passthrough_without_security() {
1508        // With no contributing middleware, `into_augmented_state` returns
1509        // the original state unchanged — the store must still be present.
1510        let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1511        let server = A2aServer::builder()
1512            .executor(DummyExecutor)
1513            .storage(storage.clone())
1514            .push_delivery_store(storage)
1515            .build()
1516            .expect("build must succeed");
1517        let (state, had_security) = server.into_augmented_state();
1518        assert!(!had_security);
1519        assert!(state.push_delivery_store.is_some());
1520    }
1521
1522    #[test]
1523    fn retry_horizon_satisfied_accepted() {
1524        // push_claim_expiry > max_attempts * backoff_cap succeeds.
1525        let storage = InMemoryA2aStorage::new().with_push_dispatch_enabled(true);
1526        let server = A2aServer::builder()
1527            .executor(DummyExecutor)
1528            .storage(storage.clone())
1529            .push_delivery_store(storage)
1530            .push_max_attempts(5)
1531            .push_backoff_cap(Duration::from_secs(60))
1532            .push_claim_expiry(Duration::from_secs(5 * 60 + 1))
1533            .build()
1534            .expect("horizon-satisfying config must build");
1535        assert!(server.state.push_delivery_store.is_some());
1536    }
1537}