containers_image_proxy/
imageproxy.rs

1//! Run skopeo as a subprocess to fetch container images.
2//!
3//! This allows fetching a container image manifest and layers in a streaming fashion.
4//!
5//! More information: <https://github.com/containers/skopeo/pull/1476>
6
7use cap_std_ext::prelude::CapStdExtCommandExt;
8use cap_std_ext::{cap_std, cap_tempfile};
9use futures_util::{Future, FutureExt};
10use itertools::Itertools;
11use oci_spec::image::{Descriptor, Digest};
12use serde::{Deserialize, Serialize};
13use std::fs::File;
14use std::iter::FusedIterator;
15use std::num::NonZeroU32;
16use std::ops::Range;
17use std::os::fd::OwnedFd;
18use std::os::unix::prelude::CommandExt;
19use std::path::PathBuf;
20use std::pin::Pin;
21use std::process::{Command, Stdio};
22use std::sync::{Arc, Mutex, OnceLock};
23use thiserror::Error;
24use tokio::io::{AsyncBufRead, AsyncReadExt};
25use tokio::sync::Mutex as AsyncMutex;
26use tokio::task::JoinError;
27use tracing::instrument;
28
/// Errors returned by this crate.
///
/// The enum is `#[non_exhaustive]`, so matches on it need a wildcard arm.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
    #[error("i/o error: {0}")]
    /// An input/output error
    Io(#[from] std::io::Error),
    #[error("skopeo spawn error: {0}")]
    /// An error spawning skopeo
    SkopeoSpawnError(#[source] std::io::Error),
    #[error("serialization error: {0}")]
    /// Returned when serialization or deserialization fails
    SerDe(#[from] serde_json::Error),
    /// The proxy failed to initiate a request; `method` is the request method
    /// name and `error` is the error text taken from the proxy's reply.
    #[error("failed to invoke method {method}: {error}")]
    RequestInitiationFailure { method: Box<str>, error: Box<str> },
    /// An error returned from the remote proxy
    #[error("proxy request returned error: {0}")]
    RequestReturned(Box<str>),
    /// A version string could not be handled as a semantic version
    #[error("semantic version error: {0}")]
    SemanticVersion(#[from] semver::Error),
    #[error("proxy too old (requested={requested_version} found={found_version}) error")]
    /// The proxy doesn't support the requested semantic version
    ProxyTooOld {
        requested_version: Box<str>,
        found_version: Box<str>,
    },
    #[error("configuration error: {0}")]
    /// Conflicting or missing configuration
    Configuration(Box<str>),
    #[error("other error: {0}")]
    /// An unknown other error
    Other(Box<str>),
}
63
64impl Error {
65    pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
66        Self::Other(e.into())
67    }
68}
69
/// Errors returned by get_raw_blob
///
/// These are produced from the JSON error payload the proxy writes to the
/// error file descriptor of a raw blob fetch.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum GetBlobError {
    /// A client may reasonably retry on this type of error.
    #[error("retryable error: {0}")]
    Retryable(Box<str>),
    #[error("other error: {0}")]
    /// An unknown other error
    Other(Box<str>),
}
81
82impl From<rustix::io::Errno> for Error {
83    fn from(value: rustix::io::Errno) -> Self {
84        Self::Io(value.into())
85    }
86}
87
/// The result type returned from this crate; the error variant is [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
90
91/// Re-export because we use this in our public APIs
92pub use oci_spec;
93
/// File descriptor range which is reserved for passing data down into the proxy;
/// avoid configuring the command to use files in this range.  (Also, stdin is
/// reserved: it is replaced with the socket used to talk to the proxy.)
pub const RESERVED_FD_RANGE: Range<i32> = 100..200;
98
// This is defined in skopeo; maximum size of JSON we will read/write.
// Note that payload data (non-metadata) should go over a pipe file descriptor.
// It is also the size of the buffer each reply message is received into.
const MAX_MSG_SIZE: usize = 32 * 1024;
102
103fn base_proto_version() -> &'static semver::VersionReq {
104    // Introduced in https://github.com/containers/skopeo/pull/1523
105    static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
106    BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
107}
108
109fn layer_info_proto_version() -> &'static semver::VersionReq {
110    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
111    LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
112}
113
114fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
115    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
116    LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
117}
118
/// A request sent to the proxy, serialized as JSON.
#[derive(Serialize)]
struct Request {
    /// Name of the method to invoke, e.g. `OpenImage`.
    method: String,
    /// Positional arguments for the method.
    args: Vec<serde_json::Value>,
}
124
125impl Request {
126    fn new<T, I>(method: &str, args: T) -> Self
127    where
128        T: IntoIterator<Item = I>,
129        I: Into<serde_json::Value>,
130    {
131        let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
132        Self {
133            method: method.to_string(),
134            args,
135        }
136    }
137
138    fn new_bare(method: &str) -> Self {
139        Self {
140            method: method.to_string(),
141            args: vec![],
142        }
143    }
144}
145
/// A reply received from the proxy, deserialized from JSON.
#[derive(Deserialize)]
struct Reply {
    /// Whether the request succeeded; when false, `error` is reported to the caller.
    success: bool,
    /// Error text accompanying an unsuccessful reply.
    error: String,
    /// Pipe identifier; zero is treated as "no pipe" by the reply parsers.
    pipeid: u32,
    /// Method-specific return value.
    value: serde_json::Value,
}
153
/// Boxed future resolving to the collected output of the child process.
///
/// The outer `Result` carries a task join failure, the inner one the I/O
/// result of waiting on the child.
type ChildFuture = Pin<
    Box<
        dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
            + Send,
    >,
