1use std::time::Duration;
2
3use eventsource_stream::Eventsource;
4use futures::{StreamExt, stream::BoxStream};
5use reqwest::{Client, Response};
6
7use tracing::{debug, info, instrument, warn};
8
9use super::request::ApiRequest;
10use crate::error::{ApiError, Result};
11use crate::raw::{ChatCompletionChunk, ChatCompletionResponse};
12
13#[derive(Clone, Debug)]
15pub struct ApiClient {
16 token: String,
17 base_url: String,
18 client: Client,
19 timeout: Option<Duration>,
20}
21
22impl ApiClient {
23 #[instrument(level = "info", skip(token), fields(masked_token = tracing::field::Empty))]
25 pub fn new(token: impl Into<String>) -> Self {
26 Self::with_client(token, Client::new())
27 }
28
29 #[instrument(level = "info", skip(token), fields(masked_token = tracing::field::Empty))]
30 pub fn with_client(token: impl Into<String>, client: Client) -> Self {
31 let token_str = token.into();
32 info!(message = "creating ApiClient instance");
33 let client = Self {
34 token: token_str,
35 base_url: "https://api.deepseek.com".to_string(),
36 client,
37 timeout: None,
38 };
39 tracing::Span::current().record("masked_token", "***");
40 client
41 }
42
43 pub fn with_base_url(mut self, base: impl Into<String>) -> Self {
45 self.base_url = base.into();
46 self
47 }
48
49 pub fn with_token(mut self, token: impl Into<String>) -> Self {
51 self.token = token.into();
52 self
53 }
54
55 pub fn with_timeout(mut self, t: Duration) -> Self {
57 self.timeout = Some(t);
58 self
59 }
60
61 fn completions_url(&self) -> String {
64 format!("{}/chat/completions", self.base_url.trim_end_matches('/'))
65 }
66
67 async fn post_raw(&self, req: ApiRequest, stream: bool) -> Result<Response> {
70 let mut raw = req.into_raw();
71 if stream {
72 raw.stream = Some(true);
73 }
74
75 let url = self.completions_url();
76 debug!(method = "POST", %url, %stream, "sending request");
77
78 let mut builder = self.client.post(&url).bearer_auth(&self.token).json(&raw);
79 if !stream && let Some(t) = self.timeout {
80 builder = builder.timeout(t);
81 debug!(timeout_ms = ?t.as_millis(), "request timeout set");
82 }
83
84 let resp = builder.send().await.map_err(|e| {
85 warn!(error = %e, "http send failed");
86 ApiError::Reqwest(e)
87 })?;
88
89 if !resp.status().is_success() {
90 let status = resp.status();
91 let text = resp.text().await.unwrap_or_else(|e| e.to_string());
92 warn!(%status, "non-success response");
93 return Err(ApiError::http_error(status, text));
94 }
95
96 Ok(resp)
97 }
98
99 fn response_into_chunk_stream(
104 resp: Response,
105 ) -> BoxStream<'static, std::result::Result<ChatCompletionChunk, ApiError>> {
106 let event_stream = resp.bytes_stream().eventsource();
107
108 event_stream
109 .filter_map(|ev_res| async move {
110 match ev_res {
111 Ok(ev) => {
112 if ev.data == "[DONE]" {
113 debug!("received [DONE] event");
114 None
115 } else {
116 match serde_json::from_str::<ChatCompletionChunk>(&ev.data) {
117 Ok(chunk) => {
118 debug!("parsed chunk");
119 Some(Ok(chunk))
120 }
121 Err(e) => {
122 warn!(error = %e, "failed to parse chunk");
123 Some(Err(ApiError::Json(e)))
124 }
125 }
126 }
127 }
128 Err(e) => {
129 warn!(error = %e, "eventsource error");
130 Some(Err(ApiError::EventSource(e.to_string())))
131 }
132 }
133 })
134 .boxed()
135 }
136
137 #[instrument(level = "info", skip(self, req))]
141 pub async fn send(&self, req: ApiRequest) -> Result<ChatCompletionResponse> {
142 let resp = self.post_raw(req, false).await?;
143 debug!("received HTTP response; deserialising");
144
145 let parsed = resp.json::<ChatCompletionResponse>().await.map_err(|e| {
146 warn!(error = %e, "failed to parse ChatCompletionResponse");
147 ApiError::Reqwest(e)
148 })?;
149
150 info!("request completed successfully");
151 Ok(parsed)
152 }
153
154 #[instrument(level = "info", skip(self, req))]
162 pub async fn send_stream(
163 &self,
164 req: ApiRequest,
165 ) -> Result<BoxStream<'_, std::result::Result<ChatCompletionChunk, ApiError>>> {
166 let resp = self.post_raw(req, true).await?;
167 info!("stream connected");
168 Ok(Self::response_into_chunk_stream(resp))
169 }
170
171 #[instrument(level = "info", skip(self, req))]
178 pub async fn into_stream(
179 self,
180 req: ApiRequest,
181 ) -> Result<BoxStream<'static, std::result::Result<ChatCompletionChunk, ApiError>>> {
182 let resp = self.post_raw(req, true).await?;
183 info!("stream connected (owned)");
184 Ok(Self::response_into_chunk_stream(resp))
185 }
186
187 #[instrument(level = "debug", skip(self, req))]
192 pub async fn stream_text(
193 &self,
194 req: ApiRequest,
195 ) -> Result<BoxStream<'_, std::result::Result<String, ApiError>>> {
196 debug!("creating text stream from chunk stream");
197 let chunk_stream = self.send_stream(req).await?;
198 let text_stream = chunk_stream.map(|item_res| match item_res {
199 Ok(chunk) => {
200 let s = chunk
201 .choices
202 .first()
203 .and_then(|c| c.delta.content.as_ref())
204 .cloned()
205 .unwrap_or_default();
206 debug!(fragment = %s, "yielding text fragment");
207 Ok(s)
208 }
209 Err(e) => {
210 warn!(error = %e, "yielding error from chunk stream");
211 Err(e)
212 }
213 });
214
215 Ok(text_stream.boxed())
216 }
217}