dynamo_llm/protocols/openai/chat_completions/delta.rs

// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use super::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse};
use crate::{
    local_model::runtime_config::ModelRuntimeConfig,
    protocols::common::{self},
    types::TokenIdType,
};
use dynamo_parsers::{ParserResult, ReasoningParser, ReasoningParserType, ReasoningParserWrapper};

/// Provides a method for generating a [`DeltaGenerator`] from a chat completion request.
impl NvCreateChatCompletionRequest {
    /// Creates a [`DeltaGenerator`] instance based on the chat completion request.
    ///
    /// # Arguments
    /// * `request_id` - The request ID to use for the chat completion response ID.
    ///
    /// # Returns
    /// * [`DeltaGenerator`] configured with model name and response options.
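    ///
    /// # Example
    /// A minimal usage sketch (illustrative only; assumes `request` is an already
    /// deserialized [`NvCreateChatCompletionRequest`] and the id comes from the caller):
    /// ```ignore
    /// let mut generator = request.response_generator("req-1234".to_string());
    /// generator.update_isl(42); // record the prompt token count once it is known
    /// ```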
    pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
        let options = DeltaGeneratorOptions {
            enable_usage: self
                .inner
                .stream_options
                .as_ref()
                .map(|opts| opts.include_usage)
                .unwrap_or(false),
            enable_logprobs: self.inner.logprobs.unwrap_or(false)
                || self.inner.top_logprobs.unwrap_or(0) > 0,
            runtime_config: ModelRuntimeConfig::default(),
        };

        DeltaGenerator::new(self.inner.model.clone(), options, request_id)
    }
}

/// Configuration options for the [`DeltaGenerator`], controlling response behavior.
#[derive(Debug, Clone, Default)]
pub struct DeltaGeneratorOptions {
    /// Determines whether token usage statistics should be included in the response.
    pub enable_usage: bool,
    /// Determines whether log probabilities should be included in the response.
    pub enable_logprobs: bool,

    /// Runtime configuration for the model, including which reasoning parser (if any) to use.
    pub runtime_config: ModelRuntimeConfig,
}

/// Generates incremental chat completion responses in a streaming fashion.
#[derive(Debug)]
pub struct DeltaGenerator {
    /// Unique identifier for the chat completion session.
    id: String,
    /// Object type, representing a streamed chat completion response.
    object: String,
    /// Timestamp (Unix epoch) when the response was created.
    created: u32,
    /// Model name reported in each response chunk.
    model: String,
    /// Optional system fingerprint for version tracking.
    system_fingerprint: Option<String>,
    /// Optional service tier information for the response.
    service_tier: Option<dynamo_async_openai::types::ServiceTierResponse>,
    /// Tracks token usage for the completion request.
    usage: dynamo_async_openai::types::CompletionUsage,
    /// Counter tracking the number of messages issued.
    msg_counter: u64,
    /// Configuration options for response generation.
    options: DeltaGeneratorOptions,

    /// Reasoning parser used to extract reasoning content from the response.
    /// `None` means no reasoning parsing will be performed.
    reasoning_parser: Option<ReasoningParserWrapper>,
}

impl DeltaGenerator {
    /// Creates a new [`DeltaGenerator`] instance with the specified model and options.
    ///
    /// # Arguments
    /// * `model` - The model name used for response generation.
    /// * `options` - Configuration options for enabling usage and log probabilities.
    /// * `request_id` - The request ID to use for the chat completion response.
    ///
    /// # Returns
    /// * A new instance of [`DeltaGenerator`].
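    ///
    /// # Example
    /// A minimal construction sketch (illustrative only; values are placeholders):
    /// ```ignore
    /// let options = DeltaGeneratorOptions {
    ///     enable_usage: true,
    ///     enable_logprobs: false,
    ///     runtime_config: ModelRuntimeConfig::default(),
    /// };
    /// let generator = DeltaGenerator::new("my-model".to_string(), options, "req-1234".to_string());
    /// ```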
    pub fn new(model: String, options: DeltaGeneratorOptions, request_id: String) -> Self {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // SAFETY: `try_into` panics (via `expect`) if the timestamp no longer fits in a `u32`,
        // which will not happen until 2106.
        let now: u32 = now.try_into().expect("timestamp exceeds u32::MAX");

        let usage = dynamo_async_openai::types::CompletionUsage {
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            prompt_tokens_details: None,
            completion_tokens_details: None,
        };

        // Reasoning parser type
        // If no parser is specified (None), no reasoning parsing will be performed
        let reasoning_parser = options
            .runtime_config
            .reasoning_parser
            .as_deref()
            .map(ReasoningParserType::get_reasoning_parser_from_name);

        let chatcmpl_id = format!("chatcmpl-{request_id}");

        Self {
            id: chatcmpl_id,
            object: "chat.completion.chunk".to_string(),
            created: now,
            model,
            system_fingerprint: None,
            service_tier: None,
            usage,
            msg_counter: 0,
            options,
            reasoning_parser,
        }
    }

    /// Updates the runtime configuration and reconfigures the reasoning parser accordingly.
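    ///
    /// # Example
    /// Illustrative only; `"basic"` is a placeholder for whatever parser name the
    /// runtime configuration actually carries.
    /// ```ignore
    /// let mut runtime_config = ModelRuntimeConfig::default();
    /// runtime_config.reasoning_parser = Some("basic".to_string());
    /// generator.set_reasoning_parser(runtime_config);
    /// ```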
    pub fn set_reasoning_parser(&mut self, runtime_config: ModelRuntimeConfig) {
        self.options.runtime_config = runtime_config.clone();
        match self.options.runtime_config.reasoning_parser.as_deref() {
            Some(name) => {
                self.reasoning_parser =
                    Some(ReasoningParserType::get_reasoning_parser_from_name(name));
            }
            None => {
                self.reasoning_parser = None;
            }
        }
    }

    /// Updates the prompt token usage count.
    ///
    /// # Arguments
    /// * `isl` - The input sequence length, i.e. the number of prompt tokens used.
    pub fn update_isl(&mut self, isl: u32) {
        self.usage.prompt_tokens = isl;
    }

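    /// Builds OpenAI-style log probability data for a batch of generated tokens.
    ///
    /// Returns `None` unless log probabilities were requested and the backend supplied
    /// them; otherwise each token is paired with its log probability and, when available,
    /// its top-k alternatives (the selected token is appended if the backend did not
    /// include it in the top-k list).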
    pub fn create_logprobs(
        &self,
        tokens: Vec<common::llm_backend::TokenType>,
        token_ids: &[TokenIdType],
        logprobs: Option<common::llm_backend::LogProbs>,
        top_logprobs: Option<common::llm_backend::TopLogprobs>,
    ) -> Option<dynamo_async_openai::types::ChatChoiceLogprobs> {
        if !self.options.enable_logprobs || logprobs.is_none() {
            return None;
        }

        let toks = tokens
            .into_iter()
            .zip(token_ids)
            .map(|(token, token_id)| (token.unwrap_or_default(), *token_id))
            .collect::<Vec<(String, TokenIdType)>>();
        let tok_lps = toks
            .iter()
            .zip(logprobs.unwrap())
            .map(|(_, lp)| lp as f32)
            .collect::<Vec<f32>>();

        let content = top_logprobs.map(|top_logprobs| {
            toks.iter()
                .zip(tok_lps)
                .zip(top_logprobs)
                .map(|(((t, tid), lp), top_lps)| {
                    let mut found_selected_token = false;
                    let mut converted_top_lps = top_lps
                        .iter()
                        .map(|top_lp| {
                            let top_t = top_lp.token.clone().unwrap_or_default();
                            let top_tid = top_lp.token_id;
                            found_selected_token = found_selected_token || top_tid == *tid;
                            dynamo_async_openai::types::TopLogprobs {
                                token: top_t,
                                logprob: top_lp.logprob as f32,
                                bytes: None,
                            }
                        })
                        .collect::<Vec<dynamo_async_openai::types::TopLogprobs>>();
                    if !found_selected_token {
                        // If the selected token is not in the top logprobs, add it
                        converted_top_lps.push(dynamo_async_openai::types::TopLogprobs {
                            token: t.clone(),
                            logprob: lp,
                            bytes: None,
                        });
                    }
                    dynamo_async_openai::types::ChatCompletionTokenLogprob {
                        token: t.clone(),
                        logprob: lp,
                        bytes: None,
                        top_logprobs: converted_top_lps,
                    }
                })
                .collect()
        });

        Some(dynamo_async_openai::types::ChatChoiceLogprobs {
            content,
            refusal: None,
        })
    }

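    /// Runs the configured reasoning parser over the incremental text/token-id chunk,
    /// splitting it into normal text and reasoning content.
    ///
    /// Returns `None` when no reasoning parser is configured or when the chunk is empty.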
    fn create_reasoning_content(
        &mut self,
        text: &Option<String>,
        token_ids: &[u32],
    ) -> Option<ParserResult> {
        // If no reasoning parser is configured, return None
        let reasoning_parser = self.reasoning_parser.as_mut()?;

        let text_ref = text.as_deref().unwrap_or("");
        if text_ref.is_empty() && token_ids.is_empty() {
            return None;
        }
        let parser_result =
            reasoning_parser.parse_reasoning_streaming_incremental(text_ref, token_ids);

        Some(parser_result)
    }

    /// Creates a choice within a chat completion response.
    ///
    /// # Arguments
    /// * `index` - The index of the choice in the completion response.
    /// * `text` - The text content for the response.
    /// * `reasoning_content` - Optional reasoning content parsed out of the response.
    /// * `finish_reason` - The reason why the response finished (e.g., stop, length, etc.).
    /// * `logprobs` - Optional log probabilities of the generated tokens.
    ///
    /// # Returns
    /// * An [`NvCreateChatCompletionStreamResponse`] instance containing the choice.
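    ///
    /// # Example
    /// An illustrative call only; values are placeholders.
    /// ```ignore
    /// let chunk = generator.create_choice(
    ///     0,                         // choice index
    ///     Some("Hello".to_string()), // delta text
    ///     None,                      // no reasoning content
    ///     None,                      // generation not finished yet
    ///     None,                      // no logprobs
    /// );
    /// ```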
    #[allow(deprecated)]
    pub fn create_choice(
        &mut self,
        index: u32,
        text: Option<String>,
        reasoning_content: Option<String>,
        finish_reason: Option<dynamo_async_openai::types::FinishReason>,
        logprobs: Option<dynamo_async_openai::types::ChatChoiceLogprobs>,
    ) -> NvCreateChatCompletionStreamResponse {
        let delta = dynamo_async_openai::types::ChatCompletionStreamResponseDelta {
            content: text,
            function_call: None,
            tool_calls: None,
            role: if self.msg_counter == 0 {
                Some(dynamo_async_openai::types::Role::Assistant)
            } else {
                None
            },
            refusal: None,
            reasoning_content,
        };

        let choice = dynamo_async_openai::types::ChatChoiceStream {
            index,
            delta,
            finish_reason,
            logprobs,
        };

        let choices = vec![choice];

        // According to OpenAI spec: when stream_options.include_usage is true,
        // all intermediate chunks should have usage: null
        // The final usage chunk will be sent separately with empty choices
        dynamo_async_openai::types::CreateChatCompletionStreamResponse {
            id: self.id.clone(),
            object: self.object.clone(),
            created: self.created,
            model: self.model.clone(),
            system_fingerprint: self.system_fingerprint.clone(),
            choices,
            usage: None, // Always None for chunks with content/choices
            service_tier: self.service_tier.clone(),
        }
    }

    /// Creates a final usage-only chunk for OpenAI compliance.
    /// This should be sent after the last content chunk when `stream_options.include_usage` is true.
    ///
    /// # Returns
    /// * An [`NvCreateChatCompletionStreamResponse`] with empty choices and usage stats.
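    ///
    /// # Example
    /// An illustrative call sequence only; the surrounding streaming loop is assumed.
    /// ```ignore
    /// // ... after all content chunks have been emitted ...
    /// if generator.is_usage_enabled() {
    ///     let usage_chunk = generator.create_usage_chunk();
    ///     // send `usage_chunk` as the final event before terminating the stream
    /// }
    /// ```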
    pub fn create_usage_chunk(&self) -> NvCreateChatCompletionStreamResponse {
        let mut usage = self.usage.clone();
        usage.total_tokens = usage.prompt_tokens.saturating_add(usage.completion_tokens);

        dynamo_async_openai::types::CreateChatCompletionStreamResponse {
            id: self.id.clone(),
            object: self.object.clone(),
            created: self.created,
            model: self.model.clone(),
            system_fingerprint: self.system_fingerprint.clone(),
            choices: vec![], // Empty choices for usage-only chunk
            usage: Some(usage),
            service_tier: self.service_tier.clone(),
        }
    }

    /// Returns `true` if usage tracking is enabled.
    pub fn is_usage_enabled(&self) -> bool {
        self.options.enable_usage
    }
}

/// Implements the [`crate::protocols::openai::DeltaGeneratorExt`] trait for [`DeltaGenerator`], allowing
/// it to transform backend responses into OpenAI-style streaming responses.
impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamResponse>
    for DeltaGenerator
{
    /// Converts a backend response into a structured OpenAI-style streaming response.
    ///
    /// # Arguments
    /// * `delta` - The backend response containing generated text and metadata.
    ///
    /// # Returns
    /// * `Ok(NvCreateChatCompletionStreamResponse)` if conversion succeeds.
    /// * `Err(anyhow::Error)` if an error occurs.
    fn choice_from_postprocessor(
        &mut self,
        delta: crate::protocols::common::llm_backend::BackendOutput,
    ) -> anyhow::Result<NvCreateChatCompletionStreamResponse> {
        // Aggregate token usage if enabled.
        if self.options.enable_usage {
            // SAFETY: `try_into` panics (via `expect`) if the token count no longer fits in a `u32`,
            // which will not happen until a single response exceeds 4_294_967_295 tokens.
            let token_length: u32 = delta
                .token_ids
                .len()
                .try_into()
                .expect("token_ids length exceeds u32::MAX");

            self.usage.completion_tokens += token_length;
        }

        let logprobs = self.create_logprobs(
            delta.tokens,
            &delta.token_ids,
            delta.log_probs,
            delta.top_logprobs,
        );

        // Map backend finish reasons to OpenAI's finish reasons.
        let finish_reason = match delta.finish_reason {
            Some(common::FinishReason::EoS) => Some(dynamo_async_openai::types::FinishReason::Stop),
            Some(common::FinishReason::Stop) => {
                Some(dynamo_async_openai::types::FinishReason::Stop)
            }
            Some(common::FinishReason::Length) => {
                Some(dynamo_async_openai::types::FinishReason::Length)
            }
            Some(common::FinishReason::Cancelled) => {
                Some(dynamo_async_openai::types::FinishReason::Stop)
            }
            Some(common::FinishReason::ContentFilter) => {
                Some(dynamo_async_openai::types::FinishReason::ContentFilter)
            }
            Some(common::FinishReason::Error(err_msg)) => {
                return Err(anyhow::anyhow!(err_msg));
            }
            None => None,
        };

        // Handle reasoning parsing if enabled, otherwise treat all text as normal
        let (normal_text, reasoning_content) =
            match self.create_reasoning_content(&delta.text, &delta.token_ids) {
                Some(reasoning_parser_result) => (
                    reasoning_parser_result.get_some_normal_text(),
                    reasoning_parser_result.get_some_reasoning(),
                ),
                None => (delta.text, None),
            };

        // Create the streaming response.
        let index = 0;
        let stream_response = self.create_choice(
            index,
            normal_text,
            reasoning_content,
            finish_reason,
            logprobs,
        );

        Ok(stream_response)
    }

    fn get_isl(&self) -> Option<u32> {
        Some(self.usage.prompt_tokens)
    }

    fn create_usage_chunk(&self) -> NvCreateChatCompletionStreamResponse {
        DeltaGenerator::create_usage_chunk(self)
    }

    fn is_usage_enabled(&self) -> bool {
        DeltaGenerator::is_usage_enabled(self)
    }
}