car-a2a 0.12.0

Bridge between Common Agent Runtime and the Linux Foundation Agent2Agent (A2A) v1.0 protocol
Documentation
//! SSE-friendly task event bus.
//!
//! A2A v1.0 streaming methods (`message/stream`, `tasks/resubscribe`)
//! emit a sequence of three event kinds:
//!
//! 1. The initial `Task` snapshot.
//! 2. Zero or more `TaskStatusUpdateEvent`s as the task progresses.
//! 3. Zero or more `TaskArtifactUpdateEvent`s as artifacts accumulate.
//!
//! The bus provides per-task `tokio::sync::broadcast` channels so the
//! HTTP layer can attach SSE writers without blocking the executor.
//! Senders are created lazily and dropped after the final event so
//! subscribers see a clean `RecvError::Closed` rather than wedging.

use crate::types::{Artifact, Task, TaskStatus};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};

/// Buffer depth for each per-task channel. Sized for short bursts of
/// updates (status flip → artifacts arrive → terminal status). Slow
/// subscribers that fall behind get `RecvError::Lagged`; the HTTP
/// layer treats that as a recoverable signal and resyncs by fetching
/// the current task via `tasks/get`.
const CHANNEL_CAPACITY: usize = 64;

/// One frame on the stream — what the HTTP layer turns into an SSE
/// `data:` event.
///
/// `Task` is the *initial* snapshot frame the bridge emits to a fresh
/// SSE subscriber so peers don't have to round-trip `tasks/get` to
/// catch up. It never carries `final = true`; terminal-ness is the
/// `StatusUpdate` variant's job.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum StreamEvent {
    Task(Task),
    StatusUpdate(TaskStatusUpdateEvent),
    ArtifactUpdate(TaskArtifactUpdateEvent),
}

impl StreamEvent {
    pub fn is_final(&self) -> bool {
        match self {
            StreamEvent::StatusUpdate(s) => s.final_event,
            StreamEvent::Task(_) | StreamEvent::ArtifactUpdate(_) => false,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskStatusUpdateEvent {
    pub task_id: String,
    pub context_id: String,
    /// Discriminator. Always `"status-update"` per A2A v1.0.
    #[serde(default = "status_update_kind")]
    pub kind: String,
    pub status: TaskStatus,
    /// True iff this update is the terminal status — matches the
    /// `final` field in the spec; renamed because `final` is a
    /// reserved word in Rust.
    #[serde(rename = "final")]
    pub final_event: bool,
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub metadata: HashMap<String, Value>,
}

fn status_update_kind() -> String {
    "status-update".into()
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskArtifactUpdateEvent {
    pub task_id: String,
    pub context_id: String,
    /// Discriminator. Always `"artifact-update"` per A2A v1.0.
    #[serde(default = "artifact_update_kind")]
    pub kind: String,
    pub artifact: Artifact,
    /// Whether the artifact's parts append to a prior chunk of the
    /// same `artifactId`. Defaults to `false` (replace).
    #[serde(default)]
    pub append: bool,
    /// True iff this is the last chunk for this artifact id.
    #[serde(default)]
    pub last_chunk: bool,
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub metadata: HashMap<String, Value>,
}

fn artifact_update_kind() -> String {
    "artifact-update".into()
}

/// Per-task broadcast channels keyed by task id. Cheap to clone — the
/// inner `Arc<RwLock<HashMap>>` is shared.
#[derive(Clone, Default)]
pub struct EventBus {
    inner: Arc<RwLock<HashMap<String, broadcast::Sender<StreamEvent>>>>,
}

impl EventBus {
    pub fn new() -> Self {
        Self::default()
    }

    /// Subscribe to a task's stream. Creates the channel if it does
    /// not yet exist so subscribers can attach before the first
    /// publish.
    pub async fn subscribe(&self, task_id: &str) -> broadcast::Receiver<StreamEvent> {
        let mut guard = self.inner.write().await;
        guard
            .entry(task_id.to_string())
            .or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0)
            .subscribe()
    }

    /// Publish an event. Acquires the write lock once — both the
    /// upsert (channel may not exist yet) and the conditional remove
    /// (drop the channel after a `final` event) happen under the
    /// same critical section so a concurrent `subscribe` can't slip
    /// in and leave an orphan sender behind.
    pub async fn publish(&self, task_id: &str, event: StreamEvent) {
        let is_final = event.is_final();
        let mut guard = self.inner.write().await;
        let sender = guard
            .entry(task_id.to_string())
            .or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0)
            .clone();
        if is_final {
            guard.remove(task_id);
        }
        drop(guard);
        // `send` returns Err only when there are no receivers, which
        // is normal for tasks nobody is streaming.
        let _ = sender.send(event);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::bridge::task_with_status;
    use crate::types::TaskState;
    use chrono::Utc;
    use tokio::time::{timeout, Duration};

    #[tokio::test]
    async fn subscriber_receives_published_events() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe("t-1").await;
        let task = task_with_status(
            "t-1".into(),
            "ctx".into(),
            TaskState::Submitted,
            vec![],
            vec![],
        );
        bus.publish("t-1", StreamEvent::Task(task)).await;
        let event = timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("recv timeout")
            .expect("recv ok");
        assert!(matches!(event, StreamEvent::Task(_)));
    }

    #[tokio::test]
    async fn final_event_closes_channel() {
        let bus = EventBus::new();
        let mut rx = bus.subscribe("t-2").await;
        bus.publish(
            "t-2",
            StreamEvent::StatusUpdate(TaskStatusUpdateEvent {
                task_id: "t-2".into(),
                context_id: "ctx".into(),
                kind: status_update_kind(),
                status: TaskStatus {
                    state: TaskState::Completed,
                    message: None,
                    timestamp: Utc::now(),
                },
                final_event: true,
                metadata: HashMap::new(),
            }),
        )
        .await;
        // First recv: the final update.
        let _ = rx.recv().await.expect("event");
        // Second recv: channel was dropped, so we should see Closed.
        let res = rx.recv().await;
        assert!(matches!(res, Err(broadcast::error::RecvError::Closed)));
    }

    #[tokio::test]
    async fn stream_event_round_trip_serializes() {
        let event = StreamEvent::StatusUpdate(TaskStatusUpdateEvent {
            task_id: "t".into(),
            context_id: "c".into(),
            kind: "status-update".into(),
            status: TaskStatus {
                state: TaskState::Working,
                message: None,
                timestamp: Utc::now(),
            },
            final_event: false,
            metadata: HashMap::new(),
        });
        let v = serde_json::to_value(&event).unwrap();
        assert_eq!(v["kind"], "status-update");
        assert_eq!(v["taskId"], "t");
        assert_eq!(v["final"], false);
    }
}