pub mod app_client;
pub mod connector;
pub use app_client::A2AGatewayClient;
pub use connector::{RemoteAgentConnection, RemoteAgentManager};
use std::collections::HashMap;
use serde::Deserialize;
use serde_json::Value;
use uuid::Uuid;
use crate::a2a::types::{
AgentCard, Artifact, JsonRpcRequest, Message, Part, Task, TaskArtifactUpdateEvent, TaskState,
TaskStatus,
};
use crate::error::{AgentKitError, Result};
/// Delivers `request` to the remote agent described by `card` and returns the
/// resulting [`Task`].
///
/// The JSON-RPC method is `message/stream` when the card advertises streaming
/// and `message/send` otherwise. The request is POSTed to the card URL with any
/// trailing slashes replaced by exactly one, and every entry of
/// `propagation_headers` is attached as an HTTP header. The response body is
/// read completely as text before it is handed to the matching parser.
///
/// # Errors
///
/// * A failure while sending is propagated through `?` (the conversion into the
///   crate error type is defined outside this function).
/// * A non-success HTTP status or a failure while reading the body is reported
///   as [`AgentKitError::A2aClient`].
/// * In non-streaming mode, any error returned by `parse_single_response`.
pub(crate) async fn send_a2a_message(
    http: &reqwest::Client,
    card: &AgentCard,
    request: Message,
    propagation_headers: HashMap<String, String>,
) -> Result<Task> {
    let wants_stream = card.capabilities.streaming;

    // Settle the context id before the message is serialized: reuse the
    // caller's id when there is one, otherwise mint a fresh UUID.
    let context_fallback = match request.context_id.clone() {
        Some(existing) => existing,
        None => Uuid::new_v4().to_string(),
    };

    let rpc_method = if wants_stream {
        "message/stream"
    } else {
        "message/send"
    };
    let envelope = JsonRpcRequest {
        jsonrpc: "2.0".to_string(),
        method: rpc_method.to_string(),
        id: Some(serde_json::json!(Uuid::new_v4().to_string())),
        params: serde_json::json!({ "message": request }),
    };

    let endpoint = format!("{}/", card.url.trim_end_matches('/'));
    let outgoing = propagation_headers.iter().fold(
        http.post(&endpoint).json(&envelope),
        |builder, (name, value)| builder.header(name, value),
    );

    let response = outgoing.send().await?;
    let response = response
        .error_for_status()
        .map_err(|err| AgentKitError::A2aClient(err.to_string()))?;
    let payload = response
        .text()
        .await
        .map_err(|err| AgentKitError::A2aClient(err.to_string()))?;

    if !wants_stream {
        return parse_single_response(&payload, context_fallback);
    }
    Ok(parse_sse_task(&payload, context_fallback))
}
pub(crate) fn parse_sse_task(body: &str, fallback_context_id: String) -> Task {
let mut collected_parts: Vec<Part> = Vec::new();
let mut task_id = Uuid::new_v4().to_string();
let mut context_id = fallback_context_id;
let mut state = TaskState::Completed;
for line in body.lines() {
let data = match line.strip_prefix("data: ") {
Some(d) if !d.is_empty() => d,
_ => continue,
};
let envelope: Value = match serde_json::from_str(data) {
Ok(v) => v,
Err(_) => continue,
};
let result = match envelope.get("result") {
Some(r) => r,
None => continue,
};
match result.get("kind").and_then(Value::as_str) {
Some("task") => {
if let Ok(wire) = serde_json::from_value::<WireTask>(result.clone()) {
task_id = wire.id;
context_id = wire.context_id;
state = wire.status.state;
collected_parts.extend(wire.artifacts.into_iter().flat_map(|a| a.parts));
}
}
Some("artifact-update") => {
if let Ok(ev) = serde_json::from_value::<TaskArtifactUpdateEvent>(result.clone()) {
collected_parts.extend(ev.artifact.parts);
task_id = ev.task_id;
context_id = ev.context_id;
}
}
Some("status-update") => {
if let Some(tid) = result.get("taskId").and_then(Value::as_str) {
task_id = tid.to_string();
}
if let Some(cid) = result.get("contextId").and_then(Value::as_str) {
context_id = cid.to_string();
}
if let Some(status) = result.get("status") {
if let Ok(s) = serde_json::from_value::<TaskStatus>(status.clone()) {
state = s.state;
}
}
}
_ => {}
}
}
let artifacts = if collected_parts.is_empty() {
vec![]
} else {
vec![Artifact {
artifact_id: Uuid::new_v4().to_string(),
name: None,
description: None,
parts: collected_parts,
metadata: None,
}]
};
Task {
id: task_id,
context_id,
state,
artifacts,
history: vec![],
metadata: None,
}
}
/// Deserialization-only shape of a task object as it appears in a JSON-RPC
/// `result`.
///
/// Keys are read in camelCase (so `context_id` is read from `contextId`). The
/// task state is nested under `status`, and the parsing functions in this
/// module flatten it into the crate's `Task`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct WireTask {
/// Task identifier; required.
id: String,
/// Context identifier; required.
context_id: String,
/// Status object; its `state` field becomes the resulting task's state.
status: TaskStatus,
/// Artifacts attached to the task; empty when the key is absent.
#[serde(default)]
artifacts: Vec<Artifact>,
/// Message history; empty when the key is absent.
#[serde(default)]
history: Vec<Message>,
/// Free-form metadata; `None` when the key is absent.
#[serde(default)]
metadata: Option<Value>,
}
/// Parses the body of a non-streaming `message/send` JSON-RPC response into a
/// [`Task`].
///
/// The `result` member is interpreted in one of two ways:
///
/// * `kind == "message"` – the agent replied with a plain message. Its parts
///   are wrapped in a single artifact with a fresh id (none when there are no
///   parts), and a completed task with a fresh id is synthesized. The message's
///   context id is used when present, otherwise `fallback_context_id`.
/// * anything else – the result is deserialized as a task object and returned
///   with its id, context id, state, artifacts, history and metadata.
///
/// # Errors
///
/// Returns [`AgentKitError::A2aClient`] when the body is not valid JSON, when
/// the envelope carries a non-null `error` member, when `result` is missing,
/// or when `result` cannot be deserialized as the expected message or task.
fn parse_single_response(body: &str, fallback_context_id: String) -> Result<Task> {
    let envelope: Value = serde_json::from_str(body)
        .map_err(|e| AgentKitError::A2aClient(format!("invalid JSON-RPC response: {e}")))?;

    // Some servers serialize "no error" as `"error": null` next to a valid
    // `result`; only a non-null member signals an actual failure.
    if let Some(err) = envelope.get("error").filter(|e| !e.is_null()) {
        return Err(AgentKitError::A2aClient(format!(
            "agent returned a JSON-RPC error: {err}"
        )));
    }

    let result = envelope.get("result").cloned().ok_or_else(|| {
        AgentKitError::A2aClient("JSON-RPC response is missing 'result'".to_string())
    })?;

    if result.get("kind").and_then(Value::as_str) == Some("message") {
        let message: Message = serde_json::from_value(result).map_err(|e| {
            AgentKitError::A2aClient(format!("invalid Message in JSON-RPC response: {e}"))
        })?;
        let artifacts = if message.parts.is_empty() {
            vec![]
        } else {
            vec![Artifact {
                artifact_id: Uuid::new_v4().to_string(),
                name: None,
                description: None,
                parts: message.parts,
                metadata: None,
            }]
        };
        return Ok(Task {
            id: Uuid::new_v4().to_string(),
            context_id: message.context_id.unwrap_or(fallback_context_id),
            state: TaskState::Completed,
            artifacts,
            history: vec![],
            metadata: None,
        });
    }

    let wire: WireTask = serde_json::from_value(result).map_err(|e| {
        AgentKitError::A2aClient(format!("invalid Task in JSON-RPC response: {e}"))
    })?;
    Ok(Task {
        id: wire.id,
        context_id: wire.context_id,
        state: wire.status.state,
        artifacts: wire.artifacts,
        history: wire.history,
        metadata: wire.metadata,
    })
}