containers_image_proxy/
imageproxy.rs

1//! Run skopeo as a subprocess to fetch container images.
2//!
3//! This allows fetching a container image manifest and layers in a streaming fashion.
4//!
5//! More information: <https://github.com/containers/skopeo/pull/1476>
6
7use cap_std_ext::prelude::CapStdExtCommandExt;
8use cap_std_ext::{cap_std, cap_tempfile};
9use futures_util::{Future, FutureExt};
10use itertools::Itertools;
11use oci_spec::image::{Descriptor, Digest};
12use serde::{Deserialize, Serialize};
13use std::fs::File;
14use std::iter::FusedIterator;
15use std::num::NonZeroU32;
16use std::ops::Range;
17use std::os::fd::OwnedFd;
18use std::os::unix::prelude::CommandExt;
19use std::path::PathBuf;
20use std::pin::Pin;
21use std::process::{Command, Stdio};
22use std::sync::{Arc, Mutex, OnceLock};
23use thiserror::Error;
24use tokio::io::{AsyncBufRead, AsyncReadExt};
25use tokio::sync::Mutex as AsyncMutex;
26use tokio::task::JoinError;
27use tracing::instrument;
28
29/// Errors returned by this crate.
30#[derive(Error, Debug)]
31#[non_exhaustive]
32pub enum Error {
33    #[error("i/o error")]
34    /// An input/output error
35    Io(#[from] std::io::Error),
36    #[error("skopeo spawn error: {}", .0)]
37    /// An error spawning skopeo
38    SkopeoSpawnError(#[source] std::io::Error),
39    #[error("serialization error")]
40    /// Returned when serialization or deserialization fails
41    SerDe(#[from] serde_json::Error),
42    /// The proxy failed to initiate a request
43    #[error("failed to invoke method {method}: {error}")]
44    RequestInitiationFailure { method: Box<str>, error: Box<str> },
45    /// An error returned from the remote proxy
46    #[error("proxy request returned error")]
47    RequestReturned(Box<str>),
48    #[error("semantic version error")]
49    SemanticVersion(#[from] semver::Error),
50    #[error("proxy too old (requested={requested_version} found={found_version}) error")]
51    /// The proxy doesn't support the requested semantic version
52    ProxyTooOld {
53        requested_version: Box<str>,
54        found_version: Box<str>,
55    },
56    #[error("configuration error")]
57    /// Conflicting or missing configuration
58    Configuration(Box<str>),
59    #[error("error")]
60    /// An unknown other error
61    Other(Box<str>),
62}
63
64impl Error {
65    pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
66        Self::Other(e.into())
67    }
68}
69
70/// Errors returned by get_raw_blob
71#[derive(Error, Debug)]
72#[non_exhaustive]
73pub enum GetBlobError {
74    /// A client may reasonably retry on this type of error.
75    #[error("retryable error")]
76    Retryable(Box<str>),
77    #[error("error")]
78    /// An unknown other error
79    Other(Box<str>),
80}
81
82impl From<rustix::io::Errno> for Error {
83    fn from(value: rustix::io::Errno) -> Self {
84        Self::Io(value.into())
85    }
86}
87
88/// The error type returned from this crate.
89pub type Result<T> = std::result::Result<T, Error>;
90
91/// Re-export because we use this in our public APIs
92pub use oci_spec;
93
/// File descriptor numbers (half-open, `100` up to but excluding `200`) that this crate
/// uses when handing data to the proxy child.  Do not configure the command to place
/// any of its own files in this range; stdin is reserved as well.
pub const RESERVED_FD_RANGE: Range<i32> = Range {
    start: 100,
    end: 200,
};
98
/// Largest JSON message (request or reply) we will read or write; the limit itself
/// is defined by skopeo.  Bulk (non-metadata) payloads are expected to travel over a
/// pipe file descriptor instead.
const MAX_MSG_SIZE: usize = 32 << 10;
102
103fn base_proto_version() -> &'static semver::VersionReq {
104    // Introduced in https://github.com/containers/skopeo/pull/1523
105    static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
106    BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
107}
108
109fn layer_info_proto_version() -> &'static semver::VersionReq {
110    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
111    LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
112}
113
114fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
115    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
116    LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
117}
118
119#[derive(Serialize)]
120struct Request {
121    method: String,
122    args: Vec<serde_json::Value>,
123}
124
125impl Request {
126    fn new<T, I>(method: &str, args: T) -> Self
127    where
128        T: IntoIterator<Item = I>,
129        I: Into<serde_json::Value>,
130    {
131        let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
132        Self {
133            method: method.to_string(),
134            args,
135        }
136    }
137
138    fn new_bare(method: &str) -> Self {
139        Self {
140            method: method.to_string(),
141            args: vec![],
142        }
143    }
144}
145
/// A JSON reply received from the proxy for a single request.
#[derive(Deserialize)]
struct Reply {
    /// Whether the proxy accepted the request; when false, `error` holds the reason.
    success: bool,
    /// Error text reported by the proxy (consulted when `success` is false).
    error: String,
    /// Pipe id accompanying a returned data fd; `0` is treated as "no pipe".
    pipeid: u32,
    /// Method-specific return value, decoded by the caller into the expected type.
    value: serde_json::Value,
}
153
154type ChildFuture = Pin<
155    Box<
156        dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
157            + Send,
158    >,
159>;
160
161/// Manage a child process proxy to fetch container images.
162pub struct ImageProxy {
163    sockfd: Arc<Mutex<OwnedFd>>,
164    childwait: Arc<AsyncMutex<ChildFuture>>,
165    protover: semver::Version,
166}
167
168impl std::fmt::Debug for ImageProxy {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        f.debug_struct("ImageProxy").finish()
171    }
172}
173
/// Opaque handle for an image opened by the proxy.
#[derive(Debug, PartialEq, Eq)]
pub struct OpenedImage(u32);

