fn0-deploy 0.1.10

Deploy client for fn0 cloud
Documentation
use crate::static_files::{StaticFile, collect_static_files};
use anyhow::{Result, anyhow};
use serde::{Deserialize, Serialize};
use std::path::Path;

#[derive(Serialize)]
struct DeployInput<'a> {
    project_id: &'a str,
    build_id: &'a str,
    files: Vec<DeployFile>,
    jobs: &'a [CronJob],
    cron_updated_at: &'a str,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct CronJob {
    pub function: String,
    pub every_minutes: u32,
}

#[derive(Serialize)]
struct DeployFile {
    path: String,
    size: u64,
}

#[derive(Deserialize)]
#[serde(tag = "t", rename_all_fields = "camelCase")]
enum Deploy {
    Ok {
        presigned_put_url: String,
        object_key: String,
        static_uploads: Vec<StaticUpload>,
        code_version: u64,
    },
    QuotaExceeded {
        reason: String,
    },
    NotLoggedIn,
    NotFound,
    InternalError,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct StaticUpload {
    path: String,
    presigned_url: String,
}

#[derive(Serialize)]
struct DeployStatusInput<'a> {
    project_id: &'a str,
    code_version: u64,
}

#[derive(Deserialize)]
#[serde(tag = "t", rename_all_fields = "camelCase")]
enum DeployStatus {
    Done {
        active_version: String,
        pending_version: Option<String>,
        pending_compiled: bool,
        compiled_versions: Vec<String>,
    },
    Pending {
        active_version: String,
        pending_version: Option<String>,
        pending_compiled: bool,
        compiled_versions: Vec<String>,
    },
    NoActiveVersion,
    NotLoggedIn,
    NotFound,
    InternalError,
}

#[allow(clippy::too_many_arguments)]
pub async fn deploy_wasm(
    control_url: &str,
    token: &str,
    project_id: &str,
    build_id: &str,
    bundle_tar_path: &Path,
    jobs: &[CronJob],
    cron_updated_at: &str,
) -> Result<()> {
    let client = reqwest::Client::new();
    println!("project_id: {project_id}");

    let DeployOk {
        presigned_put_url,
        object_key,
        static_uploads: _,
        code_version,
    } = request_deploy(
        &client,
        control_url,
        token,
        project_id,
        build_id,
        Vec::new(),
        jobs,
        cron_updated_at,
    )
    .await?;

    println!("uploading bundle to {object_key} (code_version={code_version})...");
    upload_bundle(&client, &presigned_put_url, bundle_tar_path, code_version).await?;

    poll_deploy_status(&client, control_url, token, project_id, code_version).await?;
    println!("Deploy complete!");
    Ok(())
}

struct DeployOk {
    presigned_put_url: String,
    object_key: String,
    static_uploads: Vec<StaticUpload>,
    code_version: u64,
}

#[allow(clippy::too_many_arguments)]
pub async fn deploy_forte(
    control_url: &str,
    token: &str,
    project_id: &str,
    build_id: &str,
    fe_dist_dir: &Path,
    bundle_tar_path: &Path,
    jobs: &[CronJob],
    cron_updated_at: &str,
) -> Result<()> {
    let client = reqwest::Client::new();
    println!("project_id: {project_id}");

    let static_files = collect_static_files(fe_dist_dir)?;
    let deploy_files: Vec<DeployFile> = static_files
        .iter()
        .map(|f| DeployFile {
            path: f.relative_path.clone(),
            size: f.size,
        })
        .collect();
    println!(
        "Requesting deploy ({} static asset(s))...",
        deploy_files.len()
    );

    let DeployOk {
        presigned_put_url,
        object_key,
        static_uploads,
        code_version,
    } = request_deploy(
        &client,
        control_url,
        token,
        project_id,
        build_id,
        deploy_files,
        jobs,
        cron_updated_at,
    )
    .await?;

    if !static_files.is_empty() {
        println!("Uploading {} static asset(s)...", static_files.len());
        upload_static_assets(&client, &static_files, static_uploads).await?;
    }

    println!("uploading bundle to {object_key} (code_version={code_version})...");
    upload_bundle(&client, &presigned_put_url, bundle_tar_path, code_version).await?;

    poll_deploy_status(&client, control_url, token, project_id, code_version).await?;
    println!("Deploy complete!");
    Ok(())
}

#[allow(clippy::too_many_arguments)]
async fn request_deploy(
    client: &reqwest::Client,
    control_url: &str,
    token: &str,
    project_id: &str,
    build_id: &str,
    files: Vec<DeployFile>,
    jobs: &[CronJob],
    cron_updated_at: &str,
) -> Result<DeployOk> {
    let deploy_url = format!(
        "{}/__forte_action/deploy",
        control_url.trim_end_matches('/')
    );
    let raw: Deploy = client
        .post(&deploy_url)
        .bearer_auth(token)
        .json(&DeployInput {
            project_id,
            build_id,
            files,
            jobs,
            cron_updated_at,
        })
        .send()
        .await?
        .error_for_status()
        .map_err(|e| anyhow!("deploy failed: {e}"))?
        .json()
        .await?;
    match raw {
        Deploy::Ok {
            presigned_put_url,
            object_key,
            static_uploads,
            code_version,
        } => Ok(DeployOk {
            presigned_put_url,
            object_key,
            static_uploads,
            code_version,
        }),
        Deploy::QuotaExceeded { reason } => Err(anyhow!("deploy quota exceeded: {reason}")),
        Deploy::NotLoggedIn => Err(anyhow!("control rejected token; run `fn0 login` again.")),
        Deploy::NotFound => Err(anyhow!("project '{project_id}' not found or not owned by you.")),
        Deploy::InternalError => Err(anyhow!("deploy: server error; check fn0-control logs")),
    }
}

