Skip to main content

a2a_rs/port/
notification_manager.rs

1//! Push notification management port definitions
2
3use async_trait::async_trait;
4
5use crate::domain::{
6    A2AError, DeleteTaskPushNotificationConfigParams, GetTaskPushNotificationConfigParams,
7    ListTaskPushNotificationConfigsParams, TaskArtifactUpdateEvent, TaskPushNotificationConfig,
8    TaskStatusUpdateEvent,
9};
10
11/// Validate a push notification config URL.
12///
13/// Checks that the URL is non-empty, well-formed, and uses HTTPS
14/// (HTTP is allowed only for localhost for development purposes).
15fn validate_push_notification_url(config: &TaskPushNotificationConfig) -> Result<(), A2AError> {
16    if config.url.trim().is_empty() {
17        return Err(A2AError::ValidationError {
18            field: "url".to_string(),
19            message: "Webhook URL cannot be empty".to_string(),
20        });
21    }
22
23    match url::Url::parse(&config.url) {
24        Ok(parsed_url) => {
25            let scheme = parsed_url.scheme();
26            if scheme != "https" {
27                let is_localhost = parsed_url
28                    .host_str()
29                    .map(|h| h == "localhost" || h == "127.0.0.1" || h == "::1")
30                    .unwrap_or(false);
31
32                if scheme != "http" || !is_localhost {
33                    return Err(A2AError::ValidationError {
34                        field: "url".to_string(),
35                        message: "Webhook URL must use HTTPS (HTTP is only allowed for localhost)"
36                            .to_string(),
37                    });
38                }
39            }
40        }
41        Err(_) => {
42            return Err(A2AError::ValidationError {
43                field: "url".to_string(),
44                message: "Invalid webhook URL format".to_string(),
45            });
46        }
47    }
48
49    Ok(())
50}
51
52/// Async management of push-notification configurations.
53///
54/// Expressed in terms of the A2A v1.0.0 multi-config CRUD model — the richest
55/// shape — so a single capability covers both single- and multi-config storage.
56/// Validation conveniences (URL/task-id checks) live on
57/// [`AsyncNotificationManagerExt`], which is blanket-implemented for every
58/// `AsyncNotificationManager`.
59#[async_trait]
60pub trait AsyncNotificationManager: Send + Sync {
61    /// Create or replace a push-notification config, returning it with any
62    /// server-assigned ID populated.
63    async fn set_config(
64        &self,
65        config: &TaskPushNotificationConfig,
66    ) -> Result<TaskPushNotificationConfig, A2AError>;
67
68    /// Get a push-notification config for a task.
69    async fn get_config(
70        &self,
71        params: &GetTaskPushNotificationConfigParams,
72    ) -> Result<TaskPushNotificationConfig, A2AError>;
73
74    /// List all push-notification configs for a task.
75    async fn list_configs(
76        &self,
77        params: &ListTaskPushNotificationConfigsParams,
78    ) -> Result<Vec<TaskPushNotificationConfig>, A2AError>;
79
80    /// Delete a push-notification config. Idempotent per the v1.0.0 spec.
81    async fn delete_config(
82        &self,
83        params: &DeleteTaskPushNotificationConfigParams,
84    ) -> Result<(), A2AError>;
85}
86
87/// Validation conveniences over [`AsyncNotificationManager`].
88///
89/// Blanket-implemented for every `AsyncNotificationManager`, so implementors
90/// only stub the core CRUD primitives.
91#[async_trait]
92pub trait AsyncNotificationManagerExt: AsyncNotificationManager {
93    /// Validate a push-notification config's webhook URL.
94    fn validate_config(&self, config: &TaskPushNotificationConfig) -> Result<(), A2AError> {
95        validate_push_notification_url(config)
96    }
97
98    /// Validate the task ID and webhook URL, then store the config.
99    async fn set_validated(
100        &self,
101        config: &TaskPushNotificationConfig,
102    ) -> Result<TaskPushNotificationConfig, A2AError> {
103        if config.task_id.trim().is_empty() {
104            return Err(A2AError::ValidationError {
105                field: "task_id".to_string(),
106                message: "Task ID cannot be empty".to_string(),
107            });
108        }
109        self.validate_config(config)?;
110        self.set_config(config).await
111    }
112}
113
114impl<T: AsyncNotificationManager + ?Sized> AsyncNotificationManagerExt for T {}
115
116/// Out-of-band delivery of task updates to a task's configured push endpoint.
117///
118/// This is the **delivery** half of push notifications, deliberately separate
119/// from the config-CRUD capability ([`AsyncNotificationManager`]) and from the
120/// in-process streaming fan-out ([`AsyncStreamingHandler`](crate::port::AsyncStreamingHandler)).
121/// Keeping delivery behind its own port is what lets the orchestration layer
122/// (the [`TaskStatusBroadcast`](crate::application::TaskStatusBroadcast) mixin)
123/// "commit, announce to subscribers, then notify the webhook" without any one
124/// adapter taking on a second job — and lets the notification backend be swapped
125/// freely (HTTP webhook, no-op, a queue, a test spy) at the composition edge.
126///
127/// Errors are surfaced to the caller, but the orchestration layer treats
128/// delivery as best-effort: a webhook that is down must not fail the task
129/// mutation that triggered it.
130#[async_trait]
131pub trait AsyncPushNotifier: Send + Sync {
132    /// Deliver a status update to the task's configured push endpoint, if any.
133    ///
134    /// A task with no registered config is not an error — implementations
135    /// return `Ok(())`.
136    async fn notify_status(
137        &self,
138        task_id: &str,
139        event: &TaskStatusUpdateEvent,
140    ) -> Result<(), A2AError>;
141
142    /// Deliver an artifact update to the task's configured push endpoint, if any.
143    async fn notify_artifact(
144        &self,
145        task_id: &str,
146        event: &TaskArtifactUpdateEvent,
147    ) -> Result<(), A2AError>;
148}
149
150/// Deref-forwarding impl so an `Arc<dyn AsyncPushNotifier>` (e.g. the value
151/// handed out by `InMemoryTaskStorage::push_notifier`) satisfies `impl
152/// AsyncPushNotifier` bounds directly, without re-wrapping.
153#[async_trait]
154impl<T: AsyncPushNotifier + ?Sized> AsyncPushNotifier for std::sync::Arc<T> {
155    async fn notify_status(
156        &self,
157        task_id: &str,
158        event: &TaskStatusUpdateEvent,
159    ) -> Result<(), A2AError> {
160        (**self).notify_status(task_id, event).await
161    }
162
163    async fn notify_artifact(
164        &self,
165        task_id: &str,
166        event: &TaskArtifactUpdateEvent,
167    ) -> Result<(), A2AError> {
168        (**self).notify_artifact(task_id, event).await
169    }
170}
171
/// An [`AsyncPushNotifier`] that does nothing, for compositions that have no
/// push backend wired in.
///
/// All methods return success without performing any work; this is the
/// push-side counterpart of `NoopStreamingHandler` on the streaming side.
#[derive(Debug, Default, Clone)]
pub struct NoopPushNotifier;
178
179#[async_trait]
180impl AsyncPushNotifier for NoopPushNotifier {
181    async fn notify_status(
182        &self,
183        _task_id: &str,
184        _event: &TaskStatusUpdateEvent,
185    ) -> Result<(), A2AError> {
186        Ok(())
187    }
188
189    async fn notify_artifact(
190        &self,
191        _task_id: &str,
192        _event: &TaskArtifactUpdateEvent,
193    ) -> Result<(), A2AError> {
194        Ok(())
195    }
196}