use std::str::FromStr;
use crate::error;
use crate::image::Image;
use crate::layer::Layer;
use crate::models::MediaType;
use crate::models::Platform;
use crate::uri::{Reference, Uri};
use bon::Builder;
use futures::future::join_all;
#[cfg(feature = "progress")]
use indicatif::MultiProgress;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tempfile::tempdir;
use tokio::fs::{File, create_dir_all};
use tokio::io::AsyncWrite;
use tokio::task::JoinHandle;
use tokio_tar::Builder as ArchiveBuilder;
/// An image index: a list of manifest descriptors plus the schema version and
/// media type that describe the list itself.
///
/// Field names are (de)serialized in camelCase (`schemaVersion`, `mediaType`,
/// `manifests`).
#[derive(Debug, Serialize, Deserialize, Clone, Builder)]
#[serde(rename_all = "camelCase")]
pub struct Index {
/// Schema version of the index document; `Index::new` sets this to `2`.
#[builder(into)]
schema_version: usize,
/// Media type of the index document; `Index::new` sets this to
/// `MediaType::ImageIndex`.
#[builder(into)]
media_type: MediaType,
/// Descriptors of the manifests listed by this index. Their `platform()` and
/// `digest()` are what the methods of this type use to select and fetch images.
#[builder(into)]
manifests: Vec<Layer>,
}
impl Index {
/// Creates an index that lists a copy of `manifests`.
///
/// The result always has schema version `2` and media type
/// `MediaType::ImageIndex`. The function is `async` but performs no awaiting.
pub async fn new(manifests: &[Layer]) -> Self {
    let manifests = manifests.to_vec();
    Self {
        schema_version: 2,
        media_type: MediaType::ImageIndex,
        manifests,
    }
}
/// Forwards to the registry's `check_manifest` for the repository and
/// reference named by `uri`, returning its boolean answer.
///
/// # Errors
///
/// Returns whatever error `check_manifest` reports.
pub async fn check(uri: &Uri) -> crate::Result<bool> {
    let reference = uri.reference().to_string();
    let registry = uri.registry();
    registry
        .check_manifest(uri.repository(), reference.as_str())
        .await
}
/// Retrieves the index addressed by `uri` through the registry's
/// `fetch_manifest`, using the URI's repository and reference.
///
/// # Errors
///
/// Returns whatever error `fetch_manifest` reports.
pub async fn fetch(uri: &Uri) -> crate::Result<Self> {
    let reference = uri.reference().to_string();
    let registry = uri.registry();
    registry
        .fetch_manifest(uri.repository(), reference.as_str())
        .await
}
pub fn schema_version(&self) -> usize {
self.schema_version
}
pub fn media_type(&self) -> &MediaType {
&self.media_type
}
/// Returns the manifest descriptors listed by this index, in stored order.
pub fn manifests(&self) -> &[Layer] {
    &self.manifests
}
/// Fetches the image manifest that this index lists for a platform.
///
/// Selection rules:
/// * `platform` is `Some`: the first manifest whose platform equals it is
///   used; if none matches, an `IndexNoPlatform` error is returned.
/// * `platform` is `None`: the first manifest matching `Platform::default()`
///   is used; failing that, the first manifest in the index (fetched with
///   that manifest's own platform); if the index is empty, `Ok(None)`.
///
/// The image is fetched from the same registry and repository as `uri`, but
/// addressed by the selected manifest's digest.
///
/// # Errors
///
/// Fails when the requested platform is not listed, when the selected
/// digest cannot be parsed into a `Reference`, or when `Image::fetch` fails.
pub async fn fetch_image(
    &self,
    uri: &Uri,
    platform: Option<Platform>,
) -> crate::Result<Option<Image>> {
    // Step 1: decide which descriptor to fetch and which platform to pass on.
    let (entry, image_platform) = match platform {
        Some(requested) => {
            let entry = self
                .manifests
                .iter()
                .find(|candidate| candidate.platform().as_ref() == Some(&requested))
                .context(error::IndexNoPlatformSnafu {
                    platform: requested.clone(),
                })?;
            (entry, Some(requested))
        }
        None => {
            let host = Platform::default();
            let native = self
                .manifests
                .iter()
                .find(|candidate| candidate.platform().as_ref() == Some(&host));
            match native {
                Some(entry) => (entry, Some(host)),
                None => match self.manifests.first() {
                    Some(entry) => (entry, entry.platform().clone()),
                    None => return Ok(None),
                },
            }
        }
    };

    // Step 2: address the selected manifest by digest in the same repository.
    let image_uri = Uri::builder()
        .registry(uri.registry().clone())
        .repository(uri.repository())
        .reference(Reference::from_str(entry.digest())?)
        .build();
    let image = Image::fetch(&image_uri, image_platform).await?;
    Ok(Some(image))
}
/// Uploads this index through the registry's `push_manifest`, under the
/// repository and reference named by `uri` and with this index's media type.
///
/// Any value returned by `push_manifest` on success is discarded.
///
/// # Errors
///
/// Returns whatever error `push_manifest` reports.
pub async fn push(&self, uri: &Uri) -> crate::Result<()> {
    let reference = uri.reference().to_string();
    let registry = uri.registry();
    registry
        .push_manifest(
            &self.media_type,
            uri.repository(),
            reference.as_str(),
            self,
            None,
        )
        .await?;
    Ok(())
}
pub async fn to_oci<W>(
&self,
uri: &Uri,
platform: Option<Platform>,
output: W,
) -> crate::Result<()>
where
W: AsyncWrite + Unpin + Send + 'static,
{
let tmp_dir = tempdir().context(error::TempSnafu)?;
tokio::fs::write(
tmp_dir.path().join("oci-layout"),
r#"{ "imageLayoutVersion": "1.0.0" }"#,
)
.await
.context(error::FileSnafu)?;
let blob_dir = tmp_dir.path().join("blobs/sha256");
create_dir_all(&blob_dir)
.await
.context(error::DirectorySnafu)?;
let mut index = self.clone();
if let Some(platform) = platform {
index.manifests = index
.manifests
.iter()
.filter(|x| x.platform() == Some(platform.clone()))
.cloned()
.collect::<Vec<Layer>>();
if index.manifests.is_empty() {
return error::IndexNoPlatformSnafu { platform }.fail();
}
}
let index_content = serde_json::to_string(&index).context(error::SerializeSnafu)?;
tokio::fs::write(tmp_dir.path().join("index.json"), &index_content)
.await
.context(error::FileSnafu)?;
for manifest in index.manifests.iter() {
let image_uri = Uri::builder()
.registry(uri.registry().clone())
.repository(uri.repository())
.reference(Reference::from_str(manifest.digest())?)
.build();
let image = Image::fetch(&image_uri, manifest.platform().clone()).await?;
let manifest_bytes = serde_json::to_string(&image).context(error::SerializeSnafu)?;
tokio::fs::write(
blob_dir.join(manifest.digest().strip_prefix("sha256:").unwrap()),
&manifest_bytes,
)
.await
.context(error::FileSnafu)?;
let mut config_reader = image.config().open(uri).await?;
let mut config_file = File::create(
blob_dir.join(image.config().digest().strip_prefix("sha256:").unwrap()),
)
.await
.context(error::FileSnafu)?;
Layer::copy(&mut config_reader, &mut config_file, image.config().size()).await?;
let mut tasks: Vec<JoinHandle<crate::Result<()>>> = Vec::new();
for layer in image.layers().iter() {
let layer = layer.clone();
let uri = uri.clone();
let blob_dir = blob_dir.clone();
tasks.push(tokio::spawn(async move {
let mut reader = layer.open(&uri).await?;
let mut blob_file = File::create(
blob_dir.join(layer.digest().strip_prefix("sha256:").unwrap()),
)
.await
.context(error::FileSnafu)?;
Layer::copy(&mut reader, &mut blob_file, layer.size()).await?;
Ok(())
}));
}
for result in join_all(tasks).await {
let result = result.context(error::LayerWaitSnafu)?;
result?;
}
}
let mut archive = ArchiveBuilder::new(output);
archive
.append_dir_all(".", tmp_dir.path().to_path_buf())
.await
.context(error::ArchiveSnafu)?;
archive.finish().await.context(error::ArchiveSnafu)?;
Ok(())
}
/// Same as the non-progress OCI export, but blobs are opened with
/// `open_progress` so downloads are reported through `multi`.
///
/// The layout is staged in a temporary directory and then archived to
/// `output`:
/// * `oci-layout` with `imageLayoutVersion` `1.0.0`,
/// * `index.json` holding this index (restricted to `platform` when given),
/// * `blobs/sha256/<hex>` for every image manifest, config and layer.
///
/// Layers of one image are downloaded concurrently on spawned tasks, each
/// with its own clone of `multi`; images are processed one after another.
///
/// # Errors
///
/// * `IndexNoPlatform` when `platform` is given but no manifest matches it.
/// * A file error when a digest is not of the form `sha256:<hex>`; such
///   digests are rejected instead of panicking, and are never used to build
///   a path outside the blob directory.
/// * Any error from creating the staging files, fetching manifests, opening
///   or copying blobs, joining the download tasks, or writing the archive.
#[cfg(feature = "progress")]
pub async fn to_oci_progress<W>(
    &self,
    uri: &Uri,
    platform: Option<Platform>,
    output: W,
    multi: &mut MultiProgress,
) -> crate::Result<()>
where
    W: AsyncWrite + Unpin + Send + 'static,
{
    /// Maps a `sha256:<hex>` digest to its blob file inside `blob_dir`.
    ///
    /// Digests come from the registry, so anything that is not a non-empty
    /// run of hex digits after the `sha256:` prefix is refused: this avoids
    /// both a panic and path components such as `..` or `/`.
    fn blob_path(
        blob_dir: &std::path::Path,
        digest: &str,
    ) -> crate::Result<std::path::PathBuf> {
        let encoded = digest
            .strip_prefix("sha256:")
            .filter(|enc| !enc.is_empty() && enc.bytes().all(|b| b.is_ascii_hexdigit()));
        match encoded {
            Some(enc) => Ok(blob_dir.join(enc)),
            None => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("unsupported or malformed blob digest: {digest}"),
            ))
            .context(error::FileSnafu),
        }
    }

    let tmp_dir = tempdir().context(error::TempSnafu)?;
    tokio::fs::write(
        tmp_dir.path().join("oci-layout"),
        r#"{ "imageLayoutVersion": "1.0.0" }"#,
    )
    .await
    .context(error::FileSnafu)?;
    let blob_dir = tmp_dir.path().join("blobs/sha256");
    create_dir_all(&blob_dir)
        .await
        .context(error::DirectorySnafu)?;

    // Work on a copy so the platform filter does not alter `self`.
    let mut index = self.clone();
    if let Some(platform) = platform {
        index
            .manifests
            .retain(|entry| entry.platform().as_ref() == Some(&platform));
        if index.manifests.is_empty() {
            return error::IndexNoPlatformSnafu { platform }.fail();
        }
    }
    let index_content = serde_json::to_string(&index).context(error::SerializeSnafu)?;
    tokio::fs::write(tmp_dir.path().join("index.json"), &index_content)
        .await
        .context(error::FileSnafu)?;

    for manifest in index.manifests.iter() {
        // Validate the digest before doing any network work for this image.
        let manifest_path = blob_path(&blob_dir, manifest.digest())?;
        let image_uri = Uri::builder()
            .registry(uri.registry().clone())
            .repository(uri.repository())
            .reference(Reference::from_str(manifest.digest())?)
            .build();
        let image = Image::fetch(&image_uri, manifest.platform().clone()).await?;
        // NOTE(review): the manifest is re-serialized here rather than stored
        // as received, so the blob's bytes may not hash to `manifest.digest()`
        // — confirm against the registry client.
        let manifest_bytes = serde_json::to_string(&image).context(error::SerializeSnafu)?;
        tokio::fs::write(manifest_path, &manifest_bytes)
            .await
            .context(error::FileSnafu)?;

        let config_path = blob_path(&blob_dir, image.config().digest())?;
        let mut config_reader = image.config().open_progress(uri, multi).await?;
        let mut config_file = File::create(config_path)
            .await
            .context(error::FileSnafu)?;
        Layer::copy(&mut config_reader, &mut config_file, image.config().size()).await?;
        // tokio files may complete writes in the background; flush so the
        // data is on disk (and write errors surface) before archiving.
        tokio::io::AsyncWriteExt::flush(&mut config_file)
            .await
            .context(error::FileSnafu)?;

        // Resolve every layer path first so a bad digest aborts before any
        // download task has been spawned.
        let mut downloads = Vec::new();
        for layer in image.layers().iter() {
            let target = blob_path(&blob_dir, layer.digest())?;
            downloads.push((layer.clone(), target));
        }

        let mut tasks: Vec<JoinHandle<crate::Result<()>>> =
            Vec::with_capacity(downloads.len());
        for (layer, target) in downloads {
            let uri = uri.clone();
            // Each task gets its own handle to the shared progress display.
            let mut multi = multi.clone();
            tasks.push(tokio::spawn(async move {
                let mut reader = layer.open_progress(&uri, &mut multi).await?;
                let mut blob_file = File::create(target).await.context(error::FileSnafu)?;
                Layer::copy(&mut reader, &mut blob_file, layer.size()).await?;
                tokio::io::AsyncWriteExt::flush(&mut blob_file)
                    .await
                    .context(error::FileSnafu)?;
                Ok(())
            }));
        }
        // Wait for every task, then report the first join or download error.
        for result in join_all(tasks).await {
            let result = result.context(error::LayerWaitSnafu)?;
            result?;
        }
    }

    let mut archive = ArchiveBuilder::new(output);
    archive
        .append_dir_all(".", tmp_dir.path().to_path_buf())
        .await
        .context(error::ArchiveSnafu)?;
    archive.finish().await.context(error::ArchiveSnafu)?;
    Ok(())
}
}