// dynamo_llm/protocols/openai/completions/delta.rs
use super::{NvCreateCompletionRequest, NvCreateCompletionResponse};
use crate::{protocols::common, types::TokenIdType};
6
7impl NvCreateCompletionRequest {
8 pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
11 let options = DeltaGeneratorOptions {
12 enable_usage: self
13 .inner
14 .stream_options
15 .as_ref()
16 .map(|opts| opts.include_usage)
17 .unwrap_or(false),
18 enable_logprobs: self.inner.logprobs.unwrap_or(0) > 0,
19 };
20
21 DeltaGenerator::new(self.inner.model.clone(), options, request_id)
22 }
23}
24
/// Feature flags controlling which optional fields the [`DeltaGenerator`]
/// emits, derived from the originating completion request.
#[derive(Debug, Clone, Default)]
pub struct DeltaGeneratorOptions {
    /// When true, a final usage-only chunk is produced for the stream.
    pub enable_usage: bool,
    /// When true, per-token logprobs are attached to each streamed choice.
    pub enable_logprobs: bool,
}
30
/// Builds OpenAI-compatible streaming completion responses
/// (`"text_completion"` chunks) for a single request, accumulating token
/// usage as backend deltas arrive.
#[derive(Debug, Clone)]
pub struct DeltaGenerator {
    // Response id, formatted as `cmpl-<request_id>` at construction.
    id: String,
    // Always `"text_completion"` for this endpoint.
    object: String,
    // UNIX timestamp (seconds) captured at construction; u32 per the
    // response schema.
    created: u32,
    // Model name echoed back in every chunk.
    model: String,
    // Never populated in this file — left as `None` on every response.
    system_fingerprint: Option<String>,
    // Running prompt/completion token counts; prompt_tokens is set via
    // `update_isl`, completion_tokens accumulated per delta.
    usage: dynamo_async_openai::types::CompletionUsage,
    // Flags derived from the originating request (see DeltaGeneratorOptions).
    options: DeltaGeneratorOptions,
}
41
42impl DeltaGenerator {
43 pub fn new(model: String, options: DeltaGeneratorOptions, request_id: String) -> Self {
44 let now = std::time::SystemTime::now()
45 .duration_since(std::time::UNIX_EPOCH)
46 .unwrap()
47 .as_secs();
48
49 let now: u32 = now.try_into().expect("timestamp exceeds u32::MAX");
52
53 let usage = dynamo_async_openai::types::CompletionUsage {
56 completion_tokens: 0,
57 prompt_tokens: 0,
58 total_tokens: 0,
59 completion_tokens_details: None,
60 prompt_tokens_details: None,
61 };
62
63 let completion_id = format!("cmpl-{request_id}");
64
65 Self {
66 id: completion_id,
67 object: "text_completion".to_string(),
68 created: now,
69 model,
70 system_fingerprint: None,
71 usage,
72 options,
73 }
74 }
75
76 pub fn update_isl(&mut self, isl: u32) {
77 self.usage.prompt_tokens = isl;
78 }
79
80 pub fn create_logprobs(
81 &self,
82 tokens: Vec<common::llm_backend::TokenType>,
83 token_ids: Vec<TokenIdType>,
84 logprobs: Option<common::llm_backend::LogProbs>,
85 top_logprobs: Option<common::llm_backend::TopLogprobs>,
86 ) -> Option<dynamo_async_openai::types::Logprobs> {
87 if !self.options.enable_logprobs || logprobs.is_none() {
88 return None;
89 }
90
91 let toks = tokens
92 .into_iter()
93 .zip(token_ids)
94 .map(|(token, token_id)| (token.unwrap_or_default(), token_id))
95 .collect::<Vec<(String, TokenIdType)>>();
96 let tok_lps = toks
97 .iter()
98 .zip(logprobs.unwrap())
99 .map(|(_, lp)| lp as f32)
100 .collect::<Vec<f32>>();
101
102 let top_lps = top_logprobs.map_or(vec![], |top_logprobs| {
103 toks.iter()
104 .zip(tok_lps.iter())
105 .zip(top_logprobs.iter())
106 .map(|(((t, tid), lp), top_lps)| {
107 let mut found_selected_token = false;
108 let mut converted_top_lps = top_lps
109 .iter()
110 .map(|top_lp| {
111 let top_t = top_lp.token.clone().unwrap_or_default();
112 let top_tid = top_lp.token_id;
113 found_selected_token = found_selected_token || top_tid == *tid;
114 dynamo_async_openai::types::TopLogprobs {
115 token: top_t,
116 logprob: top_lp.logprob as f32,
117 bytes: None,
118 }
119 })
120 .collect::<Vec<dynamo_async_openai::types::TopLogprobs>>();
121 if !found_selected_token {
122 converted_top_lps.push(dynamo_async_openai::types::TopLogprobs {
124 token: t.clone(),
125 logprob: *lp,
126 bytes: None,
127 });
128 }
129 serde_json::to_value(converted_top_lps).unwrap()
130 })
131 .collect()
132 });
133
134 Some(dynamo_async_openai::types::Logprobs {
135 tokens: toks.iter().map(|(t, _)| t.clone()).collect(),
136 token_logprobs: tok_lps.into_iter().map(Some).collect(),
137 text_offset: vec![],
138 top_logprobs: top_lps,
139 })
140 }
141
142 pub fn create_choice(
143 &self,
144 index: u32,
145 text: Option<String>,
146 finish_reason: Option<dynamo_async_openai::types::CompletionFinishReason>,
147 logprobs: Option<dynamo_async_openai::types::Logprobs>,
148 ) -> NvCreateCompletionResponse {
149 let inner = dynamo_async_openai::types::CreateCompletionResponse {
155 id: self.id.clone(),
156 object: self.object.clone(),
157 created: self.created,
158 model: self.model.clone(),
159 system_fingerprint: self.system_fingerprint.clone(),
160 choices: vec![dynamo_async_openai::types::Choice {
161 text: text.unwrap_or_default(),
162 index,
163 finish_reason,
164 logprobs,
165 }],
166 usage: None, };
168
169 NvCreateCompletionResponse { inner }
170 }
171
172 pub fn create_usage_chunk(&self) -> NvCreateCompletionResponse {
178 let mut usage = self.usage.clone();
179 usage.total_tokens = usage.prompt_tokens.saturating_add(usage.completion_tokens);
180
181 let inner = dynamo_async_openai::types::CreateCompletionResponse {
182 id: self.id.clone(),
183 object: self.object.clone(),
184 created: self.created,
185 model: self.model.clone(),
186 system_fingerprint: self.system_fingerprint.clone(),
187 choices: vec![], usage: Some(usage),
189 };
190
191 NvCreateCompletionResponse { inner }
192 }
193
194 pub fn is_usage_enabled(&self) -> bool {
196 self.options.enable_usage
197 }
198}
199
200impl crate::protocols::openai::DeltaGeneratorExt<NvCreateCompletionResponse> for DeltaGenerator {
201 fn choice_from_postprocessor(
202 &mut self,
203 delta: common::llm_backend::BackendOutput,
204 ) -> anyhow::Result<NvCreateCompletionResponse> {
205 if self.options.enable_usage {
207 let token_length: u32 = delta
210 .token_ids
211 .len()
212 .try_into()
213 .expect("token_ids length exceeds u32::MAX");
214
215 self.usage.completion_tokens += token_length;
216 }
217
218 let logprobs = self.create_logprobs(
219 delta.tokens,
220 delta.token_ids,
221 delta.log_probs,
222 delta.top_logprobs,
223 );
224
225 let finish_reason = delta.finish_reason.map(Into::into);
226
227 let index = delta.index.unwrap_or(0);
229 let response = self.create_choice(index, delta.text.clone(), finish_reason, logprobs);
230 Ok(response)
231 }
232
233 fn get_isl(&self) -> Option<u32> {
234 Some(self.usage.prompt_tokens)
235 }
236
237 fn create_usage_chunk(&self) -> NvCreateCompletionResponse {
238 DeltaGenerator::create_usage_chunk(self)
239 }
240
241 fn is_usage_enabled(&self) -> bool {
242 DeltaGenerator::is_usage_enabled(self)
243 }
244}