guardian_db/p2p/
events.rs1use crate::error::{GuardianError, Result};
2use crate::traits::{DirectChannelEmitter, EventPubSub, EventPubSubMessage, EventPubSubPayload};
3use async_trait::async_trait;
4use libp2p::PeerId;
5use std::any::{Any, TypeId};
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::{RwLock, broadcast};
9
10#[derive(Clone)]
17pub struct EventBus {
18 channels: Arc<RwLock<HashMap<TypeId, Box<dyn Any + Send + Sync>>>>,
19}
20
21impl Default for EventBus {
22 fn default() -> Self {
23 Self::new()
24 }
25}
26
27impl EventBus {
28 pub fn new() -> Self {
30 Self {
31 channels: Arc::new(RwLock::new(HashMap::new())),
32 }
33 }
34
35 pub async fn emitter<T>(&self) -> Result<Emitter<T>>
37 where
38 T: Clone + Send + Sync + 'static,
39 {
40 let type_id = TypeId::of::<T>();
41 let mut channels = self.channels.write().await;
42
43 channels.entry(type_id).or_insert_with(|| {
44 let (sender, _) = broadcast::channel::<T>(1024); Box::new(sender)
46 });
47
48 let sender = channels
49 .get(&type_id)
50 .and_then(|any| any.downcast_ref::<broadcast::Sender<T>>())
51 .ok_or_else(|| GuardianError::Other("Failed to get sender for type".to_string()))?
52 .clone();
53
54 Ok(Emitter { sender })
55 }
56
57 pub async fn subscribe<T>(&self) -> Result<broadcast::Receiver<T>>
59 where
60 T: Clone + Send + Sync + 'static,
61 {
62 let type_id = TypeId::of::<T>();
63 let mut channels = self.channels.write().await;
64
65 channels.entry(type_id).or_insert_with(|| {
66 let (sender, _) = broadcast::channel::<T>(1024);
67 Box::new(sender)
68 });
69
70 let sender = channels
71 .get(&type_id)
72 .and_then(|any| any.downcast_ref::<broadcast::Sender<T>>())
73 .ok_or_else(|| GuardianError::Other("Failed to get sender for type".to_string()))?;
74
75 Ok(sender.subscribe())
76 }
77}
78
79pub struct Emitter<T> {
81 sender: broadcast::Sender<T>,
82}
83
84impl<T> Emitter<T>
85where
86 T: Clone + Send + Sync + 'static,
87{
88 pub fn emit(&self, event: T) -> Result<()> {
90 let _ = self.sender.send(event);
93 Ok(())
94 }
95
96 pub fn receiver_count(&self) -> usize {
98 self.sender.receiver_count()
99 }
100
101 pub async fn close(&self) -> Result<()> {
104 Ok(())
108 }
109}
110
111pub type Bus = EventBus;
116
117pub struct PayloadEmitter {
118 emitter: Emitter<EventPubSubPayload>,
120}
121
122impl PayloadEmitter {
123 pub async fn new(bus: &Bus) -> Result<Self> {
125 let emitter = bus.emitter::<EventPubSubPayload>().await?;
126 Ok(PayloadEmitter { emitter })
127 }
128
129 pub fn emit_payload(&self, evt: EventPubSubPayload) -> Result<()> {
131 self.emitter.emit(evt)
132 }
133}
134
135#[async_trait]
137impl DirectChannelEmitter for PayloadEmitter {
138 type Error = GuardianError;
139
140 async fn emit(&self, payload: EventPubSubPayload) -> std::result::Result<(), Self::Error> {
141 self.emit_payload(payload)
142 }
143
144 async fn close(&self) -> std::result::Result<(), Self::Error> {
145 Ok(())
147 }
148}
149
150pub fn new_event_message(content: Vec<u8>) -> EventPubSubMessage {
152 EventPubSubMessage { content }
153}
154
155pub fn new_event_payload(payload: Vec<u8>, peer: PeerId) -> EventPubSubPayload {
157 EventPubSubPayload { payload, peer }
158}
159
160pub fn new_event_peer_join(peer: PeerId, topic: String) -> EventPubSub {
162 EventPubSub::Join { peer, topic }
163}
164
165pub fn new_event_peer_leave(peer: PeerId, topic: String) -> EventPubSub {
167 EventPubSub::Leave { peer, topic }
168}