1use std::sync::Arc;
4use std::sync::RwLock;
5use std::time::Duration;
6
7use reqwest::Method;
8use reqwest::StatusCode;
9use serde::Serialize;
10use serde_json::Value;
11
12use crate::error::PulseError;
13use crate::events::EventsResource;
14use crate::iq::IQResource;
15use crate::resources::{
16 AgentsResource, AuthResource, PipelinesResource, TemplatesResource, UsersResource,
17};
18use crate::streams::StreamsResource;
19
/// `User-Agent` value set on the HTTP client that the builder constructs itself.
const USER_AGENT: &str = "pulse-client-rust/2.6.0";
/// Request timeout applied to the builder-constructed HTTP client when no
/// explicit timeout was configured.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
22
/// Client for the HTTP API rooted at a configured base URL.
///
/// All state lives behind an [`Arc`], so cloning is cheap and every clone
/// shares the same base URL, HTTP client and token.
#[derive(Clone)]
pub struct PulseClient {
    /// Shared state; reference-counted so clones observe the same token.
    pub(crate) inner: Arc<Inner>,
}
53
/// State shared by every clone of a [`PulseClient`].
pub(crate) struct Inner {
    /// Prefix prepended verbatim to every request path.
    pub(crate) base_url: String,
    /// HTTP client used to send every request.
    pub(crate) http: reqwest::Client,
    /// Bearer token attached to authenticated requests; `None` when unset.
    /// Wrapped in a lock so it can be replaced through a shared reference.
    pub(crate) token: RwLock<Option<String>>,
}
59
60impl PulseClient {
61 pub fn builder() -> PulseClientBuilder {
62 PulseClientBuilder::default()
63 }
64
65 pub fn token(&self) -> Option<String> {
67 self.inner.token.read().ok().and_then(|guard| guard.clone())
68 }
69
70 pub fn set_token<S: Into<String>>(&self, token: S) {
73 if let Ok(mut guard) = self.inner.token.write() {
74 *guard = Some(token.into());
75 }
76 }
77
78 pub fn clear_token(&self) {
80 if let Ok(mut guard) = self.inner.token.write() {
81 *guard = None;
82 }
83 }
84
85 pub fn auth(&self) -> AuthResource<'_> {
89 AuthResource { client: self }
90 }
91
92 pub fn pipelines(&self) -> PipelinesResource<'_> {
93 PipelinesResource { client: self }
94 }
95
96 pub fn agents(&self) -> AgentsResource<'_> {
97 AgentsResource { client: self }
98 }
99
100 pub fn templates(&self) -> TemplatesResource<'_> {
101 TemplatesResource { client: self }
102 }
103
104 pub fn users(&self) -> UsersResource<'_> {
105 UsersResource { client: self }
106 }
107
108 pub fn events(&self) -> EventsResource<'_> {
109 EventsResource { client: self }
110 }
111
112 pub fn iq(&self) -> IQResource<'_> {
113 IQResource { client: self }
114 }
115
116 pub fn streams(&self) -> StreamsResource<'_> {
118 StreamsResource { client: self }
119 }
120
121 pub async fn version(&self) -> Result<Value, PulseError> {
124 self.request(Method::GET, "/api/pulse/version", None::<&()>, false)
125 .await
126 }
127
128 pub(crate) async fn request<B: Serialize + ?Sized>(
132 &self,
133 method: Method,
134 path: &str,
135 body: Option<&B>,
136 authenticated: bool,
137 ) -> Result<Value, PulseError> {
138 let url = format!("{}{path}", self.inner.base_url);
139 let mut req = self.inner.http.request(method, url);
140
141 if authenticated {
142 match self.token() {
143 Some(token) if !token.is_empty() => {
144 req = req.bearer_auth(token);
145 }
146 _ => {
147 return Err(PulseError::NoToken {
148 path: path.to_string(),
149 });
150 }
151 }
152 }
153
154 if let Some(payload) = body {
155 req = req.json(payload);
156 }
157
158 let response = req.send().await?;
159 let status = response.status();
160
161 if status == StatusCode::NO_CONTENT {
162 return Ok(Value::Object(Default::default()));
163 }
164
165 if status.is_success() {
166 let bytes = response.bytes().await?;
168 if bytes.is_empty() {
169 return Ok(Value::Object(Default::default()));
170 }
171 return Ok(serde_json::from_slice(&bytes)?);
172 }
173
174 let retry_after_header = response
176 .headers()
177 .get(reqwest::header::RETRY_AFTER)
178 .and_then(|v| v.to_str().ok())
179 .and_then(|s| s.trim().parse::<u32>().ok());
180
181 let bytes = response.bytes().await?;
182 let parsed_body: Option<Value> = if bytes.is_empty() {
183 None
184 } else {
185 match serde_json::from_slice::<Value>(&bytes) {
186 Ok(v) => Some(v),
187 Err(_) => {
188 let raw = String::from_utf8_lossy(&bytes);
189 let trimmed = if raw.len() > 200 { &raw[..200] } else { &raw };
190 Some(serde_json::json!({ "error": trimmed }))
191 }
192 }
193 };
194
195 Err(translate_error(
196 status,
197 path,
198 parsed_body,
199 retry_after_header,
200 ))
201 }
202}
203
204fn translate_error(
205 status: StatusCode,
206 path: &str,
207 body: Option<Value>,
208 retry_after_header: Option<u32>,
209) -> PulseError {
210 let path = path.to_string();
211 match status {
212 StatusCode::UNAUTHORIZED => PulseError::Auth { path, body },
213 StatusCode::NOT_FOUND => PulseError::NotFound { path, body },
214 StatusCode::BAD_REQUEST => PulseError::Validation { path, body },
215 StatusCode::TOO_MANY_REQUESTS => {
216 let retry_from_body = body
217 .as_ref()
218 .and_then(|v| v.get("retryAfterSeconds"))
219 .and_then(|v| v.as_u64())
220 .map(|n| n as u32);
221 PulseError::RateLimit {
222 path,
223 body,
224 retry_after_seconds: retry_from_body.or(retry_after_header),
225 }
226 }
227 other => PulseError::Api {
228 status: other.as_u16(),
229 path,
230 body,
231 },
232 }
233}
234
/// Returns `url` without its trailing `/` characters.
///
/// An input made up solely of slashes collapses to a single `/` rather than
/// to the empty string; an empty input stays empty.
fn strip_trailing_slash(url: &str) -> String {
    let trimmed = url.trim_end_matches('/');
    if trimmed.is_empty() && url.starts_with('/') {
        // Everything was a slash: keep exactly one.
        "/".to_owned()
    } else {
        trimmed.to_owned()
    }
}
242
/// Collects the options used to construct a [`PulseClient`].
///
/// Every field starts out as `None`; only `base_url` must be set before
/// calling `build`.
///
/// NOTE(review): the derived `Debug` prints `token` verbatim, so logging a
/// builder with `{:?}` exposes the credential. Confirm whether that is
/// acceptable.
#[derive(Default, Debug)]
pub struct PulseClientBuilder {
    /// Base URL that request paths are appended to; required.
    base_url: Option<String>,
    /// Initial bearer token for the built client, if any.
    token: Option<String>,
    /// Timeout for the HTTP client that `build` creates itself.
    timeout: Option<Duration>,
    /// Caller-supplied HTTP client; when set, `build` uses it as-is.
    http: Option<reqwest::Client>,
}
255
256impl PulseClientBuilder {
257 pub fn base_url<S: Into<String>>(mut self, base_url: S) -> Self {
259 self.base_url = Some(base_url.into());
260 self
261 }
262
263 pub fn token<S: Into<String>>(mut self, token: S) -> Self {
265 self.token = Some(token.into());
266 self
267 }
268
269 pub fn timeout(mut self, timeout: Duration) -> Self {
271 self.timeout = Some(timeout);
272 self
273 }
274
275 pub fn http_client(mut self, http: reqwest::Client) -> Self {
278 self.http = Some(http);
279 self
280 }
281
282 pub fn build(self) -> Result<PulseClient, PulseError> {
283 let base_url = self
284 .base_url
285 .ok_or_else(|| PulseError::InvalidConfig("base_url is required".to_string()))?;
286 if base_url.is_empty() {
287 return Err(PulseError::InvalidConfig(
288 "base_url cannot be empty".to_string(),
289 ));
290 }
291
292 let http = match self.http {
293 Some(c) => c,
294 None => reqwest::Client::builder()
295 .timeout(self.timeout.unwrap_or(DEFAULT_TIMEOUT))
296 .user_agent(USER_AGENT)
297 .build()
298 .map_err(PulseError::Transport)?,
299 };
300
301 Ok(PulseClient {
302 inner: Arc::new(Inner {
303 base_url: strip_trailing_slash(&base_url),
304 http,
305 token: RwLock::new(self.token),
306 }),
307 })
308 }
309}