use crate::api::upload;
use crate::api::{client::Client, func};
use crate::config::deploy::DeployConfig;
use crate::error::Error;
use crate::project::Project;
use base64::Engine as _;
use crc_fast::{CrcAlgorithm::Crc64Nvme, Digest};
use eyre::{eyre, ContextCompat, WrapErr};
use reqwest::StatusCode;
use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
pub use kinetics_parser::{Params, ParsedFunction, Role};
#[derive(Clone, Debug)]
pub struct Function {
pub name: String,
pub is_deploying: bool,
pub role: Role,
pub params: Params,
pub project: Project,
}
impl Function {
pub fn new(project: &Project, function: &ParsedFunction) -> eyre::Result<Self> {
Ok(Function {
name: function.func_name(false)?,
is_deploying: false,
project: project.clone(),
params: function.params.clone(),
role: function.role.clone(),
})
}
pub fn find_by_name(functions: &[Function], name: &str) -> eyre::Result<Function> {
functions
.iter()
.find(|f| name.eq(&f.name))
.wrap_err("No function with such name")
.cloned()
}
pub fn set_is_deploying(mut self, is_deploying: bool) -> Self {
self.is_deploying = is_deploying;
self
}
pub fn bundle_path(&self) -> PathBuf {
self.project
.path
.join("target")
.join("lambda")
.join(&self.name)
.join("bootstrap.zip")
}
pub async fn upload(
&mut self,
client: &Client,
deploy_config: Option<&dyn DeployConfig>,
) -> eyre::Result<bool> {
if let Some(config) = deploy_config {
return config.upload(self).await;
}
let path = self.bundle_path();
let data = tokio::fs::read(&path).await?;
let mut digest = Digest::new(Crc64Nvme);
digest.update(&data);
let body = upload::Request {
project_name: self.project.name.clone(),
name: self.name.clone(),
checksum: base64::prelude::BASE64_STANDARD.encode(digest.finalize().to_be_bytes()),
};
log::debug!(
"Calling /upload with body:\n{}",
serde_json::to_string_pretty(&body)?
);
let response = client
.post("/upload")
.json(&body)
.send()
.await
.inspect_err(|e| log::error!("Upload request failed: {e:?}"))?;
if response.status() == StatusCode::NOT_MODIFIED {
return Ok(false);
}
let text = response.text().await?;
log::debug!("Got response for {}: {}", self.name, text);
let presigned = serde_json::from_str::<upload::Response>(&text).inspect_err(|e| {
log::error!("Failed to parse the response: {e:?}");
})?;
let public_client = reqwest::Client::new();
public_client
.put(&presigned.url)
.body(data)
.send()
.await?
.error_for_status()?;
Ok(true)
}
pub fn environment(&self) -> HashMap<String, String> {
let mut env = self.project.environment().clone();
env.extend(self.params.environment().clone());
env
}
pub async fn url(&self) -> eyre::Result<String> {
let url_path = match &self.params {
Params::Endpoint(endpoint) => Ok(&endpoint.url_path),
_ => Err(eyre!("Not an endpoint")),
}?;
Ok(format!(
"{}{}",
Project::fetch_one(&self.project.name).await?.url(),
url_path
))
}
pub async fn status(&self, client: &Client) -> eyre::Result<Option<String>> {
let result = client
.post("/function/status")
.json(&func::status::Request {
project_name: self.project.name.clone(),
function_name: self.name.clone(),
})
.send()
.await
.inspect_err(|err| log::error!("{err:?}"))
.wrap_err(Error::new(
"Network request failed",
Some("Try again in a few seconds."),
))?;
if result.status() != StatusCode::OK {
return Err(Error::new(
&format!(
"Function status request failed for {}/{}",
self.project.name.clone(),
self.name.clone()
),
Some("Try again in a few seconds."),
)
.into());
}
let status: func::status::Response =
result.json().await.wrap_err("Failed to parse response")?;
Ok(status.last_modified)
}
}
pub async fn build(
functions: &[Function],
total_progress: &indicatif::ProgressBar,
) -> eyre::Result<()> {
let Some(Function { project, .. }) = functions.iter().next() else {
return Err(eyre!("Attempted to build an empty function list"));
};
total_progress.set_message("Starting cargo...");
let mut cmd = tokio::process::Command::new("cargo");
cmd.arg("lambda")
.arg("build")
.arg("--release")
.arg("--target")
.arg("x86_64-unknown-linux-musl")
.arg("--output-format")
.arg("zip")
.current_dir(&project.path)
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for function in functions {
cmd.arg("--bin").arg(&function.name);
}
let mut child = cmd.spawn().wrap_err("Failed to execute the process")?;
let mut is_failed = false;
let mut error_message_lines = Vec::new();
if let Some(stderr) = child.stderr.take() {
let mut reader = BufReader::new(stderr).lines();
let regex = regex::Regex::new(r"^[\t ]+")?;
while let Some(line) = reader.next_line().await? {
let trimmed = line.trim();
if is_failed || trimmed.starts_with("error") || trimmed.starts_with("Error") {
is_failed = true;
error_message_lines.push(line);
continue;
};
total_progress.set_message(regex.replace_all(&line, "").to_string());
}
}
total_progress.set_message("");
let status = child.wait().await?;
if !status.success() {
return Err(eyre!("{}", error_message_lines.join("\n")));
}
Ok(())
}