containers_image_proxy/
imageproxy.rs

1//! Run skopeo as a subprocess to fetch container images.
2//!
3//! This allows fetching a container image manifest and layers in a streaming fashion.
4//!
5//! More information: <https://github.com/containers/skopeo/pull/1476>
6
7use cap_std_ext::prelude::CapStdExtCommandExt;
8use cap_std_ext::{cap_std, cap_tempfile};
9use futures_util::Future;
10use oci_spec::image::{Descriptor, Digest};
11use serde::{Deserialize, Serialize};
12use std::fs::File;
13use std::ops::Range;
14use std::os::fd::OwnedFd;
15use std::os::unix::prelude::CommandExt;
16use std::path::PathBuf;
17use std::pin::Pin;
18use std::process::{Command, Stdio};
19use std::sync::{Arc, Mutex, OnceLock};
20use thiserror::Error;
21use tokio::io::{AsyncBufRead, AsyncReadExt};
22use tokio::sync::Mutex as AsyncMutex;
23use tokio::task::JoinError;
24use tracing::instrument;
25
26/// Errors returned by this crate.
27#[derive(Error, Debug)]
28#[non_exhaustive]
29pub enum Error {
30    #[error("i/o error")]
31    /// An input/output error
32    Io(#[from] std::io::Error),
33    #[error("skopeo spawn error: {}", .0)]
34    /// An error spawning skopeo
35    SkopeoSpawnError(#[source] std::io::Error),
36    #[error("serialization error")]
37    /// Returned when serialization or deserialization fails
38    SerDe(#[from] serde_json::Error),
39    /// The proxy failed to initiate a request
40    #[error("failed to invoke method {method}: {error}")]
41    RequestInitiationFailure { method: Box<str>, error: Box<str> },
42    /// An error returned from the remote proxy
43    #[error("proxy request returned error")]
44    RequestReturned(Box<str>),
45    #[error("semantic version error")]
46    SemanticVersion(#[from] semver::Error),
47    #[error("proxy too old (requested={requested_version} found={found_version}) error")]
48    /// The proxy doesn't support the requested semantic version
49    ProxyTooOld {
50        requested_version: Box<str>,
51        found_version: Box<str>,
52    },
53    #[error("configuration error")]
54    /// Conflicting or missing configuration
55    Configuration(Box<str>),
56    #[error("error")]
57    /// An unknown other error
58    Other(Box<str>),
59}
60
61impl Error {
62    pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
63        Self::Other(e.into())
64    }
65}
66
67impl From<rustix::io::Errno> for Error {
68    fn from(value: rustix::io::Errno) -> Self {
69        Self::Io(value.into())
70    }
71}
72
/// The result type returned from this crate; the error type is always [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
75
76/// Re-export because we use this in our public APIs
77pub use oci_spec;
78
/// Range of file descriptor numbers reserved for passing data down into the
/// proxy.  Avoid configuring the command to use files in this range.
/// (Stdin is reserved as well.)
pub const RESERVED_FD_RANGE: Range<i32> = Range {
    start: 100,
    end: 200,
};

// Maximum size of JSON we will read/write; the value is defined in skopeo.
// Payload data (non-metadata) should go over a pipe file descriptor instead.
const MAX_MSG_SIZE: usize = 32 * 1024;
87
88fn base_proto_version() -> &'static semver::VersionReq {
89    // Introduced in https://github.com/containers/skopeo/pull/1523
90    static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
91    BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
92}
93
94fn layer_info_proto_version() -> &'static semver::VersionReq {
95    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
96    LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
97}
98
99fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
100    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
101    LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
102}
103
104#[derive(Serialize)]
105struct Request {
106    method: String,
107    args: Vec<serde_json::Value>,
108}
109
110impl Request {
111    fn new<T, I>(method: &str, args: T) -> Self
112    where
113        T: IntoIterator<Item = I>,
114        I: Into<serde_json::Value>,
115    {
116        let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
117        Self {
118            method: method.to_string(),
119            args,
120        }
121    }
122
123    fn new_bare(method: &str) -> Self {
124        Self {
125            method: method.to_string(),
126            args: vec![],
127        }
128    }
129}
130
/// A reply received from the proxy, deserialized from JSON.
#[derive(Deserialize)]
struct Reply {
    /// Whether the request succeeded.
    success: bool,
    /// Error text; surfaced to the caller when `success` is false.
    error: String,
    /// Pipe identifier; expected to be nonzero exactly when a file descriptor
    /// accompanies the reply.
    pipeid: u32,
    /// The method's return value, deserialized afterwards into the type the
    /// caller asked for.
    value: serde_json::Value,
}
138
/// Boxed, pinned future that resolves to the captured output of the child
/// process.  The outer `Result` carries a [`JoinError`] from the task that
/// waits on the child; the inner one carries the I/O error from waiting.
type ChildFuture = Pin<
    Box<
        dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
            + Send,
    >,
