use std::sync::Arc;
use std::time::Duration;
use axum::extract::{Path, Query, State};
use axum::response::IntoResponse;
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::routing::{get, post};
use axum::{Json, Router};
use tokio::sync::broadcast;
use tokio_stream::wrappers::ReceiverStream;
use crate::error::A2aError;
use crate::executor::AgentExecutor;
use crate::storage::{
A2aAtomicStore, A2aEventStore, A2aPushNotificationStorage, A2aStorageError, A2aTaskStorage,
TaskFilter, TaskListPage,
};
use crate::streaming::{StreamEvent, replay};
use turul_a2a_types::{Message, Task, TaskState, TaskStatus};
/// Shared server state cloned into every handler via axum `State`.
///
/// All fields are `Arc`-backed (or cheaply cloneable), so cloning the whole
/// struct per request is inexpensive.
#[derive(Clone)]
pub struct AppState {
    /// Agent implementation that processes incoming messages.
    pub executor: Arc<dyn AgentExecutor>,
    /// Persistent task records (status, history, artifacts).
    pub task_storage: Arc<dyn A2aTaskStorage>,
    /// Push-notification config storage (per-task webhooks).
    pub push_storage: Arc<dyn A2aPushNotificationStorage>,
    /// Append-only stream-event log used for SSE replay.
    pub event_store: Arc<dyn crate::storage::A2aEventStore>,
    /// Combined task+event writes (atomic status/event persistence).
    pub atomic_store: Arc<dyn A2aAtomicStore>,
    /// In-process wake-up broadcast for SSE subscribers.
    pub event_broker: crate::streaming::TaskEventBroker,
    /// Auth / request middleware chain.
    pub middleware_stack: Arc<crate::middleware::MiddlewareStack>,
    /// Timeouts, feature switches, and polling intervals.
    pub runtime_config: crate::server::RuntimeConfig,
    /// Registry of currently-executing tasks (for cancellation).
    pub in_flight: Arc<crate::server::in_flight::InFlightRegistry>,
    /// Supervisor that sweeps orphaned cancel requests.
    pub cancellation_supervisor: Arc<dyn crate::storage::A2aCancellationSupervisor>,
    /// Optional durable delivery log for push notifications.
    pub push_delivery_store: Option<Arc<dyn crate::push::A2aPushDeliveryStore>>,
    /// Optional webhook dispatcher; `None` disables push delivery.
    pub push_dispatcher: Option<Arc<crate::push::PushDispatcher>>,
    /// Optional durable queue backing `return_immediately` sends.
    pub durable_executor_queue: Option<Arc<dyn crate::durable_executor::DurableExecutorQueue>>,
}
/// Assemble the public REST router: agent-card discovery, `message:send` /
/// `message:stream`, task dispatch (default-tenant and `{tenant}`-prefixed
/// variants), and the JSON-RPC endpoint. Auth and transport-compliance
/// layers wrap every route.
pub fn build_router(state: AppState) -> Router {
    let router = Router::new()
        .route("/.well-known/agent-card.json", get(agent_card_handler))
        .route("/extendedAgentCard", get(extended_agent_card_handler))
        .route("/message:send", post(send_message_handler))
        .route("/message:stream", post(send_streaming_message_handler))
        .route("/tasks", get(list_tasks_handler))
        // Wildcard tail covers `{id}`, `{id}:cancel`, `{id}:subscribe`, and
        // the pushNotificationConfigs sub-resources; `parse_task_path`
        // decodes the tail into a concrete action.
        .route(
            "/tasks/{*rest}",
            get(task_get_dispatch)
                .post(task_post_dispatch)
                .delete(task_delete_dispatch),
        )
        .route("/{tenant}/message:send", post(tenant_send_message_handler))
        .route(
            "/{tenant}/message:stream",
            post(tenant_send_streaming_message_handler),
        )
        .route("/{tenant}/tasks", get(tenant_list_tasks_handler))
        .route(
            "/{tenant}/extendedAgentCard",
            get(extended_agent_card_handler),
        )
        .route(
            "/{tenant}/tasks/{*rest}",
            get(tenant_task_get_dispatch)
                .post(tenant_task_post_dispatch)
                .delete(tenant_task_delete_dispatch),
        );
    let router = router.route("/jsonrpc", post(crate::jsonrpc::jsonrpc_dispatch_handler));
    // v0.3-compat clients POST JSON-RPC to the root path.
    #[cfg(feature = "compat-v03")]
    let router = router.route("/", post(crate::jsonrpc::jsonrpc_dispatch_handler));
    let auth_layer = crate::middleware::AuthLayer::new(state.middleware_stack.clone());
    let transport_layer = crate::middleware::transport::TransportComplianceLayer;
    // NOTE(review): tower layers added later wrap earlier ones, so the
    // transport layer is outermost and sees requests before auth — confirm
    // this ordering is intentional.
    router
        .with_state(state)
        .layer(auth_layer)
        .layer(transport_layer)
}
/// Routing action extracted from the `{*rest}` wildcard tail of a
/// `/tasks/...` URL.
#[derive(Debug, PartialEq)]
enum TaskAction {
    GetTask(String),
    CancelTask(String),
    SubscribeToTask(String),
    PushConfigCollection(String),
    PushConfigItem(String, String),
}

