// next_web_ai/ai/deep_seek/deep_seek_chat_model.rs
use futures_core::stream::BoxStream;
2use futures_util::StreamExt;
3use next_web_core::{async_trait, convert::into_box::IntoBox, error::BoxError};
4
5use crate::{
6 ai::deep_seek::{
7 api::deep_seek_api::{
8 ChatCompletion, ChatCompletionMessage, ChatCompletionRequest, DeepSeekApi, DefaultUsage,
9 },
10 deep_seek_chat_options::DeepSeekChatOptions,
11 },
12 chat::{
13 messages::{assistant_message::AssistantMessage, message_type::MessageType},
14 meta_data::{
15 chat_response_meta_data::ChatResponseMetadata, empty_usage::EmptyUsage, usage::Usage,
16 },
17 model::{
18 chat_model::ChatModel, chat_response::ChatResponse, generation::Generation,
19 streaming_chat_model::StreamingChatModel,
20 },
21 observation::{
22 chat_model_observation_context::ChatModelObservationContext,
23 chat_model_observation_documentation::ChatModelObservationDocumentation,
24 default_chat_model_observation_convention::DefaultChatModelObservationConvention,
25 },
26 prompt::{
27 chat_options::{ChatOptions, DefaultChatOptions},
28 prompt::Prompt,
29 },
30 },
31 model::{model::Model, model_request::ModelRequest, streaming_model::StreamingModel},
32 observation::{
33 conventions::ai_provider::AiProvider, noop_observation_registry::NoopObservationRegistry,
34 observation::Observable, observation_documentation::ObservationDocumentation,
35 observation_registry::ObservationRegistryImpl,
36 },
37};
38
39use super::api::deep_seek_api::ChatApiRespnose;
40
/// Chat model backed by the DeepSeek chat-completions API.
///
/// `T` is the observation convention used when recording model calls and
/// `R` is the observation registry they are reported to; the defaults give
/// a no-op observation setup.
#[derive(Clone)]
pub struct DeepSeekChatModel<T = DefaultChatModelObservationConvention, R = NoopObservationRegistry>
{
    // Default request options; `None` means no defaults are carried.
    // NOTE(review): they are not merged into requests yet — see `crate_request`.
    pub(crate) options: Option<DeepSeekChatOptions>,
    // Low-level client for the DeepSeek HTTP API.
    pub(crate) api: DeepSeekApi,
    pub(crate) observation_registry: R,
    pub(crate) observation_convention: T,
}
51
52impl DeepSeekChatModel {
53 pub fn new(api: DeepSeekApi, options: Option<DeepSeekChatOptions>) -> Self {
54 Self {
55 api,
56 options,
57 observation_registry: ObservationRegistryImpl::noop(),
58 observation_convention: Default::default(),
59 }
60 }
61
62 fn crate_request(
63 &self,
64 prompt: &Prompt,
65 stream: bool,
66 ) -> Result<ChatCompletionRequest, &'static str> {
67 let (system_messages, user_messages) = prompt.instructions().iter().fold(
68 (Vec::new(), Vec::new()),
69 |(mut sys, mut user), s| {
70 let msg = ChatCompletionMessage::new(s.message_type().as_ref(), s.text());
71 match s.message_type() {
72 MessageType::System => sys.push(msg),
73 _ => user.push(msg),
74 }
75 (sys, user)
76 },
77 );
78
79 if system_messages.len() > 1 {
80 return Err("Only one system message is allowed in the prompt");
81 }
82
83 let system_message = system_messages.first();
84
85 let request = ChatCompletionRequest {
86 messages: user_messages,
87 model: prompt.chat_options().get_model().into(),
88 stream,
89 temperature: None,
90 };
91
92 if let Some(options) = self.options.as_ref() {}
103
104 if let Some(options) = prompt.options() {}
105
106 Ok(request)
107 }
108
109 fn build_request_options(request: &ChatCompletionRequest) -> impl ChatOptions {
110 DefaultChatOptions {
111 model: request.model.to_string(),
112 ..Default::default()
113 }
114 }
115
116 fn to_metadata(chat_completion: &ChatCompletion, model: &str) -> ChatResponseMetadata {
117 let usage: Box<dyn Usage> = chat_completion
118 .usage
119 .as_ref()
120 .map(|u| Self::default_usage(u).into_boxed() as Box<dyn Usage>)
121 .unwrap_or_else(|| EmptyUsage.into_boxed());
122
123 ChatResponseMetadata {
124 id: chat_completion.id.to_owned(),
125 model: model.into(),
126 usage,
127 }
128 }
129
130 fn default_usage(usage: &super::api::deep_seek_api::Usage) -> DefaultUsage {
131 DefaultUsage {
132 prompt_tokens: usage.prompt_tokens,
133 completion_tokens: usage.completion_tokens,
134 total_tokens: usage.total_tokens,
135 }
136 }
137}
138
139#[async_trait]
140impl Model<Prompt, ChatResponse> for DeepSeekChatModel {
141 async fn call(&self, prompt: Prompt) -> Result<ChatResponse, BoxError> {
142 let req: ChatCompletionRequest = self.crate_request(&prompt, false)?;
143
144 let mut observation_context = ChatModelObservationContext::new(
146 prompt,
147 AiProvider::DeepSeek.to_string(),
148 Self::build_request_options(&req).into_boxed(),
149 );
150
151 let observation_convention = self.observation_convention.clone().into_boxed();
152 match ChatModelObservationDocumentation::ChatModelOperation.observation(
153 Some(observation_convention.clone()),
154 Some(observation_convention),
155 observation_context.clone(),
156 self.observation_registry.clone().into_boxed(),
157 ) {
158 Ok(observation) => observation,
159 Err(e) => return Err(e.into()),
160 }
161 .observe(async {
162 let chat_respnose = self.api.send(&req).await?;
164
165 let chat_completion = match chat_respnose {
166 ChatApiRespnose::Data(chat_completion) => chat_completion,
167 _ => return Err("Chat completion is not entity".into()),
168 };
169
170 let text_content = chat_completion
171 .choices
172 .first()
173 .and_then(|s| s.message.as_ref().and_then(|s1| Some(s1.content.clone())))
174 .unwrap_or_default()
175 .to_string();
176
177 let assistant_message = AssistantMessage {
178 text_content,
179 metadata: None,
180 message_type: MessageType::Assistant,
181 };
182
183 let generations = vec![Generation::new(assistant_message)];
184 let chat_response_meta_data = Self::to_metadata(&chat_completion, &req.model);
185
186 let chat_response = ChatResponse::new(chat_response_meta_data, generations);
187 observation_context.set_response(chat_response.clone());
188
189 Ok(chat_response)
190 })
191 .await
192 }
193}
194
195#[async_trait]
196impl StreamingModel<Prompt, ChatResponse> for DeepSeekChatModel {
197 async fn stream(
198 &self,
199 prompt: Prompt,
200 ) -> Result<BoxStream<'static, Result<ChatResponse, BoxError>>, BoxError> {
201 let req: ChatCompletionRequest = self.crate_request(&prompt, true)?;
202 let chat_respnose = self.api.send(&req).await?;
204
205 let stream = match chat_respnose {
206 ChatApiRespnose::DataStream(stream) => stream,
207 _ => return Err("Chat completion is not stream".into()),
208 };
209
210 let stream = stream.then(move |chat_completion| {
211 let model = req.model.clone();
212 async move {
213 let chat_completion = chat_completion?;
214
215 let generations: Vec<Generation> = chat_completion
216 .iter()
217 .flat_map(|response| &response.choices)
218 .filter_map(|choice| choice.delta.as_ref())
219 .map(|delta| {
220 Generation::new(AssistantMessage {
221 text_content: delta.content.to_string(),
222 metadata: None,
223 message_type: MessageType::Assistant,
224 })
225 })
226 .collect();
227
228 Ok(ChatResponse::new(
229 Self::to_metadata(chat_completion.last().unwrap(), &model),
230 generations,
231 ))
232 }
233 });
234
235 Ok(stream.boxed())
236 }
237}
238
// Marker impls: the `Model`/`StreamingModel` impls above supply the behavior;
// presumably `ChatModel`/`StreamingChatModel` only add default methods, so no
// overrides are needed here.
impl ChatModel for DeepSeekChatModel {}
impl StreamingChatModel for DeepSeekChatModel {}