pub mod canonical;
pub mod dir;
pub mod image;
pub mod layers;
pub mod overlay;
pub mod squashfs;
pub mod tar;
pub mod tracker;
pub mod verify;
use anyhow::Result;
use image::LayerBlob;
use std::path::{Path, PathBuf};
#[derive(Clone, Debug)]
pub enum ImageSpec {
Squashfs {
path: PathBuf,
binpath: Option<PathBuf>,
},
Tar { path: PathBuf },
Dir { path: PathBuf },
}
impl ImageSpec {
pub fn path(&self) -> &Path {
match self {
Self::Squashfs { path, .. } | Self::Tar { path } | Self::Dir { path } => path,
}
}
}
#[derive(Debug, Clone)]
pub enum PackerProgress {
LayerStarted(usize),
LayerFinished(usize),
}
#[derive(Clone, Debug)]
pub struct LayerMeta {
pub index: usize,
pub media_type: String,
}
fn layers_from_image_dir(
image_dir: &Path,
) -> Result<(std::sync::mpsc::Receiver<Result<LayerBlob>>, usize)> {
let manifest = image::load_manifest(image_dir)?;
let layers = image::resolve_layers(image_dir, &manifest)?;
let total = layers.len();
let (tx, rx) = std::sync::mpsc::channel();
for layer in layers {
tx.send(Ok(layer)).unwrap();
}
Ok((rx, total))
}
fn make_layer_channel(
cap: usize,
) -> (
tokio::sync::mpsc::Sender<Result<LayerBlob>>,
std::sync::mpsc::Receiver<Result<LayerBlob>>,
) {
let (tokio_tx, tokio_rx) = tokio::sync::mpsc::channel(cap.max(1));
let (std_tx, std_rx) = std::sync::mpsc::channel();
tokio::spawn(relay_to_blocking(tokio_rx, std_tx));
(tokio_tx, std_rx)
}
async fn relay_to_blocking(
mut tokio_rx: tokio::sync::mpsc::Receiver<Result<LayerBlob>>,
std_tx: std::sync::mpsc::Sender<Result<LayerBlob>>,
) {
while let Some(item) = tokio_rx.recv().await {
if std_tx.send(item).is_err() {
break;
}
}
}
async fn relay_from_blocking(
std_rx: std::sync::mpsc::Receiver<PackerProgress>,
tokio_tx: tokio::sync::mpsc::Sender<PackerProgress>,
) {
let (bridge_tx, mut bridge_rx) = tokio::sync::mpsc::channel(1);
tokio::task::spawn_blocking(move || {
while let Ok(item) = std_rx.recv() {
if bridge_tx.blocking_send(item).is_err() {
break;
}
}
});
while let Some(item) = bridge_rx.recv().await {
if tokio_tx.send(item).await.is_err() {
break;
}
}
}
fn write_for_spec(
receiver: std::sync::mpsc::Receiver<Result<LayerBlob>>,
total_layers: usize,
spec: ImageSpec,
progress_tx: Option<std::sync::mpsc::SyncSender<PackerProgress>>,
) -> Result<()> {
match spec {
ImageSpec::Squashfs { path, binpath } => squashfs::write_squashfs_with_progress(
receiver,
total_layers,
&path,
binpath.as_deref(),
progress_tx,
),
ImageSpec::Tar { path } => {
tar::write_tar_with_progress(receiver, total_layers, &path, progress_tx)
}
ImageSpec::Dir { path } => {
dir::write_dir_with_progress(receiver, total_layers, &path, progress_tx)
}
}
}
pub async fn convert(image_dir: &Path, spec: ImageSpec) -> Result<()> {
let image_dir = image_dir.to_path_buf();
tokio::task::spawn_blocking(move || {
let (rx, total) = layers_from_image_dir(&image_dir)?;
write_for_spec(rx, total, spec, None)
})
.await?
}
pub async fn convert_mksquashfs(
image_dir: &Path,
output_squashfs: &Path,
squashfs_binpath: Option<&Path>,
) -> Result<()> {
convert(
image_dir,
ImageSpec::Squashfs {
path: output_squashfs.to_path_buf(),
binpath: squashfs_binpath.map(Path::to_path_buf),
},
)
.await
}
pub async fn convert_tar(image_dir: &Path, output_tar: &Path) -> Result<()> {
convert(
image_dir,
ImageSpec::Tar {
path: output_tar.to_path_buf(),
},
)
.await
}
pub async fn convert_dir(image_dir: &Path, output_dir: &Path) -> Result<()> {
convert(
image_dir,
ImageSpec::Dir {
path: output_dir.to_path_buf(),
},
)
.await
}
pub async fn convert_mksquashfs_streaming(
receiver: tokio::sync::mpsc::Receiver<Result<LayerBlob>>,
total_layers: usize,
output_squashfs: &Path,
squashfs_binpath: Option<&Path>,
) -> Result<()> {
let spec = ImageSpec::Squashfs {
path: output_squashfs.to_path_buf(),
binpath: squashfs_binpath.map(Path::to_path_buf),
};
let (std_tx, std_rx) = std::sync::mpsc::channel();
tokio::spawn(relay_to_blocking(receiver, std_tx));
tokio::task::spawn_blocking(move || write_for_spec(std_rx, total_layers, spec, None)).await?
}
pub async fn convert_tar_streaming(
receiver: tokio::sync::mpsc::Receiver<Result<LayerBlob>>,
total_layers: usize,
output_tar: &Path,
) -> Result<()> {
let spec = ImageSpec::Tar {
path: output_tar.to_path_buf(),
};
let (std_tx, std_rx) = std::sync::mpsc::channel();
tokio::spawn(relay_to_blocking(receiver, std_tx));
tokio::task::spawn_blocking(move || write_for_spec(std_rx, total_layers, spec, None)).await?
}
pub async fn convert_dir_streaming(
receiver: tokio::sync::mpsc::Receiver<Result<LayerBlob>>,
total_layers: usize,
output_dir: &Path,
) -> Result<()> {
let spec = ImageSpec::Dir {
path: output_dir.to_path_buf(),
};
let (std_tx, std_rx) = std::sync::mpsc::channel();
tokio::spawn(relay_to_blocking(receiver, std_tx));
tokio::task::spawn_blocking(move || write_for_spec(std_rx, total_layers, spec, None)).await?
}
pub struct StreamingPacker {
layer_tx: tokio::sync::mpsc::Sender<Result<LayerBlob>>,
metas: Vec<LayerMeta>,
task: tokio::task::JoinHandle<Result<()>>,
}
impl StreamingPacker {
pub fn new(
layer_metas: Vec<LayerMeta>,
spec: ImageSpec,
progress_tx: Option<tokio::sync::mpsc::Sender<PackerProgress>>,
) -> Self {
let total = layer_metas.len();
let (tokio_tx, std_rx) = make_layer_channel(total);
let std_progress_tx = if let Some(async_tx) = progress_tx {
let (tx, rx) = std::sync::mpsc::sync_channel::<PackerProgress>(total.max(1) * 2);
tokio::spawn(relay_from_blocking(rx, async_tx));
Some(tx)
} else {
None
};
let task = tokio::task::spawn_blocking(move || {
write_for_spec(std_rx, total, spec, std_progress_tx)
});
StreamingPacker {
layer_tx: tokio_tx,
metas: layer_metas,
task,
}
}
pub async fn notify_layer_ready(&self, index: usize, path: PathBuf) -> Result<()> {
let meta = self.metas.get(index).ok_or_else(|| {
anyhow::anyhow!(
"layer index {index} out of range (have {} layers)",
self.metas.len()
)
})?;
self.layer_tx
.send(Ok(LayerBlob {
path,
media_type: meta.media_type.clone(),
index,
}))
.await
.map_err(|_| anyhow::anyhow!("packer channel closed unexpectedly"))
}
pub async fn notify_error(&self, err: anyhow::Error) {
let _ = self.layer_tx.send(Err(err)).await;
}
pub async fn finish(self) -> Result<()> {
drop(self.layer_tx);
self.task.await?
}
}