1use std::time::Duration;
2
3use reqwest::{Client, StatusCode};
4use serde::de::DeserializeOwned;
5use serde::{Deserialize, Serialize};
6use tokio::time::sleep;
7
8use crate::error::{SyncError, SyncResult};
9
/// Retry/backoff settings for the file upload and download requests.
#[derive(Debug, Clone, Copy)]
pub struct RetryConfig {
    /// Total number of attempts made for a request (the first try included).
    pub max_attempts: u32,
    /// Pause before the first retry.
    pub initial_delay: Duration,
    /// Upper bound that the growing pause is clamped to.
    pub max_delay: Duration,
    /// Factor the pause is multiplied by after every retry.
    pub exponential_base: u32,
}

impl Default for RetryConfig {
    /// Three attempts, starting at a 500 ms pause that doubles after each
    /// retry and never exceeds 10 s.
    fn default() -> Self {
        let initial_delay = Duration::from_millis(500);
        let max_delay = Duration::from_secs(10);

        RetryConfig {
            max_attempts: 3,
            initial_delay,
            max_delay,
            exponential_base: 2,
        }
    }
}
28
29#[derive(Clone, Debug)]
30pub struct SyncApiClient {
31 client: Client,
32 api_url: String,
33 token: String,
34 hostname: Option<String>,
35 sync_token: Option<String>,
36 retry_config: RetryConfig,
37}
38
39#[derive(Debug, Deserialize)]
40pub struct RegistryToken {
41 pub registry: String,
42 pub username: String,
43 pub token: String,
44}
45
46#[derive(Debug, Deserialize)]
47pub struct DeployResponse {
48 pub status: String,
49 pub app_url: Option<String>,
50}
51
52impl SyncApiClient {
53 pub fn new(api_url: &str, token: &str) -> Self {
54 Self {
55 client: Client::new(),
56 api_url: api_url.to_string(),
57 token: token.to_string(),
58 hostname: None,
59 sync_token: None,
60 retry_config: RetryConfig::default(),
61 }
62 }
63
64 pub fn with_direct_sync(
65 mut self,
66 hostname: Option<String>,
67 sync_token: Option<String>,
68 ) -> Self {
69 self.hostname = hostname;
70 self.sync_token = sync_token;
71 self
72 }
73
74 fn direct_sync_credentials(&self) -> Option<(String, String)> {
75 match (&self.hostname, &self.sync_token) {
76 (Some(hostname), Some(token)) => {
77 let url = format!("https://{}/api/v1/sync/files", hostname);
78 Some((url, token.clone()))
79 },
80 _ => None,
81 }
82 }
83
84 fn calculate_next_delay(&self, current: Duration) -> Duration {
85 current
86 .saturating_mul(self.retry_config.exponential_base)
87 .min(self.retry_config.max_delay)
88 }
89
90 pub async fn upload_files(&self, tenant_id: &str, data: Vec<u8>) -> SyncResult<()> {
91 let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
92 (
93 format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
94 self.token.clone(),
95 )
96 });
97
98 let mut current_delay = self.retry_config.initial_delay;
99
100 for attempt in 1..=self.retry_config.max_attempts {
101 let response = self
102 .client
103 .post(&url)
104 .header("Authorization", format!("Bearer {}", token))
105 .header("Content-Type", "application/octet-stream")
106 .body(data.clone())
107 .send()
108 .await?;
109
110 match self.handle_empty_response(response).await {
111 Ok(()) => return Ok(()),
112 Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
113 tracing::warn!(
114 attempt = attempt,
115 max_attempts = self.retry_config.max_attempts,
116 delay_ms = current_delay.as_millis() as u64,
117 error = %error,
118 "Retryable sync error, waiting before retry"
119 );
120 sleep(current_delay).await;
121 current_delay = self.calculate_next_delay(current_delay);
122 },
123 Err(error) => return Err(error),
124 }
125 }
126
127 Err(SyncError::ApiError {
128 status: 503,
129 message: "Max retry attempts exceeded".to_string(),
130 })
131 }
132
133 pub async fn download_files(&self, tenant_id: &str) -> SyncResult<Vec<u8>> {
134 let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
135 (
136 format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
137 self.token.clone(),
138 )
139 });
140
141 let mut current_delay = self.retry_config.initial_delay;
142
143 for attempt in 1..=self.retry_config.max_attempts {
144 let response = self
145 .client
146 .get(&url)
147 .header("Authorization", format!("Bearer {}", token))
148 .send()
149 .await?;
150
151 match self.handle_binary_response(response).await {
152 Ok(data) => return Ok(data),
153 Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
154 tracing::warn!(
155 attempt = attempt,
156 max_attempts = self.retry_config.max_attempts,
157 delay_ms = current_delay.as_millis() as u64,
158 error = %error,
159 "Retryable sync error, waiting before retry"
160 );
161 sleep(current_delay).await;
162 current_delay = self.calculate_next_delay(current_delay);
163 },
164 Err(error) => return Err(error),
165 }
166 }
167
168 Err(SyncError::ApiError {
169 status: 503,
170 message: "Max retry attempts exceeded".to_string(),
171 })
172 }
173
174 pub async fn get_registry_token(&self, tenant_id: &str) -> SyncResult<RegistryToken> {
175 let url = format!(
176 "{}/api/v1/cloud/tenants/{}/registry-token",
177 self.api_url, tenant_id
178 );
179 self.get(&url).await
180 }
181
182 pub async fn deploy(&self, tenant_id: &str, image: &str) -> SyncResult<DeployResponse> {
183 let url = format!("{}/api/v1/cloud/tenants/{}/deploy", self.api_url, tenant_id);
184 self.post(&url, &serde_json::json!({ "image": image }))
185 .await
186 }
187
188 pub async fn get_tenant_app_id(&self, tenant_id: &str) -> SyncResult<String> {
189 #[derive(Deserialize)]
190 struct TenantInfo {
191 fly_app_name: Option<String>,
192 }
193 let url = format!("{}/api/v1/cloud/tenants/{}", self.api_url, tenant_id);
194 let info: TenantInfo = self.get(&url).await?;
195 info.fly_app_name.ok_or(SyncError::TenantNoApp)
196 }
197
198 pub async fn get_database_url(&self, tenant_id: &str) -> SyncResult<String> {
199 #[derive(Deserialize)]
200 struct DatabaseInfo {
201 database_url: Option<String>,
202 }
203 let url = format!(
204 "{}/api/v1/cloud/tenants/{}/database",
205 self.api_url, tenant_id
206 );
207 let info: DatabaseInfo = self.get(&url).await?;
208 info.database_url.ok_or_else(|| SyncError::ApiError {
209 status: 404,
210 message: "Database URL not available for tenant".to_string(),
211 })
212 }
213
214 async fn get<T: DeserializeOwned>(&self, url: &str) -> SyncResult<T> {
215 let response = self
216 .client
217 .get(url)
218 .header("Authorization", format!("Bearer {}", self.token))
219 .send()
220 .await?;
221
222 self.handle_json_response(response).await
223 }
224
225 async fn post<T: DeserializeOwned, B: Serialize + Sync>(
226 &self,
227 url: &str,
228 body: &B,
229 ) -> SyncResult<T> {
230 let response = self
231 .client
232 .post(url)
233 .header("Authorization", format!("Bearer {}", self.token))
234 .json(body)
235 .send()
236 .await?;
237
238 self.handle_json_response(response).await
239 }
240
241 async fn handle_json_response<T: DeserializeOwned>(
242 &self,
243 response: reqwest::Response,
244 ) -> SyncResult<T> {
245 let status = response.status();
246 if status == StatusCode::UNAUTHORIZED {
247 return Err(SyncError::Unauthorized);
248 }
249 if !status.is_success() {
250 let message = response.text().await?;
251 return Err(SyncError::ApiError {
252 status: status.as_u16(),
253 message,
254 });
255 }
256 Ok(response.json().await?)
257 }
258
259 async fn handle_empty_response(&self, response: reqwest::Response) -> SyncResult<()> {
260 let status = response.status();
261 if !status.is_success() {
262 let message = response.text().await?;
263 return Err(SyncError::ApiError {
264 status: status.as_u16(),
265 message,
266 });
267 }
268 Ok(())
269 }
270
271 async fn handle_binary_response(&self, response: reqwest::Response) -> SyncResult<Vec<u8>> {
272 let status = response.status();
273 if !status.is_success() {
274 let message = response
275 .text()
276 .await
277 .unwrap_or_else(|e| format!("(body unreadable: {})", e));
278 return Err(SyncError::ApiError {
279 status: status.as_u16(),
280 message,
281 });
282 }
283 Ok(response.bytes().await?.to_vec())
284 }
285}