guardian_db/p2p/
events.rs

1use crate::error::{GuardianError, Result};
2use crate::traits::{DirectChannelEmitter, EventPubSub, EventPubSubMessage, EventPubSubPayload};
3use async_trait::async_trait;
4use libp2p::PeerId;
5use std::any::{Any, TypeId};
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::{RwLock, broadcast};
9
10// ============================================================================
11// EVENT BUS IMPLEMENTATION usando Tokio Channels
12// ============================================================================
13
14/// Event Bus baseado em canais do Tokio
15/// Oferece funcionalidade de pub/sub type-safe usando broadcast channels
16#[derive(Clone)]
17pub struct EventBus {
18    channels: Arc<RwLock<HashMap<TypeId, Box<dyn Any + Send + Sync>>>>,
19}
20
21impl Default for EventBus {
22    fn default() -> Self {
23        Self::new()
24    }
25}
26
27impl EventBus {
28    /// Cria um novo Event Bus
29    pub fn new() -> Self {
30        Self {
31            channels: Arc::new(RwLock::new(HashMap::new())),
32        }
33    }
34
35    /// Cria um emitter para um tipo específico de evento
36    pub async fn emitter<T>(&self) -> Result<Emitter<T>>
37    where
38        T: Clone + Send + Sync + 'static,
39    {
40        let type_id = TypeId::of::<T>();
41        let mut channels = self.channels.write().await;
42
43        channels.entry(type_id).or_insert_with(|| {
44            let (sender, _) = broadcast::channel::<T>(1024); // Buffer de 1024 eventos
45            Box::new(sender)
46        });
47
48        let sender = channels
49            .get(&type_id)
50            .and_then(|any| any.downcast_ref::<broadcast::Sender<T>>())
51            .ok_or_else(|| GuardianError::Other("Failed to get sender for type".to_string()))?
52            .clone();
53
54        Ok(Emitter { sender })
55    }
56
57    /// Subscribe para receber eventos de um tipo específico
58    pub async fn subscribe<T>(&self) -> Result<broadcast::Receiver<T>>
59    where
60        T: Clone + Send + Sync + 'static,
61    {
62        let type_id = TypeId::of::<T>();
63        let mut channels = self.channels.write().await;
64
65        channels.entry(type_id).or_insert_with(|| {
66            let (sender, _) = broadcast::channel::<T>(1024);
67            Box::new(sender)
68        });
69
70        let sender = channels
71            .get(&type_id)
72            .and_then(|any| any.downcast_ref::<broadcast::Sender<T>>())
73            .ok_or_else(|| GuardianError::Other("Failed to get sender for type".to_string()))?;
74
75        Ok(sender.subscribe())
76    }
77}
78
79/// Emitter type-safe para um tipo específico de evento
80pub struct Emitter<T> {
81    sender: broadcast::Sender<T>,
82}
83
84impl<T> Emitter<T>
85where
86    T: Clone + Send + Sync + 'static,
87{
88    /// Emite um evento para todos os subscribers
89    pub fn emit(&self, event: T) -> Result<()> {
90        // broadcast::send retorna erro apenas se não há receivers
91        // Neste caso, ignoramos o erro pois é normal não ter listeners
92        let _ = self.sender.send(event);
93        Ok(())
94    }
95
96    /// Retorna o número de subscribers ativos
97    pub fn receiver_count(&self) -> usize {
98        self.sender.receiver_count()
99    }
100
101    /// Fecha o emitter - implementação básica para compatibilidade
102    /// Como broadcast::Sender não tem método close(), esta é uma implementação de compatibilidade
103    pub async fn close(&self) -> Result<()> {
104        // Para broadcast::Sender, não há método close() direto
105        // O channel é fechado automaticamente quando todos os senders são dropados
106        // ***Por enquanto, esta é uma implementação de compatibilidade que sempre retorna Ok
107        Ok(())
108    }
109}
110
111// ============================================================================
112// PAYLOAD EMITTER
113// ============================================================================
114
115pub type Bus = EventBus;
116
117pub struct PayloadEmitter {
118    // EventBus baseado em Tokio
119    emitter: Emitter<EventPubSubPayload>,
120}
121
122impl PayloadEmitter {
123    /// Cria um novo emissor de eventos para payloads de pub/sub.
124    pub async fn new(bus: &Bus) -> Result<Self> {
125        let emitter = bus.emitter::<EventPubSubPayload>().await?;
126        Ok(PayloadEmitter { emitter })
127    }
128
129    /// Emite um evento de payload.
130    pub fn emit_payload(&self, evt: EventPubSubPayload) -> Result<()> {
131        self.emitter.emit(evt)
132    }
133}
134
135// Implementação do trait DirectChannelEmitter
136#[async_trait]
137impl DirectChannelEmitter for PayloadEmitter {
138    type Error = GuardianError;
139
140    async fn emit(&self, payload: EventPubSubPayload) -> std::result::Result<(), Self::Error> {
141        self.emit_payload(payload)
142    }
143
144    async fn close(&self) -> std::result::Result<(), Self::Error> {
145        // PayloadEmitter não precisa fechar nada especial
146        Ok(())
147    }
148}
149
150/// Cria um novo evento de Mensagem.
151pub fn new_event_message(content: Vec<u8>) -> EventPubSubMessage {
152    EventPubSubMessage { content }
153}
154
155/// Cria um novo evento de Payload.
156pub fn new_event_payload(payload: Vec<u8>, peer: PeerId) -> EventPubSubPayload {
157    EventPubSubPayload { payload, peer }
158}
159
160/// Cria um novo evento EventPubSubJoin.
161pub fn new_event_peer_join(peer: PeerId, topic: String) -> EventPubSub {
162    EventPubSub::Join { peer, topic }
163}
164
165/// Cria um novo evento EventPubSubLeave.
166pub fn new_event_peer_leave(peer: PeerId, topic: String) -> EventPubSub {
167    EventPubSub::Leave { peer, topic }
168}