1#![allow(clippy::must_use_candidate, clippy::implicit_hasher)]
2
3mod aws;
4mod docker;
5
6use async_compression::{Level, tokio::write::ZstdEncoder};
7use futures::{FutureExt, future::Fuse};
8use pin_project::pin_project;
9use std::{
10 collections::{BTreeSet, HashSet},
11 convert::TryInto,
12 fs,
13 future::Future,
14 io,
15 path::{Path, PathBuf},
16 pin::Pin,
17 task::{Context, Poll},
18};
19use tokio::io::{AsyncBufRead, AsyncRead, AsyncWriteExt, ReadBuf};
20use walkdir::WalkDir;
21
22use docker::Docker;
23
24pub fn bundle(binary: PathBuf, resource_dirs: HashSet<PathBuf>) -> impl AsyncBufRead {
25 let mut resources = resource_dirs.into_iter().collect::<Vec<_>>();
26 resources.sort();
27 let resources = resources
29 .into_iter()
30 .map(|resources| {
31 WalkDir::new(resources)
32 .sort_by(|a, b| a.file_name().cmp(b.file_name()))
33 .into_iter()
34 .filter_map(|entry| {
35 let entry = entry.unwrap();
36 match entry.file_type() {
37 t if t.is_dir() => Some(false),
38 t if t.is_file() => Some(true),
39 _ => None,
40 }
41 .map(|is_file| (is_file, entry.into_path()))
42 })
43 .collect::<Vec<_>>()
44 })
45 .collect::<Vec<_>>();
46
47 let (writer, reader) = tokio::io::duplex(16 * 1024 * 1024);
48
49 let task = async move {
50 let tar_ = ZstdEncoder::with_quality(writer, Level::Precise(6));
53 let mut tar_ = tokio_tar::Builder::new(tar_);
54 tar_.mode(tokio_tar::HeaderMode::Deterministic);
55
56 let mut entry = shlex::try_join([binary.to_str().unwrap()]).unwrap();
58 entry.push('\n');
59 let mut header = tokio_tar::Header::new_gnu();
60 header.set_mtime(0);
61 header.set_uid(0);
62 header.set_gid(0);
63 header.set_mode(0o755);
64 header.set_size(entry.len().try_into().unwrap());
65 tar_.append_data(&mut header, "__entry", entry.as_bytes()).await.unwrap();
66
67 let mut docker_images = BTreeSet::new();
69 let mut binaries = BTreeSet::new();
70 for resources in resources {
71 for (is_file, resource) in resources {
72 if is_file {
73 if resource.file_name().unwrap() == "docker" {
74 let images = fs::read_to_string(&resource).unwrap();
75 let images = images.split('\n').filter(|image| !image.is_empty()).map(ToOwned::to_owned).collect::<Vec<_>>();
76 docker_images.extend(images);
77 } else if resource.file_name().unwrap() == "binary" {
78 let images = fs::read_to_string(&resource).unwrap();
79 let images = images.split('\n').filter(|image| !image.is_empty()).map(ToOwned::to_owned).collect::<Vec<_>>();
80 binaries.extend(images);
81 }
82 tar_.append_file(&resource, &mut tokio::fs::File::open(&resource).await.unwrap()).await.unwrap();
83 } else {
84 builder_append_dir(&mut tar_, &resource).await.unwrap();
85 }
86 }
87 }
88
89 let mut binaries = binaries.into_iter().map(|resource| binary.parent().unwrap().join(resource)).collect::<Vec<_>>();
91 binaries.sort();
92 binaries.insert(0, binary);
93 for resource in binaries {
94 tar_.append_file(&resource, &mut tokio::fs::File::open(&resource).await.unwrap()).await.unwrap();
95 }
96
97 if !docker_images.is_empty() {
99 let mut docker_images = docker_images.into_iter().collect::<Vec<_>>();
100 docker_images.sort();
101 let docker_dir = Path::new("__docker");
102 let docker = Docker::new();
103 builder_append_dir(&mut tar_, &docker_dir).await.unwrap();
104 let docker_tar = docker.images_export(&docker_images);
105 tokio::pin!(docker_tar);
106 let mut entries = tokio_tar::Archive::new(docker_tar);
107 let mut entries = entries.entries().unwrap();
108 while let Some(entry) = entries.next().await {
109 let entry = entry.unwrap();
110 let mut header = entry.header().clone();
111 let path = docker_dir.join(header.path().unwrap());
112 tar_.append_data(&mut header, path, entry).await.unwrap();
113 }
114 }
115
116 let mut tar_ = tar_.into_inner().await.unwrap();
118 tar_.shutdown().await.unwrap();
119 let _writer: tokio::io::DuplexStream = tar_.into_inner();
120 };
121 let reader = tokio::io::BufReader::with_capacity(16 * 1024 * 1024, reader);
122 let task = task.fuse();
123 AlsoPollFuture { reader, task }
124}
125
126async fn builder_append_dir<W, P>(self_: &mut tokio_tar::Builder<W>, path: P) -> io::Result<()>
127where
128 W: tokio::io::AsyncWrite + Unpin + Send + 'static,
129 P: AsRef<Path>,
130{
131 let mut header = tokio_tar::Header::new_gnu();
132 header.set_mtime(0);
133 header.set_uid(0);
134 header.set_gid(0);
135 header.set_mode(0o755);
136 header.set_size(0);
137 header.set_entry_type(tokio_tar::EntryType::Directory);
138 self_.append_data(&mut header, path, [].as_slice()).await
139}
140
141#[pin_project]
142struct AlsoPollFuture<R, F> {
143 #[pin]
144 reader: R,
145 #[pin]
146 task: Fuse<F>,
147}
148impl<R, F> AsyncRead for AlsoPollFuture<R, F>
149where
150 R: AsyncRead,
151 F: Future<Output = ()>,
152{
153 fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
154 let self_ = self.project();
155 let _ = self_.task.poll(cx);
156 self_.reader.poll_read(cx, buf)
157 }
158}
159impl<R, F> AsyncBufRead for AlsoPollFuture<R, F>
160where
161 R: AsyncBufRead,
162 F: Future<Output = ()>,
163{
164 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
165 let self_ = self.project();
166 let _ = self_.task.poll(cx);
167 self_.reader.poll_fill_buf(cx)
168 }
169 fn consume(self: Pin<&mut Self>, amt: usize) {
170 self.project().reader.consume(amt);
171 }
172}