#![allow(clippy::must_use_candidate, clippy::implicit_hasher)]
mod aws;
mod docker;
use async_compression::{Level, tokio::write::ZstdEncoder};
use futures::{FutureExt, future::Fuse};
use pin_project::pin_project;
use std::{
collections::{BTreeSet, HashSet},
convert::TryInto,
fs,
future::Future,
io,
path::{Path, PathBuf},
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWriteExt, ReadBuf};
use walkdir::WalkDir;
use docker::Docker;
/// Builds a zstd-compressed tar stream containing `binary` and the contents of `resource_dirs`.
///
/// The archive is produced lazily: the returned reader owns the future that writes the archive
/// into an in-memory duplex pipe, and that future only makes progress while the reader is polled.
///
/// Entries are written in this order:
/// 1. `__entry`: the shell-quoted path of `binary` followed by a newline.
/// 2. Every directory and regular file found by walking each resource directory (directories are
///    visited in sorted order, entries within a directory sorted by file name). Entries that are
///    neither a directory nor a regular file are skipped.
/// 3. `binary` itself, then every name listed in any resource file called `binary`, resolved
///    against the parent directory of `binary` and sorted.
/// 4. If any resource file called `docker` listed images: a `__docker` directory followed by the
///    entries of the docker image export, re-rooted under `__docker`.
///
/// Manifest files (`docker` / `binary`) hold one name per line; empty lines are ignored and both
/// `\n` and `\r\n` line endings are accepted.
///
/// # Panics
///
/// Panics (when called, or later while the returned reader is being polled) if a directory walk
/// fails, if `binary` is not valid UTF-8 or cannot be shell-quoted, or if any file, archive or
/// docker-export operation returns an error.
pub fn bundle(binary: PathBuf, resource_dirs: HashSet<PathBuf>) -> impl AsyncBufRead {
    // Capacity of both the in-memory pipe and the buffered reader wrapped around it.
    const PIPE_CAPACITY: usize = 16 * 1024 * 1024;

    // Sort the roots so the archive layout does not depend on HashSet iteration order.
    let mut resource_roots = resource_dirs.into_iter().collect::<Vec<_>>();
    resource_roots.sort();
    let resources = resource_roots
        .into_iter()
        .map(|root| {
            WalkDir::new(root)
                .sort_by(|a, b| a.file_name().cmp(b.file_name()))
                .into_iter()
                .filter_map(|entry| {
                    let entry = entry.unwrap();
                    let file_type = entry.file_type();
                    if file_type.is_dir() {
                        Some((false, entry.into_path()))
                    } else if file_type.is_file() {
                        Some((true, entry.into_path()))
                    } else {
                        // Neither a directory nor a regular file: not archived.
                        None
                    }
                })
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();

    let (writer, reader) = tokio::io::duplex(PIPE_CAPACITY);
    let task = async move {
        let encoder = ZstdEncoder::with_quality(writer, Level::Precise(6));
        let mut tar_ = tokio_tar::Builder::new(encoder);
        tar_.mode(tokio_tar::HeaderMode::Deterministic);

        // First entry: the shell-quoted path of the binary, newline-terminated.
        let mut entry = shlex::try_join([binary.to_str().unwrap()]).unwrap();
        entry.push('\n');
        let mut header = tokio_tar::Header::new_gnu();
        header.set_mtime(0);
        header.set_uid(0);
        header.set_gid(0);
        header.set_mode(0o755);
        header.set_size(entry.len().try_into().unwrap());
        tar_.append_data(&mut header, "__entry", entry.as_bytes()).await.unwrap();

        // Resource trees. Manifest files are archived like any other file, and their contents
        // are additionally collected (deduplicated and ordered by the BTreeSets).
        let mut docker_images = BTreeSet::new();
        let mut binaries = BTreeSet::new();
        for (is_file, resource) in resources.into_iter().flatten() {
            if !is_file {
                builder_append_dir(&mut tar_, &resource).await.unwrap();
                continue;
            }
            match resource.file_name().and_then(|name| name.to_str()) {
                Some("docker") => docker_images.extend(read_name_list(&resource)),
                Some("binary") => binaries.extend(read_name_list(&resource)),
                _ => {}
            }
            tar_.append_file(&resource, &mut tokio::fs::File::open(&resource).await.unwrap()).await.unwrap();
        }

        // The main binary goes first, then the listed binaries resolved next to it.
        let mut binaries = binaries.into_iter().map(|name| binary.parent().unwrap().join(name)).collect::<Vec<_>>();
        binaries.sort();
        for resource in std::iter::once(binary).chain(binaries) {
            tar_.append_file(&resource, &mut tokio::fs::File::open(&resource).await.unwrap()).await.unwrap();
        }

        if !docker_images.is_empty() {
            // A BTreeSet iterates in ascending order, so the Vec is already sorted.
            let docker_images = docker_images.into_iter().collect::<Vec<_>>();
            let docker_dir = Path::new("__docker");
            let docker = Docker::new();
            builder_append_dir(&mut tar_, docker_dir).await.unwrap();
            let docker_tar = docker.images_export(&docker_images);
            tokio::pin!(docker_tar);
            let mut archive = tokio_tar::Archive::new(docker_tar);
            let mut entries = archive.entries().unwrap();
            while let Some(entry) = entries.next().await {
                let entry = entry.unwrap();
                // Reuse the exported entry's header, only re-rooting its path under `__docker`.
                let mut header = entry.header().clone();
                let path = docker_dir.join(header.path().unwrap());
                tar_.append_data(&mut header, path, entry).await.unwrap();
            }
        }

        // Finish the tar stream, then the zstd stream; dropping the pipe's write half at the end
        // of this block is what lets the reader observe end-of-file.
        let mut encoder = tar_.into_inner().await.unwrap();
        encoder.shutdown().await.unwrap();
        let _writer: tokio::io::DuplexStream = encoder.into_inner();
    };
    let reader = tokio::io::BufReader::with_capacity(PIPE_CAPACITY, reader);
    let task = task.fuse();
    AlsoPollFuture { reader, task }
}

/// Reads a manifest file and returns its non-empty lines.
///
/// Uses `str::lines`, so `\r\n`-terminated manifests do not leave a trailing `\r` on each name.
///
/// NOTE(review): this is a blocking `std::fs` read performed from the archive-writing future;
/// presumably acceptable because manifests are small — confirm.
///
/// # Panics
///
/// Panics if the file cannot be read as UTF-8 text.
fn read_name_list(path: &Path) -> Vec<String> {
    fs::read_to_string(path)
        .unwrap()
        .lines()
        .filter(|line| !line.is_empty())
        .map(ToOwned::to_owned)
        .collect()
}
/// Appends an empty directory entry for `path` to the archive being built by `self_`.
///
/// The entry uses a fresh GNU header with mtime, uid and gid set to 0, mode `0o755` and size 0.
///
/// # Errors
///
/// Returns whatever error `append_data` reports.
async fn builder_append_dir<W, P>(self_: &mut tokio_tar::Builder<W>, path: P) -> io::Result<()>
where
    W: tokio::io::AsyncWrite + Unpin + Send + 'static,
    P: AsRef<Path>,
{
    let mut dir_header = tokio_tar::Header::new_gnu();
    dir_header.set_entry_type(tokio_tar::EntryType::Directory);
    dir_header.set_size(0);
    dir_header.set_mode(0o755);
    dir_header.set_uid(0);
    dir_header.set_gid(0);
    dir_header.set_mtime(0);

    // A directory entry carries no payload.
    let no_data: &[u8] = &[];
    self_.append_data(&mut dir_header, path, no_data).await
}
/// A reader paired with a future that is polled whenever the reader is polled.
///
/// The `AsyncRead` / `AsyncBufRead` impls for this type poll `task` before delegating to
/// `reader`, so the future makes progress without being spawned separately.
#[pin_project]
struct AlsoPollFuture<R, F> {
/// The reader that all read calls are delegated to.
#[pin]
reader: R,
/// The future polled alongside each read; wrapped in `Fuse` so the impls can keep polling it
/// on every read, including after it has finished.
#[pin]
task: Fuse<F>,
}
/// Reading drives the attached future first, then reads from the inner reader.
impl<R, F> AsyncRead for AlsoPollFuture<R, F>
where
    R: AsyncRead,
    F: Future<Output = ()>,
{
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
        let this = self.project();
        // Give the attached future a turn; its outcome carries no data, so it is discarded.
        let _: Poll<()> = this.task.poll(cx);
        this.reader.poll_read(cx, buf)
    }
}
/// Buffered reading drives the attached future first, then delegates to the inner reader.
impl<R, F> AsyncBufRead for AlsoPollFuture<R, F>
where
    R: AsyncBufRead,
    F: Future<Output = ()>,
{
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();
        // Give the attached future a turn; its outcome carries no data, so it is discarded.
        let _: Poll<()> = this.task.poll(cx);
        this.reader.poll_fill_buf(cx)
    }

    /// Consuming only touches the inner reader; the attached future is not polled here.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.project();
        this.reader.consume(amt);
    }
}