1use std::sync::Arc;
8
9use anyhow::{Result, bail};
10use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
11use containers_image_proxy::oci_spec::image::MediaType;
12use tokio::io::{AsyncRead, AsyncWriteExt, BufReader};
13
14use composefs::fsverity::FsVerityHashValue;
15use composefs::repository::{ObjectStoreMethod, Repository};
16use composefs::shared_internals::IO_BUF_CAPACITY;
17
18use crate::skopeo::TAR_LAYER_CONTENT_TYPE;
19use crate::tar::split_async;
20
21pub fn is_tar_media_type(media_type: &MediaType) -> bool {
23 matches!(
24 media_type,
25 MediaType::ImageLayer
26 | MediaType::ImageLayerGzip
27 | MediaType::ImageLayerZstd
28 | MediaType::ImageLayerNonDistributable
29 | MediaType::ImageLayerNonDistributableGzip
30 | MediaType::ImageLayerNonDistributableZstd
31 )
32}
33
34pub fn decompress_async<'a, R>(
40 reader: R,
41 media_type: &MediaType,
42) -> Result<Box<dyn AsyncRead + Unpin + Send + 'a>>
43where
44 R: AsyncRead + Unpin + Send + 'a,
45{
46 let buf = BufReader::new(reader);
47 let reader: Box<dyn AsyncRead + Unpin + Send> = match media_type {
48 MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => {
49 Box::new(BufReader::with_capacity(IO_BUF_CAPACITY, buf))
50 }
51 MediaType::ImageLayerGzip | MediaType::ImageLayerNonDistributableGzip => Box::new(
52 BufReader::with_capacity(IO_BUF_CAPACITY, GzipDecoder::new(buf)),
53 ),
54 MediaType::ImageLayerZstd | MediaType::ImageLayerNonDistributableZstd => Box::new(
55 BufReader::with_capacity(IO_BUF_CAPACITY, ZstdDecoder::new(buf)),
56 ),
57 _ => bail!("Unsupported layer media type for decompression: {media_type}"),
58 };
59 Ok(reader)
60}
61
62pub async fn import_tar_async<ObjectID, R>(
67 repo: Arc<Repository<ObjectID>>,
68 reader: R,
69) -> Result<(ObjectID, crate::ImportStats)>
70where
71 ObjectID: FsVerityHashValue,
72 R: AsyncRead + Unpin + Send,
73{
74 split_async(reader, repo, TAR_LAYER_CONTENT_TYPE).await
75}
76
77pub async fn store_blob_async<ObjectID, R>(
85 repo: &Repository<ObjectID>,
86 mut reader: R,
87) -> Result<(ObjectID, u64, ObjectStoreMethod)>
88where
89 ObjectID: FsVerityHashValue,
90 R: AsyncRead + Unpin,
91{
92 let tmpfile = repo.create_object_tmpfile()?;
93 let mut writer = tokio::fs::File::from(std::fs::File::from(tmpfile));
94 let size = tokio::io::copy(&mut reader, &mut writer).await?;
95 writer.flush().await?;
96 let tmpfile = writer.into_std().await;
97 let (object_id, method) = repo.finalize_object_tmpfile(tmpfile, size)?;
98 Ok((object_id, size, method))
99}