Skip to main content

fn0_deploy/
lib.rs

1use anyhow::{Result, anyhow};
2use serde::{Deserialize, Serialize};
3use std::path::{Path, PathBuf};
4
5pub mod credentials;
6
7#[derive(Serialize)]
8struct NewProjectInput<'a> {
9    name: &'a str,
10}
11
12#[derive(Deserialize)]
13struct NewProjectRaw {
14    #[serde(rename = "Ok")]
15    ok: Option<NewProjectOk>,
16    #[serde(rename = "NotLoggedIn")]
17    not_logged_in: Option<()>,
18    #[serde(rename = "Error")]
19    error: Option<MessageErr>,
20}
21
22#[derive(Deserialize)]
23struct NewProjectOk {
24    project_id: String,
25}
26
27#[derive(Deserialize)]
28struct MessageErr {
29    message: String,
30}
31
32pub async fn ensure_project_id(
33    client: &reqwest::Client,
34    control_url: &str,
35    token: &str,
36    project_name: &str,
37    project_id: &mut Option<String>,
38) -> Result<String> {
39    if let Some(id) = project_id.as_ref() {
40        return Ok(id.clone());
41    }
42    let url = format!(
43        "{}/__forte_action/new_project",
44        control_url.trim_end_matches('/')
45    );
46    let resp = client
47        .post(&url)
48        .bearer_auth(token)
49        .json(&NewProjectInput { name: project_name })
50        .send()
51        .await?
52        .error_for_status()
53        .map_err(|e| anyhow!("new_project failed: {e}"))?;
54    let raw: NewProjectRaw = resp.json().await?;
55    let id = match raw {
56        NewProjectRaw { ok: Some(ok), .. } => ok.project_id,
57        NewProjectRaw {
58            not_logged_in: Some(_),
59            ..
60        } => return Err(anyhow!("control rejected token; run `fn0 login` again.")),
61        NewProjectRaw {
62            error: Some(err), ..
63        } => return Err(anyhow!("new_project: {}", err.message)),
64        _ => return Err(anyhow!("unexpected new_project response")),
65    };
66    *project_id = Some(id.clone());
67    Ok(id)
68}
69
70#[derive(Serialize)]
71struct DeployInput<'a> {
72    project_id: &'a str,
73    build_id: &'a str,
74    files: Vec<DeployFile>,
75    jobs: &'a [CronJob],
76    cron_updated_at: &'a str,
77}
78
79#[derive(Serialize, Deserialize, Clone, Debug)]
80pub struct CronJob {
81    pub function: String,
82    pub every_minutes: u32,
83}
84
85#[derive(Serialize)]
86struct DeployFile {
87    path: String,
88    size: u64,
89}
90
91#[derive(Deserialize)]
92struct DeployRaw {
93    #[serde(rename = "Ok")]
94    ok: Option<DeployOk>,
95    #[serde(rename = "QuotaExceeded")]
96    quota_exceeded: Option<QuotaExceededBody>,
97    #[serde(rename = "NotLoggedIn")]
98    not_logged_in: Option<()>,
99    #[serde(rename = "NotFound")]
100    not_found: Option<()>,
101    #[serde(rename = "Forbidden")]
102    forbidden: Option<()>,
103    #[serde(rename = "Error")]
104    error: Option<MessageErr>,
105}
106
107#[derive(Deserialize)]
108struct DeployOk {
109    presigned_put_url: String,
110    object_key: String,
111    static_uploads: Vec<StaticUpload>,
112}
113
114#[derive(Deserialize)]
115struct StaticUpload {
116    path: String,
117    presigned_url: String,
118}
119
120#[derive(Deserialize)]
121struct QuotaExceededBody {
122    reason: String,
123}
124
125#[derive(Serialize)]
126struct DeployStatusInput<'a> {
127    project_id: &'a str,
128    last_modified: &'a str,
129}
130
131#[derive(Deserialize)]
132struct DeployStatusRaw {
133    #[serde(rename = "Done")]
134    done: Option<DeployStatusBody>,
135    #[serde(rename = "Pending")]
136    pending: Option<DeployStatusBody>,
137    #[serde(rename = "NoActiveVersion")]
138    no_active_version: Option<()>,
139    #[serde(rename = "NotLoggedIn")]
140    not_logged_in: Option<()>,
141    #[serde(rename = "NotFound")]
142    not_found: Option<()>,
143    #[serde(rename = "Forbidden")]
144    forbidden: Option<()>,
145    #[serde(rename = "Error")]
146    error: Option<MessageErr>,
147}
148
149#[derive(Deserialize)]
150struct DeployStatusBody {
151    active_version: String,
152    pending_version: Option<String>,
153    pending_compiled: bool,
154    compiled_versions: Vec<String>,
155}
156
157#[allow(clippy::too_many_arguments)]
158pub async fn deploy_wasm(
159    control_url: &str,
160    token: &str,
161    project_name: &str,
162    project_id: &mut Option<String>,
163    build_id: &str,
164    bundle_tar_path: &Path,
165    jobs: &[CronJob],
166    cron_updated_at: &str,
167) -> Result<()> {
168    let client = reqwest::Client::new();
169    let project_id_resolved =
170        ensure_project_id(&client, control_url, token, project_name, project_id).await?;
171    println!("project_id: {project_id_resolved}");
172
173    let DeployOk {
174        presigned_put_url,
175        object_key,
176        static_uploads: _,
177    } = request_deploy(
178        &client,
179        control_url,
180        token,
181        &project_id_resolved,
182        build_id,
183        Vec::new(),
184        jobs,
185        cron_updated_at,
186    )
187    .await?;
188
189    println!("uploading bundle to {object_key}...");
190    let last_modified = upload_bundle(&client, &presigned_put_url, bundle_tar_path).await?;
191    println!("uploaded. last_modified={last_modified}");
192
193    poll_deploy_status(
194        &client,
195        control_url,
196        token,
197        &project_id_resolved,
198        &last_modified,
199    )
200    .await?;
201    println!("Deploy complete!");
202    Ok(())
203}
204
205#[allow(clippy::too_many_arguments)]
206pub async fn deploy_forte(
207    control_url: &str,
208    token: &str,
209    project_name: &str,
210    project_id: &mut Option<String>,
211    build_id: &str,
212    fe_dist_dir: &Path,
213    bundle_tar_path: &Path,
214    jobs: &[CronJob],
215    cron_updated_at: &str,
216) -> Result<()> {
217    let client = reqwest::Client::new();
218    let project_id_resolved =
219        ensure_project_id(&client, control_url, token, project_name, project_id).await?;
220    println!("project_id: {project_id_resolved}");
221
222    let static_files = collect_static_files(fe_dist_dir)?;
223    let deploy_files: Vec<DeployFile> = static_files
224        .iter()
225        .map(|f| DeployFile {
226            path: f.relative_path.clone(),
227            size: f.size,
228        })
229        .collect();
230    println!(
231        "Requesting deploy ({} static asset(s))...",
232        deploy_files.len()
233    );
234
235    let DeployOk {
236        presigned_put_url,
237        object_key,
238        static_uploads,
239    } = request_deploy(
240        &client,
241        control_url,
242        token,
243        &project_id_resolved,
244        build_id,
245        deploy_files,
246        jobs,
247        cron_updated_at,
248    )
249    .await?;
250
251    if !static_files.is_empty() {
252        println!("Uploading {} static asset(s)...", static_files.len());
253        upload_static_assets(&client, &static_files, static_uploads).await?;
254    }
255
256    println!("uploading bundle to {object_key}...");
257    let last_modified = upload_bundle(&client, &presigned_put_url, bundle_tar_path).await?;
258    println!("uploaded. last_modified={last_modified}");
259
260    poll_deploy_status(
261        &client,
262        control_url,
263        token,
264        &project_id_resolved,
265        &last_modified,
266    )
267    .await?;
268    println!("Deploy complete!");
269    Ok(())
270}
271
272#[allow(clippy::too_many_arguments)]
273async fn request_deploy(
274    client: &reqwest::Client,
275    control_url: &str,
276    token: &str,
277    project_id: &str,
278    build_id: &str,
279    files: Vec<DeployFile>,
280    jobs: &[CronJob],
281    cron_updated_at: &str,
282) -> Result<DeployOk> {
283    let deploy_url = format!(
284        "{}/__forte_action/deploy",
285        control_url.trim_end_matches('/')
286    );
287    let raw: DeployRaw = client
288        .post(&deploy_url)
289        .bearer_auth(token)
290        .json(&DeployInput {
291            project_id,
292            build_id,
293            files,
294            jobs,
295            cron_updated_at,
296        })
297        .send()
298        .await?
299        .error_for_status()
300        .map_err(|e| anyhow!("deploy failed: {e}"))?
301        .json()
302        .await?;
303    match raw {
304        DeployRaw { ok: Some(ok), .. } => Ok(ok),
305        DeployRaw {
306            quota_exceeded: Some(q),
307            ..
308        } => Err(anyhow!("deploy quota exceeded: {}", q.reason)),
309        DeployRaw {
310            not_logged_in: Some(_),
311            ..
312        } => Err(anyhow!("control rejected token; run `fn0 login` again.")),
313        DeployRaw {
314            not_found: Some(_), ..
315        } => Err(anyhow!("project not found")),
316        DeployRaw {
317            forbidden: Some(_), ..
318        } => Err(anyhow!("not the owner of this project")),
319        DeployRaw {
320            error: Some(err), ..
321        } => Err(anyhow!("deploy: {}", err.message)),
322        _ => Err(anyhow!("unexpected deploy response")),
323    }
324}
325
326async fn upload_bundle(
327    client: &reqwest::Client,
328    presigned_put_url: &str,
329    bundle_tar_path: &Path,
330) -> Result<String> {
331    let bundle_bytes = std::fs::read(bundle_tar_path)
332        .map_err(|e| anyhow!("Failed to read {}: {}", bundle_tar_path.display(), e))?;
333    let put_resp = client
334        .put(presigned_put_url)
335        .body(bundle_bytes)
336        .send()
337        .await?
338        .error_for_status()
339        .map_err(|e| anyhow!("bundle upload failed: {e}"))?;
340    extract_last_modified(&put_resp)
341}
342
343async fn upload_static_assets(
344    client: &reqwest::Client,
345    files: &[StaticFile],
346    uploads: Vec<StaticUpload>,
347) -> Result<()> {
348    use futures::StreamExt;
349    use std::collections::HashMap;
350
351    let mut url_for_path: HashMap<String, String> = HashMap::new();
352    for u in uploads {
353        url_for_path.insert(u.path, u.presigned_url);
354    }
355
356    let mut tasks = futures::stream::FuturesUnordered::new();
357    for file in files {
358        let url = url_for_path.remove(&file.relative_path).ok_or_else(|| {
359            anyhow!(
360                "control did not return presigned URL for {}",
361                file.relative_path
362            )
363        })?;
364        let bytes = std::fs::read(&file.absolute_path)
365            .map_err(|e| anyhow!("read {}: {}", file.absolute_path.display(), e))?;
366        let client = client.clone();
367        let content_type = file.content_type;
368        let path = file.relative_path.clone();
369        tasks.push(async move {
370            let resp = client
371                .put(&url)
372                .header("content-type", content_type)
373                .body(bytes)
374                .send()
375                .await
376                .map_err(|e| anyhow!("R2 PUT {}: {}", path, e))?;
377            resp.error_for_status()
378                .map_err(|e| anyhow!("R2 PUT {} HTTP error: {}", path, e))?;
379            Ok::<_, anyhow::Error>(())
380        });
381    }
382    while let Some(result) = tasks.next().await {
383        result?;
384    }
385    Ok(())
386}
387
/// A static asset found on disk by `collect_static_files`.
pub struct StaticFile {
    /// Path relative to the scanned root, with `\` replaced by `/`.
    pub relative_path: String,
    /// Path of the file as produced by the directory walk (the scanned root
    /// joined with the entry); absolute only if the root itself was.
    pub absolute_path: PathBuf,
    /// File size in bytes, taken from the directory entry's metadata.
    pub size: u64,
    /// MIME type chosen by `content_type_for` from the file extension.
    pub content_type: &'static str,
}
394
395pub fn collect_static_files(dir: &Path) -> Result<Vec<StaticFile>> {
396    let mut out = Vec::new();
397    if !dir.exists() {
398        return Ok(out);
399    }
400    walk_collect(dir, dir, &mut out)?;
401    Ok(out)
402}
403
404fn walk_collect(base: &Path, dir: &Path, out: &mut Vec<StaticFile>) -> Result<()> {
405    for entry in std::fs::read_dir(dir)? {
406        let entry = entry?;
407        let path = entry.path();
408        if path.is_dir() {
409            if path.file_name().and_then(|s| s.to_str()) == Some("ssr")
410                && path.parent() == Some(base)
411            {
412                continue;
413            }
414            walk_collect(base, &path, out)?;
415            continue;
416        }
417        let metadata = entry.metadata()?;
418        let rel = path
419            .strip_prefix(base)
420            .map_err(|e| anyhow!("strip_prefix: {e}"))?
421            .to_string_lossy()
422            .replace('\\', "/");
423        out.push(StaticFile {
424            relative_path: rel,
425            absolute_path: path.clone(),
426            size: metadata.len(),
427            content_type: content_type_for(&path),
428        });
429    }
430    Ok(())
431}
432
/// Picks the `Content-Type` value for a file based on its extension.
///
/// The extension is compared ASCII case-insensitively, so `LOGO.PNG` and
/// `logo.png` map to the same type. Paths with no extension, a non-UTF-8
/// extension, or an unrecognised one fall back to `application/octet-stream`.
pub fn content_type_for(path: &Path) -> &'static str {
    // Lower-case once so upper- or mixed-case extensions are still recognised.
    let ext = path
        .extension()
        .and_then(|e| e.to_str())
        .map(str::to_ascii_lowercase);

    match ext.as_deref() {
        Some("html") => "text/html; charset=utf-8",
        Some("css") => "text/css; charset=utf-8",
        Some("js") | Some("mjs") | Some("cjs") => "application/javascript; charset=utf-8",
        Some("json") => "application/json; charset=utf-8",
        Some("map") => "application/json; charset=utf-8",
        // Browsers only stream-compile WebAssembly served with this exact type.
        Some("wasm") => "application/wasm",
        Some("png") => "image/png",
        Some("jpg") | Some("jpeg") => "image/jpeg",
        Some("gif") => "image/gif",
        Some("svg") => "image/svg+xml",
        Some("ico") => "image/x-icon",
        Some("webp") => "image/webp",
        Some("woff") => "font/woff",
        Some("woff2") => "font/woff2",
        Some("ttf") => "font/ttf",
        Some("otf") => "font/otf",
        Some("eot") => "application/vnd.ms-fontobject",
        Some("txt") => "text/plain; charset=utf-8",
        Some("xml") => "application/xml; charset=utf-8",
        Some("pdf") => "application/pdf",
        Some("mp4") => "video/mp4",
        Some("webm") => "video/webm",
        Some("mp3") => "audio/mpeg",
        Some("wav") => "audio/wav",
        _ => "application/octet-stream",
    }
}
461
462fn extract_last_modified(resp: &reqwest::Response) -> Result<String> {
463    let hv = resp
464        .headers()
465        .get(reqwest::header::LAST_MODIFIED)
466        .ok_or_else(|| anyhow!("R2 PUT response missing Last-Modified header"))?
467        .to_str()
468        .map_err(|e| anyhow!("Last-Modified not utf-8: {e}"))?;
469    let dt = chrono::DateTime::parse_from_rfc2822(hv)
470        .map_err(|e| anyhow!("Last-Modified parse: {e}; raw={hv}"))?;
471    Ok(dt
472        .with_timezone(&chrono::Utc)
473        .format("%Y-%m-%dT%H:%M:%SZ")
474        .to_string())
475}
476
477async fn poll_deploy_status(
478    client: &reqwest::Client,
479    control_url: &str,
480    token: &str,
481    project_id: &str,
482    last_modified: &str,
483) -> Result<()> {
484    let url = format!(
485        "{}/__forte_action/deploy_status",
486        control_url.trim_end_matches('/')
487    );
488    let timeout = std::time::Duration::from_secs(600);
489    let start = std::time::Instant::now();
490    let mut last_state: Option<String> = None;
491
492    loop {
493        let raw: DeployStatusRaw = client
494            .post(&url)
495            .bearer_auth(token)
496            .json(&DeployStatusInput {
497                project_id,
498                last_modified,
499            })
500            .send()
501            .await?
502            .error_for_status()
503            .map_err(|e| anyhow!("deploy_status failed: {e}"))?
504            .json()
505            .await?;
506
507        if let Some(body) = raw.done {
508            log_status_line(&body, &mut last_state);
509            return Ok(());
510        }
511        if raw.no_active_version.is_some() {
512            return Err(anyhow!("control has no active fn0-wasmtime version yet"));
513        }
514        if raw.not_logged_in.is_some() {
515            return Err(anyhow!("control rejected token; run `fn0 login` again."));
516        }
517        if raw.not_found.is_some() {
518            return Err(anyhow!("project not found"));
519        }
520        if raw.forbidden.is_some() {
521            return Err(anyhow!("not the owner of this project"));
522        }
523        if let Some(err) = raw.error {
524            return Err(anyhow!("deploy_status: {}", err.message));
525        }
526        if let Some(body) = raw.pending {
527            log_status_line(&body, &mut last_state);
528            if start.elapsed() > timeout {
529                return Err(anyhow!(
530                    "deploy_status timed out after {}s",
531                    timeout.as_secs()
532                ));
533            }
534            continue;
535        }
536        return Err(anyhow!("unexpected deploy_status response"));
537    }
538}
539
540fn log_status_line(body: &DeployStatusBody, last_state: &mut Option<String>) {
541    let state = format!(
542        "active={} compiled={:?} pending={:?} pending_compiled={}",
543        body.active_version, body.compiled_versions, body.pending_version, body.pending_compiled,
544    );
545    if last_state.as_deref() != Some(&state) {
546        println!("  {state}");
547        *last_state = Some(state);
548    }
549}
550
551pub fn read_env_yaml(project_dir: &Path) -> Result<Option<Vec<u8>>> {
552    let p = project_dir.join("env.yaml");
553    match std::fs::read(&p) {
554        Ok(content) => Ok(Some(content)),
555        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
556        Err(e) => Err(anyhow!("Failed to read {}: {}", p.display(), e)),
557    }
558}
559
560pub fn create_raw_bundle_wasm(
561    wasm_path: &Path,
562    env_yaml: Option<&[u8]>,
563    output_path: &Path,
564) -> Result<()> {
565    let file = std::fs::File::create(output_path)
566        .map_err(|e| anyhow!("Failed to create {}: {}", output_path.display(), e))?;
567    let mut builder = tar::Builder::new(file);
568    append_bytes(&mut builder, "manifest.json", br#"{"kind":"wasm"}"#)?;
569    let wasm_bytes = std::fs::read(wasm_path)
570        .map_err(|e| anyhow!("Failed to read {}: {}", wasm_path.display(), e))?;
571    append_bytes(&mut builder, "backend.wasm", &wasm_bytes)?;
572    if let Some(env) = env_yaml {
573        append_bytes(&mut builder, "env.yaml", env)?;
574    }
575    builder.finish()?;
576    Ok(())
577}
578
579pub fn create_raw_bundle_forte(
580    dist_dir: &Path,
581    env_yaml: Option<&[u8]>,
582    output_path: &Path,
583) -> Result<()> {
584    let file = std::fs::File::create(output_path)
585        .map_err(|e| anyhow!("Failed to create {}: {}", output_path.display(), e))?;
586    let mut builder = tar::Builder::new(file);
587    append_bytes(&mut builder, "manifest.json", br#"{"kind":"wasmjs"}"#)?;
588
589    let backend_wasm = dist_dir.join("backend.wasm");
590    let wasm_bytes = std::fs::read(&backend_wasm)
591        .map_err(|e| anyhow!("Failed to read {}: {}", backend_wasm.display(), e))?;
592    append_bytes(&mut builder, "backend.wasm", &wasm_bytes)?;
593
594    let server_js = dist_dir.join("server.js");
595    let server_bytes = std::fs::read(&server_js)
596        .map_err(|e| anyhow!("Failed to read {}: {}", server_js.display(), e))?;
597    append_bytes(&mut builder, "entry.js", &server_bytes)?;
598
599    if let Some(env) = env_yaml {
600        append_bytes(&mut builder, "env.yaml", env)?;
601    }
602
603    builder.finish()?;
604    Ok(())
605}
606
607fn append_bytes<W: std::io::Write>(
608    builder: &mut tar::Builder<W>,
609    path: &str,
610    data: &[u8],
611) -> Result<()> {
612    let mut header = tar::Header::new_gnu();
613    header.set_size(data.len() as u64);
614    header.set_mode(0o644);
615    header.set_cksum();
616    builder
617        .append_data(&mut header, path, data)
618        .map_err(|e| anyhow!("tar append failed for {}: {}", path, e))?;
619    Ok(())
620}
621
/// Result type of `admin_run`.
///
/// Nothing in this file constructs it yet, because `admin_run` currently
/// always returns an error.
pub struct AdminRunOutput {
    /// Numeric status of the run (presumably an HTTP status code — confirm).
    pub status: u16,
    /// Content type of `body`, if one was reported.
    pub content_type: Option<String>,
    /// Raw output bytes.
    pub body: Vec<u8>,
}
627
628pub async fn admin_run(
629    _project_name: &str,
630    _task: &str,
631    _input_body: Vec<u8>,
632    _timeout_secs: u64,
633) -> Result<AdminRunOutput> {
634    Err(anyhow!(
635        "admin run is not yet migrated to control. See GitHub issue #4."
636    ))
637}
638
639pub async fn rename(_project_name: &str, _new_project_name: &str) -> Result<()> {
640    Err(anyhow!(
641        "rename is not yet migrated to control. See GitHub issue #5."
642    ))
643}
644
645pub async fn domain_add(_project_name: &str, _domain: &str) -> Result<()> {
646    Err(anyhow!("domain commands are not yet migrated to control."))
647}
648
649pub async fn domain_remove(_project_name: &str) -> Result<()> {
650    Err(anyhow!("domain commands are not yet migrated to control."))
651}
652
653pub async fn domain_status(_project_name: &str) -> Result<()> {
654    Err(anyhow!("domain commands are not yet migrated to control."))
655}