1use std::{cmp::Reverse, process::Command, thread::available_parallelism};
2
3pub mod image;
4pub mod tar;
5
6use std::{collections::HashMap, io::Read, iter::zip, sync::Arc};
7
8use anyhow::{bail, ensure, Context, Result};
9use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
10use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage};
11use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
12use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
13use sha2::{Digest, Sha256};
14use tokio::{io::AsyncReadExt, sync::Semaphore};
15
16use composefs::{
17 fsverity::FsVerityHashValue,
18 repository::Repository,
19 splitstream::DigestMap,
20 util::{parse_sha256, Sha256Digest},
21};
22
23use crate::tar::{get_entry, split_async};
24
25pub fn import_layer<ObjectID: FsVerityHashValue>(
26 repo: &Arc<Repository<ObjectID>>,
27 sha256: &Sha256Digest,
28 name: Option<&str>,
29 tar_stream: &mut impl Read,
30) -> Result<ObjectID> {
31 repo.ensure_stream(sha256, |writer| tar::split(tar_stream, writer), name)
32}
33
34pub fn ls_layer<ObjectID: FsVerityHashValue>(
35 repo: &Repository<ObjectID>,
36 name: &str,
37) -> Result<()> {
38 let mut split_stream = repo.open_stream(name, None)?;
39
40 while let Some(entry) = get_entry(&mut split_stream)? {
41 println!("{entry}");
42 }
43
44 Ok(())
45}
46
47struct ImageOp<ObjectID: FsVerityHashValue> {
48 repo: Arc<Repository<ObjectID>>,
49 proxy: ImageProxy,
50 img: OpenedImage,
51 progress: MultiProgress,
52}
53
54fn sha256_from_descriptor(descriptor: &Descriptor) -> Result<Sha256Digest> {
55 let Some(digest) = descriptor.as_digest_sha256() else {
56 bail!("Descriptor in oci config is not sha256");
57 };
58 Ok(parse_sha256(digest)?)
59}
60
61fn sha256_from_digest(digest: &str) -> Result<Sha256Digest> {
62 match digest.strip_prefix("sha256:") {
63 Some(rest) => Ok(parse_sha256(rest)?),
64 None => bail!("Manifest has non-sha256 digest"),
65 }
66}
67
/// A stream's sha256 content digest paired with the repository object ID of that stream.
type ContentAndVerity<ObjectID> = (Sha256Digest, ObjectID);
69
70impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
71 async fn new(repo: &Arc<Repository<ObjectID>>, imgref: &str) -> Result<Self> {
72 let skopeo_cmd = if imgref.starts_with("containers-storage:") {
74 let mut cmd = Command::new("podman");
75 cmd.args(["unshare", "skopeo"]);
76 Some(cmd)
77 } else {
78 None
79 };
80
81 let config = ImageProxyConfig {
82 skopeo_cmd,
83 ..ImageProxyConfig::default()
85 };
86 let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?;
87 let img = proxy.open_image(imgref).await.context("Opening image")?;
88 let progress = MultiProgress::new();
89 Ok(ImageOp {
90 repo: Arc::clone(repo),
91 proxy,
92 img,
93 progress,
94 })
95 }
96
97 pub async fn ensure_layer(
98 &self,
99 layer_sha256: Sha256Digest,
100 descriptor: &Descriptor,
101 ) -> Result<ObjectID> {
102 if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? {
107 self.progress
108 .println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
109 Ok(layer_id)
110 } else {
111 let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
113
114 let blob_reader = blob_reader.take(descriptor.size());
116
117 let bar = self.progress.add(ProgressBar::new(descriptor.size()));
118 bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}")
119 .unwrap()
120 .progress_chars("##-"));
121 let progress = bar.wrap_async_read(blob_reader);
122 self.progress
123 .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
124
125 let mut splitstream = self.repo.create_stream(Some(layer_sha256), None);
126 match descriptor.media_type() {
127 MediaType::ImageLayer => {
128 split_async(progress, &mut splitstream).await?;
129 }
130 MediaType::ImageLayerGzip => {
131 split_async(GzipDecoder::new(progress), &mut splitstream).await?;
132 }
133 MediaType::ImageLayerZstd => {
134 split_async(ZstdDecoder::new(progress), &mut splitstream).await?;
135 }
136 other => bail!("Unsupported layer media type {:?}", other),
137 };
138 let layer_id = self.repo.write_stream(splitstream, None)?;
139
140 drop(driver);
145
146 Ok(layer_id)
147 }
148 }
149
150 pub async fn ensure_config(
151 self: &Arc<Self>,
152 manifest_layers: &[Descriptor],
153 descriptor: &Descriptor,
154 ) -> Result<ContentAndVerity<ObjectID>> {
155 let config_sha256 = sha256_from_descriptor(descriptor)?;
156 if let Some(config_id) = self.repo.check_stream(&config_sha256)? {
157 self.progress.println(format!(
159 "Already have container config {}",
160 hex::encode(config_sha256)
161 ))?;
162 Ok((config_sha256, config_id))
163 } else {
164 self.progress
168 .println(format!("Fetching config {}", hex::encode(config_sha256)))?;
169
170 let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
171 let config = async move {
172 let mut s = Vec::new();
173 config.read_to_end(&mut s).await?;
174 anyhow::Ok(s)
175 };
176 let (config, driver) = tokio::join!(config, driver);
177 let _: () = driver?;
178 let raw_config = config?;
179 let config = ImageConfiguration::from_reader(&raw_config[..])?;
180
181 let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect();
184 layers.sort_by_key(|(mld, ..)| Reverse(mld.size()));
185
186 let threads = available_parallelism()?;
188 let sem = Arc::new(Semaphore::new(threads.into()));
189 let mut entries = vec![];
190 for (mld, diff_id) in layers {
191 let self_ = Arc::clone(self);
192 let permit = Arc::clone(&sem).acquire_owned().await?;
193 let layer_sha256 = sha256_from_digest(diff_id)?;
194 let descriptor = mld.clone();
195 let future = tokio::spawn(async move {
196 let _permit = permit;
197 self_.ensure_layer(layer_sha256, &descriptor).await
198 });
199 entries.push((layer_sha256, future));
200 }
201
202 let mut config_maps = DigestMap::new();
204 for (layer_sha256, future) in entries {
205 config_maps.insert(&layer_sha256, &future.await??);
206 }
207
208 let mut splitstream = self
209 .repo
210 .create_stream(Some(config_sha256), Some(config_maps));
211 splitstream.write_inline(&raw_config);
212 let config_id = self.repo.write_stream(splitstream, None)?;
213
214 Ok((config_sha256, config_id))
215 }
216 }
217
218 pub async fn pull(self: &Arc<Self>) -> Result<ContentAndVerity<ObjectID>> {
219 let (_manifest_digest, raw_manifest) = self
220 .proxy
221 .fetch_manifest_raw_oci(&self.img)
222 .await
223 .context("Fetching manifest")?;
224
225 let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?;
228 let config_descriptor = manifest.config();
229 let layers = manifest.layers();
230 self.ensure_config(layers, config_descriptor)
231 .await
232 .with_context(|| format!("Failed to pull config {config_descriptor:?}"))
233 }
234}
235
236pub async fn pull(
239 repo: &Arc<Repository<impl FsVerityHashValue>>,
240 imgref: &str,
241 reference: Option<&str>,
242) -> Result<(Sha256Digest, impl FsVerityHashValue)> {
243 let op = Arc::new(ImageOp::new(repo, imgref).await?);
244 let (sha256, id) = op
245 .pull()
246 .await
247 .with_context(|| format!("Unable to pull container image {imgref}"))?;
248
249 if let Some(name) = reference {
250 repo.name_stream(sha256, name)?;
251 }
252 Ok((sha256, id))
253}
254
255pub fn open_config<ObjectID: FsVerityHashValue>(
256 repo: &Repository<ObjectID>,
257 name: &str,
258 verity: Option<&ObjectID>,
259) -> Result<(ImageConfiguration, DigestMap<ObjectID>)> {
260 let id = match verity {
261 Some(id) => id,
262 None => {
263 let sha256 = parse_sha256(name)
265 .context("Containers must be referred to by sha256 if verity is missing")?;
266 &repo
267 .check_stream(&sha256)?
268 .with_context(|| format!("Object {name} is unknown to us"))?
269 }
270 };
271 let mut stream = repo.open_stream(name, Some(id))?;
272 let config = ImageConfiguration::from_reader(&mut stream)?;
273 Ok((config, stream.refs))
274}
275
276fn hash(bytes: &[u8]) -> Sha256Digest {
277 let mut context = Sha256::new();
278 context.update(bytes);
279 context.finalize().into()
280}
281
282pub fn open_config_shallow<ObjectID: FsVerityHashValue>(
283 repo: &Repository<ObjectID>,
284 name: &str,
285 verity: Option<&ObjectID>,
286) -> Result<ImageConfiguration> {
287 match verity {
288 Some(id) => Ok(open_config(repo, name, Some(id))?.0),
290 None => {
291 let expected_hash = parse_sha256(name)
293 .context("Containers must be referred to by sha256 if verity is missing")?;
294 let mut stream = repo.open_stream(name, None)?;
295 let mut raw_config = vec![];
296 stream.read_to_end(&mut raw_config)?;
297 ensure!(hash(&raw_config) == expected_hash, "Data integrity issue");
298 Ok(ImageConfiguration::from_reader(&mut raw_config.as_slice())?)
299 }
300 }
301}
302
303pub fn write_config<ObjectID: FsVerityHashValue>(
304 repo: &Arc<Repository<ObjectID>>,
305 config: &ImageConfiguration,
306 refs: DigestMap<ObjectID>,
307) -> Result<ContentAndVerity<ObjectID>> {
308 let json = config.to_string()?;
309 let json_bytes = json.as_bytes();
310 let sha256 = hash(json_bytes);
311 let mut stream = repo.create_stream(Some(sha256), Some(refs));
312 stream.write_inline(json_bytes);
313 let id = repo.write_stream(stream, None)?;
314 Ok((sha256, id))
315}
316
317pub fn seal<ObjectID: FsVerityHashValue>(
318 repo: &Arc<Repository<ObjectID>>,
319 config_name: &str,
320 config_verity: Option<&ObjectID>,
321) -> Result<ContentAndVerity<ObjectID>> {
322 let (mut config, refs) = open_config(repo, config_name, config_verity)?;
323 let mut myconfig = config.config().clone().context("no config!")?;
324 let labels = myconfig.labels_mut().get_or_insert_with(HashMap::new);
325 let mut fs = crate::image::create_filesystem(repo, config_name, config_verity)?;
326 let id = fs.compute_image_id();
327 labels.insert("containers.composefs.fsverity".to_string(), id.to_hex());
328 config.set_config(Some(myconfig));
329 write_config(repo, &config, refs)
330}
331
332pub fn mount<ObjectID: FsVerityHashValue>(
333 repo: &Repository<ObjectID>,
334 name: &str,
335 mountpoint: &str,
336 verity: Option<&ObjectID>,
337) -> Result<()> {
338 let config = open_config_shallow(repo, name, verity)?;
339 let Some(id) = config.get_config_annotation("containers.composefs.fsverity") else {
340 bail!("Can only mount sealed containers");
341 };
342 repo.mount(id, mountpoint)
343}
344
#[cfg(test)]
mod test {
    use std::{fmt::Write, io::Read};

