ostree_ext/container/
unencapsulate.rs

1//! APIs for "unencapsulating" OSTree commits from container images
2//!
3//! This code only operates on container images that were created via
4//! [`encapsulate`].
5//!
//! # External dependency on container-image-proxy
7//!
8//! This code requires <https://github.com/cgwalters/container-image-proxy>
9//! installed as a binary in $PATH.
10//!
11//! The rationale for this is that while there exist Rust crates to speak
12//! the Docker distribution API, the Go library <https://github.com/containers/image/>
13//! supports key things we want for production use like:
14//!
15//! - Image mirroring and remapping; effectively `man containers-registries.conf`
16//!   For example, we need to support an administrator mirroring an ostree-container
17//!   into a disconnected registry, without changing all the pull specs.
18//! - Signing
19//!
20//! Additionally, the proxy "upconverts" manifests into OCI, so we don't need to care
21//! about parsing the Docker manifest format (as used by most registries still).
22//!
//! [`encapsulate`]: super::encapsulate()
24
25// # Implementation
26//
27// First, we support explicitly fetching just the manifest: https://github.com/opencontainers/image-spec/blob/main/manifest.md
28// This will give us information about the layers it contains, and crucially the digest (sha256) of
29// the manifest is how higher level software can detect changes.
30//
31// Once we have the manifest, we expect it to point to a single `application/vnd.oci.image.layer.v1.tar+gzip` layer,
32// which is exactly what is exported by the [`crate::tar::export`] process.
33
34use crate::container::store::LayerProgress;
35
36use super::*;
37use containers_image_proxy::{ImageProxy, OpenedImage};
38use fn_error_context::context;
39use futures_util::{Future, FutureExt};
40use oci_spec::image::{self as oci_image, Digest};
41use std::io::Read;
42use std::sync::{Arc, Mutex};
43use tokio::{
44    io::{AsyncBufRead, AsyncRead},
45    sync::watch::{Receiver, Sender},
46};
47use tracing::instrument;
48
/// Media type that the skopeo / containers-storage code path reports for a
/// local, uncompressed, Docker-formatted layer, instead of an OCI layer type.
/// TODO: change the skopeo code to shield us from this correctly
const DOCKER_TYPE_LAYER_TAR: &str = "application/vnd.docker.image.rootfs.diff.tar";
53
54type Progress = tokio::sync::watch::Sender<u64>;
55
56/// A read wrapper that updates the download progress.
57#[pin_project::pin_project]
58#[derive(Debug)]
59pub(crate) struct ProgressReader<T> {
60    #[pin]
61    pub(crate) reader: T,
62    #[pin]
63    pub(crate) progress: Arc<Mutex<Progress>>,
64}
65
66impl<T: AsyncRead> ProgressReader<T> {
67    pub(crate) fn new(reader: T) -> (Self, Receiver<u64>) {
68        let (progress, r) = tokio::sync::watch::channel(1);
69        let progress = Arc::new(Mutex::new(progress));
70        (ProgressReader { reader, progress }, r)
71    }
72}
73
74impl<T: AsyncRead> AsyncRead for ProgressReader<T> {
75    fn poll_read(
76        self: std::pin::Pin<&mut Self>,
77        cx: &mut std::task::Context<'_>,
78        buf: &mut tokio::io::ReadBuf<'_>,
79    ) -> std::task::Poll<std::io::Result<()>> {
80        let this = self.project();
81        let len = buf.filled().len();
82        match this.reader.poll_read(cx, buf) {
83            v @ std::task::Poll::Ready(Ok(_)) => {
84                let progress = this.progress.lock().unwrap();
85                let state = {
86                    let mut state = *progress.borrow();
87                    let newlen = buf.filled().len();
88                    debug_assert!(newlen >= len);
89                    let read = (newlen - len) as u64;
90                    state += read;
91                    state
92                };
93                // Ignore errors, if the caller disconnected from progress that's OK.
94                let _ = progress.send(state);
95                v
96            }
97            o => o,
98        }
99    }
100}
101
102async fn fetch_manifest_impl(
103    proxy: &mut ImageProxy,
104    imgref: &OstreeImageReference,
105) -> Result<(oci_image::ImageManifest, oci_image::Digest)> {
106    let oi = &proxy.open_image(&imgref.imgref.to_string()).await?;
107    let (digest, manifest) = proxy.fetch_manifest(oi).await?;
108    proxy.close_image(oi).await?;
109    Ok((manifest, oci_image::Digest::from_str(digest.as_str())?))
110}
111
112/// Download the manifest for a target image and its sha256 digest.
113#[context("Fetching manifest")]
114pub async fn fetch_manifest(
115    imgref: &OstreeImageReference,
116) -> Result<(oci_image::ImageManifest, oci_image::Digest)> {
117    let mut proxy = ImageProxy::new().await?;
118    fetch_manifest_impl(&mut proxy, imgref).await
119}
120
121/// Download the manifest for a target image and its sha256 digest, as well as the image configuration.
122#[context("Fetching manifest and config")]
123pub async fn fetch_manifest_and_config(
124    imgref: &OstreeImageReference,
125) -> Result<(
126    oci_image::ImageManifest,
127    oci_image::Digest,
128    oci_image::ImageConfiguration,
129)> {
130    let proxy = ImageProxy::new().await?;
131    let oi = &proxy.open_image(&imgref.imgref.to_string()).await?;
132    let (digest, manifest) = proxy.fetch_manifest(oi).await?;
133    let digest = oci_image::Digest::from_str(&digest)?;
134    let config = proxy.fetch_config(oi).await?;
135    Ok((manifest, digest, config))
136}
137
138/// The result of an import operation
139#[derive(Debug)]
140pub struct Import {
141    /// The ostree commit that was imported
142    pub ostree_commit: String,
143    /// The image digest retrieved
144    pub image_digest: Digest,
145
146    /// Any deprecation warning
147    pub deprecated_warning: Option<String>,
148}
149
150/// Use this to process potential errors from a worker and a driver.
151/// This is really a brutal hack around the fact that an error can occur
152/// on either our side or in the proxy.  But if an error occurs on our
153/// side, then we will close the pipe, which will *also* cause the proxy
154/// to error out.
155///
156/// What we really want is for the proxy to tell us when it got an
157/// error from us closing the pipe.  Or, we could store that state
158/// on our side.  Both are slightly tricky, so we have this (again)
159/// hacky thing where we just search for `broken pipe` in the error text.
160///
161/// Or to restate all of the above - what this function does is check
162/// to see if the worker function had an error *and* if the proxy
163/// had an error, but if the proxy's error ends in `broken pipe`
164/// then it means the real only error is from the worker.
165pub(crate) async fn join_fetch<T: std::fmt::Debug>(
166    worker: impl Future<Output = Result<T>>,
167    driver: impl Future<Output = Result<()>>,
168) -> Result<T> {
169    let (worker, driver) = tokio::join!(worker, driver);
170    match (worker, driver) {
171        (Ok(t), Ok(())) => Ok(t),
172        (Err(worker), Err(driver)) => {
173            let text = driver.root_cause().to_string();
174            if text.ends_with("broken pipe") {
175                tracing::trace!("Ignoring broken pipe failure from driver");
176                Err(worker)
177            } else {
178                Err(worker.context(format!("proxy failure: {} and client error", text)))
179            }
180        }
181        (Ok(_), Err(driver)) => Err(driver),
182        (Err(worker), Ok(())) => Err(worker),
183    }
184}
185
186/// Fetch a container image and import its embedded OSTree commit.
187#[context("Importing {}", imgref)]
188#[instrument(level = "debug", skip(repo))]
189pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -> Result<Import> {
190    let importer = super::store::ImageImporter::new(repo, imgref, Default::default()).await?;
191    importer.unencapsulate().await
192}
193
194/// Create a decompressor for this MIME type, given a stream of input.
195pub(crate) fn decompressor(
196    media_type: &oci_image::MediaType,
197    src: impl Read + Send + 'static,
198) -> Result<Box<dyn Read + Send + 'static>> {
199    let r: Box<dyn std::io::Read + Send + 'static> = match media_type {
200        m @ (oci_image::MediaType::ImageLayerGzip | oci_image::MediaType::ImageLayerZstd) => {
201            if matches!(m, oci_image::MediaType::ImageLayerZstd) {
202                Box::new(zstd::stream::read::Decoder::new(src)?)
203            } else {
204                Box::new(flate2::bufread::GzDecoder::new(std::io::BufReader::new(
205                    src,
206                )))
207            }
208        }
209        oci_image::MediaType::ImageLayer => Box::new(src),
210        oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src),
211        o => anyhow::bail!("Unhandled layer type: {}", o),
212    };
213    Ok(r)
214}
215
216/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
217pub(crate) async fn fetch_layer<'a>(
218    proxy: &'a ImageProxy,
219    img: &OpenedImage,
220    manifest: &oci_image::ImageManifest,
221    layer: &'a oci_image::Descriptor,
222    progress: Option<&'a Sender<Option<store::LayerProgress>>>,
223    layer_info: Option<&Vec<containers_image_proxy::ConvertedLayerInfo>>,
224    transport_src: Transport,
225) -> Result<(
226    Box<dyn AsyncBufRead + Send + Unpin>,
227    impl Future<Output = Result<()>> + 'a,
228    oci_image::MediaType,
229)> {
230    use futures_util::future::Either;
231    tracing::debug!("fetching {}", layer.digest());
232    let layer_index = manifest.layers().iter().position(|x| x == layer).unwrap();
233    let (blob, driver, size);
234    let media_type: oci_image::MediaType;
235    match transport_src {
236        Transport::ContainerStorage => {
237            let layer_info = layer_info
238                .ok_or_else(|| anyhow!("skopeo too old to pull from containers-storage"))?;
239            let n_layers = layer_info.len();
240            let layer_blob = layer_info.get(layer_index).ok_or_else(|| {
241                anyhow!("blobid position {layer_index} exceeds diffid count {n_layers}")
242            })?;
243            size = layer_blob.size;
244            media_type = layer_blob.media_type.clone();
245            (blob, driver) = proxy.get_blob(img, &layer_blob.digest, size).await?;
246        }
247        _ => {
248            size = layer.size();
249            media_type = layer.media_type().clone();
250            (blob, driver) = proxy.get_blob(img, layer.digest(), size).await?;
251        }
252    };
253
254    let driver = async { driver.await.map_err(Into::into) };
255
256    if let Some(progress) = progress {
257        let (readprogress, mut readwatch) = ProgressReader::new(blob);
258        let readprogress = tokio::io::BufReader::new(readprogress);
259        let readproxy = async move {
260            while let Ok(()) = readwatch.changed().await {
261                let fetched = readwatch.borrow_and_update();
262                let status = LayerProgress {
263                    layer_index,
264                    fetched: *fetched,
265                    total: size,
266                };
267                progress.send_replace(Some(status));
268            }
269        };
270        let reader = Box::new(readprogress);
271        let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
272        Ok((reader, Either::Left(driver), media_type))
273    } else {
274        Ok((Box::new(blob), Either::Right(driver), media_type))
275    }
276}