use inferd_daemon::endpoint::bind_tcp;
use inferd_daemon::lifecycle::{AcceptContext, serve_tcp, wait_for_ready};
use inferd_daemon::router::Router;
use inferd_engine::mock::{Mock, MockConfig};
use inferd_proto::{ErrorCode, Message, Request, Response, Role, StopReason, write_frame};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
/// Starts a daemon whose router fronts a single [`Mock`] backend built from
/// `mock_config`, listening on an ephemeral loopback TCP port.
///
/// The listener is only bound after `wait_for_ready` succeeds (2s budget).
///
/// Returns, in order:
/// - the bound address as a string (e.g. `127.0.0.1:<port>`),
/// - the oneshot sender that is handed to `serve_tcp` as its shutdown signal,
/// - the join handle of the spawned server task.
///
/// # Panics
///
/// Panics if the backend is not ready within the budget, or if binding or
/// querying the listener address fails.
async fn boot_daemon(
    mock_config: MockConfig,
) -> (
    String,
    tokio::sync::oneshot::Sender<()>,
    tokio::task::JoinHandle<()>,
) {
    let backend = Arc::new(Mock::with_config(mock_config));
    let router = Arc::new(Router::new(vec![backend]));

    // Gate listener creation on backend readiness.
    wait_for_ready(&router, Duration::from_secs(2))
        .await
        .expect("backend ready");

    let listener = bind_tcp("127.0.0.1:0").await.expect("bind tcp");
    let bound_addr = listener.local_addr().unwrap();

    let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
    let server = tokio::spawn(async move {
        // The serve result is intentionally discarded; tests observe the
        // daemon only through the socket.
        let _ = serve_tcp(listener, router, AcceptContext::default(), stop_rx).await;
    });

    (bound_addr.to_string(), stop_tx, server)
}
async fn send_and_collect(addr: &str, req: &Request) -> Vec<Response> {
let mut stream = TcpStream::connect(addr).await.expect("connect");
let mut buf = Vec::new();
write_frame(&mut buf, req).expect("write frame");
stream.write_all(&buf).await.unwrap();
stream.flush().await.unwrap();
let mut reader = BufReader::new(stream);
let mut frames = Vec::new();
loop {
let mut line = Vec::new();
let n = reader.read_until(b'\n', &mut line).await.expect("read");
if n == 0 {
break;
}
let resp: Response = serde_json::from_slice(&line).expect("decode");
let terminal = resp.is_terminal();
frames.push(resp);
if terminal {
break;
}
}
frames
}
/// Streams three mock tokens end to end.
///
/// Expects one `Token` frame per token, in order, followed by a single `Done`
/// frame. The `Done` frame must carry the concatenated text, `StopReason::End`,
/// backend `"mock"`, and a completion-token count of 3. Every frame must echo
/// the request id.
#[tokio::test]
async fn end_to_end_streams_tokens_then_done() {
    let (addr, shutdown, handle) = boot_daemon(MockConfig {
        tokens: vec!["alpha ".into(), "beta ".into(), "gamma".into()],
        ..Default::default()
    })
    .await;

    let request = Request {
        id: "req-1".into(),
        messages: vec![Message {
            role: Role::User,
            content: "hello".into(),
        }],
        grammar: String::new(),
        temperature: None,
        top_p: None,
        top_k: None,
        max_tokens: None,
        stream: None,
        image_token_budget: None,
    };

    let frames = send_and_collect(&addr, &request).await;
    assert_eq!(frames.len(), 4, "3 tokens + 1 done; got {frames:#?}");
    frames.iter().for_each(|frame| assert_eq!(frame.id(), "req-1"));

    // The last frame is the summary; everything before it is a streamed token.
    let (last, streamed) = frames.split_last().unwrap();
    for (i, (frame, want)) in streamed
        .iter()
        .zip(["alpha ", "beta ", "gamma"])
        .enumerate()
    {
        match frame {
            Response::Token { content, .. } => assert_eq!(content, want),
            other => panic!("frame[{i}] expected Token, got {other:?}"),
        }
    }

    match last {
        Response::Done {
            content,
            stop_reason,
            backend,
            usage,
            ..
        } => {
            assert_eq!(content, "alpha beta gamma");
            assert_eq!(*stop_reason, StopReason::End);
            assert_eq!(backend, "mock");
            assert_eq!(usage.completion_tokens, 3);
        }
        other => panic!("expected Done, got {other:?}"),
    }

    // Teardown: signal shutdown, then wait at most 1s for the server task.
    let _ = shutdown.send(());
    let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
}
#[tokio::test]
async fn invalid_request_yields_error_frame() {
let (addr, shutdown, handle) = boot_daemon(MockConfig::default()).await;
let req = Request {
id: "bad".into(),
messages: vec![],
temperature: None,
top_p: None,
top_k: None,
max_tokens: None,
stream: None,
image_token_budget: None,
grammar: String::new(),
};
let frames = send_and_collect(&addr, &req).await;
assert_eq!(frames.len(), 1);
match &frames[0] {
Response::Error { id, code, message } => {
assert_eq!(id, "bad");
assert_eq!(*code, ErrorCode::InvalidRequest);
assert!(message.contains("messages"), "message: {message}");
}
other => panic!("expected Error frame, got {other:?}"),
}
let _ = shutdown.send(());
let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
}
#[tokio::test]
async fn mid_stream_drop_yields_backend_unavailable_error() {
let (addr, shutdown, handle) = boot_daemon(MockConfig {
tokens: vec!["partial".into(), "rest".into()],
mid_stream_drop_after: Some(1),
..Default::default()
})
.await;
let req = Request {
id: "drop-1".into(),
messages: vec![Message {
role: Role::User,
content: "x".into(),
}],
temperature: None,
top_p: None,
top_k: None,
max_tokens: None,
stream: None,
image_token_budget: None,
grammar: String::new(),
};
let frames = send_and_collect(&addr, &req).await;
assert_eq!(frames.len(), 2, "got: {frames:#?}");
match &frames[0] {
Response::Token { content, .. } => assert_eq!(content, "partial"),
other => panic!("frame[0] expected Token, got {other:?}"),
}
match &frames[1] {
Response::Error { code, .. } => {
assert_eq!(*code, ErrorCode::BackendUnavailable);
}
other => panic!("frame[1] expected Error, got {other:?}"),
}
let _ = shutdown.send(());
let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
}
/// Checks readiness gating.
///
/// While the mock backend reports not-ready, `wait_for_ready` must still be
/// pending after 200ms, even though its own budget is 5s. Once the backend
/// is ready, a fresh `wait_for_ready` call must succeed and report an elapsed
/// time under 150ms.
#[tokio::test]
async fn ready_gating_blocks_listener_creation_until_ready() {
    let backend = Arc::new(Mock::new());
    backend.set_ready(false);
    let router = Router::new(vec![Arc::clone(&backend) as _]);

    let pending_probe = wait_for_ready(&router, Duration::from_secs(5));
    let early = tokio::time::timeout(Duration::from_millis(200), pending_probe).await;
    assert!(early.is_err(), "wait_for_ready returned before ready");

    backend.set_ready(true);
    let waited = wait_for_ready(&router, Duration::from_secs(2)).await.unwrap();
    assert!(waited < Duration::from_millis(150));
}