Skip to main content

ds_api/api/
client.rs

1use std::time::Duration;
2
3use eventsource_stream::Eventsource;
4use futures::{StreamExt, stream::BoxStream};
5use reqwest::Client;
6
7use tracing::{debug, info, instrument, warn};
8
9use super::request::ApiRequest;
10use crate::error::{ApiError, Result};
11use crate::raw::{ChatCompletionChunk, ChatCompletionResponse};
12
13/// Lightweight API HTTP client.
14#[derive(Clone, Debug)]
15pub struct ApiClient {
16    token: String,
17    base_url: String,
18    client: Client,
19    timeout: Option<Duration>,
20}
21
22impl ApiClient {
23    /// Create a new client with the given token.
24    #[instrument(level = "info", skip(token), fields(masked_token = tracing::field::Empty))]
25    pub fn new(token: impl Into<String>) -> Self {
26        // avoid recording the raw token value in traces; we mark a field instead
27        let token_str = token.into();
28        info!(message = "creating ApiClient instance");
29        let client = Self {
30            token: token_str.clone(),
31            base_url: "https://api.deepseek.com".to_string(),
32            client: Client::new(),
33            timeout: None,
34        };
35        // annotate the trace/span with a masked token indicator (presence only)
36        tracing::Span::current().record("masked_token", &"***");
37        client
38    }
39
40    /// Replace base URL (builder style).
41    pub fn with_base_url(mut self, base: impl Into<String>) -> Self {
42        self.base_url = base.into();
43        self
44    }
45
46    /// Replace token (builder style).
47    pub fn with_token(mut self, token: impl Into<String>) -> Self {
48        self.token = token.into();
49        self
50    }
51
52    /// Set optional timeout for non-streaming requests.
53    pub fn with_timeout(mut self, t: Duration) -> Self {
54        self.timeout = Some(t);
55        self
56    }
57
58    /// Send a non-streaming request and parse the full ChatCompletionResponse.
59    #[instrument(level = "info", skip(self, req))]
60    pub async fn send(&self, req: ApiRequest) -> Result<ChatCompletionResponse> {
61        let raw = req.into_raw();
62        let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
63        debug!(method = "POST", %url, "sending non-streaming request");
64
65        let mut builder = self.client.post(&url).bearer_auth(&self.token).json(&raw);
66        if let Some(t) = self.timeout {
67            builder = builder.timeout(t);
68            debug!(timeout_ms = ?t.as_millis(), "request timeout set");
69        }
70
71        let resp = match builder.send().await {
72            Ok(r) => {
73                debug!("received HTTP response");
74                r
75            }
76            Err(e) => {
77                warn!(error = %e, "http send failed");
78                return Err(ApiError::Reqwest(e));
79            }
80        };
81
82        if !resp.status().is_success() {
83            let status = resp.status();
84            let text = resp.text().await.unwrap_or_else(|e| e.to_string());
85            warn!(%status, "non-success response");
86            return Err(ApiError::http_error(status, text));
87        }
88
89        let parsed = resp.json::<ChatCompletionResponse>().await.map_err(|e| {
90            warn!(error = %e, "failed to parse ChatCompletionResponse");
91            ApiError::Reqwest(e)
92        })?;
93        info!("request completed successfully");
94        Ok(parsed)
95    }
96
97    /// Send a streaming (SSE) request and return a boxed pinned stream of parsed `ChatCompletionChunk`.
98    #[instrument(level = "info", skip(self, req))]
99    pub async fn send_stream(
100        &self,
101        req: ApiRequest,
102    ) -> Result<BoxStream<'_, std::result::Result<ChatCompletionChunk, ApiError>>> {
103        let mut raw = req.into_raw();
104        raw.stream = Some(true);
105
106        let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
107        debug!(method = "POST", %url, "sending streaming request");
108        let response = match self
109            .client
110            .post(&url)
111            .bearer_auth(&self.token)
112            .json(&raw)
113            .send()
114            .await
115        {
116            Ok(r) => r,
117            Err(e) => {
118                warn!(error = %e, "stream http send failed");
119                return Err(ApiError::Reqwest(e));
120            }
121        };
122
123        if !response.status().is_success() {
124            let status = response.status();
125            let text = response.text().await.unwrap_or_else(|e| e.to_string());
126            warn!(%status, "non-success response for stream");
127            return Err(ApiError::http_error(status, text));
128        }
129
130        // Convert to SSE event stream
131        let event_stream = response.bytes_stream().eventsource();
132        info!("stream connected; converting SSE to chunk stream");
133
134        // Map SSE events -> parsed ChatCompletionChunk or ApiError
135        let chunk_stream = event_stream.filter_map(|ev_res| async move {
136            match ev_res {
137                Ok(ev) => {
138                    if ev.data == "[DONE]" {
139                        debug!("received [DONE] event");
140                        None
141                    } else {
142                        match serde_json::from_str::<ChatCompletionChunk>(&ev.data) {
143                            Ok(chunk) => {
144                                debug!("parsed chunk");
145                                Some(Ok(chunk))
146                            }
147                            Err(e) => {
148                                warn!(error = %e, "failed to parse chunk");
149                                Some(Err(ApiError::Json(e)))
150                            }
151                        }
152                    }
153                }
154                Err(e) => {
155                    warn!(error = %e, "eventsource error");
156                    Some(Err(ApiError::EventSource(e.to_string())))
157                }
158            }
159        });
160
161        // Box the stream into a pinned BoxStream for ergonomic returns.
162        Ok(chunk_stream.boxed())
163    }
164
165    /// Convenience: stream only text fragments (delta.content) as String items.
166    ///
167    /// Each yielded item is `Result<String, ApiError>`.
168    #[instrument(level = "debug", skip(self, req))]
169    pub async fn stream_text(
170        &self,
171        req: ApiRequest,
172    ) -> Result<BoxStream<'_, std::result::Result<String, ApiError>>> {
173        debug!("creating text stream from chunk stream");
174        let chunk_stream = self.send_stream(req).await?;
175        let text_stream = chunk_stream.map(|item_res| match item_res {
176            Ok(chunk) => {
177                let s = chunk
178                    .choices
179                    .first()
180                    .and_then(|c| c.delta.content.as_ref())
181                    .cloned()
182                    .unwrap_or_default();
183                debug!(fragment = %s, "yielding text fragment");
184                Ok(s)
185            }
186            Err(e) => {
187                warn!(error = %e, "yielding error from chunk stream");
188                Err(e)
189            }
190        });
191
192        Ok(text_stream.boxed())
193    }
194}