use std::time::Duration;
use dynamo_llm::http::service::{realtime, service_v2::HttpService};
use dynamo_runtime::CancellationToken;
use futures::{SinkExt, StreamExt};
use serde_json::Value;
use tokio_tungstenite::tungstenite::Message;
#[path = "common/ports.rs"]
mod ports;
use ports::get_random_port;
/// One-time process setup shared by every test in this file.
///
/// Sets `DYN_TOKEN_ECHO_DELAY_MS=0` (presumably the echo engine's per-token delay — TODO
/// confirm) and installs the echo engine. Both happen at most once per test binary.
fn ensure_echo_engine_installed() {
    use std::sync::Once;

    static ECHO_ENGINE_SETUP: Once = Once::new();

    ECHO_ENGINE_SETUP.call_once(|| {
        // SAFETY: `set_var` is unsafe because it can race with concurrent environment access.
        // Every test in this file calls this function before doing anything else, and
        // `Once::call_once` makes concurrent callers wait until this closure has returned,
        // so none of these tests touches the environment while it is being modified.
        // NOTE(review): threads outside these tests (e.g. the harness) are assumed not to
        // read the environment concurrently — confirm.
        unsafe {
            std::env::set_var("DYN_TOKEN_ECHO_DELAY_MS", "0");
        }
        // The install result is discarded; any failure surfaces later when a test
        // tries to talk to the "echo" model.
        let _ = realtime::install_echo_engine();
    });
}
/// Polls `GET /health` on the local frontend until it answers with a success status.
///
/// # Panics
///
/// Panics if no successful response arrives within 5 seconds.
async fn wait_for_health(port: u16) {
    let url = format!("http://127.0.0.1:{port}/health");
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    while tokio::time::Instant::now() < deadline {
        // Cap each probe at the overall deadline: `reqwest::get` uses a default client with
        // no request timeout. Without this, a listener that accepts the connection but
        // never replies would hang the test here instead of reaching the panic below.
        let healthy = tokio::time::timeout_at(deadline, reqwest::get(url.as_str()))
            .await
            .is_ok_and(|res| res.is_ok_and(|r| r.status().is_success()));
        if healthy {
            return;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    panic!("frontend never became healthy on port {port}");
}
#[tokio::test]
async fn realtime_websocket_echoes_per_char_and_finishes_per_request() {
ensure_echo_engine_installed();
let port = get_random_port().await;
let service = HttpService::builder().port(port).build().unwrap();
let token = CancellationToken::new();
let handle = service.spawn(token.clone()).await;
wait_for_health(port).await;
let url = format!("ws://127.0.0.1:{port}/v1/realtime");
let (mut ws, _resp) = tokio_tungstenite::connect_async(&url)
.await
.expect("ws connect");
let body1 = serde_json::json!({
"model": "echo",
"messages": [{ "role": "user", "content": "hi" }],
});
ws.send(Message::Text(body1.to_string().into()))
.await
.expect("send first request");
let body2 = serde_json::json!({
"model": "echo",
"messages": [{ "role": "user", "content": "ok" }],
});
ws.send(Message::Text(body2.to_string().into()))
.await
.expect("send second request");
let mut text = String::new();
let mut stops = 0usize;
let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
while tokio::time::Instant::now() < deadline {
let frame = tokio::time::timeout(Duration::from_secs(2), ws.next()).await;
let Ok(Some(Ok(msg))) = frame else { break };
match msg {
Message::Text(t) => {
let v: Value = serde_json::from_str(&t).expect("response is valid JSON");
let choices = v
.pointer("/data/choices")
.and_then(|c| c.as_array())
.cloned()
.unwrap_or_default();
for choice in choices {
if let Some(content) = choice
.get("delta")
.and_then(|d| d.get("content"))
.and_then(|c| c.as_str())
{
text.push_str(content);
}
if choice
.get("finish_reason")
.and_then(|f| f.as_str())
.map(|s| s == "stop")
.unwrap_or(false)
{
stops += 1;
}
}
}
Message::Close(_) => break,
_ => {}
}
if stops >= 2 {
break;
}
}
let _ = ws.close(None).await;
token.cancel();
let _ = handle.await;
assert_eq!(text, "hiok", "echoed text from both requests");
assert_eq!(stops, 2, "expected one finish_reason=stop per request");
}
/// Starts a request, waits for the first streamed frame, closes the socket from the client
/// side and checks that the server answers with its own Close frame.
#[tokio::test]
async fn realtime_websocket_emits_close_after_client_close() {
    ensure_echo_engine_installed();
    let port = get_random_port().await;
    let service = HttpService::builder().port(port).build().unwrap();
    let token = CancellationToken::new();
    let handle = service.spawn(token.clone()).await;
    wait_for_health(port).await;

    let url = format!("ws://127.0.0.1:{port}/v1/realtime");
    let (mut ws, _resp) = tokio_tungstenite::connect_async(&url)
        .await
        .expect("ws connect");

    let body = serde_json::json!({
        "model": "echo",
        "messages": [{ "role": "user", "content": "hi" }],
    });
    ws.send(Message::Text(body.to_string().into()))
        .await
        .expect("send");

    // Make sure the request is streaming before the client hangs up.
    let first = tokio::time::timeout(Duration::from_secs(2), ws.next()).await;
    assert!(
        matches!(first, Ok(Some(Ok(Message::Text(_))))),
        "expected at least one delta from the engine before closing, got {first:?}"
    );

    ws.close(None).await.expect("client close");

    // Other frames may still arrive ahead of the server's Close reply, so keep reading until a
    // Close shows up, the stream ends, or the overall deadline passes. `timeout_at` caps each
    // read at the deadline instead of granting a fresh per-read budget.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
    let mut got_close = false;
    while tokio::time::Instant::now() < deadline {
        match tokio::time::timeout_at(deadline, ws.next()).await {
            Ok(Some(Ok(Message::Close(_)))) => {
                got_close = true;
                break;
            }
            Ok(None) | Err(_) => break,
            Ok(Some(_)) => {}
        }
    }

    token.cancel();
    let _ = handle.await;
    assert!(
        got_close,
        "server should send an explicit Close frame after client-initiated close"
    );
}
/// Sends a binary frame to `/v1/realtime` and checks that the server responds by closing the
/// connection with a Close frame.
#[tokio::test]
async fn realtime_websocket_rejects_binary_frame() {
    ensure_echo_engine_installed();
    let port = get_random_port().await;
    let service = HttpService::builder().port(port).build().unwrap();
    let token = CancellationToken::new();
    let handle = service.spawn(token.clone()).await;
    wait_for_health(port).await;

    let url = format!("ws://127.0.0.1:{port}/v1/realtime");
    let (mut ws, _resp) = tokio_tungstenite::connect_async(&url)
        .await
        .expect("ws connect");

    ws.send(Message::Binary(vec![0u8, 1, 2, 3].into()))
        .await
        .expect("send binary");

    // Read until a Close frame arrives, the stream ends, or the overall deadline passes.
    // `timeout_at` caps each read at the deadline so the loop cannot overshoot it.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
    let mut got_close = false;
    while tokio::time::Instant::now() < deadline {
        match tokio::time::timeout_at(deadline, ws.next()).await {
            Ok(Some(Ok(Message::Close(_)))) => {
                got_close = true;
                break;
            }
            Ok(None) | Err(_) => break,
            Ok(Some(_)) => {}
        }
    }

    let _ = ws.close(None).await;
    token.cancel();
    let _ = handle.await;
    assert!(
        got_close,
        "server should close the connection on a binary frame"
    );
}