Skip to main content

port_sdk/
client.rs

1use crate::auth::{AuthStrategy, TokenProvider};
2use crate::config::{PortConfig, PortRegion, RetryConfig, TelemetryConfig};
3use crate::error::PortError;
4use crate::tracking::{new_shared_tracker, ResourceTrackerHandle};
5#[cfg(feature = "retry")]
6use backoff::backoff::Backoff;
7#[cfg(feature = "retry")]
8use backoff::ExponentialBackoff;
9use httpdate::parse_http_date;
10use reqwest::header::HeaderValue;
11use reqwest::{Client, Method, Proxy, Request, Response, Url};
12use serde::de::DeserializeOwned;
13use serde::Serialize;
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16#[cfg(feature = "tracing")]
17use tracing::{Instrument, Span};
18
/// Common pagination parameters used by many Port endpoints.
///
/// Field names are serialized in camelCase (`per_page` becomes `perPage`), and
/// any field left as `None` is skipped during serialization.
#[derive(Debug, Clone, Default, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Pagination {
    /// Optional `page` value; not serialized when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    page: Option<u32>,
    /// Optional `perPage` value; not serialized when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    per_page: Option<u32>,
    /// Optional `cursor` value; not serialized when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    cursor: Option<String>,
}
30
31impl Pagination {
32    pub fn builder() -> PaginationBuilder {
33        PaginationBuilder::default()
34    }
35
36    pub fn is_empty(&self) -> bool {
37        self.page.is_none() && self.per_page.is_none() && self.cursor.is_none()
38    }
39}
40
#[cfg(feature = "tracing")]
impl PortClient {
    /// Creates the span that wraps a single outgoing request.
    ///
    /// Returns [`Span::none`] when tracing is switched off in the telemetry
    /// settings. Otherwise the span records the request method, the request
    /// URL (under the `path` field) and whether a retry policy is configured.
    fn start_request_span(&self, request: &Request) -> Span {
        if self.telemetry.enable_tracing {
            tracing::info_span!(
                "port_sdk.request",
                method = %request.method(),
                path = %request.url(),
                retry_enabled = self.retry_policy.is_some()
            )
        } else {
            Span::none()
        }
    }