>;
145
/// Manage a child process proxy to fetch container images.
pub struct ImageProxy {
    /// Our end of the socket pair connected to the child's stdin; requests
    /// and replies are exchanged over it while holding the mutex.
    sockfd: Arc<Mutex<OwnedFd>>,
    /// Future that completes with the child's output once the child exits.
    childwait: Arc<AsyncMutex<ChildFuture>>,
    /// Protocol version reported by the proxy in reply to `Initialize`.
    protover: semver::Version,
}
152
153impl std::fmt::Debug for ImageProxy {
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155        f.debug_struct("ImageProxy").finish()
156    }
157}
158
/// Opaque identifier for an image.
///
/// Wraps the numeric id the proxy returns from `OpenImage` /
/// `OpenImageOptional`; it is passed back to the proxy by the other methods.
#[derive(Debug, PartialEq, Eq)]
pub struct OpenedImage(u32);
162
/// Configuration for the proxy.
///
/// At most one of [`Self::authfile`], [`Self::auth_data`] and
/// [`Self::auth_anonymous`] may be set; otherwise building the command fails
/// with a configuration error.
#[derive(Debug, Default)]
pub struct ImageProxyConfig {
    /// Path to container auth file; equivalent to `skopeo --authfile`.
    /// This conflicts with [`Self::auth_data`].
    pub authfile: Option<PathBuf>,

    /// Data stream for container auth.  This conflicts with [`Self::authfile`].
    pub auth_data: Option<File>,

    /// Do not use default container authentication paths; equivalent to `skopeo --no-creds`.
    ///
    /// Defaults to `false`; in other words, use the default file paths from `man containers-auth.json`.
    pub auth_anonymous: bool,

    /// Directory with certificates (*.crt, *.cert, *.key) used to connect to registry.
    /// Equivalent to `skopeo --cert-dir`.
    pub certificate_directory: Option<PathBuf>,

    /// Decryption keys to decrypt an encrypted container image.
    /// Equivalent to `skopeo copy --decryption-key <path_to_decryption_key>`;
    /// one `--decryption-key` option is passed per entry.
    pub decryption_keys: Option<Vec<String>>,

    /// If set to `Some(true)`, disable TLS verification.  Equivalent to
    /// `skopeo --tls-verify=false`.  `None` and `Some(false)` add no option.
    pub insecure_skip_tls_verification: Option<bool>,

    /// If enabled, propagate debug-logging level from the proxy via stderr to the
    /// current process' stderr. Note that when enabled, this also means that standard
    /// error will no longer be captured.
    ///
    /// Setting the `CONTAINERS_IMAGE_PROXY_DEBUG` environment variable has the
    /// same effect.
    pub debug: bool,

