use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;
use thiserror::Error;
pub mod websocket;
pub mod memory;
pub mod multi_transport;
pub mod leptos_ws_pro_transport;
pub mod compatibility_layer;
pub mod hybrid_transport_impl;
pub mod message_protocol;
pub mod websocket_client;
pub mod websocket_integration;
#[cfg(test)]
pub mod leptos_ws_pro_tests;
#[cfg(test)]
pub mod real_websocket_tests;
#[cfg(test)]
pub mod server_compatibility_tests;
#[cfg(test)]
pub mod hybrid_transport_tests;
#[cfg(test)]
pub mod enhanced_features_tests;
#[cfg(test)]
pub mod websocket_integration_tests;
#[derive(Error, Debug)]
pub enum TransportError {
#[error("Connection failed: {0}")]
ConnectionFailed(String),
#[error("Send failed: {0}")]
SendFailed(String),
#[error("Receive failed: {0}")]
ReceiveFailed(String),
#[error("Serialization failed: {0}")]
SerializationFailed(String),
#[error("Not connected")]
NotConnected,
}
impl From<compatibility_layer::CompatibilityError> for TransportError {
fn from(err: compatibility_layer::CompatibilityError) -> Self {
match err {
compatibility_layer::CompatibilityError::Transport(transport_err) => transport_err,
compatibility_layer::CompatibilityError::Serialization(msg) => {
TransportError::SerializationFailed(msg)
}
compatibility_layer::CompatibilityError::Protocol(msg) => {
TransportError::ConnectionFailed(msg)
}
}
}
}
pub trait SyncTransport: Send + Sync {
type Error: std::error::Error + Send + Sync;
fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>>;
fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>>;
fn is_connected(&self) -> bool;
}
pub struct InMemoryTransport {
connected: bool,
message_queue: Arc<RwLock<Vec<Vec<u8>>>>,
}
impl InMemoryTransport {
pub fn new() -> Self {
Self {
connected: true,
message_queue: Arc::new(RwLock::new(Vec::new())),
}
}
pub fn with_connection_status(connected: bool) -> Self {
Self {
connected,
message_queue: Arc::new(RwLock::new(Vec::new())),
}
}
}
impl SyncTransport for InMemoryTransport {
type Error = TransportError;
fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
Box::pin(async move {
if !self.connected {
return Err(TransportError::NotConnected);
}
let mut queue = self.message_queue.write().await;
queue.push(data.to_vec());
Ok(())
})
}
fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
Box::pin(async move {
if !self.connected {
return Err(TransportError::NotConnected);
}
let mut queue = self.message_queue.write().await;
let messages = queue.drain(..).collect();
Ok(messages)
})
}
fn is_connected(&self) -> bool {
self.connected
}
}
impl Clone for InMemoryTransport {
fn clone(&self) -> Self {
Self {
connected: self.connected,
message_queue: self.message_queue.clone(),
}
}
}
pub use websocket::{WebSocketTransport, WebSocketConfig, WebSocketError, ConnectionState};
pub use hybrid_transport_impl::HybridTransport;
pub use websocket_client::{WebSocketClient, WebSocketClientConfig, WebSocketClientError};
pub use message_protocol::{SyncMessage, MessageCodec, CrdtType, UserInfo, PresenceAction, ServerInfo};
pub use websocket_integration::{WebSocketSyncEngine, WebSocketIntegrationConfig, WebSocketSyncEngineBuilder};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportConfig {
pub url: Option<String>,
pub timeout: std::time::Duration,
pub retry_attempts: u32,
pub heartbeat_interval: std::time::Duration,
}
impl Default for TransportConfig {
fn default() -> Self {
Self {
url: None,
timeout: std::time::Duration::from_secs(30),
retry_attempts: 3,
heartbeat_interval: std::time::Duration::from_secs(30),
}
}
}
pub struct TransportFactory;
impl TransportFactory {
pub fn websocket(url: String) -> WebSocketTransport {
WebSocketTransport::new(url)
}
pub fn leptos_ws_pro(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
HybridTransport::with_leptos_ws_pro(config)
}
pub fn compatibility(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
HybridTransport::with_compatibility(config)
}
pub fn in_memory() -> InMemoryTransport {
InMemoryTransport::new()
}
pub fn hybrid(primary_url: String) -> HybridTransport {
let primary = HybridTransport::with_leptos_ws_pro(leptos_ws_pro_transport::LeptosWsProConfig {
url: primary_url,
..Default::default()
});
let fallback = HybridTransport::with_in_memory();
HybridTransport::with_fallback(primary, fallback)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_in_memory_transport() {
let transport = InMemoryTransport::new();
let data = b"test message";
assert!(transport.send(data).await.is_ok());
let received = transport.receive().await.unwrap();
assert_eq!(received.len(), 1);
assert_eq!(received[0], data);
}
#[tokio::test]
async fn test_hybrid_transport_fallback() {
let primary = HybridTransport::with_websocket("ws://invalid-url".to_string());
let fallback = HybridTransport::with_in_memory();
let transport = HybridTransport::with_fallback(primary, fallback.clone());
let data = b"test message";
assert!(fallback.send(data).await.is_ok());
let received = fallback.receive().await.unwrap();
assert_eq!(received.len(), 1);
assert_eq!(received[0], data);
}
}