#![cfg(test)]
use std::sync::Arc;
use fraiseql_core::{
runtime::subscription::{
SubscriptionEvent, SubscriptionId, SubscriptionManager, SubscriptionOperation,
},
schema::CompiledSchema,
};
use fraiseql_server::subscriptions::{EntityEvent, EventBridge, EventBridgeConfig};
use serde_json::json;
/// Builds the schema fixture shared by every test in this file.
///
/// The fixture is whatever `CompiledSchema::new()` yields; nothing is
/// registered on it afterwards.
fn create_test_schema() -> CompiledSchema {
    CompiledSchema::new()
}
/// A manager created with `SubscriptionManager::new` reports zero
/// subscriptions and zero connections.
#[test]
fn test_subscription_manager_initialization() {
    let manager = SubscriptionManager::new(Arc::new(create_test_schema()));

    assert_eq!(manager.subscription_count(), 0);
    assert_eq!(manager.connection_count(), 0);
}
/// A manager created through `with_capacity` (512 is passed here) reports zero
/// subscriptions and can hand out a receiver.
#[test]
fn test_subscription_manager_with_capacity() {
    let manager = SubscriptionManager::with_capacity(Arc::new(create_test_schema()), 512);
    assert_eq!(manager.subscription_count(), 0);

    // Only the ability to obtain a receiver is exercised; the handle itself
    // is discarded straight away.
    drop(manager.receiver());
}
/// Subscribing to `OrderCreated` against the test schema must be rejected:
/// the test expects that subscription type to be unknown.
#[test]
fn test_subscribe_to_subscription_type() {
    let manager = SubscriptionManager::new(Arc::new(create_test_schema()));

    let result = manager.subscribe("OrderCreated", json!({}), json!({}), "conn_123");

    assert!(result.is_err(), "expected Err for unknown subscription type, got: {result:?}");
}
/// Looking up a freshly generated id that was never subscribed yields `None`.
#[test]
fn test_get_subscription_returns_none() {
    let manager = SubscriptionManager::new(Arc::new(create_test_schema()));
    let unknown_id = SubscriptionId::new();

    assert!(manager.get_subscription(unknown_id).is_none());
}
/// Unsubscribing an id that was never registered must return an error.
#[test]
fn test_unsubscribe_removes_subscription() {
    let manager = SubscriptionManager::new(Arc::new(create_test_schema()));

    let result = manager.unsubscribe(SubscriptionId::new());

    assert!(
        result.is_err(),
        "expected Err when unsubscribing non-existent id, got: {result:?}"
    );
}
/// Dropping every subscription for a connection the manager has never seen
/// leaves the connection count at zero.
#[test]
fn test_unsubscribe_connection_removes_all() {
    let manager = SubscriptionManager::new(Arc::new(create_test_schema()));

    manager.unsubscribe_connection("conn_123");

    assert_eq!(manager.connection_count(), 0);
}
/// Asking for the subscriptions of a connection that never subscribed returns
/// a collection of length zero.
#[test]
fn test_get_connection_subscriptions() {
    let manager = SubscriptionManager::new(Arc::new(create_test_schema()));

    let for_connection = manager.get_connection_subscriptions("conn_123");

    assert_eq!(for_connection.len(), 0);
}
/// Publishing an `Order` create event on a manager with no subscriptions
/// registered reports zero matches.
#[test]
fn test_publish_event_creates_payload() {
    let manager = SubscriptionManager::new(Arc::new(create_test_schema()));
    let order_data = json!({
        "id": "order_123",
        "status": "pending"
    });
    let order_created =
        SubscriptionEvent::new("Order", "order_123", SubscriptionOperation::Create, order_data);

    assert_eq!(manager.publish_event(order_created), 0);
}
/// Publishes an event and then waits up to 100 ms on the manager's receiver.
///
/// If `recv` resolves inside that window it must resolve to an error (the
/// test expects no payload to have been sent). If the window elapses first,
/// the test finishes without asserting anything further.
#[tokio::test]
async fn test_event_receiver_gets_messages() {
    let manager = SubscriptionManager::new(Arc::new(create_test_schema()));
    let mut receiver = manager.receiver();

    let order_data = json!({
        "id": "order_123",
        "status": "pending"
    });
    let order_created =
        SubscriptionEvent::new("Order", "order_123", SubscriptionOperation::Create, order_data);
    manager.publish_event(order_created);

    let wait_limit = tokio::time::Duration::from_millis(100);
    // `Err(_)` from `timeout` means the deadline passed with nothing received.
    if let Ok(msg) = tokio::time::timeout(wait_limit, receiver.recv()).await {
        assert!(msg.is_err(), "expected Err (no payload sent), got: {msg:?}");
    }
}
/// Checks the two counters on a newly built manager: no subscriptions and no
/// connections are registered.
#[test]
fn test_multiple_subscription_instances() {
    let manager = SubscriptionManager::new(Arc::new(create_test_schema()));

    let active_subscriptions = manager.subscription_count();
    assert_eq!(active_subscriptions, 0);

    let active_connections = manager.connection_count();
    assert_eq!(active_connections, 0);
}
/// A newly constructed bridge exposes a sender on which a slot can be
/// reserved immediately.
#[test]
fn test_event_bridge_initialization() {
    let manager = Arc::new(SubscriptionManager::new(Arc::new(create_test_schema())));
    let bridge = EventBridge::new(manager, EventBridgeConfig::new());

    let sender = bridge.get_sender();
    if let Err(e) = sender.try_reserve() {
        panic!("expected Ok from try_reserve: {e}");
    }
}
/// `EntityEvent::new` stores the entity type, entity id and operation it was
/// given in the correspondingly named public fields.
#[test]
fn test_entity_event_conversion() {
    let row = json!({
        "id": "order_123",
        "amount": 100.0
    });

    let event = EntityEvent::new("Order", "order_123", "INSERT", row);

    assert_eq!(event.entity_type, "Order");
    assert_eq!(event.entity_id, "order_123");
    assert_eq!(event.operation, "INSERT");
}
/// An entity event can be queued on a fresh bridge's sender without error.
#[tokio::test]
async fn test_event_routing_to_manager() {
    let manager = Arc::new(SubscriptionManager::new(Arc::new(create_test_schema())));
    let bridge = EventBridge::new(manager, EventBridgeConfig::new());
    let sender = bridge.get_sender();

    let order_inserted = EntityEvent::new(
        "Order",
        "order_123",
        "INSERT",
        json!({"id": "order_123", "status": "pending"}),
    );

    sender
        .try_send(order_inserted)
        .unwrap_or_else(|e| panic!("expected Ok from try_send: {e}"));
}
/// Builds a bridge and verifies that its sender can reserve a slot.
///
/// No subscriptions are registered here; only the sender's readiness is
/// checked.
#[tokio::test]
async fn test_multiple_subscriptions_fanout() {
    let manager = Arc::new(SubscriptionManager::new(Arc::new(create_test_schema())));
    let bridge = EventBridge::new(manager, EventBridgeConfig::new());
    let sender = bridge.get_sender();

    match sender.try_reserve() {
        Ok(_permit) => {}
        Err(e) => panic!("expected Ok from try_reserve: {e}"),
    }
}
/// Events for two different entity types (`Order`, then `User`) are both
/// accepted by the bridge's sender.
#[test]
fn test_filtering_by_entity_type() {
    let manager = Arc::new(SubscriptionManager::new(Arc::new(create_test_schema())));
    let bridge = EventBridge::new(manager, EventBridgeConfig::new());
    let sender = bridge.get_sender();

    let order_event = EntityEvent::new("Order", "order_123", "INSERT", json!({"id": "order_123"}));
    let user_event = EntityEvent::new("User", "user_123", "INSERT", json!({"id": "user_123"}));

    if let Err(e) = sender.try_send(order_event) {
        panic!("expected Ok sending order event: {e}");
    }
    if let Err(e) = sender.try_send(user_event) {
        panic!("expected Ok sending user event: {e}");
    }
}
/// Builds a bridge and verifies that reserving a slot on its sender succeeds.
///
/// No listener failure is simulated; this only confirms the sender is usable
/// straight after construction.
#[test]
fn test_listener_error_handling() {
    let manager = Arc::new(SubscriptionManager::new(Arc::new(create_test_schema())));
    let bridge = EventBridge::new(manager, EventBridgeConfig::new());
    let sender = bridge.get_sender();

    let reservation = sender.try_reserve();
    reservation.unwrap_or_else(|e| panic!("expected Ok from try_reserve: {e}"));
}
/// Subscribing to a type named `NonExistent` must produce an error.
#[test]
fn test_subscription_manager_errors() {
    let manager = SubscriptionManager::new(Arc::new(create_test_schema()));

    let result = manager.subscribe("NonExistent", json!({}), json!({}), "conn_1");

    assert!(result.is_err(), "expected Err for non-existent subscription, got: {result:?}");
}
/// Spawns the bridge task, lets it run for 10 ms, then aborts it.
///
/// The test passes as long as none of those steps panics.
#[tokio::test]
async fn test_shutdown_cleanup() {
    let manager = Arc::new(SubscriptionManager::new(Arc::new(create_test_schema())));
    let bridge = EventBridge::new(manager, EventBridgeConfig::new());

    let task = bridge.spawn();

    let startup_grace = tokio::time::Duration::from_millis(10);
    tokio::time::sleep(startup_grace).await;

    task.abort();
}
/// Mirrors a disconnect: unsubscribing the whole connection `conn_123` must
/// leave the manager with no tracked connections.
#[test]
fn test_websocket_disconnect_cleanup() {
    let manager = SubscriptionManager::new(Arc::new(create_test_schema()));

    manager.unsubscribe_connection("conn_123");

    let remaining_connections = manager.connection_count();
    assert_eq!(remaining_connections, 0);
}
/// Publishes two `Order` create events back to back and checks the match
/// count reported for each.
///
/// No subscriptions are registered on the manager, so each publish is
/// expected to report zero matches — the same expectation
/// `test_publish_event_creates_payload` makes for a single event. Previously
/// both return values were discarded, so this test could not fail.
#[test]
fn test_event_sequence_ordering() {
    let schema = Arc::new(create_test_schema());
    let manager = SubscriptionManager::new(schema);

    let event1 = SubscriptionEvent::new(
        "Order",
        "order_1",
        SubscriptionOperation::Create,
        json!({"id": "order_1"}),
    );
    let event2 = SubscriptionEvent::new(
        "Order",
        "order_2",
        SubscriptionOperation::Create,
        json!({"id": "order_2"}),
    );

    // Publish in creation order and verify each result rather than dropping it.
    let first_matched = manager.publish_event(event1);
    assert_eq!(first_matched, 0, "first event should match no subscriptions");

    let second_matched = manager.publish_event(event2);
    assert_eq!(second_matched, 0, "second event should match no subscriptions");
}
/// Queues a single `Order` insert event on a fresh bridge's sender and
/// requires the send to succeed.
#[tokio::test]
async fn test_websocket_end_to_end_delivery() {
    let manager = Arc::new(SubscriptionManager::new(Arc::new(create_test_schema())));
    let bridge = EventBridge::new(manager, EventBridgeConfig::new());
    let sender = bridge.get_sender();

    let order_inserted =
        EntityEvent::new("Order", "order_123", "INSERT", json!({"id": "order_123"}));

    if let Err(e) = sender.try_send(order_inserted) {
        panic!("expected Ok from try_send: {e}");
    }
}
/// Starts a bridge task, aborts it, then starts a second bridge on the same
/// manager and checks that the new task is not already finished.
///
/// Each bridge receives its own `EventBridgeConfig::new()`. The config is
/// passed to `EventBridge::new` by value, so reusing a single binding for both
/// bridges only compiles if the config type is `Copy`; building a fresh value
/// avoids depending on that.
#[tokio::test]
async fn test_listener_recovery_after_restart() {
    let schema = Arc::new(create_test_schema());
    let manager = Arc::new(SubscriptionManager::new(schema));

    // First bridge: start it, give it a moment, then stop it.
    let bridge1 = EventBridge::new(Arc::clone(&manager), EventBridgeConfig::new());
    let handle1 = bridge1.spawn();
    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    handle1.abort();

    // Second bridge on the same manager stands in for the restarted listener.
    let bridge2 = EventBridge::new(manager, EventBridgeConfig::new());
    let handle2 = bridge2.spawn();
    assert!(!handle2.is_finished());
    handle2.abort();
}
/// Sends an `Order` insert event carrying several fields (`id`, `status`,
/// `amount`, `customer`) and requires the send to succeed.
#[test]
fn test_subscription_projection_filters() {
    let manager = Arc::new(SubscriptionManager::new(Arc::new(create_test_schema())));
    let bridge = EventBridge::new(manager, EventBridgeConfig::new());
    let sender = bridge.get_sender();

    let order_row = json!({
        "id": "order_123",
        "status": "pending",
        "amount": 100.0,
        "customer": "customer_123"
    });
    let order_inserted = EntityEvent::new("Order", "order_123", "INSERT", order_row);

    if let Err(e) = sender.try_send(order_inserted) {
        panic!("expected Ok from try_send: {e}");
    }
}
/// Sends one event from each of two concurrently spawned tasks through clones
/// of the same bridge sender and requires both sends to succeed.
///
/// Previously each task turned its send result into an `Option` and the
/// joined results were discarded, so neither a failed send nor a panicking
/// task could fail the test. Both outcomes are now checked. The bridge is
/// never shared between tasks, so it is no longer wrapped in an `Arc`.
#[tokio::test]
async fn test_concurrent_client_subscriptions() {
    let schema = Arc::new(create_test_schema());
    let manager = Arc::new(SubscriptionManager::new(schema));
    let config = EventBridgeConfig::new();
    let bridge = EventBridge::new(manager, config);
    let sender = bridge.get_sender();

    let handle1 = {
        let sender = sender.clone();
        tokio::spawn(async move {
            let event = EntityEvent::new("Order", "order_1", "INSERT", json!({"id": "order_1"}));
            // Carry the failure reason out of the task as text.
            sender.try_send(event).map_err(|e| e.to_string())
        })
    };
    let handle2 = {
        let sender = sender.clone();
        tokio::spawn(async move {
            let event = EntityEvent::new("Order", "order_2", "INSERT", json!({"id": "order_2"}));
            sender.try_send(event).map_err(|e| e.to_string())
        })
    };

    let (first, second) = tokio::join!(handle1, handle2);
    for (label, joined) in [("first", first), ("second", second)] {
        // Outer error: the task itself panicked or was cancelled.
        let sent = joined.unwrap_or_else(|e| panic!("{label} sender task did not complete: {e}"));
        // Inner error: the sender rejected the event.
        sent.unwrap_or_else(|e| panic!("expected Ok from {label} try_send: {e}"));
    }
}