1//! HTTP router matching proto google.api.http annotations.
2//!
3//! Routes use an axum wildcard catch-all for task paths because the proto
4//! uses `{id=*}:action` patterns whose `:action` suffix can't be expressed in axum's `{param}` segment syntax.
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use axum::extract::{Path, Query, State};
10use axum::response::IntoResponse;
11use axum::response::sse::{Event, KeepAlive, Sse};
12use axum::routing::{get, post};
13use axum::{Json, Router};
14use tokio::sync::broadcast;
15use tokio_stream::wrappers::ReceiverStream;
16
17use crate::error::A2aError;
18use crate::executor::AgentExecutor;
19use crate::storage::{
20    A2aAtomicStore, A2aEventStore, A2aPushNotificationStorage, A2aStorageError, A2aTaskStorage,
21    TaskFilter, TaskListPage,
22};
23use crate::streaming::{StreamEvent, replay};
24use turul_a2a_types::{Message, Task, TaskState, TaskStatus};
25
26/// Shared server state.
27#[derive(Clone)]
28pub struct AppState {
29    pub executor: Arc<dyn AgentExecutor>,
30    pub task_storage: Arc<dyn A2aTaskStorage>,
31    pub push_storage: Arc<dyn A2aPushNotificationStorage>,
32    pub event_store: Arc<dyn A2aEventStore>,
33    pub atomic_store: Arc<dyn A2aAtomicStore>,
34    pub event_broker: crate::streaming::TaskEventBroker,
35    pub middleware_stack: Arc<crate::middleware::MiddlewareStack>,
36    /// Runtime configuration carried from [`crate::server::A2aServerBuilder`].
37    /// Consumers read fields via `state.runtime_config` — cancellation
38    /// handler grace / poll interval, cross-instance cancel poll
39    /// interval, blocking-send two-deadline timeouts, and push delivery
40    /// tuning. Construct via
41    /// [`crate::server::RuntimeConfig::default()`] when building
42    /// `AppState` directly (tests, Lambda adapter) rather than through
43    /// the server builder.
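    ///
    /// Illustrative override (shows only the fields this module reads;
    /// assumes the struct's fields are public so struct-update works):
    ///
    /// ```ignore
    /// let cfg = crate::server::RuntimeConfig {
    ///     blocking_task_timeout: std::time::Duration::from_secs(30),
    ///     ..crate::server::RuntimeConfig::default()
    /// };
    /// ```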
44    pub runtime_config: crate::server::RuntimeConfig,
45
46    /// In-flight task registry. Holds one
47    /// [`crate::server::in_flight::InFlightHandle`] per spawned executor
48    /// — keyed by `(tenant, task_id)`. Populated by the executor spawn
49    /// path; consumed by the `:cancel` handler to trip the local
50    /// cancellation token if the executor runs on this instance. May be
51    /// empty when no executor has been spawned through the registry yet.
52    pub in_flight: Arc<crate::server::in_flight::InFlightRegistry>,
53
54    /// Supervisor-only cancel-marker reads (ADR-012 §3 / §10). Separate
55    /// from `task_storage` so handler code cannot reach the unscoped
56    /// reads. Use `set_cancel_requested` on `task_storage` for marker
57    /// writes (owner-scoped, handler-safe).
58    pub cancellation_supervisor: Arc<dyn crate::storage::A2aCancellationSupervisor>,
59
60    /// Push-delivery coordination store. `None` on
61    /// deployments that do not wire a `PushDeliveryWorker`; the
62    /// push config CRUD paths continue to work without it —
63    /// configs are stored, just not delivered. Set via
64    /// [`crate::server::A2aServerBuilder::push_delivery_store`].
65    pub push_delivery_store: Option<Arc<dyn crate::push::A2aPushDeliveryStore>>,
66
67    /// Push-delivery dispatcher (ADR-011 §2, §13.13). Populated by the
68    /// server builder iff `push_delivery_store` is wired. Handler and
69    /// executor commit paths call [`crate::push::PushDispatcher::dispatch`] after a
70    /// successful terminal atomic-store write so every terminal —
71    /// executor-driven, framework-forced CANCEL, or hard-timeout
72    /// FAILED — fans out to registered push configs by the same
73    /// contract.
74    pub push_dispatcher: Option<Arc<crate::push::PushDispatcher>>,
75
76    /// Durable executor queue. When `Some`, the
77    /// `return_immediately = true` path in
78    /// [`core_send_message`] enqueues a
79    /// [`crate::durable_executor::QueuedExecutorJob`] instead of
80    /// spawning the executor locally. A separate invocation consumes
81    /// the queue and runs the executor to terminal. Wired exclusively
82    /// through `LambdaA2aServerBuilder::with_durable_executor` /
83    /// `with_sqs_return_immediately` in `turul-a2a-aws-lambda`; the
84    /// long-lived [`crate::server::A2aServerBuilder`] keeps this
85    /// `None` because `tokio::spawn` is already durable in a
86    /// long-lived process.
87    pub durable_executor_queue: Option<Arc<dyn crate::durable_executor::DurableExecutorQueue>>,
88}
89
90/// Build the axum router with all proto-defined routes.
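///
/// # Example
///
/// Hedged serving sketch — `state` stands in for whatever
/// [`crate::server::A2aServerBuilder`] (or a hand-built [`AppState`])
/// produces; only `build_router` itself is this module's API.
///
/// ```ignore
/// let app = build_router(state);
/// let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
/// axum::serve(listener, app).await?;
/// ```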
91pub fn build_router(state: AppState) -> Router {
92    let router = Router::new()
93        // Agent card discovery
94        .route("/.well-known/agent-card.json", get(agent_card_handler))
95        .route("/extendedAgentCard", get(extended_agent_card_handler))
96        // Message operations (proto lines 23, 35)
97        .route("/message:send", post(send_message_handler))
98        .route("/message:stream", post(send_streaming_message_handler))
99        // Task list (proto line 57)
100        .route("/tasks", get(list_tasks_handler))
101        // Task operations via wildcard (proto lines 47, 66, 78, 92-139)
102        .route(
103            "/tasks/{*rest}",
104            get(task_get_dispatch)
105                .post(task_post_dispatch)
106                .delete(task_delete_dispatch),
107        )
108        // Tenant-prefixed routes (proto additional_bindings)
109        .route("/{tenant}/message:send", post(tenant_send_message_handler))
110        .route(
111            "/{tenant}/message:stream",
112            post(tenant_send_streaming_message_handler),
113        )
114        .route("/{tenant}/tasks", get(tenant_list_tasks_handler))
115        .route(
116            "/{tenant}/extendedAgentCard",
117            get(extended_agent_card_handler),
118        )
119        .route(
120            "/{tenant}/tasks/{*rest}",
121            get(tenant_task_get_dispatch)
122                .post(tenant_task_post_dispatch)
123                .delete(tenant_task_delete_dispatch),
124        );
125
126    // JSON-RPC dispatch: /jsonrpc is the canonical endpoint
127    let router = router.route("/jsonrpc", post(crate::jsonrpc::jsonrpc_dispatch_handler));
128
129    // A2A v0.3 compat: root POST route for a2a-sdk 0.3.x clients that POST
130    // to the agent card URL (base URL) rather than /message:send.
131    // Removal condition: when a2a-sdk supports v1.0 routing.
132    #[cfg(feature = "compat-v03")]
133    let router = router.route("/", post(crate::jsonrpc::jsonrpc_dispatch_handler));
134
135    // Wrap with auth Tower layer (runs second — after transport compliance)
136    let auth_layer = crate::middleware::AuthLayer::new(state.middleware_stack.clone());
137    // Wrap with transport compliance layer (runs first — outermost)
138    let transport_layer = crate::middleware::transport::TransportComplianceLayer;
139    router
140        .with_state(state)
141        .layer(auth_layer)
142        .layer(transport_layer)
143}
144
145// =========================================================
146// Task path parsing
147// =========================================================
148
149/// Parsed task path action from the wildcard segment.
150///
151/// For push notification config paths, the same parse result is used for
152/// multiple HTTP methods (GET=list/get, POST=create, DELETE=delete).
153/// The dispatch functions disambiguate by HTTP method.
154#[derive(Debug, PartialEq)]
155enum TaskAction {
156    /// /tasks/{id} — GET=GetTask
157    GetTask(String),
158    /// /tasks/{id}:cancel — POST=CancelTask
159    CancelTask(String),
160    /// /tasks/{id}:subscribe — GET=SubscribeToTask (SSE)
161    SubscribeToTask(String),
162    /// /tasks/{task_id}/pushNotificationConfigs — GET=list, POST=create
163    PushConfigCollection(String),
164    /// /tasks/{task_id}/pushNotificationConfigs/{config_id} — GET=get, DELETE=delete
165    PushConfigItem(String, String),
166}
167
168fn parse_task_path(rest: &str) -> Option<TaskAction> {
169    let rest = rest.strip_prefix('/').unwrap_or(rest);
170    let parts: Vec<&str> = rest.split('/').collect();
171
172    match parts.as_slice() {
173        // /tasks/{id}:cancel or /tasks/{id}:subscribe or /tasks/{id}
174        [segment] => {
175            if let Some(id) = segment.strip_suffix(":cancel") {
176                Some(TaskAction::CancelTask(id.to_string()))
177            } else if let Some(id) = segment.strip_suffix(":subscribe") {
178                Some(TaskAction::SubscribeToTask(id.to_string()))
179            } else {
180                Some(TaskAction::GetTask(segment.to_string()))
181            }
182        }
183        // /tasks/{task_id}/pushNotificationConfigs — disambiguated by HTTP method in dispatch
184        [task_id, "pushNotificationConfigs"] => {
185            Some(TaskAction::PushConfigCollection(task_id.to_string()))
186        }
187        // /tasks/{task_id}/pushNotificationConfigs/{config_id} — disambiguated by HTTP method
188        [task_id, "pushNotificationConfigs", config_id] => Some(TaskAction::PushConfigItem(
189            task_id.to_string(),
190            config_id.to_string(),
191        )),
192        _ => None,
193    }
194}
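
// Hedged sketch of the path grammar above — a handful of illustrative
// cases, not the crate's actual test suite.
#[cfg(test)]
mod parse_task_path_sketch {
    use super::*;

    #[test]
    fn action_suffixes_and_push_config_segments() {
        assert_eq!(
            parse_task_path("abc:cancel"),
            Some(TaskAction::CancelTask("abc".to_string()))
        );
        assert_eq!(
            parse_task_path("/abc"), // leading slash from the wildcard is tolerated
            Some(TaskAction::GetTask("abc".to_string()))
        );
        assert_eq!(
            parse_task_path("t1/pushNotificationConfigs"),
            Some(TaskAction::PushConfigCollection("t1".to_string()))
        );
        // Anything deeper than three segments is rejected.
        assert_eq!(parse_task_path("t1/pushNotificationConfigs/c1/extra"), None);
    }
}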
195
196// =========================================================
197// Dispatch — default tenant (primary routes)
198// =========================================================
199
200async fn task_get_dispatch(
201    State(state): State<AppState>,
202    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
203    headers: axum::http::HeaderMap,
204    Path(rest): Path<String>,
205    Query(query): Query<TaskGetCombinedQuery>,
206) -> Result<axum::response::Response, A2aError> {
207    // HeaderMap lookups are case-insensitive, so a single get covers both spellings.
208    let last_event_id = headers
209        .get("Last-Event-ID")
210        .and_then(|v| v.to_str().ok())
211        .map(String::from);
212    dispatch_task_get(
213        state,
214        DEFAULT_TENANT,
215        ctx.identity.owner(),
216        &rest,
217        &query,
218        last_event_id.as_deref(),
219    )
220    .await
221}
222
223async fn task_post_dispatch(
224    State(state): State<AppState>,
225    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
226    Path(rest): Path<String>,
227    body: String,
228) -> Result<axum::response::Response, A2aError> {
229    dispatch_task_post(state, DEFAULT_TENANT, ctx.identity.owner(), &rest, body).await
230}
231
232async fn task_delete_dispatch(
233    State(state): State<AppState>,
234    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
235    Path(rest): Path<String>,
236) -> Result<axum::response::Response, A2aError> {
237    dispatch_task_delete(state, DEFAULT_TENANT, ctx.identity.owner(), &rest).await
238}
239
240// =========================================================
241// Dispatch — tenant-prefixed routes
242// =========================================================
243
244async fn tenant_send_message_handler(
245    State(state): State<AppState>,
246    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
247    Path(tenant): Path<String>,
248    body: String,
249) -> Result<Json<serde_json::Value>, A2aError> {
250    let claims = ctx.identity.claims().cloned();
251    core_send_message(state, &tenant, ctx.identity.owner(), claims, body).await
252}
253
254async fn tenant_list_tasks_handler(
255    State(state): State<AppState>,
256    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
257    Path(tenant): Path<String>,
258    Query(query): Query<ListTasksQuery>,
259) -> Result<Json<serde_json::Value>, A2aError> {
260    core_list_tasks(state, &tenant, ctx.identity.owner(), &query).await
261}
262
263async fn tenant_task_get_dispatch(
264    State(state): State<AppState>,
265    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
266    headers: axum::http::HeaderMap,
267    Path((tenant, rest)): Path<(String, String)>,
268    Query(query): Query<TaskGetCombinedQuery>,
269) -> Result<axum::response::Response, A2aError> {
270    // HeaderMap lookups are case-insensitive, so a single get covers both spellings.
271    let last_event_id = headers
272        .get("Last-Event-ID")
273        .and_then(|v| v.to_str().ok())
274        .map(String::from);
275    dispatch_task_get(
276        state,
277        &tenant,
278        ctx.identity.owner(),
279        &rest,
280        &query,
281        last_event_id.as_deref(),
282    )
283    .await
284}
285
286async fn tenant_task_post_dispatch(
287    State(state): State<AppState>,
288    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
289    Path((tenant, rest)): Path<(String, String)>,
290    body: String,
291) -> Result<axum::response::Response, A2aError> {
292    dispatch_task_post(state, &tenant, ctx.identity.owner(), &rest, body).await
293}
294
295async fn tenant_task_delete_dispatch(
296    State(state): State<AppState>,
297    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
298    Path((tenant, rest)): Path<(String, String)>,
299) -> Result<axum::response::Response, A2aError> {
300    dispatch_task_delete(state, &tenant, ctx.identity.owner(), &rest).await
301}
302
303// =========================================================
304// Shared dispatch logic
305// =========================================================
306
307/// Combined query params — works for both task and push config GET routes.
308/// axum parses all query params into a single struct; unused fields default.
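/// e.g. `GET /tasks/{id}?historyLength=5` fills `history_length`, while
/// `GET /tasks/{id}/pushNotificationConfigs?pageSize=10` fills the paging
/// fields; untouched fields stay `None` via `#[serde(default)]`.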
309#[derive(serde::Deserialize, Default)]
310#[serde(default)]
311struct TaskGetCombinedQuery {
312    #[serde(rename = "historyLength")]
313    history_length: Option<i32>,
314    #[serde(rename = "pageSize")]
315    page_size: Option<i32>,
316    #[serde(rename = "pageToken")]
317    page_token: Option<String>,
318}
319
320async fn dispatch_task_get(
321    state: AppState,
322    tenant: &str,
323    owner: &str,
324    rest: &str,
325    query: &TaskGetCombinedQuery,
326    last_event_id: Option<&str>,
327) -> Result<axum::response::Response, A2aError> {
328    match parse_task_path(rest) {
329        Some(TaskAction::GetTask(id)) => {
330            let Json(v) = core_get_task(state, tenant, owner, &id, query.history_length).await?;
331            Ok(Json(v).into_response())
332        }
333        Some(TaskAction::SubscribeToTask(id)) => {
334            core_subscribe_to_task(state, tenant, owner, &id, last_event_id).await
335        }
336        Some(TaskAction::PushConfigCollection(task_id)) => {
337            let pq = PushConfigQuery {
338                page_size: query.page_size,
339                page_token: query.page_token.clone(),
340            };
341            let Json(v) = core_list_push_configs(state, tenant, owner, &task_id, &pq).await?;
342            Ok(Json(v).into_response())
343        }
344        Some(TaskAction::PushConfigItem(task_id, config_id)) => {
345            let Json(v) = core_get_push_config(state, tenant, owner, &task_id, &config_id).await?;
346            Ok(Json(v).into_response())
347        }
348        _ => Err(A2aError::InvalidRequest {
349            message: "Invalid task path".into(),
350        }),
351    }
352}
353
354async fn dispatch_task_post(
355    state: AppState,
356    tenant: &str,
357    owner: &str,
358    rest: &str,
359    body: String,
360) -> Result<axum::response::Response, A2aError> {
361    match parse_task_path(rest) {
362        Some(TaskAction::CancelTask(id)) => {
363            let Json(v) = core_cancel_task(state, tenant, owner, &id).await?;
364            Ok(Json(v).into_response())
365        }
366        Some(TaskAction::PushConfigCollection(task_id)) => {
367            let Json(v) = core_create_push_config(state, tenant, owner, &task_id, body).await?;
368            Ok(Json(v).into_response())
369        }
370        _ => Err(A2aError::InvalidRequest {
371            message: "Invalid task path".into(),
372        }),
373    }
374}
375
376async fn dispatch_task_delete(
377    state: AppState,
378    tenant: &str,
379    owner: &str,
380    rest: &str,
381) -> Result<axum::response::Response, A2aError> {
382    match parse_task_path(rest) {
383        Some(TaskAction::PushConfigItem(task_id, config_id)) => {
384            let Json(v) =
385                core_delete_push_config(state, tenant, owner, &task_id, &config_id).await?;
386            Ok(Json(v).into_response())
387        }
388        _ => Err(A2aError::InvalidRequest {
389            message: "Invalid task path".into(),
390        }),
391    }
392}
393
394// =========================================================
395// Handlers — agent card (no tenant scoping)
396// =========================================================
397
398async fn agent_card_handler(State(state): State<AppState>) -> impl IntoResponse {
399    let card = serde_json::to_value(state.executor.agent_card()).unwrap_or_default();
400    #[cfg(feature = "compat-v03")]
401    let card = crate::compat_v03::inject_agent_card_compat(card);
402    Json(card)
403}
404
405async fn extended_agent_card_handler(
406    State(state): State<AppState>,
407) -> Result<impl IntoResponse, A2aError> {
408    match state.executor.extended_agent_card(None) {
409        Some(extended) => {
410            let card = serde_json::to_value(extended).unwrap_or_default();
411            #[cfg(feature = "compat-v03")]
412            let card = crate::compat_v03::inject_agent_card_compat(card);
413            Ok(Json(card))
414        }
415        None => Err(A2aError::ExtendedAgentCardNotConfigured),
416    }
417}
418
419async fn send_streaming_message_handler(
420    State(state): State<AppState>,
421    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
422    body: String,
423) -> Result<axum::response::Response, A2aError> {
424    let claims = ctx.identity.claims().cloned();
425    core_send_streaming_message(state, DEFAULT_TENANT, ctx.identity.owner(), claims, body).await
426}
427
428async fn tenant_send_streaming_message_handler(
429    State(state): State<AppState>,
430    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
431    Path(tenant): Path<String>,
432    body: String,
433) -> Result<axum::response::Response, A2aError> {
434    let claims = ctx.identity.claims().cloned();
435    core_send_streaming_message(state, &tenant, ctx.identity.owner(), claims, body).await
436}
437
438/// Task-side setup for streaming send: parse request, create task,
439/// emit SUBMITTED + WORKING through the atomic store, spawn the
440/// executor. Returns the `task_id` and a broker wake-up receiver.
441///
442/// Shared by the HTTP SSE path (`core_send_streaming_message`) and the
443/// gRPC streaming adapter. Keeping every streaming transport on this
444/// single entry point is the ADR-005 invariant extended to three
445/// transports — no behaviour fork between them.
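///
/// Call shape (illustrative; the gRPC adapter's real wiring lives in its
/// own module):
///
/// ```ignore
/// let (task_id, wake_rx) =
///     setup_streaming_send(state.clone(), tenant, owner, claims, body).await?;
/// // From here each transport replays + live-tails events for `task_id`,
/// // waking on `wake_rx` rather than relying on polling alone.
/// ```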
446pub(crate) async fn setup_streaming_send(
447    state: AppState,
448    tenant: &str,
449    owner: &str,
450    claims: Option<serde_json::Value>,
451    body: String,
452) -> Result<(String, broadcast::Receiver<()>), A2aError> {
453    let request: turul_a2a_proto::SendMessageRequest =
454        serde_json::from_str(&body).map_err(|e| A2aError::InvalidRequest {
455            message: format!("Invalid request body: {e}"),
456        })?;
457
458    let proto_message = request.message.ok_or(A2aError::InvalidRequest {
459        message: "message field is required".into(),
460    })?;
461
462    let message = Message::try_from(proto_message).map_err(|e| A2aError::InvalidRequest {
463        message: format!("Invalid message: {e}"),
464    })?;
465
466    let task_id = uuid::Uuid::now_v7().to_string();
467    let context_id = if message.as_proto().context_id.is_empty() {
468        uuid::Uuid::now_v7().to_string()
469    } else {
470        message.as_proto().context_id.clone()
471    };
472
473    // Subscribe to wake-ups BEFORE creating the task so we don't miss notifications.
474    let wake_rx = state.event_broker.subscribe(&task_id).await;
475
476    // Atomic: create task (SUBMITTED) + SUBMITTED event.
477    let mut task =
478        Task::new(&task_id, TaskStatus::new(TaskState::Submitted)).with_context_id(&context_id);
479    task.append_message(message.clone());
480
481    let submitted_event = StreamEvent::StatusUpdate {
482        status_update: crate::streaming::StatusUpdatePayload {
483            task_id: task_id.clone(),
484            context_id: context_id.clone(),
485            status: serde_json::to_value(TaskStatus::new(TaskState::Submitted)).unwrap_or_default(),
486        },
487    };
488
489    state
490        .atomic_store
491        .create_task_with_events(tenant, owner, task, vec![submitted_event])
492        .await
493        .map_err(A2aError::from)?;
494
495    state.event_broker.notify(&task_id).await;
496
497    // Advance SUBMITTED → WORKING via the CAS-guarded atomic store so
498    // streaming subscribers see the WORKING event before the executor
499    // emits its own events.
500    let working_event = StreamEvent::StatusUpdate {
501        status_update: crate::streaming::StatusUpdatePayload {
502            task_id: task_id.clone(),
503            context_id: context_id.clone(),
504            status: serde_json::to_value(TaskStatus::new(TaskState::Working)).unwrap_or_default(),
505        },
506    };
507    state
508        .atomic_store
509        .update_task_status_with_events(
510            tenant,
511            &task_id,
512            owner,
513            TaskStatus::new(TaskState::Working),
514            vec![working_event],
515        )
516        .await
517        .map_err(A2aError::from)?;
518    state.event_broker.notify(&task_id).await;
519
520    // Spawn the executor on a tracked handle. We drop the yielded
521    // receiver — streaming transport does not block on it; subscribers
522    // observe terminal events through the durable store instead.
523    let spawn_deps = crate::server::spawn::SpawnDeps {
524        executor: state.executor.clone(),
525        task_storage: state.task_storage.clone(),
526        atomic_store: state.atomic_store.clone(),
527        event_broker: state.event_broker.clone(),
528        in_flight: state.in_flight.clone(),
529        push_dispatcher: state.push_dispatcher.clone(),
530    };
531    let scope = crate::server::spawn::SpawnScope {
532        tenant: tenant.to_string(),
533        owner: owner.to_string(),
534        task_id: task_id.clone(),
535        context_id: context_id.clone(),
536        message: message.clone(),
537        claims: claims.clone(),
538    };
539    let _spawn = crate::server::spawn::spawn_tracked_executor(spawn_deps, scope)?;
540
541    Ok((task_id, wake_rx))
542}
543
544pub(crate) async fn core_send_streaming_message(
545    state: AppState,
546    tenant: &str,
547    owner: &str,
548    claims: Option<serde_json::Value>,
549    body: String,
550) -> Result<axum::response::Response, A2aError> {
551    let (task_id, wake_rx) =
552        setup_streaming_send(state.clone(), tenant, owner, claims, body).await?;
553    // SSE stream reads from the durable store. No initial Task
554    // snapshot: SUBMITTED + WORKING are already in the event store and
555    // will replay on subscription.
556    Ok(make_store_sse_response(
557        state.event_store,
558        tenant.to_string(),
559        task_id,
560        0,
561        wake_rx,
562        None,
563    ))
564}
565
566pub(crate) async fn core_subscribe_to_task(
567    state: AppState,
568    tenant: &str,
569    owner: &str,
570    task_id: &str,
571    last_event_id_header: Option<&str>,
572) -> Result<axum::response::Response, A2aError> {
573    // Verify task exists and caller owns it
574    let task = state
575        .task_storage
576        .get_task(tenant, task_id, owner, None)
577        .await
578        .map_err(A2aError::from)?
579        .ok_or_else(|| A2aError::TaskNotFound {
580            task_id: task_id.to_string(),
581        })?;
582
583    // Spec §3.1.6: terminal tasks return UnsupportedOperationError
584    if let Some(status) = task.status() {
585        if let Ok(s) = status.state() {
586            if s.is_terminal() {
587                return Err(A2aError::UnsupportedOperation {
588                    message: format!("Task {task_id} is already in terminal state {s:?}"),
589                });
590            }
591        }
592    }
593
594    // Parse Last-Event-ID for replay
595    let after_sequence = last_event_id_header
596        .and_then(replay::parse_last_event_id)
597        .filter(|parsed| parsed.task_id == task_id)
598        .map(|parsed| parsed.sequence)
599        .unwrap_or(0);
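    // e.g. a header of "{task_id}:42" (the `format_event_id` shape) resumes
    // this task after sequence 42; a header naming a different task id is
    // ignored, so the stream restarts from 0 with a fresh snapshot.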
600
601    // Spec §3.1.6: MUST return a Task object as the first event.
602    // On reconnection (Last-Event-ID present), skip the snapshot — client has context.
603    let initial_task = if after_sequence == 0 {
604        Some(task)
605    } else {
606        None
607    };
608
609    // Subscribe to wake-up notifications
610    let wake_rx = state.event_broker.subscribe(task_id).await;
611
612    Ok(make_store_sse_response(
613        state.event_store,
614        tenant.to_string(),
615        task_id.to_string(),
616        after_sequence,
617        wake_rx,
618        initial_task,
619    ))
620}
621
622/// Polling interval for cross-instance subscribers that don't receive
623/// same-instance broker notifications.
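/// Trade-off: a shorter interval cuts cross-instance event latency at the
/// cost of more store reads per open stream; 2s assumes subscribers on
/// other instances tolerate that much extra latency.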
624const STORE_POLL_INTERVAL: Duration = Duration::from_secs(2);
625
626/// Build an SSE response that reads events from the durable store.
627///
628/// 1. Initial Task snapshot (spec §3.1.6): emit Task object as first event (if provided)
629/// 2. Replay: emit all events after `after_sequence` from the store
630/// 3. Live loop: wait for broker wake-up or poll timeout, re-query store
631/// 4. Close: when a terminal event is emitted
632///
633/// Each SSE event has `id: {task_id}:{sequence}` for reconnection support.
634/// The initial Task snapshot uses `id: {task_id}:0` (before any event sequence).
635fn make_store_sse_response(
636    event_store: Arc<dyn A2aEventStore>,
637    tenant: String,
638    task_id: String,
639    after_sequence: u64,
640    wake_rx: broadcast::Receiver<()>,
641    initial_task: Option<Task>,
642) -> axum::response::Response {
643    let (tx, rx) = tokio::sync::mpsc::channel::<Result<Event, std::convert::Infallible>>(64);
644
645    tokio::spawn(async move {
646        // Spec §3.1.6: emit Task object as first event in the stream
647        if let Some(task) = initial_task {
648            let task_json =
649                serde_json::json!({"task": serde_json::to_value(&task).unwrap_or_default()});
650            let json = serde_json::to_string(&task_json).unwrap_or_default();
651            let sse_event = Event::default()
652                .id(replay::format_event_id(&task_id, 0))
653                .data(json);
654            if tx.send(Ok(sse_event)).await.is_err() {
655                return;
656            }
657        }
658
659        let mut last_seq = after_sequence;
660        let mut wake_rx = wake_rx;
661
662        loop {
663            // Query store for events after last_seq
664            let events = match event_store
665                .get_events_after(&tenant, &task_id, last_seq)
666                .await
667            {
668                Ok(e) => e,
669                Err(_) => break,
670            };
671
672            let mut saw_terminal = false;
673            for (seq, event) in events {
674                last_seq = seq;
675                let event_id = replay::format_event_id(&task_id, seq);
676                let json = serde_json::to_string(&event).unwrap_or_default();
677                let sse_event = Event::default().id(event_id).data(json);
678                if tx.send(Ok(sse_event)).await.is_err() {
679                    return; // subscriber disconnected
680                }
681                if event.is_terminal() {
682                    saw_terminal = true;
683                }
684            }
685
686            if saw_terminal {
687                break; // close stream after terminal event
688            }
689
690            // Wait for broker wake-up or periodic poll (cross-instance fallback)
691            tokio::select! {
692                result = wake_rx.recv() => {
693                    match result {
694                        Ok(()) => {} // re-query store
695                        Err(broadcast::error::RecvError::Closed) => break,
696                        Err(broadcast::error::RecvError::Lagged(_)) => {} // re-query
697                    }
698                }
699                _ = tokio::time::sleep(STORE_POLL_INTERVAL) => {
700                    // Periodic poll for cross-instance correctness
701                }
702            }
703        }
704    });
705
706    let stream = ReceiverStream::new(rx);
707    Sse::new(stream)
708        .keep_alive(KeepAlive::default())
709        .into_response()
710}
711
712// =========================================================
713// Default-tenant axum handlers (delegate to core with DEFAULT_TENANT)
714// =========================================================
715
716const DEFAULT_TENANT: &str = "";
717
718async fn send_message_handler(
719    State(state): State<AppState>,
720    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
721    body: String,
722) -> Result<Json<serde_json::Value>, A2aError> {
723    let claims = ctx.identity.claims().cloned();
724    core_send_message(state, DEFAULT_TENANT, ctx.identity.owner(), claims, body).await
725}
726
727async fn list_tasks_handler(
728    State(state): State<AppState>,
729    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
730    Query(query): Query<ListTasksQuery>,
731) -> Result<Json<serde_json::Value>, A2aError> {
732    core_list_tasks(state, DEFAULT_TENANT, ctx.identity.owner(), &query).await
733}
734
735// =========================================================
736// Query param structs
737// =========================================================
738
739#[derive(serde::Deserialize, Default)]
740#[serde(default)]
741pub(crate) struct ListTasksQuery {
742    #[serde(rename = "contextId")]
743    pub(crate) context_id: Option<String>,
744    pub(crate) status: Option<String>,
745    #[serde(rename = "pageSize")]
746    pub(crate) page_size: Option<i32>,
747    #[serde(rename = "pageToken")]
748    pub(crate) page_token: Option<String>,
749    #[serde(rename = "historyLength")]
750    pub(crate) history_length: Option<i32>,
751    #[serde(rename = "includeArtifacts")]
752    pub(crate) include_artifacts: Option<bool>,
753}
754
755#[derive(serde::Deserialize, Default)]
756#[serde(default)]
757pub(crate) struct PushConfigQuery {
758    #[serde(rename = "pageSize")]
759    pub(crate) page_size: Option<i32>,
760    #[serde(rename = "pageToken")]
761    pub(crate) page_token: Option<String>,
762}
763
764// =========================================================
765// Core handler functions — all take tenant explicitly
766// =========================================================
767
768fn parse_task_state(s: &str) -> Option<TaskState> {
769    match s {
770        "TASK_STATE_SUBMITTED" => Some(TaskState::Submitted),
771        "TASK_STATE_WORKING" => Some(TaskState::Working),
772        "TASK_STATE_COMPLETED" => Some(TaskState::Completed),
773        "TASK_STATE_FAILED" => Some(TaskState::Failed),
774        "TASK_STATE_CANCELED" => Some(TaskState::Canceled),
775        "TASK_STATE_INPUT_REQUIRED" => Some(TaskState::InputRequired),
776        "TASK_STATE_REJECTED" => Some(TaskState::Rejected),
777        "TASK_STATE_AUTH_REQUIRED" => Some(TaskState::AuthRequired),
778        _ => None,
779    }
780}
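
// e.g. parse_task_state("TASK_STATE_WORKING") == Some(TaskState::Working);
// anything else — including the proto's UNSPECIFIED zero value — yields
// None, which `core_list_tasks` surfaces as InvalidRequest.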
781
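// Illustrative `POST /message:send` body — field casing assumed to follow
// the proto JSON mapping used by `turul_a2a_proto` (camelCase):
//
//   {
//     "message": { "role": "ROLE_USER", "parts": [{ "text": "hi" }] },
//     "configuration": { "returnImmediately": false, "historyLength": 10 }
//   }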
782#[doc(hidden)]
783pub async fn core_send_message(
784    state: AppState,
785    tenant: &str,
786    owner: &str,
787    claims: Option<serde_json::Value>,
788    body: String,
789) -> Result<Json<serde_json::Value>, A2aError> {
790    let request: turul_a2a_proto::SendMessageRequest =
791        serde_json::from_str(&body).map_err(|e| A2aError::InvalidRequest {
792            message: format!("Invalid request body: {e}"),
793        })?;
794
795    let proto_message = request.message.ok_or(A2aError::InvalidRequest {
796        message: "message field is required".into(),
797    })?;
798
799    let message = Message::try_from(proto_message).map_err(|e| A2aError::InvalidRequest {
800        message: format!("Invalid message: {e}"),
801    })?;
802
803    let return_immediately = request
804        .configuration
805        .as_ref()
806        .map(|c| c.return_immediately)
807        .unwrap_or(false);
808
809    // Bug 1: runtimes that cannot guarantee post-return
810    // execution (e.g., AWS Lambda) refuse `return_immediately = true`
811    // here — before any storage write — to avoid silently orphaning
812    // the spawned executor. Mapped to HTTP 400 / JSON-RPC -32004 /
813    // gRPC INVALID_ARGUMENT.
814    if return_immediately && !state.runtime_config.supports_return_immediately {
815        tracing::warn!(
816            tenant = tenant,
817            owner = owner,
818            "rejecting SendMessageConfiguration.return_immediately=true: \
819             this runtime does not support post-return executor continuation \
820             (ADR-017 §Decision Bug 1, ADR-013 §4.4)"
821        );
822        return Err(A2aError::UnsupportedOperation {
823            message: "return_immediately=true is not supported on this runtime \
824                      (post-return executor continuation is not guaranteed); \
825                      resubmit with return_immediately=false for a blocking \
826                      send"
827                .into(),
828        });
829    }
830
831    // Bug 3: extract SendMessageConfiguration.history_length
832    // preserving the proto tri-state — None=unbounded, Some(0)=empty,
833    // Some(n>0)=last n (a2a.proto:150-154). Do NOT collapse Some(0) to
834    // None; the storage layer's `trim_task` already differentiates
835    // (e.g., storage/postgres.rs:212-222).
836    let history_length: Option<i32> = request
837        .configuration
838        .as_ref()
839        .and_then(|c| c.history_length);
840
841    // Bug 2 step 1: validate inline push config URL BEFORE any
842    // storage write. A malformed URL returns HTTP 400 / -32602 /
843    // INVALID_ARGUMENT with zero task persistence. URL parse mirrors
844    // the check in `core_create_push_config` (ADR-011 §R1).
845    let inline_push_config: Option<turul_a2a_proto::TaskPushNotificationConfig> = request
846        .configuration
847        .as_ref()
848        .and_then(|c| c.task_push_notification_config.clone());
849    if let Some(cfg) = inline_push_config.as_ref() {
850        if cfg.url.is_empty() {
851            return Err(A2aError::InvalidRequest {
852                message: "inline push config url is required".into(),
853            });
854        }
855        if let Err(e) = url::Url::parse(&cfg.url) {
856            return Err(A2aError::InvalidRequest {
857                message: format!("inline push config url is not a valid URL: {e}"),
858            });
859        }
860    }
861
862    let msg_task_id = message.as_proto().task_id.clone();
863
864    // If message has a task_id, continue the existing task; otherwise create new.
865    let (task_id, context_id, is_continuation) = if !msg_task_id.is_empty() {
866        let existing = state
867            .task_storage
868            .get_task(tenant, &msg_task_id, owner, None)
869            .await
870            .map_err(A2aError::from)?
871            .ok_or_else(|| A2aError::TaskNotFound {
872                task_id: msg_task_id.clone(),
873            })?;
874
875        // Reject contextId/taskId mismatch (spec §3.4.3 MUST).
876        let msg_context_id = &message.as_proto().context_id;
877        if !msg_context_id.is_empty() && msg_context_id != existing.context_id() {
878            return Err(A2aError::InvalidRequest {
879                message: format!(
880                    "contextId mismatch: message has '{}' but task {} has '{}'",
881                    msg_context_id,
882                    msg_task_id,
883                    existing.context_id()
884                ),
885            });
886        }
887
888        // Continuation is only valid from interrupted states.
889        if let Some(status) = existing.status() {
890            if let Ok(s) = status.state() {
891                match s {
892                    TaskState::InputRequired | TaskState::AuthRequired => {}
893                    _ => {
894                        return Err(A2aError::InvalidRequest {
895                            message: format!(
896                                "Task {msg_task_id} is in state {s:?}, only INPUT_REQUIRED or AUTH_REQUIRED tasks accept follow-up messages"
897                            ),
898                        });
899                    }
900                }
901            }
902        }
903
904        (msg_task_id, existing.context_id().to_string(), true)
905    } else {
906        let context_id = if message.as_proto().context_id.is_empty() {
907            uuid::Uuid::now_v7().to_string()
908        } else {
909            message.as_proto().context_id.clone()
910        };
911        (uuid::Uuid::now_v7().to_string(), context_id, false)
912    };
913
914    // §HTTP invocation steps 3-4: if a durable executor queue
915    // is wired AND `return_immediately` is requested, build the
916    // envelope and run `check_payload_size` BEFORE any storage write.
917    // Oversize payloads return `A2aError::InvalidRequest` (HTTP 400 /
918    // JSON-RPC -32602 / gRPC INVALID_ARGUMENT) with zero task
919    // persistence. The built envelope is stashed for the enqueue call
920    // after task creation to avoid re-cloning.
921    let durable_queue = if return_immediately {
922        state.durable_executor_queue.clone()
923    } else {
924        None
925    };
926    let durable_job: Option<crate::durable_executor::QueuedExecutorJob> =
927        if let Some(queue) = durable_queue.as_ref() {
928            let now_micros = chrono::Utc::now().timestamp_micros();
929            let job = crate::durable_executor::QueuedExecutorJob {
930                version: crate::durable_executor::QueuedExecutorJob::VERSION,
931                tenant: tenant.to_string(),
932                owner: owner.to_string(),
933                task_id: task_id.clone(),
934                context_id: context_id.clone(),
935                message: message.as_proto().clone(),
936                claims: claims.clone(),
937                enqueued_at_micros: now_micros,
938            };
939            if let Err(e) = queue.check_payload_size(&job) {
940                return Err(A2aError::InvalidRequest {
941                    message: format!("durable executor queue: {e}"),
942                });
943            }
944            Some(job)
945        } else {
946            None
947        };
948
949    // Bring the task to WORKING atomically. New tasks emit SUBMITTED
950    // + WORKING events through the CAS-guarded atomic store so that
951    // subscribers / push delivery observe the lifecycle even on the
952    // blocking-send path; continuations append the incoming message
953    // and transition INPUT_REQUIRED/AUTH_REQUIRED → WORKING.
954    if is_continuation {
955        state
956            .task_storage
957            .append_message(tenant, &task_id, owner, message.clone())
958            .await
959            .map_err(A2aError::from)?;
960
961        let working_event = StreamEvent::StatusUpdate {
962            status_update: crate::streaming::StatusUpdatePayload {
963                task_id: task_id.clone(),
964                context_id: context_id.clone(),
965                status: serde_json::to_value(TaskStatus::new(TaskState::Working))
966                    .unwrap_or_default(),
967            },
968        };
969        state
970            .atomic_store
971            .update_task_status_with_events(
972                tenant,
973                &task_id,
974                owner,
975                TaskStatus::new(TaskState::Working),
976                vec![working_event],
977            )
978            .await
979            .map_err(A2aError::from)?;
980        state.event_broker.notify(&task_id).await;
981    } else {
982        let mut task =
983            Task::new(&task_id, TaskStatus::new(TaskState::Submitted)).with_context_id(&context_id);
984        task.append_message(message.clone());
985
986        let submitted_event = StreamEvent::StatusUpdate {
987            status_update: crate::streaming::StatusUpdatePayload {
988                task_id: task_id.clone(),
989                context_id: context_id.clone(),
990                status: serde_json::to_value(TaskStatus::new(TaskState::Submitted))
991                    .unwrap_or_default(),
992            },
993        };
994        state
995            .atomic_store
996            .create_task_with_events(tenant, owner, task, vec![submitted_event])
997            .await
998            .map_err(A2aError::from)?;
999        state.event_broker.notify(&task_id).await;
1000
1001        let working_event = StreamEvent::StatusUpdate {
1002            status_update: crate::streaming::StatusUpdatePayload {
1003                task_id: task_id.clone(),
1004                context_id: context_id.clone(),
1005                status: serde_json::to_value(TaskStatus::new(TaskState::Working))
1006                    .unwrap_or_default(),
1007            },
1008        };
1009        state
1010            .atomic_store
1011            .update_task_status_with_events(
1012                tenant,
1013                &task_id,
1014                owner,
1015                TaskStatus::new(TaskState::Working),
1016                vec![working_event],
1017            )
1018            .await
1019            .map_err(A2aError::from)?;
1020        state.event_broker.notify(&task_id).await;
1021    }
1022
1023    // Bug 2 steps 4-5: register any inline push config BEFORE
1024    // spawning the executor so a pre-terminal failure cannot leave a
1025    // live executor with no registered callback. URL was parsed up
1026    // front (step 1), so the remaining failure mode is storage.
1027    // Failure path: transition the just-created task to FAILED with an
1028    // agent Message carrying the reason text (pattern matches the
1029    // hard-timeout compensation in `await_yielded_with_two_deadlines`), then return
1030    // the original storage error. The executor is never spawned.
1031    if let Some(mut cfg) = inline_push_config {
1032        cfg.task_id = task_id.clone();
1033        match state.push_storage.create_config(tenant, cfg).await {
1034            Ok(_) => {}
1035            Err(e) => {
1036                let reason = format!("inline push notification config registration failed: {e}");
1037                tracing::warn!(
1038                    tenant = tenant,
1039                    owner = owner,
1040                    task_id = %task_id,
1041                    "ADR-017 Bug 2 compensation: transitioning task to FAILED \
1042                     after inline push config registration failure"
1043                );
1044                let reason_msg = Message::new(
1045                    uuid::Uuid::now_v7().to_string(),
1046                    turul_a2a_types::Role::Agent,
1047                    vec![turul_a2a_types::Part::text(reason)],
1048                );
1049                let failed_status = TaskStatus::new(TaskState::Failed).with_message(reason_msg);
1050                let failed_event = StreamEvent::StatusUpdate {
1051                    status_update: crate::streaming::StatusUpdatePayload {
1052                        task_id: task_id.clone(),
1053                        context_id: context_id.clone(),
1054                        status: serde_json::to_value(&failed_status).unwrap_or_default(),
1055                    },
1056                };
1057                // Best-effort compensation. If the FAILED transition
1058                // itself fails (backend unavailable, CAS race with a
1059                // concurrent writer), we still return the original
1060                // push-storage error — double-failure is flagged to
1061                // the caller via the original error, and the task
1062                // will be observable as Working until a supervisor
1063                // sweep or manual cleanup resolves it.
1064                if let Err(compensate_err) = state
1065                    .atomic_store
1066                    .update_task_status_with_events(
1067                        tenant,
1068                        &task_id,
1069                        owner,
1070                        failed_status,
1071                        vec![failed_event],
1072                    )
1073                    .await
1074                {
1075                    tracing::error!(
1076                        tenant = tenant,
1077                        owner = owner,
1078                        task_id = %task_id,
1079                        error = %compensate_err,
1080                        "ADR-017 Bug 2 compensation failed: task may remain \
1081                         in WORKING without a callback until supervisor sweep"
1082                    );
1083                }
1084                state.event_broker.notify(&task_id).await;
1085                return Err(A2aError::from(e));
1086            }
1087        }
1088    }
1089
1090    // §HTTP invocation step 7: if the durable executor path
1091    // is active, enqueue the (already-built + size-checked) job and
1092    // return the Working task. The executor is NOT spawned locally —
1093    // a separate invocation consumes the queue (SQS Lambda handler)
1094    // and runs the executor to terminal via
1095    // `run_executor_for_existing_task`.
1096    //
1097    // Enqueue failure → FAILED compensation in the same shape as
1098    // Bug 2 inline-push-config storage-failure compensation.
1099    if let (Some(queue), Some(job)) = (durable_queue.as_ref(), durable_job) {
1100        match queue.enqueue(job).await {
1101            Ok(()) => {
1102                let current = state
1103                    .task_storage
1104                    .get_task(tenant, &task_id, owner, history_length)
1105                    .await
1106                    .map_err(A2aError::from)?
1107                    .ok_or_else(|| A2aError::TaskNotFound {
1108                        task_id: task_id.clone(),
1109                    })?;
1110                return Ok(Json(serde_json::json!({
1111                    "task": serde_json::to_value(&current).unwrap_or_default()
1112                })));
1113            }
1114            Err(e) => {
1115                let reason = format!("durable executor enqueue failed: {e}");
1116                tracing::warn!(
1117                    tenant = tenant,
1118                    owner = owner,
1119                    task_id = %task_id,
1120                    queue_kind = queue.kind(),
1121                    "ADR-018 enqueue failed: transitioning task to FAILED"
1122                );
1123                let reason_msg = Message::new(
1124                    uuid::Uuid::now_v7().to_string(),
1125                    turul_a2a_types::Role::Agent,
1126                    vec![turul_a2a_types::Part::text(reason.clone())],
1127                );
1128                let failed_status = TaskStatus::new(TaskState::Failed).with_message(reason_msg);
1129                let failed_event = StreamEvent::StatusUpdate {
1130                    status_update: crate::streaming::StatusUpdatePayload {
1131                        task_id: task_id.clone(),
1132                        context_id: context_id.clone(),
1133                        status: serde_json::to_value(&failed_status).unwrap_or_default(),
1134                    },
1135                };
1136                if let Err(compensate_err) = state
1137                    .atomic_store
1138                    .update_task_status_with_events(
1139                        tenant,
1140                        &task_id,
1141                        owner,
1142                        failed_status,
1143                        vec![failed_event],
1144                    )
1145                    .await
1146                {
1147                    tracing::error!(
1148                        tenant = tenant,
1149                        owner = owner,
1150                        task_id = %task_id,
1151                        error = %compensate_err,
1152                        "ADR-018 FAILED-compensation itself failed: task may \
1153                         remain in WORKING until supervisor sweep"
1154                    );
1155                }
1156                state.event_broker.notify(&task_id).await;
1157                return Err(A2aError::Internal(reason));
1158            }
1159        }
1160    }
1161
1162    // Spawn the executor on a tracked handle with a live EventSink.
1163    let spawn_deps = crate::server::spawn::SpawnDeps {
1164        executor: state.executor.clone(),
1165        task_storage: state.task_storage.clone(),
1166        atomic_store: state.atomic_store.clone(),
1167        event_broker: state.event_broker.clone(),
1168        in_flight: state.in_flight.clone(),
1169        push_dispatcher: state.push_dispatcher.clone(),
1170    };
1171    let scope = crate::server::spawn::SpawnScope {
1172        tenant: tenant.to_string(),
1173        owner: owner.to_string(),
1174        task_id: task_id.clone(),
1175        context_id: context_id.clone(),
1176        message: message.clone(),
1177        claims: claims.clone(),
1178    };
1179    let spawn = crate::server::spawn::spawn_tracked_executor(spawn_deps, scope)?;
1180
1181    if return_immediately {
1182        // Non-blocking send (A2A spec §3.2.2, proto
1183        // SendMessageConfiguration.return_immediately=true): return the
1184        // task in its current state immediately. Drop yielded_rx — the
1185        // background executor keeps running; the caller polls / streams
1186        // to observe completion.
1187        //
1188        // Bug 3: `history_length` threaded through so the
1189        // storage-layer `trim_task` honours the proto tri-state
1190        // (a2a.proto:150-154).
1191        drop(spawn.yielded_rx);
1192        let current = state
1193            .task_storage
1194            .get_task(tenant, &task_id, owner, history_length)
1195            .await
1196            .map_err(A2aError::from)?
1197            .ok_or_else(|| A2aError::TaskNotFound {
1198                task_id: task_id.clone(),
1199            })?;
1200        return Ok(Json(serde_json::json!({
1201            "task": serde_json::to_value(&current).unwrap_or_default()
1202        })));
1203    }
1204
1205    // Blocking send: await yielded_rx with the two-deadline timeout.
1206    let task = await_yielded_with_two_deadlines(
1207        spawn.yielded_rx,
1208        spawn.cancellation,
1209        spawn.handle,
1210        &state,
1211        tenant,
1212        owner,
1213        &task_id,
1214        &context_id,
1215    )
1216    .await?;
1217
1218    // Bug 3: if the caller supplied `history_length`, re-read
1219    // from storage so the same `trim_task` path that `GetTask` uses
1220    // applies to the send response. Skipping this when the field is
1221    // unset avoids an unnecessary storage round-trip on the default
1222    // path. The task yielded from the executor is otherwise authoritative.
1223    let task = if history_length.is_some() {
1224        state
1225            .task_storage
1226            .get_task(tenant, &task_id, owner, history_length)
1227            .await
1228            .map_err(A2aError::from)?
1229            .unwrap_or(task)
1230    } else {
1231        task
1232    };
1233
1234    Ok(Json(serde_json::json!({
1235        "task": serde_json::to_value(&task).unwrap_or_default()
1236    })))
1237}
1238
1239/// Two-deadline blocking-send timeout.
1240///
1241/// 1. **Soft deadline** (`blocking_task_timeout`): wait for the
1242///    executor to emit a terminal or interrupted event. If the deadline
1243///    fires first, trip `cancellation` and enter the cooperative window.
1244/// 2. **Cooperative window** (through `timeout_abort_grace`): the
1245///    cancellation token is already set; a well-behaved executor
1246///    observes it and emits `sink.cancelled(...)` (or another terminal).
1247/// 3. **Hard deadline** (`soft + grace`): force-commit `FAILED` via
1248///    `update_task_status_with_events` — CAS-guarded. On
1249///    `TerminalStateAlreadySet` the executor won the race at the last
1250///    moment; re-read and return the persisted terminal. On success,
1251///    abort the spawned JoinHandle as best-effort cleanup.
1252///
1253/// Returns the Task the caller should hand back to the client.
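///
/// Timeline sketch (illustrative durations only):
///
/// ```text
/// t = 0              blocking send accepted, executor spawned
/// t = soft           `blocking_task_timeout` elapses → `cancellation.cancel()`
/// t = soft + grace   `timeout_abort_grace` elapses → CAS force-commit FAILED
/// ```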
1254#[allow(clippy::too_many_arguments)] // router glue: all args are request-derived
1255async fn await_yielded_with_two_deadlines(
1256    mut yielded_rx: tokio::sync::oneshot::Receiver<Task>,
1257    cancellation: tokio_util::sync::CancellationToken,
1258    handle: Arc<crate::server::in_flight::InFlightHandle>,
1259    state: &AppState,
1260    tenant: &str,
1261    owner: &str,
1262    task_id: &str,
1263    context_id: &str,
1264) -> Result<Task, A2aError> {
1265    let soft = tokio::time::Instant::now() + state.runtime_config.blocking_task_timeout;
1266    let hard = soft + state.runtime_config.timeout_abort_grace;
1267
1268    // Soft wait: wait for yielded OR the soft deadline.
1269    let soft_outcome = tokio::select! {
1270        result = &mut yielded_rx => YieldedOutcome::Yielded(result.ok()),
1271        _ = tokio::time::sleep_until(soft) => YieldedOutcome::SoftTimeout,
1272    };
1273
1274    if let YieldedOutcome::Yielded(Some(task)) = soft_outcome {
1275        return Ok(task);
1276    }
1277    if let YieldedOutcome::Yielded(None) = soft_outcome {
1278        // The yielded sender was dropped without firing — the
1279        // supervisor sentinel ran its cleanup before any terminal
1280        // landed. Surface this as Internal so the client sees a clear
1281        // failure rather than hanging.
1282        return Err(A2aError::Internal(
1283            "executor exited without emitting a terminal or interrupted event".into(),
1284        ));
1285    }
1286
1287    // Cooperative wait: soft deadline expired. Trip the cancellation
1288    // token so the executor observes cancellation, and wait for the
1289    // cooperative window to expire (or for yielded to fire from the
1290    // executor's own terminal emit).
1291    cancellation.cancel();
1292
1293    let cooperative_outcome = tokio::select! {
1294        result = &mut yielded_rx => YieldedOutcome::Yielded(result.ok()),
1295        _ = tokio::time::sleep_until(hard) => YieldedOutcome::HardTimeout,
1296    };
1297
1298    if let YieldedOutcome::Yielded(Some(task)) = cooperative_outcome {
1299        return Ok(task);
1300    }
1301    // Yielded-None during the cooperative wait also falls through to the
1302    // hard timeout.
1303
1304    // Hard timeout: force-commit FAILED via CAS.
1305    let reason_msg = Message::new(
1306        uuid::Uuid::now_v7().to_string(),
1307        turul_a2a_types::Role::Agent,
1308        vec![turul_a2a_types::Part::text(
1309            "task timed out: hard deadline exceeded without terminal emission",
1310        )],
1311    );
1312    let failed_status = TaskStatus::new(TaskState::Failed).with_message(reason_msg);
1313    let failed_event = StreamEvent::StatusUpdate {
1314        status_update: crate::streaming::StatusUpdatePayload {
1315            task_id: task_id.to_string(),
1316            context_id: context_id.to_string(),
1317            status: serde_json::to_value(&failed_status).unwrap_or_default(),
1318        },
1319    };
1320
1321    let failed_event_for_dispatch = failed_event.clone();
1322    let result = state
1323        .atomic_store
1324        .update_task_status_with_events(tenant, task_id, owner, failed_status, vec![failed_event])
1325        .await;
1326
1327    match result {
1328        Ok((task, seqs)) => {
1329            // Abort the spawned executor. The cloneable AbortHandle
1330            // is independent of the supervisor's JoinHandle
1331            // ownership, so this works whether or not the supervisor
1332            // has already taken the JoinHandle to `.await` it. The
1333            // supervisor continues to track the task for registry
1334            // cleanup — its `.await` returns `JoinError::Cancelled`
1335            // once the abort propagates at the next executor yield
1336            // point.
1337            handle.abort();
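
            // Sketch of the AbortHandle relationship described above
            // (hypothetical names; only abort_handle()/abort() are real
            // tokio APIs):
            //
            //     let join = tokio::spawn(executor_future);
            //     let abort = join.abort_handle(); // cloneable, not owning
            //     supervisor_takes(join);          // JoinHandle moves away
            //     abort.abort();                   // still cancels the task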
            // Framework-committed terminals fan out to push configs
            // identically to executor-emitted ones; the hard-timeout
            // FAILED committed above lands here.
1341            if let Some(dispatcher) = &state.push_dispatcher {
1342                let seq = seqs.first().copied().unwrap_or(0);
1343                dispatcher.dispatch(
1344                    tenant.to_string(),
1345                    owner.to_string(),
1346                    task.clone(),
1347                    vec![(seq, failed_event_for_dispatch)],
1348                );
1349            }
1350            state.event_broker.notify(task_id).await;
1351            Ok(task)
1352        }
        Err(A2aStorageError::TerminalStateAlreadySet { .. }) => {
1354            // Executor emitted its own terminal while we were
1355            // racing the hard deadline. Return the actually-persisted
1356            // terminal.
1357            state.event_broker.notify(task_id).await;
1358            let persisted = state
1359                .task_storage
1360                .get_task(tenant, task_id, owner, None)
1361                .await
1362                .map_err(A2aError::from)?
1363                .ok_or_else(|| A2aError::TaskNotFound {
1364                    task_id: task_id.to_string(),
1365                })?;
1366            Ok(persisted)
1367        }
1368        Err(e) => Err(A2aError::from(e)),
1369    }
1370}
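
// A minimal sketch of the wiring the signature above implies on the
// blocking-send path (hypothetical caller; the executor spawn and the
// in-flight `handle` registration are elided):
//
//     let (yielded_tx, yielded_rx) = tokio::sync::oneshot::channel::<Task>();
//     let cancellation = tokio_util::sync::CancellationToken::new();
//     // spawn the executor with yielded_tx + cancellation.clone() ...
//     let task = await_yielded_with_two_deadlines(
//         yielded_rx, cancellation, handle, &state,
//         tenant, owner, task_id, context_id,
//     ).await?;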
1371
1372#[allow(clippy::large_enum_variant)] // Option<Task> is ~hundreds of bytes; the enum is short-lived on the stack of one function
1373enum YieldedOutcome {
1374    Yielded(Option<Task>),
1375    SoftTimeout,
1376    HardTimeout,
1377}
1378
1379pub(crate) async fn core_list_tasks(
1380    state: AppState,
1381    tenant: &str,
1382    owner: &str,
1383    query: &ListTasksQuery,
1384) -> Result<Json<serde_json::Value>, A2aError> {
1385    let status = match &query.status {
1386        Some(s) => Some(parse_task_state(s).ok_or_else(|| A2aError::InvalidRequest {
1387            message: format!("Invalid status value: {s}"),
1388        })?),
1389        None => None,
1390    };
1391
1392    let filter = TaskFilter {
1393        tenant: Some(tenant.to_string()),
1394        owner: Some(owner.to_string()),
1395        context_id: query.context_id.clone(),
1396        status,
1397        page_size: query.page_size,
1398        page_token: query.page_token.clone(),
1399        history_length: query.history_length,
1400        include_artifacts: query.include_artifacts,
1401        ..Default::default()
1402    };
1403
1404    let page = state
1405        .task_storage
1406        .list_tasks(filter)
1407        .await
1408        .map_err(A2aError::from)?;
1409    Ok(Json(list_page_to_json(&page)))
1410}
1411
1412fn list_page_to_json(page: &TaskListPage) -> serde_json::Value {
1413    let tasks: Vec<serde_json::Value> = page
1414        .tasks
1415        .iter()
1416        .map(|t| serde_json::to_value(t).unwrap_or_default())
1417        .collect();
1418    serde_json::json!({
1419        "tasks": tasks,
1420        "nextPageToken": page.next_page_token,
1421        "pageSize": page.page_size,
1422        "totalSize": page.total_size,
1423    })
1424}
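
// Illustrative page shape produced above (values hypothetical):
//
//     {
//       "tasks": [{ "id": "task-1", ... }],
//       "nextPageToken": "opaque-cursor",
//       "pageSize": 50,
//       "totalSize": 123
//     }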
1425
1426pub(crate) async fn core_get_task(
1427    state: AppState,
1428    tenant: &str,
1429    owner: &str,
1430    task_id: &str,
1431    history_length: Option<i32>,
1432) -> Result<Json<serde_json::Value>, A2aError> {
1433    let task = state
1434        .task_storage
1435        .get_task(tenant, task_id, owner, history_length)
1436        .await
1437        .map_err(A2aError::from)?
1438        .ok_or_else(|| A2aError::TaskNotFound {
1439            task_id: task_id.to_string(),
1440        })?;
1441
1442    Ok(Json(serde_json::to_value(&task).unwrap_or_default()))
1443}
1444
1445/// `CancelTask` handler.
1446///
1447/// Sequence:
1448/// 1. Validate existence + ownership (via owner-scoped `get_task`).
1449/// 2. Reject with 409 if task is already terminal.
1450/// 3. Write the cancel marker (owner-scoped, idempotent).
1451/// 4. Trip the local in-flight cancellation token if present.
1452/// 5. Wait up to `cancel_handler_grace`, polling task state every
1453///    `cancel_handler_poll_interval`. On observed terminal, return that
1454///    persisted task snapshot (cooperative cancel won the race).
/// 6. On grace expiry, force-commit `CANCELED` via the atomic store
///    (CAS-guarded by its terminal-preservation contract). On success,
///    return CANCELED. On
1458///    `TerminalStateAlreadySet`, re-read and return the actual persisted
1459///    terminal (another path won at the CAS layer).
1460///
1461/// Framework-committed terminals carry `message = None` —
1462/// the plain `TaskStatus::new(TaskState::Canceled)` constructor produces
1463/// exactly that.
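///
/// A minimal caller sketch (test-style; `test_state()` is hypothetical):
///
/// ```rust,ignore
/// match core_cancel_task(test_state(), "tenant-a", "owner-1", "task-42").await {
///     // Cooperative cancel or framework-forced CANCELED both land here.
///     Ok(Json(task)) => assert_eq!(
///         task["status"]["state"],
///         serde_json::to_value(TaskState::Canceled).unwrap(),
///     ),
///     // 409: the task was already terminal when :cancel arrived.
///     Err(A2aError::TaskNotCancelable { .. }) => {}
///     Err(e) => panic!("unexpected: {e}"),
/// }
/// ```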
1464#[doc(hidden)]
1465pub async fn core_cancel_task(
1466    state: AppState,
1467    tenant: &str,
1468    owner: &str,
1469    task_id: &str,
1470) -> Result<Json<serde_json::Value>, A2aError> {
1471    // Step 1: validate existence + ownership. `get_task` is owner-scoped —
1472    // wrong owner returns None (TaskNotFound) as anti-enumeration.
1473    let initial_task = state
1474        .task_storage
1475        .get_task(tenant, task_id, owner, Some(0))
1476        .await
1477        .map_err(A2aError::from)?
1478        .ok_or_else(|| A2aError::TaskNotFound {
1479            task_id: task_id.to_string(),
1480        })?;
1481
1482    // Step 2: reject terminal tasks up-front with 409.
1483    if let Some(status) = initial_task.status() {
1484        if let Ok(s) = status.state() {
1485            if turul_a2a_types::state_machine::is_terminal(s) {
1486                return Err(A2aError::TaskNotCancelable {
1487                    task_id: task_id.to_string(),
1488                });
1489            }
1490        }
1491    }
1492
1493    let context_id = initial_task.context_id().to_string();
1494
1495    // Step 3: write the cancel-requested marker. Idempotent; errors map
1496    // to the usual wire responses.
1497    match state
1498        .task_storage
1499        .set_cancel_requested(tenant, task_id, owner)
1500        .await
1501    {
1502        Ok(()) => {}
1503        Err(A2aStorageError::TaskNotFound(_)) => {
1504            return Err(A2aError::TaskNotFound {
1505                task_id: task_id.to_string(),
1506            });
1507        }
1508        Err(A2aStorageError::TerminalState(_))
1509        | Err(A2aStorageError::InvalidTransition { .. })
1510        | Err(A2aStorageError::TerminalStateAlreadySet { .. }) => {
1511            return Err(A2aError::TaskNotCancelable {
1512                task_id: task_id.to_string(),
1513            });
1514        }
1515        Err(other) => return Err(A2aError::from(other)),
1516    }
1517
1518    // Step 4: fast-path token trip if this instance owns the in-flight
1519    // executor for this task. Cross-instance cases rely on the supervisor
1520    // poll loop (see `server::in_flight::run_cross_instance_cancel_poller`).
1521    let in_flight_key = (tenant.to_string(), task_id.to_string());
1522    if let Some(handle) = state.in_flight.get(&in_flight_key) {
1523        handle.cancellation.cancel();
1524    }
1525
1526    // Step 5: grace-wait with poll. Return early if the task reaches
1527    // a terminal state via the executor's cooperative response.
1528    let deadline = tokio::time::Instant::now() + state.runtime_config.cancel_handler_grace;
1529    let poll_interval = state.runtime_config.cancel_handler_poll_interval;
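    // E.g. (illustrative, not the shipped defaults): grace = 2s with a
    // 250ms poll interval means at most ~8 owner-scoped reads before the
    // force-commit in step 6 takes over.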
1530    loop {
1531        let now = tokio::time::Instant::now();
1532        if now >= deadline {
1533            break;
1534        }
1535
1536        match state
1537            .task_storage
1538            .get_task(tenant, task_id, owner, Some(0))
1539            .await
1540            .map_err(A2aError::from)?
1541        {
1542            Some(current) => {
1543                if let Some(status) = current.status() {
1544                    if let Ok(s) = status.state() {
1545                        if turul_a2a_types::state_machine::is_terminal(s) {
1546                            // Cooperative terminal (or another path) resolved
1547                            // the cancel during grace. Return persisted state.
1548                            state.event_broker.notify(task_id).await;
1549                            return Ok(Json(serde_json::to_value(&current).unwrap_or_default()));
1550                        }
1551                    }
1552                }
1553            }
1554            None => {
1555                return Err(A2aError::TaskNotFound {
1556                    task_id: task_id.to_string(),
1557                });
1558            }
1559        }
1560
1561        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1562        if remaining.is_zero() {
1563            break;
1564        }
1565        let sleep_for = std::cmp::min(poll_interval, remaining);
1566        tokio::time::sleep(sleep_for).await;
1567    }
1568
1569    // Step 6: grace expired. Force-commit CANCELED via atomic store.
1570    // Per ADR-012 §8, framework-committed terminals use `message = None`
1571    // so history / SSE consumers can distinguish from executor-authored
1572    // cancels without conflating framework telemetry with agent output.
1573    let cancel_event = StreamEvent::StatusUpdate {
1574        status_update: crate::streaming::StatusUpdatePayload {
1575            task_id: task_id.to_string(),
1576            context_id,
1577            status: serde_json::to_value(TaskStatus::new(TaskState::Canceled)).unwrap_or_default(),
1578        },
1579    };
1580    let cancel_event_for_dispatch = cancel_event.clone();
1581
1582    let result = state
1583        .atomic_store
1584        .update_task_status_with_events(
1585            tenant,
1586            task_id,
1587            owner,
1588            TaskStatus::new(TaskState::Canceled),
1589            vec![cancel_event],
1590        )
1591        .await;
1592
1593    match result {
1594        Ok((task, seqs)) => {
            // Framework-committed CANCELED must trigger push delivery
            // exactly like an executor-emitted cancel. The :cancel
            // grace-expiry path reaches this arm after the executor
            // failed to react in time.
1599            if let Some(dispatcher) = &state.push_dispatcher {
1600                let seq = seqs.first().copied().unwrap_or(0);
1601                dispatcher.dispatch(
1602                    tenant.to_string(),
1603                    owner.to_string(),
1604                    task.clone(),
1605                    vec![(seq, cancel_event_for_dispatch)],
1606                );
1607            }
1608            state.event_broker.notify(task_id).await;
1609            Ok(Json(serde_json::to_value(&task).unwrap_or_default()))
1610        }
1611        Err(A2aStorageError::TerminalStateAlreadySet { .. }) => {
1612            // Race resolved at the atomic-store CAS: another writer
1613            // (executor emitting its own terminal, or a concurrent
1614            // CancelTask from another instance) committed first. Re-read
1615            // and return the actual persisted terminal.
1616            let persisted = state
1617                .task_storage
1618                .get_task(tenant, task_id, owner, None)
1619                .await
1620                .map_err(A2aError::from)?
1621                .ok_or_else(|| A2aError::TaskNotFound {
1622                    task_id: task_id.to_string(),
1623                })?;
1624            state.event_broker.notify(task_id).await;
1625            Ok(Json(serde_json::to_value(&persisted).unwrap_or_default()))
1626        }
1627        Err(A2aStorageError::TaskNotFound(id)) => Err(A2aError::TaskNotFound { task_id: id }),
1628        Err(A2aStorageError::TerminalState(_)) | Err(A2aStorageError::InvalidTransition { .. }) => {
1629            Err(A2aError::TaskNotCancelable {
1630                task_id: task_id.to_string(),
1631            })
1632        }
1633        Err(other) => Err(A2aError::from(other)),
1634    }
1635}
1636
1637/// Verify the caller owns the task before push config operations.
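/// Wrong-owner reads come back `None` and surface `TaskNotFound` (404)
/// rather than 403, matching `get_task`'s anti-enumeration contract.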
1638async fn verify_task_ownership(
1639    state: &AppState,
1640    tenant: &str,
1641    owner: &str,
1642    task_id: &str,
1643) -> Result<(), A2aError> {
1644    state
1645        .task_storage
1646        .get_task(tenant, task_id, owner, Some(0))
1647        .await
1648        .map_err(A2aError::from)?
1649        .ok_or_else(|| A2aError::TaskNotFound {
1650            task_id: task_id.to_string(),
1651        })?;
1652    Ok(())
1653}
1654
1655pub(crate) async fn core_create_push_config(
1656    state: AppState,
1657    tenant: &str,
1658    owner: &str,
1659    task_id: &str,
1660    body: String,
1661) -> Result<Json<serde_json::Value>, A2aError> {
1662    verify_task_ownership(&state, tenant, owner, task_id).await?;
1663
1664    let mut config: turul_a2a_proto::TaskPushNotificationConfig = serde_json::from_str(&body)
1665        .map_err(|e| A2aError::InvalidRequest {
1666            message: format!("Invalid push config: {e}"),
1667        })?;
1668    config.task_id = task_id.to_string();
1669
1670    // §R1: URL must parse at CRUD time. The dispatcher used
1671    // to silently skip unparseable URLs with no failed-delivery row,
1672    // so an operator who mistyped the webhook had no feedback loop.
1673    // Scheme (http/https) and SSRF checks still happen at delivery
1674    // time — those depend on runtime flags (`allow_insecure_push_urls`)
1675    // the CRUD call does not see.
1676    if config.url.is_empty() {
1677        return Err(A2aError::InvalidRequest {
1678            message: "push config url is required".into(),
1679        });
1680    }
1681    if let Err(e) = url::Url::parse(&config.url) {
1682        return Err(A2aError::InvalidRequest {
1683            message: format!("push config url is not a valid URL: {e}"),
1684        });
1685    }
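    // E.g. "https://hooks.example.com/a2a" parses and is stored, while
    // "hooks.example.com/a2a" (no scheme) fails `url::Url::parse` with a
    // relative-URL error and is rejected here, before any delivery-time
    // scheme/SSRF policy applies.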
1686
1687    let created = state
1688        .push_storage
1689        .create_config(tenant, config)
1690        .await
1691        .map_err(A2aError::from)?;
1692    Ok(Json(serde_json::to_value(&created).unwrap_or_default()))
1693}
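
// Illustrative create-path request body (field casing assumed from the
// proto JSON mapping; `task_id` is overwritten from the path regardless
// of what the body claims):
//
//     { "taskId": "ignored-use-path", "url": "https://hooks.example.com/a2a" }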
1694
1695pub(crate) async fn core_list_push_configs(
1696    state: AppState,
1697    tenant: &str,
1698    owner: &str,
1699    task_id: &str,
1700    query: &PushConfigQuery,
1701) -> Result<Json<serde_json::Value>, A2aError> {
1702    verify_task_ownership(&state, tenant, owner, task_id).await?;
1703
1704    let page = state
1705        .push_storage
1706        .list_configs(
1707            tenant,
1708            task_id,
1709            query.page_token.as_deref(),
1710            query.page_size,
1711        )
1712        .await
1713        .map_err(A2aError::from)?;
1714
1715    let configs: Vec<serde_json::Value> = page
1716        .configs
1717        .iter()
1718        .map(|c| serde_json::to_value(c).unwrap_or_default())
1719        .collect();
1720
1721    Ok(Json(serde_json::json!({
1722        "configs": configs,
1723        "nextPageToken": page.next_page_token,
1724    })))
1725}
1726
1727pub(crate) async fn core_get_push_config(
1728    state: AppState,
1729    tenant: &str,
1730    owner: &str,
1731    task_id: &str,
1732    config_id: &str,
1733) -> Result<Json<serde_json::Value>, A2aError> {
1734    verify_task_ownership(&state, tenant, owner, task_id).await?;
1735
1736    let config = state
1737        .push_storage
1738        .get_config(tenant, task_id, config_id)
1739        .await
1740        .map_err(A2aError::from)?
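        // Config misses reuse TaskNotFound with a descriptive id, so the
        // 404 body names the missing config.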
1741        .ok_or_else(|| A2aError::TaskNotFound {
1742            task_id: format!("push config {config_id} for task {task_id}"),
1743        })?;
1744
1745    Ok(Json(serde_json::to_value(&config).unwrap_or_default()))
1746}
1747
1748pub(crate) async fn core_delete_push_config(
1749    state: AppState,
1750    tenant: &str,
1751    owner: &str,
1752    task_id: &str,
1753    config_id: &str,
1754) -> Result<Json<serde_json::Value>, A2aError> {
1755    verify_task_ownership(&state, tenant, owner, task_id).await?;
1756
1757    state
1758        .push_storage
1759        .delete_config(tenant, task_id, config_id)
1760        .await
1761        .map_err(A2aError::from)?;
1762
1763    Ok(Json(serde_json::json!({})))
1764}
1765
1766// IntoResponse for A2aError — returns AIP-193 HTTP error body
1767impl IntoResponse for A2aError {
1768    fn into_response(self) -> axum::response::Response {
1769        let status = axum::http::StatusCode::from_u16(self.http_status())
1770            .unwrap_or(axum::http::StatusCode::INTERNAL_SERVER_ERROR);
1771        let body = self.to_http_error_body();
1772        (status, Json(body)).into_response()
1773    }
1774}
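
// Illustrative wire shape (AIP-193 style; exact fields come from
// `A2aError::to_http_error_body`, not from this sketch):
//
//     HTTP/1.1 404 Not Found
//     { "error": { "code": 404, "status": "NOT_FOUND",
//                  "message": "Task not found: abc-123" } }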
1775
1776// =========================================================
1777// Path parsing tests
1778// =========================================================
1779
1780#[cfg(test)]
1781mod tests {
1782    use super::*;
1783
1784    #[test]
1785    fn parse_get_task() {
1786        assert_eq!(
1787            parse_task_path("abc-123"),
1788            Some(TaskAction::GetTask("abc-123".into()))
1789        );
1790        assert_eq!(
1791            parse_task_path("/abc-123"),
1792            Some(TaskAction::GetTask("abc-123".into()))
1793        );
1794    }
1795
1796    #[test]
1797    fn parse_cancel_task() {
1798        assert_eq!(
1799            parse_task_path("abc-123:cancel"),
1800            Some(TaskAction::CancelTask("abc-123".into()))
1801        );
1802    }
1803
1804    #[test]
1805    fn parse_subscribe_to_task() {
1806        assert_eq!(
1807            parse_task_path("abc-123:subscribe"),
1808            Some(TaskAction::SubscribeToTask("abc-123".into()))
1809        );
1810    }
1811
1812    #[test]
1813    fn parse_push_config_collection() {
        // The collection path parses as PushConfigCollection; the HTTP
        // method disambiguates GET=list vs POST=create.
1815        assert_eq!(
1816            parse_task_path("task-1/pushNotificationConfigs"),
1817            Some(TaskAction::PushConfigCollection("task-1".into()))
1818        );
1819    }
1820
1821    #[test]
1822    fn parse_push_config_item() {
        // The item path parses as PushConfigItem; the HTTP method
        // disambiguates GET=get vs DELETE=delete.
1824        assert_eq!(
1825            parse_task_path("task-1/pushNotificationConfigs/cfg-1"),
1826            Some(TaskAction::PushConfigItem("task-1".into(), "cfg-1".into()))
1827        );
1828    }
1829
1830    #[test]
1831    fn parse_invalid_path() {
1832        assert_eq!(parse_task_path("a/b/c/d"), None);
1833    }
1834
1835    // v0.3 compat tests live in compat_v03.rs
1836}