Skip to main content

pulse_client/
resources.rs

1//! Resource accessors — one per OpenAPI tag.
2
3use reqwest::Method;
4use serde_json::{json, Value};
5
6use crate::client::PulseClient;
7use crate::error::PulseError;
8
9// ---------------------------------------------------------------------------
10// AuthResource — client.auth()
11// ---------------------------------------------------------------------------
12
13pub struct AuthResource<'c> {
14    pub(crate) client: &'c PulseClient,
15}
16
17impl AuthResource<'_> {
18    /// `POST /api/auth/login` — exchanges username + password for a JWT.
19    ///
20    /// On success, the returned token is cached on the parent client so
21    /// subsequent calls authenticate automatically.
22    pub async fn login(&self, username: &str, password: &str) -> Result<Value, PulseError> {
23        let body = json!({ "username": username, "password": password });
24        let response = self
25            .client
26            .request(Method::POST, "/api/auth/login", Some(&body), false)
27            .await?;
28        cache_token(self.client, &response);
29        Ok(response)
30    }
31
32    /// `POST /api/auth/refresh` — exchanges a refresh token for a fresh JWT.
33    pub async fn refresh(&self, refresh_token: &str) -> Result<Value, PulseError> {
34        let body = json!({ "refreshToken": refresh_token });
35        let response = self
36            .client
37            .request(Method::POST, "/api/auth/refresh", Some(&body), false)
38            .await?;
39        cache_token(self.client, &response);
40        Ok(response)
41    }
42
43    /// `GET /api/auth/organizations` — orgs the current user is a member of.
44    pub async fn organizations(&self) -> Result<Vec<Value>, PulseError> {
45        let result = self
46            .client
47            .request(Method::GET, "/api/auth/organizations", None::<&()>, true)
48            .await?;
49        Ok(unwrap_list(&result, "organizations"))
50    }
51
52    /// `POST /api/auth/switch-org` — switches the active organisation.
53    /// The new JWT (with updated orgId claim) is cached on the parent client.
54    pub async fn switch_org(&self, org_id: &str) -> Result<Value, PulseError> {
55        let body = json!({ "orgId": org_id });
56        let response = self
57            .client
58            .request(Method::POST, "/api/auth/switch-org", Some(&body), true)
59            .await?;
60        cache_token(self.client, &response);
61        Ok(response)
62    }
63}
64
65fn cache_token(client: &PulseClient, response: &Value) {
66    if let Some(token) = response.get("token").and_then(Value::as_str) {
67        if !token.is_empty() {
68            client.set_token(token);
69        }
70    }
71}
72
73// ---------------------------------------------------------------------------
74// PipelinesResource — client.pipelines()
75// ---------------------------------------------------------------------------
76
77pub struct PipelinesResource<'c> {
78    pub(crate) client: &'c PulseClient,
79}
80
81impl PipelinesResource<'_> {
82    /// `GET /api/pulse/pipelines` — every pipeline in the current org.
83    pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
84        let result = self
85            .client
86            .request(Method::GET, "/api/pulse/pipelines", None::<&()>, true)
87            .await?;
88        Ok(unwrap_list(&result, "pipelines"))
89    }
90
91    /// `GET /api/pulse/pipelines/{id}` — one pipeline by id.
92    pub async fn get(&self, pipeline_id: &str) -> Result<Value, PulseError> {
93        let path = format!("/api/pulse/pipelines/{}", encode_path(pipeline_id));
94        self.client
95            .request(Method::GET, &path, None::<&()>, true)
96            .await
97    }
98
99    /// `POST /api/pulse/pipelines` — creates + deploys a new pipeline.
100    ///
101    /// The definition must follow the `CreatePipelineRequest` schema (see
102    /// openapi.yaml). At minimum: `name` + `nodes`.
103    pub async fn create(&self, definition: &Value) -> Result<Value, PulseError> {
104        self.client
105            .request(Method::POST, "/api/pulse/pipelines", Some(definition), true)
106            .await
107    }
108
109    /// `DELETE /api/pulse/pipelines/{id}` — tears down the pipeline.
110    pub async fn delete(&self, pipeline_id: &str) -> Result<(), PulseError> {
111        let path = format!("/api/pulse/pipelines/{}", encode_path(pipeline_id));
112        self.client
113            .request(Method::DELETE, &path, None::<&()>, true)
114            .await?;
115        Ok(())
116    }
117}
118
119// ---------------------------------------------------------------------------
120// AgentsResource — client.agents()
121// ---------------------------------------------------------------------------
122
123pub struct AgentsResource<'c> {
124    pub(crate) client: &'c PulseClient,
125}
126
127impl AgentsResource<'_> {
128    /// `GET /api/pulse/agents` — every deployed agent in the current org.
129    pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
130        let result = self
131            .client
132            .request(Method::GET, "/api/pulse/agents", None::<&()>, true)
133            .await?;
134        Ok(unwrap_list(&result, "agents"))
135    }
136
137    /// `GET /api/pulse/agents/{id}` — one agent by id.
138    pub async fn get(&self, agent_id: &str) -> Result<Value, PulseError> {
139        let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
140        self.client
141            .request(Method::GET, &path, None::<&()>, true)
142            .await
143    }
144
145    /// B-115 Phase 1 — `PUT /api/pulse/agents/{id}`: replace the agent's config.
146    ///
147    /// `config` is the FULL agent config (not a partial merge) — at minimum
148    /// `name`. Optional fields (`engineType`, `inputTopic`, `outputTopic`,
149    /// `description`, `instances`, `monthlyBudget`, `config`) fall back to safe
150    /// defaults when omitted. See the `UpdateAgentRequest` schema in
151    /// `openapi.yaml`.
152    ///
153    /// Today this triggers a full stop + persist + start cycle on the engine
154    /// side — the agent is briefly unavailable while the swap happens.
155    /// Existing state in the agent's keyed store is preserved. Phase 2
156    /// (B-115-engine) will add atomic event-boundary swap so hot-reloadable
157    /// changes apply with no downtime.
158    ///
159    /// Returns the post-update agent snapshot (same shape as [`get`](Self::get)).
160    ///
161    /// # Errors
162    ///
163    /// - [`PulseError::Validation`] on a bad config (self-loop, invalid
164    ///   streaming operators)
165    /// - [`PulseError::NotFound`] if the agent doesn't exist
166    pub async fn update(&self, agent_id: &str, config: &Value) -> Result<Value, PulseError> {
167        let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
168        self.client
169            .request(Method::PUT, &path, Some(config), true)
170            .await
171    }
172
173    /// `DELETE /api/pulse/agents/{id}` — stop the agent + remove its config row.
174    ///
175    /// The agent's keyed state store is also dropped. Requires the
176    /// `AGENT_DELETE` permission.
177    pub async fn delete(&self, agent_id: &str) -> Result<(), PulseError> {
178        let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
179        self.client
180            .request::<()>(Method::DELETE, &path, None, true)
181            .await?;
182        Ok(())
183    }
184}
185
186// ---------------------------------------------------------------------------
187// TemplatesResource — client.templates()
188// ---------------------------------------------------------------------------
189
190pub struct TemplatesResource<'c> {
191    pub(crate) client: &'c PulseClient,
192}
193
194impl TemplatesResource<'_> {
195    /// `GET /api/pulse/templates` — the 223+ first-party templates.
196    pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
197        let result = self
198            .client
199            .request(Method::GET, "/api/pulse/templates", None::<&()>, true)
200            .await?;
201        Ok(unwrap_list(&result, "templates"))
202    }
203}
204
205// ---------------------------------------------------------------------------
206// ModelsResource — client.models()
207// ---------------------------------------------------------------------------
208
/// `client.models()` — B-112 embedded ML model registry.
///
/// Upload ONNX models that the streaming `ml_predict` operator scores events
/// against, in-process on the Pulse engine (no model-server hop). Models are
/// org-scoped; upload / delete require the ADMIN role.
///
/// The resource only borrows the parent [`PulseClient`]; every call goes
/// through that client.
///
/// # Example
///
/// ```no_run
/// use pulse_client::{PulseClient, ModelUpload};
/// use std::collections::BTreeMap;
///
/// # async fn run(client: &PulseClient) -> Result<(), pulse_client::PulseError> {
/// let mut input = BTreeMap::new();
/// input.insert("amount".to_string(), "float".to_string());
/// input.insert("country".to_string(), "string".to_string());
///
/// client
///     .models()
///     .upload(
///         ModelUpload::from_path("fraud-classifier", "./model.onnx")
///             .input_schema(input),
///     )
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct ModelsResource<'c> {
    pub(crate) client: &'c PulseClient,
}
239
/// B-112 — describes a model upload to [`ModelsResource::upload`].
///
/// Supply the model bytes either by file `path` (read at upload time) or as
/// raw `data`. Exactly one of the two must be set — [`ModelsResource::upload`]
/// returns a [`PulseError::InvalidConfig`] otherwise.
///
/// Build one with [`ModelUpload::from_path`] or [`ModelUpload::from_bytes`],
/// then chain the optional setters. Each setter consumes and returns the
/// value, so the result must be used.
#[derive(Debug, Clone, Default)]
pub struct ModelUpload {
    /// Model name referenced by `ml_predict(model = ...)`.
    pub name: String,
    /// Filesystem path to the `.onnx` file. Mutually exclusive with `data`.
    pub path: Option<String>,
    /// Raw model bytes. Mutually exclusive with `path`.
    pub data: Option<Vec<u8>>,
    /// Model runtime — only `"onnx"` is supported today. Defaults to `"onnx"`.
    pub runtime: Option<String>,
    /// Feature-name → type map, used to pack features into the input tensor.
    ///
    /// A `BTreeMap` iterates in sorted-key order, so the schema is serialised
    /// with feature names in lexicographic order — NOT insertion order.
    /// NOTE(review): if the server packs the tensor positionally from this
    /// map, feature names must sort into the model's input order — confirm
    /// against the server contract.
    pub input_schema: Option<std::collections::BTreeMap<String, String>>,
    /// Output-name → type map (informational).
    pub output_schema: Option<std::collections::BTreeMap<String, String>>,
}

