use futures_util::StreamExt;
#[path = "../src/models.rs"]
#[allow(dead_code, unused_imports)]
mod models;
#[path = "support/llm_client.rs"]
mod llm_client;
use crate::llm_client::LlmClient;
use crate::llm_client::mock::{MockLlmClient, canned};
use crate::models::{ContentBlock, Delta, Message, MessageRequest, StreamEvent, Usage};
/// Builds a `user` message holding a single `Text` block with `text` and no
/// cache-control marker.
fn user_message(text: &str) -> Message {
    let block = ContentBlock::Text {
        text: String::from(text),
        cache_control: None,
    };
    Message {
        role: String::from("user"),
        content: vec![block],
    }
}
/// Builds an `assistant` message whose content is a `Thinking` block
/// (`thinking`) followed by a `Text` block (`text`, no cache control).
fn assistant_thinking(thinking: &str, text: &str) -> Message {
    let reasoning = ContentBlock::Thinking {
        thinking: thinking.to_owned(),
    };
    let visible = ContentBlock::Text {
        text: text.to_owned(),
        cache_control: None,
    };
    Message {
        role: "assistant".to_owned(),
        content: vec![reasoning, visible],
    }
}
/// Builds an `assistant` message containing exactly one `ToolUse` block with
/// the given call `id`, tool `name` and JSON `input`; `caller` is left unset.
fn assistant_tool_call(id: &str, name: &str, input: serde_json::Value) -> Message {
    let call = ContentBlock::ToolUse {
        id: id.to_owned(),
        name: name.to_owned(),
        input,
        caller: None,
    };
    Message {
        role: "assistant".to_owned(),
        content: vec![call],
    }
}
/// Builds a `user` message carrying one `ToolResult` block that answers the
/// tool call identified by `tool_use_id` with plain-text `content`.
/// `is_error` and `content_blocks` are left unset.
fn tool_result_message(tool_use_id: &str, content: &str) -> Message {
    let result = ContentBlock::ToolResult {
        tool_use_id: tool_use_id.to_owned(),
        content: content.to_owned(),
        is_error: None,
        content_blocks: None,
    };
    Message {
        role: "user".to_owned(),
        content: vec![result],
    }
}
/// Builds a streaming request for the `deepseek-v4-pro` model over
/// `messages`, with `max_tokens = 4096` and `reasoning_effort = "high"`.
/// Every other optional field is left as `None`: system prompt, tools,
/// tool choice, metadata, thinking config and sampling parameters.
fn make_request(messages: Vec<Message>) -> MessageRequest {
    const MODEL: &str = "deepseek-v4-pro";
    MessageRequest {
        model: MODEL.to_owned(),
        messages,
        max_tokens: 4096,
        stream: Some(true),
        reasoning_effort: Some(String::from("high")),
        system: None,
        tools: None,
        tool_choice: None,
        metadata: None,
        thinking: None,
        temperature: None,
        top_p: None,
    }
}
/// Opens a stream on `mock` for `request` and consumes events until
/// `MessageStop` arrives or the stream ends.
///
/// Returns every `TextDelta` payload concatenated in arrival order, plus the
/// last non-`None` stop reason reported by a `MessageDelta` event.
///
/// # Panics
/// Panics if the stream cannot be opened or if any streamed item is an error.
async fn drain_stream_text(
    mock: &MockLlmClient,
    request: MessageRequest,
) -> (String, Option<String>) {
    let mut stream = mock
        .create_message_stream(request)
        .await
        .expect("stream open");
    let mut text = String::new();
    let mut stop_reason: Option<String> = None;
    while let Some(ev) = stream.next().await {
        match ev.expect("event") {
            StreamEvent::ContentBlockDelta {
                delta: Delta::TextDelta { text: t },
                ..
            } => text.push_str(&t),
            // Only overwrite when a reason is actually reported, so a later
            // `MessageDelta` with `stop_reason: None` cannot erase one
            // already seen.
            StreamEvent::MessageDelta { delta, .. } => {
                if let Some(reason) = delta.stop_reason {
                    stop_reason = Some(reason);
                }
            }
            StreamEvent::MessageStop => break,
            _ => {}
        }
    }
    (text, stop_reason)
}
/// A single text block split over two deltas is reassembled in order, and the
/// turn's `end_turn` stop reason is surfaced.
#[tokio::test]
async fn full_turn_loop_streams_text_chunks() {
    // One scripted turn: one text block delivered as two chunks.
    let script = vec![vec![
        canned::message_start("msg_1"),
        canned::text_block_start(0),
        canned::text_delta(0, "Hello, "),
        canned::text_delta(0, "world!"),
        canned::block_stop(0),
        canned::message_delta("end_turn", Some(Usage::default())),
        canned::message_stop(),
    ]];
    let mock = MockLlmClient::new(script);

    let (text, stop) =
        drain_stream_text(&mock, make_request(vec![user_message("greet me")])).await;

    assert_eq!(text, "Hello, world!");
    assert_eq!(stop.as_deref(), Some("end_turn"));
    // Exactly one call was made, and it was recorded.
    assert_eq!(mock.call_count(), 1);
    assert_eq!(mock.captured_requests().len(), 1);
}
/// The turn-2 request of a tool-call round must carry turn 1's reasoning
/// verbatim on an assistant message.
///
/// NOTE(review): the turn-2 history is assembled by hand here, so this checks
/// that the client records an assistant `Thinking` block unchanged in the
/// captured request. It does not exercise code that derives that history from
/// the turn-1 stream.
#[tokio::test]
async fn reasoning_replay_required_on_subsequent_turn() {
    const REASONING: &str = "I should call list_dir.";

    // Turn 1: reasoning, then a `list_dir` tool call.
    let first_turn = vec![
        canned::message_start("r1"),
        canned::thinking_delta(0, REASONING),
        canned::tool_use_block_start(1, "call_a", "list_dir"),
        canned::tool_input_delta(1, r#"{"path":"/tmp"}"#),
        canned::block_stop(1),
        canned::message_delta("tool_use", None),
        canned::message_stop(),
    ];
    // Turn 2: a plain-text reply once the tool result is supplied.
    let second_turn = vec![
        canned::message_start("r2"),
        canned::text_block_start(0),
        canned::text_delta(0, "I see /tmp."),
        canned::block_stop(0),
        canned::message_delta("end_turn", None),
        canned::message_stop(),
    ];
    let mock = MockLlmClient::new(vec![first_turn, second_turn]);

    // Open turn 1 and pull a single event. The stream is a temporary and is
    // dropped at the end of this statement.
    let _ = mock
        .create_message_stream(make_request(vec![user_message("list /tmp")]))
        .await
        .unwrap()
        .next()
        .await;

    let history = vec![
        user_message("list /tmp"),
        assistant_thinking(REASONING, ""),
        assistant_tool_call("call_a", "list_dir", serde_json::json!({ "path": "/tmp" })),
        tool_result_message("call_a", "/tmp/file1\n/tmp/file2"),
    ];
    let _ = mock.create_message_stream(make_request(history)).await.unwrap();

    let captured = mock.captured_requests();
    assert_eq!(captured.len(), 2);

    // First `Thinking` block on the first assistant message that has one.
    let replayed = captured[1]
        .messages
        .iter()
        .filter(|msg| msg.role == "assistant")
        .flat_map(|msg| msg.content.iter())
        .find_map(|block| match block {
            ContentBlock::Thinking { thinking } => Some(thinking.as_str()),
            _ => None,
        })
        .expect("turn 2 request must replay assistant Thinking content");
    assert_eq!(
        replayed, REASONING,
        "reasoning_content must be replayed verbatim across tool-call rounds"
    );
}
/// Tool arguments streamed in fragments reassemble into valid JSON.
/// Sending back the tool result then lets the next turn finish with text.
#[tokio::test]
async fn tool_call_round_trip_streams_args_then_continues() {
    use crate::models::ContentBlockStart;

    // Turn 1: a `read_file` call whose JSON arguments are split mid-object
    // across two deltas.
    let call_turn = vec![
        canned::message_start("rt1"),
        canned::tool_use_block_start(0, "call_x", "read_file"),
        canned::tool_input_delta(0, r#"{"path":"#),
        canned::tool_input_delta(0, r#""README.md"}"#),
        canned::block_stop(0),
        canned::message_delta("tool_use", None),
        canned::message_stop(),
    ];
    // Turn 2: the text answer after the tool result.
    let answer_turn = vec![
        canned::message_start("rt2"),
        canned::text_block_start(0),
        canned::text_delta(0, "README starts with: # deepseek-tui"),
        canned::block_stop(0),
        canned::message_delta("end_turn", None),
        canned::message_stop(),
    ];
    let mock = MockLlmClient::new(vec![call_turn, answer_turn]);

    let mut first = mock
        .create_message_stream(make_request(vec![user_message("read README.md")]))
        .await
        .unwrap();
    let mut saw_tool_use = false;
    let mut args_json = String::new();
    while let Some(event) = first.next().await {
        match event.unwrap() {
            StreamEvent::MessageStop => break,
            StreamEvent::ContentBlockDelta {
                delta: Delta::InputJsonDelta { partial_json },
                ..
            } => args_json.push_str(&partial_json),
            StreamEvent::ContentBlockStart {
                content_block: ContentBlockStart::ToolUse { name, .. },
                ..
            } => {
                assert_eq!(name, "read_file");
                saw_tool_use = true;
            }
            _ => {}
        }
    }
    assert!(saw_tool_use);

    let args: serde_json::Value =
        serde_json::from_str(&args_json).expect("valid JSON after concat");
    assert_eq!(args["path"], "README.md");

    let follow_up = make_request(vec![
        user_message("read README.md"),
        assistant_tool_call(
            "call_x",
            "read_file",
            serde_json::json!({ "path": "README.md" }),
        ),
        tool_result_message("call_x", "# deepseek-tui\n..."),
    ]);
    let (text, stop) = drain_stream_text(&mock, follow_up).await;
    assert!(text.contains("# deepseek-tui"));
    assert_eq!(stop.as_deref(), Some("end_turn"));
}
/// Two tool calls in one turn arrive as `ContentBlockStart` events in
/// block-index order, each with its own call id.
#[tokio::test]
async fn parallel_tool_calls_preserve_ordering_in_turn_payload() {
    use crate::models::ContentBlockStart;

    let script = vec![vec![
        canned::message_start("p1"),
        canned::tool_use_block_start(0, "call_one", "list_dir"),
        canned::tool_input_delta(0, r#"{"path":"a"}"#),
        canned::block_stop(0),
        canned::tool_use_block_start(1, "call_two", "list_dir"),
        canned::tool_input_delta(1, r#"{"path":"b"}"#),
        canned::block_stop(1),
        canned::message_delta("tool_use", None),
        canned::message_stop(),
    ]];
    let mock = MockLlmClient::new(script);
    let mut stream = mock
        .create_message_stream(make_request(vec![user_message("list both")]))
        .await
        .unwrap();

    // Record (block index, call id) for every tool-use block start.
    let mut starts: Vec<(u32, String)> = Vec::new();
    while let Some(event) = stream.next().await {
        if let StreamEvent::ContentBlockStart {
            index,
            content_block: ContentBlockStart::ToolUse { id, .. },
        } = event.unwrap()
        {
            starts.push((index, id));
        }
    }

    assert_eq!(
        starts,
        vec![(0, "call_one".to_string()), (1, "call_two".to_string())]
    );
}
#[tokio::test]
async fn compaction_non_streaming_returns_queued_message_response() {
use crate::models::MessageResponse;
let mock = MockLlmClient::new(vec![]);
mock.push_message_response(MessageResponse {
id: "compact_msg".to_string(),
r#type: "message".to_string(),
role: "assistant".to_string(),
content: vec![ContentBlock::Text {
text: "## Summary\n- Step 1\n- Step 2".to_string(),
cache_control: None,
}],
model: "deepseek-v4-pro".to_string(),
stop_reason: Some("end_turn".to_string()),
stop_sequence: None,
container: None,
usage: Usage::default(),
});
let req = MessageRequest {
stream: Some(false),
..make_request(vec![user_message("summarize")])
};
let resp = mock.create_message(req).await.unwrap();
let text = match &resp.content[0] {
ContentBlock::Text { text, .. } => text.clone(),
_ => panic!("expected text content"),
};
assert!(text.contains("Summary"));
assert_eq!(resp.id, "compact_msg");
assert_eq!(mock.call_count(), 1);
}
/// Parent and child clients built from separate mocks keep separate provider
/// names and separate request logs.
#[tokio::test]
async fn sub_agent_parent_and_child_each_drive_independent_mocks() {
    // Parent: one turn that emits an `agent_spawn` tool call.
    let parent = MockLlmClient::new(vec![vec![
        canned::message_start("parent_t1"),
        canned::tool_use_block_start(0, "spawn_id", "agent_spawn"),
        canned::tool_input_delta(0, r#"{"prompt":"compute 2+2"}"#),
        canned::block_stop(0),
        canned::message_delta("tool_use", None),
        canned::message_stop(),
    ]])
    .with_provider("mock-parent")
    .with_model("deepseek-v4-pro");

    // Child: one turn that answers with plain text.
    let child = MockLlmClient::new(vec![vec![
        canned::message_start("child_t1"),
        canned::text_block_start(0),
        canned::text_delta(0, "4"),
        canned::block_stop(0),
        canned::message_delta("end_turn", None),
        canned::message_stop(),
    ]])
    .with_provider("mock-child")
    .with_model("deepseek-v4-flash");

    // Open the parent turn and pull only its first event. The stream is a
    // temporary dropped at the end of the statement.
    let _ = parent
        .create_message_stream(make_request(vec![user_message("delegate")]))
        .await
        .unwrap()
        .next()
        .await;

    let child_request = make_request(vec![user_message("compute 2+2")]);
    let (child_text, _) = drain_stream_text(&child, child_request).await;
    assert_eq!(child_text, "4");

    assert_eq!(parent.provider_name(), "mock-parent");
    assert_eq!(parent.captured_requests().len(), 1);
    assert_eq!(child.provider_name(), "mock-child");
    assert_eq!(child.captured_requests().len(), 1);
}
#[tokio::test]
async fn capacity_gate_can_observe_request_before_response_streams() {
let turn = vec![canned::simple_text_turn("ok")];
let mock = MockLlmClient::new(turn);
let mut messages = Vec::new();
for i in 0..200 {
messages.push(user_message(&format!("m{i}")));
}
let req = make_request(messages);
let stream_future = mock.create_message_stream(req);
let mut stream = stream_future.await.unwrap();
assert_eq!(mock.captured_requests().len(), 1);
let captured = mock.last_request().unwrap();
assert_eq!(captured.messages.len(), 200);
let total_chars: usize = captured
.messages
.iter()
.flat_map(|m| m.content.iter())
.map(|b| match b {
ContentBlock::Text { text, .. } => text.len(),
_ => 0,
})
.sum();
assert!(
total_chars > 100,
"synthetic over-cap request should have non-trivial size"
);
while stream.next().await.is_some() {}
}
/// The compaction threshold for `deepseek-v4-pro` at `high` effort must lie
/// strictly between 0 and 1,000,000.
#[test]
fn compaction_config_defaults_are_enabled_for_session_survivability() {
    use crate::models::compaction_threshold_for_model_and_effort;

    let threshold = compaction_threshold_for_model_and_effort("deepseek-v4-pro", Some("high"));
    assert!(threshold > 0, "compaction threshold must be positive");
    assert!(threshold < 1_000_000, "compaction threshold must be below 1M");
}