Skip to main content

containers_image_proxy/
imageproxy.rs

1//! Run skopeo as a subprocess to fetch container images.
2//!
3//! This allows fetching a container image manifest and layers in a streaming fashion.
4//!
5//! More information: <https://github.com/containers/skopeo/pull/1476>
6
7pub mod transport;
8
9// Re-export oci_spec so consumers don't need to add it as a dependency.
10// NOTE: Bumping oci_spec in a semver-incompatible way requires bumping this crate too.
11// See Cargo.toml for details.
12pub use ::oci_spec;
13pub use transport::{
14    ContainersStorageRef, ImageReference, ImageReferenceError, Transport, TransportConversionError,
15};
16
17use ::oci_spec::image::{Descriptor, Digest};
18use cap_std_ext::prelude::CapStdExtCommandExt;
19use cap_std_ext::{cap_std, cap_tempfile};
20use futures_util::{Future, FutureExt};
21use itertools::Itertools;
22use serde::{Deserialize, Serialize};
23use sha2::Digest as _;
24use std::fs::File;
25use std::iter::FusedIterator;
26use std::num::NonZeroU32;
27use std::ops::Range;
28use std::os::fd::OwnedFd;
29use std::os::unix::prelude::CommandExt;
30use std::path::PathBuf;
31use std::pin::Pin;
32use std::process::{Command, Stdio};
33use std::str::FromStr;
34use std::sync::{Arc, Mutex, OnceLock};
35use thiserror::Error;
36use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, ReadBuf};
37use tokio::sync::{oneshot, Mutex as AsyncMutex};
38use tokio::task::JoinError;
39use tracing::instrument;
40
/// Errors returned by this crate.
///
/// The enum is `#[non_exhaustive]`, so downstream crates must include a
/// wildcard arm when matching on it.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
    #[error("i/o error: {0}")]
    /// An input/output error
    Io(#[from] std::io::Error),
    #[error("skopeo spawn error: {0}")]
    /// An error spawning skopeo
    SkopeoSpawnError(#[source] std::io::Error),
    #[error("serialization error: {0}")]
    /// Returned when serialization or deserialization fails
    SerDe(#[from] serde_json::Error),
    /// The proxy failed to initiate a request: its reply was flagged as
    /// unsuccessful. `method` is the name of the request method that was sent
    /// and `error` is the error text carried in the reply.
    #[error("failed to invoke method {method}: {error}")]
    RequestInitiationFailure { method: Box<str>, error: Box<str> },
    /// An error returned from the remote proxy
    #[error("proxy request returned error: {0}")]
    RequestReturned(Box<str>),
    /// An error returned via the `GetRawBlob` error pipe.
    #[error(transparent)]
    BlobError(#[from] GetBlobError),
    /// A version string could not be parsed as a semantic version
    /// (for example the protocol version reported by the proxy).
    #[error("semantic version error: {0}")]
    SemanticVersion(#[from] semver::Error),
    #[error("proxy too old (requested={requested_version} found={found_version}) error")]
    /// The proxy doesn't support the requested semantic version
    ProxyTooOld {
        requested_version: Box<str>,
        found_version: Box<str>,
    },
    #[error("configuration error: {0}")]
    /// Conflicting or missing configuration
    Configuration(Box<str>),
    #[error("other error: {0}")]
    /// An unknown other error
    Other(Box<str>),
}
78
79impl Error {
80    pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
81        Self::Other(e.into())
82    }
83}
84
/// Errors returned by get_raw_blob
///
/// Converted into [`Error::BlobError`] when propagated through this crate's
/// [`Result`] type.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum GetBlobError {
    /// A client may reasonably retry on this type of error.
    #[error("retryable error: {0}")]
    Retryable(Box<str>),
    #[error("other error: {0}")]
    /// An unknown other error
    Other(Box<str>),
}
96
97impl From<rustix::io::Errno> for Error {
98    fn from(value: rustix::io::Errno) -> Self {
99        Self::Io(value.into())
100    }
101}
102
/// The result type returned from this crate; the error side is always [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
105
/// Identifies which proxy method was used to produce a [`BlobStream`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlobStreamSource {
    /// Used `GetRawBlob`.
    GetRawBlob,
    /// Fell back to `GetBlob`.
    GetBlob,
}
113
/// A streaming blob reader and "driver" future.
///
/// The two halves can be separated with [`BlobStream::into_parts`].
pub struct BlobStream<'a> {
    // Which proxy method produced this stream.
    source: BlobStreamSource,
    // Size reported via `expected_size()`.
    // NOTE(review): presumably the blob size in bytes - confirm at the construction site.
    expected_size: u64,
    // Data half: the blob contents.
    reader: Box<dyn AsyncRead + Send + Unpin>,
    // Driver half: a future resolving to the final status of the transfer.
    driver: futures_util::future::BoxFuture<'a, Result<()>>,
}
121
122impl<'a> BlobStream<'a> {
123    pub fn source(&self) -> BlobStreamSource {
124        self.source
125    }
126
127    pub fn expected_size(&self) -> u64 {
128        self.expected_size
129    }
130
131    pub fn into_parts(
132        self,
133    ) -> (
134        Box<dyn AsyncRead + Send + Unpin>,
135        futures_util::future::BoxFuture<'a, Result<()>>,
136    ) {
137        (self.reader, self.driver)
138    }
139}
140
141impl std::fmt::Debug for BlobStream<'_> {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        f.debug_struct("BlobStream")
144            .field("source", &self.source)
145            .field("expected_size", &self.expected_size)
146            .finish_non_exhaustive()
147    }
148}
149
/// Outcome reported by a [`VerifiedBlobReader`] over its completion channel.
#[derive(Debug)]
enum VerifiedBlobReadResult {
    /// The reader reached EOF; carries the total byte count and the digest
    /// computed over everything that was read.
    Complete { nbytes: u64, digest: Digest },
    /// The reader was dropped before it reported completion.
    Incomplete,
}
155
/// Incremental hash state for one of the digest algorithms supported for
/// blob verification.
#[derive(Debug)]
enum Hasher {
    /// SHA-256 state.
    Sha256(sha2::Sha256),
    /// SHA-384 state.
    Sha384(sha2::Sha384),
    /// SHA-512 state.
    Sha512(sha2::Sha512),
}
162
163impl Hasher {
164    fn new_for_digest(digest: &Digest) -> Result<Self> {
165        use oci_spec::image::DigestAlgorithm;
166        Ok(match digest.algorithm() {
167            DigestAlgorithm::Sha256 => Self::Sha256(sha2::Sha256::new()),
168            DigestAlgorithm::Sha384 => Self::Sha384(sha2::Sha384::new()),
169            DigestAlgorithm::Sha512 => Self::Sha512(sha2::Sha512::new()),
170            DigestAlgorithm::Other(a) => {
171                return Err(Error::Other(
172                    format!("Unsupported digest algorithm for blob verification: {a}").into(),
173                ));
174            }
175            _ => {
176                return Err(Error::Other(
177                    format!(
178                        "Unsupported digest algorithm for blob verification: {}",
179                        digest.algorithm().as_ref()
180                    )
181                    .into(),
182                ));
183            }
184        })
185    }
186
187    fn update(&mut self, chunk: &[u8]) {
188        match self {
189            Self::Sha256(h) => h.update(chunk),
190            Self::Sha384(h) => h.update(chunk),
191            Self::Sha512(h) => h.update(chunk),
192        }
193    }
194
195    fn finalize_digest(self) -> Digest {
196        let (algorithm, hex) = match self {
197            Self::Sha256(h) => ("sha256", hex::encode(h.finalize())),
198            Self::Sha384(h) => ("sha384", hex::encode(h.finalize())),
199            Self::Sha512(h) => ("sha512", hex::encode(h.finalize())),
200        };
201        Digest::from_str(&format!("{algorithm}:{hex}")).expect("valid digest")
202    }
203}
204
/// Wraps an [`AsyncRead`] and computes a digest; sends the result on EOF so the
/// driver future can verify the stream without re-reading it.
#[derive(Debug)]
struct VerifiedBlobReader<R> {
    // The wrapped reader.
    inner: R,
    // Number of bytes hashed so far.
    nbytes: u64,
    // Running hash state; taken (set to `None`) when EOF is reported.
    hasher: Option<Hasher>,
    // Channel used to report the outcome exactly once: `Complete` at EOF,
    // or `Incomplete` from `Drop` if EOF was never reported.
    completion: Option<oneshot::Sender<VerifiedBlobReadResult>>,
}
214
215impl<R: AsyncRead + Unpin> VerifiedBlobReader<R> {
216    fn new(
217        inner: R,
218        expected: Digest,
219        completion: oneshot::Sender<VerifiedBlobReadResult>,
220    ) -> Result<Self> {
221        let hasher = Hasher::new_for_digest(&expected)?;
222        Ok(Self {
223            inner,
224            nbytes: 0,
225            hasher: Some(hasher),
226            completion: Some(completion),
227        })
228    }
229}
230
231impl<R: AsyncRead + Unpin> AsyncRead for VerifiedBlobReader<R> {
232    fn poll_read(
233        mut self: Pin<&mut Self>,
234        cx: &mut std::task::Context<'_>,
235        buf: &mut ReadBuf<'_>,
236    ) -> std::task::Poll<std::io::Result<()>> {
237        if buf.remaining() == 0 {
238            return std::task::Poll::Ready(Ok(()));
239        }
240        // ReadBuf may already have data; only hash the newly appended bytes.
241        let before = buf.filled().len();
242        match Pin::new(&mut self.inner).poll_read(cx, buf) {
243            v @ std::task::Poll::Ready(Ok(())) => {
244                let after = buf.filled().len();
245                let delta = after.checked_sub(before).unwrap();
246                if delta > 0 {
247                    let chunk = &buf.filled()[before..after];
248                    let hasher = self.hasher.as_mut().expect("hasher missing before EOF");
249                    hasher.update(chunk);
250                    self.nbytes += delta as u64;
251                } else {
252                    // EOF reached
253                    let Some(tx) = self.completion.take() else {
254                        return v;
255                    };
256                    let Some(hasher) = self.hasher.take() else {
257                        return v;
258                    };
259                    let _ = tx.send(VerifiedBlobReadResult::Complete {
260                        nbytes: self.nbytes,
261                        digest: hasher.finalize_digest(),
262                    });
263                }
264                v
265            }
266            o => o,
267        }
268    }
269}
270
271impl<R> Drop for VerifiedBlobReader<R> {
272    fn drop(&mut self) {
273        if let Some(tx) = self.completion.take() {
274            let _ = tx.send(VerifiedBlobReadResult::Incomplete);
275        }
276    }
277}
278
279fn verify_blob_bytes_read(
280    expected: &Digest,
281    expected_size: u64,
282    r: VerifiedBlobReadResult,
283) -> Result<()> {
284    match r {
285        VerifiedBlobReadResult::Incomplete => Ok(()),
286        VerifiedBlobReadResult::Complete { nbytes, digest } => {
287            if nbytes != expected_size {
288                return Err(Error::Other(
289                    format!(
290                        "Blob size mismatch for {expected}: expected {expected_size} bytes, read {nbytes} bytes"
291                    )
292                    .into(),
293                ));
294            }
295            if digest != *expected {
296                return Err(Error::Other(
297                    format!("Blob digest mismatch for {expected}: computed {digest}").into(),
298                ));
299            }
300            Ok(())
301        }
302    }
303}
304
/// File descriptor range which is reserved for passing data down into the proxy;
/// avoid configuring the command to use files in this range. (Stdin is also
/// reserved: it carries the request socket.)
pub const RESERVED_FD_RANGE: Range<i32> = 100..200;
309
// This is defined in skopeo; maximum size in bytes of the JSON we will read/write
// in a single message on the request socket.
// Note that payload data (non-metadata) should go over a pipe file descriptor.
const MAX_MSG_SIZE: usize = 32 * 1024;
313
314fn base_proto_version() -> &'static semver::VersionReq {
315    // Introduced in https://github.com/containers/skopeo/pull/1523
316    static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
317    BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
318}
319
320fn layer_info_proto_version() -> &'static semver::VersionReq {
321    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
322    LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
323}
324
325fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
326    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
327    LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
328}
329
330fn raw_blob_proto_version() -> &'static semver::VersionReq {
331    static RAW_BLOB_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
332    RAW_BLOB_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.8").unwrap())
333}
334
/// A request message; serialized to JSON and sent to the proxy over the socket.
#[derive(Serialize)]
struct Request {
    // Name of the proxy method to invoke.
    method: String,
    // Positional arguments for the method, as JSON values.
    args: Vec<serde_json::Value>,
}
340
341impl Request {
342    fn new<T, I>(method: &str, args: T) -> Self
343    where
344        T: IntoIterator<Item = I>,
345        I: Into<serde_json::Value>,
346    {
347        let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
348        Self {
349            method: method.to_string(),
350            args,
351        }
352    }
353
354    fn new_bare(method: &str) -> Self {
355        Self {
356            method: method.to_string(),
357            args: vec![],
358        }
359    }
360}
361
/// A reply message; deserialized from the JSON received from the proxy.
#[derive(Deserialize)]
struct Reply {
    // Whether the request succeeded.
    success: bool,
    // Error text; surfaced to the caller when `success` is false.
    error: String,
    // Pipe identifier for the reply; 0 is treated as "no pipe".
    pipeid: u32,
    // Method-specific return value, decoded into the caller's requested type.
    value: serde_json::Value,
}
369
// Boxed future that resolves once the child process has exited: the outer
// `Result` is the task-join outcome, the inner one the result of collecting
// the child's output.
type ChildFuture = Pin<
    Box<
        dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
            + Send,
    >,
