#[cfg(feature = "compression")]
use crate::compression::Decompress;
use crate::error;
use crate::layer::Layer;
use crate::models::{Config, ImageConfig, MediaType, Platform, TarballManifest};
use crate::uri::{Reference, Uri};
use bon::Builder;
use futures::StreamExt;
use futures::future::join_all;
#[cfg(feature = "progress")]
use indicatif::MultiProgress;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, ensure};
use std::collections::HashSet;
use tempfile::tempdir;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::task::JoinHandle;
use tokio_tar::{Archive, Builder as ArchiveBuilder};
/// Marker that identifies whiteout entries in layer archives; such entries are never
/// copied into the flattened filesystem produced by this module.
const WHITEOUT: &str = ".wh.";
/// An image manifest: one config descriptor plus the list of layer descriptors.
///
/// Serialized and deserialized with camelCase keys. `platform` is excluded from
/// (de)serialization and is filled in by the constructors (`read`, `create`, `fetch`).
#[derive(Debug, Serialize, Deserialize, Clone, Builder)]
#[serde(rename_all = "camelCase")]
pub struct Image {
/// Value of the `schemaVersion` manifest key.
#[builder(into)]
schema_version: usize,
/// Value of the `mediaType` manifest key.
#[builder(into)]
media_type: MediaType,
/// Descriptor of the image configuration blob.
#[builder(into)]
config: Layer,
/// Descriptors of the filesystem layers, in manifest order.
#[builder(into)]
layers: Vec<Layer>,
/// Platform associated with this manifest; never read from or written to JSON.
#[builder(into)]
#[serde(skip)]
platform: Option<Platform>,
}
impl Image {
pub async fn read<R>(reader: &mut R, platform: Option<Platform>) -> crate::Result<Self>
where
R: AsyncRead + Unpin,
{
let mut buffer = Vec::new();
reader
.read_to_end(&mut buffer)
.await
.context(error::ArchiveSnafu)?;
let mut me: Self =
serde_json::from_slice(buffer.as_slice()).context(error::ImageInvalidManifestSnafu)?;
me.platform = platform;
Ok(me)
}
pub async fn create(config: &Layer, layers: &[Layer], platform: Option<Platform>) -> Self {
Self {
schema_version: 2,
media_type: MediaType::Config,
config: config.clone(),
layers: layers.to_vec(),
platform,
}
}
pub async fn fetch(uri: &Uri, platform: Option<Platform>) -> crate::Result<Self> {
ensure!(
matches!(uri.reference(), Reference::Digest { .. }),
error::DirectLoadImageSnafu { uri: uri.clone() }
);
let mut me: Self = uri
.registry()
.fetch_manifest(uri.repository(), uri.reference().to_string().as_str())
.await?;
me.platform = platform.clone();
Ok(me)
}
pub fn schema_version(&self) -> usize {
self.schema_version
}
pub fn media_type(&self) -> &MediaType {
&self.media_type
}
pub fn config(&self) -> &Layer {
&self.config
}
/// Returns the layer descriptors in manifest order.
pub fn layers(&self) -> &[Layer] {
    self.layers.as_slice()
}
pub fn platform(&self) -> Option<Platform> {
self.platform.clone()
}
pub async fn fetch_config(&self, uri: &Uri) -> crate::Result<ImageConfig> {
let mut layer = self.config.open(uri).await?;
let mut config = String::new();
layer
.read_to_string(&mut config)
.await
.context(error::LayerReadSnafu)?;
serde_json::from_str(config.as_str()).context(error::ConfigDeserializeSnafu)
}
/// Writes the flattened root filesystem of the image to `output` as a tar archive.
///
/// Layers are visited in reverse manifest order (topmost layer first, assuming the
/// manifest lists the base layer first). The first entry seen for a path wins, and
/// whiteout markers found in a layer hide the matching paths of every layer visited
/// after it. Whiteout markers themselves are never written to `output`.
///
/// # Errors
///
/// Fails when a layer cannot be opened or decoded as a tar archive, or when writing to
/// `output` fails.
#[cfg(feature = "compression")]
pub async fn filesystem<W>(&self, uri: &Uri, output: W) -> crate::Result<()>
where
    W: AsyncWrite + Unpin + Send + 'static,
{
    let mut archive = ArchiveBuilder::new(output);
    let mut seen: HashSet<String> = HashSet::new();
    let mut removed: HashSet<String> = HashSet::new();
    let mut opaque: HashSet<String> = HashSet::new();
    for layer in self.layers.iter().rev() {
        let reader = Decompress::new(layer.media_type(), layer.open(uri).await?);
        Self::merge_layer(&mut archive, reader, &mut seen, &mut removed, &mut opaque).await?;
    }
    archive.finish().await.context(error::ArchiveSnafu)?;
    Ok(())
}

/// Same as [`Image::filesystem`], but layer downloads report progress through `multi`.
///
/// # Errors
///
/// Fails when a layer cannot be opened or decoded as a tar archive, or when writing to
/// `output` fails.
#[cfg(all(feature = "progress", feature = "compression"))]
pub async fn filesystem_progress<W>(
    &self,
    uri: &Uri,
    output: W,
    multi: &mut MultiProgress,
) -> crate::Result<()>
where
    W: AsyncWrite + Unpin + Send + 'static,
{
    let mut archive = ArchiveBuilder::new(output);
    let mut seen: HashSet<String> = HashSet::new();
    let mut removed: HashSet<String> = HashSet::new();
    let mut opaque: HashSet<String> = HashSet::new();
    for layer in self.layers.iter().rev() {
        let reader =
            Decompress::new(layer.media_type(), layer.open_progress(uri, multi).await?);
        Self::merge_layer(&mut archive, reader, &mut seen, &mut removed, &mut opaque).await?;
    }
    archive.finish().await.context(error::ArchiveSnafu)?;
    Ok(())
}

/// Copies the entries of one layer into `archive`, applying overlay rules.
///
/// `seen` holds every path already written, `removed` the paths deleted by whiteouts of
/// previously visited (higher) layers, and `opaque` the directories whose lower-layer
/// content is hidden. Whiteouts found in this layer are only merged into `removed` and
/// `opaque` once the layer is finished, because they apply to lower layers only.
#[cfg(feature = "compression")]
async fn merge_layer<R, W>(
    archive: &mut ArchiveBuilder<W>,
    reader: R,
    seen: &mut HashSet<String>,
    removed: &mut HashSet<String>,
    opaque: &mut HashSet<String>,
) -> crate::Result<()>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin + Send + 'static,
{
    // Basename of the marker that hides all lower-layer content of its directory.
    const OPAQUE_MARKER: &str = ".wh..wh..opq";

    let mut layer = Archive::new(reader);
    let mut entries = layer.entries_raw().context(error::LayerArchiveSnafu)?;
    let mut layer_removed: Vec<String> = Vec::new();
    let mut layer_opaque: Vec<String> = Vec::new();
    // Raw extension records (GNU long name/link, PAX local header) belong to the entry
    // that follows them; they are buffered so they can be kept or dropped together
    // with that entry instead of leaking onto the next one.
    let mut extensions: Vec<(tokio_tar::Header, Vec<u8>)> = Vec::new();
    let mut long_path: Option<String> = None;

    while let Some(entry) = entries.next().await {
        let mut entry = entry.context(error::LayerArchiveSnafu)?;
        let header = entry.header().clone();
        let kind = header.entry_type();

        if kind.is_pax_global_extensions() {
            // Not tied to a single entry: forwarded untouched.
            archive
                .append(&header, &mut entry)
                .await
                .context(error::LayerCopySnafu)?;
            continue;
        }

        if kind.is_gnu_longname() || kind.is_gnu_longlink() || kind.is_pax_local_extensions() {
            let mut payload = Vec::new();
            entry
                .read_to_end(&mut payload)
                .await
                .context(error::LayerArchiveSnafu)?;
            if kind.is_gnu_longname() {
                // The payload is the NUL-terminated full path of the next entry.
                let end = payload
                    .iter()
                    .position(|byte| *byte == 0)
                    .unwrap_or(payload.len());
                long_path = Some(String::from_utf8_lossy(&payload[..end]).into_owned());
            } else if kind.is_pax_local_extensions() {
                if let Some(path) = Self::pax_path(&payload) {
                    long_path = Some(path);
                }
            }
            extensions.push((header, payload));
            continue;
        }

        // Prefer the path announced by a preceding extension record: the header field
        // alone is truncated for long names.
        let raw_path = match long_path.take() {
            Some(path) => path,
            None => header
                .path()
                .context(error::LayerArchiveSnafu)?
                .to_string_lossy()
                .into_owned(),
        };
        let path = Self::normalize_entry_path(&raw_path);
        let (parent, name) = path.rsplit_once('/').unwrap_or(("", path));

        // A whiteout is recognised by its file name prefix, not by `.wh.` appearing
        // anywhere in the path.
        if let Some(target) = name.strip_prefix(WHITEOUT) {
            if name == OPAQUE_MARKER {
                layer_opaque.push(parent.to_string());
            } else if !target.is_empty() && !target.starts_with(WHITEOUT) {
                // Other `.wh..wh.*` names are bookkeeping files and delete nothing.
                layer_removed.push(if parent.is_empty() {
                    target.to_string()
                } else {
                    format!("{parent}/{target}")
                });
            }
            extensions.clear();
            continue;
        }

        if seen.contains(path) || Self::is_hidden(path, removed, opaque) {
            extensions.clear();
            continue;
        }
        seen.insert(path.to_string());

        for (extension_header, payload) in extensions.drain(..) {
            archive
                .append(&extension_header, payload.as_slice())
                .await
                .context(error::LayerCopySnafu)?;
        }
        archive
            .append(&header, &mut entry)
            .await
            .context(error::LayerCopySnafu)?;
    }

    removed.extend(layer_removed);
    opaque.extend(layer_opaque);
    Ok(())
}

/// Strips leading `./` and `/` and trailing `/` so equal paths compare equal.
///
/// The archive root (`.` or `./`) is returned as the empty string.
#[cfg(feature = "compression")]
fn normalize_entry_path(path: &str) -> &str {
    let trimmed = path
        .trim_start_matches("./")
        .trim_start_matches('/')
        .trim_end_matches('/');
    if trimmed == "." { "" } else { trimmed }
}

/// Tells whether `path` (normalized) was deleted or shadowed by a higher layer.
///
/// A path is hidden when it or one of its ancestors is in `removed`, or when one of
/// its proper ancestors is in `opaque`.
#[cfg(feature = "compression")]
fn is_hidden(path: &str, removed: &HashSet<String>, opaque: &HashSet<String>) -> bool {
    if removed.contains(path) {
        return true;
    }
    let mut current = path;
    while !current.is_empty() {
        // The parent of a top-level name is the archive root, spelled "".
        let parent = current.rfind('/').map_or("", |index| &current[..index]);
        if removed.contains(parent) || opaque.contains(parent) {
            return true;
        }
        current = parent;
    }
    false
}

/// Extracts the `path` record from the payload of a PAX local extension header.
///
/// Records have the form `<length> <key>=<value>\n`, where `<length>` counts the whole
/// record. Parsing stops at the first malformed record; the last `path` found wins.
#[cfg(feature = "compression")]
fn pax_path(records: &[u8]) -> Option<String> {
    let mut rest = records;
    let mut found = None;
    while !rest.is_empty() {
        let Some(space) = rest.iter().position(|byte| *byte == b' ') else {
            break;
        };
        let Some(length) = std::str::from_utf8(&rest[..space])
            .ok()
            .and_then(|digits| digits.parse::<usize>().ok())
        else {
            break;
        };
        if length <= space + 1 || length > rest.len() {
            break;
        }
        let record = &rest[space + 1..length];
        let record = record.strip_suffix(b"\n").unwrap_or(record);
        if let Some(value) = record.strip_prefix(b"path=") {
            found = Some(String::from_utf8_lossy(value).into_owned());
        }
        rest = &rest[length..];
    }
    found
}
/// Downloads the config and every layer blob, then packs them with a `manifest.json`
/// into a tar archive written to `output`.
///
/// Blobs are first stored in a temporary directory: the config under its digest, each
/// layer as `<digest without algorithm prefix>.tar<compression extension>`. Layers are
/// downloaded concurrently, one spawned task per layer, and listed in the tarball
/// manifest in manifest order.
///
/// # Errors
///
/// Fails when the temporary directory or a file in it cannot be created or written,
/// when a blob cannot be downloaded, when the tarball manifest cannot be serialized, or
/// when writing to `output` fails.
///
/// # Panics
///
/// Panics if one of the download tasks panicked or was cancelled.
#[cfg(feature = "compression")]
pub async fn to_tarball<W>(&self, uri: &Uri, output: W) -> crate::Result<()>
where
    W: AsyncWrite + Unpin + Send + 'static,
{
    let mut manifest = TarballManifest::builder()
        .config(self.config.digest())
        .repo_tags(vec![uri.to_string()])
        .layers(vec![])
        .build();
    let tmp_dir = tempdir().context(error::TempSnafu)?;
    let tmp_path = tmp_dir.path().to_path_buf();

    let mut config_reader = self.config.open(uri).await?;
    let mut config_file = File::create(tmp_path.join(self.config.digest()))
        .await
        .context(error::FileSnafu)?;
    Layer::copy(&mut config_reader, &mut config_file, self.config.size()).await?;
    // A tokio `File` may still have a write in flight after `write` returns; flush so
    // the data is on disk before the directory is archived below.
    config_file.flush().await.context(error::FileSnafu)?;

    let mut tasks: Vec<JoinHandle<crate::Result<String>>> =
        Vec::with_capacity(self.layers.len());
    for layer in self.layers.iter() {
        let layer = layer.clone();
        let uri = uri.clone();
        let tmp_path = tmp_path.clone();
        tasks.push(tokio::spawn(async move {
            let mut reader = layer.open(&uri).await?;
            // The digest comes from the manifest: fall back to the whole value instead
            // of panicking when it has no `algorithm:` prefix.
            let digest = layer.digest().to_string();
            let hex = digest
                .split_once(':')
                .map_or(digest.as_str(), |(_, hex)| hex);
            let blob_layer = format!(
                "{}.tar{}",
                hex,
                layer.media_type().compression().to_ext()
            );
            let mut blob_file = File::create(tmp_path.join(&blob_layer))
                .await
                .context(error::FileSnafu)?;
            Layer::copy(&mut reader, &mut blob_file, layer.size()).await?;
            blob_file.flush().await.context(error::FileSnafu)?;
            Ok(blob_layer)
        }));
    }
    // `join_all` keeps the results in spawn order, i.e. manifest order.
    for joined in join_all(tasks).await {
        let blob_layer = joined.expect("layer download task panicked or was cancelled")?;
        manifest.layers.push(blob_layer);
    }

    let manifest_bytes = serde_json::to_string(&[manifest]).context(error::SerializeSnafu)?;
    tokio::fs::write(tmp_path.join("manifest.json"), manifest_bytes)
        .await
        .context(error::FileSnafu)?;

    let mut archive = ArchiveBuilder::new(output);
    archive
        .append_dir_all(".", &tmp_path)
        .await
        .context(error::ArchiveSnafu)?;
    archive.finish().await.context(error::ArchiveSnafu)?;
    Ok(())
}
/// Downloads the config and every layer blob with progress reporting, then packs them
/// with a `manifest.json` into a tar archive written to `output`.
///
/// Blobs are first stored in a temporary directory: the config under its digest, each
/// layer as `<digest without algorithm prefix>.tar<compression extension>`. Layers are
/// downloaded concurrently, one spawned task per layer, each with its own clone of
/// `progress`, and listed in the tarball manifest in manifest order.
///
/// # Errors
///
/// Fails when the temporary directory or a file in it cannot be created or written,
/// when a blob cannot be downloaded, when the tarball manifest cannot be serialized, or
/// when writing to `output` fails.
///
/// # Panics
///
/// Panics if one of the download tasks panicked or was cancelled.
#[cfg(all(feature = "compression", feature = "progress"))]
pub async fn to_tarball_progress<W>(
    &self,
    uri: &Uri,
    output: W,
    progress: &mut MultiProgress,
) -> crate::Result<()>
where
    W: AsyncWrite + Unpin + Send + 'static,
{
    let mut manifest = TarballManifest::builder()
        .config(self.config.digest())
        .repo_tags(vec![uri.to_string()])
        .layers(vec![])
        .build();
    let tmp_dir = tempdir().context(error::TempSnafu)?;
    let tmp_path = tmp_dir.path().to_path_buf();

    let mut config_reader = self.config.open_progress(uri, progress).await?;
    let mut config_file = File::create(tmp_path.join(self.config.digest()))
        .await
        .context(error::FileSnafu)?;
    Layer::copy(&mut config_reader, &mut config_file, self.config.size()).await?;
    // A tokio `File` may still have a write in flight after `write` returns; flush so
    // the data is on disk before the directory is archived below.
    config_file.flush().await.context(error::FileSnafu)?;

    let mut tasks: Vec<JoinHandle<crate::Result<String>>> =
        Vec::with_capacity(self.layers.len());
    for layer in self.layers.iter() {
        let layer = layer.clone();
        let uri = uri.clone();
        let tmp_path = tmp_path.clone();
        let mut multi = progress.clone();
        tasks.push(tokio::spawn(async move {
            let mut reader = layer.open_progress(&uri, &mut multi).await?;
            // The digest comes from the manifest: fall back to the whole value instead
            // of panicking when it has no `algorithm:` prefix.
            let digest = layer.digest().to_string();
            let hex = digest
                .split_once(':')
                .map_or(digest.as_str(), |(_, hex)| hex);
            let blob_layer = format!(
                "{}.tar{}",
                hex,
                layer.media_type().compression().to_ext()
            );
            let mut blob_file = File::create(tmp_path.join(&blob_layer))
                .await
                .context(error::FileSnafu)?;
            Layer::copy(&mut reader, &mut blob_file, layer.size()).await?;
            blob_file.flush().await.context(error::FileSnafu)?;
            Ok(blob_layer)
        }));
    }
    // `join_all` keeps the results in spawn order, i.e. manifest order.
    for joined in join_all(tasks).await {
        let blob_layer = joined.expect("layer download task panicked or was cancelled")?;
        manifest.layers.push(blob_layer);
    }

    let manifest_bytes = serde_json::to_string(&[manifest]).context(error::SerializeSnafu)?;
    tokio::fs::write(tmp_path.join("manifest.json"), manifest_bytes)
        .await
        .context(error::FileSnafu)?;

    let mut archive = ArchiveBuilder::new(output);
    archive
        .append_dir_all(".", &tmp_path)
        .await
        .context(error::ArchiveSnafu)?;
    archive.finish().await.context(error::ArchiveSnafu)?;
    Ok(())
}
/// Uploads this manifest to the registry, repository and reference given by `uri`.
///
/// The manifest's own media type and platform are passed along with it. Returns the
/// `Layer` produced by the registry's `push_manifest` call.
///
/// # Errors
///
/// Forwards any failure reported by the registry.
pub async fn push(&self, uri: &Uri) -> crate::Result<Layer> {
    let reference = uri.reference().to_string();
    let platform = self.platform.clone();
    let registry = uri.registry();
    registry
        .push_manifest(
            &self.media_type,
            uri.repository(),
            reference.as_str(),
            self,
            platform,
        )
        .await
}
pub async fn create_config(uri: &Uri, config: &Config) -> crate::Result<Layer> {
let config_bytes = serde_json::to_vec(config).context(error::SerializeSnafu)?;
let mut writer = Layer::create(uri, &MediaType::Config, config_bytes.len(), None)
.await?
.unwrap();
writer
.write_all(config_bytes.as_slice())
.await
.context(error::LayerWriteSnafu)?;
writer.flush().await.context(error::LayerWriteSnafu)?;
writer.layer().await
}
}