1use std::time::Duration;
2
3use eventsource_stream::Eventsource;
4use futures::{StreamExt, stream::BoxStream};
5use reqwest::Client;
6
7use tracing::{debug, info, instrument, warn};
8
9use super::request::ApiRequest;
10use crate::error::{ApiError, Result};
11use crate::raw::{ChatCompletionChunk, ChatCompletionResponse};
12
13#[derive(Clone, Debug)]
15pub struct ApiClient {
16 token: String,
17 base_url: String,
18 client: Client,
19 timeout: Option<Duration>,
20}
21
22impl ApiClient {
23 #[instrument(level = "info", skip(token), fields(masked_token = tracing::field::Empty))]
25 pub fn new(token: impl Into<String>) -> Self {
26 let token_str = token.into();
28 info!(message = "creating ApiClient instance");
29 let client = Self {
30 token: token_str.clone(),
31 base_url: "https://api.deepseek.com".to_string(),
32 client: Client::new(),
33 timeout: None,
34 };
35 tracing::Span::current().record("masked_token", &"***");
37 client
38 }
39
40 pub fn with_base_url(mut self, base: impl Into<String>) -> Self {
42 self.base_url = base.into();
43 self
44 }
45
46 pub fn with_token(mut self, token: impl Into<String>) -> Self {
48 self.token = token.into();
49 self
50 }
51
52 pub fn with_timeout(mut self, t: Duration) -> Self {
54 self.timeout = Some(t);
55 self
56 }
57
58 #[instrument(level = "info", skip(self, req))]
60 pub async fn send(&self, req: ApiRequest) -> Result<ChatCompletionResponse> {
61 let raw = req.into_raw();
62 let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
63 debug!(method = "POST", %url, "sending non-streaming request");
64
65 let mut builder = self.client.post(&url).bearer_auth(&self.token).json(&raw);
66 if let Some(t) = self.timeout {
67 builder = builder.timeout(t);
68 debug!(timeout_ms = ?t.as_millis(), "request timeout set");
69 }
70
71 let resp = match builder.send().await {
72 Ok(r) => {
73 debug!("received HTTP response");
74 r
75 }
76 Err(e) => {
77 warn!(error = %e, "http send failed");
78 return Err(ApiError::Reqwest(e));
79 }
80 };
81
82 if !resp.status().is_success() {
83 let status = resp.status();
84 let text = resp.text().await.unwrap_or_else(|e| e.to_string());
85 warn!(%status, "non-success response");
86 return Err(ApiError::http_error(status, text));
87 }
88
89 let parsed = resp.json::<ChatCompletionResponse>().await.map_err(|e| {
90 warn!(error = %e, "failed to parse ChatCompletionResponse");
91 ApiError::Reqwest(e)
92 })?;
93 info!("request completed successfully");
94 Ok(parsed)
95 }
96
97 #[instrument(level = "info", skip(self, req))]
99 pub async fn send_stream(
100 &self,
101 req: ApiRequest,
102 ) -> Result<BoxStream<'_, std::result::Result<ChatCompletionChunk, ApiError>>> {
103 let mut raw = req.into_raw();
104 raw.stream = Some(true);
105
106 let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
107 debug!(method = "POST", %url, "sending streaming request");
108 let response = match self
109 .client
110 .post(&url)
111 .bearer_auth(&self.token)
112 .json(&raw)
113 .send()
114 .await
115 {
116 Ok(r) => r,
117 Err(e) => {
118 warn!(error = %e, "stream http send failed");
119 return Err(ApiError::Reqwest(e));
120 }
121 };
122
123 if !response.status().is_success() {
124 let status = response.status();
125 let text = response.text().await.unwrap_or_else(|e| e.to_string());
126 warn!(%status, "non-success response for stream");
127 return Err(ApiError::http_error(status, text));
128 }
129
130 let event_stream = response.bytes_stream().eventsource();
132 info!("stream connected; converting SSE to chunk stream");
133
134 let chunk_stream = event_stream.filter_map(|ev_res| async move {
136 match ev_res {
137 Ok(ev) => {
138 if ev.data == "[DONE]" {
139 debug!("received [DONE] event");
140 None
141 } else {
142 match serde_json::from_str::<ChatCompletionChunk>(&ev.data) {
143 Ok(chunk) => {
144 debug!("parsed chunk");
145 Some(Ok(chunk))
146 }
147 Err(e) => {
148 warn!(error = %e, "failed to parse chunk");
149 Some(Err(ApiError::Json(e)))
150 }
151 }
152 }
153 }
154 Err(e) => {
155 warn!(error = %e, "eventsource error");
156 Some(Err(ApiError::EventSource(e.to_string())))
157 }
158 }
159 });
160
161 Ok(chunk_stream.boxed())
163 }
164
165 #[instrument(level = "debug", skip(self, req))]
169 pub async fn stream_text(
170 &self,
171 req: ApiRequest,
172 ) -> Result<BoxStream<'_, std::result::Result<String, ApiError>>> {
173 debug!("creating text stream from chunk stream");
174 let chunk_stream = self.send_stream(req).await?;
175 let text_stream = chunk_stream.map(|item_res| match item_res {
176 Ok(chunk) => {
177 let s = chunk
178 .choices
179 .first()
180 .and_then(|c| c.delta.content.as_ref())
181 .cloned()
182 .unwrap_or_default();
183 debug!(fragment = %s, "yielding text fragment");
184 Ok(s)
185 }
186 Err(e) => {
187 warn!(error = %e, "yielding error from chunk stream");
188 Err(e)
189 }
190 });
191
192 Ok(text_stream.boxed())
193 }
194}