/// Decode the wildcard tail of a task route into a [`TaskAction`].
///
/// A single segment may carry a `:cancel` / `:subscribe` verb suffix;
/// `{id}/pushNotificationConfigs[/{configId}]` addresses push configs.
/// Anything else yields `None`.
fn parse_task_path(rest: &str) -> Option<TaskAction> {
    // Axum may or may not include the leading slash in the wildcard capture.
    let tail = rest.strip_prefix('/').unwrap_or(rest);
    let segments: Vec<&str> = tail.split('/').collect();
    match segments.as_slice() {
        [only] => {
            let action = if let Some(id) = only.strip_suffix(":cancel") {
                TaskAction::CancelTask(id.to_string())
            } else if let Some(id) = only.strip_suffix(":subscribe") {
                TaskAction::SubscribeToTask(id.to_string())
            } else {
                TaskAction::GetTask(only.to_string())
            };
            Some(action)
        }
        [task, "pushNotificationConfigs"] => {
            Some(TaskAction::PushConfigCollection(task.to_string()))
        }
        [task, "pushNotificationConfigs", config] => Some(TaskAction::PushConfigItem(
            task.to_string(),
            config.to_string(),
        )),
        _ => None,
    }
}
/// GET dispatcher for `/tasks/{*rest}` on the default tenant.
///
/// Extracts an optional SSE `Last-Event-ID` header (used to resume a
/// subscription replay) and forwards to [`dispatch_task_get`].
async fn task_get_dispatch(
    State(state): State<AppState>,
    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
    headers: axum::http::HeaderMap,
    Path(rest): Path<String>,
    Query(query): Query<TaskGetCombinedQuery>,
) -> Result<axum::response::Response, A2aError> {
    // `HeaderMap` lookups are case-insensitive, so one `get` covers every
    // casing of `Last-Event-ID`; the previous lowercase fallback was dead.
    let last_event_id = headers
        .get("last-event-id")
        .and_then(|v| v.to_str().ok())
        .map(String::from);
    dispatch_task_get(
        state,
        DEFAULT_TENANT,
        ctx.identity.owner(),
        &rest,
        &query,
        last_event_id.as_deref(),
    )
    .await
}
/// POST dispatcher for `/tasks/{*rest}` on the default tenant.
async fn task_post_dispatch(
    State(state): State<AppState>,
    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
    Path(rest): Path<String>,
    body: String,
) -> Result<axum::response::Response, A2aError> {
    let owner = ctx.identity.owner();
    dispatch_task_post(state, DEFAULT_TENANT, owner, &rest, body).await
}
/// DELETE dispatcher for `/tasks/{*rest}` on the default tenant.
async fn task_delete_dispatch(
    State(state): State<AppState>,
    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
    Path(rest): Path<String>,
) -> Result<axum::response::Response, A2aError> {
    let owner = ctx.identity.owner();
    dispatch_task_delete(state, DEFAULT_TENANT, owner, &rest).await
}
/// `POST /{tenant}/message:send` — tenant-scoped blocking send.
async fn tenant_send_message_handler(
    State(state): State<AppState>,
    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
    Path(tenant): Path<String>,
    body: String,
) -> Result<Json<serde_json::Value>, A2aError> {
    let owner = ctx.identity.owner();
    let claims = ctx.identity.claims().cloned();
    core_send_message(state, &tenant, owner, claims, body).await
}
/// `GET /{tenant}/tasks` — tenant-scoped task listing.
async fn tenant_list_tasks_handler(
    State(state): State<AppState>,
    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
    Path(tenant): Path<String>,
    Query(query): Query<ListTasksQuery>,
) -> Result<Json<serde_json::Value>, A2aError> {
    let owner = ctx.identity.owner();
    core_list_tasks(state, &tenant, owner, &query).await
}
/// GET dispatcher for `/{tenant}/tasks/{*rest}`.
///
/// Mirrors [`task_get_dispatch`] with an explicit tenant path segment.
async fn tenant_task_get_dispatch(
    State(state): State<AppState>,
    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
    headers: axum::http::HeaderMap,
    Path((tenant, rest)): Path<(String, String)>,
    Query(query): Query<TaskGetCombinedQuery>,
) -> Result<axum::response::Response, A2aError> {
    // `HeaderMap` lookups are case-insensitive, so one `get` covers every
    // casing of `Last-Event-ID`; the previous lowercase fallback was dead.
    let last_event_id = headers
        .get("last-event-id")
        .and_then(|v| v.to_str().ok())
        .map(String::from);
    dispatch_task_get(
        state,
        &tenant,
        ctx.identity.owner(),
        &rest,
        &query,
        last_event_id.as_deref(),
    )
    .await
}
/// POST dispatcher for `/{tenant}/tasks/{*rest}`.
async fn tenant_task_post_dispatch(
    State(state): State<AppState>,
    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
    Path((tenant, rest)): Path<(String, String)>,
    body: String,
) -> Result<axum::response::Response, A2aError> {
    let owner = ctx.identity.owner();
    dispatch_task_post(state, &tenant, owner, &rest, body).await
}
/// DELETE dispatcher for `/{tenant}/tasks/{*rest}`.
async fn tenant_task_delete_dispatch(
    State(state): State<AppState>,
    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
    Path((tenant, rest)): Path<(String, String)>,
) -> Result<axum::response::Response, A2aError> {
    let owner = ctx.identity.owner();
    dispatch_task_delete(state, &tenant, owner, &rest).await
}
/// Query parameters accepted by the GET task dispatcher.
///
/// Combines the parameters of all GET actions reachable through the
/// `/tasks/{*rest}` wildcard: `historyLength` applies to task fetches,
/// `pageSize`/`pageToken` to push-config listing.
#[derive(serde::Deserialize, Default)]
#[serde(default)]
struct TaskGetCombinedQuery {
    // Max number of history messages to include when fetching a task.
    #[serde(rename = "historyLength")]
    history_length: Option<i32>,
    // Page size for push-config collection listing.
    #[serde(rename = "pageSize")]
    page_size: Option<i32>,
    // Opaque continuation token for push-config collection listing.
    #[serde(rename = "pageToken")]
    page_token: Option<String>,
}
/// Route a parsed GET task action to the matching core handler.
async fn dispatch_task_get(
    state: AppState,
    tenant: &str,
    owner: &str,
    rest: &str,
    query: &TaskGetCombinedQuery,
    last_event_id: Option<&str>,
) -> Result<axum::response::Response, A2aError> {
    let Some(action) = parse_task_path(rest) else {
        return Err(A2aError::InvalidRequest {
            message: "Invalid task path".into(),
        });
    };
    match action {
        TaskAction::GetTask(id) => {
            let Json(payload) =
                core_get_task(state, tenant, owner, &id, query.history_length).await?;
            Ok(Json(payload).into_response())
        }
        TaskAction::SubscribeToTask(id) => {
            core_subscribe_to_task(state, tenant, owner, &id, last_event_id).await
        }
        TaskAction::PushConfigCollection(task_id) => {
            let paging = PushConfigQuery {
                page_size: query.page_size,
                page_token: query.page_token.clone(),
            };
            let Json(payload) =
                core_list_push_configs(state, tenant, owner, &task_id, &paging).await?;
            Ok(Json(payload).into_response())
        }
        TaskAction::PushConfigItem(task_id, config_id) => {
            let Json(payload) =
                core_get_push_config(state, tenant, owner, &task_id, &config_id).await?;
            Ok(Json(payload).into_response())
        }
        // `:cancel` is POST-only; a GET against it is an invalid path.
        TaskAction::CancelTask(_) => Err(A2aError::InvalidRequest {
            message: "Invalid task path".into(),
        }),
    }
}
/// Route a parsed POST task action (`:cancel` or push-config creation).
async fn dispatch_task_post(
    state: AppState,
    tenant: &str,
    owner: &str,
    rest: &str,
    body: String,
) -> Result<axum::response::Response, A2aError> {
    match parse_task_path(rest) {
        Some(TaskAction::CancelTask(id)) => {
            let Json(payload) = core_cancel_task(state, tenant, owner, &id).await?;
            Ok(Json(payload).into_response())
        }
        Some(TaskAction::PushConfigCollection(task_id)) => {
            let Json(payload) =
                core_create_push_config(state, tenant, owner, &task_id, body).await?;
            Ok(Json(payload).into_response())
        }
        // Every other action is GET/DELETE-scoped, so POST rejects it.
        _ => Err(A2aError::InvalidRequest {
            message: "Invalid task path".into(),
        }),
    }
}
/// Route a parsed DELETE task action; only push-config items are deletable.
async fn dispatch_task_delete(
    state: AppState,
    tenant: &str,
    owner: &str,
    rest: &str,
) -> Result<axum::response::Response, A2aError> {
    if let Some(TaskAction::PushConfigItem(task_id, config_id)) = parse_task_path(rest) {
        let Json(payload) =
            core_delete_push_config(state, tenant, owner, &task_id, &config_id).await?;
        return Ok(Json(payload).into_response());
    }
    Err(A2aError::InvalidRequest {
        message: "Invalid task path".into(),
    })
}
/// `GET /.well-known/agent-card.json` — serve the executor's agent card.
///
/// With the `compat-v03` feature enabled, the card is post-processed to
/// inject v0.3-compatible fields before being returned.
async fn agent_card_handler(State(state): State<AppState>) -> impl IntoResponse {
    let card = serde_json::to_value(state.executor.agent_card()).unwrap_or_default();
    #[cfg(feature = "compat-v03")]
    let card = crate::compat_v03::inject_agent_card_compat(card);
    Json(card)
}
/// `GET /extendedAgentCard` (and the tenant-prefixed variant).
///
/// Returns the executor's extended agent card, or
/// `ExtendedAgentCardNotConfigured` when the executor does not provide one.
async fn extended_agent_card_handler(
    State(state): State<AppState>,
) -> Result<impl IntoResponse, A2aError> {
    match state.executor.extended_agent_card(None) {
        Some(extended) => {
            let card = serde_json::to_value(extended).unwrap_or_default();
            // Same v0.3 compatibility injection as the public card.
            #[cfg(feature = "compat-v03")]
            let card = crate::compat_v03::inject_agent_card_compat(card);
            Ok(Json(card))
        }
        None => Err(A2aError::ExtendedAgentCardNotConfigured),
    }
}
/// `POST /message:stream` on the default tenant — SSE streaming send.
async fn send_streaming_message_handler(
    State(state): State<AppState>,
    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
    body: String,
) -> Result<axum::response::Response, A2aError> {
    let owner = ctx.identity.owner();
    let claims = ctx.identity.claims().cloned();
    core_send_streaming_message(state, DEFAULT_TENANT, owner, claims, body).await
}
/// `POST /{tenant}/message:stream` — tenant-scoped SSE streaming send.
async fn tenant_send_streaming_message_handler(
    State(state): State<AppState>,
    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
    Path(tenant): Path<String>,
    body: String,
) -> Result<axum::response::Response, A2aError> {
    let owner = ctx.identity.owner();
    let claims = ctx.identity.claims().cloned();
    core_send_streaming_message(state, &tenant, owner, claims, body).await
}
/// Shared setup for the streaming send path.
///
/// Parses the request body, creates a new task in SUBMITTED then WORKING
/// (persisting the matching stream events atomically with each status
/// write), and spawns the tracked executor. Returns the new task id plus a
/// wake receiver that was subscribed *before* the first event was written,
/// so the SSE loop cannot miss an early notification.
pub(crate) async fn setup_streaming_send(
    state: AppState,
    tenant: &str,
    owner: &str,
    claims: Option<serde_json::Value>,
    body: String,
) -> Result<(String, broadcast::Receiver<()>), A2aError> {
    let request: turul_a2a_proto::SendMessageRequest =
        serde_json::from_str(&body).map_err(|e| A2aError::InvalidRequest {
            message: format!("Invalid request body: {e}"),
        })?;
    let proto_message = request.message.ok_or(A2aError::InvalidRequest {
        message: "message field is required".into(),
    })?;
    let message = Message::try_from(proto_message).map_err(|e| A2aError::InvalidRequest {
        message: format!("Invalid message: {e}"),
    })?;
    // Fresh task id always; context id only when the client didn't supply one.
    let task_id = uuid::Uuid::now_v7().to_string();
    let context_id = if message.as_proto().context_id.is_empty() {
        uuid::Uuid::now_v7().to_string()
    } else {
        message.as_proto().context_id.clone()
    };
    // Subscribe before any event is persisted so no wake-up is lost.
    let wake_rx = state.event_broker.subscribe(&task_id).await;
    let mut task =
        Task::new(&task_id, TaskStatus::new(TaskState::Submitted)).with_context_id(&context_id);
    task.append_message(message.clone());
    let submitted_event = StreamEvent::StatusUpdate {
        status_update: crate::streaming::StatusUpdatePayload {
            task_id: task_id.clone(),
            context_id: context_id.clone(),
            status: serde_json::to_value(TaskStatus::new(TaskState::Submitted)).unwrap_or_default(),
        },
    };
    // Task creation and the SUBMITTED event are persisted atomically.
    state
        .atomic_store
        .create_task_with_events(tenant, owner, task, vec![submitted_event])
        .await
        .map_err(A2aError::from)?;
    state.event_broker.notify(&task_id).await;
    let working_event = StreamEvent::StatusUpdate {
        status_update: crate::streaming::StatusUpdatePayload {
            task_id: task_id.clone(),
            context_id: context_id.clone(),
            status: serde_json::to_value(TaskStatus::new(TaskState::Working)).unwrap_or_default(),
        },
    };
    // Transition to WORKING before spawning, again atomically with its event.
    state
        .atomic_store
        .update_task_status_with_events(
            tenant,
            &task_id,
            owner,
            TaskStatus::new(TaskState::Working),
            vec![working_event],
        )
        .await
        .map_err(A2aError::from)?;
    state.event_broker.notify(&task_id).await;
    let spawn_deps = crate::server::spawn::SpawnDeps {
        executor: state.executor.clone(),
        task_storage: state.task_storage.clone(),
        atomic_store: state.atomic_store.clone(),
        event_broker: state.event_broker.clone(),
        in_flight: state.in_flight.clone(),
        push_dispatcher: state.push_dispatcher.clone(),
    };
    let scope = crate::server::spawn::SpawnScope {
        tenant: tenant.to_string(),
        owner: owner.to_string(),
        task_id: task_id.clone(),
        context_id: context_id.clone(),
        message: message.clone(),
        claims: claims.clone(),
    };
    // Streaming path doesn't wait on the yield channel; the SSE stream follows
    // progress through the event store instead.
    let _spawn = crate::server::spawn::spawn_tracked_executor(spawn_deps, scope)?;
    Ok((task_id, wake_rx))
}
/// Streaming send core: set up the task/executor, then answer with an SSE
/// stream replaying the task's events from sequence 0.
pub(crate) async fn core_send_streaming_message(
    state: AppState,
    tenant: &str,
    owner: &str,
    claims: Option<serde_json::Value>,
    body: String,
) -> Result<axum::response::Response, A2aError> {
    let (task_id, wake_rx) =
        setup_streaming_send(state.clone(), tenant, owner, claims, body).await?;
    let response = make_store_sse_response(
        state.event_store,
        tenant.to_string(),
        task_id,
        0,
        wake_rx,
        None,
    );
    Ok(response)
}
/// Attach an SSE subscriber to an existing, non-terminal task.
///
/// A valid `Last-Event-ID` header for this task resumes the stream after the
/// given sequence; otherwise the stream starts from 0 with a full task
/// snapshot as the first event.
pub(crate) async fn core_subscribe_to_task(
    state: AppState,
    tenant: &str,
    owner: &str,
    task_id: &str,
    last_event_id_header: Option<&str>,
) -> Result<axum::response::Response, A2aError> {
    let task = state
        .task_storage
        .get_task(tenant, task_id, owner, None)
        .await
        .map_err(A2aError::from)?
        .ok_or_else(|| A2aError::TaskNotFound {
            task_id: task_id.to_string(),
        })?;
    // A terminal task can never emit another event; refuse to stream it.
    if let Some(s) = task.status().and_then(|status| status.state().ok()) {
        if s.is_terminal() {
            return Err(A2aError::UnsupportedOperation {
                message: format!("Task {task_id} is already in terminal state {s:?}"),
            });
        }
    }
    // Only honor a resume token that actually refers to this task.
    let after_sequence = last_event_id_header
        .and_then(replay::parse_last_event_id)
        .filter(|parsed| parsed.task_id == task_id)
        .map(|parsed| parsed.sequence)
        .unwrap_or(0);
    // Fresh subscribers get the full task snapshot; resumers don't.
    let initial_task = (after_sequence == 0).then_some(task);
    let wake_rx = state.event_broker.subscribe(task_id).await;
    Ok(make_store_sse_response(
        state.event_store,
        tenant.to_string(),
        task_id.to_string(),
        after_sequence,
        wake_rx,
        initial_task,
    ))
}
// Fallback poll interval: even without a broker wake-up, the SSE loop
// re-reads the event store this often.
const STORE_POLL_INTERVAL: Duration = Duration::from_secs(2);

