use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use futures::channel::mpsc;
use dsf_core::types::Id;
pub use dsf_rpc::{SubscriptionInfo, SubscriptionKind};
use crate::error::Error;
use crate::io::unix::UnixMessage;
/// A single subscription record held by the subscriber store.
///
/// Currently a thin wrapper around the [`SubscriptionInfo`] describing the
/// subscription (service, kind of subscriber, update and expiry times).
#[derive(Clone)]
pub struct SubscriberInst {
/// Subscription details for this subscriber.
pub info: SubscriptionInfo,
}
/// Pairs a connection identifier with an `mpsc` sender of [`UnixMessage`]s.
///
/// NOTE(review): presumably identifies a client attached over the unix
/// socket interface; not referenced by the code visible in this section.
#[derive(Clone, Debug)]
pub struct UnixSubscriber {
/// Numeric identifier of the connection.
pub connection_id: u32,
/// Channel sender carrying `UnixMessage` values.
pub sender: mpsc::Sender<UnixMessage>,
}
/// Registry of subscribers, grouped by the service they are subscribed to.
///
/// The store lives behind an `Arc<Mutex<..>>`, so cloning a manager yields
/// another handle onto the same underlying map rather than a copy of it.
#[derive(Clone)]
pub struct SubscriberManager {
// Map from service id to the subscribers registered for that service.
store: Arc<Mutex<HashMap<Id, Vec<SubscriberInst>>>>,
}
impl SubscriberManager {
pub fn new() -> Self {
Self {
store: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn find(&self, service_id: &Id) -> Result<Vec<SubscriberInst>, Error> {
let s = self.store.lock().unwrap();
match s.get(service_id) {
Some(v) => Ok(v.clone()),
None => Ok(vec![]),
}
}
pub fn update_peer<F: Fn(&mut SubscriberInst)>(
&mut self,
service_id: &Id,
peer_id: &Id,
f: F,
) -> Result<(), Error> {
let mut store = self.store.lock().unwrap();
let subscribers = store.entry(service_id.clone()).or_insert(vec![]);
let mut subscriber = subscribers.iter_mut().find(|s| {
if let SubscriptionKind::Peer(i) = &s.info.kind {
return i == peer_id;
}
false
});
if subscriber.is_none() {
let s = SubscriberInst {
info: SubscriptionInfo {
service_id: service_id.clone(),
kind: SubscriptionKind::Peer(peer_id.clone()),
updated: Some(SystemTime::now()),
expiry: None,
},
};
subscribers.push(s);
let n = subscribers.len();
subscriber = Some(&mut subscribers[n - 1]);
}
if let Some(mut s) = subscriber {
f(&mut s);
}
Ok(())
}
pub fn update_socket<F: Fn(&mut SubscriberInst)>(
&mut self,
service_id: &Id,
socket_id: u32,
f: F,
) -> Result<(), Error> {
let mut store = self.store.lock().unwrap();
let subscribers = store.entry(service_id.clone()).or_insert(vec![]);
let mut subscriber = subscribers.iter_mut().find(|s| {
if let SubscriptionKind::Socket(i) = &s.info.kind {
return *i == socket_id;
}
false
});
if subscriber.is_none() {
let s = SubscriberInst {
info: SubscriptionInfo {
service_id: service_id.clone(),
kind: SubscriptionKind::Socket(socket_id),
updated: Some(SystemTime::now()),
expiry: None,
},
};
subscribers.push(s);
let n = subscribers.len();
subscriber = Some(&mut subscribers[n - 1]);
}
if let Some(mut s) = subscriber {
f(&mut s);
}
Ok(())
}
pub fn remove(&mut self, service_id: &Id, peer_id: &Id) -> Result<(), Error> {
let mut store = self.store.lock().unwrap();
let subscribers = store.entry(service_id.clone()).or_insert(vec![]);
for i in 0..subscribers.len() {
match &subscribers[i].info.kind {
SubscriptionKind::Peer(id) if id == peer_id => {
subscribers.remove(i);
}
_ => (),
}
}
Ok(())
}
}