kinetics 0.15.1

Kinetics is a hosting platform for Rust applications that allows you to deploy all types of workloads by writing **only Rust code**.
Documentation
use crate::api::upload;
use crate::api::{client::Client, func};
use crate::config::deploy::DeployConfig;
use crate::error::Error;
use crate::project::Project;
use base64::Engine as _;
use crc_fast::{CrcAlgorithm::Crc64Nvme, Digest};
use eyre::{eyre, ContextCompat, WrapErr};
use reqwest::StatusCode;
use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};

// Re-export types from kinetics-parser
pub use kinetics_parser::{Params, ParsedFunction, Role};

/// Represents a function in the project
#[derive(Clone, Debug)]
pub struct Function {
    /// The name of the function
    pub name: String,

    /// Whether the function is requested for deployment
    pub is_deploying: bool,

    /// The role of the workload
    pub role: Role,

    /// The workload-specific parameters of the function
    pub params: Params,

    /// The project that contains the function, it belongs to Crate in the build directory
    pub project: Project,
}

impl Function {
    /// Instantiate struct from parsed function data
    pub fn new(project: &Project, function: &ParsedFunction) -> eyre::Result<Self> {
        Ok(Function {
            name: function.func_name(false)?,
            is_deploying: false,
            project: project.clone(),
            params: function.params.clone(),
            role: function.role.clone(),
        })
    }

    /// Try to find a function by name in the vec of functions
    pub fn find_by_name(functions: &[Function], name: &str) -> eyre::Result<Function> {
        functions
            .iter()
            .find(|f| name.eq(&f.name))
            .wrap_err("No function with such name")
            .cloned()
    }

    pub fn set_is_deploying(mut self, is_deploying: bool) -> Self {
        self.is_deploying = is_deploying;
        self
    }

    /// A path to zip file generated by cargo-lambda
    pub fn bundle_path(&self) -> PathBuf {
        self.project
            .path
            .join("target")
            .join("lambda")
            .join(&self.name)
            .join("bootstrap.zip")
    }

    /// Call the /upload endpoint to get the presigned URL and upload the file
    /// Returns a boolean indicating whether the resource has been updated.
    pub async fn upload(
        &mut self,
        client: &Client,
        deploy_config: Option<&dyn DeployConfig>,
    ) -> eyre::Result<bool> {
        if let Some(config) = deploy_config {
            return config.upload(self).await;
        }

        let path = self.bundle_path();
        let data = tokio::fs::read(&path).await?;

        let mut digest = Digest::new(Crc64Nvme);
        digest.update(&data);
        let body = upload::Request {
            project_name: self.project.name.clone(),
            name: self.name.clone(),
            checksum: base64::prelude::BASE64_STANDARD.encode(digest.finalize().to_be_bytes()),
        };

        log::debug!(
            "Calling /upload with body:\n{}",
            serde_json::to_string_pretty(&body)?
        );

        let response = client
            .post("/upload")
            .json(&body)
            .send()
            .await
            .inspect_err(|e| log::error!("Upload request failed: {e:?}"))?;

        // If the file has not changed, skip the upload.
        if response.status() == StatusCode::NOT_MODIFIED {
            return Ok(false);
        }

        let text = response.text().await?;
        log::debug!("Got response for {}: {}", self.name, text);

        let presigned = serde_json::from_str::<upload::Response>(&text).inspect_err(|e| {
            log::error!("Failed to parse the response: {e:?}");
        })?;

        let public_client = reqwest::Client::new();

        public_client
            .put(&presigned.url)
            .body(data)
            .send()
            .await?
            .error_for_status()?;

        Ok(true)
    }

    /// Env vars to be added to function's runtime
    ///
    /// These are env vars assigned to the function in macro definition
    /// as well as those defined globally in .env file.
    pub fn environment(&self) -> HashMap<String, String> {
        let mut env = self.project.environment().clone();
        env.extend(self.params.environment().clone());
        env
    }

    /// URL to call the function
    ///
    /// Only relevant for endpoint type of functions.
    pub async fn url(&self) -> eyre::Result<String> {
        let url_path = match &self.params {
            Params::Endpoint(endpoint) => Ok(&endpoint.url_path),
            _ => Err(eyre!("Not an endpoint")),
        }?;

        Ok(format!(
            "{}{}",
            Project::fetch_one(&self.project.name).await?.url(),
            url_path
        ))
    }

    /// Get the function deployment status from the backend
    pub async fn status(&self, client: &Client) -> eyre::Result<Option<String>> {
        let result = client
            .post("/function/status")
            .json(&func::status::Request {
                project_name: self.project.name.clone(),
                function_name: self.name.clone(),
            })
            .send()
            .await
            .inspect_err(|err| log::error!("{err:?}"))
            .wrap_err(Error::new(
                "Network request failed",
                Some("Try again in a few seconds."),
            ))?;

        if result.status() != StatusCode::OK {
            return Err(Error::new(
                &format!(
                    "Function status request failed for {}/{}",
                    self.project.name.clone(),
                    self.name.clone()
                ),
                Some("Try again in a few seconds."),
            )
            .into());
        }

        let status: func::status::Response =
            result.json().await.wrap_err("Failed to parse response")?;

        Ok(status.last_modified)
    }
}

pub async fn build(
    functions: &[Function],
    total_progress: &indicatif::ProgressBar,
) -> eyre::Result<()> {
    let Some(Function { project, .. }) = functions.iter().next() else {
        return Err(eyre!("Attempted to build an empty function list"));
    };

    total_progress.set_message("Starting cargo...");
    let mut cmd = tokio::process::Command::new("cargo");
    cmd.arg("lambda")
        .arg("build")
        .arg("--release")
        .arg("--target")
        .arg("x86_64-unknown-linux-musl")
        .arg("--output-format")
        .arg("zip")
        .current_dir(&project.path)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());

    for function in functions {
        cmd.arg("--bin").arg(&function.name);
    }

    let mut child = cmd.spawn().wrap_err("Failed to execute the process")?;

    let mut is_failed = false;
    let mut error_message_lines = Vec::new();

    if let Some(stderr) = child.stderr.take() {
        let mut reader = BufReader::new(stderr).lines();
        let regex = regex::Regex::new(r"^[\t ]+")?;

        while let Some(line) = reader.next_line().await? {
            let trimmed = line.trim();

            if is_failed || trimmed.starts_with("error") || trimmed.starts_with("Error") {
                is_failed = true;
                error_message_lines.push(line);
                continue;
            };
            total_progress.set_message(regex.replace_all(&line, "").to_string());
        }
    }

    total_progress.set_message("");
    let status = child.wait().await?;

    if !status.success() {
        return Err(eyre!("{}", error_message_lines.join("\n")));
    }

    Ok(())
}