systemprompt_sync/api_client/
mod.rs1mod response;
8mod retry;
9
10use std::time::Duration;
11
12use reqwest::Client;
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
15use systemprompt_models::net::{HTTP_CONNECT_TIMEOUT, HTTP_SYNC_DEPLOY_TIMEOUT};
16use tokio::time::sleep;
17
18use crate::error::{SyncError, SyncResult};
19pub use retry::RetryConfig;
20
21#[derive(Clone, Debug)]
22pub struct SyncApiClient {
23 client: Client,
24 api_url: String,
25 token: String,
26 hostname: Option<String>,
27 sync_token: Option<String>,
28 retry_config: RetryConfig,
29}
30
31#[derive(Debug, Deserialize)]
32pub struct RegistryToken {
33 pub registry: String,
34 pub username: String,
35 pub token: String,
36}
37
38#[derive(Debug, Clone, Copy, Deserialize)]
39pub struct UploadResponse {
40 pub files_uploaded: usize,
41}
42
43#[derive(Debug, Deserialize)]
44pub struct DeployResponse {
45 pub status: String,
46 pub app_url: Option<String>,
47}
48
49impl SyncApiClient {
50 pub fn new(api_url: &str, token: &str) -> SyncResult<Self> {
51 Ok(Self {
52 client: Client::builder()
53 .connect_timeout(HTTP_CONNECT_TIMEOUT)
54 .timeout(HTTP_SYNC_DEPLOY_TIMEOUT)
55 .build()?,
56 api_url: api_url.to_string(),
57 token: token.to_string(),
58 hostname: None,
59 sync_token: None,
60 retry_config: RetryConfig::default(),
61 })
62 }
63
64 pub fn with_direct_sync(
65 mut self,
66 hostname: Option<String>,
67 sync_token: Option<String>,
68 ) -> Self {
69 self.hostname = hostname;
70 self.sync_token = sync_token;
71 self
72 }
73
74 fn direct_sync_credentials(&self) -> Option<(String, String)> {
75 match (&self.hostname, &self.sync_token) {
76 (Some(hostname), Some(token)) => {
77 let url = format!("https://{}/api/v1/sync/files", hostname);
78 Some((url, token.clone()))
79 },
80 _ => None,
81 }
82 }
83
84 fn calculate_next_delay(&self, current: Duration) -> Duration {
85 self.retry_config.next_delay(current)
86 }
87
88 pub async fn upload_files(
89 &self,
90 tenant_id: &systemprompt_identifiers::TenantId,
91 data: Vec<u8>,
92 ) -> SyncResult<UploadResponse> {
93 let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
94 (
95 format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
96 self.token.clone(),
97 )
98 });
99
100 let mut current_delay = self.retry_config.initial_delay;
101
102 for attempt in 1..=self.retry_config.max_attempts {
103 let response = self
104 .client
105 .post(&url)
106 .header("Authorization", format!("Bearer {}", token))
107 .header("Content-Type", "application/octet-stream")
108 .body(data.clone())
109 .send()
110 .await?;
111
112 match response::handle_json::<UploadResponse>(response).await {
113 Ok(upload) => return Ok(upload),
114 Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
115 tracing::warn!(
116 attempt = attempt,
117 max_attempts = self.retry_config.max_attempts,
118 delay_ms = current_delay.as_millis() as u64,
119 error = %error,
120 "Retryable sync error, waiting before retry"
121 );
122 sleep(current_delay).await;
123 current_delay = self.calculate_next_delay(current_delay);
124 },
125 Err(error) => return Err(error),
126 }
127 }
128
129 Err(SyncError::ApiError {
130 status: 503,
131 message: "Max retry attempts exceeded".to_string(),
132 })
133 }
134
135 pub async fn download_files(
136 &self,
137 tenant_id: &systemprompt_identifiers::TenantId,
138 ) -> SyncResult<Vec<u8>> {
139 let (url, token) = self.direct_sync_credentials().unwrap_or_else(|| {
140 (
141 format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
142 self.token.clone(),
143 )
144 });
145
146 let mut current_delay = self.retry_config.initial_delay;
147
148 for attempt in 1..=self.retry_config.max_attempts {
149 let response = self
150 .client
151 .get(&url)
152 .header("Authorization", format!("Bearer {}", token))
153 .send()
154 .await?;
155
156 match response::handle_binary(response).await {
157 Ok(data) => return Ok(data),
158 Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
159 tracing::warn!(
160 attempt = attempt,
161 max_attempts = self.retry_config.max_attempts,
162 delay_ms = current_delay.as_millis() as u64,
163 error = %error,
164 "Retryable sync error, waiting before retry"
165 );
166 sleep(current_delay).await;
167 current_delay = self.calculate_next_delay(current_delay);
168 },
169 Err(error) => return Err(error),
170 }
171 }
172
173 Err(SyncError::ApiError {
174 status: 503,
175 message: "Max retry attempts exceeded".to_string(),
176 })
177 }
178
179 pub async fn get_registry_token(
180 &self,
181 tenant_id: &systemprompt_identifiers::TenantId,
182 ) -> SyncResult<RegistryToken> {
183 let url = format!(
184 "{}/api/v1/cloud/tenants/{}/registry-token",
185 self.api_url, tenant_id
186 );
187 self.get(&url).await
188 }
189
190 pub async fn deploy(
191 &self,
192 tenant_id: &systemprompt_identifiers::TenantId,
193 image: &str,
194 ) -> SyncResult<DeployResponse> {
195 let url = format!("{}/api/v1/cloud/tenants/{}/deploy", self.api_url, tenant_id);
196 self.post(&url, &serde_json::json!({ "image": image }))
197 .await
198 }
199
200 pub async fn get_tenant_app_id(
201 &self,
202 tenant_id: &systemprompt_identifiers::TenantId,
203 ) -> SyncResult<String> {
204 #[derive(Deserialize)]
205 struct TenantInfo {
206 fly_app_name: Option<String>,
207 }
208 let url = format!("{}/api/v1/cloud/tenants/{}", self.api_url, tenant_id);
209 let info: TenantInfo = self.get(&url).await?;
210 info.fly_app_name.ok_or(SyncError::TenantNoApp)
211 }
212
213 pub async fn get_database_url(
214 &self,
215 tenant_id: &systemprompt_identifiers::TenantId,
216 ) -> SyncResult<String> {
217 #[derive(Deserialize)]
218 struct DatabaseInfo {
219 database_url: Option<String>,
220 }
221 let url = format!(
222 "{}/api/v1/cloud/tenants/{}/database",
223 self.api_url, tenant_id
224 );
225 let info: DatabaseInfo = self.get(&url).await?;
226 info.database_url.ok_or_else(|| SyncError::ApiError {
227 status: 404,
228 message: "Database URL not available for tenant".to_string(),
229 })
230 }
231
232 async fn get<T: DeserializeOwned>(&self, url: &str) -> SyncResult<T> {
233 let resp = self
234 .client
235 .get(url)
236 .header("Authorization", format!("Bearer {}", self.token))
237 .send()
238 .await?;
239 response::handle_json(resp).await
240 }
241
242 async fn post<T: DeserializeOwned, B: Serialize + Sync>(
243 &self,
244 url: &str,
245 body: &B,
246 ) -> SyncResult<T> {
247 let resp = self
248 .client
249 .post(url)
250 .header("Authorization", format!("Bearer {}", self.token))
251 .json(body)
252 .send()
253 .await?;
254 response::handle_json(resp).await
255 }
256}