Skip to main content

composefs_oci/
layer.rs

1//! Shared layer import logic for OCI container images.
2//!
3//! This module provides common functionality for importing OCI image layers
4//! into a composefs repository, shared between the skopeo proxy path and
5//! direct OCI layout import.
6
7use std::sync::Arc;
8
9use anyhow::{Result, bail};
10use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
11use containers_image_proxy::oci_spec::image::MediaType;
12use tokio::io::{AsyncRead, AsyncWriteExt, BufReader};
13
14use composefs::fsverity::FsVerityHashValue;
15use composefs::repository::{ObjectStoreMethod, Repository};
16use composefs::shared_internals::IO_BUF_CAPACITY;
17
18use crate::skopeo::TAR_LAYER_CONTENT_TYPE;
19use crate::tar::split_async;
20
21/// Check if a media type represents a tar-based layer.
22pub fn is_tar_media_type(media_type: &MediaType) -> bool {
23    matches!(
24        media_type,
25        MediaType::ImageLayer
26            | MediaType::ImageLayerGzip
27            | MediaType::ImageLayerZstd
28            | MediaType::ImageLayerNonDistributable
29            | MediaType::ImageLayerNonDistributableGzip
30            | MediaType::ImageLayerNonDistributableZstd
31    )
32}
33
34/// Wrap an async reader with the appropriate decompressor for the media type.
35///
36/// Returns a boxed reader that decompresses the stream if needed.
37/// The output is `AsyncRead` (not `AsyncBufRead`) because `split_async`
38/// does its own buffering via `BytesMut`.
39pub fn decompress_async<'a, R>(
40    reader: R,
41    media_type: &MediaType,
42) -> Result<Box<dyn AsyncRead + Unpin + Send + 'a>>
43where
44    R: AsyncRead + Unpin + Send + 'a,
45{
46    let buf = BufReader::new(reader);
47    let reader: Box<dyn AsyncRead + Unpin + Send> = match media_type {
48        MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => {
49            Box::new(BufReader::with_capacity(IO_BUF_CAPACITY, buf))
50        }
51        MediaType::ImageLayerGzip | MediaType::ImageLayerNonDistributableGzip => Box::new(
52            BufReader::with_capacity(IO_BUF_CAPACITY, GzipDecoder::new(buf)),
53        ),
54        MediaType::ImageLayerZstd | MediaType::ImageLayerNonDistributableZstd => Box::new(
55            BufReader::with_capacity(IO_BUF_CAPACITY, ZstdDecoder::new(buf)),
56        ),
57        _ => bail!("Unsupported layer media type for decompression: {media_type}"),
58    };
59    Ok(reader)
60}
61
62/// Import a tar layer from an async reader into the repository.
63///
64/// The reader should already be decompressed (use `decompress_async` first).
65/// Returns the fs-verity object ID and import stats of the imported splitstream.
66pub async fn import_tar_async<ObjectID, R>(
67    repo: Arc<Repository<ObjectID>>,
68    reader: R,
69) -> Result<(ObjectID, crate::ImportStats)>
70where
71    ObjectID: FsVerityHashValue,
72    R: AsyncRead + Unpin + Send,
73{
74    split_async(reader, repo, TAR_LAYER_CONTENT_TYPE).await
75}
76
77/// Store raw bytes from an async reader as a repository object.
78///
79/// Streams the raw bytes into a repository object without creating a splitstream.
80/// Use this for non-tar blobs (OCI artifacts) where the caller will create
81/// the splitstream wrapper.
82///
83/// Returns (object_id, size, store_method) of the stored object.
84pub async fn store_blob_async<ObjectID, R>(
85    repo: &Repository<ObjectID>,
86    mut reader: R,
87) -> Result<(ObjectID, u64, ObjectStoreMethod)>
88where
89    ObjectID: FsVerityHashValue,
90    R: AsyncRead + Unpin,
91{
92    let tmpfile = repo.create_object_tmpfile()?;
93    let mut writer = tokio::fs::File::from(std::fs::File::from(tmpfile));
94    let size = tokio::io::copy(&mut reader, &mut writer).await?;
95    writer.flush().await?;
96    let tmpfile = writer.into_std().await;
97    let (object_id, method) = repo.finalize_object_tmpfile(tmpfile, size)?;
98    Ok((object_id, size, method))
99}