1use std::sync::Arc;
4use std::sync::RwLock;
5use std::time::Duration;
6
7use reqwest::Method;
8use reqwest::StatusCode;
9use serde::Serialize;
10use serde_json::Value;
11
12use crate::duplex::{derive_ws_url, DuplexChannel};
13use crate::error::PulseError;
14use crate::events::EventsResource;
15use crate::iq::IQResource;
16use crate::resources::{
17 AgentsResource, AuthResource, ConnectorsResource, ModelsResource, PipelinesResource,
18 TemplatesResource, UsersResource,
19};
20use crate::streams::StreamsResource;
21
22const USER_AGENT: &str = "pulse-client-rust/2.6.0";
23const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
24
25#[derive(Clone)]
52pub struct PulseClient {
53 pub(crate) inner: Arc<Inner>,
54}
55
56pub(crate) struct Inner {
57 pub(crate) base_url: String,
58 pub(crate) http: reqwest::Client,
59 pub(crate) token: RwLock<Option<String>>,
60}
61
62impl PulseClient {
63 pub fn builder() -> PulseClientBuilder {
64 PulseClientBuilder::default()
65 }
66
67 pub fn token(&self) -> Option<String> {
69 self.inner.token.read().ok().and_then(|guard| guard.clone())
70 }
71
72 pub fn set_token<S: Into<String>>(&self, token: S) {
75 if let Ok(mut guard) = self.inner.token.write() {
76 *guard = Some(token.into());
77 }
78 }
79
80 pub fn clear_token(&self) {
82 if let Ok(mut guard) = self.inner.token.write() {
83 *guard = None;
84 }
85 }
86
87 pub fn auth(&self) -> AuthResource<'_> {
91 AuthResource { client: self }
92 }
93
94 pub fn pipelines(&self) -> PipelinesResource<'_> {
95 PipelinesResource { client: self }
96 }
97
98 pub fn agents(&self) -> AgentsResource<'_> {
99 AgentsResource { client: self }
100 }
101
102 pub fn templates(&self) -> TemplatesResource<'_> {
103 TemplatesResource { client: self }
104 }
105
106 pub fn users(&self) -> UsersResource<'_> {
107 UsersResource { client: self }
108 }
109
110 pub fn models(&self) -> ModelsResource<'_> {
113 ModelsResource { client: self }
114 }
115
116 pub fn connectors(&self) -> ConnectorsResource<'_> {
119 ConnectorsResource { client: self }
120 }
121
122 pub fn events(&self) -> EventsResource<'_> {
123 EventsResource { client: self }
124 }
125
126 pub fn iq(&self) -> IQResource<'_> {
127 IQResource { client: self }
128 }
129
130 pub fn streams(&self) -> StreamsResource<'_> {
132 StreamsResource { client: self }
133 }
134
135 pub async fn duplex(&self, agent_id: &str) -> Result<DuplexChannel, PulseError> {
162 if agent_id.trim().is_empty() {
163 return Err(PulseError::InvalidConfig(
164 "agent_id must be a non-empty string".to_string(),
165 ));
166 }
167 let token = self.token();
168 let url = derive_ws_url(&self.inner.base_url, agent_id, token.as_deref());
169 DuplexChannel::connect(url).await
170 }
171
172 pub async fn duplex_at(&self, ws_url: impl Into<String>) -> Result<DuplexChannel, PulseError> {
176 DuplexChannel::connect(ws_url.into()).await
177 }
178
179 pub async fn version(&self) -> Result<Value, PulseError> {
182 self.request(Method::GET, "/api/pulse/version", None::<&()>, false)
183 .await
184 }
185
186 pub(crate) async fn request<B: Serialize + ?Sized>(
190 &self,
191 method: Method,
192 path: &str,
193 body: Option<&B>,
194 authenticated: bool,
195 ) -> Result<Value, PulseError> {
196 let url = format!("{}{path}", self.inner.base_url);
197 let mut req = self.inner.http.request(method, url);
198
199 if authenticated {
200 match self.token() {
201 Some(token) if !token.is_empty() => {
202 req = req.bearer_auth(token);
203 }
204 _ => {
205 return Err(PulseError::NoToken {
206 path: path.to_string(),
207 });
208 }
209 }
210 }
211
212 if let Some(payload) = body {
213 req = req.json(payload);
214 }
215
216 let response = req.send().await?;
217 let status = response.status();
218
219 if status == StatusCode::NO_CONTENT {
220 return Ok(Value::Object(Default::default()));
221 }
222
223 if status.is_success() {
224 let bytes = response.bytes().await?;
226 if bytes.is_empty() {
227 return Ok(Value::Object(Default::default()));
228 }
229 return Ok(serde_json::from_slice(&bytes)?);
230 }
231
232 let retry_after_header = response
234 .headers()
235 .get(reqwest::header::RETRY_AFTER)
236 .and_then(|v| v.to_str().ok())
237 .and_then(|s| s.trim().parse::<u32>().ok());
238
239 let bytes = response.bytes().await?;
240 let parsed_body: Option<Value> = if bytes.is_empty() {
241 None
242 } else {
243 match serde_json::from_slice::<Value>(&bytes) {
244 Ok(v) => Some(v),
245 Err(_) => {
246 let raw = String::from_utf8_lossy(&bytes);
247 let trimmed = if raw.len() > 200 { &raw[..200] } else { &raw };
248 Some(serde_json::json!({ "error": trimmed }))
249 }
250 }
251 };
252
253 Err(translate_error(
254 status,
255 path,
256 parsed_body,
257 retry_after_header,
258 ))
259 }
260
261 pub(crate) async fn request_multipart(
267 &self,
268 path: &str,
269 form: reqwest::multipart::Form,
270 ) -> Result<Value, PulseError> {
271 let url = format!("{}{path}", self.inner.base_url);
272 let token = match self.token() {
273 Some(token) if !token.is_empty() => token,
274 _ => {
275 return Err(PulseError::NoToken {
276 path: path.to_string(),
277 });
278 }
279 };
280
281 let response = self
282 .inner
283 .http
284 .request(Method::POST, url)
285 .bearer_auth(token)
286 .multipart(form)
287 .send()
288 .await?;
289 let status = response.status();
290
291 if status == StatusCode::NO_CONTENT {
292 return Ok(Value::Object(Default::default()));
293 }
294 if status.is_success() {
295 let bytes = response.bytes().await?;
296 if bytes.is_empty() {
297 return Ok(Value::Object(Default::default()));
298 }
299 return Ok(serde_json::from_slice(&bytes)?);
300 }
301
302 let retry_after_header = response
303 .headers()
304 .get(reqwest::header::RETRY_AFTER)
305 .and_then(|v| v.to_str().ok())
306 .and_then(|s| s.trim().parse::<u32>().ok());
307 let bytes = response.bytes().await?;
308 let parsed_body: Option<Value> = if bytes.is_empty() {
309 None
310 } else {
311 match serde_json::from_slice::<Value>(&bytes) {
312 Ok(v) => Some(v),
313 Err(_) => {
314 let raw = String::from_utf8_lossy(&bytes);
315 let trimmed = if raw.len() > 200 { &raw[..200] } else { &raw };
316 Some(serde_json::json!({ "error": trimmed }))
317 }
318 }
319 };
320 Err(translate_error(
321 status,
322 path,
323 parsed_body,
324 retry_after_header,
325 ))
326 }
327}
328
329fn translate_error(
330 status: StatusCode,
331 path: &str,
332 body: Option<Value>,
333 retry_after_header: Option<u32>,
334) -> PulseError {
335 let path = path.to_string();
336 match status {
337 StatusCode::UNAUTHORIZED => PulseError::Auth { path, body },
338 StatusCode::NOT_FOUND => PulseError::NotFound { path, body },
339 StatusCode::BAD_REQUEST => PulseError::Validation { path, body },
340 StatusCode::TOO_MANY_REQUESTS => {
341 let retry_from_body = body
342 .as_ref()
343 .and_then(|v| v.get("retryAfterSeconds"))
344 .and_then(|v| v.as_u64())
345 .map(|n| n as u32);
346 PulseError::RateLimit {
347 path,
348 body,
349 retry_after_seconds: retry_from_body.or(retry_after_header),
350 }
351 }
352 other => PulseError::Api {
353 status: other.as_u16(),
354 path,
355 body,
356 },
357 }
358}
359
360fn strip_trailing_slash(url: &str) -> String {
361 let mut s = url.to_string();
362 while s.len() > 1 && s.ends_with('/') {
363 s.pop();
364 }
365 s
366}
367
368#[derive(Default, Debug)]
374pub struct PulseClientBuilder {
375 base_url: Option<String>,
376 token: Option<String>,
377 timeout: Option<Duration>,
378 http: Option<reqwest::Client>,
379}
380
381impl PulseClientBuilder {
382 pub fn base_url<S: Into<String>>(mut self, base_url: S) -> Self {
384 self.base_url = Some(base_url.into());
385 self
386 }
387
388 pub fn token<S: Into<String>>(mut self, token: S) -> Self {
390 self.token = Some(token.into());
391 self
392 }
393
394 pub fn timeout(mut self, timeout: Duration) -> Self {
396 self.timeout = Some(timeout);
397 self
398 }
399
400 pub fn http_client(mut self, http: reqwest::Client) -> Self {
403 self.http = Some(http);
404 self
405 }
406
407 pub fn build(self) -> Result<PulseClient, PulseError> {
408 let base_url = self
409 .base_url
410 .ok_or_else(|| PulseError::InvalidConfig("base_url is required".to_string()))?;
411 if base_url.is_empty() {
412 return Err(PulseError::InvalidConfig(
413 "base_url cannot be empty".to_string(),
414 ));
415 }
416
417 let http = match self.http {
418 Some(c) => c,
419 None => reqwest::Client::builder()
420 .timeout(self.timeout.unwrap_or(DEFAULT_TIMEOUT))
421 .user_agent(USER_AGENT)
422 .build()
423 .map_err(PulseError::Transport)?,
424 };
425
426 Ok(PulseClient {
427 inner: Arc::new(Inner {
428 base_url: strip_trailing_slash(&base_url),
429 http,
430 token: RwLock::new(self.token),
431 }),
432 })
433 }
434}