car-a2a 0.7.0

Bridge between Common Agent Runtime and the Linux Foundation Agent2Agent (A2A) v1.0 protocol
Documentation
//! Push-notification delivery.
//!
//! When a task transitions state or accumulates new artifacts, the
//! dispatcher publishes the current `Task` snapshot to every URL the
//! peer registered via `tasks/pushNotificationConfig/set`. Delivery is
//! fire-and-forget (a `tokio::spawn` per config); failures are logged
//! but do not block the runtime.
//!
//! This is intentionally simpler than the spec's full delivery
//! semantics — no retries, no signed payloads, no replay protection.
//! Production embedders that need those should wrap or replace
//! [`PushDispatcher`] with their own implementation.

use crate::types::{PushNotificationConfig, Task};
use tracing::{debug, warn};

/// Sends `Task` snapshots to push subscribers over HTTP.
///
/// The dispatcher only wraps a [`reqwest::Client`]; cloning it clones that
/// client handle, so one dispatcher can be handed to several components.
#[derive(Clone, Debug)]
pub struct PushDispatcher {
    /// HTTP client used for every outgoing push request.
    client: reqwest::Client,
}

impl Default for PushDispatcher {
    /// Builds a dispatcher around a freshly constructed `reqwest::Client`
    /// whose requests time out after 10 seconds.
    ///
    /// # Panics
    ///
    /// Panics if the underlying client cannot be built.
    fn default() -> Self {
        let request_timeout = std::time::Duration::from_secs(10);
        let builder = reqwest::Client::builder().timeout(request_timeout);
        let client = builder
            .build()
            .expect("default reqwest client must build");
        Self { client }
    }
}

impl PushDispatcher {
    /// Creates a dispatcher that sends push notifications through `client`.
    ///
    /// Use this instead of [`Default`] when the caller wants to supply its
    /// own client configuration.
    pub fn new(client: reqwest::Client) -> Self {
        Self { client }
    }

    /// Fan out a task snapshot to every configured push subscriber.
    ///
    /// Returns immediately. The snapshot is POSTed as JSON to each
    /// `cfg.url`, with `cfg.token` attached as a bearer token when present.
    /// Every subscriber is served from its own detached tokio task, so a
    /// slow or unreachable endpoint cannot hold up delivery to the others.
    /// Outcomes are only logged; nothing is reported back to the caller.
    ///
    /// An empty `configs` list is a no-op and spawns nothing.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside a Tokio runtime, so a
    /// non-empty `configs` list requires a runtime context.
    pub fn deliver(&self, configs: Vec<PushNotificationConfig>, task: Task) {
        if configs.is_empty() {
            return;
        }
        let client = self.client.clone();
        // The outer task owns the snapshot and builds (serializes) each
        // request off the caller's path; the sends themselves are handed to
        // per-subscriber tasks so they run concurrently.
        tokio::spawn(async move {
            // Rendered once so each per-subscriber task can carry the id for
            // logging without needing access to the snapshot itself.
            let task_id = task.id.to_string();
            for cfg in configs {
                let mut req = client.post(&cfg.url).json(&task);
                if let Some(token) = &cfg.token {
                    req = req.bearer_auth(token);
                }
                let task_id = task_id.clone();
                tokio::spawn(async move {
                    match req.send().await {
                        Ok(resp) if resp.status().is_success() => {
                            debug!(url = %cfg.url, task = %task_id, "push delivered");
                        }
                        Ok(resp) => {
                            warn!(
                                url = %cfg.url,
                                task = %task_id,
                                status = %resp.status(),
                                "push non-2xx"
                            );
                        }
                        Err(e) => {
                            warn!(
                                url = %cfg.url,
                                task = %task_id,
                                error = %e,
                                "push delivery failed"
                            );
                        }
                    }
                });
            }
        });
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::bridge::task_with_status;
    use crate::types::TaskState;

    /// Delivering to zero subscribers must return without panicking.
    #[tokio::test]
    async fn empty_config_list_is_noop() {
        let snapshot = task_with_status(
            "t".into(),
            "ctx".into(),
            TaskState::Submitted,
            Vec::new(),
            Vec::new(),
        );
        let no_subscribers = Vec::new();
        // Should not spawn anything or panic.
        PushDispatcher::default().deliver(no_subscribers, snapshot);
    }
}