Skip to main content

containers_image_proxy/
imageproxy.rs

1//! Run skopeo as a subprocess to fetch container images.
2//!
3//! This allows fetching a container image manifest and layers in a streaming fashion.
4//!
5//! More information: <https://github.com/containers/skopeo/pull/1476>
6
7pub mod transport;
8
9// Re-export oci_spec so consumers don't need to add it as a dependency.
10// NOTE: Bumping oci_spec in a semver-incompatible way requires bumping this crate too.
11// See Cargo.toml for details.
12pub use ::oci_spec;
13pub use transport::{
14    ContainersStorageRef, ImageReference, ImageReferenceError, Transport, TransportConversionError,
15};
16
17use ::oci_spec::image::{Descriptor, Digest};
18use cap_std_ext::prelude::CapStdExtCommandExt;
19use cap_std_ext::{cap_std, cap_tempfile};
20use futures_util::{Future, FutureExt};
21use itertools::Itertools;
22use serde::{Deserialize, Serialize};
23use std::fs::File;
24use std::iter::FusedIterator;
25use std::num::NonZeroU32;
26use std::ops::Range;
27use std::os::fd::OwnedFd;
28use std::os::unix::prelude::CommandExt;
29use std::path::PathBuf;
30use std::pin::Pin;
31use std::process::{Command, Stdio};
32use std::sync::{Arc, Mutex, OnceLock};
33use thiserror::Error;
34use tokio::io::{AsyncBufRead, AsyncReadExt};
35use tokio::sync::Mutex as AsyncMutex;
36use tokio::task::JoinError;
37use tracing::instrument;
38
39/// Errors returned by this crate.
40#[derive(Error, Debug)]
41#[non_exhaustive]
42pub enum Error {
43    #[error("i/o error: {0}")]
44    /// An input/output error
45    Io(#[from] std::io::Error),
46    #[error("skopeo spawn error: {0}")]
47    /// An error spawning skopeo
48    SkopeoSpawnError(#[source] std::io::Error),
49    #[error("serialization error: {0}")]
50    /// Returned when serialization or deserialization fails
51    SerDe(#[from] serde_json::Error),
52    /// The proxy failed to initiate a request
53    #[error("failed to invoke method {method}: {error}")]
54    RequestInitiationFailure { method: Box<str>, error: Box<str> },
55    /// An error returned from the remote proxy
56    #[error("proxy request returned error: {0}")]
57    RequestReturned(Box<str>),
58    #[error("semantic version error: {0}")]
59    SemanticVersion(#[from] semver::Error),
60    #[error("proxy too old (requested={requested_version} found={found_version}) error")]
61    /// The proxy doesn't support the requested semantic version
62    ProxyTooOld {
63        requested_version: Box<str>,
64        found_version: Box<str>,
65    },
66    #[error("configuration error: {0}")]
67    /// Conflicting or missing configuration
68    Configuration(Box<str>),
69    #[error("other error: {0}")]
70    /// An unknown other error
71    Other(Box<str>),
72}
73
74impl Error {
75    pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
76        Self::Other(e.into())
77    }
78}
79
80/// Errors returned by get_raw_blob
81#[derive(Error, Debug)]
82#[non_exhaustive]
83pub enum GetBlobError {
84    /// A client may reasonably retry on this type of error.
85    #[error("retryable error: {0}")]
86    Retryable(Box<str>),
87    #[error("other error: {0}")]
88    /// An unknown other error
89    Other(Box<str>),
90}
91
92impl From<rustix::io::Errno> for Error {
93    fn from(value: rustix::io::Errno) -> Self {
94        Self::Io(value.into())
95    }
96}
97
98/// The error type returned from this crate.
99pub type Result<T> = std::result::Result<T, Error>;
100
/// File descriptor numbers reserved for handing data down into the proxy.
///
/// Do not configure the command to use descriptors inside this range.
/// Standard input is reserved as well.
pub const RESERVED_FD_RANGE: Range<i32> = Range {
    start: 100,
    end: 200,
};
105
// Maximum size of a JSON message we will read or write; the value is defined by skopeo.
// Payload data (as opposed to metadata) travels over a pipe file descriptor instead.
const MAX_MSG_SIZE: usize = 32 << 10;
109
110fn base_proto_version() -> &'static semver::VersionReq {
111    // Introduced in https://github.com/containers/skopeo/pull/1523
112    static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
113    BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
114}
115
116fn layer_info_proto_version() -> &'static semver::VersionReq {
117    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
118    LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
119}
120
121fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
122    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
123    LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
124}
125
126#[derive(Serialize)]
127struct Request {
128    method: String,
129    args: Vec<serde_json::Value>,
130}
131
132impl Request {
133    fn new<T, I>(method: &str, args: T) -> Self
134    where
135        T: IntoIterator<Item = I>,
136        I: Into<serde_json::Value>,
137    {
138        let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
139        Self {
140            method: method.to_string(),
141            args,
142        }
143    }
144
145    fn new_bare(method: &str) -> Self {
146        Self {
147            method: method.to_string(),
148            args: vec![],
149        }
150    }
151}
152
153#[derive(Deserialize)]
154struct Reply {
155    success: bool,
156    error: String,
157    pipeid: u32,
158    value: serde_json::Value,
159}
160
161type ChildFuture = Pin<
162    Box<
163        dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
164            + Send,
165    >,
166>;
167
168/// Manage a child process proxy to fetch container images.
169pub struct ImageProxy {
170    sockfd: Arc<Mutex<OwnedFd>>,
171    childwait: Arc<AsyncMutex<ChildFuture>>,
172    protover: semver::Version,
173}
174
175impl std::fmt::Debug for ImageProxy {
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177        f.debug_struct("ImageProxy").finish()
178    }
179}
180
/// Opaque handle identifying an image that the proxy has opened.
#[derive(Debug, Eq, PartialEq)]
pub struct OpenedImage(u32);
184
/// Identifier of a pipe handed out by the proxy; never zero.
#[derive(Debug, Eq, PartialEq)]
struct PipeId(NonZeroU32);

