Skip to main content

systemprompt_sync/api_client/
mod.rs

//! HTTP client used by sync push/pull/deploy.
//!
//! Handles direct-sync vs. cloud-relay endpoint selection, bearer-token
//! auth, retryable failures with exponential backoff, and typed JSON /
//! binary response handling.
6
7mod response;
8mod retry;
9mod token;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use reqwest::Client;
15use serde::de::DeserializeOwned;
16use serde::{Deserialize, Serialize};
17use systemprompt_models::net::{HTTP_CONNECT_TIMEOUT, HTTP_SYNC_DEPLOY_TIMEOUT};
18use tokio::sync::Mutex;
19use tokio::time::sleep;
20
21use crate::error::{SyncError, SyncResult};
22pub use retry::RetryConfig;
23use token::is_unauthorized;
24pub use token::{exchange_subject_token, exchange_subject_token_at};
25
26#[derive(Clone, Debug)]
27pub struct SyncApiClient {
28    client: Client,
29    api_url: String,
30    token: String,
31    direct_sync_origin: Option<String>,
32    cached_sync_token: Arc<Mutex<Option<String>>>,
33    retry_config: RetryConfig,
34}
35
36#[derive(Debug, Deserialize)]
37pub struct RegistryToken {
38    pub registry: String,
39    pub username: String,
40    pub token: String,
41}
42
43#[derive(Debug, Clone, Copy, Deserialize)]
44pub struct UploadResponse {
45    pub files_uploaded: usize,
46}
47
48#[derive(Debug, Deserialize)]
49pub struct DeployResponse {
50    pub status: String,
51    pub app_url: Option<String>,
52}
53
54impl SyncApiClient {
55    pub fn new(api_url: &str, token: &str) -> SyncResult<Self> {
56        Ok(Self {
57            client: Client::builder()
58                .connect_timeout(HTTP_CONNECT_TIMEOUT)
59                .timeout(HTTP_SYNC_DEPLOY_TIMEOUT)
60                .build()?,
61            api_url: api_url.to_owned(),
62            token: token.to_owned(),
63            direct_sync_origin: None,
64            cached_sync_token: Arc::new(Mutex::new(None)),
65            retry_config: RetryConfig::default(),
66        })
67    }
68
69    pub fn with_direct_sync(self, hostname: Option<String>) -> Self {
70        let origin = hostname.map(|h| format!("https://{h}"));
71        self.with_direct_sync_origin(origin)
72    }
73
74    pub fn with_direct_sync_origin(mut self, origin: Option<String>) -> Self {
75        self.direct_sync_origin = origin;
76        self
77    }
78
79    pub const fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
80        self.retry_config = retry_config;
81        self
82    }
83
84    pub(crate) fn direct_sync_origin(&self) -> Option<&str> {
85        self.direct_sync_origin.as_deref()
86    }
87
88    const fn is_direct_sync(&self) -> bool {
89        self.direct_sync_origin.is_some()
90    }
91
92    fn files_url(&self, tenant_id: &systemprompt_identifiers::TenantId) -> String {
93        self.direct_sync_origin.as_ref().map_or_else(
94            || format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
95            |origin| format!("{origin}/api/v1/sync/files"),
96        )
97    }
98
99    fn calculate_next_delay(&self, current: Duration) -> Duration {
100        self.retry_config.next_delay(current)
101    }
102
103    pub async fn upload_files(
104        &self,
105        tenant_id: &systemprompt_identifiers::TenantId,
106        data: Vec<u8>,
107    ) -> SyncResult<UploadResponse> {
108        let url = self.files_url(tenant_id);
109        let direct = self.is_direct_sync();
110        let mut bearer = self.bearer_token(false).await?;
111        let mut reminted = false;
112
113        let mut current_delay = self.retry_config.initial_delay;
114
115        for attempt in 1..=self.retry_config.max_attempts {
116            let response = self
117                .client
118                .post(&url)
119                .header("Authorization", format!("Bearer {bearer}"))
120                .header("Content-Type", "application/octet-stream")
121                .body(data.clone())
122                .send()
123                .await?;
124
125            match response::handle_json::<UploadResponse>(response).await {
126                Ok(upload) => return Ok(upload),
127                Err(error) if direct && !reminted && is_unauthorized(&error) => {
128                    reminted = true;
129                    bearer = self.bearer_token(true).await?;
130                },
131                Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
132                    tracing::warn!(
133                        attempt = attempt,
134                        max_attempts = self.retry_config.max_attempts,
135                        delay_ms = current_delay.as_millis() as u64,
136                        error = %error,
137                        "Retryable sync error, waiting before retry"
138                    );
139                    sleep(current_delay).await;
140                    current_delay = self.calculate_next_delay(current_delay);
141                },
142                Err(error) => return Err(error),
143            }
144        }
145
146        Err(SyncError::ApiError {
147            status: 503,
148            message: "Max retry attempts exceeded".to_owned(),
149        })
150    }
151
152    pub async fn download_files(
153        &self,
154        tenant_id: &systemprompt_identifiers::TenantId,
155    ) -> SyncResult<Vec<u8>> {
156        let url = self.files_url(tenant_id);
157        let direct = self.is_direct_sync();
158        let mut bearer = self.bearer_token(false).await?;
159        let mut reminted = false;
160
161        let mut current_delay = self.retry_config.initial_delay;
162
163        for attempt in 1..=self.retry_config.max_attempts {
164            let response = self
165                .client
166                .get(&url)
167                .header("Authorization", format!("Bearer {bearer}"))
168                .send()
169                .await?;
170
171            match response::handle_binary(response).await {
172                Ok(data) => return Ok(data),
173                Err(error) if direct && !reminted && is_unauthorized(&error) => {
174                    reminted = true;
175                    bearer = self.bearer_token(true).await?;
176                },
177                Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
178                    tracing::warn!(
179                        attempt = attempt,
180                        max_attempts = self.retry_config.max_attempts,
181                        delay_ms = current_delay.as_millis() as u64,
182                        error = %error,
183                        "Retryable sync error, waiting before retry"
184                    );
185                    sleep(current_delay).await;
186                    current_delay = self.calculate_next_delay(current_delay);
187                },
188                Err(error) => return Err(error),
189            }
190        }
191
192        Err(SyncError::ApiError {
193            status: 503,
194            message: "Max retry attempts exceeded".to_owned(),
195        })
196    }
197
198    pub async fn get_registry_token(
199        &self,
200        tenant_id: &systemprompt_identifiers::TenantId,
201    ) -> SyncResult<RegistryToken> {
202        let url = format!(
203            "{}/api/v1/cloud/tenants/{}/registry-token",
204            self.api_url, tenant_id
205        );
206        self.get(&url).await
207    }
208
209    pub async fn deploy(
210        &self,
211        tenant_id: &systemprompt_identifiers::TenantId,
212        image: &str,
213    ) -> SyncResult<DeployResponse> {
214        let url = format!("{}/api/v1/cloud/tenants/{}/deploy", self.api_url, tenant_id);
215        self.post(&url, &serde_json::json!({ "image": image }))
216            .await
217    }
218
219    pub async fn get_tenant_app_id(
220        &self,
221        tenant_id: &systemprompt_identifiers::TenantId,
222    ) -> SyncResult<String> {
223        #[derive(Deserialize)]
224        struct TenantInfo {
225            fly_app_name: Option<String>,
226        }
227        let url = format!("{}/api/v1/cloud/tenants/{}", self.api_url, tenant_id);
228        let info: TenantInfo = self.get(&url).await?;
229        info.fly_app_name.ok_or(SyncError::TenantNoApp)
230    }
231
232    pub async fn get_database_url(
233        &self,
234        tenant_id: &systemprompt_identifiers::TenantId,
235    ) -> SyncResult<String> {
236        #[derive(Deserialize)]
237        struct DatabaseInfo {
238            database_url: Option<String>,
239        }
240        let url = format!(
241            "{}/api/v1/cloud/tenants/{}/database",
242            self.api_url, tenant_id
243        );
244        let info: DatabaseInfo = self.get(&url).await?;
245        info.database_url.ok_or_else(|| SyncError::ApiError {
246            status: 404,
247            message: "Database URL not available for tenant".to_owned(),
248        })
249    }
250
251    pub(crate) async fn get<T: DeserializeOwned>(&self, url: &str) -> SyncResult<T> {
252        let resp = self
253            .client
254            .get(url)
255            .header("Authorization", format!("Bearer {}", self.token))
256            .send()
257            .await?;
258        response::handle_json(resp).await
259    }
260
261    pub(crate) async fn post<T: DeserializeOwned, B: Serialize + Sync>(
262        &self,
263        url: &str,
264        body: &B,
265    ) -> SyncResult<T> {
266        let resp = self
267            .client
268            .post(url)
269            .header("Authorization", format!("Bearer {}", self.token))
270            .json(body)
271            .send()
272            .await?;
273        response::handle_json(resp).await
274    }
275}