use std::sync::Arc;
use crate::error::Error;
use crate::messages::Messages;
use crate::messages::SignalUpdate;
use crate::traits::ChannelSignalTrait;
use crate::traits::WsSignalCore;
use dashmap::DashMap;
use dashmap::mapref::entry::Entry;
use leptos::prelude::*;
use serde_json::Value;
use tokio::sync::broadcast::Receiver;
#[derive(Clone)]
pub struct WsSignals {
signals: Arc<DashMap<String, Arc<dyn WsSignalCore + Send + Sync + 'static>>>,
channels: Arc<DashMap<String, Arc<dyn ChannelSignalTrait + Send + Sync + 'static>>>,
}
impl WsSignals {
pub fn new() -> Self {
let signals = Arc::new(DashMap::new());
let channels = Arc::new(DashMap::new());
Self { signals, channels }
}
pub fn create_signal<T>(&mut self, name: &str, value: T, msg: &Messages) -> Result<(), Error>
where
T: WsSignalCore + Send + Sync + Clone + 'static,
{
#[cfg(any(feature = "csr", feature = "hydrate"))]
{
use crate::ServerSignalWebSocket;
let ws = use_context::<ServerSignalWebSocket>().ok_or(Error::MissingServerSignals)?;
match self.signals.entry(name.to_owned()) {
Entry::Vacant(entry) => {
entry.insert(Arc::new(value));
ws.send(msg)?;
Ok(())
}
Entry::Occupied(_) => Err(Error::AddingSignalFailed),
}
}
#[cfg(all(feature = "ssr", not(any(feature = "hydrate", feature = "csr"))))]
{
match self.signals.entry(name.to_owned()) {
Entry::Vacant(entry) => {
entry.insert(Arc::new(value));
Ok(())
}
Entry::Occupied(_) => Err(Error::AddingSignalFailed),
}
}
#[cfg(not(any(feature = "ssr", feature = "hydrate", feature = "csr")))]
return Err(Error::AddingSignalFailed);
}
pub fn create_channel<T>(&mut self, name: &str, value: T, msg: &Messages) -> Result<(), Error>
where
T: ChannelSignalTrait + Send + Sync + Clone + 'static,
{
#[cfg(any(feature = "csr", feature = "hydrate"))]
{
use crate::ServerSignalWebSocket;
let ws = use_context::<ServerSignalWebSocket>().ok_or(Error::MissingServerSignals)?;
match self.channels.entry(name.to_owned()) {
Entry::Vacant(entry) => {
entry.insert(Arc::new(value));
ws.send(msg)?;
Ok(())
}
Entry::Occupied(_) => Err(Error::AddingSignalFailed),
}
}
#[cfg(all(feature = "ssr", not(any(feature = "hydrate", feature = "csr"))))]
{
match self.channels.entry(name.to_owned()) {
Entry::Vacant(entry) => {
entry.insert(Arc::new(value));
Ok(())
}
Entry::Occupied(_) => Err(Error::AddingSignalFailed),
}
}
#[cfg(not(any(feature = "ssr", feature = "hydrate", feature = "csr")))]
return Err(Error::AddingSignalFailed);
}
pub fn get_signal<T: Clone + 'static>(&mut self, name: &str) -> Option<T> {
self.signals
.get_mut(name)
.map(|value| value.as_any().downcast_ref::<T>().unwrap().clone())
}
pub fn get_channel<T: Clone + 'static>(&mut self, name: &str) -> Option<T> {
self.channels
.get_mut(name)
.map(|value| value.as_any().downcast_ref::<T>().unwrap().clone())
}
pub fn contains(&self, name: &str) -> bool {
self.signals.contains_key(name)
}
pub fn add_observer(&self, name: &str) -> Option<Receiver<(Option<String>, Messages)>> {
self.signals
.get(name)
.and_then(|v| v.value().subscribe().ok())
}
pub fn add_observer_channel(&self, name: &str) -> Option<Receiver<(Option<String>, Messages)>> {
self.channels
.get(name)
.and_then(|v| v.value().subscribe().ok())
}
pub fn handle_message(&self, name: &str, message: Value) -> Option<Result<(), Error>> {
self.channels.get(name).map(|v| v.handle_message(message))
}
pub fn json(&self, name: &str) -> Option<Result<Value, Error>> {
self.signals.get(name).map(|v| v.json())
}
pub async fn update(
&self,
name: &str,
patch: SignalUpdate,
id: Option<String>,
) -> Option<Result<(), Error>> {
match self.signals.get_mut(name) {
Some(value) => Some(value.update_json(patch.get_patch(), id).await),
None => None,
}
}
pub fn set_json(&self, name: &str, new_value: Value) -> Option<Result<(), Error>> {
self.signals
.get_mut(name)
.map(|value| value.set_json(new_value))
}
pub fn delete_signal(&mut self, name: &str) -> Result<(), Error> {
if let Some(signal) = self.signals.remove(name) {
signal.1.delete()?;
return Ok(());
}
Err(Error::DeletingSignalFailed)
}
pub fn delete_channel(&mut self, name: &str) -> Result<(), Error> {
if let Some(signal) = self.channels.remove(name) {
signal.1.delete();
return Ok(());
}
Err(Error::DeletingChannelHandlerFailed)
}
pub fn get_reconnect_messages(&self) -> Vec<Messages> {
let mut messages = Vec::new();
for data in self.signals.iter() {
if let Ok(message) = data.on_reconnect_message() {
messages.push(message);
}
}
for data in self.channels.iter() {
if let Ok(message) = data.on_reconnect_message() {
messages.push(message);
}
}
messages
}
}