1use std::sync::Arc;
7use std::time::Duration;
8
9use axum::extract::{Path, Query, State};
10use axum::response::IntoResponse;
11use axum::response::sse::{Event, KeepAlive, Sse};
12use axum::routing::{get, post};
13use axum::{Json, Router};
14use tokio::sync::broadcast;
15use tokio_stream::wrappers::ReceiverStream;
16
17use crate::error::A2aError;
18use crate::executor::AgentExecutor;
19use crate::storage::{
20 A2aAtomicStore, A2aEventStore, A2aPushNotificationStorage, A2aStorageError, A2aTaskStorage,
21 TaskFilter, TaskListPage,
22};
23use crate::streaming::{StreamEvent, replay};
24use turul_a2a_types::{Message, Task, TaskState, TaskStatus};
25
26#[derive(Clone)]
28pub struct AppState {
29 pub executor: Arc<dyn AgentExecutor>,
30 pub task_storage: Arc<dyn A2aTaskStorage>,
31 pub push_storage: Arc<dyn A2aPushNotificationStorage>,
32 pub event_store: Arc<dyn crate::storage::A2aEventStore>,
33 pub atomic_store: Arc<dyn A2aAtomicStore>,
34 pub event_broker: crate::streaming::TaskEventBroker,
35 pub middleware_stack: Arc<crate::middleware::MiddlewareStack>,
36 pub runtime_config: crate::server::RuntimeConfig,
45
46 pub in_flight: Arc<crate::server::in_flight::InFlightRegistry>,
53
54 pub cancellation_supervisor: Arc<dyn crate::storage::A2aCancellationSupervisor>,
59
60 pub push_delivery_store: Option<Arc<dyn crate::push::A2aPushDeliveryStore>>,
66
67 pub push_dispatcher: Option<Arc<crate::push::PushDispatcher>>,
75
76 pub durable_executor_queue: Option<Arc<dyn crate::durable_executor::DurableExecutorQueue>>,
88}
89
90pub fn build_router(state: AppState) -> Router {
92 let router = Router::new()
93 .route("/.well-known/agent-card.json", get(agent_card_handler))
95 .route("/extendedAgentCard", get(extended_agent_card_handler))
96 .route("/message:send", post(send_message_handler))
98 .route("/message:stream", post(send_streaming_message_handler))
99 .route("/tasks", get(list_tasks_handler))
101 .route(
103 "/tasks/{*rest}",
104 get(task_get_dispatch)
105 .post(task_post_dispatch)
106 .delete(task_delete_dispatch),
107 )
108 .route("/{tenant}/message:send", post(tenant_send_message_handler))
110 .route(
111 "/{tenant}/message:stream",
112 post(tenant_send_streaming_message_handler),
113 )
114 .route("/{tenant}/tasks", get(tenant_list_tasks_handler))
115 .route(
116 "/{tenant}/extendedAgentCard",
117 get(extended_agent_card_handler),
118 )
119 .route(
120 "/{tenant}/tasks/{*rest}",
121 get(tenant_task_get_dispatch)
122 .post(tenant_task_post_dispatch)
123 .delete(tenant_task_delete_dispatch),
124 );
125
126 let router = router.route("/jsonrpc", post(crate::jsonrpc::jsonrpc_dispatch_handler));
128
129 #[cfg(feature = "compat-v03")]
133 let router = router.route("/", post(crate::jsonrpc::jsonrpc_dispatch_handler));
134
135 let auth_layer = crate::middleware::AuthLayer::new(state.middleware_stack.clone());
137 let transport_layer = crate::middleware::transport::TransportComplianceLayer;
139 router
140 .with_state(state)
141 .layer(auth_layer)
142 .layer(transport_layer)
143}
144
145#[derive(Debug, PartialEq)]
155enum TaskAction {
156 GetTask(String),
158 CancelTask(String),
160 SubscribeToTask(String),
162 PushConfigCollection(String),
164 PushConfigItem(String, String),
166}
167
168fn parse_task_path(rest: &str) -> Option<TaskAction> {
169 let rest = rest.strip_prefix('/').unwrap_or(rest);
170 let parts: Vec<&str> = rest.split('/').collect();
171
172 match parts.as_slice() {
173 [segment] => {
175 if let Some(id) = segment.strip_suffix(":cancel") {
176 Some(TaskAction::CancelTask(id.to_string()))
177 } else if let Some(id) = segment.strip_suffix(":subscribe") {
178 Some(TaskAction::SubscribeToTask(id.to_string()))
179 } else {
180 Some(TaskAction::GetTask(segment.to_string()))
181 }
182 }
183 [task_id, "pushNotificationConfigs"] => {
185 Some(TaskAction::PushConfigCollection(task_id.to_string()))
186 }
187 [task_id, "pushNotificationConfigs", config_id] => Some(TaskAction::PushConfigItem(
189 task_id.to_string(),
190 config_id.to_string(),
191 )),
192 _ => None,
193 }
194}
195
196async fn task_get_dispatch(
201 State(state): State<AppState>,
202 axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
203 headers: axum::http::HeaderMap,
204 Path(rest): Path<String>,
205 Query(query): Query<TaskGetCombinedQuery>,
206) -> Result<axum::response::Response, A2aError> {
207 let last_event_id = headers
208 .get("Last-Event-ID")
209 .or_else(|| headers.get("last-event-id"))
210 .and_then(|v| v.to_str().ok())
211 .map(String::from);
212 dispatch_task_get(
213 state,
214 DEFAULT_TENANT,
215 ctx.identity.owner(),
216 &rest,
217 &query,
218 last_event_id.as_deref(),
219 )
220 .await
221}
222
223async fn task_post_dispatch(
224 State(state): State<AppState>,
225 axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
226 Path(rest): Path<String>,
227 body: String,
228) -> Result<axum::response::Response, A2aError> {
229 dispatch_task_post(state, DEFAULT_TENANT, ctx.identity.owner(), &rest, body).await
230}
231
232async fn task_delete_dispatch(
233 State(state): State<AppState>,
234 axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
235 Path(rest): Path<String>,
236) -> Result<axum::response::Response, A2aError> {
237 dispatch_task_delete(state, DEFAULT_TENANT, ctx.identity.owner(), &rest).await
238}
239
240async fn tenant_send_message_handler(
245 State(state): State<AppState>,
246 axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
247 Path(tenant): Path<String>,
248 body: String,
249) -> Result<Json<serde_json::Value>, A2aError> {
250 let claims = ctx.identity.claims().cloned();
251 core_send_message(state, &tenant, ctx.identity.owner(), claims, body).await
252}
253
254async fn tenant_list_tasks_handler(
255 State(state): State<AppState>,
256 axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
257 Path(tenant): Path<String>,
258 Query(query): Query<ListTasksQuery>,
259) -> Result<Json<serde_json::Value>, A2aError> {
260 core_list_tasks(state, &tenant, ctx.identity.owner(), &query).await
261}
262
263async fn tenant_task_get_dispatch(
264 State(state): State<AppState>,
265 axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
266 headers: axum::http::HeaderMap,
267 Path((tenant, rest)): Path<(String, String)>,
268 Query(query): Query<TaskGetCombinedQuery>,
269) -> Result<axum::response::Response, A2aError> {
270 let last_event_id = headers
271 .get("Last-Event-ID")
272 .or_else(|| headers.get("last-event-id"))
273 .and_then(|v| v.to_str().ok())
274 .map(String::from);
275 dispatch_task_get(
276 state,
277 &tenant,
278 ctx.identity.owner(),
279 &rest,
280 &query,
281 last_event_id.as_deref(),
282 )
283 .await
284}
285
286async fn tenant_task_post_dispatch(
287 State(state): State<AppState>,
288 axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
289 Path((tenant, rest)): Path<(String, String)>,
290 body: String,
291) -> Result<axum::response::Response, A2aError> {
292 dispatch_task_post(state, &tenant, ctx.identity.owner(), &rest, body).await
293}
294
295async fn tenant_task_delete_dispatch(
296 State(state): State<AppState>,
297 axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
298 Path((tenant, rest)): Path<(String, String)>,
299) -> Result<axum::response::Response, A2aError> {
300 dispatch_task_delete(state, &tenant, ctx.identity.owner(), &rest).await
301}
302
303#[derive(serde::Deserialize, Default)]
310#[serde(default)]
311struct TaskGetCombinedQuery {
312 #[serde(rename = "historyLength")]
313 history_length: Option<i32>,
314 #[serde(rename = "pageSize")]
315 page_size: Option<i32>,
316 #[serde(rename = "pageToken")]
317 page_token: Option<String>,
318}
319
320async fn dispatch_task_get(
321 state: AppState,
322 tenant: &str,
323 owner: &str,
324 rest: &str,
325 query: &TaskGetCombinedQuery,
326 last_event_id: Option<&str>,
327) -> Result<axum::response::Response, A2aError> {
328 match parse_task_path(rest) {
329 Some(TaskAction::GetTask(id)) => {
330 let Json(v) = core_get_task(state, tenant, owner, &id, query.history_length).await?;
331 Ok(Json(v).into_response())
332 }
333 Some(TaskAction::SubscribeToTask(id)) => {
334 core_subscribe_to_task(state, tenant, owner, &id, last_event_id).await
335 }
336 Some(TaskAction::PushConfigCollection(task_id)) => {
337 let pq = PushConfigQuery {
338 page_size: query.page_size,
339 page_token: query.page_token.clone(),
340 };
341 let Json(v) = core_list_push_configs(state, tenant, owner, &task_id, &pq).await?;
342 Ok(Json(v).into_response())
343 }
344 Some(TaskAction::PushConfigItem(task_id, config_id)) => {
345 let Json(v) = core_get_push_config(state, tenant, owner, &task_id, &config_id).await?;
346 Ok(Json(v).into_response())
347 }
348 _ => Err(A2aError::InvalidRequest {
349 message: "Invalid task path".into(),
350 }),
351 }
352}
353
354async fn dispatch_task_post(
355 state: AppState,
356 tenant: &str,
357 owner: &str,
358 rest: &str,
359 body: String,
360) -> Result<axum::response::Response, A2aError> {
361 match parse_task_path(rest) {
362 Some(TaskAction::CancelTask(id)) => {
363 let Json(v) = core_cancel_task(state, tenant, owner, &id).await?;
364 Ok(Json(v).into_response())
365 }
366 Some(TaskAction::PushConfigCollection(task_id)) => {
367 let Json(v) = core_create_push_config(state, tenant, owner, &task_id, body).await?;
368 Ok(Json(v).into_response())
369 }
370 _ => Err(A2aError::InvalidRequest {
371 message: "Invalid task path".into(),
372 }),
373 }
374}
375
376async fn dispatch_task_delete(
377 state: AppState,
378 tenant: &str,
379 owner: &str,
380 rest: &str,
381) -> Result<axum::response::Response, A2aError> {
382 match parse_task_path(rest) {
383 Some(TaskAction::PushConfigItem(task_id, config_id)) => {
384 let Json(v) =
385 core_delete_push_config(state, tenant, owner, &task_id, &config_id).await?;
386 Ok(Json(v).into_response())
387 }
388 _ => Err(A2aError::InvalidRequest {
389 message: "Invalid task path".into(),
390 }),
391 }
392}
393
394async fn agent_card_handler(State(state): State<AppState>) -> impl IntoResponse {
399 let card = serde_json::to_value(state.executor.agent_card()).unwrap_or_default();
400 #[cfg(feature = "compat-v03")]
401 let card = crate::compat_v03::inject_agent_card_compat(card);
402 Json(card)
403}
404
405async fn extended_agent_card_handler(
406 State(state): State<AppState>,
407) -> Result<impl IntoResponse, A2aError> {
408 match state.executor.extended_agent_card(None) {
409 Some(extended) => {
410 let card = serde_json::to_value(extended).unwrap_or_default();
411 #[cfg(feature = "compat-v03")]
412 let card = crate::compat_v03::inject_agent_card_compat(card);
413 Ok(Json(card))
414 }
415 None => Err(A2aError::ExtendedAgentCardNotConfigured),
416 }
417}
418
419async fn send_streaming_message_handler(
420 State(state): State<AppState>,
421 axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
422 body: String,
423) -> Result<axum::response::Response, A2aError> {
424 let claims = ctx.identity.claims().cloned();
425 core_send_streaming_message(state, DEFAULT_TENANT, ctx.identity.owner(), claims, body).await
426}
427
428async fn tenant_send_streaming_message_handler(
429 State(state): State<AppState>,
430 axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
431 Path(tenant): Path<String>,
432 body: String,
433) -> Result<axum::response::Response, A2aError> {
434 let claims = ctx.identity.claims().cloned();
435 core_send_streaming_message(state, &tenant, ctx.identity.owner(), claims, body).await
436}
437
438pub(crate) async fn setup_streaming_send(
447 state: AppState,
448 tenant: &str,
449 owner: &str,
450 claims: Option<serde_json::Value>,
451 body: String,
452) -> Result<(String, broadcast::Receiver<()>), A2aError> {
453 let request: turul_a2a_proto::SendMessageRequest =
454 serde_json::from_str(&body).map_err(|e| A2aError::InvalidRequest {
455 message: format!("Invalid request body: {e}"),
456 })?;
457
458 let proto_message = request.message.ok_or(A2aError::InvalidRequest {
459 message: "message field is required".into(),
460 })?;
461
462 let message = Message::try_from(proto_message).map_err(|e| A2aError::InvalidRequest {
463 message: format!("Invalid message: {e}"),
464 })?;
465
466 let task_id = uuid::Uuid::now_v7().to_string();
467 let context_id = if message.as_proto().context_id.is_empty() {
468 uuid::Uuid::now_v7().to_string()
469 } else {
470 message.as_proto().context_id.clone()
471 };
472
473 let wake_rx = state.event_broker.subscribe(&task_id).await;
475
476 let mut task =
478 Task::new(&task_id, TaskStatus::new(TaskState::Submitted)).with_context_id(&context_id);
479 task.append_message(message.clone());
480
481 let submitted_event = StreamEvent::StatusUpdate {
482 status_update: crate::streaming::StatusUpdatePayload {
483 task_id: task_id.clone(),
484 context_id: context_id.clone(),
485 status: serde_json::to_value(TaskStatus::new(TaskState::Submitted)).unwrap_or_default(),
486 },
487 };
488
489 state
490 .atomic_store
491 .create_task_with_events(tenant, owner, task, vec![submitted_event])
492 .await
493 .map_err(A2aError::from)?;
494
495 state.event_broker.notify(&task_id).await;
496
497 let working_event = StreamEvent::StatusUpdate {
501 status_update: crate::streaming::StatusUpdatePayload {
502 task_id: task_id.clone(),
503 context_id: context_id.clone(),
504 status: serde_json::to_value(TaskStatus::new(TaskState::Working)).unwrap_or_default(),
505 },
506 };
507 state
508 .atomic_store
509 .update_task_status_with_events(
510 tenant,
511 &task_id,
512 owner,
513 TaskStatus::new(TaskState::Working),
514 vec![working_event],
515 )
516 .await
517 .map_err(A2aError::from)?;
518 state.event_broker.notify(&task_id).await;
519
520 let spawn_deps = crate::server::spawn::SpawnDeps {
524 executor: state.executor.clone(),
525 task_storage: state.task_storage.clone(),
526 atomic_store: state.atomic_store.clone(),
527 event_broker: state.event_broker.clone(),
528 in_flight: state.in_flight.clone(),
529 push_dispatcher: state.push_dispatcher.clone(),
530 };
531 let scope = crate::server::spawn::SpawnScope {
532 tenant: tenant.to_string(),
533 owner: owner.to_string(),
534 task_id: task_id.clone(),
535 context_id: context_id.clone(),
536 message: message.clone(),
537 claims: claims.clone(),
538 };
539 let _spawn = crate::server::spawn::spawn_tracked_executor(spawn_deps, scope)?;
540
541 Ok((task_id, wake_rx))
542}
543
544pub(crate) async fn core_send_streaming_message(
545 state: AppState,
546 tenant: &str,
547 owner: &str,
548 claims: Option<serde_json::Value>,
549 body: String,
550) -> Result<axum::response::Response, A2aError> {
551 let (task_id, wake_rx) =
552 setup_streaming_send(state.clone(), tenant, owner, claims, body).await?;
553 Ok(make_store_sse_response(
557 state.event_store,
558 tenant.to_string(),
559 task_id,
560 0,
561 wake_rx,
562 None,
563 ))
564}
565
566pub(crate) async fn core_subscribe_to_task(
567 state: AppState,
568 tenant: &str,
569 owner: &str,
570 task_id: &str,
571 last_event_id_header: Option<&str>,
572) -> Result<axum::response::Response, A2aError> {
573 let task = state
575 .task_storage
576 .get_task(tenant, task_id, owner, None)
577 .await
578 .map_err(A2aError::from)?
579 .ok_or_else(|| A2aError::TaskNotFound {
580 task_id: task_id.to_string(),
581 })?;
582
583 if let Some(status) = task.status() {
585 if let Ok(s) = status.state() {
586 if s.is_terminal() {
587 return Err(A2aError::UnsupportedOperation {
588 message: format!("Task {task_id} is already in terminal state {s:?}"),
589 });
590 }
591 }
592 }
593
594 let after_sequence = last_event_id_header
596 .and_then(replay::parse_last_event_id)
597 .filter(|parsed| parsed.task_id == task_id)
598 .map(|parsed| parsed.sequence)
599 .unwrap_or(0);
600
601 let initial_task = if after_sequence == 0 {
604 Some(task)
605 } else {
606 None
607 };
608
609 let wake_rx = state.event_broker.subscribe(task_id).await;
611
612 Ok(make_store_sse_response(
613 state.event_store,
614 tenant.to_string(),
615 task_id.to_string(),
616 after_sequence,
617 wake_rx,
618 initial_task,
619 ))
620}
621
622const STORE_POLL_INTERVAL: Duration = Duration::from_secs(2);
625
626fn make_store_sse_response(
636 event_store: Arc<dyn A2aEventStore>,
637 tenant: String,
638 task_id: String,
639 after_sequence: u64,
640 wake_rx: broadcast::Receiver<()>,
641 initial_task: Option<Task>,
642) -> axum::response::Response {
643 let (tx, rx) = tokio::sync::mpsc::channel::<Result<Event, std::convert::Infallible>>(64);
644
645 tokio::spawn(async move {
646 if let Some(task) = initial_task {
648 let task_json =
649 serde_json::json!({"task": serde_json::to_value(&task).unwrap_or_default()});
650 let json = serde_json::to_string(&task_json).unwrap_or_default();
651 let sse_event = Event::default()
652 .id(replay::format_event_id(&task_id, 0))
653 .data(json);
654 if tx.send(Ok(sse_event)).await.is_err() {
655 return;
656 }
657 }
658
659 let mut last_seq = after_sequence;
660 let mut wake_rx = wake_rx;
661
662 loop {
663 let events = match event_store
665 .get_events_after(&tenant, &task_id, last_seq)
666 .await
667 {
668 Ok(e) => e,
669 Err(_) => break,
670 };
671
672 let mut saw_terminal = false;
673 for (seq, event) in events {
674 last_seq = seq;
675 let event_id = replay::format_event_id(&task_id, seq);
676 let json = serde_json::to_string(&event).unwrap_or_default();
677 let sse_event = Event::default().id(event_id).data(json);
678 if tx.send(Ok(sse_event)).await.is_err() {
679 return; }
681 if event.is_terminal() {
682 saw_terminal = true;
683 }
684 }
685
686 if saw_terminal {
687 break; }
689
690 tokio::select! {
692 result = wake_rx.recv() => {
693 match result {
694 Ok(()) => {} Err(broadcast::error::RecvError::Closed) => break,
696 Err(broadcast::error::RecvError::Lagged(_)) => {} }
698 }
699 _ = tokio::time::sleep(STORE_POLL_INTERVAL) => {
700 }
702 }
703 }
704 });
705
706 let stream = ReceiverStream::new(rx);
707 Sse::new(stream)
708 .keep_alive(KeepAlive::default())
709 .into_response()
710}
711
712const DEFAULT_TENANT: &str = "";
717
718async fn send_message_handler(
719 State(state): State<AppState>,
720 axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
721 body: String,
722) -> Result<Json<serde_json::Value>, A2aError> {
723 let claims = ctx.identity.claims().cloned();
724 core_send_message(state, DEFAULT_TENANT, ctx.identity.owner(), claims, body).await
725}
726
727async fn list_tasks_handler(
728 State(state): State<AppState>,
729 axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
730 Query(query): Query<ListTasksQuery>,
731) -> Result<Json<serde_json::Value>, A2aError> {
732 core_list_tasks(state, DEFAULT_TENANT, ctx.identity.owner(), &query).await
733}
734
735#[derive(serde::Deserialize, Default)]
740#[serde(default)]
741pub(crate) struct ListTasksQuery {
742 #[serde(rename = "contextId")]
743 pub(crate) context_id: Option<String>,
744 pub(crate) status: Option<String>,
745 #[serde(rename = "pageSize")]
746 pub(crate) page_size: Option<i32>,
747 #[serde(rename = "pageToken")]
748 pub(crate) page_token: Option<String>,
749 #[serde(rename = "historyLength")]
750 pub(crate) history_length: Option<i32>,
751 #[serde(rename = "includeArtifacts")]
752 pub(crate) include_artifacts: Option<bool>,
753}
754
755#[derive(serde::Deserialize, Default)]
756#[serde(default)]
757pub(crate) struct PushConfigQuery {
758 #[serde(rename = "pageSize")]
759 pub(crate) page_size: Option<i32>,
760 #[serde(rename = "pageToken")]
761 pub(crate) page_token: Option<String>,
762}
763
764fn parse_task_state(s: &str) -> Option<TaskState> {
769 match s {
770 "TASK_STATE_SUBMITTED" => Some(TaskState::Submitted),
771 "TASK_STATE_WORKING" => Some(TaskState::Working),
772 "TASK_STATE_COMPLETED" => Some(TaskState::Completed),
773 "TASK_STATE_FAILED" => Some(TaskState::Failed),
774 "TASK_STATE_CANCELED" => Some(TaskState::Canceled),
775 "TASK_STATE_INPUT_REQUIRED" => Some(TaskState::InputRequired),
776 "TASK_STATE_REJECTED" => Some(TaskState::Rejected),
777 "TASK_STATE_AUTH_REQUIRED" => Some(TaskState::AuthRequired),
778 _ => None,
779 }
780}
781
782#[doc(hidden)]
783pub async fn core_send_message(
784 state: AppState,
785 tenant: &str,
786 owner: &str,
787 claims: Option<serde_json::Value>,
788 body: String,
789) -> Result<Json<serde_json::Value>, A2aError> {
790 let request: turul_a2a_proto::SendMessageRequest =
791 serde_json::from_str(&body).map_err(|e| A2aError::InvalidRequest {
792 message: format!("Invalid request body: {e}"),
793 })?;
794
795 let proto_message = request.message.ok_or(A2aError::InvalidRequest {
796 message: "message field is required".into(),
797 })?;
798
799 let message = Message::try_from(proto_message).map_err(|e| A2aError::InvalidRequest {
800 message: format!("Invalid message: {e}"),
801 })?;
802
803 let return_immediately = request
804 .configuration
805 .as_ref()
806 .map(|c| c.return_immediately)
807 .unwrap_or(false);
808
809 if return_immediately && !state.runtime_config.supports_return_immediately {
815 tracing::warn!(
816 tenant = tenant,
817 owner = owner,
818 "rejecting SendMessageConfiguration.return_immediately=true: \
819 this runtime does not support post-return executor continuation \
820 (ADR-017 §Decision Bug 1, ADR-013 §4.4)"
821 );
822 return Err(A2aError::UnsupportedOperation {
823 message: "return_immediately=true is not supported on this runtime \
824 (post-return executor continuation is not guaranteed); \
825 resubmit with return_immediately=false for a blocking \
826 send"
827 .into(),
828 });
829 }
830
831 let history_length: Option<i32> = request
837 .configuration
838 .as_ref()
839 .and_then(|c| c.history_length);
840
841 let inline_push_config: Option<turul_a2a_proto::TaskPushNotificationConfig> = request
846 .configuration
847 .as_ref()
848 .and_then(|c| c.task_push_notification_config.clone());
849 if let Some(cfg) = inline_push_config.as_ref() {
850 if cfg.url.is_empty() {
851 return Err(A2aError::InvalidRequest {
852 message: "inline push config url is required".into(),
853 });
854 }
855 if let Err(e) = url::Url::parse(&cfg.url) {
856 return Err(A2aError::InvalidRequest {
857 message: format!("inline push config url is not a valid URL: {e}"),
858 });
859 }
860 }
861
862 let msg_task_id = message.as_proto().task_id.clone();
863
864 let (task_id, context_id, is_continuation) = if !msg_task_id.is_empty() {
866 let existing = state
867 .task_storage
868 .get_task(tenant, &msg_task_id, owner, None)
869 .await
870 .map_err(A2aError::from)?
871 .ok_or_else(|| A2aError::TaskNotFound {
872 task_id: msg_task_id.clone(),
873 })?;
874
875 let msg_context_id = &message.as_proto().context_id;
877 if !msg_context_id.is_empty() && msg_context_id != existing.context_id() {
878 return Err(A2aError::InvalidRequest {
879 message: format!(
880 "contextId mismatch: message has '{}' but task {} has '{}'",
881 msg_context_id,
882 msg_task_id,
883 existing.context_id()
884 ),
885 });
886 }
887
888 if let Some(status) = existing.status() {
890 if let Ok(s) = status.state() {
891 match s {
892 TaskState::InputRequired | TaskState::AuthRequired => {}
893 _ => {
894 return Err(A2aError::InvalidRequest {
895 message: format!(
896 "Task {msg_task_id} is in state {s:?}, only INPUT_REQUIRED or AUTH_REQUIRED tasks accept follow-up messages"
897 ),
898 });
899 }
900 }
901 }
902 }
903
904 (msg_task_id, existing.context_id().to_string(), true)
905 } else {
906 let context_id = if message.as_proto().context_id.is_empty() {
907 uuid::Uuid::now_v7().to_string()
908 } else {
909 message.as_proto().context_id.clone()
910 };
911 (uuid::Uuid::now_v7().to_string(), context_id, false)
912 };
913
914 let durable_queue = if return_immediately {
922 state.durable_executor_queue.clone()
923 } else {
924 None
925 };
926 let durable_job: Option<crate::durable_executor::QueuedExecutorJob> =
927 if let Some(queue) = durable_queue.as_ref() {
928 let now_micros = chrono::Utc::now().timestamp_micros();
929 let job = crate::durable_executor::QueuedExecutorJob {
930 version: crate::durable_executor::QueuedExecutorJob::VERSION,
931 tenant: tenant.to_string(),
932 owner: owner.to_string(),
933 task_id: task_id.clone(),
934 context_id: context_id.clone(),
935 message: message.as_proto().clone(),
936 claims: claims.clone(),
937 enqueued_at_micros: now_micros,
938 };
939 if let Err(e) = queue.check_payload_size(&job) {
940 return Err(A2aError::InvalidRequest {
941 message: format!("durable executor queue: {e}"),
942 });
943 }
944 Some(job)
945 } else {
946 None
947 };
948
949 if is_continuation {
955 state
956 .task_storage
957 .append_message(tenant, &task_id, owner, message.clone())
958 .await
959 .map_err(A2aError::from)?;
960
961 let working_event = StreamEvent::StatusUpdate {
962 status_update: crate::streaming::StatusUpdatePayload {
963 task_id: task_id.clone(),
964 context_id: context_id.clone(),
965 status: serde_json::to_value(TaskStatus::new(TaskState::Working))
966 .unwrap_or_default(),
967 },
968 };
969 state
970 .atomic_store
971 .update_task_status_with_events(
972 tenant,
973 &task_id,
974 owner,
975 TaskStatus::new(TaskState::Working),
976 vec![working_event],
977 )
978 .await
979 .map_err(A2aError::from)?;
980 state.event_broker.notify(&task_id).await;
981 } else {
982 let mut task =
983 Task::new(&task_id, TaskStatus::new(TaskState::Submitted)).with_context_id(&context_id);
984 task.append_message(message.clone());
985
986 let submitted_event = StreamEvent::StatusUpdate {
987 status_update: crate::streaming::StatusUpdatePayload {
988 task_id: task_id.clone(),
989 context_id: context_id.clone(),
990 status: serde_json::to_value(TaskStatus::new(TaskState::Submitted))
991 .unwrap_or_default(),
992 },
993 };
994 state
995 .atomic_store
996 .create_task_with_events(tenant, owner, task, vec![submitted_event])
997 .await
998 .map_err(A2aError::from)?;
999 state.event_broker.notify(&task_id).await;
1000
1001 let working_event = StreamEvent::StatusUpdate {
1002 status_update: crate::streaming::StatusUpdatePayload {
1003 task_id: task_id.clone(),
1004 context_id: context_id.clone(),
1005 status: serde_json::to_value(TaskStatus::new(TaskState::Working))
1006 .unwrap_or_default(),
1007 },
1008 };
1009 state
1010 .atomic_store
1011 .update_task_status_with_events(
1012 tenant,
1013 &task_id,
1014 owner,
1015 TaskStatus::new(TaskState::Working),
1016 vec![working_event],
1017 )
1018 .await
1019 .map_err(A2aError::from)?;
1020 state.event_broker.notify(&task_id).await;
1021 }
1022
1023 if let Some(mut cfg) = inline_push_config {
1032 cfg.task_id = task_id.clone();
1033 match state.push_storage.create_config(tenant, cfg).await {
1034 Ok(_) => {}
1035 Err(e) => {
1036 let reason = format!("inline push notification config registration failed: {e}");
1037 tracing::warn!(
1038 tenant = tenant,
1039 owner = owner,
1040 task_id = %task_id,
1041 "ADR-017 Bug 2 compensation: transitioning task to FAILED \
1042 after inline push config registration failure"
1043 );
1044 let reason_msg = Message::new(
1045 uuid::Uuid::now_v7().to_string(),
1046 turul_a2a_types::Role::Agent,
1047 vec![turul_a2a_types::Part::text(reason)],
1048 );
1049 let failed_status = TaskStatus::new(TaskState::Failed).with_message(reason_msg);
1050 let failed_event = StreamEvent::StatusUpdate {
1051 status_update: crate::streaming::StatusUpdatePayload {
1052 task_id: task_id.clone(),
1053 context_id: context_id.clone(),
1054 status: serde_json::to_value(&failed_status).unwrap_or_default(),
1055 },
1056 };
1057 if let Err(compensate_err) = state
1065 .atomic_store
1066 .update_task_status_with_events(
1067 tenant,
1068 &task_id,
1069 owner,
1070 failed_status,
1071 vec![failed_event],
1072 )
1073 .await
1074 {
1075 tracing::error!(
1076 tenant = tenant,
1077 owner = owner,
1078 task_id = %task_id,
1079 error = %compensate_err,
1080 "ADR-017 Bug 2 compensation failed: task may remain \
1081 in WORKING without a callback until supervisor sweep"
1082 );
1083 }
1084 state.event_broker.notify(&task_id).await;
1085 return Err(A2aError::from(e));
1086 }
1087 }
1088 }
1089
1090 if let (Some(queue), Some(job)) = (durable_queue.as_ref(), durable_job) {
1100 match queue.enqueue(job).await {
1101 Ok(()) => {
1102 let current = state
1103 .task_storage
1104 .get_task(tenant, &task_id, owner, history_length)
1105 .await
1106 .map_err(A2aError::from)?
1107 .ok_or_else(|| A2aError::TaskNotFound {
1108 task_id: task_id.clone(),
1109 })?;
1110 return Ok(Json(serde_json::json!({
1111 "task": serde_json::to_value(¤t).unwrap_or_default()
1112 })));
1113 }
1114 Err(e) => {
1115 let reason = format!("durable executor enqueue failed: {e}");
1116 tracing::warn!(
1117 tenant = tenant,
1118 owner = owner,
1119 task_id = %task_id,
1120 queue_kind = queue.kind(),
1121 "ADR-018 enqueue failed: transitioning task to FAILED"
1122 );
1123 let reason_msg = Message::new(
1124 uuid::Uuid::now_v7().to_string(),
1125 turul_a2a_types::Role::Agent,
1126 vec![turul_a2a_types::Part::text(reason.clone())],
1127 );
1128 let failed_status = TaskStatus::new(TaskState::Failed).with_message(reason_msg);
1129 let failed_event = StreamEvent::StatusUpdate {
1130 status_update: crate::streaming::StatusUpdatePayload {
1131 task_id: task_id.clone(),
1132 context_id: context_id.clone(),
1133 status: serde_json::to_value(&failed_status).unwrap_or_default(),
1134 },
1135 };
1136 if let Err(compensate_err) = state
1137 .atomic_store
1138 .update_task_status_with_events(
1139 tenant,
1140 &task_id,
1141 owner,
1142 failed_status,
1143 vec![failed_event],
1144 )
1145 .await
1146 {
1147 tracing::error!(
1148 tenant = tenant,
1149 owner = owner,
1150 task_id = %task_id,
1151 error = %compensate_err,
1152 "ADR-018 FAILED-compensation itself failed: task may \
1153 remain in WORKING until supervisor sweep"
1154 );
1155 }
1156 state.event_broker.notify(&task_id).await;
1157 return Err(A2aError::Internal(reason));
1158 }
1159 }
1160 }
1161
1162 let spawn_deps = crate::server::spawn::SpawnDeps {
1164 executor: state.executor.clone(),
1165 task_storage: state.task_storage.clone(),
1166 atomic_store: state.atomic_store.clone(),
1167 event_broker: state.event_broker.clone(),
1168 in_flight: state.in_flight.clone(),
1169 push_dispatcher: state.push_dispatcher.clone(),
1170 };
1171 let scope = crate::server::spawn::SpawnScope {
1172 tenant: tenant.to_string(),
1173 owner: owner.to_string(),
1174 task_id: task_id.clone(),
1175 context_id: context_id.clone(),
1176 message: message.clone(),
1177 claims: claims.clone(),
1178 };
1179 let spawn = crate::server::spawn::spawn_tracked_executor(spawn_deps, scope)?;
1180
1181 if return_immediately {
1182 drop(spawn.yielded_rx);
1192 let current = state
1193 .task_storage
1194 .get_task(tenant, &task_id, owner, history_length)
1195 .await
1196 .map_err(A2aError::from)?
1197 .ok_or_else(|| A2aError::TaskNotFound {
1198 task_id: task_id.clone(),
1199 })?;
1200 return Ok(Json(serde_json::json!({
1201 "task": serde_json::to_value(¤t).unwrap_or_default()
1202 })));
1203 }
1204
1205 let task = await_yielded_with_two_deadlines(
1207 spawn.yielded_rx,
1208 spawn.cancellation,
1209 spawn.handle,
1210 &state,
1211 tenant,
1212 owner,
1213 &task_id,
1214 &context_id,
1215 )
1216 .await?;
1217
1218 let task = if history_length.is_some() {
1224 state
1225 .task_storage
1226 .get_task(tenant, &task_id, owner, history_length)
1227 .await
1228 .map_err(A2aError::from)?
1229 .unwrap_or(task)
1230 } else {
1231 task
1232 };
1233
1234 Ok(Json(serde_json::json!({
1235 "task": serde_json::to_value(&task).unwrap_or_default()
1236 })))
1237}
1238
1239#[allow(clippy::too_many_arguments)] async fn await_yielded_with_two_deadlines(
1256 mut yielded_rx: tokio::sync::oneshot::Receiver<Task>,
1257 cancellation: tokio_util::sync::CancellationToken,
1258 handle: Arc<crate::server::in_flight::InFlightHandle>,
1259 state: &AppState,
1260 tenant: &str,
1261 owner: &str,
1262 task_id: &str,
1263 context_id: &str,
1264) -> Result<Task, A2aError> {
1265 let soft = tokio::time::Instant::now() + state.runtime_config.blocking_task_timeout;
1266 let hard = soft + state.runtime_config.timeout_abort_grace;
1267
1268 let soft_outcome = tokio::select! {
1270 result = &mut yielded_rx => YieldedOutcome::Yielded(result.ok()),
1271 _ = tokio::time::sleep_until(soft) => YieldedOutcome::SoftTimeout,
1272 };
1273
1274 if let YieldedOutcome::Yielded(Some(task)) = soft_outcome {
1275 return Ok(task);
1276 }
1277 if let YieldedOutcome::Yielded(None) = soft_outcome {
1278 return Err(A2aError::Internal(
1283 "executor exited without emitting a terminal or interrupted event".into(),
1284 ));
1285 }
1286
1287 cancellation.cancel();
1292
1293 let cooperative_outcome = tokio::select! {
1294 result = &mut yielded_rx => YieldedOutcome::Yielded(result.ok()),
1295 _ = tokio::time::sleep_until(hard) => YieldedOutcome::HardTimeout,
1296 };
1297
1298 if let YieldedOutcome::Yielded(Some(task)) = cooperative_outcome {
1299 return Ok(task);
1300 }
1301 let reason_msg = Message::new(
1306 uuid::Uuid::now_v7().to_string(),
1307 turul_a2a_types::Role::Agent,
1308 vec![turul_a2a_types::Part::text(
1309 "task timed out: hard deadline exceeded without terminal emission",
1310 )],
1311 );
1312 let failed_status = TaskStatus::new(TaskState::Failed).with_message(reason_msg);
1313 let failed_event = StreamEvent::StatusUpdate {
1314 status_update: crate::streaming::StatusUpdatePayload {
1315 task_id: task_id.to_string(),
1316 context_id: context_id.to_string(),
1317 status: serde_json::to_value(&failed_status).unwrap_or_default(),
1318 },
1319 };
1320
1321 let failed_event_for_dispatch = failed_event.clone();
1322 let result = state
1323 .atomic_store
1324 .update_task_status_with_events(tenant, task_id, owner, failed_status, vec![failed_event])
1325 .await;
1326
1327 match result {
1328 Ok((task, seqs)) => {
1329 handle.abort();
1338 if let Some(dispatcher) = &state.push_dispatcher {
1342 let seq = seqs.first().copied().unwrap_or(0);
1343 dispatcher.dispatch(
1344 tenant.to_string(),
1345 owner.to_string(),
1346 task.clone(),
1347 vec![(seq, failed_event_for_dispatch)],
1348 );
1349 }
1350 state.event_broker.notify(task_id).await;
1351 Ok(task)
1352 }
1353 Err(crate::storage::A2aStorageError::TerminalStateAlreadySet { .. }) => {
1354 state.event_broker.notify(task_id).await;
1358 let persisted = state
1359 .task_storage
1360 .get_task(tenant, task_id, owner, None)
1361 .await
1362 .map_err(A2aError::from)?
1363 .ok_or_else(|| A2aError::TaskNotFound {
1364 task_id: task_id.to_string(),
1365 })?;
1366 Ok(persisted)
1367 }
1368 Err(e) => Err(A2aError::from(e)),
1369 }
1370}
1371
1372#[allow(clippy::large_enum_variant)] enum YieldedOutcome {
1374 Yielded(Option<Task>),
1375 SoftTimeout,
1376 HardTimeout,
1377}
1378
1379pub(crate) async fn core_list_tasks(
1380 state: AppState,
1381 tenant: &str,
1382 owner: &str,
1383 query: &ListTasksQuery,
1384) -> Result<Json<serde_json::Value>, A2aError> {
1385 let status = match &query.status {
1386 Some(s) => Some(parse_task_state(s).ok_or_else(|| A2aError::InvalidRequest {
1387 message: format!("Invalid status value: {s}"),
1388 })?),
1389 None => None,
1390 };
1391
1392 let filter = TaskFilter {
1393 tenant: Some(tenant.to_string()),
1394 owner: Some(owner.to_string()),
1395 context_id: query.context_id.clone(),
1396 status,
1397 page_size: query.page_size,
1398 page_token: query.page_token.clone(),
1399 history_length: query.history_length,
1400 include_artifacts: query.include_artifacts,
1401 ..Default::default()
1402 };
1403
1404 let page = state
1405 .task_storage
1406 .list_tasks(filter)
1407 .await
1408 .map_err(A2aError::from)?;
1409 Ok(Json(list_page_to_json(&page)))
1410}
1411
1412fn list_page_to_json(page: &TaskListPage) -> serde_json::Value {
1413 let tasks: Vec<serde_json::Value> = page
1414 .tasks
1415 .iter()
1416 .map(|t| serde_json::to_value(t).unwrap_or_default())
1417 .collect();
1418 serde_json::json!({
1419 "tasks": tasks,
1420 "nextPageToken": page.next_page_token,
1421 "pageSize": page.page_size,
1422 "totalSize": page.total_size,
1423 })
1424}
1425
1426pub(crate) async fn core_get_task(
1427 state: AppState,
1428 tenant: &str,
1429 owner: &str,
1430 task_id: &str,
1431 history_length: Option<i32>,
1432) -> Result<Json<serde_json::Value>, A2aError> {
1433 let task = state
1434 .task_storage
1435 .get_task(tenant, task_id, owner, history_length)
1436 .await
1437 .map_err(A2aError::from)?
1438 .ok_or_else(|| A2aError::TaskNotFound {
1439 task_id: task_id.to_string(),
1440 })?;
1441
1442 Ok(Json(serde_json::to_value(&task).unwrap_or_default()))
1443}
1444
1445#[doc(hidden)]
1465pub async fn core_cancel_task(
1466 state: AppState,
1467 tenant: &str,
1468 owner: &str,
1469 task_id: &str,
1470) -> Result<Json<serde_json::Value>, A2aError> {
1471 let initial_task = state
1474 .task_storage
1475 .get_task(tenant, task_id, owner, Some(0))
1476 .await
1477 .map_err(A2aError::from)?
1478 .ok_or_else(|| A2aError::TaskNotFound {
1479 task_id: task_id.to_string(),
1480 })?;
1481
1482 if let Some(status) = initial_task.status() {
1484 if let Ok(s) = status.state() {
1485 if turul_a2a_types::state_machine::is_terminal(s) {
1486 return Err(A2aError::TaskNotCancelable {
1487 task_id: task_id.to_string(),
1488 });
1489 }
1490 }
1491 }
1492
1493 let context_id = initial_task.context_id().to_string();
1494
1495 match state
1498 .task_storage
1499 .set_cancel_requested(tenant, task_id, owner)
1500 .await
1501 {
1502 Ok(()) => {}
1503 Err(A2aStorageError::TaskNotFound(_)) => {
1504 return Err(A2aError::TaskNotFound {
1505 task_id: task_id.to_string(),
1506 });
1507 }
1508 Err(A2aStorageError::TerminalState(_))
1509 | Err(A2aStorageError::InvalidTransition { .. })
1510 | Err(A2aStorageError::TerminalStateAlreadySet { .. }) => {
1511 return Err(A2aError::TaskNotCancelable {
1512 task_id: task_id.to_string(),
1513 });
1514 }
1515 Err(other) => return Err(A2aError::from(other)),
1516 }
1517
1518 let in_flight_key = (tenant.to_string(), task_id.to_string());
1522 if let Some(handle) = state.in_flight.get(&in_flight_key) {
1523 handle.cancellation.cancel();
1524 }
1525
1526 let deadline = tokio::time::Instant::now() + state.runtime_config.cancel_handler_grace;
1529 let poll_interval = state.runtime_config.cancel_handler_poll_interval;
1530 loop {
1531 let now = tokio::time::Instant::now();
1532 if now >= deadline {
1533 break;
1534 }
1535
1536 match state
1537 .task_storage
1538 .get_task(tenant, task_id, owner, Some(0))
1539 .await
1540 .map_err(A2aError::from)?
1541 {
1542 Some(current) => {
1543 if let Some(status) = current.status() {
1544 if let Ok(s) = status.state() {
1545 if turul_a2a_types::state_machine::is_terminal(s) {
1546 state.event_broker.notify(task_id).await;
1549 return Ok(Json(serde_json::to_value(¤t).unwrap_or_default()));
1550 }
1551 }
1552 }
1553 }
1554 None => {
1555 return Err(A2aError::TaskNotFound {
1556 task_id: task_id.to_string(),
1557 });
1558 }
1559 }
1560
1561 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
1562 if remaining.is_zero() {
1563 break;
1564 }
1565 let sleep_for = std::cmp::min(poll_interval, remaining);
1566 tokio::time::sleep(sleep_for).await;
1567 }
1568
1569 let cancel_event = StreamEvent::StatusUpdate {
1574 status_update: crate::streaming::StatusUpdatePayload {
1575 task_id: task_id.to_string(),
1576 context_id,
1577 status: serde_json::to_value(TaskStatus::new(TaskState::Canceled)).unwrap_or_default(),
1578 },
1579 };
1580 let cancel_event_for_dispatch = cancel_event.clone();
1581
1582 let result = state
1583 .atomic_store
1584 .update_task_status_with_events(
1585 tenant,
1586 task_id,
1587 owner,
1588 TaskStatus::new(TaskState::Canceled),
1589 vec![cancel_event],
1590 )
1591 .await;
1592
1593 match result {
1594 Ok((task, seqs)) => {
1595 if let Some(dispatcher) = &state.push_dispatcher {
1600 let seq = seqs.first().copied().unwrap_or(0);
1601 dispatcher.dispatch(
1602 tenant.to_string(),
1603 owner.to_string(),
1604 task.clone(),
1605 vec![(seq, cancel_event_for_dispatch)],
1606 );
1607 }
1608 state.event_broker.notify(task_id).await;
1609 Ok(Json(serde_json::to_value(&task).unwrap_or_default()))
1610 }
1611 Err(A2aStorageError::TerminalStateAlreadySet { .. }) => {
1612 let persisted = state
1617 .task_storage
1618 .get_task(tenant, task_id, owner, None)
1619 .await
1620 .map_err(A2aError::from)?
1621 .ok_or_else(|| A2aError::TaskNotFound {
1622 task_id: task_id.to_string(),
1623 })?;
1624 state.event_broker.notify(task_id).await;
1625 Ok(Json(serde_json::to_value(&persisted).unwrap_or_default()))
1626 }
1627 Err(A2aStorageError::TaskNotFound(id)) => Err(A2aError::TaskNotFound { task_id: id }),
1628 Err(A2aStorageError::TerminalState(_)) | Err(A2aStorageError::InvalidTransition { .. }) => {
1629 Err(A2aError::TaskNotCancelable {
1630 task_id: task_id.to_string(),
1631 })
1632 }
1633 Err(other) => Err(A2aError::from(other)),
1634 }
1635}
1636
1637async fn verify_task_ownership(
1639 state: &AppState,
1640 tenant: &str,
1641 owner: &str,
1642 task_id: &str,
1643) -> Result<(), A2aError> {
1644 state
1645 .task_storage
1646 .get_task(tenant, task_id, owner, Some(0))
1647 .await
1648 .map_err(A2aError::from)?
1649 .ok_or_else(|| A2aError::TaskNotFound {
1650 task_id: task_id.to_string(),
1651 })?;
1652 Ok(())
1653}
1654
1655pub(crate) async fn core_create_push_config(
1656 state: AppState,
1657 tenant: &str,
1658 owner: &str,
1659 task_id: &str,
1660 body: String,
1661) -> Result<Json<serde_json::Value>, A2aError> {
1662 verify_task_ownership(&state, tenant, owner, task_id).await?;
1663
1664 let mut config: turul_a2a_proto::TaskPushNotificationConfig = serde_json::from_str(&body)
1665 .map_err(|e| A2aError::InvalidRequest {
1666 message: format!("Invalid push config: {e}"),
1667 })?;
1668 config.task_id = task_id.to_string();
1669
1670 if config.url.is_empty() {
1677 return Err(A2aError::InvalidRequest {
1678 message: "push config url is required".into(),
1679 });
1680 }
1681 if let Err(e) = url::Url::parse(&config.url) {
1682 return Err(A2aError::InvalidRequest {
1683 message: format!("push config url is not a valid URL: {e}"),
1684 });
1685 }
1686
1687 let created = state
1688 .push_storage
1689 .create_config(tenant, config)
1690 .await
1691 .map_err(A2aError::from)?;
1692 Ok(Json(serde_json::to_value(&created).unwrap_or_default()))
1693}
1694
1695pub(crate) async fn core_list_push_configs(
1696 state: AppState,
1697 tenant: &str,
1698 owner: &str,
1699 task_id: &str,
1700 query: &PushConfigQuery,
1701) -> Result<Json<serde_json::Value>, A2aError> {
1702 verify_task_ownership(&state, tenant, owner, task_id).await?;
1703
1704 let page = state
1705 .push_storage
1706 .list_configs(
1707 tenant,
1708 task_id,
1709 query.page_token.as_deref(),
1710 query.page_size,
1711 )
1712 .await
1713 .map_err(A2aError::from)?;
1714
1715 let configs: Vec<serde_json::Value> = page
1716 .configs
1717 .iter()
1718 .map(|c| serde_json::to_value(c).unwrap_or_default())
1719 .collect();
1720
1721 Ok(Json(serde_json::json!({
1722 "configs": configs,
1723 "nextPageToken": page.next_page_token,
1724 })))
1725}
1726
1727pub(crate) async fn core_get_push_config(
1728 state: AppState,
1729 tenant: &str,
1730 owner: &str,
1731 task_id: &str,
1732 config_id: &str,
1733) -> Result<Json<serde_json::Value>, A2aError> {
1734 verify_task_ownership(&state, tenant, owner, task_id).await?;
1735
1736 let config = state
1737 .push_storage
1738 .get_config(tenant, task_id, config_id)
1739 .await
1740 .map_err(A2aError::from)?
1741 .ok_or_else(|| A2aError::TaskNotFound {
1742 task_id: format!("push config {config_id} for task {task_id}"),
1743 })?;
1744
1745 Ok(Json(serde_json::to_value(&config).unwrap_or_default()))
1746}
1747
1748pub(crate) async fn core_delete_push_config(
1749 state: AppState,
1750 tenant: &str,
1751 owner: &str,
1752 task_id: &str,
1753 config_id: &str,
1754) -> Result<Json<serde_json::Value>, A2aError> {
1755 verify_task_ownership(&state, tenant, owner, task_id).await?;
1756
1757 state
1758 .push_storage
1759 .delete_config(tenant, task_id, config_id)
1760 .await
1761 .map_err(A2aError::from)?;
1762
1763 Ok(Json(serde_json::json!({})))
1764}
1765
1766impl IntoResponse for A2aError {
1768 fn into_response(self) -> axum::response::Response {
1769 let status = axum::http::StatusCode::from_u16(self.http_status())
1770 .unwrap_or(axum::http::StatusCode::INTERNAL_SERVER_ERROR);
1771 let body = self.to_http_error_body();
1772 (status, Json(body)).into_response()
1773 }
1774}
1775
1776#[cfg(test)]
1781mod tests {
1782 use super::*;
1783
1784 #[test]
1785 fn parse_get_task() {
1786 assert_eq!(
1787 parse_task_path("abc-123"),
1788 Some(TaskAction::GetTask("abc-123".into()))
1789 );
1790 assert_eq!(
1791 parse_task_path("/abc-123"),
1792 Some(TaskAction::GetTask("abc-123".into()))
1793 );
1794 }
1795
1796 #[test]
1797 fn parse_cancel_task() {
1798 assert_eq!(
1799 parse_task_path("abc-123:cancel"),
1800 Some(TaskAction::CancelTask("abc-123".into()))
1801 );
1802 }
1803
1804 #[test]
1805 fn parse_subscribe_to_task() {
1806 assert_eq!(
1807 parse_task_path("abc-123:subscribe"),
1808 Some(TaskAction::SubscribeToTask("abc-123".into()))
1809 );
1810 }
1811
1812 #[test]
1813 fn parse_push_config_collection() {
1814 assert_eq!(
1816 parse_task_path("task-1/pushNotificationConfigs"),
1817 Some(TaskAction::PushConfigCollection("task-1".into()))
1818 );
1819 }
1820
1821 #[test]
1822 fn parse_push_config_item() {
1823 assert_eq!(
1825 parse_task_path("task-1/pushNotificationConfigs/cfg-1"),
1826 Some(TaskAction::PushConfigItem("task-1".into(), "cfg-1".into()))
1827 );
1828 }
1829
1830 #[test]
1831 fn parse_invalid_path() {
1832 assert_eq!(parse_task_path("a/b/c/d"), None);
1833 }
1834
1835 }