Skip to main content

fn0_deploy/
deploy.rs

1use crate::static_files::{StaticFile, collect_static_files};
2use anyhow::{Result, anyhow};
3use serde::{Deserialize, Serialize};
4use std::path::Path;
5
6#[derive(Serialize)]
7struct DeployInput<'a> {
8    project_id: &'a str,
9    build_id: &'a str,
10    files: Vec<DeployFile>,
11    jobs: &'a [CronJob],
12    cron_updated_at: &'a str,
13}
14
15#[derive(Serialize, Deserialize, Clone, Debug)]
16pub struct CronJob {
17    pub function: String,
18    pub every_minutes: u32,
19}
20
21#[derive(Serialize)]
22struct DeployFile {
23    path: String,
24    size: u64,
25}
26
27#[derive(Deserialize)]
28#[serde(tag = "t", rename_all_fields = "camelCase")]
29enum Deploy {
30    Ok {
31        presigned_put_url: String,
32        object_key: String,
33        static_uploads: Vec<StaticUpload>,
34        code_version: u64,
35    },
36    QuotaExceeded {
37        reason: String,
38    },
39    NotLoggedIn,
40    NotFound,
41    InternalError,
42}
43
44#[derive(Deserialize)]
45#[serde(rename_all = "camelCase")]
46struct StaticUpload {
47    path: String,
48    presigned_url: String,
49}
50
51#[derive(Serialize)]
52struct DeployStatusInput<'a> {
53    project_id: &'a str,
54    code_version: u64,
55}
56
57#[derive(Deserialize)]
58#[serde(tag = "t", rename_all_fields = "camelCase")]
59enum DeployStatus {
60    Done {
61        active_version: String,
62        pending_version: Option<String>,
63        pending_compiled: bool,
64        compiled_versions: Vec<String>,
65    },
66    Pending {
67        active_version: String,
68        pending_version: Option<String>,
69        pending_compiled: bool,
70        compiled_versions: Vec<String>,
71    },
72    NoActiveVersion,
73    NotLoggedIn,
74    NotFound,
75    InternalError,
76}
77
78#[allow(clippy::too_many_arguments)]
79pub async fn deploy_wasm(
80    control_url: &str,
81    token: &str,
82    project_id: &str,
83    build_id: &str,
84    bundle_tar_path: &Path,
85    jobs: &[CronJob],
86    cron_updated_at: &str,
87) -> Result<()> {
88    let client = reqwest::Client::new();
89    println!("project_id: {project_id}");
90
91    let DeployOk {
92        presigned_put_url,
93        object_key,
94        static_uploads: _,
95        code_version,
96    } = request_deploy(
97        &client,
98        control_url,
99        token,
100        project_id,
101        build_id,
102        Vec::new(),
103        jobs,
104        cron_updated_at,
105    )
106    .await?;
107
108    println!("uploading bundle to {object_key} (code_version={code_version})...");
109    upload_bundle(&client, &presigned_put_url, bundle_tar_path, code_version).await?;
110
111    poll_deploy_status(&client, control_url, token, project_id, code_version).await?;
112    println!("Deploy complete!");
113    Ok(())
114}
115
116struct DeployOk {
117    presigned_put_url: String,
118    object_key: String,
119    static_uploads: Vec<StaticUpload>,
120    code_version: u64,
121}
122
123#[allow(clippy::too_many_arguments)]
124pub async fn deploy_forte(
125    control_url: &str,
126    token: &str,
127    project_id: &str,
128    build_id: &str,
129    fe_dist_dir: &Path,
130    bundle_tar_path: &Path,
131    jobs: &[CronJob],
132    cron_updated_at: &str,
133) -> Result<()> {
134    let client = reqwest::Client::new();
135    println!("project_id: {project_id}");
136
137    let static_files = collect_static_files(fe_dist_dir)?;
138    let deploy_files: Vec<DeployFile> = static_files
139        .iter()
140        .map(|f| DeployFile {
141            path: f.relative_path.clone(),
142            size: f.size,
143        })
144        .collect();
145    println!(
146        "Requesting deploy ({} static asset(s))...",
147        deploy_files.len()
148    );
149
150    let DeployOk {
151        presigned_put_url,
152        object_key,
153        static_uploads,
154        code_version,
155    } = request_deploy(
156        &client,
157        control_url,
158        token,
159        project_id,
160        build_id,
161        deploy_files,
162        jobs,
163        cron_updated_at,
164    )
165    .await?;
166
167    if !static_files.is_empty() {
168        println!("Uploading {} static asset(s)...", static_files.len());
169        upload_static_assets(&client, &static_files, static_uploads).await?;
170    }
171
172    println!("uploading bundle to {object_key} (code_version={code_version})...");
173    upload_bundle(&client, &presigned_put_url, bundle_tar_path, code_version).await?;
174
175    poll_deploy_status(&client, control_url, token, project_id, code_version).await?;
176    println!("Deploy complete!");
177    Ok(())
178}
179
180#[allow(clippy::too_many_arguments)]
181async fn request_deploy(
182    client: &reqwest::Client,
183    control_url: &str,
184    token: &str,
185    project_id: &str,
186    build_id: &str,
187    files: Vec<DeployFile>,
188    jobs: &[CronJob],
189    cron_updated_at: &str,
190) -> Result<DeployOk> {
191    let deploy_url = format!(
192        "{}/__forte_action/deploy",
193        control_url.trim_end_matches('/')
194    );
195    let raw: Deploy = client
196        .post(&deploy_url)
197        .bearer_auth(token)
198        .json(&DeployInput {
199            project_id,
200            build_id,
201            files,
202            jobs,
203            cron_updated_at,
204        })
205        .send()
206        .await?
207        .error_for_status()
208        .map_err(|e| anyhow!("deploy failed: {e}"))?
209        .json()
210        .await?;
211    match raw {
212        Deploy::Ok {
213            presigned_put_url,
214            object_key,
215            static_uploads,
216            code_version,
217        } => Ok(DeployOk {
218            presigned_put_url,
219            object_key,
220            static_uploads,
221            code_version,
222        }),
223        Deploy::QuotaExceeded { reason } => Err(anyhow!("deploy quota exceeded: {reason}")),
224        Deploy::NotLoggedIn => Err(anyhow!("control rejected token; run `fn0 login` again.")),
225        Deploy::NotFound => Err(anyhow!("project '{project_id}' not found or not owned by you.")),
226        Deploy::InternalError => Err(anyhow!("deploy: server error; check fn0-control logs")),
227    }
228}
229
230async fn upload_bundle(
231    client: &reqwest::Client,
232    presigned_put_url: &str,
233    bundle_tar_path: &Path,
234    code_version: u64,
235) -> Result<()> {
236    let bundle_bytes = std::fs::read(bundle_tar_path)
237        .map_err(|e| anyhow!("Failed to read {}: {}", bundle_tar_path.display(), e))?;
238    let _ = code_version;
239    client
240        .put(presigned_put_url)
241        .body(bundle_bytes)
242        .send()
243        .await?
244        .error_for_status()
245        .map_err(|e| anyhow!("bundle upload failed: {e}"))?;
246    Ok(())
247}
248
249async fn upload_static_assets(
250    client: &reqwest::Client,
251    files: &[StaticFile],
252    uploads: Vec<StaticUpload>,
253) -> Result<()> {
254    use futures::StreamExt;
255    use std::collections::HashMap;
256
257    let mut url_for_path: HashMap<String, String> = HashMap::new();
258    for u in uploads {
259        url_for_path.insert(u.path, u.presigned_url);
260    }
261
262    let mut tasks = futures::stream::FuturesUnordered::new();
263    for file in files {
264        let url = url_for_path.remove(&file.relative_path).ok_or_else(|| {
265            anyhow!(
266                "control did not return presigned URL for {}",
267                file.relative_path
268            )
269        })?;
270        let bytes = std::fs::read(&file.absolute_path)
271            .map_err(|e| anyhow!("read {}: {}", file.absolute_path.display(), e))?;
272        let client = client.clone();
273        let content_type = file.content_type;
274        let path = file.relative_path.clone();
275        tasks.push(async move {
276            let resp = client
277                .put(&url)
278                .header("content-type", content_type)
279                .body(bytes)
280                .send()
281                .await
282                .map_err(|e| anyhow!("R2 PUT {}: {}", path, e))?;
283            resp.error_for_status()
284                .map_err(|e| anyhow!("R2 PUT {} HTTP error: {}", path, e))?;
285            Ok::<_, anyhow::Error>(())
286        });
287    }
288    while let Some(result) = tasks.next().await {
289        result?;
290    }
291    Ok(())
292}
293
294async fn poll_deploy_status(
295    client: &reqwest::Client,
296    control_url: &str,
297    token: &str,
298    project_id: &str,
299    code_version: u64,
300) -> Result<()> {
301    let url = format!(
302        "{}/__forte_action/deploy_status",
303        control_url.trim_end_matches('/')
304    );
305    let timeout = std::time::Duration::from_secs(600);
306    let start = std::time::Instant::now();
307    let mut last_state: Option<String> = None;
308
309    loop {
310        let raw: DeployStatus = client
311            .post(&url)
312            .bearer_auth(token)
313            .json(&DeployStatusInput {
314                project_id,
315                code_version,
316            })
317            .send()
318            .await?
319            .error_for_status()
320            .map_err(|e| anyhow!("deploy_status failed: {e}"))?
321            .json()
322            .await?;
323
324        match raw {
325            DeployStatus::Done {
326                active_version,
327                pending_version,
328                pending_compiled,
329                compiled_versions,
330            } => {
331                log_status_line(
332                    &active_version,
333                    &compiled_versions,
334                    &pending_version,
335                    pending_compiled,
336                    &mut last_state,
337                );
338                return Ok(());
339            }
340            DeployStatus::Pending {
341                active_version,
342                pending_version,
343                pending_compiled,
344                compiled_versions,
345            } => {
346                log_status_line(
347                    &active_version,
348                    &compiled_versions,
349                    &pending_version,
350                    pending_compiled,
351                    &mut last_state,
352                );
353                if start.elapsed() > timeout {
354                    return Err(anyhow!(
355                        "deploy_status timed out after {}s",
356                        timeout.as_secs()
357                    ));
358                }
359            }
360            DeployStatus::NoActiveVersion => {
361                return Err(anyhow!("control has no active fn0-wasmtime version yet"));
362            }
363            DeployStatus::NotLoggedIn => {
364                return Err(anyhow!("control rejected token; run `fn0 login` again."));
365            }
366            DeployStatus::NotFound => {
367                return Err(anyhow!(
368                    "project '{project_id}' not found or not owned by you."
369                ));
370            }
371            DeployStatus::InternalError => {
372                return Err(anyhow!(
373                    "deploy_status: server error; check fn0-control logs"
374                ));
375            }
376        }
377    }
378}
379
380fn log_status_line(
381    active_version: &str,
382    compiled_versions: &[String],
383    pending_version: &Option<String>,
384    pending_compiled: bool,
385    last_state: &mut Option<String>,
386) {
387    let state = format!(
388        "active={active_version} compiled={compiled_versions:?} pending={pending_version:?} pending_compiled={pending_compiled}",
389    );
390    if last_state.as_deref() != Some(&state) {
391        println!("  {state}");
392        *last_state = Some(state);
393    }
394}