use std::{collections::HashMap, sync::{mpsc, Arc, RwLock}, thread::{self, sleep}, time::Duration};
use lazy_static;
use log::error;
use uuid::Uuid;
use serde_json::Value;
// Process-wide `NotificationManager` instance used by every associated
// function of `NotificationManager` in this file. `lazy_static!` builds it on
// first access by calling `NotificationManager::new()`; all reads and writes go
// through the surrounding `RwLock`.
lazy_static! {
static ref XQ_NOTIFICATION_MANAGER: Arc<RwLock<NotificationManager>> =
Arc::new(RwLock::new(NotificationManager::new()));
}
/// Payload carried by a notification: an arbitrary `serde_json::Value`.
pub type NotificationType = Value;
/// Registry of notification observers.
///
/// Observers are grouped by the notification id they subscribed to; each
/// observer is represented by the sending half of its own `mpsc` channel.
pub struct NotificationManager {
// Notification id -> observers currently registered for that id.
sender_map: HashMap<String, Vec<NotificationObj>>,
}
// SAFETY (NOTE(review) — to be confirmed): the only field is a
// `HashMap<String, Vec<NotificationObj>>`, and each `NotificationObj` holds two
// `String`s plus an `mpsc::Sender<NotificationType>`. Every access to the
// shared instance visible in this file goes through
// `XQ_NOTIFICATION_MANAGER.write()`, i.e. exclusive access under the lock.
// These manual impls were presumably added because `mpsc::Sender` was not
// `Sync` on older toolchains (it is since Rust 1.72); if the minimum supported
// toolchain already provides the auto traits, these impls can be dropped —
// TODO confirm.
unsafe impl Send for NotificationManager {}
unsafe impl Sync for NotificationManager {}
/// One registered observer of a notification id.
#[derive(Debug, Clone)]
struct NotificationObj {
/// Unique id of this observer (a UUID v4 string generated in `observe`).
id: String,
/// Notification id this observer subscribed to.
notification_id: String,
/// Sending half of the channel whose receiver was handed to the observer.
sender: mpsc::Sender<NotificationType>,
}
impl NotificationManager {
fn new() -> Self {
thread::spawn(|| {
sleep(Duration::from_secs(30));
match XQ_NOTIFICATION_MANAGER.write() {
Ok(_) => {},
Err(_) => {},
}
});
NotificationManager {
sender_map: HashMap::new(),
}
}
pub fn observe(notification_id: &str) -> Option<mpsc::Receiver<NotificationType>> {
match XQ_NOTIFICATION_MANAGER.write() {
Ok(mut manager) => {
let (s, r) = mpsc::channel();
let id = Uuid::new_v4().to_string();
let obj = NotificationObj {
id,
notification_id: notification_id.to_string(),
sender: s,
};
match manager.sender_map.get_mut(notification_id) {
Some(senders) => {
senders.push(obj);
}
None => {
manager
.sender_map
.insert(notification_id.to_string(), vec![obj]);
}
}
return Some(r);
}
Err(e) => {
error!("observe try_write error: {}", e);
}
}
None
}
pub fn publish(notification_id: &str, msg: NotificationType) -> bool {
match XQ_NOTIFICATION_MANAGER.write() {
Ok(mut manager) => {
return manager.publish_s(notification_id, msg);
},
Err(e) => {
error!("publish write error: {}", e);
}
}
false
}
fn publish_s(&mut self, notification_id: &str, msg: NotificationType) -> bool {
match self.sender_map.get_mut(notification_id) {
Some(senders) => {
let mut index = 0;
let mut send_succeed = false;
while index < senders.len() {
let item = &senders[index];
if item.notification_id.eq(notification_id) {
match item.sender.send(msg.clone()) {
Ok(_) => {
send_succeed = true;
}
Err(e) => {
if e.to_string().eq("sending on a closed channel") {
senders.remove(index);
continue;
} else {
error!("publish error: {}", e);
}
}
}
}
index += 1;
}
if senders.len() == 0 {
self.sender_map.remove(notification_id);
}
return send_succeed;
}
None => {}
}
false
}
pub fn clear_notification_id(notification_id: &str) {
match XQ_NOTIFICATION_MANAGER.write() {
Ok(mut manager) => {
manager.sender_map.remove(notification_id);
}
Err(_) => {}
}
}
#[allow(dead_code)]
fn drop_receiver_from_id(id: &str) {
match XQ_NOTIFICATION_MANAGER.write() {
Ok(mut manager) => {
for (_, value) in manager.sender_map.iter_mut() {
let mut index = 0;
while index < value.len() {
let item = &value[index];
if item.id.eq(id) {
value.remove(index);
return;
} else {
index += 1;
}
}
}
}
Err(_) => {}
}
}
}