Skip to main content

systemprompt_sync/
api_client.rs

1use std::time::Duration;
2
3use reqwest::{Client, StatusCode};
4use serde::de::DeserializeOwned;
5use serde::{Deserialize, Serialize};
6use tokio::time::sleep;
7
8use crate::error::{SyncError, SyncResult};
9
/// Back-off settings for the retry loops used by the file upload and download
/// calls.
///
/// The type is small and `Copy`; equality is derived so configurations can be
/// compared directly (for example in assertions).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryConfig {
    /// Total number of attempts, counting the first request.
    pub max_attempts: u32,
    /// Pause inserted before the first retry.
    pub initial_delay: Duration,
    /// Upper bound that the growing delay is clamped to.
    pub max_delay: Duration,
    /// Factor the delay is multiplied by after each retry.
    pub exponential_base: u32,
}
17
18impl Default for RetryConfig {
19    fn default() -> Self {
20        Self {
21            max_attempts: 3,
22            initial_delay: Duration::from_millis(500),
23            max_delay: Duration::from_secs(10),
24            exponential_base: 2,
25        }
26    }
27}
28
29#[derive(Clone, Debug)]
30pub struct SyncApiClient {
31    client: Client,
32    api_url: String,
33    token: String,
34    hostname: Option<String>,
35    sync_token: Option<String>,
36    retry_config: RetryConfig,
37}
38
39#[derive(Debug, Deserialize)]
40pub struct RegistryToken {
41    pub registry: String,
42    pub username: String,
43    pub token: String,
44}
45
46#[derive(Debug, Deserialize)]
47pub struct DeployResponse {
48    pub status: String,
49    pub app_url: Option<String>,
50}
51
52impl SyncApiClient {
53    pub fn new(api_url: &str, token: &str) -> Self {
54        Self {
55            client: Client::new(),
56            api_url: api_url.to_string(),
57            token: token.to_string(),
58            hostname: None,
59            sync_token: None,
60            retry_config: RetryConfig::default(),
61        }
62    }
63
64    pub fn with_direct_sync(
65        mut self,
66        hostname: Option<String>,
67        sync_token: Option<String>,
68    ) -> Self {
69        self.hostname = hostname;
70        self.sync_token = sync_token;
71        self
72    }
73
74    fn direct_sync_credentials(&self) -> Option<(String, String)> {
75        match (&self.hostname, &self.sync_token) {
76            (Some(hostname), Some(token)) => {
77                let url = format!("https://{}/api/v1/sync/files", hostname);
78                Some((url, token.clone()))
79            },
80            _ => None,
81        }
82    }
83
84    fn calculate_next_delay(&self, current: Duration) -> Duration {
85        current
86            .saturating_mul(self.retry_config.exponential_base)
87            .min(self.retry_config.max_delay)
88    }
89
90    pub async fn upload_files(&self, tenant_id: &str, data: Vec<u8>) -> SyncResult<()> {
91        let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
92            (
93                format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
94                self.token.clone(),
95            )
96        });
97
98        let mut current_delay = self.retry_config.initial_delay;
99
100        for attempt in 1..=self.retry_config.max_attempts {
101            let response = self
102                .client
103                .post(&url)
104                .header("Authorization", format!("Bearer {}", token))
105                .header("Content-Type", "application/octet-stream")
106                .body(data.clone())
107                .send()
108                .await?;
109
110            match self.handle_empty_response(response).await {
111                Ok(()) => return Ok(()),
112                Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
113                    tracing::warn!(
114                        attempt = attempt,
115                        max_attempts = self.retry_config.max_attempts,
116                        delay_ms = current_delay.as_millis() as u64,
117                        error = %error,
118                        "Retryable sync error, waiting before retry"
119                    );
120                    sleep(current_delay).await;
121                    current_delay = self.calculate_next_delay(current_delay);
122                },
123                Err(error) => return Err(error),
124            }
125        }
126
127        Err(SyncError::ApiError {
128            status: 503,
129            message: "Max retry attempts exceeded".to_string(),
130        })
131    }
132
133    pub async fn download_files(&self, tenant_id: &str) -> SyncResult<Vec<u8>> {
134        let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
135            (
136                format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
137                self.token.clone(),
138            )
139        });
140
141        let mut current_delay = self.retry_config.initial_delay;
142
143        for attempt in 1..=self.retry_config.max_attempts {
144            let response = self
145                .client
146                .get(&url)
147                .header("Authorization", format!("Bearer {}", token))
148                .send()
149                .await?;
150
151            match self.handle_binary_response(response).await {
152                Ok(data) => return Ok(data),
153                Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
154                    tracing::warn!(
155                        attempt = attempt,
156                        max_attempts = self.retry_config.max_attempts,
157                        delay_ms = current_delay.as_millis() as u64,
158                        error = %error,
159                        "Retryable sync error, waiting before retry"
160                    );
161                    sleep(current_delay).await;
162                    current_delay = self.calculate_next_delay(current_delay);
163                },
164                Err(error) => return Err(error),
165            }
166        }
167
168        Err(SyncError::ApiError {
169            status: 503,
170            message: "Max retry attempts exceeded".to_string(),
171        })
172    }
173
174    pub async fn get_registry_token(&self, tenant_id: &str) -> SyncResult<RegistryToken> {
175        let url = format!(
176            "{}/api/v1/cloud/tenants/{}/registry-token",
177            self.api_url, tenant_id
178        );
179        self.get(&url).await
180    }
181
182    pub async fn deploy(&self, tenant_id: &str, image: &str) -> SyncResult<DeployResponse> {
183        let url = format!("{}/api/v1/cloud/tenants/{}/deploy", self.api_url, tenant_id);
184        self.post(&url, &serde_json::json!({ "image": image }))
185            .await
186    }
187
188    pub async fn get_tenant_app_id(&self, tenant_id: &str) -> SyncResult<String> {
189        #[derive(Deserialize)]
190        struct TenantInfo {
191            fly_app_name: Option<String>,
192        }
193        let url = format!("{}/api/v1/cloud/tenants/{}", self.api_url, tenant_id);
194        let info: TenantInfo = self.get(&url).await?;
195        info.fly_app_name.ok_or(SyncError::TenantNoApp)
196    }
197
198    pub async fn get_database_url(&self, tenant_id: &str) -> SyncResult<String> {
199        #[derive(Deserialize)]
200        struct DatabaseInfo {
201            database_url: Option<String>,
202        }
203        let url = format!(
204            "{}/api/v1/cloud/tenants/{}/database",
205            self.api_url, tenant_id
206        );
207        let info: DatabaseInfo = self.get(&url).await?;
208        info.database_url.ok_or_else(|| SyncError::ApiError {
209            status: 404,
210            message: "Database URL not available for tenant".to_string(),
211        })
212    }
213
214    async fn get<T: DeserializeOwned>(&self, url: &str) -> SyncResult<T> {
215        let response = self
216            .client
217            .get(url)
218            .header("Authorization", format!("Bearer {}", self.token))
219            .send()
220            .await?;
221
222        self.handle_json_response(response).await
223    }
224
225    async fn post<T: DeserializeOwned, B: Serialize + Sync>(
226        &self,
227        url: &str,
228        body: &B,
229    ) -> SyncResult<T> {
230        let response = self
231            .client
232            .post(url)
233            .header("Authorization", format!("Bearer {}", self.token))
234            .json(body)
235            .send()
236            .await?;
237
238        self.handle_json_response(response).await
239    }
240
241    async fn handle_json_response<T: DeserializeOwned>(
242        &self,
243        response: reqwest::Response,
244    ) -> SyncResult<T> {
245        let status = response.status();
246        if status == StatusCode::UNAUTHORIZED {
247            return Err(SyncError::Unauthorized);
248        }
249        if !status.is_success() {
250            let message = response.text().await?;
251            return Err(SyncError::ApiError {
252                status: status.as_u16(),
253                message,
254            });
255        }
256        Ok(response.json().await?)
257    }
258
259    async fn handle_empty_response(&self, response: reqwest::Response) -> SyncResult<()> {
260        let status = response.status();
261        if !status.is_success() {
262            let message = response.text().await?;
263            return Err(SyncError::ApiError {
264                status: status.as_u16(),
265                message,
266            });
267        }
268        Ok(())
269    }
270
271    async fn handle_binary_response(&self, response: reqwest::Response) -> SyncResult<Vec<u8>> {
272        let status = response.status();
273        if !status.is_success() {
274            let message = response
275                .text()
276                .await
277                .unwrap_or_else(|e| format!("(body unreadable: {})", e));
278            return Err(SyncError::ApiError {
279                status: status.as_u16(),
280                message,
281            });
282        }
283        Ok(response.bytes().await?.to_vec())
284    }
285}