1use std::time::Duration;
2
3use reqwest::{Client, StatusCode};
4use serde::de::DeserializeOwned;
5use serde::{Deserialize, Serialize};
6use tokio::time::sleep;
7
8use crate::error::{SyncError, SyncResult};
9
/// Tuning knobs for the retry loop used by the file upload/download calls.
#[derive(Clone, Copy, Debug)]
pub struct RetryConfig {
    /// Total number of request attempts, counting the first one.
    pub max_attempts: u32,
    /// Pause before the first retry.
    pub initial_delay: Duration,
    /// Ceiling applied to the pause after every growth step.
    pub max_delay: Duration,
    /// Factor the pause is multiplied by after each retry.
    pub exponential_base: u32,
}
17
18impl Default for RetryConfig {
19 fn default() -> Self {
20 Self {
21 max_attempts: 5,
22 initial_delay: Duration::from_secs(2),
23 max_delay: Duration::from_secs(30),
24 exponential_base: 2,
25 }
26 }
27}
28
29#[derive(Clone, Debug)]
30pub struct SyncApiClient {
31 client: Client,
32 api_url: String,
33 token: String,
34 hostname: Option<String>,
35 sync_token: Option<String>,
36 retry_config: RetryConfig,
37}
38
39#[derive(Debug, Deserialize)]
40pub struct RegistryToken {
41 pub registry: String,
42 pub username: String,
43 pub token: String,
44}
45
46#[derive(Debug, Clone, Copy, Deserialize)]
47pub struct UploadResponse {
48 pub files_uploaded: usize,
49}
50
51#[derive(Debug, Deserialize)]
52pub struct DeployResponse {
53 pub status: String,
54 pub app_url: Option<String>,
55}
56
57impl SyncApiClient {
58 pub fn new(api_url: &str, token: &str) -> Self {
59 Self {
60 client: Client::new(),
61 api_url: api_url.to_string(),
62 token: token.to_string(),
63 hostname: None,
64 sync_token: None,
65 retry_config: RetryConfig::default(),
66 }
67 }
68
69 pub fn with_direct_sync(
70 mut self,
71 hostname: Option<String>,
72 sync_token: Option<String>,
73 ) -> Self {
74 self.hostname = hostname;
75 self.sync_token = sync_token;
76 self
77 }
78
79 fn direct_sync_credentials(&self) -> Option<(String, String)> {
80 match (&self.hostname, &self.sync_token) {
81 (Some(hostname), Some(token)) => {
82 let url = format!("https://{}/api/v1/sync/files", hostname);
83 Some((url, token.clone()))
84 },
85 _ => None,
86 }
87 }
88
89 fn calculate_next_delay(&self, current: Duration) -> Duration {
90 current
91 .saturating_mul(self.retry_config.exponential_base)
92 .min(self.retry_config.max_delay)
93 }
94
95 pub async fn upload_files(&self, tenant_id: &str, data: Vec<u8>) -> SyncResult<UploadResponse> {
96 let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
97 (
98 format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
99 self.token.clone(),
100 )
101 });
102
103 let mut current_delay = self.retry_config.initial_delay;
104
105 for attempt in 1..=self.retry_config.max_attempts {
106 let response = self
107 .client
108 .post(&url)
109 .header("Authorization", format!("Bearer {}", token))
110 .header("Content-Type", "application/octet-stream")
111 .body(data.clone())
112 .send()
113 .await?;
114
115 match self.handle_json_response::<UploadResponse>(response).await {
116 Ok(upload) => return Ok(upload),
117 Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
118 tracing::warn!(
119 attempt = attempt,
120 max_attempts = self.retry_config.max_attempts,
121 delay_ms = current_delay.as_millis() as u64,
122 error = %error,
123 "Retryable sync error, waiting before retry"
124 );
125 sleep(current_delay).await;
126 current_delay = self.calculate_next_delay(current_delay);
127 },
128 Err(error) => return Err(error),
129 }
130 }
131
132 Err(SyncError::ApiError {
133 status: 503,
134 message: "Max retry attempts exceeded".to_string(),
135 })
136 }
137
138 pub async fn download_files(&self, tenant_id: &str) -> SyncResult<Vec<u8>> {
139 let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
140 (
141 format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
142 self.token.clone(),
143 )
144 });
145
146 let mut current_delay = self.retry_config.initial_delay;
147
148 for attempt in 1..=self.retry_config.max_attempts {
149 let response = self
150 .client
151 .get(&url)
152 .header("Authorization", format!("Bearer {}", token))
153 .send()
154 .await?;
155
156 match self.handle_binary_response(response).await {
157 Ok(data) => return Ok(data),
158 Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
159 tracing::warn!(
160 attempt = attempt,
161 max_attempts = self.retry_config.max_attempts,
162 delay_ms = current_delay.as_millis() as u64,
163 error = %error,
164 "Retryable sync error, waiting before retry"
165 );
166 sleep(current_delay).await;
167 current_delay = self.calculate_next_delay(current_delay);
168 },
169 Err(error) => return Err(error),
170 }
171 }
172
173 Err(SyncError::ApiError {
174 status: 503,
175 message: "Max retry attempts exceeded".to_string(),
176 })
177 }
178
179 pub async fn get_registry_token(&self, tenant_id: &str) -> SyncResult<RegistryToken> {
180 let url = format!(
181 "{}/api/v1/cloud/tenants/{}/registry-token",
182 self.api_url, tenant_id
183 );
184 self.get(&url).await
185 }
186
187 pub async fn deploy(&self, tenant_id: &str, image: &str) -> SyncResult<DeployResponse> {
188 let url = format!("{}/api/v1/cloud/tenants/{}/deploy", self.api_url, tenant_id);
189 self.post(&url, &serde_json::json!({ "image": image }))
190 .await
191 }
192
193 pub async fn get_tenant_app_id(&self, tenant_id: &str) -> SyncResult<String> {
194 #[derive(Deserialize)]
195 struct TenantInfo {
196 fly_app_name: Option<String>,
197 }
198 let url = format!("{}/api/v1/cloud/tenants/{}", self.api_url, tenant_id);
199 let info: TenantInfo = self.get(&url).await?;
200 info.fly_app_name.ok_or(SyncError::TenantNoApp)
201 }
202
203 pub async fn get_database_url(&self, tenant_id: &str) -> SyncResult<String> {
204 #[derive(Deserialize)]
205 struct DatabaseInfo {
206 database_url: Option<String>,
207 }
208 let url = format!(
209 "{}/api/v1/cloud/tenants/{}/database",
210 self.api_url, tenant_id
211 );
212 let info: DatabaseInfo = self.get(&url).await?;
213 info.database_url.ok_or_else(|| SyncError::ApiError {
214 status: 404,
215 message: "Database URL not available for tenant".to_string(),
216 })
217 }
218
219 async fn get<T: DeserializeOwned>(&self, url: &str) -> SyncResult<T> {
220 let response = self
221 .client
222 .get(url)
223 .header("Authorization", format!("Bearer {}", self.token))
224 .send()
225 .await?;
226
227 self.handle_json_response(response).await
228 }
229
230 async fn post<T: DeserializeOwned, B: Serialize + Sync>(
231 &self,
232 url: &str,
233 body: &B,
234 ) -> SyncResult<T> {
235 let response = self
236 .client
237 .post(url)
238 .header("Authorization", format!("Bearer {}", self.token))
239 .json(body)
240 .send()
241 .await?;
242
243 self.handle_json_response(response).await
244 }
245
246 async fn handle_json_response<T: DeserializeOwned>(
247 &self,
248 response: reqwest::Response,
249 ) -> SyncResult<T> {
250 let status = response.status();
251 if status == StatusCode::UNAUTHORIZED {
252 return Err(SyncError::Unauthorized);
253 }
254 if !status.is_success() {
255 let message = response.text().await?;
256 return Err(SyncError::ApiError {
257 status: status.as_u16(),
258 message,
259 });
260 }
261 Ok(response.json().await?)
262 }
263
264 async fn handle_binary_response(&self, response: reqwest::Response) -> SyncResult<Vec<u8>> {
265 let status = response.status();
266 if !status.is_success() {
267 let message = response
268 .text()
269 .await
270 .unwrap_or_else(|e| format!("(body unreadable: {})", e));
271 return Err(SyncError::ApiError {
272 status: status.as_u16(),
273 message,
274 });
275 }
276 Ok(response.bytes().await?.to_vec())
277 }
278}