use super::{
WebSocketClient, WebSocketClientConfig, WebSocketSyncEngine, WebSocketIntegrationConfig,
WebSocketSyncEngineBuilder, SyncMessage, MessageCodec, CrdtType,
};
use crate::crdt::ReplicaId;
use crate::storage::memory::MemoryStorage;
use std::time::SystemTime;
use tokio::time::{sleep, Duration};
/// Produces a [`ReplicaId`] wrapping a freshly generated v4 UUID, so every
/// test that calls this helper gets its own replica identity.
fn create_test_replica_id() -> ReplicaId {
    let fresh_uuid = uuid::Uuid::new_v4();
    ReplicaId::from(fresh_uuid)
}
/// Walks a single client through connect -> send heartbeat -> disconnect and
/// checks the connection state the client reports after each transition.
#[tokio::test]
async fn test_websocket_client_basic_operations() {
    use crate::transport::websocket_client::ConnectionState;

    let replica_id = create_test_replica_id();
    let client = WebSocketClient::new(WebSocketClientConfig::default(), replica_id);

    // A freshly constructed client reports the identity it was given and
    // starts out disconnected.
    assert_eq!(client.replica_id(), replica_id);
    assert_eq!(client.connection_state().await, ConnectionState::Disconnected);

    assert!(client.connect().await.is_ok(), "connect() should succeed");
    assert_eq!(client.connection_state().await, ConnectionState::Connected);

    // `ReplicaId` is `Copy` (it is reused above after being passed by value),
    // so it can be placed in the message without an explicit clone.
    let heartbeat = SyncMessage::Heartbeat {
        replica_id,
        timestamp: SystemTime::now(),
    };
    assert!(
        client.send_message(heartbeat).await.is_ok(),
        "send_message() should succeed while connected"
    );

    assert!(client.disconnect().await.is_ok(), "disconnect() should succeed");
    assert_eq!(client.connection_state().await, ConnectionState::Disconnected);
}
/// Round-trips a `Heartbeat` and a `Delta` message through
/// `MessageCodec::serialize` / `MessageCodec::deserialize` and checks that
/// every field, including the timestamp, comes back unchanged.
#[tokio::test]
async fn test_message_protocol_serialization() {
    let replica_id = create_test_replica_id();

    // --- Heartbeat round trip ---
    let heartbeat = SyncMessage::Heartbeat {
        replica_id,
        timestamp: SystemTime::now(),
    };
    let encoded = MessageCodec::serialize(&heartbeat).unwrap();
    let decoded = MessageCodec::deserialize(&encoded).unwrap();
    match (heartbeat, decoded) {
        (
            SyncMessage::Heartbeat { replica_id: sent_id, timestamp: sent_at },
            SyncMessage::Heartbeat { replica_id: got_id, timestamp: got_at },
        ) => {
            assert_eq!(sent_id, got_id);
            assert_eq!(sent_at, got_at);
        }
        _ => panic!("Message types don't match"),
    }

    // --- Delta round trip ---
    // Keep the timestamp in a local so it can be compared after decoding;
    // the Heartbeat case above already relies on timestamps surviving the codec.
    let delta_sent_at = SystemTime::now();
    let delta = SyncMessage::Delta {
        collection_id: "test_collection".to_string(),
        crdt_type: CrdtType::LwwRegister,
        delta: b"test delta data".to_vec(),
        timestamp: delta_sent_at,
        replica_id,
    };
    let encoded = MessageCodec::serialize(&delta).unwrap();
    let decoded = MessageCodec::deserialize(&encoded).unwrap();
    match decoded {
        SyncMessage::Delta {
            collection_id,
            crdt_type,
            delta: payload,
            replica_id: got_id,
            timestamp: got_at,
        } => {
            assert_eq!(collection_id, "test_collection");
            assert_eq!(crdt_type, CrdtType::LwwRegister);
            assert_eq!(payload, b"test delta data");
            assert_eq!(got_id, replica_id);
            assert_eq!(got_at, delta_sent_at);
        }
        _ => panic!("Expected Delta message"),
    }
}
/// Starts and stops a `WebSocketSyncEngine` backed by in-memory storage and
/// checks that `is_running()` follows each transition.
#[tokio::test]
async fn test_websocket_sync_engine_lifecycle() {
    let replica_id = create_test_replica_id();
    let backend = crate::storage::Storage::Memory(MemoryStorage::new());
    let engine = WebSocketSyncEngine::new(
        backend,
        WebSocketIntegrationConfig::default(),
        replica_id,
    );

    // The engine hands its replica id to the embedded client and is idle
    // until explicitly started.
    assert_eq!(engine.websocket_client().replica_id(), replica_id);
    assert!(!engine.is_running().await);

    assert!(engine.start().await.is_ok());
    assert!(engine.is_running().await);

    assert!(engine.stop().await.is_ok());
    assert!(!engine.is_running().await);
}
/// Builds an engine through `WebSocketSyncEngineBuilder` and checks that the
/// replica id given to the builder is the one reported by the engine's client.
#[tokio::test]
async fn test_websocket_sync_engine_builder() {
    let replica_id = create_test_replica_id();
    let backend = crate::storage::Storage::Memory(MemoryStorage::new());

    let builder = WebSocketSyncEngineBuilder::new()
        .with_replica_id(replica_id)
        .with_url("ws://test.example.com:8080".to_string());
    let engine = builder.build(backend);

    assert_eq!(engine.websocket_client().replica_id(), replica_id);
}
/// Starts an engine, sends one LWW-register delta for `test_collection`, and
/// expects the send to succeed before shutting the engine down again.
#[tokio::test]
async fn test_send_delta() {
    let replica_id = create_test_replica_id();
    let backend = crate::storage::Storage::Memory(MemoryStorage::new());
    let engine = WebSocketSyncEngine::new(
        backend,
        WebSocketIntegrationConfig::default(),
        replica_id,
    );
    engine.start().await.unwrap();

    let payload = b"test delta data".to_vec();
    let outcome = engine
        .send_delta("test_collection".to_string(), CrdtType::LwwRegister, payload)
        .await;
    assert!(outcome.is_ok());

    engine.stop().await.unwrap();
}
/// Runs two independent clients side by side: each connects to its own URL
/// path, announces itself with a `PeerJoin` message, and disconnects.
#[tokio::test]
async fn test_multiple_clients() {
    // Same configuration for both clients apart from the URL path.
    let config_for = |path: &str| WebSocketClientConfig {
        url: format!("ws://localhost:3001/{}", path),
        ..Default::default()
    };

    let first_id = create_test_replica_id();
    let second_id = create_test_replica_id();
    let first = WebSocketClient::new(config_for("client1"), first_id);
    let second = WebSocketClient::new(config_for("client2"), second_id);

    assert!(first.connect().await.is_ok());
    assert!(second.connect().await.is_ok());

    let first_join = SyncMessage::PeerJoin {
        replica_id: first_id,
        user_info: None,
    };
    let second_join = SyncMessage::PeerJoin {
        replica_id: second_id,
        user_info: None,
    };
    assert!(first.send_message(first_join).await.is_ok());
    assert!(second.send_message(second_join).await.is_ok());

    assert!(first.disconnect().await.is_ok());
    assert!(second.disconnect().await.is_ok());
}
#[tokio::test]
async fn test_connection_retry() {
let replica_id = create_test_replica_id();
let config = WebSocketClientConfig {
url: "ws://invalid-url:9999".to_string(),
reconnect_attempts: 3,
retry_delay: Duration::from_millis(100),
..Default::default()
};
let client = WebSocketClient::new(config, replica_id);
let result = client.connect().await;
assert!(result.is_ok() || result.is_err());
}
/// Connects a client configured with a 100 ms heartbeat interval, waits
/// 150 ms, and checks that the client still reports `Connected`.
#[tokio::test]
async fn test_heartbeat() {
    use crate::transport::websocket_client::ConnectionState;

    let heartbeat_config = WebSocketClientConfig {
        heartbeat_interval: Duration::from_millis(100),
        ..Default::default()
    };
    let client = WebSocketClient::new(heartbeat_config, create_test_replica_id());
    assert!(client.connect().await.is_ok());

    // Wait longer than one heartbeat interval before inspecting the state.
    sleep(Duration::from_millis(150)).await;

    let state_after_wait = client.connection_state().await;
    assert_eq!(state_after_wait, ConnectionState::Connected);
    assert!(client.disconnect().await.is_ok());
}
/// Calls `receive_message()` on a client configured with a 50 ms message
/// timeout and expects an error.
///
/// NOTE(review): the client is never connected in this test, so the error may
/// come from the missing connection rather than from the timeout. Confirm
/// which path is intended.
#[tokio::test]
async fn test_message_timeout() {
    let short_timeout = WebSocketClientConfig {
        message_timeout: Duration::from_millis(50),
        ..Default::default()
    };
    let client = WebSocketClient::new(short_timeout, create_test_replica_id());

    let received = client.receive_message().await;
    assert!(received.is_err());
}
/// Round-trips a `Delta` message with a large, repetitive payload through
/// `MessageCodec::serialize_compressed` / `deserialize_compressed` and checks
/// that collection id, CRDT type, payload and replica id are preserved.
/// Timestamps are not compared.
#[tokio::test]
async fn test_compression() {
    let replica_id = create_test_replica_id();
    let original = SyncMessage::Delta {
        collection_id: "test_collection".to_string(),
        crdt_type: CrdtType::LwwRegister,
        delta: b"large test data that could benefit from compression".repeat(100),
        timestamp: SystemTime::now(),
        replica_id,
    };

    let packed = MessageCodec::serialize_compressed(&original).unwrap();
    let restored = MessageCodec::deserialize_compressed(&packed).unwrap();

    if let (
        SyncMessage::Delta {
            collection_id: sent_collection,
            crdt_type: sent_type,
            delta: sent_payload,
            replica_id: sent_replica,
            ..
        },
        SyncMessage::Delta {
            collection_id: got_collection,
            crdt_type: got_type,
            delta: got_payload,
            replica_id: got_replica,
            ..
        },
    ) = (original, restored)
    {
        assert_eq!(sent_collection, got_collection);
        assert_eq!(sent_type, got_type);
        assert_eq!(sent_payload, got_payload);
        assert_eq!(sent_replica, got_replica);
    } else {
        panic!("Message types don't match");
    }
}
/// Sends a heartbeat through a client that was never connected and expects
/// `send_message()` to return an error.
#[tokio::test]
async fn test_error_handling() {
    let replica_id = create_test_replica_id();
    let unconnected = WebSocketClient::new(WebSocketClientConfig::default(), replica_id);

    let heartbeat = SyncMessage::Heartbeat {
        replica_id,
        timestamp: SystemTime::now(),
    };

    let send_outcome = unconnected.send_message(heartbeat).await;
    assert!(send_outcome.is_err());
}
/// Drives a `WebSocketClient` through the `SyncTransport` trait: `send`,
/// `receive` and `is_connected` are all expected to succeed.
///
/// NOTE(review): `connect()` is never called on this client, yet `send` and
/// `receive` are expected to return `Ok` and `is_connected` to be true.
/// Presumably the trait implementation behaves differently from
/// `send_message` on an unconnected client; verify against the
/// `SyncTransport` impl.
#[tokio::test]
async fn test_sync_transport_trait() {
    use crate::transport::SyncTransport;

    let client = WebSocketClient::new(
        WebSocketClientConfig::default(),
        create_test_replica_id(),
    );

    let payload = b"test data";
    assert!(client.send(payload).await.is_ok());
    assert!(client.receive().await.is_ok());
    assert!(client.is_connected().await);
}