dynamo_llm/protocols/openai/chat_completions/
delta.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use super::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse};
17use crate::protocols::common;
18
19/// Provides a method for generating a [`DeltaGenerator`] from a chat completion request.
20impl NvCreateChatCompletionRequest {
21    /// Creates a [`DeltaGenerator`] instance based on the chat completion request.
22    ///
23    /// # Returns
24    /// * [`DeltaGenerator`] configured with model name and response options.
25    pub fn response_generator(&self) -> DeltaGenerator {
26        let options = DeltaGeneratorOptions {
27            enable_usage: true,
28            enable_logprobs: self.inner.logprobs.unwrap_or(false),
29        };
30
31        DeltaGenerator::new(self.inner.model.clone(), options)
32    }
33}
34
35/// Configuration options for the [`DeltaGenerator`], controlling response behavior.
36#[derive(Debug, Clone, Default)]
37pub struct DeltaGeneratorOptions {
38    /// Determines whether token usage statistics should be included in the response.
39    pub enable_usage: bool,
40    /// Determines whether log probabilities should be included in the response.
41    pub enable_logprobs: bool,
42}
43
44/// Generates incremental chat completion responses in a streaming fashion.
45#[derive(Debug, Clone)]
46pub struct DeltaGenerator {
47    /// Unique identifier for the chat completion session.
48    id: String,
49    /// Object type, representing a streamed chat completion response.
50    object: String,
51    /// Timestamp (Unix epoch) when the response was created.
52    created: u32,
53    /// Model name used for generating responses.
54    model: String,
55    /// Optional system fingerprint for version tracking.
56    system_fingerprint: Option<String>,
57    /// Optional service tier information for the response.
58    service_tier: Option<async_openai::types::ServiceTierResponse>,
59    /// Tracks token usage for the completion request.
60    usage: async_openai::types::CompletionUsage,
61    /// Counter tracking the number of messages issued.
62    msg_counter: u64,
63    /// Configuration options for response generation.
64    options: DeltaGeneratorOptions,
65}
66
67impl DeltaGenerator {
68    /// Creates a new [`DeltaGenerator`] instance with the specified model and options.
69    ///
70    /// # Arguments
71    /// * `model` - The model name used for response generation.
72    /// * `options` - Configuration options for enabling usage and log probabilities.
73    ///
74    /// # Returns
75    /// * A new instance of [`DeltaGenerator`].
76    pub fn new(model: String, options: DeltaGeneratorOptions) -> Self {
77        // SAFETY: Casting from `u64` to `u32` could lead to precision loss after `u32::MAX`,
78        // but this will not be an issue until 2106.
79        let now = std::time::SystemTime::now()
80            .duration_since(std::time::UNIX_EPOCH)
81            .unwrap()
82            .as_secs() as u32;
83
84        let usage = async_openai::types::CompletionUsage {
85            prompt_tokens: 0,
86            completion_tokens: 0,
87            total_tokens: 0,
88            prompt_tokens_details: None,
89            completion_tokens_details: None,
90        };
91
92        Self {
93            id: format!("chatcmpl-{}", uuid::Uuid::new_v4()),
94            object: "chat.completion.chunk".to_string(),
95            created: now,
96            model,
97            system_fingerprint: None,
98            service_tier: None,
99            usage,
100            msg_counter: 0,
101            options,
102        }
103    }
104
105    /// Updates the prompt token usage count.
106    ///
107    /// # Arguments
108    /// * `isl` - The number of prompt tokens used.
109    pub fn update_isl(&mut self, isl: u32) {
110        self.usage.prompt_tokens = isl;
111    }
112
113    /// Creates a choice within a chat completion response.
114    ///
115    /// # Arguments
116    /// * `index` - The index of the choice in the completion response.
117    /// * `text` - The text content for the response.
118    /// * `finish_reason` - The reason why the response finished (e.g., stop, length, etc.).
119    /// * `logprobs` - Optional log probabilities of the generated tokens.
120    ///
121    /// # Returns
122    /// * An [`async_openai::types::CreateChatCompletionStreamResponse`] instance representing the choice.
123    #[allow(deprecated)]
124    pub fn create_choice(
125        &self,
126        index: u32,
127        text: Option<String>,
128        finish_reason: Option<async_openai::types::FinishReason>,
129        logprobs: Option<async_openai::types::ChatChoiceLogprobs>,
130    ) -> async_openai::types::CreateChatCompletionStreamResponse {
131        // TODO: Update for tool calling
132        let delta = async_openai::types::ChatCompletionStreamResponseDelta {
133            role: if self.msg_counter == 0 {
134                Some(async_openai::types::Role::Assistant)
135            } else {
136                None
137            },
138            content: text,
139            tool_calls: None,
140            function_call: None,
141            refusal: None,
142        };
143
144        let choice = async_openai::types::ChatChoiceStream {
145            index,
146            delta,
147            finish_reason,
148            logprobs,
149        };
150
151        let choices = vec![choice];
152
153        async_openai::types::CreateChatCompletionStreamResponse {
154            id: self.id.clone(),
155            object: self.object.clone(),
156            created: self.created,
157            model: self.model.clone(),
158            system_fingerprint: self.system_fingerprint.clone(),
159            choices,
160            usage: if self.options.enable_usage {
161                Some(self.usage.clone())
162            } else {
163                None
164            },
165            service_tier: self.service_tier.clone(),
166        }
167    }
168}
169
170/// Implements the [`crate::protocols::openai::DeltaGeneratorExt`] trait for [`DeltaGenerator`], allowing
171/// it to transform backend responses into OpenAI-style streaming responses.
172impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamResponse>
173    for DeltaGenerator
174{
175    /// Converts a backend response into a structured OpenAI-style streaming response.
176    ///
177    /// # Arguments
178    /// * `delta` - The backend response containing generated text and metadata.
179    ///
180    /// # Returns
181    /// * `Ok(NvCreateChatCompletionStreamResponse)` if conversion succeeds.
182    /// * `Err(anyhow::Error)` if an error occurs.
183    fn choice_from_postprocessor(
184        &mut self,
185        delta: crate::protocols::common::llm_backend::BackendOutput,
186    ) -> anyhow::Result<NvCreateChatCompletionStreamResponse> {
187        // Aggregate token usage if enabled.
188        if self.options.enable_usage {
189            self.usage.completion_tokens += delta.token_ids.len() as u32;
190        }
191
192        // TODO: Implement log probabilities aggregation.
193        let logprobs = None;
194
195        // Map backend finish reasons to OpenAI's finish reasons.
196        let finish_reason = match delta.finish_reason {
197            Some(common::FinishReason::EoS) => Some(async_openai::types::FinishReason::Stop),
198            Some(common::FinishReason::Stop) => Some(async_openai::types::FinishReason::Stop),
199            Some(common::FinishReason::Length) => Some(async_openai::types::FinishReason::Length),
200            Some(common::FinishReason::Cancelled) => Some(async_openai::types::FinishReason::Stop),
201            Some(common::FinishReason::Error(err_msg)) => {
202                return Err(anyhow::anyhow!(err_msg));
203            }
204            None => None,
205        };
206
207        // Create the streaming response.
208        let index = 0;
209        let stream_response = self.create_choice(index, delta.text, finish_reason, logprobs);
210
211        Ok(NvCreateChatCompletionStreamResponse {
212            inner: stream_response,
213        })
214    }
215
216    fn get_isl(&self) -> Option<u32> {
217        Some(self.usage.prompt_tokens)
218    }
219}