Skip to main content

containers_image_proxy/
imageproxy.rs

1//! Run skopeo as a subprocess to fetch container images.
2//!
3//! This allows fetching a container image manifest and layers in a streaming fashion.
4//!
5//! More information: <https://github.com/containers/skopeo/pull/1476>
6
7pub mod transport;
8
9// Re-export oci_spec so consumers don't need to add it as a dependency.
10// NOTE: Bumping oci_spec in a semver-incompatible way requires bumping this crate too.
11// See Cargo.toml for details.
12pub use ::oci_spec;
13pub use transport::{
14    ContainersStorageRef, ImageReference, ImageReferenceError, Transport, TransportConversionError,
15};
16
17use ::oci_spec::image::{Descriptor, Digest};
18use cap_std_ext::prelude::CapStdExtCommandExt;
19use cap_std_ext::{cap_std, cap_tempfile};
20use futures_util::{Future, FutureExt};
21use itertools::Itertools;
22use serde::{Deserialize, Serialize};
23use std::fs::File;
24use std::iter::FusedIterator;
25use std::num::NonZeroU32;
26use std::ops::Range;
27use std::os::fd::OwnedFd;
28use std::os::unix::prelude::CommandExt;
29use std::path::PathBuf;
30use std::pin::Pin;
31use std::process::{Command, Stdio};
32use std::sync::{Arc, Mutex, OnceLock};
33use thiserror::Error;
34use tokio::io::{AsyncBufRead, AsyncReadExt};
35use tokio::sync::Mutex as AsyncMutex;
36use tokio::task::JoinError;
37use tracing::instrument;
38
39/// Errors returned by this crate.
40#[derive(Error, Debug)]
41#[non_exhaustive]
42pub enum Error {
43    #[error("i/o error: {0}")]
44    /// An input/output error
45    Io(#[from] std::io::Error),
46    #[error("skopeo spawn error: {0}")]
47    /// An error spawning skopeo
48    SkopeoSpawnError(#[source] std::io::Error),
49    #[error("serialization error: {0}")]
50    /// Returned when serialization or deserialization fails
51    SerDe(#[from] serde_json::Error),
52    /// The proxy failed to initiate a request
53    #[error("failed to invoke method {method}: {error}")]
54    RequestInitiationFailure { method: Box<str>, error: Box<str> },
55    /// An error returned from the remote proxy
56    #[error("proxy request returned error: {0}")]
57    RequestReturned(Box<str>),
58    #[error("semantic version error: {0}")]
59    SemanticVersion(#[from] semver::Error),
60    #[error("proxy too old (requested={requested_version} found={found_version}) error")]
61    /// The proxy doesn't support the requested semantic version
62    ProxyTooOld {
63        requested_version: Box<str>,
64        found_version: Box<str>,
65    },
66    #[error("configuration error: {0}")]
67    /// Conflicting or missing configuration
68    Configuration(Box<str>),
69    #[error("other error: {0}")]
70    /// An unknown other error
71    Other(Box<str>),
72}
73
74impl Error {
75    pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
76        Self::Other(e.into())
77    }
78}
79
80/// Errors returned by get_raw_blob
81#[derive(Error, Debug)]
82#[non_exhaustive]
83pub enum GetBlobError {
84    /// A client may reasonably retry on this type of error.
85    #[error("retryable error: {0}")]
86    Retryable(Box<str>),
87    #[error("other error: {0}")]
88    /// An unknown other error
89    Other(Box<str>),
90}
91
92impl From<rustix::io::Errno> for Error {
93    fn from(value: rustix::io::Errno) -> Self {
94        Self::Io(value.into())
95    }
96}
97
98/// The error type returned from this crate.
99pub type Result<T> = std::result::Result<T, Error>;
100
/// File descriptors in this range are reserved for handing data down to the proxy child;
/// do not configure the command to use files in this range. Standard input is reserved as
/// well, because it carries the control socket.
pub const RESERVED_FD_RANGE: Range<i32> = 100..200;

/// Largest JSON request/reply exchanged with the proxy; this limit is defined in skopeo.
/// Bulk payload data never travels inline: it goes over a pipe file descriptor instead.
const MAX_MSG_SIZE: usize = 32 * 1024;
109
110fn base_proto_version() -> &'static semver::VersionReq {
111    // Introduced in https://github.com/containers/skopeo/pull/1523
112    static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
113    BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
114}
115
116fn layer_info_proto_version() -> &'static semver::VersionReq {
117    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
118    LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
119}
120
121fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
122    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
123    LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
124}
125
126#[derive(Serialize)]
127struct Request {
128    method: String,
129    args: Vec<serde_json::Value>,
130}
131
132impl Request {
133    fn new<T, I>(method: &str, args: T) -> Self
134    where
135        T: IntoIterator<Item = I>,
136        I: Into<serde_json::Value>,
137    {
138        let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
139        Self {
140            method: method.to_string(),
141            args,
142        }
143    }
144
145    fn new_bare(method: &str) -> Self {
146        Self {
147            method: method.to_string(),
148            args: vec![],
149        }
150    }
151}
152
153#[derive(Deserialize)]
154struct Reply {
155    success: bool,
156    error: String,
157    pipeid: u32,
158    value: serde_json::Value,
159}
160
161type ChildFuture = Pin<
162    Box<
163        dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
164            + Send,
165    >,
166>;
167
168/// Manage a child process proxy to fetch container images.
169pub struct ImageProxy {
170    sockfd: Arc<Mutex<OwnedFd>>,
171    childwait: Arc<AsyncMutex<ChildFuture>>,
172    protover: semver::Version,
173}
174
175impl std::fmt::Debug for ImageProxy {
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177        f.debug_struct("ImageProxy").finish()
178    }
179}
180
/// Opaque handle to an image opened through the proxy.
#[derive(PartialEq, Eq, Debug)]
pub struct OpenedImage(u32);
184
/// Identifier of a proxy-side pipe that must later be completed with `FinishPipe`.
/// It is never zero, because zero means "no pipe" in a reply.
#[derive(Debug, PartialEq, Eq)]
struct PipeId(NonZeroU32);

