Skip to main content

pulse_client/
client.rs

1//! The `PulseClient` and its [`PulseClientBuilder`].
2
3use std::sync::Arc;
4use std::sync::RwLock;
5use std::time::Duration;
6
7use reqwest::Method;
8use reqwest::StatusCode;
9use serde::Serialize;
10use serde_json::Value;
11
12use crate::error::PulseError;
13use crate::events::EventsResource;
14use crate::iq::IQResource;
15use crate::resources::{
16    AgentsResource, AuthResource, PipelinesResource, TemplatesResource, UsersResource,
17};
18use crate::streams::StreamsResource;
19
/// `User-Agent` value set on the HTTP client that the builder constructs
/// when the caller does not supply one.
const USER_AGENT: &str = "pulse-client-rust/2.6.0";
/// Request timeout used for the builder-constructed HTTP client when no
/// explicit timeout is configured.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
22
23/// Async HTTP client for the Pulse REST API.
24///
25/// # Example
26///
27/// ```no_run
28/// use pulse_client::PulseClient;
29///
30/// # async fn run() -> Result<(), pulse_client::PulseError> {
31/// let client = PulseClient::builder()
32///     .base_url("http://localhost:9090")
33///     .build()?;
34///
35/// client.auth().login("alice", "secret").await?;
36///
37/// for pipeline in client.pipelines().list().await? {
38///     println!("{}", pipeline["name"]);
39/// }
40/// # Ok(())
41/// # }
42/// ```
43///
44/// # Thread safety
45///
46/// `PulseClient` is `Clone` and cheap to clone — the underlying reqwest client
47/// pools connections, and the token sits behind an `Arc<RwLock>`. Share a
48/// single instance across tasks.
49#[derive(Clone)]
50pub struct PulseClient {
51    pub(crate) inner: Arc<Inner>,
52}
53
54pub(crate) struct Inner {
55    pub(crate) base_url: String,
56    pub(crate) http: reqwest::Client,
57    pub(crate) token: RwLock<Option<String>>,
58}
59
60impl PulseClient {
61    pub fn builder() -> PulseClientBuilder {
62        PulseClientBuilder::default()
63    }
64
65    /// Returns the current bearer token, or `None` if none is set.
66    pub fn token(&self) -> Option<String> {
67        self.inner.token.read().ok().and_then(|guard| guard.clone())
68    }
69
70    /// Updates the bearer token used by subsequent authenticated requests.
71    /// Safe to call from multiple tasks concurrently.
72    pub fn set_token<S: Into<String>>(&self, token: S) {
73        if let Ok(mut guard) = self.inner.token.write() {
74            *guard = Some(token.into());
75        }
76    }
77
78    /// Clears the bearer token, effectively logging out the client.
79    pub fn clear_token(&self) {
80        if let Ok(mut guard) = self.inner.token.write() {
81            *guard = None;
82        }
83    }
84
85    // ------------------------------------------------------------------
86    // Resource accessors
87    // ------------------------------------------------------------------
88    pub fn auth(&self) -> AuthResource<'_> {
89        AuthResource { client: self }
90    }
91
92    pub fn pipelines(&self) -> PipelinesResource<'_> {
93        PipelinesResource { client: self }
94    }
95
96    pub fn agents(&self) -> AgentsResource<'_> {
97        AgentsResource { client: self }
98    }
99
100    pub fn templates(&self) -> TemplatesResource<'_> {
101        TemplatesResource { client: self }
102    }
103
104    pub fn users(&self) -> UsersResource<'_> {
105        UsersResource { client: self }
106    }
107
108    pub fn events(&self) -> EventsResource<'_> {
109        EventsResource { client: self }
110    }
111
112    pub fn iq(&self) -> IQResource<'_> {
113        IQResource { client: self }
114    }
115
116    /// `client.streams()` — B-107 Kafka-Streams-like declarative DSL.
117    pub fn streams(&self) -> StreamsResource<'_> {
118        StreamsResource { client: self }
119    }
120
121    /// `GET /api/pulse/version` — public, no JWT required. Returns the
122    /// Pulse server's build + version metadata.
123    pub async fn version(&self) -> Result<Value, PulseError> {
124        self.request(Method::GET, "/api/pulse/version", None::<&()>, false)
125            .await
126    }
127
128    // ------------------------------------------------------------------
129    // Internal: request execution + error translation
130    // ------------------------------------------------------------------
131    pub(crate) async fn request<B: Serialize + ?Sized>(
132        &self,
133        method: Method,
134        path: &str,
135        body: Option<&B>,
136        authenticated: bool,
137    ) -> Result<Value, PulseError> {
138        let url = format!("{}{path}", self.inner.base_url);
139        let mut req = self.inner.http.request(method, url);
140
141        if authenticated {
142            match self.token() {
143                Some(token) if !token.is_empty() => {
144                    req = req.bearer_auth(token);
145                }
146                _ => {
147                    return Err(PulseError::NoToken {
148                        path: path.to_string(),
149                    });
150                }
151            }
152        }
153
154        if let Some(payload) = body {
155            req = req.json(payload);
156        }
157
158        let response = req.send().await?;
159        let status = response.status();
160
161        if status == StatusCode::NO_CONTENT {
162            return Ok(Value::Object(Default::default()));
163        }
164
165        if status.is_success() {
166            // Read body; empty body → empty object so callers can `.get()`
167            let bytes = response.bytes().await?;
168            if bytes.is_empty() {
169                return Ok(Value::Object(Default::default()));
170            }
171            return Ok(serde_json::from_slice(&bytes)?);
172        }
173
174        // Non-success — translate to a typed error
175        let retry_after_header = response
176            .headers()
177            .get(reqwest::header::RETRY_AFTER)
178            .and_then(|v| v.to_str().ok())
179            .and_then(|s| s.trim().parse::<u32>().ok());
180
181        let bytes = response.bytes().await?;
182        let parsed_body: Option<Value> = if bytes.is_empty() {
183            None
184        } else {
185            match serde_json::from_slice::<Value>(&bytes) {
186                Ok(v) => Some(v),
187                Err(_) => {
188                    let raw = String::from_utf8_lossy(&bytes);
189                    let trimmed = if raw.len() > 200 { &raw[..200] } else { &raw };
190                    Some(serde_json::json!({ "error": trimmed }))
191                }
192            }
193        };
194
195        Err(translate_error(
196            status,
197            path,
198            parsed_body,
199            retry_after_header,
200        ))
201    }
202}
203
204fn translate_error(
205    status: StatusCode,
206    path: &str,
207    body: Option<Value>,
208    retry_after_header: Option<u32>,
209) -> PulseError {
210    let path = path.to_string();
211    match status {
212        StatusCode::UNAUTHORIZED => PulseError::Auth { path, body },
213        StatusCode::NOT_FOUND => PulseError::NotFound { path, body },
214        StatusCode::BAD_REQUEST => PulseError::Validation { path, body },
215        StatusCode::TOO_MANY_REQUESTS => {
216            let retry_from_body = body
217                .as_ref()
218                .and_then(|v| v.get("retryAfterSeconds"))
219                .and_then(|v| v.as_u64())
220                .map(|n| n as u32);
221            PulseError::RateLimit {
222                path,
223                body,
224                retry_after_seconds: retry_from_body.or(retry_after_header),
225            }
226        }
227        other => PulseError::Api {
228            status: other.as_u16(),
229            path,
230            body,
231        },
232    }
233}
234
/// Returns `url` as an owned string with every trailing `/` removed.
///
/// An input made up only of slashes collapses to a single `"/"` rather than
/// the empty string; an empty input stays empty.
fn strip_trailing_slash(url: &str) -> String {
    let trimmed = url.trim_end_matches('/');
    if trimmed.is_empty() && !url.is_empty() {
        // Nothing but slashes: keep exactly one.
        return "/".to_string();
    }
    trimmed.to_string()
}
242
243// ----------------------------------------------------------------------
244// Builder
245// ----------------------------------------------------------------------
246
247/// Fluent builder for [`PulseClient`].
248#[derive(Default, Debug)]
249pub struct PulseClientBuilder {
250    base_url: Option<String>,
251    token: Option<String>,
252    timeout: Option<Duration>,
253    http: Option<reqwest::Client>,
254}
255
256impl PulseClientBuilder {
257    /// Required — the Pulse server URL (e.g. `http://localhost:9090`).
258    pub fn base_url<S: Into<String>>(mut self, base_url: S) -> Self {
259        self.base_url = Some(base_url.into());
260        self
261    }
262
263    /// Optional — pre-minted JWT to attach as `Authorization: Bearer <token>`.
264    pub fn token<S: Into<String>>(mut self, token: S) -> Self {
265        self.token = Some(token.into());
266        self
267    }
268
269    /// Optional — per-request timeout. Default 30 seconds.
270    pub fn timeout(mut self, timeout: Duration) -> Self {
271        self.timeout = Some(timeout);
272        self
273    }
274
275    /// Optional — bring-your-own [`reqwest::Client`] (shared connection pools,
276    /// custom TLS / proxy / mTLS config, tracing middleware).
277    pub fn http_client(mut self, http: reqwest::Client) -> Self {
278        self.http = Some(http);
279        self
280    }
281
282    pub fn build(self) -> Result<PulseClient, PulseError> {
283        let base_url = self
284            .base_url
285            .ok_or_else(|| PulseError::InvalidConfig("base_url is required".to_string()))?;
286        if base_url.is_empty() {
287            return Err(PulseError::InvalidConfig(
288                "base_url cannot be empty".to_string(),
289            ));
290        }
291
292        let http = match self.http {
293            Some(c) => c,
294            None => reqwest::Client::builder()
295                .timeout(self.timeout.unwrap_or(DEFAULT_TIMEOUT))
296                .user_agent(USER_AGENT)
297                .build()
298                .map_err(PulseError::Transport)?,
299        };
300
301        Ok(PulseClient {
302            inner: Arc::new(Inner {
303                base_url: strip_trailing_slash(&base_url),
304                http,
305                token: RwLock::new(self.token),
306            }),
307        })
308    }
309}