1use std::time::Duration;
2
3use eventsource_stream::Eventsource;
4use futures::{StreamExt, stream::BoxStream};
5use reqwest::Client;
6
7use tracing::{debug, info, instrument, warn};
8
9use super::request::ApiRequest;
10use crate::error::{ApiError, Result};
11use crate::raw::{ChatCompletionChunk, ChatCompletionResponse};
12
13#[derive(Clone, Debug)]
15pub struct ApiClient {
16 token: String,
17 base_url: String,
18 client: Client,
19 timeout: Option<Duration>,
20}
21
22impl ApiClient {
23 #[instrument(level = "info", skip(token), fields(masked_token = tracing::field::Empty))]
25 pub fn new(token: impl Into<String>) -> Self {
26 let token_str = token.into();
28 info!(message = "creating ApiClient instance");
29 let client = Self {
30 token: token_str.clone(),
31 base_url: "https://api.deepseek.com".to_string(),
32 client: Client::new(),
33 timeout: None,
34 };
35 tracing::Span::current().record("masked_token", &"***");
37 client
38 }
39
40 pub fn with_base_url(mut self, base: impl Into<String>) -> Self {
42 self.base_url = base.into();
43 self
44 }
45
46 pub fn with_token(mut self, token: impl Into<String>) -> Self {
48 self.token = token.into();
49 self
50 }
51
52 pub fn with_timeout(mut self, t: Duration) -> Self {
54 self.timeout = Some(t);
55 self
56 }
57
58 #[instrument(level = "info", skip(self, req))]
60 pub async fn send(&self, req: ApiRequest) -> Result<ChatCompletionResponse> {
61 let raw = req.into_raw();
62 let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
63 debug!(method = "POST", %url, "sending non-streaming request");
64
65 let mut builder = self.client.post(&url).bearer_auth(&self.token).json(&raw);
66 if let Some(t) = self.timeout {
67 builder = builder.timeout(t);
68 debug!(timeout_ms = ?t.as_millis(), "request timeout set");
69 }
70
71 let resp = match builder.send().await {
72 Ok(r) => {
73 debug!("received HTTP response");
74 r
75 }
76 Err(e) => {
77 warn!(error = %e, "http send failed");
78 return Err(ApiError::Reqwest(e));
79 }
80 };
81
82 if !resp.status().is_success() {
83 let status = resp.status();
84 let text = resp.text().await.unwrap_or_else(|e| e.to_string());
85 warn!(%status, "non-success response");
86 return Err(ApiError::http_error(status, text));
87 }
88
89 let parsed = resp.json::<ChatCompletionResponse>().await.map_err(|e| {
90 warn!(error = %e, "failed to parse ChatCompletionResponse");
91 ApiError::Reqwest(e)
92 })?;
93 info!("request completed successfully");
94 Ok(parsed)
95 }
96
97 #[instrument(level = "info", skip(self, req))]
99 pub async fn send_stream(
100 &self,
101 req: ApiRequest,
102 ) -> Result<BoxStream<'_, std::result::Result<ChatCompletionChunk, ApiError>>> {
103 let mut raw = req.into_raw();
104 raw.stream = Some(true);
105
106 let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
107 debug!(method = "POST", %url, "sending streaming request");
108 let response = match self
109 .client
110 .post(&url)
111 .bearer_auth(&self.token)
112 .json(&raw)
113 .send()
114 .await
115 {
116 Ok(r) => r,
117 Err(e) => {
118 warn!(error = %e, "stream http send failed");
119 return Err(ApiError::Reqwest(e));
120 }
121 };
122
123 if !response.status().is_success() {
124 let status = response.status();
125 let text = response.text().await.unwrap_or_else(|e| e.to_string());
126 warn!(%status, "non-success response for stream");
127 return Err(ApiError::http_error(status, text));
128 }
129
130 let event_stream = response.bytes_stream().eventsource();
132 info!("stream connected; converting SSE to chunk stream");
133
134 let chunk_stream = event_stream.filter_map(|ev_res| async move {
136 match ev_res {
137 Ok(ev) => {
138 if ev.data == "[DONE]" {
139 debug!("received [DONE] event");
140 None
141 } else {
142 match serde_json::from_str::<ChatCompletionChunk>(&ev.data) {
143 Ok(chunk) => {
144 debug!("parsed chunk");
145 Some(Ok(chunk))
146 }
147 Err(e) => {
148 warn!(error = %e, "failed to parse chunk");
149 Some(Err(ApiError::Json(e)))
150 }
151 }
152 }
153 }
154 Err(e) => {
155 warn!(error = %e, "eventsource error");
156 Some(Err(ApiError::EventSource(e.to_string())))
157 }
158 }
159 });
160
161 Ok(chunk_stream.boxed())
163 }
164
165 #[instrument(level = "info", skip(self, req))]
171 pub async fn into_stream(
172 self,
173 req: ApiRequest,
174 ) -> Result<BoxStream<'static, std::result::Result<ChatCompletionChunk, ApiError>>> {
175 let mut raw = req.into_raw();
176 raw.stream = Some(true);
177
178 let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
179 debug!(method = "POST", %url, "sending streaming request (owned)");
180
181 let response = match self
182 .client
183 .post(&url)
184 .bearer_auth(&self.token)
185 .json(&raw)
186 .send()
187 .await
188 {
189 Ok(r) => r,
190 Err(e) => {
191 warn!(error = %e, "stream http send failed");
192 return Err(ApiError::Reqwest(e));
193 }
194 };
195
196 if !response.status().is_success() {
197 let status = response.status();
198 let text = response.text().await.unwrap_or_else(|e| e.to_string());
199 warn!(%status, "non-success response for stream (owned)");
200 return Err(ApiError::http_error(status, text));
201 }
202
203 let event_stream = response.bytes_stream().eventsource();
204 info!("stream connected (owned); converting SSE to chunk stream");
205
206 let chunk_stream = event_stream.filter_map(|ev_res| async move {
207 match ev_res {
208 Ok(ev) => {
209 if ev.data == "[DONE]" {
210 debug!("received [DONE] event");
211 None
212 } else {
213 match serde_json::from_str::<ChatCompletionChunk>(&ev.data) {
214 Ok(chunk) => Some(Ok(chunk)),
215 Err(e) => Some(Err(ApiError::Json(e))),
216 }
217 }
218 }
219 Err(e) => Some(Err(ApiError::EventSource(e.to_string()))),
220 }
221 });
222
223 Ok(chunk_stream.boxed())
224 }
225
226 #[instrument(level = "debug", skip(self, req))]
230 pub async fn stream_text(
231 &self,
232 req: ApiRequest,
233 ) -> Result<BoxStream<'_, std::result::Result<String, ApiError>>> {
234 debug!("creating text stream from chunk stream");
235 let chunk_stream = self.send_stream(req).await?;
236 let text_stream = chunk_stream.map(|item_res| match item_res {
237 Ok(chunk) => {
238 let s = chunk
239 .choices
240 .first()
241 .and_then(|c| c.delta.content.as_ref())
242 .cloned()
243 .unwrap_or_default();
244 debug!(fragment = %s, "yielding text fragment");
245 Ok(s)
246 }
247 Err(e) => {
248 warn!(error = %e, "yielding error from chunk stream");
249 Err(e)
250 }
251 });
252
253 Ok(text_stream.boxed())
254 }
255}