pulse_client/resources.rs
1//! Resource accessors — one per OpenAPI tag.
2
3use reqwest::Method;
4use serde_json::{json, Value};
5
6use crate::client::PulseClient;
7use crate::error::PulseError;
8
9// ---------------------------------------------------------------------------
10// AuthResource — client.auth()
11// ---------------------------------------------------------------------------
12
13pub struct AuthResource<'c> {
14 pub(crate) client: &'c PulseClient,
15}
16
17impl AuthResource<'_> {
18 /// `POST /api/auth/login` — exchanges username + password for a JWT.
19 ///
20 /// On success, the returned token is cached on the parent client so
21 /// subsequent calls authenticate automatically.
22 pub async fn login(&self, username: &str, password: &str) -> Result<Value, PulseError> {
23 let body = json!({ "username": username, "password": password });
24 let response = self
25 .client
26 .request(Method::POST, "/api/auth/login", Some(&body), false)
27 .await?;
28 cache_token(self.client, &response);
29 Ok(response)
30 }
31
32 /// `POST /api/auth/refresh` — exchanges a refresh token for a fresh JWT.
33 pub async fn refresh(&self, refresh_token: &str) -> Result<Value, PulseError> {
34 let body = json!({ "refreshToken": refresh_token });
35 let response = self
36 .client
37 .request(Method::POST, "/api/auth/refresh", Some(&body), false)
38 .await?;
39 cache_token(self.client, &response);
40 Ok(response)
41 }
42
43 /// `GET /api/auth/organizations` — orgs the current user is a member of.
44 pub async fn organizations(&self) -> Result<Vec<Value>, PulseError> {
45 let result = self
46 .client
47 .request(Method::GET, "/api/auth/organizations", None::<&()>, true)
48 .await?;
49 Ok(unwrap_list(&result, "organizations"))
50 }
51
52 /// `POST /api/auth/switch-org` — switches the active organisation.
53 /// The new JWT (with updated orgId claim) is cached on the parent client.
54 pub async fn switch_org(&self, org_id: &str) -> Result<Value, PulseError> {
55 let body = json!({ "orgId": org_id });
56 let response = self
57 .client
58 .request(Method::POST, "/api/auth/switch-org", Some(&body), true)
59 .await?;
60 cache_token(self.client, &response);
61 Ok(response)
62 }
63}
64
65fn cache_token(client: &PulseClient, response: &Value) {
66 if let Some(token) = response.get("token").and_then(Value::as_str) {
67 if !token.is_empty() {
68 client.set_token(token);
69 }
70 }
71}
72
73// ---------------------------------------------------------------------------
74// PipelinesResource — client.pipelines()
75// ---------------------------------------------------------------------------
76
77pub struct PipelinesResource<'c> {
78 pub(crate) client: &'c PulseClient,
79}
80
81impl PipelinesResource<'_> {
82 /// `GET /api/pulse/pipelines` — every pipeline in the current org.
83 pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
84 let result = self
85 .client
86 .request(Method::GET, "/api/pulse/pipelines", None::<&()>, true)
87 .await?;
88 Ok(unwrap_list(&result, "pipelines"))
89 }
90
91 /// `GET /api/pulse/pipelines/{id}` — one pipeline by id.
92 pub async fn get(&self, pipeline_id: &str) -> Result<Value, PulseError> {
93 let path = format!("/api/pulse/pipelines/{}", encode_path(pipeline_id));
94 self.client
95 .request(Method::GET, &path, None::<&()>, true)
96 .await
97 }
98
99 /// `POST /api/pulse/pipelines` — creates + deploys a new pipeline.
100 ///
101 /// The definition must follow the `CreatePipelineRequest` schema (see
102 /// openapi.yaml). At minimum: `name` + `nodes`.
103 pub async fn create(&self, definition: &Value) -> Result<Value, PulseError> {
104 self.client
105 .request(Method::POST, "/api/pulse/pipelines", Some(definition), true)
106 .await
107 }
108
109 /// `DELETE /api/pulse/pipelines/{id}` — tears down the pipeline.
110 pub async fn delete(&self, pipeline_id: &str) -> Result<(), PulseError> {
111 let path = format!("/api/pulse/pipelines/{}", encode_path(pipeline_id));
112 self.client
113 .request(Method::DELETE, &path, None::<&()>, true)
114 .await?;
115 Ok(())
116 }
117}
118
119// ---------------------------------------------------------------------------
120// AgentsResource — client.agents()
121// ---------------------------------------------------------------------------
122
123pub struct AgentsResource<'c> {
124 pub(crate) client: &'c PulseClient,
125}
126
127impl AgentsResource<'_> {
128 /// `GET /api/pulse/agents` — every deployed agent in the current org.
129 pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
130 let result = self
131 .client
132 .request(Method::GET, "/api/pulse/agents", None::<&()>, true)
133 .await?;
134 Ok(unwrap_list(&result, "agents"))
135 }
136
137 /// `GET /api/pulse/agents/{id}` — one agent by id.
138 pub async fn get(&self, agent_id: &str) -> Result<Value, PulseError> {
139 let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
140 self.client
141 .request(Method::GET, &path, None::<&()>, true)
142 .await
143 }
144
145 /// B-115 Phase 1 — `PUT /api/pulse/agents/{id}`: replace the agent's config.
146 ///
147 /// `config` is the FULL agent config (not a partial merge) — at minimum
148 /// `name`. Optional fields (`engineType`, `inputTopic`, `outputTopic`,
149 /// `description`, `instances`, `monthlyBudget`, `config`) fall back to safe
150 /// defaults when omitted. See the `UpdateAgentRequest` schema in
151 /// `openapi.yaml`.
152 ///
153 /// Today this triggers a full stop + persist + start cycle on the engine
154 /// side — the agent is briefly unavailable while the swap happens.
155 /// Existing state in the agent's keyed store is preserved. Phase 2
156 /// (B-115-engine) will add atomic event-boundary swap so hot-reloadable
157 /// changes apply with no downtime.
158 ///
159 /// Returns the post-update agent snapshot (same shape as [`get`](Self::get)).
160 ///
161 /// # Errors
162 ///
163 /// - [`PulseError::Validation`] on a bad config (self-loop, invalid
164 /// streaming operators)
165 /// - [`PulseError::NotFound`] if the agent doesn't exist
166 pub async fn update(&self, agent_id: &str, config: &Value) -> Result<Value, PulseError> {
167 let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
168 self.client
169 .request(Method::PUT, &path, Some(config), true)
170 .await
171 }
172
173 /// `DELETE /api/pulse/agents/{id}` — stop the agent + remove its config row.
174 ///
175 /// The agent's keyed state store is also dropped. Requires the
176 /// `AGENT_DELETE` permission.
177 pub async fn delete(&self, agent_id: &str) -> Result<(), PulseError> {
178 let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
179 self.client
180 .request::<()>(Method::DELETE, &path, None, true)
181 .await?;
182 Ok(())
183 }
184}
185
186// ---------------------------------------------------------------------------
187// TemplatesResource — client.templates()
188// ---------------------------------------------------------------------------
189
190pub struct TemplatesResource<'c> {
191 pub(crate) client: &'c PulseClient,
192}
193
194impl TemplatesResource<'_> {
195 /// `GET /api/pulse/templates` — the 223+ first-party templates.
196 pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
197 let result = self
198 .client
199 .request(Method::GET, "/api/pulse/templates", None::<&()>, true)
200 .await?;
201 Ok(unwrap_list(&result, "templates"))
202 }
203}
204
205// ---------------------------------------------------------------------------
206// UsersResource — client.users()
207// ---------------------------------------------------------------------------
208
209pub struct UsersResource<'c> {
210 pub(crate) client: &'c PulseClient,
211}
212
213impl UsersResource<'_> {
214 /// `GET /api/pulse/users` — every user in the current org.
215 ///
216 /// Requires the caller to have the `USERS_LIST` permission atom (Owner /
217 /// Platform Admin personas by default — see B-105).
218 pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
219 let result = self
220 .client
221 .request(Method::GET, "/api/pulse/users", None::<&()>, true)
222 .await?;
223 Ok(unwrap_list(&result, "users"))
224 }
225}
226
227// ---------------------------------------------------------------------------
228// Helpers
229// ---------------------------------------------------------------------------
230
231/// Extracts a `Vec<Value>` from `result[key]`. Returns an empty Vec for
232/// missing / malformed envelopes — never panics — so callers can iterate
233/// safely.
234fn unwrap_list(result: &Value, key: &str) -> Vec<Value> {
235 result
236 .get(key)
237 .and_then(Value::as_array)
238 .cloned()
239 .unwrap_or_default()
240}
241
/// Percent-encodes a single path-parameter segment so ids containing `/`,
/// spaces, non-ASCII text, etc. round-trip safely through the URL.
///
/// Only the RFC 3986 "unreserved" bytes (`A-Z`, `a-z`, `0-9`, `-`, `_`, `.`,
/// `~`) are copied through. Every other byte of the UTF-8 input is written as
/// `%XX` with upper-case hex digits. A space therefore becomes `%20` (never
/// `+`), and a literal `+` becomes `%2B`.
///
/// NOTE(review): an earlier comment claimed this matches `pulse-go`'s
/// `url.PathEscape` and `pulse-java`'s `URLEncoder` exactly. The escape set
/// here is stricter than the unreserved-plus-sub-delims sets those helpers
/// typically use, so the output decodes to the same id but may not be
/// byte-for-byte identical — confirm before relying on string equality.
fn encode_path(segment: &str) -> String {
    // Lookup table for the two hex digits of an escaped byte. It avoids the
    // temporary `String` that `format!` would allocate for every such byte.
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    // Lower bound only: each escaped byte grows from one to three characters.
    let mut out = String::with_capacity(segment.len());
    for b in segment.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                out.push(char::from(b));
            }
            _ => {
                out.push('%');
                out.push(char::from(HEX[usize::from(b >> 4)]));
                out.push(char::from(HEX[usize::from(b & 0x0F)]));
            }
        }
    }
    out
}