Skip to main content

flowfull/
client.rs

1use std::{sync::Arc, time::Duration};
2
3use reqwest::{Method, Url};
4use serde::{Serialize, de::DeserializeOwned};
5use serde_json::Value;
6use tokio::time::sleep;
7
8use crate::{
9    auth::AuthClient,
10    bridge::BridgeClient,
11    config::{ClientConfig, ClientConfigBuilder},
12    error::{FlowfullError, Result},
13    query::QueryBuilder,
14    request::RequestOptions,
15    response::{ApiResponse, RawResponse},
16    session::SessionManager,
17    upload::UploadBuilder,
18};
19
20#[derive(Clone)]
21pub struct FlowfullClient {
22    http: reqwest::Client,
23    config: Arc<ClientConfig>,
24    session_manager: SessionManager,
25}
26
27impl FlowfullClient {
28    pub fn new(base_url: impl AsRef<str>) -> Result<Self> {
29        Self::builder(base_url).build_client()
30    }
31
32    pub fn builder(base_url: impl AsRef<str>) -> ClientConfigBuilder {
33        ClientConfigBuilder::new(base_url).expect("invalid base URL")
34    }
35
36    pub fn from_config(config: ClientConfig) -> Result<Self> {
37        let http = reqwest::Client::builder().timeout(config.timeout).build()?;
38        let config = Arc::new(config);
39        let session_manager = SessionManager::new(config.clone());
40        Ok(Self {
41            http,
42            config,
43            session_manager,
44        })
45    }
46
47    pub fn session_manager(&self) -> &SessionManager {
48        &self.session_manager
49    }
50
51    pub fn auth(&self) -> AuthClient {
52        AuthClient::new(self.clone())
53    }
54
55    pub fn bridge(&self) -> BridgeClient {
56        BridgeClient::new(self.clone())
57    }
58
59    #[cfg(feature = "payments")]
60    pub fn pay(&self) -> crate::payments::PaymentsClient {
61        crate::payments::PaymentsClient::new(self.clone())
62    }
63
64    pub fn query(&self, endpoint: impl Into<String>) -> QueryBuilder {
65        QueryBuilder::new(self.clone(), endpoint.into())
66    }
67
68    pub fn upload_file(
69        &self,
70        endpoint: impl Into<String>,
71        file: crate::upload::UploadFile,
72    ) -> UploadBuilder {
73        UploadBuilder::new(self.clone(), endpoint.into(), file)
74    }
75
76    pub async fn get<T>(&self, endpoint: &str) -> Result<T>
77    where
78        T: DeserializeOwned,
79    {
80        self.get_with_options(endpoint, RequestOptions::default())
81            .await
82    }
83
84    pub async fn get_with_options<T>(&self, endpoint: &str, options: RequestOptions) -> Result<T>
85    where
86        T: DeserializeOwned,
87    {
88        self.request_json(Method::GET, endpoint, Option::<&()>::None, options)
89            .await
90    }
91
92    pub async fn post<T, B>(&self, endpoint: &str, body: &B) -> Result<T>
93    where
94        T: DeserializeOwned,
95        B: Serialize + ?Sized,
96    {
97        self.post_with_options(endpoint, body, RequestOptions::default())
98            .await
99    }
100
101    pub async fn post_with_options<T, B>(
102        &self,
103        endpoint: &str,
104        body: &B,
105        options: RequestOptions,
106    ) -> Result<T>
107    where
108        T: DeserializeOwned,
109        B: Serialize + ?Sized,
110    {
111        self.request_json(Method::POST, endpoint, Some(body), options)
112            .await
113    }
114
115    pub async fn put<T, B>(&self, endpoint: &str, body: &B) -> Result<T>
116    where
117        T: DeserializeOwned,
118        B: Serialize + ?Sized,
119    {
120        self.request_json(Method::PUT, endpoint, Some(body), RequestOptions::default())
121            .await
122    }
123
124    pub async fn patch<T, B>(&self, endpoint: &str, body: &B) -> Result<T>
125    where
126        T: DeserializeOwned,
127        B: Serialize + ?Sized,
128    {
129        self.request_json(
130            Method::PATCH,
131            endpoint,
132            Some(body),
133            RequestOptions::default(),
134        )
135        .await
136    }
137
138    pub async fn delete<T>(&self, endpoint: &str) -> Result<T>
139    where
140        T: DeserializeOwned,
141    {
142        self.request_json(
143            Method::DELETE,
144            endpoint,
145            Option::<&()>::None,
146            RequestOptions::default(),
147        )
148        .await
149    }
150
151    pub async fn request_json<T, B>(
152        &self,
153        method: Method,
154        endpoint: &str,
155        body: Option<&B>,
156        options: RequestOptions,
157    ) -> Result<T>
158    where
159        T: DeserializeOwned,
160        B: Serialize + ?Sized,
161    {
162        let raw = self
163            .raw_json_request(method, endpoint, body, options)
164            .await?;
165        self.parse_json_response(raw)
166    }
167
168    pub async fn raw_json_request<B>(
169        &self,
170        method: Method,
171        endpoint: &str,
172        body: Option<&B>,
173        options: RequestOptions,
174    ) -> Result<RawResponse>
175    where
176        B: Serialize + ?Sized,
177    {
178        let url = self.build_url(endpoint, &options)?;
179        let body = match body {
180            Some(body) => Some(serde_json::to_value(body)?),
181            None => None,
182        };
183        self.execute_with_retry(method, url, body, options).await
184    }
185
186    pub(crate) async fn execute_with_retry(
187        &self,
188        method: Method,
189        url: Url,
190        body: Option<Value>,
191        options: RequestOptions,
192    ) -> Result<RawResponse> {
193        let attempts = self.config.retry.attempts.max(1);
194        let mut last_error: Option<FlowfullError> = None;
195
196        for attempt in 0..attempts {
197            match self
198                .execute_once(method.clone(), url.clone(), body.clone(), options.clone())
199                .await
200            {
201                Ok(resp)
202                    if self.should_retry_status(method.clone(), resp.status, attempt, attempts) =>
203                {
204                    last_error = Some(FlowfullError::Api {
205                        status: resp.status,
206                        message: format!("retryable status {}", resp.status),
207                        body: Some(resp.text()),
208                    });
209                }
210                Ok(resp) => return Ok(resp),
211                Err(err) if self.should_retry_error(&method, attempt, attempts) => {
212                    last_error = Some(err);
213                }
214                Err(err) => return Err(err),
215            }
216
217            if attempt + 1 < attempts {
218                sleep(self.retry_delay(attempt)).await;
219            }
220        }
221
222        Err(last_error.unwrap_or(FlowfullError::RequestFailed))
223    }
224
225    async fn execute_once(
226        &self,
227        method: Method,
228        url: Url,
229        body: Option<Value>,
230        options: RequestOptions,
231    ) -> Result<RawResponse> {
232        let mut request = self.http.request(method, url);
233
234        for (key, value) in self.config.headers.iter() {
235            request = request.header(key, value);
236        }
237        for (key, value) in options.headers.iter() {
238            request = request.header(key, value);
239        }
240
241        let include_session = options
242            .include_session
243            .unwrap_or(self.config.include_session);
244        let session_id = match options.session_id {
245            Some(session_id) => Some(session_id),
246            None if include_session => self.session_manager.get_session_id().await?,
247            None => None,
248        };
249        if let Some(session_id) = session_id {
250            if !session_id.is_empty() {
251                request = request.header(self.config.session_header.clone(), session_id);
252            }
253        }
254
255        if let Some(timeout) = options.timeout {
256            request = request.timeout(timeout);
257        }
258
259        if let Some(body) = body {
260            request = request.json(&body);
261        }
262
263        let response = request.send().await?;
264        let status = response.status().as_u16();
265        let headers = response.headers().clone();
266        let body = response.bytes().await?.to_vec();
267
268        Ok(RawResponse {
269            status,
270            headers,
271            body,
272        })
273    }
274
275    pub(crate) fn parse_json_response<T>(&self, raw: RawResponse) -> Result<T>
276    where
277        T: DeserializeOwned,
278    {
279        if !(200..300).contains(&raw.status) {
280            return Err(FlowfullError::Api {
281                status: raw.status,
282                message: raw.text(),
283                body: Some(raw.text()),
284            });
285        }
286
287        let value: Value = serde_json::from_slice(&raw.body)?;
288        if value.get("success").is_some() {
289            let envelope: ApiResponse<Value> = serde_json::from_value(value.clone())?;
290            if !envelope.success {
291                return Err(FlowfullError::Api {
292                    status: raw.status,
293                    message: envelope
294                        .error
295                        .or(envelope.message)
296                        .unwrap_or_else(|| "request failed".to_string()),
297                    body: Some(raw.text()),
298                });
299            }
300
301            if let Some(data) = envelope.data {
302                return Ok(serde_json::from_value(data)?);
303            }
304        }
305
306        Ok(serde_json::from_value(value)?)
307    }
308
309    pub(crate) async fn raw_multipart_request(
310        &self,
311        method: Method,
312        endpoint: &str,
313        form: reqwest::multipart::Form,
314        options: RequestOptions,
315    ) -> Result<RawResponse> {
316        let url = self.build_url(endpoint, &options)?;
317        let mut request = self.http.request(method, url).multipart(form);
318
319        for (key, value) in self.config.headers.iter() {
320            request = request.header(key, value);
321        }
322        for (key, value) in options.headers.iter() {
323            request = request.header(key, value);
324        }
325
326        let include_session = options
327            .include_session
328            .unwrap_or(self.config.include_session);
329        let session_id = match options.session_id {
330            Some(session_id) => Some(session_id),
331            None if include_session => self.session_manager.get_session_id().await?,
332            None => None,
333        };
334        if let Some(session_id) = session_id {
335            if !session_id.is_empty() {
336                request = request.header(self.config.session_header.clone(), session_id);
337            }
338        }
339
340        if let Some(timeout) = options.timeout {
341            request = request.timeout(timeout);
342        }
343
344        let response = request.send().await?;
345        let status = response.status().as_u16();
346        let headers = response.headers().clone();
347        let body = response.bytes().await?.to_vec();
348        Ok(RawResponse {
349            status,
350            headers,
351            body,
352        })
353    }
354
355    pub(crate) fn build_url(&self, endpoint: &str, options: &RequestOptions) -> Result<Url> {
356        let mut url = if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
357            Url::parse(endpoint)?
358        } else {
359            let endpoint = endpoint.trim_start_matches('/');
360            self.config.base_url.join(endpoint)?
361        };
362
363        if !options.query.is_empty() {
364            let mut pairs = url.query_pairs_mut();
365            for (key, value) in &options.query {
366                pairs.append_pair(key, value);
367            }
368        }
369
370        Ok(url)
371    }
372
373    fn should_retry_error(&self, method: &Method, attempt: usize, attempts: usize) -> bool {
374        attempt + 1 < attempts && (is_idempotent(method) || self.config.retry.retry_non_idempotent)
375    }
376
377    fn should_retry_status(
378        &self,
379        method: Method,
380        status: u16,
381        attempt: usize,
382        attempts: usize,
383    ) -> bool {
384        attempt + 1 < attempts
385            && self.config.retry.retry_statuses.contains(&status)
386            && (is_idempotent(&method) || self.config.retry.retry_non_idempotent)
387    }
388
389    fn retry_delay(&self, attempt: usize) -> Duration {
390        let mut delay = self.config.retry.delay;
391        if self.config.retry.exponential {
392            delay = delay.saturating_mul(2_u32.saturating_pow(attempt as u32));
393        }
394        if let Some(max_delay) = self.config.retry.max_delay {
395            delay.min(max_delay)
396        } else {
397            delay
398        }
399    }
400}
401
402impl ClientConfigBuilder {
403    pub fn build_client(self) -> Result<FlowfullClient> {
404        FlowfullClient::from_config(self.build()?)
405    }
406}
407
408fn is_idempotent(method: &Method) -> bool {
409    matches!(
410        *method,
411        Method::GET | Method::HEAD | Method::PUT | Method::DELETE | Method::OPTIONS
412    )
413}