use std::{collections::HashMap, hash::RandomState};
use serde::Serialize;
use tauri::ipc::Channel;
use tokio::sync::RwLock;
use crate::{
operations::serialize::{object_array_from_value, object_from_value, OperationNotification},
queries::{serialize::QueryTree, Checkable},
};
/// Sends `payload` on `channel`, recording `key` in `failing` when the send returns an error.
fn send_or_record_failure<'a>(
    channel: &Channel<serde_json::Value>,
    payload: serde_json::Value,
    key: &'a str,
    failing: &mut Vec<&'a str>,
) {
    if channel.send(payload).is_err() {
        failing.push(key);
    }
}

/// Dispatches an operation notification to every subscribed channel whose query matches.
///
/// The notification is serialized to JSON once, and its `data` field is converted with
/// `object_from_value` / `object_array_from_value` so it can be tested against each
/// channel's [`QueryTree`]:
///
/// * `Create` / `Delete`: the serialized notification is sent to every channel whose
///   query accepts the object.
/// * `Update`: channels whose query accepts the updated object receive the notification
///   unchanged; all other channels receive a `Delete` notification built from the same
///   `table`, `data` and `id` (presumably so subscribers can drop an object that no
///   longer matches their query).
/// * `CreateMany`: each channel receives a `CreateMany` notification for the same table
///   containing only the items its query accepts; nothing is sent when no item matches.
///
/// # Returns
///
/// The keys of the channels for which `Channel::send` returned an error. The keys borrow
/// from `channels`.
///
/// # Panics
///
/// Panics if the notification cannot be serialized, if the serialized form has no `data`
/// field, or if `object_from_value` / `object_array_from_value` reject that field.
pub fn process_channel_event<'a, T>(
    channels: &'a HashMap<String, (QueryTree, Channel<serde_json::Value>)>,
    operation: &OperationNotification<T>,
) -> Vec<&'a str>
where
    T: Clone + Serialize,
{
    let serialized_operation =
        serde_json::to_value(operation).expect("operation notification must serialize to JSON");
    let data = serialized_operation
        .get("data")
        .expect("serialized operation notification must contain a `data` field");
    let mut failing_channels: Vec<&'a str> = Vec::new();

    match operation {
        OperationNotification::Create { .. } | OperationNotification::Delete { .. } => {
            let object = object_from_value(data.clone())
                .expect("notification `data` must be convertible to a queryable object");
            for (key, (query, channel)) in channels {
                if query.check(&object) {
                    send_or_record_failure(
                        channel,
                        serialized_operation.clone(),
                        key,
                        &mut failing_channels,
                    );
                }
            }
        }
        OperationNotification::Update {
            table,
            data: notif_data,
            id,
        } => {
            let object = object_from_value(data.clone())
                .expect("notification `data` must be convertible to a queryable object");
            // The delete payload is identical for every non-matching channel, so it is
            // serialized at most once (and not at all when every channel matches).
            let mut delete_payload: Option<serde_json::Value> = None;
            for (key, (query, channel)) in channels {
                let payload = if query.check(&object) {
                    serialized_operation.clone()
                } else {
                    delete_payload
                        .get_or_insert_with(|| {
                            serde_json::to_value(OperationNotification::Delete {
                                table: table.clone(),
                                data: notif_data.clone(),
                                id: id.clone(),
                            })
                            .expect("delete notification must serialize to JSON")
                        })
                        .clone()
                };
                send_or_record_failure(channel, payload, key, &mut failing_channels);
            }
        }
        OperationNotification::CreateMany {
            table,
            data: unserialized_data,
        } => {
            let objects = object_array_from_value(data.clone())
                .expect("notification `data` must be convertible to an array of objects");
            for (key, (query, channel)) in channels {
                // `objects` is derived from the serialized `unserialized_data`, so the two
                // sequences are expected to line up item by item (assumes
                // `object_array_from_value` preserves order and length).
                let matching_objects: Vec<T> = objects
                    .iter()
                    .zip(unserialized_data.iter())
                    .filter(|(object, _)| query.check(object))
                    .map(|(_, item)| item.clone())
                    .collect();
                if matching_objects.is_empty() {
                    continue;
                }
                // Re-emit under the table of the incoming notification rather than a
                // fixed table name, so bulk creates on any table are labelled correctly.
                let payload = serde_json::to_value(OperationNotification::CreateMany {
                    table: table.clone(),
                    data: matching_objects,
                })
                .expect("filtered create-many notification must serialize to JSON");
                send_or_record_failure(channel, payload, key, &mut failing_channels);
            }
        }
    };

    failing_channels
}
/// Dispatches `operation` to all subscribed channels and unsubscribes the ones that failed.
///
/// The channel map is read-locked while [`process_channel_event`] runs. If any channel
/// reported a send error, the map is then write-locked and those entries are removed.
///
/// The read guard is released *before* the write lock is requested: `RwLock::write`
/// waits until every read guard is gone, so requesting it while this task still holds a
/// read guard would never complete. Because the failing keys returned by
/// [`process_channel_event`] borrow from the guarded map, they are copied into owned
/// `String`s first.
///
/// NOTE(review): a key that is removed and re-registered between the two lock
/// acquisitions would be removed here as well — confirm this is acceptable for callers.
pub async fn process_event_and_update_channels<T>(
    channels: &RwLock<HashMap<String, (QueryTree, Channel<serde_json::Value>), RandomState>>,
    operation: &OperationNotification<T>,
) where
    T: Clone + Serialize,
{
    let subscriptions = channels.read().await;
    let failing_channels: Vec<String> = process_channel_event(&subscriptions, operation)
        .into_iter()
        .map(str::to_owned)
        .collect();
    // Release the read lock explicitly so the write lock below can be acquired.
    drop(subscriptions);

    if failing_channels.is_empty() {
        return;
    }

    let mut subscriptions = channels.write().await;
    for key in &failing_channels {
        subscriptions.remove(key);
    }
}