impl PipeId {
    /// Wrap a raw pipe id, yielding `None` when it is zero.
    fn try_new(pipeid: u32) -> Option<Self> {
        NonZeroU32::new(pipeid).map(PipeId)
    }
}
193
/// Configuration for the proxy.
///
/// At most one of [`Self::authfile`], [`Self::auth_data`] and [`Self::auth_anonymous`] may be
/// set; building the command fails with a configuration error otherwise.
#[derive(Debug, Default)]
pub struct ImageProxyConfig {
    /// Path to container auth file; equivalent to `skopeo --authfile`.
    /// This conflicts with [`Self::auth_data`] and [`Self::auth_anonymous`].
    pub authfile: Option<PathBuf>,

    /// Data stream for container auth. Its contents are copied into a temporary file that is
    /// passed to the child. This conflicts with [`Self::authfile`] and [`Self::auth_anonymous`].
    pub auth_data: Option<File>,

    /// Do not use default container authentication paths; equivalent to `skopeo --no-creds`.
    ///
    /// Defaults to `false`; in other words, use the default file paths from `man containers-auth.json`.
    pub auth_anonymous: bool,

    /// Directory with certificates (*.crt, *.cert, *.key) used to connect to the registry.
    /// Equivalent to `skopeo --cert-dir`.
    pub certificate_directory: Option<PathBuf>,

    /// Decryption keys to decrypt an encrypted container image.
    /// Each key is passed as a separate `--decryption-key <path_to_decryption_key>` argument.
    pub decryption_keys: Option<Vec<String>>,

    /// If `Some(true)`, disable TLS verification. Equivalent to `skopeo --tls-verify=false`.
    pub insecure_skip_tls_verification: Option<bool>,

    /// Prefix to add to the user agent string. Equivalent to `skopeo --user-agent-prefix`.
    /// The resulting user agent will be in the format "prefix skopeo/version".
    /// This option is only used if the installed skopeo version supports it; support is
    /// detected by probing `skopeo --help`.
    pub user_agent_prefix: Option<String>,

    /// If enabled, propagate debug-logging level from the proxy via stderr to the
    /// current process' stderr. Note that when enabled, standard error is no longer
    /// captured. Setting the `CONTAINERS_IMAGE_PROXY_DEBUG` environment variable has the
    /// same effect.
    pub debug: bool,

