velocia 0.3.5

velocia – production-ready AI agent framework using ADK-Rust, A2A protocol, and AWS DynamoDB
pub mod app_client;
pub mod connector;

pub use app_client::A2AGatewayClient;
pub use connector::{RemoteAgentConnection, RemoteAgentManager};

use std::collections::HashMap;

use serde::Deserialize;
use serde_json::Value;
use uuid::Uuid;

use crate::a2a::types::{
    AgentCard, Artifact, JsonRpcRequest, Message, Part, Task, TaskArtifactUpdateEvent, TaskState,
    TaskStatus,
};
use crate::error::{AgentKitError, Result};

/// Send a `message/send` or `message/stream` JSON-RPC request depending on
/// whether the remote agent's card advertises streaming support, matching the
/// negotiation the reference Python `a2a-sdk` client performs.
pub(crate) async fn send_a2a_message(
    http: &reqwest::Client,
    card: &AgentCard,
    request: Message,
    propagation_headers: HashMap<String, String>,
) -> Result<Task> {
    let streaming = card.capabilities.streaming;
    let method = if streaming { "message/stream" } else { "message/send" };
    let fallback_context_id = request
        .context_id
        .clone()
        .unwrap_or_else(|| Uuid::new_v4().to_string());

    let rpc = JsonRpcRequest {
        jsonrpc: "2.0".to_string(),
        method: method.to_string(),
        id: Some(serde_json::json!(Uuid::new_v4().to_string())),
        params: serde_json::json!({ "message": request }),
    };

    let url = format!("{}/", card.url.trim_end_matches('/'));
    let mut req_builder = http.post(&url).json(&rpc);
    for (k, v) in &propagation_headers {
        req_builder = req_builder.header(k, v);
    }

    let body = req_builder
        .send()
        .await?
        .error_for_status()
        .map_err(|e| AgentKitError::A2aClient(e.to_string()))?
        .text()
        .await
        .map_err(|e| AgentKitError::A2aClient(e.to_string()))?;

    if streaming {
        Ok(parse_sse_task(&body, fallback_context_id))
    } else {
        parse_single_response(&body, fallback_context_id)
    }
}

/// Parse an A2A SSE response body into a `Task`, collecting artifact parts
/// from `artifact-update` events and state/IDs from `status-update` events.
pub(crate) fn parse_sse_task(body: &str, fallback_context_id: String) -> Task {
    let mut collected_parts: Vec<Part> = Vec::new();
    let mut task_id = Uuid::new_v4().to_string();
    let mut context_id = fallback_context_id;
    let mut state = TaskState::Completed;

    for line in body.lines() {
        let data = match line.strip_prefix("data: ") {
            Some(d) if !d.is_empty() => d,
            _ => continue,
        };

        let envelope: Value = match serde_json::from_str(data) {
            Ok(v) => v,
            Err(_) => continue,
        };

        let result = match envelope.get("result") {
            Some(r) => r,
            None => continue,
        };

        match result.get("kind").and_then(Value::as_str) {
            Some("task") => {
                if let Ok(wire) = serde_json::from_value::<WireTask>(result.clone()) {
                    task_id = wire.id;
                    context_id = wire.context_id;
                    state = wire.status.state;
                    collected_parts.extend(wire.artifacts.into_iter().flat_map(|a| a.parts));
                }
            }
            Some("artifact-update") => {
                if let Ok(ev) = serde_json::from_value::<TaskArtifactUpdateEvent>(result.clone()) {
                    collected_parts.extend(ev.artifact.parts);
                    task_id = ev.task_id;
                    context_id = ev.context_id;
                }
            }
            Some("status-update") => {
                if let Some(tid) = result.get("taskId").and_then(Value::as_str) {
                    task_id = tid.to_string();
                }
                if let Some(cid) = result.get("contextId").and_then(Value::as_str) {
                    context_id = cid.to_string();
                }
                if let Some(status) = result.get("status") {
                    if let Ok(s) = serde_json::from_value::<TaskStatus>(status.clone()) {
                        state = s.state;
                    }
                }
            }
            _ => {}
        }
    }

    let artifacts = if collected_parts.is_empty() {
        vec![]
    } else {
        vec![Artifact {
            artifact_id: Uuid::new_v4().to_string(),
            name: None,
            description: None,
            parts: collected_parts,
            metadata: None,
        }]
    };

    Task {
        id: task_id,
        context_id,
        state,
        artifacts,
        history: vec![],
        metadata: None,
    }
}

/// Wire-format `Task` matching the real A2A schema (`status: { state, ... }`),
/// used to decode non-streaming `message/send` responses before converting
/// into our flattened internal `Task` representation.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct WireTask {
    id: String,
    context_id: String,
    status: TaskStatus,
    #[serde(default)]
    artifacts: Vec<Artifact>,
    #[serde(default)]
    history: Vec<Message>,
    #[serde(default)]
    metadata: Option<Value>,
}

/// Parse a non-streaming `message/send` JSON-RPC response — a single JSON
/// object whose `result` is either a `Task` or a `Message`, per the A2A spec.
fn parse_single_response(body: &str, fallback_context_id: String) -> Result<Task> {
    let envelope: Value = serde_json::from_str(body)
        .map_err(|e| AgentKitError::A2aClient(format!("invalid JSON-RPC response: {e}")))?;

    if let Some(err) = envelope.get("error") {
        return Err(AgentKitError::A2aClient(format!("agent returned a JSON-RPC error: {err}")));
    }

    let result = envelope.get("result").cloned().ok_or_else(|| {
        AgentKitError::A2aClient("JSON-RPC response is missing 'result'".to_string())
    })?;

    if result.get("kind").and_then(Value::as_str) == Some("message") {
        let message: Message = serde_json::from_value(result)
            .map_err(|e| AgentKitError::A2aClient(format!("invalid Message in JSON-RPC response: {e}")))?;

        let artifacts = if message.parts.is_empty() {
            vec![]
        } else {
            vec![Artifact {
                artifact_id: Uuid::new_v4().to_string(),
                name: None,
                description: None,
                parts: message.parts,
                metadata: None,
            }]
        };

        return Ok(Task {
            id: Uuid::new_v4().to_string(),
            context_id: message.context_id.unwrap_or(fallback_context_id),
            state: TaskState::Completed,
            artifacts,
            history: vec![],
            metadata: None,
        });
    }

    let wire: WireTask = serde_json::from_value(result)
        .map_err(|e| AgentKitError::A2aClient(format!("invalid Task in JSON-RPC response: {e}")))?;

    Ok(Task {
        id: wire.id,
        context_id: wire.context_id,
        state: wire.status.state,
        artifacts: wire.artifacts,
        history: wire.history,
        metadata: wire.metadata,
    })
}