1use cap_std_ext::prelude::CapStdExtCommandExt;
8use cap_std_ext::{cap_std, cap_tempfile};
9use futures_util::{Future, FutureExt};
10use itertools::Itertools;
11use oci_spec::image::{Descriptor, Digest};
12use serde::{Deserialize, Serialize};
13use std::fs::File;
14use std::iter::FusedIterator;
15use std::num::NonZeroU32;
16use std::ops::Range;
17use std::os::fd::OwnedFd;
18use std::os::unix::prelude::CommandExt;
19use std::path::PathBuf;
20use std::pin::Pin;
21use std::process::{Command, Stdio};
22use std::sync::{Arc, Mutex, OnceLock};
23use thiserror::Error;
24use tokio::io::{AsyncBufRead, AsyncReadExt};
25use tokio::sync::Mutex as AsyncMutex;
26use tokio::task::JoinError;
27use tracing::instrument;
28
29#[derive(Error, Debug)]
31#[non_exhaustive]
32pub enum Error {
33 #[error("i/o error")]
34 Io(#[from] std::io::Error),
36 #[error("skopeo spawn error: {}", .0)]
37 SkopeoSpawnError(#[source] std::io::Error),
39 #[error("serialization error")]
40 SerDe(#[from] serde_json::Error),
42 #[error("failed to invoke method {method}: {error}")]
44 RequestInitiationFailure { method: Box<str>, error: Box<str> },
45 #[error("proxy request returned error")]
47 RequestReturned(Box<str>),
48 #[error("semantic version error")]
49 SemanticVersion(#[from] semver::Error),
50 #[error("proxy too old (requested={requested_version} found={found_version}) error")]
51 ProxyTooOld {
53 requested_version: Box<str>,
54 found_version: Box<str>,
55 },
56 #[error("configuration error")]
57 Configuration(Box<str>),
59 #[error("error")]
60 Other(Box<str>),
62}
63
64impl Error {
65 pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
66 Self::Other(e.into())
67 }
68}
69
70#[derive(Error, Debug)]
72#[non_exhaustive]
73pub enum GetBlobError {
74 #[error("retryable error")]
76 Retryable(Box<str>),
77 #[error("error")]
78 Other(Box<str>),
80}
81
82impl From<rustix::io::Errno> for Error {
83 fn from(value: rustix::io::Errno) -> Self {
84 Self::Io(value.into())
85 }
86}
87
88pub type Result<T> = std::result::Result<T, Error>;
90
91pub use oci_spec;
93
94pub const RESERVED_FD_RANGE: Range<i32> = 100..200;
98
99const MAX_MSG_SIZE: usize = 32 * 1024;
102
103fn base_proto_version() -> &'static semver::VersionReq {
104 static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
106 BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
107}
108
109fn layer_info_proto_version() -> &'static semver::VersionReq {
110 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
111 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
112}
113
114fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
115 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
116 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
117}
118
119#[derive(Serialize)]
120struct Request {
121 method: String,
122 args: Vec<serde_json::Value>,
123}
124
125impl Request {
126 fn new<T, I>(method: &str, args: T) -> Self
127 where
128 T: IntoIterator<Item = I>,
129 I: Into<serde_json::Value>,
130 {
131 let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
132 Self {
133 method: method.to_string(),
134 args,
135 }
136 }
137
138 fn new_bare(method: &str) -> Self {
139 Self {
140 method: method.to_string(),
141 args: vec![],
142 }
143 }
144}
145
146#[derive(Deserialize)]
147struct Reply {
148 success: bool,
149 error: String,
150 pipeid: u32,
151 value: serde_json::Value,
152}
153
154type ChildFuture = Pin<
155 Box<
156 dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
157 + Send,
158 >,
159>;
160
161pub struct ImageProxy {
163 sockfd: Arc<Mutex<OwnedFd>>,
164 childwait: Arc<AsyncMutex<ChildFuture>>,
165 protover: semver::Version,
166}
167
168impl std::fmt::Debug for ImageProxy {
169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170 f.debug_struct("ImageProxy").finish()
171 }
172}
173
174#[derive(Debug, PartialEq, Eq)]
176pub struct OpenedImage(u32);
177
178#[derive(Debug, PartialEq, Eq)]
179struct PipeId(NonZeroU32);
180
181impl PipeId {
182 fn try_new(pipeid: u32) -> Option<Self> {
183 Some(Self(NonZeroU32::new(pipeid)?))
184 }
185}
186
187#[derive(Debug, Default)]
189pub struct ImageProxyConfig {
190 pub authfile: Option<PathBuf>,
193
194 pub auth_data: Option<File>,
196
197 pub auth_anonymous: bool,
201
202 pub certificate_directory: Option<PathBuf>,
205
206 pub decryption_keys: Option<Vec<String>>,
209
210 pub insecure_skip_tls_verification: Option<bool>,
212
213 pub debug: bool,
217
218 pub skopeo_cmd: Option<Command>,
239}
240
241impl TryFrom<ImageProxyConfig> for Command {
242 type Error = Error;
243
244 fn try_from(config: ImageProxyConfig) -> Result<Self> {
245 let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
246 let mut allocated_fds = RESERVED_FD_RANGE.clone();
247 let mut alloc_fd = || {
248 allocated_fds.next().ok_or_else(|| {
249 Error::Other("Ran out of reserved file descriptors for child".into())
250 })
251 };
252
253 let mut c = config.skopeo_cmd.unwrap_or_else(|| {
255 let mut c = std::process::Command::new("skopeo");
256 unsafe {
257 c.pre_exec(|| {
258 rustix::process::set_parent_process_death_signal(Some(
259 rustix::process::Signal::TERM,
260 ))
261 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
262 });
263 }
264 c
265 });
266 c.arg("experimental-image-proxy");
267 if debug {
268 c.arg("--debug");
269 }
270 let auth_option_count = [
271 config.authfile.is_some(),
272 config.auth_data.is_some(),
273 config.auth_anonymous,
274 ]
275 .into_iter()
276 .filter(|&x| x)
277 .count();
278 if auth_option_count > 1 {
279 return Err(Error::Configuration(
281 "Conflicting authentication options".into(),
282 ));
283 }
284 if let Some(authfile) = config.authfile {
285 c.arg("--authfile");
286 c.arg(authfile);
287 } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
288 let target_fd = alloc_fd()?;
292 let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
293 let mut tempfile =
294 cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
295 std::io::copy(&mut auth_data, &mut tempfile)?;
296 let tempfile = tempfile
297 .into_inner()
298 .map_err(|e| e.into_error())?
299 .into_std();
300 let fd = std::sync::Arc::new(tempfile.into());
301 c.take_fd_n(fd, target_fd);
302 c.arg("--authfile");
303 c.arg(format!("/proc/self/fd/{target_fd}"));
304 } else if config.auth_anonymous {
305 c.arg("--no-creds");
306 }
307
308 if let Some(certificate_directory) = config.certificate_directory {
309 c.arg("--cert-dir");
310 c.arg(certificate_directory);
311 }
312
313 if let Some(decryption_keys) = config.decryption_keys {
314 for decryption_key in &decryption_keys {
315 c.arg("--decryption-key");
316 c.arg(decryption_key);
317 }
318 }
319
320 if config.insecure_skip_tls_verification.unwrap_or_default() {
321 c.arg("--tls-verify=false");
322 }
323 c.stdout(Stdio::null());
324 if !debug {
325 c.stderr(Stdio::piped());
326 }
327 Ok(c)
328 }
329}
330
331#[derive(Debug, serde::Deserialize)]
333pub struct ConvertedLayerInfo {
334 pub digest: Digest,
337
338 pub size: u64,
340
341 pub media_type: oci_spec::image::MediaType,
343}
344
345#[derive(Debug)]
347struct FinishPipe {
348 pipeid: PipeId,
349 datafd: OwnedFd,
350}
351
352#[derive(Debug)]
354struct DualFds {
355 datafd: OwnedFd,
356 errfd: OwnedFd,
357}
358
359trait FromReplyFds: Send + 'static
361where
362 Self: Sized,
363{
364 fn from_reply(
365 iterable: impl IntoIterator<IntoIter: FusedIterator, Item = OwnedFd>,
366 pipeid: u32,
367 ) -> Result<Self>;
368}
369
370impl FromReplyFds for () {
372 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
373 if fds.into_iter().next().is_some() {
374 return Err(Error::Other("expected no fds".into()));
375 }
376 if pipeid != 0 {
377 return Err(Error::Other("unexpected pipeid".into()));
378 }
379 Ok(())
380 }
381}
382
383impl FromReplyFds for FinishPipe {
385 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
386 let Some(pipeid) = PipeId::try_new(pipeid) else {
387 return Err(Error::Other("Expected pipeid for FinishPipe".into()));
388 };
389 let datafd = fds
390 .into_iter()
391 .exactly_one()
392 .map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?;
393 Ok(Self { pipeid, datafd })
394 }
395}
396
397impl FromReplyFds for DualFds {
399 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
400 if pipeid != 0 {
401 return Err(Error::Other("Unexpected pipeid with DualFds".into()));
402 }
403 let [datafd, errfd] = fds
404 .into_iter()
405 .collect_array()
406 .ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?;
407 Ok(Self { datafd, errfd })
408 }
409}
410
411impl ImageProxy {
412 pub async fn new() -> Result<Self> {
414 Self::new_with_config(Default::default()).await
415 }
416
417 #[instrument]
419 pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
420 let mut c = Command::try_from(config)?;
421 let (mysock, theirsock) = rustix::net::socketpair(
422 rustix::net::AddressFamily::UNIX,
423 rustix::net::SocketType::SEQPACKET,
424 rustix::net::SocketFlags::CLOEXEC,
425 None,
426 )?;
427 c.stdin(Stdio::from(theirsock));
428 let child = match c.spawn() {
429 Ok(c) => c,
430 Err(error) => return Err(Error::SkopeoSpawnError(error)),
431 };
432 tracing::debug!("Spawned skopeo pid={:?}", child.id());
433 let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
438 let sockfd = Arc::new(Mutex::new(mysock));
439
440 let mut r = Self {
441 sockfd,
442 childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
443 protover: semver::Version::new(0, 0, 0),
444 };
445
446 let protover: String = r.impl_request("Initialize", [(); 0]).await?;
448 tracing::debug!("Remote protocol version: {protover}");
449 let protover = semver::Version::parse(protover.as_str())?;
450 let supported = base_proto_version();
452 if !supported.matches(&protover) {
453 return Err(Error::ProxyTooOld {
454 requested_version: protover.to_string().into(),
455 found_version: supported.to_string().into(),
456 });
457 }
458 r.protover = protover;
459
460 Ok(r)
461 }
462
463 async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static, F: FromReplyFds>(
465 sockfd: Arc<Mutex<OwnedFd>>,
466 req: Request,
467 ) -> Result<(T, F)> {
468 tracing::trace!("sending request {}", req.method.as_str());
469 let r = tokio::task::spawn_blocking(move || {
471 let sockfd = sockfd.lock().unwrap();
472 let sendbuf = serde_json::to_vec(&req)?;
473 let sockfd = &*sockfd;
474 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
475 drop(sendbuf);
476 let mut buf = [0u8; MAX_MSG_SIZE];
477 let mut cmsg_space: Vec<std::mem::MaybeUninit<u8>> =
478 vec![std::mem::MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(1))];
479 let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(cmsg_space.as_mut_slice());
480 let iov = std::io::IoSliceMut::new(buf.as_mut());
481 let mut iov = [iov];
482 let nread = rustix::net::recvmsg(
483 sockfd,
484 &mut iov,
485 &mut cmsg_buffer,
486 rustix::net::RecvFlags::CMSG_CLOEXEC,
487 )?
488 .bytes;
489 let fdret = cmsg_buffer
490 .drain()
491 .filter_map(|m| match m {
492 rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
493 _ => None,
494 })
495 .flatten();
496 let buf = &buf[..nread];
497 let reply: Reply = serde_json::from_slice(buf)?;
498 if !reply.success {
499 return Err(Error::RequestInitiationFailure {
500 method: req.method.clone().into(),
501 error: reply.error.into(),
502 });
503 }
504 let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?;
505 Ok((serde_json::from_value(reply.value)?, fds))
506 })
507 .await
508 .map_err(|e| Error::Other(e.to_string().into()))??;
509 tracing::trace!("completed request");
510 Ok(r)
511 }
512
513 #[instrument(skip(args))]
516 async fn impl_request_with_fds<
517 T: serde::de::DeserializeOwned + Send + 'static,
518 F: FromReplyFds,
519 >(
520 &self,
521 method: &str,
522 args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
523 ) -> Result<(T, F)> {
524 let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
525 let mut childwait = self.childwait.lock().await;
526 tokio::select! {
527 r = req => { r }
528 r = childwait.as_mut() => {
529 let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
530 let stderr = String::from_utf8_lossy(&r.stderr);
531 Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
532 }
533 }
534 }
535
536 async fn impl_request<T: serde::de::DeserializeOwned + Send + 'static>(
538 &self,
539 method: &str,
540 args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
541 ) -> Result<T> {
542 let (r, ()) = self.impl_request_with_fds(method, args).await?;
543 Ok(r)
544 }
545
546 #[instrument]
547 async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> {
548 tracing::debug!("closing pipe");
549 let (r, ()) = self
550 .impl_request_with_fds("FinishPipe", [pipeid.0.get()])
551 .await?;
552 Ok(r)
553 }
554
555 #[instrument]
556 pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
557 tracing::debug!("opening image");
558 let imgid = self.impl_request("OpenImage", [imgref]).await?;
559 Ok(OpenedImage(imgid))
560 }
561
562 #[instrument]
563 pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
564 tracing::debug!("opening image");
565 let imgid = self.impl_request("OpenImageOptional", [imgref]).await?;
566 if imgid == 0 {
567 Ok(None)
568 } else {
569 Ok(Some(OpenedImage(imgid)))
570 }
571 }
572
573 #[instrument]
574 pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
575 self.impl_request("CloseImage", [img.0]).await
576 }
577
578 async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result<Vec<u8>> {
579 let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
580 let mut fd = tokio::io::BufReader::new(fd);
581 let mut r = Vec::new();
582 let reader = fd.read_to_end(&mut r);
583 let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid));
584 finish?;
585 assert_eq!(nbytes?, r.len());
586 Ok(r)
587 }
588
589 pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
593 let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?;
594 Ok((digest, self.read_finish_pipe(pipefd).await?))
595 }
596
597 pub async fn fetch_manifest(
600 &self,
601 img: &OpenedImage,
602 ) -> Result<(String, oci_spec::image::ImageManifest)> {
603 let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
604 let manifest = serde_json::from_slice(&raw)?;
605 Ok((digest, manifest))
606 }
607
608 pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
611 let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?;
612 self.read_finish_pipe(pipe).await
613 }
614
615 pub async fn fetch_config(
618 &self,
619 img: &OpenedImage,
620 ) -> Result<oci_spec::image::ImageConfiguration> {
621 let raw = self.fetch_config_raw(img).await?;
622 serde_json::from_slice(&raw).map_err(Into::into)
623 }
624
625 #[instrument]
635 pub async fn get_blob(
636 &self,
637 img: &OpenedImage,
638 digest: &Digest,
639 size: u64,
640 ) -> Result<(
641 impl AsyncBufRead + Send + Unpin,
642 impl Future<Output = Result<()>> + Unpin + '_,
643 )> {
644 tracing::debug!("fetching blob");
647 let args: Vec<serde_json::Value> =
648 vec![img.0.into(), digest.to_string().into(), size.into()];
649 let (_bloblen, pipe): (serde_json::Number, FinishPipe) =
654 self.impl_request_with_fds("GetBlob", args).await?;
655 let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
656 let fd = tokio::io::BufReader::new(fd);
657 let finish = Box::pin(self.finish_pipe(pipe.pipeid));
658 Ok((fd, finish))
659 }
660
661 async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> {
662 let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
663 let mut errfd = tokio::io::BufReader::new(fd);
664 let mut buf = Vec::new();
665 errfd
666 .read_to_end(&mut buf)
667 .await
668 .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
669 if buf.is_empty() {
670 return Ok(());
671 }
672 #[derive(Deserialize)]
673 struct RemoteError {
674 code: String,
675 message: String,
676 }
677 let e: RemoteError = serde_json::from_slice(&buf)
678 .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
679 match e.code.as_str() {
680 "EPIPE" => Ok(()),
682 "retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())),
683 _ => Err(GetBlobError::Other(e.message.into_boxed_str())),
684 }
685 }
686
687 #[instrument]
691 pub async fn get_raw_blob(
692 &self,
693 img: &OpenedImage,
694 digest: &Digest,
695 ) -> Result<(
696 Option<u64>,
697 tokio::fs::File,
698 impl Future<Output = std::result::Result<(), GetBlobError>> + Unpin + '_,
699 )> {
700 tracing::debug!("fetching blob");
701 let args: Vec<serde_json::Value> = vec![img.0.into(), digest.to_string().into()];
702 let (bloblen, fds): (i64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?;
703 let bloblen = u64::try_from(bloblen).ok();
705 let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd));
706 let err = Self::read_blob_error(fds.errfd).boxed();
707 Ok((bloblen, fd, err))
708 }
709
710 #[instrument]
712 pub async fn get_descriptor(
713 &self,
714 img: &OpenedImage,
715 descriptor: &Descriptor,
716 ) -> Result<(
717 impl AsyncBufRead + Send + Unpin,
718 impl Future<Output = Result<()>> + Unpin + '_,
719 )> {
720 self.get_blob(img, descriptor.digest(), descriptor.size())
721 .await
722 }
723
724 #[instrument]
726 pub async fn get_layer_info(
727 &self,
728 img: &OpenedImage,
729 ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
730 tracing::debug!("Getting layer info");
731 if layer_info_piped_proto_version().matches(&self.protover) {
732 let ((), pipe) = self
733 .impl_request_with_fds("GetLayerInfoPiped", [img.0])
734 .await?;
735 let buf = self.read_finish_pipe(pipe).await?;
736 return Ok(Some(serde_json::from_slice(&buf)?));
737 }
738 if !layer_info_proto_version().matches(&self.protover) {
739 return Ok(None);
740 }
741 let layers = self.impl_request("GetLayerInfo", [img.0]).await?;
742 Ok(Some(layers))
743 }
744
745 #[instrument]
747 pub async fn finalize(self) -> Result<()> {
748 let _ = &self;
749 let req = Request::new_bare("Shutdown");
750 let sendbuf = serde_json::to_vec(&req)?;
751 let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
753 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
754 drop(sendbuf);
755 tracing::debug!("sent shutdown request");
756 let mut childwait = self.childwait.lock().await;
757 let output = childwait
758 .as_mut()
759 .await
760 .map_err(|e| Error::new_other(e.to_string()))??;
761 if !output.status.success() {
762 let stderr = String::from_utf8_lossy(&output.stderr);
763 return Err(Error::RequestReturned(
764 format!("proxy failed: {}\n{}", output.status, stderr).into(),
765 ));
766 }
767 tracing::debug!("proxy exited successfully");
768 Ok(())
769 }
770}
771
772#[cfg(test)]
773mod tests {
774 use std::io::{BufWriter, Seek, Write};
775 use std::os::fd::{AsRawFd, OwnedFd};
776
777 use super::*;
778 use cap_std_ext::cap_std::fs::Dir;
779 use rustix::fs::{memfd_create, MemfdFlags};
780
781 fn check_skopeo() -> bool {
783 static HAVE_SKOPEO: OnceLock<bool> = OnceLock::new();
784 *HAVE_SKOPEO.get_or_init(|| {
785 Command::new("skopeo")
786 .arg("--help")
787 .stdout(Stdio::null())
788 .stderr(Stdio::null())
789 .status()
790 .is_ok()
791 })
792 }
793
794 fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
795 let d = format!("{:?}", c);
799 for c in contains {
800 assert!(d.contains(c), "{} missing {}", d, c);
801 }
802 for c in not_contains {
803 assert!(!d.contains(c), "{} should not contain {}", d, c);
804 }
805 }
806
807 #[test]
808 fn proxy_configs() {
809 let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();
810
811 let c = Command::try_from(ImageProxyConfig::default()).unwrap();
812 validate(
813 c,
814 &["experimental-image-proxy"],
815 &["--no-creds", "--tls-verify", "--authfile"],
816 );
817
818 let c = Command::try_from(ImageProxyConfig {
819 authfile: Some(PathBuf::from("/path/to/authfile")),
820 ..Default::default()
821 })
822 .unwrap();
823 validate(c, &[r"--authfile", "/path/to/authfile"], &[]);
824
825 let decryption_key_path = "/path/to/decryption_key";
826 let c = Command::try_from(ImageProxyConfig {
827 decryption_keys: Some(vec![decryption_key_path.to_string()]),
828 ..Default::default()
829 })
830 .unwrap();
831 validate(c, &[r"--decryption-key", "/path/to/decryption_key"], &[]);
832
833 let c = Command::try_from(ImageProxyConfig {
834 certificate_directory: Some(PathBuf::from("/path/to/certs")),
835 ..Default::default()
836 })
837 .unwrap();
838 validate(c, &[r"--cert-dir", "/path/to/certs"], &[]);
839
840 let c = Command::try_from(ImageProxyConfig {
841 insecure_skip_tls_verification: Some(true),
842 ..Default::default()
843 })
844 .unwrap();
845 validate(c, &[r"--tls-verify=false"], &[]);
846
847 let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
848 tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
849 tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
850 let c = Command::try_from(ImageProxyConfig {
851 auth_data: Some(tmpf.into_std()),
852 ..Default::default()
853 })
854 .unwrap();
855 validate(c, &["--authfile", "/proc/self/fd/100"], &[]);
856 }
857
858 #[tokio::test]
859 async fn skopeo_not_found() {
860 let mut config = ImageProxyConfig {
861 ..ImageProxyConfig::default()
862 };
863 config.skopeo_cmd = Some(Command::new("no-skopeo"));
864
865 match ImageProxy::new_with_config(config).await {
866 Ok(_) => panic!("Expected an error"),
867 Err(ref e @ Error::SkopeoSpawnError(ref inner)) => {
868 assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
869 assert!(e
871 .to_string()
872 .contains("skopeo spawn error: No such file or directory"));
873 }
874 Err(e) => panic!("Unexpected error {e}"),
875 }
876 }
877
878 #[tokio::test]
879 async fn test_proxy_send_sync() {
880 fn assert_send_sync(_x: impl Send + Sync) {}
881
882 let Ok(proxy) = ImageProxy::new().await else {
883 return;
885 };
886 assert_send_sync(&proxy);
887 assert_send_sync(proxy);
888
889 let opened = OpenedImage(0);
890 assert_send_sync(&opened);
891 assert_send_sync(opened);
892 }
893
894 fn generate_err_fd(v: serde_json::Value) -> Result<OwnedFd> {
895 let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
896 let mut tf = cap_tempfile::TempFile::new_anonymous(&tmp).map(BufWriter::new)?;
897 serde_json::to_writer(&mut tf, &v)?;
898 let mut tf = tf.into_inner().map_err(|e| e.into_error())?;
899 tf.seek(std::io::SeekFrom::Start(0))?;
900 let r = tf.into_std().into();
901 Ok(r)
902 }
903
904 #[tokio::test]
905 async fn test_read_blob_error_retryable() -> Result<()> {
906 let retryable = serde_json::json!({
907 "code": "retryable",
908 "message": "foo",
909 });
910 let retryable = generate_err_fd(retryable)?;
911 let err = ImageProxy::read_blob_error(retryable).boxed();
912 let e = err.await.unwrap_err();
913 match e {
914 GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"),
915 _ => panic!("Unexpected error {e:?}"),
916 }
917 Ok(())
918 }
919
920 #[tokio::test]
921 async fn test_read_blob_error_none() -> Result<()> {
922 let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
923 let tf = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std();
924 let err = ImageProxy::read_blob_error(tf.into()).boxed();
925 err.await.unwrap();
926 Ok(())
927 }
928
929 #[tokio::test]
930 async fn test_read_blob_error_other() -> Result<()> {
931 let other = serde_json::json!({
932 "code": "other",
933 "message": "bar",
934 });
935 let other = generate_err_fd(other)?;
936 let err = ImageProxy::read_blob_error(other).boxed();
937 let e = err.await.unwrap_err();
938 match e {
939 GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"),
940 _ => panic!("Unexpected error {e:?}"),
941 }
942 Ok(())
943 }
944
945 #[tokio::test]
946 async fn test_read_blob_error_epipe() -> Result<()> {
947 let epipe = serde_json::json!({
948 "code": "EPIPE",
949 "message": "baz",
950 });
951 let epipe = generate_err_fd(epipe)?;
952 let err = ImageProxy::read_blob_error(epipe).boxed();
953 err.await.unwrap();
954 Ok(())
955 }
956
957 fn create_dummy_fd() -> OwnedFd {
959 memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()
960 }
961
962 #[test]
963 fn test_new_from_raw_values_finish_pipe() {
964 let datafd = create_dummy_fd();
965 let raw_datafd_val = datafd.as_raw_fd();
967 let fds = vec![datafd];
968 let v = FinishPipe::from_reply(fds, 1).unwrap();
969 assert_eq!(v.pipeid.0.get(), 1);
970 assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
971 }
972
973 #[test]
974 fn test_new_from_raw_values_dual_fds() {
975 let datafd = create_dummy_fd();
976 let errfd = create_dummy_fd();
977 let raw_datafd_val = datafd.as_raw_fd();
978 let raw_errfd_val = errfd.as_raw_fd();
979 let fds = vec![datafd, errfd];
980 let v = DualFds::from_reply(fds, 0).unwrap();
981 assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
982 assert_eq!(v.errfd.as_raw_fd(), raw_errfd_val);
983 }
984
985 #[test]
986 fn test_new_from_raw_values_error_too_many_fds() {
987 let fds = vec![create_dummy_fd(), create_dummy_fd(), create_dummy_fd()];
988 match DualFds::from_reply(fds, 0) {
989 Ok(v) => unreachable!("{v:?}"),
990 Err(Error::Other(msg)) => {
991 assert_eq!(msg.as_ref(), "Expected two fds for DualFds")
992 }
993 Err(other) => unreachable!("{other}"),
994 }
995 }
996
997 #[test]
998 fn test_new_from_raw_values_error_fd_with_zero_pipeid() {
999 let fds = vec![create_dummy_fd()];
1000 match FinishPipe::from_reply(fds, 0) {
1001 Ok(v) => unreachable!("{v:?}"),
1002 Err(Error::Other(msg)) => {
1003 assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe")
1004 }
1005 Err(other) => unreachable!("{other}"),
1006 }
1007 }
1008
1009 #[test]
1010 fn test_new_from_raw_values_error_pipeid_with_both_fds() {
1011 let fds = vec![create_dummy_fd(), create_dummy_fd()];
1012 match DualFds::from_reply(fds, 1) {
1013 Ok(v) => unreachable!("{v:?}"),
1014 Err(Error::Other(msg)) => {
1015 assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds")
1016 }
1017 Err(other) => unreachable!("{other}"),
1018 }
1019 }
1020
1021 #[test]
1022 fn test_new_from_raw_values_error_no_fd_with_pipeid() {
1023 let fds: Vec<OwnedFd> = vec![];
1024 match FinishPipe::from_reply(fds, 1) {
1025 Ok(v) => unreachable!("{v:?}"),
1026 Err(Error::Other(msg)) => {
1027 assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe")
1028 }
1029 Err(other) => unreachable!("{other}"),
1030 }
1031 }
1032
1033 #[tokio::test]
1034 #[ignore = "https://github.com/coreos/rpm-ostree/issues/5442"]
1035 async fn test_open_optional() -> Result<()> {
1036 if !check_skopeo() {
1037 return Ok(());
1038 }
1039
1040 let td = tempfile::tempdir()?;
1041 let td = td.path().to_str().unwrap();
1042 let proxy = ImageProxy::new().await?;
1043 let imgpath = format!("oci-archive:{td}/some-nonexistent-image.ociarchive");
1044 let img = proxy.open_image_optional(&imgpath).await.unwrap();
1045 assert!(img.is_none());
1046
1047 Ok(())
1048 }
1049}