    /// Provide a configured [`std::process::Command`] instance.
    ///
    /// This allows configuring aspects of the resulting child `skopeo` process.
    /// The intention of this hook is to allow the caller to use e.g.
    /// `systemd-run` or equivalent containerization tools.  For example you
    /// can set up a command whose arguments are `systemd-run -Pq -p DynamicUser=yes -- skopeo`.
    /// You can also set up arbitrary aspects of the child via e.g.
    /// [`current_dir`] [`pre_exec`].
    ///
    /// [`current_dir`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.current_dir
    /// [`pre_exec`]: https://doc.rust-lang.org/std/os/unix/process/trait.CommandExt.html#tymethod.pre_exec
    ///
    /// The default is a plain `skopeo` command with a `pre_exec` hook that
    /// requests `SIGTERM` when the parent dies, which on Linux binds the
    /// lifecycle of the child process to the parent.
    ///
    /// Note that you *must* add `skopeo` as the primary argument or
    /// indirectly.  However, all other command line options including
    /// `experimental-image-proxy` will be injected by this library.
    /// You may use a different command name from `skopeo` if your
    /// application has set up a compatible copy, e.g. `/usr/lib/myapp/my-private-skopeo`.
    pub skopeo_cmd: Option<Command>,
}
216
217impl TryFrom<ImageProxyConfig> for Command {
218    type Error = Error;
219
220    fn try_from(config: ImageProxyConfig) -> Result<Self> {
221        let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
222        let mut allocated_fds = RESERVED_FD_RANGE.clone();
223        let mut alloc_fd = || {
224            allocated_fds.next().ok_or_else(|| {
225                Error::Other("Ran out of reserved file descriptors for child".into())
226            })
227        };
228
229        // By default, we set up pdeathsig to "lifecycle bind" the child process to us.
230        let mut c = config.skopeo_cmd.unwrap_or_else(|| {
231            let mut c = std::process::Command::new("skopeo");
232            unsafe {
233                c.pre_exec(|| {
234                    rustix::process::set_parent_process_death_signal(Some(
235                        rustix::process::Signal::Term,
236                    ))
237                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
238                });
239            }
240            c
241        });
242        c.arg("experimental-image-proxy");
243        if debug {
244            c.arg("--debug");
245        }
246        let auth_option_count = [
247            config.authfile.is_some(),
248            config.auth_data.is_some(),
249            config.auth_anonymous,
250        ]
251        .into_iter()
252        .filter(|&x| x)
253        .count();
254        if auth_option_count > 1 {
255            // This is a programmer error really
256            return Err(Error::Configuration(
257                "Conflicting authentication options".into(),
258            ));
259        }
260        if let Some(authfile) = config.authfile {
261            c.arg("--authfile");
262            c.arg(authfile);
263        } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
264            // If we get the authentication data as a file, we always copy it to a new temporary file under
265            // the assumption that the caller provided it this way to aid in privilege separation where
266            // the file is only readable to privileged code.
267            let target_fd = alloc_fd()?;
268            let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
269            let mut tempfile =
270                cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
271            std::io::copy(&mut auth_data, &mut tempfile)?;
272            let tempfile = tempfile
273                .into_inner()
274                .map_err(|e| e.into_error())?
275                .into_std();
276            let fd = std::sync::Arc::new(tempfile.into());
277            c.take_fd_n(fd, target_fd);
278            c.arg("--authfile");
279            c.arg(format!("/proc/self/fd/{target_fd}"));
280        } else if config.auth_anonymous {
281            c.arg("--no-creds");
282        }
283
284        if let Some(certificate_directory) = config.certificate_directory {
285            c.arg("--cert-dir");
286            c.arg(certificate_directory);
287        }
288
289        if let Some(decryption_keys) = config.decryption_keys {
290            for decryption_key in &decryption_keys {
291                c.arg("--decryption-key");
292                c.arg(decryption_key);
293            }
294        }
295
296        if config.insecure_skip_tls_verification.unwrap_or_default() {
297            c.arg("--tls-verify=false");
298        }
299        c.stdout(Stdio::null());
300        if !debug {
301            c.stderr(Stdio::piped());
302        }
303        Ok(c)
304    }
305}
306
/// Known information about one layer, as returned by
/// [`ImageProxy::get_layer_info`].
#[derive(Debug, serde::Deserialize)]
pub struct ConvertedLayerInfo {
    /// Uncompressed digest of a layer; for more information, see
    /// <https://github.com/opencontainers/image-spec/blob/main/config.md#layer-diffid>
    pub digest: Digest,

