Skip to main content

ds_api/api/
client.rs

1use std::time::Duration;
2
3use eventsource_stream::Eventsource;
4use futures::{StreamExt, stream::BoxStream};
5use reqwest::Client;
6
7use tracing::{debug, info, instrument, warn};
8
9use super::request::ApiRequest;
10use crate::error::{ApiError, Result};
11use crate::raw::{ChatCompletionChunk, ChatCompletionResponse};
12
13/// Lightweight API HTTP client.
14#[derive(Clone, Debug)]
15pub struct ApiClient {
16    token: String,
17    base_url: String,
18    client: Client,
19    timeout: Option<Duration>,
20}
21
22impl ApiClient {
23    /// Create a new client with the given token.
24    #[instrument(level = "info", skip(token), fields(masked_token = tracing::field::Empty))]
25    pub fn new(token: impl Into<String>) -> Self {
26        Self::with_client(token, Client::new())
27    }
28
29    #[instrument(level = "info", skip(token), fields(masked_token = tracing::field::Empty))]
30    pub fn with_client(token: impl Into<String>, client: Client) -> Self {
31        // avoid recording the raw token value in traces; we mark a field instead
32        let token_str = token.into();
33        info!(message = "creating ApiClient instance");
34        let client = Self {
35            token: token_str.clone(),
36            base_url: "https://api.deepseek.com".to_string(),
37            client,
38            timeout: None,
39        };
40        // annotate the trace/span with a masked token indicator (presence only)
41        tracing::Span::current().record("masked_token", "***");
42        client
43    }
44
45    /// Replace base URL (builder style).
46    pub fn with_base_url(mut self, base: impl Into<String>) -> Self {
47        self.base_url = base.into();
48        self
49    }
50
51    /// Replace token (builder style).
52    pub fn with_token(mut self, token: impl Into<String>) -> Self {
53        self.token = token.into();
54        self
55    }
56
57    /// Set optional timeout for non-streaming requests.
58    pub fn with_timeout(mut self, t: Duration) -> Self {
59        self.timeout = Some(t);
60        self
61    }
62
63    /// Send a non-streaming request and parse the full ChatCompletionResponse.
64    #[instrument(level = "info", skip(self, req))]
65    pub async fn send(&self, req: ApiRequest) -> Result<ChatCompletionResponse> {
66        let raw = req.into_raw();
67        let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
68        debug!(method = "POST", %url, "sending non-streaming request");
69
70        let mut builder = self.client.post(&url).bearer_auth(&self.token).json(&raw);
71        if let Some(t) = self.timeout {
72            builder = builder.timeout(t);
73            debug!(timeout_ms = ?t.as_millis(), "request timeout set");
74        }
75
76        let resp = match builder.send().await {
77            Ok(r) => {
78                debug!("received HTTP response");
79                r
80            }
81            Err(e) => {
82                warn!(error = %e, "http send failed");
83                return Err(ApiError::Reqwest(e));
84            }
85        };
86
87        if !resp.status().is_success() {
88            let status = resp.status();
89            let text = resp.text().await.unwrap_or_else(|e| e.to_string());
90            warn!(%status, "non-success response");
91            return Err(ApiError::http_error(status, text));
92        }
93
94        let parsed = resp.json::<ChatCompletionResponse>().await.map_err(|e| {
95            warn!(error = %e, "failed to parse ChatCompletionResponse");
96            ApiError::Reqwest(e)
97        })?;
98        info!("request completed successfully");
99        Ok(parsed)
100    }
101
102    /// Send a streaming (SSE) request and return a boxed pinned stream of parsed `ChatCompletionChunk`.
103    #[instrument(level = "info", skip(self, req))]
104    pub async fn send_stream(
105        &self,
106        req: ApiRequest,
107    ) -> Result<BoxStream<'_, std::result::Result<ChatCompletionChunk, ApiError>>> {
108        let mut raw = req.into_raw();
109        raw.stream = Some(true);
110
111        let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
112        debug!(method = "POST", %url, "sending streaming request");
113        let response = match self
114            .client
115            .post(&url)
116            .bearer_auth(&self.token)
117            .json(&raw)
118            .send()
119            .await
120        {
121            Ok(r) => r,
122            Err(e) => {
123                warn!(error = %e, "stream http send failed");
124                return Err(ApiError::Reqwest(e));
125            }
126        };
127
128        if !response.status().is_success() {
129            let status = response.status();
130            let text = response.text().await.unwrap_or_else(|e| e.to_string());
131            warn!(%status, "non-success response for stream");
132            return Err(ApiError::http_error(status, text));
133        }
134
135        // Convert to SSE event stream
136        let event_stream = response.bytes_stream().eventsource();
137        info!("stream connected; converting SSE to chunk stream");
138
139        // Map SSE events -> parsed ChatCompletionChunk or ApiError
140        let chunk_stream = event_stream.filter_map(|ev_res| async move {
141            match ev_res {
142                Ok(ev) => {
143                    if ev.data == "[DONE]" {
144                        debug!("received [DONE] event");
145                        None
146                    } else {
147                        match serde_json::from_str::<ChatCompletionChunk>(&ev.data) {
148                            Ok(chunk) => {
149                                debug!("parsed chunk");
150                                Some(Ok(chunk))
151                            }
152                            Err(e) => {
153                                warn!(error = %e, "failed to parse chunk");
154                                Some(Err(ApiError::Json(e)))
155                            }
156                        }
157                    }
158                }
159                Err(e) => {
160                    warn!(error = %e, "eventsource error");
161                    Some(Err(ApiError::EventSource(e.to_string())))
162                }
163            }
164        });
165
166        // Box the stream into a pinned BoxStream for ergonomic returns.
167        Ok(chunk_stream.boxed())
168    }
169
170    /// Send a streaming request consuming `self`, returning a `'static` `BoxStream`.
171    ///
172    /// Unlike `send_stream`, this takes ownership so the returned stream is not tied
173    /// to a client lifetime — useful when the stream must outlive the client reference
174    /// (e.g. storing it inside a state machine).
175    #[instrument(level = "info", skip(self, req))]
176    pub async fn into_stream(
177        self,
178        req: ApiRequest,
179    ) -> Result<BoxStream<'static, std::result::Result<ChatCompletionChunk, ApiError>>> {
180        let mut raw = req.into_raw();
181        raw.stream = Some(true);
182
183        let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
184        debug!(method = "POST", %url, "sending streaming request (owned)");
185
186        let response = match self
187            .client
188            .post(&url)
189            .bearer_auth(&self.token)
190            .json(&raw)
191            .send()
192            .await
193        {
194            Ok(r) => r,
195            Err(e) => {
196                warn!(error = %e, "stream http send failed");
197                return Err(ApiError::Reqwest(e));
198            }
199        };
200
201        if !response.status().is_success() {
202            let status = response.status();
203            let text = response.text().await.unwrap_or_else(|e| e.to_string());
204            warn!(%status, "non-success response for stream (owned)");
205            return Err(ApiError::http_error(status, text));
206        }
207
208        let event_stream = response.bytes_stream().eventsource();
209        info!("stream connected (owned); converting SSE to chunk stream");
210
211        let chunk_stream = event_stream.filter_map(|ev_res| async move {
212            match ev_res {
213                Ok(ev) => {
214                    if ev.data == "[DONE]" {
215                        debug!("received [DONE] event");
216                        None
217                    } else {
218                        match serde_json::from_str::<ChatCompletionChunk>(&ev.data) {
219                            Ok(chunk) => Some(Ok(chunk)),
220                            Err(e) => Some(Err(ApiError::Json(e))),
221                        }
222                    }
223                }
224                Err(e) => Some(Err(ApiError::EventSource(e.to_string()))),
225            }
226        });
227
228        Ok(chunk_stream.boxed())
229    }
230
231    /// Convenience: stream only text fragments (delta.content) as String items.
232    ///
233    /// Each yielded item is `Result<String, ApiError>`.
234    #[instrument(level = "debug", skip(self, req))]
235    pub async fn stream_text(
236        &self,
237        req: ApiRequest,
238    ) -> Result<BoxStream<'_, std::result::Result<String, ApiError>>> {
239        debug!("creating text stream from chunk stream");
240        let chunk_stream = self.send_stream(req).await?;
241        let text_stream = chunk_stream.map(|item_res| match item_res {
242            Ok(chunk) => {
243                let s = chunk
244                    .choices
245                    .first()
246                    .and_then(|c| c.delta.content.as_ref())
247                    .cloned()
248                    .unwrap_or_default();
249                debug!(fragment = %s, "yielding text fragment");
250                Ok(s)
251            }
252            Err(e) => {
253                warn!(error = %e, "yielding error from chunk stream");
254                Err(e)
255            }
256        });
257
258        Ok(text_stream.boxed())
259    }
260}