use dynamo_llm::preprocessor::OpenAIPreprocessor;
use dynamo_llm::protocols::openai::chat_completions::NvCreateChatCompletionStreamResponse;
use dynamo_llm::protocols::openai::chat_completions::jail::JailedStream;
use dynamo_protocols::types::{
ChatChoiceStream, ChatCompletionStreamResponseDelta, ChatCompletionToolChoiceOption,
CompletionUsage, FinishReason, Role,
};
use dynamo_runtime::protocols::annotated::Annotated;
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
use futures::stream;
pub(crate) mod test_utils {
use super::*;
use dynamo_protocols::types::ChatCompletionMessageContent;
/// Borrows the string held by a `Text` content value.
///
/// `Parts` content has no single string to borrow, so it yields `""`.
pub fn extract_text(content: &ChatCompletionMessageContent) -> &str {
    let ChatCompletionMessageContent::Text(text) = content else {
        return "";
    };
    text.as_str()
}
/// Builds a single-choice stream chunk whose delta carries `content` as assistant text.
///
/// The choice sits at `index` and has no finish reason. The envelope uses the fixed
/// test values (`"test-id"`, `"test-model"`, `1234567890`), and the returned
/// [`Annotated`] has no id, event, comment, or error.
pub fn create_mock_response_chunk(
    content: String,
    index: u32,
) -> Annotated<NvCreateChatCompletionStreamResponse> {
    // The original literal was built under `allow(deprecated)`; presumably this is for
    // the `function_call` field. The attribute is kept on both pieces of that literal.
    #[allow(deprecated)]
    let delta = ChatCompletionStreamResponseDelta {
        role: Some(Role::Assistant),
        content: Some(ChatCompletionMessageContent::Text(content)),
        tool_calls: None,
        function_call: None,
        refusal: None,
        reasoning_content: None,
    };
    #[allow(deprecated)]
    let choice = ChatChoiceStream {
        index,
        delta,
        finish_reason: None,
        stop_reason: None,
        logprobs: None,
    };
    let inner = dynamo_protocols::types::CreateChatCompletionStreamResponse {
        id: "test-id".to_string(),
        choices: vec![choice],
        created: 1234567890,
        model: "test-model".to_string(),
        system_fingerprint: Some("test-fingerprint".to_string()),
        object: "chat.completion.chunk".to_string(),
        usage: None,
        service_tier: None,
    };
    Annotated {
        data: Some(NvCreateChatCompletionStreamResponse { inner, nvext: None }),
        id: None,
        event: None,
        comment: None,
        error: None,
    }
}
/// Builds a terminating stream chunk for the choice at `index`.
///
/// The delta is empty (no role, content, or tool calls) and the choice's
/// `finish_reason` is `FinishReason::Stop`. The envelope uses the same fixed test
/// values as the other chunk builders, and the [`Annotated`] wrapper carries no
/// metadata.
pub fn create_final_response_chunk(
    index: u32,
) -> Annotated<NvCreateChatCompletionStreamResponse> {
    // The original literal was built under `allow(deprecated)`; presumably this is for
    // the `function_call` field. The attribute is kept on both pieces of that literal.
    #[allow(deprecated)]
    let empty_delta = ChatCompletionStreamResponseDelta {
        role: None,
        content: None,
        tool_calls: None,
        function_call: None,
        refusal: None,
        reasoning_content: None,
    };
    #[allow(deprecated)]
    let choice = ChatChoiceStream {
        index,
        delta: empty_delta,
        finish_reason: Some(FinishReason::Stop),
        stop_reason: None,
        logprobs: None,
    };
    let inner = dynamo_protocols::types::CreateChatCompletionStreamResponse {
        id: "test-id".to_string(),
        choices: vec![choice],
        created: 1234567890,
        model: "test-model".to_string(),
        system_fingerprint: Some("test-fingerprint".to_string()),
        object: "chat.completion.chunk".to_string(),
        usage: None,
        service_tier: None,
    };
    Annotated {
        data: Some(NvCreateChatCompletionStreamResponse { inner, nvext: None }),
        id: None,
        event: None,
        comment: None,
        error: None,
    }
}
/// Builds an assistant text chunk like `create_mock_response_chunk`, but with
/// caller-supplied annotation metadata.
///
/// `id`, `event`, and `comment` are stored verbatim on the returned [`Annotated`];
/// its `error` is always `None`.
pub fn create_annotated_chunk(
    content: String,
    index: u32,
    id: Option<String>,
    event: Option<String>,
    comment: Option<Vec<String>>,
) -> Annotated<NvCreateChatCompletionStreamResponse> {
    // The original literal was built under `allow(deprecated)`; presumably this is for
    // the `function_call` field. The attribute is kept on both pieces of that literal.
    #[allow(deprecated)]
    let delta = ChatCompletionStreamResponseDelta {
        role: Some(Role::Assistant),
        content: Some(ChatCompletionMessageContent::Text(content)),
        tool_calls: None,
        function_call: None,
        refusal: None,
        reasoning_content: None,
    };
    #[allow(deprecated)]
    let choice = ChatChoiceStream {
        index,
        delta,
        finish_reason: None,
        stop_reason: None,
        logprobs: None,
    };
    let inner = dynamo_protocols::types::CreateChatCompletionStreamResponse {
        id: "test-id".to_string(),
        choices: vec![choice],
        created: 1234567890,
        model: "test-model".to_string(),
        system_fingerprint: Some("test-fingerprint".to_string()),
        object: "chat.completion.chunk".to_string(),
        usage: None,
        service_tier: None,
    };
    Annotated {
        data: Some(NvCreateChatCompletionStreamResponse { inner, nvext: None }),
        id,
        event,
        comment,
        error: None,
    }
}
/// Builds one stream chunk containing several assistant text choices.
///
/// Each `(content, index)` pair becomes one choice, in the order given, with no
/// finish reason. The [`Annotated`] wrapper carries no metadata.
pub fn create_multi_choice_chunk(
    choices_content: Vec<(String, u32)>,
) -> Annotated<NvCreateChatCompletionStreamResponse> {
    let mut choices: Vec<ChatChoiceStream> = Vec::with_capacity(choices_content.len());
    for (content, index) in choices_content {
        // The original literal was built under `allow(deprecated)`; presumably this is
        // for the `function_call` field. The attribute is kept on both pieces.
        #[allow(deprecated)]
        let delta = ChatCompletionStreamResponseDelta {
            role: Some(Role::Assistant),
            content: Some(ChatCompletionMessageContent::Text(content)),
            tool_calls: None,
            function_call: None,
            refusal: None,
            reasoning_content: None,
        };
        #[allow(deprecated)]
        let choice = ChatChoiceStream {
            index,
            delta,
            finish_reason: None,
            stop_reason: None,
            logprobs: None,
        };
        choices.push(choice);
    }
    let inner = dynamo_protocols::types::CreateChatCompletionStreamResponse {
        id: "test-id".to_string(),
        choices,
        created: 1234567890,
        model: "test-model".to_string(),
        system_fingerprint: Some("test-fingerprint".to_string()),
        object: "chat.completion.chunk".to_string(),
        usage: None,
        service_tier: None,
    };
    Annotated {
        data: Some(NvCreateChatCompletionStreamResponse { inner, nvext: None }),
        id: None,
        event: None,
        comment: None,
        error: None,
    }
}
/// Builds one stream chunk that finishes several choices at once.
///
/// Every index in `choice_indices` becomes a choice with an empty delta and
/// `finish_reason` set to `FinishReason::Stop`, in the order given.
pub fn create_multi_choice_finish_chunk(
    choice_indices: Vec<u32>,
) -> Annotated<NvCreateChatCompletionStreamResponse> {
    let mut choices: Vec<ChatChoiceStream> = Vec::with_capacity(choice_indices.len());
    for index in choice_indices {
        // The original literal was built under `allow(deprecated)`; presumably this is
        // for the `function_call` field. The attribute is kept on both pieces.
        #[allow(deprecated)]
        let empty_delta = ChatCompletionStreamResponseDelta {
            role: None,
            content: None,
            tool_calls: None,
            function_call: None,
            refusal: None,
            reasoning_content: None,
        };
        #[allow(deprecated)]
        let choice = ChatChoiceStream {
            index,
            delta: empty_delta,
            finish_reason: Some(FinishReason::Stop),
            stop_reason: None,
            logprobs: None,
        };
        choices.push(choice);
    }
    let inner = dynamo_protocols::types::CreateChatCompletionStreamResponse {
        id: "test-id".to_string(),
        choices,
        created: 1234567890,
        model: "test-model".to_string(),
        system_fingerprint: Some("test-fingerprint".to_string()),
        object: "chat.completion.chunk".to_string(),
        usage: None,
        service_tier: None,
    };
    Annotated {
        data: Some(NvCreateChatCompletionStreamResponse { inner, nvext: None }),
        id: None,
        event: None,
        comment: None,
        error: None,
    }
}
/// Asserts that the first choice of `result` carries exactly `expected` as its text.
///
/// # Panics
///
/// Panics if `result` has no data, no choices, or no content on the first choice,
/// and when the extracted text differs from `expected`.
pub fn assert_content(
    result: &Annotated<NvCreateChatCompletionStreamResponse>,
    expected: &str,
) {
    let first_choice = result
        .data
        .as_ref()
        .and_then(|d| d.inner.choices.first());
    let content = first_choice
        .and_then(|c| c.delta.content.as_ref())
        .expect("Expected content in result");
    let actual = extract_text(content);
    assert_eq!(
        actual, expected,
        "Content mismatch: expected '{}', got '{}'",
        expected, actual
    );
}
/// Asserts that the first tool call on the first choice of `result` is a call to
/// `name` whose JSON arguments equal `args`.
///
/// Only the first tool call is inspected. When the call has no arguments string,
/// the check passes only if `args` is JSON `null`.
///
/// # Panics
///
/// Panics if the tool call is missing, its name differs, its arguments are not
/// valid JSON, or the parsed arguments differ from `args`.
pub fn assert_tool_call(
    result: &Annotated<NvCreateChatCompletionStreamResponse>,
    name: &str,
    args: serde_json::Value,
) {
    let first_choice = result
        .data
        .as_ref()
        .and_then(|d| d.inner.choices.first());
    let tool_calls = first_choice
        .and_then(|c| c.delta.tool_calls.as_ref())
        .expect("Expected tool calls in result");
    let tool_call = tool_calls
        .first()
        .expect("Expected at least one tool call");
    let function = tool_call
        .function
        .as_ref()
        .expect("Expected function in tool call");
    assert_eq!(
        function.name.as_deref(),
        Some(name),
        "Tool call name mismatch: expected '{}', got '{:?}'",
        name,
        function.name
    );
    match &function.arguments {
        Some(arguments_str) => {
            // Compare parsed JSON rather than raw text so formatting differences
            // in the emitted arguments string do not matter.
            let parsed_args: serde_json::Value = serde_json::from_str(arguments_str)
                .expect("Tool call arguments should be valid JSON");
            assert_eq!(
                parsed_args, args,
                "Tool call arguments mismatch: expected {}, got {}",
                args, parsed_args
            );
        }
        None => {
            if !args.is_null() {
                panic!("Expected tool call arguments {} but got None", args);
            }
        }
    }
}
/// Asserts that the first choice of `result` carries neither text nor tool calls.
///
/// Missing content and empty `Text` content both count as "no content"; any other
/// content value fails. Missing tool calls and an empty tool-call list both count as
/// "no tool calls". A result with no data or no choices passes without any checks.
// NOTE(review): `dead_code` is allowed presumably because no visible test calls this
// helper — confirm before removing.
#[allow(dead_code)]
pub fn assert_empty_emission(result: &Annotated<NvCreateChatCompletionStreamResponse>) {
    let Some(choice) = result
        .data
        .as_ref()
        .and_then(|d| d.inner.choices.first())
    else {
        return;
    };

    let content_is_blank = match choice.delta.content.as_ref() {
        None => true,
        Some(dynamo_protocols::types::ChatCompletionMessageContent::Text(t)) => t.is_empty(),
        Some(_) => false,
    };
    assert!(
        content_is_blank,
        "Expected no content but got: {:?}",
        choice.delta.content
    );

    let tool_calls_absent = match choice.delta.tool_calls.as_ref() {
        None => true,
        Some(calls) => calls.is_empty(),
    };
    assert!(
        tool_calls_absent,
        "Expected no tool calls but got: {:?}",
        choice.delta.tool_calls
    );
}
/// Concatenates, in order, the first-choice text of every result that has content.
///
/// Results without data, choices, or content contribute nothing, so a chunk that
/// carries only tool calls (and no content) does not appear in the returned string.
pub fn reconstruct_content(
    results: &[Annotated<NvCreateChatCompletionStreamResponse>],
) -> String {
    let mut combined = String::new();
    for chunk in results {
        let text = chunk
            .data
            .as_ref()
            .and_then(|d| d.inner.choices.first())
            .and_then(|c| c.delta.content.as_ref())
            .map(extract_text);
        if let Some(text) = text {
            combined.push_str(text);
        }
    }
    combined
}
/// Returns an owned copy of the first choice's text content.
///
/// Yields an empty string when the result has no data, no choices, no content, or
/// when the content is `Parts` rather than `Text`.
pub fn extract_content(result: &Annotated<NvCreateChatCompletionStreamResponse>) -> String {
    let first_choice = result
        .data
        .as_ref()
        .and_then(|d| d.inner.choices.first());
    match first_choice.and_then(|c| c.delta.content.as_ref()) {
        Some(ChatCompletionMessageContent::Text(text)) => text.clone(),
        Some(ChatCompletionMessageContent::Parts(_)) | None => String::new(),
    }
}
/// Reports whether the first choice of `result` carries at least one tool call.
///
/// A missing tool-call list and an empty one both count as "no tool call".
pub fn has_tool_call(result: &Annotated<NvCreateChatCompletionStreamResponse>) -> bool {
    let first_choice = result
        .data
        .as_ref()
        .and_then(|d| d.inner.choices.first());
    first_choice
        .and_then(|c| c.delta.tool_calls.as_ref())
        .is_some_and(|calls| !calls.is_empty())
}
/// Reports whether the first choice of `result` carries non-empty text.
///
/// Missing content, empty text, and `Parts` content (which `extract_text` maps to
/// `""`) all yield `false`.
// NOTE(review): `dead_code` is allowed presumably because no visible test calls this
// helper — confirm before removing.
#[allow(dead_code)]
pub fn has_content(result: &Annotated<NvCreateChatCompletionStreamResponse>) -> bool {
    let first_choice = result
        .data
        .as_ref()
        .and_then(|d| d.inner.choices.first());
    first_choice
        .and_then(|c| c.delta.content.as_ref())
        .is_some_and(|content| !extract_text(content).is_empty())
}
}
use serde_json::json;
use test_utils::*;
/// With explicit `<jail>` / `</jail>` sequences, the text before the start marker
/// and after the end marker is emitted as-is, and everything from the start marker
/// through the end marker comes out as one chunk.
#[tokio::test]
async fn test_jailed_stream_with_start_end_sequences() {
    let chunks: Vec<_> = [
        "Hello ",
        "<jail>",
        "This is jailed ",
        "content",
        "</jail>",
        " World",
    ]
    .into_iter()
    .map(|text| create_mock_response_chunk(text.to_string(), 0))
    .collect();

    let jail = JailedStream::builder()
        .jail_start_sequence("<jail>")
        .jail_end_sequence("</jail>")
        .build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(results.len(), 3);

    // Chunk 0: text seen before the jail opened.
    let before = &results[0].data.as_ref().unwrap().inner.choices[0].delta;
    assert_eq!(before.content.as_ref().map(extract_text), Some("Hello "));

    // Chunk 1: the accumulated jailed span, markers included.
    let released = &results[1].data.as_ref().unwrap().inner.choices[0]
        .delta
        .content;
    assert!(released.is_some());
    let released_text = extract_text(released.as_ref().unwrap());
    assert!(released_text.contains("<jail>This is jailed content</jail>"));

    // Chunk 2: text seen after the jail closed.
    let after = &results[2].data.as_ref().unwrap().inner.choices[0].delta;
    assert_eq!(after.content.as_ref().map(extract_text), Some(" World"));
}
#[tokio::test]
async fn test_jailed_stream_with_tool_calls() {
let chunks = vec![
create_mock_response_chunk("<TOOLCALL>".to_string(), 0),
create_mock_response_chunk(
"[{\"name\": \"get_weather\", \"arguments\": {\"location\": \"SF\"}}]".to_string(),
0,
),
create_mock_response_chunk("</TOOLCALL>".to_string(), 0),
];
let input_stream = stream::iter(chunks);
let jail = JailedStream::builder()
.tool_call_parser("nemotron_deci")
.build();
let results: Vec<_> = jail.apply_with_finish_reason(input_stream).collect().await;
assert!(!results.is_empty());
if let Some(last_result) = results.last()
&& let Some(ref response_data) = last_result.data
&& let Some(ref tool_calls) = response_data.inner.choices[0].delta.tool_calls
{
assert!(!tool_calls.as_slice().is_empty());
assert_eq!(
tool_calls[0].function.as_ref().unwrap().name.as_deref(),
Some("get_weather")
);
}
}
/// Configures both explicit jail sequences and a tool-call parser, then feeds a
/// chunk containing both start markers. The leading text passes through, and the
/// jailed span is released as accumulated content in the second chunk.
#[tokio::test]
async fn test_jailed_stream_dual_entry_paths() {
    let chunks: Vec<_> = [
        "Normal text ",
        "<jail><TOOLCALL>",
        "Jailed content",
        "</jail>",
    ]
    .into_iter()
    .map(|text| create_mock_response_chunk(text.to_string(), 0))
    .collect();

    let jail = JailedStream::builder()
        .jail_start_sequence("<jail>")
        .jail_end_sequence("</jail>")
        .tool_call_parser("nemotron_deci")
        .build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(results.len(), 2);

    let leading = &results[0].data.as_ref().unwrap().inner.choices[0].delta;
    assert_eq!(
        leading.content.as_ref().map(extract_text),
        Some("Normal text ")
    );

    let jailed = results[1].data.as_ref().unwrap().inner.choices[0]
        .delta
        .content
        .as_ref()
        .expect("Expected accumulated jailed content");
    assert!(extract_text(jailed).contains("<jail><TOOLCALL>Jailed content</jail>"));
}
/// When the closing `</TOOLCALL>` marker shares a chunk with following text, the
/// tool call is emitted first and the remainder (`"More text"`) follows as its own
/// content chunk.
#[tokio::test]
async fn test_jailed_stream_early_exit() {
    let chunks: Vec<_> = [
        "<TOOLCALL>",
        "[{\"name\": \"test\", ",
        "\"arguments\": {}}]",
        "</TOOLCALL>More text",
    ]
    .into_iter()
    .map(|text| create_mock_response_chunk(text.to_string(), 0))
    .collect();

    let jail = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        2,
        "Should have tool call and trailing content"
    );
    test_utils::assert_tool_call(&results[0], "test", serde_json::json!({}));
    test_utils::assert_content(&results[1], "More text");

    // Only the trailing text counts as content.
    let reconstructed = test_utils::reconstruct_content(&results);
    assert_eq!(reconstructed, "More text");
}
/// When the configured start sequence never appears, every input chunk (including
/// the final finish chunk) passes through unchanged and no tool calls are produced.
#[tokio::test]
async fn test_jailed_stream_no_jailing() {
    let mut chunks: Vec<_> = ["Hello ", "World"]
        .into_iter()
        .map(|text| create_mock_response_chunk(text.to_string(), 0))
        .collect();
    chunks.push(create_final_response_chunk(0));

    let jail = JailedStream::builder()
        .jail_start_sequence("<NOTPRESENT>")
        .build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        3,
        "Should pass through all 3 chunks unchanged"
    );
    assert_content(&results[0], "Hello ");
    assert_content(&results[1], "World");

    // Only the two text chunks are checked; the third is the finish chunk.
    for (i, result) in results.iter().enumerate().take(2) {
        assert!(
            !has_tool_call(result),
            "Chunk {} should not contain tool calls when no patterns match",
            i
        );
    }

    assert_eq!(
        reconstruct_content(&results),
        "Hello World",
        "Content should pass through unchanged when no jailing occurs"
    );
}
/// A `<tool_call>...</tool_call>` span split over several chunks, parsed with the
/// `hermes` parser, yields leading content, one `search_web` tool call, and
/// trailing content.
///
/// The expected reconstruction is derived from the same literals used as input.
/// The leading text ends with a space and the trailing text begins with one, so
/// the joined text contains two consecutive spaces; the hard-coded single-space
/// expectation could never match.
#[tokio::test]
async fn test_jailed_stream_hermes_parser() {
    let leading = "I'll help you with that. ";
    let trailing = " Let me search for that.";
    let chunks: Vec<_> = [
        leading,
        "<tool_call>",
        "{\"name\": \"search_web\", ",
        "\"arguments\": {\"query\": \"weather today\"}}",
        "</tool_call>",
        trailing,
    ]
    .into_iter()
    .map(|text| create_mock_response_chunk(text.to_string(), 0))
    .collect();

    let jail = JailedStream::builder().tool_call_parser("hermes").build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        3,
        "Should have content, tool call, and trailing content"
    );
    test_utils::assert_content(&results[0], leading);
    test_utils::assert_tool_call(
        &results[1],
        "search_web",
        serde_json::json!({"query": "weather today"}),
    );
    test_utils::assert_content(&results[2], trailing);

    let reconstructed = test_utils::reconstruct_content(&results);
    assert_eq!(reconstructed, format!("{leading}{trailing}"));
}
/// A bare JSON-array tool call (no marker token) split over several chunks, parsed
/// with the `mistral` parser, yields leading content, one `calculate` tool call,
/// and trailing content.
///
/// The expected reconstruction is derived from the input literals. The leading
/// text ends with a space and the trailing text begins with one, so the joined
/// text has two consecutive spaces; the hard-coded single-space expectation could
/// never match.
#[tokio::test]
async fn test_jailed_stream_mistral_parser() {
    let leading = "Sure, I can help. ";
    let trailing = " The calculation is done.";
    let chunks: Vec<_> = [
        leading,
        "[{",
        "\"name\": \"calculate\", ",
        "\"arguments\": {\"expression\": \"2+2\"}",
        "}]",
        trailing,
    ]
    .into_iter()
    .map(|text| create_mock_response_chunk(text.to_string(), 0))
    .collect();

    let jail = JailedStream::builder().tool_call_parser("mistral").build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        3,
        "Should have content, tool call, and trailing content"
    );
    test_utils::assert_content(&results[0], leading);
    test_utils::assert_tool_call(
        &results[1],
        "calculate",
        serde_json::json!({"expression": "2+2"}),
    );
    test_utils::assert_content(&results[2], trailing);

    let reconstructed = test_utils::reconstruct_content(&results);
    assert_eq!(reconstructed, format!("{leading}{trailing}"));
}
/// A tool call introduced by the `[TOOL_CALLS]` marker, parsed with the `mistral`
/// parser, yields leading content, one `get_time` tool call, and trailing content.
///
/// The expected reconstruction is derived from the input literals. The leading
/// text ends with a space and the trailing text begins with one, so the joined
/// text has two consecutive spaces; the hard-coded single-space expectation could
/// never match.
#[tokio::test]
async fn test_jailed_stream_mistral_parser_with_tool_calls_marker() {
    let leading = "Let me check that for you. ";
    let trailing = " Here's the time.";
    let chunks: Vec<_> = [
        leading,
        "[TOOL_CALLS]",
        "[{\"name\": \"get_time\", ",
        "\"arguments\": {\"timezone\": \"UTC\"}}]",
        trailing,
    ]
    .into_iter()
    .map(|text| create_mock_response_chunk(text.to_string(), 0))
    .collect();

    let jail = JailedStream::builder().tool_call_parser("mistral").build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        3,
        "Should have content, tool call, and trailing content"
    );
    test_utils::assert_content(&results[0], leading);
    test_utils::assert_tool_call(
        &results[1],
        "get_time",
        serde_json::json!({"timezone": "UTC"}),
    );
    test_utils::assert_content(&results[2], trailing);

    let reconstructed = test_utils::reconstruct_content(&results);
    assert_eq!(reconstructed, format!("{leading}{trailing}"));
}
/// A `functools[...]` tool call split over several chunks, parsed with the `phi4`
/// parser, yields leading content, one `analyze_data` tool call, and trailing
/// content.
///
/// The expected reconstruction is derived from the input literals. The leading
/// text ends with a space and the trailing text begins with one, so the joined
/// text has two consecutive spaces; the hard-coded single-space expectation could
/// never match.
#[tokio::test]
async fn test_jailed_stream_phi4_parser() {
    let leading = "I'll analyze this data. ";
    let trailing = " Analysis complete.";
    let chunks: Vec<_> = [
        leading,
        "functools[",
        "{\"name\": \"analyze_data\", ",
        "\"arguments\": {\"dataset\": \"sales_data\"}}",
        "]",
        trailing,
    ]
    .into_iter()
    .map(|text| create_mock_response_chunk(text.to_string(), 0))
    .collect();

    let jail = JailedStream::builder().tool_call_parser("phi4").build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        3,
        "Should have content, tool call, and trailing content"
    );
    test_utils::assert_content(&results[0], leading);
    test_utils::assert_tool_call(
        &results[1],
        "analyze_data",
        serde_json::json!({"dataset": "sales_data"}),
    );
    test_utils::assert_content(&results[2], trailing);

    let reconstructed = test_utils::reconstruct_content(&results);
    assert_eq!(reconstructed, format!("{leading}{trailing}"));
}
/// A tool call introduced by `<|python_tag|>`, parsed with the `llama3_json`
/// parser, yields leading content, one `execute_code` tool call, and trailing
/// content.
///
/// The expected reconstruction is derived from the input literals. The leading
/// text ends with a space and the trailing text begins with one, so the joined
/// text has two consecutive spaces; the hard-coded single-space expectation could
/// never match.
#[tokio::test]
async fn test_jailed_stream_llama3_json_parser() {
    let leading = "Let me run some code. ";
    let trailing = " Done executing.";
    let chunks: Vec<_> = [
        leading,
        "<|python_tag|>",
        "{\"name\": \"execute_code\", ",
        "\"arguments\": {\"code\": \"print('Hello')\"}}",
        trailing,
    ]
    .into_iter()
    .map(|text| create_mock_response_chunk(text.to_string(), 0))
    .collect();

    let jail = JailedStream::builder()
        .tool_call_parser("llama3_json")
        .build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        3,
        "Should have content, tool call, and trailing content"
    );
    test_utils::assert_content(&results[0], leading);
    test_utils::assert_tool_call(
        &results[1],
        "execute_code",
        serde_json::json!({"code": "print('Hello')"}),
    );
    test_utils::assert_content(&results[2], trailing);

    let reconstructed = test_utils::reconstruct_content(&results);
    assert_eq!(reconstructed, format!("{leading}{trailing}"));
}
/// JSON-looking prose that is not a tool call must come back as plain content.
/// The test expects two chunks: the first input chunk, then everything after it
/// merged into one.
#[tokio::test]
async fn test_jailed_stream_false_positive_json() {
    let chunks: Vec<_> = [
        "I can explain JSON format. ",
        "Here's an example: { \"key\": \"value\" }",
        " is a simple JSON object. ",
        "Hope that helps!",
    ]
    .into_iter()
    .map(|text| create_mock_response_chunk(text.to_string(), 0))
    .collect();

    let jail = JailedStream::builder().tool_call_parser("mistral").build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(results.len(), 2);
    test_utils::assert_content(&results[0], "I can explain JSON format. ");
    test_utils::assert_content(
        &results[1],
        "Here's an example: { \"key\": \"value\" } is a simple JSON object. Hope that helps!",
    );

    // No text may be lost or altered along the way.
    let reconstructed = test_utils::reconstruct_content(&results);
    assert_eq!(
        reconstructed,
        "I can explain JSON format. Here's an example: { \"key\": \"value\" } is a simple JSON object. Hope that helps!"
    );
}
/// A `<TOOLCALL>` span whose body is not valid JSON must be released as plain
/// content, markers included, with no text lost.
#[tokio::test]
async fn test_jailed_stream_malformed_tool_call() {
    let chunks: Vec<_> = [
        "Let me call a function. ",
        "<TOOLCALL>",
        "[{\"name\": \"broken_func\", ",
        "\"arguments\": {\"param\": incomplete",
        "</TOOLCALL>",
        " Function call attempt finished.",
    ]
    .into_iter()
    .map(|text| create_mock_response_chunk(text.to_string(), 0))
    .collect();

    let jail = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        3,
        "Should handle malformed JSON gracefully and jail appropriately"
    );
    test_utils::assert_content(&results[0], "Let me call a function. ");
    test_utils::assert_content(
        &results[1],
        "<TOOLCALL>[{\"name\": \"broken_func\", \"arguments\": {\"param\": incomplete</TOOLCALL>",
    );

    let reconstructed = test_utils::reconstruct_content(&results);
    assert_eq!(
        reconstructed,
        "Let me call a function. <TOOLCALL>[{\"name\": \"broken_func\", \"arguments\": {\"param\": incomplete</TOOLCALL> Function call attempt finished."
    );
}
/// If the stream ends while a `<TOOLCALL>` span is still open, the accumulated
/// text is released as content rather than dropped.
#[tokio::test]
async fn test_jailed_stream_partial_tool_call() {
    let chunks: Vec<_> = [
        "Starting function call. ",
        "<TOOLCALL>",
        "[{\"name\": \"incomplete_func\", ",
        "\"arguments\": {",
    ]
    .into_iter()
    .map(|text| create_mock_response_chunk(text.to_string(), 0))
    .collect();

    let jail = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        2,
        "Should handle partial tool call and release content"
    );
    test_utils::assert_content(&results[0], "Starting function call. ");
    test_utils::assert_content(
        &results[1],
        "<TOOLCALL>[{\"name\": \"incomplete_func\", \"arguments\": {",
    );

    let reconstructed = test_utils::reconstruct_content(&results);
    assert_eq!(
        reconstructed,
        "Starting function call. <TOOLCALL>[{\"name\": \"incomplete_func\", \"arguments\": {"
    );
}
/// An empty input stream must produce an empty output stream, even with both a
/// tool-call parser and explicit jail sequences configured.
#[tokio::test]
async fn test_jailed_stream_empty_stream() {
    let no_chunks = Vec::<Annotated<NvCreateChatCompletionStreamResponse>>::new();

    let jail = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .jail_start_sequence("<jail>")
        .jail_end_sequence("</jail>")
        .build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(no_chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        0,
        "Empty stream should produce exactly 0 results"
    );
    assert_eq!(
        reconstruct_content(&results),
        "",
        "Empty stream should reconstruct to empty string"
    );
}
/// Two separate `<TOOLCALL>` spans in one stream must produce five chunks:
/// content, tool call, content, tool call, content.
///
/// The expected reconstruction is derived from the three content literals. Each
/// boundary joins a trailing space to a leading space, so the joined text has two
/// consecutive spaces at both boundaries; the hard-coded single-space expectation
/// could never match.
#[tokio::test]
async fn test_jailed_stream_multiple_tool_calls() {
    let intro = "I'll help with multiple tasks. ";
    let middle = " Now let me get the time. ";
    let outro = " Both tasks completed!";
    let chunks: Vec<_> = [
        intro,
        "<TOOLCALL>",
        "[{\"name\": \"get_weather\", \"arguments\": {\"city\": \"NYC\"}}]",
        "</TOOLCALL>",
        middle,
        "<TOOLCALL>",
        "[{\"name\": \"get_time\", \"arguments\": {\"timezone\": \"EST\"}}]",
        "</TOOLCALL>",
        outro,
    ]
    .into_iter()
    .map(|text| create_mock_response_chunk(text.to_string(), 0))
    .collect();

    let jail = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        5,
        "Should emit exactly 5 chunks as documented above"
    );
    assert_content(&results[0], intro);
    assert_tool_call(&results[1], "get_weather", json!({"city": "NYC"}));
    assert_content(&results[2], middle);
    assert_tool_call(&results[3], "get_time", json!({"timezone": "EST"}));
    assert_content(&results[4], outro);

    let expected_content = format!("{intro}{middle}{outro}");
    assert_eq!(
        reconstruct_content(&results),
        expected_content,
        "Content reconstruction should exclude tool calls and preserve text flow"
    );
}
#[tokio::test]
async fn test_jailed_stream_tool_call_across_many_chunks() {
let chunks = vec![
create_mock_response_chunk("I'll process your request. ".to_string(), 0),
create_mock_response_chunk("<".to_string(), 0),
create_mock_response_chunk("T".to_string(), 0),
create_mock_response_chunk("O".to_string(), 0),
create_mock_response_chunk("O".to_string(), 0),
create_mock_response_chunk("L".to_string(), 0),
create_mock_response_chunk("C".to_string(), 0),
create_mock_response_chunk("A".to_string(), 0),
create_mock_response_chunk("L".to_string(), 0),
create_mock_response_chunk("L".to_string(), 0),
create_mock_response_chunk(">".to_string(), 0),
create_mock_response_chunk("[".to_string(), 0),
create_mock_response_chunk("{".to_string(), 0),
create_mock_response_chunk("\"".to_string(), 0),
create_mock_response_chunk("n".to_string(), 0),
create_mock_response_chunk("a".to_string(), 0),
create_mock_response_chunk("m".to_string(), 0),
create_mock_response_chunk("e".to_string(), 0),
create_mock_response_chunk("\"".to_string(), 0),
create_mock_response_chunk(":".to_string(), 0),
create_mock_response_chunk(" ".to_string(), 0),
create_mock_response_chunk("\"".to_string(), 0),
create_mock_response_chunk("p".to_string(), 0),
create_mock_response_chunk("r".to_string(), 0),
create_mock_response_chunk("o".to_string(), 0),
create_mock_response_chunk("c".to_string(), 0),
create_mock_response_chunk("e".to_string(), 0),
create_mock_response_chunk("s".to_string(), 0),
create_mock_response_chunk("s".to_string(), 0),
create_mock_response_chunk("_".to_string(), 0),
create_mock_response_chunk("d".to_string(), 0),
create_mock_response_chunk("a".to_string(), 0),
create_mock_response_chunk("t".to_string(), 0),
create_mock_response_chunk("a".to_string(), 0),
create_mock_response_chunk("\"".to_string(), 0),
create_mock_response_chunk(",".to_string(), 0),
create_mock_response_chunk(" ".to_string(), 0),
create_mock_response_chunk("\"".to_string(), 0),
create_mock_response_chunk("a".to_string(), 0),
create_mock_response_chunk("r".to_string(), 0),
create_mock_response_chunk("g".to_string(), 0),
create_mock_response_chunk("u".to_string(), 0),
create_mock_response_chunk("m".to_string(), 0),
create_mock_response_chunk("e".to_string(), 0),
create_mock_response_chunk("n".to_string(), 0),
create_mock_response_chunk("t".to_string(), 0),
create_mock_response_chunk("s".to_string(), 0),
create_mock_response_chunk("\"".to_string(), 0),
create_mock_response_chunk(":".to_string(), 0),
create_mock_response_chunk(" ".to_string(), 0),
create_mock_response_chunk("{".to_string(), 0),
create_mock_response_chunk("}".to_string(), 0),
create_mock_response_chunk("}".to_string(), 0),
create_mock_response_chunk("]".to_string(), 0),
create_mock_response_chunk("<".to_string(), 0),
create_mock_response_chunk("/".to_string(), 0),
create_mock_response_chunk("T".to_string(), 0),
create_mock_response_chunk("O".to_string(), 0),
create_mock_response_chunk("O".to_string(), 0),
create_mock_response_chunk("L".to_string(), 0),
create_mock_response_chunk("C".to_string(), 0),
create_mock_response_chunk("A".to_string(), 0),
create_mock_response_chunk("L".to_string(), 0),
create_mock_response_chunk("L".to_string(), 0),
create_mock_response_chunk(">".to_string(), 0),
create_mock_response_chunk(" Processing complete!".to_string(), 0),
];
let input_stream = stream::iter(chunks);
let jail = JailedStream::builder()
.tool_call_parser("nemotron_deci")
.build();
let results: Vec<_> = jail.apply_with_finish_reason(input_stream).collect().await;
assert_eq!(
results.len(),
3,
"Should consolidate fragments into 3 chunks"
);
test_utils::assert_content(&results[0], "I'll process your request. ");
test_utils::assert_tool_call(&results[1], "process_data", serde_json::json!({}));
test_utils::assert_content(&results[2], " Processing complete!");
let reconstructed = test_utils::reconstruct_content(&results);
assert_eq!(
reconstructed,
"I'll process your request. Processing complete!"
);
}
/// The annotation metadata (`id`, `event`, `comment`) attached to the chunk that
/// opens the tool-call span must appear on the emitted tool-call chunk.
#[tokio::test]
async fn test_jailed_stream_preserves_metadata() {
    let test_id = Some(String::from("correlation-id-123"));
    let test_event = Some(String::from("request-processing"));
    let test_comment = Some(vec![
        String::from("upstream-correlation"),
        String::from("debug-info"),
    ]);

    let chunks = vec![
        // Plain leading text, no metadata.
        create_annotated_chunk("I'll help you with that. ".to_string(), 0, None, None, None),
        // The chunk that opens the tool-call span carries the metadata under test.
        create_annotated_chunk(
            "<tool_call>".to_string(),
            0,
            test_id.clone(),
            test_event.clone(),
            test_comment.clone(),
        ),
        create_mock_response_chunk("{\"name\": \"search_web\", ".to_string(), 0),
        create_mock_response_chunk("\"arguments\": {\"query\": \"test\"}}".to_string(), 0),
        create_mock_response_chunk("</tool_call>".to_string(), 0),
        create_mock_response_chunk(" Processing complete.".to_string(), 0),
        test_utils::create_final_response_chunk(0),
    ];

    let jail = JailedStream::builder().tool_call_parser("hermes").build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert!(
        results.len() >= 3,
        "Should have at least 3 chunks, got {}",
        results.len()
    );

    // Locate the first chunk whose first choice has a tool-call list.
    let tool_call_chunk = results
        .iter()
        .find(|r| {
            r.data
                .as_ref()
                .and_then(|d| d.inner.choices.first())
                .is_some_and(|c| c.delta.tool_calls.is_some())
        })
        .expect("Should have a tool call response chunk");

    assert_eq!(
        tool_call_chunk.id, test_id,
        "ID should be preserved from jail trigger chunk"
    );
    assert_eq!(
        tool_call_chunk.event, test_event,
        "Event should be preserved from jail trigger chunk"
    );
    assert_eq!(
        tool_call_chunk.comment, test_comment,
        "Comment should be preserved from jail trigger chunk"
    );

    let first_choice = &tool_call_chunk.data.as_ref().unwrap().inner.choices[0];
    let tool_calls = &first_choice.delta.tool_calls;
    assert!(tool_calls.is_some(), "Should have tool calls");
    let tool_calls = tool_calls.as_ref().unwrap();
    assert_eq!(tool_calls.len(), 1, "Should have exactly one tool call");

    let function = tool_calls[0].function.as_ref().unwrap();
    assert_eq!(function.name.as_ref().unwrap(), "search_web");
}
/// If the stream ends while a tool-call span is still open, the released chunk
/// must keep the annotation metadata of the chunk that opened the span, keep the
/// envelope fields (`id`, `model`, `created`) of the real stream chunks, and
/// contain the accumulated text.
#[tokio::test]
async fn test_jailed_stream_preserves_metadata_on_stream_end() {
    let test_id = Some(String::from("end-correlation-456"));
    let test_event = Some(String::from("stream-termination"));
    let test_comment = Some(vec![String::from("incomplete-processing")]);

    let chunks = vec![
        create_mock_response_chunk("Starting function call: ".to_string(), 0),
        // Opens the span and carries the metadata under test.
        create_annotated_chunk(
            "<tool_call>".to_string(),
            0,
            test_id.clone(),
            test_event.clone(),
            test_comment.clone(),
        ),
        // No closing marker follows: the stream ends mid-span.
        create_mock_response_chunk("{\"name\": \"incomplete_call\"".to_string(), 0),
    ];

    let jail = JailedStream::builder().tool_call_parser("hermes").build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(results.len(), 2, "Should have exactly 2 chunks");
    let accumulated_chunk = &results[1];

    assert_eq!(
        accumulated_chunk.id, test_id,
        "ID should be preserved when stream ends while jailed"
    );
    assert_eq!(
        accumulated_chunk.event, test_event,
        "Event should be preserved when stream ends while jailed"
    );
    assert_eq!(
        accumulated_chunk.comment, test_comment,
        "Comment should be preserved when stream ends while jailed"
    );

    let response = accumulated_chunk.data.as_ref().unwrap();
    let envelope = &response.inner;
    assert_eq!(
        envelope.id, "test-id",
        "Inner response id should carry forward from real stream chunks, not be 'stream-end'"
    );
    assert_eq!(
        envelope.model, "test-model",
        "Inner response model should carry forward from real stream chunks, not be 'unknown'"
    );
    assert_eq!(
        envelope.created, 1234567890,
        "Inner response created should carry forward from real stream chunks, not be 0"
    );

    let content = &envelope.choices[0].delta.content;
    assert!(content.is_some(), "Should have accumulated content");
    let released_text = test_utils::extract_text(content.as_ref().unwrap());
    assert!(
        released_text.contains("<tool_call>"),
        "Should contain jail start marker in accumulated content"
    );
    assert!(
        released_text.contains("incomplete_call"),
        "Should contain accumulated incomplete content"
    );
}
#[tokio::test]
async fn test_jailed_stream_metadata_edge_cases() {
let chunks = vec![
create_annotated_chunk(
"Text with ".to_string(),
0,
Some("".to_string()), None, Some(vec![]), ),
create_annotated_chunk(
"<tool_call>".to_string(),
0,
None, Some("partial-metadata".to_string()), None, ),
create_mock_response_chunk("{\"name\": \"test\", \"arguments\": {}}".to_string(), 0),
create_mock_response_chunk("</tool_call>".to_string(), 0),
test_utils::create_final_response_chunk(0), ];
let input_stream = stream::iter(chunks);
let jail = JailedStream::builder().tool_call_parser("hermes").build();
let results: Vec<_> = jail.apply_with_finish_reason(input_stream).collect().await;
let tool_call_chunk = results
.iter()
.find(|r| {
r.data
.as_ref()
.and_then(|d| d.inner.choices.first())
.map(|c| c.delta.tool_calls.is_some())
.unwrap_or(false)
})
.expect("Should have a tool call response chunk");
assert_eq!(tool_call_chunk.id, None, "Should preserve None ID");
assert_eq!(
tool_call_chunk.event,
Some("partial-metadata".to_string()),
"Should preserve event"
);
assert_eq!(
tool_call_chunk.comment, None,
"Should preserve None comment"
);
}
/// Text that follows `</tool_call>` inside the same input chunk must not be
/// dropped: the expected output is leading content, the parsed tool call, and
/// then the trailing text as its own content chunk.
#[tokio::test]
async fn test_jailed_stream_trailing_content_same_chunk() {
    let input_stream = stream::iter(vec![
        create_mock_response_chunk("I'll help you. ".to_string(), 0),
        create_mock_response_chunk("<tool_call>".to_string(), 0),
        create_mock_response_chunk("{\"name\": \"search\", \"arguments\": {}}".to_string(), 0),
        // End marker and trailing text arrive together.
        create_mock_response_chunk(
            "</tool_call>trailing text that should not be lost".to_string(),
            0,
        ),
    ]);
    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("hermes")
        .build()
        .apply_with_finish_reason(input_stream)
        .collect()
        .await;

    assert_eq!(
        results.len(),
        3,
        "Should emit exactly 3 chunks as documented above"
    );
    let (leading, tool_call, trailing) = (&results[0], &results[1], &results[2]);
    assert_content(leading, "I'll help you. ");
    assert_tool_call(tool_call, "search", json!({}));
    assert_content(trailing, "trailing text that should not be lost");

    // Concatenated content is the input text minus the tool-call markup.
    assert_eq!(
        reconstruct_content(&results),
        "I'll help you. trailing text that should not be lost",
        "Content reconstruction should preserve initial and trailing text"
    );
}
/// The start marker and the JSON body share one chunk, and the end marker
/// shares a chunk with trailing text. Expected output: leading content, the
/// tool call, then the trailing text (leading space included).
#[tokio::test]
async fn test_jailed_stream_early_exit_with_trailing() {
    let input_stream = stream::iter(vec![
        create_mock_response_chunk("Starting task: ".to_string(), 0),
        create_mock_response_chunk(
            "<tool_call>{\"name\": \"complete_task\", \"arguments\": {}}".to_string(),
            0,
        ),
        create_mock_response_chunk("</tool_call> Task completed successfully.".to_string(), 0),
    ]);
    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("hermes")
        .build()
        .apply_with_finish_reason(input_stream)
        .collect()
        .await;

    assert_eq!(
        results.len(),
        3,
        "Should have content, tool call, and trailing content"
    );
    let (leading, tool_call, trailing) = (&results[0], &results[1], &results[2]);
    test_utils::assert_content(leading, "Starting task: ");
    test_utils::assert_tool_call(tool_call, "complete_task", serde_json::json!({}));
    test_utils::assert_content(trailing, " Task completed successfully.");

    assert_eq!(
        test_utils::reconstruct_content(&results),
        "Starting task:  Task completed successfully."
    );
}
/// Three choices are streamed side by side: choice 0 and choice 2 each open a
/// tool call (at different points in the stream) while choice 1 only ever
/// produces plain text. The test expects choice 1 to keep yielding content
/// chunks and choices 0 and 2 to each finish with `FinishReason::ToolCalls`.
#[tokio::test]
async fn test_multiple_choices_independent_jailing() {
    let input_stream = stream::iter(vec![
        // All three choices start with plain text.
        create_multi_choice_chunk(vec![
            ("Starting task A. ".to_string(), 0),
            ("Starting task B. ".to_string(), 1),
            ("Starting task C. ".to_string(), 2),
        ]),
        // Choice 0 opens a tool call.
        create_multi_choice_chunk(vec![
            ("<tool_call>".to_string(), 0),
            ("Continuing B. ".to_string(), 1),
            ("Continuing C. ".to_string(), 2),
        ]),
        // Choice 2 opens a tool call while choice 0 is mid-body.
        create_multi_choice_chunk(vec![
            ("{\"name\": \"tool_a\"".to_string(), 0),
            ("More B content. ".to_string(), 1),
            ("<tool_call>".to_string(), 2),
        ]),
        // Choice 0 closes its tool call.
        create_multi_choice_chunk(vec![
            (", \"arguments\": {}}</tool_call>".to_string(), 0),
            ("Final B. ".to_string(), 1),
            ("{\"name\": \"tool_c\", \"arguments\": {}}".to_string(), 2),
        ]),
        // Choice 2 closes its tool call.
        create_multi_choice_chunk(vec![
            ("After tool A. ".to_string(), 0),
            ("Done with B. ".to_string(), 1),
            ("</tool_call>".to_string(), 2),
        ]),
        test_utils::create_multi_choice_finish_chunk(vec![0, 1, 2]),
    ]);
    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("hermes")
        .build()
        .apply_with_finish_reason(input_stream)
        .collect()
        .await;

    // Flatten every emitted choice once; the checks below only filter it.
    let emitted_choices: Vec<_> = results
        .iter()
        .filter_map(|r| r.data.as_ref())
        .flat_map(|d| d.inner.choices.iter())
        .collect();

    let choice_1_content_chunks = emitted_choices
        .iter()
        .filter(|c| c.index == 1 && c.delta.content.is_some())
        .count();
    assert!(
        choice_1_content_chunks >= 4,
        "Choice 1 should have multiple continuous chunks, got {}",
        choice_1_content_chunks
    );

    let choice_0_finished_with_tool_calls = emitted_choices
        .iter()
        .any(|c| c.index == 0 && c.finish_reason == Some(FinishReason::ToolCalls));
    assert!(
        choice_0_finished_with_tool_calls,
        "Choice 0 should have tool call response"
    );

    let choice_2_finished_with_tool_calls = emitted_choices
        .iter()
        .any(|c| c.index == 2 && c.finish_reason == Some(FinishReason::ToolCalls));
    assert!(
        choice_2_finished_with_tool_calls,
        "Choice 2 should have tool call response"
    );
}
/// Checks that choices finishing with `FinishReason::ToolCalls` are emitted in
/// ascending choice-index order and that this order is stable across runs.
///
/// The first pass only verifies that all three choices produce a tool-call
/// finish; the following five passes verify the exact emission order.
#[tokio::test]
async fn test_deterministic_choice_ordering() {
    // Every pass uses the same input: one chunk in which choices 0, 1 and 2
    // each contain a complete tool call, followed by a finish chunk for all
    // three. Built through a closure so the fixture is written only once.
    let build_chunks = || {
        vec![
            create_multi_choice_chunk(vec![
                (
                    "<tool_call>{\"name\": \"tool_0\", \"arguments\": {}}</tool_call>".to_string(),
                    0,
                ),
                (
                    "<tool_call>{\"name\": \"tool_1\", \"arguments\": {}}</tool_call>".to_string(),
                    1,
                ),
                (
                    "<tool_call>{\"name\": \"tool_2\", \"arguments\": {}}</tool_call>".to_string(),
                    2,
                ),
            ]),
            test_utils::create_multi_choice_finish_chunk(vec![0, 1, 2]),
        ]
    };

    let jail = JailedStream::builder().tool_call_parser("hermes").build();
    let results: Vec<_> = jail
        .apply_with_finish_reason(stream::iter(build_chunks()))
        .collect()
        .await;
    // Only the count matters here, so nothing is sorted: sorting would hide
    // the emission order this test is about.
    let tool_call_response_count = results
        .iter()
        .filter_map(|r| r.data.as_ref())
        .flat_map(|d| &d.inner.choices)
        .filter(|c| c.finish_reason == Some(FinishReason::ToolCalls))
        .count();
    assert_eq!(
        tool_call_response_count,
        3,
        "Should have 3 tool call responses"
    );

    for run in 0..5 {
        let jail = JailedStream::builder().tool_call_parser("hermes").build();
        let run_results: Vec<_> = jail
            .apply_with_finish_reason(stream::iter(build_chunks()))
            .collect()
            .await;
        // Choice indices in the order they were emitted, unsorted on purpose.
        let indices: Vec<u32> = run_results
            .iter()
            .filter_map(|r| r.data.as_ref())
            .flat_map(|d| &d.inner.choices)
            .filter(|c| c.finish_reason == Some(FinishReason::ToolCalls))
            .map(|c| c.index)
            .collect();
        assert_eq!(
            indices,
            vec![0, 1, 2],
            "Choice processing order should be deterministic on run {}",
            run
        );
    }
}
/// A chunk that has no choices but carries `usage` must pass through a
/// parser-less `JailedStream` unchanged, right after the content chunk.
#[tokio::test]
async fn test_usage_chunk_preserved() {
    let content_chunk = create_mock_response_chunk("Hello, world!".to_string(), 0);

    // Derive the usage-only chunk from the content chunk: drop the choices
    // and attach token counts.
    let mut usage_chunk = content_chunk.clone();
    if let Some(data) = usage_chunk.data.as_mut() {
        data.inner.choices.clear();
        data.inner.usage = Some(CompletionUsage {
            prompt_tokens: 11,
            completion_tokens: 3,
            total_tokens: 14,
            prompt_tokens_details: None,
            completion_tokens_details: None,
        });
    }

    let results: Vec<_> = JailedStream::builder()
        .build()
        .apply(stream::iter(vec![content_chunk, usage_chunk]))
        .collect()
        .await;
    assert_eq!(results.len(), 2, "Should have exactly 2 chunks");

    let first_payload = results[0].data.as_ref().unwrap();
    let first_content = first_payload.inner.choices[0]
        .delta
        .content
        .as_ref()
        .unwrap();
    assert_eq!(
        extract_text(first_content),
        "Hello, world!",
        "Content chunk should have 'Hello, world!'"
    );

    assert!(
        results[1].data.as_ref().unwrap().inner.choices.is_empty(),
        "Usage chunk should have no choices"
    );
    let second_payload = results[1].data.as_ref().unwrap();
    let usage = second_payload.inner.usage.as_ref().unwrap();
    assert_eq!(usage.prompt_tokens, 11);
    assert_eq!(usage.completion_tokens, 3);
    assert_eq!(usage.total_tokens, 14);
}
/// Smoke test: a single three-choice chunk pushed through a parser-less
/// `JailedStream` must produce at least one result.
///
/// NOTE(review): despite the name, no usage fields are asserted here.
#[tokio::test]
async fn test_multiple_choices_usage_aggregation() {
    let three_choice_chunk = create_multi_choice_chunk(vec![
        ("Response A with many tokens".to_string(), 0),
        ("Response B".to_string(), 1),
        ("Response C has even more tokens than A".to_string(), 2),
    ]);
    let results: Vec<_> = JailedStream::builder()
        .build()
        .apply_with_finish_reason(stream::iter(vec![three_choice_chunk]))
        .collect()
        .await;
    assert!(!results.is_empty(), "Should have some results");
}
/// A lone `<` that is not followed by a tool-call marker (here the text
/// "n < 5") must come out as ordinary content. The expected output is two
/// chunks, `"n "` and `"< 5"`, with no tool calls anywhere.
#[tokio::test]
async fn test_partial_matching_false_positive_prevention() {
    let input_stream = stream::iter(vec![
        create_mock_response_chunk("n ".to_string(), 0),
        create_mock_response_chunk("<".to_string(), 0),
        create_mock_response_chunk(" 5".to_string(), 0),
    ]);
    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build()
        .apply_with_finish_reason(input_stream)
        .collect()
        .await;

    assert_eq!(
        results.len(),
        2,
        "Should emit exactly 2 chunks: 'n ' and '< 5'"
    );
    assert_content(&results[0], "n ");
    assert_content(&results[1], "< 5");

    let mut position = 0;
    for chunk in &results {
        assert!(
            !has_tool_call(chunk),
            "Chunk {} should not contain tool calls in mathematical expression",
            position
        );
        position += 1;
    }

    assert_eq!(
        reconstruct_content(&results),
        "n < 5",
        "Content reconstruction should preserve the complete mathematical expression"
    );
}
/// The start marker `<TOOLCALL>` is split across two chunks (`text<TO` +
/// `OLCALL>...`). The test expects the `<TO` suffix to be withheld from the
/// first content chunk, so that the output is `"text"` followed by the parsed
/// tool call.
#[tokio::test]
async fn test_partial_matching_suffix_detection() {
    let input_stream = stream::iter(vec![
        create_mock_response_chunk("text<TO".to_string(), 0),
        create_mock_response_chunk(
            "OLCALL>[{\"name\": \"test\", \"arguments\": {}}]</TOOLCALL>".to_string(),
            0,
        ),
    ]);
    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .jail_end_sequence("</TOOLCALL>")
        .build()
        .apply_with_finish_reason(input_stream)
        .collect()
        .await;

    assert_eq!(
        results.len(),
        2,
        "Should emit exactly 2 chunks: [0] 'text' content, [1] tool call"
    );
    let (text_chunk, tool_call_chunk) = (&results[0], &results[1]);
    assert_content(text_chunk, "text");
    assert_tool_call(tool_call_chunk, "test", json!({}));

    let leading_text = extract_content(text_chunk);
    assert!(
        !leading_text.contains('<'),
        "First chunk should not contain '<' as it's part of partial match '<TO'"
    );
    assert_eq!(
        reconstruct_content(&results),
        "text",
        "Content reconstruction should only include 'text' (tool call parsed separately)"
    );
}
/// Streams a harmony-formatted response (analysis channel followed by a
/// commentary-channel function call) and expects both the analysis text as
/// content and a `get_current_weather` tool call with the given location.
#[tokio::test]
async fn test_jailed_stream_harmony_parser() {
    let input_stream = stream::iter(vec![
        create_mock_response_chunk(
            "<|channel|>analysis<|message|>Need to use function get_current_weather.<|end|>"
                .to_string(),
            0,
        ),
        create_mock_response_chunk("<|start|>".to_string(), 0),
        create_mock_response_chunk("assistant".to_string(), 0),
        create_mock_response_chunk("<|channel|>".to_string(), 0),
        create_mock_response_chunk(
            "commentary to=functions.get_current_weather <|constrain|>json".to_string(),
            0,
        ),
        create_mock_response_chunk(
            "<|message|>{\"location\":\"San Francisco\"}<|call|>".to_string(),
            0,
        ),
    ]);
    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("harmony")
        .build()
        .apply_with_finish_reason(input_stream)
        .collect()
        .await;
    assert!(!results.is_empty());

    // Some chunk's first choice must contain the analysis sentence.
    let has_analysis_text = results.iter().any(|r| {
        r.data
            .as_ref()
            .and_then(|d| d.inner.choices.first())
            .and_then(|c| c.delta.content.as_ref())
            .is_some_and(|content| {
                test_utils::extract_text(content)
                    .contains("Need to use function get_current_weather.")
            })
    });
    assert!(has_analysis_text, "Should contain extracted analysis text");

    let tool_call_chunk = results
        .iter()
        .find(|r| test_utils::has_tool_call(r))
        .expect("Should have a tool call result");
    test_utils::assert_tool_call(
        tool_call_chunk,
        "get_current_weather",
        json!({"location": "San Francisco"}),
    );
}
#[tokio::test]
async fn test_deepseek_v3_1() {
let text = r#"<|tool▁calls▁begin|><|tool▁call▁begin|>get_current_weather<|tool▁sep|>{"location": "Berlin", "units": "metric"}<|tool▁call▁end|><|tool▁call▁begin|>get_weather_forecast<|tool▁sep|>{"location": "Berlin", "days": 7, "units": "imperial"}<|tool▁call▁end|><|tool▁call▁begin|>get_air_quality<|tool▁sep|>{"location": "Berlin", "radius": 50}<|tool▁call▁end|><|tool▁calls▁end|><|end▁of▁sentence|>"#;
let chunks = vec![create_mock_response_chunk(text.to_string(), 0)];
let input_stream = stream::iter(chunks);
let jail = JailedStream::builder()
.tool_call_parser("deepseek_v3_1")
.build();
let jailed_stream = jail.apply_with_finish_reason(input_stream);
let results: Vec<_> = jailed_stream.collect().await;
assert!(!results.is_empty());
let tool_call_idx = results
.iter()
.position(test_utils::has_tool_call)
.expect("Should have a tool call result");
test_utils::assert_tool_call(
&results[tool_call_idx],
"get_current_weather",
json!({"location": "Berlin", "units": "metric"}),
);
for result in results {
let Some(data) = result.data else {
continue;
};
for choice in data.inner.choices {
if let Some(content) = choice.delta.content {
assert!(
!test_utils::extract_text(&content).contains("<|tool▁calls▁end|>"),
"Should not contain deepseek special tokens in content"
);
}
}
}
}
#[tokio::test]
async fn test_deepseek_v3_1_chunk() {
let text = r#"<|tool▁calls▁begin|><|tool▁call▁begin|>get_current_weather<|tool▁sep|>{"location": "Berlin", "units": "metric"}<|tool▁call▁end|><|tool▁call▁begin|>get_weather_forecast<|tool▁sep|>{"location": "Berlin", "days": 7, "units": "imperial"}<|tool▁call▁end|><|tool▁call▁begin|>get_air_quality<|tool▁sep|>{"location": "Berlin", "radius": 50}<|tool▁call▁end|><|tool▁calls▁end|><|end▁of▁sentence|>"#;
let mut words = Vec::new();
let mut i = 0;
let chars: Vec<char> = text.chars().collect();
while i < chars.len() {
if chars[i] == '<' {
if let Some(end) = chars[i..].iter().position(|&c| c == '>') {
let word: String = chars[i..=i + end].iter().collect();
words.push(word);
i += end + 1;
} else {
words.push(chars[i..].iter().collect());
break;
}
} else if chars[i].is_whitespace() {
i += 1;
} else {
let start = i;
while i < chars.len() && !chars[i].is_whitespace() && chars[i] != '<' {
i += 1;
}
words.push(chars[start..i].iter().collect());
}
}
let chunks = words
.into_iter()
.map(|word| create_mock_response_chunk(word, 0))
.collect::<Vec<_>>();
let input_stream = stream::iter(chunks);
let jail = JailedStream::builder()
.tool_call_parser("deepseek_v3_1")
.build();
let jailed_stream = jail.apply_with_finish_reason(input_stream);
let results: Vec<_> = jailed_stream.collect().await;
assert!(!results.is_empty());
let tool_call_idx = results
.iter()
.position(test_utils::has_tool_call)
.expect("Should have a tool call result");
test_utils::assert_tool_call(
&results[tool_call_idx],
"get_current_weather",
json!({"location": "Berlin", "units": "metric"}),
);
for result in results {
let Some(data) = result.data else {
continue;
};
for choice in data.inner.choices {
if let Some(content) = choice.delta.content {
assert!(
!test_utils::extract_text(&content).contains("<|tool▁calls▁end|>"),
"Should not contain deepseek special tokens in content"
);
}
}
}
}
/// Streams a qwen3_coder XML tool call one tag per chunk, surrounded by plain
/// text. Expected output: the leading text, one `get_weather` tool call with
/// both parameters, and the trailing text.
#[tokio::test]
async fn test_jailed_stream_qwen3_coder_parser() {
    let pieces = [
        "I'll call a function. ",
        "<tool_call>",
        "<function=get_weather>",
        "<parameter=location>San Francisco</parameter>",
        "<parameter=unit>celsius</parameter>",
        "</function>",
        "</tool_call>",
        " Done.",
    ];
    let chunks: Vec<_> = pieces
        .iter()
        .map(|piece| create_mock_response_chunk(piece.to_string(), 0))
        .collect();
    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("qwen3_coder")
        .build()
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        3,
        "Should have content, tool call, and trailing content"
    );
    let (leading, tool_call, trailing) = (&results[0], &results[1], &results[2]);
    test_utils::assert_content(leading, "I'll call a function. ");
    test_utils::assert_tool_call(
        tool_call,
        "get_weather",
        serde_json::json!({"location": "San Francisco", "unit": "celsius"}),
    );
    test_utils::assert_content(trailing, " Done.");

    assert_eq!(
        test_utils::reconstruct_content(&results),
        "I'll call a function.  Done."
    );
}
/// A complete qwen3_coder tool call with three parameters arrives in one
/// chunk, and a tool definition declaring `max_results` as an integer is
/// supplied to the builder. The expected arguments contain `max_results` as
/// the number `10` and the other two parameters as strings.
#[tokio::test]
async fn test_jailed_stream_qwen3_coder_multiple_params() {
    use dynamo_parsers::tool_calling::ToolDefinition;

    // Schema handed to the parser for the `web_search` tool.
    let web_search_schema = serde_json::json!({
        "type": "object",
        "properties": {
            "query": {"type": "string"},
            "max_results": {"type": "integer"},
            "filter": {"type": "string"},
        },
    });
    let tool_defs = vec![ToolDefinition {
        name: "web_search".to_string(),
        parameters: Some(web_search_schema),
    }];

    let input_stream = stream::iter(vec![
        create_mock_response_chunk("Let me search for that. ".to_string(), 0),
        create_mock_response_chunk(
            "<tool_call><function=web_search><parameter=query>Rust programming</parameter><parameter=max_results>10</parameter><parameter=filter>recent</parameter></function></tool_call>".to_string(),
            0,
        ),
        create_mock_response_chunk(" Searching now.".to_string(), 0),
    ]);
    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("qwen3_coder")
        .tool_definitions(tool_defs)
        .build()
        .apply_with_finish_reason(input_stream)
        .collect()
        .await;

    assert_eq!(results.len(), 3, "Should have 3 chunks");
    let (leading, tool_call, trailing) = (&results[0], &results[1], &results[2]);
    test_utils::assert_content(leading, "Let me search for that. ");
    test_utils::assert_tool_call(
        tool_call,
        "web_search",
        serde_json::json!({
            "query": "Rust programming",
            "max_results": 10,
            "filter": "recent"
        }),
    );
    test_utils::assert_content(trailing, " Searching now.");
}
/// With only `tool_call_parser("qwen3_coder")` configured (no explicit jail
/// sequences), `<tool_call>` / `</tool_call>` are still expected to delimit
/// the tool call: output is leading text, one `get_weather` call, trailing text.
#[tokio::test]
async fn test_jailed_stream_xml_parser_config_tokens_auto_population() {
    let pieces = [
        "Before tool call. ",
        "<tool_call>",
        "<function=get_weather>",
        "<parameter=city>Seattle</parameter>",
        "</function>",
        "</tool_call>",
        " After tool call.",
    ];
    let chunks: Vec<_> = pieces
        .iter()
        .map(|piece| create_mock_response_chunk(piece.to_string(), 0))
        .collect();
    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("qwen3_coder")
        .build()
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    assert_eq!(
        results.len(),
        3,
        "Should have content, tool call, and trailing content"
    );
    let (leading, tool_call, trailing) = (&results[0], &results[1], &results[2]);
    test_utils::assert_content(leading, "Before tool call. ");
    test_utils::assert_tool_call(
        tool_call,
        "get_weather",
        serde_json::json!({"city": "Seattle"}),
    );
    test_utils::assert_content(trailing, " After tool call.");

    assert_eq!(
        test_utils::reconstruct_content(&results),
        "Before tool call.  After tool call."
    );
}
/// When explicit jail sequences (`[[START]]` / `[[END]]`) are configured
/// together with the qwen3_coder parser, the parser's own `<tool_call>` marker
/// is expected to pass through as ordinary content, the custom markers are
/// expected to jail, and since the jailed text is not a valid XML tool call,
/// no tool calls are expected in the output.
#[tokio::test]
async fn test_jailed_stream_xml_manual_sequences_prevent_auto_population() {
    let pieces = [
        "Text with ",
        "<tool_call>",
        "should not jail",
        "</tool_call>",
        " because custom ",
        "[[START]]",
        "jailed content",
        "[[END]]",
        " text.",
    ];
    let chunks: Vec<_> = pieces
        .iter()
        .map(|piece| create_mock_response_chunk(piece.to_string(), 0))
        .collect();
    let results: Vec<_> = JailedStream::builder()
        .jail_start_sequence("[[START]]")
        .jail_end_sequence("[[END]]")
        .tool_call_parser("qwen3_coder")
        .build()
        .apply_with_finish_reason(stream::iter(chunks))
        .collect()
        .await;

    // Text of the first choice of every chunk that has content.
    let first_choice_texts: Vec<&str> = results
        .iter()
        .filter_map(|r| {
            r.data
                .as_ref()
                .and_then(|d| d.inner.choices.first())
                .and_then(|c| c.delta.content.as_ref())
        })
        .map(|content| test_utils::extract_text(content))
        .collect();

    let default_tokens_passed_through = first_choice_texts
        .iter()
        .any(|text| text.contains("<tool_call>") || text.contains("should not jail"));
    assert!(
        default_tokens_passed_through,
        "Default <tool_call> should pass through as content when manual sequences are set"
    );

    // The jailed span must come back as one chunk holding marker and body.
    let jailed_span_released = first_choice_texts
        .iter()
        .any(|text| text.contains("[[START]]") && text.contains("jailed content"));
    assert!(
        jailed_span_released,
        "Custom markers should trigger jailing and accumulated content should be released"
    );

    let tool_call_count = results
        .iter()
        .filter(|r| {
            r.data
                .as_ref()
                .and_then(|d| d.inner.choices.first())
                .and_then(|c| c.delta.tool_calls.as_ref())
                .is_some_and(|calls| !calls.is_empty())
        })
        .count();
    assert_eq!(
        tool_call_count, 0,
        "Should have 0 tool calls because jailed content doesn't match XML format"
    );

    let reconstructed = test_utils::reconstruct_content(&results);
    assert!(
        reconstructed.contains("<tool_call>") && reconstructed.contains("should not jail"),
        "Reconstructed content should include default tokens that passed through"
    );
    assert!(
        reconstructed.contains("[[START]]") && reconstructed.contains("jailed content"),
        "Reconstructed content should include jailed content with custom markers"
    );
}
/// A stray `{` in ordinary prose must not be treated as the start of a tool
/// call by the mistral parser: both text chunks are expected to come out
/// verbatim and no result may contain tool calls.
#[tokio::test]
async fn test_jailed_stream_mistral_false_positive_curly() {
    let input_stream = stream::iter(vec![
        create_mock_response_chunk("Hey How".to_string(), 0),
        create_mock_response_chunk("are { you? ".to_string(), 0),
        create_final_response_chunk(0),
    ]);
    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("mistral")
        .build()
        .apply_with_finish_reason(input_stream)
        .collect()
        .await;

    assert!(results.len() >= 2);
    assert_content(&results[0], "Hey How");

    let curly_text_preserved = results
        .iter()
        .map(|r| extract_content(r))
        .any(|text| text == "are { you? ");
    assert!(
        curly_text_preserved,
        "Should preserve the literal text with curly brace"
    );

    let mut position = 0;
    for chunk in &results {
        assert!(
            !has_tool_call(chunk),
            "Result {} should not contain tool calls for false-positive text",
            position
        );
        position += 1;
    }
}
/// Prose containing a stray `{` followed by a real `[TOOL_CALLS]` block:
/// expects the initial text, a content chunk equal to `"{ you? "`, and a
/// `get_weather` tool call.
///
/// NOTE(review): this test is `#[ignore]`d and the reason is not recorded
/// here. Confirm why before re-enabling it.
#[tokio::test]
#[ignore]
async fn test_jailed_stream_mistral_false_positive_then_tool_calls_marker() {
    let input_stream = stream::iter(vec![
        create_mock_response_chunk("Hey How".to_string(), 0),
        create_mock_response_chunk("are { you? ".to_string(), 0),
        create_mock_response_chunk("[TOOL_CALLS]".to_string(), 0),
        create_mock_response_chunk(
            "[{\"name\": \"get_weather\", \"arguments\": {\"location\": \"San Francisco\", \"unit\": \"fahrenheit\"}}]"
                .to_string(),
            0,
        ),
    ]);
    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("mistral")
        .build()
        .apply_with_finish_reason(input_stream)
        .collect()
        .await;
    assert!(results.len() >= 2);

    let has_initial_content = results.iter().any(|r| extract_content(r) == "Hey How");
    assert!(has_initial_content, "Should include initial content");

    let has_pre_marker_content = results.iter().any(|r| extract_content(r) == "{ you? ");
    assert!(
        has_pre_marker_content,
        "Should include content preceding the marker"
    );

    let tool_call_chunk = results
        .iter()
        .find(|r| test_utils::has_tool_call(r))
        .expect("Should have a tool call result");
    test_utils::assert_tool_call(
        tool_call_chunk,
        "get_weather",
        json!({"location": "San Francisco", "unit": "fahrenheit"}),
    );
}
}
#[cfg(test)]
mod parallel_jail_tests {
use super::tests::test_utils;
use super::*;
use dynamo_protocols::types::ChatCompletionMessageContent;
use futures::StreamExt;
use futures::stream;
use serde_json::json;
/// Builds one annotated stream chunk holding a text choice per entry of
/// `contents`; the choice index is the entry's position in the vector.
///
/// Every choice has the assistant role and no finish reason. The response
/// envelope uses the fixed test id/model/timestamp, and all annotation
/// fields (`id`, `event`, `comment`, `error`) are `None`.
fn create_multi_choice_response_chunk(
    contents: Vec<String>,
) -> Annotated<NvCreateChatCompletionStreamResponse> {
    let mut choices: Vec<ChatChoiceStream> = Vec::with_capacity(contents.len());
    for (position, text) in contents.into_iter().enumerate() {
        let delta = ChatCompletionStreamResponseDelta {
            role: Some(Role::Assistant),
            content: Some(ChatCompletionMessageContent::Text(text)),
            tool_calls: None,
            function_call: None,
            refusal: None,
            reasoning_content: None,
        };
        #[allow(deprecated)]
        let choice = ChatChoiceStream {
            index: position as u32,
            delta,
            finish_reason: None,
            stop_reason: None,
            logprobs: None,
        };
        choices.push(choice);
    }

    let inner = dynamo_protocols::types::CreateChatCompletionStreamResponse {
        id: "test-id".to_string(),
        choices,
        created: 1234567890,
        model: "test-model".to_string(),
        system_fingerprint: Some("test-fingerprint".to_string()),
        object: "chat.completion.chunk".to_string(),
        usage: None,
        service_tier: None,
    };

    Annotated {
        data: Some(NvCreateChatCompletionStreamResponse { inner, nvext: None }),
        id: None,
        event: None,
        comment: None,
        error: None,
    }
}
fn validate_parallel_streaming_tool_calls(
results: &[Annotated<NvCreateChatCompletionStreamResponse>],
expected_tool_calls: &[(&str, serde_json::Value)],
) {
let tool_call_results: Vec<_> = results
.iter()
.filter(|r| {
r.data
.as_ref()
.is_some_and(|d| d.inner.choices.iter().any(|c| c.delta.tool_calls.is_some()))
})
.collect();
assert!(
!tool_call_results.is_empty(),
"Should have at least one tool call result"
);
let mut all_tool_calls = Vec::new();
for result in &tool_call_results {
if let Some(ref data) = result.data {
for choice in &data.inner.choices {
if let Some(ref tool_calls) = choice.delta.tool_calls {
all_tool_calls.extend(tool_calls.iter());
}
}
}
}
assert_eq!(
all_tool_calls.len(),
expected_tool_calls.len(),
"Expected {} tool calls, got {}",
expected_tool_calls.len(),
all_tool_calls.len()
);
for (i, (expected_name, expected_args)) in expected_tool_calls.iter().enumerate() {
let tool_call = &all_tool_calls[i];
assert!(tool_call.id.is_some(), "Tool call {} should have an ID", i);
assert_eq!(
tool_call.index, i as u32,
"Tool call {} should have index {}, got {}",
i, i, tool_call.index
);
assert_eq!(
tool_call.r#type,
Some(dynamo_protocols::types::FunctionType::Function),
"Tool call {} should be of type 'function'",
i
);
if let Some(ref function) = tool_call.function {
assert_eq!(
function.name.as_deref(),
Some(*expected_name),
"Tool call {} name should be {}",
i,
expected_name
);
if let Some(ref args_str) = function.arguments {
let parsed_args: serde_json::Value =
serde_json::from_str(args_str).expect("Arguments should be valid JSON");
assert_eq!(
parsed_args, *expected_args,
"Tool call {} arguments should match expected",
i
);
}
}
}
}
/// Two nemotron_deci tool calls inside one `<TOOLCALL>[...]</TOOLCALL>` block,
/// delivered as a single chunk, must both be emitted with matching names and
/// arguments.
#[tokio::test]
async fn test_parallel_tool_calls_single_chunk_nemotron() {
    // Raw payload; the line breaks inside the JSON array are part of the input.
    let payload = r#"<TOOLCALL>[
{"name": "get_current_weather", "arguments": {"city": "Dallas", "state": "TX", "unit": "fahrenheit"}},
{"name": "get_current_weather", "arguments": {"city": "Orlando", "state": "FL", "unit": "fahrenheit"}}
]</TOOLCALL>"#;
    let input_stream = stream::iter(vec![test_utils::create_mock_response_chunk(
        payload.to_string(),
        0,
    )]);

    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build()
        .apply_with_finish_reason(input_stream)
        .collect()
        .await;
    assert!(!results.is_empty(), "Should have results");

