Skip to main content

systemprompt_sync/
api_client.rs

1use std::time::Duration;
2
3use reqwest::{Client, StatusCode};
4use serde::de::DeserializeOwned;
5use serde::{Deserialize, Serialize};
6use tokio::time::sleep;
7
8use crate::error::{SyncError, SyncResult};
9
10#[derive(Debug, Clone, Copy)]
11pub struct RetryConfig {
12    pub max_attempts: u32,
13    pub initial_delay: Duration,
14    pub max_delay: Duration,
15    pub exponential_base: u32,
16}
17
18impl Default for RetryConfig {
19    fn default() -> Self {
20        Self {
21            max_attempts: 5,
22            initial_delay: Duration::from_secs(2),
23            max_delay: Duration::from_secs(30),
24            exponential_base: 2,
25        }
26    }
27}
28
29#[derive(Clone, Debug)]
30pub struct SyncApiClient {
31    client: Client,
32    api_url: String,
33    token: String,
34    hostname: Option<String>,
35    sync_token: Option<String>,
36    retry_config: RetryConfig,
37}
38
39#[derive(Debug, Deserialize)]
40pub struct RegistryToken {
41    pub registry: String,
42    pub username: String,
43    pub token: String,
44}
45
46#[derive(Debug, Clone, Copy, Deserialize)]
47pub struct UploadResponse {
48    pub files_uploaded: usize,
49}
50
51#[derive(Debug, Deserialize)]
52pub struct DeployResponse {
53    pub status: String,
54    pub app_url: Option<String>,
55}
56
57impl SyncApiClient {
58    pub fn new(api_url: &str, token: &str) -> SyncResult<Self> {
59        Ok(Self {
60            client: Client::builder()
61                .connect_timeout(Duration::from_secs(10))
62                .timeout(Duration::from_secs(60))
63                .build()?,
64            api_url: api_url.to_string(),
65            token: token.to_string(),
66            hostname: None,
67            sync_token: None,
68            retry_config: RetryConfig::default(),
69        })
70    }
71
72    pub fn with_direct_sync(
73        mut self,
74        hostname: Option<String>,
75        sync_token: Option<String>,
76    ) -> Self {
77        self.hostname = hostname;
78        self.sync_token = sync_token;
79        self
80    }
81
82    fn direct_sync_credentials(&self) -> Option<(String, String)> {
83        match (&self.hostname, &self.sync_token) {
84            (Some(hostname), Some(token)) => {
85                let url = format!("https://{}/api/v1/sync/files", hostname);
86                Some((url, token.clone()))
87            },
88            _ => None,
89        }
90    }
91
92    fn calculate_next_delay(&self, current: Duration) -> Duration {
93        current
94            .saturating_mul(self.retry_config.exponential_base)
95            .min(self.retry_config.max_delay)
96    }
97
98    pub async fn upload_files(&self, tenant_id: &str, data: Vec<u8>) -> SyncResult<UploadResponse> {
99        let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
100            (
101                format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
102                self.token.clone(),
103            )
104        });
105
106        let mut current_delay = self.retry_config.initial_delay;
107
108        for attempt in 1..=self.retry_config.max_attempts {
109            let response = self
110                .client
111                .post(&url)
112                .header("Authorization", format!("Bearer {}", token))
113                .header("Content-Type", "application/octet-stream")
114                .body(data.clone())
115                .send()
116                .await?;
117
118            match self.handle_json_response::<UploadResponse>(response).await {
119                Ok(upload) => return Ok(upload),
120                Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
121                    tracing::warn!(
122                        attempt = attempt,
123                        max_attempts = self.retry_config.max_attempts,
124                        delay_ms = current_delay.as_millis() as u64,
125                        error = %error,
126                        "Retryable sync error, waiting before retry"
127                    );
128                    sleep(current_delay).await;
129                    current_delay = self.calculate_next_delay(current_delay);
130                },
131                Err(error) => return Err(error),
132            }
133        }
134
135        Err(SyncError::ApiError {
136            status: 503,
137            message: "Max retry attempts exceeded".to_string(),
138        })
139    }
140
141    pub async fn download_files(&self, tenant_id: &str) -> SyncResult<Vec<u8>> {
142        let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
143            (
144                format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
145                self.token.clone(),
146            )
147        });
148
149        let mut current_delay = self.retry_config.initial_delay;
150
151        for attempt in 1..=self.retry_config.max_attempts {
152            let response = self
153                .client
154                .get(&url)
155                .header("Authorization", format!("Bearer {}", token))
156                .send()
157                .await?;
158
159            match self.handle_binary_response(response).await {
160                Ok(data) => return Ok(data),
161                Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
162                    tracing::warn!(
163                        attempt = attempt,
164                        max_attempts = self.retry_config.max_attempts,
165                        delay_ms = current_delay.as_millis() as u64,
166                        error = %error,
167                        "Retryable sync error, waiting before retry"
168                    );
169                    sleep(current_delay).await;
170                    current_delay = self.calculate_next_delay(current_delay);
171                },
172                Err(error) => return Err(error),
173            }
174        }
175
176        Err(SyncError::ApiError {
177            status: 503,
178            message: "Max retry attempts exceeded".to_string(),
179        })
180    }
181
182    pub async fn get_registry_token(&self, tenant_id: &str) -> SyncResult<RegistryToken> {
183        let url = format!(
184            "{}/api/v1/cloud/tenants/{}/registry-token",
185            self.api_url, tenant_id
186        );
187        self.get(&url).await
188    }
189
190    pub async fn deploy(&self, tenant_id: &str, image: &str) -> SyncResult<DeployResponse> {
191        let url = format!("{}/api/v1/cloud/tenants/{}/deploy", self.api_url, tenant_id);
192        self.post(&url, &serde_json::json!({ "image": image }))
193            .await
194    }
195
196    pub async fn get_tenant_app_id(&self, tenant_id: &str) -> SyncResult<String> {
197        #[derive(Deserialize)]
198        struct TenantInfo {
199            fly_app_name: Option<String>,
200        }
201        let url = format!("{}/api/v1/cloud/tenants/{}", self.api_url, tenant_id);
202        let info: TenantInfo = self.get(&url).await?;
203        info.fly_app_name.ok_or(SyncError::TenantNoApp)
204    }
205
206    pub async fn get_database_url(&self, tenant_id: &str) -> SyncResult<String> {
207        #[derive(Deserialize)]
208        struct DatabaseInfo {
209            database_url: Option<String>,
210        }
211        let url = format!(
212            "{}/api/v1/cloud/tenants/{}/database",
213            self.api_url, tenant_id
214        );
215        let info: DatabaseInfo = self.get(&url).await?;
216        info.database_url.ok_or_else(|| SyncError::ApiError {
217            status: 404,
218            message: "Database URL not available for tenant".to_string(),
219        })
220    }
221
222    async fn get<T: DeserializeOwned>(&self, url: &str) -> SyncResult<T> {
223        let response = self
224            .client
225            .get(url)
226            .header("Authorization", format!("Bearer {}", self.token))
227            .send()
228            .await?;
229
230        self.handle_json_response(response).await
231    }
232
233    async fn post<T: DeserializeOwned, B: Serialize + Sync>(
234        &self,
235        url: &str,
236        body: &B,
237    ) -> SyncResult<T> {
238        let response = self
239            .client
240            .post(url)
241            .header("Authorization", format!("Bearer {}", self.token))
242            .json(body)
243            .send()
244            .await?;
245
246        self.handle_json_response(response).await
247    }
248
249    async fn handle_json_response<T: DeserializeOwned>(
250        &self,
251        response: reqwest::Response,
252    ) -> SyncResult<T> {
253        let status = response.status();
254        if status == StatusCode::UNAUTHORIZED {
255            return Err(SyncError::Unauthorized);
256        }
257        if !status.is_success() {
258            let message = response.text().await?;
259            return Err(SyncError::ApiError {
260                status: status.as_u16(),
261                message,
262            });
263        }
264        Ok(response.json().await?)
265    }
266
267    async fn handle_binary_response(&self, response: reqwest::Response) -> SyncResult<Vec<u8>> {
268        let status = response.status();
269        if !status.is_success() {
270            let message = response
271                .text()
272                .await
273                .unwrap_or_else(|e| format!("(body unreadable: {})", e));
274            return Err(SyncError::ApiError {
275                status: status.as_u16(),
276                message,
277            });
278        }
279        Ok(response.bytes().await?.to_vec())
280    }
281}