>;
160
/// Manage a child process proxy to fetch container images.
pub struct ImageProxy {
    /// Our end of the socket pair connected to the child's stdin; shared with
    /// the blocking worker tasks that perform each request.
    sockfd: Arc<Mutex<OwnedFd>>,
    /// Future that completes when the child process exits.
    childwait: Arc<AsyncMutex<ChildFuture>>,
    /// Protocol version reported by the proxy in response to `Initialize`.
    protover: semver::Version,
}
167
168impl std::fmt::Debug for ImageProxy {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        f.debug_struct("ImageProxy").finish()
171    }
172}
173
/// Opaque identifier for an image
///
/// Wraps the numeric image id returned by the proxy when an image is opened.
#[derive(Debug, PartialEq, Eq)]
pub struct OpenedImage(u32);
177
/// Identifier of a proxy pipe that must later be closed with `FinishPipe`.
/// Always non-zero.
#[derive(Debug, PartialEq, Eq)]
struct PipeId(NonZeroU32);
180
181impl PipeId {
182    fn try_new(pipeid: u32) -> Option<Self> {
183        Some(Self(NonZeroU32::new(pipeid)?))
184    }
185}
186
/// Configuration for the proxy.
#[derive(Debug, Default)]
pub struct ImageProxyConfig {
    /// Path to container auth file; equivalent to `skopeo --authfile`.
    /// This conflicts with [`ImageProxyConfig::auth_data`].
    pub authfile: Option<PathBuf>,

    /// Data stream for container auth.  This conflicts with [`ImageProxyConfig::authfile`].
    pub auth_data: Option<File>,

    /// Do not use default container authentication paths; equivalent to `skopeo --no-creds`.
    ///
    /// Defaults to `false`; in other words, use the default file paths from `man containers-auth.json`.
    pub auth_anonymous: bool,

    /// Directory with certificates (*.crt, *.cert, *.key) used to connect to registry.
    /// Equivalent to `skopeo --cert-dir`.
    pub certificate_directory: Option<PathBuf>,

    /// Decryption keys to decrypt an encrypted container image.
    /// Equivalent to `skopeo copy --decryption-key <path_to_decryption_key>`.
    pub decryption_keys: Option<Vec<String>>,

    /// If set, disable TLS verification.  Equivalent to `skopeo --tls-verify=false`.
    pub insecure_skip_tls_verification: Option<bool>,

    /// Prefix to add to the user agent string. Equivalent to `skopeo --user-agent-prefix`.
    /// The resulting user agent will be in the format "prefix skopeo/version".
    /// This option is only used if the installed skopeo version supports it.
    pub user_agent_prefix: Option<String>,

    /// If enabled, propagate debug-logging level from the proxy via stderr to the
    /// current process' stderr. Note that when enabled, this also means that standard
    /// error will no longer be captured.
    pub debug: bool,