>;
376
/// Manage a child process proxy to fetch container images.
pub struct ImageProxy {
    // Our end of the socket pair used to exchange requests and replies with
    // the child; the mutex serializes access to it.
    sockfd: Arc<Mutex<OwnedFd>>,
    // Future that completes when the child process exits; raced against each
    // request to detect an unexpected exit.
    childwait: Arc<AsyncMutex<ChildFuture>>,
    // Protocol version reported by the child in response to `Initialize`.
    protover: semver::Version,
}
383
384impl std::fmt::Debug for ImageProxy {
385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        f.debug_struct("ImageProxy").finish()
387    }
388}
389
/// Opaque identifier for an image
///
/// Wraps the numeric image id returned by the proxy when an image is opened.
#[derive(Debug, PartialEq, Eq)]
pub struct OpenedImage(u32);
393
/// Identifier of a pipe opened by the proxy; never zero.
#[derive(Debug, PartialEq, Eq)]
struct PipeId(NonZeroU32);

impl PipeId {
    /// Wrap a raw pipe id, returning `None` when it is zero.
    fn try_new(pipeid: u32) -> Option<Self> {
        NonZeroU32::new(pipeid).map(Self)
    }
}
402
/// Configuration for the proxy.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct ImageProxyConfig {
    /// Path to container auth file; equivalent to `skopeo --authfile`.
    /// This conflicts with [`Self::auth_data`] and [`Self::auth_anonymous`].
    pub authfile: Option<PathBuf>,

    /// Data stream for container auth.  This conflicts with [`Self::authfile`]
    /// and [`Self::auth_anonymous`].
    pub auth_data: Option<File>,

    /// Do not use default container authentication paths; equivalent to `skopeo --no-creds`.
    ///
    /// Defaults to `false`; in other words, use the default file paths from `man containers-auth.json`.
    pub auth_anonymous: bool,

    /// Directory with certificates (*.crt, *.cert, *.key) used to connect to registry.
    /// Equivalent to `skopeo --cert-dir`.
    pub certificate_directory: Option<PathBuf>,

    /// Decryption keys to decrypt an encrypted container image.
    /// Each entry is passed as `--decryption-key <path_to_decryption_key>`,
    /// equivalent to the `skopeo copy` option of the same name.
    pub decryption_keys: Option<Vec<String>>,

    /// If set to `true`, disable TLS verification.  Equivalent to `skopeo --tls-verify=false`.
    pub insecure_skip_tls_verification: Option<bool>,

    /// If set to `true`, disable signature verification. Equivalent to `skopeo --insecure-policy`.
    pub insecure_policy: Option<bool>,

    /// Prefix to add to the user agent string. Equivalent to `skopeo --user-agent-prefix`.
    /// The resulting user agent will be in the format "prefix skopeo/version".
    /// This option is only used if the installed skopeo version supports it.
    pub user_agent_prefix: Option<String>,

    /// If enabled, propagate debug-logging level from the proxy via stderr to the
    /// current process' stderr. Note that when enabled, this also means that standard
    /// error will no longer be captured.
    ///
    /// Setting the `CONTAINERS_IMAGE_PROXY_DEBUG` environment variable has the
    /// same effect.
    pub debug: bool,

    /// Provide a configured [`std::process::Command`] instance.
    ///
    /// This allows configuring aspects of the resulting child `skopeo` process.
    /// The intention of this hook is to allow the caller to use e.g.
    /// `systemd-run` or equivalent containerization tools.  For example you
    /// can set up a command whose arguments are `systemd-run -Pq -p DynamicUser=yes -- skopeo`.
    /// You can also set up arbitrary aspects of the child via e.g.
    /// [`current_dir`] [`pre_exec`].
    ///
    /// [`current_dir`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.current_dir
    /// [`pre_exec`]: https://doc.rust-lang.org/std/os/unix/process/trait.CommandExt.html#tymethod.pre_exec
    ///
    /// The default is to run `skopeo` directly with a `pre_exec` hook that sets
    /// the parent-death signal to `SIGTERM`, which on Linux binds the lifecycle
    /// of the child process to the parent. That hook is only installed for the
    /// default command, not for a command provided here.
    ///
    /// Note that you *must* add `skopeo` as the primary argument or
    /// indirectly.  However, all other command line options including
    /// `experimental-image-proxy` will be injected by this library.
    /// You may use a different command name from `skopeo` if your
    /// application has set up a compatible copy, e.g. `/usr/lib/myapp/my-private-skopeo`.
    pub skopeo_cmd: Option<Command>,
}
465
/// Check if skopeo supports --user-agent-prefix by probing --help output
///
/// The probe runs `skopeo --help` at most once per process and the answer is
/// cached. Any failure to run the command or to decode its output as UTF-8
/// counts as "not supported".
fn supports_user_agent_prefix() -> bool {
    static SUPPORTS_USER_AGENT: OnceLock<bool> = OnceLock::new();
    *SUPPORTS_USER_AGENT.get_or_init(|| {
        let probe = Command::new("skopeo")
            .arg("--help")
            .stdout(Stdio::piped())
            .stderr(Stdio::null())
            .output();
        let Ok(output) = probe else {
            return false;
        };
        match String::from_utf8(output.stdout) {
            Ok(help) => help.contains("--user-agent-prefix"),
            Err(_) => false,
        }
    })
}
484
/// Translate an [`ImageProxyConfig`] into the command line used to launch the
/// proxy (`<skopeo> experimental-image-proxy [options...]`).
impl TryFrom<ImageProxyConfig> for Command {
    type Error = Error;

