use axum::{Json, extract::State, response::IntoResponse};
use serde::Deserialize;
use serde_json::{Value, json};
use tokio::sync::oneshot;
use tracing::info;
use uuid::Uuid;
use crate::{
a2a::{
errors as a2a_errors,
types::{
A2aArtifact, A2aMessage, A2aTask, A2aTaskStatus, AgentCapabilities, AgentCard,
AgentExtension, AgentInterface, AgentProvider, AgentSkill, JsonRpcRequest,
JsonRpcResponse, PushNotificationConfig, SendMessageParams, TaskState,
},
},
server::AppState,
};
use rsclaw_agent::{AgentMessage, AgentReply};
use rsclaw_config::schema::BindMode;
pub const PROTOCOL_VERSION: &str = "1.0";
pub const V1_METHODS: &[&str] = &[
"SendMessage",
"SendStreamingMessage",
"GetTask",
"ListTasks",
"CancelTask",
"SubscribeToTask",
"CreateTaskPushNotificationConfig",
"GetTaskPushNotificationConfig",
"ListTaskPushNotificationConfigs",
"DeleteTaskPushNotificationConfig",
"GetExtendedAgentCard",
];
pub fn known_method(name: &str) -> bool {
V1_METHODS.contains(&name)
}
pub async fn agent_card_handler(State(state): State<AppState>) -> impl IntoResponse {
Json(build_agent_card(&state, false))
}
pub fn build_agent_card(state: &AppState, _extended: bool) -> AgentCard {
let host = match state.config.gateway.bind {
BindMode::Loopback => "127.0.0.1",
BindMode::All | BindMode::Lan | BindMode::Auto | BindMode::Custom | BindMode::Tailnet => {
"0.0.0.0"
}
};
let port = state.config.gateway.port;
let base_url = format!("http://{host}:{port}/api/v1/a2a");
let skills: Vec<AgentSkill> = state
.agents
.all()
.into_iter()
.map(|h| AgentSkill {
id: h.id.clone(),
name: h.id.clone(),
description: None,
input_modes: vec!["text/plain".to_owned()],
output_modes: vec!["text/plain".to_owned()],
})
.collect();
AgentCard {
protocol_version: PROTOCOL_VERSION.to_owned(),
name: "rsclaw".to_owned(),
description: Some("AI Agent Engine Compatible with OpenClaw".to_owned()),
url: base_url.clone(),
provider: Some(AgentProvider {
organization: "rsclaw".to_owned(),
url: Some("https://github.com/oopos/rsclaw".to_owned()),
}),
version: env!("CARGO_PKG_VERSION").to_owned(),
capabilities: AgentCapabilities {
streaming: true,
push_notifications: true,
extended_agent_card: true,
},
security_schemes: Some(json!({
"bearer": { "type": "http", "scheme": "bearer" },
"apiKey": { "type": "apiKey", "in": "header", "name": "X-API-Key" }
})),
security: Some(vec![json!({ "bearer": [] }), json!({ "apiKey": [] })]),
default_input_modes: vec![
"text/plain".to_owned(),
"application/octet-stream".to_owned(),
],
default_output_modes: vec!["text/plain".to_owned()],
skills,
extensions: Vec::<AgentExtension>::new(),
signatures: vec![],
interfaces: vec![AgentInterface {
url: base_url,
transport: "JSONRPC".to_owned(),
}],
}
}
pub(crate) fn caller_owns(
store: &crate::a2a::store::TaskStore,
caller: &Option<crate::a2a::auth::A2aIdentity>,
task_id: &str,
) -> bool {
match caller {
None => true,
Some(c) if c.id == "gateway-auth" => true,
Some(c) => match store.get_owner(task_id) {
Ok(Some(owner)) => owner == c.id,
Ok(None) => true,
Err(_) => false,
},
}
}
pub async fn a2a_rpc_handler(
State(state): State<AppState>,
caller: Option<axum::Extension<crate::a2a::auth::A2aIdentity>>,
Json(req): Json<JsonRpcRequest>,
) -> impl IntoResponse {
a2a_rpc_handler_inner(state, caller.map(|e| e.0), req).await
}
pub async fn a2a_dispatch(
State(state): State<AppState>,
caller: Option<axum::Extension<crate::a2a::auth::A2aIdentity>>,
Json(req): Json<JsonRpcRequest>,
) -> axum::response::Response {
let caller = caller.map(|e| e.0);
match req.method.as_str() {
"SendStreamingMessage" | "SubscribeToTask" => {
crate::a2a::streaming::handle_streaming_rpc(state, caller, req)
.await
.into_response()
}
_ => a2a_rpc_handler_inner(state, caller, req)
.await
.into_response(),
}
}
pub async fn a2a_rpc_handler_inner(
state: AppState,
caller: Option<crate::a2a::auth::A2aIdentity>,
req: JsonRpcRequest,
) -> Json<JsonRpcResponse> {
let id = req.id.clone();
if let Some(response) =
crate::a2a::relay::try_forward_jsonrpc(&state, caller.as_ref(), &req).await
{
return Json(response);
}
match req.method.as_str() {
"SendMessage" => handle_send_message(state, caller, id, req.params).await,
"GetExtendedAgentCard" => Json(JsonRpcResponse::ok(
id,
serde_json::to_value(build_agent_card(&state, true)).unwrap_or(Value::Null),
)),
"SendStreamingMessage" | "SubscribeToTask" => Json(JsonRpcResponse::err(
id,
-32601,
"use Accept: text/event-stream for streaming methods",
)),
"GetTask" => handle_get_task(state, caller, id, req.params).await,
"ListTasks" => handle_list_tasks(state, caller, id, req.params).await,
"CancelTask" => handle_cancel_task(state, caller, id, req.params).await,
"CreateTaskPushNotificationConfig" => {
handle_create_push_config(state, caller, id, req.params).await
}
"GetTaskPushNotificationConfig" => {
handle_get_push_config(state, caller, id, req.params).await
}
"ListTaskPushNotificationConfigs" => {
handle_list_push_configs(state, caller, id, req.params).await
}
"DeleteTaskPushNotificationConfig" => {
handle_delete_push_config(state, caller, id, req.params).await
}
other => Json(JsonRpcResponse::err(
id,
-32601,
format!("method not found: {other}"),
)),
}
}
async fn handle_send_message(
state: AppState,
caller: Option<crate::a2a::auth::A2aIdentity>,
id: Value,
params: Value,
) -> Json<JsonRpcResponse> {
let params: SendMessageParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return Json(JsonRpcResponse::err(
id,
-32602,
format!("invalid params: {e}"),
));
}
};
let agent_id = params
.metadata
.as_ref()
.and_then(|m| m.get("agentId").and_then(|v| v.as_str()).map(str::to_owned));
let workspace = resolve_agent_workspace(&state, agent_id.as_deref()).await;
let ingested = crate::a2a::files::ingest_message_parts(&workspace, ¶ms.message.parts).await;
let text = ingested.text;
if text.is_empty() {
return Json(JsonRpcResponse::err(id, -32602, "no text part in message"));
}
let handle = if let Some(ref aid) = agent_id {
match state.agents.get(aid) {
Ok(h) => h,
Err(_) => {
return Json(JsonRpcResponse::err(
id,
-32001,
format!("agent not found: {aid}"),
));
}
}
} else {
match state.agents.default_agent() {
Ok(h) => h,
Err(e) => {
return Json(JsonRpcResponse::err(
id,
-32001,
format!("no default agent: {e}"),
));
}
}
};
let session_key = params
.message
.context_id
.clone()
.unwrap_or_else(|| format!("a2a:{}", Uuid::new_v4()));
let task_id = params
.message
.task_id
.clone()
.unwrap_or_else(|| Uuid::new_v4().to_string());
if let Some((_, suspended)) = state.suspended_tasks.remove(&task_id) {
let mut bus_rx = state.task_event_bus.subscribe(&task_id);
let _ = suspended.resume_tx.send(text);
let timeout_secs: u64 = std::env::var("RSCLAW_A2A_RESUME_TIMEOUT_SECS")
.ok()
.and_then(|v| v.parse().ok())
.filter(|&n: &u64| n > 0)
.unwrap_or(600);
let wait = async {
loop {
match bus_rx.recv().await {
Ok(rsclaw_a2a_types::event::AgentEvent::Status {
state: s,
final_: true,
..
}) => {
return Some(s);
}
Ok(rsclaw_a2a_types::event::AgentEvent::InputRequired { .. }) => {
return Some(TaskState::InputRequired);
}
Ok(rsclaw_a2a_types::event::AgentEvent::AuthRequired { .. }) => {
return Some(TaskState::AuthRequired);
}
Ok(_) => continue, Err(_) => return None, }
}
};
let _final_state =
tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), wait).await;
let task = state
.task_store
.get(&task_id)
.ok()
.flatten()
.unwrap_or_else(|| A2aTask {
id: task_id.clone(),
context_id: Some(session_key.clone()),
status: A2aTaskStatus {
state: TaskState::Working,
message: None,
timestamp: Some(chrono::Utc::now().to_rfc3339()),
},
history: vec![],
artifacts: vec![],
metadata: None,
});
return Json(JsonRpcResponse::ok(
id,
serde_json::to_value(task).unwrap_or(Value::Null),
));
}
let initial_history = A2aMessage {
message_id: params.message.message_id.clone(),
role: params.message.role.clone(),
parts: params.message.parts.clone(),
context_id: Some(session_key.clone()),
task_id: Some(task_id.clone()),
metadata: params.message.metadata.clone(),
};
let initial_task = A2aTask {
id: task_id.clone(),
context_id: Some(session_key.clone()),
status: A2aTaskStatus {
state: TaskState::Submitted,
message: None,
timestamp: Some(chrono::Utc::now().to_rfc3339()),
},
history: vec![initial_history],
artifacts: vec![],
metadata: None,
};
if let Err(e) = state.task_store.put(&initial_task) {
info!(err = %e, "failed to persist initial task");
}
if let Some(c) = caller.as_ref() {
let _ = state.task_store.put_owner(&task_id, &c.id);
}
let cancel_token = tokio_util::sync::CancellationToken::new();
state
.task_cancels
.insert(task_id.clone(), cancel_token.clone());
state.push_dispatcher.clone().watch(task_id.clone());
state
.task_event_bus
.publish(rsclaw_a2a_types::event::AgentEvent::Status {
task_id: task_id.clone(),
context_id: session_key.clone(),
state: TaskState::Submitted,
message: None,
final_: false,
});
state
.task_event_bus
.publish(rsclaw_a2a_types::event::AgentEvent::Status {
task_id: task_id.clone(),
context_id: session_key.clone(),
state: TaskState::Working,
message: None,
final_: false,
});
let (ireq_tx, ireq_rx) = tokio::sync::mpsc::channel::<tokio::sync::oneshot::Sender<String>>(4);
spawn_input_request_listener(state.clone(), task_id.clone(), session_key.clone(), ireq_rx);
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<rsclaw_a2a_types::event::AgentEvent>(64);
{
let bus = state.task_event_bus.clone();
tokio::spawn(async move {
while let Some(ev) = event_rx.recv().await {
bus.publish(ev);
}
});
}
let (reply_tx, reply_rx) = oneshot::channel::<AgentReply>();
let msg = AgentMessage {
session_key: session_key.clone(),
text,
channel: "a2a".to_owned(),
peer_id: caller
.as_ref()
.map(|c| c.id.clone())
.unwrap_or_else(|| "a2a-client".to_owned()),
chat_id: String::new(),
reply_tx,
task_id: Some(task_id.clone()),
context_id: Some(session_key.clone()),
event_tx: Some(event_tx),
cancel_token: Some(cancel_token),
input_request_tx: Some(ireq_tx),
extra_tools: vec![],
images: vec![],
files: vec![],
account: None,
};
if handle.tx.send(msg).await.is_err() {
finalize_failed_task(&state, &task_id, &session_key);
return Json(JsonRpcResponse::err(id, -32603, "agent inbox closed"));
}
let timeout_secs = state.config.agents.defaults.timeout_seconds.unwrap_or(600) as u64;
let reply =
match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), reply_rx).await {
Ok(Ok(r)) => r,
Ok(Err(_)) => {
finalize_failed_task(&state, &task_id, &session_key);
return Json(JsonRpcResponse::err(id, -32603, "reply channel dropped"));
}
Err(_) => {
finalize_failed_task(&state, &task_id, &session_key);
return Json(JsonRpcResponse::err(
id,
-32000,
format!("agent timed out after {timeout_secs}s"),
));
}
};
let history_msg = A2aMessage {
message_id: params.message.message_id.clone(),
role: params.message.role.clone(),
parts: params.message.parts.clone(),
context_id: Some(session_key.clone()),
task_id: Some(task_id.clone()),
metadata: params.message.metadata.clone(),
};
let pending_outcome = crate::gateway::task_queue::drain_pending_outcome(&session_key);
match reply.outcome {
rsclaw_agent::registry::ReplyOutcome::Ok => {
let artifact_parts =
crate::a2a::files::emit_reply_parts(&reply.text, &reply.images, &reply.files);
let artifact = A2aArtifact {
artifact_id: Uuid::new_v4().to_string(),
parts: artifact_parts,
name: None,
description: None,
metadata: None,
};
let _ = state.task_store.append_artifact(&task_id, artifact.clone());
if let Some(ref outcome) = pending_outcome {
let _ = state.task_store.attach_outcome_metadata(&task_id, outcome);
}
let _ = state.task_store.set_status(&task_id, TaskState::Completed);
let _ = state.task_store.delete_push_configs_for_task(&task_id);
state.task_cancels.remove(&task_id);
state
.task_event_bus
.publish(rsclaw_a2a_types::event::AgentEvent::Artifact {
task_id: task_id.clone(),
context_id: session_key.clone(),
artifact_id: artifact.artifact_id.clone(),
parts: artifact.parts.clone(),
append: false,
last_chunk: true,
});
state
.task_event_bus
.publish(rsclaw_a2a_types::event::AgentEvent::Status {
task_id: task_id.clone(),
context_id: session_key.clone(),
state: TaskState::Completed,
message: None,
final_: true,
});
state.task_event_bus.close(&task_id);
let result = json!({
"id": task_id,
"contextId": session_key,
"status": { "state": "TASK_STATE_COMPLETED" },
"artifacts": [artifact],
"history": [history_msg],
});
info!(task_id, agent = %handle.id, "A2A SendMessage completed");
Json(JsonRpcResponse::ok(id, result))
}
rsclaw_agent::registry::ReplyOutcome::Error => {
if let Some(ref outcome) = pending_outcome {
let _ = state.task_store.attach_outcome_metadata(&task_id, outcome);
}
let _ = state.task_store.set_status(&task_id, TaskState::Failed);
let _ = state.task_store.delete_push_configs_for_task(&task_id);
state.task_cancels.remove(&task_id);
state
.task_event_bus
.publish(rsclaw_a2a_types::event::AgentEvent::Status {
task_id: task_id.clone(),
context_id: session_key.clone(),
state: TaskState::Failed,
message: Some(rsclaw_a2a_types::event::text_message(&reply.text)),
final_: true,
});
state.task_event_bus.close(&task_id);
let result = json!({
"id": task_id,
"contextId": session_key,
"status": {
"state": "TASK_STATE_FAILED",
"message": rsclaw_a2a_types::event::text_message(&reply.text),
},
"history": [history_msg],
});
info!(task_id, agent = %handle.id, err = %reply.text, "A2A SendMessage failed");
Json(JsonRpcResponse::ok(id, result))
}
rsclaw_agent::registry::ReplyOutcome::Canceled => {
state.task_cancels.remove(&task_id);
let task_snapshot = state.task_store.get(&task_id).ok().flatten();
let result = serde_json::to_value(task_snapshot).unwrap_or_else(|_| {
json!({
"id": task_id,
"contextId": session_key,
"status": { "state": "TASK_STATE_CANCELED" },
"history": [history_msg],
})
});
info!(task_id, agent = %handle.id, "A2A SendMessage canceled");
Json(JsonRpcResponse::ok(id, result))
}
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetTaskParams {
id: String,
#[serde(default)]
history_length: Option<usize>,
}
async fn handle_get_task(
state: AppState,
caller: Option<crate::a2a::auth::A2aIdentity>,
id: Value,
params: Value,
) -> Json<JsonRpcResponse> {
let params: GetTaskParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return Json(JsonRpcResponse::err_struct(
id,
a2a_errors::invalid_argument(format!("invalid params: {e}"), "params"),
));
}
};
match state.task_store.get(¶ms.id) {
Ok(Some(mut task)) if caller_owns(&state.task_store, &caller, ¶ms.id) => {
if let Some(n) = params.history_length
&& task.history.len() > n
{
let skip = task.history.len() - n;
task.history = task.history.split_off(skip);
}
Json(JsonRpcResponse::ok(
id,
serde_json::to_value(task).unwrap_or(Value::Null),
))
}
Ok(_) => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::not_found(format!("tasks/{}", params.id)),
)),
Err(e) => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::internal(format!("store error: {e}")),
)),
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct ListTasksParams {
#[serde(default)]
page_size: Option<usize>,
#[serde(default)]
page_token: Option<String>,
}
async fn handle_list_tasks(
state: AppState,
caller: Option<crate::a2a::auth::A2aIdentity>,
id: Value,
params: Value,
) -> Json<JsonRpcResponse> {
let params: ListTasksParams = serde_json::from_value(params).unwrap_or(ListTasksParams {
page_size: None,
page_token: None,
});
let offset: usize = params
.page_token
.as_deref()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let limit = params.page_size.unwrap_or(50).min(500);
match state.task_store.list(offset, limit) {
Ok(tasks) => {
let next_token = if tasks.len() == limit {
Some((offset + limit).to_string())
} else {
None
};
let tasks: Vec<_> = tasks
.into_iter()
.filter(|t| caller_owns(&state.task_store, &caller, &t.id))
.collect();
Json(JsonRpcResponse::ok(
id,
json!({ "tasks": tasks, "nextPageToken": next_token }),
))
}
Err(e) => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::internal(format!("store error: {e}")),
)),
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct CancelTaskParams {
id: String,
}
async fn handle_cancel_task(
state: AppState,
caller: Option<crate::a2a::auth::A2aIdentity>,
id: Value,
params: Value,
) -> Json<JsonRpcResponse> {
let params: CancelTaskParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return Json(JsonRpcResponse::err_struct(
id,
a2a_errors::invalid_argument(format!("invalid params: {e}"), "params"),
));
}
};
if !caller_owns(&state.task_store, &caller, ¶ms.id) {
return Json(JsonRpcResponse::err_struct(
id,
a2a_errors::not_found(format!("tasks/{}", params.id)),
));
}
match state.task_cancels.remove(¶ms.id) {
Some((_, token)) => {
token.cancel();
let _ = state.task_store.set_status(¶ms.id, TaskState::Canceled);
let _ = state.task_store.delete_push_configs_for_task(¶ms.id);
let ctx = state
.task_store
.get(¶ms.id)
.ok()
.flatten()
.and_then(|t| t.context_id)
.unwrap_or_default();
state
.task_event_bus
.publish(rsclaw_a2a_types::event::AgentEvent::Status {
task_id: params.id.clone(),
context_id: ctx,
state: TaskState::Canceled,
message: None,
final_: true,
});
state.task_event_bus.close(¶ms.id);
match state.task_store.get(¶ms.id) {
Ok(Some(task)) => Json(JsonRpcResponse::ok(
id,
serde_json::to_value(task).unwrap_or(Value::Null),
)),
_ => Json(JsonRpcResponse::ok(
id,
json!({ "id": params.id, "status": { "state": "TASK_STATE_CANCELED" } }),
)),
}
}
None => match state.task_store.get(¶ms.id) {
Ok(Some(t)) if t.status.state.is_terminal() => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::precondition_failed(format!(
"task already terminal: {:?}",
t.status.state
)),
)),
Ok(Some(_)) => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::precondition_failed(
"task running but no cancel token (gateway restart?)".to_owned(),
),
)),
Ok(None) => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::not_found(format!("tasks/{}", params.id)),
)),
Err(e) => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::internal(format!("store error: {e}")),
)),
},
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreatePushConfigParams {
task_id: String,
push_notification_config: PushNotificationConfig,
}
async fn handle_create_push_config(
state: AppState,
caller: Option<crate::a2a::auth::A2aIdentity>,
id: Value,
params: Value,
) -> Json<JsonRpcResponse> {
let mut params: CreatePushConfigParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return Json(JsonRpcResponse::err_struct(
id,
a2a_errors::invalid_argument(format!("invalid params: {e}"), "params"),
));
}
};
if !caller_owns(&state.task_store, &caller, ¶ms.task_id) {
return Json(JsonRpcResponse::err_struct(
id,
a2a_errors::not_found(format!("tasks/{}", params.task_id)),
));
}
params.push_notification_config.task_id = params.task_id.clone();
if params.push_notification_config.id.is_empty() {
params.push_notification_config.id = Uuid::new_v4().to_string();
}
match state
.task_store
.put_push_config(¶ms.push_notification_config)
{
Ok(_) => Json(JsonRpcResponse::ok(
id,
serde_json::to_value(¶ms.push_notification_config).unwrap_or(Value::Null),
)),
Err(e) => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::internal(format!("store error: {e}")),
)),
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetPushConfigParams {
task_id: String,
push_notification_config_id: String,
}
async fn handle_get_push_config(
state: AppState,
caller: Option<crate::a2a::auth::A2aIdentity>,
id: Value,
params: Value,
) -> Json<JsonRpcResponse> {
let params: GetPushConfigParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return Json(JsonRpcResponse::err_struct(
id,
a2a_errors::invalid_argument(format!("invalid params: {e}"), "params"),
));
}
};
if !caller_owns(&state.task_store, &caller, ¶ms.task_id) {
return Json(JsonRpcResponse::err_struct(
id,
a2a_errors::not_found(format!(
"tasks/{}/pushNotificationConfigs/{}",
params.task_id, params.push_notification_config_id
)),
));
}
match state
.task_store
.get_push_config(¶ms.task_id, ¶ms.push_notification_config_id)
{
Ok(Some(c)) => Json(JsonRpcResponse::ok(
id,
serde_json::to_value(c).unwrap_or(Value::Null),
)),
Ok(None) => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::not_found(format!(
"tasks/{}/pushNotificationConfigs/{}",
params.task_id, params.push_notification_config_id
)),
)),
Err(e) => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::internal(format!("store error: {e}")),
)),
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct ListPushConfigsParams {
task_id: String,
}
async fn handle_list_push_configs(
state: AppState,
caller: Option<crate::a2a::auth::A2aIdentity>,
id: Value,
params: Value,
) -> Json<JsonRpcResponse> {
let params: ListPushConfigsParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return Json(JsonRpcResponse::err_struct(
id,
a2a_errors::invalid_argument(format!("invalid params: {e}"), "params"),
));
}
};
if !caller_owns(&state.task_store, &caller, ¶ms.task_id) {
return Json(JsonRpcResponse::err_struct(
id,
a2a_errors::not_found(format!("tasks/{}", params.task_id)),
));
}
match state.task_store.list_push_configs(¶ms.task_id) {
Ok(configs) => Json(JsonRpcResponse::ok(id, json!({ "configs": configs }))),
Err(e) => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::internal(format!("store error: {e}")),
)),
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct DeletePushConfigParams {
task_id: String,
push_notification_config_id: String,
}
async fn handle_delete_push_config(
state: AppState,
caller: Option<crate::a2a::auth::A2aIdentity>,
id: Value,
params: Value,
) -> Json<JsonRpcResponse> {
let params: DeletePushConfigParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => {
return Json(JsonRpcResponse::err_struct(
id,
a2a_errors::invalid_argument(format!("invalid params: {e}"), "params"),
));
}
};
if !caller_owns(&state.task_store, &caller, ¶ms.task_id) {
return Json(JsonRpcResponse::err_struct(
id,
a2a_errors::not_found(format!(
"tasks/{}/pushNotificationConfigs/{}",
params.task_id, params.push_notification_config_id
)),
));
}
match state
.task_store
.delete_push_config(¶ms.task_id, ¶ms.push_notification_config_id)
{
Ok(true) => Json(JsonRpcResponse::ok(id, json!({ "deleted": true }))),
Ok(false) => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::not_found(format!(
"tasks/{}/pushNotificationConfigs/{}",
params.task_id, params.push_notification_config_id
)),
)),
Err(e) => Json(JsonRpcResponse::err_struct(
id,
a2a_errors::internal(format!("store error: {e}")),
)),
}
}
pub(crate) async fn resolve_agent_workspace_pub(
state: &AppState,
agent_id: Option<&str>,
) -> std::path::PathBuf {
resolve_agent_workspace(state, agent_id).await
}
pub(crate) fn spawn_input_request_listener(
state: AppState,
task_id: String,
context_id: String,
mut ireq_rx: tokio::sync::mpsc::Receiver<tokio::sync::oneshot::Sender<String>>,
) {
let timeout_secs: u64 = std::env::var("RSCLAW_A2A_WAIT_INPUT_TIMEOUT_SECS")
.ok()
.and_then(|v| v.parse().ok())
.filter(|&n: &u64| n > 0)
.unwrap_or(1800);
tokio::spawn(async move {
while let Some(resume_tx) = ireq_rx.recv().await {
state.suspended_tasks.insert(
task_id.clone(),
rsclaw_a2a_types::event::SuspendedTask {
task_id: task_id.clone(),
context_id: context_id.clone(),
resume_tx,
},
);
let state_t = state.clone();
let task_id_t = task_id.clone();
let context_id_t = context_id.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await;
if state_t.suspended_tasks.remove(&task_id_t).is_some() {
let _ = state_t.task_store.set_status(&task_id_t, TaskState::Failed);
let _ = state_t.task_store.delete_push_configs_for_task(&task_id_t);
state_t.task_cancels.remove(&task_id_t);
state_t.task_event_bus.publish(
rsclaw_a2a_types::event::AgentEvent::Status {
task_id: task_id_t.clone(),
context_id: context_id_t,
state: TaskState::Failed,
message: Some(rsclaw_a2a_types::event::text_message(&format!(
"wait_input timed out after {timeout_secs}s without a resume SendMessage on the same taskId"
))),
final_: true,
},
);
state_t.task_event_bus.close(&task_id_t);
}
});
}
});
}
async fn resolve_agent_workspace(state: &AppState, agent_id: Option<&str>) -> std::path::PathBuf {
let per_agent = if let Some(aid) = agent_id {
state
.agents
.get(aid)
.ok()
.and_then(|h| h.config.workspace.clone())
} else {
state
.agents
.default_agent()
.ok()
.and_then(|h| h.config.workspace.clone())
};
let default = state.live.agents.read().await.defaults.workspace.clone();
per_agent
.or(default)
.map(|p| rsclaw_config::loader::expand_tilde_path_pub(&p))
.unwrap_or_else(|| rsclaw_config::loader::base_dir().join("workspace"))
}
fn finalize_failed_task(state: &AppState, task_id: &str, context_id: &str) {
let _ = state.task_store.set_status(task_id, TaskState::Failed);
let _ = state.task_store.delete_push_configs_for_task(task_id);
state.task_cancels.remove(task_id);
state
.task_event_bus
.publish(rsclaw_a2a_types::event::AgentEvent::Status {
task_id: task_id.to_owned(),
context_id: context_id.to_owned(),
state: TaskState::Failed,
message: None,
final_: true,
});
state.task_event_bus.close(task_id);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::a2a::{auth::A2aIdentity, store::TaskStore};
fn ident(id: &str) -> Option<A2aIdentity> {
Some(A2aIdentity {
id: id.to_owned(),
scopes: vec![],
})
}
#[test]
fn caller_owns_enforces_section_7_5_access() {
let tmp = tempfile::tempdir().unwrap();
let store = TaskStore::open(&tmp.path().join("tasks.redb")).unwrap();
store.put_owner("task-1", "alice").unwrap();
assert!(caller_owns(&store, &ident("alice"), "task-1"));
assert!(!caller_owns(&store, &ident("bob"), "task-1"));
assert!(caller_owns(&store, &ident("gateway-auth"), "task-1"));
assert!(caller_owns(&store, &None::<A2aIdentity>, "task-1"));
assert!(caller_owns(&store, &ident("bob"), "no-such-task"));
}
}