Skip to main content

composefs_oci/
lib.rs

1use std::{cmp::Reverse, process::Command, thread::available_parallelism};
2
3pub mod image;
4pub mod tar;
5
6use std::{collections::HashMap, io::Read, iter::zip, sync::Arc};
7
8use anyhow::{bail, ensure, Context, Result};
9use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
10use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage};
11use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
12use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
13use sha2::{Digest, Sha256};
14use tokio::{io::AsyncReadExt, sync::Semaphore};
15
16use composefs::{
17    fsverity::FsVerityHashValue,
18    repository::Repository,
19    splitstream::DigestMap,
20    util::{parse_sha256, Sha256Digest},
21};
22
23use crate::tar::{get_entry, split_async};
24
25pub fn import_layer<ObjectID: FsVerityHashValue>(
26    repo: &Arc<Repository<ObjectID>>,
27    sha256: &Sha256Digest,
28    name: Option<&str>,
29    tar_stream: &mut impl Read,
30) -> Result<ObjectID> {
31    repo.ensure_stream(sha256, |writer| tar::split(tar_stream, writer), name)
32}
33
34pub fn ls_layer<ObjectID: FsVerityHashValue>(
35    repo: &Repository<ObjectID>,
36    name: &str,
37) -> Result<()> {
38    let mut split_stream = repo.open_stream(name, None)?;
39
40    while let Some(entry) = get_entry(&mut split_stream)? {
41        println!("{entry}");
42    }
43
44    Ok(())
45}
46
/// State for a single image pull: the target repository, the image proxy connection, the
/// opened image, and the progress display.
///
/// Wrapped in an `Arc` during a pull so that per-layer tasks can share it.
struct ImageOp<ObjectID: FsVerityHashValue> {
    /// Repository that fetched layers and configs are written into.
    repo: Arc<Repository<ObjectID>>,
    /// Connection to the skopeo-backed image proxy used to fetch blobs.
    proxy: ImageProxy,
    /// The image opened on `proxy` for this pull.
    img: OpenedImage,
    /// Progress output; status lines and per-layer progress bars are printed through it.
    progress: MultiProgress,
}
53
54fn sha256_from_descriptor(descriptor: &Descriptor) -> Result<Sha256Digest> {
55    let Some(digest) = descriptor.as_digest_sha256() else {
56        bail!("Descriptor in oci config is not sha256");
57    };
58    Ok(parse_sha256(digest)?)
59}
60
61fn sha256_from_digest(digest: &str) -> Result<Sha256Digest> {
62    match digest.strip_prefix("sha256:") {
63        Some(rest) => Ok(parse_sha256(rest)?),
64        None => bail!("Manifest has non-sha256 digest"),
65    }
66}
67
/// A stored stream's SHA-256 content digest paired with its fs-verity object ID.
type ContentAndVerity<ObjectID> = (Sha256Digest, ObjectID);
69
70impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
71    async fn new(repo: &Arc<Repository<ObjectID>>, imgref: &str) -> Result<Self> {
72        // See https://github.com/containers/skopeo/issues/2563
73        let skopeo_cmd = if imgref.starts_with("containers-storage:") {
74            let mut cmd = Command::new("podman");
75            cmd.args(["unshare", "skopeo"]);
76            Some(cmd)
77        } else {
78            None
79        };
80
81        let config = ImageProxyConfig {
82            skopeo_cmd,
83            // auth_anonymous: true, debug: true, insecure_skip_tls_verification: Some(true),
84            ..ImageProxyConfig::default()
85        };
86        let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?;
87        let img = proxy.open_image(imgref).await.context("Opening image")?;
88        let progress = MultiProgress::new();
89        Ok(ImageOp {
90            repo: Arc::clone(repo),
91            proxy,
92            img,
93            progress,
94        })
95    }
96
97    pub async fn ensure_layer(
98        &self,
99        layer_sha256: Sha256Digest,
100        descriptor: &Descriptor,
101    ) -> Result<ObjectID> {
102        // We need to use the per_manifest descriptor to download the compressed layer but it gets
103        // stored in the repository via the per_config descriptor.  Our return value is the
104        // fsverity digest for the corresponding splitstream.
105
106        if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? {
107            self.progress
108                .println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
109            Ok(layer_id)
110        } else {
111            // Otherwise, we need to fetch it...
112            let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
113
114            // See https://github.com/containers/containers-image-proxy-rs/issues/71
115            let blob_reader = blob_reader.take(descriptor.size());
116
117            let bar = self.progress.add(ProgressBar::new(descriptor.size()));
118            bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}")
119                .unwrap()
120                .progress_chars("##-"));
121            let progress = bar.wrap_async_read(blob_reader);
122            self.progress
123                .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
124
125            let mut splitstream = self.repo.create_stream(Some(layer_sha256), None);
126            match descriptor.media_type() {
127                MediaType::ImageLayer => {
128                    split_async(progress, &mut splitstream).await?;
129                }
130                MediaType::ImageLayerGzip => {
131                    split_async(GzipDecoder::new(progress), &mut splitstream).await?;
132                }
133                MediaType::ImageLayerZstd => {
134                    split_async(ZstdDecoder::new(progress), &mut splitstream).await?;
135                }
136                other => bail!("Unsupported layer media type {:?}", other),
137            };
138            let layer_id = self.repo.write_stream(splitstream, None)?;
139
140            // We intentionally explicitly ignore this, even though we're supposed to check it.
141            // See https://github.com/containers/containers-image-proxy-rs/issues/80 for discussion
142            // about why.  Note: we only care about the uncompressed layer tar, and we checksum it
143            // ourselves.
144            drop(driver);
145
146            Ok(layer_id)
147        }
148    }
149
150    pub async fn ensure_config(
151        self: &Arc<Self>,
152        manifest_layers: &[Descriptor],
153        descriptor: &Descriptor,
154    ) -> Result<ContentAndVerity<ObjectID>> {
155        let config_sha256 = sha256_from_descriptor(descriptor)?;
156        if let Some(config_id) = self.repo.check_stream(&config_sha256)? {
157            // We already got this config?  Nice.
158            self.progress.println(format!(
159                "Already have container config {}",
160                hex::encode(config_sha256)
161            ))?;
162            Ok((config_sha256, config_id))
163        } else {
164            // We need to add the config to the repo.  We need to parse the config and make sure we
165            // have all of the layers first.
166            //
167            self.progress
168                .println(format!("Fetching config {}", hex::encode(config_sha256)))?;
169
170            let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
171            let config = async move {
172                let mut s = Vec::new();
173                config.read_to_end(&mut s).await?;
174                anyhow::Ok(s)
175            };
176            let (config, driver) = tokio::join!(config, driver);
177            let _: () = driver?;
178            let raw_config = config?;
179            let config = ImageConfiguration::from_reader(&raw_config[..])?;
180
181            // We want to sort the layers based on size so we can get started on the big layers
182            // first.  The last thing we want is to start on the biggest layer right at the end.
183            let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect();
184            layers.sort_by_key(|(mld, ..)| Reverse(mld.size()));
185
186            // Bound the number of tasks to the available parallelism.
187            let threads = available_parallelism()?;
188            let sem = Arc::new(Semaphore::new(threads.into()));
189            let mut entries = vec![];
190            for (mld, diff_id) in layers {
191                let self_ = Arc::clone(self);
192                let permit = Arc::clone(&sem).acquire_owned().await?;
193                let layer_sha256 = sha256_from_digest(diff_id)?;
194                let descriptor = mld.clone();
195                let future = tokio::spawn(async move {
196                    let _permit = permit;
197                    self_.ensure_layer(layer_sha256, &descriptor).await
198                });
199                entries.push((layer_sha256, future));
200            }
201
202            // Collect the results.
203            let mut config_maps = DigestMap::new();
204            for (layer_sha256, future) in entries {
205                config_maps.insert(&layer_sha256, &future.await??);
206            }
207
208            let mut splitstream = self
209                .repo
210                .create_stream(Some(config_sha256), Some(config_maps));
211            splitstream.write_inline(&raw_config);
212            let config_id = self.repo.write_stream(splitstream, None)?;
213
214            Ok((config_sha256, config_id))
215        }
216    }
217
218    pub async fn pull(self: &Arc<Self>) -> Result<ContentAndVerity<ObjectID>> {
219        let (_manifest_digest, raw_manifest) = self
220            .proxy
221            .fetch_manifest_raw_oci(&self.img)
222            .await
223            .context("Fetching manifest")?;
224
225        // We need to add the manifest to the repo.  We need to parse the manifest and make
226        // sure we have the config first (which will also pull in the layers).
227        let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?;
228        let config_descriptor = manifest.config();
229        let layers = manifest.layers();
230        self.ensure_config(layers, config_descriptor)
231            .await
232            .with_context(|| format!("Failed to pull config {config_descriptor:?}"))
233    }
234}
235
236/// Pull the target image, and add the provided tag. If this is a mountable
237/// image (i.e. not an artifact), it is *not* unpacked by default.
238pub async fn pull(
239    repo: &Arc<Repository<impl FsVerityHashValue>>,
240    imgref: &str,
241    reference: Option<&str>,
242) -> Result<(Sha256Digest, impl FsVerityHashValue)> {
243    let op = Arc::new(ImageOp::new(repo, imgref).await?);
244    let (sha256, id) = op
245        .pull()
246        .await
247        .with_context(|| format!("Unable to pull container image {imgref}"))?;
248
249    if let Some(name) = reference {
250        repo.name_stream(sha256, name)?;
251    }
252    Ok((sha256, id))
253}
254
255pub fn open_config<ObjectID: FsVerityHashValue>(
256    repo: &Repository<ObjectID>,
257    name: &str,
258    verity: Option<&ObjectID>,
259) -> Result<(ImageConfiguration, DigestMap<ObjectID>)> {
260    let id = match verity {
261        Some(id) => id,
262        None => {
263            // take the expensive route
264            let sha256 = parse_sha256(name)
265                .context("Containers must be referred to by sha256 if verity is missing")?;
266            &repo
267                .check_stream(&sha256)?
268                .with_context(|| format!("Object {name} is unknown to us"))?
269        }
270    };
271    let mut stream = repo.open_stream(name, Some(id))?;
272    let config = ImageConfiguration::from_reader(&mut stream)?;
273    Ok((config, stream.refs))
274}
275
276fn hash(bytes: &[u8]) -> Sha256Digest {
277    let mut context = Sha256::new();
278    context.update(bytes);
279    context.finalize().into()
280}
281
282pub fn open_config_shallow<ObjectID: FsVerityHashValue>(
283    repo: &Repository<ObjectID>,
284    name: &str,
285    verity: Option<&ObjectID>,
286) -> Result<ImageConfiguration> {
287    match verity {
288        // with verity deep opens are just as fast as shallow ones
289        Some(id) => Ok(open_config(repo, name, Some(id))?.0),
290        None => {
291            // we need to manually check the content digest
292            let expected_hash = parse_sha256(name)
293                .context("Containers must be referred to by sha256 if verity is missing")?;
294            let mut stream = repo.open_stream(name, None)?;
295            let mut raw_config = vec![];
296            stream.read_to_end(&mut raw_config)?;
297            ensure!(hash(&raw_config) == expected_hash, "Data integrity issue");
298            Ok(ImageConfiguration::from_reader(&mut raw_config.as_slice())?)
299        }
300    }
301}
302
303pub fn write_config<ObjectID: FsVerityHashValue>(
304    repo: &Arc<Repository<ObjectID>>,
305    config: &ImageConfiguration,
306    refs: DigestMap<ObjectID>,
307) -> Result<ContentAndVerity<ObjectID>> {
308    let json = config.to_string()?;
309    let json_bytes = json.as_bytes();
310    let sha256 = hash(json_bytes);
311    let mut stream = repo.create_stream(Some(sha256), Some(refs));
312    stream.write_inline(json_bytes);
313    let id = repo.write_stream(stream, None)?;
314    Ok((sha256, id))
315}
316
317pub fn seal<ObjectID: FsVerityHashValue>(
318    repo: &Arc<Repository<ObjectID>>,
319    config_name: &str,
320    config_verity: Option<&ObjectID>,
321) -> Result<ContentAndVerity<ObjectID>> {
322    let (mut config, refs) = open_config(repo, config_name, config_verity)?;
323    let mut myconfig = config.config().clone().context("no config!")?;
324    let labels = myconfig.labels_mut().get_or_insert_with(HashMap::new);
325    let mut fs = crate::image::create_filesystem(repo, config_name, config_verity)?;
326    let id = fs.compute_image_id();
327    labels.insert("containers.composefs.fsverity".to_string(), id.to_hex());
328    config.set_config(Some(myconfig));
329    write_config(repo, &config, refs)
330}
331
332pub fn mount<ObjectID: FsVerityHashValue>(
333    repo: &Repository<ObjectID>,
334    name: &str,
335    mountpoint: &str,
336    verity: Option<&ObjectID>,
337) -> Result<()> {
338    let config = open_config_shallow(repo, name, verity)?;
339    let Some(id) = config.get_config_annotation("containers.composefs.fsverity") else {
340        bail!("Can only mount sealed containers");
341    };
342    repo.mount(id, mountpoint)
343}
344
#[cfg(test)]
mod test {
    use std::{fmt::Write, io::Read};

