Skip to main content

ds_api/api/
client.rs

1use std::time::Duration;
2
3use eventsource_stream::Eventsource;
4use futures::{StreamExt, stream::BoxStream};
5use reqwest::Client;
6
7use tracing::{debug, info, instrument, warn};
8
9use super::request::ApiRequest;
10use crate::error::{ApiError, Result};
11use crate::raw::{ChatCompletionChunk, ChatCompletionResponse};
12
13/// Lightweight API HTTP client.
14#[derive(Clone, Debug)]
15pub struct ApiClient {
16    token: String,
17    base_url: String,
18    client: Client,
19    timeout: Option<Duration>,
20}
21
22impl ApiClient {
23    /// Create a new client with the given token.
24    #[instrument(level = "info", skip(token), fields(masked_token = tracing::field::Empty))]
25    pub fn new(token: impl Into<String>) -> Self {
26        // avoid recording the raw token value in traces; we mark a field instead
27        let token_str = token.into();
28        info!(message = "creating ApiClient instance");
29        let client = Self {
30            token: token_str.clone(),
31            base_url: "https://api.deepseek.com".to_string(),
32            client: Client::new(),
33            timeout: None,
34        };
35        // annotate the trace/span with a masked token indicator (presence only)
36        tracing::Span::current().record("masked_token", &"***");
37        client
38    }
39
40    /// Replace base URL (builder style).
41    pub fn with_base_url(mut self, base: impl Into<String>) -> Self {
42        self.base_url = base.into();
43        self
44    }
45
46    /// Replace token (builder style).
47    pub fn with_token(mut self, token: impl Into<String>) -> Self {
48        self.token = token.into();
49        self
50    }
51
52    /// Set optional timeout for non-streaming requests.
53    pub fn with_timeout(mut self, t: Duration) -> Self {
54        self.timeout = Some(t);
55        self
56    }
57
58    /// Send a non-streaming request and parse the full ChatCompletionResponse.
59    #[instrument(level = "info", skip(self, req))]
60    pub async fn send(&self, req: ApiRequest) -> Result<ChatCompletionResponse> {
61        let raw = req.into_raw();
62        let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
63        debug!(method = "POST", %url, "sending non-streaming request");
64
65        let mut builder = self.client.post(&url).bearer_auth(&self.token).json(&raw);
66        if let Some(t) = self.timeout {
67            builder = builder.timeout(t);
68            debug!(timeout_ms = ?t.as_millis(), "request timeout set");
69        }
70
71        let resp = match builder.send().await {
72            Ok(r) => {
73                debug!("received HTTP response");
74                r
75            }
76            Err(e) => {
77                warn!(error = %e, "http send failed");
78                return Err(ApiError::Reqwest(e));
79            }
80        };
81
82        if !resp.status().is_success() {
83            let status = resp.status();
84            let text = resp.text().await.unwrap_or_else(|e| e.to_string());
85            warn!(%status, "non-success response");
86            return Err(ApiError::http_error(status, text));
87        }
88
89        let parsed = resp.json::<ChatCompletionResponse>().await.map_err(|e| {
90            warn!(error = %e, "failed to parse ChatCompletionResponse");
91            ApiError::Reqwest(e)
92        })?;
93        info!("request completed successfully");
94        Ok(parsed)
95    }
96
97    /// Send a streaming (SSE) request and return a boxed pinned stream of parsed `ChatCompletionChunk`.
98    #[instrument(level = "info", skip(self, req))]
99    pub async fn send_stream(
100        &self,
101        req: ApiRequest,
102    ) -> Result<BoxStream<'_, std::result::Result<ChatCompletionChunk, ApiError>>> {
103        let mut raw = req.into_raw();
104        raw.stream = Some(true);
105
106        let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
107        debug!(method = "POST", %url, "sending streaming request");
108        let response = match self
109            .client
110            .post(&url)
111            .bearer_auth(&self.token)
112            .json(&raw)
113            .send()
114            .await
115        {
116            Ok(r) => r,
117            Err(e) => {
118                warn!(error = %e, "stream http send failed");
119                return Err(ApiError::Reqwest(e));
120            }
121        };
122
123        if !response.status().is_success() {
124            let status = response.status();
125            let text = response.text().await.unwrap_or_else(|e| e.to_string());
126            warn!(%status, "non-success response for stream");
127            return Err(ApiError::http_error(status, text));
128        }
129
130        // Convert to SSE event stream
131        let event_stream = response.bytes_stream().eventsource();
132        info!("stream connected; converting SSE to chunk stream");
133
134        // Map SSE events -> parsed ChatCompletionChunk or ApiError
135        let chunk_stream = event_stream.filter_map(|ev_res| async move {
136            match ev_res {
137                Ok(ev) => {
138                    if ev.data == "[DONE]" {
139                        debug!("received [DONE] event");
140                        None
141                    } else {
142                        match serde_json::from_str::<ChatCompletionChunk>(&ev.data) {
143                            Ok(chunk) => {
144                                debug!("parsed chunk");
145                                Some(Ok(chunk))
146                            }
147                            Err(e) => {
148                                warn!(error = %e, "failed to parse chunk");
149                                Some(Err(ApiError::Json(e)))
150                            }
151                        }
152                    }
153                }
154                Err(e) => {
155                    warn!(error = %e, "eventsource error");
156                    Some(Err(ApiError::EventSource(e.to_string())))
157                }
158            }
159        });
160
161        // Box the stream into a pinned BoxStream for ergonomic returns.
162        Ok(chunk_stream.boxed())
163    }
164
165    /// Send a streaming request consuming `self`, returning a `'static` `BoxStream`.
166    ///
167    /// Unlike `send_stream`, this takes ownership so the returned stream is not tied
168    /// to a client lifetime — useful when the stream must outlive the client reference
169    /// (e.g. storing it inside a state machine).
170    #[instrument(level = "info", skip(self, req))]
171    pub async fn into_stream(
172        self,
173        req: ApiRequest,
174    ) -> Result<BoxStream<'static, std::result::Result<ChatCompletionChunk, ApiError>>> {
175        let mut raw = req.into_raw();
176        raw.stream = Some(true);
177
178        let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
179        debug!(method = "POST", %url, "sending streaming request (owned)");
180
181        let response = match self
182            .client
183            .post(&url)
184            .bearer_auth(&self.token)
185            .json(&raw)
186            .send()
187            .await
188        {
189            Ok(r) => r,
190            Err(e) => {
191                warn!(error = %e, "stream http send failed");
192                return Err(ApiError::Reqwest(e));
193            }
194        };
195
196        if !response.status().is_success() {
197            let status = response.status();
198            let text = response.text().await.unwrap_or_else(|e| e.to_string());
199            warn!(%status, "non-success response for stream (owned)");
200            return Err(ApiError::http_error(status, text));
201        }
202
203        let event_stream = response.bytes_stream().eventsource();
204        info!("stream connected (owned); converting SSE to chunk stream");
205
206        let chunk_stream = event_stream.filter_map(|ev_res| async move {
207            match ev_res {
208                Ok(ev) => {
209                    if ev.data == "[DONE]" {
210                        debug!("received [DONE] event");
211                        None
212                    } else {
213                        match serde_json::from_str::<ChatCompletionChunk>(&ev.data) {
214                            Ok(chunk) => Some(Ok(chunk)),
215                            Err(e) => Some(Err(ApiError::Json(e))),
216                        }
217                    }
218                }
219                Err(e) => Some(Err(ApiError::EventSource(e.to_string()))),
220            }
221        });
222
223        Ok(chunk_stream.boxed())
224    }
225
226    /// Convenience: stream only text fragments (delta.content) as String items.
227    ///
228    /// Each yielded item is `Result<String, ApiError>`.
229    #[instrument(level = "debug", skip(self, req))]
230    pub async fn stream_text(
231        &self,
232        req: ApiRequest,
233    ) -> Result<BoxStream<'_, std::result::Result<String, ApiError>>> {
234        debug!("creating text stream from chunk stream");
235        let chunk_stream = self.send_stream(req).await?;
236        let text_stream = chunk_stream.map(|item_res| match item_res {
237            Ok(chunk) => {
238                let s = chunk
239                    .choices
240                    .first()
241                    .and_then(|c| c.delta.content.as_ref())
242                    .cloned()
243                    .unwrap_or_default();
244                debug!(fragment = %s, "yielding text fragment");
245                Ok(s)
246            }
247            Err(e) => {
248                warn!(error = %e, "yielding error from chunk stream");
249                Err(e)
250            }
251        });
252
253        Ok(text_stream.boxed())
254    }
255}