    /// Size of blob
    pub size: u64,

    /// Mediatype of blob
    pub media_type: oci_spec::image::MediaType,
}
320
321impl ImageProxy {
322    /// Create an image proxy that fetches the target image, using default configuration.
323    pub async fn new() -> Result<Self> {
324        Self::new_with_config(Default::default()).await
325    }
326
327    /// Create an image proxy that fetches the target image
328    #[instrument]
329    pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
330        let mut c = Command::try_from(config)?;
331        let (mysock, theirsock) = rustix::net::socketpair(
332            rustix::net::AddressFamily::UNIX,
333            rustix::net::SocketType::SEQPACKET,
334            rustix::net::SocketFlags::CLOEXEC,
335            None,
336        )?;
337        c.stdin(Stdio::from(theirsock));
338        let child = match c.spawn() {
339            Ok(c) => c,
340            Err(error) => return Err(Error::SkopeoSpawnError(error)),
341        };
342        tracing::debug!("Spawned skopeo pid={:?}", child.id());
343        // Here we use std sync API via thread because tokio installs
344        // a SIGCHLD handler which can conflict with e.g. the glib one
345        // which may also be in process.
346        // xref https://github.com/tokio-rs/tokio/issues/3520#issuecomment-968985861
347        let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
348        let sockfd = Arc::new(Mutex::new(mysock));
349
350        let mut r = Self {
351            sockfd,
352            childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
353            protover: semver::Version::new(0, 0, 0),
354        };
355
356        // Verify semantic version
357        let protover = r.impl_request::<String, _, ()>("Initialize", []).await?.0;
358        tracing::debug!("Remote protocol version: {protover}");
359        let protover = semver::Version::parse(protover.as_str())?;
360        // Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`.
361        let supported = base_proto_version();
362        if !supported.matches(&protover) {
363            return Err(Error::ProxyTooOld {
364                requested_version: protover.to_string().into(),
365                found_version: supported.to_string().into(),
366            });
367        }
368        r.protover = protover;
369
370        Ok(r)
371    }
372
373    async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static>(
374        sockfd: Arc<Mutex<OwnedFd>>,
375        req: Request,
376    ) -> Result<(T, Option<(OwnedFd, u32)>)> {
377        tracing::trace!("sending request {}", req.method.as_str());
378        // TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio
379        let r = tokio::task::spawn_blocking(move || {
380            let sockfd = sockfd.lock().unwrap();
381            let sendbuf = serde_json::to_vec(&req)?;
382            let sockfd = &*sockfd;
383            rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
384            drop(sendbuf);
385            let mut buf = [0u8; MAX_MSG_SIZE];
386            let mut cmsg_space = vec![0; rustix::cmsg_space!(ScmRights(1))];
387            let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(&mut cmsg_space);
388            let iov = std::io::IoSliceMut::new(buf.as_mut());
389            let mut iov = [iov];
390            let nread = rustix::net::recvmsg(
391                sockfd,
392                &mut iov,
393                &mut cmsg_buffer,
394                rustix::net::RecvFlags::CMSG_CLOEXEC,
395            )?
396            .bytes;
397            let fdret = cmsg_buffer
398                .drain()
399                .filter_map(|m| match m {
400                    rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
401                    _ => None,
402                })
403                .flatten()
404                .next();
405            let buf = &buf[..nread];
406            let reply: Reply = serde_json::from_slice(buf)?;
407            if !reply.success {
408                return Err(Error::RequestInitiationFailure {
409                    method: req.method.clone().into(),
410                    error: reply.error.into(),
411                });
412            }
413            let fdret = match (fdret, reply.pipeid) {
414                (Some(fd), n) => {
415                    if n == 0 {
416                        return Err(Error::Other("got fd but no pipeid".into()));
417                    }
418                    Some((fd, n))
419                }
420                (None, n) => {
421                    if n != 0 {
422                        return Err(Error::Other(format!("got no fd with pipeid {}", n).into()));
423                    }
424                    None
425                }
426            };
427            let reply = serde_json::from_value(reply.value)?;
428            Ok((reply, fdret))
429        })
430        .await
431        .map_err(|e| Error::Other(e.to_string().into()))??;
432        tracing::trace!("completed request");
433        Ok(r)
434    }
435
436    #[instrument(skip(args))]
437    async fn impl_request<R: serde::de::DeserializeOwned + Send + 'static, T, I>(
438        &self,
439        method: &str,
440        args: T,
441    ) -> Result<(R, Option<(OwnedFd, u32)>)>
442    where
443        T: IntoIterator<Item = I>,
444        I: Into<serde_json::Value>,
445    {
446        let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
447        let mut childwait = self.childwait.lock().await;
448        tokio::select! {
449            r = req => {
450                r.map_err(|e| Error::RequestInitiationFailure {
451                    method: method.to_string().into(),
452                    error: e.to_string().into()
453            })
454            }
455            r = childwait.as_mut() => {
456                let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
457                let stderr = String::from_utf8_lossy(&r.stderr);
458                Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
459            }
460        }
461    }
462
463    #[instrument]
464    async fn finish_pipe(&self, pipeid: u32) -> Result<()> {
465        tracing::debug!("closing pipe");
466        let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?;
467        if fd.is_some() {
468            return Err(Error::Other("Unexpected fd in finish_pipe reply".into()));
469        }
470        Ok(r)
471    }
472
473    #[instrument]
474    pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
475        tracing::debug!("opening image");
476        let (imgid, _) = self
477            .impl_request::<u32, _, _>("OpenImage", [imgref])
478            .await?;
479        Ok(OpenedImage(imgid))
480    }
481
482    #[instrument]
483    pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
484        tracing::debug!("opening image");
485        let (imgid, _) = self
486            .impl_request::<u32, _, _>("OpenImageOptional", [imgref])
487            .await?;
488        if imgid == 0 {
489            Ok(None)
490        } else {
491            Ok(Some(OpenedImage(imgid)))
492        }
493    }
494
495    #[instrument]
496    pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
497        tracing::debug!("closing image");
498        let (r, _) = self.impl_request("CloseImage", [img.0]).await?;
499        Ok(r)
500    }
501
502    async fn read_all_fd(&self, fd: Option<(OwnedFd, u32)>) -> Result<Vec<u8>> {
503        let (fd, pipeid) = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?;
504        let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
505        let mut fd = tokio::io::BufReader::new(fd);
506        let mut r = Vec::new();
507        let reader = fd.read_to_end(&mut r);
508        let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipeid));
509        finish?;
510        assert_eq!(nbytes?, r.len());
511        Ok(r)
512    }
513
514    /// Fetch the manifest as raw bytes, converted to OCI if necessary.
515    /// The original digest of the unconverted manifest is also returned.
516    /// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
517    pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
518        let (digest, fd) = self.impl_request("GetManifest", [img.0]).await?;
519        Ok((digest, self.read_all_fd(fd).await?))
520    }
521
522    /// Fetch the manifest.
523    /// For more information on OCI manifests, see <https://github.com/opencontainers/image-spec/blob/main/manifest.md>
524    pub async fn fetch_manifest(
525        &self,
526        img: &OpenedImage,
527    ) -> Result<(String, oci_spec::image::ImageManifest)> {
528        let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
529        let manifest = serde_json::from_slice(&raw)?;
530        Ok((digest, manifest))
531    }
532
533    /// Fetch the config.
534    /// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
535    pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
536        let (_, fd) = self
537            .impl_request::<(), _, _>("GetFullConfig", [img.0])
538            .await?;
539        self.read_all_fd(fd).await
540    }
541
542    /// Fetch the config.
543    /// For more information on OCI config, see <https://github.com/opencontainers/image-spec/blob/main/config.md>
544    pub async fn fetch_config(
545        &self,
546        img: &OpenedImage,
547    ) -> Result<oci_spec::image::ImageConfiguration> {
548        let raw = self.fetch_config_raw(img).await?;
549        serde_json::from_slice(&raw).map_err(Into::into)
550    }
551
552    /// Fetch a blob identified by e.g. `sha256:<digest>`.
553    /// <https://github.com/opencontainers/image-spec/blob/main/descriptor.md>
554    ///
555    /// The requested size and digest are verified (by the proxy process).
556    ///
557    /// Note that because of the implementation details of this function, you should
558    /// [`futures::join!`] the returned futures instead of polling one after the other. The
559    /// secondary "driver" future will only return once everything has been read from
560    /// the reader future.
561    #[instrument]
562    pub async fn get_blob(
563        &self,
564        img: &OpenedImage,
565        digest: &Digest,
566        size: u64,
567    ) -> Result<(
568        impl AsyncBufRead + Send + Unpin,
569        impl Future<Output = Result<()>> + Unpin + '_,
570    )> {
571        // For previous discussion on digest/size verification, see
572        // https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009
573        tracing::debug!("fetching blob");
574        let args: Vec<serde_json::Value> =
575            vec![img.0.into(), digest.to_string().into(), size.into()];
576        let (_bloblen, fd) = self.impl_request::<i64, _, _>("GetBlob", args).await?;
577        let (fd, pipeid) = fd.ok_or_else(|| Error::new_other("Missing fd from reply"))?;
578        let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
579        let fd = tokio::io::BufReader::new(fd);
580        let finish = Box::pin(self.finish_pipe(pipeid));
581        Ok((fd, finish))
582    }
583
584    /// Fetch a descriptor. The requested size and digest are verified (by the proxy process).
585    #[instrument]
586    pub async fn get_descriptor(
587        &self,
588        img: &OpenedImage,
589        descriptor: &Descriptor,
590    ) -> Result<(
591        impl AsyncBufRead + Send + Unpin,
592        impl Future<Output = Result<()>> + Unpin + '_,
593    )> {
594        self.get_blob(img, descriptor.digest(), descriptor.size())
595            .await
596    }
597
598    ///Returns data that can be used to find the "diffid" corresponding to a particular layer.
599    #[instrument]
600    pub async fn get_layer_info(
601        &self,
602        img: &OpenedImage,
603    ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
604        tracing::debug!("Getting layer info");
605        if layer_info_piped_proto_version().matches(&self.protover) {
606            let (_, fd) = self
607                .impl_request::<(), _, _>("GetLayerInfoPiped", [img.0])
608                .await?;
609            let buf = self.read_all_fd(fd).await?;
610            return Ok(Some(serde_json::from_slice(&buf)?));
611        }
612        if !layer_info_proto_version().matches(&self.protover) {
613            return Ok(None);
614        }
615        let reply = self.impl_request("GetLayerInfo", [img.0]).await?;
616        let layers: Vec<ConvertedLayerInfo> = reply.0;
617        Ok(Some(layers))
618    }
619
620    /// Close the connection and wait for the child process to exit successfully.
621    #[instrument]
622    pub async fn finalize(self) -> Result<()> {
623        let _ = &self;
624        let req = Request::new_bare("Shutdown");
625        let sendbuf = serde_json::to_vec(&req)?;
626        // SAFETY: Only panics if a worker thread already panic'd
627        let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
628        rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
629        drop(sendbuf);
630        tracing::debug!("sent shutdown request");
631        let mut childwait = self.childwait.lock().await;
632        let output = childwait
633            .as_mut()
634            .await
635            .map_err(|e| Error::new_other(e.to_string()))??;
636        if !output.status.success() {
637            let stderr = String::from_utf8_lossy(&output.stderr);
638            return Err(Error::RequestReturned(
639                format!("proxy failed: {}\n{}", output.status, stderr).into(),
640            ));
641        }
642        tracing::debug!("proxy exited successfully");
643        Ok(())
644    }
645}
646
#[cfg(test)]
mod tests {
    use std::io::{Seek, Write};

