use futures::{Stream, StreamExt};
use std::collections::{BTreeMap, HashMap};
use dynamo_parsers::tool_calling::try_tool_call_parse_aggregate_finalize;
use super::{NvCreateChatCompletionResponse, NvCreateChatCompletionStreamResponse};
use crate::protocols::{
Annotated,
codec::{Message, SseCodecError},
convert_sse_stream,
openai::{ParsingOptions, nvext::merge_response_nvext},
};
use dynamo_protocols::types::ChatCompletionMessageContent;
use dynamo_runtime::engine::DataStream;
/// Reports whether `parser` names the Harmony tool-call parser.
///
/// The comparison is exact and case-sensitive: only the literal string
/// `harmony` matches.
fn is_harmony_parser(parser: &str) -> bool {
    matches!(parser, "harmony")
}
/// Reports whether `text` contains the `<|channel|>` marker anywhere.
fn contains_harmony_protocol(text: &str) -> bool {
    const CHANNEL_MARKER: &str = "<|channel|>";
    text.find(CHANNEL_MARKER).is_some()
}
/// Accumulated state for turning a stream of chat-completion chunks into a
/// single non-streaming chat-completion response.
pub struct DeltaAggregator {
/// Response id copied from the stream chunks.
id: String,
/// Model name copied from the stream chunks.
model: String,
/// Creation timestamp copied from the stream chunks.
created: u32,
/// Most recent usage block that any chunk carried, if one did.
usage: Option<dynamo_protocols::types::CompletionUsage>,
/// Most recent system fingerprint that any chunk carried, if one did.
system_fingerprint: Option<String>,
/// Per-choice accumulation state, keyed by the choice `index`.
choices: HashMap<u32, DeltaChoice>,
/// Error message taken from a stream item that reported a failure; once set,
/// data chunks are no longer merged.
error: Option<String>,
/// Service tier reported by the stream, if any.
service_tier: Option<dynamo_protocols::types::ServiceTierResponse>,
/// `nvext` payloads of the chunks, combined via `merge_response_nvext`.
nvext: Option<serde_json::Value>,
}
/// Accumulated state for one choice (one `index`) of a streamed response.
#[derive(Debug)]
struct DeltaChoice {
/// Index of the choice this state belongs to.
index: u32,
/// Concatenation of every text content fragment received for the choice.
text: String,
/// Role of the message author as reported by the stream, if any.
role: Option<dynamo_protocols::types::Role>,
/// Most recent finish reason reported for the choice.
finish_reason: Option<dynamo_protocols::types::FinishReason>,
/// Content/refusal token logprobs appended in arrival order.
logprobs: Option<dynamo_protocols::types::ChatChoiceLogprobs>,
/// Partially merged tool-call chunks keyed by the chunk's own `index`; the
/// ordered map means they are finalized in ascending index order.
tool_call_chunks: BTreeMap<u32, dynamo_protocols::types::ChatCompletionMessageToolCallChunk>,
/// Complete tool calls; filled in after the stream ends, either from the
/// merged chunks or from parsing the aggregated text.
tool_calls: Option<Vec<dynamo_protocols::types::ChatCompletionMessageToolCall>>,
/// Concatenation of every reasoning-content fragment, if any arrived.
reasoning_content: Option<String>,
/// Structured content parts appended in arrival order.
content_parts: Vec<dynamo_protocols::types::ChatCompletionResponseContentPart>,
}
impl Default for DeltaAggregator {
fn default() -> Self {
Self::new()
}
}
fn merge_tool_call_chunk(
existing: &mut dynamo_protocols::types::ChatCompletionMessageToolCallChunk,
incoming: dynamo_protocols::types::ChatCompletionMessageToolCallChunk,
) {
if existing.id.is_none()
&& let Some(id) = incoming.id
{
existing.id = Some(id);
}
if existing.r#type.is_none()
&& let Some(ty) = incoming.r#type
{
existing.r#type = Some(ty);
}
let Some(incoming_fn) = incoming.function else {
return;
};
match &mut existing.function {
None => existing.function = Some(incoming_fn),
Some(existing_fn) => {
if existing_fn.name.is_none()
&& let Some(name) = incoming_fn.name
{
existing_fn.name = Some(name);
}
if let Some(args_fragment) = incoming_fn.arguments {
existing_fn
.arguments
.get_or_insert_with(String::new)
.push_str(&args_fragment);
}
}
}
}
/// Converts a fully merged tool-call chunk into a complete tool call.
///
/// Returns `None` (after logging a warning) when the merged chunk never
/// received an `id`, a `function`, or a `function.name`. A missing `type`
/// defaults to `FunctionType::Function` and missing arguments default to an
/// empty string.
fn finalize_merged_tool_chunk(
    chunk: dynamo_protocols::types::ChatCompletionMessageToolCallChunk,
) -> Option<dynamo_protocols::types::ChatCompletionMessageToolCall> {
    let index = chunk.index;

    let id = match chunk.id {
        Some(id) => id,
        None => {
            tracing::warn!(
                tool_call_index = index,
                "dropping merged tool-call chunk: no `id` arrived across any delta"
            );
            return None;
        }
    };

    let function = match chunk.function {
        Some(function) => function,
        None => {
            tracing::warn!(
                tool_call_index = index,
                tool_call_id = %id,
                "dropping merged tool-call chunk: no `function` arrived across any delta"
            );
            return None;
        }
    };

    let name = match function.name {
        Some(name) => name,
        None => {
            tracing::warn!(
                tool_call_index = index,
                tool_call_id = %id,
                "dropping merged tool-call chunk: no `function.name` arrived across any delta"
            );
            return None;
        }
    };

    let call_type = chunk
        .r#type
        .unwrap_or(dynamo_protocols::types::FunctionType::Function);
    let arguments = function.arguments.unwrap_or_default();

    Some(dynamo_protocols::types::ChatCompletionMessageToolCall {
        id,
        r#type: call_type,
        function: dynamo_protocols::types::FunctionCall { name, arguments },
    })
}
impl DeltaAggregator {
pub fn new() -> Self {
Self {
id: "".to_string(),
model: "".to_string(),
created: 0,
usage: None,
system_fingerprint: None,
choices: HashMap::new(),
error: None,
service_tier: None,
nvext: None,
}
}
pub async fn apply(
stream: impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>,
parsing_options: ParsingOptions,
) -> Result<NvCreateChatCompletionResponse, String> {
let mut aggregator = stream
.fold(DeltaAggregator::new(), |mut aggregator, delta| async move {
let delta = match delta.ok() {
Ok(delta) => delta,
Err(error) => {
aggregator.error = Some(error);
return aggregator;
}
};
if aggregator.error.is_none()
&& let Some(delta) = delta.data
{
aggregator.id = delta.inner.id;
aggregator.model = delta.inner.model;
aggregator.created = delta.inner.created;
aggregator.service_tier = delta.inner.service_tier;
if let Some(usage) = delta.inner.usage {
aggregator.usage = Some(usage);
}
if let Some(system_fingerprint) = delta.inner.system_fingerprint {
aggregator.system_fingerprint = Some(system_fingerprint);
}
merge_response_nvext(&mut aggregator.nvext, delta.nvext);
for choice in delta.inner.choices {
let state_choice =
aggregator
.choices
.entry(choice.index)
.or_insert(DeltaChoice {
index: choice.index,
text: "".to_string(),
role: choice.delta.role,
finish_reason: None,
logprobs: None,
tool_call_chunks: BTreeMap::new(),
tool_calls: None,
reasoning_content: None,
content_parts: Vec::new(),
});
if let Some(content) = &choice.delta.content {
match content {
ChatCompletionMessageContent::Text(text) => {
state_choice.text.push_str(text);
}
ChatCompletionMessageContent::Parts(parts) => {
state_choice.content_parts.extend(parts.clone());
}
}
}
if let Some(reasoning_content) = &choice.delta.reasoning_content {
state_choice
.reasoning_content
.get_or_insert_with(String::new)
.push_str(reasoning_content);
}
if let Some(incoming_chunks) = choice.delta.tool_calls {
for chunk in incoming_chunks {
let entry = state_choice
.tool_call_chunks
.entry(chunk.index)
.or_insert_with(|| {
dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: chunk.index,
id: None,
r#type: None,
function: None,
}
});
merge_tool_call_chunk(entry, chunk);
}
}
if let Some(finish_reason) = choice.finish_reason {
state_choice.finish_reason = Some(finish_reason);
}
if let Some(logprobs) = &choice.logprobs {
let state_lps = state_choice.logprobs.get_or_insert(
dynamo_protocols::types::ChatChoiceLogprobs {
content: None,
refusal: None,
},
);
if let Some(content_lps) = &logprobs.content {
state_lps
.content
.get_or_insert(Vec::new())
.extend(content_lps.clone());
}
if let Some(refusal_lps) = &logprobs.refusal {
state_lps
.refusal
.get_or_insert(Vec::new())
.extend(refusal_lps.clone());
}
}
}
}
aggregator
})
.await;
if let Some(error) = aggregator.error {
return Err(error);
}
for choice in aggregator.choices.values_mut() {
if choice.tool_call_chunks.is_empty() {
continue;
}
let finalized: Vec<_> = std::mem::take(&mut choice.tool_call_chunks)
.into_values()
.filter_map(finalize_merged_tool_chunk)
.collect();
if !finalized.is_empty() {
choice.tool_calls = Some(finalized);
}
}
if let Some(parser) = parsing_options.tool_call_parser.as_deref() {
for choice in aggregator.choices.values_mut() {
if choice
.tool_calls
.as_ref()
.is_some_and(|calls| !calls.is_empty())
|| choice.text.is_empty()
{
continue;
}
let (tool_calls, content) =
match try_tool_call_parse_aggregate_finalize(&choice.text, Some(parser), None)
.await
{
Ok(result) => result,
Err(error) => {
tracing::debug!(
error = %error,
parser,
"failed to parse aggregated chat tool calls"
);
continue;
}
};
if !tool_calls.is_empty() {
choice.tool_calls = Some(
tool_calls
.into_iter()
.map(super::tool_call_response_to_protocol)
.collect(),
);
choice.text = content.unwrap_or_default();
} else if is_harmony_parser(parser) && contains_harmony_protocol(&choice.text) {
choice.text = content.unwrap_or_default();
}
}
}
let mut choices: Vec<_> = aggregator
.choices
.into_values()
.map(dynamo_protocols::types::ChatChoice::from)
.collect();
choices.sort_by(|a, b| a.index.cmp(&b.index));
let response = NvCreateChatCompletionResponse {
inner: dynamo_protocols::types::CreateChatCompletionResponse {
id: aggregator.id,
created: aggregator.created,
usage: aggregator.usage,
model: aggregator.model,
object: "chat.completion".to_string(),
system_fingerprint: aggregator.system_fingerprint,
choices,
service_tier: aggregator.service_tier,
},
nvext: aggregator.nvext,
};
Ok(response)
}
}
#[allow(deprecated)]
impl From<DeltaChoice> for dynamo_protocols::types::ChatChoice {
fn from(delta: DeltaChoice) -> Self {
let finish_reason = if delta
.tool_calls
.as_ref()
.is_some_and(|calls| !calls.is_empty())
{
Some(dynamo_protocols::types::FinishReason::ToolCalls)
} else {
delta.finish_reason
};
let content = if !delta.content_parts.is_empty() {
Some(ChatCompletionMessageContent::Parts(delta.content_parts))
} else if !delta.text.is_empty() {
Some(ChatCompletionMessageContent::Text(delta.text))
} else {
None
};
dynamo_protocols::types::ChatChoice {
message: dynamo_protocols::types::ChatCompletionResponseMessage {
role: delta.role.expect("delta should have a Role"),
content,
tool_calls: delta.tool_calls,
refusal: None,
function_call: None,
audio: None,
reasoning_content: delta.reasoning_content,
},
index: delta.index,
finish_reason,
logprobs: delta.logprobs,
}
}
}
/// Builds a complete chat-completion response out of a streamed one.
#[allow(async_fn_in_trait)]
pub trait ChatCompletionAggregator {
/// Aggregates a stream of annotated chat-completion chunks into a single
/// response, using `parsing_options` to control tool-call parsing.
///
/// Returns `Err` with a message when aggregation fails.
async fn from_annotated_stream(
stream: impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>,
parsing_options: ParsingOptions,
) -> Result<NvCreateChatCompletionResponse, String>;
/// Aggregates a stream of SSE messages (or codec errors) into a single
/// response, using `parsing_options` to control tool-call parsing.
///
/// Returns `Err` with a message when aggregation fails.
async fn from_sse_stream(
stream: DataStream<Result<Message, SseCodecError>>,
parsing_options: ParsingOptions,
) -> Result<NvCreateChatCompletionResponse, String>;
}
impl ChatCompletionAggregator for NvCreateChatCompletionResponse {
    /// Aggregates the annotated chunks by delegating to
    /// [`DeltaAggregator::apply`].
    async fn from_annotated_stream(
        stream: impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>,
        parsing_options: ParsingOptions,
    ) -> Result<NvCreateChatCompletionResponse, String> {
        let aggregation = DeltaAggregator::apply(stream, parsing_options);
        aggregation.await
    }

