Skip to main content

agentrs_multi/
communication.rs

1use async_trait::async_trait;
2use futures::{stream::BoxStream, StreamExt};
3use tokio::sync::broadcast;
4use tokio_stream::wrappers::BroadcastStream;
5
6use agentrs_core::{AgentOutput, Result};
7
8/// Events emitted during orchestration.
9#[derive(Debug, Clone)]
10pub enum OrchestratorEvent {
11    /// An agent completed execution.
12    AgentCompleted {
13        /// Name of the completed agent.
14        agent: String,
15        /// Output produced by the agent.
16        output: AgentOutput,
17    },
18}
19
20/// Pluggable event bus for orchestration observability.
21#[async_trait]
22pub trait EventBus: Send + Sync {
23    /// Publishes an event.
24    async fn publish(&self, event: OrchestratorEvent) -> Result<()>;
25
26    /// Subscribes to future events.
27    async fn subscribe(&self) -> Result<BoxStream<'static, OrchestratorEvent>>;
28}
29
30/// In-memory broadcast event bus.
31#[derive(Clone)]
32pub struct InMemoryBus {
33    sender: broadcast::Sender<OrchestratorEvent>,
34}
35
36impl InMemoryBus {
37    /// Creates a bus with the requested capacity.
38    pub fn new(capacity: usize) -> Self {
39        let (sender, _) = broadcast::channel(capacity);
40        Self { sender }
41    }
42}
43
44#[async_trait]
45impl EventBus for InMemoryBus {
46    async fn publish(&self, event: OrchestratorEvent) -> Result<()> {
47        let _ = self.sender.send(event);
48        Ok(())
49    }
50
51    async fn subscribe(&self) -> Result<BoxStream<'static, OrchestratorEvent>> {
52        Ok(BroadcastStream::new(self.sender.subscribe())
53            .filter_map(|result| async move { result.ok() })
54            .boxed())
55    }
56}