pulse_client/resources.rs
1//! Resource accessors — one per OpenAPI tag.
2
3use reqwest::Method;
4use serde_json::{json, Value};
5
6use crate::client::PulseClient;
7use crate::error::PulseError;
8
9// ---------------------------------------------------------------------------
10// AuthResource — client.auth()
11// ---------------------------------------------------------------------------
12
13pub struct AuthResource<'c> {
14 pub(crate) client: &'c PulseClient,
15}
16
17impl AuthResource<'_> {
18 /// `POST /api/auth/login` — exchanges username + password for a JWT.
19 ///
20 /// On success, the returned token is cached on the parent client so
21 /// subsequent calls authenticate automatically.
22 pub async fn login(&self, username: &str, password: &str) -> Result<Value, PulseError> {
23 let body = json!({ "username": username, "password": password });
24 let response = self
25 .client
26 .request(Method::POST, "/api/auth/login", Some(&body), false)
27 .await?;
28 cache_token(self.client, &response);
29 Ok(response)
30 }
31
32 /// `POST /api/auth/refresh` — exchanges a refresh token for a fresh JWT.
33 pub async fn refresh(&self, refresh_token: &str) -> Result<Value, PulseError> {
34 let body = json!({ "refreshToken": refresh_token });
35 let response = self
36 .client
37 .request(Method::POST, "/api/auth/refresh", Some(&body), false)
38 .await?;
39 cache_token(self.client, &response);
40 Ok(response)
41 }
42
43 /// `GET /api/auth/organizations` — orgs the current user is a member of.
44 pub async fn organizations(&self) -> Result<Vec<Value>, PulseError> {
45 let result = self
46 .client
47 .request(Method::GET, "/api/auth/organizations", None::<&()>, true)
48 .await?;
49 Ok(unwrap_list(&result, "organizations"))
50 }
51
52 /// `POST /api/auth/switch-org` — switches the active organisation.
53 /// The new JWT (with updated orgId claim) is cached on the parent client.
54 pub async fn switch_org(&self, org_id: &str) -> Result<Value, PulseError> {
55 let body = json!({ "orgId": org_id });
56 let response = self
57 .client
58 .request(Method::POST, "/api/auth/switch-org", Some(&body), true)
59 .await?;
60 cache_token(self.client, &response);
61 Ok(response)
62 }
63}
64
65fn cache_token(client: &PulseClient, response: &Value) {
66 if let Some(token) = response.get("token").and_then(Value::as_str) {
67 if !token.is_empty() {
68 client.set_token(token);
69 }
70 }
71}
72
73// ---------------------------------------------------------------------------
74// PipelinesResource — client.pipelines()
75// ---------------------------------------------------------------------------
76
77pub struct PipelinesResource<'c> {
78 pub(crate) client: &'c PulseClient,
79}
80
81impl PipelinesResource<'_> {
82 /// `GET /api/pulse/pipelines` — every pipeline in the current org.
83 pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
84 let result = self
85 .client
86 .request(Method::GET, "/api/pulse/pipelines", None::<&()>, true)
87 .await?;
88 Ok(unwrap_list(&result, "pipelines"))
89 }
90
91 /// `GET /api/pulse/pipelines/{id}` — one pipeline by id.
92 pub async fn get(&self, pipeline_id: &str) -> Result<Value, PulseError> {
93 let path = format!("/api/pulse/pipelines/{}", encode_path(pipeline_id));
94 self.client
95 .request(Method::GET, &path, None::<&()>, true)
96 .await
97 }
98
99 /// `POST /api/pulse/pipelines` — creates + deploys a new pipeline.
100 ///
101 /// The definition must follow the `CreatePipelineRequest` schema (see
102 /// openapi.yaml). At minimum: `name` + `nodes`.
103 pub async fn create(&self, definition: &Value) -> Result<Value, PulseError> {
104 self.client
105 .request(Method::POST, "/api/pulse/pipelines", Some(definition), true)
106 .await
107 }
108
109 /// `DELETE /api/pulse/pipelines/{id}` — tears down the pipeline.
110 pub async fn delete(&self, pipeline_id: &str) -> Result<(), PulseError> {
111 let path = format!("/api/pulse/pipelines/{}", encode_path(pipeline_id));
112 self.client
113 .request(Method::DELETE, &path, None::<&()>, true)
114 .await?;
115 Ok(())
116 }
117}
118
119// ---------------------------------------------------------------------------
120// AgentsResource — client.agents()
121// ---------------------------------------------------------------------------
122
123pub struct AgentsResource<'c> {
124 pub(crate) client: &'c PulseClient,
125}
126
127impl AgentsResource<'_> {
128 /// `GET /api/pulse/agents` — every deployed agent in the current org.
129 pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
130 let result = self
131 .client
132 .request(Method::GET, "/api/pulse/agents", None::<&()>, true)
133 .await?;
134 Ok(unwrap_list(&result, "agents"))
135 }
136
137 /// `GET /api/pulse/agents/{id}` — one agent by id.
138 pub async fn get(&self, agent_id: &str) -> Result<Value, PulseError> {
139 let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
140 self.client
141 .request(Method::GET, &path, None::<&()>, true)
142 .await
143 }
144
145 /// B-115 Phase 1 — `PUT /api/pulse/agents/{id}`: replace the agent's config.
146 ///
147 /// `config` is the FULL agent config (not a partial merge) — at minimum
148 /// `name`. Optional fields (`engineType`, `inputTopic`, `outputTopic`,
149 /// `description`, `instances`, `monthlyBudget`, `config`) fall back to safe
150 /// defaults when omitted. See the `UpdateAgentRequest` schema in
151 /// `openapi.yaml`.
152 ///
153 /// Today this triggers a full stop + persist + start cycle on the engine
154 /// side — the agent is briefly unavailable while the swap happens.
155 /// Existing state in the agent's keyed store is preserved. Phase 2
156 /// (B-115-engine) will add atomic event-boundary swap so hot-reloadable
157 /// changes apply with no downtime.
158 ///
159 /// Returns the post-update agent snapshot (same shape as [`get`](Self::get)).
160 ///
161 /// # Errors
162 ///
163 /// - [`PulseError::Validation`] on a bad config (self-loop, invalid
164 /// streaming operators)
165 /// - [`PulseError::NotFound`] if the agent doesn't exist
166 pub async fn update(&self, agent_id: &str, config: &Value) -> Result<Value, PulseError> {
167 let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
168 self.client
169 .request(Method::PUT, &path, Some(config), true)
170 .await
171 }
172
173 /// `DELETE /api/pulse/agents/{id}` — stop the agent + remove its config row.
174 ///
175 /// The agent's keyed state store is also dropped. Requires the
176 /// `AGENT_DELETE` permission.
177 pub async fn delete(&self, agent_id: &str) -> Result<(), PulseError> {
178 let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
179 self.client
180 .request::<()>(Method::DELETE, &path, None, true)
181 .await?;
182 Ok(())
183 }
184}
185
186// ---------------------------------------------------------------------------
187// TemplatesResource — client.templates()
188// ---------------------------------------------------------------------------
189
190pub struct TemplatesResource<'c> {
191 pub(crate) client: &'c PulseClient,
192}
193
194impl TemplatesResource<'_> {
195 /// `GET /api/pulse/templates` — the 223+ first-party templates.
196 pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
197 let result = self
198 .client
199 .request(Method::GET, "/api/pulse/templates", None::<&()>, true)
200 .await?;
201 Ok(unwrap_list(&result, "templates"))
202 }
203}
204
205// ---------------------------------------------------------------------------
206// ModelsResource — client.models()
207// ---------------------------------------------------------------------------
208
209/// `client.models()` — B-112 embedded ML model registry.
210///
211/// Upload ONNX models that the streaming `ml_predict` operator scores events
212/// against, in-process on the Pulse engine (no model-server hop). Models are
213/// org-scoped; upload / delete require the ADMIN role.
214///
215/// # Example
216///
217/// ```no_run
218/// use pulse_client::{PulseClient, ModelUpload};
219/// use std::collections::BTreeMap;
220///
221/// # async fn run(client: &PulseClient) -> Result<(), pulse_client::PulseError> {
222/// let mut input = BTreeMap::new();
223/// input.insert("amount".to_string(), "float".to_string());
224/// input.insert("country".to_string(), "string".to_string());
225///
226/// client
227/// .models()
228/// .upload(
229/// ModelUpload::from_path("fraud-classifier", "./model.onnx")
230/// .input_schema(input),
231/// )
232/// .await?;
233/// # Ok(())
234/// # }
235/// ```
236pub struct ModelsResource<'c> {
237 pub(crate) client: &'c PulseClient,
238}
239
/// B-112 — describes a model upload to [`ModelsResource::upload`].
///
/// Supply the model bytes either by file `path` (read at upload time) or as
/// raw `data`. Exactly one of the two must be set — [`ModelsResource::upload`]
/// returns a [`PulseError::InvalidConfig`] otherwise.
///
/// Build one with [`ModelUpload::from_path`] or [`ModelUpload::from_bytes`]
/// and refine it with the chained setters; each setter consumes and returns
/// the value.
#[derive(Debug, Clone, Default)]
pub struct ModelUpload {
    /// Model name referenced by `ml_predict(model = ...)`.
    pub name: String,
    /// Filesystem path to the `.onnx` file. Mutually exclusive with `data`.
    pub path: Option<String>,
    /// Raw model bytes. Mutually exclusive with `path`.
    pub data: Option<Vec<u8>>,
    /// Model runtime — only `"onnx"` is supported today. Defaults to `"onnx"`.
    pub runtime: Option<String>,
    /// Feature-name → type map, used to pack features into the input tensor.
    ///
    /// A `BTreeMap` iterates — and is therefore serialized — in ascending
    /// key order, NOT in insertion order. The `inputSchema` JSON sent by the
    /// upload lists features sorted by name, so insertion order cannot be
    /// used to express the model's input order.
    /// NOTE(review): if the server packs the tensor in JSON key order,
    /// feature names must sort into the model's input order — confirm
    /// against the server contract.
    pub input_schema: Option<std::collections::BTreeMap<String, String>>,
    /// Output-name → type map (informational). Also key-sorted.
    pub output_schema: Option<std::collections::BTreeMap<String, String>>,
}

