use std::cmp::Reverse;
use std::collections::HashMap;
use std::io::Read;
use std::path::Path;
use std::sync::Arc;
use std::thread::available_parallelism;
use anyhow::{Context, Result};
use cap_std_ext::cap_std;
use containers_image_proxy::oci_spec::image::{Descriptor, Digest as OciDigest, MediaType};
use fn_error_context::context;
use ocidir::{OciDir, ResolvedManifest};
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::debug;
use composefs::fsverity::FsVerityHashValue;
use composefs::repository::{ObjectStoreMethod, Repository};
use crate::layer::{decompress_async, import_tar_async, is_tar_media_type, store_blob_async};
use crate::oci_image::manifest_identifier;
use crate::progress::{ComponentId, ProgressEvent, ProgressRead, ProgressUnit, SharedReporter};
use crate::skopeo::OCI_BLOB_CONTENT_TYPE;
use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, OCI_MANIFEST_CONTENT_TYPE};
use crate::{ImportStats, config_identifier, layer_identifier};
use crate::skopeo::PullResult;
/// Splits an OCI layout reference of the form `PATH[:TAG]` into its path and
/// optional tag.
///
/// Only the last `:` can start a tag, and only when the text after it contains
/// no `/`. Otherwise the colon is part of the path (e.g. `C:/path/to/oci` or
/// `/path:middle/to/oci`) and the whole input is returned as the path. A
/// trailing `:` yields an empty tag (`Some("")`).
pub(crate) fn parse_oci_layout_ref(imgref: &str) -> (&str, Option<&str>) {
    match imgref.rsplit_once(':') {
        Some((path, tag)) if !tag.contains('/') => (path, Some(tag)),
        _ => (imgref, None),
    }
}
/// Resolves the manifest to import from `ocidir` using
/// `OciDir::open_image_this_platform`, passing `tag` through unchanged.
///
/// # Errors
///
/// Propagates the lookup failure with added context, e.g. when the layout
/// holds no manifest for the current platform.
fn resolve_manifest(ocidir: &OciDir, tag: Option<&str>) -> Result<ResolvedManifest> {
    let lookup = ocidir.open_image_this_platform(tag);
    lookup.context("Resolving manifest for platform")
}
#[context("Importing OCI layout from {}", layout_path.display())]
pub async fn import_oci_layout<ObjectID: FsVerityHashValue>(
repo: &Arc<Repository<ObjectID>>,
layout_path: &Path,
layout_tag: Option<&str>,
reporter: SharedReporter,
) -> Result<(PullResult<ObjectID>, ImportStats)> {
repo.ensure_writable()?;
let dir = cap_std::fs::Dir::open_ambient_dir(layout_path, cap_std::ambient_authority())
.with_context(|| format!("Opening OCI layout directory {}", layout_path.display()))?;
let ocidir = OciDir::open(dir).context("Opening OCI directory")?;
if let Ok(index) = ocidir.read_index()
&& index.manifests().len() == 1
{
let desc = &index.manifests()[0];
let mut manifest_data = Vec::new();
ocidir.read_blob(desc)?.read_to_end(&mut manifest_data)?;
let manifest = containers_image_proxy::oci_spec::image::ImageManifest::from_reader(
&manifest_data[..],
)?;
if crate::delta::is_delta_artifact(&manifest) {
let blob_reader = Arc::new(OciDirBlobReader(ocidir));
let (result, stats) =
crate::delta::import_delta(repo, &manifest, blob_reader, &reporter, None).await?;
return Ok((result, stats));
}
}
let resolved = resolve_manifest(&ocidir, layout_tag)?;
let manifest = resolved.manifest;
let manifest_descriptor = &resolved.manifest_descriptor;
let manifest_digest = manifest_descriptor.digest().clone();
let config_descriptor = manifest.config();
let layers = manifest.layers();
reporter.report(ProgressEvent::Message(format!(
"Importing {} layers from OCI layout",
layers.len()
)));
let (config_digest, config_verity, layer_refs, stats) =
import_config_and_layers(repo, &ocidir, layers, config_descriptor, &reporter)
.await
.with_context(|| format!("Failed to import config {}", config_descriptor.digest()))?;
reporter.report(ProgressEvent::Message("Storing manifest".to_string()));
let manifest_content_id = manifest_identifier(&manifest_digest);
let manifest_verity = if let Some(verity) = repo.has_stream(&manifest_content_id)? {
debug!("Already have manifest {manifest_digest}");
verity
} else {
debug!("Storing manifest {manifest_digest}");
let mut splitstream = repo.create_stream(OCI_MANIFEST_CONTENT_TYPE)?;
let config_key = format!("config:{}", config_descriptor.digest());
splitstream.add_named_stream_ref(&config_key, &config_verity);
for (diff_id, verity) in &layer_refs {
splitstream.add_named_stream_ref(diff_id.as_ref(), verity);
}
let mut raw_manifest = Vec::with_capacity(manifest_descriptor.size() as usize);
ocidir
.read_blob(manifest_descriptor)
.context("Reading raw manifest bytes")?
.read_to_end(&mut raw_manifest)?;
splitstream.write_external(&raw_manifest)?;
repo.write_stream(splitstream, &manifest_content_id, None)?
};
Ok((
PullResult {
manifest_digest,
manifest_verity,
config_digest,
config_verity,
},
stats,
))
}
/// Pairs every diff_id with the verity of its layer stream, preserving order.
///
/// `lookup` returns the known verity for a diff_id, if any.
///
/// # Errors
///
/// Fails when a diff_id has no verity, or when the number of resulting pairs
/// differs from `expected_layers`.
fn resolve_layer_refs<ObjectID: FsVerityHashValue>(
    diff_ids: Vec<OciDigest>,
    expected_layers: usize,
    lookup: impl Fn(&OciDigest) -> Option<ObjectID>,
) -> Result<Vec<(OciDigest, ObjectID)>> {
    let layer_refs: Vec<(OciDigest, ObjectID)> = diff_ids
        .into_iter()
        .map(|diff_id| {
            let verity = lookup(&diff_id)
                .with_context(|| format!("missing layer verity for diff_id {diff_id}"))?;
            Ok((diff_id, verity))
        })
        .collect::<Result<_>>()?;
    anyhow::ensure!(
        layer_refs.len() == expected_layers,
        "expected {} layer refs but got {}",
        expected_layers,
        layer_refs.len()
    );
    Ok(layer_refs)
}