/// Identifier of a proxy-side pipe that must later be released with `FinishPipe`;
/// never zero.
#[derive(Debug, PartialEq, Eq)]
struct PipeId(NonZeroU32);

impl PipeId {
    /// Wrap a raw pipe id taken from a reply, mapping `0` ("no pipe") to `None`.
    fn try_new(pipeid: u32) -> Option<Self> {
        NonZeroU32::new(pipeid).map(PipeId)
    }
}
186
/// Configuration for the proxy.
///
/// At most one of [`authfile`](ImageProxyConfig::authfile),
/// [`auth_data`](ImageProxyConfig::auth_data) and
/// [`auth_anonymous`](ImageProxyConfig::auth_anonymous) may be set; building the
/// command with more than one of them fails with [`Error::Configuration`].
#[derive(Debug, Default)]
pub struct ImageProxyConfig {
    /// Path to container auth file; equivalent to `skopeo --authfile`.
    /// This conflicts with [`auth_data`](ImageProxyConfig::auth_data).
    pub authfile: Option<PathBuf>,

    /// Data stream for container auth.  This conflicts with
    /// [`authfile`](ImageProxyConfig::authfile).
    ///
    /// The contents are copied into an anonymous temporary file, which is handed to the
    /// child as an inherited file descriptor.
    pub auth_data: Option<File>,

    /// Do not use default container authentication paths; equivalent to `skopeo --no-creds`.
    ///
    /// Defaults to `false`; in other words, use the default file paths from `man containers-auth.json`.
    pub auth_anonymous: bool,

    /// Directory with certificates (*.crt, *.cert, *.key) used to connect to registry.
    /// Equivalent to `skopeo --cert-dir`.
    pub certificate_directory: Option<PathBuf>,

    /// Decryption keys to decrypt an encrypted container image; each entry is passed as
    /// its own `--decryption-key <path_to_decryption_key>` argument, as with
    /// `skopeo copy --decryption-key`.
    pub decryption_keys: Option<Vec<String>>,

    /// If `Some(true)`, disable TLS verification.  Equivalent to `skopeo --tls-verify=false`.
    pub insecure_skip_tls_verification: Option<bool>,

    /// If enabled, propagate debug-logging level from the proxy via stderr to the
    /// current process' stderr. Note that when enabled, this also means that standard
    /// error will no longer be captured.
    ///
    /// Setting the `CONTAINERS_IMAGE_PROXY_DEBUG` environment variable has the same effect.
    pub debug: bool,