    /// Emits the completion event for `response` inside `span`.
    ///
    /// Does nothing when tracing is switched off or `span` is the "none" span.
    /// Successful statuses are logged at `info`, all others at `warn`.
    fn finish_request_span(&self, span: &Span, response: &Response) {
        let tracing_active = self.telemetry.enable_tracing && !span.is_none();
        if !tracing_active {
            return;
        }

        let status = response.status();
        // The events are emitted while the span is entered so they are attributed to it.
        span.in_scope(|| {
            if status.is_success() {
                tracing::info!(status = %status, "request completed");
            } else {
                tracing::warn!(status = %status, "request completed with failure");
            }
        });
    }
}
70
/// Builder for ergonomically constructing [`Pagination`] structs.
///
/// Starts from `Pagination::default()` (no fields set) and hands the
/// accumulated value back from `build`.
#[derive(Debug, Default)]
pub struct PaginationBuilder {
    /// The `Pagination` value being assembled.
    inner: Pagination,
}
76
77impl PaginationBuilder {
78    pub fn new() -> Self {
79        Self::default()
80    }
81
82    pub fn page(mut self, page: u32) -> Self {
83        self.inner.page = Some(page);
84        self
85    }
86
87    pub fn per_page(mut self, per_page: u32) -> Self {
88        self.inner.per_page = Some(per_page);
89        self
90    }
91
92    pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
93        self.inner.cursor = Some(cursor.into());
94        self
95    }
96
97    pub fn build(self) -> Pagination {
98        self.inner
99    }
100}
101
/// Client for issuing authenticated JSON requests against a base URL.
#[derive(Clone)]
pub struct PortClient {
    /// HTTP client used to build and execute every request.
    http: Client,
    /// Base URL that request paths are joined onto.
    base_url: Url,
    /// Source of the bearer token attached to each request.
    token_provider: Arc<dyn TokenProvider>,
    /// Retry behaviour; with `None` each request is attempted exactly once.
    retry_policy: Option<RetryPolicy>,
    /// Handle used to record resource creations and deletions.
    tracker: ResourceTrackerHandle,
    /// Telemetry settings; `enable_tracing` gates the per-request spans.
    telemetry: TelemetryConfig,
}
111
112impl std::fmt::Debug for PortClient {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct("PortClient")
115            .field("base_url", &self.base_url)
116            .field("retry_policy", &self.retry_policy)
117            .field("tracing", &self.telemetry.enable_tracing)
118            .finish()
119    }
120}
121
122impl PortClient {
123    pub fn builder() -> PortClientBuilder {
124        PortClientBuilder::default()
125    }
126
127    pub fn from_config(config: PortConfig) -> Result<Self, PortError> {
128        PortClientBuilder::from_config(config).build()
129    }
130
131    pub fn from_env() -> Result<Self, PortError> {
132        let config = PortConfig::from_env()?;
133        Self::from_config(config)
134    }
135
    /// Returns the base URL that request paths are joined onto.
    pub fn base_url(&self) -> &Url {
        &self.base_url
    }
139
    /// Returns another handle to this client's resource tracker.
    ///
    /// The handle is produced with `Arc::clone`, so it refers to the same
    /// tracker the client records into.
    pub fn tracker(&self) -> ResourceTrackerHandle {
        Arc::clone(&self.tracker)
    }
143
    /// Returns the telemetry settings this client was built with.
    pub fn telemetry(&self) -> &TelemetryConfig {
        &self.telemetry
    }
147
148    pub fn record_creation(&self, resource_type: &str, identifier: &str) {
149        self.tracker.record_creation(resource_type.to_string(), identifier.to_string());
150    }
151
152    pub fn record_deletion(&self, resource_type: &str, identifier: &str) {
153        self.tracker.record_deletion(resource_type.to_string(), identifier.to_string());
154    }
155
156    async fn authenticated_request(
157        &self,
158        builder: reqwest::RequestBuilder,
159    ) -> Result<Request, PortError> {
160        let token = self.token_provider.bearer_token().await?;
161        let request = builder.bearer_auth(token).build()?;
162        Ok(request)
163    }
164
165    async fn execute<T>(&self, request: Request) -> Result<T, PortError>
166    where
167        T: DeserializeOwned,
168    {
169        #[cfg(feature = "retry")]
170        {
171            if let Some(policy) = &self.retry_policy {
172                return self.execute_with_retry(request, policy).await;
173            }
174        }
175
176        self.execute_once(request).await
177    }
178
179    async fn execute_once<T>(&self, request: Request) -> Result<T, PortError>
180    where
181        T: DeserializeOwned,
182    {
183        #[cfg(feature = "tracing")]
184        let span = self.start_request_span(&request);
185
186        #[cfg(feature = "tracing")]
187        let response = self.http.execute(request).instrument(span.clone()).await?;
188
189        #[cfg(not(feature = "tracing"))]
190        let response = self.http.execute(request).await?;
191
192        #[cfg(feature = "tracing")]
193        self.finish_request_span(&span, &response);
194
195        Self::deserialize_response(response).await
196    }
197
198    async fn deserialize_response<T>(response: Response) -> Result<T, PortError>
199    where
200        T: DeserializeOwned,
201    {
202        let status = response.status();
203        let headers = response.headers().clone();
204        let body_bytes = response.bytes().await?;
205
206        if !status.is_success() {
207            let message = String::from_utf8_lossy(&body_bytes).trim().to_string();
208            return Err(PortError::api(status.as_u16(), message, headers));
209        }
210
211        if body_bytes.is_empty() {
212            return Ok(serde_json::from_str("null")?);
213        }
214
215        Ok(serde_json::from_slice(&body_bytes)?)
216    }
217
218    async fn request_json<T, F>(
219        &self,
220        method: Method,
221        path: &str,
222        configure: F,
223    ) -> Result<T, PortError>
224    where
225        T: DeserializeOwned,
226        F: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
227    {
228        self.request_with(method, path, configure, |builder| builder).await
229    }
230
231    async fn request_with<T, F, Q>(
232        &self,
233        method: Method,
234        path: &str,
235        configure: F,
236        extra: Q,
237    ) -> Result<T, PortError>
238    where
239        T: DeserializeOwned,
240        F: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
241        Q: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
242    {
243        let url = self.base_url.join(path)?;
244        let builder = self.http.request(method, url);
245        let builder = configure(builder);
246        let builder = extra(builder);
247        let request = self.authenticated_request(builder).await?;
248        self.execute(request).await
249    }
250
251    pub async fn get<T>(&self, path: &str) -> Result<T, PortError>
252    where
253        T: DeserializeOwned,
254    {
255        self.request_json(Method::GET, path, |builder| builder).await
256    }
257
258    pub async fn get_with_query<T, Q>(&self, path: &str, query: &Q) -> Result<T, PortError>
259    where
260        T: DeserializeOwned,
261        Q: Serialize + ?Sized,
262    {
263        self.request_json(Method::GET, path, |builder| builder.query(query)).await
264    }
265
266    pub async fn get_paginated<T, Q>(
267        &self,
268        path: &str,
269        query: &Q,
270        pagination: &Pagination,
271    ) -> Result<T, PortError>
272    where
273        T: DeserializeOwned,
274        Q: Serialize + ?Sized,
275    {
276        self.request_json(Method::GET, path, |builder| {
277            let builder = builder.query(query);
278            if pagination.is_empty() {
279                builder
280            } else {
281                builder.query(pagination)
282            }
283        })
284        .await
285    }
286
287    pub async fn post<B, T>(&self, path: &str, body: &B) -> Result<T, PortError>
288    where
289        B: Serialize + ?Sized,
290        T: DeserializeOwned,
291    {
292        self.request_json(Method::POST, path, |builder| builder.json(body)).await
293    }
294
295    pub async fn put<B, T>(&self, path: &str, body: &B) -> Result<T, PortError>
296    where
297        B: Serialize + ?Sized,
298        T: DeserializeOwned,
299    {
300        self.request_json(Method::PUT, path, |builder| builder.json(body)).await
301    }
302
303    pub async fn patch<B, T>(&self, path: &str, body: &B) -> Result<T, PortError>
304    where
305        B: Serialize + ?Sized,
306        T: DeserializeOwned,
307    {
308        self.request_json(Method::PATCH, path, |builder| builder.json(body)).await
309    }
310
311    pub async fn delete<T>(&self, path: &str) -> Result<T, PortError>
312    where
313        T: DeserializeOwned,
314    {
315        self.request_json(Method::DELETE, path, |builder| builder).await
316    }
317
318    pub async fn delete_with_query<T, Q>(&self, path: &str, query: &Q) -> Result<T, PortError>
319    where
320        T: DeserializeOwned,
321        Q: Serialize + ?Sized,
322    {
323        self.request_json(Method::DELETE, path, |builder| builder.query(query)).await
324    }
325
326    #[cfg(feature = "retry")]
327    async fn execute_with_retry<T>(
328        &self,
329        request: Request,
330        policy: &RetryPolicy,
331    ) -> Result<T, PortError>
332    where
333        T: DeserializeOwned,
334    {
335        let start = Instant::now();
336        let mut attempts = 0;
337        let mut backoff = policy.to_backoff();
338        let request_template = request;
339
340        loop {
341            attempts += 1;
342            let attempt_request = request_template.try_clone().ok_or_else(|| {
343                PortError::Configuration("request body could not be cloned for retry".into())
344            })?;
345
346            match self.execute_once(attempt_request).await {
347                Ok(value) => return Ok(value),
348                Err(err) => {
349                    if !policy.should_retry(&err, attempts) {
350                        return Err(err);
351                    }
352
353                    let delay = match policy.next_delay(&err, &mut backoff) {
354                        Some(delay) => delay,
355                        None => return Err(err),
356                    };
357                    tokio::time::sleep(delay).await;
358
359                    if let Some(max_elapsed) = policy.max_elapsed_time {
360                        if start.elapsed() >= max_elapsed {
361                            return Err(err);
362                        }
363                    }
364                }
365            }
366        }
367    }
368}
369
/// Collects the options needed to construct a [`PortClient`].
#[derive(Clone, Debug, Default)]
pub struct PortClientBuilder {
    /// Region whose default base URL is used when `base_url` is not set.
    region: PortRegion,
    /// Explicit base URL; takes precedence over the region default.
    base_url: Option<Url>,
    /// Authentication strategy; `build` fails when this is missing.
    auth: Option<AuthStrategy>,
    /// Proxy URL, applied only when the builder constructs the HTTP client itself.
    proxy: Option<String>,
    /// Timeout for the internally constructed HTTP client; 30 seconds when unset.
    timeout: Option<Duration>,
    /// Retry policy handed to the client; `None` leaves retries unconfigured.
    retry: Option<RetryPolicy>,
    /// Pre-built HTTP client; when set, `timeout` and `proxy` are not applied.
    http_client: Option<Client>,
    /// Existing tracker handle to use; a new shared tracker is created when unset.
    tracker: Option<ResourceTrackerHandle>,
    /// Telemetry settings passed through to the client.
    telemetry: TelemetryConfig,
}
382
383impl PortClientBuilder {
384    pub fn from_config(config: PortConfig) -> Self {
385        let retry = config.retry.map(RetryPolicy::from);
386        Self {
387            region: config.region,
388            base_url: Some(config.base_url),
389            auth: Some(config.auth),
390            proxy: config.proxy,
391            timeout: Some(config.timeout),
392            retry,
393            http_client: None,
394            tracker: None,
395            telemetry: config.telemetry,
396        }
397    }
398
399    pub fn from_env() -> Result<Self, PortError> {
400        let config = PortConfig::from_env()?;
401        Ok(Self::from_config(config))
402    }
403
404    pub fn region(mut self, region: PortRegion) -> Self {
405        self.region = region;
406        self
407    }
408
409    pub fn base_url(mut self, base_url: Url) -> Self {
410        self.base_url = Some(base_url);
411        self
412    }
413
414    pub fn auth(mut self, auth: AuthStrategy) -> Self {
415        self.auth = Some(auth);
416        self
417    }
418
419    pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
420        self.proxy = Some(proxy.into());
421        self
422    }
423
424    pub fn timeout(mut self, timeout: Duration) -> Self {
425        self.timeout = Some(timeout);
426        self
427    }
428
429    pub fn retry(mut self, retry: Option<RetryPolicy>) -> Self {
430        self.retry = retry;
431        self
432    }
433
434    pub fn http_client(mut self, client: Client) -> Self {
435        self.http_client = Some(client);
436        self
437    }
438
439    pub fn tracker(mut self, tracker: ResourceTrackerHandle) -> Self {
440        self.tracker = Some(tracker);
441        self
442    }
443
444    pub fn telemetry(mut self, telemetry: TelemetryConfig) -> Self {
445        self.telemetry = telemetry;
446        self
447    }
448
449    #[cfg(feature = "tracing")]
450    pub fn enable_tracing(mut self, enable: bool) -> Self {
451        self.telemetry.enable_tracing = enable;
452        self
453    }
454
455    pub fn build(self) -> Result<PortClient, PortError> {
456        let base_url = match self.base_url {
457            Some(url) => url,
458            None => Url::parse(self.region.base_url())?,
459        };
460
461        let auth = self.auth.ok_or_else(|| {
462            PortError::Configuration("authentication strategy missing for PortClient".into())
463        })?;
464        let token_provider = auth.into_provider()?;
465
466        let tracker = self.tracker.unwrap_or_else(new_shared_tracker);
467
468        let timeout = self.timeout.unwrap_or_else(|| Duration::from_secs(30));
469
470        let http = match self.http_client {
471            Some(client) => client,
472            None => {
473                let mut builder = Client::builder();
474                builder = builder.timeout(timeout);
475                if let Some(proxy_url) = &self.proxy {
476                    builder = builder.proxy(build_proxy(proxy_url)?);
477                }
478                builder.build()?
479            }
480        };
481
482        Ok(PortClient {
483            http,
484            base_url,
485            token_provider,
486            retry_policy: self.retry,
487            tracker,
488            telemetry: self.telemetry,
489        })
490    }
491}
492
493fn build_proxy(proxy_url: &str) -> Result<Proxy, PortError> {
494    let mut proxy = Proxy::all(proxy_url).map_err(|err| {
495        PortError::Configuration(format!("failed to configure proxy {proxy_url}: {err}"))
496    })?;
497
498    if let (Ok(username), Ok(password)) =
499        (std::env::var("PROXY_AUTH_USERNAME"), std::env::var("PROXY_AUTH_PASSWORD"))
500    {
501        if !username.is_empty() || !password.is_empty() {
502            proxy = proxy.basic_auth(&username, &password);
503        }
504    }
505
506    Ok(proxy)
507}
508
/// Settings that decide whether and when a failed request is retried.
#[derive(Clone, Debug)]
pub struct RetryPolicy {
    /// Maximum number of attempts; no retry happens once this many have been made.
    max_attempts: u32,
    /// Optional cap on the total time spent retrying; also passed to the backoff.
    max_elapsed_time: Option<Duration>,
    /// Starting interval of the exponential backoff.
    initial_interval: Duration,
    /// Multiplier handed to the exponential backoff.
    multiplier: f64,
    /// Maximum interval handed to the exponential backoff.
    max_interval: Duration,
    /// API response status codes that qualify for a retry.
    retry_on_statuses: Vec<u16>,
}
518
519impl From<RetryConfig> for RetryPolicy {
520    fn from(value: RetryConfig) -> Self {
521        RetryPolicy {
522            max_attempts: value.max_attempts.max(1),
523            max_elapsed_time: value.max_elapsed_time,
524            initial_interval: value.initial_interval,
525            multiplier: value.multiplier,
526            max_interval: value.max_interval,
527            retry_on_statuses: value.retry_on_statuses,
528        }
529    }
530}
531
532impl RetryPolicy {
533    fn should_retry(&self, error: &PortError, attempt: u32) -> bool {
534        if attempt >= self.max_attempts {
535            return false;
536        }
537
538        match error {
539            PortError::Http(_) => true,
540            PortError::Api { status, .. } => self.retry_on_statuses.contains(status),
541            _ => false,
542        }
543    }
544
545    #[cfg(feature = "retry")]
546    fn next_delay(&self, error: &PortError, backoff: &mut ExponentialBackoff) -> Option<Duration> {
547        if let Some(duration) = self.retry_after(error) {
548            return Some(duration);
549        }
550        backoff.next_backoff()
551    }
552
553    fn retry_after(&self, error: &PortError) -> Option<Duration> {
554        match error {
555            PortError::Api { headers, .. } => {
556                headers.get("retry-after").and_then(parse_retry_after_header)
557            }
558            _ => None,
559        }
560    }
561
562    #[cfg(feature = "retry")]
563    fn to_backoff(&self) -> ExponentialBackoff {
564        let mut backoff = ExponentialBackoff::default();
565        backoff.max_elapsed_time = self.max_elapsed_time;
566        backoff.current_interval = self.initial_interval;
567        backoff.initial_interval = self.initial_interval;
568        backoff.multiplier = self.multiplier;
569        backoff.max_interval = self.max_interval;
570        backoff.randomization_factor = 0.0;
571        backoff
572    }
573}
574
575fn parse_retry_after_header(value: &HeaderValue) -> Option<Duration> {
576    let text = value.to_str().ok()?;
577    if let Ok(seconds) = text.parse::<u64>() {
578        return Some(Duration::from_secs(seconds));
579    }
580
581    let target_time = parse_http_date(text).ok()?;
582    let now = SystemTime::now();
583    if target_time > now {
584        target_time.duration_since(now).ok()
585    } else {
586        Some(Duration::from_secs(0))
587    }
588}
589
#[cfg(test)]
mod tests {
    use super::*;

    /// A purely numeric `Retry-After` value is read as a number of seconds.
    #[test]
    fn parse_retry_after_seconds() {
        let header = HeaderValue::from_static("3");
        let seconds = parse_retry_after_header(&header)
            .expect("duration")
            .as_secs();
        assert_eq!(seconds, 3);
    }
}
600}