bndl/
lib.rs

#![allow(clippy::must_use_candidate, clippy::implicit_hasher)]

mod aws;
mod docker;

use async_compression::{Level, tokio::write::ZstdEncoder};
use futures::{FutureExt, StreamExt, future::Fuse};
use pin_project::pin_project;
use std::{
	collections::{BTreeSet, HashSet},
	convert::TryInto,
	fs,
	future::Future,
	io,
	path::{Path, PathBuf},
	pin::Pin,
	task::{Context, Poll},
};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWriteExt, ReadBuf};
use walkdir::WalkDir;

use docker::Docker;

pub fn bundle(binary: PathBuf, resource_dirs: HashSet<PathBuf>) -> impl AsyncBufRead {
	let mut resources = resource_dirs.into_iter().collect::<Vec<_>>();
	resources.sort();
	// recurse resource dirs and sort file list
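	// each kept entry is tagged as (is_file, path); symlinks and other special files are skipped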
	let resources = resources
		.into_iter()
		.map(|resource_dir| {
			WalkDir::new(resource_dir)
				.sort_by(|a, b| a.file_name().cmp(b.file_name()))
				.into_iter()
				.filter_map(|entry| {
					let entry = entry.unwrap();
					match entry.file_type() {
						t if t.is_dir() => Some(false),
						t if t.is_file() => Some(true),
						_ => None,
					}
					.map(|is_file| (is_file, entry.into_path()))
				})
				.collect::<Vec<_>>()
		})
		.collect::<Vec<_>>();

	// in-memory pipe: the archive task feeds the write half, the returned reader drains the other
	let (writer, reader) = tokio::io::duplex(16 * 1024 * 1024);

	let task = async move {
		// create a deterministic .tar.zst
		// https://community.centminmod.com/threads/round-4-compression-comparison-benchmarks-zstd-vs-brotli-vs-pigz-vs-bzip2-vs-xz-etc.18669/
		let tar_ = ZstdEncoder::with_quality(writer, Level::Precise(6));
		let mut tar_ = tokio_tar::Builder::new(tar_);
		tar_.mode(tokio_tar::HeaderMode::Deterministic);

		// add the entry point to tar: "__entry" holds a single shell-quoted line naming the binary
		let mut entry = shlex::try_join([binary.to_str().unwrap()]).unwrap();
		entry.push('\n');
		let mut header = tokio_tar::Header::new_gnu();
		header.set_mtime(0);
		header.set_uid(0);
		header.set_gid(0);
		header.set_mode(0o755);
		header.set_size(entry.len().try_into().unwrap());
		tar_.append_data(&mut header, "__entry", entry.as_bytes()).await.unwrap();

		// add resources (checking for docker images and extra binaries) to tar
		let mut docker_images = BTreeSet::new();
		let mut binaries = BTreeSet::new();
		for resources in resources {
			for (is_file, resource) in resources {
				if is_file {
					if resource.file_name().unwrap() == "docker" {
						let images = fs::read_to_string(&resource).unwrap();
						let images = images.split('\n').filter(|image| !image.is_empty()).map(ToOwned::to_owned).collect::<Vec<_>>();
						docker_images.extend(images);
					} else if resource.file_name().unwrap() == "binary" {
						let names = fs::read_to_string(&resource).unwrap();
						let names = names.split('\n').filter(|name| !name.is_empty()).map(ToOwned::to_owned).collect::<Vec<_>>();
						binaries.extend(names);
					}
					tar_.append_file(&resource, &mut tokio::fs::File::open(&resource).await.unwrap()).await.unwrap();
				} else {
					builder_append_dir(&mut tar_, &resource).await.unwrap();
				}
			}
		}

		// add binaries to tar (the entry binary first, then any extras listed in "binary" files)
		let mut binaries = binaries.into_iter().map(|resource| binary.parent().unwrap().join(resource)).collect::<Vec<_>>();
		binaries.sort();
		binaries.insert(0, binary);
		for resource in binaries {
			tar_.append_file(&resource, &mut tokio::fs::File::open(&resource).await.unwrap()).await.unwrap();
		}

		// add exported docker images to tar
		if !docker_images.is_empty() {
			let mut docker_images = docker_images.into_iter().collect::<Vec<_>>();
			docker_images.sort();
			let docker_dir = Path::new("__docker");
			let docker = Docker::new();
			builder_append_dir(&mut tar_, docker_dir).await.unwrap();
			let docker_tar = docker.images_export(&docker_images);
			tokio::pin!(docker_tar);
			let mut entries = tokio_tar::Archive::new(docker_tar);
			let mut entries = entries.entries().unwrap();
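			// re-root each entry of the exported image archive under __docker/, reusing its header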
			while let Some(entry) = entries.next().await {
				let entry = entry.unwrap();
				let mut header = entry.header().clone();
				let path = docker_dir.join(header.path().unwrap());
				tar_.append_data(&mut header, path, entry).await.unwrap();
			}
		}

		// finish the tar, then flush and shut down the zstd encoder
		let mut tar_ = tar_.into_inner().await.unwrap();
		tar_.shutdown().await.unwrap();
		let _writer: tokio::io::DuplexStream = tar_.into_inner();
	};
	let reader = tokio::io::BufReader::with_capacity(16 * 1024 * 1024, reader);
	let task = task.fuse();
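	// the archive-building task is never spawned; AlsoPollFuture polls it whenever the returned
	// reader is polled, so reading the bundle is what drives its construction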
	AlsoPollFuture { reader, task }
}
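
// A minimal usage sketch (not part of the original API surface; the paths are placeholders):
// the returned reader is an ordinary `AsyncBufRead`, so the bundle can be streamed anywhere,
// e.g. into a local file.
#[allow(dead_code)]
async fn example_write_bundle_to_disk() -> io::Result<()> {
	let reader = bundle(PathBuf::from("target/release/my-binary"), HashSet::new());
	tokio::pin!(reader);
	let mut file = tokio::fs::File::create("bundle.tar.zst").await?;
	tokio::io::copy_buf(&mut reader, &mut file).await?;
	file.flush().await?;
	Ok(())
}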
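// append a directory entry with a synthetic, deterministic header (zeroed mtime/uid/gid,
// mode 0o755) rather than reading metadata for an on-disk directory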
async fn builder_append_dir<W, P>(self_: &mut tokio_tar::Builder<W>, path: P) -> io::Result<()>
where
	W: tokio::io::AsyncWrite + Unpin + Send + 'static,
	P: AsRef<Path>,
{
	let mut header = tokio_tar::Header::new_gnu();
	header.set_mtime(0);
	header.set_uid(0);
	header.set_gid(0);
	header.set_mode(0o755);
	header.set_size(0);
	header.set_entry_type(tokio_tar::EntryType::Directory);
	self_.append_data(&mut header, path, [].as_slice()).await
}
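/// Couples a reader with the future that produces the data it reads: every poll of the reader
/// first polls the (fused) task, so the bundling work above makes progress without ever being
/// spawned onto a runtime.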
#[pin_project]
struct AlsoPollFuture<R, F> {
	#[pin]
	reader: R,
	#[pin]
	task: Fuse<F>,
}
impl<R, F> AsyncRead for AlsoPollFuture<R, F>
where
	R: AsyncRead,
	F: Future<Output = ()>,
{
	fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
		let self_ = self.project();
		let _ = self_.task.poll(cx);
		self_.reader.poll_read(cx, buf)
	}
}
impl<R, F> AsyncBufRead for AlsoPollFuture<R, F>
where
	R: AsyncBufRead,
	F: Future<Output = ()>,
{
	fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
		let self_ = self.project();
		let _ = self_.task.poll(cx);
		self_.reader.poll_fill_buf(cx)
	}
	fn consume(self: Pin<&mut Self>, amt: usize) {
		self.project().reader.consume(amt);
	}
}