1use crate::auth::{AuthStrategy, TokenProvider};
2use crate::config::{PortConfig, PortRegion, RetryConfig, TelemetryConfig};
3use crate::error::PortError;
4use crate::tracking::{new_shared_tracker, ResourceTrackerHandle};
5#[cfg(feature = "retry")]
6use backoff::backoff::Backoff;
7#[cfg(feature = "retry")]
8use backoff::ExponentialBackoff;
9use httpdate::parse_http_date;
10use reqwest::header::HeaderValue;
11use reqwest::{Client, Method, Proxy, Request, Response, Url};
12use serde::de::DeserializeOwned;
13use serde::Serialize;
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16#[cfg(feature = "tracing")]
17use tracing::{Instrument, Span};
18
19#[derive(Debug, Clone, Default, Serialize)]
21#[serde(rename_all = "camelCase")]
22pub struct Pagination {
23 #[serde(skip_serializing_if = "Option::is_none")]
24 page: Option<u32>,
25 #[serde(skip_serializing_if = "Option::is_none")]
26 per_page: Option<u32>,
27 #[serde(skip_serializing_if = "Option::is_none")]
28 cursor: Option<String>,
29}
30
31impl Pagination {
32 pub fn builder() -> PaginationBuilder {
33 PaginationBuilder::default()
34 }
35
36 pub fn is_empty(&self) -> bool {
37 self.page.is_none() && self.per_page.is_none() && self.cursor.is_none()
38 }
39}
40
#[cfg(feature = "tracing")]
impl PortClient {
    /// Creates the span that wraps a single outgoing HTTP attempt.
    ///
    /// Returns [`Span::none`] when tracing is switched off in the telemetry
    /// configuration, so callers can instrument unconditionally.
    ///
    /// The `path` field records only the path component of the request URL.
    /// Scheme, host and — importantly — the query string are left out, so
    /// query parameters (cursors, filters, identifiers) are not copied into
    /// trace output.
    fn start_request_span(&self, request: &Request) -> Span {
        if !self.telemetry.enable_tracing {
            return Span::none();
        }

        tracing::info_span!(
            "port_sdk.request",
            method = %request.method(),
            path = %request.url().path(),
            retry_enabled = self.retry_policy.is_some()
        )
    }

