systemprompt_sync/api_client/
mod.rs1mod response;
8mod retry;
9mod token;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use reqwest::Client;
15use serde::de::DeserializeOwned;
16use serde::{Deserialize, Serialize};
17use systemprompt_models::net::{HTTP_CONNECT_TIMEOUT, HTTP_SYNC_DEPLOY_TIMEOUT};
18use tokio::sync::Mutex;
19use tokio::time::sleep;
20
21use crate::error::{SyncError, SyncResult};
22pub use retry::RetryConfig;
23use token::is_unauthorized;
24pub use token::{exchange_subject_token, exchange_subject_token_at};
25
26#[derive(Clone, Debug)]
27pub struct SyncApiClient {
28 client: Client,
29 api_url: String,
30 token: String,
31 direct_sync_origin: Option<String>,
32 cached_sync_token: Arc<Mutex<Option<String>>>,
33 retry_config: RetryConfig,
34}
35
36#[derive(Debug, Deserialize)]
37pub struct RegistryToken {
38 pub registry: String,
39 pub username: String,
40 pub token: String,
41}
42
43#[derive(Debug, Clone, Copy, Deserialize)]
44pub struct UploadResponse {
45 pub files_uploaded: usize,
46}
47
48#[derive(Debug, Deserialize)]
49pub struct DeployResponse {
50 pub status: String,
51 pub app_url: Option<String>,
52}
53
54impl SyncApiClient {
55 pub fn new(api_url: &str, token: &str) -> SyncResult<Self> {
56 Ok(Self {
57 client: Client::builder()
58 .connect_timeout(HTTP_CONNECT_TIMEOUT)
59 .timeout(HTTP_SYNC_DEPLOY_TIMEOUT)
60 .build()?,
61 api_url: api_url.to_owned(),
62 token: token.to_owned(),
63 direct_sync_origin: None,
64 cached_sync_token: Arc::new(Mutex::new(None)),
65 retry_config: RetryConfig::default(),
66 })
67 }
68
69 pub fn with_direct_sync(self, hostname: Option<String>) -> Self {
70 let origin = hostname.map(|h| format!("https://{h}"));
71 self.with_direct_sync_origin(origin)
72 }
73
74 pub fn with_direct_sync_origin(mut self, origin: Option<String>) -> Self {
75 self.direct_sync_origin = origin;
76 self
77 }
78
79 pub const fn with_retry_config(mut self, retry_config: RetryConfig) -> Self {
80 self.retry_config = retry_config;
81 self
82 }
83
84 pub(crate) fn direct_sync_origin(&self) -> Option<&str> {
85 self.direct_sync_origin.as_deref()
86 }
87
88 const fn is_direct_sync(&self) -> bool {
89 self.direct_sync_origin.is_some()
90 }
91
92 fn files_url(&self, tenant_id: &systemprompt_identifiers::TenantId) -> String {
93 self.direct_sync_origin.as_ref().map_or_else(
94 || format!("{}/api/v1/cloud/tenants/{}/files", self.api_url, tenant_id),
95 |origin| format!("{origin}/api/v1/sync/files"),
96 )
97 }
98
99 fn calculate_next_delay(&self, current: Duration) -> Duration {
100 self.retry_config.next_delay(current)
101 }
102
103 pub async fn upload_files(
104 &self,
105 tenant_id: &systemprompt_identifiers::TenantId,
106 data: Vec<u8>,
107 ) -> SyncResult<UploadResponse> {
108 let url = self.files_url(tenant_id);
109 let direct = self.is_direct_sync();
110 let mut bearer = self.bearer_token(false).await?;
111 let mut reminted = false;
112
113 let mut current_delay = self.retry_config.initial_delay;
114
115 for attempt in 1..=self.retry_config.max_attempts {
116 let response = self
117 .client
118 .post(&url)
119 .header("Authorization", format!("Bearer {bearer}"))
120 .header("Content-Type", "application/octet-stream")
121 .body(data.clone())
122 .send()
123 .await?;
124
125 match response::handle_json::<UploadResponse>(response).await {
126 Ok(upload) => return Ok(upload),
127 Err(error) if direct && !reminted && is_unauthorized(&error) => {
128 reminted = true;
129 bearer = self.bearer_token(true).await?;
130 },
131 Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
132 tracing::warn!(
133 attempt = attempt,
134 max_attempts = self.retry_config.max_attempts,
135 delay_ms = current_delay.as_millis() as u64,
136 error = %error,
137 "Retryable sync error, waiting before retry"
138 );
139 sleep(current_delay).await;
140 current_delay = self.calculate_next_delay(current_delay);
141 },
142 Err(error) => return Err(error),
143 }
144 }
145
146 Err(SyncError::ApiError {
147 status: 503,
148 message: "Max retry attempts exceeded".to_owned(),
149 })
150 }
151
152 pub async fn download_files(
153 &self,
154 tenant_id: &systemprompt_identifiers::TenantId,
155 ) -> SyncResult<Vec<u8>> {
156 let url = self.files_url(tenant_id);
157 let direct = self.is_direct_sync();
158 let mut bearer = self.bearer_token(false).await?;
159 let mut reminted = false;
160
161 let mut current_delay = self.retry_config.initial_delay;
162
163 for attempt in 1..=self.retry_config.max_attempts {
164 let response = self
165 .client
166 .get(&url)
167 .header("Authorization", format!("Bearer {bearer}"))
168 .send()
169 .await?;
170
171 match response::handle_binary(response).await {
172 Ok(data) => return Ok(data),
173 Err(error) if direct && !reminted && is_unauthorized(&error) => {
174 reminted = true;
175 bearer = self.bearer_token(true).await?;
176 },
177 Err(error) if error.is_retryable() && attempt < self.retry_config.max_attempts => {
178 tracing::warn!(
179 attempt = attempt,
180 max_attempts = self.retry_config.max_attempts,
181 delay_ms = current_delay.as_millis() as u64,
182 error = %error,
183 "Retryable sync error, waiting before retry"
184 );
185 sleep(current_delay).await;
186 current_delay = self.calculate_next_delay(current_delay);
187 },
188 Err(error) => return Err(error),
189 }
190 }
191
192 Err(SyncError::ApiError {
193 status: 503,
194 message: "Max retry attempts exceeded".to_owned(),
195 })
196 }
197
198 pub async fn get_registry_token(
199 &self,
200 tenant_id: &systemprompt_identifiers::TenantId,
201 ) -> SyncResult<RegistryToken> {
202 let url = format!(
203 "{}/api/v1/cloud/tenants/{}/registry-token",
204 self.api_url, tenant_id
205 );
206 self.get(&url).await
207 }
208
209 pub async fn deploy(
210 &self,
211 tenant_id: &systemprompt_identifiers::TenantId,
212 image: &str,
213 ) -> SyncResult<DeployResponse> {
214 let url = format!("{}/api/v1/cloud/tenants/{}/deploy", self.api_url, tenant_id);
215 self.post(&url, &serde_json::json!({ "image": image }))
216 .await
217 }
218
219 pub async fn get_tenant_app_id(
220 &self,
221 tenant_id: &systemprompt_identifiers::TenantId,
222 ) -> SyncResult<String> {
223 #[derive(Deserialize)]
224 struct TenantInfo {
225 fly_app_name: Option<String>,
226 }
227 let url = format!("{}/api/v1/cloud/tenants/{}", self.api_url, tenant_id);
228 let info: TenantInfo = self.get(&url).await?;
229 info.fly_app_name.ok_or(SyncError::TenantNoApp)
230 }
231
232 pub async fn get_database_url(
233 &self,
234 tenant_id: &systemprompt_identifiers::TenantId,
235 ) -> SyncResult<String> {
236 #[derive(Deserialize)]
237 struct DatabaseInfo {
238 database_url: Option<String>,
239 }
240 let url = format!(
241 "{}/api/v1/cloud/tenants/{}/database",
242 self.api_url, tenant_id
243 );
244 let info: DatabaseInfo = self.get(&url).await?;
245 info.database_url.ok_or_else(|| SyncError::ApiError {
246 status: 404,
247 message: "Database URL not available for tenant".to_owned(),
248 })
249 }
250
251 pub(crate) async fn get<T: DeserializeOwned>(&self, url: &str) -> SyncResult<T> {
252 let resp = self
253 .client
254 .get(url)
255 .header("Authorization", format!("Bearer {}", self.token))
256 .send()
257 .await?;
258 response::handle_json(resp).await
259 }
260
261 pub(crate) async fn post<T: DeserializeOwned, B: Serialize + Sync>(
262 &self,
263 url: &str,
264 body: &B,
265 ) -> SyncResult<T> {
266 let resp = self
267 .client
268 .post(url)
269 .header("Authorization", format!("Bearer {}", self.token))
270 .json(body)
271 .send()
272 .await?;
273 response::handle_json(resp).await
274 }
275}