use futures::{Stream, StreamExt};
use std::collections::HashMap;
use super::{NvCreateChatCompletionResponse, NvCreateChatCompletionStreamResponse};
use crate::protocols::{
Annotated,
codec::{Message, SseCodecError},
convert_sse_stream,
openai::ParsingOptions,
};
use dynamo_async_openai::types::{ChatCompletionMessageContent, StopReason};
use dynamo_runtime::engine::DataStream;
/// Accumulates OpenAI chat-completion stream deltas into a single
/// non-streaming response. Built incrementally by [`DeltaAggregator::apply`].
pub struct DeltaAggregator {
    // Response id; overwritten by each chunk, so the last chunk wins.
    id: String,
    // Model name reported by the stream chunks.
    model: String,
    // Creation timestamp from the stream chunks (last chunk wins).
    created: u32,
    // Token usage; set whenever a chunk carries a usage payload.
    usage: Option<dynamo_async_openai::types::CompletionUsage>,
    // System fingerprint, if any chunk reported one.
    system_fingerprint: Option<String>,
    // Per-choice accumulation state, keyed by the choice index.
    choices: HashMap<u32, DeltaChoice>,
    // First stream-level error encountered; halts further aggregation.
    error: Option<String>,
    // Service tier echoed from the stream, if present.
    service_tier: Option<dynamo_async_openai::types::ServiceTierResponse>,
    // NVIDIA extension payload; the last chunk carrying one wins.
    nvext: Option<serde_json::Value>,
}
/// Accumulated state for a single choice (one indexed response candidate)
/// while folding a delta stream.
#[derive(Debug)]
struct DeltaChoice {
    // Index of this choice within the response.
    index: u32,
    // Concatenation of all plain-text content deltas seen so far.
    text: String,
    // Role taken from the first chunk observed for this index.
    role: Option<dynamo_async_openai::types::Role>,
    // Last finish reason reported for this choice.
    finish_reason: Option<dynamo_async_openai::types::FinishReason>,
    // Last stop reason reported for this choice.
    stop_reason: Option<StopReason>,
    // Merged logprobs (content and refusal entries are appended in order).
    logprobs: Option<dynamo_async_openai::types::ChatChoiceLogprobs>,
    // Fully-formed tool calls converted from complete stream chunks.
    tool_calls: Option<Vec<dynamo_async_openai::types::ChatCompletionMessageToolCall>>,
    // Concatenation of all reasoning-content deltas seen so far.
    reasoning_content: Option<String>,
    // Structured content parts, appended in arrival order.
    content_parts: Vec<dynamo_async_openai::types::ChatCompletionResponseContentPart>,
}
impl Default for DeltaAggregator {
    // Defers to `new()` so `default()` and `new()` always agree.
    fn default() -> Self {
        Self::new()
    }
}
/// Converts a streaming tool-call chunk into a complete tool-call message.
///
/// Returns `None` unless the chunk carries every required field (`id`,
/// `type`, and a function with both `name` and `arguments`); incomplete
/// chunks are filtered out by the caller.
fn convert_tool_chunk_to_message_tool_call(
    chunk: &dynamo_async_openai::types::ChatCompletionMessageToolCallChunk,
) -> Option<dynamo_async_openai::types::ChatCompletionMessageToolCall> {
    // `?` bails out early on any missing piece instead of nesting `if let`s.
    let id = chunk.id.as_ref()?;
    let call_type = chunk.r#type.as_ref()?;
    let function = chunk.function.as_ref()?;
    let name = function.name.as_ref()?;
    let arguments = function.arguments.as_ref()?;
    Some(dynamo_async_openai::types::ChatCompletionMessageToolCall {
        id: id.clone(),
        r#type: call_type.clone(),
        function: dynamo_async_openai::types::FunctionCall {
            name: name.clone(),
            arguments: arguments.clone(),
        },
    })
}
impl DeltaAggregator {
    /// Creates an empty aggregator with all fields unset.
    pub fn new() -> Self {
        Self {
            id: String::new(),
            model: String::new(),
            created: 0,
            usage: None,
            system_fingerprint: None,
            choices: HashMap::new(),
            error: None,
            service_tier: None,
            nvext: None,
        }
    }