    /// Emits a completion event inside `span` describing the response status.
    ///
    /// Success statuses are logged at `info` level, everything else at `warn`.
    /// Nothing is emitted when tracing is disabled or `span` is a disabled
    /// (none) span.
    fn finish_request_span(&self, span: &Span, response: &Response) {
        if !self.telemetry.enable_tracing || span.is_none() {
            return;
        }

        let status = response.status();
        // Run the event inside the span so it is attributed to this request.
        span.in_scope(|| {
            if status.is_success() {
                tracing::info!(status = %status, "request completed");
            } else {
                tracing::warn!(status = %status, "request completed with failure");
            }
        });
    }
}
70
71#[derive(Debug, Default)]
73pub struct PaginationBuilder {
74 inner: Pagination,
75}
76
77impl PaginationBuilder {
78 pub fn new() -> Self {
79 Self::default()
80 }
81
82 pub fn page(mut self, page: u32) -> Self {
83 self.inner.page = Some(page);
84 self
85 }
86
87 pub fn per_page(mut self, per_page: u32) -> Self {
88 self.inner.per_page = Some(per_page);
89 self
90 }
91
92 pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
93 self.inner.cursor = Some(cursor.into());
94 self
95 }
96
97 pub fn build(self) -> Pagination {
98 self.inner
99 }
100}
101
102#[derive(Clone)]
103pub struct PortClient {
104 http: Client,
105 base_url: Url,
106 token_provider: Arc<dyn TokenProvider>,
107 retry_policy: Option<RetryPolicy>,
108 tracker: ResourceTrackerHandle,
109 telemetry: TelemetryConfig,
110}
111
112impl std::fmt::Debug for PortClient {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct("PortClient")
115 .field("base_url", &self.base_url)
116 .field("retry_policy", &self.retry_policy)
117 .field("tracing", &self.telemetry.enable_tracing)
118 .finish()
119 }
120}
121
122impl PortClient {
123 pub fn builder() -> PortClientBuilder {
124 PortClientBuilder::default()
125 }
126
127 pub fn from_config(config: PortConfig) -> Result<Self, PortError> {
128 PortClientBuilder::from_config(config).build()
129 }
130
131 pub fn from_env() -> Result<Self, PortError> {
132 let config = PortConfig::from_env()?;
133 Self::from_config(config)
134 }
135
136 pub fn base_url(&self) -> &Url {
137 &self.base_url
138 }
139
140 pub fn tracker(&self) -> ResourceTrackerHandle {
141 Arc::clone(&self.tracker)
142 }
143
144 pub fn telemetry(&self) -> &TelemetryConfig {
145 &self.telemetry
146 }
147
148 pub fn record_creation(&self, resource_type: &str, identifier: &str) {
149 self.tracker.record_creation(resource_type.to_string(), identifier.to_string());
150 }
151
152 pub fn record_deletion(&self, resource_type: &str, identifier: &str) {
153 self.tracker.record_deletion(resource_type.to_string(), identifier.to_string());
154 }
155
156 async fn authenticated_request(
157 &self,
158 builder: reqwest::RequestBuilder,
159 ) -> Result<Request, PortError> {
160 let token = self.token_provider.bearer_token().await?;
161 let request = builder.bearer_auth(token).build()?;
162 Ok(request)
163 }
164
165 async fn execute<T>(&self, request: Request) -> Result<T, PortError>
166 where
167 T: DeserializeOwned,
168 {
169 #[cfg(feature = "retry")]
170 {
171 if let Some(policy) = &self.retry_policy {
172 return self.execute_with_retry(request, policy).await;
173 }
174 }
175
176 self.execute_once(request).await
177 }
178
179 async fn execute_once<T>(&self, request: Request) -> Result<T, PortError>
180 where
181 T: DeserializeOwned,
182 {
183 #[cfg(feature = "tracing")]
184 let span = self.start_request_span(&request);
185
186 #[cfg(feature = "tracing")]
187 let response = self.http.execute(request).instrument(span.clone()).await?;
188
189 #[cfg(not(feature = "tracing"))]
190 let response = self.http.execute(request).await?;
191
192 #[cfg(feature = "tracing")]
193 self.finish_request_span(&span, &response);
194
195 Self::deserialize_response(response).await
196 }
197
198 async fn deserialize_response<T>(response: Response) -> Result<T, PortError>
199 where
200 T: DeserializeOwned,
201 {
202 let status = response.status();
203 let headers = response.headers().clone();
204 let body_bytes = response.bytes().await?;
205
206 if !status.is_success() {
207 let message = String::from_utf8_lossy(&body_bytes).trim().to_string();
208 return Err(PortError::api(status.as_u16(), message, headers));
209 }
210
211 if body_bytes.is_empty() {
212 return Ok(serde_json::from_str("null")?);
213 }
214
215 Ok(serde_json::from_slice(&body_bytes)?)
216 }
217
218 async fn request_json<T, F>(
219 &self,
220 method: Method,
221 path: &str,
222 configure: F,
223 ) -> Result<T, PortError>
224 where
225 T: DeserializeOwned,
226 F: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
227 {
228 self.request_with(method, path, configure, |builder| builder).await
229 }
230
231 async fn request_with<T, F, Q>(
232 &self,
233 method: Method,
234 path: &str,
235 configure: F,
236 extra: Q,
237 ) -> Result<T, PortError>
238 where
239 T: DeserializeOwned,
240 F: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
241 Q: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
242 {
243 let url = self.base_url.join(path)?;
244 let builder = self.http.request(method, url);
245 let builder = configure(builder);
246 let builder = extra(builder);
247 let request = self.authenticated_request(builder).await?;
248 self.execute(request).await
249 }
250
251 pub async fn get<T>(&self, path: &str) -> Result<T, PortError>
252 where
253 T: DeserializeOwned,
254 {
255 self.request_json(Method::GET, path, |builder| builder).await
256 }
257
258 pub async fn get_with_query<T, Q>(&self, path: &str, query: &Q) -> Result<T, PortError>
259 where
260 T: DeserializeOwned,
261 Q: Serialize + ?Sized,
262 {
263 self.request_json(Method::GET, path, |builder| builder.query(query)).await
264 }
265
266 pub async fn get_paginated<T, Q>(
267 &self,
268 path: &str,
269 query: &Q,
270 pagination: &Pagination,
271 ) -> Result<T, PortError>
272 where
273 T: DeserializeOwned,
274 Q: Serialize + ?Sized,
275 {
276 self.request_json(Method::GET, path, |builder| {
277 let builder = builder.query(query);
278 if pagination.is_empty() {
279 builder
280 } else {
281 builder.query(pagination)
282 }
283 })
284 .await
285 }
286
287 pub async fn post<B, T>(&self, path: &str, body: &B) -> Result<T, PortError>
288 where
289 B: Serialize + ?Sized,
290 T: DeserializeOwned,
291 {
292 self.request_json(Method::POST, path, |builder| builder.json(body)).await
293 }
294
295 pub async fn put<B, T>(&self, path: &str, body: &B) -> Result<T, PortError>
296 where
297 B: Serialize + ?Sized,
298 T: DeserializeOwned,
299 {
300 self.request_json(Method::PUT, path, |builder| builder.json(body)).await
301 }
302
303 pub async fn patch<B, T>(&self, path: &str, body: &B) -> Result<T, PortError>
304 where
305 B: Serialize + ?Sized,
306 T: DeserializeOwned,
307 {
308 self.request_json(Method::PATCH, path, |builder| builder.json(body)).await
309 }
310
311 pub async fn delete<T>(&self, path: &str) -> Result<T, PortError>
312 where
313 T: DeserializeOwned,
314 {
315 self.request_json(Method::DELETE, path, |builder| builder).await
316 }
317
318 pub async fn delete_with_query<T, Q>(&self, path: &str, query: &Q) -> Result<T, PortError>
319 where
320 T: DeserializeOwned,
321 Q: Serialize + ?Sized,
322 {
323 self.request_json(Method::DELETE, path, |builder| builder.query(query)).await
324 }
325
/// Sends `request`, re-sending it according to `policy` when an attempt fails.
///
/// Every attempt is made on a clone of the original request, so the request
/// body must be cloneable. After a failed attempt, `policy.should_retry`
/// decides whether another attempt is allowed and `policy.next_delay`
/// supplies the wait before it.
///
/// The `max_elapsed_time` budget is checked *before* sleeping: if waiting for
/// `delay` would reach or exceed the budget, the last error is returned
/// immediately instead of sleeping and then giving up anyway.
///
/// # Errors
/// Returns `PortError::Configuration` when the request cannot be cloned, or
/// the error of the last attempt once retries are exhausted or disallowed.
#[cfg(feature = "retry")]
async fn execute_with_retry<T>(
    &self,
    request: Request,
    policy: &RetryPolicy,
) -> Result<T, PortError>
where
    T: DeserializeOwned,
{
    let started = Instant::now();
    let mut backoff = policy.to_backoff();
    let mut attempts: u32 = 0;

    loop {
        attempts += 1;
        // Keep the original untouched so later attempts can clone it again.
        let attempt_request = request.try_clone().ok_or_else(|| {
            PortError::Configuration("request body could not be cloned for retry".into())
        })?;

        let err = match self.execute_once(attempt_request).await {
            Ok(value) => return Ok(value),
            Err(err) => err,
        };

        if !policy.should_retry(&err, attempts) {
            return Err(err);
        }

        let delay = match policy.next_delay(&err, &mut backoff) {
            Some(delay) => delay,
            None => return Err(err),
        };

        if let Some(max_elapsed) = policy.max_elapsed_time {
            // Saturating arithmetic: an already-exhausted budget yields zero,
            // and an arbitrarily large `Retry-After` delay cannot overflow.
            let remaining = max_elapsed.saturating_sub(started.elapsed());
            if delay >= remaining {
                return Err(err);
            }
        }

        tokio::time::sleep(delay).await;
    }
}
368}
369
370#[derive(Clone, Debug, Default)]
371pub struct PortClientBuilder {
372 region: PortRegion,
373 base_url: Option<Url>,
374 auth: Option<AuthStrategy>,
375 proxy: Option<String>,
376 timeout: Option<Duration>,
377 retry: Option<RetryPolicy>,
378 http_client: Option<Client>,
379 tracker: Option<ResourceTrackerHandle>,
380 telemetry: TelemetryConfig,
381}
382
383impl PortClientBuilder {
384 pub fn from_config(config: PortConfig) -> Self {
385 let retry = config.retry.map(RetryPolicy::from);
386 Self {
387 region: config.region,
388 base_url: Some(config.base_url),
389 auth: Some(config.auth),
390 proxy: config.proxy,
391 timeout: Some(config.timeout),
392 retry,
393 http_client: None,
394 tracker: None,
395 telemetry: config.telemetry,
396 }
397 }
398
399 pub fn from_env() -> Result<Self, PortError> {
400 let config = PortConfig::from_env()?;
401 Ok(Self::from_config(config))
402 }
403
404 pub fn region(mut self, region: PortRegion) -> Self {
405 self.region = region;
406 self
407 }
408
409 pub fn base_url(mut self, base_url: Url) -> Self {
410 self.base_url = Some(base_url);
411 self
412 }
413
414 pub fn auth(mut self, auth: AuthStrategy) -> Self {
415 self.auth = Some(auth);
416 self
417 }
418
419 pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
420 self.proxy = Some(proxy.into());
421 self
422 }
423
424 pub fn timeout(mut self, timeout: Duration) -> Self {
425 self.timeout = Some(timeout);
426 self
427 }
428
429 pub fn retry(mut self, retry: Option<RetryPolicy>) -> Self {
430 self.retry = retry;
431 self
432 }
433
434 pub fn http_client(mut self, client: Client) -> Self {
435 self.http_client = Some(client);
436 self
437 }
438
439 pub fn tracker(mut self, tracker: ResourceTrackerHandle) -> Self {
440 self.tracker = Some(tracker);
441 self
442 }
443
444 pub fn telemetry(mut self, telemetry: TelemetryConfig) -> Self {
445 self.telemetry = telemetry;
446 self
447 }
448
449 #[cfg(feature = "tracing")]
450 pub fn enable_tracing(mut self, enable: bool) -> Self {
451 self.telemetry.enable_tracing = enable;
452 self
453 }
454
455 pub fn build(self) -> Result<PortClient, PortError> {
456 let base_url = match self.base_url {
457 Some(url) => url,
458 None => Url::parse(self.region.base_url())?,
459 };
460
461 let auth = self.auth.ok_or_else(|| {
462 PortError::Configuration("authentication strategy missing for PortClient".into())
463 })?;
464 let token_provider = auth.into_provider()?;
465
466 let tracker = self.tracker.unwrap_or_else(new_shared_tracker);
467
468 let timeout = self.timeout.unwrap_or_else(|| Duration::from_secs(30));
469
470 let http = match self.http_client {
471 Some(client) => client,
472 None => {
473 let mut builder = Client::builder();
474 builder = builder.timeout(timeout);
475 if let Some(proxy_url) = &self.proxy {
476 builder = builder.proxy(build_proxy(proxy_url)?);
477 }
478 builder.build()?
479 }
480 };
481
482 Ok(PortClient {
483 http,
484 base_url,
485 token_provider,
486 retry_policy: self.retry,
487 tracker,
488 telemetry: self.telemetry,
489 })
490 }
491}
492
493fn build_proxy(proxy_url: &str) -> Result<Proxy, PortError> {
494 let mut proxy = Proxy::all(proxy_url).map_err(|err| {
495 PortError::Configuration(format!("failed to configure proxy {proxy_url}: {err}"))
496 })?;
497
498 if let (Ok(username), Ok(password)) =
499 (std::env::var("PROXY_AUTH_USERNAME"), std::env::var("PROXY_AUTH_PASSWORD"))
500 {
501 if !username.is_empty() || !password.is_empty() {
502 proxy = proxy.basic_auth(&username, &password);
503 }
504 }
505
506 Ok(proxy)
507}
508
/// Settings that decide whether, and after how long, a failed request is re-sent.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Upper bound on attempts, counting the first one.
    max_attempts: u32,
    /// Optional overall time budget for all attempts of one request.
    max_elapsed_time: Option<Duration>,
    /// Starting interval of the exponential backoff.
    initial_interval: Duration,
    /// Multiplier handed to the exponential backoff.
    multiplier: f64,
    /// Maximum interval handed to the exponential backoff.
    max_interval: Duration,
    /// HTTP status codes of API errors that qualify for a retry.
    retry_on_statuses: Vec<u16>,
}
518
519impl From<RetryConfig> for RetryPolicy {
520 fn from(value: RetryConfig) -> Self {
521 RetryPolicy {
522 max_attempts: value.max_attempts.max(1),
523 max_elapsed_time: value.max_elapsed_time,
524 initial_interval: value.initial_interval,
525 multiplier: value.multiplier,
526 max_interval: value.max_interval,
527 retry_on_statuses: value.retry_on_statuses,
528 }
529 }
530}
531
532impl RetryPolicy {
533 fn should_retry(&self, error: &PortError, attempt: u32) -> bool {
534 if attempt >= self.max_attempts {
535 return false;
536 }
537
538 match error {
539 PortError::Http(_) => true,
540 PortError::Api { status, .. } => self.retry_on_statuses.contains(status),
541 _ => false,
542 }
543 }
544
545 #[cfg(feature = "retry")]
546 fn next_delay(&self, error: &PortError, backoff: &mut ExponentialBackoff) -> Option<Duration> {
547 if let Some(duration) = self.retry_after(error) {
548 return Some(duration);
549 }
550 backoff.next_backoff()
551 }
552
553 fn retry_after(&self, error: &PortError) -> Option<Duration> {
554 match error {
555 PortError::Api { headers, .. } => {
556 headers.get("retry-after").and_then(parse_retry_after_header)
557 }
558 _ => None,
559 }
560 }
561
562 #[cfg(feature = "retry")]
563 fn to_backoff(&self) -> ExponentialBackoff {
564 let mut backoff = ExponentialBackoff::default();
565 backoff.max_elapsed_time = self.max_elapsed_time;
566 backoff.current_interval = self.initial_interval;
567 backoff.initial_interval = self.initial_interval;
568 backoff.multiplier = self.multiplier;
569 backoff.max_interval = self.max_interval;
570 backoff.randomization_factor = 0.0;
571 backoff
572 }
573}
574
575fn parse_retry_after_header(value: &HeaderValue) -> Option<Duration> {
576 let text = value.to_str().ok()?;
577 if let Ok(seconds) = text.parse::<u64>() {
578 return Some(Duration::from_secs(seconds));
579 }
580
581 let target_time = parse_http_date(text).ok()?;
582 let now = SystemTime::now();
583 if target_time > now {
584 target_time.duration_since(now).ok()
585 } else {
586 Some(Duration::from_secs(0))
587 }
588}
589
#[cfg(test)]
mod tests {
    use super::*;

    /// A purely numeric `Retry-After` value is read as whole seconds.
    #[test]
    fn parse_retry_after_seconds() {
        let parsed = parse_retry_after_header(&HeaderValue::from_static("3"));
        let seconds = parsed.expect("duration").as_secs();
        assert_eq!(seconds, 3);
    }
}