use crate::blob::{self, BLOB};
use crate::progress::{NoProgress, Progress};
use crate::target::Target;
use anyhow::{anyhow, bail, Context, Result};
use async_trait::async_trait;
use flate2::write::GzEncoder;
use futures_util::{stream, StreamExt, TryStreamExt};
use serde_derive::Deserialize;
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::fs::{File, OpenOptions};
use std::path::{Path, PathBuf};
use tar::Builder;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
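
/// Async adapters for the blocking append operations on `tar::Builder`.
///
/// The `tar` crate performs synchronous I/O, so each wrapper runs the
/// underlying call inside `tokio::task::block_in_place` (which requires
/// Tokio's multi-threaded runtime) to avoid stalling the async executor.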
#[async_trait]
trait AsyncAppendFile {
async fn append_file_async<P>(&mut self, path: P, file: &mut File) -> std::io::Result<()>
where
P: AsRef<Path> + Send;
async fn append_path_with_name_async<P, N>(&mut self, path: P, name: N) -> std::io::Result<()>
where
P: AsRef<Path> + Send,
N: AsRef<Path> + Send;
async fn append_dir_all_async<P, Q>(&mut self, path: P, src_path: Q) -> std::io::Result<()>
where
P: AsRef<Path> + Send,
Q: AsRef<Path> + Send;
}
#[async_trait]
impl<W: std::io::Write + Send> AsyncAppendFile for Builder<W> {
async fn append_file_async<P>(&mut self, path: P, file: &mut File) -> std::io::Result<()>
where
P: AsRef<Path> + Send,
{
tokio::task::block_in_place(move || self.append_file(path, file))
}
async fn append_path_with_name_async<P, N>(&mut self, path: P, name: N) -> std::io::Result<()>
where
P: AsRef<Path> + Send,
N: AsRef<Path> + Send,
{
tokio::task::block_in_place(move || self.append_path_with_name(path, name))
}
async fn append_dir_all_async<P, Q>(&mut self, path: P, src_path: Q) -> std::io::Result<()>
where
P: AsRef<Path> + Send,
Q: AsRef<Path> + Send,
{
tokio::task::block_in_place(move || self.append_dir_all(path, src_path))
}
}
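
/// Creates a new tarfile at `tarfile`, truncating any existing contents and
/// opening it for both reading and writing.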
fn create_tarfile<P: AsRef<Path> + std::fmt::Debug>(tarfile: P) -> Result<File> {
OpenOptions::new()
.write(true)
.read(true)
.truncate(true)
.create(true)
.open(tarfile.as_ref())
.map_err(|err| anyhow!("Cannot create tarfile {:?}: {}", tarfile, err))
}
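
/// Opens an existing tarfile for reading.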
fn open_tarfile<P: AsRef<Path> + std::fmt::Debug>(tarfile: P) -> Result<File> {
OpenOptions::new()
.read(true)
.open(tarfile.as_ref())
.map_err(|err| anyhow!("Cannot open tarfile {:?}: {}", tarfile, err))
}
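
/// Maps an absolute path on the build machine to its location within an
/// archive: "/opt/oxide" becomes "root/opt/oxide".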
fn archive_path(path: &Path) -> Result<PathBuf> {
let leading_slash = std::path::MAIN_SEPARATOR.to_string();
Ok(Path::new("root").join(path.strip_prefix(leading_slash)?))
}
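
/// Appends `to` and each of its ancestor directories to the archive, rooted
/// under "root/". Fails if `to` is not an absolute path.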
fn add_directory_and_parents<W: std::io::Write>(
archive: &mut tar::Builder<W>,
to: &Path,
) -> Result<()> {
if to.is_relative() {
return Err(anyhow!(
"Cannot add 'to = {}'; absolute path required",
to.to_string_lossy()
));
}
let mut parents: Vec<&Path> = to.ancestors().collect();
parents.reverse();
for parent in parents {
let dst = archive_path(parent)?;
archive.append_dir(&dst, ".")?;
}
Ok(())
}
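
/// Unpacks a previously-built gzipped package and appends its contents,
/// except the "oxide.json" metadata, to `archive`.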
async fn add_package_to_zone_archive(
archive: &mut tar::Builder<GzEncoder<File>>,
package_path: &Path,
) -> Result<()> {
let tmp = tempfile::tempdir()?;
let gzr = flate2::read::GzDecoder::new(open_tarfile(package_path)?);
if gzr.header().is_none() {
return Err(anyhow!(
"Missing gzip header from {} - cannot add it to zone image",
package_path.display()
));
}
let mut component_reader = tar::Archive::new(gzr);
let entries = component_reader.entries()?;
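// Unpack each entry into a temporary directory, then re-append it to the
// output archive, skipping the component's own "oxide.json" metadata.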
for entry in entries {
let mut entry = entry?;
let entry_path = entry.path()?;
if entry_path == Path::new("oxide.json") {
continue;
}
let entry_unpack_path = tmp.path().join(entry_path.strip_prefix("root/")?);
entry.unpack(&entry_unpack_path)?;
let entry_path = entry.path()?;
assert!(entry_unpack_path.exists());
archive
.append_path_with_name_async(entry_unpack_path, entry_path)
.await?;
}
Ok(())
}
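
/// Describes a blob artifact published by a Buildomat job.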
#[derive(Clone, Deserialize, Debug, PartialEq)]
pub struct PrebuiltBlob {
pub repo: String,
pub series: String,
pub commit: String,
pub artifact: String,
pub sha256: String,
}
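
/// Describes the origin of a package's contents.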
#[derive(Clone, Deserialize, Debug, PartialEq)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum PackageSource {
Local {
blobs: Option<Vec<PathBuf>>,
buildomat_blobs: Option<Vec<PrebuiltBlob>>,
rust: Option<RustPackage>,
#[serde(default)]
paths: Vec<MappedPath>,
},
Prebuilt {
repo: String,
commit: String,
sha256: String,
},
Composite { packages: Vec<String> },
Manual,
}
impl PackageSource {
fn rust_package(&self) -> Option<&RustPackage> {
match self {
PackageSource::Local {
rust: Some(rust_pkg),
..
} => Some(rust_pkg),
_ => None,
}
}
fn blobs(&self) -> Option<&[PathBuf]> {
match self {
PackageSource::Local {
blobs: Some(blobs), ..
} => Some(blobs),
_ => None,
}
}
fn buildomat_blobs(&self) -> Option<&[PrebuiltBlob]> {
match self {
PackageSource::Local {
buildomat_blobs: Some(buildomat_blobs),
..
} => Some(buildomat_blobs),
_ => None,
}
}
}
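
/// Describes the output format of a built package.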
#[derive(Deserialize, Debug, Clone, PartialEq)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum PackageOutput {
Zone {
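/// When true, this zone image is only used as an intermediate input to
/// composite packages and is not emitted as a final artifact.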
#[serde(default)]
intermediate_only: bool,
},
Tarball,
}
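
/// A single package, as described by an entry in the package manifest.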
#[derive(Clone, Deserialize, Debug, PartialEq)]
pub struct Package {
pub service_name: String,
pub source: PackageSource,
pub output: PackageOutput,
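/// If specified, the package is only built when every listed key/value
/// pair matches the current build target.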
pub only_for_targets: Option<BTreeMap<String, String>>,
#[serde(default)]
pub setup_hint: Option<String>,
}
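
/// Version applied to packages which have not yet been stamped.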
const DEFAULT_VERSION: semver::Version = semver::Version::new(0, 0, 0);
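
/// Opens a gzipped tar builder for a zone image, writing the "oxide.json"
/// metadata (layer type, package name, and version) as the first entry.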
async fn new_zone_archive_builder(
package_name: &str,
output_directory: &Path,
version: Option<&semver::Version>,
) -> Result<tar::Builder<GzEncoder<File>>> {
let tarfile = output_directory.join(format!("{}.tar.gz", package_name));
let file = create_tarfile(tarfile)?;
let gzw = GzEncoder::new(file, flate2::Compression::fast());
let mut archive = Builder::new(gzw);
archive.mode(tar::HeaderMode::Deterministic);
let mut root_json = tokio::fs::File::from_std(tempfile::tempfile()?);
let version = version.cloned().unwrap_or(DEFAULT_VERSION);
let version = &version.to_string();
let kvs = vec![
("v", "1"),
("t", "layer"),
("pkg", package_name),
("version", version),
];
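// Serialize the metadata to JSON by hand from the key/value pairs above.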
let contents = String::from("{")
+ &kvs
.into_iter()
.map(|(k, v)| format!("\"{k}\":\"{v}\""))
.collect::<Vec<String>>()
.join(",")
+ "}";
root_json.write_all(contents.as_bytes()).await?;
root_json.seek(std::io::SeekFrom::Start(0)).await?;
archive
.append_file_async(Path::new("oxide.json"), &mut root_json.into_std().await)
.await?;
Ok(archive)
}
impl Package {
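/// Returns the path to the built package within `output_directory`.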
pub fn get_output_path(&self, name: &str, output_directory: &Path) -> PathBuf {
output_directory.join(self.get_output_file(name))
}
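
/// Returns the path to the version-stamped package within the "versioned"
/// subdirectory of `output_directory`.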
pub fn get_stamped_output_path(&self, name: &str, output_directory: &Path) -> PathBuf {
output_directory
.join("versioned")
.join(self.get_output_file(name))
}
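
/// Returns the file name of the built package, based on its output type.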
pub fn get_output_file(&self, name: &str) -> String {
match self.output {
PackageOutput::Zone { .. } => format!("{}.tar.gz", name),
PackageOutput::Tarball => format!("{}.tar", name),
}
}
#[deprecated(note = "Call Self::create_for_target instead")]
pub async fn create(&self, name: &str, output_directory: &Path) -> Result<File> {
let null_target = Target(BTreeMap::new());
self.create_internal(&null_target, &NoProgress, name, output_directory)
.await
}
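
/// Builds this package for `target`, writing the archive into
/// `output_directory`.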
pub async fn create_for_target(
&self,
target: &Target,
name: &str,
output_directory: &Path,
) -> Result<File> {
self.create_internal(target, &NoProgress, name, output_directory)
.await
}
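
/// Stamps `version` onto a previously-built package, writing the stamped
/// archive to the "versioned" subdirectory of `output_directory` and
/// returning its path.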
pub async fn stamp(
&self,
name: &str,
output_directory: &Path,
version: &semver::Version,
) -> Result<PathBuf> {
let stamp_path = self.get_stamped_output_path(name, output_directory);
std::fs::create_dir_all(stamp_path.parent().unwrap())?;
match self.output {
PackageOutput::Zone { .. } => {
let mut archive =
new_zone_archive_builder(name, stamp_path.parent().unwrap(), Some(version))
.await?;
let package_path = self.get_output_path(name, output_directory);
add_package_to_zone_archive(&mut archive, &package_path).await?;
archive
.into_inner()
.map_err(|err| anyhow!("Failed to finalize archive: {}", err))?
.finish()?;
}
PackageOutput::Tarball => {
let original_file = self.get_output_path(name, output_directory);
let mut reader = tar::Archive::new(open_tarfile(&original_file)?);
let tmp = tempfile::tempdir()?;
reader.unpack(tmp.path())?;
if let Err(err) = std::fs::remove_file(tmp.path().join("VERSION")) {
if err.kind() != std::io::ErrorKind::NotFound {
return Err(err.into());
}
}
let file = create_tarfile(&stamp_path)?;
let mut archive = Builder::new(file);
archive.mode(tar::HeaderMode::Deterministic);
archive.append_dir_all_async(".", tmp.path()).await?;
self.add_stamp_to_tarball_package(&mut archive, version)
.await?;
archive.finish()?;
}
}
Ok(stamp_path)
}
#[deprecated(note = "Call Self::get_total_work_for_target instead")]
pub fn get_total_work(&self) -> u64 {
let null_target = Target(BTreeMap::new());
self.get_total_work_for_target(&null_target).unwrap()
}
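
/// Returns the number of progress steps needed to build this package for
/// `target`.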
pub fn get_total_work_for_target(&self, target: &Target) -> Result<u64> {
let progress_total = match &self.source {
PackageSource::Local {
blobs,
buildomat_blobs,
rust,
paths,
} => {
let blob_work = blobs.as_ref().map(|b| b.len()).unwrap_or(0);
let buildomat_work = buildomat_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
let blob_dir_work = (blob_work != 0 || buildomat_work != 0) as usize;
let rust_work = rust.as_ref().map(|r| r.binary_names.len()).unwrap_or(0);
let mut paths_work = 0;
for path in paths {
let from = PathBuf::from(path.from.interpolate(target)?);
paths_work += walkdir::WalkDir::new(&from)
.follow_links(true)
.into_iter()
.count();
}
rust_work + blob_work + buildomat_work + paths_work + blob_dir_work
}
_ => 1,
};
Ok(progress_total.try_into()?)
}
#[deprecated(note = "Call Self::create_with_progress_for_target instead")]
pub async fn create_with_progress(
&self,
progress: &impl Progress,
name: &str,
output_directory: &Path,
) -> Result<File> {
let null_target = Target(BTreeMap::new());
self.create_internal(&null_target, progress, name, output_directory)
.await
}
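
/// Like [`Self::create_for_target`], but reports progress through
/// `progress`.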
pub async fn create_with_progress_for_target(
&self,
progress: &impl Progress,
target: &Target,
name: &str,
output_directory: &Path,
) -> Result<File> {
self.create_internal(target, progress, name, output_directory)
.await
}
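
/// Dispatches to the zone or tarball builder based on the package's
/// output type.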
async fn create_internal(
&self,
target: &Target,
progress: &impl Progress,
name: &str,
output_directory: &Path,
) -> Result<File> {
match self.output {
PackageOutput::Zone { .. } => {
self.create_zone_package(target, progress, name, output_directory)
.await
}
PackageOutput::Tarball => {
self.create_tarball_package(target, progress, name, output_directory)
.await
}
}
}
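
/// Adds each mapped path to the archive, interpolating target variables
/// in both the source and destination paths.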
async fn add_paths<W: std::io::Write + Send + Sync>(
&self,
target: &Target,
progress: &impl Progress,
archive: &mut Builder<W>,
paths: &[MappedPath],
) -> Result<()> {
progress.set_message("adding paths".into());
for path in paths {
let from = PathBuf::from(path.from.interpolate(target)?);
let to = PathBuf::from(path.to.interpolate(target)?);
match self.output {
PackageOutput::Zone { .. } => {
add_directory_and_parents(archive, to.parent().unwrap())?;
}
PackageOutput::Tarball => {}
}
if !from.exists() {
return Err(anyhow!(
"Cannot add path \"{}\" to package \"{}\" because it does not exist",
from.to_string_lossy(),
self.service_name,
));
}
let from_root = std::fs::canonicalize(&from).map_err(|e| {
anyhow!(
"failed to canonicalize \"{}\": {}",
from.to_string_lossy(),
e
)
})?;
let entries = walkdir::WalkDir::new(&from_root)
.follow_links(true)
.sort_by_file_name();
for entry in entries {
let entry = entry?;
let dst = if from.is_dir() {
to.join(entry.path().strip_prefix(&from_root)?)
} else {
assert_eq!(entry.path(), from_root.as_path());
to.clone()
};
let dst = match self.output {
PackageOutput::Zone { .. } => archive_path(&dst)?,
PackageOutput::Tarball => dst,
};
if entry.file_type().is_dir() {
archive.append_dir(&dst, ".")?;
} else if entry.file_type().is_file() {
archive
.append_path_with_name_async(entry.path(), &dst)
.await
.context(format!(
"Failed to add file '{}' to '{}'",
entry.path().display(),
dst.display()
))?;
} else {
panic!(
"Unsupported file type: {:?} for {:?}",
entry.file_type(),
entry
);
}
progress.increment(1);
}
}
Ok(())
}
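
/// Adds the package's Rust binaries to the archive. Zone images place
/// them under "/opt/oxide/<service_name>/bin"; tarballs place them at the
/// archive root.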
async fn add_rust<W: std::io::Write + Send>(
&self,
progress: &impl Progress,
archive: &mut Builder<W>,
) -> Result<()> {
if let Some(rust_pkg) = self.source.rust_package() {
let dst = match self.output {
PackageOutput::Zone { .. } => {
let dst = Path::new("/opt/oxide").join(&self.service_name).join("bin");
add_directory_and_parents(archive, &dst)?;
archive_path(&dst)?
}
PackageOutput::Tarball => PathBuf::from(""),
};
rust_pkg
.add_binaries_to_archive(progress, archive, &dst)
.await?;
}
Ok(())
}
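
/// Downloads all S3 and Buildomat blobs concurrently into a per-service
/// subdirectory of `download_directory`, then appends them to the archive
/// at `destination_path`.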
async fn add_blobs<W: std::io::Write + Send>(
&self,
progress: &impl Progress,
archive: &mut Builder<W>,
download_directory: &Path,
destination_path: &Path,
) -> Result<()> {
let mut all_blobs = Vec::new();
if let Some(blobs) = self.source.blobs() {
all_blobs.extend(blobs.iter().map(crate::blob::Source::S3));
}
if let Some(buildomat_blobs) = self.source.buildomat_blobs() {
all_blobs.extend(buildomat_blobs.iter().map(crate::blob::Source::Buildomat));
}
if !all_blobs.is_empty() {
progress.set_message("downloading blobs".into());
let blobs_path = download_directory.join(&self.service_name);
std::fs::create_dir_all(&blobs_path)?;
stream::iter(all_blobs.iter())
.map(Ok)
.try_for_each_concurrent(None, |blob| {
let blob_path = match blob {
blob::Source::S3(s) => blobs_path.join(s),
blob::Source::Buildomat(spec) => blobs_path.join(&spec.artifact),
};
async move {
blob::download(progress, blob, &blob_path)
.await
.with_context(|| {
format!("failed to download blob: {}", blob.get_url())
})?;
progress.increment(1);
Ok::<_, anyhow::Error>(())
}
})
.await?;
progress.set_message("adding blobs".into());
archive
.append_dir_all_async(destination_path, &blobs_path)
.await?;
progress.increment(1);
}
Ok(())
}
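
/// Builds a zone image, either from local paths, Rust binaries, and
/// blobs, or by merging previously-built component packages.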
async fn create_zone_package(
&self,
target: &Target,
progress: &impl Progress,
name: &str,
output_directory: &Path,
) -> Result<File> {
let mut archive = new_zone_archive_builder(name, output_directory, None).await?;
match &self.source {
PackageSource::Local { paths, .. } => {
self.add_paths(target, progress, &mut archive, paths)
.await?;
self.add_rust(progress, &mut archive).await?;
let blob_dst = Path::new("/opt/oxide").join(&self.service_name).join(BLOB);
self.add_blobs(
progress,
&mut archive,
output_directory,
&archive_path(&blob_dst)?,
)
.await?;
}
PackageSource::Composite { packages } => {
for component_package in packages {
let component_path = output_directory.join(component_package);
add_package_to_zone_archive(&mut archive, &component_path).await?;
}
}
_ => {
return Err(anyhow!(
"Cannot create a zone package with source: {:?}",
self.source
));
}
}
let file = archive
.into_inner()
.map_err(|err| anyhow!("Failed to finalize archive: {}", err))?;
Ok(file.finish()?)
}
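
/// Writes `version` to a "VERSION" file at the root of the tarball.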
async fn add_stamp_to_tarball_package(
&self,
archive: &mut Builder<File>,
version: &semver::Version,
) -> Result<()> {
let mut version_file = tokio::fs::File::from_std(tempfile::tempfile()?);
version_file
.write_all(version.to_string().as_bytes())
.await?;
version_file.seek(std::io::SeekFrom::Start(0)).await?;
let version_filename = Path::new("VERSION");
archive
.append_file_async(version_filename, &mut version_file.into_std().await)
.await?;
Ok(())
}
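
/// Builds a plain tarball package from a local source.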
async fn create_tarball_package(
&self,
target: &Target,
progress: &impl Progress,
name: &str,
output_directory: &Path,
) -> Result<File> {
let tarfile = self.get_output_path(name, output_directory);
let file = create_tarfile(&tarfile)?;
let mut archive = Builder::new(file);
archive.mode(tar::HeaderMode::Deterministic);
match &self.source {
PackageSource::Local { paths, .. } => {
self.add_paths(target, progress, &mut archive, paths)
.await?;
self.add_rust(progress, &mut archive).await?;
self.add_blobs(progress, &mut archive, output_directory, Path::new(BLOB))
.await?;
self.add_stamp_to_tarball_package(&mut archive, &DEFAULT_VERSION)
.await?;
Ok(archive
.into_inner()
.map_err(|err| anyhow!("Failed to finalize archive: {}", err))?)
}
_ => Err(anyhow!("Cannot create non-local tarball")),
}
}
}
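
/// Describes Rust binaries to include in a package, taken from the local
/// Cargo "target" directory.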
#[derive(Clone, Deserialize, Debug, PartialEq)]
pub struct RustPackage {
pub binary_names: Vec<String>,
pub release: bool,
}
impl RustPackage {
async fn add_binaries_to_archive<W: std::io::Write + Send>(
&self,
progress: &impl Progress,
archive: &mut tar::Builder<W>,
dst_directory: &Path,
) -> Result<()> {
for name in &self.binary_names {
progress.set_message(format!("adding rust binary: {name}").into());
archive
.append_path_with_name_async(
Self::local_binary_path(name, self.release),
dst_directory.join(name),
)
.await
.map_err(|err| anyhow!("Cannot append binary to tarfile: {}", err))?;
progress.increment(1);
}
Ok(())
}
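
/// Returns the path to a locally-built binary within the Cargo target
/// directory.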
fn local_binary_path(name: &str, release: bool) -> PathBuf {
format!(
"target/{}/{}",
if release { "release" } else { "debug" },
name,
)
.into()
}
}
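
/// A string which may contain "{{key}}" variables, interpolated against a
/// build target.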
#[derive(Clone, Deserialize, Debug, PartialEq)]
pub struct InterpolatedString(String);
impl InterpolatedString {
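/// Replaces each "{{key}}" reference with the corresponding value from
/// `target`.
///
/// For example, with "key1" mapped to "value1" in the target,
/// "prefix-{{key1}}" interpolates to "prefix-value1".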
pub fn interpolate(&self, target: &Target) -> Result<String> {
let mut input = self.0.as_str();
let mut output = String::new();
const START_STR: &str = "{{";
const END_STR: &str = "}}";
while let Some(sub_idx) = input.find(START_STR) {
output.push_str(&input[..sub_idx]);
input = &input[sub_idx + START_STR.len()..];
let Some(end_idx) = input.find(END_STR) else {
bail!("Missing closing '{END_STR}' character in '{}'", self.0);
};
let key = &input[..end_idx];
let Some(value) = target.0.get(key) else {
bail!(
"Key '{key}' not found in target, but required in '{}'",
self.0
);
};
output.push_str(value);
input = &input[end_idx + END_STR.len()..];
}
output.push_str(input);
Ok(output)
}
}
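
/// A pair of interpolated paths, mapping a source path on the build
/// machine (`from`) to a destination path within the package (`to`).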
#[derive(Clone, Deserialize, Debug, PartialEq)]
pub struct MappedPath {
pub from: InterpolatedString,
pub to: InterpolatedString,
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn interpolate_noop() {
let target = Target(BTreeMap::new());
let is = InterpolatedString(String::from("nothing to change"));
let s = is.interpolate(&target).unwrap();
assert_eq!(s, is.0);
}
#[test]
fn interpolate_single() {
let mut target = Target(BTreeMap::new());
target.0.insert("key1".to_string(), "value1".to_string());
let is = InterpolatedString(String::from("{{key1}}"));
let s = is.interpolate(&target).unwrap();
assert_eq!(s, "value1");
}
#[test]
fn interpolate_single_with_prefix() {
let mut target = Target(BTreeMap::new());
target.0.insert("key1".to_string(), "value1".to_string());
let is = InterpolatedString(String::from("prefix-{{key1}}"));
let s = is.interpolate(&target).unwrap();
assert_eq!(s, "prefix-value1");
}
#[test]
fn interpolate_single_with_suffix() {
let mut target = Target(BTreeMap::new());
target.0.insert("key1".to_string(), "value1".to_string());
let is = InterpolatedString(String::from("{{key1}}-suffix"));
let s = is.interpolate(&target).unwrap();
assert_eq!(s, "value1-suffix");
}
#[test]
fn interpolate_multiple() {
let mut target = Target(BTreeMap::new());
target.0.insert("key1".to_string(), "value1".to_string());
target.0.insert("key2".to_string(), "value2".to_string());
let is = InterpolatedString(String::from("{{key1}}-{{key2}}"));
let s = is.interpolate(&target).unwrap();
assert_eq!(s, "value1-value2");
}
#[test]
fn interpolate_missing_key() {
let mut target = Target(BTreeMap::new());
target.0.insert("key1".to_string(), "value1".to_string());
let is = InterpolatedString(String::from("{{key3}}"));
let err = is
.interpolate(&target)
.expect_err("Interpolating string should have failed");
assert_eq!(
err.to_string(),
"Key 'key3' not found in target, but required in '{{key3}}'"
);
}
#[test]
fn interpolate_missing_closing() {
let mut target = Target(BTreeMap::new());
target.0.insert("key1".to_string(), "value1".to_string());
let is = InterpolatedString(String::from("{{key1"));
let err = is
.interpolate(&target)
.expect_err("Interpolating string should have failed");
assert_eq!(
err.to_string(),
"Missing closing '}}' character in '{{key1'"
);
}
#[test]
fn interpolate_key_as_literal() {
let mut target = Target(BTreeMap::new());
target.0.insert("oh{{no".to_string(), "value".to_string());
let is = InterpolatedString(String::from("{{oh{{no}}"));
let s = is.interpolate(&target).unwrap();
assert_eq!(s, "value");
}
}