impl PipeId {
    /// Wrap a raw pipe id, yielding `None` when it is zero.
    fn try_new(pipeid: u32) -> Option<Self> {
        NonZeroU32::new(pipeid).map(Self)
    }
}
193
/// Configuration for the proxy.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct ImageProxyConfig {
    /// Path to container auth file; equivalent to `skopeo --authfile`.
    /// This conflicts with [`ImageProxyConfig::auth_data`].
    pub authfile: Option<PathBuf>,

    /// Data stream for container auth.  This conflicts with [`ImageProxyConfig::authfile`].
    pub auth_data: Option<File>,

    /// Do not use default container authentication paths; equivalent to `skopeo --no-creds`.
    ///
    /// Defaults to `false`; in other words, use the default file paths from `man containers-auth.json`.
    pub auth_anonymous: bool,

    /// Directory with certificates (*.crt, *.cert, *.key) used to connect to the registry.
    /// Equivalent to `skopeo --cert-dir`.
    pub certificate_directory: Option<PathBuf>,

    /// Decryption keys to decrypt an encrypted container image.
    /// Equivalent to `skopeo copy --decryption-key <path_to_decryption_key>`.
    pub decryption_keys: Option<Vec<String>>,

    /// If set, disable TLS verification.  Equivalent to `skopeo --tls-verify=false`.
    pub insecure_skip_tls_verification: Option<bool>,

    /// If set, disable signature verification. Equivalent to `skopeo --insecure-policy`.
    pub insecure_policy: Option<bool>,

    /// Prefix to add to the user agent string. Equivalent to `skopeo --user-agent-prefix`.
    /// The resulting user agent will be in the format "prefix skopeo/version".
    /// This option is only used if the installed skopeo version supports it.
    pub user_agent_prefix: Option<String>,

    /// If enabled, propagate debug-logging level from the proxy via stderr to the
    /// current process' stderr. Note that when enabled, this also means that standard
    /// error will no longer be captured.
    pub debug: bool,

    /// Provide a configured [`std::process::Command`] instance.
    ///
    /// This allows configuring aspects of the resulting child `skopeo` process.
    /// The intention of this hook is to allow the caller to use e.g.
    /// `systemd-run` or equivalent containerization tools.  For example you
    /// can set up a command whose arguments are `systemd-run -Pq -p DynamicUser=yes -- skopeo`.
    /// You can also set up arbitrary aspects of the child via e.g.
    /// [`current_dir`] or [`pre_exec`].
    ///
    /// [`current_dir`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.current_dir
    /// [`pre_exec`]: https://doc.rust-lang.org/std/os/unix/process/trait.CommandExt.html#tymethod.pre_exec
    ///
    /// The default is to run `skopeo` with the parent-death signal set to SIGTERM,
    /// which on Linux binds the lifecycle of the child process to the parent.
    ///
    /// Note that you *must* add `skopeo` as the primary argument or
    /// indirectly.  However, all other command line options including
    /// `experimental-image-proxy` will be injected by this library.
    /// You may use a different command name from `skopeo` if your
    /// application has set up a compatible copy, e.g. `/usr/lib/myapp/my-private-skopeo`.
    pub skopeo_cmd: Option<Command>,
}
256
/// Report whether `skopeo` advertises `--user-agent-prefix` in its `--help` output.
///
/// The probe runs at most once per process; any failure to run skopeo or to
/// decode its output as UTF-8 counts as "not supported".
fn supports_user_agent_prefix() -> bool {
    static PROBE_RESULT: OnceLock<bool> = OnceLock::new();
    *PROBE_RESULT.get_or_init(|| {
        let probe = Command::new("skopeo")
            .arg("--help")
            .stdout(Stdio::piped())
            .stderr(Stdio::null())
            .output();
        let Ok(output) = probe else {
            return false;
        };
        match String::from_utf8(output.stdout) {
            Ok(help_text) => help_text.contains("--user-agent-prefix"),
            Err(_) => false,
        }
    })
}
275
276impl TryFrom<ImageProxyConfig> for Command {
277    type Error = Error;
278
279    fn try_from(config: ImageProxyConfig) -> Result<Self> {
280        let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
281        let mut allocated_fds = RESERVED_FD_RANGE.clone();
282        let mut alloc_fd = || {
283            allocated_fds.next().ok_or_else(|| {
284                Error::Other("Ran out of reserved file descriptors for child".into())
285            })
286        };
287
288        // By default, we set up pdeathsig to "lifecycle bind" the child process to us.
289        let mut c = config.skopeo_cmd.unwrap_or_else(|| {
290            let mut c = std::process::Command::new("skopeo");
291            unsafe {
292                c.pre_exec(|| {
293                    Ok(rustix::process::set_parent_process_death_signal(Some(
294                        rustix::process::Signal::TERM,
295                    ))?)
296                });
297            }
298            c
299        });
300        c.arg("experimental-image-proxy");
301        if debug {
302            c.arg("--debug");
303        }
304        let auth_option_count = [
305            config.authfile.is_some(),
306            config.auth_data.is_some(),
307            config.auth_anonymous,
308        ]
309        .into_iter()
310        .filter(|&x| x)
311        .count();
312        if auth_option_count > 1 {
313            // This is a programmer error really
314            return Err(Error::Configuration(
315                "Conflicting authentication options".into(),
316            ));
317        }
318        if let Some(authfile) = config.authfile {
319            c.arg("--authfile");
320            c.arg(authfile);
321        } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
322            // If we get the authentication data as a file, we always copy it to a new temporary file under
323            // the assumption that the caller provided it this way to aid in privilege separation where
324            // the file is only readable to privileged code.
325            let target_fd = alloc_fd()?;
326            let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
327            let mut tempfile =
328                cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
329            std::io::copy(&mut auth_data, &mut tempfile)?;
330            let tempfile = tempfile
331                .into_inner()
332                .map_err(|e| e.into_error())?
333                .into_std();
334            let fd = std::sync::Arc::new(tempfile.into());
335            c.take_fd_n(fd, target_fd);
336            c.arg("--authfile");
337            c.arg(format!("/proc/self/fd/{target_fd}"));
338        } else if config.auth_anonymous {
339            c.arg("--no-creds");
340        }
341
342        if let Some(certificate_directory) = config.certificate_directory {
343            c.arg("--cert-dir");
344            c.arg(certificate_directory);
345        }
346
347        if let Some(decryption_keys) = config.decryption_keys {
348            for decryption_key in &decryption_keys {
349                c.arg("--decryption-key");
350                c.arg(decryption_key);
351            }
352        }
353
354        if config.insecure_skip_tls_verification.unwrap_or_default() {
355            c.arg("--tls-verify=false");
356        }
357
358        if config.insecure_policy.unwrap_or_default() {
359            c.arg("--insecure-policy");
360        }
361
362        // Add user agent prefix if provided and supported by skopeo
363        if let Some(user_agent_prefix) = config.user_agent_prefix {
364            if supports_user_agent_prefix() {
365                c.arg("--user-agent-prefix");
366                c.arg(user_agent_prefix);
367            }
368        }
369
370        c.stdout(Stdio::null());
371        if !debug {
372            c.stderr(Stdio::piped());
373        }
374        Ok(c)
375    }
376}
377
378/// BlobInfo collects known information about a blob
379#[derive(Debug, serde::Deserialize)]
380pub struct ConvertedLayerInfo {
381    /// Uncompressed digest of a layer; for more information, see
382    /// https://github.com/opencontainers/image-spec/blob/main/config.md#layer-diffid
383    pub digest: Digest,
384
385    /// Size of blob
386    pub size: u64,
387
388    /// Mediatype of blob
389    pub media_type: oci_spec::image::MediaType,
390}
391
392/// A single fd; requires invoking FinishPipe
393#[derive(Debug)]
394struct FinishPipe {
395    pipeid: PipeId,
396    datafd: OwnedFd,
397}
398
/// Reply payload with two descriptors: one for data and one for errors.
/// Whatever arrives on the error descriptor is JSON.
#[derive(Debug)]
struct DualFds {
    /// Descriptor carrying the payload.
    datafd: OwnedFd,
    /// Descriptor carrying a JSON-encoded error, if any.
    errfd: OwnedFd,
}
405
406/// Helper trait for parsing the pipeid and/or file descriptors of a reply
407trait FromReplyFds: Send + 'static
408where
409    Self: Sized,
410{
411    fn from_reply(
412        iterable: impl IntoIterator<IntoIter: FusedIterator, Item = OwnedFd>,
413        pipeid: u32,
414    ) -> Result<Self>;
415}
416
417/// No file descriptors or pipeid expected
418impl FromReplyFds for () {
419    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
420        if fds.into_iter().next().is_some() {
421            return Err(Error::Other("expected no fds".into()));
422        }
423        if pipeid != 0 {
424            return Err(Error::Other("unexpected pipeid".into()));
425        }
426        Ok(())
427    }
428}
429
430/// A FinishPipe instance
431impl FromReplyFds for FinishPipe {
432    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
433        let Some(pipeid) = PipeId::try_new(pipeid) else {
434            return Err(Error::Other("Expected pipeid for FinishPipe".into()));
435        };
436        let datafd = fds
437            .into_iter()
438            .exactly_one()
439            .map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?;
440        Ok(Self { pipeid, datafd })
441    }
442}
443
444/// A DualFds instance
445impl FromReplyFds for DualFds {
446    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
447        if pipeid != 0 {
448            return Err(Error::Other("Unexpected pipeid with DualFds".into()));
449        }
450        let [datafd, errfd] = fds
451            .into_iter()
452            .collect_array()
453            .ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?;
454        Ok(Self { datafd, errfd })
455    }
456}
457
458impl ImageProxy {
459    /// Create an image proxy that fetches the target image, using default configuration.
460    pub async fn new() -> Result<Self> {
461        Self::new_with_config(Default::default()).await
462    }
463
464    /// Create an image proxy that fetches the target image
465    #[instrument]
466    pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
467        let mut c = Command::try_from(config)?;
468        let (mysock, theirsock) = rustix::net::socketpair(
469            rustix::net::AddressFamily::UNIX,
470            rustix::net::SocketType::SEQPACKET,
471            rustix::net::SocketFlags::CLOEXEC,
472            None,
473        )?;
474        c.stdin(Stdio::from(theirsock));
475        let child = match c.spawn() {
476            Ok(c) => c,
477            Err(error) => return Err(Error::SkopeoSpawnError(error)),
478        };
479        tracing::debug!("Spawned skopeo pid={:?}", child.id());
480        // Here we use std sync API via thread because tokio installs
481        // a SIGCHLD handler which can conflict with e.g. the glib one
482        // which may also be in process.
483        // xref https://github.com/tokio-rs/tokio/issues/3520#issuecomment-968985861
484        let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
485        let sockfd = Arc::new(Mutex::new(mysock));
486
487        let mut r = Self {
488            sockfd,
489            childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
490            protover: semver::Version::new(0, 0, 0),
491        };
492
493        // Verify semantic version
494        let protover: String = r.impl_request("Initialize", [(); 0]).await?;
495        tracing::debug!("Remote protocol version: {protover}");
496        let protover = semver::Version::parse(protover.as_str())?;
497        // Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`.
498        let supported = base_proto_version();
499        if !supported.matches(&protover) {
500            return Err(Error::ProxyTooOld {
501                requested_version: protover.to_string().into(),
502                found_version: supported.to_string().into(),
503            });
504        }
505        r.protover = protover;
506
507        Ok(r)
508    }
509
510    /// Create and send a request. Should only be used by impl_request.
511    async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static, F: FromReplyFds>(
512        sockfd: Arc<Mutex<OwnedFd>>,
513        req: Request,
514    ) -> Result<(T, F)> {
515        tracing::trace!("sending request {}", req.method.as_str());
516        // TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio
517        let r = tokio::task::spawn_blocking(move || {
518            let sockfd = sockfd.lock().unwrap();
519            let sendbuf = serde_json::to_vec(&req)?;
520            let sockfd = &*sockfd;
521            rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
522            drop(sendbuf);
523            let mut buf = [0u8; MAX_MSG_SIZE];
524            let mut cmsg_space: Vec<std::mem::MaybeUninit<u8>> =
525                vec![std::mem::MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(1))];
526            let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(cmsg_space.as_mut_slice());
527            let iov = std::io::IoSliceMut::new(buf.as_mut());
528            let mut iov = [iov];
529            let nread = rustix::net::recvmsg(
530                sockfd,
531                &mut iov,
532                &mut cmsg_buffer,
533                rustix::net::RecvFlags::CMSG_CLOEXEC,
534            )?
535            .bytes;
536            let fdret = cmsg_buffer
537                .drain()
538                .filter_map(|m| match m {
539                    rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
540                    _ => None,
541                })
542                .flatten();
543            let buf = &buf[..nread];
544            let reply: Reply = serde_json::from_slice(buf)?;
545            if !reply.success {
546                return Err(Error::RequestInitiationFailure {
547                    method: req.method.clone().into(),
548                    error: reply.error.into(),
549                });
550            }
551            let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?;
552            Ok((serde_json::from_value(reply.value)?, fds))
553        })
554        .await
555        .map_err(|e| Error::Other(e.to_string().into()))??;
556        tracing::trace!("completed request");
557        Ok(r)
558    }
559
560    /// Create a request that may return file descriptors, and also check for an unexpected
561    /// termination of the child process.
562    #[instrument(skip(args))]
563    async fn impl_request_with_fds<
564        T: serde::de::DeserializeOwned + Send + 'static,
565        F: FromReplyFds,
566    >(
567        &self,
568        method: &str,
569        args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
570    ) -> Result<(T, F)> {
571        let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
572        let mut childwait = self.childwait.lock().await;
573        tokio::select! {
574            r = req => { r }
575            r = childwait.as_mut() => {
576                let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
577                let stderr = String::from_utf8_lossy(&r.stderr);
578                Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
579            }
580        }
581    }
582
583    /// A synchronous invocation which does not return any file descriptors.
584    async fn impl_request<T: serde::de::DeserializeOwned + Send + 'static>(
585        &self,
586        method: &str,
587        args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
588    ) -> Result<T> {
589        let (r, ()) = self.impl_request_with_fds(method, args).await?;
590        Ok(r)
591    }
592
593    #[instrument]
594    async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> {
595        tracing::debug!("closing pipe");
596        let (r, ()) = self
597            .impl_request_with_fds("FinishPipe", [pipeid.0.get()])
598            .await?;
599        Ok(r)
600    }
601
602    /// Open an image for fetching, using a string reference.
603    #[instrument]
604    pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
605        tracing::debug!("opening image");
606        let imgid = self.impl_request("OpenImage", [imgref]).await?;
607        Ok(OpenedImage(imgid))
608    }
609
610    /// Open an image for fetching, using a structured [`ImageReference`].
611    #[instrument]
612    pub async fn open_image_ref(&self, imgref: &ImageReference) -> Result<OpenedImage> {
613        self.open_image(&imgref.to_string()).await
614    }
615
616    /// Open an image for fetching if it exists, using a string reference.
617    /// Returns `Ok(None)` if the image does not exist.
618    #[instrument]
619    pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
620        tracing::debug!("opening image");
621        let imgid = self.impl_request("OpenImageOptional", [imgref]).await?;
622        if imgid == 0 {
623            Ok(None)
624        } else {
625            Ok(Some(OpenedImage(imgid)))
626        }
627    }
628
629    /// Open an image for fetching if it exists, using a structured [`ImageReference`].
630    /// Returns `Ok(None)` if the image does not exist.
631    #[instrument]
632    pub async fn open_image_optional_ref(
633        &self,
634        imgref: &ImageReference,
635    ) -> Result<Option<OpenedImage>> {
636        self.open_image_optional(&imgref.to_string()).await
637    }
638
639    #[instrument]
640    pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
641        self.impl_request("CloseImage", [img.0]).await
642    }
643
644    async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result<Vec<u8>> {
645        let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
646        let mut fd = tokio::io::BufReader::new(fd);
647        let mut r = Vec::new();
648        let reader = fd.read_to_end(&mut r);
649        let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid));
650        finish?;
651        assert_eq!(nbytes?, r.len());
652        Ok(r)
653    }
654
655    /// Fetch the manifest as raw bytes, converted to OCI if necessary.
656    /// The original digest of the unconverted manifest is also returned.
657    /// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
658    pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
659        let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?;
660        Ok((digest, self.read_finish_pipe(pipefd).await?))
661    }
662
663    /// Fetch the manifest.
664    /// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
665    pub async fn fetch_manifest(
666        &self,
667        img: &OpenedImage,
668    ) -> Result<(String, oci_spec::image::ImageManifest)> {
669        let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
670        let manifest = serde_json::from_slice(&raw)?;
671        Ok((digest, manifest))
672    }
673
674    /// Fetch the config.
675    /// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
676    pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
677        let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?;
678        self.read_finish_pipe(pipe).await
679    }
680
681    /// Fetch the config.
682    /// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
683    pub async fn fetch_config(
684        &self,
685        img: &OpenedImage,
686    ) -> Result<oci_spec::image::ImageConfiguration> {
687        let raw = self.fetch_config_raw(img).await?;
688        serde_json::from_slice(&raw).map_err(Into::into)
689    }
690
691    /// Fetch a blob identified by e.g. `sha256:<digest>`.
692    /// <https://github.com/opencontainers/image-spec/blob/main/descriptor.md>
693    ///
694    /// The requested size and digest are verified (by the proxy process).
695    ///
696    /// Note that because of the implementation details of this function, you should
697    /// [`futures::join!`] the returned futures instead of polling one after the other. The
698    /// secondary "driver" future will only return once everything has been read from
699    /// the reader future.
700    #[instrument]
701    pub async fn get_blob(
702        &self,
703        img: &OpenedImage,
704        digest: &Digest,
705        size: u64,
706    ) -> Result<(
707        impl AsyncBufRead + Send + Unpin,
708        impl Future<Output = Result<()>> + Unpin + '_,
709    )> {
710        // For previous discussion on digest/size verification, see
711        // https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009
712        tracing::debug!("fetching blob");
713        let args: Vec<serde_json::Value> =
714            vec![img.0.into(), digest.to_string().into(), size.into()];
715        // Note that size may be -1 here if e.g. the remote registry doesn't give a Content-Length
716        // for example.
717        // We have always validated the size later (in FinishPipe) so out of conservatism we
718        // just ignore the size here.
719        let (_bloblen, pipe): (serde_json::Number, FinishPipe) =
720            self.impl_request_with_fds("GetBlob", args).await?;
721        let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
722        let fd = tokio::io::BufReader::new(fd);
723        let finish = Box::pin(self.finish_pipe(pipe.pipeid));
724        Ok((fd, finish))
725    }
726
727    async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> {
728        let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
729        let mut errfd = tokio::io::BufReader::new(fd);
730        let mut buf = Vec::new();
731        errfd
732            .read_to_end(&mut buf)
733            .await
734            .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
735        if buf.is_empty() {
736            return Ok(());
737        }
738        #[derive(Deserialize)]
739        struct RemoteError {
740            code: String,
741            message: String,
742        }
743        let e: RemoteError = serde_json::from_slice(&buf)
744            .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
745        match e.code.as_str() {
746            // Actually this is OK
747            "EPIPE" => Ok(()),
748            "retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())),
749            _ => Err(GetBlobError::Other(e.message.into_boxed_str())),
750        }
751    }
752
753    /// Fetch a blob identified by e.g. `sha256:<digest>`; does not perform
754    /// any verification that the blob matches the digest. The size of the
755    /// blob (if available) and a pipe file descriptor are returned.
756    #[instrument]
757    pub async fn get_raw_blob(
758        &self,
759        img: &OpenedImage,
760        digest: &Digest,
761    ) -> Result<(
762        Option<u64>,
763        tokio::fs::File,
764        impl Future<Output = std::result::Result<(), GetBlobError>> + Unpin + '_,
765    )> {
766        tracing::debug!("fetching blob");
767        let args: Vec<serde_json::Value> = vec![img.0.into(), digest.to_string().into()];
768        let (bloblen, fds): (i64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?;
769        // See the GetBlob case, we have a best-effort attempt to return the size, but it might not be known
770        let bloblen = u64::try_from(bloblen).ok();
771        let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd));
772        let err = Self::read_blob_error(fds.errfd).boxed();
773        Ok((bloblen, fd, err))
774    }
775
776    /// Fetch a descriptor. The requested size and digest are verified (by the proxy process).
777    #[instrument]
778    pub async fn get_descriptor(
779        &self,
780        img: &OpenedImage,
781        descriptor: &Descriptor,
782    ) -> Result<(
783        impl AsyncBufRead + Send + Unpin,
784        impl Future<Output = Result<()>> + Unpin + '_,
785    )> {
786        self.get_blob(img, descriptor.digest(), descriptor.size())
787            .await
788    }
789
790    ///Returns data that can be used to find the "diffid" corresponding to a particular layer.
791    #[instrument]
792    pub async fn get_layer_info(
793        &self,
794        img: &OpenedImage,
795    ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
796        tracing::debug!("Getting layer info");
797        if layer_info_piped_proto_version().matches(&self.protover) {
798            let ((), pipe) = self
799                .impl_request_with_fds("GetLayerInfoPiped", [img.0])
800                .await?;
801            let buf = self.read_finish_pipe(pipe).await?;
802            return Ok(Some(serde_json::from_slice(&buf)?));
803        }
804        if !layer_info_proto_version().matches(&self.protover) {
805            return Ok(None);
806        }
807        let layers = self.impl_request("GetLayerInfo", [img.0]).await?;
808        Ok(Some(layers))
809    }
810
811    /// Close the connection and wait for the child process to exit successfully.
812    #[instrument]
813    pub async fn finalize(self) -> Result<()> {
814        let _ = &self;
815        let req = Request::new_bare("Shutdown");
816        let sendbuf = serde_json::to_vec(&req)?;
817        // SAFETY: Only panics if a worker thread already panic'd
818        let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
819        rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
820        drop(sendbuf);
821        tracing::debug!("sent shutdown request");
822        let mut childwait = self.childwait.lock().await;
823        let output = childwait
824            .as_mut()
825            .await
826            .map_err(|e| Error::new_other(e.to_string()))??;
827        if !output.status.success() {
828            let stderr = String::from_utf8_lossy(&output.stderr);
829            return Err(Error::RequestReturned(
830                format!("proxy failed: {}\n{}", output.status, stderr).into(),
831            ));
832        }
833        tracing::debug!("proxy exited successfully");
834        Ok(())
835    }
836}
837
838#[cfg(test)]
839mod tests {
840    use std::io::{BufWriter, Seek, Write};
841    use std::os::fd::{AsRawFd, OwnedFd};
842
843    use super::*;
844    use cap_std_ext::cap_std::fs::Dir;
845    use rustix::fs::{memfd_create, MemfdFlags};
846
/// Report whether a `skopeo` binary can be spawned.
///
/// The probe (`skopeo --help` with stdout and stderr discarded) runs at most
/// once per process; later calls reuse the cached answer. Only the ability
/// to spawn and wait for the command is checked, not its exit status.
fn check_skopeo() -> bool {
    static HAVE_SKOPEO: OnceLock<bool> = OnceLock::new();

    fn probe() -> bool {
        let mut cmd = Command::new("skopeo");
        cmd.arg("--help")
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        cmd.status().is_ok()
    }

    *HAVE_SKOPEO.get_or_init(probe)
}
859
/// Assert that the debug rendering of `c` contains every string in `contains`
/// and none of the strings in `not_contains`.
fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
    // The command is inspected through its `Debug` output rather than
    // https://doc.rust-lang.org/std/process/struct.Command.html#method.get_args
    let rendered = format!("{:?}", c);
    for needle in contains.iter().copied() {
        assert!(rendered.contains(needle), "{} missing {}", rendered, needle);
    }
    for forbidden in not_contains.iter().copied() {
        assert!(
            !rendered.contains(forbidden),
            "{} should not contain {}",
            rendered,
            forbidden
        );
    }
}
872
/// Check that each `ImageProxyConfig` option is reflected in the arguments of
/// the `Command` built from it.
#[test]
fn proxy_configs() {
    let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();

    // Convert a config into its command, panicking if the conversion fails.
    let command_for = |config: ImageProxyConfig| Command::try_from(config).unwrap();

    // Defaults: the proxy subcommand is present and none of the optional flags are.
    validate(
        command_for(ImageProxyConfig::default()),
        &["experimental-image-proxy"],
        &["--no-creds", "--tls-verify", "--authfile"],
    );

    let c = command_for(ImageProxyConfig {
        authfile: Some(PathBuf::from("/path/to/authfile")),
        ..Default::default()
    });
    validate(c, &["--authfile", "/path/to/authfile"], &[]);

    let decryption_key_path = "/path/to/decryption_key";
    let c = command_for(ImageProxyConfig {
        decryption_keys: Some(vec![decryption_key_path.to_string()]),
        ..Default::default()
    });
    validate(c, &["--decryption-key", decryption_key_path], &[]);

    let c = command_for(ImageProxyConfig {
        certificate_directory: Some(PathBuf::from("/path/to/certs")),
        ..Default::default()
    });
    validate(c, &["--cert-dir", "/path/to/certs"], &[]);

    let c = command_for(ImageProxyConfig {
        insecure_skip_tls_verification: Some(true),
        ..Default::default()
    });
    validate(c, &["--tls-verify=false"], &[]);

    let c = command_for(ImageProxyConfig {
        insecure_policy: Some(true),
        ..Default::default()
    });
    validate(c, &["--insecure-policy"], &[]);

    // Auth data supplied as an open file: the fixture is a complete (valid)
    // JSON document, rewound so it can be read from the start. The assertion
    // below expects it to be referenced via `/proc/self/fd/100`.
    let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
    tmpf.write_all(br#"{ "auths": {} }"#).unwrap();
    tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
    let c = command_for(ImageProxyConfig {
        auth_data: Some(tmpf.into_std()),
        ..Default::default()
    });
    validate(c, &["--authfile", "/proc/self/fd/100"], &[]);

    // Test user-agent-prefix: the flag is expected only when
    // `supports_user_agent_prefix()` reports support, and must be absent otherwise.
    let c = command_for(ImageProxyConfig {
        user_agent_prefix: Some("bootc/1.0".to_string()),
        ..Default::default()
    });
    if supports_user_agent_prefix() {
        validate(c, &["--user-agent-prefix", "bootc/1.0"], &[]);
    } else {
        validate(c, &[], &["--user-agent-prefix"]);
    }
}
942
943    #[tokio::test]
944    async fn skopeo_not_found() {
945        let mut config = ImageProxyConfig {
946            ..ImageProxyConfig::default()
947        };
948        config.skopeo_cmd = Some(Command::new("no-skopeo"));
949
950        match ImageProxy::new_with_config(config).await {
951            Ok(_) => panic!("Expected an error"),
952            Err(ref e @ Error::SkopeoSpawnError(ref inner)) => {
953                assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
954                // Just to double check
955                assert!(e
956                    .to_string()
957                    .contains("skopeo spawn error: No such file or directory"));
958            }
959            Err(e) => panic!("Unexpected error {e}"),
960        }
961    }
962
963    #[tokio::test]
964    async fn test_proxy_send_sync() {
965        fn assert_send_sync(_x: impl Send + Sync) {}
966
967        let Ok(proxy) = ImageProxy::new().await else {
968            // doesn't matter: we only actually care to test if this compiles
969            return;
970        };
971        assert_send_sync(&proxy);
972        assert_send_sync(proxy);
973
974        let opened = OpenedImage(0);
975        assert_send_sync(&opened);
976        assert_send_sync(opened);
977    }
978
979    fn generate_err_fd(v: serde_json::Value) -> Result<OwnedFd> {
980        let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
981        let mut tf = cap_tempfile::TempFile::new_anonymous(&tmp).map(BufWriter::new)?;
982        serde_json::to_writer(&mut tf, &v)?;
983        let mut tf = tf.into_inner().map_err(|e| e.into_error())?;
984        tf.seek(std::io::SeekFrom::Start(0))?;
985        let r = tf.into_std().into();
986        Ok(r)
987    }
988
989    #[tokio::test]
990    async fn test_read_blob_error_retryable() -> Result<()> {
991        let retryable = serde_json::json!({
992            "code": "retryable",
993            "message": "foo",
994        });
995        let retryable = generate_err_fd(retryable)?;
996        let err = ImageProxy::read_blob_error(retryable).boxed();
997        let e = err.await.unwrap_err();
998        match e {
999            GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"),
1000            _ => panic!("Unexpected error {e:?}"),
1001        }
1002        Ok(())
1003    }
1004
1005    #[tokio::test]
1006    async fn test_read_blob_error_none() -> Result<()> {
1007        let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
1008        let tf = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std();
1009        let err = ImageProxy::read_blob_error(tf.into()).boxed();
1010        err.await.unwrap();
1011        Ok(())
1012    }
1013
1014    #[tokio::test]
1015    async fn test_read_blob_error_other() -> Result<()> {
1016        let other = serde_json::json!({
1017            "code": "other",
1018            "message": "bar",
1019        });
1020        let other = generate_err_fd(other)?;
1021        let err = ImageProxy::read_blob_error(other).boxed();
1022        let e = err.await.unwrap_err();
1023        match e {
1024            GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"),
1025            _ => panic!("Unexpected error {e:?}"),
1026        }
1027        Ok(())
1028    }
1029
1030    #[tokio::test]
1031    async fn test_read_blob_error_epipe() -> Result<()> {
1032        let epipe = serde_json::json!({
1033            "code": "EPIPE",
1034            "message": "baz",
1035        });
1036        let epipe = generate_err_fd(epipe)?;
1037        let err = ImageProxy::read_blob_error(epipe).boxed();
1038        err.await.unwrap();
1039        Ok(())
1040    }
1041
1042    // Helper to create a dummy OwnedFd using memfd_create for testing.
1043    fn create_dummy_fd() -> OwnedFd {
1044        memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()
1045    }
1046
/// One fd plus a nonzero pipe id must yield a `FinishPipe` holding both.
#[test]
fn test_new_from_raw_values_finish_pipe() {
    let datafd = create_dummy_fd();
    // Record the raw number up front: `datafd` is moved into the vector below.
    let expected_raw = datafd.as_raw_fd();
    let pipe = FinishPipe::from_reply(vec![datafd], 1).unwrap();
    assert_eq!(pipe.pipeid.0.get(), 1);
    assert_eq!(pipe.datafd.as_raw_fd(), expected_raw);
}
1057
/// Two fds with a zero pipe id must yield a `DualFds` that keeps them in
/// order: data fd first, error fd second.
#[test]
fn test_new_from_raw_values_dual_fds() {
    let (datafd, errfd) = (create_dummy_fd(), create_dummy_fd());
    // Record the raw numbers before the fds are moved into the vector.
    let expected = (datafd.as_raw_fd(), errfd.as_raw_fd());
    let pair = DualFds::from_reply(vec![datafd, errfd], 0).unwrap();
    assert_eq!(pair.datafd.as_raw_fd(), expected.0);
    assert_eq!(pair.errfd.as_raw_fd(), expected.1);
}
1069
/// Three fds must be rejected when a `DualFds` reply is expected.
#[test]
fn test_new_from_raw_values_error_too_many_fds() {
    let fds: Vec<OwnedFd> = (0..3).map(|_| create_dummy_fd()).collect();
    let err = match DualFds::from_reply(fds, 0) {
        Ok(v) => unreachable!("{v:?}"),
        Err(err) => err,
    };
    match err {
        Error::Other(msg) => assert_eq!(msg.as_ref(), "Expected two fds for DualFds"),
        other => unreachable!("{other}"),
    }
}
1081
/// A `FinishPipe` reply with a zero pipe id must be rejected.
#[test]
fn test_new_from_raw_values_error_fd_with_zero_pipeid() {
    let err = match FinishPipe::from_reply(vec![create_dummy_fd()], 0) {
        Ok(v) => unreachable!("{v:?}"),
        Err(err) => err,
    };
    match err {
        Error::Other(msg) => assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe"),
        other => unreachable!("{other}"),
    }
}
1093
/// A `DualFds` reply that also carries a nonzero pipe id must be rejected.
#[test]
fn test_new_from_raw_values_error_pipeid_with_both_fds() {
    let both = vec![create_dummy_fd(), create_dummy_fd()];
    let err = match DualFds::from_reply(both, 1) {
        Ok(v) => unreachable!("{v:?}"),
        Err(err) => err,
    };
    match err {
        Error::Other(msg) => assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds"),
        other => unreachable!("{other}"),
    }
}
1105
/// A `FinishPipe` reply with a pipe id but no fd must be rejected.
#[test]
fn test_new_from_raw_values_error_no_fd_with_pipeid() {
    let no_fds: Vec<OwnedFd> = Vec::new();
    let err = match FinishPipe::from_reply(no_fds, 1) {
        Ok(v) => unreachable!("{v:?}"),
        Err(err) => err,
    };
    match err {
        Error::Other(msg) => {
            assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe")
        }
        other => unreachable!("{other}"),
    }
}
1117
1118    #[tokio::test]
1119    #[ignore = "https://github.com/coreos/rpm-ostree/issues/5442"]
1120    async fn test_open_optional() -> Result<()> {
1121        if !check_skopeo() {
1122            return Ok(());
1123        }
1124
1125        let td = tempfile::tempdir()?;
1126        let td = td.path().to_str().unwrap();
1127        let proxy = ImageProxy::new().await?;
1128        let imgpath = format!("oci-archive:{td}/some-nonexistent-image.ociarchive");
1129        let img = proxy.open_image_optional(&imgpath).await.unwrap();
1130        assert!(img.is_none());
1131
1132        Ok(())
1133    }
1134}