    use rustix::fs::CWD;

    use composefs::{fsverity::Sha256HashValue, repository::Repository, test::tempdir};

    use super::*;

    /// Appends a root-owned regular file `name` (mode 0700) holding `size` zero bytes.
    fn append_data(builder: &mut ::tar::Builder<Vec<u8>>, name: &str, size: usize) {
        let len = size as u64;
        let mut header = ::tar::Header::new_ustar();
        header.set_entry_type(::tar::EntryType::Regular);
        header.set_mode(0o700);
        header.set_uid(0);
        header.set_gid(0);
        header.set_size(len);
        let zeros = std::io::repeat(0u8).take(len);
        builder.append_data(&mut header, name, zeros).unwrap();
    }

    /// Builds an in-memory tar archive whose files have sizes 0, 4095, 4096 and 4097 bytes.
    fn example_layer() -> Vec<u8> {
        let mut builder = ::tar::Builder::new(vec![]);
        for (name, size) in [
            ("file0", 0),
            ("file4095", 4095),
            ("file4096", 4096),
            ("file4097", 4097),
        ] {
            append_data(&mut builder, name, size);
        }
        builder.into_inner().unwrap()
    }

    #[test]
    fn test_layer() {
        let layer = example_layer();
        let layer_id = hash(&layer);

        let repo_dir = tempdir();
        let repo = Arc::new(Repository::<Sha256HashValue>::open_path(CWD, &repo_dir).unwrap());
        let id = import_layer(&repo, &layer_id, Some("name"), &mut layer.as_slice()).unwrap();

        let mut split_stream = repo.open_stream("refs/name", Some(&id)).unwrap();
        let mut dump = String::new();
        while let Some(entry) = tar::get_entry(&mut split_stream).unwrap() {
            writeln!(dump, "{entry}").unwrap();
        }
        similar_asserts::assert_eq!(dump, "\
/file0 0 100700 1 0 0 0 0.0 - - -
/file4095 4095 100700 1 0 0 0 0.0 53/72beb83c78537c8970c8361e3254119fafdf1763854ecd57d3f0fe2da7c719 - 5372beb83c78537c8970c8361e3254119fafdf1763854ecd57d3f0fe2da7c719
/file4096 4096 100700 1 0 0 0 0.0 ba/bc284ee4ffe7f449377fbf6692715b43aec7bc39c094a95878904d34bac97e - babc284ee4ffe7f449377fbf6692715b43aec7bc39c094a95878904d34bac97e
/file4097 4097 100700 1 0 0 0 0.0 09/3756e4ea9683329106d4a16982682ed182c14bf076463a9e7f97305cbac743 - 093756e4ea9683329106d4a16982682ed182c14bf076463a9e7f97305cbac743
");
    }
}