    /// Converts the SSE messages into annotated chunks with
    /// `convert_sse_stream`, then aggregates them like
    /// `from_annotated_stream`.
    async fn from_sse_stream(
        stream: DataStream<Result<Message, SseCodecError>>,
        parsing_options: ParsingOptions,
    ) -> Result<NvCreateChatCompletionResponse, String> {
        let annotated = convert_sse_stream::<NvCreateChatCompletionStreamResponse>(stream);
        Self::from_annotated_stream(annotated, parsing_options).await
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::protocols::openai::token_to_utf8_bytes;
use futures::stream;
/// Test fixture: builds one annotated stream chunk holding a single choice
/// whose delta carries `text` as text content.
///
/// * `index` - index of the choice inside the chunk.
/// * `role` / `finish_reason` - copied onto the choice as given.
/// * `logprob` - when set, attaches one content token logprob whose token is
///   `text` itself.
/// * `tool_calls` - optional JSON object text; its `name` and `arguments`
///   entries become a single tool-call chunk with index `0` and id `test_id`.
///
/// Panics if `tool_calls` is not valid JSON.
#[allow(deprecated)]
fn create_test_delta(
index: u32,
text: &str,
role: Option<dynamo_protocols::types::Role>,
finish_reason: Option<dynamo_protocols::types::FinishReason>,
logprob: Option<f32>,
tool_calls: Option<&str>,
) -> Annotated<NvCreateChatCompletionStreamResponse> {
// Parse the optional tool-call description up front.
let tool_calls: Option<serde_json::Value> =
tool_calls.map(|tool_calls| serde_json::from_str(tool_calls).unwrap());
let tool_call_chunks = if let Some(tool_calls) = tool_calls {
Some(vec![
dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: 0,
id: Some("test_id".to_string()),
r#type: Some(dynamo_protocols::types::FunctionType::Function),
function: Some(dynamo_protocols::types::FunctionCallStream {
name: tool_calls["name"].as_str().map(|s| s.to_string()),
// The arguments value is re-serialized to a JSON string.
arguments: Some(serde_json::to_string(&tool_calls["arguments"]).unwrap()),
}),
},
])
} else {
None
};
let delta = dynamo_protocols::types::ChatCompletionStreamResponseDelta {
content: Some(ChatCompletionMessageContent::Text(text.to_string())),
function_call: None,
tool_calls: tool_call_chunks,
role,
refusal: None,
reasoning_content: None,
};
// A single logprob entry that treats the whole `text` as one token.
let logprobs = logprob.map(|lp| {
let token = text.to_string();
dynamo_protocols::types::ChatChoiceLogprobs {
content: Some(vec![dynamo_protocols::types::ChatCompletionTokenLogprob {
token: token.clone(),
logprob: lp,
bytes: token_to_utf8_bytes(&token),
top_logprobs: vec![],
}]),
refusal: None,
}
});
let choice = dynamo_protocols::types::ChatChoiceStream {
index,
delta,
finish_reason,
logprobs,
};
let data = NvCreateChatCompletionStreamResponse {
inner: dynamo_protocols::types::CreateChatCompletionStreamResponse {
id: "test_id".to_string(),
model: "meta/llama-3.1-8b-instruct".to_string(),
created: 1234567890,
service_tier: None,
usage: None,
system_fingerprint: None,
choices: vec![choice],
object: "chat.completion".to_string(),
},
nvext: None,
};
Annotated {
data: Some(data),
id: Some("test_id".to_string()),
event: None,
comment: None,
error: None,
}
}
/// Test fixture: builds one annotated stream chunk holding a single choice
/// whose delta has no content and carries exactly the given `tool_chunks`.
///
/// `index`, `finish_reason` and `role` are copied onto the choice as given;
/// the choice has no logprobs.
fn create_test_delta_with_tool_chunks(
index: u32,
tool_chunks: Vec<dynamo_protocols::types::ChatCompletionMessageToolCallChunk>,
finish_reason: Option<dynamo_protocols::types::FinishReason>,
role: Option<dynamo_protocols::types::Role>,
) -> Annotated<NvCreateChatCompletionStreamResponse> {
#[allow(deprecated)]
let delta = dynamo_protocols::types::ChatCompletionStreamResponseDelta {
content: None,
function_call: None,
tool_calls: Some(tool_chunks),
role,
refusal: None,
reasoning_content: None,
};
let choice = dynamo_protocols::types::ChatChoiceStream {
index,
delta,
finish_reason,
logprobs: None,
};
// The envelope uses the same fixed id/model/created values as
// `create_test_delta`.
let data = NvCreateChatCompletionStreamResponse {
inner: dynamo_protocols::types::CreateChatCompletionStreamResponse {
id: "test_id".to_string(),
model: "meta/llama-3.1-8b-instruct".to_string(),
created: 1234567890,
service_tier: None,
usage: None,
system_fingerprint: None,
choices: vec![choice],
object: "chat.completion".to_string(),
},
nvext: None,
};
Annotated {
data: Some(data),
id: Some("test_id".to_string()),
event: None,
comment: None,
error: None,
}
}
#[tokio::test]
async fn test_issue_8640_split_tool_call_arguments_reconstructed() {
let name_chunk = dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: 0,
id: Some("tc1".to_string()),
r#type: Some(dynamo_protocols::types::FunctionType::Function),
function: Some(dynamo_protocols::types::FunctionCallStream {
name: Some("get_weather".to_string()),
arguments: None,
}),
};
let args_chunk_1 = dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: 0,
id: None,
r#type: None,
function: Some(dynamo_protocols::types::FunctionCallStream {
name: None,
arguments: Some("{\"city\":".to_string()),
}),
};
let args_chunk_2 = dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: 0,
id: None,
r#type: None,
function: Some(dynamo_protocols::types::FunctionCallStream {
name: None,
arguments: Some("\"Tokyo\"}".to_string()),
}),
};
let deltas = vec![
create_test_delta_with_tool_chunks(
0,
vec![name_chunk],
None,
Some(dynamo_protocols::types::Role::Assistant),
),
create_test_delta_with_tool_chunks(0, vec![args_chunk_1], None, None),
create_test_delta_with_tool_chunks(
0,
vec![args_chunk_2],
Some(dynamo_protocols::types::FinishReason::ToolCalls),
None,
),
];
let stream = Box::pin(stream::iter(deltas));
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok(), "aggregation should not error");
let response = result.unwrap();
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
let tool_calls = choice
.message
.tool_calls
.as_ref()
.expect("tool_calls should be Some after aggregation");
assert_eq!(
tool_calls.len(),
1,
"must produce exactly one aggregated tool_call, got {}",
tool_calls.len()
);
let tc = &tool_calls[0];
assert_eq!(tc.id, "tc1");
assert_eq!(tc.function.name, "get_weather");
assert_eq!(
tc.function.arguments, "{\"city\":\"Tokyo\"}",
"#8640: arguments must be reconstructed from split fragments, \
not dropped (got {:?})",
tc.function.arguments
);
}
#[tokio::test]
async fn test_parallel_tool_calls_interleaved_chunks_aggregate_independently() {
let make_name = |idx: u32, id: &str, name: &str| {
dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: idx,
id: Some(id.to_string()),
r#type: Some(dynamo_protocols::types::FunctionType::Function),
function: Some(dynamo_protocols::types::FunctionCallStream {
name: Some(name.to_string()),
arguments: None,
}),
}
};
let make_args = |idx: u32, fragment: &str| {
dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: idx,
id: None,
r#type: None,
function: Some(dynamo_protocols::types::FunctionCallStream {
name: None,
arguments: Some(fragment.to_string()),
}),
}
};
let deltas = vec![
create_test_delta_with_tool_chunks(
0,
vec![make_name(0, "tc0", "get_weather")],
None,
Some(dynamo_protocols::types::Role::Assistant),
),
create_test_delta_with_tool_chunks(
0,
vec![make_name(1, "tc1", "get_time")],
None,
None,
),
create_test_delta_with_tool_chunks(0, vec![make_args(0, "{\"city\":")], None, None),
create_test_delta_with_tool_chunks(0, vec![make_args(1, "{\"tz\":")], None, None),
create_test_delta_with_tool_chunks(0, vec![make_args(0, "\"Tokyo\"}")], None, None),
create_test_delta_with_tool_chunks(
0,
vec![make_args(1, "\"JST\"}")],
Some(dynamo_protocols::types::FinishReason::ToolCalls),
None,
),
];
let stream = Box::pin(stream::iter(deltas));
let response = DeltaAggregator::apply(stream, ParsingOptions::default())
.await
.expect("aggregation should succeed");
assert_eq!(response.inner.choices.len(), 1);
let tool_calls = response.inner.choices[0]
.message
.tool_calls
.as_ref()
.expect("tool_calls should be Some");
assert_eq!(tool_calls.len(), 2, "must produce both parallel tool calls");
assert_eq!(tool_calls[0].id, "tc0");
assert_eq!(tool_calls[0].function.name, "get_weather");
assert_eq!(tool_calls[0].function.arguments, "{\"city\":\"Tokyo\"}");
assert_eq!(tool_calls[1].id, "tc1");
assert_eq!(tool_calls[1].function.name, "get_time");
assert_eq!(tool_calls[1].function.arguments, "{\"tz\":\"JST\"}");
}
#[tokio::test]
async fn test_fragment_only_chunks_without_opener_drop_cleanly() {
let args_only = dynamo_protocols::types::ChatCompletionMessageToolCallChunk {
index: 0,
id: None,
r#type: None,
function: Some(dynamo_protocols::types::FunctionCallStream {
name: None,
arguments: Some("{\"orphaned\":true}".to_string()),
}),
};
let deltas = vec![create_test_delta_with_tool_chunks(
0,
vec![args_only],
Some(dynamo_protocols::types::FinishReason::Stop),
Some(dynamo_protocols::types::Role::Assistant),
)];
let stream = Box::pin(stream::iter(deltas));
let response = DeltaAggregator::apply(stream, ParsingOptions::default())
.await
.expect("aggregation should succeed even with dropped chunk");
assert!(
response.inner.choices[0].message.tool_calls.is_none(),
"orphaned fragment must not produce a tool call (got {:?})",
response.inner.choices[0].message.tool_calls,
);
}
#[tokio::test]
async fn test_empty_stream() {
let stream: DataStream<Annotated<NvCreateChatCompletionStreamResponse>> =
Box::pin(stream::empty());
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.inner.id, "");
assert_eq!(response.inner.model, "");
assert_eq!(response.inner.created, 0);
assert!(response.inner.usage.is_none());
assert!(response.inner.system_fingerprint.is_none());
assert_eq!(response.inner.choices.len(), 0);
assert!(response.inner.service_tier.is_none());
}
#[tokio::test]
async fn test_single_delta() {
let annotated_delta = create_test_delta(
0,
"Hello,",
Some(dynamo_protocols::types::Role::User),
None,
None,
None,
);
let stream = Box::pin(stream::iter(vec![annotated_delta]));
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.inner.id, "test_id");
assert_eq!(response.inner.model, "meta/llama-3.1-8b-instruct");
assert_eq!(response.inner.created, 1234567890);
assert!(response.inner.usage.is_none());
assert!(response.inner.system_fingerprint.is_none());
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
assert_eq!(choice.index, 0);
assert_eq!(
choice.message.content.as_ref().unwrap(),
&ChatCompletionMessageContent::Text("Hello,".to_string())
);
assert!(choice.finish_reason.is_none());
assert_eq!(choice.message.role, dynamo_protocols::types::Role::User);
assert!(response.inner.service_tier.is_none());
}
#[tokio::test]
async fn test_multiple_deltas_same_choice() {
let annotated_delta1 = create_test_delta(
0,
"Hello,",
Some(dynamo_protocols::types::Role::User),
None,
Some(-0.1),
None,
);
let annotated_delta2 = create_test_delta(
0,
" world!",
None,
Some(dynamo_protocols::types::FinishReason::Stop),
Some(-0.2),
None,
);
let annotated_deltas = vec![annotated_delta1, annotated_delta2];
let stream = Box::pin(stream::iter(annotated_deltas));
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
assert_eq!(choice.index, 0);
assert_eq!(
choice.message.content.as_ref().unwrap(),
&ChatCompletionMessageContent::Text("Hello, world!".to_string())
);
assert_eq!(
choice.finish_reason,
Some(dynamo_protocols::types::FinishReason::Stop)
);
assert_eq!(choice.message.role, dynamo_protocols::types::Role::User);
assert_eq!(
choice
.logprobs
.as_ref()
.unwrap()
.content
.as_ref()
.unwrap()
.len(),
2
);
assert_eq!(
choice.logprobs.as_ref().unwrap().content.as_ref().unwrap()[0].logprob,
-0.1
);
assert_eq!(
choice.logprobs.as_ref().unwrap().content.as_ref().unwrap()[1].logprob,
-0.2
);
}
#[tokio::test]
async fn test_preserves_intermediate_whitespace_chunks() {
let annotated_delta1 = create_test_delta(
0,
"Hello",
Some(dynamo_protocols::types::Role::User),
None,
None,
None,
);
let annotated_delta2 = create_test_delta(0, " ", None, None, None, None);
let annotated_delta3 = create_test_delta(
0,
"world",
None,
Some(dynamo_protocols::types::FinishReason::Stop),
None,
None,
);
let stream = Box::pin(stream::iter(vec![
annotated_delta1,
annotated_delta2,
annotated_delta3,
]));
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
assert_eq!(choice.index, 0);
assert_eq!(
choice.message.content.as_ref(),
Some(&ChatCompletionMessageContent::Text(
"Hello world".to_string()
))
);
assert_eq!(
choice.finish_reason,
Some(dynamo_protocols::types::FinishReason::Stop)
);
assert_eq!(choice.message.role, dynamo_protocols::types::Role::User);
}
#[tokio::test]
async fn test_multiple_deltas_merge_nvext_fields() {
let mut annotated_delta1 = create_test_delta(
0,
"Hello",
Some(dynamo_protocols::types::Role::Assistant),
None,
None,
None,
);
annotated_delta1.data.as_mut().expect("delta data").nvext =
Some(serde_json::json!({ "engine_data": { "trace_id": "abc" } }));
let mut annotated_delta2 = create_test_delta(
0,
" world",
None,
Some(dynamo_protocols::types::FinishReason::Stop),
None,
None,
);
annotated_delta2.data.as_mut().expect("delta data").nvext =
Some(serde_json::json!({ "stop_reason": 128001 }));
let stream = Box::pin(stream::iter(vec![annotated_delta1, annotated_delta2]));
let response = DeltaAggregator::apply(stream, ParsingOptions::default())
.await
.expect("aggregate stream");
assert_eq!(
response.nvext,
Some(serde_json::json!({
"engine_data": { "trace_id": "abc" },
"stop_reason": 128001,
}))
);
}
#[allow(deprecated)]
#[tokio::test]
async fn test_multiple_choices() {
let data = NvCreateChatCompletionStreamResponse {
inner: dynamo_protocols::types::CreateChatCompletionStreamResponse {
id: "test_id".to_string(),
model: "test_model".to_string(),
created: 1234567890,
service_tier: None,
usage: None,
system_fingerprint: None,
choices: vec![
dynamo_protocols::types::ChatChoiceStream {
index: 0,
delta: dynamo_protocols::types::ChatCompletionStreamResponseDelta {
role: Some(dynamo_protocols::types::Role::Assistant),
content: Some(ChatCompletionMessageContent::Text(
"Choice 0".to_string(),
)),
function_call: None,
tool_calls: None,
refusal: None,
reasoning_content: None,
},
finish_reason: Some(dynamo_protocols::types::FinishReason::Stop),
logprobs: None,
},
dynamo_protocols::types::ChatChoiceStream {
index: 1,
delta: dynamo_protocols::types::ChatCompletionStreamResponseDelta {
role: Some(dynamo_protocols::types::Role::Assistant),
content: Some(ChatCompletionMessageContent::Text(
"Choice 1".to_string(),
)),
function_call: None,
tool_calls: None,
refusal: None,
reasoning_content: None,
},
finish_reason: Some(dynamo_protocols::types::FinishReason::Stop),
logprobs: None,
},
],
object: "chat.completion".to_string(),
},
nvext: None,
};
let annotated_delta = Annotated {
data: Some(data),
id: Some("test_id".to_string()),
event: None,
comment: None,
error: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok());
let mut response = result.unwrap();
assert_eq!(response.inner.choices.len(), 2);
response.inner.choices.sort_by(|a, b| a.index.cmp(&b.index)); let choice0 = &response.inner.choices[0];
assert_eq!(choice0.index, 0);
assert_eq!(
choice0.message.content.as_ref().unwrap(),
&ChatCompletionMessageContent::Text("Choice 0".to_string())
);
assert_eq!(
choice0.finish_reason,
Some(dynamo_protocols::types::FinishReason::Stop)
);
assert_eq!(
choice0.message.role,
dynamo_protocols::types::Role::Assistant
);
let choice1 = &response.inner.choices[1];
assert_eq!(choice1.index, 1);
assert_eq!(
choice1.message.content.as_ref().unwrap(),
&ChatCompletionMessageContent::Text("Choice 1".to_string())
);
assert_eq!(
choice1.finish_reason,
Some(dynamo_protocols::types::FinishReason::Stop)
);
assert_eq!(
choice1.message.role,
dynamo_protocols::types::Role::Assistant
);
}
#[tokio::test]
async fn test_tool_calling_finish_reason_override_from_stop() {
let tool_call_json =
r#"{"name": "get_weather", "arguments": {"location": "New York", "unit": "celsius"}}"#;
let annotated_delta = create_test_delta(
0,
"I'll check the weather for you.",
Some(dynamo_protocols::types::Role::Assistant),
Some(dynamo_protocols::types::FinishReason::Stop), None,
Some(tool_call_json),
);
let data = annotated_delta.data.unwrap();
let annotated_delta = Annotated {
data: Some(data),
id: Some("test_id".to_string()),
event: None,
comment: None,
error: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
assert!(choice.message.tool_calls.is_some());
let tool_calls = choice.message.tool_calls.as_ref().unwrap();
assert_eq!(tool_calls.len(), 1);
assert_eq!(
tool_calls[0].r#type,
dynamo_protocols::types::FunctionType::Function
);
assert_eq!(
choice.finish_reason,
Some(dynamo_protocols::types::FinishReason::ToolCalls)
);
}
#[tokio::test]
async fn test_tool_calling_finish_reason_override_from_length() {
let tool_call_json = r#"{"name": "search", "arguments": {"query": "rust programming"}}"#;
let annotated_delta = create_test_delta(
0,
"Let me search for that.",
Some(dynamo_protocols::types::Role::Assistant),
Some(dynamo_protocols::types::FinishReason::Length), None,
Some(tool_call_json),
);
let data = annotated_delta.data.unwrap();
let annotated_delta = Annotated {
data: Some(data),
id: Some("test_id".to_string()),
event: None,
comment: None,
error: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
assert!(choice.message.tool_calls.is_some());
let tool_calls = choice.message.tool_calls.as_ref().unwrap();
assert_eq!(tool_calls.len(), 1);
assert_eq!(
tool_calls[0].r#type,
dynamo_protocols::types::FunctionType::Function
);
assert_eq!(
choice.finish_reason,
Some(dynamo_protocols::types::FinishReason::ToolCalls)
);
}
#[tokio::test]
async fn test_tool_calling_finish_reason_override_from_none() {
let tool_call_json = r#"{"name": "calculate", "arguments": {"expression": "2+2"}}"#;
let annotated_delta = create_test_delta(
0,
"I'll calculate that for you.",
Some(dynamo_protocols::types::Role::Assistant),
None, None,
Some(tool_call_json),
);
let data = annotated_delta.data.unwrap();
let annotated_delta = Annotated {
data: Some(data),
id: Some("test_id".to_string()),
event: None,
comment: None,
error: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
assert!(choice.message.tool_calls.is_some());
let tool_calls = choice.message.tool_calls.as_ref().unwrap();
assert_eq!(tool_calls.len(), 1);
assert_eq!(
choice.finish_reason,
Some(dynamo_protocols::types::FinishReason::ToolCalls)
);
}
#[tokio::test]
async fn test_no_tool_calling_preserves_original_finish_reason() {
let annotated_delta = create_test_delta(
0,
"This is a regular response without tool calls.",
Some(dynamo_protocols::types::Role::Assistant),
Some(dynamo_protocols::types::FinishReason::Stop),
None,
None, );
let data = annotated_delta.data.unwrap();
let annotated_delta = Annotated {
data: Some(data),
id: Some("test_id".to_string()),
event: None,
comment: None,
error: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
assert!(choice.message.tool_calls.is_none());
assert_eq!(
choice.finish_reason,
Some(dynamo_protocols::types::FinishReason::Stop)
);
}
#[tokio::test]
async fn test_empty_tool_calls_preserves_original_finish_reason() {
let mut annotated_delta = create_test_delta(
0,
"Response with empty tool calls array.",
Some(dynamo_protocols::types::Role::Assistant),
Some(dynamo_protocols::types::FinishReason::Length),
None,
None,
);
if let Some(ref mut data) = annotated_delta.data {
data.inner.choices[0].delta.tool_calls = Some(vec![]); }
let data = annotated_delta.data.unwrap();
let annotated_delta = Annotated {
data: Some(data),
id: Some("test_id".to_string()),
event: None,
comment: None,
error: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
assert!(choice.message.tool_calls.is_none());
assert_eq!(
choice.finish_reason,
Some(dynamo_protocols::types::FinishReason::Length)
);
}
#[tokio::test]
async fn test_tool_calling_output() {
let tool_call_json = r#"{"name": "get_weather", "arguments": {"location": "San Francisco, CA", "unit": "fahrenheit"}}"#;
let annotated_delta = create_test_delta(
0,
"Hey Dude ! What's the weather in San Francisco in Fahrenheit?",
Some(dynamo_protocols::types::Role::Assistant),
Some(dynamo_protocols::types::FinishReason::ToolCalls),
None,
Some(tool_call_json),
);
let data = annotated_delta.data.unwrap();
let annotated_delta = Annotated {
data: Some(data),
id: Some("test_id".to_string()),
event: None,
comment: None,
error: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));
let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
assert!(result.is_ok());
let response = result.unwrap();
assert_eq!(response.inner.choices.len(), 1);
let choice = &response.inner.choices[0];
assert!(choice.message.tool_calls.is_some());
let tool_calls = choice.message.tool_calls.as_ref().unwrap();
assert_eq!(tool_calls.len(), 1);
let tool_call = &tool_calls[0];
assert_eq!(tool_call.function.name, "get_weather");
let args: serde_json::Value = serde_json::from_str(&tool_call.function.arguments).unwrap();
assert_eq!(args["location"], "San Francisco, CA");
assert_eq!(args["unit"], "fahrenheit");
assert_eq!(
choice.message.content.as_ref().unwrap(),
&ChatCompletionMessageContent::Text(
"Hey Dude ! What's the weather in San Francisco in Fahrenheit?".to_string()
)
);
assert_eq!(
choice.finish_reason,
Some(dynamo_protocols::types::FinishReason::ToolCalls)
);
assert_eq!(
choice.message.role,
dynamo_protocols::types::Role::Assistant
);
}
/// Feeds a delta whose finish reason is `Stop` together with a JSON tool-call
/// string and expects the aggregated choice to report `ToolCalls` instead,
/// with a single `get_weather` call attached.
#[tokio::test]
async fn test_tool_calling_finish_reason_override_from_stop_alternative() {
    let raw_call =
        r#"{"name": "get_weather", "arguments": {"location": "New York", "unit": "celsius"}}"#;

    // The incoming finish reason is deliberately `Stop`.
    let delta = create_test_delta(
        0,
        "Getting weather for New York",
        Some(dynamo_protocols::types::Role::Assistant),
        Some(dynamo_protocols::types::FinishReason::Stop),
        None,
        Some(raw_call),
    );

    let input = Box::pin(stream::iter(vec![delta]));
    let outcome = DeltaAggregator::apply(input, ParsingOptions::default()).await;
    assert!(outcome.is_ok());
    let response = outcome.unwrap();

    assert_eq!(response.inner.choices.len(), 1);
    let choice = &response.inner.choices[0];

    // The finish reason must have been switched to `ToolCalls`.
    assert_eq!(
        choice.finish_reason,
        Some(dynamo_protocols::types::FinishReason::ToolCalls)
    );

    assert!(choice.message.tool_calls.is_some());
    let calls = choice.message.tool_calls.as_ref().unwrap();
    assert_eq!(calls.len(), 1);
    assert_eq!(calls[0].function.name, "get_weather");
}
/// With the `hermes` parser selected, text consisting solely of a
/// `<tool_call>…</tool_call>` section is expected to produce one function tool
/// call, no message content, and a `ToolCalls` finish reason.
#[tokio::test]
async fn test_parses_aggregated_tool_call_text_into_tool_calls() {
    // Same bytes as a single escaped literal: tag, newline, JSON payload, newline, closing tag.
    let tagged_text = concat!(
        "<tool_call>\n",
        r#"{"name":"get_weather","arguments":{"location":"SF"}}"#,
        "\n</tool_call>",
    );

    let delta = create_test_delta(
        0,
        tagged_text,
        Some(dynamo_protocols::types::Role::Assistant),
        Some(dynamo_protocols::types::FinishReason::Stop),
        None,
        None,
    );

    let input = Box::pin(stream::iter(vec![delta]));
    let options = ParsingOptions::new(Some("hermes".to_string()), None);
    let outcome = DeltaAggregator::apply(input, options).await;
    assert!(outcome.is_ok());
    let response = outcome.unwrap();
    let choice = &response.inner.choices[0];

    assert_eq!(
        choice.finish_reason,
        Some(dynamo_protocols::types::FinishReason::ToolCalls)
    );

    // Nothing besides the tool-call section was in the text, so no content remains.
    assert_eq!(choice.message.content, None);

    let calls = choice.message.tool_calls.as_ref().unwrap();
    assert_eq!(calls.len(), 1);
    let call = &calls[0];
    assert_eq!(
        call.r#type,
        dynamo_protocols::types::FunctionType::Function
    );
    assert_eq!(call.function.name, "get_weather");
    assert_eq!(call.function.arguments, r#"{"location":"SF"}"#);
}
/// With the `hermes` parser selected, plain text preceding a
/// `<tool_call>…</tool_call>` section is expected to remain as message content
/// (`"hello"`) while the section itself becomes a function tool call.
#[tokio::test]
async fn test_preserves_non_tool_content_when_parsing_aggregated_tool_calls() {
    // Leading plain text, then the tool-call section; bytes match a single escaped literal.
    let mixed_text = concat!(
        "hello\n",
        "<tool_call>\n",
        r#"{"name":"get_weather","arguments":{"location":"SF"}}"#,
        "\n</tool_call>",
    );

    let delta = create_test_delta(
        0,
        mixed_text,
        Some(dynamo_protocols::types::Role::Assistant),
        Some(dynamo_protocols::types::FinishReason::Stop),
        None,
        None,
    );

    let input = Box::pin(stream::iter(vec![delta]));
    let options = ParsingOptions::new(Some("hermes".to_string()), None);
    let outcome = DeltaAggregator::apply(input, options).await;
    assert!(outcome.is_ok());
    let response = outcome.unwrap();
    let choice = &response.inner.choices[0];

    // Only the text outside the tool-call section is kept as content.
    assert_eq!(
        choice.message.content,
        Some(ChatCompletionMessageContent::Text("hello".to_string()))
    );
    assert_eq!(
        choice.finish_reason,
        Some(dynamo_protocols::types::FinishReason::ToolCalls)
    );

    let calls = choice.message.tool_calls.as_ref().unwrap();
    assert_eq!(
        calls[0].r#type,
        dynamo_protocols::types::FunctionType::Function
    );
}
/// With the `harmony` parser selected, text made of harmony channel markers
/// (an analysis segment followed by a commentary segment addressed to a
/// function) is expected to yield no content, no tool calls, and to keep the
/// `Stop` finish reason.
#[tokio::test]
async fn test_harmony_aggregate_zero_call_drops_internal_analysis() {
    let harmony_text = r#"<|channel|>analysis<|message|>Need current weather.<|end|><|start|>assistant<|channel|>commentary to=functions.get_current_weather <|constrain|>json<|message|>{"location":"Hidden City"}"#;

    let delta = create_test_delta(
        0,
        harmony_text,
        Some(dynamo_protocols::types::Role::Assistant),
        Some(dynamo_protocols::types::FinishReason::Stop),
        None,
        None,
    );

    let input = Box::pin(stream::iter(vec![delta]));
    let options = ParsingOptions::new(Some("harmony".to_string()), None);
    let outcome = DeltaAggregator::apply(input, options).await;
    assert!(outcome.is_ok());
    let response = outcome.unwrap();
    let choice = &response.inner.choices[0];

    // None of the marker-laden text may surface as content or as a tool call.
    assert_eq!(choice.message.content, None);
    assert!(choice.message.tool_calls.is_none());
    assert_eq!(
        choice.finish_reason,
        Some(dynamo_protocols::types::FinishReason::Stop)
    );
}
/// With the `harmony` parser selected, text that contains no harmony markers
/// is expected to come back unchanged as message content, with no tool calls.
#[tokio::test]
async fn test_harmony_aggregate_plain_text_without_markers_stays_plain_text() {
    let plain_text = "plain response";

    let delta = create_test_delta(
        0,
        plain_text,
        Some(dynamo_protocols::types::Role::Assistant),
        Some(dynamo_protocols::types::FinishReason::Stop),
        None,
        None,
    );

    let input = Box::pin(stream::iter(vec![delta]));
    let options = ParsingOptions::new(Some("harmony".to_string()), None);
    let outcome = DeltaAggregator::apply(input, options).await;
    assert!(outcome.is_ok());
    let response = outcome.unwrap();
    let choice = &response.inner.choices[0];

    // The text passes through verbatim.
    let expected_content = Some(ChatCompletionMessageContent::Text(plain_text.to_string()));
    assert_eq!(choice.message.content, expected_content);
    assert!(choice.message.tool_calls.is_none());
}
/// Converts a `DeltaChoice` that has reasoning content but empty text into a
/// `ChatChoice` and checks that the message has no content, keeps the
/// reasoning text, and serializes with an explicit `"content": null` key.
#[test]
fn test_reasoning_only_response_serializes_content_key_as_null() {
    let reasoning = "Analyzing the question.";

    // Empty text, no tool calls, no content parts: only reasoning is populated.
    let reasoning_only = DeltaChoice {
        index: 0,
        text: String::default(),
        role: Some(dynamo_protocols::types::Role::Assistant),
        finish_reason: Some(dynamo_protocols::types::FinishReason::Stop),
        logprobs: None,
        tool_call_chunks: BTreeMap::default(),
        tool_calls: None,
        reasoning_content: Some(reasoning.to_string()),
        content_parts: Vec::new(),
    };

    let choice: dynamo_protocols::types::ChatChoice = reasoning_only.into();
    let message = &choice.message;

    assert!(message.content.is_none());
    assert_eq!(message.reasoning_content.as_deref(), Some(reasoning));

    // `get` distinguishes an explicit null from a missing key, which is what is pinned here.
    let serialized = serde_json::to_value(message).unwrap();
    assert_eq!(
        serialized.get("content"),
        Some(&serde_json::Value::Null),
        "content key must be serialized as null when absent"
    );
    assert!(serialized.get("reasoning_content").is_some());
}
}