    /// Folds a stream of annotated chat-completion deltas into a single
    /// aggregated [`NvCreateChatCompletionResponse`].
    ///
    /// # Errors
    /// Returns `Err` with the first stream-level error encountered; once an
    /// error is recorded, the data of all subsequent deltas is ignored.
    pub async fn apply(
        stream: impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>,
        _parsing_options: ParsingOptions,
    ) -> Result<NvCreateChatCompletionResponse, String> {
        let aggregator = stream
            .fold(DeltaAggregator::new(), |mut aggregator, delta| async move {
                let delta = match delta.ok() {
                    Ok(delta) => delta,
                    Err(error) => {
                        aggregator.error = Some(error);
                        return aggregator;
                    }
                };
                if aggregator.error.is_none() && delta.data.is_some() {
                    let delta = delta.data.unwrap();
                    // Scalar metadata: the most recent chunk wins.
                    aggregator.id = delta.id;
                    aggregator.model = delta.model;
                    aggregator.created = delta.created;
                    aggregator.service_tier = delta.service_tier;
                    if let Some(usage) = delta.usage {
                        aggregator.usage = Some(usage);
                    }
                    if let Some(system_fingerprint) = delta.system_fingerprint {
                        aggregator.system_fingerprint = Some(system_fingerprint);
                    }
                    if delta.nvext.is_some() {
                        aggregator.nvext = delta.nvext;
                    }
                    for choice in delta.choices {
                        // `or_insert_with` so the per-choice state is only
                        // constructed when the entry is actually missing
                        // (`or_insert` builds the struct on every chunk).
                        let state_choice = aggregator
                            .choices
                            .entry(choice.index)
                            .or_insert_with(|| DeltaChoice {
                                index: choice.index,
                                text: String::new(),
                                role: None,
                                finish_reason: None,
                                stop_reason: None,
                                logprobs: None,
                                tool_calls: None,
                                reasoning_content: None,
                                content_parts: Vec::new(),
                            });
                        // Capture the role from whichever chunk carries it
                        // (typically the first), not just from the chunk that
                        // created the entry — otherwise a role arriving on a
                        // later chunk would be dropped and the
                        // `From<DeltaChoice>` conversion would panic.
                        if state_choice.role.is_none() {
                            state_choice.role = choice.delta.role;
                        }
                        if let Some(content) = &choice.delta.content {
                            match content {
                                ChatCompletionMessageContent::Text(text) => {
                                    state_choice.text.push_str(text);
                                }
                                ChatCompletionMessageContent::Parts(parts) => {
                                    state_choice.content_parts.extend(parts.clone());
                                }
                            }
                        }
                        if let Some(reasoning_content) = &choice.delta.reasoning_content {
                            state_choice
                                .reasoning_content
                                .get_or_insert_with(String::new)
                                .push_str(reasoning_content);
                        }
                        if let Some(tool_calls) = &choice.delta.tool_calls
                            && !tool_calls.is_empty()
                        {
                            // Only chunks with every field present convert;
                            // partial chunks are dropped.
                            let converted_tool_calls: Vec<
                                dynamo_async_openai::types::ChatCompletionMessageToolCall,
                            > = tool_calls
                                .iter()
                                .filter_map(convert_tool_chunk_to_message_tool_call)
                                .collect();
                            if !converted_tool_calls.is_empty() {
                                if let Some(existing_tool_calls) = &mut state_choice.tool_calls {
                                    existing_tool_calls.extend(converted_tool_calls);
                                } else {
                                    state_choice.tool_calls = Some(converted_tool_calls);
                                }
                            }
                        }
                        if let Some(finish_reason) = choice.finish_reason {
                            state_choice.finish_reason = Some(finish_reason);
                        }
                        if let Some(stop_reason) = choice.stop_reason {
                            state_choice.stop_reason = Some(stop_reason);
                        }
                        if let Some(logprobs) = &choice.logprobs {
                            // Merge logprob entries in arrival order.
                            let state_lps = state_choice.logprobs.get_or_insert(
                                dynamo_async_openai::types::ChatChoiceLogprobs {
                                    content: None,
                                    refusal: None,
                                },
                            );
                            if let Some(content_lps) = &logprobs.content {
                                state_lps
                                    .content
                                    .get_or_insert(Vec::new())
                                    .extend(content_lps.clone());
                            }
                            if let Some(refusal_lps) = &logprobs.refusal {
                                state_lps
                                    .refusal
                                    .get_or_insert(Vec::new())
                                    .extend(refusal_lps.clone());
                            }
                        }
                    }
                }
                aggregator
            })
            .await;
        if let Some(error) = aggregator.error {
            return Err(error);
        }
        // Emit choices in ascending index order regardless of HashMap order.
        let mut choices: Vec<_> = aggregator
            .choices
            .into_values()
            .map(dynamo_async_openai::types::ChatChoice::from)
            .collect();
        choices.sort_by(|a, b| a.index.cmp(&b.index));
        let response = NvCreateChatCompletionResponse {
            id: aggregator.id,
            created: aggregator.created,
            usage: aggregator.usage,
            model: aggregator.model,
            object: "chat.completion".to_string(),
            system_fingerprint: aggregator.system_fingerprint,
            choices,
            service_tier: aggregator.service_tier,
            nvext: aggregator.nvext,
        };
        Ok(response)
    }
}
#[allow(deprecated)]
impl From<DeltaChoice> for dynamo_async_openai::types::ChatChoice {
    /// Finalizes accumulated per-choice state into a `ChatChoice`.
    ///
    /// Any non-empty tool-call list forces the OpenAI-standard `tool_calls`
    /// finish reason, overriding whatever the stream reported.
    fn from(delta: DeltaChoice) -> Self {
        let has_tool_calls = delta
            .tool_calls
            .as_ref()
            .is_some_and(|calls| !calls.is_empty());
        let finish_reason = match has_tool_calls {
            true => Some(dynamo_async_openai::types::FinishReason::ToolCalls),
            false => delta.finish_reason,
        };
        // Structured parts take precedence over accumulated text; a choice
        // with neither yields no content at all.
        let content = if delta.content_parts.is_empty() {
            (!delta.text.is_empty()).then(|| ChatCompletionMessageContent::Text(delta.text))
        } else {
            Some(ChatCompletionMessageContent::Parts(delta.content_parts))
        };
        dynamo_async_openai::types::ChatChoice {
            message: dynamo_async_openai::types::ChatCompletionResponseMessage {
                role: delta.role.expect("delta should have a Role"),
                content,
                tool_calls: delta.tool_calls,
                refusal: None,
                function_call: None,
                audio: None,
                reasoning_content: delta.reasoning_content,
            },
            index: delta.index,
            finish_reason,
            stop_reason: delta.stop_reason,
            logprobs: delta.logprobs,
        }
    }
}
/// Aggregates a stream of chat-completion deltas into one full response.
#[allow(async_fn_in_trait)]
pub trait ChatCompletionAggregator {
    /// Folds an already-decoded stream of annotated deltas into a response.
    async fn from_annotated_stream(
        stream: impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>,
        parsing_options: ParsingOptions,
    ) -> Result<NvCreateChatCompletionResponse, String>;
    /// Decodes an SSE message stream into deltas, then aggregates them.
    async fn from_sse_stream(
        stream: DataStream<Result<Message, SseCodecError>>,
        parsing_options: ParsingOptions,
    ) -> Result<NvCreateChatCompletionResponse, String>;
}
impl ChatCompletionAggregator for dynamo_async_openai::types::CreateChatCompletionResponse {
    /// Delegates straight to [`DeltaAggregator::apply`].
    async fn from_annotated_stream(
        stream: impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>,
        parsing_options: ParsingOptions,
    ) -> Result<NvCreateChatCompletionResponse, String> {
        DeltaAggregator::apply(stream, parsing_options).await
    }

