agentrs_multi/
communication.rs1use async_trait::async_trait;
2use futures::{stream::BoxStream, StreamExt};
3use tokio::sync::broadcast;
4use tokio_stream::wrappers::BroadcastStream;
5
6use agentrs_core::{AgentOutput, Result};
7
8#[derive(Debug, Clone)]
10pub enum OrchestratorEvent {
11 AgentCompleted {
13 agent: String,
15 output: AgentOutput,
17 },
18}
19
20#[async_trait]
22pub trait EventBus: Send + Sync {
23 async fn publish(&self, event: OrchestratorEvent) -> Result<()>;
25
26 async fn subscribe(&self) -> Result<BoxStream<'static, OrchestratorEvent>>;
28}
29
30#[derive(Clone)]
32pub struct InMemoryBus {
33 sender: broadcast::Sender<OrchestratorEvent>,
34}
35
36impl InMemoryBus {
37 pub fn new(capacity: usize) -> Self {
39 let (sender, _) = broadcast::channel(capacity);
40 Self { sender }
41 }
42}
43
44#[async_trait]
45impl EventBus for InMemoryBus {
46 async fn publish(&self, event: OrchestratorEvent) -> Result<()> {
47 let _ = self.sender.send(event);
48 Ok(())
49 }
50
51 async fn subscribe(&self) -> Result<BoxStream<'static, OrchestratorEvent>> {
52 Ok(BroadcastStream::new(self.sender.subscribe())
53 .filter_map(|result| async move { result.ok() })
54 .boxed())
55 }
56}