    /// Provide a configured [`std::process::Command`] instance.
    ///
    /// This allows configuring aspects of the resulting child `skopeo` process.
    /// The intention of this hook is to allow the caller to use e.g.
    /// `systemd-run` or equivalent containerization tools.  For example you
    /// can set up a command whose arguments are `systemd-run -Pq -p DynamicUser=yes -- skopeo`.
    /// You can also set up arbitrary aspects of the child via e.g.
    /// [`current_dir`] [`pre_exec`].
    ///
    /// [`current_dir`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.current_dir
    /// [`pre_exec`]: https://doc.rust-lang.org/std/os/unix/process/trait.CommandExt.html#tymethod.pre_exec
    ///
    /// The default is to spawn `skopeo` directly with a `pre_exec` hook that sets the
    /// parent-death signal to `SIGTERM` (the same effect as util-linux
    /// `setpriv --pdeathsig SIGTERM -- skopeo`), which on Linux binds the lifecycle of the
    /// child process to the parent. A custom command does not get this hook automatically.
    ///
    /// Note that the command *must* run `skopeo`, either as the program itself or
    /// indirectly through a wrapper. All other command line options, including
    /// `experimental-image-proxy`, will be injected by this library.
    /// You may use a different command name from `skopeo` if your
    /// application has set up a compatible copy, e.g. `/usr/lib/myapp/my-private-skopeo`.
    pub skopeo_cmd: Option<Command>,
}
252
/// Report whether `skopeo` advertises `--user-agent-prefix` in its `--help` output.
///
/// The probe runs at most once per process and its answer is cached. A spawn failure or
/// non-UTF-8 help text counts as "unsupported"; the exit status is not consulted.
/// NOTE: this always probes the program named `skopeo`, even when a custom
/// `ImageProxyConfig::skopeo_cmd` is configured.
fn supports_user_agent_prefix() -> bool {
    static SUPPORTS_USER_AGENT: OnceLock<bool> = OnceLock::new();
    *SUPPORTS_USER_AGENT.get_or_init(|| {
        let probe = Command::new("skopeo")
            .arg("--help")
            .stdout(Stdio::piped())
            .stderr(Stdio::null())
            .output();
        let Ok(output) = probe else {
            return false;
        };
        match String::from_utf8(output.stdout) {
            Ok(help) => help.contains("--user-agent-prefix"),
            Err(_) => false,
        }
    })
}
271
272impl TryFrom<ImageProxyConfig> for Command {
273    type Error = Error;
274
275    fn try_from(config: ImageProxyConfig) -> Result<Self> {
276        let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
277        let mut allocated_fds = RESERVED_FD_RANGE.clone();
278        let mut alloc_fd = || {
279            allocated_fds.next().ok_or_else(|| {
280                Error::Other("Ran out of reserved file descriptors for child".into())
281            })
282        };
283
284        // By default, we set up pdeathsig to "lifecycle bind" the child process to us.
285        let mut c = config.skopeo_cmd.unwrap_or_else(|| {
286            let mut c = std::process::Command::new("skopeo");
287            unsafe {
288                c.pre_exec(|| {
289                    Ok(rustix::process::set_parent_process_death_signal(Some(
290                        rustix::process::Signal::TERM,
291                    ))?)
292                });
293            }
294            c
295        });
296        c.arg("experimental-image-proxy");
297        if debug {
298            c.arg("--debug");
299        }
300        let auth_option_count = [
301            config.authfile.is_some(),
302            config.auth_data.is_some(),
303            config.auth_anonymous,
304        ]
305        .into_iter()
306        .filter(|&x| x)
307        .count();
308        if auth_option_count > 1 {
309            // This is a programmer error really
310            return Err(Error::Configuration(
311                "Conflicting authentication options".into(),
312            ));
313        }
314        if let Some(authfile) = config.authfile {
315            c.arg("--authfile");
316            c.arg(authfile);
317        } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
318            // If we get the authentication data as a file, we always copy it to a new temporary file under
319            // the assumption that the caller provided it this way to aid in privilege separation where
320            // the file is only readable to privileged code.
321            let target_fd = alloc_fd()?;
322            let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
323            let mut tempfile =
324                cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
325            std::io::copy(&mut auth_data, &mut tempfile)?;
326            let tempfile = tempfile
327                .into_inner()
328                .map_err(|e| e.into_error())?
329                .into_std();
330            let fd = std::sync::Arc::new(tempfile.into());
331            c.take_fd_n(fd, target_fd);
332            c.arg("--authfile");
333            c.arg(format!("/proc/self/fd/{target_fd}"));
334        } else if config.auth_anonymous {
335            c.arg("--no-creds");
336        }
337
338        if let Some(certificate_directory) = config.certificate_directory {
339            c.arg("--cert-dir");
340            c.arg(certificate_directory);
341        }
342
343        if let Some(decryption_keys) = config.decryption_keys {
344            for decryption_key in &decryption_keys {
345                c.arg("--decryption-key");
346                c.arg(decryption_key);
347            }
348        }
349
350        if config.insecure_skip_tls_verification.unwrap_or_default() {
351            c.arg("--tls-verify=false");
352        }
353
354        // Add user agent prefix if provided and supported by skopeo
355        if let Some(user_agent_prefix) = config.user_agent_prefix {
356            if supports_user_agent_prefix() {
357                c.arg("--user-agent-prefix");
358                c.arg(user_agent_prefix);
359            }
360        }
361
362        c.stdout(Stdio::null());
363        if !debug {
364            c.stderr(Stdio::piped());
365        }
366        Ok(c)
367    }
368}
369
370/// BlobInfo collects known information about a blob
371#[derive(Debug, serde::Deserialize)]
372pub struct ConvertedLayerInfo {
373    /// Uncompressed digest of a layer; for more information, see
374    /// https://github.com/opencontainers/image-spec/blob/main/config.md#layer-diffid
375    pub digest: Digest,
376
377    /// Size of blob
378    pub size: u64,
379
380    /// Mediatype of blob
381    pub media_type: oci_spec::image::MediaType,
382}
383
384/// A single fd; requires invoking FinishPipe
385#[derive(Debug)]
386struct FinishPipe {
387    pipeid: PipeId,
388    datafd: OwnedFd,
389}
390
391/// There is a data FD and an error FD. The error FD will be JSON.
392#[derive(Debug)]
393struct DualFds {
394    datafd: OwnedFd,
395    errfd: OwnedFd,
396}
397
398/// Helper trait for parsing the pipeid and/or file descriptors of a reply
399trait FromReplyFds: Send + 'static
400where
401    Self: Sized,
402{
403    fn from_reply(
404        iterable: impl IntoIterator<IntoIter: FusedIterator, Item = OwnedFd>,
405        pipeid: u32,
406    ) -> Result<Self>;
407}
408
409/// No file descriptors or pipeid expected
410impl FromReplyFds for () {
411    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
412        if fds.into_iter().next().is_some() {
413            return Err(Error::Other("expected no fds".into()));
414        }
415        if pipeid != 0 {
416            return Err(Error::Other("unexpected pipeid".into()));
417        }
418        Ok(())
419    }
420}
421
422/// A FinishPipe instance
423impl FromReplyFds for FinishPipe {
424    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
425        let Some(pipeid) = PipeId::try_new(pipeid) else {
426            return Err(Error::Other("Expected pipeid for FinishPipe".into()));
427        };
428        let datafd = fds
429            .into_iter()
430            .exactly_one()
431            .map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?;
432        Ok(Self { pipeid, datafd })
433    }
434}
435
436/// A DualFds instance
437impl FromReplyFds for DualFds {
438    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
439        if pipeid != 0 {
440            return Err(Error::Other("Unexpected pipeid with DualFds".into()));
441        }
442        let [datafd, errfd] = fds
443            .into_iter()
444            .collect_array()
445            .ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?;
446        Ok(Self { datafd, errfd })
447    }
448}
449
450impl ImageProxy {
451    /// Create an image proxy that fetches the target image, using default configuration.
452    pub async fn new() -> Result<Self> {
453        Self::new_with_config(Default::default()).await
454    }
455
456    /// Create an image proxy that fetches the target image
457    #[instrument]
458    pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
459        let mut c = Command::try_from(config)?;
460        let (mysock, theirsock) = rustix::net::socketpair(
461            rustix::net::AddressFamily::UNIX,
462            rustix::net::SocketType::SEQPACKET,
463            rustix::net::SocketFlags::CLOEXEC,
464            None,
465        )?;
466        c.stdin(Stdio::from(theirsock));
467        let child = match c.spawn() {
468            Ok(c) => c,
469            Err(error) => return Err(Error::SkopeoSpawnError(error)),
470        };
471        tracing::debug!("Spawned skopeo pid={:?}", child.id());
472        // Here we use std sync API via thread because tokio installs
473        // a SIGCHLD handler which can conflict with e.g. the glib one
474        // which may also be in process.
475        // xref https://github.com/tokio-rs/tokio/issues/3520#issuecomment-968985861
476        let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
477        let sockfd = Arc::new(Mutex::new(mysock));
478
479        let mut r = Self {
480            sockfd,
481            childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
482            protover: semver::Version::new(0, 0, 0),
483        };
484
485        // Verify semantic version
486        let protover: String = r.impl_request("Initialize", [(); 0]).await?;
487        tracing::debug!("Remote protocol version: {protover}");
488        let protover = semver::Version::parse(protover.as_str())?;
489        // Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`.
490        let supported = base_proto_version();
491        if !supported.matches(&protover) {
492            return Err(Error::ProxyTooOld {
493                requested_version: protover.to_string().into(),
494                found_version: supported.to_string().into(),
495            });
496        }
497        r.protover = protover;
498
499        Ok(r)
500    }
501
502    /// Create and send a request. Should only be used by impl_request.
503    async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static, F: FromReplyFds>(
504        sockfd: Arc<Mutex<OwnedFd>>,
505        req: Request,
506    ) -> Result<(T, F)> {
507        tracing::trace!("sending request {}", req.method.as_str());
508        // TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio
509        let r = tokio::task::spawn_blocking(move || {
510            let sockfd = sockfd.lock().unwrap();
511            let sendbuf = serde_json::to_vec(&req)?;
512            let sockfd = &*sockfd;
513            rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
514            drop(sendbuf);
515            let mut buf = [0u8; MAX_MSG_SIZE];
516            let mut cmsg_space: Vec<std::mem::MaybeUninit<u8>> =
517                vec![std::mem::MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(1))];
518            let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(cmsg_space.as_mut_slice());
519            let iov = std::io::IoSliceMut::new(buf.as_mut());
520            let mut iov = [iov];
521            let nread = rustix::net::recvmsg(
522                sockfd,
523                &mut iov,
524                &mut cmsg_buffer,
525                rustix::net::RecvFlags::CMSG_CLOEXEC,
526            )?
527            .bytes;
528            let fdret = cmsg_buffer
529                .drain()
530                .filter_map(|m| match m {
531                    rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
532                    _ => None,
533                })
534                .flatten();
535            let buf = &buf[..nread];
536            let reply: Reply = serde_json::from_slice(buf)?;
537            if !reply.success {
538                return Err(Error::RequestInitiationFailure {
539                    method: req.method.clone().into(),
540                    error: reply.error.into(),
541                });
542            }
543            let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?;
544            Ok((serde_json::from_value(reply.value)?, fds))
545        })
546        .await
547        .map_err(|e| Error::Other(e.to_string().into()))??;
548        tracing::trace!("completed request");
549        Ok(r)
550    }
551
552    /// Create a request that may return file descriptors, and also check for an unexpected
553    /// termination of the child process.
554    #[instrument(skip(args))]
555    async fn impl_request_with_fds<
556        T: serde::de::DeserializeOwned + Send + 'static,
557        F: FromReplyFds,
558    >(
559        &self,
560        method: &str,
561        args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
562    ) -> Result<(T, F)> {
563        let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
564        let mut childwait = self.childwait.lock().await;
565        tokio::select! {
566            r = req => { r }
567            r = childwait.as_mut() => {
568                let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
569                let stderr = String::from_utf8_lossy(&r.stderr);
570                Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
571            }
572        }
573    }
574
575    /// A synchronous invocation which does not return any file descriptors.
576    async fn impl_request<T: serde::de::DeserializeOwned + Send + 'static>(
577        &self,
578        method: &str,
579        args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
580    ) -> Result<T> {
581        let (r, ()) = self.impl_request_with_fds(method, args).await?;
582        Ok(r)
583    }
584
585    #[instrument]
586    async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> {
587        tracing::debug!("closing pipe");
588        let (r, ()) = self
589            .impl_request_with_fds("FinishPipe", [pipeid.0.get()])
590            .await?;
591        Ok(r)
592    }
593
594    /// Open an image for fetching, using a string reference.
595    #[instrument]
596    pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
597        tracing::debug!("opening image");
598        let imgid = self.impl_request("OpenImage", [imgref]).await?;
599        Ok(OpenedImage(imgid))
600    }
601
602    /// Open an image for fetching, using a structured [`ImageReference`].
603    #[instrument]
604    pub async fn open_image_ref(&self, imgref: &ImageReference) -> Result<OpenedImage> {
605        self.open_image(&imgref.to_string()).await
606    }
607
608    /// Open an image for fetching if it exists, using a string reference.
609    /// Returns `Ok(None)` if the image does not exist.
610    #[instrument]
611    pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
612        tracing::debug!("opening image");
613        let imgid = self.impl_request("OpenImageOptional", [imgref]).await?;
614        if imgid == 0 {
615            Ok(None)
616        } else {
617            Ok(Some(OpenedImage(imgid)))
618        }
619    }
620
621    /// Open an image for fetching if it exists, using a structured [`ImageReference`].
622    /// Returns `Ok(None)` if the image does not exist.
623    #[instrument]
624    pub async fn open_image_optional_ref(
625        &self,
626        imgref: &ImageReference,
627    ) -> Result<Option<OpenedImage>> {
628        self.open_image_optional(&imgref.to_string()).await
629    }
630
631    #[instrument]
632    pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
633        self.impl_request("CloseImage", [img.0]).await
634    }
635
636    async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result<Vec<u8>> {
637        let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
638        let mut fd = tokio::io::BufReader::new(fd);
639        let mut r = Vec::new();
640        let reader = fd.read_to_end(&mut r);
641        let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid));
642        finish?;
643        assert_eq!(nbytes?, r.len());
644        Ok(r)
645    }
646
647    /// Fetch the manifest as raw bytes, converted to OCI if necessary.
648    /// The original digest of the unconverted manifest is also returned.
649    /// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
650    pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
651        let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?;
652        Ok((digest, self.read_finish_pipe(pipefd).await?))
653    }
654
655    /// Fetch the manifest.
656    /// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
657    pub async fn fetch_manifest(
658        &self,
659        img: &OpenedImage,
660    ) -> Result<(String, oci_spec::image::ImageManifest)> {
661        let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
662        let manifest = serde_json::from_slice(&raw)?;
663        Ok((digest, manifest))
664    }
665
666    /// Fetch the config.
667    /// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
668    pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
669        let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?;
670        self.read_finish_pipe(pipe).await
671    }
672
673    /// Fetch the config.
674    /// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
675    pub async fn fetch_config(
676        &self,
677        img: &OpenedImage,
678    ) -> Result<oci_spec::image::ImageConfiguration> {
679        let raw = self.fetch_config_raw(img).await?;
680        serde_json::from_slice(&raw).map_err(Into::into)
681    }
682
683    /// Fetch a blob identified by e.g. `sha256:<digest>`.
684    /// <https://github.com/opencontainers/image-spec/blob/main/descriptor.md>
685    ///
686    /// The requested size and digest are verified (by the proxy process).
687    ///
688    /// Note that because of the implementation details of this function, you should
689    /// [`futures::join!`] the returned futures instead of polling one after the other. The
690    /// secondary "driver" future will only return once everything has been read from
691    /// the reader future.
692    #[instrument]
693    pub async fn get_blob(
694        &self,
695        img: &OpenedImage,
696        digest: &Digest,
697        size: u64,
698    ) -> Result<(
699        impl AsyncBufRead + Send + Unpin,
700        impl Future<Output = Result<()>> + Unpin + '_,
701    )> {
702        // For previous discussion on digest/size verification, see
703        // https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009
704        tracing::debug!("fetching blob");
705        let args: Vec<serde_json::Value> =
706            vec![img.0.into(), digest.to_string().into(), size.into()];
707        // Note that size may be -1 here if e.g. the remote registry doesn't give a Content-Length
708        // for example.
709        // We have always validated the size later (in FinishPipe) so out of conservatism we
710        // just ignore the size here.
711        let (_bloblen, pipe): (serde_json::Number, FinishPipe) =
712            self.impl_request_with_fds("GetBlob", args).await?;
713        let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
714        let fd = tokio::io::BufReader::new(fd);
715        let finish = Box::pin(self.finish_pipe(pipe.pipeid));
716        Ok((fd, finish))
717    }
718
719    async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> {
720        let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
721        let mut errfd = tokio::io::BufReader::new(fd);
722        let mut buf = Vec::new();
723        errfd
724            .read_to_end(&mut buf)
725            .await
726            .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
727        if buf.is_empty() {
728            return Ok(());
729        }
730        #[derive(Deserialize)]
731        struct RemoteError {
732            code: String,
733            message: String,
734        }
735        let e: RemoteError = serde_json::from_slice(&buf)
736            .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
737        match e.code.as_str() {
738            // Actually this is OK
739            "EPIPE" => Ok(()),
740            "retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())),
741            _ => Err(GetBlobError::Other(e.message.into_boxed_str())),
742        }
743    }
744
745    /// Fetch a blob identified by e.g. `sha256:<digest>`; does not perform
746    /// any verification that the blob matches the digest. The size of the
747    /// blob (if available) and a pipe file descriptor are returned.
748    #[instrument]
749    pub async fn get_raw_blob(
750        &self,
751        img: &OpenedImage,
752        digest: &Digest,
753    ) -> Result<(
754        Option<u64>,
755        tokio::fs::File,
756        impl Future<Output = std::result::Result<(), GetBlobError>> + Unpin + '_,
757    )> {
758        tracing::debug!("fetching blob");
759        let args: Vec<serde_json::Value> = vec![img.0.into(), digest.to_string().into()];
760        let (bloblen, fds): (i64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?;
761        // See the GetBlob case, we have a best-effort attempt to return the size, but it might not be known
762        let bloblen = u64::try_from(bloblen).ok();
763        let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd));
764        let err = Self::read_blob_error(fds.errfd).boxed();
765        Ok((bloblen, fd, err))
766    }
767
768    /// Fetch a descriptor. The requested size and digest are verified (by the proxy process).
769    #[instrument]
770    pub async fn get_descriptor(
771        &self,
772        img: &OpenedImage,
773        descriptor: &Descriptor,
774    ) -> Result<(
775        impl AsyncBufRead + Send + Unpin,
776        impl Future<Output = Result<()>> + Unpin + '_,
777    )> {
778        self.get_blob(img, descriptor.digest(), descriptor.size())
779            .await
780    }
781
782    ///Returns data that can be used to find the "diffid" corresponding to a particular layer.
783    #[instrument]
784    pub async fn get_layer_info(
785        &self,
786        img: &OpenedImage,
787    ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
788        tracing::debug!("Getting layer info");
789        if layer_info_piped_proto_version().matches(&self.protover) {
790            let ((), pipe) = self
791                .impl_request_with_fds("GetLayerInfoPiped", [img.0])
792                .await?;
793            let buf = self.read_finish_pipe(pipe).await?;
794            return Ok(Some(serde_json::from_slice(&buf)?));
795        }
796        if !layer_info_proto_version().matches(&self.protover) {
797            return Ok(None);
798        }
799        let layers = self.impl_request("GetLayerInfo", [img.0]).await?;
800        Ok(Some(layers))
801    }
802
803    /// Close the connection and wait for the child process to exit successfully.
804    #[instrument]
805    pub async fn finalize(self) -> Result<()> {
806        let _ = &self;
807        let req = Request::new_bare("Shutdown");
808        let sendbuf = serde_json::to_vec(&req)?;
809        // SAFETY: Only panics if a worker thread already panic'd
810        let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
811        rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
812        drop(sendbuf);
813        tracing::debug!("sent shutdown request");
814        let mut childwait = self.childwait.lock().await;
815        let output = childwait
816            .as_mut()
817            .await
818            .map_err(|e| Error::new_other(e.to_string()))??;
819        if !output.status.success() {
820            let stderr = String::from_utf8_lossy(&output.stderr);
821            return Err(Error::RequestReturned(
822                format!("proxy failed: {}\n{}", output.status, stderr).into(),
823            ));
824        }
825        tracing::debug!("proxy exited successfully");
826        Ok(())
827    }
828}
829
/// Unit tests for command-line construction from `ImageProxyConfig`, parsing of the
/// blob error side-channel, and validation of fd/pipeid replies. Tests that talk to a
/// real `skopeo` return early, or are `#[ignore]`d, when it cannot be used.
#[cfg(test)]
mod tests {
    use std::io::{BufWriter, Seek, Write};
    use std::os::fd::{AsRawFd, OwnedFd};