    let dallas = json!({"city": "Dallas", "state": "TX", "unit": "fahrenheit"});
    let orlando = json!({"city": "Orlando", "state": "FL", "unit": "fahrenheit"});
    let expected_calls = [
        ("get_current_weather", dallas),
        ("get_current_weather", orlando),
    ];
    validate_parallel_streaming_tool_calls(&results, &expected_calls);
}
/// Two mistral tool calls inside one `[TOOL_CALLS]...[/TOOL_CALLS]` block,
/// delivered as a single chunk and run through `apply`, must both be emitted
/// with matching names and arguments.
#[tokio::test]
async fn test_parallel_tool_calls_single_chunk_mistral() {
    let payload = r#"[TOOL_CALLS][{"name": "get_current_weather", "arguments": {"city": "Dallas", "state": "TX", "unit": "fahrenheit"}}, {"name": "get_current_weather", "arguments": {"city": "Orlando", "state": "FL", "unit": "fahrenheit"}}][/TOOL_CALLS]"#;
    let input_stream = stream::iter(vec![test_utils::create_mock_response_chunk(
        payload.to_string(),
        0,
    )]);

    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("mistral")
        .build()
        .apply(input_stream)
        .collect()
        .await;
    assert!(!results.is_empty(), "Should have results");

    let dallas = json!({"city": "Dallas", "state": "TX", "unit": "fahrenheit"});
    let orlando = json!({"city": "Orlando", "state": "FL", "unit": "fahrenheit"});
    let expected_calls = [
        ("get_current_weather", dallas),
        ("get_current_weather", orlando),
    ];
    validate_parallel_streaming_tool_calls(&results, &expected_calls);
}
/// Two back-to-back hermes `<tool_call>` blocks in a single chunk must both be
/// emitted as tool calls, and no content delta may contain a raw
/// `<tool_call>` tag.
#[tokio::test]
async fn test_parallel_tool_calls_single_chunk_hermes() {
    let payload = "<tool_call>\n\
        {\"name\": \"get_current_weather\", \"arguments\": {\"city\": \"Dallas\", \"state\": \"TX\"}}\n\
        </tool_call>\n\
        <tool_call>\n\
        {\"name\": \"get_current_weather\", \"arguments\": {\"city\": \"Orlando\", \"state\": \"FL\"}}\n\
        </tool_call>";
    let input_stream = stream::iter(vec![test_utils::create_mock_response_chunk(
        payload.to_string(),
        0,
    )]);

    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("hermes")
        .build()
        .apply_with_finish_reason(input_stream)
        .collect()
        .await;
    assert!(!results.is_empty(), "Should have results");

    let expected_calls = [
        (
            "get_current_weather",
            json!({"city": "Dallas", "state": "TX"}),
        ),
        (
            "get_current_weather",
            json!({"city": "Orlando", "state": "FL"}),
        ),
    ];
    validate_parallel_streaming_tool_calls(&results, &expected_calls);

    // Walk every content delta of every choice looking for leaked markup.
    let content_texts = results
        .iter()
        .filter_map(|r| r.data.as_ref())
        .flat_map(|d| d.inner.choices.iter())
        .filter_map(|c| c.delta.content.as_ref())
        .map(|content| test_utils::extract_text(content));
    for text in content_texts {
        assert!(
            !text.contains("<tool_call>"),
            "Raw XML must not leak as text content, got: {text:?}"
        );
    }
}
/// Two back-to-back qwen3_coder `<tool_call>` blocks in a single chunk must
/// both be emitted as `search` tool calls with their respective queries.
#[tokio::test]
async fn test_parallel_tool_calls_single_chunk_qwen3_coder() {
    let payload = "<tool_call>\n\
        <function=search>\n\
        <parameter=query>Rust async</parameter>\n\
        </function>\n\
        </tool_call>\n\
        <tool_call>\n\
        <function=search>\n\
        <parameter=query>Python async</parameter>\n\
        </function>\n\
        </tool_call>";
    let input_stream = stream::iter(vec![test_utils::create_mock_response_chunk(
        payload.to_string(),
        0,
    )]);

    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("qwen3_coder")
        .build()
        .apply_with_finish_reason(input_stream)
        .collect()
        .await;
    assert!(!results.is_empty(), "Should have results");

    let rust_query = json!({"query": "Rust async"});
    let python_query = json!({"query": "Python async"});
    let expected_calls = [("search", rust_query), ("search", python_query)];
    validate_parallel_streaming_tool_calls(&results, &expected_calls);
}
/// Same two nemotron_deci tool calls as the single-chunk variant, but the
/// `<TOOLCALL>[...]</TOOLCALL>` block is spread over four chunks. Both calls
/// must still be emitted with matching names and arguments.
#[tokio::test]
async fn test_parallel_tool_calls_streaming_chunks() {
    let pieces = [
        "<TOOLCALL>[",
        r#" {"name": "get_current_weather", "arguments": {"city": "Dallas", "state": "TX", "unit": "fahrenheit"}},"#,
        r#" {"name": "get_current_weather", "arguments": {"city": "Orlando", "state": "FL", "unit": "fahrenheit"}}"#,
        "]</TOOLCALL>",
    ];
    let input_chunks: Vec<_> = pieces
        .iter()
        .map(|piece| test_utils::create_mock_response_chunk(piece.to_string(), 0))
        .collect();

    let results: Vec<_> = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build()
        .apply_with_finish_reason(stream::iter(input_chunks))
        .collect()
        .await;
    assert!(!results.is_empty(), "Should have results");

    let dallas = json!({"city": "Dallas", "state": "TX", "unit": "fahrenheit"});
    let orlando = json!({"city": "Orlando", "state": "FL", "unit": "fahrenheit"});
    let expected_calls = [
        ("get_current_weather", dallas),
        ("get_current_weather", orlando),
    ];
    validate_parallel_streaming_tool_calls(&results, &expected_calls);
}
#[tokio::test]
async fn test_parallel_tool_calls_with_normal_text_before_and_after() {
let jail = JailedStream::builder()
.tool_call_parser("nemotron_deci")
.build();
let input_chunks = vec![
test_utils::create_mock_response_chunk("I'll check the weather for both cities. ".to_string(), 0),
test_utils::create_mock_response_chunk(
r#"<TOOLCALL>[
{"name": "get_current_weather", "arguments": {"city": "Dallas", "state": "TX", "unit": "fahrenheit"}},
{"name": "get_current_weather", "arguments": {"city": "Orlando", "state": "FL", "unit": "fahrenheit"}}
]</TOOLCALL>"#.to_string(),
0,
),
test_utils::create_mock_response_chunk(" Let me get that information for you.".to_string(), 0),
];
let input_stream = stream::iter(input_chunks);
let results: Vec<_> = jail.apply_with_finish_reason(input_stream).collect().await;
assert!(!results.is_empty(), "Should have results");
let normal_text_before = results.iter().find(|r| {
r.data.as_ref().is_some_and(|d| {
d.inner.choices.iter().any(|c| {
c.delta.content.as_ref().is_some_and(|content| {
test_utils::extract_text(content).contains("I'll check the weather")
})
})
})
});
assert!(
normal_text_before.is_some(),
"Should have normal text before tool calls"
);
let expected_calls = [
(
"get_current_weather",
json!({"city": "Dallas", "state": "TX", "unit": "fahrenheit"}),
),
(
"get_current_weather",
json!({"city": "Orlando", "state": "FL", "unit": "fahrenheit"}),
),
];
validate_parallel_streaming_tool_calls(&results, &expected_calls);
let normal_text_after = results.iter().find(|r| {
r.data.as_ref().is_some_and(|d| {
d.inner.choices.iter().any(|c| {
c.delta.content.as_ref().is_some_and(|content| {
test_utils::extract_text(content).contains("Let me get that information")
})
})
})
});
assert!(
normal_text_after.is_some(),
"Should have normal text after tool calls"
);
}
/// Sends one multi-choice chunk in which each choice carries its own
/// single-entry `<TOOLCALL>` block, with the stream built in
/// `SingleChoicePerChunk` emission mode, and checks that at least two tool
/// calls are emitted in total.
#[tokio::test]
async fn test_multiple_choices_with_parallel_tool_calls() {
    use dynamo_llm::protocols::openai::chat_completions::jail::EmissionMode;

    let per_choice_text = vec![
        r#"<TOOLCALL>[{"name": "get_current_weather", "arguments": {"city": "Dallas", "state": "TX", "unit": "fahrenheit"}}]</TOOLCALL>"#.to_string(),
        r#"<TOOLCALL>[{"name": "get_current_weather", "arguments": {"city": "Orlando", "state": "FL", "unit": "fahrenheit"}}]</TOOLCALL>"#.to_string(),
    ];
    let input_chunks = vec![create_multi_choice_response_chunk(per_choice_text)];

    let parser_stream = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .emission_mode(EmissionMode::SingleChoicePerChunk)
        .build();
    let results: Vec<_> = parser_stream
        .apply_with_finish_reason(stream::iter(input_chunks))
        .collect()
        .await;
    assert!(!results.is_empty(), "Should have results");

    // Total number of tool-call entries across every choice of every chunk.
    let tool_call_count: usize = results
        .iter()
        .filter_map(|item| item.data.as_ref())
        .flat_map(|payload| payload.inner.choices.iter())
        .filter_map(|choice| choice.delta.tool_calls.as_ref())
        .map(|calls| calls.len())
        .sum();
    assert!(
        tool_call_count >= 2,
        "Should have at least 2 tool calls from different choices"
    );
}
/// Delivers a single `<TOOLCALL>` array naming three different tools and
/// checks, via the shared validator, that each call comes out with its own
/// name and arguments.
#[tokio::test]
async fn test_parallel_mixed_tool_types_streaming() {
    let tool_block = r#"<TOOLCALL>[
{"name": "get_current_weather", "arguments": {"city": "Dallas", "state": "TX", "unit": "fahrenheit"}},
{"name": "web_search", "arguments": {"query": "Orlando Florida attractions", "max_results": 5}},
{"name": "get_user_location", "arguments": {"ip_address": "192.168.1.1"}}
]</TOOLCALL>"#;
    let input_chunks = vec![test_utils::create_mock_response_chunk(
        tool_block.to_string(),
        0,
    )];

    let parser_stream = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build();
    let results: Vec<_> = parser_stream
        .apply(stream::iter(input_chunks))
        .collect()
        .await;
    assert!(!results.is_empty(), "Should have results");

    let weather_args = json!({"city": "Dallas", "state": "TX", "unit": "fahrenheit"});
    let search_args = json!({"query": "Orlando Florida attractions", "max_results": 5});
    let location_args = json!({"ip_address": "192.168.1.1"});
    let expected_calls = [
        ("get_current_weather", weather_args),
        ("web_search", search_args),
        ("get_user_location", location_args),
    ];
    validate_parallel_streaming_tool_calls(&results, &expected_calls);
}
/// Delivers seven tool calls inside one `<TOOLCALL>` array in a single chunk
/// and checks that exactly seven tool calls are emitted.
#[tokio::test]
async fn test_large_scale_parallel_tool_calls() {
    let tool_block = r#"<TOOLCALL>[
{"name": "get_current_weather", "arguments": {"city": "Dallas", "state": "TX", "unit": "fahrenheit"}},
{"name": "get_current_weather", "arguments": {"city": "Orlando", "state": "FL", "unit": "fahrenheit"}},
{"name": "get_current_weather", "arguments": {"city": "Seattle", "state": "WA", "unit": "fahrenheit"}},
{"name": "get_current_weather", "arguments": {"city": "Denver", "state": "CO", "unit": "fahrenheit"}},
{"name": "get_current_weather", "arguments": {"city": "Miami", "state": "FL", "unit": "fahrenheit"}},
{"name": "get_current_weather", "arguments": {"city": "Phoenix", "state": "AZ", "unit": "fahrenheit"}},
{"name": "get_current_weather", "arguments": {"city": "Chicago", "state": "IL", "unit": "fahrenheit"}}
]</TOOLCALL>"#;
    let input_chunks = vec![test_utils::create_mock_response_chunk(
        tool_block.to_string(),
        0,
    )];

    let parser_stream = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build();
    let results: Vec<_> = parser_stream
        .apply(stream::iter(input_chunks))
        .collect()
        .await;
    assert!(!results.is_empty(), "Should have results");

    // Total number of tool-call entries across every choice of every chunk.
    let tool_call_count: usize = results
        .iter()
        .filter_map(|item| item.data.as_ref())
        .flat_map(|payload| payload.inner.choices.iter())
        .filter_map(|choice| choice.delta.tool_calls.as_ref())
        .map(|calls| calls.len())
        .sum();
    assert_eq!(tool_call_count, 7, "Should have exactly 7 tool calls");
}
/// Delivers two tool calls with deeply nested JSON arguments in a single
/// `<TOOLCALL>` array and checks that exactly two tool calls are emitted and
/// that the nested structure of each call's arguments survives as valid JSON.
///
/// Every emitted tool call is inspected (not only those in the first
/// tool-call-bearing chunk), and the test fails if either named call was
/// never actually checked, so the structural assertions cannot be skipped
/// silently.
#[tokio::test]
async fn test_parallel_complex_nested_arguments() {
    let jail = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build();
    let input_chunks = vec![test_utils::create_mock_response_chunk(
        r#"<TOOLCALL>[
{
"name": "get_weather_forecast",
"arguments": {
"location": {
"city": "Dallas",
"state": "TX",
"country": "USA",
"coordinates": {"lat": 32.7767, "lon": -96.7970}
},
"options": {
"days": 7,
"units": "fahrenheit",
"include_hourly": true,
"include_alerts": true,
"metrics": ["temperature", "humidity", "wind_speed", "precipitation"]
}
}
},
{
"name": "get_air_quality_data",
"arguments": {
"location": {
"coordinates": {"lat": 32.7767, "lon": -96.7970},
"radius_km": 25
},
"pollutants": ["pm2.5", "pm10", "ozone", "no2", "so2", "co"],
"time_range": {
"start": "2024-01-01T00:00:00Z",
"end": "2024-01-07T23:59:59Z"
}
}
}
]</TOOLCALL>"#
            .to_string(),
        0,
    )];
    let input_stream = stream::iter(input_chunks);
    let results: Vec<_> = jail.apply(input_stream).collect().await;
    assert!(!results.is_empty(), "Should have results");

    // Total number of tool-call entries across every choice of every chunk.
    let tool_call_count: usize = results
        .iter()
        .filter_map(|r| r.data.as_ref())
        .flat_map(|d| d.inner.choices.iter())
        .filter_map(|c| c.delta.tool_calls.as_ref())
        .map(|tc| tc.len())
        .sum();
    assert_eq!(tool_call_count, 2, "Should have exactly 2 tool calls");

    // Walk every emitted tool call, wherever it appears in the output, and
    // record which of the two expected calls had its arguments verified.
    let mut checked_forecast = false;
    let mut checked_air_quality = false;
    let emitted_calls = results
        .iter()
        .filter_map(|r| r.data.as_ref())
        .flat_map(|d| d.inner.choices.iter())
        .filter_map(|c| c.delta.tool_calls.as_ref())
        .flatten();
    for tool_call in emitted_calls {
        let Some(function) = tool_call.function.as_ref() else {
            continue;
        };
        let Some(args_str) = function.arguments.as_ref() else {
            continue;
        };
        let parsed_args: serde_json::Value =
            serde_json::from_str(args_str).expect("Arguments should be valid JSON");
        match function.name.as_deref() {
            Some("get_weather_forecast") => {
                assert!(parsed_args["location"]["coordinates"]["lat"].is_number());
                assert!(parsed_args["options"]["metrics"].is_array());
                checked_forecast = true;
            }
            Some("get_air_quality_data") => {
                assert!(parsed_args["pollutants"].is_array());
                assert!(parsed_args["time_range"]["start"].is_string());
                checked_air_quality = true;
            }
            _ => {}
        }
    }
    assert!(
        checked_forecast,
        "Should have verified nested arguments of get_weather_forecast"
    );
    assert!(
        checked_air_quality,
        "Should have verified nested arguments of get_air_quality_data"
    );
}
/// Delivers a `<TOOLCALL>` array whose middle entry has neither a `name` nor
/// `arguments` field, flanked by two well-formed entries, and checks that at
/// least one tool call is still emitted.
#[tokio::test]
async fn test_parallel_partial_malformed_calls() {
    let tool_block = r#"<TOOLCALL>[
{"name": "get_current_weather", "arguments": {"city": "Dallas", "state": "TX", "unit": "fahrenheit"}},
{"invalid": "malformed_call"},
{"name": "get_current_weather", "arguments": {"city": "Orlando", "state": "FL", "unit": "fahrenheit"}}
]</TOOLCALL>"#;
    let input_chunks = vec![test_utils::create_mock_response_chunk(
        tool_block.to_string(),
        0,
    )];

    let parser_stream = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build();
    let results: Vec<_> = parser_stream
        .apply_with_finish_reason(stream::iter(input_chunks))
        .collect()
        .await;
    assert!(!results.is_empty(), "Should have results");

    // Total number of tool-call entries across every choice of every chunk.
    let tool_call_count: usize = results
        .iter()
        .filter_map(|item| item.data.as_ref())
        .flat_map(|payload| payload.inner.choices.iter())
        .filter_map(|choice| choice.delta.tool_calls.as_ref())
        .map(|calls| calls.len())
        .sum();
    assert!(
        tool_call_count >= 1,
        "Should have at least 1 valid tool call"
    );
}
/// Ends the input in the middle of the second tool call (no closing tag is
/// ever sent) and checks that the output is non-empty and that at least one
/// choice carries either text content or tool calls.
#[tokio::test]
async fn test_parallel_streaming_interrupted() {
    // The last piece stops partway through the second entry's arguments.
    let pieces = [
        "<TOOLCALL>[",
        r#" {"name": "get_current_weather", "arguments": {"city": "Dallas", "state": "TX", "unit": "fahrenheit"}},"#,
        r#" {"name": "get_current_weather", "arguments": {"city": "Orlando""#,
    ];
    let input_chunks: Vec<_> = pieces
        .into_iter()
        .map(|text| test_utils::create_mock_response_chunk(String::from(text), 0))
        .collect();

    let parser_stream = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build();
    let results: Vec<_> = parser_stream
        .apply_with_finish_reason(stream::iter(input_chunks))
        .collect()
        .await;
    assert!(
        !results.is_empty(),
        "Should have results even with incomplete stream"
    );

    let has_some_content = results
        .iter()
        .filter_map(|item| item.data.as_ref())
        .flat_map(|payload| payload.inner.choices.iter())
        .any(|choice| choice.delta.content.is_some() || choice.delta.tool_calls.is_some());
    assert!(
        has_some_content,
        "Should have some content despite incomplete stream"
    );
}
/// Sends an empty `<TOOLCALL>[]</TOOLCALL>` block between two plain-text
/// chunks and checks that normal text is emitted and that no tool calls are
/// produced.
#[tokio::test]
async fn test_parallel_empty_tool_calls_array() {
    let pieces = [
        "I'll help you with that. ",
        "<TOOLCALL>[]</TOOLCALL>",
        " Actually, I don't need any tools for this.",
    ];
    let input_chunks: Vec<_> = pieces
        .into_iter()
        .map(|text| test_utils::create_mock_response_chunk(String::from(text), 0))
        .collect();

    let parser_stream = JailedStream::builder()
        .tool_call_parser("nemotron_deci")
        .build();
    let results: Vec<_> = parser_stream
        .apply(stream::iter(input_chunks))
        .collect()
        .await;
    assert!(!results.is_empty(), "Should have results");

    // Either of the two plain-text fragments appearing anywhere is enough.
    let has_normal_text = results
        .iter()
        .filter_map(|item| item.data.as_ref())
        .flat_map(|payload| payload.inner.choices.iter())
        .filter_map(|choice| choice.delta.content.as_ref())
        .map(test_utils::extract_text)
        .any(|text| text.contains("I'll help you") || text.contains("don't need any tools"));
    assert!(has_normal_text, "Should have normal text content");

    // Total number of tool-call entries across every choice of every chunk.
    let tool_call_count: usize = results
        .iter()
        .filter_map(|item| item.data.as_ref())
        .flat_map(|payload| payload.inner.choices.iter())
        .filter_map(|choice| choice.delta.tool_calls.as_ref())
        .map(|calls| calls.len())
        .sum();
    assert_eq!(
        tool_call_count, 0,
        "Should have no tool calls for empty array"
    );
}
/// Runs `OpenAIPreprocessor::apply_tool_calling_jail` with the "qwen3_coder"
/// parser and `tool_choice = Required` over a single XML-style tool-call
/// chunk. Checks that at least one tool call is emitted, that every emitted
/// call is named `get_weather`, and that no text content contains the raw
/// `<tool_call>` tag.
#[tokio::test]
async fn test_tool_choice_required_with_qwen3_coder_parser() {
    let xml_output = r#"<tool_call>
<function=get_weather>
<parameter=city>
San Francisco
</parameter>
<parameter=unit>
fahrenheit
</parameter>
</function>
</tool_call>"#;
    let input_stream = stream::iter(vec![test_utils::create_mock_response_chunk(
        xml_output.to_string(),
        0,
    )]);

    let results: Vec<_> = OpenAIPreprocessor::apply_tool_calling_jail(
        Some("qwen3_coder".to_string()),
        Some(ChatCompletionToolChoiceOption::Required),
        None,
        input_stream,
    )
    .collect()
    .await;

    // Total number of tool-call entries across every choice of every chunk.
    let tool_call_count: usize = results
        .iter()
        .filter_map(|item| item.data.as_ref())
        .flat_map(|payload| payload.inner.choices.iter())
        .map(|choice: &ChatChoiceStream| {
            choice
                .delta
                .tool_calls
                .as_ref()
                .map_or(0, |calls| calls.len())
        })
        .sum();
    assert!(
        tool_call_count >= 1,
        "tool_choice=required with qwen3_coder should produce at least one tool call, got {}",
        tool_call_count
    );

    let emitted_choices = results
        .iter()
        .filter_map(|item| item.data.as_ref())
        .flat_map(|payload| payload.inner.choices.iter());
    for choice in emitted_choices {
        for tc in choice.delta.tool_calls.iter().flatten() {
            assert_eq!(
                tc.function.as_ref().unwrap().name.as_deref(),
                Some("get_weather"),
                "Tool call name should be 'get_weather'"
            );
        }
        if let Some(content) = choice.delta.content.as_ref() {
            let text = test_utils::extract_text(content);
            assert!(
                !text.contains("<tool_call>"),
                "Raw XML should not leak as content, got: {}",
                text
            );
        }
    }
}
/// Runs `OpenAIPreprocessor::apply_tool_calling_jail` with the "qwen3_coder"
/// parser and a named tool choice of `get_weather` over an XML-style
/// tool-call chunk followed by a final chunk. Checks that at least one tool
/// call is emitted, that every emitted call is named `get_weather`, that no
/// text content contains the raw `<tool_call>` tag, and that some chunk's
/// first choice reports `FinishReason::Stop`.
#[tokio::test]
async fn test_tool_choice_named_with_qwen3_coder_parser() {
    let xml_output = r#"<tool_call>
<function=get_weather>
<parameter=city>
San Francisco
</parameter>
<parameter=unit>
fahrenheit
</parameter>
</function>
</tool_call>"#;
    let input_stream = stream::iter(vec![
        test_utils::create_mock_response_chunk(xml_output.to_string(), 0),
        test_utils::create_final_response_chunk(0),
    ]);

    let named_choice = ChatCompletionToolChoiceOption::Named("get_weather".to_string().into());
    let results: Vec<_> = OpenAIPreprocessor::apply_tool_calling_jail(
        Some("qwen3_coder".to_string()),
        Some(named_choice),
        None,
        input_stream,
    )
    .collect()
    .await;

    // Total number of tool-call entries across every choice of every chunk.
    let tool_call_count: usize = results
        .iter()
        .filter_map(|item| item.data.as_ref())
        .flat_map(|payload| payload.inner.choices.iter())
        .map(|choice: &ChatChoiceStream| {
            choice
                .delta
                .tool_calls
                .as_ref()
                .map_or(0, |calls| calls.len())
        })
        .sum();
    assert!(
        tool_call_count >= 1,
        "tool_choice=named with qwen3_coder should produce at least one tool call, got {}",
        tool_call_count
    );

    let emitted_choices = results
        .iter()
        .filter_map(|item| item.data.as_ref())
        .flat_map(|payload| payload.inner.choices.iter());
    for choice in emitted_choices {
        for tc in choice.delta.tool_calls.iter().flatten() {
            assert_eq!(
                tc.function.as_ref().unwrap().name.as_deref(),
                Some("get_weather"),
                "Tool call name should match the named tool choice"
            );
        }
        if let Some(content) = choice.delta.content.as_ref() {
            let text = test_utils::extract_text(content);
            assert!(
                !text.contains("<tool_call>"),
                "Raw XML should not leak as content, got: {}",
                text
            );
        }
    }

    // Only the first choice of each chunk is consulted for a finish reason.
    let saw_stop = results.iter().any(|item| {
        item.data
            .as_ref()
            .and_then(|payload| payload.inner.choices.first())
            .and_then(|choice| choice.finish_reason)
            == Some(FinishReason::Stop)
    });
    assert!(
        saw_stop,
        "tool_choice=named should have Stop finish reason"
    );
}
}