    /// Provide a configured [`std::process::Command`] instance.
    ///
    /// This allows configuring aspects of the resulting child `skopeo` process.
    /// The intention of this hook is to allow the caller to use e.g.
    /// `systemd-run` or equivalent containerization tools.  For example you
    /// can set up a command whose arguments are `systemd-run -Pq -p DynamicUser=yes -- skopeo`.
    /// You can also set up arbitrary aspects of the child via e.g.
    /// [`current_dir`] [`pre_exec`].
    ///
    /// [`current_dir`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.current_dir
    /// [`pre_exec`]: https://doc.rust-lang.org/std/os/unix/process/trait.CommandExt.html#tymethod.pre_exec
    ///
    /// The default is to wrap via util-linux `setpriv --pdeathsig SIGTERM -- skopeo`,
    /// which on Linux binds the lifecycle of the child process to the parent.
    ///
    /// Note that you *must* add `skopeo` as the primary argument or
    /// indirectly.  However, all other command line options including
    /// `experimental-image-proxy` will be injected by this library.
    /// You may use a different command name from `skopeo` if your
    /// application has set up a compatible copy, e.g. `/usr/lib/myapp/my-private-skopeo`.
    pub skopeo_cmd: Option<Command>,
}
245
/// Check if skopeo supports --user-agent-prefix by probing --help output
///
/// The probe runs `skopeo --help` at most once per process and caches the
/// answer. Any failure (spawn error, non-UTF-8 output) counts as "unsupported".
fn supports_user_agent_prefix() -> bool {
    static SUPPORTS_USER_AGENT: OnceLock<bool> = OnceLock::new();
    *SUPPORTS_USER_AGENT.get_or_init(|| {
        let mut probe = Command::new("skopeo");
        probe
            .arg("--help")
            .stdout(Stdio::piped())
            .stderr(Stdio::null());
        let Ok(output) = probe.output() else {
            return false;
        };
        match String::from_utf8(output.stdout) {
            Ok(help) => help.contains("--user-agent-prefix"),
            Err(_) => false,
        }
    })
}
264
265impl TryFrom<ImageProxyConfig> for Command {
266    type Error = Error;
267
268    fn try_from(config: ImageProxyConfig) -> Result<Self> {
269        let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
270        let mut allocated_fds = RESERVED_FD_RANGE.clone();
271        let mut alloc_fd = || {
272            allocated_fds.next().ok_or_else(|| {
273                Error::Other("Ran out of reserved file descriptors for child".into())
274            })
275        };
276
277        // By default, we set up pdeathsig to "lifecycle bind" the child process to us.
278        let mut c = config.skopeo_cmd.unwrap_or_else(|| {
279            let mut c = std::process::Command::new("skopeo");
280            unsafe {
281                c.pre_exec(|| {
282                    Ok(rustix::process::set_parent_process_death_signal(Some(
283                        rustix::process::Signal::TERM,
284                    ))?)
285                });
286            }
287            c
288        });
289        c.arg("experimental-image-proxy");
290        if debug {
291            c.arg("--debug");
292        }
293        let auth_option_count = [
294            config.authfile.is_some(),
295            config.auth_data.is_some(),
296            config.auth_anonymous,
297        ]
298        .into_iter()
299        .filter(|&x| x)
300        .count();
301        if auth_option_count > 1 {
302            // This is a programmer error really
303            return Err(Error::Configuration(
304                "Conflicting authentication options".into(),
305            ));
306        }
307        if let Some(authfile) = config.authfile {
308            c.arg("--authfile");
309            c.arg(authfile);
310        } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
311            // If we get the authentication data as a file, we always copy it to a new temporary file under
312            // the assumption that the caller provided it this way to aid in privilege separation where
313            // the file is only readable to privileged code.
314            let target_fd = alloc_fd()?;
315            let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
316            let mut tempfile =
317                cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
318            std::io::copy(&mut auth_data, &mut tempfile)?;
319            let tempfile = tempfile
320                .into_inner()
321                .map_err(|e| e.into_error())?
322                .into_std();
323            let fd = std::sync::Arc::new(tempfile.into());
324            c.take_fd_n(fd, target_fd);
325            c.arg("--authfile");
326            c.arg(format!("/proc/self/fd/{target_fd}"));
327        } else if config.auth_anonymous {
328            c.arg("--no-creds");
329        }
330
331        if let Some(certificate_directory) = config.certificate_directory {
332            c.arg("--cert-dir");
333            c.arg(certificate_directory);
334        }
335
336        if let Some(decryption_keys) = config.decryption_keys {
337            for decryption_key in &decryption_keys {
338                c.arg("--decryption-key");
339                c.arg(decryption_key);
340            }
341        }
342
343        if config.insecure_skip_tls_verification.unwrap_or_default() {
344            c.arg("--tls-verify=false");
345        }
346
347        // Add user agent prefix if provided and supported by skopeo
348        if let Some(user_agent_prefix) = config.user_agent_prefix {
349            if supports_user_agent_prefix() {
350                c.arg("--user-agent-prefix");
351                c.arg(user_agent_prefix);
352            }
353        }
354
355        c.stdout(Stdio::null());
356        if !debug {
357            c.stderr(Stdio::piped());
358        }
359        Ok(c)
360    }
361}
362
/// ConvertedLayerInfo collects known information about a layer blob
#[derive(Debug, serde::Deserialize)]
pub struct ConvertedLayerInfo {
    /// Uncompressed digest of a layer; for more information, see
    /// <https://github.com/opencontainers/image-spec/blob/main/config.md#layer-diffid>
    pub digest: Digest,

    /// Size of blob
    pub size: u64,

