Skip to main content

ds_api/api/
client.rs

1use std::time::Duration;
2
3use eventsource_stream::Eventsource;
4use futures::{StreamExt, stream::BoxStream};
5use reqwest::{Client, Response};
6
7use tracing::{debug, info, instrument, warn};
8
9use super::request::ApiRequest;
10use crate::error::{ApiError, Result};
11use crate::raw::{ChatCompletionChunk, ChatCompletionResponse};
12
13/// Lightweight API HTTP client.
14#[derive(Clone, Debug)]
15pub struct ApiClient {
16    token: String,
17    base_url: String,
18    client: Client,
19    timeout: Option<Duration>,
20}
21
22impl ApiClient {
23    /// Create a new client with the given token.
24    #[instrument(level = "info", skip(token), fields(masked_token = tracing::field::Empty))]
25    pub fn new(token: impl Into<String>) -> Self {
26        Self::with_client(token, Client::new())
27    }
28
29    #[instrument(level = "info", skip(token), fields(masked_token = tracing::field::Empty))]
30    pub fn with_client(token: impl Into<String>, client: Client) -> Self {
31        let token_str = token.into();
32        info!(message = "creating ApiClient instance");
33        let client = Self {
34            token: token_str,
35            base_url: "https://api.deepseek.com".to_string(),
36            client,
37            timeout: None,
38        };
39        tracing::Span::current().record("masked_token", "***");
40        client
41    }
42
43    /// Replace base URL (builder style).
44    pub fn with_base_url(mut self, base: impl Into<String>) -> Self {
45        self.base_url = base.into();
46        self
47    }
48
49    /// Replace token (builder style).
50    pub fn with_token(mut self, token: impl Into<String>) -> Self {
51        self.token = token.into();
52        self
53    }
54
55    /// Set optional timeout for non-streaming requests.
56    pub fn with_timeout(mut self, t: Duration) -> Self {
57        self.timeout = Some(t);
58        self
59    }
60
61    // ── Private helpers ───────────────────────────────────────────────────────
62
63    fn completions_url(&self) -> String {
64        format!("{}/chat/completions", self.base_url.trim_end_matches('/'))
65    }
66
67    /// Send an HTTP POST to the completions endpoint and return the raw
68    /// [`Response`], checking for non-2xx status codes.
69    async fn post_raw(&self, req: ApiRequest, stream: bool) -> Result<Response> {
70        let mut raw = req.into_raw();
71        if stream {
72            raw.stream = Some(true);
73        }
74
75        let url = self.completions_url();
76        debug!(method = "POST", %url, %stream, "sending request");
77
78        let mut builder = self.client.post(&url).bearer_auth(&self.token).json(&raw);
79        if !stream && let Some(t) = self.timeout {
80            builder = builder.timeout(t);
81            debug!(timeout_ms = ?t.as_millis(), "request timeout set");
82        }
83
84        let resp = builder.send().await.map_err(|e| {
85            warn!(error = %e, "http send failed");
86            ApiError::Reqwest(e)
87        })?;
88
89        if !resp.status().is_success() {
90            let status = resp.status();
91            let text = resp.text().await.unwrap_or_else(|e| e.to_string());
92            warn!(%status, "non-success response");
93            return Err(ApiError::http_error(status, text));
94        }
95
96        Ok(resp)
97    }
98
99    /// Convert a successful streaming [`Response`] into a
100    /// `BoxStream<Result<ChatCompletionChunk, ApiError>>`.
101    ///
102    /// This is the single source of truth for SSE → chunk parsing.
103    fn response_into_chunk_stream(
104        resp: Response,
105    ) -> BoxStream<'static, std::result::Result<ChatCompletionChunk, ApiError>> {
106        let event_stream = resp.bytes_stream().eventsource();
107
108        event_stream
109            .filter_map(|ev_res| async move {
110                match ev_res {
111                    Ok(ev) => {
112                        if ev.data == "[DONE]" {
113                            debug!("received [DONE] event");
114                            None
115                        } else {
116                            match serde_json::from_str::<ChatCompletionChunk>(&ev.data) {
117                                Ok(chunk) => {
118                                    debug!("parsed chunk");
119                                    Some(Ok(chunk))
120                                }
121                                Err(e) => {
122                                    warn!(error = %e, "failed to parse chunk");
123                                    Some(Err(ApiError::Json(e)))
124                                }
125                            }
126                        }
127                    }
128                    Err(e) => {
129                        warn!(error = %e, "eventsource error");
130                        Some(Err(ApiError::EventSource(e.to_string())))
131                    }
132                }
133            })
134            .boxed()
135    }
136
137    // ── Public API ────────────────────────────────────────────────────────────
138
139    /// Send a non-streaming request and parse the full [`ChatCompletionResponse`].
140    #[instrument(level = "info", skip(self, req))]
141    pub async fn send(&self, req: ApiRequest) -> Result<ChatCompletionResponse> {
142        let resp = self.post_raw(req, false).await?;
143        debug!("received HTTP response; deserialising");
144
145        let parsed = resp.json::<ChatCompletionResponse>().await.map_err(|e| {
146            warn!(error = %e, "failed to parse ChatCompletionResponse");
147            ApiError::Reqwest(e)
148        })?;
149
150        info!("request completed successfully");
151        Ok(parsed)
152    }
153
154    /// Send a streaming (SSE) request and return a `BoxStream` of parsed
155    /// [`ChatCompletionChunk`]s.
156    ///
157    /// The stream borrows `&self` via the response lifetime, so it cannot
158    /// outlive the client.  Use [`into_stream`][Self::into_stream] when you
159    /// need a `'static` stream (e.g. inside a state machine that owns the
160    /// client).
161    #[instrument(level = "info", skip(self, req))]
162    pub async fn send_stream(
163        &self,
164        req: ApiRequest,
165    ) -> Result<BoxStream<'_, std::result::Result<ChatCompletionChunk, ApiError>>> {
166        let resp = self.post_raw(req, true).await?;
167        info!("stream connected");
168        Ok(Self::response_into_chunk_stream(resp))
169    }
170
171    /// Send a streaming (SSE) request, consuming `self`, and return a
172    /// `'static` `BoxStream` of parsed [`ChatCompletionChunk`]s.
173    ///
174    /// Taking ownership of the client means the returned stream is not tied to
175    /// any borrow, making it suitable for storage inside a state machine (e.g.
176    /// [`AgentStream`][crate::agent::AgentStream]).
177    #[instrument(level = "info", skip(self, req))]
178    pub async fn into_stream(
179        self,
180        req: ApiRequest,
181    ) -> Result<BoxStream<'static, std::result::Result<ChatCompletionChunk, ApiError>>> {
182        let resp = self.post_raw(req, true).await?;
183        info!("stream connected (owned)");
184        Ok(Self::response_into_chunk_stream(resp))
185    }
186
187    /// Convenience: stream only text fragments (`delta.content`) as [`String`]
188    /// items.
189    ///
190    /// Each yielded item is `Result<String, ApiError>`.
191    #[instrument(level = "debug", skip(self, req))]
192    pub async fn stream_text(
193        &self,
194        req: ApiRequest,
195    ) -> Result<BoxStream<'_, std::result::Result<String, ApiError>>> {
196        debug!("creating text stream from chunk stream");
197        let chunk_stream = self.send_stream(req).await?;
198        let text_stream = chunk_stream.map(|item_res| match item_res {
199            Ok(chunk) => {
200                let s = chunk
201                    .choices
202                    .first()
203                    .and_then(|c| c.delta.content.as_ref())
204                    .cloned()
205                    .unwrap_or_default();
206                debug!(fragment = %s, "yielding text fragment");
207                Ok(s)
208            }
209            Err(e) => {
210                warn!(error = %e, "yielding error from chunk stream");
211                Err(e)
212            }
213        });
214
215        Ok(text_stream.boxed())
216    }
217}