impl ModelUpload {
    /// Upload from a filesystem path to the `.onnx` file.
    ///
    /// Only `name` and `path` are set; every other field keeps its default
    /// (`None`).
    #[must_use]
    pub fn from_path(name: impl Into<String>, path: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            path: Some(path.into()),
            ..Self::default()
        }
    }

    /// Upload from raw model bytes.
    ///
    /// Only `name` and `data` are set; every other field keeps its default
    /// (`None`).
    #[must_use]
    pub fn from_bytes(name: impl Into<String>, data: Vec<u8>) -> Self {
        Self {
            name: name.into(),
            data: Some(data),
            ..Self::default()
        }
    }

    /// Override the runtime (default `"onnx"`).
    #[must_use]
    pub fn runtime(mut self, runtime: impl Into<String>) -> Self {
        self.runtime = Some(runtime.into());
        self
    }

    /// Set the input feature schema (iterated in sorted-key order).
    #[must_use]
    pub fn input_schema(mut self, schema: std::collections::BTreeMap<String, String>) -> Self {
        self.input_schema = Some(schema);
        self
    }

    /// Set the (informational) output schema.
    #[must_use]
    pub fn output_schema(mut self, schema: std::collections::BTreeMap<String, String>) -> Self {
        self.output_schema = Some(schema);
        self
    }
}
299
300impl ModelsResource<'_> {
301    /// `POST /api/pulse/ml-models` — upload (or replace) a model.
302    ///
303    /// Sent as `multipart/form-data`: a file part named `model` carrying the
304    /// bytes, plus text parts `name`, `runtime`, and (when set) `inputSchema` /
305    /// `outputSchema` as JSON strings. Replacing an existing name hot-swaps the
306    /// model with no agent restart.
307    ///
308    /// Returns the persisted model metadata (name, runtime, sha256, version, …).
309    ///
310    /// # Errors
311    ///
312    /// - [`PulseError::InvalidConfig`] if `name` is blank, if neither or both
313    ///   of `path`/`data` are set, or if the model bytes are empty.
314    /// - [`PulseError::Transport`] if reading the file at `path` fails.
315    pub async fn upload(&self, upload: ModelUpload) -> Result<Value, PulseError> {
316        if upload.name.trim().is_empty() {
317            return Err(PulseError::InvalidConfig(
318                "model name must be a non-empty string".to_string(),
319            ));
320        }
321        if upload.path.is_some() == upload.data.is_some() {
322            return Err(PulseError::InvalidConfig(
323                "provide exactly one of 'path' or 'data'".to_string(),
324            ));
325        }
326
327        let (blob, filename) = match (&upload.path, upload.data) {
328            (Some(path), None) => {
329                let bytes = std::fs::read(path)
330                    .map_err(|e| PulseError::InvalidConfig(format!("read {path}: {e}")))?;
331                let filename = path
332                    .rsplit(['/', '\\'])
333                    .next()
334                    .filter(|s| !s.is_empty())
335                    .unwrap_or("model.onnx")
336                    .to_string();
337                (bytes, filename)
338            }
339            (None, Some(data)) => (data, format!("{}.onnx", upload.name)),
340            // Unreachable — guarded by the XOR check above.
341            _ => unreachable!("exactly one of path/data enforced above"),
342        };
343        if blob.is_empty() {
344            return Err(PulseError::InvalidConfig(
345                "model bytes are empty".to_string(),
346            ));
347        }
348
349        let runtime = upload.runtime.unwrap_or_else(|| "onnx".to_string());
350        let model_part = reqwest::multipart::Part::bytes(blob)
351            .file_name(filename)
352            .mime_str("application/octet-stream")
353            .map_err(PulseError::Transport)?;
354        let mut form = reqwest::multipart::Form::new()
355            .text("name", upload.name)
356            .text("runtime", runtime)
357            .part("model", model_part);
358        if let Some(schema) = upload.input_schema {
359            form = form.text("inputSchema", serde_json::to_string(&schema)?);
360        }
361        if let Some(schema) = upload.output_schema {
362            form = form.text("outputSchema", serde_json::to_string(&schema)?);
363        }
364
365        self.client
366            .request_multipart("/api/pulse/ml-models", form)
367            .await
368    }
369
370    /// `GET /api/pulse/ml-models` — models registered for the caller's org.
371    pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
372        let result = self
373            .client
374            .request(Method::GET, "/api/pulse/ml-models", None::<&()>, true)
375            .await?;
376        Ok(unwrap_list(&result, "models"))
377    }
378
379    /// `GET /api/pulse/ml-models/{name}` — metadata for one model.
380    pub async fn get(&self, name: &str) -> Result<Value, PulseError> {
381        let path = format!("/api/pulse/ml-models/{}", encode_path(name));
382        self.client
383            .request(Method::GET, &path, None::<&()>, true)
384            .await
385    }
386
387    /// `DELETE /api/pulse/ml-models/{name}` — remove a model (ADMIN).
388    pub async fn delete(&self, name: &str) -> Result<(), PulseError> {
389        let path = format!("/api/pulse/ml-models/{}", encode_path(name));
390        self.client
391            .request::<()>(Method::DELETE, &path, None, true)
392            .await?;
393        Ok(())
394    }
395}
396
397// ---------------------------------------------------------------------------
398// UsersResource — client.users()
399// ---------------------------------------------------------------------------
400
401pub struct UsersResource<'c> {
402    pub(crate) client: &'c PulseClient,
403}
404
405impl UsersResource<'_> {
406    /// `GET /api/pulse/users` — every user in the current org.
407    ///
408    /// Requires the caller to have the `USERS_LIST` permission atom (Owner /
409    /// Platform Admin personas by default — see B-105).
410    pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
411        let result = self
412            .client
413            .request(Method::GET, "/api/pulse/users", None::<&()>, true)
414            .await?;
415        Ok(unwrap_list(&result, "users"))
416    }
417}
418
419// ---------------------------------------------------------------------------
420// Helpers
421// ---------------------------------------------------------------------------
422
423/// Extracts a `Vec<Value>` from `result[key]`. Returns an empty Vec for
424/// missing / malformed envelopes — never panics — so callers can iterate
425/// safely.
426fn unwrap_list(result: &Value, key: &str) -> Vec<Value> {
427    result
428        .get(key)
429        .and_then(Value::as_array)
430        .cloned()
431        .unwrap_or_default()
432}
433
/// Percent-encodes a path-param segment so ids containing `/`, spaces, etc.
/// round-trip safely.
///
/// Only the RFC 3986 "unreserved" bytes (`A-Z`, `a-z`, `0-9`, `-`, `_`, `.`,
/// `~`) pass through unchanged. Every other byte of the UTF-8 input becomes
/// `%XX` with uppercase hex digits. In particular a space is encoded as `%20`
/// (never `+`), a literal `+` as `%2B`, and `/` as `%2F`.
///
/// NOTE(review): the `pulse-go` (`url.PathEscape`) and `pulse-java`
/// (`URLEncoder`) clients may leave a few extra characters unescaped; the
/// decoded value should still match — confirm if byte-identical URLs matter.
fn encode_path(segment: &str) -> String {
    // Lookup table avoids a heap-allocating `format!` per escaped byte.
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(segment.len());
    for b in segment.bytes() {
        if b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b'~') {
            out.push(char::from(b));
        } else {
            out.push('%');
            out.push(char::from(HEX[usize::from(b >> 4)]));
            out.push(char::from(HEX[usize::from(b & 0x0F)]));
        }
    }
    out
}
449
450// ---------------------------------------------------------------------------
451// ConnectorsResource — client.connectors() (B-093 follow-up: catalogue parity)
452// ---------------------------------------------------------------------------
453
454/// `client.connectors()` — the connector catalogue, the same list the Pipeline
455/// Studio palette and `pulse connectors list` show. Each entry is
456/// `{subType, displayName, configFields}`; use the `subType` as a sink/source
457/// node `type` in a pipeline definition deployed via `client.pipelines()`.
458/// Bridged connectors appear only when the enterprise bridge JAR is on the
459/// server's classpath.
460pub struct ConnectorsResource<'c> {
461    pub(crate) client: &'c PulseClient,
462}
463
464impl ConnectorsResource<'_> {
465    /// `GET /api/pulse/connectors` — `{"sources": [...], "sinks": [...]}`.
466    pub async fn list(&self) -> Result<Value, PulseError> {
467        self.client
468            .request(Method::GET, "/api/pulse/connectors", None::<&()>, true)
469            .await
470    }
471
472    /// Just the sink connectors.
473    pub async fn sinks(&self) -> Result<Vec<Value>, PulseError> {
474        Ok(unwrap_list(&self.list().await?, "sinks"))
475    }
476
477    /// Just the source connectors.
478    pub async fn sources(&self) -> Result<Vec<Value>, PulseError> {
479        Ok(unwrap_list(&self.list().await?, "sources"))
480    }
481}