    /// Provide a configured [`std::process::Command`] instance.
    ///
    /// This allows configuring aspects of the resulting child `skopeo` process.
    /// The intention of this hook is to allow the caller to use e.g.
    /// `systemd-run` or equivalent containerization tools.  For example you
    /// can set up a command whose arguments are `systemd-run -Pq -p DynamicUser=yes -- skopeo`.
    /// You can also set up arbitrary aspects of the child via e.g.
    /// [`current_dir`] or [`pre_exec`].
    ///
    /// [`current_dir`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.current_dir
    /// [`pre_exec`]: https://doc.rust-lang.org/std/os/unix/process/trait.CommandExt.html#tymethod.pre_exec
    ///
    /// The default is to run `skopeo` directly, with a `pre_exec` hook that sets the
    /// parent-death signal to `SIGTERM` (the same effect as util-linux
    /// `setpriv --pdeathsig SIGTERM -- skopeo`), which on Linux binds the lifecycle of
    /// the child process to the parent.
    ///
    /// Note that you *must* add `skopeo` as the primary argument or
    /// indirectly.  However, all other command line options including
    /// `experimental-image-proxy` will be injected by this library.
    /// You may use a different command name from `skopeo` if your
    /// application has set up a compatible copy, e.g. `/usr/lib/myapp/my-private-skopeo`.
    pub skopeo_cmd: Option<Command>,
}
240
241impl TryFrom<ImageProxyConfig> for Command {
242    type Error = Error;
243
244    fn try_from(config: ImageProxyConfig) -> Result<Self> {
245        let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
246        let mut allocated_fds = RESERVED_FD_RANGE.clone();
247        let mut alloc_fd = || {
248            allocated_fds.next().ok_or_else(|| {
249                Error::Other("Ran out of reserved file descriptors for child".into())
250            })
251        };
252
253        // By default, we set up pdeathsig to "lifecycle bind" the child process to us.
254        let mut c = config.skopeo_cmd.unwrap_or_else(|| {
255            let mut c = std::process::Command::new("skopeo");
256            unsafe {
257                c.pre_exec(|| {
258                    rustix::process::set_parent_process_death_signal(Some(
259                        rustix::process::Signal::TERM,
260                    ))
261                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
262                });
263            }
264            c
265        });
266        c.arg("experimental-image-proxy");
267        if debug {
268            c.arg("--debug");
269        }
270        let auth_option_count = [
271            config.authfile.is_some(),
272            config.auth_data.is_some(),
273            config.auth_anonymous,
274        ]
275        .into_iter()
276        .filter(|&x| x)
277        .count();
278        if auth_option_count > 1 {
279            // This is a programmer error really
280            return Err(Error::Configuration(
281                "Conflicting authentication options".into(),
282            ));
283        }
284        if let Some(authfile) = config.authfile {
285            c.arg("--authfile");
286            c.arg(authfile);
287        } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
288            // If we get the authentication data as a file, we always copy it to a new temporary file under
289            // the assumption that the caller provided it this way to aid in privilege separation where
290            // the file is only readable to privileged code.
291            let target_fd = alloc_fd()?;
292            let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
293            let mut tempfile =
294                cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
295            std::io::copy(&mut auth_data, &mut tempfile)?;
296            let tempfile = tempfile
297                .into_inner()
298                .map_err(|e| e.into_error())?
299                .into_std();
300            let fd = std::sync::Arc::new(tempfile.into());
301            c.take_fd_n(fd, target_fd);
302            c.arg("--authfile");
303            c.arg(format!("/proc/self/fd/{target_fd}"));
304        } else if config.auth_anonymous {
305            c.arg("--no-creds");
306        }
307
308        if let Some(certificate_directory) = config.certificate_directory {
309            c.arg("--cert-dir");
310            c.arg(certificate_directory);
311        }
312
313        if let Some(decryption_keys) = config.decryption_keys {
314            for decryption_key in &decryption_keys {
315                c.arg("--decryption-key");
316                c.arg(decryption_key);
317            }
318        }
319
320        if config.insecure_skip_tls_verification.unwrap_or_default() {
321            c.arg("--tls-verify=false");
322        }
323        c.stdout(Stdio::null());
324        if !debug {
325            c.stderr(Stdio::piped());
326        }
327        Ok(c)
328    }
329}
330
331/// BlobInfo collects known information about a blob
332#[derive(Debug, serde::Deserialize)]
333pub struct ConvertedLayerInfo {
334    /// Uncompressed digest of a layer; for more information, see
335    /// https://github.com/opencontainers/image-spec/blob/main/config.md#layer-diffid
336    pub digest: Digest,
337
338    /// Size of blob
339    pub size: u64,
340
341    /// Mediatype of blob
342    pub media_type: oci_spec::image::MediaType,
343}
344
/// A reply carrying a single data fd; afterwards the pipe must be released by
/// sending a `FinishPipe` request with `pipeid`.
#[derive(Debug)]
struct FinishPipe {
    /// Non-zero id naming the pipe on the proxy side.
    pipeid: PipeId,
    /// File descriptor from which the payload is read.
    datafd: OwnedFd,
}
351
/// A reply carrying two fds and no pipe id: a data FD, plus an error FD from which
/// an (optionally empty) JSON error document is read.
#[derive(Debug)]
struct DualFds {
    /// File descriptor from which the payload is read.
    datafd: OwnedFd,
    /// File descriptor from which a JSON error, if any, is read to the end.
    errfd: OwnedFd,
}
358
359/// Helper trait for parsing the pipeid and/or file descriptors of a reply
360trait FromReplyFds: Send + 'static
361where
362    Self: Sized,
363{
364    fn from_reply(
365        iterable: impl IntoIterator<IntoIter: FusedIterator, Item = OwnedFd>,
366        pipeid: u32,
367    ) -> Result<Self>;
368}
369
370/// No file descriptors or pipeid expected
371impl FromReplyFds for () {
372    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
373        if fds.into_iter().next().is_some() {
374            return Err(Error::Other("expected no fds".into()));
375        }
376        if pipeid != 0 {
377            return Err(Error::Other("unexpected pipeid".into()));
378        }
379        Ok(())
380    }
381}
382
383/// A FinishPipe instance
384impl FromReplyFds for FinishPipe {
385    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
386        let Some(pipeid) = PipeId::try_new(pipeid) else {
387            return Err(Error::Other("Expected pipeid for FinishPipe".into()));
388        };
389        let datafd = fds
390            .into_iter()
391            .exactly_one()
392            .map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?;
393        Ok(Self { pipeid, datafd })
394    }
395}
396
397/// A DualFds instance
398impl FromReplyFds for DualFds {
399    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
400        if pipeid != 0 {
401            return Err(Error::Other("Unexpected pipeid with DualFds".into()));
402        }
403        let [datafd, errfd] = fds
404            .into_iter()
405            .collect_array()
406            .ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?;
407        Ok(Self { datafd, errfd })
408    }
409}
410
411impl ImageProxy {
412    /// Create an image proxy that fetches the target image, using default configuration.
413    pub async fn new() -> Result<Self> {
414        Self::new_with_config(Default::default()).await
415    }
416
417    /// Create an image proxy that fetches the target image
418    #[instrument]
419    pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
420        let mut c = Command::try_from(config)?;
421        let (mysock, theirsock) = rustix::net::socketpair(
422            rustix::net::AddressFamily::UNIX,
423            rustix::net::SocketType::SEQPACKET,
424            rustix::net::SocketFlags::CLOEXEC,
425            None,
426        )?;
427        c.stdin(Stdio::from(theirsock));
428        let child = match c.spawn() {
429            Ok(c) => c,
430            Err(error) => return Err(Error::SkopeoSpawnError(error)),
431        };
432        tracing::debug!("Spawned skopeo pid={:?}", child.id());
433        // Here we use std sync API via thread because tokio installs
434        // a SIGCHLD handler which can conflict with e.g. the glib one
435        // which may also be in process.
436        // xref https://github.com/tokio-rs/tokio/issues/3520#issuecomment-968985861
437        let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
438        let sockfd = Arc::new(Mutex::new(mysock));
439
440        let mut r = Self {
441            sockfd,
442            childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
443            protover: semver::Version::new(0, 0, 0),
444        };
445
446        // Verify semantic version
447        let protover: String = r.impl_request("Initialize", [(); 0]).await?;
448        tracing::debug!("Remote protocol version: {protover}");
449        let protover = semver::Version::parse(protover.as_str())?;
450        // Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`.
451        let supported = base_proto_version();
452        if !supported.matches(&protover) {
453            return Err(Error::ProxyTooOld {
454                requested_version: protover.to_string().into(),
455                found_version: supported.to_string().into(),
456            });
457        }
458        r.protover = protover;
459
460        Ok(r)
461    }
462
463    /// Create and send a request. Should only be used by impl_request.
464    async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static, F: FromReplyFds>(
465        sockfd: Arc<Mutex<OwnedFd>>,
466        req: Request,
467    ) -> Result<(T, F)> {
468        tracing::trace!("sending request {}", req.method.as_str());
469        // TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio
470        let r = tokio::task::spawn_blocking(move || {
471            let sockfd = sockfd.lock().unwrap();
472            let sendbuf = serde_json::to_vec(&req)?;
473            let sockfd = &*sockfd;
474            rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
475            drop(sendbuf);
476            let mut buf = [0u8; MAX_MSG_SIZE];
477            let mut cmsg_space: Vec<std::mem::MaybeUninit<u8>> =
478                vec![std::mem::MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(1))];
479            let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(cmsg_space.as_mut_slice());
480            let iov = std::io::IoSliceMut::new(buf.as_mut());
481            let mut iov = [iov];
482            let nread = rustix::net::recvmsg(
483                sockfd,
484                &mut iov,
485                &mut cmsg_buffer,
486                rustix::net::RecvFlags::CMSG_CLOEXEC,
487            )?
488            .bytes;
489            let fdret = cmsg_buffer
490                .drain()
491                .filter_map(|m| match m {
492                    rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
493                    _ => None,
494                })
495                .flatten();
496            let buf = &buf[..nread];
497            let reply: Reply = serde_json::from_slice(buf)?;
498            if !reply.success {
499                return Err(Error::RequestInitiationFailure {
500                    method: req.method.clone().into(),
501                    error: reply.error.into(),
502                });
503            }
504            let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?;
505            Ok((serde_json::from_value(reply.value)?, fds))
506        })
507        .await
508        .map_err(|e| Error::Other(e.to_string().into()))??;
509        tracing::trace!("completed request");
510        Ok(r)
511    }
512
513    /// Create a request that may return file descriptors, and also check for an unexpected
514    /// termination of the child process.
515    #[instrument(skip(args))]
516    async fn impl_request_with_fds<
517        T: serde::de::DeserializeOwned + Send + 'static,
518        F: FromReplyFds,
519    >(
520        &self,
521        method: &str,
522        args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
523    ) -> Result<(T, F)> {
524        let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
525        let mut childwait = self.childwait.lock().await;
526        tokio::select! {
527            r = req => { r }
528            r = childwait.as_mut() => {
529                let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
530                let stderr = String::from_utf8_lossy(&r.stderr);
531                Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
532            }
533        }
534    }
535
536    /// A synchronous invocation which does not return any file descriptors.
537    async fn impl_request<T: serde::de::DeserializeOwned + Send + 'static>(
538        &self,
539        method: &str,
540        args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
541    ) -> Result<T> {
542        let (r, ()) = self.impl_request_with_fds(method, args).await?;
543        Ok(r)
544    }
545
546    #[instrument]
547    async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> {
548        tracing::debug!("closing pipe");
549        let (r, ()) = self
550            .impl_request_with_fds("FinishPipe", [pipeid.0.get()])
551            .await?;
552        Ok(r)
553    }
554
555    #[instrument]
556    pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
557        tracing::debug!("opening image");
558        let imgid = self.impl_request("OpenImage", [imgref]).await?;
559        Ok(OpenedImage(imgid))
560    }
561
562    #[instrument]
563    pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
564        tracing::debug!("opening image");
565        let imgid = self.impl_request("OpenImageOptional", [imgref]).await?;
566        if imgid == 0 {
567            Ok(None)
568        } else {
569            Ok(Some(OpenedImage(imgid)))
570        }
571    }
572
573    #[instrument]
574    pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
575        self.impl_request("CloseImage", [img.0]).await
576    }
577
578    async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result<Vec<u8>> {
579        let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
580        let mut fd = tokio::io::BufReader::new(fd);
581        let mut r = Vec::new();
582        let reader = fd.read_to_end(&mut r);
583        let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid));
584        finish?;
585        assert_eq!(nbytes?, r.len());
586        Ok(r)
587    }
588
589    /// Fetch the manifest as raw bytes, converted to OCI if necessary.
590    /// The original digest of the unconverted manifest is also returned.
591    /// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
592    pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
593        let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?;
594        Ok((digest, self.read_finish_pipe(pipefd).await?))
595    }
596
597    /// Fetch the manifest.
598    /// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
599    pub async fn fetch_manifest(
600        &self,
601        img: &OpenedImage,
602    ) -> Result<(String, oci_spec::image::ImageManifest)> {
603        let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
604        let manifest = serde_json::from_slice(&raw)?;
605        Ok((digest, manifest))
606    }
607
608    /// Fetch the config.
609    /// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
610    pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
611        let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?;
612        self.read_finish_pipe(pipe).await
613    }
614
615    /// Fetch the config.
616    /// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
617    pub async fn fetch_config(
618        &self,
619        img: &OpenedImage,
620    ) -> Result<oci_spec::image::ImageConfiguration> {
621        let raw = self.fetch_config_raw(img).await?;
622        serde_json::from_slice(&raw).map_err(Into::into)
623    }
624
625    /// Fetch a blob identified by e.g. `sha256:<digest>`.
626    /// <https://github.com/opencontainers/image-spec/blob/main/descriptor.md>
627    ///
628    /// The requested size and digest are verified (by the proxy process).
629    ///
630    /// Note that because of the implementation details of this function, you should
631    /// [`futures::join!`] the returned futures instead of polling one after the other. The
632    /// secondary "driver" future will only return once everything has been read from
633    /// the reader future.
634    #[instrument]
635    pub async fn get_blob(
636        &self,
637        img: &OpenedImage,
638        digest: &Digest,
639        size: u64,
640    ) -> Result<(
641        impl AsyncBufRead + Send + Unpin,
642        impl Future<Output = Result<()>> + Unpin + '_,
643    )> {
644        // For previous discussion on digest/size verification, see
645        // https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009
646        tracing::debug!("fetching blob");
647        let args: Vec<serde_json::Value> =
648            vec![img.0.into(), digest.to_string().into(), size.into()];
649        // Note that size may be -1 here if e.g. the remote registry doesn't give a Content-Length
650        // for example.
651        // We have always validated the size later (in FinishPipe) so out of conservatism we
652        // just ignore the size here.
653        let (_bloblen, pipe): (serde_json::Number, FinishPipe) =
654            self.impl_request_with_fds("GetBlob", args).await?;
655        let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
656        let fd = tokio::io::BufReader::new(fd);
657        let finish = Box::pin(self.finish_pipe(pipe.pipeid));
658        Ok((fd, finish))
659    }
660
661    async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> {
662        let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
663        let mut errfd = tokio::io::BufReader::new(fd);
664        let mut buf = Vec::new();
665        errfd
666            .read_to_end(&mut buf)
667            .await
668            .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
669        if buf.is_empty() {
670            return Ok(());
671        }
672        #[derive(Deserialize)]
673        struct RemoteError {
674            code: String,
675            message: String,
676        }
677        let e: RemoteError = serde_json::from_slice(&buf)
678            .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
679        match e.code.as_str() {
680            // Actually this is OK
681            "EPIPE" => Ok(()),
682            "retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())),
683            _ => Err(GetBlobError::Other(e.message.into_boxed_str())),
684        }
685    }
686
687    /// Fetch a blob identified by e.g. `sha256:<digest>`; does not perform
688    /// any verification that the blob matches the digest. The size of the
689    /// blob (if available) and a pipe file descriptor are returned.
690    #[instrument]
691    pub async fn get_raw_blob(
692        &self,
693        img: &OpenedImage,
694        digest: &Digest,
695    ) -> Result<(
696        Option<u64>,
697        tokio::fs::File,
698        impl Future<Output = std::result::Result<(), GetBlobError>> + Unpin + '_,
699    )> {
700        tracing::debug!("fetching blob");
701        let args: Vec<serde_json::Value> = vec![img.0.into(), digest.to_string().into()];
702        let (bloblen, fds): (i64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?;
703        // See the GetBlob case, we have a best-effort attempt to return the size, but it might not be known
704        let bloblen = u64::try_from(bloblen).ok();
705        let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd));
706        let err = Self::read_blob_error(fds.errfd).boxed();
707        Ok((bloblen, fd, err))
708    }
709
710    /// Fetch a descriptor. The requested size and digest are verified (by the proxy process).
711    #[instrument]
712    pub async fn get_descriptor(
713        &self,
714        img: &OpenedImage,
715        descriptor: &Descriptor,
716    ) -> Result<(
717        impl AsyncBufRead + Send + Unpin,
718        impl Future<Output = Result<()>> + Unpin + '_,
719    )> {
720        self.get_blob(img, descriptor.digest(), descriptor.size())
721            .await
722    }
723
724    ///Returns data that can be used to find the "diffid" corresponding to a particular layer.
725    #[instrument]
726    pub async fn get_layer_info(
727        &self,
728        img: &OpenedImage,
729    ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
730        tracing::debug!("Getting layer info");
731        if layer_info_piped_proto_version().matches(&self.protover) {
732            let ((), pipe) = self
733                .impl_request_with_fds("GetLayerInfoPiped", [img.0])
734                .await?;
735            let buf = self.read_finish_pipe(pipe).await?;
736            return Ok(Some(serde_json::from_slice(&buf)?));
737        }
738        if !layer_info_proto_version().matches(&self.protover) {
739            return Ok(None);
740        }
741        let layers = self.impl_request("GetLayerInfo", [img.0]).await?;
742        Ok(Some(layers))
743    }
744
745    /// Close the connection and wait for the child process to exit successfully.
746    #[instrument]
747    pub async fn finalize(self) -> Result<()> {
748        let _ = &self;
749        let req = Request::new_bare("Shutdown");
750        let sendbuf = serde_json::to_vec(&req)?;
751        // SAFETY: Only panics if a worker thread already panic'd
752        let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
753        rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
754        drop(sendbuf);
755        tracing::debug!("sent shutdown request");
756        let mut childwait = self.childwait.lock().await;
757        let output = childwait
758            .as_mut()
759            .await
760            .map_err(|e| Error::new_other(e.to_string()))??;
761        if !output.status.success() {
762            let stderr = String::from_utf8_lossy(&output.stderr);
763            return Err(Error::RequestReturned(
764                format!("proxy failed: {}\n{}", output.status, stderr).into(),
765            ));
766        }
767        tracing::debug!("proxy exited successfully");
768        Ok(())
769    }
770}
771
#[cfg(test)]
mod tests {
    use std::io::{BufWriter, Seek, Write};
    use std::os::fd::{AsRawFd, OwnedFd};

