dynamo_llm/protocols/openai/completions/delta.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{NvCreateCompletionRequest, NvCreateCompletionResponse};
5use crate::{protocols::common, types::TokenIdType};
6
7impl NvCreateCompletionRequest {
8    // put this method on the request
9    // inspect the request to extract options
10    pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
11        let options = DeltaGeneratorOptions {
12            enable_usage: self
13                .inner
14                .stream_options
15                .as_ref()
16                .map(|opts| opts.include_usage)
17                .unwrap_or(false),
18            enable_logprobs: self.inner.logprobs.unwrap_or(0) > 0,
19        };
20
21        DeltaGenerator::new(self.inner.model.clone(), options, request_id)
22    }
23}
24
/// Per-request switches controlling what the [`DeltaGenerator`] attaches to
/// each streamed chunk. Built from the incoming request in
/// `NvCreateCompletionRequest::response_generator`.
#[derive(Debug, Clone, Default)]
pub struct DeltaGeneratorOptions {
    /// When true, completion tokens are accumulated per delta and a final
    /// usage-only chunk is emitted (mirrors `stream_options.include_usage`).
    pub enable_usage: bool,
    /// When true, backend logprobs are converted and attached to each choice
    /// (set when the request's `logprobs` count is > 0).
    pub enable_logprobs: bool,
}
30
/// Incrementally builds OpenAI-style streaming completion responses from
/// backend output deltas, accumulating usage statistics along the way.
#[derive(Debug, Clone)]
pub struct DeltaGenerator {
    // Completion id, formatted as "cmpl-<request_id>" in `new`.
    id: String,
    // OpenAI object tag; always "text_completion".
    object: String,
    // Creation time as Unix seconds, narrowed to u32 (safe until 2106).
    created: u32,
    // Model name echoed back in every chunk.
    model: String,
    // Currently always None; reserved for forwarding a fingerprint.
    system_fingerprint: Option<String>,
    // Running usage totals; prompt_tokens set via `update_isl`,
    // completion_tokens accumulated per delta when usage is enabled.
    usage: dynamo_async_openai::types::CompletionUsage,
    // Flags derived from the originating request.
    options: DeltaGeneratorOptions,
}
41
42impl DeltaGenerator {
43    pub fn new(model: String, options: DeltaGeneratorOptions, request_id: String) -> Self {
44        let now = std::time::SystemTime::now()
45            .duration_since(std::time::UNIX_EPOCH)
46            .unwrap()
47            .as_secs();
48
49        // SAFETY: Casting from `u64` to `u32` could lead to precision loss after `u32::MAX`,
50        // but this will not be an issue until 2106.
51        let now: u32 = now.try_into().expect("timestamp exceeds u32::MAX");
52
53        // Previously, our home-rolled CompletionUsage impl'd Default
54        // PR !387 - https://github.com/64bit/async-openai/pull/387
55        let usage = dynamo_async_openai::types::CompletionUsage {
56            completion_tokens: 0,
57            prompt_tokens: 0,
58            total_tokens: 0,
59            completion_tokens_details: None,
60            prompt_tokens_details: None,
61        };
62
63        let completion_id = format!("cmpl-{request_id}");
64
65        Self {
66            id: completion_id,
67            object: "text_completion".to_string(),
68            created: now,
69            model,
70            system_fingerprint: None,
71            usage,
72            options,
73        }
74    }
75
76    pub fn update_isl(&mut self, isl: u32) {
77        self.usage.prompt_tokens = isl;
78    }
79
80    pub fn create_logprobs(
81        &self,
82        tokens: Vec<common::llm_backend::TokenType>,
83        token_ids: Vec<TokenIdType>,
84        logprobs: Option<common::llm_backend::LogProbs>,
85        top_logprobs: Option<common::llm_backend::TopLogprobs>,
86    ) -> Option<dynamo_async_openai::types::Logprobs> {
87        if !self.options.enable_logprobs || logprobs.is_none() {
88            return None;
89        }
90
91        let toks = tokens
92            .into_iter()
93            .zip(token_ids)
94            .map(|(token, token_id)| (token.unwrap_or_default(), token_id))
95            .collect::<Vec<(String, TokenIdType)>>();
96        let tok_lps = toks
97            .iter()
98            .zip(logprobs.unwrap())
99            .map(|(_, lp)| lp as f32)
100            .collect::<Vec<f32>>();
101
102        let top_lps = top_logprobs.map_or(vec![], |top_logprobs| {
103            toks.iter()
104                .zip(tok_lps.iter())
105                .zip(top_logprobs.iter())
106                .map(|(((t, tid), lp), top_lps)| {
107                    let mut found_selected_token = false;
108                    let mut converted_top_lps = top_lps
109                        .iter()
110                        .map(|top_lp| {
111                            let top_t = top_lp.token.clone().unwrap_or_default();
112                            let top_tid = top_lp.token_id;
113                            found_selected_token = found_selected_token || top_tid == *tid;
114                            dynamo_async_openai::types::TopLogprobs {
115                                token: top_t,
116                                logprob: top_lp.logprob as f32,
117                                bytes: None,
118                            }
119                        })
120                        .collect::<Vec<dynamo_async_openai::types::TopLogprobs>>();
121                    if !found_selected_token {
122                        // If the selected token is not in the top logprobs, add it
123                        converted_top_lps.push(dynamo_async_openai::types::TopLogprobs {
124                            token: t.clone(),
125                            logprob: *lp,
126                            bytes: None,
127                        });
128                    }
129                    serde_json::to_value(converted_top_lps).unwrap()
130                })
131                .collect()
132        });
133
134        Some(dynamo_async_openai::types::Logprobs {
135            tokens: toks.iter().map(|(t, _)| t.clone()).collect(),
136            token_logprobs: tok_lps.into_iter().map(Some).collect(),
137            text_offset: vec![],
138            top_logprobs: top_lps,
139        })
140    }
141
142    pub fn create_choice(
143        &self,
144        index: u32,
145        text: Option<String>,
146        finish_reason: Option<dynamo_async_openai::types::CompletionFinishReason>,
147        logprobs: Option<dynamo_async_openai::types::Logprobs>,
148    ) -> NvCreateCompletionResponse {
149        // todo - update for tool calling
150
151        // According to OpenAI spec: when stream_options.include_usage is true,
152        // all intermediate chunks should have usage: null
153        // The final usage chunk will be sent separately with empty choices
154        let inner = dynamo_async_openai::types::CreateCompletionResponse {
155            id: self.id.clone(),
156            object: self.object.clone(),
157            created: self.created,
158            model: self.model.clone(),
159            system_fingerprint: self.system_fingerprint.clone(),
160            choices: vec![dynamo_async_openai::types::Choice {
161                text: text.unwrap_or_default(),
162                index,
163                finish_reason,
164                logprobs,
165            }],
166            usage: None, // Always None for chunks with content/choices
167        };
168
169        NvCreateCompletionResponse { inner }
170    }
171
172    /// Creates a final usage-only chunk for OpenAI compliance.
173    /// This should be sent after the last content chunk when stream_options.include_usage is true.
174    ///
175    /// # Returns
176    /// * A [`NvCreateCompletionResponse`] with empty choices and usage stats.
177    pub fn create_usage_chunk(&self) -> NvCreateCompletionResponse {
178        let mut usage = self.usage.clone();
179        usage.total_tokens = usage.prompt_tokens.saturating_add(usage.completion_tokens);
180
181        let inner = dynamo_async_openai::types::CreateCompletionResponse {
182            id: self.id.clone(),
183            object: self.object.clone(),
184            created: self.created,
185            model: self.model.clone(),
186            system_fingerprint: self.system_fingerprint.clone(),
187            choices: vec![], // Empty choices for usage-only chunk
188            usage: Some(usage),
189        };
190
191        NvCreateCompletionResponse { inner }
192    }
193
194    /// Check if usage tracking is enabled
195    pub fn is_usage_enabled(&self) -> bool {
196        self.options.enable_usage
197    }
198}
199
200impl crate::protocols::openai::DeltaGeneratorExt<NvCreateCompletionResponse> for DeltaGenerator {
201    fn choice_from_postprocessor(
202        &mut self,
203        delta: common::llm_backend::BackendOutput,
204    ) -> anyhow::Result<NvCreateCompletionResponse> {
205        // aggregate usage
206        if self.options.enable_usage {
207            // SAFETY: Casting from `usize` to `u32` could lead to precision loss after `u32::MAX`,
208            // but this will not be an issue until context lengths exceed 4_294_967_295.
209            let token_length: u32 = delta
210                .token_ids
211                .len()
212                .try_into()
213                .expect("token_ids length exceeds u32::MAX");
214
215            self.usage.completion_tokens += token_length;
216        }
217
218        let logprobs = self.create_logprobs(
219            delta.tokens,
220            delta.token_ids,
221            delta.log_probs,
222            delta.top_logprobs,
223        );
224
225        let finish_reason = delta.finish_reason.map(Into::into);
226
227        // create choice
228        let index = delta.index.unwrap_or(0);
229        let response = self.create_choice(index, delta.text.clone(), finish_reason, logprobs);
230        Ok(response)
231    }
232
233    fn get_isl(&self) -> Option<u32> {
234        Some(self.usage.prompt_tokens)
235    }
236
237    fn create_usage_chunk(&self) -> NvCreateCompletionResponse {
238        DeltaGenerator::create_usage_chunk(self)
239    }
240
241    fn is_usage_enabled(&self) -> bool {
242        DeltaGenerator::is_usage_enabled(self)
243    }
244}