/// Imports the image config and every layer listed in the manifest.
///
/// Returns:
///
/// - the config digest;
/// - the verity of the config splitstream;
/// - the `(diff_id, layer verity)` pairs, in config order;
/// - import statistics.
///
/// If the config stream already exists, the layer verities are read back from
/// its named references and every layer is reported as skipped. Otherwise the
/// layers are imported concurrently. Larger layers are started first, and at
/// most `available_parallelism()` imports run at once (1 if that cannot be
/// determined). A config splitstream referencing every layer stream is then
/// written.
///
/// # Errors
///
/// Fails if the config cannot be read or parsed, if its diff_ids do not match
/// the manifest's layer count, or if any layer import fails.
async fn import_config_and_layers<ObjectID: FsVerityHashValue>(
    repo: &Arc<Repository<ObjectID>>,
    ocidir: &OciDir,
    manifest_layers: &[Descriptor],
    config_descriptor: &Descriptor,
    reporter: &SharedReporter,
) -> Result<(OciDigest, ObjectID, Vec<(OciDigest, ObjectID)>, ImportStats)> {
    let config_digest: OciDigest = config_descriptor.digest().clone();
    let content_id = config_identifier(&config_digest);

    if let Some(config_id) = repo.has_stream(&content_id)? {
        debug!("Already have container config {config_digest}");
        // The stored config splitstream names each layer stream by its
        // diff_id, so the layers can be resolved without reading any blobs.
        let (data, named_refs) = crate::oci_image::read_external_splitstream(
            repo,
            &content_id,
            Some(&config_id),
            Some(OCI_CONFIG_CONTENT_TYPE),
        )?;
        let named_refs_map: HashMap<&str, ObjectID> = named_refs
            .iter()
            .map(|(k, v)| (k.as_ref(), v.clone()))
            .collect();
        let diff_ids = crate::extract_diff_ids(
            config_descriptor.media_type(),
            data.as_slice(),
            manifest_layers,
        )?;
        let layer_refs = resolve_layer_refs(diff_ids, manifest_layers.len(), |diff_id| {
            named_refs_map.get(diff_id.as_ref()).cloned()
        })?;
        for (diff_id, _) in &layer_refs {
            reporter.report(ProgressEvent::Skipped {
                id: ComponentId::from(diff_id.to_string()),
            });
        }
        return Ok((config_digest, config_id, layer_refs, ImportStats::default()));
    }

    debug!("Reading config {config_digest}");
    // The descriptor size is untrusted and only a preallocation hint, so it is
    // capped rather than trusted.
    const MAX_CONFIG_PREALLOC: usize = 4 << 20;
    let capacity =
        usize::try_from(config_descriptor.size()).map_or(0, |size| size.min(MAX_CONFIG_PREALLOC));
    let mut raw_config = Vec::with_capacity(capacity);
    ocidir
        .read_blob(config_descriptor)
        .context("Reading config blob")?
        .read_to_end(&mut raw_config)?;
    let diff_ids = crate::extract_diff_ids(
        config_descriptor.media_type(),
        raw_config.as_slice(),
        manifest_layers,
    )?;
    // `zip` below would silently drop unmatched layers, so reject a mismatch
    // up front instead of after importing every layer.
    anyhow::ensure!(
        diff_ids.len() == manifest_layers.len(),
        "config lists {} diff_ids but the manifest has {} layers",
        diff_ids.len(),
        manifest_layers.len()
    );

    // Start the largest layers first so they overlap with the smaller ones.
    let mut layers: Vec<_> = manifest_layers.iter().zip(&diff_ids).collect();
    layers.sort_by_key(|(desc, _)| Reverse(desc.size()));

    // `available_parallelism` can fail (unknown platform, restricted sandbox).
    // That should only limit concurrency, not abort the import.
    let threads = available_parallelism().map_or(1, |n| n.get());
    let sem = Arc::new(Semaphore::new(threads));
    let mut layer_tasks = JoinSet::new();
    for (descriptor, diff_id) in layers {
        let diff_id = diff_id.clone();
        let task_repo = Arc::clone(repo);
        // Acquiring the permit before spawning bounds how many layer files are
        // open and being imported at the same time.
        let permit = Arc::clone(&sem).acquire_owned().await?;
        let task_reporter = Arc::clone(reporter);
        let layer_file = ocidir
            .read_blob(descriptor)
            .with_context(|| format!("Opening layer blob {}", descriptor.digest()))?;
        let media_type = descriptor.media_type().clone();
        let layer_size = descriptor.size();
        layer_tasks.spawn(async move {
            let _permit = permit;
            let (verity, layer_stats) = import_layer_from_file(
                &task_repo,
                &diff_id,
                layer_file,
                &media_type,
                layer_size,
                &task_reporter,
            )
            .await?;
            anyhow::Ok((diff_id, verity, layer_stats))
        });
    }

    let mut verity_map: HashMap<OciDigest, ObjectID> = HashMap::with_capacity(diff_ids.len());
    let mut stats = ImportStats::default();
    for result in layer_tasks.join_all().await {
        let (diff_id, verity, layer_stats) = result?;
        verity_map.insert(diff_id, verity);
        stats.merge(&layer_stats);
    }
    let layer_refs = resolve_layer_refs(diff_ids, manifest_layers.len(), |diff_id| {
        verity_map.get(diff_id).cloned()
    })?;

    let mut splitstream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE)?;
    for (diff_id, verity) in &layer_refs {
        splitstream.add_named_stream_ref(diff_id.as_ref(), verity);
    }
    splitstream.write_external(&raw_config)?;
    let config_id = repo.write_stream(splitstream, &content_id, None)?;
    Ok((config_digest, config_id, layer_refs, stats))
}
/// Imports one layer blob into `repo`, reusing the stored stream if present.
///
/// - If a stream for this diff_id already exists, the layer is reported as
///   skipped and its verity is returned with empty statistics.
/// - Tar media types are decompressed and unpacked with `import_tar_async`,
///   and the result is registered under the layer's stream name.
/// - Any other media type is stored as a single object and wrapped in an
///   `OCI_BLOB_CONTENT_TYPE` splitstream.
///
/// Read progress is reported against `layer_size` while the file is consumed.
///
/// Returns the verity of the layer's stream and the import statistics.
async fn import_layer_from_file<ObjectID: FsVerityHashValue>(
    repo: &Arc<Repository<ObjectID>>,
    diff_id: &OciDigest,
    layer_file: std::fs::File,
    media_type: &MediaType,
    layer_size: u64,
    reporter: &SharedReporter,
) -> Result<(ObjectID, ImportStats)> {
    let stream_name = layer_identifier(diff_id);
    let component = ComponentId::from(diff_id.to_string());

    if let Some(existing) = repo.has_stream(&stream_name)? {
        debug!("Already have layer {diff_id}");
        reporter.report(ProgressEvent::Skipped { id: component });
        return Ok((existing, ImportStats::default()));
    }

    debug!("Importing layer {diff_id}");
    reporter.report(ProgressEvent::Started {
        id: component.clone(),
        total: Some(layer_size),
        unit: ProgressUnit::Bytes,
    });
    let (tracked_reader, drive_progress) = ProgressRead::new(
        tokio::fs::File::from_std(layer_file),
        Arc::clone(reporter),
        component.clone(),
        Some(layer_size),
    );

    if !is_tar_media_type(media_type) {
        // Opaque blob: store it as a single object, then wrap it in a stream.
        let (stored, ()) =
            tokio::join!(store_blob_async(repo, tracked_reader), drive_progress);
        let (object_id, size, method) = stored?;

        let mut stats = ImportStats::default();
        match method {
            ObjectStoreMethod::Copied => {
                stats.objects_copied += 1;
                stats.bytes_copied += size;
            }
            ObjectStoreMethod::Reflinked => {
                stats.objects_reflinked += 1;
                stats.bytes_reflinked += size;
            }
            ObjectStoreMethod::Hardlinked => {
                stats.objects_hardlinked += 1;
                stats.bytes_hardlinked += size;
            }
            ObjectStoreMethod::AlreadyPresent => {
                stats.objects_already_present += 1;
            }
        }

        let mut wrapper = repo.create_stream(OCI_BLOB_CONTENT_TYPE)?;
        wrapper.add_external_size(size);
        wrapper.write_reference(object_id)?;
        let stream_id = repo.write_stream(wrapper, &stream_name, None)?;
        reporter.report(ProgressEvent::Done {
            id: component,
            transferred: size,
        });
        return Ok((stream_id, stats));
    }

    // Tar layer: decompress and unpack while progress is driven concurrently.
    let decompressed = decompress_async(tracked_reader, media_type)?;
    let (imported, ()) = tokio::join!(
        import_tar_async(Arc::clone(repo), decompressed),
        drive_progress
    );
    let (object_id, layer_stats) = imported?;
    repo.register_stream(&object_id, &stream_name, None).await?;
    reporter.report(ProgressEvent::Done {
        id: component,
        transferred: layer_size,
    });
    Ok((object_id, layer_stats))
}
struct OciDirBlobReader(OciDir);
impl crate::delta::DeltaBlobReader for OciDirBlobReader {
fn open_blob(
&self,
digest: &OciDigest,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<std::fs::File>> + Send + '_>>
{
let blob_path = format!("blobs/{}/{}", digest.algorithm(), digest.digest());
Box::pin(std::future::ready(
self.0
.dir()
.open(&blob_path)
.map(|f| f.into_std())
.with_context(|| format!("Opening blob {digest} from OCI layout")),
))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::progress::NullReporter;

    // Table-driven check of `PATH[:TAG]` splitting.
    #[test]
    fn test_parse_oci_layout_ref() {
        // Each case is (input, (expected path, expected tag)).
        let cases: &[(&str, (&str, Option<&str>))] = &[
            // Untagged paths come back unchanged.
            ("/path/to/oci", ("/path/to/oci", None)),
            ("./local/oci", ("./local/oci", None)),
            ("ocidir", ("ocidir", None)),
            // The text after the last colon is the tag.
            ("/path/to/oci:latest", ("/path/to/oci", Some("latest"))),
            ("/path/to/oci:v1.0.0", ("/path/to/oci", Some("v1.0.0"))),
            ("./local/oci:mytag", ("./local/oci", Some("mytag"))),
            ("ocidir:latest", ("ocidir", Some("latest"))),
            // A colon followed by a `/` belongs to the path (e.g. a drive prefix).
            ("C:/path/to/oci", ("C:/path/to/oci", None)),
            ("C:/path/to/oci:latest", ("C:/path/to/oci", Some("latest"))),
            // Only the last colon separates the tag.
            (
                "/path/to/oci:tag:with:colons",
                ("/path/to/oci:tag:with", Some("colons")),
            ),
            // A trailing colon yields an empty tag rather than no tag.
            ("/path/to/oci:", ("/path/to/oci", Some(""))),
            ("ocidir:", ("ocidir", Some(""))),
            // Colons inside earlier path components are not tag separators.
            ("/path:middle/to/oci", ("/path:middle/to/oci", None)),
            (
                "/path:middle/to/oci:tag",
                ("/path:middle/to/oci", Some("tag")),
            ),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_oci_layout_ref(input), *expected, "input: {input}");
        }
    }

    // A layout whose only image targets a foreign architecture must be rejected
    // with a "No manifest found for platform" error instead of being imported.
    #[tokio::test]
    async fn test_wrong_platform_rejected() {
        use cap_std_ext::cap_std;
        use composefs::fsverity::Sha256HashValue;
        use containers_image_proxy::oci_spec::image::{
            Arch, ConfigBuilder, ImageConfigurationBuilder, Os, PlatformBuilder, RootFsBuilder,
        };
        // Build a fresh OCI layout in a temporary directory.
        let tempdir = tempfile::tempdir().unwrap();
        let layout_path = tempdir.path();
        let dir =
            cap_std::fs::Dir::open_ambient_dir(layout_path, cap_std::ambient_authority()).unwrap();
        let ocidir = OciDir::ensure(dir).unwrap();
        // Pick an architecture that differs from `Arch::default()` (presumably
        // the host architecture).
        let foreign_arch = if Arch::default() == Arch::Amd64 {
            "s390x"
        } else {
            "amd64"
        };
        // A layerless image whose config and platform both name the foreign arch.
        let manifest = ocidir.new_empty_manifest().unwrap().build().unwrap();
        let config = ImageConfigurationBuilder::default()
            .architecture(foreign_arch)
            .os("linux")
            .rootfs(
                RootFsBuilder::default()
                    .typ("layers")
                    .diff_ids(Vec::<String>::new())
                    .build()
                    .unwrap(),
            )
            .config(ConfigBuilder::default().build().unwrap())
            .build()
            .unwrap();
        let platform = PlatformBuilder::default()
            .architecture(foreign_arch)
            .os(Os::default())
            .build()
            .unwrap();
        ocidir
            .insert_manifest_and_config(manifest, config, None, platform)
            .unwrap();
        // Target repository. `set_insecure()` presumably lets the test run
        // without fs-verity support. TODO confirm.
        let repo_dir = tempfile::tempdir().unwrap();
        let repo_path = repo_dir.path().join("repo");
        let (repo, _) = composefs::repository::Repository::<Sha256HashValue>::init_path(
            rustix::fs::CWD,
            &repo_path,
            composefs::repository::RepositoryConfig::default().set_insecure(),
        )
        .unwrap();
        let repo = std::sync::Arc::new(repo);
        let reporter = std::sync::Arc::new(NullReporter);
        let result = import_oci_layout(&repo, layout_path, None, reporter).await;
        let err = result.expect_err("should fail with no matching platform");
        // `{:#}` renders the full anyhow context chain, so the inner
        // platform-resolution message is visible.
        let err_msg = format!("{err:#}");
        assert!(
            err_msg.contains("No manifest found for platform"),
            "unexpected error: {err_msg}"
        );
    }
}