#![allow(clippy::unwrap_used)]
use std::{sync::Arc, time::Duration};
use fraiseql_core::{
runtime::subscription::{SubscriptionEvent, SubscriptionManager, SubscriptionOperation},
schema::CompiledSchema,
};
use fraiseql_server::subscriptions::{
broadcast::{BroadcastConfig, BroadcastManager},
event_bridge::{EntityEvent, EventBridge, EventBridgeConfig},
presence::{PresenceConfig, PresenceManager},
};
#[tokio::test]
async fn test_cdc_event_bridge_end_to_end() {
let schema = Arc::new(CompiledSchema::new());
let manager = Arc::new(SubscriptionManager::new(schema));
let bridge = EventBridge::new(manager, EventBridgeConfig::new());
let sender = bridge.sender();
let handle = bridge.spawn();
let event =
EntityEvent::new("Order", "order_1", "INSERT", serde_json::json!({"id": "order_1"}));
sender.send(event).await.unwrap();
tokio::task::yield_now().await;
assert!(!handle.is_finished(), "bridge should remain running");
handle.abort();
}
#[tokio::test]
async fn test_cdc_event_bridge_multiple_events() {
let schema = Arc::new(CompiledSchema::new());
let manager = Arc::new(SubscriptionManager::new(schema));
let bridge = EventBridge::new(manager, EventBridgeConfig::new());
let sender = bridge.sender();
let handle = bridge.spawn();
for i in 0..10 {
let op = match i % 3 {
0 => "INSERT",
1 => "UPDATE",
_ => "DELETE",
};
let event =
EntityEvent::new("Product", format!("prod_{i}"), op, serde_json::json!({"id": i}));
sender.send(event).await.unwrap();
}
tokio::task::yield_now().await;
assert!(!handle.is_finished(), "bridge should handle batch without errors");
handle.abort();
}
#[tokio::test]
async fn test_cdc_event_bridge_tenant_propagation() {
let schema = Arc::new(CompiledSchema::new());
let manager = Arc::new(SubscriptionManager::new(schema));
let _rx = manager.receiver();
let bridge = EventBridge::new(manager, EventBridgeConfig::new());
let sender = bridge.sender();
let handle = bridge.spawn();
let event = EntityEvent::new("User", "user_1", "INSERT", serde_json::json!({"id": "user_1"}))
.with_tenant_id("org_42");
sender.send(event).await.unwrap();
tokio::task::yield_now().await;
assert!(!handle.is_finished());
handle.abort();
}
#[tokio::test]
async fn test_broadcast_publish_and_receive() {
let manager = Arc::new(BroadcastManager::new(BroadcastConfig::new()));
let mut rx = manager.subscribe("chat:lobby").await.unwrap();
manager
.publish("chat:lobby", "message".into(), serde_json::json!({"text": "hello world"}))
.await
.unwrap();
let msg = rx.recv().await.unwrap();
assert_eq!(msg.channel, "chat:lobby");
assert_eq!(msg.event, "message");
assert_eq!(msg.payload["text"], "hello world");
}
#[tokio::test]
async fn test_broadcast_channel_isolation() {
let manager = Arc::new(BroadcastManager::new(BroadcastConfig::new()));
let mut rx_a = manager.subscribe("room:a").await.unwrap();
let _rx_b = manager.subscribe("room:b").await.unwrap();
manager
.publish("room:a", "event".into(), serde_json::json!({"v": 1}))
.await
.unwrap();
let msg = rx_a.recv().await.unwrap();
assert_eq!(msg.payload["v"], 1);
}
#[tokio::test]
async fn test_broadcast_multiple_subscribers() {
let manager = Arc::new(BroadcastManager::new(BroadcastConfig::new()));
let mut rx1 = manager.subscribe("events").await.unwrap();
let mut rx2 = manager.subscribe("events").await.unwrap();
let count = manager
.publish("events", "update".into(), serde_json::json!({"n": 42}))
.await
.unwrap();
assert_eq!(count, 2, "should notify 2 subscribers");
let m1 = rx1.recv().await.unwrap();
let m2 = rx2.recv().await.unwrap();
assert_eq!(m1.payload, m2.payload);
}
#[tokio::test]
async fn test_broadcast_payload_size_limit() {
let config = BroadcastConfig {
max_message_bytes: 32,
..BroadcastConfig::new()
};
let manager = BroadcastManager::new(config);
let result = manager
.publish(
"ch",
"e".into(),
serde_json::json!({"big": "this payload exceeds the 32 byte limit"}),
)
.await;
assert!(result.is_err(), "oversized payload should be rejected");
}
#[tokio::test]
async fn test_broadcast_gc_empty_channels() {
let manager = BroadcastManager::new(BroadcastConfig::new());
let _rx = manager.subscribe("active").await.unwrap();
manager.publish("orphan", "e".into(), serde_json::json!({})).await.unwrap();
assert_eq!(manager.channel_count().await, 2);
let removed = manager.gc_empty_channels().await;
assert_eq!(removed, 1);
assert_eq!(manager.channel_count().await, 1);
}
#[tokio::test]
async fn test_presence_join_and_state() {
let mgr = PresenceManager::new(PresenceConfig::new());
let (state, diff) = mgr
.join("room1", "alice", serde_json::json!({"status": "online"}))
.await
.unwrap();
assert_eq!(state.room, "room1");
assert_eq!(state.members.len(), 1);
assert_eq!(state.members[0].id, "alice");
assert_eq!(state.members[0].state["status"], "online");
assert_eq!(diff.joins.len(), 1);
assert!(diff.leaves.is_empty());
}
#[tokio::test]
async fn test_presence_join_multiple_members() {
let mgr = PresenceManager::new(PresenceConfig::new());
mgr.join("room1", "alice", serde_json::json!({})).await.unwrap();
let (state, diff) = mgr.join("room1", "bob", serde_json::json!({})).await.unwrap();
assert_eq!(state.members.len(), 2);
assert_eq!(diff.joins.len(), 1);
assert_eq!(diff.joins[0].id, "bob");
}
#[tokio::test]
async fn test_presence_leave_and_cleanup() {
let mgr = PresenceManager::new(PresenceConfig::new());
mgr.join("room1", "alice", serde_json::json!({})).await.unwrap();
let diff = mgr.leave("room1", "alice").await.unwrap();
assert_eq!(diff.leaves, vec!["alice"]);
assert!(mgr.get_room("room1").await.is_none(), "empty room should be cleaned up");
}
#[tokio::test(start_paused = true)]
async fn test_presence_heartbeat_eviction() {
let config = PresenceConfig {
heartbeat_timeout: Duration::from_millis(5),
..PresenceConfig::new()
};
let mgr = PresenceManager::new(config);
mgr.join("room1", "alice", serde_json::json!({})).await.unwrap();
mgr.join("room1", "bob", serde_json::json!({})).await.unwrap();
tokio::time::advance(Duration::from_millis(20)).await;
let diffs = mgr.evict_stale().await;
assert_eq!(diffs.len(), 1);
assert_eq!(diffs[0].leaves.len(), 2, "both members should be evicted");
assert!(mgr.get_room("room1").await.is_none(), "room should be cleaned up");
}
#[tokio::test(start_paused = true)]
async fn test_presence_heartbeat_keeps_member_alive() {
let config = PresenceConfig {
heartbeat_timeout: Duration::from_millis(50),
..PresenceConfig::new()
};
let mgr = PresenceManager::new(config);
mgr.join("room1", "alice", serde_json::json!({})).await.unwrap();
mgr.join("room1", "bob", serde_json::json!({})).await.unwrap();
tokio::time::advance(Duration::from_millis(30)).await;
mgr.heartbeat("room1", "alice").await;
tokio::time::advance(Duration::from_millis(30)).await;
let diffs = mgr.evict_stale().await;
assert_eq!(diffs.len(), 1);
assert_eq!(diffs[0].leaves, vec!["bob"]);
let state = mgr.get_room("room1").await.unwrap();
assert_eq!(state.members.len(), 1);
assert_eq!(state.members[0].id, "alice");
}
#[tokio::test]
async fn test_presence_update_state() {
let mgr = PresenceManager::new(PresenceConfig::new());
mgr.join("room1", "alice", serde_json::json!({"cursor": [0, 0]})).await.unwrap();
let diff = mgr
.update_state("room1", "alice", serde_json::json!({"cursor": [100, 200]}))
.await
.unwrap();
assert_eq!(diff.joins.len(), 1);
assert_eq!(diff.joins[0].state["cursor"][0], 100);
}
#[tokio::test]
async fn test_presence_room_capacity() {
let config = PresenceConfig {
max_members_per_room: 2,
..PresenceConfig::new()
};
let mgr = PresenceManager::new(config);
mgr.join("room1", "alice", serde_json::json!({})).await.unwrap();
mgr.join("room1", "bob", serde_json::json!({})).await.unwrap();
let result = mgr.join("room1", "charlie", serde_json::json!({})).await;
assert!(result.is_err(), "third member should be rejected");
}
#[tokio::test]
async fn test_tenant_filtering_subscription_event() {
let event = SubscriptionEvent::new(
"Order",
"order_1",
SubscriptionOperation::Create,
serde_json::json!({"id": "order_1"}),
)
.with_tenant_id("org_42");
assert_eq!(event.tenant_id.as_deref(), Some("org_42"));
}
#[tokio::test]
async fn test_tenant_filtering_bridge_event() {
let event = EntityEvent::new("User", "user_1", "INSERT", serde_json::json!({"id": "user_1"}))
.with_tenant_id("org_99");
assert_eq!(event.tenant_id.as_deref(), Some("org_99"));
}
#[tokio::test]
async fn test_tenant_filtering_event_conversion() {
let entity_event =
EntityEvent::new("Order", "order_1", "INSERT", serde_json::json!({"id": "order_1"}))
.with_tenant_id("org_42");
let sub_event = EventBridge::convert_event(entity_event);
assert_eq!(sub_event.tenant_id.as_deref(), Some("org_42"));
assert_eq!(sub_event.entity_type, "Order");
}
#[tokio::test]
async fn test_tenant_filtering_no_tenant_passes_through() {
let entity_event =
EntityEvent::new("Order", "order_1", "INSERT", serde_json::json!({"id": "order_1"}));
let sub_event = EventBridge::convert_event(entity_event);
assert!(sub_event.tenant_id.is_none());
}
#[tokio::test]
async fn test_broadcast_stats() {
let manager = Arc::new(BroadcastManager::new(BroadcastConfig::new()));
let _rx = manager.subscribe("ch1").await.unwrap();
manager.publish("ch1", "e".into(), serde_json::json!({})).await.unwrap();
manager.publish("ch2", "e".into(), serde_json::json!({})).await.unwrap();
let stats = manager.stats().await;
assert_eq!(stats.messages_published, 2);
assert_eq!(stats.active_channels, 2);
assert_eq!(stats.active_receivers, 1);
}
#[tokio::test]
async fn test_presence_stats() {
let mgr = PresenceManager::new(PresenceConfig::new());
mgr.join("r1", "alice", serde_json::json!({})).await.unwrap();
mgr.join("r1", "bob", serde_json::json!({})).await.unwrap();
mgr.join("r2", "charlie", serde_json::json!({})).await.unwrap();
mgr.leave("r1", "alice").await;
let stats = mgr.stats().await;
assert_eq!(stats.active_rooms, 2);
assert_eq!(stats.total_members, 2);
assert_eq!(stats.joins_total, 3);
assert_eq!(stats.leaves_total, 1);
}