    use super::*;
    use cap_std_ext::cap_std::fs::Dir;
    use rustix::fs::{memfd_create, MemfdFlags};

    /// Returns `true` if a `skopeo` binary can be spawned.
    ///
    /// The probe (`skopeo --help`) runs at most once per test process and the
    /// result is cached. It only checks that the process could be started, not
    /// its exit status.
    fn check_skopeo() -> bool {
        static HAVE_SKOPEO: OnceLock<bool> = OnceLock::new();
        *HAVE_SKOPEO.get_or_init(|| {
            Command::new("skopeo")
                .arg("--help")
                .stdout(Stdio::null())
                .stderr(Stdio::null())
                .status()
                .is_ok()
        })
    }

    /// Assert that the rendered command line of `c` contains every string in
    /// `contains` and none of the strings in `not_contains`.
    fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
        // Match against the `Debug` rendering, which includes both the program
        // and its arguments, so a single substring check covers the whole
        // command line.
        let d = format!("{:?}", c);
        for needle in contains {
            assert!(d.contains(needle), "{} missing {}", d, needle);
        }
        for needle in not_contains {
            assert!(!d.contains(needle), "{} should not contain {}", d, needle);
        }
    }

    #[test]
    fn proxy_configs() {
        let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();

        let c = Command::try_from(ImageProxyConfig::default()).unwrap();
        validate(
            c,
            &["experimental-image-proxy"],
            &["--no-creds", "--tls-verify", "--authfile"],
        );

        let c = Command::try_from(ImageProxyConfig {
            authfile: Some(PathBuf::from("/path/to/authfile")),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--authfile", "/path/to/authfile"], &[]);

        let decryption_key_path = "/path/to/decryption_key";
        let c = Command::try_from(ImageProxyConfig {
            decryption_keys: Some(vec![decryption_key_path.to_string()]),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--decryption-key", "/path/to/decryption_key"], &[]);

        let c = Command::try_from(ImageProxyConfig {
            certificate_directory: Some(PathBuf::from("/path/to/certs")),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--cert-dir", "/path/to/certs"], &[]);

        let c = Command::try_from(ImageProxyConfig {
            insecure_skip_tls_verification: Some(true),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--tls-verify=false"], &[]);

        // Auth data passed as an open file shows up as a /proc/self/fd path.
        let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
        tmpf.write_all(r#"{ "auths": {} }"#.as_bytes()).unwrap();
        tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
        let c = Command::try_from(ImageProxyConfig {
            auth_data: Some(tmpf.into_std()),
            ..Default::default()
        })
        .unwrap();
        validate(c, &["--authfile", "/proc/self/fd/100"], &[]);
    }

    #[tokio::test]
    async fn skopeo_not_found() {
        let config = ImageProxyConfig {
            skopeo_cmd: Some(Command::new("no-skopeo")),
            ..ImageProxyConfig::default()
        };

        match ImageProxy::new_with_config(config).await {
            Ok(_) => panic!("Expected an error"),
            Err(ref e @ Error::SkopeoSpawnError(ref inner)) => {
                assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
                // Just to double check
                assert!(e
                    .to_string()
                    .contains("skopeo spawn error: No such file or directory"));
            }
            Err(e) => panic!("Unexpected error {e}"),
        }
    }

    #[tokio::test]
    async fn test_proxy_send_sync() {
        fn assert_send_sync(_x: impl Send + Sync) {}

        let Ok(proxy) = ImageProxy::new().await else {
            // doesn't matter: we only actually care to test if this compiles
            return;
        };
        assert_send_sync(&proxy);
        assert_send_sync(proxy);

        let opened = OpenedImage(0);
        assert_send_sync(&opened);
        assert_send_sync(opened);
    }

    /// Write `v` as JSON into an anonymous temporary file under `/tmp` and
    /// return it rewound to the start, for use as a fake error fd.
    fn generate_err_fd(v: serde_json::Value) -> Result<OwnedFd> {
        let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
        let mut tf = cap_tempfile::TempFile::new_anonymous(&tmp).map(BufWriter::new)?;
        serde_json::to_writer(&mut tf, &v)?;
        let mut tf = tf.into_inner().map_err(|e| e.into_error())?;
        tf.seek(std::io::SeekFrom::Start(0))?;
        let r = tf.into_std().into();
        Ok(r)
    }

    #[tokio::test]
    async fn test_read_blob_error_retryable() -> Result<()> {
        let retryable = serde_json::json!({
            "code": "retryable",
            "message": "foo",
        });
        let retryable = generate_err_fd(retryable)?;
        let err = ImageProxy::read_blob_error(retryable).boxed();
        let e = err.await.unwrap_err();
        match e {
            GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"),
            _ => panic!("Unexpected error {e:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_read_blob_error_none() -> Result<()> {
        let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
        let tf = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std();
        let err = ImageProxy::read_blob_error(tf.into()).boxed();
        err.await.unwrap();
        Ok(())
    }

    #[tokio::test]
    async fn test_read_blob_error_other() -> Result<()> {
        let other = serde_json::json!({
            "code": "other",
            "message": "bar",
        });
        let other = generate_err_fd(other)?;
        let err = ImageProxy::read_blob_error(other).boxed();
        let e = err.await.unwrap_err();
        match e {
            GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"),
            _ => panic!("Unexpected error {e:?}"),
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_read_blob_error_epipe() -> Result<()> {
        let epipe = serde_json::json!({
            "code": "EPIPE",
            "message": "baz",
        });
        let epipe = generate_err_fd(epipe)?;
        let err = ImageProxy::read_blob_error(epipe).boxed();
        err.await.unwrap();
        Ok(())
    }

    /// Create a dummy `OwnedFd` backed by a memfd, for tests that only need a
    /// valid descriptor.
    fn create_dummy_fd() -> OwnedFd {
        memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()
    }

    #[test]
    fn test_new_from_raw_values_finish_pipe() {
        let datafd = create_dummy_fd();
        // Remember the raw fd number before `from_reply` takes ownership of
        // `datafd`, so we can check the same descriptor comes back out.
        let raw_datafd_val = datafd.as_raw_fd();
        let fds = vec![datafd];
        let v = FinishPipe::from_reply(fds, 1).unwrap();
        assert_eq!(v.pipeid.0.get(), 1);
        assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
    }

    #[test]
    fn test_new_from_raw_values_dual_fds() {
        let datafd = create_dummy_fd();
        let errfd = create_dummy_fd();
        let raw_datafd_val = datafd.as_raw_fd();
        let raw_errfd_val = errfd.as_raw_fd();
        let fds = vec![datafd, errfd];
        let v = DualFds::from_reply(fds, 0).unwrap();
        assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
        assert_eq!(v.errfd.as_raw_fd(), raw_errfd_val);
    }

    #[test]
    fn test_new_from_raw_values_error_too_many_fds() {
        let fds = vec![create_dummy_fd(), create_dummy_fd(), create_dummy_fd()];
        match DualFds::from_reply(fds, 0) {
            Ok(v) => unreachable!("{v:?}"),
            Err(Error::Other(msg)) => {
                assert_eq!(msg.as_ref(), "Expected two fds for DualFds")
            }
            Err(other) => unreachable!("{other}"),
        }
    }

    #[test]
    fn test_new_from_raw_values_error_fd_with_zero_pipeid() {
        let fds = vec![create_dummy_fd()];
        match FinishPipe::from_reply(fds, 0) {
            Ok(v) => unreachable!("{v:?}"),
            Err(Error::Other(msg)) => {
                assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe")
            }
            Err(other) => unreachable!("{other}"),
        }
    }

    #[test]
    fn test_new_from_raw_values_error_pipeid_with_both_fds() {
        let fds = vec![create_dummy_fd(), create_dummy_fd()];
        match DualFds::from_reply(fds, 1) {
            Ok(v) => unreachable!("{v:?}"),
            Err(Error::Other(msg)) => {
                assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds")
            }
            Err(other) => unreachable!("{other}"),
        }
    }

    #[test]
    fn test_new_from_raw_values_error_no_fd_with_pipeid() {
        let fds: Vec<OwnedFd> = vec![];
        match FinishPipe::from_reply(fds, 1) {
            Ok(v) => unreachable!("{v:?}"),
            Err(Error::Other(msg)) => {
                assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe")
            }
            Err(other) => unreachable!("{other}"),
        }
    }

    #[tokio::test]
    #[ignore = "https://github.com/coreos/rpm-ostree/issues/5442"]
    async fn test_open_optional() -> Result<()> {
        if !check_skopeo() {
            return Ok(());
        }

        let td = tempfile::tempdir()?;
        let td = td.path().to_str().unwrap();
        let proxy = ImageProxy::new().await?;
        let imgpath = format!("oci-archive:{td}/some-nonexistent-image.ociarchive");
        let img = proxy.open_image_optional(&imgpath).await.unwrap();
        assert!(img.is_none());

        Ok(())
    }
}