    /// Mediatype of blob
    pub media_type: oci_spec::image::MediaType,
}
376
/// A single fd; requires invoking FinishPipe
#[derive(Debug)]
struct FinishPipe {
    /// Identifier to pass to the `FinishPipe` request.
    pipeid: PipeId,
    /// Descriptor the payload is read from.
    datafd: OwnedFd,
}
383
/// There is a data FD and an error FD. The error FD will be JSON.
#[derive(Debug)]
struct DualFds {
    /// Descriptor the payload is read from.
    datafd: OwnedFd,
    /// Descriptor an error report (if any) is read from.
    errfd: OwnedFd,
}
390
/// Helper trait for parsing the pipeid and/or file descriptors of a reply
trait FromReplyFds: Send + 'static
where
    Self: Sized,
{
    /// Build `Self` from the descriptors attached to a reply and its pipe id.
    ///
    /// Implementations return an error when the number of descriptors or the
    /// presence of a pipe id does not match what they expect.
    fn from_reply(
        iterable: impl IntoIterator<IntoIter: FusedIterator, Item = OwnedFd>,
        pipeid: u32,
    ) -> Result<Self>;
}
401
402/// No file descriptors or pipeid expected
403impl FromReplyFds for () {
404    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
405        if fds.into_iter().next().is_some() {
406            return Err(Error::Other("expected no fds".into()));
407        }
408        if pipeid != 0 {
409            return Err(Error::Other("unexpected pipeid".into()));
410        }
411        Ok(())
412    }
413}
414
415/// A FinishPipe instance
416impl FromReplyFds for FinishPipe {
417    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
418        let Some(pipeid) = PipeId::try_new(pipeid) else {
419            return Err(Error::Other("Expected pipeid for FinishPipe".into()));
420        };
421        let datafd = fds
422            .into_iter()
423            .exactly_one()
424            .map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?;
425        Ok(Self { pipeid, datafd })
426    }
427}
428
429/// A DualFds instance
430impl FromReplyFds for DualFds {
431    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
432        if pipeid != 0 {
433            return Err(Error::Other("Unexpected pipeid with DualFds".into()));
434        }
435        let [datafd, errfd] = fds
436            .into_iter()
437            .collect_array()
438            .ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?;
439        Ok(Self { datafd, errfd })
440    }
441}
442
443impl ImageProxy {
444    /// Create an image proxy that fetches the target image, using default configuration.
445    pub async fn new() -> Result<Self> {
446        Self::new_with_config(Default::default()).await
447    }
448
449    /// Create an image proxy that fetches the target image
450    #[instrument]
451    pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
452        let mut c = Command::try_from(config)?;
453        let (mysock, theirsock) = rustix::net::socketpair(
454            rustix::net::AddressFamily::UNIX,
455            rustix::net::SocketType::SEQPACKET,
456            rustix::net::SocketFlags::CLOEXEC,
457            None,
458        )?;
459        c.stdin(Stdio::from(theirsock));
460        let child = match c.spawn() {
461            Ok(c) => c,
462            Err(error) => return Err(Error::SkopeoSpawnError(error)),
463        };
464        tracing::debug!("Spawned skopeo pid={:?}", child.id());
465        // Here we use std sync API via thread because tokio installs
466        // a SIGCHLD handler which can conflict with e.g. the glib one
467        // which may also be in process.
468        // xref https://github.com/tokio-rs/tokio/issues/3520#issuecomment-968985861
469        let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
470        let sockfd = Arc::new(Mutex::new(mysock));
471
472        let mut r = Self {
473            sockfd,
474            childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
475            protover: semver::Version::new(0, 0, 0),
476        };
477
478        // Verify semantic version
479        let protover: String = r.impl_request("Initialize", [(); 0]).await?;
480        tracing::debug!("Remote protocol version: {protover}");
481        let protover = semver::Version::parse(protover.as_str())?;
482        // Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`.
483        let supported = base_proto_version();
484        if !supported.matches(&protover) {
485            return Err(Error::ProxyTooOld {
486                requested_version: protover.to_string().into(),
487                found_version: supported.to_string().into(),
488            });
489        }
490        r.protover = protover;
491
492        Ok(r)
493    }
494
495    /// Create and send a request. Should only be used by impl_request.
496    async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static, F: FromReplyFds>(
497        sockfd: Arc<Mutex<OwnedFd>>,
498        req: Request,
499    ) -> Result<(T, F)> {
500        tracing::trace!("sending request {}", req.method.as_str());
501        // TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio
502        let r = tokio::task::spawn_blocking(move || {
503            let sockfd = sockfd.lock().unwrap();
504            let sendbuf = serde_json::to_vec(&req)?;
505            let sockfd = &*sockfd;
506            rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
507            drop(sendbuf);
508            let mut buf = [0u8; MAX_MSG_SIZE];
509            let mut cmsg_space: Vec<std::mem::MaybeUninit<u8>> =
510                vec![std::mem::MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(1))];
511            let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(cmsg_space.as_mut_slice());
512            let iov = std::io::IoSliceMut::new(buf.as_mut());
513            let mut iov = [iov];
514            let nread = rustix::net::recvmsg(
515                sockfd,
516                &mut iov,
517                &mut cmsg_buffer,
518                rustix::net::RecvFlags::CMSG_CLOEXEC,
519            )?
520            .bytes;
521            let fdret = cmsg_buffer
522                .drain()
523                .filter_map(|m| match m {
524                    rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
525                    _ => None,
526                })
527                .flatten();
528            let buf = &buf[..nread];
529            let reply: Reply = serde_json::from_slice(buf)?;
530            if !reply.success {
531                return Err(Error::RequestInitiationFailure {
532                    method: req.method.clone().into(),
533                    error: reply.error.into(),
534                });
535            }
536            let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?;
537            Ok((serde_json::from_value(reply.value)?, fds))
538        })
539        .await
540        .map_err(|e| Error::Other(e.to_string().into()))??;
541        tracing::trace!("completed request");
542        Ok(r)
543    }
544
545    /// Create a request that may return file descriptors, and also check for an unexpected
546    /// termination of the child process.
547    #[instrument(skip(args))]
548    async fn impl_request_with_fds<
549        T: serde::de::DeserializeOwned + Send + 'static,
550        F: FromReplyFds,
551    >(
552        &self,
553        method: &str,
554        args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
555    ) -> Result<(T, F)> {
556        let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
557        let mut childwait = self.childwait.lock().await;
558        tokio::select! {
559            r = req => { r }
560            r = childwait.as_mut() => {
561                let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
562                let stderr = String::from_utf8_lossy(&r.stderr);
563                Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
564            }
565        }
566    }
567
568    /// A synchronous invocation which does not return any file descriptors.
569    async fn impl_request<T: serde::de::DeserializeOwned + Send + 'static>(
570        &self,
571        method: &str,
572        args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
573    ) -> Result<T> {
574        let (r, ()) = self.impl_request_with_fds(method, args).await?;
575        Ok(r)
576    }
577
578    #[instrument]
579    async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> {
580        tracing::debug!("closing pipe");
581        let (r, ()) = self
582            .impl_request_with_fds("FinishPipe", [pipeid.0.get()])
583            .await?;
584        Ok(r)
585    }
586
587    #[instrument]
588    pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
589        tracing::debug!("opening image");
590        let imgid = self.impl_request("OpenImage", [imgref]).await?;
591        Ok(OpenedImage(imgid))
592    }
593
594    #[instrument]
595    pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
596        tracing::debug!("opening image");
597        let imgid = self.impl_request("OpenImageOptional", [imgref]).await?;
598        if imgid == 0 {
599            Ok(None)
600        } else {
601            Ok(Some(OpenedImage(imgid)))
602        }
603    }
604
605    #[instrument]
606    pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
607        self.impl_request("CloseImage", [img.0]).await
608    }
609
610    async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result<Vec<u8>> {
611        let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
612        let mut fd = tokio::io::BufReader::new(fd);
613        let mut r = Vec::new();
614        let reader = fd.read_to_end(&mut r);
615        let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid));
616        finish?;
617        assert_eq!(nbytes?, r.len());
618        Ok(r)
619    }
620
621    /// Fetch the manifest as raw bytes, converted to OCI if necessary.
622    /// The original digest of the unconverted manifest is also returned.
623    /// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
624    pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
625        let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?;
626        Ok((digest, self.read_finish_pipe(pipefd).await?))
627    }
628
629    /// Fetch the manifest.
630    /// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
631    pub async fn fetch_manifest(
632        &self,
633        img: &OpenedImage,
634    ) -> Result<(String, oci_spec::image::ImageManifest)> {
635        let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
636        let manifest = serde_json::from_slice(&raw)?;
637        Ok((digest, manifest))
638    }
639
640    /// Fetch the config.
641    /// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
642    pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
643        let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?;
644        self.read_finish_pipe(pipe).await
645    }
646
647    /// Fetch the config.
648    /// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
649    pub async fn fetch_config(
650        &self,
651        img: &OpenedImage,
652    ) -> Result<oci_spec::image::ImageConfiguration> {
653        let raw = self.fetch_config_raw(img).await?;
654        serde_json::from_slice(&raw).map_err(Into::into)
655    }
656
657    /// Fetch a blob identified by e.g. `sha256:<digest>`.
658    /// <https://github.com/opencontainers/image-spec/blob/main/descriptor.md>
659    ///
660    /// The requested size and digest are verified (by the proxy process).
661    ///
662    /// Note that because of the implementation details of this function, you should
663    /// [`futures::join!`] the returned futures instead of polling one after the other. The
664    /// secondary "driver" future will only return once everything has been read from
665    /// the reader future.
666    #[instrument]
667    pub async fn get_blob(
668        &self,
669        img: &OpenedImage,
670        digest: &Digest,
671        size: u64,
672    ) -> Result<(
673        impl AsyncBufRead + Send + Unpin,
674        impl Future<Output = Result<()>> + Unpin + '_,
675    )> {
676        // For previous discussion on digest/size verification, see
677        // https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009
678        tracing::debug!("fetching blob");
679        let args: Vec<serde_json::Value> =
680            vec![img.0.into(), digest.to_string().into(), size.into()];
681        // Note that size may be -1 here if e.g. the remote registry doesn't give a Content-Length
682        // for example.
683        // We have always validated the size later (in FinishPipe) so out of conservatism we
684        // just ignore the size here.
685        let (_bloblen, pipe): (serde_json::Number, FinishPipe) =
686            self.impl_request_with_fds("GetBlob", args).await?;
687        let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
688        let fd = tokio::io::BufReader::new(fd);
689        let finish = Box::pin(self.finish_pipe(pipe.pipeid));
690        Ok((fd, finish))
691    }
692
693    async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> {
694        let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
695        let mut errfd = tokio::io::BufReader::new(fd);
696        let mut buf = Vec::new();
697        errfd
698            .read_to_end(&mut buf)
699            .await
700            .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
701        if buf.is_empty() {
702            return Ok(());
703        }
704        #[derive(Deserialize)]
705        struct RemoteError {
706            code: String,
707            message: String,
708        }
709        let e: RemoteError = serde_json::from_slice(&buf)
710            .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
711        match e.code.as_str() {
712            // Actually this is OK
713            "EPIPE" => Ok(()),
714            "retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())),
715            _ => Err(GetBlobError::Other(e.message.into_boxed_str())),
716        }
717    }
718
719    /// Fetch a blob identified by e.g. `sha256:<digest>`; does not perform
720    /// any verification that the blob matches the digest. The size of the
721    /// blob (if available) and a pipe file descriptor are returned.
722    #[instrument]
723    pub async fn get_raw_blob(
724        &self,
725        img: &OpenedImage,
726        digest: &Digest,
727    ) -> Result<(
728        Option<u64>,
729        tokio::fs::File,
730        impl Future<Output = std::result::Result<(), GetBlobError>> + Unpin + '_,
731    )> {
732        tracing::debug!("fetching blob");
733        let args: Vec<serde_json::Value> = vec![img.0.into(), digest.to_string().into()];
734        let (bloblen, fds): (i64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?;
735        // See the GetBlob case, we have a best-effort attempt to return the size, but it might not be known
736        let bloblen = u64::try_from(bloblen).ok();
737        let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd));
738        let err = Self::read_blob_error(fds.errfd).boxed();
739        Ok((bloblen, fd, err))
740    }
741
742    /// Fetch a descriptor. The requested size and digest are verified (by the proxy process).
743    #[instrument]
744    pub async fn get_descriptor(
745        &self,
746        img: &OpenedImage,
747        descriptor: &Descriptor,
748    ) -> Result<(
749        impl AsyncBufRead + Send + Unpin,
750        impl Future<Output = Result<()>> + Unpin + '_,
751    )> {
752        self.get_blob(img, descriptor.digest(), descriptor.size())
753            .await
754    }
755
756    ///Returns data that can be used to find the "diffid" corresponding to a particular layer.
757    #[instrument]
758    pub async fn get_layer_info(
759        &self,
760        img: &OpenedImage,
761    ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
762        tracing::debug!("Getting layer info");
763        if layer_info_piped_proto_version().matches(&self.protover) {
764            let ((), pipe) = self
765                .impl_request_with_fds("GetLayerInfoPiped", [img.0])
766                .await?;
767            let buf = self.read_finish_pipe(pipe).await?;
768            return Ok(Some(serde_json::from_slice(&buf)?));
769        }
770        if !layer_info_proto_version().matches(&self.protover) {
771            return Ok(None);
772        }
773        let layers = self.impl_request("GetLayerInfo", [img.0]).await?;
774        Ok(Some(layers))
775    }
776
777    /// Close the connection and wait for the child process to exit successfully.
778    #[instrument]
779    pub async fn finalize(self) -> Result<()> {
780        let _ = &self;
781        let req = Request::new_bare("Shutdown");
782        let sendbuf = serde_json::to_vec(&req)?;
783        // SAFETY: Only panics if a worker thread already panic'd
784        let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
785        rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
786        drop(sendbuf);
787        tracing::debug!("sent shutdown request");
788        let mut childwait = self.childwait.lock().await;
789        let output = childwait
790            .as_mut()
791            .await
792            .map_err(|e| Error::new_other(e.to_string()))??;
793        if !output.status.success() {
794            let stderr = String::from_utf8_lossy(&output.stderr);
795            return Err(Error::RequestReturned(
796                format!("proxy failed: {}\n{}", output.status, stderr).into(),
797            ));
798        }
799        tracing::debug!("proxy exited successfully");
800        Ok(())
801    }
802}
803
804#[cfg(test)]
805mod tests {
806    use std::io::{BufWriter, Seek, Write};
807    use std::os::fd::{AsRawFd, OwnedFd};
808
809    use super::*;
810    use cap_std_ext::cap_std::fs::Dir;
811    use rustix::fs::{memfd_create, MemfdFlags};
812
813    /// Check if we have skopeo
814    fn check_skopeo() -> bool {
815        static HAVE_SKOPEO: OnceLock<bool> = OnceLock::new();
816        *HAVE_SKOPEO.get_or_init(|| {
817            Command::new("skopeo")
818                .arg("--help")
819                .stdout(Stdio::null())
820                .stderr(Stdio::null())
821                .status()
822                .is_ok()
823        })
824    }
825
826    fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
827        // Format via debug, because
828        // https://doc.rust-lang.org/std/process/struct.Command.html#method.get_args
829        // is experimental
830        let d = format!("{:?}", c);
831        for c in contains {
832            assert!(d.contains(c), "{} missing {}", d, c);
833        }
834        for c in not_contains {
835            assert!(!d.contains(c), "{} should not contain {}", d, c);
836        }
837    }
838
839    #[test]
840    fn proxy_configs() {
841        let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();
842
843        let c = Command::try_from(ImageProxyConfig::default()).unwrap();
844        validate(
845            c,
846            &["experimental-image-proxy"],
847            &["--no-creds", "--tls-verify", "--authfile"],
848        );
849
850        let c = Command::try_from(ImageProxyConfig {
851            authfile: Some(PathBuf::from("/path/to/authfile")),
852            ..Default::default()
853        })
854        .unwrap();
855        validate(c, &[r"--authfile", "/path/to/authfile"], &[]);
856
857        let decryption_key_path = "/path/to/decryption_key";
858        let c = Command::try_from(ImageProxyConfig {
859            decryption_keys: Some(vec![decryption_key_path.to_string()]),
860            ..Default::default()
861        })
862        .unwrap();
863        validate(c, &[r"--decryption-key", "/path/to/decryption_key"], &[]);
864
865        let c = Command::try_from(ImageProxyConfig {
866            certificate_directory: Some(PathBuf::from("/path/to/certs")),
867            ..Default::default()
868        })
869        .unwrap();
870        validate(c, &[r"--cert-dir", "/path/to/certs"], &[]);
871
872        let c = Command::try_from(ImageProxyConfig {
873            insecure_skip_tls_verification: Some(true),
874            ..Default::default()
875        })
876        .unwrap();
877        validate(c, &[r"--tls-verify=false"], &[]);
878
879        let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
880        tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
881        tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
882        let c = Command::try_from(ImageProxyConfig {
883            auth_data: Some(tmpf.into_std()),
884            ..Default::default()
885        })
886        .unwrap();
887        validate(c, &["--authfile", "/proc/self/fd/100"], &[]);
888
889        // Test user-agent-prefix - only validate if supported
890        let c = Command::try_from(ImageProxyConfig {
891            user_agent_prefix: Some("bootc/1.0".to_string()),
892            ..Default::default()
893        })
894        .unwrap();
895        if supports_user_agent_prefix() {
896            validate(c, &["--user-agent-prefix", "bootc/1.0"], &[]);
897        } else {
898            validate(c, &[], &["--user-agent-prefix"]);
899        }
900    }
901
902    #[tokio::test]
903    async fn skopeo_not_found() {
904        let mut config = ImageProxyConfig {
905            ..ImageProxyConfig::default()
906        };
907        config.skopeo_cmd = Some(Command::new("no-skopeo"));
908
909        match ImageProxy::new_with_config(config).await {
910            Ok(_) => panic!("Expected an error"),
911            Err(ref e @ Error::SkopeoSpawnError(ref inner)) => {
912                assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
913                // Just to double check
914                assert!(e
915                    .to_string()
916                    .contains("skopeo spawn error: No such file or directory"));
917            }
918            Err(e) => panic!("Unexpected error {e}"),
919        }
920    }
921
922    #[tokio::test]
923    async fn test_proxy_send_sync() {
924        fn assert_send_sync(_x: impl Send + Sync) {}
925
926        let Ok(proxy) = ImageProxy::new().await else {
927            // doesn't matter: we only actually care to test if this compiles
928            return;
929        };
930        assert_send_sync(&proxy);
931        assert_send_sync(proxy);
932
933        let opened = OpenedImage(0);
934        assert_send_sync(&opened);
935        assert_send_sync(opened);
936    }
937
938    fn generate_err_fd(v: serde_json::Value) -> Result<OwnedFd> {
939        let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
940        let mut tf = cap_tempfile::TempFile::new_anonymous(&tmp).map(BufWriter::new)?;
941        serde_json::to_writer(&mut tf, &v)?;
942        let mut tf = tf.into_inner().map_err(|e| e.into_error())?;
943        tf.seek(std::io::SeekFrom::Start(0))?;
944        let r = tf.into_std().into();
945        Ok(r)
946    }
947
948    #[tokio::test]
949    async fn test_read_blob_error_retryable() -> Result<()> {
950        let retryable = serde_json::json!({
951            "code": "retryable",
952            "message": "foo",
953        });
954        let retryable = generate_err_fd(retryable)?;
955        let err = ImageProxy::read_blob_error(retryable).boxed();
956        let e = err.await.unwrap_err();
957        match e {
958            GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"),
959            _ => panic!("Unexpected error {e:?}"),
960        }
961        Ok(())
962    }
963
964    #[tokio::test]
965    async fn test_read_blob_error_none() -> Result<()> {
966        let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
967        let tf = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std();
968        let err = ImageProxy::read_blob_error(tf.into()).boxed();
969        err.await.unwrap();
970        Ok(())
971    }
972
973    #[tokio::test]
974    async fn test_read_blob_error_other() -> Result<()> {
975        let other = serde_json::json!({
976            "code": "other",
977            "message": "bar",
978        });
979        let other = generate_err_fd(other)?;
980        let err = ImageProxy::read_blob_error(other).boxed();
981        let e = err.await.unwrap_err();
982        match e {
983            GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"),
984            _ => panic!("Unexpected error {e:?}"),
985        }
986        Ok(())
987    }
988
989    #[tokio::test]
990    async fn test_read_blob_error_epipe() -> Result<()> {
991        let epipe = serde_json::json!({
992            "code": "EPIPE",
993            "message": "baz",
994        });
995        let epipe = generate_err_fd(epipe)?;
996        let err = ImageProxy::read_blob_error(epipe).boxed();
997        err.await.unwrap();
998        Ok(())
999    }
1000
1001    // Helper to create a dummy OwnedFd using memfd_create for testing.
1002    fn create_dummy_fd() -> OwnedFd {
1003        memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()
1004    }
1005
1006    #[test]
1007    fn test_new_from_raw_values_finish_pipe() {
1008        let datafd = create_dummy_fd();
1009        // Keep a raw fd to compare later, as fds_and_pipeid consumes datafd
1010        let raw_datafd_val = datafd.as_raw_fd();
1011        let fds = vec![datafd];
1012        let v = FinishPipe::from_reply(fds, 1).unwrap();
1013        assert_eq!(v.pipeid.0.get(), 1);
1014        assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
1015    }
1016
1017    #[test]
1018    fn test_new_from_raw_values_dual_fds() {
1019        let datafd = create_dummy_fd();
1020        let errfd = create_dummy_fd();
1021        let raw_datafd_val = datafd.as_raw_fd();
1022        let raw_errfd_val = errfd.as_raw_fd();
1023        let fds = vec![datafd, errfd];
1024        let v = DualFds::from_reply(fds, 0).unwrap();
1025        assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
1026        assert_eq!(v.errfd.as_raw_fd(), raw_errfd_val);
1027    }
1028
1029    #[test]
1030    fn test_new_from_raw_values_error_too_many_fds() {
1031        let fds = vec![create_dummy_fd(), create_dummy_fd(), create_dummy_fd()];
1032        match DualFds::from_reply(fds, 0) {
1033            Ok(v) => unreachable!("{v:?}"),
1034            Err(Error::Other(msg)) => {
1035                assert_eq!(msg.as_ref(), "Expected two fds for DualFds")
1036            }
1037            Err(other) => unreachable!("{other}"),
1038        }
1039    }
1040
1041    #[test]
1042    fn test_new_from_raw_values_error_fd_with_zero_pipeid() {
1043        let fds = vec![create_dummy_fd()];
1044        match FinishPipe::from_reply(fds, 0) {
1045            Ok(v) => unreachable!("{v:?}"),
1046            Err(Error::Other(msg)) => {
1047                assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe")
1048            }
1049            Err(other) => unreachable!("{other}"),
1050        }
1051    }
1052
1053    #[test]
1054    fn test_new_from_raw_values_error_pipeid_with_both_fds() {
1055        let fds = vec![create_dummy_fd(), create_dummy_fd()];
1056        match DualFds::from_reply(fds, 1) {
1057            Ok(v) => unreachable!("{v:?}"),
1058            Err(Error::Other(msg)) => {
1059                assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds")
1060            }
1061            Err(other) => unreachable!("{other}"),
1062        }
1063    }
1064
1065    #[test]
1066    fn test_new_from_raw_values_error_no_fd_with_pipeid() {
1067        let fds: Vec<OwnedFd> = vec![];
1068        match FinishPipe::from_reply(fds, 1) {
1069            Ok(v) => unreachable!("{v:?}"),
1070            Err(Error::Other(msg)) => {
1071                assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe")
1072            }
1073            Err(other) => unreachable!("{other}"),
1074        }
1075    }
1076
1077    #[tokio::test]
1078    #[ignore = "https://github.com/coreos/rpm-ostree/issues/5442"]
1079    async fn test_open_optional() -> Result<()> {
1080        if !check_skopeo() {
1081            return Ok(());
1082        }
1083
1084        let td = tempfile::tempdir()?;
1085        let td = td.path().to_str().unwrap();
1086        let proxy = ImageProxy::new().await?;
1087        let imgpath = format!("oci-archive:{td}/some-nonexistent-image.ociarchive");
1088        let img = proxy.open_image_optional(&imgpath).await.unwrap();
1089        assert!(img.is_none());
1090
1091        Ok(())
1092    }
1093}