    use super::*;

    /// Check the `Debug` rendering of `c`: every entry of `contains` must
    /// appear in it and no entry of `not_contains` may.
    fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
        // Format via debug, because
        // https://doc.rust-lang.org/std/process/struct.Command.html#method.get_args
        // is experimental
        let rendered = format!("{:?}", c);
        for needle in contains {
            assert!(rendered.contains(needle), "{} missing {}", rendered, needle);
        }
        for needle in not_contains {
            assert!(
                !rendered.contains(needle),
                "{} should not contain {}",
                rendered,
                needle
            );
        }
    }

    /// Each configuration option maps to the expected command line arguments.
    #[test]
    fn proxy_configs() {
        let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();
        let build = |config: ImageProxyConfig| Command::try_from(config).unwrap();

        // Defaults: only the subcommand, no auth or TLS options.
        validate(
            build(ImageProxyConfig::default()),
            &["experimental-image-proxy"],
            &["--no-creds", "--tls-verify", "--authfile"],
        );

        // Auth file given by path.
        let with_authfile = ImageProxyConfig {
            authfile: Some(PathBuf::from("/path/to/authfile")),
            ..Default::default()
        };
        validate(
            build(with_authfile),
            &[r"--authfile", "/path/to/authfile"],
            &[],
        );

        // Decryption keys.
        let decryption_key_path = "/path/to/decryption_key";
        let with_keys = ImageProxyConfig {
            decryption_keys: Some(vec![decryption_key_path.to_string()]),
            ..Default::default()
        };
        validate(
            build(with_keys),
            &[r"--decryption-key", "/path/to/decryption_key"],
            &[],
        );

        // Certificate directory.
        let with_certs = ImageProxyConfig {
            certificate_directory: Some(PathBuf::from("/path/to/certs")),
            ..Default::default()
        };
        validate(build(with_certs), &[r"--cert-dir", "/path/to/certs"], &[]);

        // TLS verification disabled.
        let insecure = ImageProxyConfig {
            insecure_skip_tls_verification: Some(true),
            ..Default::default()
        };
        validate(build(insecure), &[r"--tls-verify=false"], &[]);

        // Auth data given as an open file is exposed through the first reserved fd.
        let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
        tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
        tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
        let with_auth_data = ImageProxyConfig {
            auth_data: Some(tmpf.into_std()),
            ..Default::default()
        };
        validate(
            build(with_auth_data),
            &["--authfile", "/proc/self/fd/100"],
            &[],
        );
    }

    /// A missing binary is reported as `SkopeoSpawnError` with kind `NotFound`.
    #[tokio::test]
    async fn skopeo_not_found() {
        let config = ImageProxyConfig {
            skopeo_cmd: Some(Command::new("no-skopeo")),
            ..ImageProxyConfig::default()
        };

        let err = match ImageProxy::new_with_config(config).await {
            Ok(_) => panic!("Expected an error"),
            Err(e) => e,
        };
        let Error::SkopeoSpawnError(inner) = &err else {
            panic!("Unexpected error {err}")
        };
        assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
        // Just to double check
        assert!(err
            .to_string()
            .contains("skopeo spawn error: No such file or directory"));
    }

    /// Compile-time check that the public handle types are `Send + Sync`.
    #[tokio::test]
    async fn test_proxy_send_sync() {
        fn assert_send_sync(_x: impl Send + Sync) {}

        let Ok(proxy) = ImageProxy::new().await else {
            // doesn't matter: we only actually care to test if this compiles
            return;
        };
        assert_send_sync(&proxy);
        assert_send_sync(proxy);

        let opened = OpenedImage(0);
        assert_send_sync(&opened);
        assert_send_sync(opened);
    }
}