Skip to main content

systemprompt_sync/api_client/
mod.rs

1//! HTTP client used by sync push/pull/deploy.
2//!
3//! Handles direct-sync vs. cloud-relay endpoint selection, bearer-token
4//! auth, retryable failures with exponential backoff, and typed JSON /
5//! binary response handling.
6
7mod response;
8mod retry;
9mod token;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use reqwest::Client;
15use serde::de::DeserializeOwned;
16use serde::{Deserialize, Serialize};
17use systemprompt_models::net::{HTTP_CONNECT_TIMEOUT, HTTP_SYNC_DEPLOY_TIMEOUT};
18use tokio::sync::Mutex;
19use tokio::time::sleep;
20
21use crate::error::{SyncError, SyncResult};
22pub use retry::RetryConfig;
23pub use token::exchange_subject_token;
24use token::is_unauthorized;
25
26#[derive(Clone, Debug)]
27pub struct SyncApiClient {
28    client: Client,
29    api_url: String,
30    token: String,
31    hostname: Option<String>,
32    cached_sync_token: Arc<Mutex<Option<String>>>,
33    retry_config: RetryConfig,
34}
35
36#[derive(Debug, Deserialize)]
37pub struct RegistryToken {
38    pub registry: String,
39    pub username: String,
40    pub token: String,
41}
42
43#[derive(Debug, Clone, Copy, Deserialize)]
44pub struct UploadResponse {
45    pub files_uploaded: usize,
46}
47
48#[derive(Debug, Deserialize)]
49pub struct DeployResponse {
50    pub status: String,
51    pub app_url: Option<String>,
52}
53
54impl SyncApiClient {
55    pub fn new(api_url: &str, token: &str) -> SyncResult<Self> {
56        Ok(Self {
57            client: Client::builder()
58                .connect_timeout(HTTP_CONNECT_TIMEOUT)
59                .timeout(HTTP_SYNC_DEPLOY_TIMEOUT)
60                .build()?,
61            api_url: api_url.to_string(),
62            token: token.to_string(),
63            hostname: None,
64            cached_sync_token: Arc::new(Mutex::new(None)),
65            retry_config: RetryConfig::default(),
66        })
67    }
68
69    pub fn with_direct_sync(mut self, hostname: Option<String>) -> Self {
70        self.hostname = hostname;
71        self
72    }
73
74    const fn is_direct_sync(&self) -> bool {
75        self.hostname.is_some()
76    }
77
78    fn files_url(&self, tenant_id: &systemprompt_identifiers::TenantId) -> String {
79        self.hostname.as_ref().map_or_else(
80            || format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
81            |hostname| format!("https://{hostname}/api/v1/sync/files"),
82        )
83    }
84
85    fn calculate_next_delay(&self, current: Duration) -> Duration {
86        self.retry_config.next_delay(current)
87    }
88
89    pub async fn upload_files(
90        &self,
91        tenant_id: &systemprompt_identifiers::TenantId,
92        data: Vec<u8>,
93    ) -> SyncResult<UploadResponse> {
94        let url = self.files_url(tenant_id);
95        let direct = self.is_direct_sync();
96        let mut bearer = self.bearer_token(false).await?;
97        let mut reminted = false;
98
99        let mut current_delay = self.retry_config.initial_delay;
100
101        for attempt in 1..=self.retry_config.max_attempts {
102            let response = self
103                .client
104                .post(&url)
105                .header("Authorization", format!("Bearer {bearer}"))
106                .header("Content-Type", "application/octet-stream")
107                .body(data.clone())
108                .send()
109                .await?;
110
111            match response::handle_json::<UploadResponse>(response).await {
112                Ok(upload) => return Ok(upload),
113                Err(error) if direct && !reminted && is_unauthorized(&error) => {
114                    reminted = true;
115                    bearer = self.bearer_token(true).await?;
116                },
117                Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
118                    tracing::warn!(
119                        attempt = attempt,
120                        max_attempts = self.retry_config.max_attempts,
121                        delay_ms = current_delay.as_millis() as u64,
122                        error = %error,
123                        "Retryable sync error, waiting before retry"
124                    );
125                    sleep(current_delay).await;
126                    current_delay = self.calculate_next_delay(current_delay);
127                },
128                Err(error) => return Err(error),
129            }
130        }
131
132        Err(SyncError::ApiError {
133            status: 503,
134            message: "Max retry attempts exceeded".to_string(),
135        })
136    }
137
138    pub async fn download_files(
139        &self,
140        tenant_id: &systemprompt_identifiers::TenantId,
141    ) -> SyncResult<Vec<u8>> {
142        let url = self.files_url(tenant_id);
143        let direct = self.is_direct_sync();
144        let mut bearer = self.bearer_token(false).await?;
145        let mut reminted = false;
146
147        let mut current_delay = self.retry_config.initial_delay;
148
149        for attempt in 1..=self.retry_config.max_attempts {
150            let response = self
151                .client
152                .get(&url)
153                .header("Authorization", format!("Bearer {bearer}"))
154                .send()
155                .await?;
156
157            match response::handle_binary(response).await {
158                Ok(data) => return Ok(data),
159                Err(error) if direct && !reminted && is_unauthorized(&error) => {
160                    reminted = true;
161                    bearer = self.bearer_token(true).await?;
162                },
163                Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
164                    tracing::warn!(
165                        attempt = attempt,
166                        max_attempts = self.retry_config.max_attempts,
167                        delay_ms = current_delay.as_millis() as u64,
168                        error = %error,
169                        "Retryable sync error, waiting before retry"
170                    );
171                    sleep(current_delay).await;
172                    current_delay = self.calculate_next_delay(current_delay);
173                },
174                Err(error) => return Err(error),
175            }
176        }
177
178        Err(SyncError::ApiError {
179            status: 503,
180            message: "Max retry attempts exceeded".to_string(),
181        })
182    }
183
184    pub async fn get_registry_token(
185        &self,
186        tenant_id: &systemprompt_identifiers::TenantId,
187    ) -> SyncResult<RegistryToken> {
188        let url = format!(
189            "{}/api/v1/cloud/tenants/{}/registry-token",
190            self.api_url, tenant_id
191        );
192        self.get(&url).await
193    }
194
195    pub async fn deploy(
196        &self,
197        tenant_id: &systemprompt_identifiers::TenantId,
198        image: &str,
199    ) -> SyncResult<DeployResponse> {
200        let url = format!("{}/api/v1/cloud/tenants/{}/deploy", self.api_url, tenant_id);
201        self.post(&url, &serde_json::json!({ "image": image }))
202            .await
203    }
204
205    pub async fn get_tenant_app_id(
206        &self,
207        tenant_id: &systemprompt_identifiers::TenantId,
208    ) -> SyncResult<String> {
209        #[derive(Deserialize)]
210        struct TenantInfo {
211            fly_app_name: Option<String>,
212        }
213        let url = format!("{}/api/v1/cloud/tenants/{}", self.api_url, tenant_id);
214        let info: TenantInfo = self.get(&url).await?;
215        info.fly_app_name.ok_or(SyncError::TenantNoApp)
216    }
217
218    pub async fn get_database_url(
219        &self,
220        tenant_id: &systemprompt_identifiers::TenantId,
221    ) -> SyncResult<String> {
222        #[derive(Deserialize)]
223        struct DatabaseInfo {
224            database_url: Option<String>,
225        }
226        let url = format!(
227            "{}/api/v1/cloud/tenants/{}/database",
228            self.api_url, tenant_id
229        );
230        let info: DatabaseInfo = self.get(&url).await?;
231        info.database_url.ok_or_else(|| SyncError::ApiError {
232            status: 404,
233            message: "Database URL not available for tenant".to_string(),
234        })
235    }
236
237    async fn get<T: DeserializeOwned>(&self, url: &str) -> SyncResult<T> {
238        let resp = self
239            .client
240            .get(url)
241            .header("Authorization", format!("Bearer {}", self.token))
242            .send()
243            .await?;
244        response::handle_json(resp).await
245    }
246
247    async fn post<T: DeserializeOwned, B: Serialize + Sync>(
248        &self,
249        url: &str,
250        body: &B,
251    ) -> SyncResult<T> {
252        let resp = self
253            .client
254            .post(url)
255            .header("Authorization", format!("Bearer {}", self.token))
256            .json(body)
257            .send()
258            .await?;
259        response::handle_json(resp).await
260    }
261}