#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
#[cfg(feature = "std")]
use std::sync::Arc;
#[cfg(feature = "std")]
use std::string::String;
#[cfg(feature = "std")]
use std::vec::Vec;
use crate::peer::PeatPeer;
use crate::sync::crdt::EventType;
use crate::NodeId;
/// Event record handed to [`PeatObserver::on_event`].
///
/// Every variant is plain data. This file only defines the shape of each
/// event; the exact conditions under which one is emitted are decided by the
/// emitting code elsewhere in the crate.
#[derive(Clone, Debug)]
pub enum PeatEvent {
    /// A peer was discovered; carries the full peer record.
    PeerDiscovered {
        /// The discovered peer.
        peer: PeatPeer,
    },
    /// The peer identified by `node_id` connected.
    PeerConnected {
        /// Identifier of the peer.
        node_id: NodeId,
    },
    /// The peer identified by `node_id` disconnected.
    PeerDisconnected {
        /// Identifier of the peer.
        node_id: NodeId,
        /// Why the disconnect was reported.
        reason: DisconnectReason,
    },
    /// The peer identified by `node_id` was lost.
    ///
    /// NOTE(review): how this differs from `PeerDisconnected` is not visible
    /// in this file — confirm against the emitter.
    PeerLost {
        /// Identifier of the peer.
        node_id: NodeId,
    },
    /// An emergency was received from `from_node`.
    EmergencyReceived {
        /// Node the emergency came from.
        from_node: NodeId,
    },
    /// An acknowledgement was received from `from_node`.
    AckReceived {
        /// Node the acknowledgement came from.
        from_node: NodeId,
    },
    /// An event of the given CRDT [`EventType`] was received from `from_node`.
    EventReceived {
        /// Node the event came from.
        from_node: NodeId,
        /// Kind of event that was received.
        event_type: EventType,
    },
    /// A document sync with `from_node` was reported.
    DocumentSynced {
        /// Node the document was synced from.
        from_node: NodeId,
        /// A count supplied by the emitter (presumably a running total —
        /// confirm against the emitter).
        total_count: u64,
    },
    /// An application-level document was received.
    AppDocumentReceived {
        /// Application-defined document type identifier.
        type_id: u8,
        /// Node the document originated from.
        source_node: NodeId,
        /// Timestamp attached to the document (units are not visible here).
        timestamp: u64,
        /// Change flag supplied by the emitter (presumably whether local
        /// state changed — confirm against the emitter).
        changed: bool,
    },
    /// The mesh state changed.
    MeshStateChanged {
        /// Number of peers reported by the emitter.
        peer_count: usize,
        /// Number of connected peers reported by the emitter.
        connected_count: usize,
    },
    /// All peers acknowledged.
    AllPeersAcked {
        /// Number of acknowledgements reported by the emitter.
        ack_count: usize,
    },
    /// A peer-to-peer E2EE session with `peer_node_id` was established.
    PeerE2eeEstablished {
        /// Identifier of the remote peer.
        peer_node_id: NodeId,
    },
    /// The peer-to-peer E2EE session with `peer_node_id` was closed.
    PeerE2eeClosed {
        /// Identifier of the remote peer.
        peer_node_id: NodeId,
    },
    /// A message arrived over a peer-to-peer E2EE session.
    PeerE2eeMessageReceived {
        /// Node the message came from.
        from_node: NodeId,
        /// Message payload bytes.
        data: Vec<u8>,
    },
    /// A peer-to-peer E2EE operation with `peer_node_id` failed.
    PeerE2eeFailed {
        /// Identifier of the remote peer.
        peer_node_id: NodeId,
        /// Human-readable description of the failure.
        error: String,
    },
    /// A security violation was detected.
    SecurityViolation {
        /// Category of the violation.
        kind: SecurityViolationKind,
        /// Optional textual identification of the offending source.
        source: Option<String>,
    },
    /// A message was relayed.
    MessageRelayed {
        /// Node the message originated from.
        origin_node: NodeId,
        /// Relay count supplied by the emitter.
        relay_count: usize,
        /// Hop count supplied by the emitter.
        hop_count: u8,
    },
    /// A duplicate message was dropped.
    DuplicateMessageDropped {
        /// Node the message originated from.
        origin_node: NodeId,
        /// Seen count supplied by the emitter.
        seen_count: u32,
    },
    /// A message's TTL expired.
    MessageTtlExpired {
        /// Node the message originated from.
        origin_node: NodeId,
        /// Hop count supplied by the emitter.
        hop_count: u8,
    },
    /// Translator event; only compiled with the `mesh-translator` feature.
    ///
    /// NOTE(review): presumably raised when no callback is available for
    /// `collection` — confirm against the translator code.
    #[cfg(feature = "mesh-translator")]
    TranslatorNoCallback {
        /// Name of the collection involved.
        collection: String,
        /// Optional textual identification of the peer involved.
        peer: Option<String>,
    },
}
impl PeatEvent {
pub fn peer_discovered(peer: PeatPeer) -> Self {
Self::PeerDiscovered { peer }
}
pub fn peer_connected(node_id: NodeId) -> Self {
Self::PeerConnected { node_id }
}
pub fn peer_disconnected(node_id: NodeId, reason: DisconnectReason) -> Self {
Self::PeerDisconnected { node_id, reason }
}
pub fn peer_lost(node_id: NodeId) -> Self {
Self::PeerLost { node_id }
}
pub fn emergency_received(from_node: NodeId) -> Self {
Self::EmergencyReceived { from_node }
}
pub fn ack_received(from_node: NodeId) -> Self {
Self::AckReceived { from_node }
}
pub fn event_received(from_node: NodeId, event_type: EventType) -> Self {
Self::EventReceived {
from_node,
event_type,
}
}
pub fn document_synced(from_node: NodeId, total_count: u64) -> Self {
Self::DocumentSynced {
from_node,
total_count,
}
}
pub fn app_document_received(
type_id: u8,
source_node: NodeId,
timestamp: u64,
changed: bool,
) -> Self {
Self::AppDocumentReceived {
type_id,
source_node,
timestamp,
changed,
}
}
pub fn peer_e2ee_established(peer_node_id: NodeId) -> Self {
Self::PeerE2eeEstablished { peer_node_id }
}
pub fn peer_e2ee_closed(peer_node_id: NodeId) -> Self {
Self::PeerE2eeClosed { peer_node_id }
}
pub fn peer_e2ee_message_received(from_node: NodeId, data: Vec<u8>) -> Self {
Self::PeerE2eeMessageReceived { from_node, data }
}
pub fn peer_e2ee_failed(peer_node_id: NodeId, error: String) -> Self {
Self::PeerE2eeFailed {
peer_node_id,
error,
}
}
pub fn security_violation(kind: SecurityViolationKind, source: Option<String>) -> Self {
Self::SecurityViolation { kind, source }
}
}
/// Reason attached to a `PeerDisconnected` event.
///
/// The precise conditions that map to each reason are decided by the code
/// that reports the disconnect; [`DisconnectReason::Unknown`] is the
/// [`Default`] value.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum DisconnectReason {
    /// The disconnect was requested locally.
    LocalRequest,
    /// The disconnect was requested by the remote side.
    RemoteRequest,
    /// A timeout was reported.
    Timeout,
    /// Loss of the link was reported.
    LinkLoss,
    /// The connection failed.
    ConnectionFailed,
    /// No more specific reason is known.
    #[default]
    Unknown,
}
/// Category attached to a `SecurityViolation` event.
///
/// The checks that produce each category live outside this file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SecurityViolationKind {
    /// Unencrypted data was seen while strict mode was in effect.
    UnencryptedInStrictMode,
    /// Decryption failed.
    DecryptionFailed,
    /// A replay was detected.
    ReplayDetected,
    /// A node was not authorized.
    UnauthorizedNode,
}
/// Receiver of [`PeatEvent`]s.
///
/// Implementors must be `Send + Sync`; the callback takes `&self`, so any
/// state an observer mutates has to use interior mutability.
pub trait PeatObserver: Send + Sync {
    /// Called with each event; ownership of `event` passes to the observer.
    fn on_event(&self, event: PeatEvent);
}
/// Observer that records every event it receives in an in-memory list.
///
/// Only available with the `std` feature, because the list is guarded by a
/// `std::sync::Mutex`.
#[cfg(feature = "std")]
#[derive(Default, Debug)]
pub struct CollectingObserver {
    /// Recorded events, in the order they were received.
    events: std::sync::Mutex<Vec<PeatEvent>>,
}
#[cfg(feature = "std")]
impl CollectingObserver {
    /// Creates an observer with an empty event list.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns a cloned copy of all recorded events, oldest first.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn events(&self) -> Vec<PeatEvent> {
        let recorded = self.events.lock().unwrap();
        recorded.to_vec()
    }

    /// Discards all recorded events.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn clear(&self) {
        let mut recorded = self.events.lock().unwrap();
        recorded.clear();
    }

    /// Returns how many events are currently recorded.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn count(&self) -> usize {
        let recorded = self.events.lock().unwrap();
        recorded.len()
    }
}
#[cfg(feature = "std")]
impl PeatObserver for CollectingObserver {
    /// Appends `event` to the recorded list.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn on_event(&self, event: PeatEvent) {
        let mut recorded = self.events.lock().unwrap();
        recorded.push(event);
    }
}
/// Registry of shared [`PeatObserver`]s that events can be fanned out to.
///
/// Only available with the `std` feature, because the registry is guarded by
/// a `std::sync::RwLock`.
#[cfg(feature = "std")]
pub struct ObserverManager {
    /// Registered observers, in registration order.
    observers: std::sync::RwLock<Vec<Arc<dyn PeatObserver>>>,
}
#[cfg(feature = "std")]
impl Default for ObserverManager {
    /// Equivalent to [`ObserverManager::new`]: a manager with no observers.
    fn default() -> Self {
        ObserverManager::new()
    }
}
#[cfg(feature = "std")]
impl ObserverManager {
    /// Creates a manager with no registered observers.
    pub fn new() -> Self {
        Self {
            observers: std::sync::RwLock::new(Vec::new()),
        }
    }