    /// Build the command.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Configuration`] when more than one authentication
    /// option is set, and I/O errors from copying `auth_data` into a
    /// temporary file.
    fn try_from(config: ImageProxyConfig) -> Result<Self> {
        // Debugging can be requested via the config or via the environment.
        let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
        // Hand out child fd numbers from the reserved range, one at a time.
        let mut allocated_fds = RESERVED_FD_RANGE.clone();
        let mut alloc_fd = || {
            allocated_fds.next().ok_or_else(|| {
                Error::Other("Ran out of reserved file descriptors for child".into())
            })
        };

        // By default, we set up pdeathsig to "lifecycle bind" the child process to us.
        let mut c = config.skopeo_cmd.unwrap_or_else(|| {
            let mut c = std::process::Command::new("skopeo");
            // SAFETY: the hook runs in the child between fork and exec and only
            // calls `set_parent_process_death_signal`.
            // NOTE(review): presumed to be a thin syscall wrapper that neither
            // allocates nor takes locks - confirm against rustix.
            unsafe {
                c.pre_exec(|| {
                    Ok(rustix::process::set_parent_process_death_signal(Some(
                        rustix::process::Signal::TERM,
                    ))?)
                });
            }
            c
        });
        c.arg("experimental-image-proxy");
        if debug {
            c.arg("--debug");
        }
        // At most one of the three authentication options may be set.
        let auth_option_count = [
            config.authfile.is_some(),
            config.auth_data.is_some(),
            config.auth_anonymous,
        ]
        .into_iter()
        .filter(|&x| x)
        .count();
        if auth_option_count > 1 {
            // This is a programmer error really
            return Err(Error::Configuration(
                "Conflicting authentication options".into(),
            ));
        }
        if let Some(authfile) = config.authfile {
            c.arg("--authfile");
            c.arg(authfile);
        } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
            // If we get the authentication data as a file, we always copy it to a new temporary file under
            // the assumption that the caller provided it this way to aid in privilege separation where
            // the file is only readable to privileged code.
            let target_fd = alloc_fd()?;
            let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
            let mut tempfile =
                cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
            std::io::copy(&mut auth_data, &mut tempfile)?;
            // Unwrap the BufWriter (flushing it) to get the underlying file back.
            let tempfile = tempfile
                .into_inner()
                .map_err(|e| e.into_error())?
                .into_std();
            let fd = std::sync::Arc::new(tempfile.into());
            // Pass the temporary file to the child as fd `target_fd` and point
            // `--authfile` at it through /proc.
            c.take_fd_n(fd, target_fd);
            c.arg("--authfile");
            c.arg(format!("/proc/self/fd/{target_fd}"));
        } else if config.auth_anonymous {
            c.arg("--no-creds");
        }

