1use std::time::Duration;
2
3use eventsource_stream::Eventsource;
4use futures::{StreamExt, stream::BoxStream};
5use reqwest::Client;
6
7use tracing::{debug, info, instrument, warn};
8
9use super::request::ApiRequest;
10use crate::error::{ApiError, Result};
11use crate::raw::{ChatCompletionChunk, ChatCompletionResponse};
12
13#[derive(Clone, Debug)]
15pub struct ApiClient {
16 token: String,
17 base_url: String,
18 client: Client,
19 timeout: Option<Duration>,
20}
21
22impl ApiClient {
23 #[instrument(level = "info", skip(token), fields(masked_token = tracing::field::Empty))]
25 pub fn new(token: impl Into<String>) -> Self {
26 Self::with_client(token, Client::new())
27 }
28
29 #[instrument(level = "info", skip(token), fields(masked_token = tracing::field::Empty))]
30 pub fn with_client(token: impl Into<String>, client: Client) -> Self {
31 let token_str = token.into();
33 info!(message = "creating ApiClient instance");
34 let client = Self {
35 token: token_str.clone(),
36 base_url: "https://api.deepseek.com".to_string(),
37 client,
38 timeout: None,
39 };
40 tracing::Span::current().record("masked_token", "***");
42 client
43 }
44
45 pub fn with_base_url(mut self, base: impl Into<String>) -> Self {
47 self.base_url = base.into();
48 self
49 }
50
51 pub fn with_token(mut self, token: impl Into<String>) -> Self {
53 self.token = token.into();
54 self
55 }
56
57 pub fn with_timeout(mut self, t: Duration) -> Self {
59 self.timeout = Some(t);
60 self
61 }
62
63 #[instrument(level = "info", skip(self, req))]
65 pub async fn send(&self, req: ApiRequest) -> Result<ChatCompletionResponse> {
66 let raw = req.into_raw();
67 let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
68 debug!(method = "POST", %url, "sending non-streaming request");
69
70 let mut builder = self.client.post(&url).bearer_auth(&self.token).json(&raw);
71 if let Some(t) = self.timeout {
72 builder = builder.timeout(t);
73 debug!(timeout_ms = ?t.as_millis(), "request timeout set");
74 }
75
76 let resp = match builder.send().await {
77 Ok(r) => {
78 debug!("received HTTP response");
79 r
80 }
81 Err(e) => {
82 warn!(error = %e, "http send failed");
83 return Err(ApiError::Reqwest(e));
84 }
85 };
86
87 if !resp.status().is_success() {
88 let status = resp.status();
89 let text = resp.text().await.unwrap_or_else(|e| e.to_string());
90 warn!(%status, "non-success response");
91 return Err(ApiError::http_error(status, text));
92 }
93
94 let parsed = resp.json::<ChatCompletionResponse>().await.map_err(|e| {
95 warn!(error = %e, "failed to parse ChatCompletionResponse");
96 ApiError::Reqwest(e)
97 })?;
98 info!("request completed successfully");
99 Ok(parsed)
100 }
101
102 #[instrument(level = "info", skip(self, req))]
104 pub async fn send_stream(
105 &self,
106 req: ApiRequest,
107 ) -> Result<BoxStream<'_, std::result::Result<ChatCompletionChunk, ApiError>>> {
108 let mut raw = req.into_raw();
109 raw.stream = Some(true);
110
111 let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
112 debug!(method = "POST", %url, "sending streaming request");
113 let response = match self
114 .client
115 .post(&url)
116 .bearer_auth(&self.token)
117 .json(&raw)
118 .send()
119 .await
120 {
121 Ok(r) => r,
122 Err(e) => {
123 warn!(error = %e, "stream http send failed");
124 return Err(ApiError::Reqwest(e));
125 }
126 };
127
128 if !response.status().is_success() {
129 let status = response.status();
130 let text = response.text().await.unwrap_or_else(|e| e.to_string());
131 warn!(%status, "non-success response for stream");
132 return Err(ApiError::http_error(status, text));
133 }
134
135 let event_stream = response.bytes_stream().eventsource();
137 info!("stream connected; converting SSE to chunk stream");
138
139 let chunk_stream = event_stream.filter_map(|ev_res| async move {
141 match ev_res {
142 Ok(ev) => {
143 if ev.data == "[DONE]" {
144 debug!("received [DONE] event");
145 None
146 } else {
147 match serde_json::from_str::<ChatCompletionChunk>(&ev.data) {
148 Ok(chunk) => {
149 debug!("parsed chunk");
150 Some(Ok(chunk))
151 }
152 Err(e) => {
153 warn!(error = %e, "failed to parse chunk");
154 Some(Err(ApiError::Json(e)))
155 }
156 }
157 }
158 }
159 Err(e) => {
160 warn!(error = %e, "eventsource error");
161 Some(Err(ApiError::EventSource(e.to_string())))
162 }
163 }
164 });
165
166 Ok(chunk_stream.boxed())
168 }
169
170 #[instrument(level = "info", skip(self, req))]
176 pub async fn into_stream(
177 self,
178 req: ApiRequest,
179 ) -> Result<BoxStream<'static, std::result::Result<ChatCompletionChunk, ApiError>>> {
180 let mut raw = req.into_raw();
181 raw.stream = Some(true);
182
183 let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
184 debug!(method = "POST", %url, "sending streaming request (owned)");
185
186 let response = match self
187 .client
188 .post(&url)
189 .bearer_auth(&self.token)
190 .json(&raw)
191 .send()
192 .await
193 {
194 Ok(r) => r,
195 Err(e) => {
196 warn!(error = %e, "stream http send failed");
197 return Err(ApiError::Reqwest(e));
198 }
199 };
200
201 if !response.status().is_success() {
202 let status = response.status();
203 let text = response.text().await.unwrap_or_else(|e| e.to_string());
204 warn!(%status, "non-success response for stream (owned)");
205 return Err(ApiError::http_error(status, text));
206 }
207
208 let event_stream = response.bytes_stream().eventsource();
209 info!("stream connected (owned); converting SSE to chunk stream");
210
211 let chunk_stream = event_stream.filter_map(|ev_res| async move {
212 match ev_res {
213 Ok(ev) => {
214 if ev.data == "[DONE]" {
215 debug!("received [DONE] event");
216 None
217 } else {
218 match serde_json::from_str::<ChatCompletionChunk>(&ev.data) {
219 Ok(chunk) => Some(Ok(chunk)),
220 Err(e) => Some(Err(ApiError::Json(e))),
221 }
222 }
223 }
224 Err(e) => Some(Err(ApiError::EventSource(e.to_string()))),
225 }
226 });
227
228 Ok(chunk_stream.boxed())
229 }
230
231 #[instrument(level = "debug", skip(self, req))]
235 pub async fn stream_text(
236 &self,
237 req: ApiRequest,
238 ) -> Result<BoxStream<'_, std::result::Result<String, ApiError>>> {
239 debug!("creating text stream from chunk stream");
240 let chunk_stream = self.send_stream(req).await?;
241 let text_stream = chunk_stream.map(|item_res| match item_res {
242 Ok(chunk) => {
243 let s = chunk
244 .choices
245 .first()
246 .and_then(|c| c.delta.content.as_ref())
247 .cloned()
248 .unwrap_or_default();
249 debug!(fragment = %s, "yielding text fragment");
250 Ok(s)
251 }
252 Err(e) => {
253 warn!(error = %e, "yielding error from chunk stream");
254 Err(e)
255 }
256 });
257
258 Ok(text_stream.boxed())
259 }
260}