dynamo_llm/protocols/openai/chat_completions/delta.rs

use super::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse};
use crate::{
    local_model::runtime_config::ModelRuntimeConfig,
    protocols::common::{self},
    types::TokenIdType,
};
use dynamo_parsers::{ParserResult, ReasoningParser, ReasoningParserType, ReasoningParserWrapper};

impl NvCreateChatCompletionRequest {
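    /// Builds a streaming [`DeltaGenerator`] for this request, deriving the
    /// usage and logprobs flags from the request body.
    ///
    /// A minimal usage sketch (`request` and `prompt_token_count` are
    /// illustrative placeholders):
    ///
    /// ```ignore
    /// let mut generator = request.response_generator("req-123".to_string());
    /// generator.update_isl(prompt_token_count);
    /// ```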
    pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
        let options = DeltaGeneratorOptions {
            enable_usage: self
                .inner
                .stream_options
                .as_ref()
                .map(|opts| opts.include_usage)
                .unwrap_or(false),
            enable_logprobs: self.inner.logprobs.unwrap_or(false)
                || self.inner.top_logprobs.unwrap_or(0) > 0,
            runtime_config: ModelRuntimeConfig::default(),
        };

        DeltaGenerator::new(self.inner.model.clone(), options, request_id)
    }
}

/// Options controlling what a [`DeltaGenerator`] emits alongside content deltas.
#[derive(Debug, Clone, Default)]
pub struct DeltaGeneratorOptions {
    /// Include a usage block in the final chunk of the stream.
    pub enable_usage: bool,
    /// Attach logprobs to each emitted choice.
    pub enable_logprobs: bool,

    /// Per-model runtime configuration (e.g., which reasoning parser to use).
    pub runtime_config: ModelRuntimeConfig,
}

/// Incrementally builds `chat.completion.chunk` responses for a single
/// streaming request.
#[derive(Debug)]
pub struct DeltaGenerator {
    /// Response id, formatted as `chatcmpl-{request_id}`.
    id: String,
    /// Always `"chat.completion.chunk"`.
    object: String,
    /// Unix timestamp (seconds) at which the generator was created.
    created: u32,
    model: String,
    system_fingerprint: Option<String>,
    service_tier: Option<dynamo_async_openai::types::ServiceTierResponse>,
    /// Running token counts, reported when usage is enabled.
    usage: dynamo_async_openai::types::CompletionUsage,
    /// Number of content chunks emitted so far; only the first carries the
    /// assistant role.
    msg_counter: u64,
    options: DeltaGeneratorOptions,

    /// Parser that splits reasoning ("thinking") text from normal content.
    reasoning_parser: Option<ReasoningParserWrapper>,
}

impl DeltaGenerator {
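    /// Creates a generator with zeroed usage counters; the chunk id is
    /// derived from the request id as `chatcmpl-{request_id}`.
    ///
    /// A minimal sketch with default options:
    ///
    /// ```ignore
    /// let generator = DeltaGenerator::new(
    ///     "my-model".to_string(),
    ///     DeltaGeneratorOptions::default(),
    ///     "req-123".to_string(),
    /// );
    /// ```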
    pub fn new(model: String, options: DeltaGeneratorOptions, request_id: String) -> Self {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // The OpenAI stream response type stores `created` as a u32.
        let now: u32 = now.try_into().expect("timestamp exceeds u32::MAX");

        let usage = dynamo_async_openai::types::CompletionUsage {
            prompt_tokens: 0,
            completion_tokens: 0,
            total_tokens: 0,
            prompt_tokens_details: None,
            completion_tokens_details: None,
        };

        // Resolve the reasoning parser, if one is named in the runtime config.
        let reasoning_parser = options
            .runtime_config
            .reasoning_parser
            .as_deref()
            .map(ReasoningParserType::get_reasoning_parser_from_name);

        let chatcmpl_id = format!("chatcmpl-{request_id}");

        Self {
            id: chatcmpl_id,
            object: "chat.completion.chunk".to_string(),
            created: now,
            model,
            system_fingerprint: None,
            service_tier: None,
            usage,
            msg_counter: 0,
            options,
            reasoning_parser,
        }
    }

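    /// Replaces the runtime config and re-resolves the reasoning parser from
    /// its `reasoning_parser` name, clearing the parser when none is configured.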
    pub fn set_reasoning_parser(&mut self, runtime_config: ModelRuntimeConfig) {
        self.options.runtime_config = runtime_config;
        match self.options.runtime_config.reasoning_parser.as_deref() {
            Some(name) => {
                self.reasoning_parser =
                    Some(ReasoningParserType::get_reasoning_parser_from_name(name));
            }
            None => {
                self.reasoning_parser = None;
            }
        }
    }

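    /// Records the input sequence length (ISL), i.e. the prompt token count,
    /// used for usage reporting.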
    pub fn update_isl(&mut self, isl: u32) {
        self.usage.prompt_tokens = isl;
    }

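    /// Converts backend logprobs into the OpenAI `ChatChoiceLogprobs` shape.
    /// Returns `None` unless logprobs were requested and the backend supplied
    /// them; the selected token is appended to `top_logprobs` if missing.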
    pub fn create_logprobs(
        &self,
        tokens: Vec<common::llm_backend::TokenType>,
        token_ids: &[TokenIdType],
        logprobs: Option<common::llm_backend::LogProbs>,
        top_logprobs: Option<common::llm_backend::TopLogprobs>,
    ) -> Option<dynamo_async_openai::types::ChatChoiceLogprobs> {
        if !self.options.enable_logprobs || logprobs.is_none() {
            return None;
        }

        // Pair each token string with its id, then with its logprob.
        let toks = tokens
            .into_iter()
            .zip(token_ids)
            .map(|(token, token_id)| (token.unwrap_or_default(), *token_id))
            .collect::<Vec<(String, TokenIdType)>>();
        let tok_lps = toks
            .iter()
            .zip(logprobs.unwrap())
            .map(|(_, lp)| lp as f32)
            .collect::<Vec<f32>>();

        let content = top_logprobs.map(|top_logprobs| {
            toks.iter()
                .zip(tok_lps)
                .zip(top_logprobs)
                .map(|(((t, tid), lp), top_lps)| {
                    let mut found_selected_token = false;
                    let mut converted_top_lps = top_lps
                        .iter()
                        .map(|top_lp| {
                            let top_t = top_lp.token.clone().unwrap_or_default();
                            let top_tid = top_lp.token_id;
                            found_selected_token = found_selected_token || top_tid == *tid;
                            dynamo_async_openai::types::TopLogprobs {
                                token: top_t,
                                logprob: top_lp.logprob as f32,
                                bytes: None,
                            }
                        })
                        .collect::<Vec<dynamo_async_openai::types::TopLogprobs>>();
                    // Ensure the selected token itself always appears in the list.
                    if !found_selected_token {
                        converted_top_lps.push(dynamo_async_openai::types::TopLogprobs {
                            token: t.clone(),
                            logprob: lp,
                            bytes: None,
                        });
                    }
                    dynamo_async_openai::types::ChatCompletionTokenLogprob {
                        token: t.clone(),
                        logprob: lp,
                        bytes: None,
                        top_logprobs: converted_top_lps,
                    }
                })
                .collect()
        });

        Some(dynamo_async_openai::types::ChatChoiceLogprobs {
            content,
            refusal: None,
        })
    }

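    /// Feeds the incremental text/tokens through the configured reasoning
    /// parser. Returns `None` when no parser is configured or there is
    /// nothing to parse.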
    fn create_reasoning_content(
        &mut self,
        text: &Option<String>,
        token_ids: &[u32],
    ) -> Option<ParserResult> {
        let reasoning_parser = self.reasoning_parser.as_mut()?;

        let text_ref = text.as_deref().unwrap_or("");
        if text_ref.is_empty() && token_ids.is_empty() {
            return None;
        }
        let parser_result =
            reasoning_parser.parse_reasoning_streaming_incremental(text_ref, token_ids);

        Some(parser_result)
    }

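    /// Assembles a single-choice `chat.completion.chunk`. Only the first
    /// chunk of the stream carries the assistant role; usage is always `None`
    /// here and is reported separately via [`Self::create_usage_chunk`].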
    #[allow(deprecated)]
    pub fn create_choice(
        &mut self,
        index: u32,
        text: Option<String>,
        reasoning_content: Option<String>,
        finish_reason: Option<dynamo_async_openai::types::FinishReason>,
        logprobs: Option<dynamo_async_openai::types::ChatChoiceLogprobs>,
    ) -> NvCreateChatCompletionStreamResponse {
        let delta = dynamo_async_openai::types::ChatCompletionStreamResponseDelta {
            content: text,
            function_call: None,
            tool_calls: None,
            // Only the first chunk of the stream carries the assistant role.
            role: if self.msg_counter == 0 {
                Some(dynamo_async_openai::types::Role::Assistant)
            } else {
                None
            },
            refusal: None,
            reasoning_content,
        };

        let choice = dynamo_async_openai::types::ChatChoiceStream {
            index,
            delta,
            finish_reason,
            logprobs,
        };

        let choices = vec![choice];
        self.msg_counter += 1;

        dynamo_async_openai::types::CreateChatCompletionStreamResponse {
            id: self.id.clone(),
            object: self.object.clone(),
            created: self.created,
            model: self.model.clone(),
            system_fingerprint: self.system_fingerprint.clone(),
            choices,
            // Usage is only reported in the dedicated final usage chunk.
            usage: None,
            service_tier: self.service_tier.clone(),
        }
    }

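    /// Builds the final usage-only chunk (empty `choices`), recomputing
    /// `total_tokens` from the prompt and completion counts.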
    pub fn create_usage_chunk(&self) -> NvCreateChatCompletionStreamResponse {
        let mut usage = self.usage.clone();
        usage.total_tokens = usage.prompt_tokens.saturating_add(usage.completion_tokens);

        dynamo_async_openai::types::CreateChatCompletionStreamResponse {
            id: self.id.clone(),
            object: self.object.clone(),
            created: self.created,
            model: self.model.clone(),
            system_fingerprint: self.system_fingerprint.clone(),
            choices: vec![],
            usage: Some(usage),
            service_tier: self.service_tier.clone(),
        }
    }

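    /// Whether the request asked for usage reporting (`stream_options.include_usage`).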
    pub fn is_usage_enabled(&self) -> bool {
        self.options.enable_usage
    }
}

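// Exposes the generator through the DeltaGeneratorExt trait so the common
// postprocessing pipeline can drive it without knowing the response type.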
impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamResponse>
    for DeltaGenerator
{
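    /// Converts one `BackendOutput` delta into a stream chunk: accumulates
    /// completion-token usage, converts logprobs, maps the finish reason, and
    /// splits reasoning content from normal text.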
    fn choice_from_postprocessor(
        &mut self,
        delta: crate::protocols::common::llm_backend::BackendOutput,
    ) -> anyhow::Result<NvCreateChatCompletionStreamResponse> {
        if self.options.enable_usage {
            // Track completion tokens as they stream out.
            let token_length: u32 = delta
                .token_ids
                .len()
                .try_into()
                .expect("token_ids length exceeds u32::MAX");

            self.usage.completion_tokens += token_length;
        }

        let logprobs = self.create_logprobs(
            delta.tokens,
            &delta.token_ids,
            delta.log_probs,
            delta.top_logprobs,
        );

        // Map backend finish reasons onto the OpenAI variants; Cancelled has
        // no OpenAI equivalent and is reported as Stop.
        let finish_reason = match delta.finish_reason {
            Some(common::FinishReason::EoS) => Some(dynamo_async_openai::types::FinishReason::Stop),
            Some(common::FinishReason::Stop) => {
                Some(dynamo_async_openai::types::FinishReason::Stop)
            }
            Some(common::FinishReason::Length) => {
                Some(dynamo_async_openai::types::FinishReason::Length)
            }
            Some(common::FinishReason::Cancelled) => {
                Some(dynamo_async_openai::types::FinishReason::Stop)
            }
            Some(common::FinishReason::ContentFilter) => {
                Some(dynamo_async_openai::types::FinishReason::ContentFilter)
            }
            Some(common::FinishReason::Error(err_msg)) => {
                return Err(anyhow::anyhow!(err_msg));
            }
            None => None,
        };

        // If a reasoning parser is configured, split the delta into normal and
        // reasoning text; otherwise pass the text through unchanged.
        let (normal_text, reasoning_content) =
            match self.create_reasoning_content(&delta.text, &delta.token_ids) {
                Some(reasoning_parser_result) => (
                    reasoning_parser_result.get_some_normal_text(),
                    reasoning_parser_result.get_some_reasoning(),
                ),
                None => (delta.text, None),
            };

        // Single choice per chunk.
        let index = 0;
        let stream_response = self.create_choice(
            index,
            normal_text,
            reasoning_content,
            finish_reason,
            logprobs,
        );

        Ok(stream_response)
    }

    fn get_isl(&self) -> Option<u32> {
        Some(self.usage.prompt_tokens)
    }

    fn create_usage_chunk(&self) -> NvCreateChatCompletionStreamResponse {
        DeltaGenerator::create_usage_chunk(self)
    }

    fn is_usage_enabled(&self) -> bool {
        DeltaGenerator::is_usage_enabled(self)
    }
}