1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use super::PubSub;

use many_to_many::ManyToMany;
use std::sync::{Arc, Mutex};
use std::hash::Hash;

/// An in-process publish/subscribe hub.
///
/// A `Hive` records which clients are subscribed to which topics. When data
/// is published to a topic, the keys of the clients subscribed to that topic
/// are handed, together with the data, to the callback given to `Hive::new`.
///
/// Both fields sit behind an `Arc`, so cloning a `Hive` yields another handle
/// to the *same* callback and subscription map, not an independent copy.
///
/// # Examples
///
/// ```
/// use hive_pubsub::{Hive, PubSub};
///
/// let hive = Hive::new(
///    |users, data| {
///        // do something with users and data
///    }
/// );
///
/// hive.subscribe_multiple(1, vec! [ 2, 3 ]).unwrap();
/// hive.publish(&2, "Data sent for topic 2.".to_string()).unwrap();
/// ```
pub struct Hive<ClientKey: Clone + Eq + Hash, TopicKey: Clone + Eq + Hash, Data> {
    /// Handler called on publish with the subscribed client keys and the data.
    callback: Arc<Mutex<Box<dyn Fn(Vec<ClientKey>, Data) + Send>>>,
    /// Client <-> topic subscription relation, shared between all clones.
    pub map: Arc<Mutex<ManyToMany<ClientKey, TopicKey>>>,
}

// `Clone` is written by hand instead of derived: `#[derive(Clone)]` would add
// a `Data: Clone` bound even though `Data` is never stored, which would make
// a `Hive` carrying a non-`Clone` payload type impossible to clone.
impl<ClientKey: Clone + Eq + Hash, TopicKey: Clone + Eq + Hash, Data> Clone
    for Hive<ClientKey, TopicKey, Data>
{
    /// Returns a new handle sharing this hive's callback and subscription map.
    fn clone(&self) -> Self {
        Hive {
            callback: Arc::clone(&self.callback),
            map: Arc::clone(&self.map),
        }
    }
}

impl<ClientKey, TopicKey, Data> Hive<ClientKey, TopicKey, Data>
where
    ClientKey: Clone + Eq + Hash,
    TopicKey: Clone + Eq + Hash,
{
    /// Creates a hive with an empty subscription map.
    ///
    /// `callback` is stored and later invoked by `publish` with the keys of
    /// the clients subscribed to the published topic and the published data.
    pub fn new<F>(callback: F) -> Self
    where
        F: Fn(Vec<ClientKey>, Data) + Send + 'static,
    {
        // Erase the concrete closure type so it can live in the struct field.
        let handler: Box<dyn Fn(Vec<ClientKey>, Data) + Send> = Box::new(callback);
        let subscriptions = ManyToMany::new();

        Self {
            callback: Arc::new(Mutex::new(handler)),
            map: Arc::new(Mutex::new(subscriptions)),
        }
    }
}

impl<ClientKey: Clone + Eq + Hash, TopicKey: Clone + Eq + Hash, Data> PubSub<ClientKey, TopicKey, Data> for Hive<ClientKey, TopicKey, Data> {
    fn drop_client(&self, id: &ClientKey) -> Result<(), String> {
        self.map
            .lock()
            .map_err(|_| "Failed to lock many-to-many map.".to_string())?
            .remove_left(id);

        Ok(())
    }

    fn drop_topic(&self, id: &TopicKey) -> Result<(), String> {
        self.map
            .lock()
            .map_err(|_| "Failed to lock many-to-many map.".to_string())?
            .remove_right(id);

        Ok(())
    }

    fn subscribe(&self, client: ClientKey, topic: TopicKey) -> Result<(), String> {
        self.map
            .lock()
            .map_err(|_| "Failed to lock many-to-many map.".to_string())?
            .insert(client, topic);

        Ok(())
    }

    fn subscribe_multiple(&self, client: ClientKey, topics: Vec<TopicKey>) -> Result<(), String> {
        let mut map = self.map
            .lock()
            .map_err(|_| "Failed to lock many-to-many map.".to_string())?;
        
        for topic in topics {
            map.insert(client.clone(), topic);
        }

        Ok(())
    }

    fn unsubscribe(&self, client: &ClientKey, topic: &TopicKey) -> Result<(), String> {
        self.map
            .lock()
            .map_err(|_| "Failed to lock many-to-many map.".to_string())?
            .remove(client, topic);

        Ok(())
    }

    fn unsubscribe_multiple(&self, client: &ClientKey, topics: &Vec<TopicKey>) -> Result<(), String> {
        let mut map = self.map
            .lock()
            .map_err(|_| "Failed to lock many-to-many map.".to_string())?;
        
        for topic in topics {
            map.remove(client, topic);
        }

        Ok(())
    }

    fn publish(&self, topic: &TopicKey, data: Data) -> Result<(), String> {
        let topics = self.map
            .lock()
            .map_err(|_| "Failed to lock many-to-many map.".to_string())?
            .get_right(topic);
        
        if let Some(clients) = topics {
            (*self.callback.lock().map_err(|_| "Failed to lock callback.")?)(clients, data);
        }

        Ok(())
    }
}