use std::collections::HashMap;
use std::sync::mpsc::{Receiver, TryRecvError};
use super::error::PeerManagerError;
#[derive(Debug, PartialEq, Clone)]
pub enum PeerManagerNotification {
Connected { peer: String },
Disconnected { peer: String },
}
pub struct PeerNotificationIter {
pub(super) recv: Receiver<PeerManagerNotification>,
}
impl PeerNotificationIter {
pub fn try_next(&self) -> Result<Option<PeerManagerNotification>, PeerManagerError> {
match self.recv.try_recv() {
Ok(notifications) => Ok(Some(notifications)),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(PeerManagerError::SendMessageError(
"The peer manager is no longer running".into(),
)),
}
}
}
impl Iterator for PeerNotificationIter {
type Item = PeerManagerNotification;
fn next(&mut self) -> Option<Self::Item> {
match self.recv.recv() {
Ok(notification) => Some(notification),
Err(_) => {
None
}
}
}
}
pub type SubscriberId = usize;
pub(super) type Subscriber =
Box<dyn Fn(PeerManagerNotification) -> Result<(), Box<dyn std::error::Error>> + Send>;
pub(super) struct SubscriberMap {
subscribers: HashMap<SubscriberId, Subscriber>,
next_id: SubscriberId,
}
impl SubscriberMap {
pub fn new() -> Self {
Self {
subscribers: HashMap::new(),
next_id: 0,
}
}
pub fn broadcast(&mut self, notification: PeerManagerNotification) {
let mut failures = vec![];
for (id, callback) in self.subscribers.iter() {
if let Err(err) = (*callback)(notification.clone()) {
failures.push(*id);
debug!("Dropping subscriber ({}): {}", id, err);
}
}
for id in failures {
self.subscribers.remove(&id);
}
}
pub fn add_subscriber(&mut self, subscriber: Subscriber) -> SubscriberId {
let subscriber_id = self.next_id;
self.next_id += 1;
self.subscribers.insert(subscriber_id, subscriber);
subscriber_id
}
pub fn remove_subscriber(&mut self, subscriber_id: SubscriberId) {
self.subscribers.remove(&subscriber_id);
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use std::sync::mpsc::channel;
use std::thread;
#[test]
fn test_peer_manager_notifications() {
let (send, recv) = channel();
let notifcation_iter = PeerNotificationIter { recv };
let join_handle = thread::spawn(move || {
for i in 0..5 {
send.send(PeerManagerNotification::Connected {
peer: format!("test_peer{}", i),
})
.unwrap();
}
});
let mut notifications_sent = 0;
for notifcation in notifcation_iter {
assert_eq!(
notifcation,
PeerManagerNotification::Connected {
peer: format!("test_peer{}", notifications_sent),
}
);
notifications_sent += 1;
}
assert_eq!(notifications_sent, 5);
join_handle.join().unwrap();
}
}