use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Notify};
use tokio::time::{interval, Duration};
use serde_json::json;
/// Shared, address-keyed message bus.
///
/// Each address owns a queue of JSON-encoded messages plus a [`Notify`] used
/// to wake tasks waiting for data on that address.
pub struct MultiBus {
    /// Slot counter: `send_data` waits while this is zero and decrements it
    /// for every message it queues; `get_one_request` increments it.
    pub number_of_workers: Mutex<i64>,
    /// Per-address message queues. Messages are pushed at the front and
    /// consumed from the back, i.e. delivered in FIFO order.
    pub memory: RwLock<HashMap<String, VecDeque<String>>>,
    /// Per-address notifier signalled by `send_data` after a message is queued.
    pub notifiers: RwLock<HashMap<String, Arc<Notify>>>,
}
impl MultiBus {
/// Registers `address` on the bus with an empty message queue.
///
/// If the address is already registered its queue is reset to empty, but the
/// existing notifier is kept so that tasks already waiting on it can still be
/// woken by later sends. Slots that were held by discarded messages are
/// returned to `number_of_workers`, otherwise that capacity would be lost.
pub async fn initialize(self: Arc<Self>, address: String) {
    let discarded = {
        // Lock order: `memory` first, then `notifiers`.
        let mut memory = self.memory.write().await;
        let mut notifiers = self.notifiers.write().await;
        notifiers
            .entry(address.clone())
            .or_insert_with(|| Arc::new(Notify::new()));
        let previous = memory.insert(address, VecDeque::new());
        previous.map_or(0, |queue| queue.len())
    };
    // Both map guards are released before touching the counter, so no other
    // lock is held while waiting for it.
    if discarded > 0 {
        // A VecDeque length never exceeds isize::MAX, so this cast is lossless.
        *self.number_of_workers.lock().await += discarded as i64;
    }
}
/// Unregisters `address`, dropping its queue and its notifier.
///
/// Messages still queued for the address are discarded; the slots they held
/// are returned to `number_of_workers` so the capacity is not leaked.
/// Removing an unknown address is a no-op.
pub async fn remove(self: Arc<Self>, address: String) {
    let discarded = {
        // Lock order: `memory` first, then `notifiers`.
        let mut memory = self.memory.write().await;
        let mut notifiers = self.notifiers.write().await;
        notifiers.remove(&address);
        let removed = memory.remove(&address);
        removed.map_or(0, |queue| queue.len())
    };
    // Both map guards are released before touching the counter, so no other
    // lock is held while waiting for it.
    if discarded > 0 {
        // A VecDeque length never exceeds isize::MAX, so this cast is lossless.
        *self.number_of_workers.lock().await += discarded as i64;
    }
}
/// Queues a message for `address` and wakes tasks waiting on that address.
///
/// The message is stored as a JSON object string with the keys `src`, `data`
/// and `type` (taken from `src`, `data` and `kind`).
///
/// Each queued message consumes one slot from `number_of_workers`. While no
/// slot is free the call waits, re-checking on a 90 ms interval.
///
/// If `address` has no notifier registered, the message is still queued but
/// nobody is woken.
pub async fn send_data(self: Arc<Self>, data: String, address: String, src: String, kind: String) {
    let mut retry_timer = interval(Duration::from_millis(90));
    loop {
        {
            // Check and decrement under a single lock acquisition so that
            // concurrent senders cannot both claim the last free slot.
            let mut free_slots = self.number_of_workers.lock().await;
            if *free_slots > 0 {
                *free_slots -= 1;
                break;
            }
        }
        // The counter guard is released before sleeping so receivers can
        // hand slots back in the meantime.
        retry_timer.tick().await;
    }

    let payload = json!({
        "src": src,
        "data": data,
        "type": kind
    })
    .to_string();

    {
        let mut memory = self.memory.write().await;
        memory.entry(address.clone()).or_default().push_front(payload);
    }

    // Clone the notifier out so the map lock is not held while notifying.
    let notifier = self.notifiers.read().await.get(&address).cloned();
    if let Some(notifier) = notifier {
        notifier.notify_waiters();
    }
}
/// Takes the oldest queued message for `address`, waiting for one if the
/// queue is currently empty.
///
/// Returns `None` when `address` has no notifier registered, or when the
/// task was woken but the queue was already empty again by the time it
/// looked (for example because another waiter took the message first).
pub async fn request_data(self: Arc<Self>, address: String) -> Option<String> {
    // Clone the notifier out so the map lock is not held while waiting.
    let notifier = self.notifiers.read().await.get(&address).cloned();
    let notifier = notifier?;
    {
        // Create the `Notified` future BEFORE inspecting the queue: it
        // receives `notify_waiters()` wake-ups from the moment it exists,
        // so a message sent between the emptiness check and the await
        // below cannot be missed.
        let wakeup = notifier.notified();
        let has_pending = self
            .memory
            .read()
            .await
            .get(&address)
            .map_or(false, |queue| !queue.is_empty());
        if !has_pending {
            wakeup.await;
        }
    }
    self.get_one_request(&address).await
}
/// Pops the oldest message queued for `address`, if there is one.
///
/// A slot is returned to `number_of_workers` only when a message was
/// actually removed, so empty polls do not inflate the counter. Unknown
/// addresses are left untouched (no queue is created for them) and yield
/// `None`.
async fn get_one_request(self: Arc<Self>, address: &str) -> Option<String> {
    let popped = {
        let mut memory = self.memory.write().await;
        // Messages are pushed at the front, so the back holds the oldest one.
        memory.get_mut(address).and_then(|queue| queue.pop_back())
    };
    // The queue lock is released before the counter is locked.
    if popped.is_some() {
        *self.number_of_workers.lock().await += 1;
    }
    popped
}
}
pub fn create_bus() -> Arc<MultiBus> {
let hsh_mp: RwLock<HashMap<String, VecDeque<String>>> = RwLock::new(HashMap::new());
let nr_worker: Mutex<i64> = Mutex::new(10);
let notifiers: RwLock<HashMap<String, Arc<Notify>>> = RwLock::new(HashMap::new());
Arc::new(MultiBus {
number_of_workers: nr_worker,
memory: hsh_mp,
notifiers
})
}