Skip to main content

systemprompt_sync/api_client/
mod.rs

1//! HTTP client used by sync push/pull/deploy.
2//!
3//! Handles direct-sync vs. cloud-relay endpoint selection, bearer-token
4//! auth, retryable failures with exponential backoff, and typed JSON /
5//! binary response handling.
6
7mod response;
8mod retry;
9
10use std::time::Duration;
11
12use reqwest::Client;
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
15use systemprompt_models::net::{HTTP_CONNECT_TIMEOUT, HTTP_SYNC_DEPLOY_TIMEOUT};
16use tokio::time::sleep;
17
18use crate::error::{SyncError, SyncResult};
19pub use retry::RetryConfig;
20
21#[derive(Clone, Debug)]
22pub struct SyncApiClient {
23    client: Client,
24    api_url: String,
25    token: String,
26    hostname: Option<String>,
27    sync_token: Option<String>,
28    retry_config: RetryConfig,
29}
30
31#[derive(Debug, Deserialize)]
32pub struct RegistryToken {
33    pub registry: String,
34    pub username: String,
35    pub token: String,
36}
37
38#[derive(Debug, Clone, Copy, Deserialize)]
39pub struct UploadResponse {
40    pub files_uploaded: usize,
41}
42
43#[derive(Debug, Deserialize)]
44pub struct DeployResponse {
45    pub status: String,
46    pub app_url: Option<String>,
47}
48
49impl SyncApiClient {
50    pub fn new(api_url: &str, token: &str) -> SyncResult<Self> {
51        Ok(Self {
52            client: Client::builder()
53                .connect_timeout(HTTP_CONNECT_TIMEOUT)
54                .timeout(HTTP_SYNC_DEPLOY_TIMEOUT)
55                .build()?,
56            api_url: api_url.to_string(),
57            token: token.to_string(),
58            hostname: None,
59            sync_token: None,
60            retry_config: RetryConfig::default(),
61        })
62    }
63
64    pub fn with_direct_sync(
65        mut self,
66        hostname: Option<String>,
67        sync_token: Option<String>,
68    ) -> Self {
69        self.hostname = hostname;
70        self.sync_token = sync_token;
71        self
72    }
73
74    fn direct_sync_credentials(&self) -> Option<(String, String)> {
75        match (&self.hostname, &self.sync_token) {
76            (Some(hostname), Some(token)) => {
77                let url = format!("https://{}/api/v1/sync/files", hostname);
78                Some((url, token.clone()))
79            },
80            _ => None,
81        }
82    }
83
84    fn calculate_next_delay(&self, current: Duration) -> Duration {
85        self.retry_config.next_delay(current)
86    }
87
88    pub async fn upload_files(
89        &self,
90        tenant_id: &systemprompt_identifiers::TenantId,
91        data: Vec<u8>,
92    ) -> SyncResult<UploadResponse> {
93        let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
94            (
95                format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
96                self.token.clone(),
97            )
98        });
99
100        let mut current_delay = self.retry_config.initial_delay;
101
102        for attempt in 1..=self.retry_config.max_attempts {
103            let response = self
104                .client
105                .post(&url)
106                .header("Authorization", format!("Bearer {}", token))
107                .header("Content-Type", "application/octet-stream")
108                .body(data.clone())
109                .send()
110                .await?;
111
112            match response::handle_json::<UploadResponse>(response).await {
113                Ok(upload) => return Ok(upload),
114                Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
115                    tracing::warn!(
116                        attempt = attempt,
117                        max_attempts = self.retry_config.max_attempts,
118                        delay_ms = current_delay.as_millis() as u64,
119                        error = %error,
120                        "Retryable sync error, waiting before retry"
121                    );
122                    sleep(current_delay).await;
123                    current_delay = self.calculate_next_delay(current_delay);
124                },
125                Err(error) => return Err(error),
126            }
127        }
128
129        Err(SyncError::ApiError {
130            status: 503,
131            message: "Max retry attempts exceeded".to_string(),
132        })
133    }
134
135    pub async fn download_files(
136        &self,
137        tenant_id: &systemprompt_identifiers::TenantId,
138    ) -> SyncResult<Vec<u8>> {
139        let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
140            (
141                format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
142                self.token.clone(),
143            )
144        });
145
146        let mut current_delay = self.retry_config.initial_delay;
147
148        for attempt in 1..=self.retry_config.max_attempts {
149            let response = self
150                .client
151                .get(&url)
152                .header("Authorization", format!("Bearer {}", token))
153                .send()
154                .await?;
155
156            match response::handle_binary(response).await {
157                Ok(data) => return Ok(data),
158                Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
159                    tracing::warn!(
160                        attempt = attempt,
161                        max_attempts = self.retry_config.max_attempts,
162                        delay_ms = current_delay.as_millis() as u64,
163                        error = %error,
164                        "Retryable sync error, waiting before retry"
165                    );
166                    sleep(current_delay).await;
167                    current_delay = self.calculate_next_delay(current_delay);
168                },
169                Err(error) => return Err(error),
170            }
171        }
172
173        Err(SyncError::ApiError {
174            status: 503,
175            message: "Max retry attempts exceeded".to_string(),
176        })
177    }
178
179    pub async fn get_registry_token(
180        &self,
181        tenant_id: &systemprompt_identifiers::TenantId,
182    ) -> SyncResult<RegistryToken> {
183        let url = format!(
184            "{}/api/v1/cloud/tenants/{}/registry-token",
185            self.api_url, tenant_id
186        );
187        self.get(&url).await
188    }
189
190    pub async fn deploy(
191        &self,
192        tenant_id: &systemprompt_identifiers::TenantId,
193        image: &str,
194    ) -> SyncResult<DeployResponse> {
195        let url = format!("{}/api/v1/cloud/tenants/{}/deploy", self.api_url, tenant_id);
196        self.post(&url, &serde_json::json!({ "image": image }))
197            .await
198    }
199
200    pub async fn get_tenant_app_id(
201        &self,
202        tenant_id: &systemprompt_identifiers::TenantId,
203    ) -> SyncResult<String> {
204        #[derive(Deserialize)]
205        struct TenantInfo {
206            fly_app_name: Option<String>,
207        }
208        let url = format!("{}/api/v1/cloud/tenants/{}", self.api_url, tenant_id);
209        let info: TenantInfo = self.get(&url).await?;
210        info.fly_app_name.ok_or(SyncError::TenantNoApp)
211    }
212
213    pub async fn get_database_url(
214        &self,
215        tenant_id: &systemprompt_identifiers::TenantId,
216    ) -> SyncResult<String> {
217        #[derive(Deserialize)]
218        struct DatabaseInfo {
219            database_url: Option<String>,
220        }
221        let url = format!(
222            "{}/api/v1/cloud/tenants/{}/database",
223            self.api_url, tenant_id
224        );
225        let info: DatabaseInfo = self.get(&url).await?;
226        info.database_url.ok_or_else(|| SyncError::ApiError {
227            status: 404,
228            message: "Database URL not available for tenant".to_string(),
229        })
230    }
231
232    async fn get<T: DeserializeOwned>(&self, url: &str) -> SyncResult<T> {
233        let resp = self
234            .client
235            .get(url)
236            .header("Authorization", format!("Bearer {}", self.token))
237            .send()
238            .await?;
239        response::handle_json(resp).await
240    }
241
242    async fn post<T: DeserializeOwned, B: Serialize + Sync>(
243        &self,
244        url: &str,
245        body: &B,
246    ) -> SyncResult<T> {
247        let resp = self
248            .client
249            .post(url)
250            .header("Authorization", format!("Bearer {}", self.token))
251            .json(body)
252            .send()
253            .await?;
254        response::handle_json(resp).await
255    }
256}