#![doc = include_str!("../README.md")]
#![feature(unboxed_closures)]
#[cfg(not(feature = "ssr"))]
use crate::client_signal::ClientSignal;
#[cfg(not(feature = "ssr"))]
use client_signals::ClientSignals;
use codee::string::JsonSerdeCodec;
use leptos::prelude::*;
#[cfg(not(feature = "ssr"))]
use leptos_use::core::ConnectionReadyState;
#[cfg(not(feature = "ssr"))]
use leptos_use::{use_websocket_with_options, UseWebSocketOptions, UseWebSocketReturn};
#[cfg(not(feature = "ssr"))]
use messages::Messages;
#[cfg(not(feature = "ssr"))]
use std::sync::{Arc, Mutex};
pub mod error;
pub mod messages;
#[cfg(feature = "ssr")]
mod server_signal;
#[cfg(feature = "ssr")]
pub mod server_signals;
#[cfg(not(feature = "ssr"))]
mod client_signal;
#[cfg(not(feature = "ssr"))]
mod client_signals;
#[cfg(all(feature = "axum", feature = "ssr"))]
pub mod axum;
#[cfg(feature = "ssr")]
pub type ServerSignal<T> = server_signal::ServerSignal<T>;
#[cfg(not(feature = "ssr"))]
pub type ServerSignal<T> = ClientSignal<T>;
#[cfg(not(feature = "ssr"))]
#[derive(Clone)]
struct ServerSignalWebSocket {
send: Arc<dyn Fn(&Messages) + Send + Sync + 'static>,
ready_state: Signal<ConnectionReadyState>,
delayed_msgs: Arc<Mutex<Vec<Messages>>>,
}
#[cfg(not(feature = "ssr"))]
impl ServerSignalWebSocket {
pub fn send(&self, msg: &Messages) -> Result<(), serde_json::Error> {
if self.ready_state.get() != ConnectionReadyState::Open {
self.delayed_msgs
.lock()
.expect("Failed to lock delayed_msgs")
.push(msg.clone());
} else {
(self.send)(&msg);
}
Ok(())
}
pub fn new(url: &str) -> Self {
let delayed_msgs = Arc::default();
let state_signals = ClientSignals::new();
let UseWebSocketReturn {
ready_state,
send,
open,
..
} = use_websocket_with_options::<Messages, Messages, JsonSerdeCodec>(
url,
UseWebSocketOptions::default()
.on_message(Self::handle_message(state_signals.clone()))
.immediate(false),
);
let ws_client = Self {
ready_state: ready_state.clone(),
send: Arc::new(send),
delayed_msgs,
};
open();
provide_context(state_signals);
Self::setup_delayed_message_processor(&ws_client, ready_state);
ws_client
}
fn handle_message(state_signals: ClientSignals) -> impl Fn(&Messages) {
move |msg: &Messages| match msg {
Messages::Establish(_) => todo!(),
Messages::EstablishResponse((name, value)) => {
state_signals.set_json(name, value.to_owned());
}
Messages::Update(update) => {
state_signals.update(&update.name, update.to_owned());
}
}
}
fn setup_delayed_message_processor(
ws_client: &Self,
ready_state: Signal<ConnectionReadyState>,
) {
let ws_clone = ws_client.clone();
Effect::new(move |_| {
if ready_state.get() == ConnectionReadyState::Open {
Self::process_delayed_messages(&ws_clone);
}
});
}
fn process_delayed_messages(ws: &Self) {
let messages = {
let mut delayed_msgs = ws.delayed_msgs.lock().expect("Failed to lock delayed_msgs");
delayed_msgs.drain(..).collect::<Vec<_>>()
};
for msg in messages {
if let Err(err) = ws.send(&msg) {
eprintln!("Failed to send delayed message: {:?}", err);
}
}
}
}
#[cfg(not(feature = "ssr"))]
#[inline]
fn provide_websocket_inner(url: &str) -> Option<()> {
use leptos::prelude::{provide_context, use_context};
if let None = use_context::<ServerSignalWebSocket>() {
provide_context(ServerSignalWebSocket::new(url));
}
Some(())
}
#[cfg(feature = "ssr")]
#[inline]
fn provide_websocket_inner(_url: &str) -> Option<()> {
None
}
pub fn provide_websocket(url: &str) -> Option<()> {
provide_websocket_inner(url)
}