    use rustix::fs::CWD;

    use composefs::{fsverity::Sha256HashValue, repository::Repository, test::tempdir};

    use super::*;

    /// Appends to `builder` a root-owned regular file `name` with mode 0700, containing
    /// `size` zero bytes.
    fn append_data(builder: &mut ::tar::Builder<Vec<u8>>, name: &str, size: usize) {
        let len = size as u64;
        let mut header = ::tar::Header::new_ustar();
        header.set_entry_type(::tar::EntryType::Regular);
        header.set_mode(0o700);
        header.set_uid(0);
        header.set_gid(0);
        header.set_size(len);
        let contents = std::io::repeat(0u8).take(len);
        builder.append_data(&mut header, name, contents).unwrap();
    }

    /// Builds a tar archive whose file sizes straddle the 4096-byte boundary.
    fn example_layer() -> Vec<u8> {
        let files = [
            ("file0", 0),
            ("file4095", 4095),
            ("file4096", 4096),
            ("file4097", 4097),
        ];
        let mut builder = ::tar::Builder::new(vec![]);
        for (name, size) in files {
            append_data(&mut builder, name, size);
        }
        builder.into_inner().unwrap()
    }

    #[test]
    fn test_layer() {
        let layer = example_layer();
        let layer_id = hash(&layer);

        let repo_dir = tempdir();
        let repo = Arc::new(Repository::<Sha256HashValue>::open_path(CWD, &repo_dir).unwrap());
        let id = import_layer(&repo, &layer_id, Some("name"), &mut layer.as_slice()).unwrap();

        let mut reader = repo.open_stream("refs/name", Some(&id)).unwrap();
        let mut dump = String::new();
        while let Some(entry) = tar::get_entry(&mut reader).unwrap() {
            writeln!(dump, "{entry}").unwrap();
        }

        similar_asserts::assert_eq!(dump, "\
/file0 0 100700 1 0 0 0 0.0 - - -
/file4095 4095 100700 1 0 0 0 0.0 53/72beb83c78537c8970c8361e3254119fafdf1763854ecd57d3f0fe2da7c719 - 5372beb83c78537c8970c8361e3254119fafdf1763854ecd57d3f0fe2da7c719
/file4096 4096 100700 1 0 0 0 0.0 ba/bc284ee4ffe7f449377fbf6692715b43aec7bc39c094a95878904d34bac97e - babc284ee4ffe7f449377fbf6692715b43aec7bc39c094a95878904d34bac97e
/file4097 4097 100700 1 0 0 0 0.0 09/3756e4ea9683329106d4a16982682ed182c14bf076463a9e7f97305cbac743 - 093756e4ea9683329106d4a16982682ed182c14bf076463a9e7f97305cbac743
");
    }
}