systemprompt_sync/api_client/
mod.rs1mod response;
8mod retry;
9mod token;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use reqwest::Client;
15use serde::de::DeserializeOwned;
16use serde::{Deserialize, Serialize};
17use systemprompt_models::net::{HTTP_CONNECT_TIMEOUT, HTTP_SYNC_DEPLOY_TIMEOUT};
18use tokio::sync::Mutex;
19use tokio::time::sleep;
20
21use crate::error::{SyncError, SyncResult};
22pub use retry::RetryConfig;
23pub use token::exchange_subject_token;
24use token::is_unauthorized;
25
26#[derive(Clone, Debug)]
27pub struct SyncApiClient {
28 client: Client,
29 api_url: String,
30 token: String,
31 hostname: Option<String>,
32 cached_sync_token: Arc<Mutex<Option<String>>>,
33 retry_config: RetryConfig,
34}
35
36#[derive(Debug, Deserialize)]
37pub struct RegistryToken {
38 pub registry: String,
39 pub username: String,
40 pub token: String,
41}
42
43#[derive(Debug, Clone, Copy, Deserialize)]
44pub struct UploadResponse {
45 pub files_uploaded: usize,
46}
47
48#[derive(Debug, Deserialize)]
49pub struct DeployResponse {
50 pub status: String,
51 pub app_url: Option<String>,
52}
53
54impl SyncApiClient {
55 pub fn new(api_url: &str, token: &str) -> SyncResult<Self> {
56 Ok(Self {
57 client: Client::builder()
58 .connect_timeout(HTTP_CONNECT_TIMEOUT)
59 .timeout(HTTP_SYNC_DEPLOY_TIMEOUT)
60 .build()?,
61 api_url: api_url.to_string(),
62 token: token.to_string(),
63 hostname: None,
64 cached_sync_token: Arc::new(Mutex::new(None)),
65 retry_config: RetryConfig::default(),
66 })
67 }
68
69 pub fn with_direct_sync(mut self, hostname: Option<String>) -> Self {
70 self.hostname = hostname;
71 self
72 }
73
74 const fn is_direct_sync(&self) -> bool {
75 self.hostname.is_some()
76 }
77
78 fn files_url(&self, tenant_id: &systemprompt_identifiers::TenantId) -> String {
79 self.hostname.as_ref().map_or_else(
80 || format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
81 |hostname| format!("https://{hostname}/api/v1/sync/files"),
82 )
83 }
84
85 fn calculate_next_delay(&self, current: Duration) -> Duration {
86 self.retry_config.next_delay(current)
87 }
88
89 pub async fn upload_files(
90 &self,
91 tenant_id: &systemprompt_identifiers::TenantId,
92 data: Vec<u8>,
93 ) -> SyncResult<UploadResponse> {
94 let url = self.files_url(tenant_id);
95 let direct = self.is_direct_sync();
96 let mut bearer = self.bearer_token(false).await?;
97 let mut reminted = false;
98
99 let mut current_delay = self.retry_config.initial_delay;
100
101 for attempt in 1..=self.retry_config.max_attempts {
102 let response = self
103 .client
104 .post(&url)
105 .header("Authorization", format!("Bearer {bearer}"))
106 .header("Content-Type", "application/octet-stream")
107 .body(data.clone())
108 .send()
109 .await?;
110
111 match response::handle_json::<UploadResponse>(response).await {
112 Ok(upload) => return Ok(upload),
113 Err(error) if direct && !reminted && is_unauthorized(&error) => {
114 reminted = true;
115 bearer = self.bearer_token(true).await?;
116 },
117 Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
118 tracing::warn!(
119 attempt = attempt,
120 max_attempts = self.retry_config.max_attempts,
121 delay_ms = current_delay.as_millis() as u64,
122 error = %error,
123 "Retryable sync error, waiting before retry"
124 );
125 sleep(current_delay).await;
126 current_delay = self.calculate_next_delay(current_delay);
127 },
128 Err(error) => return Err(error),
129 }
130 }
131
132 Err(SyncError::ApiError {
133 status: 503,
134 message: "Max retry attempts exceeded".to_string(),
135 })
136 }
137
138 pub async fn download_files(
139 &self,
140 tenant_id: &systemprompt_identifiers::TenantId,
141 ) -> SyncResult<Vec<u8>> {
142 let url = self.files_url(tenant_id);
143 let direct = self.is_direct_sync();
144 let mut bearer = self.bearer_token(false).await?;
145 let mut reminted = false;
146
147 let mut current_delay = self.retry_config.initial_delay;
148
149 for attempt in 1..=self.retry_config.max_attempts {
150 let response = self
151 .client
152 .get(&url)
153 .header("Authorization", format!("Bearer {bearer}"))
154 .send()
155 .await?;
156
157 match response::handle_binary(response).await {
158 Ok(data) => return Ok(data),
159 Err(error) if direct && !reminted && is_unauthorized(&error) => {
160 reminted = true;
161 bearer = self.bearer_token(true).await?;
162 },
163 Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
164 tracing::warn!(
165 attempt = attempt,
166 max_attempts = self.retry_config.max_attempts,
167 delay_ms = current_delay.as_millis() as u64,
168 error = %error,
169 "Retryable sync error, waiting before retry"
170 );
171 sleep(current_delay).await;
172 current_delay = self.calculate_next_delay(current_delay);
173 },
174 Err(error) => return Err(error),
175 }
176 }
177
178 Err(SyncError::ApiError {
179 status: 503,
180 message: "Max retry attempts exceeded".to_string(),
181 })
182 }
183
184 pub async fn get_registry_token(
185 &self,
186 tenant_id: &systemprompt_identifiers::TenantId,
187 ) -> SyncResult<RegistryToken> {
188 let url = format!(
189 "{}/api/v1/cloud/tenants/{}/registry-token",
190 self.api_url, tenant_id
191 );
192 self.get(&url).await
193 }
194
195 pub async fn deploy(
196 &self,
197 tenant_id: &systemprompt_identifiers::TenantId,
198 image: &str,
199 ) -> SyncResult<DeployResponse> {
200 let url = format!("{}/api/v1/cloud/tenants/{}/deploy", self.api_url, tenant_id);
201 self.post(&url, &serde_json::json!({ "image": image }))
202 .await
203 }
204
205 pub async fn get_tenant_app_id(
206 &self,
207 tenant_id: &systemprompt_identifiers::TenantId,
208 ) -> SyncResult<String> {
209 #[derive(Deserialize)]
210 struct TenantInfo {
211 fly_app_name: Option<String>,
212 }
213 let url = format!("{}/api/v1/cloud/tenants/{}", self.api_url, tenant_id);
214 let info: TenantInfo = self.get(&url).await?;
215 info.fly_app_name.ok_or(SyncError::TenantNoApp)
216 }
217
218 pub async fn get_database_url(
219 &self,
220 tenant_id: &systemprompt_identifiers::TenantId,
221 ) -> SyncResult<String> {
222 #[derive(Deserialize)]
223 struct DatabaseInfo {
224 database_url: Option<String>,
225 }
226 let url = format!(
227 "{}/api/v1/cloud/tenants/{}/database",
228 self.api_url, tenant_id
229 );
230 let info: DatabaseInfo = self.get(&url).await?;
231 info.database_url.ok_or_else(|| SyncError::ApiError {
232 status: 404,
233 message: "Database URL not available for tenant".to_string(),
234 })
235 }
236
237 async fn get<T: DeserializeOwned>(&self, url: &str) -> SyncResult<T> {
238 let resp = self
239 .client
240 .get(url)
241 .header("Authorization", format!("Bearer {}", self.token))
242 .send()
243 .await?;
244 response::handle_json(resp).await
245 }
246
247 async fn post<T: DeserializeOwned, B: Serialize + Sync>(
248 &self,
249 url: &str,
250 body: &B,
251 ) -> SyncResult<T> {
252 let resp = self
253 .client
254 .post(url)
255 .header("Authorization", format!("Bearer {}", self.token))
256 .json(body)
257 .send()
258 .await?;
259 response::handle_json(resp).await
260 }
261}