        if let Some(certificate_directory) = config.certificate_directory {
            c.arg("--cert-dir");
            c.arg(certificate_directory);
        }

        if let Some(decryption_keys) = config.decryption_keys {
            for decryption_key in &decryption_keys {
                c.arg("--decryption-key");
                c.arg(decryption_key);
            }
        }

        if config.insecure_skip_tls_verification.unwrap_or_default() {
            c.arg("--tls-verify=false");
        }

        if config.insecure_policy.unwrap_or_default() {
            c.arg("--insecure-policy");
        }

        // Add user agent prefix if provided and supported by skopeo
        if let Some(user_agent_prefix) = config.user_agent_prefix {
            if supports_user_agent_prefix() {
                c.arg("--user-agent-prefix");
                c.arg(user_agent_prefix);
            }
        }

        // The child's stdout is discarded; stderr is captured unless debugging,
        // in which case it is left as configured on the command.
        c.stdout(Stdio::null());
        if !debug {
            c.stderr(Stdio::piped());
        }
        Ok(c)
    }
}
586
/// Known information about a layer blob, as deserialized from the proxy.
#[derive(Debug, serde::Deserialize)]
pub struct ConvertedLayerInfo {
    /// Uncompressed digest of a layer; for more information, see
    /// <https://github.com/opencontainers/image-spec/blob/main/config.md#layer-diffid>
    pub digest: Digest,

    /// Size of blob
    pub size: u64,

    /// Mediatype of blob
    pub media_type: oci_spec::image::MediaType,
}
600
/// A single fd; requires invoking FinishPipe
#[derive(Debug)]
struct FinishPipe {
    // Identifier to pass to the `FinishPipe` request once the data is consumed.
    pipeid: PipeId,
    // Read end carrying the payload data.
    datafd: OwnedFd,
}
607
/// There is a data FD and an error FD. The error FD will be JSON.
///
/// Both descriptors arrive in a single reply, data first, and no pipe id is
/// associated with them.
#[derive(Debug)]
struct DualFds {
    // Carries the payload data.
    datafd: OwnedFd,
    // Carries error information encoded as JSON.
    errfd: OwnedFd,
}
614
/// Helper trait for parsing the pipeid and/or file descriptors of a reply
///
/// Each implementation describes one expected "shape" of a reply (how many
/// descriptors, and whether a pipe id is present) and rejects anything else.
trait FromReplyFds: Send + 'static
where
    Self: Sized,
{
    /// Build `Self` from the descriptors received alongside a reply and the
    /// reply's `pipeid` field (0 when no pipe id was supplied).
    ///
    /// Returns an error when the descriptors or pipe id do not match what the
    /// implementing type expects.
    fn from_reply(
        iterable: impl IntoIterator<IntoIter: FusedIterator, Item = OwnedFd>,
        pipeid: u32,
    ) -> Result<Self>;
}
625
626/// No file descriptors or pipeid expected
627impl FromReplyFds for () {
628    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
629        if fds.into_iter().next().is_some() {
630            return Err(Error::Other("expected no fds".into()));
631        }
632        if pipeid != 0 {
633            return Err(Error::Other("unexpected pipeid".into()));
634        }
635        Ok(())
636    }
637}
638
639/// A FinishPipe instance
640impl FromReplyFds for FinishPipe {
641    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
642        let Some(pipeid) = PipeId::try_new(pipeid) else {
643            return Err(Error::Other("Expected pipeid for FinishPipe".into()));
644        };
645        let datafd = fds
646            .into_iter()
647            .exactly_one()
648            .map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?;
649        Ok(Self { pipeid, datafd })
650    }
651}
652
653/// A DualFds instance
654impl FromReplyFds for DualFds {
655    fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
656        if pipeid != 0 {
657            return Err(Error::Other("Unexpected pipeid with DualFds".into()));
658        }
659        let [datafd, errfd] = fds
660            .into_iter()
661            .collect_array()
662            .ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?;
663        Ok(Self { datafd, errfd })
664    }
665}
666
667impl ImageProxy {
668    /// Create an image proxy that fetches the target image, using default configuration.
669    pub async fn new() -> Result<Self> {
670        Self::new_with_config(Default::default()).await
671    }
672
673    /// Create an image proxy that fetches the target image
674    #[instrument]
675    pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
676        let mut c = Command::try_from(config)?;
677        let (mysock, theirsock) = rustix::net::socketpair(
678            rustix::net::AddressFamily::UNIX,
679            rustix::net::SocketType::SEQPACKET,
680            rustix::net::SocketFlags::CLOEXEC,
681            None,
682        )?;
683        c.stdin(Stdio::from(theirsock));
684        let child = match c.spawn() {
685            Ok(c) => c,
686            Err(error) => return Err(Error::SkopeoSpawnError(error)),
687        };
688        tracing::debug!("Spawned skopeo pid={:?}", child.id());
689        // Here we use std sync API via thread because tokio installs
690        // a SIGCHLD handler which can conflict with e.g. the glib one
691        // which may also be in process.
692        // xref https://github.com/tokio-rs/tokio/issues/3520#issuecomment-968985861
693        let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
694        let sockfd = Arc::new(Mutex::new(mysock));
695
696        let mut r = Self {
697            sockfd,
698            childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
699            protover: semver::Version::new(0, 0, 0),
700        };
701
702        // Verify semantic version
703        let protover: String = r.impl_request("Initialize", [(); 0]).await?;
704        tracing::debug!("Remote protocol version: {protover}");
705        let protover = semver::Version::parse(protover.as_str())?;
706        // Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`.
707        let supported = base_proto_version();
708        if !supported.matches(&protover) {
709            return Err(Error::ProxyTooOld {
710                requested_version: supported.to_string().into(),
711                found_version: protover.to_string().into(),
712            });
713        }
714        r.protover = protover;
715
716        Ok(r)
717    }
718
    /// The protocol version reported by the proxy during initialization.
    pub fn protocol_version(&self) -> &semver::Version {
        &self.protover
    }
722
    /// Whether the proxy's protocol version satisfies the requirement for
    /// the `GetRawBlob` method.
    pub fn supports_get_raw_blob(&self) -> bool {
        raw_blob_proto_version().matches(&self.protover)
    }
