use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use dynamo_llm::model_card::ModelDeploymentCard;
use dynamo_llm::preprocessor::OpenAIPreprocessor;
use dynamo_llm::protocols::openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
};
use dynamo_protocols::types::{
ChatCompletionMessageContent, ChatCompletionToolChoiceOption, FinishReason,
};
use dynamo_runtime::protocols::annotated::Annotated;
use futures::{StreamExt, stream};
use serde_json::Value;
/// Baseline streaming chat request (model `Qwen/Qwen3-0.6B`, a single user message,
/// `stream: true` with `include_usage`) used as the request context when replaying
/// fixtures and mock chunk streams through the postprocessor. Tests clone it via
/// `serde_json::from_str` and then add tools / chat-template args as needed.
const REQUEST_JSON: &str = r#"{"messages":[{"role":"user","content":"What is the capital of Tuvalu?"}],"model":"Qwen/Qwen3-0.6B","max_completion_tokens":3000,"stream":true,"stream_options":{"include_usage":true,"continuous_usage_stats":false},"temperature":1.0,"top_p":1.0}"#;
/// Builds an [`OpenAIPreprocessor`] from the mock Llama 3.1 8B Instruct model card in
/// `tests/data/sample-models/mock-llama-3.1-8b-instruct`.
///
/// `reasoning_parser` and `tool_call_parser` are written into the card's runtime config
/// before construction, overwriting whatever the card on disk specified; `None` clears
/// the corresponding parser.
///
/// # Panics
/// Panics if the model card cannot be loaded (the message names the model path) or if
/// the preprocessor cannot be built from the card.
fn build_preprocessor(
    reasoning_parser: Option<&str>,
    tool_call_parser: Option<&str>,
) -> Arc<OpenAIPreprocessor> {
    let model_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("tests/data/sample-models/mock-llama-3.1-8b-instruct");
    // Render the path before it is moved into `load_from_disk`, so a failure can name it.
    let model_path_display = model_path.display().to_string();
    let mut mdc = ModelDeploymentCard::load_from_disk(model_path, None).unwrap_or_else(|e| {
        panic!("failed to load mock model card from {model_path_display}: {e:?}")
    });
    mdc.runtime_config.reasoning_parser = reasoning_parser.map(ToString::to_string);
    mdc.runtime_config.tool_call_parser = tool_call_parser.map(ToString::to_string);
    OpenAIPreprocessor::new(mdc).expect("failed to build OpenAIPreprocessor from mock model card")
}
/// Resolves `name` inside this crate's `tests/data/replays` fixture directory.
fn fixture_path(name: &str) -> PathBuf {
    let mut resolved = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    resolved.push("tests/data/replays");
    resolved.push(name);
    resolved
}
/// Loads a JSONL replay fixture of streamed chat-completion chunks.
///
/// Returns `(request, expected_stream_json, input_chunks)` where:
/// - `request` is parsed from [`REQUEST_JSON`] (fixture files hold only response chunks);
/// - `expected_stream_json` holds each chunk re-serialized through the typed struct, so it
///   is in the same normalized form as serialized postprocessor output;
/// - `input_chunks` holds the typed chunks, in file order, to replay as the input stream.
///
/// Blank and whitespace-only lines are skipped.
///
/// # Panics
/// Panics if the file cannot be read, if a line is not a valid stream chunk (the message
/// names the fixture and 1-based line number), or if the fixture contains no chunks.
fn parse_fixture(
    jsonl_path: &Path,
) -> (
    NvCreateChatCompletionRequest,
    Vec<Value>,
    Vec<NvCreateChatCompletionStreamResponse>,
) {
    let content = fs::read_to_string(jsonl_path)
        .unwrap_or_else(|e| panic!("failed to read fixture {}: {e}", jsonl_path.display()));
    let mut expected_stream_json = Vec::new();
    let mut input_chunks = Vec::new();
    for (line_idx, line) in content.lines().enumerate() {
        if line.trim().is_empty() {
            continue;
        }
        let line_no = line_idx + 1;
        let value: Value = serde_json::from_str(line).unwrap_or_else(|e| {
            panic!("invalid JSON at {}:{line_no}: {e}", jsonl_path.display())
        });
        // `value` is not used afterwards, so hand it over without cloning.
        let chunk: NvCreateChatCompletionStreamResponse = serde_json::from_value(value)
            .unwrap_or_else(|e| {
                panic!(
                    "line {}:{line_no} is not a stream chunk: {e}",
                    jsonl_path.display()
                )
            });
        // Round-trip through the typed struct so the expected JSON has the same
        // normalized form as the serialized postprocessor output it is compared with.
        let normalized =
            serde_json::to_value(&chunk).expect("stream chunk should serialize to JSON");
        expected_stream_json.push(normalized);
        input_chunks.push(chunk);
    }
    let request: NvCreateChatCompletionRequest =
        serde_json::from_str(REQUEST_JSON).expect("REQUEST_JSON should be a valid chat request");
    assert!(
        !input_chunks.is_empty(),
        "missing stream chunks in fixture {}",
        jsonl_path.display()
    );
    (request, expected_stream_json, input_chunks)
}
/// Returns the plain-text payload of a streamed message delta.
///
/// Multi-part content is not inspected and yields an empty string, so callers that
/// accumulate text simply contribute nothing for such deltas.
fn get_text(content: &ChatCompletionMessageContent) -> &str {
    if let ChatCompletionMessageContent::Text(plain) = content {
        plain.as_str()
    } else {
        ""
    }
}
/// Accumulates the streamed fragments of one tool call into a single merged view.
///
/// `id`, `type`, and the function `name` keep the first non-`None` value seen; the
/// function `arguments` fragments are concatenated in arrival order.
#[derive(Default, Clone)]
struct MergedToolCall {
    /// Tool-call id from the first chunk that carried one.
    id: Option<String>,
    /// Tool-call type, rendered as its JSON string value with the quotes stripped.
    r#type: Option<String>,
    /// Function name from the first chunk that carried one.
    name: Option<String>,
    /// Concatenation of every `arguments` fragment received so far.
    arguments: String,
}

