use super::{SyncTransport, TransportError};
use crate::transport::leptos_ws_pro_transport::{LeptosWsProTransport, LeptosWsProConfig, MessageProtocolAdapter};
use serde::{Deserialize, Serialize};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum CompatibilityError {
#[error("Transport error: {0}")]
Transport(#[from] TransportError),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Protocol error: {0}")]
Protocol(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SyncMessage {
#[serde(rename = "sync")]
Sync {
peer_id: String,
data: serde_json::Value,
timestamp: String,
},
#[serde(rename = "presence")]
Presence {
peer_id: String,
action: String,
timestamp: String,
},
#[serde(rename = "heartbeat")]
Heartbeat {
timestamp: String,
},
#[serde(rename = "welcome")]
Welcome {
peer_id: String,
timestamp: String,
server_info: ServerInfo,
},
#[serde(rename = "binary_ack")]
BinaryAck {
peer_id: String,
size: usize,
timestamp: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerInfo {
pub version: String,
pub max_connections: usize,
pub heartbeat_interval: u64,
}
pub struct CompatibilityTransport {
leptos_ws_pro: LeptosWsProTransport,
adapter: MessageProtocolAdapter,
message_buffer: std::collections::VecDeque<Vec<u8>>,
}
impl CompatibilityTransport {
pub fn new(config: LeptosWsProConfig) -> Self {
let leptos_ws_pro = LeptosWsProTransport::new(config);
let adapter = MessageProtocolAdapter::new(leptos_ws_pro.clone());
Self {
leptos_ws_pro,
adapter,
message_buffer: std::collections::VecDeque::new(),
}
}
pub fn with_url(url: String) -> Self {
let config = LeptosWsProConfig {
url,
..Default::default()
};
Self::new(config)
}
pub async fn connect(&self) -> Result<(), CompatibilityError> {
self.leptos_ws_pro.connect().await
.map_err(|e| CompatibilityError::Transport(e.into()))
}
pub async fn disconnect(&self) -> Result<(), CompatibilityError> {
self.leptos_ws_pro.disconnect().await
.map_err(|e| CompatibilityError::Transport(e.into()))
}
pub async fn send_sync(&self, peer_id: &str, data: serde_json::Value) -> Result<(), CompatibilityError> {
self.adapter.send_sync_message(peer_id, data).await
.map_err(|e| CompatibilityError::Transport(e.into()))
}
pub async fn send_presence(&self, peer_id: &str, action: &str) -> Result<(), CompatibilityError> {
self.adapter.send_presence_message(peer_id, action).await
.map_err(|e| CompatibilityError::Transport(e.into()))
}
pub async fn send_heartbeat(&self) -> Result<(), CompatibilityError> {
self.adapter.send_heartbeat().await
.map_err(|e| CompatibilityError::Transport(e.into()))
}
pub async fn receive_messages(&self) -> Result<Vec<SyncMessage>, CompatibilityError> {
let raw_messages = self.adapter.receive_messages().await
.map_err(|e| CompatibilityError::Transport(e.into()))?;
let mut parsed_messages = Vec::new();
for raw_message in raw_messages {
match serde_json::from_value::<SyncMessage>(raw_message) {
Ok(parsed) => parsed_messages.push(parsed),
Err(e) => {
tracing::warn!("Failed to parse sync message: {}", e);
continue;
}
}
}
Ok(parsed_messages)
}
pub fn leptos_ws_pro_transport(&self) -> &LeptosWsProTransport {
&self.leptos_ws_pro
}
pub fn message_adapter(&self) -> &MessageProtocolAdapter {
&self.adapter
}
}
impl SyncTransport for CompatibilityTransport {
type Error = TransportError;
fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
Box::pin(async move {
if let Ok(message) = serde_json::from_slice::<serde_json::Value>(data) {
if let Some(msg_type) = message.get("type").and_then(|t| t.as_str()) {
match msg_type {
"sync" => {
if let (Some(peer_id), Some(data)) = (
message.get("peer_id").and_then(|p| p.as_str()),
message.get("data")
) {
return self.send_sync(peer_id, data.clone()).await
.map_err(|e| match e {
CompatibilityError::Transport(t) => t,
_ => TransportError::SendFailed(e.to_string()),
});
}
}
"presence" => {
if let (Some(peer_id), Some(action)) = (
message.get("peer_id").and_then(|p| p.as_str()),
message.get("action").and_then(|a| a.as_str())
) {
return self.send_presence(peer_id, action).await
.map_err(|e| match e {
CompatibilityError::Transport(t) => t,
_ => TransportError::SendFailed(e.to_string()),
});
}
}
"heartbeat" => {
return self.send_heartbeat().await
.map_err(|e| match e {
CompatibilityError::Transport(t) => t,
_ => TransportError::SendFailed(e.to_string()),
});
}
_ => {
}
}
}
}
self.leptos_ws_pro.send(data).await
})
}
fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
Box::pin(async move {
let messages = self.receive_messages().await
.map_err(|e| match e {
CompatibilityError::Transport(t) => t,
_ => TransportError::ReceiveFailed(e.to_string()),
})?;
let mut raw_messages = Vec::new();
for message in messages {
match serde_json::to_vec(&message) {
Ok(raw) => raw_messages.push(raw),
Err(e) => {
tracing::warn!("Failed to serialize message: {}", e);
continue;
}
}
}
Ok(raw_messages)
})
}
fn is_connected(&self) -> bool {
self.leptos_ws_pro.is_connected()
}
}
impl Clone for CompatibilityTransport {
fn clone(&self) -> Self {
Self {
leptos_ws_pro: self.leptos_ws_pro.clone(),
adapter: MessageProtocolAdapter::new(self.leptos_ws_pro.clone()),
message_buffer: std::collections::VecDeque::new(),
}
}
}
pub struct MigrationHelper {
new_transport: CompatibilityTransport,
migration_complete: bool,
}
impl MigrationHelper {
pub fn new(config: LeptosWsProConfig) -> Self {
Self {
new_transport: CompatibilityTransport::new(config),
migration_complete: false,
}
}
pub async fn migrate(&mut self) -> Result<(), CompatibilityError> {
self.new_transport.connect().await?;
self.migration_complete = true;
Ok(())
}
pub fn is_migration_complete(&self) -> bool {
self.migration_complete
}
pub fn new_transport(&self) -> &CompatibilityTransport {
&self.new_transport
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::transport::InMemoryTransport;
#[tokio::test]
async fn test_compatibility_transport_creation() {
let config = LeptosWsProConfig::default();
let transport = CompatibilityTransport::new(config);
assert!(!transport.is_connected());
}
#[tokio::test]
async fn test_sync_message_parsing() {
let config = LeptosWsProConfig::default();
let transport = CompatibilityTransport::new(config);
let sync_data = serde_json::json!({
"changes": ["change1", "change2"],
"client_id": "test_client"
});
let result = transport.send_sync("test_peer", sync_data).await;
assert!(result.is_err()); }
#[tokio::test]
async fn test_presence_message_parsing() {
let config = LeptosWsProConfig::default();
let transport = CompatibilityTransport::new(config);
let result = transport.send_presence("test_peer", "connected").await;
assert!(result.is_err()); }
#[tokio::test]
async fn test_heartbeat_message_parsing() {
let config = LeptosWsProConfig::default();
let transport = CompatibilityTransport::new(config);
let result = transport.send_heartbeat().await;
assert!(result.is_err()); }
#[tokio::test]
async fn test_migration_helper() {
let config = LeptosWsProConfig {
url: "ws://invalid-url-that-does-not-exist:9999".to_string(),
..Default::default()
};
let mut helper = MigrationHelper::new(config);
assert!(!helper.is_migration_complete());
let result = helper.migrate().await;
assert!(result.is_err()); }
#[tokio::test]
async fn test_sync_transport_trait_compliance() {
let config = LeptosWsProConfig::default();
let transport = CompatibilityTransport::new(config);
assert!(!transport.is_connected());
let data = b"trait compliance test";
let send_result = transport.send(data).await;
assert!(send_result.is_err());
let receive_result = transport.receive().await;
assert!(receive_result.is_ok());
}
}