    use super::*;
    use cap_std_ext::cap_std::fs::Dir;
    use rustix::fs::{memfd_create, MemfdFlags};

    /// Check if we have skopeo
    fn check_skopeo() -> bool {
        // Cached in a OnceLock, so `skopeo --help` is spawned at most once per test
        // process. Only spawn success is checked, not the exit status.
        static HAVE_SKOPEO: OnceLock<bool> = OnceLock::new();
        *HAVE_SKOPEO.get_or_init(|| {
            Command::new("skopeo")
                .arg("--help")
                .stdout(Stdio::null())
                .stderr(Stdio::null())
                .status()
                .is_ok()
        })
    }

    /// Assert that the `Debug` rendering of `c` contains every string in `contains`
    /// and none of the strings in `not_contains`.
    fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
        // Format via debug, because
        // https://doc.rust-lang.org/std/process/struct.Command.html#method.get_args
        // is experimental
        let d = format!("{:?}", c);
        for c in contains {
            assert!(d.contains(c), "{} missing {}", d, c);
        }
        for c in not_contains {
            assert!(!d.contains(c), "{} should not contain {}", d, c);
        }
    }

    /// Check that each `ImageProxyConfig` option is translated into the expected
    /// skopeo command-line arguments.
    #[test]
    fn proxy_configs() {
        let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();

        let c = Command::try_from(ImageProxyConfig::default()).unwrap();
        validate(
            c,
            &["experimental-image-proxy"],
            &["--no-creds", "--tls-verify", "--authfile"],
        );

        let c = Command::try_from(ImageProxyConfig {
            authfile: Some(PathBuf::from("/path/to/authfile")),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--authfile", "/path/to/authfile"], &[]);

        let decryption_key_path = "/path/to/decryption_key";
        let c = Command::try_from(ImageProxyConfig {
            decryption_keys: Some(vec![decryption_key_path.to_string()]),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--decryption-key", "/path/to/decryption_key"], &[]);

        let c = Command::try_from(ImageProxyConfig {
            certificate_directory: Some(PathBuf::from("/path/to/certs")),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--cert-dir", "/path/to/certs"], &[]);

        let c = Command::try_from(ImageProxyConfig {
            insecure_skip_tls_verification: Some(true),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--tls-verify=false"], &[]);

        // The auth data below is not well-formed JSON. That does not matter here,
        // because this test only inspects the generated arguments.
        let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
        tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
        tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
        let c = Command::try_from(ImageProxyConfig {
            auth_data: Some(tmpf.into_std()),
            ..Default::default()
        })
        .unwrap();
        // In-memory auth data is referenced through a /proc/self/fd path. The fd
        // number expected here (100) must match the one used by the conversion.
        validate(c, &["--authfile", "/proc/self/fd/100"], &[]);

        // Test user-agent-prefix - only validate if supported
        let c = Command::try_from(ImageProxyConfig {
            user_agent_prefix: Some("bootc/1.0".to_string()),
            ..Default::default()
        })
        .unwrap();
        if supports_user_agent_prefix() {
            validate(c, &["--user-agent-prefix", "bootc/1.0"], &[]);
        } else {
            validate(c, &[], &["--user-agent-prefix"]);
        }
    }

    /// Spawning a nonexistent binary must surface as `Error::SkopeoSpawnError`
    /// wrapping an `ErrorKind::NotFound` I/O error.
    #[tokio::test]
    async fn skopeo_not_found() {
        let mut config = ImageProxyConfig {
            ..ImageProxyConfig::default()
        };
        config.skopeo_cmd = Some(Command::new("no-skopeo"));

        match ImageProxy::new_with_config(config).await {
            Ok(_) => panic!("Expected an error"),
            Err(ref e @ Error::SkopeoSpawnError(ref inner)) => {
                assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
                // Just to double check
                assert!(e
                    .to_string()
                    .contains("skopeo spawn error: No such file or directory"));
            }
            Err(e) => panic!("Unexpected error {e}"),
        }
    }

    /// Compile-time check that `ImageProxy` and `OpenedImage`, and references to
    /// them, are `Send + Sync`.
    #[tokio::test]
    async fn test_proxy_send_sync() {
        fn assert_send_sync(_x: impl Send + Sync) {}

        let Ok(proxy) = ImageProxy::new().await else {
            // doesn't matter: we only actually care to test if this compiles
            return;
        };
        assert_send_sync(&proxy);
        assert_send_sync(proxy);

        let opened = OpenedImage(0);
        assert_send_sync(&opened);
        assert_send_sync(opened);
    }

    /// Serialize `v` as JSON into an anonymous temporary file under /tmp and return
    /// an fd for it, rewound to the start. The tests use it as a stand-in for the
    /// error fd passed to `read_blob_error`.
    fn generate_err_fd(v: serde_json::Value) -> Result<OwnedFd> {
        let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
        let mut tf = cap_tempfile::TempFile::new_anonymous(&tmp).map(BufWriter::new)?;
        serde_json::to_writer(&mut tf, &v)?;
        let mut tf = tf.into_inner().map_err(|e| e.into_error())?;
        tf.seek(std::io::SeekFrom::Start(0))?;
        let r = tf.into_std().into();
        Ok(r)
    }

    /// A `retryable` error code maps to `GetBlobError::Retryable` carrying the message.
    #[tokio::test]
    async fn test_read_blob_error_retryable() -> Result<()> {
        let retryable = serde_json::json!({
            "code": "retryable",
            "message": "foo",
        });
        let retryable = generate_err_fd(retryable)?;
        let err = ImageProxy::read_blob_error(retryable).boxed();
        let e = err.await.unwrap_err();
        match e {
            GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"),
            _ => panic!("Unexpected error {e:?}"),
        }
        Ok(())
    }

    /// An empty error stream resolves to `Ok(())`.
    #[tokio::test]
    async fn test_read_blob_error_none() -> Result<()> {
        let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
        let tf = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std();
        let err = ImageProxy::read_blob_error(tf.into()).boxed();
        err.await.unwrap();
        Ok(())
    }

    /// An `other` error code maps to `GetBlobError::Other` carrying the message.
    #[tokio::test]
    async fn test_read_blob_error_other() -> Result<()> {
        let other = serde_json::json!({
            "code": "other",
            "message": "bar",
        });
        let other = generate_err_fd(other)?;
        let err = ImageProxy::read_blob_error(other).boxed();
        let e = err.await.unwrap_err();
        match e {
            GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"),
            _ => panic!("Unexpected error {e:?}"),
        }
        Ok(())
    }

    /// An `EPIPE` error code is not treated as a failure: the future resolves to `Ok(())`.
    #[tokio::test]
    async fn test_read_blob_error_epipe() -> Result<()> {
        let epipe = serde_json::json!({
            "code": "EPIPE",
            "message": "baz",
        });
        let epipe = generate_err_fd(epipe)?;
        let err = ImageProxy::read_blob_error(epipe).boxed();
        err.await.unwrap();
        Ok(())
    }

    // Helper to create a dummy OwnedFd using memfd_create for testing.
    fn create_dummy_fd() -> OwnedFd {
        memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()
    }

    /// One fd plus a nonzero pipeid is accepted as a `FinishPipe`.
    #[test]
    fn test_new_from_raw_values_finish_pipe() {
        let datafd = create_dummy_fd();
        // Keep a raw fd to compare later, as from_reply consumes datafd
        let raw_datafd_val = datafd.as_raw_fd();
        let fds = vec![datafd];
        let v = FinishPipe::from_reply(fds, 1).unwrap();
        assert_eq!(v.pipeid.0.get(), 1);
        assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
    }

    /// Two fds with pipeid 0 are accepted as `DualFds`, in order (data, then error).
    #[test]
    fn test_new_from_raw_values_dual_fds() {
        let datafd = create_dummy_fd();
        let errfd = create_dummy_fd();
        let raw_datafd_val = datafd.as_raw_fd();
        let raw_errfd_val = errfd.as_raw_fd();
        let fds = vec![datafd, errfd];
        let v = DualFds::from_reply(fds, 0).unwrap();
        assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
        assert_eq!(v.errfd.as_raw_fd(), raw_errfd_val);
    }

    /// More than two fds are rejected for `DualFds`.
    #[test]
    fn test_new_from_raw_values_error_too_many_fds() {
        let fds = vec![create_dummy_fd(), create_dummy_fd(), create_dummy_fd()];
        match DualFds::from_reply(fds, 0) {
            Ok(v) => unreachable!("{v:?}"),
            Err(Error::Other(msg)) => {
                assert_eq!(msg.as_ref(), "Expected two fds for DualFds")
            }
            Err(other) => unreachable!("{other}"),
        }
    }

    /// A `FinishPipe` reply with pipeid 0 is rejected.
    #[test]
    fn test_new_from_raw_values_error_fd_with_zero_pipeid() {
        let fds = vec![create_dummy_fd()];
        match FinishPipe::from_reply(fds, 0) {
            Ok(v) => unreachable!("{v:?}"),
            Err(Error::Other(msg)) => {
                assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe")
            }
            Err(other) => unreachable!("{other}"),
        }
    }

    /// A `DualFds` reply carrying a nonzero pipeid is rejected.
    #[test]
    fn test_new_from_raw_values_error_pipeid_with_both_fds() {
        let fds = vec![create_dummy_fd(), create_dummy_fd()];
        match DualFds::from_reply(fds, 1) {
            Ok(v) => unreachable!("{v:?}"),
            Err(Error::Other(msg)) => {
                assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds")
            }
            Err(other) => unreachable!("{other}"),
        }
    }

    /// A `FinishPipe` reply with a pipeid but no fd is rejected.
    #[test]
    fn test_new_from_raw_values_error_no_fd_with_pipeid() {
        let fds: Vec<OwnedFd> = vec![];
        match FinishPipe::from_reply(fds, 1) {
            Ok(v) => unreachable!("{v:?}"),
            Err(Error::Other(msg)) => {
                assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe")
            }
            Err(other) => unreachable!("{other}"),
        }
    }

    /// `open_image_optional` on a nonexistent oci-archive returns `None`.
    /// Requires skopeo; currently ignored (see the linked issue).
    #[tokio::test]
    #[ignore = "https://github.com/coreos/rpm-ostree/issues/5442"]
    async fn test_open_optional() -> Result<()> {
        if !check_skopeo() {
            return Ok(());
        }

        let td = tempfile::tempdir()?;
        let td = td.path().to_str().unwrap();
        let proxy = ImageProxy::new().await?;
        let imgpath = format!("oci-archive:{td}/some-nonexistent-image.ociarchive");
        let img = proxy.open_image_optional(&imgpath).await.unwrap();
        assert!(img.is_none());

        Ok(())
    }
}