//! next_web_ai/ai/deep_seek/deep_seek_chat_model.rs

use futures_core::stream::BoxStream;
use futures_util::StreamExt;
use next_web_core::{async_trait, convert::into_box::IntoBox, error::BoxError};

use crate::{
    ai::deep_seek::{
        api::deep_seek_api::{
            ChatCompletion, ChatCompletionMessage, ChatCompletionRequest, DeepSeekApi, DefaultUsage,
        },
        deep_seek_chat_options::DeepSeekChatOptions,
    },
    chat::{
        messages::{assistant_message::AssistantMessage, message_type::MessageType},
        meta_data::{
            chat_response_meta_data::ChatResponseMetadata, empty_usage::EmptyUsage, usage::Usage,
        },
        model::{
            chat_model::ChatModel, chat_response::ChatResponse, generation::Generation,
            streaming_chat_model::StreamingChatModel,
        },
        observation::{
            chat_model_observation_context::ChatModelObservationContext,
            chat_model_observation_documentation::ChatModelObservationDocumentation,
            default_chat_model_observation_convention::DefaultChatModelObservationConvention,
        },
        prompt::{
            chat_options::{ChatOptions, DefaultChatOptions},
            prompt::Prompt,
        },
    },
    model::{model::Model, model_request::ModelRequest, streaming_model::StreamingModel},
    observation::{
        conventions::ai_provider::AiProvider, noop_observation_registry::NoopObservationRegistry,
        observation::Observable, observation_documentation::ObservationDocumentation,
        observation_registry::ObservationRegistryImpl,
    },
};

