Skip to main content

systemprompt_sync/
api_client.rs

1use std::time::Duration;
2
3use reqwest::{Client, StatusCode};
4use serde::de::DeserializeOwned;
5use serde::{Deserialize, Serialize};
6use tokio::time::sleep;
7
8use crate::error::{SyncError, SyncResult};
9
10#[derive(Debug, Clone, Copy)]
11pub struct RetryConfig {
12    pub max_attempts: u32,
13    pub initial_delay: Duration,
14    pub max_delay: Duration,
15    pub exponential_base: u32,
16}
17
18impl Default for RetryConfig {
19    fn default() -> Self {
20        Self {
21            max_attempts: 5,
22            initial_delay: Duration::from_secs(2),
23            max_delay: Duration::from_secs(30),
24            exponential_base: 2,
25        }
26    }
27}
28
29#[derive(Clone, Debug)]
30pub struct SyncApiClient {
31    client: Client,
32    api_url: String,
33    token: String,
34    hostname: Option<String>,
35    sync_token: Option<String>,
36    retry_config: RetryConfig,
37}
38
39#[derive(Debug, Deserialize)]
40pub struct RegistryToken {
41    pub registry: String,
42    pub username: String,
43    pub token: String,
44}
45
46#[derive(Debug, Clone, Copy, Deserialize)]
47pub struct UploadResponse {
48    pub files_uploaded: usize,
49}
50
51#[derive(Debug, Deserialize)]
52pub struct DeployResponse {
53    pub status: String,
54    pub app_url: Option<String>,
55}
56
57impl SyncApiClient {
58    pub fn new(api_url: &str, token: &str) -> Self {
59        Self {
60            client: Client::new(),
61            api_url: api_url.to_string(),
62            token: token.to_string(),
63            hostname: None,
64            sync_token: None,
65            retry_config: RetryConfig::default(),
66        }
67    }
68
69    pub fn with_direct_sync(
70        mut self,
71        hostname: Option<String>,
72        sync_token: Option<String>,
73    ) -> Self {
74        self.hostname = hostname;
75        self.sync_token = sync_token;
76        self
77    }
78
79    fn direct_sync_credentials(&self) -> Option<(String, String)> {
80        match (&self.hostname, &self.sync_token) {
81            (Some(hostname), Some(token)) => {
82                let url = format!("https://{}/api/v1/sync/files", hostname);
83                Some((url, token.clone()))
84            },
85            _ => None,
86        }
87    }
88
89    fn calculate_next_delay(&self, current: Duration) -> Duration {
90        current
91            .saturating_mul(self.retry_config.exponential_base)
92            .min(self.retry_config.max_delay)
93    }
94
95    pub async fn upload_files(&self, tenant_id: &str, data: Vec<u8>) -> SyncResult<UploadResponse> {
96        let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
97            (
98                format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
99                self.token.clone(),
100            )
101        });
102
103        let mut current_delay = self.retry_config.initial_delay;
104
105        for attempt in 1..=self.retry_config.max_attempts {
106            let response = self
107                .client
108                .post(&url)
109                .header("Authorization", format!("Bearer {}", token))
110                .header("Content-Type", "application/octet-stream")
111                .body(data.clone())
112                .send()
113                .await?;
114
115            match self.handle_json_response::<UploadResponse>(response).await {
116                Ok(upload) => return Ok(upload),
117                Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
118                    tracing::warn!(
119                        attempt = attempt,
120                        max_attempts = self.retry_config.max_attempts,
121                        delay_ms = current_delay.as_millis() as u64,
122                        error = %error,
123                        "Retryable sync error, waiting before retry"
124                    );
125                    sleep(current_delay).await;
126                    current_delay = self.calculate_next_delay(current_delay);
127                },
128                Err(error) => return Err(error),
129            }
130        }
131
132        Err(SyncError::ApiError {
133            status: 503,
134            message: "Max retry attempts exceeded".to_string(),
135        })
136    }
137
138    pub async fn download_files(&self, tenant_id: &str) -> SyncResult<Vec<u8>> {
139        let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
140            (
141                format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
142                self.token.clone(),
143            )
144        });
145
146        let mut current_delay = self.retry_config.initial_delay;
147
148        for attempt in 1..=self.retry_config.max_attempts {
149            let response = self
150                .client
151                .get(&url)
152                .header("Authorization", format!("Bearer {}", token))
153                .send()
154                .await?;
155
156            match self.handle_binary_response(response).await {
157                Ok(data) => return Ok(data),
158                Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
159                    tracing::warn!(
160                        attempt = attempt,
161                        max_attempts = self.retry_config.max_attempts,
162                        delay_ms = current_delay.as_millis() as u64,
163                        error = %error,
164                        "Retryable sync error, waiting before retry"
165                    );
166                    sleep(current_delay).await;
167                    current_delay = self.calculate_next_delay(current_delay);
168                },
169                Err(error) => return Err(error),
170            }
171        }
172
173        Err(SyncError::ApiError {
174            status: 503,
175            message: "Max retry attempts exceeded".to_string(),
176        })
177    }
178
179    pub async fn get_registry_token(&self, tenant_id: &str) -> SyncResult<RegistryToken> {
180        let url = format!(
181            "{}/api/v1/cloud/tenants/{}/registry-token",
182            self.api_url, tenant_id
183        );
184        self.get(&url).await
185    }
186
187    pub async fn deploy(&self, tenant_id: &str, image: &str) -> SyncResult<DeployResponse> {
188        let url = format!("{}/api/v1/cloud/tenants/{}/deploy", self.api_url, tenant_id);
189        self.post(&url, &serde_json::json!({ "image": image }))
190            .await
191    }
192
193    pub async fn get_tenant_app_id(&self, tenant_id: &str) -> SyncResult<String> {
194        #[derive(Deserialize)]
195        struct TenantInfo {
196            fly_app_name: Option<String>,
197        }
198        let url = format!("{}/api/v1/cloud/tenants/{}", self.api_url, tenant_id);
199        let info: TenantInfo = self.get(&url).await?;
200        info.fly_app_name.ok_or(SyncError::TenantNoApp)
201    }
202
203    pub async fn get_database_url(&self, tenant_id: &str) -> SyncResult<String> {
204        #[derive(Deserialize)]
205        struct DatabaseInfo {
206            database_url: Option<String>,
207        }
208        let url = format!(
209            "{}/api/v1/cloud/tenants/{}/database",
210            self.api_url, tenant_id
211        );
212        let info: DatabaseInfo = self.get(&url).await?;
213        info.database_url.ok_or_else(|| SyncError::ApiError {
214            status: 404,
215            message: "Database URL not available for tenant".to_string(),
216        })
217    }
218
219    async fn get<T: DeserializeOwned>(&self, url: &str) -> SyncResult<T> {
220        let response = self
221            .client
222            .get(url)
223            .header("Authorization", format!("Bearer {}", self.token))
224            .send()
225            .await?;
226
227        self.handle_json_response(response).await
228    }
229
230    async fn post<T: DeserializeOwned, B: Serialize + Sync>(
231        &self,
232        url: &str,
233        body: &B,
234    ) -> SyncResult<T> {
235        let response = self
236            .client
237            .post(url)
238            .header("Authorization", format!("Bearer {}", self.token))
239            .json(body)
240            .send()
241            .await?;
242
243        self.handle_json_response(response).await
244    }
245
246    async fn handle_json_response<T: DeserializeOwned>(
247        &self,
248        response: reqwest::Response,
249    ) -> SyncResult<T> {
250        let status = response.status();
251        if status == StatusCode::UNAUTHORIZED {
252            return Err(SyncError::Unauthorized);
253        }
254        if !status.is_success() {
255            let message = response.text().await?;
256            return Err(SyncError::ApiError {
257                status: status.as_u16(),
258                message,
259            });
260        }
261        Ok(response.json().await?)
262    }
263
264    async fn handle_binary_response(&self, response: reqwest::Response) -> SyncResult<Vec<u8>> {
265        let status = response.status();
266        if !status.is_success() {
267            let message = response
268                .text()
269                .await
270                .unwrap_or_else(|e| format!("(body unreadable: {})", e));
271            return Err(SyncError::ApiError {
272                status: status.as_u16(),
273                message,
274            });
275        }
276        Ok(response.bytes().await?.to_vec())
277    }
278}