use std::sync::Arc;
use serde::Serialize;
use tokio::sync::broadcast;
use crate::a2a::core::task_types::{Task, TaskId, TaskState};
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Event {
TaskCreated(Arc<Task>),
TaskStatusChanged {
task_id: TaskId,
old_state: TaskState,
new_state: TaskState,
task: Arc<Task>,
},
}
impl Event {
pub fn event_type(&self) -> &'static str {
match self {
Self::TaskCreated(_) => "task_created",
Self::TaskStatusChanged { .. } => "task_status_changed",
}
}
}
pub struct MessageBus {
sender: broadcast::Sender<Event>,
}
impl MessageBus {
pub fn new(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self { sender }
}
pub fn publish(&self, event: Event) {
let event_type = event.event_type();
if let Err(broadcast::error::SendError(_)) = self.sender.send(event) {
tracing::trace!(event_type, "no subscribers; bus event dropped");
}
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.sender.subscribe()
}
}
#[cfg(test)]
mod tests {
use chrono::Utc;
use super::*;
use crate::a2a::core::task_types::{ContextId, Task, TaskMessage, TaskState, TaskStatus};
fn make_task() -> Task {
Task {
id: TaskId::new(),
context_id: ContextId::new(),
status: TaskStatus {
state: TaskState::Submitted,
message: None,
timestamp: Utc::now(),
},
artifacts: Vec::new(),
history: Vec::<TaskMessage>::new(),
metadata: None,
assignee: None,
creator: None,
deadline: None,
}
}
#[tokio::test]
async fn publish_and_subscribe_should_deliver_event_to_subscriber() {
let bus = MessageBus::new(16);
let mut rx = bus.subscribe();
let task = make_task();
bus.publish(Event::TaskCreated(Arc::new(task.clone())));
let received = rx.recv().await.expect("subscriber must receive the event");
let Event::TaskCreated(received_task) = received else {
panic!("expected TaskCreated, got something else");
};
assert_eq!(
received_task.id, task.id,
"received task id must match published task id"
);
}
#[tokio::test]
async fn multiple_subscribers_should_each_receive_same_event() {
let bus = MessageBus::new(16);
let mut rx1 = bus.subscribe();
let mut rx2 = bus.subscribe();
let task = make_task();
bus.publish(Event::TaskCreated(Arc::new(task.clone())));
let ev1 = rx1
.recv()
.await
.expect("subscriber 1 must receive the event");
let ev2 = rx2
.recv()
.await
.expect("subscriber 2 must receive the event");
let Event::TaskCreated(t1) = ev1 else {
panic!("subscriber 1: expected TaskCreated");
};
let Event::TaskCreated(t2) = ev2 else {
panic!("subscriber 2: expected TaskCreated");
};
assert_eq!(t1.id, task.id, "subscriber 1 task id must match");
assert_eq!(t2.id, task.id, "subscriber 2 task id must match");
}
#[tokio::test]
async fn subscriber_after_publish_should_miss_earlier_event() {
let bus = MessageBus::new(16);
bus.publish(Event::TaskCreated(Arc::new(make_task())));
let mut rx = bus.subscribe();
let result = tokio::time::timeout(std::time::Duration::from_millis(20), rx.recv()).await;
assert!(
result.is_err(),
"late subscriber must not receive events published before it subscribed"
);
}
#[tokio::test]
async fn publish_with_no_subscribers_should_not_panic() {
let bus = MessageBus::new(16);
bus.publish(Event::TaskCreated(Arc::new(make_task())));
}
#[test]
fn event_type_strings_are_stable() {
assert_eq!(
Event::TaskStatusChanged {
task_id: TaskId::new(),
old_state: TaskState::Submitted,
new_state: TaskState::Working,
task: Arc::new(make_task()),
}
.event_type(),
"task_status_changed"
);
}
}