726
727    /// Create and send a request. Should only be used by impl_request.
728    async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static, F: FromReplyFds>(
729        sockfd: Arc<Mutex<OwnedFd>>,
730        req: Request,
731    ) -> Result<(T, F)> {
732        tracing::trace!("sending request {}", req.method.as_str());
733        // TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio
734        let r = tokio::task::spawn_blocking(move || {
735            let sockfd = sockfd.lock().unwrap();
736            let sendbuf = serde_json::to_vec(&req)?;
737            let sockfd = &*sockfd;
738            rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
739            drop(sendbuf);
740            let mut buf = [0u8; MAX_MSG_SIZE];
741            let mut cmsg_space: Vec<std::mem::MaybeUninit<u8>> =
742                vec![std::mem::MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(1))];
743            let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(cmsg_space.as_mut_slice());
744            let iov = std::io::IoSliceMut::new(buf.as_mut());
745            let mut iov = [iov];
746            let nread = rustix::net::recvmsg(
747                sockfd,
748                &mut iov,
749                &mut cmsg_buffer,
750                rustix::net::RecvFlags::CMSG_CLOEXEC,
751            )?
752            .bytes;
753            let fdret = cmsg_buffer
754                .drain()
755                .filter_map(|m| match m {
756                    rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
757                    _ => None,
758                })
759                .flatten();
760            let buf = &buf[..nread];
761            let reply: Reply = serde_json::from_slice(buf)?;
762            if !reply.success {
763                return Err(Error::RequestInitiationFailure {
764                    method: req.method.clone().into(),
765                    error: reply.error.into(),
766                });
767            }
768            let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?;
769            Ok((serde_json::from_value(reply.value)?, fds))
770        })
771        .await
772        .map_err(|e| Error::Other(e.to_string().into()))??;
773        tracing::trace!("completed request");
774        Ok(r)
775    }
776
    /// Create a request that may return file descriptors, and also check for an unexpected
    /// termination of the child process.
    ///
    /// The request is raced against the child-exit future: if the child exits
    /// first, an [`Error::Other`] describing the exit status and the captured
    /// stderr is returned instead of the reply.
    #[instrument(skip(args))]
    async fn impl_request_with_fds<
        T: serde::de::DeserializeOwned + Send + 'static,
        F: FromReplyFds,
    >(
        &self,
        method: &str,
        args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
    ) -> Result<(T, F)> {
        let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
        // The child-wait lock is held for the whole duration of the request.
        let mut childwait = self.childwait.lock().await;
        tokio::select! {
            r = req => { r }
            r = childwait.as_mut() => {
                // First `?` is the task join result, second the wait result.
                let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
                let stderr = String::from_utf8_lossy(&r.stderr);
                Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
            }
        }
    }
