#![allow(clippy::unwrap_used)] #![allow(clippy::missing_panics_doc)] #![allow(missing_docs)]
use std::sync::Arc;
use fraiseql_core::{
runtime::subscription::{SubscriptionEvent, SubscriptionManager, SubscriptionOperation},
schema::{CompiledSchema, SubscriptionDefinition},
};
use fraiseql_server::routes::subscriptions::{SubscriptionState, subscription_handler};
use futures::{SinkExt, StreamExt};
use serde_json::json;
use tokio::net::TcpListener;
use tokio_tungstenite::{connect_async, tungstenite};
/// Builds a fresh `CompiledSchema` whose only addition is a single subscription
/// definition created from `name` and `return_type`.
fn schema_with_subscription(name: &str, return_type: &str) -> CompiledSchema {
    let definition = SubscriptionDefinition::new(name, return_type);
    let mut compiled = CompiledSchema::new();
    compiled.subscriptions.push(definition);
    compiled
}
/// Starts an axum server that exposes `subscription_handler` at `/ws` and returns the
/// `ws://` URL clients should connect to.
///
/// The listener binds to `127.0.0.1:0`, so the OS picks the port. The server is driven by
/// a spawned task whose join handle is discarded.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read. The
/// spawned task panics if `axum::serve` returns an error.
async fn spawn_ws_server(state: SubscriptionState) -> String {
    let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind to ephemeral port");
    let addr = listener.local_addr().unwrap();
    let url = format!("ws://{addr}/ws");

    let router = axum::Router::new()
        .route("/ws", axum::routing::get(subscription_handler))
        .with_state(state);
    tokio::spawn(async move {
        axum::serve(listener, router).await.unwrap();
    });

    url
}
/// Serializes `value` to a JSON string and sends it over `ws` as one text message.
///
/// # Panics
///
/// Panics if serialization fails or the sink reports a send error.
async fn send_json(
    ws: &mut futures::stream::SplitSink<
        tokio_tungstenite::WebSocketStream<
            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
        >,
        tungstenite::Message,
    >,
    value: serde_json::Value,
) {
    let encoded = serde_json::to_string(&value).unwrap();
    let frame = tungstenite::Message::Text(encoded.into());
    ws.send(frame).await.unwrap();
}
/// Waits for the next application-level JSON frame from `ws` and returns it parsed.
///
/// Non-text WebSocket messages are skipped, as are JSON frames whose `"type"` field is the
/// string `"ping"`.
///
/// The five-second budget applies to the whole call, not to each individual message. A peer
/// that keeps sending skipped frames therefore cannot keep this helper (and the test calling
/// it) waiting indefinitely.
///
/// # Panics
///
/// Panics if no qualifying frame arrives before the deadline, if the stream ends, if the
/// transport yields an error, or if a text message is not valid JSON.
async fn recv_json(
    ws: &mut futures::stream::SplitStream<
        tokio_tungstenite::WebSocketStream<
            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
        >,
    >,
) -> serde_json::Value {
    // Computed once, outside the loop: re-arming a fresh timeout per message would let a
    // steady stream of skipped frames postpone the timeout forever.
    let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
    loop {
        let msg = tokio::time::timeout_at(deadline, ws.next())
            .await
            .expect("timed out waiting for WebSocket message")
            .expect("stream ended unexpectedly")
            .expect("WebSocket error");

        // Only text messages carry JSON frames; anything else is ignored.
        let tungstenite::Message::Text(text) = msg else {
            continue;
        };

        let value: serde_json::Value = serde_json::from_str(&text).unwrap();
        // Frames typed "ping" are not what callers are waiting for.
        if value.get("type").and_then(|t| t.as_str()) != Some("ping") {
            return value;
        }
    }
}
/// Opens a WebSocket client connection to `url` and returns it split into its
/// `(sink, stream)` halves. The handshake response is discarded.
///
/// # Panics
///
/// Panics if the connection attempt fails.
async fn connect_ws(
    url: &str,
) -> (
    futures::stream::SplitSink<
        tokio_tungstenite::WebSocketStream<
            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
        >,
        tungstenite::Message,
    >,
    futures::stream::SplitStream<
        tokio_tungstenite::WebSocketStream<
            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
        >,
    >,
) {
    let (socket, _response) = connect_async(url).await.expect("WebSocket connect failed");
    socket.split()
}
/// After the handshake and a `subscribe` for `orderCreated`, an `Order` create event
/// published through the manager must reach the client as a `next` frame. The frame must
/// carry the operation id `op_1` and the event fields under `payload.data.orderCreated`.
#[tokio::test]
async fn ws_e2e_subscribe_and_receive_next_frame() {
    let manager = Arc::new(SubscriptionManager::new(Arc::new(schema_with_subscription(
        "orderCreated",
        "Order",
    ))));
    let url = spawn_ws_server(SubscriptionState::new(Arc::clone(&manager))).await;
    let (mut sink, mut stream) = connect_ws(&url).await;

    // Handshake: init must be acknowledged before subscribing.
    let init = json!({"type": "connection_init"});
    send_json(&mut sink, init).await;
    let ack = recv_json(&mut stream).await;
    assert_eq!(ack["type"], "connection_ack", "expected connection_ack, got {ack}");

    let subscribe = json!({
        "type": "subscribe",
        "id": "op_1",
        "payload": {
            "query": "subscription { orderCreated { id status } }"
        }
    });
    send_json(&mut sink, subscribe).await;

    // Poll the manager until it reports the subscription, giving up after two seconds.
    let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
    loop {
        if manager.subscription_count() == 1 {
            break;
        }
        assert!(tokio::time::Instant::now() < deadline, "subscription should be registered");
        tokio::task::yield_now().await;
    }

    let order_created = SubscriptionEvent::new(
        "Order",
        "order_42",
        SubscriptionOperation::Create,
        json!({"id": "order_42", "status": "pending"}),
    );
    let matched = manager.publish_event(order_created);
    assert_eq!(matched, 1, "event should match exactly one subscription");

    let next_frame = recv_json(&mut stream).await;
    assert_eq!(next_frame["type"], "next", "expected next frame, got {next_frame}");
    assert_eq!(next_frame["id"], "op_1");

    let payload = &next_frame["payload"];
    assert!(payload.get("data").is_some(), "next frame must contain data");
    let order = &payload["data"]["orderCreated"];
    assert_eq!(order["id"], "order_42");
    assert_eq!(order["status"], "pending");
}
/// A `connection_init` frame that carries a payload (here a `token` field) must be
/// answered with a `connection_ack` frame, using a manager built over an empty schema.
#[tokio::test]
async fn ws_e2e_connection_init_ack_handshake() {
    let manager = Arc::new(SubscriptionManager::new(Arc::new(CompiledSchema::new())));
    let url = spawn_ws_server(SubscriptionState::new(manager)).await;
    let (mut sink, mut stream) = connect_ws(&url).await;

    let init = json!({"type": "connection_init", "payload": {"token": "test-jwt"}});
    send_json(&mut sink, init).await;

    let ack = recv_json(&mut stream).await;
    assert_eq!(ack["type"], "connection_ack");
}
/// Subscribing to a field the (empty) schema does not define must produce an `error`
/// frame that echoes the operation id sent in the `subscribe` frame.
#[tokio::test]
async fn ws_e2e_subscribe_unknown_returns_error() {
    let manager = Arc::new(SubscriptionManager::new(Arc::new(CompiledSchema::new())));
    let url = spawn_ws_server(SubscriptionState::new(manager)).await;
    let (mut sink, mut stream) = connect_ws(&url).await;

    // Handshake first.
    let init = json!({"type": "connection_init"});
    send_json(&mut sink, init).await;
    let ack = recv_json(&mut stream).await;
    assert_eq!(ack["type"], "connection_ack");

    let subscribe = json!({
        "type": "subscribe",
        "id": "op_bad",
        "payload": {
            "query": "subscription { nonExistent { id } }"
        }
    });
    send_json(&mut sink, subscribe).await;

    let error_frame = recv_json(&mut stream).await;
    assert_eq!(error_frame["type"], "error", "expected error frame, got {error_frame}");
    assert_eq!(error_frame["id"], "op_bad");
}
/// Sending `complete` for an active operation must drop the manager's subscription count
/// back to zero. The test first waits for the count to reach one after `subscribe`.
#[tokio::test]
async fn ws_e2e_complete_unsubscribes() {
    let manager = Arc::new(SubscriptionManager::new(Arc::new(schema_with_subscription(
        "orderCreated",
        "Order",
    ))));
    let url = spawn_ws_server(SubscriptionState::new(Arc::clone(&manager))).await;
    let (mut sink, mut stream) = connect_ws(&url).await;

    // Handshake first.
    let init = json!({"type": "connection_init"});
    send_json(&mut sink, init).await;
    let ack = recv_json(&mut stream).await;
    assert_eq!(ack["type"], "connection_ack");

    let subscribe = json!({
        "type": "subscribe",
        "id": "op_1",
        "payload": {
            "query": "subscription { orderCreated { id } }"
        }
    });
    send_json(&mut sink, subscribe).await;

    // Poll until the subscription is registered, giving up after two seconds.
    let registered_by = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
    loop {
        if manager.subscription_count() == 1 {
            break;
        }
        assert!(tokio::time::Instant::now() < registered_by, "subscription should be registered");
        tokio::task::yield_now().await;
    }

    let complete = json!({"type": "complete", "id": "op_1"});
    send_json(&mut sink, complete).await;

    // Poll until the subscription is gone again, with a fresh two-second limit.
    let removed_by = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
    loop {
        if manager.subscription_count() == 0 {
            break;
        }
        assert!(
            tokio::time::Instant::now() < removed_by,
            "subscription should be removed after complete"
        );
        tokio::task::yield_now().await;
    }
}