leptos_ws 0.9.7

Leptos WS is a WebSocket library for the Leptos framework that supports updates coordinated from the server.
Documentation
use std::sync::Arc;

use crate::error::Error;
use crate::messages::Messages;
use crate::messages::SignalUpdate;
use crate::traits::ChannelSignalTrait;
use crate::traits::WsSignalCore;
use dashmap::DashMap;
use dashmap::mapref::entry::Entry;
use leptos::prelude::*;
use serde_json::Value;
use tokio::sync::broadcast::Receiver;

/// Name-keyed registry of type-erased signals and channels.
///
/// Both maps live behind an `Arc`, so cloning a `WsSignals` yields another
/// handle to the same underlying maps rather than an independent copy.
#[derive(Clone)]
pub struct WsSignals {
    /// Registered signals, keyed by name and stored as `WsSignalCore` trait objects.
    signals: Arc<DashMap<String, Arc<dyn WsSignalCore + Send + Sync + 'static>>>,
    /// Registered channels, keyed by name and stored as `ChannelSignalTrait` trait objects.
    channels: Arc<DashMap<String, Arc<dyn ChannelSignalTrait + Send + Sync + 'static>>>,
}

impl WsSignals {
    /// Creates a registry with no signals and no channels.
    pub fn new() -> Self {
        Self {
            signals: Arc::new(DashMap::new()),
            channels: Arc::new(DashMap::new()),
        }
    }

    /// Registers `value` as the signal called `name`.
    ///
    /// Behaviour depends on the enabled crate features:
    ///
    /// * `csr` or `hydrate`: the `ServerSignalWebSocket` is fetched from the
    ///   Leptos context first, the signal is stored, and then `msg` is sent
    ///   over that websocket.
    /// * `ssr` only: the signal is stored; `msg` is not used.
    /// * none of the three: nothing is stored and an error is returned.
    ///
    /// # Errors
    ///
    /// * `Error::MissingServerSignals` if the websocket context is absent
    ///   (`csr`/`hydrate` only; checked before the name is looked at).
    /// * `Error::AddingSignalFailed` if `name` is already registered, or if
    ///   none of the `ssr`/`hydrate`/`csr` features is enabled.
    /// * Whatever `ws.send` returns on failure; the signal has already been
    ///   stored at that point and stays registered.
    pub fn create_signal<T>(&mut self, name: &str, value: T, msg: &Messages) -> Result<(), Error>
    where
        T: WsSignalCore + Send + Sync + Clone + 'static,
    {
        #[cfg(any(feature = "csr", feature = "hydrate"))]
        {
            use crate::ServerSignalWebSocket;

            let ws = use_context::<ServerSignalWebSocket>().ok_or(Error::MissingServerSignals)?;

            // Only a free slot may be claimed; an existing name is an error.
            let Entry::Vacant(slot) = self.signals.entry(name.to_owned()) else {
                return Err(Error::AddingSignalFailed);
            };
            slot.insert(Arc::new(value));
            ws.send(msg)?;
            Ok(())
        }

        #[cfg(all(feature = "ssr", not(any(feature = "hydrate", feature = "csr"))))]
        {
            // Only a free slot may be claimed; an existing name is an error.
            let Entry::Vacant(slot) = self.signals.entry(name.to_owned()) else {
                return Err(Error::AddingSignalFailed);
            };
            slot.insert(Arc::new(value));
            Ok(())
        }
        #[cfg(not(any(feature = "ssr", feature = "hydrate", feature = "csr")))]
        return Err(Error::AddingSignalFailed);
    }

    /// Registers `value` as the channel called `name`.
    ///
    /// Behaviour depends on the enabled crate features:
    ///
    /// * `csr` or `hydrate`: the `ServerSignalWebSocket` is fetched from the
    ///   Leptos context first, the channel is stored, and then `msg` is sent
    ///   over that websocket.
    /// * `ssr` only: the channel is stored; `msg` is not used.
    /// * none of the three: nothing is stored and an error is returned.
    ///
    /// # Errors
    ///
    /// * `Error::MissingServerSignals` if the websocket context is absent
    ///   (`csr`/`hydrate` only; checked before the name is looked at).
    /// * `Error::AddingSignalFailed` if `name` is already registered as a
    ///   channel, or if none of the `ssr`/`hydrate`/`csr` features is enabled.
    /// * Whatever `ws.send` returns on failure; the channel has already been
    ///   stored at that point and stays registered.
    pub fn create_channel<T>(&mut self, name: &str, value: T, msg: &Messages) -> Result<(), Error>
    where
        T: ChannelSignalTrait + Send + Sync + Clone + 'static,
    {
        #[cfg(any(feature = "csr", feature = "hydrate"))]
        {
            use crate::ServerSignalWebSocket;

            let ws = use_context::<ServerSignalWebSocket>().ok_or(Error::MissingServerSignals)?;

            // Only a free slot may be claimed; an existing name is an error.
            let Entry::Vacant(slot) = self.channels.entry(name.to_owned()) else {
                return Err(Error::AddingSignalFailed);
            };
            slot.insert(Arc::new(value));
            ws.send(msg)?;
            Ok(())
        }

        #[cfg(all(feature = "ssr", not(any(feature = "hydrate", feature = "csr"))))]
        {
            // Only a free slot may be claimed; an existing name is an error.
            let Entry::Vacant(slot) = self.channels.entry(name.to_owned()) else {
                return Err(Error::AddingSignalFailed);
            };
            slot.insert(Arc::new(value));
            Ok(())
        }
        #[cfg(not(any(feature = "ssr", feature = "hydrate", feature = "csr")))]
        return Err(Error::AddingSignalFailed);
    }