799
800    /// A synchronous invocation which does not return any file descriptors.
801    async fn impl_request<T: serde::de::DeserializeOwned + Send + 'static>(
802        &self,
803        method: &str,
804        args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
805    ) -> Result<T> {
806        let (r, ()) = self.impl_request_with_fds(method, args).await?;
807        Ok(r)
808    }
809
810    #[instrument]
811    async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> {
812        tracing::debug!("closing pipe");
813        let (r, ()) = self
814            .impl_request_with_fds("FinishPipe", [pipeid.0.get()])
815            .await?;
816        Ok(r)
817    }
818
819    /// Open an image for fetching, using a string reference.
820    #[instrument]
821    pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
822        tracing::debug!("opening image");
823        let imgid = self.impl_request("OpenImage", [imgref]).await?;
824        Ok(OpenedImage(imgid))
825    }
826
827    /// Open an image for fetching, using a structured [`ImageReference`].
828    #[instrument]
829    pub async fn open_image_ref(&self, imgref: &ImageReference) -> Result<OpenedImage> {
830        self.open_image(&imgref.to_string()).await
831    }
832
833    /// Open an image for fetching if it exists, using a string reference.
834    /// Returns `Ok(None)` if the image does not exist.
835    #[instrument]
836    pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
837        tracing::debug!("opening image");
838        let imgid = self.impl_request("OpenImageOptional", [imgref]).await?;
839        if imgid == 0 {
840            Ok(None)
841        } else {
842            Ok(Some(OpenedImage(imgid)))
843        }
844    }
845
846    /// Open an image for fetching if it exists, using a structured [`ImageReference`].
847    /// Returns `Ok(None)` if the image does not exist.
848    #[instrument]
849    pub async fn open_image_optional_ref(
850        &self,
851        imgref: &ImageReference,
852    ) -> Result<Option<OpenedImage>> {
853        self.open_image_optional(&imgref.to_string()).await
854    }
855
856    #[instrument]
857    pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
858        self.impl_request("CloseImage", [img.0]).await
859    }
860
861    async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result<Vec<u8>> {
862        let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
863        let mut fd = tokio::io::BufReader::new(fd);
864        let mut r = Vec::new();
865        let reader = fd.read_to_end(&mut r);
866        let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid));
867        finish?;
868        assert_eq!(nbytes?, r.len());
869        Ok(r)
870    }
871
872    /// Fetch the manifest as raw bytes, converted to OCI if necessary.
873    /// The original digest of the unconverted manifest is also returned.
874    /// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
875    pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
876        let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?;
877        Ok((digest, self.read_finish_pipe(pipefd).await?))
878    }
879
880    /// Fetch the manifest.
881    /// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
882    pub async fn fetch_manifest(
883        &self,
884        img: &OpenedImage,
885    ) -> Result<(String, oci_spec::image::ImageManifest)> {
886        let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
887        let manifest = serde_json::from_slice(&raw)?;
888        Ok((digest, manifest))
889    }
890
891    /// Fetch the config.
892    /// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
893    pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
894        let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?;
895        self.read_finish_pipe(pipe).await
896    }
897
898    /// Fetch the config.
899    /// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
900    pub async fn fetch_config(
901        &self,
902        img: &OpenedImage,
903    ) -> Result<oci_spec::image::ImageConfiguration> {
904        let raw = self.fetch_config_raw(img).await?;
905        serde_json::from_slice(&raw).map_err(Into::into)
906    }
907
908    /// Fetch a blob identified by e.g. `sha256:<digest>`.
909    /// <https://github.com/opencontainers/image-spec/blob/main/descriptor.md>
910    ///
911    /// The requested size and digest are verified (by the proxy process).
912    ///
913    /// Note that because of the implementation details of this function, you should
914    /// [`futures::join!`] the returned futures instead of polling one after the other. The
915    /// secondary "driver" future will only return once everything has been read from
916    /// the reader future.
917    #[instrument]
918    pub async fn get_blob(
919        &self,
920        img: &OpenedImage,
921        digest: &Digest,
922        size: u64,
923    ) -> Result<(
924        impl AsyncBufRead + Send + Unpin,
925        impl Future<Output = Result<()>> + Unpin + '_,
926    )> {
927        // For previous discussion on digest/size verification, see
928        // https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009
929        tracing::debug!("fetching blob");
930        let args: Vec<serde_json::Value> =
931            vec![img.0.into(), digest.to_string().into(), size.into()];
932        // Note that size may be -1 here if e.g. the remote registry doesn't give a Content-Length
933        // for example.
934        // We have always validated the size later (in FinishPipe) so out of conservatism we
935        // just ignore the size here.
936        let (_bloblen, pipe): (serde_json::Number, FinishPipe) =
937            self.impl_request_with_fds("GetBlob", args).await?;
938        let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
939        let fd = tokio::io::BufReader::new(fd);
940        let finish = Box::pin(self.finish_pipe(pipe.pipeid));
941        Ok((fd, finish))
942    }
943
944    async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> {
945        let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
946        let mut errfd = tokio::io::BufReader::new(fd);
947        let mut buf = Vec::new();
948        errfd
949            .read_to_end(&mut buf)
950            .await
951            .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
952        if buf.is_empty() {
953            return Ok(());
954        }
955        #[derive(Deserialize)]
956        struct RemoteError {
957            code: String,
958            message: String,
959        }
960        let e: RemoteError = serde_json::from_slice(&buf)
961            .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
962        match e.code.as_str() {
963            // Actually this is OK
964            "EPIPE" => Ok(()),
965            "retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())),
966            _ => Err(GetBlobError::Other(e.message.into_boxed_str())),
967        }
968    }
969
970    /// Fetch a blob identified by e.g. `sha256:<digest>`; does not perform
971    /// any verification that the blob matches the digest. The size of the
972    /// blob (if available) and a pipe file descriptor are returned.
973    #[instrument]
974    pub async fn get_raw_blob(
975        &self,
976        img: &OpenedImage,
977        digest: &Digest,
978    ) -> Result<(
979        Option<u64>,
980        tokio::fs::File,
981        impl Future<Output = std::result::Result<(), GetBlobError>> + Unpin + '_,
982    )> {
983        tracing::debug!("fetching blob");
984        let args: Vec<serde_json::Value> = vec![img.0.into(), digest.to_string().into()];
985        let (bloblen, fds): (i64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?;
986        // See the GetBlob case, we have a best-effort attempt to return the size, but it might not be known
987        let bloblen = u64::try_from(bloblen).ok();
988        let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd));
989        let err = Self::read_blob_error(fds.errfd).boxed();
990        Ok((bloblen, fd, err))
991    }
992
993    /// Fetch a blob as a stream, preferring `GetRawBlob` and falling back to `GetBlob`.
994    ///
995    /// The returned `driver` future completes only after proxy-side processing finishes; it also
996    /// verifies `expected_size` and `digest` for the `GetRawBlob` path.
997    #[instrument]
998    pub async fn get_blob_stream<'a>(
999        &'a self,
1000        img: &OpenedImage,
1001        digest: &Digest,
1002        expected_size: u64,
1003    ) -> Result<BlobStream<'a>> {
1004        if !self.supports_get_raw_blob() {
1005            let (reader, driver) = self.get_blob(img, digest, expected_size).await?;
1006            let driver = driver.boxed();
1007            return Ok(BlobStream {
1008                source: BlobStreamSource::GetBlob,
1009                expected_size,
1010                reader: Box::new(reader),
1011                driver,
1012            });
1013        }
1014
1015        let (reported_size, fd, err) = self.get_raw_blob(img, digest).await?;
1016        if let Some(sz) = reported_size {
1017            if sz != expected_size {
1018                return Err(Error::Other(
1019                    format!(
1020                        "Blob size mismatch for {digest}: expected {expected_size} bytes, got {sz} bytes"
1021                    )
1022                    .into(),
1023                ));
1024            }
1025        }
1026
1027        let expected = digest.clone();
1028        let (tx, rx) = oneshot::channel();
1029        let verified = VerifiedBlobReader::new(fd, expected.clone(), tx)?;
1030        let driver = async move {
1031            err.await.map_err(Error::from)?;
1032            match rx.await {
1033                Ok(r) => verify_blob_bytes_read(&expected, expected_size, r),
1034                Err(e) => Err(Error::Other(
1035                    format!("Blob stream verification channel error: {e}").into(),
1036                )),
1037            }
1038        }
1039        .boxed();
1040        Ok(BlobStream {
1041            source: BlobStreamSource::GetRawBlob,
1042            expected_size,
1043            reader: Box::new(verified),
1044            driver,
1045        })
1046    }
1047
1048    /// Fetch a descriptor. The requested size and digest are verified (by the proxy process).
1049    #[instrument]
1050    pub async fn get_descriptor(
1051        &self,
1052        img: &OpenedImage,
1053        descriptor: &Descriptor,
1054    ) -> Result<(
1055        impl AsyncBufRead + Send + Unpin,
1056        impl Future<Output = Result<()>> + Unpin + '_,
1057    )> {
1058        self.get_blob(img, descriptor.digest(), descriptor.size())
1059            .await
1060    }
1061
1062    ///Returns data that can be used to find the "diffid" corresponding to a particular layer.
1063    #[instrument]
1064    pub async fn get_layer_info(
1065        &self,
1066        img: &OpenedImage,
1067    ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
1068        tracing::debug!("Getting layer info");
1069        if layer_info_piped_proto_version().matches(&self.protover) {
1070            let ((), pipe) = self
1071                .impl_request_with_fds("GetLayerInfoPiped", [img.0])
1072                .await?;
1073            let buf = self.read_finish_pipe(pipe).await?;
1074            return Ok(Some(serde_json::from_slice(&buf)?));
1075        }
1076        if !layer_info_proto_version().matches(&self.protover) {
1077            return Ok(None);
1078        }
1079        let layers = self.impl_request("GetLayerInfo", [img.0]).await?;
1080        Ok(Some(layers))
1081    }
1082
1083    /// Close the connection and wait for the child process to exit successfully.
1084    #[instrument]
1085    pub async fn finalize(self) -> Result<()> {
1086        let _ = &self;
1087        let req = Request::new_bare("Shutdown");
1088        let sendbuf = serde_json::to_vec(&req)?;
1089        // SAFETY: Only panics if a worker thread already panic'd
1090        let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
1091        rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
1092        drop(sendbuf);
1093        tracing::debug!("sent shutdown request");
1094        let mut childwait = self.childwait.lock().await;
1095        let output = childwait
1096            .as_mut()
1097            .await
1098            .map_err(|e| Error::new_other(e.to_string()))??;
1099        if !output.status.success() {
1100            let stderr = String::from_utf8_lossy(&output.stderr);
1101            return Err(Error::RequestReturned(
1102                format!("proxy failed: {}\n{}", output.status, stderr).into(),
1103            ));
1104        }
1105        tracing::debug!("proxy exited successfully");
1106        Ok(())
1107    }
1108}
1109
1110#[cfg(test)]
1111mod tests {
1112    use std::io::{BufWriter, Seek, Write};
1113    use std::os::fd::{AsRawFd, OwnedFd};
1114
1115    use super::*;
1116    use cap_std_ext::cap_std::fs::Dir;
1117    use rustix::fs::{memfd_create, MemfdFlags};
1118
1119    /// Check if we have skopeo
1120    fn check_skopeo() -> bool {
1121        static HAVE_SKOPEO: OnceLock<bool> = OnceLock::new();
1122        *HAVE_SKOPEO.get_or_init(|| {
1123            Command::new("skopeo")
1124                .arg("--help")
1125                .stdout(Stdio::null())
1126                .stderr(Stdio::null())
1127                .status()
1128                .is_ok()
1129        })
1130    }
1131
1132    fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
1133        // Format via debug, because
1134        // https://doc.rust-lang.org/std/process/struct.Command.html#method.get_args
1135        // is experimental
1136        let d = format!("{:?}", c);
1137        for c in contains {
1138            assert!(d.contains(c), "{} missing {}", d, c);
1139        }
1140        for c in not_contains {
1141            assert!(!d.contains(c), "{} should not contain {}", d, c);
1142        }
1143    }
1144
1145    #[test]
1146    fn proxy_configs() {
1147        let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();
1148
1149        let c = Command::try_from(ImageProxyConfig::default()).unwrap();
1150        validate(
1151            c,
1152            &["experimental-image-proxy"],
1153            &["--no-creds", "--tls-verify", "--authfile"],
1154        );
1155
1156        let c = Command::try_from(ImageProxyConfig {
1157            authfile: Some(PathBuf::from("/path/to/authfile")),
1158            ..Default::default()
1159        })
1160        .unwrap();
1161        validate(c, &[r"--authfile", "/path/to/authfile"], &[]);
1162
1163        let decryption_key_path = "/path/to/decryption_key";
1164        let c = Command::try_from(ImageProxyConfig {
1165            decryption_keys: Some(vec![decryption_key_path.to_string()]),
1166            ..Default::default()
1167        })
1168        .unwrap();
1169        validate(c, &[r"--decryption-key", "/path/to/decryption_key"], &[]);
1170
1171        let c = Command::try_from(ImageProxyConfig {
1172            certificate_directory: Some(PathBuf::from("/path/to/certs")),
1173            ..Default::default()
1174        })
1175        .unwrap();
1176        validate(c, &[r"--cert-dir", "/path/to/certs"], &[]);
1177
1178        let c = Command::try_from(ImageProxyConfig {
1179            insecure_skip_tls_verification: Some(true),
1180            ..Default::default()
1181        })
1182        .unwrap();
1183        validate(c, &[r"--tls-verify=false"], &[]);
1184
1185        let c = Command::try_from(ImageProxyConfig {
1186            insecure_policy: Some(true),
1187            ..Default::default()
1188        })
1189        .unwrap();
1190        validate(c, &[r"--insecure-policy"], &[]);
1191
1192        let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
1193        tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
1194        tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
1195        let c = Command::try_from(ImageProxyConfig {
1196            auth_data: Some(tmpf.into_std()),
1197            ..Default::default()
1198        })
1199        .unwrap();
1200        validate(c, &["--authfile", "/proc/self/fd/100"], &[]);
1201
1202        // Test user-agent-prefix - only validate if supported
1203        let c = Command::try_from(ImageProxyConfig {
1204            user_agent_prefix: Some("bootc/1.0".to_string()),
1205            ..Default::default()
1206        })
1207        .unwrap();
1208        if supports_user_agent_prefix() {
1209            validate(c, &["--user-agent-prefix", "bootc/1.0"], &[]);
1210        } else {
1211            validate(c, &[], &["--user-agent-prefix"]);
1212        }
1213    }
1214
1215    #[tokio::test]
1216    async fn skopeo_not_found() {
1217        let mut config = ImageProxyConfig {
1218            ..ImageProxyConfig::default()
1219        };
1220        config.skopeo_cmd = Some(Command::new("no-skopeo"));
1221
1222        match ImageProxy::new_with_config(config).await {
1223            Ok(_) => panic!("Expected an error"),
1224            Err(ref e @ Error::SkopeoSpawnError(ref inner)) => {
1225                assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
1226                // Just to double check
1227                assert!(e
1228                    .to_string()
1229                    .contains("skopeo spawn error: No such file or directory"));
1230            }
1231            Err(e) => panic!("Unexpected error {e}"),
1232        }
1233    }
1234
1235    #[tokio::test]
1236    async fn test_proxy_send_sync() {
1237        fn assert_send_sync(_x: impl Send + Sync) {}
1238
1239        let Ok(proxy) = ImageProxy::new().await else {
1240            // doesn't matter: we only actually care to test if this compiles
1241            return;
1242        };
1243        assert_send_sync(&proxy);
1244        assert_send_sync(proxy);
1245
1246        let opened = OpenedImage(0);
1247        assert_send_sync(&opened);
1248        assert_send_sync(opened);
1249    }
1250
1251    fn generate_err_fd(v: serde_json::Value) -> Result<OwnedFd> {
1252        let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
1253        let mut tf = cap_tempfile::TempFile::new_anonymous(&tmp).map(BufWriter::new)?;
1254        serde_json::to_writer(&mut tf, &v)?;
1255        let mut tf = tf.into_inner().map_err(|e| e.into_error())?;
1256        tf.seek(std::io::SeekFrom::Start(0))?;
1257        let r = tf.into_std().into();
1258        Ok(r)
1259    }
1260
1261    #[tokio::test]
1262    async fn test_read_blob_error_retryable() -> Result<()> {
1263        let retryable = serde_json::json!({
1264            "code": "retryable",
1265            "message": "foo",
1266        });
1267        let retryable = generate_err_fd(retryable)?;
1268        let err = ImageProxy::read_blob_error(retryable).boxed();
1269        let e = err.await.unwrap_err();
1270        match e {
1271            GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"),
1272            _ => panic!("Unexpected error {e:?}"),
1273        }
1274        Ok(())
1275    }
1276
1277    #[tokio::test]
1278    async fn test_read_blob_error_none() -> Result<()> {
1279        let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
1280        let tf = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std();
1281        let err = ImageProxy::read_blob_error(tf.into()).boxed();
1282        err.await.unwrap();
1283        Ok(())
1284    }
1285
1286    #[tokio::test]
1287    async fn test_read_blob_error_other() -> Result<()> {
1288        let other = serde_json::json!({
1289            "code": "other",
1290            "message": "bar",
1291        });
1292        let other = generate_err_fd(other)?;
1293        let err = ImageProxy::read_blob_error(other).boxed();
1294        let e = err.await.unwrap_err();
1295        match e {
1296            GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"),
1297            _ => panic!("Unexpected error {e:?}"),
1298        }
1299        Ok(())
1300    }
1301
1302    #[tokio::test]
1303    async fn test_read_blob_error_epipe() -> Result<()> {
1304        let epipe = serde_json::json!({
1305            "code": "EPIPE",
1306            "message": "baz",
1307        });
1308        let epipe = generate_err_fd(epipe)?;
1309        let err = ImageProxy::read_blob_error(epipe).boxed();
1310        err.await.unwrap();
1311        Ok(())
1312    }
1313
1314    #[tokio::test]
1315    async fn test_verified_blob_reader_ok() -> Result<()> {
1316        use std::str::FromStr;
1317        use tokio::io::AsyncReadExt;
1318
1319        let data = b"hello world";
1320        let mut tmp = tempfile::NamedTempFile::new()?;
1321        tmp.as_file_mut().write_all(data)?;
1322        tmp.as_file_mut().sync_all()?;
1323
1324        let digest = {
1325            let mut h = sha2::Sha256::new();
1326            h.update(data);
1327            Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap()
1328        };
1329
1330        let fd = tokio::fs::File::open(tmp.path()).await?;
1331        let (tx, rx) = oneshot::channel();
1332        let mut reader = VerifiedBlobReader::new(fd, digest.clone(), tx)?;
1333
1334        let mut out = Vec::new();
1335        reader.read_to_end(&mut out).await?;
1336        assert_eq!(&out, data);
1337
1338        let result = rx.await.map_err(|e| Error::Other(e.to_string().into()))?;
1339        verify_blob_bytes_read(&digest, data.len() as u64, result)?;
1340        Ok(())
1341    }
1342
1343    #[tokio::test]
1344    async fn test_verified_blob_reader_digest_mismatch() -> Result<()> {
1345        use std::str::FromStr;
1346        use tokio::io::AsyncReadExt;
1347
1348        let data = b"hello world";
1349        let mut tmp = tempfile::NamedTempFile::new()?;
1350        tmp.as_file_mut().write_all(data)?;
1351        tmp.as_file_mut().sync_all()?;
1352
1353        let digest = {
1354            let mut h = sha2::Sha256::new();
1355            h.update(b"not the content");
1356            Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap()
1357        };
1358
1359        let fd = tokio::fs::File::open(tmp.path()).await?;
1360        let (tx, rx) = oneshot::channel();
1361        let mut reader = VerifiedBlobReader::new(fd, digest.clone(), tx)?;
1362
1363        let mut out = Vec::new();
1364        reader.read_to_end(&mut out).await?;
1365        assert_eq!(&out, data);
1366
1367        let result = rx.await.map_err(|e| Error::Other(e.to_string().into()))?;
1368        assert!(verify_blob_bytes_read(&digest, data.len() as u64, result).is_err());
1369        Ok(())
1370    }
1371
1372    #[tokio::test]
1373    async fn test_get_blob_stream_oci_dir() -> Result<()> {
1374        use ocidir::{oci_spec as ocidir_spec, OciDir};
1375
1376        if !check_skopeo() {
1377            return Ok(());
1378        }
1379
1380        let td = tempfile::tempdir()?;
1381        fn to_other<E: std::fmt::Display>(e: E) -> Error {
1382            Error::Other(e.to_string().into())
1383        }
1384        let layer_bytes = b"layer bytes";
1385        let dir = ocidir::cap_std::fs::Dir::open_ambient_dir(
1386            td.path(),
1387            ocidir::cap_std::ambient_authority(),
1388        )
1389        .map_err(to_other)?;
1390        let oci_dir = OciDir::ensure(dir).map_err(to_other)?;
1391        let mut layerw = oci_dir.create_gzip_layer(None).map_err(to_other)?;
1392        layerw.write_all(layer_bytes)?;
1393        let layer = layerw.complete().map_err(to_other)?;
1394        let layer_desc = layer.descriptor().build().unwrap();
1395        let layer_digest = Digest::from_str(layer_desc.digest().as_ref()).map_err(to_other)?;
1396        let layer_size = layer_desc.size();
1397
1398        let mut manifest = oci_dir
1399            .new_empty_manifest()
1400            .map_err(to_other)?
1401            .build()
1402            .map_err(to_other)?;
1403        let mut config = ocidir_spec::image::ImageConfigurationBuilder::default()
1404            .architecture("amd64")
1405            .os("linux")
1406            .build()
1407            .unwrap();
1408        oci_dir.push_layer(&mut manifest, &mut config, layer, "layer", None);
1409        let config_desc = oci_dir.write_config(config).map_err(to_other)?;
1410        manifest.set_config(config_desc);
1411        oci_dir
1412            .insert_manifest(
1413                manifest,
1414                Some("test"),
1415                ocidir_spec::image::Platform::default(),
1416            )
1417            .map_err(to_other)?;
1418
1419        let proxy = ImageProxy::new().await?;
1420        let imgref = format!("oci:{}:test", td.path().display());
1421        let img = proxy.open_image(&imgref).await?;
1422
1423        let expected_source = match std::env::var("EXPECT_BLOB_STREAM_SOURCE").ok().as_deref() {
1424            Some("GetRawBlob") => BlobStreamSource::GetRawBlob,
1425            Some("GetBlob") => BlobStreamSource::GetBlob,
1426            Some(v) => {
1427                return Err(Error::Other(
1428                    format!(
1429                        "Invalid EXPECT_BLOB_STREAM_SOURCE={v}; expected GetRawBlob or GetBlob"
1430                    )
1431                    .into(),
1432                ));
1433            }
1434            None => {
1435                if proxy.supports_get_raw_blob() {
1436                    BlobStreamSource::GetRawBlob
1437                } else {
1438                    BlobStreamSource::GetBlob
1439                }
1440            }
1441        };
1442
1443        let stream = proxy
1444            .get_blob_stream(&img, &layer_digest, layer_size)
1445            .await?;
1446        assert_eq!(stream.source(), expected_source);
1447        let (mut reader, driver) = stream.into_parts();
1448
1449        let mut sink = tokio::io::sink();
1450        let read = async move {
1451            let n = tokio::io::copy(&mut *reader, &mut sink).await?;
1452            Result::Ok(n)
1453        };
1454        let (n, driver) = tokio::join!(read, driver);
1455        assert_eq!(n?, layer_size);
1456        driver?;
1457
1458        proxy.close_image(&img).await?;
1459        proxy.finalize().await?;
1460        Ok(())
1461    }
1462
1463    // Helper to create a dummy OwnedFd using memfd_create for testing.
1464    fn create_dummy_fd() -> OwnedFd {
1465        memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()
1466    }
1467
1468    #[test]
1469    fn test_new_from_raw_values_finish_pipe() {
1470        let datafd = create_dummy_fd();
1471        // Keep a raw fd to compare later, as fds_and_pipeid consumes datafd
1472        let raw_datafd_val = datafd.as_raw_fd();
1473        let fds = vec![datafd];
1474        let v = FinishPipe::from_reply(fds, 1).unwrap();
1475        assert_eq!(v.pipeid.0.get(), 1);
1476        assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
1477    }
1478
1479    #[test]
1480    fn test_new_from_raw_values_dual_fds() {
1481        let datafd = create_dummy_fd();
1482        let errfd = create_dummy_fd();
1483        let raw_datafd_val = datafd.as_raw_fd();
1484        let raw_errfd_val = errfd.as_raw_fd();
1485        let fds = vec![datafd, errfd];
1486        let v = DualFds::from_reply(fds, 0).unwrap();
1487        assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
1488        assert_eq!(v.errfd.as_raw_fd(), raw_errfd_val);
1489    }
1490
1491    #[test]
1492    fn test_new_from_raw_values_error_too_many_fds() {
1493        let fds = vec![create_dummy_fd(), create_dummy_fd(), create_dummy_fd()];
1494        match DualFds::from_reply(fds, 0) {
1495            Ok(v) => unreachable!("{v:?}"),
1496            Err(Error::Other(msg)) => {
1497                assert_eq!(msg.as_ref(), "Expected two fds for DualFds")
1498            }
1499            Err(other) => unreachable!("{other}"),
1500        }
1501    }
1502
1503    #[test]
1504    fn test_new_from_raw_values_error_fd_with_zero_pipeid() {
1505        let fds = vec![create_dummy_fd()];
1506        match FinishPipe::from_reply(fds, 0) {
1507            Ok(v) => unreachable!("{v:?}"),
1508            Err(Error::Other(msg)) => {
1509                assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe")
1510            }
1511            Err(other) => unreachable!("{other}"),
1512        }
1513    }
1514
1515    #[test]
1516    fn test_new_from_raw_values_error_pipeid_with_both_fds() {
1517        let fds = vec![create_dummy_fd(), create_dummy_fd()];
1518        match DualFds::from_reply(fds, 1) {
1519            Ok(v) => unreachable!("{v:?}"),
1520            Err(Error::Other(msg)) => {
1521                assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds")
1522            }
1523            Err(other) => unreachable!("{other}"),
1524        }
1525    }
1526
1527    #[test]
1528    fn test_new_from_raw_values_error_no_fd_with_pipeid() {
1529        let fds: Vec<OwnedFd> = vec![];
1530        match FinishPipe::from_reply(fds, 1) {
1531            Ok(v) => unreachable!("{v:?}"),
1532            Err(Error::Other(msg)) => {
1533                assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe")
1534            }
1535            Err(other) => unreachable!("{other}"),
1536        }
1537    }
1538
1539    #[tokio::test]
1540    #[ignore = "https://github.com/coreos/rpm-ostree/issues/5442"]
1541    async fn test_open_optional() -> Result<()> {
1542        if !check_skopeo() {
1543            return Ok(());
1544        }
1545
1546        let td = tempfile::tempdir()?;
1547        let td = td.path().to_str().unwrap();
1548        let proxy = ImageProxy::new().await?;
1549        let imgpath = format!("oci-archive:{td}/some-nonexistent-image.ociarchive");
1550        let img = proxy.open_image_optional(&imgpath).await.unwrap();
1551        assert!(img.is_none());
1552
1553        Ok(())
1554    }
1555}