/// Build an SSE response that streams a task's events out of the event store.
///
/// A background task drains `get_events_after` in a loop, forwarding each
/// event (tagged with a replayable event id) into an mpsc channel consumed by
/// the SSE body. Between reads it waits on the broker wake-up channel OR the
/// poll interval, whichever fires first. The loop ends when a terminal event
/// is forwarded, the store errors, or the client disconnects (send fails).
fn make_store_sse_response(
    event_store: Arc<dyn A2aEventStore>,
    tenant: String,
    task_id: String,
    after_sequence: u64,
    wake_rx: broadcast::Receiver<()>,
    initial_task: Option<Task>,
) -> axum::response::Response {
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<Event, std::convert::Infallible>>(64);
    tokio::spawn(async move {
        // Optional snapshot frame, sent with sequence 0 before any replay.
        if let Some(task) = initial_task {
            let task_json =
                serde_json::json!({"task": serde_json::to_value(&task).unwrap_or_default()});
            let json = serde_json::to_string(&task_json).unwrap_or_default();
            let sse_event = Event::default()
                .id(replay::format_event_id(&task_id, 0))
                .data(json);
            if tx.send(Ok(sse_event)).await.is_err() {
                // Client already gone.
                return;
            }
        }
        let mut last_seq = after_sequence;
        let mut wake_rx = wake_rx;
        loop {
            let events = match event_store
                .get_events_after(&tenant, &task_id, last_seq)
                .await
            {
                Ok(e) => e,
                // Store failure: close the stream rather than spin.
                Err(_) => break,
            };
            let mut saw_terminal = false;
            for (seq, event) in events {
                last_seq = seq;
                let event_id = replay::format_event_id(&task_id, seq);
                let json = serde_json::to_string(&event).unwrap_or_default();
                let sse_event = Event::default().id(event_id).data(json);
                if tx.send(Ok(sse_event)).await.is_err() {
                    // Client disconnected.
                    return;
                }
                if event.is_terminal() {
                    saw_terminal = true;
                }
            }
            if saw_terminal {
                break;
            }
            // Wait for a broker wake-up or fall back to periodic polling.
            tokio::select! {
                result = wake_rx.recv() => {
                    match result {
                        Ok(()) => {}
                        // Broker dropped: no more wake-ups will ever arrive.
                        Err(broadcast::error::RecvError::Closed) => break,
                        // Missed notifications are fine — we re-read the store anyway.
                        Err(broadcast::error::RecvError::Lagged(_)) => {}
                    }
                }
                _ = tokio::time::sleep(STORE_POLL_INTERVAL) => {
                }
            }
        }
    });
    let stream = ReceiverStream::new(rx);
    Sse::new(stream)
        .keep_alive(KeepAlive::default())
        .into_response()
}
// Tenant key used by the un-prefixed (single-tenant) routes.
const DEFAULT_TENANT: &str = "";
/// `POST /message:send` on the default tenant — blocking send.
async fn send_message_handler(
    State(state): State<AppState>,
    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
    body: String,
) -> Result<Json<serde_json::Value>, A2aError> {
    let owner = ctx.identity.owner();
    let claims = ctx.identity.claims().cloned();
    core_send_message(state, DEFAULT_TENANT, owner, claims, body).await
}
/// `GET /tasks` on the default tenant.
async fn list_tasks_handler(
    State(state): State<AppState>,
    axum::Extension(ctx): axum::Extension<crate::middleware::RequestContext>,
    Query(query): Query<ListTasksQuery>,
) -> Result<Json<serde_json::Value>, A2aError> {
    let owner = ctx.identity.owner();
    core_list_tasks(state, DEFAULT_TENANT, owner, &query).await
}
/// Query parameters for task listing (`GET /tasks`).
#[derive(serde::Deserialize, Default)]
#[serde(default)]
pub(crate) struct ListTasksQuery {
    // Restrict results to a single conversation context.
    #[serde(rename = "contextId")]
    pub(crate) context_id: Option<String>,
    // Proto-style status filter, e.g. "TASK_STATE_WORKING"; see `parse_task_state`.
    pub(crate) status: Option<String>,
    #[serde(rename = "pageSize")]
    pub(crate) page_size: Option<i32>,
    // Opaque continuation token from a previous page's `nextPageToken`.
    #[serde(rename = "pageToken")]
    pub(crate) page_token: Option<String>,
    // Max history messages to embed per returned task.
    #[serde(rename = "historyLength")]
    pub(crate) history_length: Option<i32>,
    // Whether returned tasks should include their artifacts.
    #[serde(rename = "includeArtifacts")]
    pub(crate) include_artifacts: Option<bool>,
}
/// Paging parameters for listing a task's push-notification configs.
#[derive(serde::Deserialize, Default)]
#[serde(default)]
pub(crate) struct PushConfigQuery {
    #[serde(rename = "pageSize")]
    pub(crate) page_size: Option<i32>,
    // Opaque continuation token from a previous page.
    #[serde(rename = "pageToken")]
    pub(crate) page_token: Option<String>,
}
/// Map a proto-style `TASK_STATE_*` string onto a [`TaskState`].
///
/// Returns `None` for anything outside the known set (including strings
/// without the `TASK_STATE_` prefix).
fn parse_task_state(s: &str) -> Option<TaskState> {
    let state = match s.strip_prefix("TASK_STATE_")? {
        "SUBMITTED" => TaskState::Submitted,
        "WORKING" => TaskState::Working,
        "COMPLETED" => TaskState::Completed,
        "FAILED" => TaskState::Failed,
        "CANCELED" => TaskState::Canceled,
        "INPUT_REQUIRED" => TaskState::InputRequired,
        "REJECTED" => TaskState::Rejected,
        "AUTH_REQUIRED" => TaskState::AuthRequired,
        _ => return None,
    };
    Some(state)
}
/// REST `message:send` core.
///
/// Validates the request, creates a new task (or continues an
/// INPUT_REQUIRED / AUTH_REQUIRED task when `message.task_id` is set),
/// optionally registers an inline push-notification config, then either
/// enqueues the executor durably (`return_immediately=true`) or runs it
/// inline and blocks until the executor yields or the deadline logic fires.
///
/// Fix: the response bodies previously contained the mojibake token `¤t`
/// (an HTML-entity-corrupted `&current`), which did not compile; restored to
/// `&current`.
#[doc(hidden)]
pub async fn core_send_message(
    state: AppState,
    tenant: &str,
    owner: &str,
    claims: Option<serde_json::Value>,
    body: String,
) -> Result<Json<serde_json::Value>, A2aError> {
    let request: turul_a2a_proto::SendMessageRequest =
        serde_json::from_str(&body).map_err(|e| A2aError::InvalidRequest {
            message: format!("Invalid request body: {e}"),
        })?;
    let proto_message = request.message.ok_or(A2aError::InvalidRequest {
        message: "message field is required".into(),
    })?;
    let message = Message::try_from(proto_message).map_err(|e| A2aError::InvalidRequest {
        message: format!("Invalid message: {e}"),
    })?;
    let return_immediately = request
        .configuration
        .as_ref()
        .map(|c| c.return_immediately)
        .unwrap_or(false);
    // Runtimes that cannot guarantee post-return executor continuation must
    // reject return_immediately outright (ADR-017 / ADR-013).
    if return_immediately && !state.runtime_config.supports_return_immediately {
        tracing::warn!(
            tenant = tenant,
            owner = owner,
            "rejecting SendMessageConfiguration.return_immediately=true: \
             this runtime does not support post-return executor continuation \
             (ADR-017 §Decision Bug 1, ADR-013 §4.4)"
        );
        return Err(A2aError::UnsupportedOperation {
            message: "return_immediately=true is not supported on this runtime \
                      (post-return executor continuation is not guaranteed); \
                      resubmit with return_immediately=false for a blocking \
                      send"
                .into(),
        });
    }
    let history_length: Option<i32> = request
        .configuration
        .as_ref()
        .and_then(|c| c.history_length);
    let inline_push_config: Option<turul_a2a_proto::TaskPushNotificationConfig> = request
        .configuration
        .as_ref()
        .and_then(|c| c.task_push_notification_config.clone());
    // Validate the inline push config URL before any state is created.
    if let Some(cfg) = inline_push_config.as_ref() {
        if cfg.url.is_empty() {
            return Err(A2aError::InvalidRequest {
                message: "inline push config url is required".into(),
            });
        }
        if let Err(e) = url::Url::parse(&cfg.url) {
            return Err(A2aError::InvalidRequest {
                message: format!("inline push config url is not a valid URL: {e}"),
            });
        }
    }
    // A non-empty message.task_id means "continue an existing task".
    let msg_task_id = message.as_proto().task_id.clone();
    let (task_id, context_id, is_continuation) = if !msg_task_id.is_empty() {
        let existing = state
            .task_storage
            .get_task(tenant, &msg_task_id, owner, None)
            .await
            .map_err(A2aError::from)?
            .ok_or_else(|| A2aError::TaskNotFound {
                task_id: msg_task_id.clone(),
            })?;
        let msg_context_id = &message.as_proto().context_id;
        if !msg_context_id.is_empty() && msg_context_id != existing.context_id() {
            return Err(A2aError::InvalidRequest {
                message: format!(
                    "contextId mismatch: message has '{}' but task {} has '{}'",
                    msg_context_id,
                    msg_task_id,
                    existing.context_id()
                ),
            });
        }
        // Follow-ups are only legal while the task is paused on input/auth.
        if let Some(status) = existing.status() {
            if let Ok(s) = status.state() {
                match s {
                    TaskState::InputRequired | TaskState::AuthRequired => {}
                    _ => {
                        return Err(A2aError::InvalidRequest {
                            message: format!(
                                "Task {msg_task_id} is in state {s:?}, only INPUT_REQUIRED or AUTH_REQUIRED tasks accept follow-up messages"
                            ),
                        });
                    }
                }
            }
        }
        (msg_task_id, existing.context_id().to_string(), true)
    } else {
        let context_id = if message.as_proto().context_id.is_empty() {
            uuid::Uuid::now_v7().to_string()
        } else {
            message.as_proto().context_id.clone()
        };
        (uuid::Uuid::now_v7().to_string(), context_id, false)
    };
    // The durable queue is only used for return_immediately sends.
    let durable_queue = if return_immediately {
        state.durable_executor_queue.clone()
    } else {
        None
    };
    // Build (and size-check) the durable job BEFORE creating any task state,
    // so an oversized payload fails cleanly with no compensation needed.
    let durable_job: Option<crate::durable_executor::QueuedExecutorJob> =
        if let Some(queue) = durable_queue.as_ref() {
            let now_micros = chrono::Utc::now().timestamp_micros();
            let job = crate::durable_executor::QueuedExecutorJob {
                version: crate::durable_executor::QueuedExecutorJob::VERSION,
                tenant: tenant.to_string(),
                owner: owner.to_string(),
                task_id: task_id.clone(),
                context_id: context_id.clone(),
                message: message.as_proto().clone(),
                claims: claims.clone(),
                enqueued_at_micros: now_micros,
            };
            if let Err(e) = queue.check_payload_size(&job) {
                return Err(A2aError::InvalidRequest {
                    message: format!("durable executor queue: {e}"),
                });
            }
            Some(job)
        } else {
            None
        };
    if is_continuation {
        // Append the follow-up message, then flip the task back to WORKING
        // (each status write persists its stream event atomically).
        state
            .task_storage
            .append_message(tenant, &task_id, owner, message.clone())
            .await
            .map_err(A2aError::from)?;
        let working_event = StreamEvent::StatusUpdate {
            status_update: crate::streaming::StatusUpdatePayload {
                task_id: task_id.clone(),
                context_id: context_id.clone(),
                status: serde_json::to_value(TaskStatus::new(TaskState::Working))
                    .unwrap_or_default(),
            },
        };
        state
            .atomic_store
            .update_task_status_with_events(
                tenant,
                &task_id,
                owner,
                TaskStatus::new(TaskState::Working),
                vec![working_event],
            )
            .await
            .map_err(A2aError::from)?;
        state.event_broker.notify(&task_id).await;
    } else {
        // New task: create as SUBMITTED, then advance to WORKING.
        let mut task =
            Task::new(&task_id, TaskStatus::new(TaskState::Submitted)).with_context_id(&context_id);
        task.append_message(message.clone());
        let submitted_event = StreamEvent::StatusUpdate {
            status_update: crate::streaming::StatusUpdatePayload {
                task_id: task_id.clone(),
                context_id: context_id.clone(),
                status: serde_json::to_value(TaskStatus::new(TaskState::Submitted))
                    .unwrap_or_default(),
            },
        };
        state
            .atomic_store
            .create_task_with_events(tenant, owner, task, vec![submitted_event])
            .await
            .map_err(A2aError::from)?;
        state.event_broker.notify(&task_id).await;
        let working_event = StreamEvent::StatusUpdate {
            status_update: crate::streaming::StatusUpdatePayload {
                task_id: task_id.clone(),
                context_id: context_id.clone(),
                status: serde_json::to_value(TaskStatus::new(TaskState::Working))
                    .unwrap_or_default(),
            },
        };
        state
            .atomic_store
            .update_task_status_with_events(
                tenant,
                &task_id,
                owner,
                TaskStatus::new(TaskState::Working),
                vec![working_event],
            )
            .await
            .map_err(A2aError::from)?;
        state.event_broker.notify(&task_id).await;
    }
    // Register the inline push config. On failure the task is compensated to
    // FAILED, since the caller asked for a callback we can no longer deliver.
    if let Some(mut cfg) = inline_push_config {
        cfg.task_id = task_id.clone();
        match state.push_storage.create_config(tenant, cfg).await {
            Ok(_) => {}
            Err(e) => {
                let reason = format!("inline push notification config registration failed: {e}");
                tracing::warn!(
                    tenant = tenant,
                    owner = owner,
                    task_id = %task_id,
                    "ADR-017 Bug 2 compensation: transitioning task to FAILED \
                     after inline push config registration failure"
                );
                let reason_msg = Message::new(
                    uuid::Uuid::now_v7().to_string(),
                    turul_a2a_types::Role::Agent,
                    vec![turul_a2a_types::Part::text(reason)],
                );
                let failed_status = TaskStatus::new(TaskState::Failed).with_message(reason_msg);
                let failed_event = StreamEvent::StatusUpdate {
                    status_update: crate::streaming::StatusUpdatePayload {
                        task_id: task_id.clone(),
                        context_id: context_id.clone(),
                        status: serde_json::to_value(&failed_status).unwrap_or_default(),
                    },
                };
                if let Err(compensate_err) = state
                    .atomic_store
                    .update_task_status_with_events(
                        tenant,
                        &task_id,
                        owner,
                        failed_status,
                        vec![failed_event],
                    )
                    .await
                {
                    tracing::error!(
                        tenant = tenant,
                        owner = owner,
                        task_id = %task_id,
                        error = %compensate_err,
                        "ADR-017 Bug 2 compensation failed: task may remain \
                         in WORKING without a callback until supervisor sweep"
                    );
                }
                state.event_broker.notify(&task_id).await;
                return Err(A2aError::from(e));
            }
        }
    }
    // Durable path: enqueue and return the current task snapshot immediately.
    if let (Some(queue), Some(job)) = (durable_queue.as_ref(), durable_job) {
        match queue.enqueue(job).await {
            Ok(()) => {
                let current = state
                    .task_storage
                    .get_task(tenant, &task_id, owner, history_length)
                    .await
                    .map_err(A2aError::from)?
                    .ok_or_else(|| A2aError::TaskNotFound {
                        task_id: task_id.clone(),
                    })?;
                return Ok(Json(serde_json::json!({
                    "task": serde_json::to_value(&current).unwrap_or_default()
                })));
            }
            Err(e) => {
                // Enqueue failed: compensate to FAILED so the task doesn't
                // sit in WORKING with no executor ever coming (ADR-018).
                let reason = format!("durable executor enqueue failed: {e}");
                tracing::warn!(
                    tenant = tenant,
                    owner = owner,
                    task_id = %task_id,
                    queue_kind = queue.kind(),
                    "ADR-018 enqueue failed: transitioning task to FAILED"
                );
                let reason_msg = Message::new(
                    uuid::Uuid::now_v7().to_string(),
                    turul_a2a_types::Role::Agent,
                    vec![turul_a2a_types::Part::text(reason.clone())],
                );
                let failed_status = TaskStatus::new(TaskState::Failed).with_message(reason_msg);
                let failed_event = StreamEvent::StatusUpdate {
                    status_update: crate::streaming::StatusUpdatePayload {
                        task_id: task_id.clone(),
                        context_id: context_id.clone(),
                        status: serde_json::to_value(&failed_status).unwrap_or_default(),
                    },
                };
                if let Err(compensate_err) = state
                    .atomic_store
                    .update_task_status_with_events(
                        tenant,
                        &task_id,
                        owner,
                        failed_status,
                        vec![failed_event],
                    )
                    .await
                {
                    tracing::error!(
                        tenant = tenant,
                        owner = owner,
                        task_id = %task_id,
                        error = %compensate_err,
                        "ADR-018 FAILED-compensation itself failed: task may \
                         remain in WORKING until supervisor sweep"
                    );
                }
                state.event_broker.notify(&task_id).await;
                return Err(A2aError::Internal(reason));
            }
        }
    }
    // Inline path: spawn the tracked executor in-process.
    let spawn_deps = crate::server::spawn::SpawnDeps {
        executor: state.executor.clone(),
        task_storage: state.task_storage.clone(),
        atomic_store: state.atomic_store.clone(),
        event_broker: state.event_broker.clone(),
        in_flight: state.in_flight.clone(),
        push_dispatcher: state.push_dispatcher.clone(),
    };
    let scope = crate::server::spawn::SpawnScope {
        tenant: tenant.to_string(),
        owner: owner.to_string(),
        task_id: task_id.clone(),
        context_id: context_id.clone(),
        message: message.clone(),
        claims: claims.clone(),
    };
    let spawn = crate::server::spawn::spawn_tracked_executor(spawn_deps, scope)?;
    if return_immediately {
        // Not waiting for the yield; drop our end of the channel and return
        // whatever the task looks like right now.
        drop(spawn.yielded_rx);
        let current = state
            .task_storage
            .get_task(tenant, &task_id, owner, history_length)
            .await
            .map_err(A2aError::from)?
            .ok_or_else(|| A2aError::TaskNotFound {
                task_id: task_id.clone(),
            })?;
        return Ok(Json(serde_json::json!({
            "task": serde_json::to_value(&current).unwrap_or_default()
        })));
    }
    // Blocking send: wait for the executor to yield, with soft/hard deadlines.
    let task = await_yielded_with_two_deadlines(
        spawn.yielded_rx,
        spawn.cancellation,
        spawn.handle,
        &state,
        tenant,
        owner,
        &task_id,
        &context_id,
    )
    .await?;
    // Re-fetch only when the caller asked for truncated history, since the
    // yielded task carries full history.
    let task = if history_length.is_some() {
        state
            .task_storage
            .get_task(tenant, &task_id, owner, history_length)
            .await
            .map_err(A2aError::from)?
            .unwrap_or(task)
    } else {
        task
    };
    Ok(Json(serde_json::json!({
        "task": serde_json::to_value(&task).unwrap_or_default()
    })))
}
/// Wait for the inline executor to yield a task, enforcing two deadlines.
///
/// Phase 1 (soft): wait `blocking_task_timeout` for a normal yield. Phase 2
/// (cooperative): signal cancellation and wait `timeout_abort_grace` more for
/// the executor to wind down on its own. If it still hasn't yielded, the task
/// is force-failed in storage, the executor task is aborted, and the FAILED
/// event is pushed to any configured dispatcher. Losing a terminal-state race
/// (`TerminalStateAlreadySet`) is not an error — the persisted task wins.
#[allow(clippy::too_many_arguments)]
async fn await_yielded_with_two_deadlines(
    mut yielded_rx: tokio::sync::oneshot::Receiver<Task>,
    cancellation: tokio_util::sync::CancellationToken,
    handle: Arc<crate::server::in_flight::InFlightHandle>,
    state: &AppState,
    tenant: &str,
    owner: &str,
    task_id: &str,
    context_id: &str,
) -> Result<Task, A2aError> {
    let soft = tokio::time::Instant::now() + state.runtime_config.blocking_task_timeout;
    let hard = soft + state.runtime_config.timeout_abort_grace;
    // Phase 1: wait for a yield until the soft deadline.
    let soft_outcome = tokio::select! {
        result = &mut yielded_rx => YieldedOutcome::Yielded(result.ok()),
        _ = tokio::time::sleep_until(soft) => YieldedOutcome::SoftTimeout,
    };
    if let YieldedOutcome::Yielded(Some(task)) = soft_outcome {
        return Ok(task);
    }
    // Sender dropped without sending: the executor died without a terminal.
    if let YieldedOutcome::Yielded(None) = soft_outcome {
        return Err(A2aError::Internal(
            "executor exited without emitting a terminal or interrupted event".into(),
        ));
    }
    // Phase 2: ask the executor to stop cooperatively, wait until the hard
    // deadline for it to yield.
    cancellation.cancel();
    let cooperative_outcome = tokio::select! {
        result = &mut yielded_rx => YieldedOutcome::Yielded(result.ok()),
        _ = tokio::time::sleep_until(hard) => YieldedOutcome::HardTimeout,
    };
    if let YieldedOutcome::Yielded(Some(task)) = cooperative_outcome {
        return Ok(task);
    }
    // Hard deadline passed: force the task to FAILED ourselves.
    let reason_msg = Message::new(
        uuid::Uuid::now_v7().to_string(),
        turul_a2a_types::Role::Agent,
        vec![turul_a2a_types::Part::text(
            "task timed out: hard deadline exceeded without terminal emission",
        )],
    );
    let failed_status = TaskStatus::new(TaskState::Failed).with_message(reason_msg);
    let failed_event = StreamEvent::StatusUpdate {
        status_update: crate::streaming::StatusUpdatePayload {
            task_id: task_id.to_string(),
            context_id: context_id.to_string(),
            status: serde_json::to_value(&failed_status).unwrap_or_default(),
        },
    };
    let failed_event_for_dispatch = failed_event.clone();
    let result = state
        .atomic_store
        .update_task_status_with_events(tenant, task_id, owner, failed_status, vec![failed_event])
        .await;
    match result {
        Ok((task, seqs)) => {
            // Our FAILED write won: abort the executor and dispatch the event.
            handle.abort();
            if let Some(dispatcher) = &state.push_dispatcher {
                let seq = seqs.first().copied().unwrap_or(0);
                dispatcher.dispatch(
                    tenant.to_string(),
                    owner.to_string(),
                    task.clone(),
                    vec![(seq, failed_event_for_dispatch)],
                );
            }
            state.event_broker.notify(task_id).await;
            Ok(task)
        }
        Err(crate::storage::A2aStorageError::TerminalStateAlreadySet { .. }) => {
            // The executor reached a terminal state first; return that task.
            state.event_broker.notify(task_id).await;
            let persisted = state
                .task_storage
                .get_task(tenant, task_id, owner, None)
                .await
                .map_err(A2aError::from)?
                .ok_or_else(|| A2aError::TaskNotFound {
                    task_id: task_id.to_string(),
                })?;
            Ok(persisted)
        }
        Err(e) => Err(A2aError::from(e)),
    }
}
/// Result of racing the executor's yield channel against a deadline.
#[allow(clippy::large_enum_variant)]
enum YieldedOutcome {
    // `Some(task)` = executor yielded; `None` = sender dropped without yielding.
    Yielded(Option<Task>),
    // First (blocking) deadline elapsed.
    SoftTimeout,
    // Grace period after cooperative cancellation elapsed.
    HardTimeout,
}
/// List tasks for `(tenant, owner)`, applying the query's filters and paging.
pub(crate) async fn core_list_tasks(
    state: AppState,
    tenant: &str,
    owner: &str,
    query: &ListTasksQuery,
) -> Result<Json<serde_json::Value>, A2aError> {
    // Reject unknown status strings up front instead of matching nothing.
    let status = query
        .status
        .as_deref()
        .map(|s| {
            parse_task_state(s).ok_or_else(|| A2aError::InvalidRequest {
                message: format!("Invalid status value: {s}"),
            })
        })
        .transpose()?;
    let filter = TaskFilter {
        tenant: Some(tenant.to_string()),
        owner: Some(owner.to_string()),
        context_id: query.context_id.clone(),
        status,
        page_size: query.page_size,
        page_token: query.page_token.clone(),
        history_length: query.history_length,
        include_artifacts: query.include_artifacts,
        ..Default::default()
    };
    let page = state
        .task_storage
        .list_tasks(filter)
        .await
        .map_err(A2aError::from)?;
    Ok(Json(list_page_to_json(&page)))
}
/// Serialize a storage page into the REST `ListTasks` JSON envelope.
fn list_page_to_json(page: &TaskListPage) -> serde_json::Value {
    let tasks = page
        .tasks
        .iter()
        .map(|task| serde_json::to_value(task).unwrap_or_default())
        .collect::<Vec<serde_json::Value>>();
    serde_json::json!({
        "tasks": tasks,
        "nextPageToken": page.next_page_token,
        "pageSize": page.page_size,
        "totalSize": page.total_size,
    })
}
/// Fetch one task (optionally truncating history) and return it as JSON.
pub(crate) async fn core_get_task(
    state: AppState,
    tenant: &str,
    owner: &str,
    task_id: &str,
    history_length: Option<i32>,
) -> Result<Json<serde_json::Value>, A2aError> {
    let maybe_task = state
        .task_storage
        .get_task(tenant, task_id, owner, history_length)
        .await
        .map_err(A2aError::from)?;
    match maybe_task {
        Some(task) => Ok(Json(serde_json::to_value(&task).unwrap_or_default())),
        None => Err(A2aError::TaskNotFound {
            task_id: task_id.to_string(),
        }),
    }
}
/// Cancels a task: records the cancel request, signals any in-flight
/// execution via its cancellation token, polls for the handler to reach a
/// terminal state within the configured grace window, and finally forces a
/// `Canceled` status (with a matching stream event) if it has not.
///
/// Fix: line `serde_json::to_value(¤t)` was encoding-corrupted — the HTML
/// entity `&curren;` had swallowed the start of `&current`; restored to
/// `serde_json::to_value(&current)`.
///
/// # Errors
/// - `TaskNotFound` if the task does not exist for this tenant/owner.
/// - `TaskNotCancelable` if the task is already terminal or storage rejects
///   the transition.
#[doc(hidden)]
pub async fn core_cancel_task(
    state: AppState,
    tenant: &str,
    owner: &str,
    task_id: &str,
) -> Result<Json<serde_json::Value>, A2aError> {
    // History is not needed here, so request zero history entries.
    let initial_task = state
        .task_storage
        .get_task(tenant, task_id, owner, Some(0))
        .await
        .map_err(A2aError::from)?
        .ok_or_else(|| A2aError::TaskNotFound {
            task_id: task_id.to_string(),
        })?;
    // A task already in a terminal state cannot be canceled.
    if let Some(status) = initial_task.status() {
        if let Ok(s) = status.state() {
            if turul_a2a_types::state_machine::is_terminal(s) {
                return Err(A2aError::TaskNotCancelable {
                    task_id: task_id.to_string(),
                });
            }
        }
    }
    let context_id = initial_task.context_id().to_string();
    // Persist the cancel request; map storage failures onto API errors.
    match state
        .task_storage
        .set_cancel_requested(tenant, task_id, owner)
        .await
    {
        Ok(()) => {}
        Err(A2aStorageError::TaskNotFound(_)) => {
            return Err(A2aError::TaskNotFound {
                task_id: task_id.to_string(),
            });
        }
        Err(A2aStorageError::TerminalState(_))
        | Err(A2aStorageError::InvalidTransition { .. })
        | Err(A2aStorageError::TerminalStateAlreadySet { .. }) => {
            return Err(A2aError::TaskNotCancelable {
                task_id: task_id.to_string(),
            });
        }
        Err(other) => return Err(A2aError::from(other)),
    }
    // Signal the in-flight execution (if any) via its cancellation token.
    let in_flight_key = (tenant.to_string(), task_id.to_string());
    if let Some(handle) = state.in_flight.get(&in_flight_key) {
        handle.cancellation.cancel();
    }
    // Give the handler a grace window to observe the cancellation and move
    // the task to a terminal state on its own.
    let deadline = tokio::time::Instant::now() + state.runtime_config.cancel_handler_grace;
    let poll_interval = state.runtime_config.cancel_handler_poll_interval;
    loop {
        let now = tokio::time::Instant::now();
        if now >= deadline {
            break;
        }
        match state
            .task_storage
            .get_task(tenant, task_id, owner, Some(0))
            .await
            .map_err(A2aError::from)?
        {
            Some(current) => {
                if let Some(status) = current.status() {
                    if let Ok(s) = status.state() {
                        if turul_a2a_types::state_machine::is_terminal(s) {
                            // The handler finished the task itself; report
                            // whichever terminal state it chose.
                            state.event_broker.notify(task_id).await;
                            return Ok(Json(serde_json::to_value(&current).unwrap_or_default()));
                        }
                    }
                }
            }
            None => {
                return Err(A2aError::TaskNotFound {
                    task_id: task_id.to_string(),
                });
            }
        }
        // Sleep until the next poll, but never past the deadline.
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            break;
        }
        let sleep_for = std::cmp::min(poll_interval, remaining);
        tokio::time::sleep(sleep_for).await;
    }
    // Grace period expired: force Canceled and emit a matching status-update
    // stream event in a single atomic write.
    let cancel_event = StreamEvent::StatusUpdate {
        status_update: crate::streaming::StatusUpdatePayload {
            task_id: task_id.to_string(),
            context_id,
            status: serde_json::to_value(TaskStatus::new(TaskState::Canceled)).unwrap_or_default(),
        },
    };
    let cancel_event_for_dispatch = cancel_event.clone();
    let result = state
        .atomic_store
        .update_task_status_with_events(
            tenant,
            task_id,
            owner,
            TaskStatus::new(TaskState::Canceled),
            vec![cancel_event],
        )
        .await;
    match result {
        Ok((task, seqs)) => {
            // Fan the cancel event out to push subscribers, if configured.
            if let Some(dispatcher) = &state.push_dispatcher {
                let seq = seqs.first().copied().unwrap_or(0);
                dispatcher.dispatch(
                    tenant.to_string(),
                    owner.to_string(),
                    task.clone(),
                    vec![(seq, cancel_event_for_dispatch)],
                );
            }
            state.event_broker.notify(task_id).await;
            Ok(Json(serde_json::to_value(&task).unwrap_or_default()))
        }
        Err(A2aStorageError::TerminalStateAlreadySet { .. }) => {
            // The task raced into a terminal state while we were writing;
            // return whatever was actually persisted.
            let persisted = state
                .task_storage
                .get_task(tenant, task_id, owner, None)
                .await
                .map_err(A2aError::from)?
                .ok_or_else(|| A2aError::TaskNotFound {
                    task_id: task_id.to_string(),
                })?;
            state.event_broker.notify(task_id).await;
            Ok(Json(serde_json::to_value(&persisted).unwrap_or_default()))
        }
        Err(A2aStorageError::TaskNotFound(id)) => Err(A2aError::TaskNotFound { task_id: id }),
        Err(A2aStorageError::TerminalState(_)) | Err(A2aStorageError::InvalidTransition { .. }) => {
            Err(A2aError::TaskNotCancelable {
                task_id: task_id.to_string(),
            })
        }
        Err(other) => Err(A2aError::from(other)),
    }
}
/// Confirms that `task_id` exists for the given tenant/owner pair, returning
/// `TaskNotFound` otherwise. Requests zero history entries since only
/// existence matters here.
async fn verify_task_ownership(
    state: &AppState,
    tenant: &str,
    owner: &str,
    task_id: &str,
) -> Result<(), A2aError> {
    let lookup = state
        .task_storage
        .get_task(tenant, task_id, owner, Some(0))
        .await
        .map_err(A2aError::from)?;
    match lookup {
        Some(_) => Ok(()),
        None => Err(A2aError::TaskNotFound {
            task_id: task_id.to_string(),
        }),
    }
}
/// Creates a push-notification config for `task_id` after checking that the
/// caller owns the task, the JSON body parses, and the URL is non-empty and
/// well-formed.
///
/// # Errors
/// - `TaskNotFound` when the task does not belong to the caller.
/// - `InvalidRequest` for unparseable bodies or bad URLs.
pub(crate) async fn core_create_push_config(
    state: AppState,
    tenant: &str,
    owner: &str,
    task_id: &str,
    body: String,
) -> Result<Json<serde_json::Value>, A2aError> {
    verify_task_ownership(&state, tenant, owner, task_id).await?;
    let parsed: Result<turul_a2a_proto::TaskPushNotificationConfig, _> =
        serde_json::from_str(&body);
    let mut config = parsed.map_err(|e| A2aError::InvalidRequest {
        message: format!("Invalid push config: {e}"),
    })?;
    // The path segment is authoritative for the task id, whatever the body says.
    config.task_id = task_id.to_string();
    if config.url.is_empty() {
        return Err(A2aError::InvalidRequest {
            message: "push config url is required".into(),
        });
    }
    // Validate URL syntax; the parsed value itself is not needed.
    url::Url::parse(&config.url).map_err(|e| A2aError::InvalidRequest {
        message: format!("push config url is not a valid URL: {e}"),
    })?;
    let created = state
        .push_storage
        .create_config(tenant, config)
        .await
        .map_err(A2aError::from)?;
    Ok(Json(serde_json::to_value(&created).unwrap_or_default()))
}
/// Lists push-notification configs for `task_id`, returning the page of
/// configs plus the continuation token under the wire-format keys.
pub(crate) async fn core_list_push_configs(
    state: AppState,
    tenant: &str,
    owner: &str,
    task_id: &str,
    query: &PushConfigQuery,
) -> Result<Json<serde_json::Value>, A2aError> {
    verify_task_ownership(&state, tenant, owner, task_id).await?;
    let page = state
        .push_storage
        .list_configs(tenant, task_id, query.page_token.as_deref(), query.page_size)
        .await
        .map_err(A2aError::from)?;
    // Unserializable configs degrade to JSON null, matching list_page_to_json.
    let mut configs = Vec::with_capacity(page.configs.len());
    for config in &page.configs {
        configs.push(serde_json::to_value(config).unwrap_or_default());
    }
    Ok(Json(serde_json::json!({
        "configs": configs,
        "nextPageToken": page.next_page_token,
    })))
}
/// Fetches a single push-notification config by id, scoped to `task_id`.
pub(crate) async fn core_get_push_config(
    state: AppState,
    tenant: &str,
    owner: &str,
    task_id: &str,
    config_id: &str,
) -> Result<Json<serde_json::Value>, A2aError> {
    verify_task_ownership(&state, tenant, owner, task_id).await?;
    let lookup = state
        .push_storage
        .get_config(tenant, task_id, config_id)
        .await
        .map_err(A2aError::from)?;
    // NOTE(review): a missing config is reported through TaskNotFound with a
    // descriptive id string; no dedicated push-config error variant is visible.
    let Some(config) = lookup else {
        return Err(A2aError::TaskNotFound {
            task_id: format!("push config {config_id} for task {task_id}"),
        });
    };
    Ok(Json(serde_json::to_value(&config).unwrap_or_default()))
}
/// Deletes a push-notification config and responds with an empty JSON object.
/// Ownership of the task is verified before the delete is attempted.
pub(crate) async fn core_delete_push_config(
    state: AppState,
    tenant: &str,
    owner: &str,
    task_id: &str,
    config_id: &str,
) -> Result<Json<serde_json::Value>, A2aError> {
    verify_task_ownership(&state, tenant, owner, task_id).await?;
    let deletion = state.push_storage.delete_config(tenant, task_id, config_id);
    deletion.await.map_err(A2aError::from)?;
    Ok(Json(serde_json::json!({})))
}
/// Converts an `A2aError` into an HTTP response: the error's own status code
/// (falling back to 500 when it is not a valid `StatusCode`) with the JSON
/// error body from `to_http_error_body`.
impl IntoResponse for A2aError {
    fn into_response(self) -> axum::response::Response {
        let code = self.http_status();
        let status = axum::http::StatusCode::from_u16(code)
            .unwrap_or(axum::http::StatusCode::INTERNAL_SERVER_ERROR);
        (status, Json(self.to_http_error_body())).into_response()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `input` parses to exactly the expected task action.
    fn assert_parses(input: &str, expected: TaskAction) {
        assert_eq!(parse_task_path(input), Some(expected));
    }

    #[test]
    fn parse_get_task() {
        // A bare id, with or without a leading slash, is a plain get.
        assert_parses("abc-123", TaskAction::GetTask("abc-123".into()));
        assert_parses("/abc-123", TaskAction::GetTask("abc-123".into()));
    }

    #[test]
    fn parse_cancel_task() {
        assert_parses("abc-123:cancel", TaskAction::CancelTask("abc-123".into()));
    }

    #[test]
    fn parse_subscribe_to_task() {
        assert_parses(
            "abc-123:subscribe",
            TaskAction::SubscribeToTask("abc-123".into()),
        );
    }

    #[test]
    fn parse_push_config_collection() {
        assert_parses(
            "task-1/pushNotificationConfigs",
            TaskAction::PushConfigCollection("task-1".into()),
        );
    }

    #[test]
    fn parse_push_config_item() {
        assert_parses(
            "task-1/pushNotificationConfigs/cfg-1",
            TaskAction::PushConfigItem("task-1".into(), "cfg-1".into()),
        );
    }

    #[test]
    fn parse_invalid_path() {
        // Paths with extra segments are rejected outright.
        assert_eq!(parse_task_path("a/b/c/d"), None);
    }
}