Skip to main content

crb_superagent/
bridge.rs

1use anyhow::{Result, anyhow};
2use crb_agent::{Agent, Envelope, Event, OnEvent, TheEvent};
3use crb_core::{mpsc, sync::Mutex};
4use derive_more::{Deref, DerefMut};
5// TODO: Move to the core
6use futures::Stream;
7use tokio_stream::wrappers::UnboundedReceiverStream;
8
9pub struct EventBridge<T> {
10    tx: mpsc::UnboundedSender<T>,
11    rx: Mutex<Option<mpsc::UnboundedReceiver<T>>>,
12}
13
14impl<T: TheEvent> Default for EventBridge<T> {
15    fn default() -> Self {
16        Self::new()
17    }
18}
19
20impl<T: TheEvent> EventBridge<T> {
21    pub fn new() -> Self {
22        let (tx, rx) = mpsc::unbounded_channel();
23        Self {
24            tx,
25            rx: Mutex::new(Some(rx)),
26        }
27    }
28
29    pub fn send(&self, msg: T) {
30        self.tx.send(msg).ok();
31    }
32
33    pub async fn events(&self) -> Result<impl Stream<Item = T>> {
34        self.rx
35            .lock()
36            .await
37            .take()
38            .ok_or_else(|| anyhow!("Receiver of the EventBridge has consumed already."))
39            .map(UnboundedReceiverStream::new)
40    }
41}
42
43#[derive(Deref, DerefMut)]
44pub struct AgentBridge<A> {
45    bridge: EventBridge<Envelope<A>>,
46}
47
48impl<A: Agent> Default for AgentBridge<A> {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54impl<A: Agent> AgentBridge<A> {
55    pub fn new() -> Self {
56        Self {
57            bridge: EventBridge::new(),
58        }
59    }
60
61    pub fn event<E>(&self, msg: E)
62    where
63        A: OnEvent<E>,
64        E: TheEvent,
65    {
66        let event = Event::envelope(msg);
67        self.tx.send(event).ok();
68    }
69}