impl ModelUpload {
    /// Upload from a filesystem path to the `.onnx` file.
    ///
    /// Only `name` and `path` are set; every other field keeps its default
    /// (`None`). The file is not touched here.
    #[must_use]
    pub fn from_path(name: impl Into<String>, path: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            path: Some(path.into()),
            ..Self::default()
        }
    }

    /// Upload from raw model bytes.
    ///
    /// Only `name` and `data` are set; every other field keeps its default
    /// (`None`).
    #[must_use]
    pub fn from_bytes(name: impl Into<String>, data: Vec<u8>) -> Self {
        Self {
            name: name.into(),
            data: Some(data),
            ..Self::default()
        }
    }

    /// Override the runtime (default `"onnx"`).
    #[must_use]
    pub fn runtime(mut self, runtime: impl Into<String>) -> Self {
        self.runtime = Some(runtime.into());
        self
    }

    /// Set the input feature schema (see the ordering note on the
    /// `input_schema` field: keys are kept sorted, not in insertion order).
    #[must_use]
    pub fn input_schema(mut self, schema: std::collections::BTreeMap<String, String>) -> Self {
        self.input_schema = Some(schema);
        self
    }

    /// Set the (informational) output schema.
    #[must_use]
    pub fn output_schema(mut self, schema: std::collections::BTreeMap<String, String>) -> Self {
        self.output_schema = Some(schema);
        self
    }
}
299
300impl ModelsResource<'_> {
301 /// `POST /api/pulse/ml-models` — upload (or replace) a model.
302 ///
303 /// Sent as `multipart/form-data`: a file part named `model` carrying the
304 /// bytes, plus text parts `name`, `runtime`, and (when set) `inputSchema` /
305 /// `outputSchema` as JSON strings. Replacing an existing name hot-swaps the
306 /// model with no agent restart.
307 ///
308 /// Returns the persisted model metadata (name, runtime, sha256, version, …).
309 ///
310 /// # Errors
311 ///
312 /// - [`PulseError::InvalidConfig`] if `name` is blank, if neither or both
313 /// of `path`/`data` are set, or if the model bytes are empty.
314 /// - [`PulseError::Transport`] if reading the file at `path` fails.
315 pub async fn upload(&self, upload: ModelUpload) -> Result<Value, PulseError> {
316 if upload.name.trim().is_empty() {
317 return Err(PulseError::InvalidConfig(
318 "model name must be a non-empty string".to_string(),
319 ));
320 }
321 if upload.path.is_some() == upload.data.is_some() {
322 return Err(PulseError::InvalidConfig(
323 "provide exactly one of 'path' or 'data'".to_string(),
324 ));
325 }
326
327 let (blob, filename) = match (&upload.path, upload.data) {
328 (Some(path), None) => {
329 let bytes = std::fs::read(path)
330 .map_err(|e| PulseError::InvalidConfig(format!("read {path}: {e}")))?;
331 let filename = path
332 .rsplit(['/', '\\'])
333 .next()
334 .filter(|s| !s.is_empty())
335 .unwrap_or("model.onnx")
336 .to_string();
337 (bytes, filename)
338 }
339 (None, Some(data)) => (data, format!("{}.onnx", upload.name)),
340 // Unreachable — guarded by the XOR check above.
341 _ => unreachable!("exactly one of path/data enforced above"),
342 };
343 if blob.is_empty() {
344 return Err(PulseError::InvalidConfig(
345 "model bytes are empty".to_string(),
346 ));
347 }
348
349 let runtime = upload.runtime.unwrap_or_else(|| "onnx".to_string());
350 let model_part = reqwest::multipart::Part::bytes(blob)
351 .file_name(filename)
352 .mime_str("application/octet-stream")
353 .map_err(PulseError::Transport)?;
354 let mut form = reqwest::multipart::Form::new()
355 .text("name", upload.name)
356 .text("runtime", runtime)
357 .part("model", model_part);
358 if let Some(schema) = upload.input_schema {
359 form = form.text("inputSchema", serde_json::to_string(&schema)?);
360 }
361 if let Some(schema) = upload.output_schema {
362 form = form.text("outputSchema", serde_json::to_string(&schema)?);
363 }
364
365 self.client
366 .request_multipart("/api/pulse/ml-models", form)
367 .await
368 }
369
370 /// `GET /api/pulse/ml-models` — models registered for the caller's org.
371 pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
372 let result = self
373 .client
374 .request(Method::GET, "/api/pulse/ml-models", None::<&()>, true)
375 .await?;
376 Ok(unwrap_list(&result, "models"))
377 }
378
379 /// `GET /api/pulse/ml-models/{name}` — metadata for one model.
380 pub async fn get(&self, name: &str) -> Result<Value, PulseError> {
381 let path = format!("/api/pulse/ml-models/{}", encode_path(name));
382 self.client
383 .request(Method::GET, &path, None::<&()>, true)
384 .await
385 }
386
387 /// `DELETE /api/pulse/ml-models/{name}` — remove a model (ADMIN).
388 pub async fn delete(&self, name: &str) -> Result<(), PulseError> {
389 let path = format!("/api/pulse/ml-models/{}", encode_path(name));
390 self.client
391 .request::<()>(Method::DELETE, &path, None, true)
392 .await?;
393 Ok(())
394 }
395}
396
397// ---------------------------------------------------------------------------
398// UsersResource — client.users()
399// ---------------------------------------------------------------------------
400
401pub struct UsersResource<'c> {
402 pub(crate) client: &'c PulseClient,
403}
404
405impl UsersResource<'_> {
406 /// `GET /api/pulse/users` — every user in the current org.
407 ///
408 /// Requires the caller to have the `USERS_LIST` permission atom (Owner /
409 /// Platform Admin personas by default — see B-105).
410 pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
411 let result = self
412 .client
413 .request(Method::GET, "/api/pulse/users", None::<&()>, true)
414 .await?;
415 Ok(unwrap_list(&result, "users"))
416 }
417}
418
419// ---------------------------------------------------------------------------
420// Helpers
421// ---------------------------------------------------------------------------
422
423/// Extracts a `Vec<Value>` from `result[key]`. Returns an empty Vec for
424/// missing / malformed envelopes — never panics — so callers can iterate
425/// safely.
426fn unwrap_list(result: &Value, key: &str) -> Vec<Value> {
427 result
428 .get(key)
429 .and_then(Value::as_array)
430 .cloned()
431 .unwrap_or_default()
432}
433
/// Percent-encodes a path-param segment so ids containing `/`, spaces, etc.
/// round-trip safely.
///
/// Only the RFC 3986 "unreserved" bytes — ASCII letters, digits, `-`, `_`,
/// `.` and `~` — pass through unchanged. Every other byte, including each
/// byte of a multi-byte UTF-8 character, becomes `%XX` with upper-case hex.
/// In particular a space is encoded as `%20` (never `+`), a literal `+` as
/// `%2B`, and `/` as `%2F`.
///
/// NOTE(review): this set is stricter than Go's `url.PathEscape` (which
/// leaves characters such as `+`, `=` and `@` unescaped), so the output is
/// not byte-for-byte identical to the other SDKs for every input.
fn encode_path(segment: &str) -> String {
    // Lookup table instead of `format!` so no temporary String is allocated
    // for each escaped byte.
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(segment.len());
    for b in segment.bytes() {
        if b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b'~') {
            out.push(char::from(b));
        } else {
            out.push('%');
            out.push(char::from(HEX[usize::from(b >> 4)]));
            out.push(char::from(HEX[usize::from(b & 0x0F)]));
        }
    }
    out
}
449
450// ---------------------------------------------------------------------------
451// ConnectorsResource — client.connectors() (B-093 follow-up: catalogue parity)
452// ---------------------------------------------------------------------------
453
454/// `client.connectors()` — the connector catalogue, the same list the Pipeline
455/// Studio palette and `pulse connectors list` show. Each entry is
456/// `{subType, displayName, configFields}`; use the `subType` as a sink/source
457/// node `type` in a pipeline definition deployed via `client.pipelines()`.
458/// Bridged connectors appear only when the enterprise bridge JAR is on the
459/// server's classpath.
460pub struct ConnectorsResource<'c> {
461 pub(crate) client: &'c PulseClient,
462}
463
464impl ConnectorsResource<'_> {
465 /// `GET /api/pulse/connectors` — `{"sources": [...], "sinks": [...]}`.
466 pub async fn list(&self) -> Result<Value, PulseError> {
467 self.client
468 .request(Method::GET, "/api/pulse/connectors", None::<&()>, true)
469 .await
470 }
471
472 /// Just the sink connectors.
473 pub async fn sinks(&self) -> Result<Vec<Value>, PulseError> {
474 Ok(unwrap_list(&self.list().await?, "sinks"))
475 }
476
477 /// Just the source connectors.
478 pub async fn sources(&self) -> Result<Vec<Value>, PulseError> {
479 Ok(unwrap_list(&self.list().await?, "sources"))
480 }
481}