async fn upload_bundle(
    client: &reqwest::Client,
    presigned_put_url: &str,
    bundle_tar_path: &Path,
    code_version: u64,
) -> Result<()> {
    let bundle_bytes = std::fs::read(bundle_tar_path)
        .map_err(|e| anyhow!("Failed to read {}: {}", bundle_tar_path.display(), e))?;
    let _ = code_version;
    client
        .put(presigned_put_url)
        .body(bundle_bytes)
        .send()
        .await?
        .error_for_status()
        .map_err(|e| anyhow!("bundle upload failed: {e}"))?;
    Ok(())
}

async fn upload_static_assets(
    client: &reqwest::Client,
    files: &[StaticFile],
    uploads: Vec<StaticUpload>,
) -> Result<()> {
    use futures::StreamExt;
    use std::collections::HashMap;

    let mut url_for_path: HashMap<String, String> = HashMap::new();
    for u in uploads {
        url_for_path.insert(u.path, u.presigned_url);
    }

    let mut tasks = futures::stream::FuturesUnordered::new();
    for file in files {
        let url = url_for_path.remove(&file.relative_path).ok_or_else(|| {
            anyhow!(
                "control did not return presigned URL for {}",
                file.relative_path
            )
        })?;
        let bytes = std::fs::read(&file.absolute_path)
            .map_err(|e| anyhow!("read {}: {}", file.absolute_path.display(), e))?;
        let client = client.clone();
        let content_type = file.content_type;
        let path = file.relative_path.clone();
        tasks.push(async move {
            let resp = client
                .put(&url)
                .header("content-type", content_type)
                .body(bytes)
                .send()
                .await
                .map_err(|e| anyhow!("R2 PUT {}: {}", path, e))?;
            resp.error_for_status()
                .map_err(|e| anyhow!("R2 PUT {} HTTP error: {}", path, e))?;
            Ok::<_, anyhow::Error>(())
        });
    }
    while let Some(result) = tasks.next().await {
        result?;
    }
    Ok(())
}

async fn poll_deploy_status(
    client: &reqwest::Client,
    control_url: &str,
    token: &str,
    project_id: &str,
    code_version: u64,
) -> Result<()> {
    let url = format!(
        "{}/__forte_action/deploy_status",
        control_url.trim_end_matches('/')
    );
    let timeout = std::time::Duration::from_secs(600);
    let start = std::time::Instant::now();
    let mut last_state: Option<String> = None;

    loop {
        let raw: DeployStatus = client
            .post(&url)
            .bearer_auth(token)
            .json(&DeployStatusInput {
                project_id,
                code_version,
            })
            .send()
            .await?
            .error_for_status()
            .map_err(|e| anyhow!("deploy_status failed: {e}"))?
            .json()
            .await?;

        match raw {
            DeployStatus::Done {
                active_version,
                pending_version,
                pending_compiled,
                compiled_versions,
            } => {
                log_status_line(
                    &active_version,
                    &compiled_versions,
                    &pending_version,
                    pending_compiled,
                    &mut last_state,
                );
                return Ok(());
            }
            DeployStatus::Pending {
                active_version,
                pending_version,
                pending_compiled,
                compiled_versions,
            } => {
                log_status_line(
                    &active_version,
                    &compiled_versions,
                    &pending_version,
                    pending_compiled,
                    &mut last_state,
                );
                if start.elapsed() > timeout {
                    return Err(anyhow!(
                        "deploy_status timed out after {}s",
                        timeout.as_secs()
                    ));
                }
            }
            DeployStatus::NoActiveVersion => {
                return Err(anyhow!("control has no active fn0-wasmtime version yet"));
            }
            DeployStatus::NotLoggedIn => {
                return Err(anyhow!("control rejected token; run `fn0 login` again."));
            }
            DeployStatus::NotFound => {
                return Err(anyhow!(
                    "project '{project_id}' not found or not owned by you."
                ));
            }
            DeployStatus::InternalError => {
                return Err(anyhow!(
                    "deploy_status: server error; check fn0-control logs"
                ));
            }
        }
    }
}

fn log_status_line(
    active_version: &str,
    compiled_versions: &[String],
    pending_version: &Option<String>,
    pending_compiled: bool,
    last_state: &mut Option<String>,
) {
    let state = format!(
        "active={active_version} compiled={compiled_versions:?} pending={pending_version:?} pending_compiled={pending_compiled}",
    );
    if last_state.as_deref() != Some(&state) {
        println!("  {state}");
        *last_state = Some(state);
    }
}