use super::api::deep_seek_api::ChatApiRespnose;
40
/// Chat model backed by the DeepSeek chat-completion API.
///
/// `T` is the observation convention (defaults to
/// `DefaultChatModelObservationConvention`) and `R` is the observation
/// registry (defaults to the no-op registry); both are cloned when an
/// observation is created.
#[derive(Clone)]
pub struct DeepSeekChatModel<T = DefaultChatModelObservationConvention, R = NoopObservationRegistry>
{
    // pub(crate) retry_template: RetryTemplate,
    /// Default chat options; NOTE(review): currently not merged into the
    /// outgoing request — see the TODOs in `crate_request`.
    pub(crate) options: Option<DeepSeekChatOptions>,
    /// Low-level DeepSeek API client used to send requests.
    pub(crate) api: DeepSeekApi,
    /// Registry that receives chat-model observations.
    pub(crate) observation_registry: R,
    /// Convention used when building observations.
    pub(crate) observation_convention: T,
    // todo retry
}
51
52impl DeepSeekChatModel {
53    pub fn new(api: DeepSeekApi, options: Option<DeepSeekChatOptions>) -> Self {
54        Self {
55            api,
56            options,
57            observation_registry: ObservationRegistryImpl::noop(),
58            observation_convention: Default::default(),
59        }
60    }
61
62    fn crate_request(
63        &self,
64        prompt: &Prompt,
65        stream: bool,
66    ) -> Result<ChatCompletionRequest, &'static str> {
67        let (system_messages, user_messages) = prompt.instructions().iter().fold(
68            (Vec::new(), Vec::new()),
69            |(mut sys, mut user), s| {
70                let msg = ChatCompletionMessage::new(s.message_type().as_ref(), s.text());
71                match s.message_type() {
72                    MessageType::System => sys.push(msg),
73                    _ => user.push(msg),
74                }
75                (sys, user)
76            },
77        );
78
79        if system_messages.len() > 1 {
80            return Err("Only one system message is allowed in the prompt");
81        }
82
83        let system_message = system_messages.first();
84
85        let request = ChatCompletionRequest {
86            messages: user_messages,
87            model: prompt.chat_options().get_model().into(),
88            stream,
89            temperature: None,
90        };
91
92        // if (this.defaultOptions != null) {
93        // 	request = ModelOptionsUtils.merge(this.defaultOptions, request, ChatCompletionRequest.class);
94        // }
95
96        // if (prompt.getOptions() != null) {
97        // 	var updatedRuntimeOptions = ModelOptionsUtils.copyToTarget(prompt.getOptions(), ChatOptions.class,
98        // 			QianFanChatOptions.class);
99        // 	request = ModelOptionsUtils.merge(updatedRuntimeOptions, request, ChatCompletionRequest.class);
100        // }
101
102        if let Some(options) = self.options.as_ref() {}
103
104        if let Some(options) = prompt.options() {}
105
106        Ok(request)
107    }
108
109    fn build_request_options(request: &ChatCompletionRequest) -> impl ChatOptions {
110        DefaultChatOptions {
111            model: request.model.to_string(),
112            ..Default::default()
113        }
114    }
115
116    fn to_metadata(chat_completion: &ChatCompletion, model: &str) -> ChatResponseMetadata {
117        let usage: Box<dyn Usage> = chat_completion
118            .usage
119            .as_ref()
120            .map(|u| Self::default_usage(u).into_boxed() as Box<dyn Usage>)
121            .unwrap_or_else(|| EmptyUsage.into_boxed());
122
123        ChatResponseMetadata {
124            id: chat_completion.id.to_owned(),
125            model: model.into(),
126            usage,
127        }
128    }
129
130    fn default_usage(usage: &super::api::deep_seek_api::Usage) -> DefaultUsage {
131        DefaultUsage {
132            prompt_tokens: usage.prompt_tokens,
133            completion_tokens: usage.completion_tokens,
134            total_tokens: usage.total_tokens,
135        }
136    }
137}
138
139#[async_trait]
140impl Model<Prompt, ChatResponse> for DeepSeekChatModel {
141    async fn call(&self, prompt: Prompt) -> Result<ChatResponse, BoxError> {
142        let req: ChatCompletionRequest = self.crate_request(&prompt, false)?;
143
144        // observe
145        let mut observation_context = ChatModelObservationContext::new(
146            prompt,
147            AiProvider::DeepSeek.to_string(),
148            Self::build_request_options(&req).into_boxed(),
149        );
150
151        let observation_convention = self.observation_convention.clone().into_boxed();
152        match ChatModelObservationDocumentation::ChatModelOperation.observation(
153            Some(observation_convention.clone()),
154            Some(observation_convention),
155            observation_context.clone(),
156            self.observation_registry.clone().into_boxed(),
157        ) {
158            Ok(observation) => observation,
159            Err(e) => return Err(e.into()),
160        }
161        .observe(async {
162            // execute
163            let chat_respnose = self.api.send(&req).await?;
164
165            let chat_completion = match chat_respnose {
166                ChatApiRespnose::Data(chat_completion) => chat_completion,
167                _ => return Err("Chat completion is not entity".into()),
168            };
169
170            let text_content = chat_completion
171                .choices
172                .first()
173                .and_then(|s| s.message.as_ref().and_then(|s1| Some(s1.content.clone())))
174                .unwrap_or_default()
175                .to_string();
176
177            let assistant_message = AssistantMessage {
178                text_content,
179                metadata: None,
180                message_type: MessageType::Assistant,
181            };
182
183            let generations = vec![Generation::new(assistant_message)];
184            let chat_response_meta_data = Self::to_metadata(&chat_completion, &req.model);
185
186            let chat_response = ChatResponse::new(chat_response_meta_data, generations);
187            observation_context.set_response(chat_response.clone());
188
189            Ok(chat_response)
190        })
191        .await
192    }
193}
194
195#[async_trait]
196impl StreamingModel<Prompt, ChatResponse> for DeepSeekChatModel {
197    async fn stream(
198        &self,
199        prompt: Prompt,
200    ) -> Result<BoxStream<'static, Result<ChatResponse, BoxError>>, BoxError> {
201        let req: ChatCompletionRequest = self.crate_request(&prompt, true)?;
202        // execute
203        let chat_respnose = self.api.send(&req).await?;
204
205        let stream = match chat_respnose {
206            ChatApiRespnose::DataStream(stream) => stream,
207            _ => return Err("Chat completion is not stream".into()),
208        };
209
210        let stream = stream.then(move |chat_completion| {
211            let model = req.model.clone();
212            async move {
213                let chat_completion = chat_completion?;
214
215                let generations: Vec<Generation> = chat_completion
216                    .iter()
217                    .flat_map(|response| &response.choices)
218                    .filter_map(|choice| choice.delta.as_ref())
219                    .map(|delta| {
220                        Generation::new(AssistantMessage {
221                            text_content: delta.content.to_string(),
222                            metadata: None,
223                            message_type: MessageType::Assistant,
224                        })
225                    })
226                    .collect();
227
228                Ok(ChatResponse::new(
229                    Self::to_metadata(chat_completion.last().unwrap(), &model),
230                    generations,
231                ))
232            }
233        });
234
235        Ok(stream.boxed())
236    }
237}
238
// Marker-trait implementations: no items are overridden here; the behavior
// comes from the `Model` and `StreamingModel` impls above.
impl ChatModel for DeepSeekChatModel {}
impl StreamingChatModel for DeepSeekChatModel {}