impl MergedToolCall {
    /// Folds one streamed tool-call chunk into this accumulator.
    fn merge_from(
        &mut self,
        tool_call: &dynamo_protocols::types::ChatCompletionMessageToolCallChunk,
    ) {
        // Earlier values win; later chunks only fill in fields that are still missing.
        self.id = self.id.take().or_else(|| tool_call.id.clone());
        self.r#type = self.r#type.take().or_else(|| {
            tool_call.r#type.as_ref().map(|kind| {
                let quoted = serde_json::to_string(kind).unwrap();
                quoted.trim_matches('"').to_string()
            })
        });

        let Some(function) = tool_call.function.as_ref() else {
            return;
        };
        self.name = self.name.take().or_else(|| function.name.clone());
        if let Some(fragment) = &function.arguments {
            self.arguments.push_str(fragment);
        }
    }
}
/// Replays the `stream_interval_1.jsonl` fixture with no reasoning or tool-call parser
/// configured and checks that every output chunk serializes to exactly the corresponding
/// (normalized) fixture chunk, and that the chunk counts match.
#[tokio::test]
async fn postprocessor_parsing_stream_replays_unit_test_fixture() {
    let preprocessor = build_preprocessor(None, None);
    let fixture = fixture_path("stream_interval_1.jsonl");
    let (request, expected_chunks, replay_chunks) = parse_fixture(&fixture);

    let replay = stream::iter(replay_chunks.into_iter().map(Annotated::from_data));
    let produced: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(replay, &request, false)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    assert_eq!(produced.len(), expected_chunks.len());
    for (idx, (chunk, wanted)) in produced.iter().zip(&expected_chunks).enumerate() {
        let payload = chunk
            .data
            .as_ref()
            .expect("output stream chunk should include data");
        let actual = serde_json::to_value(payload).unwrap();
        assert_eq!(actual, *wanted, "chunk {idx} did not match fixture");
    }
}
/// Replays the `stream_interval_20.jsonl` fixture through the `qwen` reasoning parser and
/// the `hermes` tool-call parser, with a single `search_gutenberg_books` tool offered under
/// `tool_choice: auto`.
///
/// Checks four things:
/// - the reasoning text is extracted;
/// - exactly one tool call is reassembled from the streamed fragments, with the expected
///   name, arguments, id prefix and `function` type;
/// - no raw `<tool_call>` / `</tool_call>` markup leaks into `content`;
/// - when any finish reason is emitted, it includes `stop` or `tool_calls`.
#[tokio::test]
async fn postprocessor_parsing_stream_replays_interval_20_fixture() {
    let preprocessor = build_preprocessor(Some("qwen"), Some("hermes"));
    let (mut request, _expected_stream_json, input_chunks) =
        parse_fixture(&fixture_path("stream_interval_20.jsonl"));
    let tools: Vec<dynamo_protocols::types::ChatCompletionTool> =
        serde_json::from_value(serde_json::json!([
            {
                "type": "function",
                "function": {
                    "name": "search_gutenberg_books",
                    "description": "Search for books in the Project Gutenberg library",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "search_terms": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "List of search terms to find books"
                            }
                        },
                        "required": ["search_terms"]
                    }
                }
            }
        ]))
        .unwrap();
    request.inner.tools = Some(tools);
    request.inner.tool_choice = Some(ChatCompletionToolChoiceOption::Auto);
    let input_stream = stream::iter(input_chunks.into_iter().map(Annotated::from_data));
    let output_stream = preprocessor
        .postprocessor_parsing_stream(input_stream, &request, false)
        .expect("postprocessor_parsing_stream should build");
    let output_chunks: Vec<Annotated<NvCreateChatCompletionStreamResponse>> =
        output_stream.collect().await;
    let mut reasoning = String::new();
    let mut all_content = String::new();
    let mut finish_reasons = Vec::new();
    // Tool-call fragments are keyed by their stream `index` and merged in arrival order.
    let mut merged_tool_calls: BTreeMap<u32, MergedToolCall> = BTreeMap::new();
    for output in &output_chunks {
        let Some(output_data) = output.data.as_ref() else {
            continue;
        };
        for choice in &output_data.inner.choices {
            if let Some(reasoning_content) = &choice.delta.reasoning_content {
                reasoning.push_str(reasoning_content);
            }
            if let Some(content) = &choice.delta.content {
                all_content.push_str(get_text(content));
            }
            if let Some(reason) = choice.finish_reason {
                finish_reasons.push(reason);
            }
            if let Some(tool_calls) = &choice.delta.tool_calls {
                for tool_call in tool_calls {
                    merged_tool_calls
                        .entry(tool_call.index)
                        .or_default()
                        .merge_from(tool_call);
                }
            }
        }
    }
    // The map is not used again, so move the merged calls out instead of cloning them.
    let tool_calls: Vec<MergedToolCall> = merged_tool_calls.into_values().collect();
    assert!(
        reasoning.contains("the user is asking for the titles of some James Joyce books"),
        "reasoning did not contain expected phrase: {reasoning}"
    );
    assert!(
        reasoning.contains("the user's request.\n"),
        "reasoning did not contain expected ending: {reasoning}"
    );
    assert_eq!(
        tool_calls.len(),
        1,
        "Expected 1 tool call but got {}. Tool-call markup was likely emitted as plain content instead.",
        tool_calls.len()
    );
    let tc = &tool_calls[0];
    assert_eq!(tc.name.as_deref(), Some("search_gutenberg_books"));
    let arguments_json: Value = serde_json::from_str(&tc.arguments).unwrap();
    assert_eq!(
        arguments_json,
        serde_json::json!({
            "search_terms": ["James Joyce", "Project Gutenberg"]
        })
    );
    assert!(
        tc.id
            .as_ref()
            .is_some_and(|id| id.starts_with("call-") || id.starts_with("chatcmpl-tool-")),
        "tool call id did not match expected prefix: {:?}",
        tc.id
    );
    assert_eq!(tc.r#type.as_deref(), Some("function"));
    assert!(
        !all_content.contains("<tool_call>"),
        "Raw <tool_call> markup leaked into content: {all_content:?}"
    );
    assert!(
        !all_content.contains("</tool_call>"),
        "Raw </tool_call> markup leaked into content: {all_content:?}"
    );
    // The finish-reason check is skipped when the stream emitted no finish reason at all.
    if !finish_reasons.is_empty() {
        assert!(
            finish_reasons.contains(&FinishReason::Stop)
                || finish_reasons.contains(&FinishReason::ToolCalls),
            "expected terminal finish reason (stop/tool_calls), got: {:?}",
            finish_reasons
        );
    }
}
fn mock_content_chunk(content: &str) -> NvCreateChatCompletionStreamResponse {
use dynamo_protocols::types::{
ChatChoiceStream, ChatCompletionStreamResponseDelta, CreateChatCompletionStreamResponse,
Role,
};
#[allow(deprecated)]
let choice = ChatChoiceStream {
index: 0,
delta: ChatCompletionStreamResponseDelta {
role: Some(Role::Assistant),
content: Some(ChatCompletionMessageContent::Text(content.to_string())),
tool_calls: None,
function_call: None,
refusal: None,
reasoning_content: None,
},
finish_reason: None,
logprobs: None,
};
NvCreateChatCompletionStreamResponse {
inner: CreateChatCompletionStreamResponse {
id: "test-id".to_string(),
choices: vec![choice],
created: 0,
model: "test-model".to_string(),
system_fingerprint: None,
object: "chat.completion.chunk".to_string(),
usage: None,
service_tier: None,
},
nvext: None,
}
}
/// Builds one stream chunk containing a choice per `(index, text)` pair. Each choice is an
/// assistant delta carrying `text` as plain content and no finish reason.
// The struct literal below initializes a field flagged deprecated by the protocol types
// (presumably `function_call`), so silence that lint for this helper.
#[allow(deprecated)]
fn mock_multi_choice_content_chunk(
    choices: &[(u32, &str)],
) -> NvCreateChatCompletionStreamResponse {
    use dynamo_protocols::types::{
        ChatChoiceStream, ChatCompletionStreamResponseDelta, CreateChatCompletionStreamResponse,
        Role,
    };
    let mut stream_choices = Vec::with_capacity(choices.len());
    for &(choice_index, text) in choices {
        let delta = ChatCompletionStreamResponseDelta {
            role: Some(Role::Assistant),
            content: Some(ChatCompletionMessageContent::Text(text.to_string())),
            tool_calls: None,
            function_call: None,
            refusal: None,
            reasoning_content: None,
        };
        stream_choices.push(ChatChoiceStream {
            index: choice_index,
            delta,
            finish_reason: None,
            logprobs: None,
        });
    }
    let inner = CreateChatCompletionStreamResponse {
        id: "test-id".to_string(),
        choices: stream_choices,
        created: 0,
        model: "test-model".to_string(),
        system_fingerprint: None,
        object: "chat.completion.chunk".to_string(),
        usage: None,
        service_tier: None,
    };
    NvCreateChatCompletionStreamResponse { inner, nvext: None }
}
/// Builds a terminal single-choice (index 0) chunk: an empty delta (no role, content or
/// tool calls) with `finish_reason` set to `Stop`.
// The delta literal initializes a field flagged deprecated by the protocol types
// (presumably `function_call`), so silence that lint for this helper.
#[allow(deprecated)]
fn mock_final_chunk() -> NvCreateChatCompletionStreamResponse {
    use dynamo_protocols::types::{
        ChatChoiceStream, ChatCompletionStreamResponseDelta, CreateChatCompletionStreamResponse,
    };
    let empty_delta = ChatCompletionStreamResponseDelta {
        role: None,
        content: None,
        tool_calls: None,
        function_call: None,
        refusal: None,
        reasoning_content: None,
    };
    let stop_choice = ChatChoiceStream {
        index: 0,
        delta: empty_delta,
        finish_reason: Some(FinishReason::Stop),
        logprobs: None,
    };
    let inner = CreateChatCompletionStreamResponse {
        id: "test-id".to_string(),
        choices: vec![stop_choice],
        created: 0,
        model: "test-model".to_string(),
        system_fingerprint: None,
        object: "chat.completion.chunk".to_string(),
        usage: None,
        service_tier: None,
    };
    NvCreateChatCompletionStreamResponse { inner, nvext: None }
}
/// DeepSeek V4 tool-continuation turn (the history ends with a tool result), with the
/// third `postprocessor_parsing_stream` argument set to `true`. The text streamed before
/// `</think>` must surface as `reasoning_content`, the text after it as `content`, and
/// the literal closing tag must not leak into `content`.
#[tokio::test]
async fn postprocessor_parsing_stream_deepseek_v4_tool_continuation_keeps_injected_reasoning() {
    let preprocessor = build_preprocessor(Some("deepseek_v4"), None);
    let conversation = serde_json::json!({
        "messages": [
            {"role": "user", "content": "Create and run a hello-world script."},
            {
                "role": "assistant",
                "tool_calls": [{
                    "id": "call_1",
                    "type": "function",
                    "function": {
                        "name": "run_python",
                        "arguments": "{\"path\":\"/tmp/hello.py\"}"
                    }
                }]
            },
            {
                "role": "tool",
                "tool_call_id": "call_1",
                "content": "Hello, world!"
            }
        ],
        "model": "deepseek-ai/DeepSeek-V4-Pro",
        "stream": true
    });
    let request: NvCreateChatCompletionRequest = serde_json::from_value(conversation).unwrap();

    let scripted_chunks = vec![
        mock_content_chunk("The script ran successfully."),
        mock_content_chunk("</think>"),
        mock_content_chunk("Done. Output: `Hello, world!`"),
        mock_final_chunk(),
    ];
    let replay = stream::iter(scripted_chunks.into_iter().map(Annotated::from_data));
    let produced: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(replay, &request, true)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    let mut reasoning = String::new();
    let mut content = String::new();
    let choices = produced
        .iter()
        .filter_map(|chunk| chunk.data.as_ref())
        .flat_map(|data| data.inner.choices.iter());
    for choice in choices {
        if let Some(thought) = &choice.delta.reasoning_content {
            reasoning.push_str(thought);
        }
        if let Some(body) = &choice.delta.content {
            content.push_str(get_text(body));
        }
    }

    assert_eq!(reasoning, "The script ran successfully.");
    assert_eq!(content, "Done. Output: `Hello, world!`");
    assert!(
        !content.contains("</think>"),
        "literal closing tag leaked into content: {content:?}"
    );
}
/// Kimi K2.5 tool-continuation turn (the history ends with a tool result), with the third
/// `postprocessor_parsing_stream` argument set to `true`. A direct answer containing no
/// reasoning markers must come back entirely as `content`, with nothing attributed to
/// `reasoning_content`.
#[tokio::test]
async fn postprocessor_parsing_stream_kimi_k25_tool_continuation_suppresses_injected_reasoning() {
    let preprocessor = build_preprocessor(Some("kimi_k25"), None);
    let conversation = serde_json::json!({
        "messages": [
            {"role": "user", "content": "Create and run a hello-world script."},
            {
                "role": "assistant",
                "tool_calls": [{
                    "id": "call_1",
                    "type": "function",
                    "function": {
                        "name": "run_python",
                        "arguments": "{\"path\":\"/tmp/hello.py\"}"
                    }
                }]
            },
            {
                "role": "tool",
                "tool_call_id": "call_1",
                "content": "Hello, world!"
            }
        ],
        "model": "moonshotai/Kimi-K2.5-Instruct",
        "stream": true
    });
    let request: NvCreateChatCompletionRequest = serde_json::from_value(conversation).unwrap();

    let scripted_chunks = vec![
        mock_content_chunk("Done. Output: `Hello, world!`"),
        mock_final_chunk(),
    ];
    let replay = stream::iter(scripted_chunks.into_iter().map(Annotated::from_data));
    let produced: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(replay, &request, true)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    let mut reasoning_text = String::new();
    let mut answer_text = String::new();
    let choices = produced
        .iter()
        .filter_map(|chunk| chunk.data.as_ref())
        .flat_map(|data| data.inner.choices.iter());
    for choice in choices {
        if let Some(thought) = &choice.delta.reasoning_content {
            reasoning_text.push_str(thought);
        }
        if let Some(body) = &choice.delta.content {
            answer_text.push_str(get_text(body));
        }
    }

    assert_eq!(
        reasoning_text, "",
        "direct post-tool Kimi answer must not be mislabeled as reasoning_content",
    );
    assert_eq!(answer_text, "Done. Output: `Hello, world!`");
}
/// With `enable_thinking: false` in the chat-template args, the `nemotron_v3` reasoning
/// parser must pass plain text through as `content` and emit no `reasoning_content`.
#[tokio::test]
async fn postprocessor_parsing_stream_nemotron_v3_enable_thinking_false_returns_content() {
    let preprocessor = build_preprocessor(Some("nemotron_v3"), None);
    let mut request: NvCreateChatCompletionRequest = serde_json::from_str(REQUEST_JSON).unwrap();
    let template_args = serde_json::json!({
        "enable_thinking": false
    });
    request.chat_template_args = Some(serde_json::from_value(template_args).unwrap());

    let scripted_chunks = vec![mock_content_chunk("This is plain content")];
    let replay = stream::iter(scripted_chunks.into_iter().map(Annotated::from_data));
    let produced: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(replay, &request, false)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    let mut reasoning_text = String::new();
    let mut answer_text = String::new();
    let choices = produced
        .iter()
        .filter_map(|chunk| chunk.data.as_ref())
        .flat_map(|data| data.inner.choices.iter());
    for choice in choices {
        if let Some(thought) = &choice.delta.reasoning_content {
            reasoning_text.push_str(thought);
        }
        if let Some(body) = &choice.delta.content {
            answer_text.push_str(get_text(body));
        }
    }

    assert_eq!(reasoning_text, "");
    assert_eq!(answer_text, "This is plain content");
}
/// With `force_nonempty_content: true`, a `<think>` start token split across two chunks
/// (`<thi` + `nk>...`) must be stripped. Only the trailing text may reach `content`, and
/// no `reasoning_content` may be produced.
#[tokio::test]
async fn postprocessor_parsing_stream_nemotron_v3_force_nonempty_strips_start_token() {
    let preprocessor = build_preprocessor(Some("nemotron_v3"), None);
    let mut request: NvCreateChatCompletionRequest = serde_json::from_str(REQUEST_JSON).unwrap();
    let template_args = serde_json::json!({
        "force_nonempty_content": true
    });
    request.chat_template_args = Some(serde_json::from_value(template_args).unwrap());

    let split_start_token = vec![
        mock_content_chunk("<thi"),
        mock_content_chunk("nk>This is plain content"),
    ];
    let replay = stream::iter(split_start_token.into_iter().map(Annotated::from_data));
    let produced: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(replay, &request, false)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    let mut reasoning_text = String::new();
    let mut answer_text = String::new();
    let choices = produced
        .iter()
        .filter_map(|chunk| chunk.data.as_ref())
        .flat_map(|data| data.inner.choices.iter());
    for choice in choices {
        if let Some(thought) = &choice.delta.reasoning_content {
            reasoning_text.push_str(thought);
        }
        if let Some(body) = &choice.delta.content {
            answer_text.push_str(get_text(body));
        }
    }

    assert_eq!(reasoning_text, "");
    assert_eq!(answer_text, "This is plain content");
}
/// With `force_nonempty_content: true`, a held-back partial start-token prefix (`<thi`)
/// that is never completed must be flushed as `content` when the finish chunk arrives.
/// The `Stop` finish reason must still be emitted.
#[tokio::test]
async fn postprocessor_parsing_stream_nemotron_v3_force_nonempty_flushes_partial_prefix_on_finish()
{
    let preprocessor = build_preprocessor(Some("nemotron_v3"), None);
    let mut request: NvCreateChatCompletionRequest = serde_json::from_str(REQUEST_JSON).unwrap();
    let template_args = serde_json::json!({
        "force_nonempty_content": true
    });
    request.chat_template_args = Some(serde_json::from_value(template_args).unwrap());

    let scripted_chunks = vec![mock_content_chunk("<thi"), mock_final_chunk()];
    let replay = stream::iter(scripted_chunks.into_iter().map(Annotated::from_data));
    let produced: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(replay, &request, false)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    let mut reasoning_text = String::new();
    let mut answer_text = String::new();
    let mut finish_seen = Vec::new();
    let choices = produced
        .iter()
        .filter_map(|chunk| chunk.data.as_ref())
        .flat_map(|data| data.inner.choices.iter());
    for choice in choices {
        if let Some(thought) = &choice.delta.reasoning_content {
            reasoning_text.push_str(thought);
        }
        if let Some(body) = &choice.delta.content {
            answer_text.push_str(get_text(body));
        }
        if let Some(reason) = choice.finish_reason {
            finish_seen.push(reason);
        }
    }

    assert_eq!(reasoning_text, "");
    assert_eq!(answer_text, "<thi");
    assert!(finish_seen.contains(&FinishReason::Stop));
}
/// With `force_nonempty_content: true`, a held-back partial start-token prefix (`<thi`)
/// must be flushed as `content` when the input stream ends without a finish chunk.
#[tokio::test]
async fn postprocessor_parsing_stream_nemotron_v3_force_nonempty_flushes_partial_prefix_on_eof() {
    let preprocessor = build_preprocessor(Some("nemotron_v3"), None);
    let mut request: NvCreateChatCompletionRequest = serde_json::from_str(REQUEST_JSON).unwrap();
    let template_args = serde_json::json!({
        "force_nonempty_content": true
    });
    request.chat_template_args = Some(serde_json::from_value(template_args).unwrap());

    let scripted_chunks = vec![mock_content_chunk("<thi")];
    let replay = stream::iter(scripted_chunks.into_iter().map(Annotated::from_data));
    let produced: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(replay, &request, false)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    let mut reasoning_text = String::new();
    let mut answer_text = String::new();
    let choices = produced
        .iter()
        .filter_map(|chunk| chunk.data.as_ref())
        .flat_map(|data| data.inner.choices.iter());
    for choice in choices {
        if let Some(thought) = &choice.delta.reasoning_content {
            reasoning_text.push_str(thought);
        }
        if let Some(body) = &choice.delta.content {
            answer_text.push_str(get_text(body));
        }
    }

    assert_eq!(reasoning_text, "");
    assert_eq!(answer_text, "<thi");
}
/// With `force_nonempty_content: true` and two choices streamed in the same chunks, the
/// split `<think>` start token must be tracked and stripped independently per choice
/// index. No choice may ever carry `reasoning_content`.
#[tokio::test]
async fn postprocessor_parsing_stream_nemotron_v3_force_nonempty_tracks_prefix_per_choice() {
    let preprocessor = build_preprocessor(Some("nemotron_v3"), None);
    let mut request: NvCreateChatCompletionRequest = serde_json::from_str(REQUEST_JSON).unwrap();
    let template_args = serde_json::json!({
        "force_nonempty_content": true
    });
    request.chat_template_args = Some(serde_json::from_value(template_args).unwrap());

    let scripted_chunks = vec![
        mock_multi_choice_content_chunk(&[(0, "<thi"), (1, "<thi")]),
        mock_multi_choice_content_chunk(&[(0, "nk>First"), (1, "nk>Second")]),
    ];
    let replay = stream::iter(scripted_chunks.into_iter().map(Annotated::from_data));
    let produced: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(replay, &request, false)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    let mut text_by_index: BTreeMap<u32, String> = BTreeMap::new();
    let choices = produced
        .iter()
        .filter_map(|chunk| chunk.data.as_ref())
        .flat_map(|data| data.inner.choices.iter());
    for choice in choices {
        if let Some(body) = &choice.delta.content {
            text_by_index
                .entry(choice.index)
                .or_default()
                .push_str(get_text(body));
        }
        assert!(
            choice.delta.reasoning_content.is_none(),
            "reasoning_content must stay empty when force_nonempty_content=true"
        );
    }

    assert_eq!(text_by_index.get(&0).map(String::as_str), Some("First"));
    assert_eq!(text_by_index.get(&1).map(String::as_str), Some("Second"));
}
/// With `tool_choice: required`, a bare JSON tool-call array from a MiniMax model must
/// produce no `reasoning_content` and must not leak the JSON into `content`. It must be
/// parsed into exactly one `get_weather` call with the streamed arguments, and the stream
/// must finish with `tool_calls`.
#[tokio::test]
async fn postprocessor_parsing_stream_minimax_required_bypasses_reasoning() {
    let preprocessor = build_preprocessor(Some("minimax_append_think"), Some("minimax_m2"));
    let mut request: NvCreateChatCompletionRequest = serde_json::from_str(REQUEST_JSON).unwrap();
    let tool_specs = serde_json::json!([{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a location.",
            "parameters": {
                "type": "object",
                "properties": {"location": {"type": "string"}},
                "required": ["location"]
            }
        }
    }]);
    let tools: Vec<dynamo_protocols::types::ChatCompletionTool> =
        serde_json::from_value(tool_specs).unwrap();
    request.inner.tools = Some(tools);
    request.inner.tool_choice = Some(ChatCompletionToolChoiceOption::Required);

    let bare_json = r#"[{"name": "get_weather", "parameters": {"location": "San Francisco"}}]"#;
    let scripted_chunks = vec![mock_content_chunk(bare_json), mock_final_chunk()];
    let replay = stream::iter(scripted_chunks.into_iter().map(Annotated::from_data));
    let produced: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(replay, &request, false)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    let mut reasoning = String::new();
    let mut content = String::new();
    let mut calls_by_index: BTreeMap<u32, MergedToolCall> = BTreeMap::new();
    let mut finish_reasons = Vec::new();
    let choices = produced
        .iter()
        .filter_map(|chunk| chunk.data.as_ref())
        .flat_map(|data| data.inner.choices.iter());
    for choice in choices {
        if let Some(thought) = &choice.delta.reasoning_content {
            reasoning.push_str(thought);
        }
        if let Some(body) = &choice.delta.content {
            content.push_str(get_text(body));
        }
        for fragment in choice.delta.tool_calls.iter().flatten() {
            calls_by_index
                .entry(fragment.index)
                .or_default()
                .merge_from(fragment);
        }
        if let Some(reason) = choice.finish_reason {
            finish_reasons.push(reason);
        }
    }

    assert!(
        reasoning.is_empty(),
        "reasoning_content must be empty when tool_choice=required forces bare JSON, got: {reasoning:?}"
    );
    assert!(
        !content.contains("get_weather"),
        "tool call JSON must not leak into content, got: {content:?}"
    );
    let merged: Vec<MergedToolCall> = calls_by_index.values().cloned().collect();
    assert_eq!(merged.len(), 1, "expected one tool call");
    assert_eq!(merged[0].name.as_deref(), Some("get_weather"));
    let parsed_args: Value = serde_json::from_str(&merged[0].arguments).unwrap();
    assert_eq!(parsed_args, serde_json::json!({"location": "San Francisco"}));
    assert!(
        finish_reasons.contains(&FinishReason::ToolCalls),
        "expected ToolCalls finish_reason, got: {finish_reasons:?}"
    );
}
/// Smoke test for `tool_choice: required` with the Nemotron nano and super/deci parsers.
/// For each parser, configured as both the reasoning and the tool-call parser, a bare JSON
/// tool-call array must yield:
/// - no `reasoning_content`;
/// - no JSON or `<tool_call>` markup in `content`;
/// - exactly one `get_weather` call with the streamed arguments;
/// - a `tool_calls` finish reason.
#[tokio::test]
async fn postprocessor_parsing_stream_nemotron_required_smoke_case() {
    let tool_specs = serde_json::json!([{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a location.",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "City name"
                    }
                },
                "required": ["location"]
            }
        }
    }]);
    let parser_cases = [("nano", "nemotron_nano"), ("super/deci", "nemotron_deci")];

    for &(case, parser) in &parser_cases {
        let preprocessor = build_preprocessor(Some(parser), Some(parser));
        let base_request = serde_json::json!({
            "model": "nvidia/nvidia/nemotron-3-super-120b-long-ctx",
            "messages": [
                {"role": "user", "content": "What is the weather in San Francisco?"}
            ],
            "stream": true,
            "temperature": 0.0
        });
        let mut request: NvCreateChatCompletionRequest =
            serde_json::from_value(base_request).unwrap();
        let tools: Vec<dynamo_protocols::types::ChatCompletionTool> =
            serde_json::from_value(tool_specs.clone()).unwrap();
        request.inner.tools = Some(tools);
        request.inner.tool_choice = Some(ChatCompletionToolChoiceOption::Required);

        let bare_json = r#"[{"name":"get_weather","parameters":{"location":"San Francisco"}}]"#;
        let scripted_chunks = vec![mock_content_chunk(bare_json), mock_final_chunk()];
        let replay = stream::iter(scripted_chunks.into_iter().map(Annotated::from_data));
        let produced: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
            .postprocessor_parsing_stream(replay, &request, true)
            .expect("postprocessor_parsing_stream should build")
            .collect()
            .await;

        let mut reasoning = String::new();
        let mut content = String::new();
        let mut calls_by_index: BTreeMap<u32, MergedToolCall> = BTreeMap::new();
        let mut finish_reasons = Vec::new();
        let choices = produced
            .iter()
            .filter_map(|chunk| chunk.data.as_ref())
            .flat_map(|data| data.inner.choices.iter());
        for choice in choices {
            if let Some(thought) = &choice.delta.reasoning_content {
                reasoning.push_str(thought);
            }
            if let Some(body) = &choice.delta.content {
                content.push_str(get_text(body));
            }
            for fragment in choice.delta.tool_calls.iter().flatten() {
                calls_by_index
                    .entry(fragment.index)
                    .or_default()
                    .merge_from(fragment);
            }
            if let Some(reason) = choice.finish_reason {
                finish_reasons.push(reason);
            }
        }

        assert!(
            reasoning.is_empty(),
            "{case}: reasoning_content must be empty when tool_choice=required forces bare JSON, got: {reasoning:?}"
        );
        assert!(
            !content.contains("get_weather"),
            "{case}: tool-call JSON must not leak into content, got: {content:?}"
        );
        assert!(
            !content.contains("<tool_call>"),
            "{case}: raw <tool_call> XML must not leak into content, got: {content:?}"
        );
        let merged: Vec<MergedToolCall> = calls_by_index.values().cloned().collect();
        assert_eq!(merged.len(), 1, "{case}: expected one tool call");
        assert_eq!(
            merged[0].name.as_deref(),
            Some("get_weather"),
            "{case}: wrong tool name"
        );
        let parsed_args: Value = serde_json::from_str(&merged[0].arguments).unwrap();
        assert_eq!(
            parsed_args,
            serde_json::json!({"location": "San Francisco"}),
            "{case}: wrong arguments"
        );
        assert!(
            finish_reasons.contains(&FinishReason::ToolCalls),
            "{case}: expected ToolCalls finish_reason, got: {finish_reasons:?}"
        );
    }
}
#[tokio::test]
async fn postprocessor_parsing_stream_minimax_named_bypasses_reasoning() {
let preprocessor = build_preprocessor(Some("minimax_append_think"), Some("minimax_m2"));
let mut request: NvCreateChatCompletionRequest = serde_json::from_str(REQUEST_JSON).unwrap();
let tools: Vec<dynamo_protocols::types::ChatCompletionTool> =
serde_json::from_value(serde_json::json!([{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get the current weather for a location.",
"parameters": {
"type": "object",
"properties": {"location": {"type": "string"}},
"required": ["location"]
}
}
}]))
.unwrap();
request.inner.tools = Some(tools);
request.inner.tool_choice = Some(ChatCompletionToolChoiceOption::Named(
"get_weather".to_string().into(),
));
let bare_json = r#"[{"name": "get_weather", "parameters": {"location": "Tokyo"}}]"#;
let input_chunks = vec![mock_content_chunk(bare_json), mock_final_chunk()];
let input_stream = stream::iter(input_chunks.into_iter().map(Annotated::from_data));
let output_stream = preprocessor
.postprocessor_parsing_stream(input_stream, &request, false)
.expect("postprocessor_parsing_stream should build");
let output_chunks: Vec<Annotated<NvCreateChatCompletionStreamResponse>> =
output_stream.collect().await;
let mut reasoning = String::new();
let mut merged_tool_calls: BTreeMap<u32, MergedToolCall> = BTreeMap::new();
let mut finish_reasons = Vec::new();
for output in &output_chunks {
let Some(data) = output.data.as_ref() else {
continue;
};
for choice in &data.inner.choices {
if let Some(r) = &choice.delta.reasoning_content {
reasoning.push_str(r);
}
if let Some(tcs) = &choice.delta.tool_calls {
for tc in tcs {
merged_tool_calls
.entry(tc.index)
.or_default()
.merge_from(tc);
}
}
if let Some(fr) = choice.finish_reason {
finish_reasons.push(fr);
}
}
}
assert!(
reasoning.is_empty(),
"reasoning_content must be empty for tool_choice=named, got: {reasoning:?}"
);
let tool_calls: Vec<MergedToolCall> = merged_tool_calls.values().cloned().collect();
assert_eq!(tool_calls.len(), 1);
assert_eq!(tool_calls[0].name.as_deref(), Some("get_weather"));
assert!(
finish_reasons.contains(&FinishReason::ToolCalls),
"named tool_choice with emitted tool_calls should finish as ToolCalls, got: {finish_reasons:?}"
);
}
/// With `tool_choice` naming `get_weather`, a MiniMax model that emits only the bare
/// parameters object (no name wrapper) must produce no `reasoning_content` and no
/// `<think>` prefix in `content`. The output must contain exactly one `get_weather` call
/// whose arguments are that object.
#[tokio::test]
async fn postprocessor_parsing_stream_minimax_named_bare_parameters() {
    let preprocessor = build_preprocessor(Some("minimax_append_think"), Some("minimax_m2"));
    let mut request: NvCreateChatCompletionRequest = serde_json::from_str(REQUEST_JSON).unwrap();
    let tool_specs = serde_json::json!([{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a location.",
            "parameters": {
                "type": "object",
                "properties": {"location": {"type": "string"}},
                "required": ["location"]
            }
        }
    }]);
    let tools: Vec<dynamo_protocols::types::ChatCompletionTool> =
        serde_json::from_value(tool_specs).unwrap();
    request.inner.tools = Some(tools);
    request.inner.tool_choice = Some(ChatCompletionToolChoiceOption::Named(
        "get_weather".to_string().into(),
    ));

    let bare_params = r#"{"location": "Paris", "unit": "celsius"}"#;
    let scripted_chunks = vec![mock_content_chunk(bare_params), mock_final_chunk()];
    let replay = stream::iter(scripted_chunks.into_iter().map(Annotated::from_data));
    let produced: Vec<Annotated<NvCreateChatCompletionStreamResponse>> = preprocessor
        .postprocessor_parsing_stream(replay, &request, false)
        .expect("postprocessor_parsing_stream should build")
        .collect()
        .await;

    let mut reasoning = String::new();
    let mut content = String::new();
    let mut calls_by_index: BTreeMap<u32, MergedToolCall> = BTreeMap::new();
    let choices = produced
        .iter()
        .filter_map(|chunk| chunk.data.as_ref())
        .flat_map(|data| data.inner.choices.iter());
    for choice in choices {
        if let Some(thought) = &choice.delta.reasoning_content {
            reasoning.push_str(thought);
        }
        if let Some(body) = &choice.delta.content {
            content.push_str(get_text(body));
        }
        for fragment in choice.delta.tool_calls.iter().flatten() {
            calls_by_index
                .entry(fragment.index)
                .or_default()
                .merge_from(fragment);
        }
    }

    assert!(
        reasoning.is_empty(),
        "reasoning_content must be empty (parser must be gated off), got: {reasoning:?}"
    );
    assert!(
        !content.contains("<think>"),
        "no <think> prefix should reach the client, got: {content:?}"
    );
    let merged: Vec<MergedToolCall> = calls_by_index.values().cloned().collect();
    assert_eq!(merged.len(), 1, "expected one tool call");
    assert_eq!(merged[0].name.as_deref(), Some("get_weather"));
    let parsed_args: Value = serde_json::from_str(&merged[0].arguments).unwrap();
    assert_eq!(
        parsed_args,
        serde_json::json!({"location": "Paris", "unit": "celsius"})
    );
}