1use anyhow::{Result, anyhow};
2use crb_agent::{Agent, Envelope, Event, OnEvent, TheEvent};
3use crb_core::{mpsc, sync::Mutex};
4use derive_more::{Deref, DerefMut};
5use futures::Stream;
7use tokio_stream::wrappers::UnboundedReceiverStream;
8
9pub struct EventBridge<T> {
10 tx: mpsc::UnboundedSender<T>,
11 rx: Mutex<Option<mpsc::UnboundedReceiver<T>>>,
12}
13
14impl<T: TheEvent> Default for EventBridge<T> {
15 fn default() -> Self {
16 Self::new()
17 }
18}
19
20impl<T: TheEvent> EventBridge<T> {
21 pub fn new() -> Self {
22 let (tx, rx) = mpsc::unbounded_channel();
23 Self {
24 tx,
25 rx: Mutex::new(Some(rx)),
26 }
27 }
28
29 pub fn send(&self, msg: T) {
30 self.tx.send(msg).ok();
31 }
32
33 pub async fn events(&self) -> Result<impl Stream<Item = T>> {
34 self.rx
35 .lock()
36 .await
37 .take()
38 .ok_or_else(|| anyhow!("Receiver of the EventBridge has consumed already."))
39 .map(UnboundedReceiverStream::new)
40 }
41}
42
43#[derive(Deref, DerefMut)]
44pub struct AgentBridge<A> {
45 bridge: EventBridge<Envelope<A>>,
46}
47
48impl<A: Agent> Default for AgentBridge<A> {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl<A: Agent> AgentBridge<A> {
55 pub fn new() -> Self {
56 Self {
57 bridge: EventBridge::new(),
58 }
59 }
60
61 pub fn event<E>(&self, msg: E)
62 where
63 A: OnEvent<E>,
64 E: TheEvent,
65 {
66 let event = Event::envelope(msg);
67 self.tx.send(event).ok();
68 }
69}