    /// Returns a clone of the signal registered under `name`, downcast to `T`.
    ///
    /// Returns `None` when no signal with that name exists, and also when the
    /// stored signal is not of type `T`. A type mismatch used to panic via
    /// `unwrap()`, which is inappropriate for a lookup that already reports
    /// absence through `Option`.
    pub fn get_signal<T: Clone + 'static>(&mut self, name: &str) -> Option<T> {
        // A shared guard is enough: the entry is only read and cloned.
        self.signals
            .get(name)
            .and_then(|entry| entry.as_any().downcast_ref::<T>().cloned())
    }

    /// Returns a clone of the channel registered under `name`, downcast to `T`.
    ///
    /// Returns `None` when no channel with that name exists, and also when the
    /// stored channel is not of type `T`. A type mismatch used to panic via
    /// `unwrap()`, which is inappropriate for a lookup that already reports
    /// absence through `Option`.
    pub fn get_channel<T: Clone + 'static>(&mut self, name: &str) -> Option<T> {
        // A shared guard is enough: the entry is only read and cloned.
        self.channels
            .get(name)
            .and_then(|entry| entry.as_any().downcast_ref::<T>().cloned())
    }

    /// Reports whether a signal is registered under `name`.
    ///
    /// Only the signal map is consulted; channels are not checked.
    pub fn contains(&self, name: &str) -> bool {
        let registry = &self.signals;
        registry.contains_key(name)
    }

    /// Subscribes to the signal registered under `name`.
    ///
    /// Returns `None` when no such signal exists or when the signal's
    /// `subscribe` call fails (the error itself is discarded).
    pub fn add_observer(&self, name: &str) -> Option<Receiver<(Option<String>, Messages)>> {
        let entry = self.signals.get(name)?;
        entry.value().subscribe().ok()
    }

    /// Subscribes to the channel registered under `name`.
    ///
    /// Returns `None` when no such channel exists or when the channel's
    /// `subscribe` call fails (the error itself is discarded).
    pub fn add_observer_channel(&self, name: &str) -> Option<Receiver<(Option<String>, Messages)>> {
        let entry = self.channels.get(name)?;
        entry.value().subscribe().ok()
    }

    /// Forwards `message` to the channel registered under `name`.
    ///
    /// Returns `None` when no such channel exists; otherwise returns the
    /// result of the channel's own `handle_message`.
    pub fn handle_message(&self, name: &str, message: Value) -> Option<Result<(), Error>> {
        let channel = self.channels.get(name)?;
        Some(channel.handle_message(message))
    }

    /// Returns the result of calling `json` on the signal registered under `name`.
    ///
    /// Returns `None` when no such signal exists.
    pub fn json(&self, name: &str) -> Option<Result<Value, Error>> {
        let signal = self.signals.get(name)?;
        Some(signal.json())
    }
    pub async fn update(
        &self,
        name: &str,
        patch: SignalUpdate,
        id: Option<String>,
    ) -> Option<Result<(), Error>> {
        match self.signals.get_mut(name) {
            Some(value) => Some(value.update_json(patch.get_patch(), id).await),
            None => None,
        }
    }

    /// Passes `new_value` to `set_json` on the signal registered under `name`.
    ///
    /// Returns `None` when no such signal exists; otherwise returns the
    /// result of the signal's `set_json`.
    pub fn set_json(&self, name: &str, new_value: Value) -> Option<Result<(), Error>> {
        let entry = self.signals.get_mut(name)?;
        Some(entry.set_json(new_value))
    }

    pub fn delete_signal(&mut self, name: &str) -> Result<(), Error> {
        if let Some(signal) = self.signals.remove(name) {
            signal.1.delete()?;
            return Ok(());
        }
        Err(Error::DeletingSignalFailed)
    }

    pub fn delete_channel(&mut self, name: &str) -> Result<(), Error> {
        if let Some(signal) = self.channels.remove(name) {
            signal.1.delete();
            return Ok(());
        }
        Err(Error::DeletingChannelHandlerFailed)
    }

    /// Gathers the reconnect message of every registered signal, followed by
    /// that of every registered channel.
    ///
    /// Entries whose `on_reconnect_message` returns an error are skipped.
    pub fn get_reconnect_messages(&self) -> Vec<Messages> {
        let mut collected: Vec<Messages> = Vec::new();
        collected.extend(
            self.signals
                .iter()
                .filter_map(|entry| entry.on_reconnect_message().ok()),
        );
        collected.extend(
            self.channels
                .iter()
                .filter_map(|entry| entry.on_reconnect_message().ok()),
        );
        collected
    }
}