1use crate::static_files::{StaticFile, collect_static_files};
2use anyhow::{Result, anyhow};
3use serde::{Deserialize, Serialize};
4use std::path::Path;
5
/// JSON request body that `request_deploy` POSTs to the control plane's
/// `/__forte_action/deploy` endpoint.
///
/// Fields are serialized under their Rust names (no `rename_all` is applied).
#[derive(Serialize)]
struct DeployInput<'a> {
    /// Identifier of the project being deployed.
    project_id: &'a str,
    /// Identifier of the build being deployed, passed through from the caller.
    build_id: &'a str,
    /// Static files declared for this deploy; `deploy_wasm` sends an empty list.
    files: Vec<DeployFile>,
    /// Cron job definitions sent along with the deploy.
    jobs: &'a [CronJob],
    /// Passed through from the caller unchanged.
    /// NOTE(review): presumably a timestamp of the last cron config change — confirm format.
    cron_updated_at: &'a str,
}
14
/// A cron job entry sent to the control plane as part of a deploy request.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct CronJob {
    /// NOTE(review): presumably the name of the function to invoke — confirm.
    pub function: String,
    /// NOTE(review): presumably the interval between runs, in minutes — confirm.
    pub every_minutes: u32,
}
20
/// One static file declared in a deploy request.
///
/// `deploy_forte` builds these from the `StaticFile` entries it collects.
#[derive(Serialize)]
struct DeployFile {
    /// Copied from `StaticFile::relative_path`.
    path: String,
    /// Copied from `StaticFile::size`.
    size: u64,
}
26
/// Response body of the deploy endpoint.
///
/// Deserialized as an internally tagged enum: the `t` field selects the
/// variant, and the fields inside each variant are expected in camelCase
/// (e.g. `presignedPutUrl`).
#[derive(Deserialize)]
#[serde(tag = "t", rename_all_fields = "camelCase")]
enum Deploy {
    /// The deploy was accepted; carries everything needed to upload.
    Ok {
        /// URL the bundle tarball is PUT to.
        presigned_put_url: String,
        /// Object key of the bundle; only printed for the user in this file.
        object_key: String,
        /// Upload targets for the static files declared in the request.
        static_uploads: Vec<StaticUpload>,
        /// Version number later passed to the deploy status endpoint.
        code_version: u64,
    },
    /// The deploy was refused because of a quota; `reason` is shown to the user.
    QuotaExceeded {
        reason: String,
    },
    /// The bearer token was rejected.
    NotLoggedIn,
    /// The project does not exist or is not owned by the caller.
    NotFound,
    /// The control plane reported a server-side error.
    InternalError,
}
43
/// Upload target for one static file, as returned in `Deploy::Ok`.
///
/// Field names are expected in camelCase on the wire (`presignedUrl`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct StaticUpload {
    /// Matched against `StaticFile::relative_path` by `upload_static_assets`.
    path: String,
    /// URL the file's bytes are PUT to.
    presigned_url: String,
}
50
/// JSON request body that `poll_deploy_status` POSTs to the control plane's
/// `/__forte_action/deploy_status` endpoint.
#[derive(Serialize)]
struct DeployStatusInput<'a> {
    /// Identifier of the project whose deploy is being polled.
    project_id: &'a str,
    /// The `code_version` returned by the deploy endpoint for this deploy.
    code_version: u64,
}
56
/// Response body of the deploy status endpoint.
///
/// Deserialized as an internally tagged enum: the `t` field selects the
/// variant, and the fields inside each variant are expected in camelCase.
///
/// `Done` and `Pending` carry the same fields; in this file they are only
/// used to print a progress line.
#[derive(Deserialize)]
#[serde(tag = "t", rename_all_fields = "camelCase")]
enum DeployStatus {
    /// Polling stops successfully when this variant is received.
    Done {
        active_version: String,
        pending_version: Option<String>,
        pending_compiled: bool,
        compiled_versions: Vec<String>,
    },
    /// Polling continues (until the timeout) while this variant is received.
    Pending {
        active_version: String,
        pending_version: Option<String>,
        pending_compiled: bool,
        compiled_versions: Vec<String>,
    },
    /// Reported to the user as "control has no active fn0-wasmtime version yet".
    NoActiveVersion,
    /// The bearer token was rejected.
    NotLoggedIn,
    /// The project does not exist or is not owned by the caller.
    NotFound,
    /// The control plane reported a server-side error.
    InternalError,
}
77
78#[allow(clippy::too_many_arguments)]
79pub async fn deploy_wasm(
80 control_url: &str,
81 token: &str,
82 project_id: &str,
83 build_id: &str,
84 bundle_tar_path: &Path,
85 jobs: &[CronJob],
86 cron_updated_at: &str,
87) -> Result<()> {
88 let client = reqwest::Client::new();
89 println!("project_id: {project_id}");
90
91 let DeployOk {
92 presigned_put_url,
93 object_key,
94 static_uploads: _,
95 code_version,
96 } = request_deploy(
97 &client,
98 control_url,
99 token,
100 project_id,
101 build_id,
102 Vec::new(),
103 jobs,
104 cron_updated_at,
105 )
106 .await?;
107
108 println!("uploading bundle to {object_key} (code_version={code_version})...");
109 upload_bundle(&client, &presigned_put_url, bundle_tar_path, code_version).await?;
110
111 poll_deploy_status(&client, control_url, token, project_id, code_version).await?;
112 println!("Deploy complete!");
113 Ok(())
114}
115
/// Payload of a successful deploy request, as returned by `request_deploy`.
///
/// Mirrors the fields of `Deploy::Ok`.
struct DeployOk {
    /// URL the bundle tarball is PUT to.
    presigned_put_url: String,
    /// Object key of the bundle; only printed for the user in this file.
    object_key: String,
    /// Upload targets for the static files declared in the request.
    static_uploads: Vec<StaticUpload>,
    /// Version number later passed to the deploy status endpoint.
    code_version: u64,
}
122
123#[allow(clippy::too_many_arguments)]
124pub async fn deploy_forte(
125 control_url: &str,
126 token: &str,
127 project_id: &str,
128 build_id: &str,
129 fe_dist_dir: &Path,
130 bundle_tar_path: &Path,
131 jobs: &[CronJob],
132 cron_updated_at: &str,
133) -> Result<()> {
134 let client = reqwest::Client::new();
135 println!("project_id: {project_id}");
136
137 let static_files = collect_static_files(fe_dist_dir)?;
138 let deploy_files: Vec<DeployFile> = static_files
139 .iter()
140 .map(|f| DeployFile {
141 path: f.relative_path.clone(),
142 size: f.size,
143 })
144 .collect();
145 println!(
146 "Requesting deploy ({} static asset(s))...",
147 deploy_files.len()
148 );
149
150 let DeployOk {
151 presigned_put_url,
152 object_key,
153 static_uploads,
154 code_version,
155 } = request_deploy(
156 &client,
157 control_url,
158 token,
159 project_id,
160 build_id,
161 deploy_files,
162 jobs,
163 cron_updated_at,
164 )
165 .await?;
166
167 if !static_files.is_empty() {
168 println!("Uploading {} static asset(s)...", static_files.len());
169 upload_static_assets(&client, &static_files, static_uploads).await?;
170 }
171
172 println!("uploading bundle to {object_key} (code_version={code_version})...");
173 upload_bundle(&client, &presigned_put_url, bundle_tar_path, code_version).await?;
174
175 poll_deploy_status(&client, control_url, token, project_id, code_version).await?;
176 println!("Deploy complete!");
177 Ok(())
178}
179
180#[allow(clippy::too_many_arguments)]
181async fn request_deploy(
182 client: &reqwest::Client,
183 control_url: &str,
184 token: &str,
185 project_id: &str,
186 build_id: &str,
187 files: Vec<DeployFile>,
188 jobs: &[CronJob],
189 cron_updated_at: &str,
190) -> Result<DeployOk> {
191 let deploy_url = format!(
192 "{}/__forte_action/deploy",
193 control_url.trim_end_matches('/')
194 );
195 let raw: Deploy = client
196 .post(&deploy_url)
197 .bearer_auth(token)
198 .json(&DeployInput {
199 project_id,
200 build_id,
201 files,
202 jobs,
203 cron_updated_at,
204 })
205 .send()
206 .await?
207 .error_for_status()
208 .map_err(|e| anyhow!("deploy failed: {e}"))?
209 .json()
210 .await?;
211 match raw {
212 Deploy::Ok {
213 presigned_put_url,
214 object_key,
215 static_uploads,
216 code_version,
217 } => Ok(DeployOk {
218 presigned_put_url,
219 object_key,
220 static_uploads,
221 code_version,
222 }),
223 Deploy::QuotaExceeded { reason } => Err(anyhow!("deploy quota exceeded: {reason}")),
224 Deploy::NotLoggedIn => Err(anyhow!("control rejected token; run `fn0 login` again.")),
225 Deploy::NotFound => Err(anyhow!("project '{project_id}' not found or not owned by you.")),
226 Deploy::InternalError => Err(anyhow!("deploy: server error; check fn0-control logs")),
227 }
228}
229
230async fn upload_bundle(
231 client: &reqwest::Client,
232 presigned_put_url: &str,
233 bundle_tar_path: &Path,
234 code_version: u64,
235) -> Result<()> {
236 let bundle_bytes = std::fs::read(bundle_tar_path)
237 .map_err(|e| anyhow!("Failed to read {}: {}", bundle_tar_path.display(), e))?;
238 let _ = code_version;
239 client
240 .put(presigned_put_url)
241 .body(bundle_bytes)
242 .send()
243 .await?
244 .error_for_status()
245 .map_err(|e| anyhow!("bundle upload failed: {e}"))?;
246 Ok(())
247}
248
249async fn upload_static_assets(
250 client: &reqwest::Client,
251 files: &[StaticFile],
252 uploads: Vec<StaticUpload>,
253) -> Result<()> {
254 use futures::StreamExt;
255 use std::collections::HashMap;
256
257 let mut url_for_path: HashMap<String, String> = HashMap::new();
258 for u in uploads {
259 url_for_path.insert(u.path, u.presigned_url);
260 }
261
262 let mut tasks = futures::stream::FuturesUnordered::new();
263 for file in files {
264 let url = url_for_path.remove(&file.relative_path).ok_or_else(|| {
265 anyhow!(
266 "control did not return presigned URL for {}",
267 file.relative_path
268 )
269 })?;
270 let bytes = std::fs::read(&file.absolute_path)
271 .map_err(|e| anyhow!("read {}: {}", file.absolute_path.display(), e))?;
272 let client = client.clone();
273 let content_type = file.content_type;
274 let path = file.relative_path.clone();
275 tasks.push(async move {
276 let resp = client
277 .put(&url)
278 .header("content-type", content_type)
279 .body(bytes)
280 .send()
281 .await
282 .map_err(|e| anyhow!("R2 PUT {}: {}", path, e))?;
283 resp.error_for_status()
284 .map_err(|e| anyhow!("R2 PUT {} HTTP error: {}", path, e))?;
285 Ok::<_, anyhow::Error>(())
286 });
287 }
288 while let Some(result) = tasks.next().await {
289 result?;
290 }
291 Ok(())
292}
293
294async fn poll_deploy_status(
295 client: &reqwest::Client,
296 control_url: &str,
297 token: &str,
298 project_id: &str,
299 code_version: u64,
300) -> Result<()> {
301 let url = format!(
302 "{}/__forte_action/deploy_status",
303 control_url.trim_end_matches('/')
304 );
305 let timeout = std::time::Duration::from_secs(600);
306 let start = std::time::Instant::now();
307 let mut last_state: Option<String> = None;
308
309 loop {
310 let raw: DeployStatus = client
311 .post(&url)
312 .bearer_auth(token)
313 .json(&DeployStatusInput {
314 project_id,
315 code_version,
316 })
317 .send()
318 .await?
319 .error_for_status()
320 .map_err(|e| anyhow!("deploy_status failed: {e}"))?
321 .json()
322 .await?;
323
324 match raw {
325 DeployStatus::Done {
326 active_version,
327 pending_version,
328 pending_compiled,
329 compiled_versions,
330 } => {
331 log_status_line(
332 &active_version,
333 &compiled_versions,
334 &pending_version,
335 pending_compiled,
336 &mut last_state,
337 );
338 return Ok(());
339 }
340 DeployStatus::Pending {
341 active_version,
342 pending_version,
343 pending_compiled,
344 compiled_versions,
345 } => {
346 log_status_line(
347 &active_version,
348 &compiled_versions,
349 &pending_version,
350 pending_compiled,
351 &mut last_state,
352 );
353 if start.elapsed() > timeout {
354 return Err(anyhow!(
355 "deploy_status timed out after {}s",
356 timeout.as_secs()
357 ));
358 }
359 }
360 DeployStatus::NoActiveVersion => {
361 return Err(anyhow!("control has no active fn0-wasmtime version yet"));
362 }
363 DeployStatus::NotLoggedIn => {
364 return Err(anyhow!("control rejected token; run `fn0 login` again."));
365 }
366 DeployStatus::NotFound => {
367 return Err(anyhow!(
368 "project '{project_id}' not found or not owned by you."
369 ));
370 }
371 DeployStatus::InternalError => {
372 return Err(anyhow!(
373 "deploy_status: server error; check fn0-control logs"
374 ));
375 }
376 }
377 }
378}
379
/// Prints a one-line summary of the deploy state, but only when it differs
/// from the previously printed one.
///
/// The rendered line is compared against `last_state`; when it is new it is
/// printed to stdout and stored in `last_state`, otherwise nothing happens.
fn log_status_line(
    active_version: &str,
    compiled_versions: &[String],
    pending_version: &Option<String>,
    pending_compiled: bool,
    last_state: &mut Option<String>,
) {
    let state = format!(
        "active={active_version} compiled={compiled_versions:?} pending={pending_version:?} pending_compiled={pending_compiled}",
    );

    // Nothing changed since the last printed line: stay quiet.
    let already_shown = last_state.as_deref() == Some(state.as_str());
    if already_shown {
        return;
    }

    println!(" {state}");
    *last_state = Some(state);
}