Skip to main content

pulse_client/
resources.rs

1//! Resource accessors — one per OpenAPI tag.
2
3use reqwest::Method;
4use serde_json::{json, Value};
5
6use crate::client::PulseClient;
7use crate::error::PulseError;
8
9// ---------------------------------------------------------------------------
10// AuthResource — client.auth()
11// ---------------------------------------------------------------------------
12
13pub struct AuthResource<'c> {
14    pub(crate) client: &'c PulseClient,
15}
16
17impl AuthResource<'_> {
18    /// `POST /api/auth/login` — exchanges username + password for a JWT.
19    ///
20    /// On success, the returned token is cached on the parent client so
21    /// subsequent calls authenticate automatically.
22    pub async fn login(&self, username: &str, password: &str) -> Result<Value, PulseError> {
23        let body = json!({ "username": username, "password": password });
24        let response = self
25            .client
26            .request(Method::POST, "/api/auth/login", Some(&body), false)
27            .await?;
28        cache_token(self.client, &response);
29        Ok(response)
30    }
31
32    /// `POST /api/auth/refresh` — exchanges a refresh token for a fresh JWT.
33    pub async fn refresh(&self, refresh_token: &str) -> Result<Value, PulseError> {
34        let body = json!({ "refreshToken": refresh_token });
35        let response = self
36            .client
37            .request(Method::POST, "/api/auth/refresh", Some(&body), false)
38            .await?;
39        cache_token(self.client, &response);
40        Ok(response)
41    }
42
43    /// `GET /api/auth/organizations` — orgs the current user is a member of.
44    pub async fn organizations(&self) -> Result<Vec<Value>, PulseError> {
45        let result = self
46            .client
47            .request(Method::GET, "/api/auth/organizations", None::<&()>, true)
48            .await?;
49        Ok(unwrap_list(&result, "organizations"))
50    }
51
52    /// `POST /api/auth/switch-org` — switches the active organisation.
53    /// The new JWT (with updated orgId claim) is cached on the parent client.
54    pub async fn switch_org(&self, org_id: &str) -> Result<Value, PulseError> {
55        let body = json!({ "orgId": org_id });
56        let response = self
57            .client
58            .request(Method::POST, "/api/auth/switch-org", Some(&body), true)
59            .await?;
60        cache_token(self.client, &response);
61        Ok(response)
62    }
63}
64
65fn cache_token(client: &PulseClient, response: &Value) {
66    if let Some(token) = response.get("token").and_then(Value::as_str) {
67        if !token.is_empty() {
68            client.set_token(token);
69        }
70    }
71}
72
73// ---------------------------------------------------------------------------
74// PipelinesResource — client.pipelines()
75// ---------------------------------------------------------------------------
76
77pub struct PipelinesResource<'c> {
78    pub(crate) client: &'c PulseClient,
79}
80
81impl PipelinesResource<'_> {
82    /// `GET /api/pulse/pipelines` — every pipeline in the current org.
83    pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
84        let result = self
85            .client
86            .request(Method::GET, "/api/pulse/pipelines", None::<&()>, true)
87            .await?;
88        Ok(unwrap_list(&result, "pipelines"))
89    }
90
91    /// `GET /api/pulse/pipelines/{id}` — one pipeline by id.
92    pub async fn get(&self, pipeline_id: &str) -> Result<Value, PulseError> {
93        let path = format!("/api/pulse/pipelines/{}", encode_path(pipeline_id));
94        self.client
95            .request(Method::GET, &path, None::<&()>, true)
96            .await
97    }
98
99    /// `POST /api/pulse/pipelines` — creates + deploys a new pipeline.
100    ///
101    /// The definition must follow the `CreatePipelineRequest` schema (see
102    /// openapi.yaml). At minimum: `name` + `nodes`.
103    pub async fn create(&self, definition: &Value) -> Result<Value, PulseError> {
104        self.client
105            .request(Method::POST, "/api/pulse/pipelines", Some(definition), true)
106            .await
107    }
108
109    /// `DELETE /api/pulse/pipelines/{id}` — tears down the pipeline.
110    pub async fn delete(&self, pipeline_id: &str) -> Result<(), PulseError> {
111        let path = format!("/api/pulse/pipelines/{}", encode_path(pipeline_id));
112        self.client
113            .request(Method::DELETE, &path, None::<&()>, true)
114            .await?;
115        Ok(())
116    }
117}
118
119// ---------------------------------------------------------------------------
120// AgentsResource — client.agents()
121// ---------------------------------------------------------------------------
122
123pub struct AgentsResource<'c> {
124    pub(crate) client: &'c PulseClient,
125}
126
127impl AgentsResource<'_> {
128    /// `GET /api/pulse/agents` — every deployed agent in the current org.
129    pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
130        let result = self
131            .client
132            .request(Method::GET, "/api/pulse/agents", None::<&()>, true)
133            .await?;
134        Ok(unwrap_list(&result, "agents"))
135    }
136
137    /// `GET /api/pulse/agents/{id}` — one agent by id.
138    pub async fn get(&self, agent_id: &str) -> Result<Value, PulseError> {
139        let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
140        self.client
141            .request(Method::GET, &path, None::<&()>, true)
142            .await
143    }
144
145    /// B-115 Phase 1 — `PUT /api/pulse/agents/{id}`: replace the agent's config.
146    ///
147    /// `config` is the FULL agent config (not a partial merge) — at minimum
148    /// `name`. Optional fields (`engineType`, `inputTopic`, `outputTopic`,
149    /// `description`, `instances`, `monthlyBudget`, `config`) fall back to safe
150    /// defaults when omitted. See the `UpdateAgentRequest` schema in
151    /// `openapi.yaml`.
152    ///
153    /// Today this triggers a full stop + persist + start cycle on the engine
154    /// side — the agent is briefly unavailable while the swap happens.
155    /// Existing state in the agent's keyed store is preserved. Phase 2
156    /// (B-115-engine) will add atomic event-boundary swap so hot-reloadable
157    /// changes apply with no downtime.
158    ///
159    /// Returns the post-update agent snapshot (same shape as [`get`](Self::get)).
160    ///
161    /// # Errors
162    ///
163    /// - [`PulseError::Validation`] on a bad config (self-loop, invalid
164    ///   streaming operators)
165    /// - [`PulseError::NotFound`] if the agent doesn't exist
166    pub async fn update(&self, agent_id: &str, config: &Value) -> Result<Value, PulseError> {
167        let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
168        self.client
169            .request(Method::PUT, &path, Some(config), true)
170            .await
171    }
172
173    /// `DELETE /api/pulse/agents/{id}` — stop the agent + remove its config row.
174    ///
175    /// The agent's keyed state store is also dropped. Requires the
176    /// `AGENT_DELETE` permission.
177    pub async fn delete(&self, agent_id: &str) -> Result<(), PulseError> {
178        let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
179        self.client
180            .request::<()>(Method::DELETE, &path, None, true)
181            .await?;
182        Ok(())
183    }
184}
185
186// ---------------------------------------------------------------------------
187// TemplatesResource — client.templates()
188// ---------------------------------------------------------------------------
189
190pub struct TemplatesResource<'c> {
191    pub(crate) client: &'c PulseClient,
192}
193
194impl TemplatesResource<'_> {
195    /// `GET /api/pulse/templates` — the 223+ first-party templates.
196    pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
197        let result = self
198            .client
199            .request(Method::GET, "/api/pulse/templates", None::<&()>, true)
200            .await?;
201        Ok(unwrap_list(&result, "templates"))
202    }
203}
204
205// ---------------------------------------------------------------------------
206// UsersResource — client.users()
207// ---------------------------------------------------------------------------
208
209pub struct UsersResource<'c> {
210    pub(crate) client: &'c PulseClient,
211}
212
213impl UsersResource<'_> {
214    /// `GET /api/pulse/users` — every user in the current org.
215    ///
216    /// Requires the caller to have the `USERS_LIST` permission atom (Owner /
217    /// Platform Admin personas by default — see B-105).
218    pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
219        let result = self
220            .client
221            .request(Method::GET, "/api/pulse/users", None::<&()>, true)
222            .await?;
223        Ok(unwrap_list(&result, "users"))
224    }
225}
226
227// ---------------------------------------------------------------------------
228// Helpers
229// ---------------------------------------------------------------------------
230
231/// Extracts a `Vec<Value>` from `result[key]`. Returns an empty Vec for
232/// missing / malformed envelopes — never panics — so callers can iterate
233/// safely.
234fn unwrap_list(result: &Value, key: &str) -> Vec<Value> {
235    result
236        .get(key)
237        .and_then(Value::as_array)
238        .cloned()
239        .unwrap_or_default()
240}
241
242/// URL-encodes a path-param segment so ids containing `/`, spaces, etc.
243/// round-trip safely. Uses the same character set as the `pulse-go`
244/// `url.PathEscape` and `pulse-java` `URLEncoder` — `+` is encoded as `%20`.
245fn encode_path(segment: &str) -> String {
246    let mut out = String::with_capacity(segment.len());
247    for b in segment.bytes() {
248        match b {
249            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
250                out.push(b as char)
251            }
252            _ => out.push_str(&format!("%{b:02X}")),
253        }
254    }
255    out
256}