    /// Registers `observer` at the end of the list.
    ///
    /// No de-duplication is performed: adding the same `Arc` twice stores it
    /// twice, and it is then notified twice per event.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn add(&self, observer: Arc<dyn PeatObserver>) {
        self.observers.write().unwrap().push(observer);
    }

    /// Unregisters every entry that is pointer-equal (`Arc::ptr_eq`) to
    /// `observer`. Does nothing if there is no such entry.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn remove(&self, observer: &Arc<dyn PeatObserver>) {
        self.observers
            .write()
            .unwrap()
            .retain(|registered| !Arc::ptr_eq(registered, observer));
    }

    /// Delivers `event` to every registered observer, in registration order.
    ///
    /// The observer list is copied while the read lock is held and the lock
    /// is released before any callback runs. Compared with dispatching under
    /// a non-blocking `try_read`, this means:
    ///
    /// * an event is never silently dropped just because another thread is
    ///   inside [`add`](Self::add) or [`remove`](Self::remove) at that
    ///   moment — the call waits for the writer instead;
    /// * an observer may call `add`, `remove` or `notify` on this manager
    ///   from inside `on_event` without deadlocking on the lock.
    ///
    /// Because dispatch works on a snapshot, an observer removed while a
    /// dispatch is in flight may still receive that one event.
    ///
    /// This method does not panic on a poisoned lock; it dispatches to the
    /// list as it was left.
    pub fn notify(&self, event: PeatEvent) {
        // Cloning the Vec only bumps each Arc's reference count. The guard is
        // a temporary of this statement, so the lock is released right here.
        let snapshot: Vec<Arc<dyn PeatObserver>> = self
            .observers
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone();

        // Every observer but the last gets a clone; the last one takes the
        // original, saving one clone of a possibly payload-carrying event.
        if let Some((last, rest)) = snapshot.split_last() {
            for observer in rest {
                observer.on_event(event.clone());
            }
            last.on_event(event);
        }
    }

    /// Returns the number of registered observers.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn count(&self) -> usize {
        self.observers.read().unwrap().len()
    }
}
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;

    /// A `CollectingObserver` records events in order and can be cleared.
    #[test]
    fn test_collecting_observer() {
        let collector = CollectingObserver::new();

        collector.on_event(PeatEvent::peer_connected(NodeId::new(0x12345678)));
        collector.on_event(PeatEvent::emergency_received(NodeId::new(0x87654321)));
        assert_eq!(collector.count(), 2);

        let recorded = collector.events();
        assert!(matches!(recorded[0], PeatEvent::PeerConnected { .. }));
        assert!(matches!(recorded[1], PeatEvent::EmergencyReceived { .. }));

        collector.clear();
        assert_eq!(collector.count(), 0);
    }

    /// An `ObserverManager` fans events out to every registered observer and
    /// stops notifying an observer once it has been removed.
    #[test]
    fn test_observer_manager() {
        let manager = ObserverManager::new();

        // Keep the concrete handles so the recorded counts can be inspected;
        // the manager itself only sees the type-erased handles.
        let first_concrete = Arc::new(CollectingObserver::new());
        let second_concrete = Arc::new(CollectingObserver::new());
        let first: Arc<dyn PeatObserver> = first_concrete.clone();
        let second: Arc<dyn PeatObserver> = second_concrete.clone();

        manager.add(Arc::clone(&first));
        manager.add(Arc::clone(&second));
        assert_eq!(manager.count(), 2);

        manager.notify(PeatEvent::peer_connected(NodeId::new(0x12345678)));
        assert_eq!(first_concrete.count(), 1);
        assert_eq!(second_concrete.count(), 1);

        manager.remove(&first);
        assert_eq!(manager.count(), 1);

        // Only the observer that is still registered sees the second event.
        manager.notify(PeatEvent::peer_lost(NodeId::new(0x12345678)));
        assert_eq!(first_concrete.count(), 1);
        assert_eq!(second_concrete.count(), 2);
    }
}