Skip to main content

pulse_client/
client.rs

1//! The `PulseClient` and its [`PulseClientBuilder`].
2
3use std::sync::Arc;
4use std::sync::RwLock;
5use std::time::Duration;
6
7use reqwest::Method;
8use reqwest::StatusCode;
9use serde::Serialize;
10use serde_json::Value;
11
12use crate::duplex::{derive_ws_url, DuplexChannel};
13use crate::error::PulseError;
14use crate::events::EventsResource;
15use crate::iq::IQResource;
16use crate::resources::{
17    AgentsResource, AuthResource, ConnectorsResource, ModelsResource, PipelinesResource,
18    TemplatesResource, UsersResource,
19};
20use crate::streams::StreamsResource;
21
/// `User-Agent` header value installed on the HTTP client that
/// `PulseClientBuilder::build` creates when no custom client is supplied.
const USER_AGENT: &str = "pulse-client-rust/2.6.0";
/// Timeout applied to the builder-created HTTP client when the caller does not
/// pick one explicitly.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
24
25/// Async HTTP client for the Pulse REST API.
26///
27/// # Example
28///
29/// ```no_run
30/// use pulse_client::PulseClient;
31///
32/// # async fn run() -> Result<(), pulse_client::PulseError> {
33/// let client = PulseClient::builder()
34///     .base_url("http://localhost:9090")
35///     .build()?;
36///
37/// client.auth().login("alice", "secret").await?;
38///
39/// for pipeline in client.pipelines().list().await? {
40///     println!("{}", pipeline["name"]);
41/// }
42/// # Ok(())
43/// # }
44/// ```
45///
46/// # Thread safety
47///
48/// `PulseClient` is `Clone` and cheap to clone — the underlying reqwest client
49/// pools connections, and the token sits behind an `Arc<RwLock>`. Share a
50/// single instance across tasks.
51#[derive(Clone)]
52pub struct PulseClient {
53    pub(crate) inner: Arc<Inner>,
54}
55
56pub(crate) struct Inner {
57    pub(crate) base_url: String,
58    pub(crate) http: reqwest::Client,
59    pub(crate) token: RwLock<Option<String>>,
60}
61
62impl PulseClient {
63    pub fn builder() -> PulseClientBuilder {
64        PulseClientBuilder::default()
65    }
66
67    /// Returns the current bearer token, or `None` if none is set.
68    pub fn token(&self) -> Option<String> {
69        self.inner.token.read().ok().and_then(|guard| guard.clone())
70    }
71
72    /// Updates the bearer token used by subsequent authenticated requests.
73    /// Safe to call from multiple tasks concurrently.
74    pub fn set_token<S: Into<String>>(&self, token: S) {
75        if let Ok(mut guard) = self.inner.token.write() {
76            *guard = Some(token.into());
77        }
78    }
79
80    /// Clears the bearer token, effectively logging out the client.
81    pub fn clear_token(&self) {
82        if let Ok(mut guard) = self.inner.token.write() {
83            *guard = None;
84        }
85    }
86
87    // ------------------------------------------------------------------
88    // Resource accessors
89    // ------------------------------------------------------------------
90    pub fn auth(&self) -> AuthResource<'_> {
91        AuthResource { client: self }
92    }
93
94    pub fn pipelines(&self) -> PipelinesResource<'_> {
95        PipelinesResource { client: self }
96    }
97
98    pub fn agents(&self) -> AgentsResource<'_> {
99        AgentsResource { client: self }
100    }
101
102    pub fn templates(&self) -> TemplatesResource<'_> {
103        TemplatesResource { client: self }
104    }
105
106    pub fn users(&self) -> UsersResource<'_> {
107        UsersResource { client: self }
108    }
109
110    /// `client.models()` — B-112 embedded ML model registry (upload / list /
111    /// get / delete ONNX models scored by the streaming `ml_predict` operator).
112    pub fn models(&self) -> ModelsResource<'_> {
113        ModelsResource { client: self }
114    }
115
116    /// `client.connectors()` — the connector catalogue (B-093 family + every
117    /// native / bridged connector); use a `subType` as a pipeline node `type`.
118    pub fn connectors(&self) -> ConnectorsResource<'_> {
119        ConnectorsResource { client: self }
120    }
121
122    pub fn events(&self) -> EventsResource<'_> {
123        EventsResource { client: self }
124    }
125
126    pub fn iq(&self) -> IQResource<'_> {
127        IQResource { client: self }
128    }
129
130    /// `client.streams()` — B-107 Kafka-Streams-like declarative DSL.
131    pub fn streams(&self) -> StreamsResource<'_> {
132        StreamsResource { client: self }
133    }
134
135    /// B-114 — open a bidirectional duplex channel to an agent.
136    ///
137    /// Streams events IN and receives the agent's correlated outputs OUT on a
138    /// single WebSocket — the synchronous-decision path (fraud, pricing, A/B
139    /// assignment). The endpoint runs on the Pulse WebSocket port (REST port
140    /// + 1); the URL is derived from this client's `base_url` + token.
141    ///
142    /// ```no_run
143    /// # use pulse_client::PulseClient;
144    /// # use serde_json::json;
145    /// # async fn run(client: &PulseClient) -> Result<(), pulse_client::PulseError> {
146    /// let mut ch = client.duplex("fraud-detector").await?;
147    /// let cid = ch.send(&json!({ "amount": 5000 }), Some("tx-1")).await?;
148    /// let output = ch.recv().await?;
149    /// assert_eq!(output.correlation_id, Some(cid));
150    /// ch.close().await?;
151    /// # Ok(())
152    /// # }
153    /// ```
154    ///
155    /// # Errors
156    ///
157    /// - [`PulseError::InvalidConfig`] if `agent_id` is blank.
158    /// - [`PulseError::Duplex`] on a WebSocket handshake / transport failure.
159    /// - [`PulseError::Validation`] if the server rejects the agent with an
160    ///   `error` frame on open.
161    pub async fn duplex(&self, agent_id: &str) -> Result<DuplexChannel, PulseError> {
162        if agent_id.trim().is_empty() {
163            return Err(PulseError::InvalidConfig(
164                "agent_id must be a non-empty string".to_string(),
165            ));
166        }
167        let token = self.token();
168        let url = derive_ws_url(&self.inner.base_url, agent_id, token.as_deref());
169        DuplexChannel::connect(url).await
170    }
171
172    /// Open a duplex channel at an explicit WebSocket URL, bypassing the
173    /// REST-port-+-1 derivation. Useful when the WebSocket endpoint sits
174    /// behind a separate gateway / hostname.
175    pub async fn duplex_at(&self, ws_url: impl Into<String>) -> Result<DuplexChannel, PulseError> {
176        DuplexChannel::connect(ws_url.into()).await
177    }
178
179    /// `GET /api/pulse/version` — public, no JWT required. Returns the
180    /// Pulse server's build + version metadata.
181    pub async fn version(&self) -> Result<Value, PulseError> {
182        self.request(Method::GET, "/api/pulse/version", None::<&()>, false)
183            .await
184    }
185
186    // ------------------------------------------------------------------
187    // Internal: request execution + error translation
188    // ------------------------------------------------------------------
189    pub(crate) async fn request<B: Serialize + ?Sized>(
190        &self,
191        method: Method,
192        path: &str,
193        body: Option<&B>,
194        authenticated: bool,
195    ) -> Result<Value, PulseError> {
196        let url = format!("{}{path}", self.inner.base_url);
197        let mut req = self.inner.http.request(method, url);
198
199        if authenticated {
200            match self.token() {
201                Some(token) if !token.is_empty() => {
202                    req = req.bearer_auth(token);
203                }
204                _ => {
205                    return Err(PulseError::NoToken {
206                        path: path.to_string(),
207                    });
208                }
209            }
210        }
211
212        if let Some(payload) = body {
213            req = req.json(payload);
214        }
215
216        let response = req.send().await?;
217        let status = response.status();
218
219        if status == StatusCode::NO_CONTENT {
220            return Ok(Value::Object(Default::default()));
221        }
222
223        if status.is_success() {
224            // Read body; empty body → empty object so callers can `.get()`
225            let bytes = response.bytes().await?;
226            if bytes.is_empty() {
227                return Ok(Value::Object(Default::default()));
228            }
229            return Ok(serde_json::from_slice(&bytes)?);
230        }
231
232        // Non-success — translate to a typed error
233        let retry_after_header = response
234            .headers()
235            .get(reqwest::header::RETRY_AFTER)
236            .and_then(|v| v.to_str().ok())
237            .and_then(|s| s.trim().parse::<u32>().ok());
238
239        let bytes = response.bytes().await?;
240        let parsed_body: Option<Value> = if bytes.is_empty() {
241            None
242        } else {
243            match serde_json::from_slice::<Value>(&bytes) {
244                Ok(v) => Some(v),
245                Err(_) => {
246                    let raw = String::from_utf8_lossy(&bytes);
247                    let trimmed = if raw.len() > 200 { &raw[..200] } else { &raw };
248                    Some(serde_json::json!({ "error": trimmed }))
249                }
250            }
251        };
252
253        Err(translate_error(
254            status,
255            path,
256            parsed_body,
257            retry_after_header,
258        ))
259    }
260
261    /// B-112 — issue a `multipart/form-data` POST (the ML model-upload path).
262    ///
263    /// Shares the auth + error-translation logic of [`request`](Self::request)
264    /// but sends a pre-built [`reqwest::multipart::Form`] instead of a JSON
265    /// body. Always authenticated.
266    pub(crate) async fn request_multipart(
267        &self,
268        path: &str,
269        form: reqwest::multipart::Form,
270    ) -> Result<Value, PulseError> {
271        let url = format!("{}{path}", self.inner.base_url);
272        let token = match self.token() {
273            Some(token) if !token.is_empty() => token,
274            _ => {
275                return Err(PulseError::NoToken {
276                    path: path.to_string(),
277                });
278            }
279        };
280
281        let response = self
282            .inner
283            .http
284            .request(Method::POST, url)
285            .bearer_auth(token)
286            .multipart(form)
287            .send()
288            .await?;
289        let status = response.status();
290
291        if status == StatusCode::NO_CONTENT {
292            return Ok(Value::Object(Default::default()));
293        }
294        if status.is_success() {
295            let bytes = response.bytes().await?;
296            if bytes.is_empty() {
297                return Ok(Value::Object(Default::default()));
298            }
299            return Ok(serde_json::from_slice(&bytes)?);
300        }
301
302        let retry_after_header = response
303            .headers()
304            .get(reqwest::header::RETRY_AFTER)
305            .and_then(|v| v.to_str().ok())
306            .and_then(|s| s.trim().parse::<u32>().ok());
307        let bytes = response.bytes().await?;
308        let parsed_body: Option<Value> = if bytes.is_empty() {
309            None
310        } else {
311            match serde_json::from_slice::<Value>(&bytes) {
312                Ok(v) => Some(v),
313                Err(_) => {
314                    let raw = String::from_utf8_lossy(&bytes);
315                    let trimmed = if raw.len() > 200 { &raw[..200] } else { &raw };
316                    Some(serde_json::json!({ "error": trimmed }))
317                }
318            }
319        };
320        Err(translate_error(
321            status,
322            path,
323            parsed_body,
324            retry_after_header,
325        ))
326    }
327}
328
329fn translate_error(
330    status: StatusCode,
331    path: &str,
332    body: Option<Value>,
333    retry_after_header: Option<u32>,
334) -> PulseError {
335    let path = path.to_string();
336    match status {
337        StatusCode::UNAUTHORIZED => PulseError::Auth { path, body },
338        StatusCode::NOT_FOUND => PulseError::NotFound { path, body },
339        StatusCode::BAD_REQUEST => PulseError::Validation { path, body },
340        StatusCode::TOO_MANY_REQUESTS => {
341            let retry_from_body = body
342                .as_ref()
343                .and_then(|v| v.get("retryAfterSeconds"))
344                .and_then(|v| v.as_u64())
345                .map(|n| n as u32);
346            PulseError::RateLimit {
347                path,
348                body,
349                retry_after_seconds: retry_from_body.or(retry_after_header),
350            }
351        }
352        other => PulseError::Api {
353            status: other.as_u16(),
354            path,
355            body,
356        },
357    }
358}
359
/// Returns `url` without its trailing `/` characters.
///
/// An input made up only of slashes collapses to a single `/` instead of
/// becoming empty; an empty input stays empty.
fn strip_trailing_slash(url: &str) -> String {
    let trimmed = url.trim_end_matches('/');
    if trimmed.is_empty() && !url.is_empty() {
        // Everything was a slash: keep exactly one.
        String::from("/")
    } else {
        trimmed.to_owned()
    }
}
367
368// ----------------------------------------------------------------------
369// Builder
370// ----------------------------------------------------------------------
371
372/// Fluent builder for [`PulseClient`].
373#[derive(Default, Debug)]
374pub struct PulseClientBuilder {
375    base_url: Option<String>,
376    token: Option<String>,
377    timeout: Option<Duration>,
378    http: Option<reqwest::Client>,
379}
380
381impl PulseClientBuilder {
382    /// Required — the Pulse server URL (e.g. `http://localhost:9090`).
383    pub fn base_url<S: Into<String>>(mut self, base_url: S) -> Self {
384        self.base_url = Some(base_url.into());
385        self
386    }
387
388    /// Optional — pre-minted JWT to attach as `Authorization: Bearer <token>`.
389    pub fn token<S: Into<String>>(mut self, token: S) -> Self {
390        self.token = Some(token.into());
391        self
392    }
393
394    /// Optional — per-request timeout. Default 30 seconds.
395    pub fn timeout(mut self, timeout: Duration) -> Self {
396        self.timeout = Some(timeout);
397        self
398    }
399
400    /// Optional — bring-your-own [`reqwest::Client`] (shared connection pools,
401    /// custom TLS / proxy / mTLS config, tracing middleware).
402    pub fn http_client(mut self, http: reqwest::Client) -> Self {
403        self.http = Some(http);
404        self
405    }
406
407    pub fn build(self) -> Result<PulseClient, PulseError> {
408        let base_url = self
409            .base_url
410            .ok_or_else(|| PulseError::InvalidConfig("base_url is required".to_string()))?;
411        if base_url.is_empty() {
412            return Err(PulseError::InvalidConfig(
413                "base_url cannot be empty".to_string(),
414            ));
415        }
416
417        let http = match self.http {
418            Some(c) => c,
419            None => reqwest::Client::builder()
420                .timeout(self.timeout.unwrap_or(DEFAULT_TIMEOUT))
421                .user_agent(USER_AGENT)
422                .build()
423                .map_err(PulseError::Transport)?,
424        };
425
426        Ok(PulseClient {
427            inner: Arc::new(Inner {
428                base_url: strip_trailing_slash(&base_url),
429                http,
430                token: RwLock::new(self.token),
431            }),
432        })
433    }
434}