    /// Converts the raw SSE messages into typed deltas, then aggregates.
    async fn from_sse_stream(
        stream: DataStream<Result<Message, SseCodecError>>,
        parsing_options: ParsingOptions,
    ) -> Result<NvCreateChatCompletionResponse, String> {
        NvCreateChatCompletionResponse::from_annotated_stream(
            convert_sse_stream::<NvCreateChatCompletionStreamResponse>(stream),
            parsing_options,
        )
        .await
    }
}
// Unit tests for `DeltaAggregator`: empty/single/multi-delta aggregation,
// multi-choice ordering, logprob merging, and tool-call finish-reason
// override behavior.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocols::openai::token_to_utf8_bytes;
    use futures::stream;

    /// Builds a single-chunk annotated stream response for one choice.
    ///
    /// `tool_calls`, when given, is a JSON string with `name`/`arguments`
    /// that becomes one complete tool-call chunk; `logprob` attaches a
    /// one-token logprob entry whose token text equals `text`.
    #[allow(deprecated)]
    fn create_test_delta(
        index: u32,
        text: &str,
        role: Option<dynamo_async_openai::types::Role>,
        finish_reason: Option<dynamo_async_openai::types::FinishReason>,
        logprob: Option<f32>,
        tool_calls: Option<&str>,
    ) -> Annotated<NvCreateChatCompletionStreamResponse> {
        let tool_calls: Option<serde_json::Value> =
            tool_calls.map(|tool_calls| serde_json::from_str(tool_calls).unwrap());
        let tool_call_chunks = if let Some(tool_calls) = tool_calls {
            Some(vec![
                dynamo_async_openai::types::ChatCompletionMessageToolCallChunk {
                    index: 0,
                    id: Some("test_id".to_string()),
                    r#type: Some(dynamo_async_openai::types::ChatCompletionToolType::Function),
                    function: Some(dynamo_async_openai::types::FunctionCallStream {
                        name: tool_calls["name"].as_str().map(|s| s.to_string()),
                        arguments: Some(serde_json::to_string(&tool_calls["arguments"]).unwrap()),
                    }),
                },
            ])
        } else {
            None
        };
        let delta = dynamo_async_openai::types::ChatCompletionStreamResponseDelta {
            content: Some(ChatCompletionMessageContent::Text(text.to_string())),
            function_call: None,
            tool_calls: tool_call_chunks,
            role,
            refusal: None,
            reasoning_content: None,
        };
        let logprobs = logprob.map(|lp| {
            let token = text.to_string();
            dynamo_async_openai::types::ChatChoiceLogprobs {
                content: Some(vec![
                    dynamo_async_openai::types::ChatCompletionTokenLogprob {
                        token: token.clone(),
                        logprob: lp,
                        bytes: token_to_utf8_bytes(&token),
                        top_logprobs: vec![],
                    },
                ]),
                refusal: None,
            }
        });
        let choice = dynamo_async_openai::types::ChatChoiceStream {
            index,
            delta,
            finish_reason,
            stop_reason: None,
            logprobs,
        };
        let data = NvCreateChatCompletionStreamResponse {
            id: "test_id".to_string(),
            model: "meta/llama-3.1-8b-instruct".to_string(),
            created: 1234567890,
            service_tier: None,
            usage: None,
            system_fingerprint: None,
            choices: vec![choice],
            object: "chat.completion".to_string(),
            nvext: None,
        };
        Annotated {
            data: Some(data),
            id: Some("test_id".to_string()),
            event: None,
            comment: None,
            error: None,
        }
    }

    // An empty stream yields an "empty" response (default field values).
    #[tokio::test]
    async fn test_empty_stream() {
        let stream: DataStream<Annotated<NvCreateChatCompletionStreamResponse>> =
            Box::pin(stream::empty());
        let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
        assert!(result.is_ok());
        let response = result.unwrap();
        assert_eq!(response.id, "");
        assert_eq!(response.model, "");
        assert_eq!(response.created, 0);
        assert!(response.usage.is_none());
        assert!(response.system_fingerprint.is_none());
        assert_eq!(response.choices.len(), 0);
        assert!(response.service_tier.is_none());
    }

    // A single delta is passed through with its metadata intact.
    #[tokio::test]
    async fn test_single_delta() {
        let annotated_delta = create_test_delta(
            0,
            "Hello,",
            Some(dynamo_async_openai::types::Role::User),
            None,
            None,
            None,
        );
        let stream = Box::pin(stream::iter(vec![annotated_delta]));
        let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
        assert!(result.is_ok());
        let response = result.unwrap();
        assert_eq!(response.id, "test_id");
        assert_eq!(response.model, "meta/llama-3.1-8b-instruct");
        assert_eq!(response.created, 1234567890);
        assert!(response.usage.is_none());
        assert!(response.system_fingerprint.is_none());
        assert_eq!(response.choices.len(), 1);
        let choice = &response.choices[0];
        assert_eq!(choice.index, 0);
        assert_eq!(
            choice.message.content.as_ref().unwrap(),
            &ChatCompletionMessageContent::Text("Hello,".to_string())
        );
        assert!(choice.finish_reason.is_none());
        assert_eq!(choice.message.role, dynamo_async_openai::types::Role::User);
        assert!(response.service_tier.is_none());
    }

    // Deltas for the same choice index concatenate text and merge logprobs
    // in arrival order.
    #[tokio::test]
    async fn test_multiple_deltas_same_choice() {
        let annotated_delta1 = create_test_delta(
            0,
            "Hello,",
            Some(dynamo_async_openai::types::Role::User),
            None,
            Some(-0.1),
            None,
        );
        let annotated_delta2 = create_test_delta(
            0,
            " world!",
            None,
            Some(dynamo_async_openai::types::FinishReason::Stop),
            Some(-0.2),
            None,
        );
        let annotated_deltas = vec![annotated_delta1, annotated_delta2];
        let stream = Box::pin(stream::iter(annotated_deltas));
        let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
        assert!(result.is_ok());
        let response = result.unwrap();
        assert_eq!(response.choices.len(), 1);
        let choice = &response.choices[0];
        assert_eq!(choice.index, 0);
        assert_eq!(
            choice.message.content.as_ref().unwrap(),
            &ChatCompletionMessageContent::Text("Hello, world!".to_string())
        );
        assert_eq!(
            choice.finish_reason,
            Some(dynamo_async_openai::types::FinishReason::Stop)
        );
        assert_eq!(choice.message.role, dynamo_async_openai::types::Role::User);
        assert_eq!(
            choice
                .logprobs
                .as_ref()
                .unwrap()
                .content
                .as_ref()
                .unwrap()
                .len(),
            2
        );
        assert_eq!(
            choice.logprobs.as_ref().unwrap().content.as_ref().unwrap()[0].logprob,
            -0.1
        );
        assert_eq!(
            choice.logprobs.as_ref().unwrap().content.as_ref().unwrap()[1].logprob,
            -0.2
        );
    }

    // A whitespace-only chunk between text chunks must not be dropped.
    #[tokio::test]
    async fn test_preserves_intermediate_whitespace_chunks() {
        let annotated_delta1 = create_test_delta(
            0,
            "Hello",
            Some(dynamo_async_openai::types::Role::User),
            None,
            None,
            None,
        );
        let annotated_delta2 = create_test_delta(0, " ", None, None, None, None);
        let annotated_delta3 = create_test_delta(
            0,
            "world",
            None,
            Some(dynamo_async_openai::types::FinishReason::Stop),
            None,
            None,
        );
        let stream = Box::pin(stream::iter(vec![
            annotated_delta1,
            annotated_delta2,
            annotated_delta3,
        ]));
        let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
        assert!(result.is_ok());
        let response = result.unwrap();
        assert_eq!(response.choices.len(), 1);
        let choice = &response.choices[0];
        assert_eq!(choice.index, 0);
        assert_eq!(
            choice.message.content.as_ref(),
            Some(&ChatCompletionMessageContent::Text(
                "Hello world".to_string()
            ))
        );
        assert_eq!(
            choice.finish_reason,
            Some(dynamo_async_openai::types::FinishReason::Stop)
        );
        assert_eq!(choice.message.role, dynamo_async_openai::types::Role::User);
    }

    // Two choices in one chunk are aggregated independently and keep their
    // indices.
    #[allow(deprecated)]
    #[tokio::test]
    async fn test_multiple_choices() {
        let data = NvCreateChatCompletionStreamResponse {
            id: "test_id".to_string(),
            model: "test_model".to_string(),
            created: 1234567890,
            service_tier: None,
            usage: None,
            system_fingerprint: None,
            choices: vec![
                dynamo_async_openai::types::ChatChoiceStream {
                    index: 0,
                    delta: dynamo_async_openai::types::ChatCompletionStreamResponseDelta {
                        role: Some(dynamo_async_openai::types::Role::Assistant),
                        content: Some(ChatCompletionMessageContent::Text("Choice 0".to_string())),
                        function_call: None,
                        tool_calls: None,
                        refusal: None,
                        reasoning_content: None,
                    },
                    finish_reason: Some(dynamo_async_openai::types::FinishReason::Stop),
                    stop_reason: None,
                    logprobs: None,
                },
                dynamo_async_openai::types::ChatChoiceStream {
                    index: 1,
                    delta: dynamo_async_openai::types::ChatCompletionStreamResponseDelta {
                        role: Some(dynamo_async_openai::types::Role::Assistant),
                        content: Some(ChatCompletionMessageContent::Text("Choice 1".to_string())),
                        function_call: None,
                        tool_calls: None,
                        refusal: None,
                        reasoning_content: None,
                    },
                    finish_reason: Some(dynamo_async_openai::types::FinishReason::Stop),
                    stop_reason: None,
                    logprobs: None,
                },
            ],
            object: "chat.completion".to_string(),
            nvext: None,
        };
        let annotated_delta = Annotated {
            data: Some(data),
            id: Some("test_id".to_string()),
            event: None,
            comment: None,
            error: None,
        };
        let stream = Box::pin(stream::iter(vec![annotated_delta]));
        let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
        assert!(result.is_ok());
        let mut response = result.unwrap();
        assert_eq!(response.choices.len(), 2);
        // Sort defensively before indexing, then check each choice.
        response.choices.sort_by(|a, b| a.index.cmp(&b.index)); let choice0 = &response.choices[0];
        assert_eq!(choice0.index, 0);
        assert_eq!(
            choice0.message.content.as_ref().unwrap(),
            &ChatCompletionMessageContent::Text("Choice 0".to_string())
        );
        assert_eq!(
            choice0.finish_reason,
            Some(dynamo_async_openai::types::FinishReason::Stop)
        );
        assert_eq!(
            choice0.message.role,
            dynamo_async_openai::types::Role::Assistant
        );
        let choice1 = &response.choices[1];
        assert_eq!(choice1.index, 1);
        assert_eq!(
            choice1.message.content.as_ref().unwrap(),
            &ChatCompletionMessageContent::Text("Choice 1".to_string())
        );
        assert_eq!(
            choice1.finish_reason,
            Some(dynamo_async_openai::types::FinishReason::Stop)
        );
        assert_eq!(
            choice1.message.role,
            dynamo_async_openai::types::Role::Assistant
        );
    }

    // Tool calls override a reported `Stop` finish reason with `ToolCalls`.
    #[tokio::test]
    async fn test_tool_calling_finish_reason_override_from_stop() {
        let tool_call_json =
            r#"{"name": "get_weather", "arguments": {"location": "New York", "unit": "celsius"}}"#;
        let annotated_delta = create_test_delta(
            0,
            "I'll check the weather for you.",
            Some(dynamo_async_openai::types::Role::Assistant),
            Some(dynamo_async_openai::types::FinishReason::Stop), None,
            Some(tool_call_json),
        );
        let data = annotated_delta.data.unwrap();
        let annotated_delta = Annotated {
            data: Some(data),
            id: Some("test_id".to_string()),
            event: None,
            comment: None,
            error: None,
        };
        let stream = Box::pin(stream::iter(vec![annotated_delta]));
        let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
        assert!(result.is_ok());
        let response = result.unwrap();
        assert_eq!(response.choices.len(), 1);
        let choice = &response.choices[0];
        assert!(choice.message.tool_calls.is_some());
        let tool_calls = choice.message.tool_calls.as_ref().unwrap();
        assert_eq!(tool_calls.len(), 1);
        assert_eq!(
            choice.finish_reason,
            Some(dynamo_async_openai::types::FinishReason::ToolCalls)
        );
    }

    // Tool calls override a reported `Length` finish reason with `ToolCalls`.
    #[tokio::test]
    async fn test_tool_calling_finish_reason_override_from_length() {
        let tool_call_json = r#"{"name": "search", "arguments": {"query": "rust programming"}}"#;
        let annotated_delta = create_test_delta(
            0,
            "Let me search for that.",
            Some(dynamo_async_openai::types::Role::Assistant),
            Some(dynamo_async_openai::types::FinishReason::Length), None,
            Some(tool_call_json),
        );
        let data = annotated_delta.data.unwrap();
        let annotated_delta = Annotated {
            data: Some(data),
            id: Some("test_id".to_string()),
            event: None,
            comment: None,
            error: None,
        };
        let stream = Box::pin(stream::iter(vec![annotated_delta]));
        let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
        assert!(result.is_ok());
        let response = result.unwrap();
        assert_eq!(response.choices.len(), 1);
        let choice = &response.choices[0];
        assert!(choice.message.tool_calls.is_some());
        let tool_calls = choice.message.tool_calls.as_ref().unwrap();
        assert_eq!(tool_calls.len(), 1);
        assert_eq!(
            choice.finish_reason,
            Some(dynamo_async_openai::types::FinishReason::ToolCalls)
        );
    }

    // Tool calls force `ToolCalls` even when no finish reason was reported.
    #[tokio::test]
    async fn test_tool_calling_finish_reason_override_from_none() {
        let tool_call_json = r#"{"name": "calculate", "arguments": {"expression": "2+2"}}"#;
        let annotated_delta = create_test_delta(
            0,
            "I'll calculate that for you.",
            Some(dynamo_async_openai::types::Role::Assistant),
            None, None,
            Some(tool_call_json),
        );
        let data = annotated_delta.data.unwrap();
        let annotated_delta = Annotated {
            data: Some(data),
            id: Some("test_id".to_string()),
            event: None,
            comment: None,
            error: None,
        };
        let stream = Box::pin(stream::iter(vec![annotated_delta]));
        let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
        assert!(result.is_ok());
        let response = result.unwrap();
        assert_eq!(response.choices.len(), 1);
        let choice = &response.choices[0];
        assert!(choice.message.tool_calls.is_some());
        let tool_calls = choice.message.tool_calls.as_ref().unwrap();
        assert_eq!(tool_calls.len(), 1);
        assert_eq!(
            choice.finish_reason,
            Some(dynamo_async_openai::types::FinishReason::ToolCalls)
        );
    }

    // Without tool calls, the original finish reason passes through.
    #[tokio::test]
    async fn test_no_tool_calling_preserves_original_finish_reason() {
        let annotated_delta = create_test_delta(
            0,
            "This is a regular response without tool calls.",
            Some(dynamo_async_openai::types::Role::Assistant),
            Some(dynamo_async_openai::types::FinishReason::Stop),
            None,
            None, );
        let data = annotated_delta.data.unwrap();
        let annotated_delta = Annotated {
            data: Some(data),
            id: Some("test_id".to_string()),
            event: None,
            comment: None,
            error: None,
        };
        let stream = Box::pin(stream::iter(vec![annotated_delta]));
        let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
        assert!(result.is_ok());
        let response = result.unwrap();
        assert_eq!(response.choices.len(), 1);
        let choice = &response.choices[0];
        assert!(choice.message.tool_calls.is_none());
        assert_eq!(
            choice.finish_reason,
            Some(dynamo_async_openai::types::FinishReason::Stop)
        );
    }

    // An empty tool-call array is treated as no tool calls at all.
    #[tokio::test]
    async fn test_empty_tool_calls_preserves_original_finish_reason() {
        let mut annotated_delta = create_test_delta(
            0,
            "Response with empty tool calls array.",
            Some(dynamo_async_openai::types::Role::Assistant),
            Some(dynamo_async_openai::types::FinishReason::Length),
            None,
            None,
        );
        // Inject an explicitly empty tool-call list into the delta.
        if let Some(ref mut data) = annotated_delta.data {
            data.choices[0].delta.tool_calls = Some(vec![]); }
        let data = annotated_delta.data.unwrap();
        let annotated_delta = Annotated {
            data: Some(data),
            id: Some("test_id".to_string()),
            event: None,
            comment: None,
            error: None,
        };
        let stream = Box::pin(stream::iter(vec![annotated_delta]));
        let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
        assert!(result.is_ok());
        let response = result.unwrap();
        assert_eq!(response.choices.len(), 1);
        let choice = &response.choices[0];
        assert!(choice.message.tool_calls.is_none());
        assert_eq!(
            choice.finish_reason,
            Some(dynamo_async_openai::types::FinishReason::Length)
        );
    }

    // Converted tool calls preserve the function name and JSON arguments.
    #[tokio::test]
    async fn test_tool_calling_output() {
        let tool_call_json = r#"{"name": "get_weather", "arguments": {"location": "San Francisco, CA", "unit": "fahrenheit"}}"#;
        let annotated_delta = create_test_delta(
            0,
            "Hey Dude ! What's the weather in San Francisco in Fahrenheit?",
            Some(dynamo_async_openai::types::Role::Assistant),
            Some(dynamo_async_openai::types::FinishReason::ToolCalls),
            None,
            Some(tool_call_json),
        );
        let data = annotated_delta.data.unwrap();
        let annotated_delta = Annotated {
            data: Some(data),
            id: Some("test_id".to_string()),
            event: None,
            comment: None,
            error: None,
        };
        let stream = Box::pin(stream::iter(vec![annotated_delta]));
        let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
        assert!(result.is_ok());
        let response = result.unwrap();
        assert_eq!(response.choices.len(), 1);
        let choice = &response.choices[0];
        assert!(choice.message.tool_calls.is_some());
        let tool_calls = choice.message.tool_calls.as_ref().unwrap();
        assert_eq!(tool_calls.len(), 1);
        let tool_call = &tool_calls[0];
        assert_eq!(tool_call.function.name, "get_weather");
        let args: serde_json::Value = serde_json::from_str(&tool_call.function.arguments).unwrap();
        assert_eq!(args["location"], "San Francisco, CA");
        assert_eq!(args["unit"], "fahrenheit");
        assert_eq!(
            choice.message.content.as_ref().unwrap(),
            &ChatCompletionMessageContent::Text(
                "Hey Dude ! What's the weather in San Francisco in Fahrenheit?".to_string()
            )
        );
        assert_eq!(
            choice.finish_reason,
            Some(dynamo_async_openai::types::FinishReason::ToolCalls)
        );
        assert_eq!(
            choice.message.role,
            dynamo_async_openai::types::Role::Assistant
        );
    }

    // Variant of the Stop-override case without re-wrapping the Annotated.
    #[tokio::test]
    async fn test_tool_calling_finish_reason_override_from_stop_alternative() {
        let tool_call_json =
            r#"{"name": "get_weather", "arguments": {"location": "New York", "unit": "celsius"}}"#;
        let annotated_delta = create_test_delta(
            0,
            "Getting weather for New York",
            Some(dynamo_async_openai::types::Role::Assistant),
            Some(dynamo_async_openai::types::FinishReason::Stop), None,
            Some(tool_call_json),
        );
        let stream = Box::pin(stream::iter(vec![annotated_delta]));
        let result = DeltaAggregator::apply(stream, ParsingOptions::default()).await;
        assert!(result.is_ok());
        let response = result.unwrap();
        assert_eq!(response.choices.len(), 1);
        let choice = &response.choices[0];
        assert_eq!(
            choice.finish_reason,
            Some(dynamo_async_openai::types::FinishReason::ToolCalls)
        );
        assert!(choice.message.tool_calls.is_some());
        let tool_calls = choice.message.tool_calls.as_ref().unwrap();
        assert_eq!(tool_calls.len(), 1);
        assert_eq!(tool_calls[0].function.name, "get_weather");
    }
}