1use std::{sync::Arc, time::Duration};
2
3use reqwest::{Method, Url};
4use serde::{Serialize, de::DeserializeOwned};
5use serde_json::Value;
6use tokio::time::sleep;
7
8use crate::{
9 auth::AuthClient,
10 bridge::BridgeClient,
11 config::{ClientConfig, ClientConfigBuilder},
12 error::{FlowfullError, Result},
13 query::QueryBuilder,
14 request::RequestOptions,
15 response::{ApiResponse, RawResponse},
16 session::SessionManager,
17 upload::UploadBuilder,
18};
19
20#[derive(Clone)]
21pub struct FlowfullClient {
22 http: reqwest::Client,
23 config: Arc<ClientConfig>,
24 session_manager: SessionManager,
25}
26
27impl FlowfullClient {
28 pub fn new(base_url: impl AsRef<str>) -> Result<Self> {
29 Self::builder(base_url).build_client()
30 }
31
32 pub fn builder(base_url: impl AsRef<str>) -> ClientConfigBuilder {
33 ClientConfigBuilder::new(base_url).expect("invalid base URL")
34 }
35
36 pub fn from_config(config: ClientConfig) -> Result<Self> {
37 let http = reqwest::Client::builder().timeout(config.timeout).build()?;
38 let config = Arc::new(config);
39 let session_manager = SessionManager::new(config.clone());
40 Ok(Self {
41 http,
42 config,
43 session_manager,
44 })
45 }
46
47 pub fn session_manager(&self) -> &SessionManager {
48 &self.session_manager
49 }
50
51 pub fn auth(&self) -> AuthClient {
52 AuthClient::new(self.clone())
53 }
54
55 pub fn bridge(&self) -> BridgeClient {
56 BridgeClient::new(self.clone())
57 }
58
59 #[cfg(feature = "payments")]
60 pub fn pay(&self) -> crate::payments::PaymentsClient {
61 crate::payments::PaymentsClient::new(self.clone())
62 }
63
64 pub fn query(&self, endpoint: impl Into<String>) -> QueryBuilder {
65 QueryBuilder::new(self.clone(), endpoint.into())
66 }
67
68 pub fn upload_file(
69 &self,
70 endpoint: impl Into<String>,
71 file: crate::upload::UploadFile,
72 ) -> UploadBuilder {
73 UploadBuilder::new(self.clone(), endpoint.into(), file)
74 }
75
76 pub async fn get<T>(&self, endpoint: &str) -> Result<T>
77 where
78 T: DeserializeOwned,
79 {
80 self.get_with_options(endpoint, RequestOptions::default())
81 .await
82 }
83
84 pub async fn get_with_options<T>(&self, endpoint: &str, options: RequestOptions) -> Result<T>
85 where
86 T: DeserializeOwned,
87 {
88 self.request_json(Method::GET, endpoint, Option::<&()>::None, options)
89 .await
90 }
91
92 pub async fn post<T, B>(&self, endpoint: &str, body: &B) -> Result<T>
93 where
94 T: DeserializeOwned,
95 B: Serialize + ?Sized,
96 {
97 self.post_with_options(endpoint, body, RequestOptions::default())
98 .await
99 }
100
101 pub async fn post_with_options<T, B>(
102 &self,
103 endpoint: &str,
104 body: &B,
105 options: RequestOptions,
106 ) -> Result<T>
107 where
108 T: DeserializeOwned,
109 B: Serialize + ?Sized,
110 {
111 self.request_json(Method::POST, endpoint, Some(body), options)
112 .await
113 }
114
115 pub async fn put<T, B>(&self, endpoint: &str, body: &B) -> Result<T>
116 where
117 T: DeserializeOwned,
118 B: Serialize + ?Sized,
119 {
120 self.request_json(Method::PUT, endpoint, Some(body), RequestOptions::default())
121 .await
122 }
123
124 pub async fn patch<T, B>(&self, endpoint: &str, body: &B) -> Result<T>
125 where
126 T: DeserializeOwned,
127 B: Serialize + ?Sized,
128 {
129 self.request_json(
130 Method::PATCH,
131 endpoint,
132 Some(body),
133 RequestOptions::default(),
134 )
135 .await
136 }
137
138 pub async fn delete<T>(&self, endpoint: &str) -> Result<T>
139 where
140 T: DeserializeOwned,
141 {
142 self.request_json(
143 Method::DELETE,
144 endpoint,
145 Option::<&()>::None,
146 RequestOptions::default(),
147 )
148 .await
149 }
150
151 pub async fn request_json<T, B>(
152 &self,
153 method: Method,
154 endpoint: &str,
155 body: Option<&B>,
156 options: RequestOptions,
157 ) -> Result<T>
158 where
159 T: DeserializeOwned,
160 B: Serialize + ?Sized,
161 {
162 let raw = self
163 .raw_json_request(method, endpoint, body, options)
164 .await?;
165 self.parse_json_response(raw)
166 }
167
168 pub async fn raw_json_request<B>(
169 &self,
170 method: Method,
171 endpoint: &str,
172 body: Option<&B>,
173 options: RequestOptions,
174 ) -> Result<RawResponse>
175 where
176 B: Serialize + ?Sized,
177 {
178 let url = self.build_url(endpoint, &options)?;
179 let body = match body {
180 Some(body) => Some(serde_json::to_value(body)?),
181 None => None,
182 };
183 self.execute_with_retry(method, url, body, options).await
184 }
185
186 pub(crate) async fn execute_with_retry(
187 &self,
188 method: Method,
189 url: Url,
190 body: Option<Value>,
191 options: RequestOptions,
192 ) -> Result<RawResponse> {
193 let attempts = self.config.retry.attempts.max(1);
194 let mut last_error: Option<FlowfullError> = None;
195
196 for attempt in 0..attempts {
197 match self
198 .execute_once(method.clone(), url.clone(), body.clone(), options.clone())
199 .await
200 {
201 Ok(resp)
202 if self.should_retry_status(method.clone(), resp.status, attempt, attempts) =>
203 {
204 last_error = Some(FlowfullError::Api {
205 status: resp.status,
206 message: format!("retryable status {}", resp.status),
207 body: Some(resp.text()),
208 });
209 }
210 Ok(resp) => return Ok(resp),
211 Err(err) if self.should_retry_error(&method, attempt, attempts) => {
212 last_error = Some(err);
213 }
214 Err(err) => return Err(err),
215 }
216
217 if attempt + 1 < attempts {
218 sleep(self.retry_delay(attempt)).await;
219 }
220 }
221
222 Err(last_error.unwrap_or(FlowfullError::RequestFailed))
223 }
224
225 async fn execute_once(
226 &self,
227 method: Method,
228 url: Url,
229 body: Option<Value>,
230 options: RequestOptions,
231 ) -> Result<RawResponse> {
232 let mut request = self.http.request(method, url);
233
234 for (key, value) in self.config.headers.iter() {
235 request = request.header(key, value);
236 }
237 for (key, value) in options.headers.iter() {
238 request = request.header(key, value);
239 }
240
241 let include_session = options
242 .include_session
243 .unwrap_or(self.config.include_session);
244 let session_id = match options.session_id {
245 Some(session_id) => Some(session_id),
246 None if include_session => self.session_manager.get_session_id().await?,
247 None => None,
248 };
249 if let Some(session_id) = session_id {
250 if !session_id.is_empty() {
251 request = request.header(self.config.session_header.clone(), session_id);
252 }
253 }
254
255 if let Some(timeout) = options.timeout {
256 request = request.timeout(timeout);
257 }
258
259 if let Some(body) = body {
260 request = request.json(&body);
261 }
262
263 let response = request.send().await?;
264 let status = response.status().as_u16();
265 let headers = response.headers().clone();
266 let body = response.bytes().await?.to_vec();
267
268 Ok(RawResponse {
269 status,
270 headers,
271 body,
272 })
273 }
274
275 pub(crate) fn parse_json_response<T>(&self, raw: RawResponse) -> Result<T>
276 where
277 T: DeserializeOwned,
278 {
279 if !(200..300).contains(&raw.status) {
280 return Err(FlowfullError::Api {
281 status: raw.status,
282 message: raw.text(),
283 body: Some(raw.text()),
284 });
285 }
286
287 let value: Value = serde_json::from_slice(&raw.body)?;
288 if value.get("success").is_some() {
289 let envelope: ApiResponse<Value> = serde_json::from_value(value.clone())?;
290 if !envelope.success {
291 return Err(FlowfullError::Api {
292 status: raw.status,
293 message: envelope
294 .error
295 .or(envelope.message)
296 .unwrap_or_else(|| "request failed".to_string()),
297 body: Some(raw.text()),
298 });
299 }
300
301 if let Some(data) = envelope.data {
302 return Ok(serde_json::from_value(data)?);
303 }
304 }
305
306 Ok(serde_json::from_value(value)?)
307 }
308
309 pub(crate) async fn raw_multipart_request(
310 &self,
311 method: Method,
312 endpoint: &str,
313 form: reqwest::multipart::Form,
314 options: RequestOptions,
315 ) -> Result<RawResponse> {
316 let url = self.build_url(endpoint, &options)?;
317 let mut request = self.http.request(method, url).multipart(form);
318
319 for (key, value) in self.config.headers.iter() {
320 request = request.header(key, value);
321 }
322 for (key, value) in options.headers.iter() {
323 request = request.header(key, value);
324 }
325
326 let include_session = options
327 .include_session
328 .unwrap_or(self.config.include_session);
329 let session_id = match options.session_id {
330 Some(session_id) => Some(session_id),
331 None if include_session => self.session_manager.get_session_id().await?,
332 None => None,
333 };
334 if let Some(session_id) = session_id {
335 if !session_id.is_empty() {
336 request = request.header(self.config.session_header.clone(), session_id);
337 }
338 }
339
340 if let Some(timeout) = options.timeout {
341 request = request.timeout(timeout);
342 }
343
344 let response = request.send().await?;
345 let status = response.status().as_u16();
346 let headers = response.headers().clone();
347 let body = response.bytes().await?.to_vec();
348 Ok(RawResponse {
349 status,
350 headers,
351 body,
352 })
353 }
354
355 pub(crate) fn build_url(&self, endpoint: &str, options: &RequestOptions) -> Result<Url> {
356 let mut url = if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
357 Url::parse(endpoint)?
358 } else {
359 let endpoint = endpoint.trim_start_matches('/');
360 self.config.base_url.join(endpoint)?
361 };
362
363 if !options.query.is_empty() {
364 let mut pairs = url.query_pairs_mut();
365 for (key, value) in &options.query {
366 pairs.append_pair(key, value);
367 }
368 }
369
370 Ok(url)
371 }
372
373 fn should_retry_error(&self, method: &Method, attempt: usize, attempts: usize) -> bool {
374 attempt + 1 < attempts && (is_idempotent(method) || self.config.retry.retry_non_idempotent)
375 }
376
377 fn should_retry_status(
378 &self,
379 method: Method,
380 status: u16,
381 attempt: usize,
382 attempts: usize,
383 ) -> bool {
384 attempt + 1 < attempts
385 && self.config.retry.retry_statuses.contains(&status)
386 && (is_idempotent(&method) || self.config.retry.retry_non_idempotent)
387 }
388
389 fn retry_delay(&self, attempt: usize) -> Duration {
390 let mut delay = self.config.retry.delay;
391 if self.config.retry.exponential {
392 delay = delay.saturating_mul(2_u32.saturating_pow(attempt as u32));
393 }
394 if let Some(max_delay) = self.config.retry.max_delay {
395 delay.min(max_delay)
396 } else {
397 delay
398 }
399 }
400}
401
402impl ClientConfigBuilder {
403 pub fn build_client(self) -> Result<FlowfullClient> {
404 FlowfullClient::from_config(self.build()?)
405 }
406}
407
408fn is_idempotent(method: &Method) -> bool {
409 matches!(
410 *method,
411 Method::GET | Method::HEAD | Method::PUT | Method::DELETE | Method::OPTIONS
412 )
413}