1pub mod transport;
8
9pub use ::oci_spec;
13pub use transport::{
14 ContainersStorageRef, ImageReference, ImageReferenceError, Transport, TransportConversionError,
15};
16
17use ::oci_spec::image::{Descriptor, Digest};
18use cap_std_ext::prelude::CapStdExtCommandExt;
19use cap_std_ext::{cap_std, cap_tempfile};
20use futures_util::{Future, FutureExt};
21use itertools::Itertools;
22use serde::{Deserialize, Serialize};
23use std::fs::File;
24use std::iter::FusedIterator;
25use std::num::NonZeroU32;
26use std::ops::Range;
27use std::os::fd::OwnedFd;
28use std::os::unix::prelude::CommandExt;
29use std::path::PathBuf;
30use std::pin::Pin;
31use std::process::{Command, Stdio};
32use std::sync::{Arc, Mutex, OnceLock};
33use thiserror::Error;
34use tokio::io::{AsyncBufRead, AsyncReadExt};
35use tokio::sync::Mutex as AsyncMutex;
36use tokio::task::JoinError;
37use tracing::instrument;
38
/// Errors that can be produced while configuring, spawning, or talking to the
/// skopeo image proxy.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
    /// An underlying I/O operation failed.
    #[error("i/o error: {0}")]
    Io(#[from] std::io::Error),
    /// The skopeo child process could not be spawned.
    #[error("skopeo spawn error: {0}")]
    SkopeoSpawnError(#[source] std::io::Error),
    /// JSON serialization or deserialization failed.
    #[error("serialization error: {0}")]
    SerDe(#[from] serde_json::Error),
    /// The proxy answered a request with an unsuccessful reply; `method` is the
    /// request that was sent and `error` is the message carried by the reply.
    #[error("failed to invoke method {method}: {error}")]
    RequestInitiationFailure { method: Box<str>, error: Box<str> },
    /// The proxy reported a failure after a request had been issued.
    #[error("proxy request returned error: {0}")]
    RequestReturned(Box<str>),
    /// A version string could not be parsed as a semantic version.
    #[error("semantic version error: {0}")]
    SemanticVersion(#[from] semver::Error),
    /// The protocol version spoken by the proxy does not satisfy the version
    /// this library requires.
    #[error("proxy too old (requested={requested_version} found={found_version}) error")]
    ProxyTooOld {
        /// The version that was requested.
        requested_version: Box<str>,
        /// The version that was found.
        found_version: Box<str>,
    },
    /// The supplied configuration is invalid.
    #[error("configuration error: {0}")]
    Configuration(Box<str>),
    /// Any other failure, described by a free-form message.
    #[error("other error: {0}")]
    Other(Box<str>),
}
73
74impl Error {
75 pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
76 Self::Other(e.into())
77 }
78}
79
/// Errors reported on the error channel of a raw blob fetch.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum GetBlobError {
    /// The remote side classified the failure as retryable.
    #[error("retryable error: {0}")]
    Retryable(Box<str>),
    /// Any other failure, including problems reading or decoding the error
    /// channel itself.
    #[error("other error: {0}")]
    Other(Box<str>),
}
91
92impl From<rustix::io::Errno> for Error {
93 fn from(value: rustix::io::Errno) -> Self {
94 Self::Io(value.into())
95 }
96}
97
/// Convenience alias for results whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

/// Range of file descriptor numbers from which descriptors handed to the
/// child process are allocated.
pub const RESERVED_FD_RANGE: Range<i32> = 100..200;

/// Size in bytes of the buffer used to receive a single reply message.
const MAX_MSG_SIZE: usize = 32 * 1024;
109
110fn base_proto_version() -> &'static semver::VersionReq {
111 static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
113 BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
114}
115
116fn layer_info_proto_version() -> &'static semver::VersionReq {
117 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
118 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
119}
120
121fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
122 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
123 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
124}
125
/// A request sent to the proxy, serialized as JSON.
#[derive(Serialize)]
struct Request {
    /// Name of the method to invoke.
    method: String,
    /// Positional arguments for the method.
    args: Vec<serde_json::Value>,
}
131
132impl Request {
133 fn new<T, I>(method: &str, args: T) -> Self
134 where
135 T: IntoIterator<Item = I>,
136 I: Into<serde_json::Value>,
137 {
138 let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
139 Self {
140 method: method.to_string(),
141 args,
142 }
143 }
144
145 fn new_bare(method: &str) -> Self {
146 Self {
147 method: method.to_string(),
148 args: vec![],
149 }
150 }
151}
152
/// A reply received from the proxy, deserialized from JSON.
#[derive(Deserialize)]
struct Reply {
    /// Whether the request succeeded.
    success: bool,
    /// Error message; reported to the caller when `success` is false.
    error: String,
    /// Pipe identifier attached to the reply; zero is treated as "no pipe".
    pipeid: u32,
    /// Method-specific return value.
    value: serde_json::Value,
}
160
/// Boxed future that resolves to the collected output of the child process,
/// wrapped in the join result of the task that waits for it.
type ChildFuture = Pin<
    Box<
        dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
            + Send,
    >,
>;
167
/// Handle to a running proxy child process.
pub struct ImageProxy {
    /// Our end of the socket pair shared with the child; the mutex is held
    /// for the duration of each request/reply exchange.
    sockfd: Arc<Mutex<OwnedFd>>,
    /// Future resolving to the child's exit status and captured output.
    childwait: Arc<AsyncMutex<ChildFuture>>,
    /// Protocol version reported by the proxy in reply to `Initialize`.
    protover: semver::Version,
}
174
175impl std::fmt::Debug for ImageProxy {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 f.debug_struct("ImageProxy").finish()
178 }
179}
180
/// Identifier of an image opened through the proxy, as returned by the
/// `OpenImage` / `OpenImageOptional` requests.
#[derive(Debug, PartialEq, Eq)]
pub struct OpenedImage(u32);
184
/// Non-zero identifier of a pipe that must later be closed with `FinishPipe`.
#[derive(Debug, PartialEq, Eq)]
struct PipeId(NonZeroU32);

impl PipeId {
    /// Wraps `pipeid`, returning `None` when it is zero.
    fn try_new(pipeid: u32) -> Option<Self> {
        NonZeroU32::new(pipeid).map(Self)
    }
}
193
/// Configuration for the proxy child process.
///
/// At most one of `authfile`, `auth_data` and `auth_anonymous` may be set;
/// converting a configuration with more than one into a command fails.
#[derive(Debug, Default)]
pub struct ImageProxyConfig {
    /// Path passed to the proxy via `--authfile`.
    pub authfile: Option<PathBuf>,

    /// Open file whose contents are copied into an anonymous temporary file
    /// and handed to the proxy as its `--authfile`.
    pub auth_data: Option<File>,

    /// When true, `--no-creds` is passed to the proxy.
    pub auth_anonymous: bool,

    /// Directory passed to the proxy via `--cert-dir`.
    pub certificate_directory: Option<PathBuf>,

    /// Keys passed to the proxy, one `--decryption-key` argument each.
    pub decryption_keys: Option<Vec<String>>,

    /// When `Some(true)`, `--tls-verify=false` is passed to the proxy.
    pub insecure_skip_tls_verification: Option<bool>,

    /// Value passed via `--user-agent-prefix`, but only when the installed
    /// skopeo advertises that option.
    pub user_agent_prefix: Option<String>,

    /// When true (or when the `CONTAINERS_IMAGE_PROXY_DEBUG` environment
    /// variable is set), `--debug` is passed and stderr is not captured.
    pub debug: bool,

    /// Base command to use instead of the default `skopeo` invocation; the
    /// proxy subcommand and options are appended to it.
    pub skopeo_cmd: Option<Command>,
}
252
/// Reports whether the output of `skopeo --help` mentions the
/// `--user-agent-prefix` option.
///
/// The probe runs at most once per process; the answer is cached. Any failure
/// to run skopeo or to decode its output as UTF-8 counts as "not supported".
fn supports_user_agent_prefix() -> bool {
    static CACHED_ANSWER: OnceLock<bool> = OnceLock::new();
    *CACHED_ANSWER.get_or_init(|| {
        let probe = Command::new("skopeo")
            .arg("--help")
            .stdout(Stdio::piped())
            .stderr(Stdio::null())
            .output();
        let Ok(output) = probe else {
            return false;
        };
        match String::from_utf8(output.stdout) {
            Ok(help) => help.contains("--user-agent-prefix"),
            Err(_) => false,
        }
    })
}
271
272impl TryFrom<ImageProxyConfig> for Command {
273 type Error = Error;
274
275 fn try_from(config: ImageProxyConfig) -> Result<Self> {
276 let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
277 let mut allocated_fds = RESERVED_FD_RANGE.clone();
278 let mut alloc_fd = || {
279 allocated_fds.next().ok_or_else(|| {
280 Error::Other("Ran out of reserved file descriptors for child".into())
281 })
282 };
283
284 let mut c = config.skopeo_cmd.unwrap_or_else(|| {
286 let mut c = std::process::Command::new("skopeo");
287 unsafe {
288 c.pre_exec(|| {
289 Ok(rustix::process::set_parent_process_death_signal(Some(
290 rustix::process::Signal::TERM,
291 ))?)
292 });
293 }
294 c
295 });
296 c.arg("experimental-image-proxy");
297 if debug {
298 c.arg("--debug");
299 }
300 let auth_option_count = [
301 config.authfile.is_some(),
302 config.auth_data.is_some(),
303 config.auth_anonymous,
304 ]
305 .into_iter()
306 .filter(|&x| x)
307 .count();
308 if auth_option_count > 1 {
309 return Err(Error::Configuration(
311 "Conflicting authentication options".into(),
312 ));
313 }
314 if let Some(authfile) = config.authfile {
315 c.arg("--authfile");
316 c.arg(authfile);
317 } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
318 let target_fd = alloc_fd()?;
322 let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
323 let mut tempfile =
324 cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
325 std::io::copy(&mut auth_data, &mut tempfile)?;
326 let tempfile = tempfile
327 .into_inner()
328 .map_err(|e| e.into_error())?
329 .into_std();
330 let fd = std::sync::Arc::new(tempfile.into());
331 c.take_fd_n(fd, target_fd);
332 c.arg("--authfile");
333 c.arg(format!("/proc/self/fd/{target_fd}"));
334 } else if config.auth_anonymous {
335 c.arg("--no-creds");
336 }
337
338 if let Some(certificate_directory) = config.certificate_directory {
339 c.arg("--cert-dir");
340 c.arg(certificate_directory);
341 }
342
343 if let Some(decryption_keys) = config.decryption_keys {
344 for decryption_key in &decryption_keys {
345 c.arg("--decryption-key");
346 c.arg(decryption_key);
347 }
348 }
349
350 if config.insecure_skip_tls_verification.unwrap_or_default() {
351 c.arg("--tls-verify=false");
352 }
353
354 if let Some(user_agent_prefix) = config.user_agent_prefix {
356 if supports_user_agent_prefix() {
357 c.arg("--user-agent-prefix");
358 c.arg(user_agent_prefix);
359 }
360 }
361
362 c.stdout(Stdio::null());
363 if !debug {
364 c.stderr(Stdio::piped());
365 }
366 Ok(c)
367 }
368}
369
/// Description of one image layer as reported by the proxy's layer-info
/// requests.
#[derive(Debug, serde::Deserialize)]
pub struct ConvertedLayerInfo {
    /// Digest of the layer.
    pub digest: Digest,

    /// Size of the layer.
    pub size: u64,

    /// Media type of the layer.
    pub media_type: oci_spec::image::MediaType,
}
383
/// Reply shape consisting of one data descriptor plus a non-zero pipe id that
/// must later be passed to the `FinishPipe` request.
#[derive(Debug)]
struct FinishPipe {
    /// Identifier to send back with `FinishPipe`.
    pipeid: PipeId,
    /// Descriptor from which the payload is read.
    datafd: OwnedFd,
}
390
/// Reply shape consisting of two descriptors and no pipe id.
#[derive(Debug)]
struct DualFds {
    /// Descriptor from which the payload is read.
    datafd: OwnedFd,
    /// Descriptor from which an error report, if any, is read.
    errfd: OwnedFd,
}
397
/// Conversion from the file descriptors and pipe id attached to a proxy reply
/// into a typed value; implementations reject replies of the wrong shape.
trait FromReplyFds: Send + 'static
where
    Self: Sized,
{
    /// Builds `Self` from the descriptors received with a reply and the
    /// reply's `pipeid` field, or returns an error if they do not match the
    /// shape this type expects.
    fn from_reply(
        iterable: impl IntoIterator<IntoIter: FusedIterator, Item = OwnedFd>,
        pipeid: u32,
    ) -> Result<Self>;
}
408
409impl FromReplyFds for () {
411 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
412 if fds.into_iter().next().is_some() {
413 return Err(Error::Other("expected no fds".into()));
414 }
415 if pipeid != 0 {
416 return Err(Error::Other("unexpected pipeid".into()));
417 }
418 Ok(())
419 }
420}
421
422impl FromReplyFds for FinishPipe {
424 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
425 let Some(pipeid) = PipeId::try_new(pipeid) else {
426 return Err(Error::Other("Expected pipeid for FinishPipe".into()));
427 };
428 let datafd = fds
429 .into_iter()
430 .exactly_one()
431 .map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?;
432 Ok(Self { pipeid, datafd })
433 }
434}
435
436impl FromReplyFds for DualFds {
438 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
439 if pipeid != 0 {
440 return Err(Error::Other("Unexpected pipeid with DualFds".into()));
441 }
442 let [datafd, errfd] = fds
443 .into_iter()
444 .collect_array()
445 .ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?;
446 Ok(Self { datafd, errfd })
447 }
448}
449
450impl ImageProxy {
451 pub async fn new() -> Result<Self> {
453 Self::new_with_config(Default::default()).await
454 }
455
456 #[instrument]
458 pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
459 let mut c = Command::try_from(config)?;
460 let (mysock, theirsock) = rustix::net::socketpair(
461 rustix::net::AddressFamily::UNIX,
462 rustix::net::SocketType::SEQPACKET,
463 rustix::net::SocketFlags::CLOEXEC,
464 None,
465 )?;
466 c.stdin(Stdio::from(theirsock));
467 let child = match c.spawn() {
468 Ok(c) => c,
469 Err(error) => return Err(Error::SkopeoSpawnError(error)),
470 };
471 tracing::debug!("Spawned skopeo pid={:?}", child.id());
472 let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
477 let sockfd = Arc::new(Mutex::new(mysock));
478
479 let mut r = Self {
480 sockfd,
481 childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
482 protover: semver::Version::new(0, 0, 0),
483 };
484
485 let protover: String = r.impl_request("Initialize", [(); 0]).await?;
487 tracing::debug!("Remote protocol version: {protover}");
488 let protover = semver::Version::parse(protover.as_str())?;
489 let supported = base_proto_version();
491 if !supported.matches(&protover) {
492 return Err(Error::ProxyTooOld {
493 requested_version: protover.to_string().into(),
494 found_version: supported.to_string().into(),
495 });
496 }
497 r.protover = protover;
498
499 Ok(r)
500 }
501
502 async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static, F: FromReplyFds>(
504 sockfd: Arc<Mutex<OwnedFd>>,
505 req: Request,
506 ) -> Result<(T, F)> {
507 tracing::trace!("sending request {}", req.method.as_str());
508 let r = tokio::task::spawn_blocking(move || {
510 let sockfd = sockfd.lock().unwrap();
511 let sendbuf = serde_json::to_vec(&req)?;
512 let sockfd = &*sockfd;
513 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
514 drop(sendbuf);
515 let mut buf = [0u8; MAX_MSG_SIZE];
516 let mut cmsg_space: Vec<std::mem::MaybeUninit<u8>> =
517 vec![std::mem::MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(1))];
518 let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(cmsg_space.as_mut_slice());
519 let iov = std::io::IoSliceMut::new(buf.as_mut());
520 let mut iov = [iov];
521 let nread = rustix::net::recvmsg(
522 sockfd,
523 &mut iov,
524 &mut cmsg_buffer,
525 rustix::net::RecvFlags::CMSG_CLOEXEC,
526 )?
527 .bytes;
528 let fdret = cmsg_buffer
529 .drain()
530 .filter_map(|m| match m {
531 rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
532 _ => None,
533 })
534 .flatten();
535 let buf = &buf[..nread];
536 let reply: Reply = serde_json::from_slice(buf)?;
537 if !reply.success {
538 return Err(Error::RequestInitiationFailure {
539 method: req.method.clone().into(),
540 error: reply.error.into(),
541 });
542 }
543 let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?;
544 Ok((serde_json::from_value(reply.value)?, fds))
545 })
546 .await
547 .map_err(|e| Error::Other(e.to_string().into()))??;
548 tracing::trace!("completed request");
549 Ok(r)
550 }
551
552 #[instrument(skip(args))]
555 async fn impl_request_with_fds<
556 T: serde::de::DeserializeOwned + Send + 'static,
557 F: FromReplyFds,
558 >(
559 &self,
560 method: &str,
561 args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
562 ) -> Result<(T, F)> {
563 let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
564 let mut childwait = self.childwait.lock().await;
565 tokio::select! {
566 r = req => { r }
567 r = childwait.as_mut() => {
568 let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
569 let stderr = String::from_utf8_lossy(&r.stderr);
570 Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
571 }
572 }
573 }
574
575 async fn impl_request<T: serde::de::DeserializeOwned + Send + 'static>(
577 &self,
578 method: &str,
579 args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
580 ) -> Result<T> {
581 let (r, ()) = self.impl_request_with_fds(method, args).await?;
582 Ok(r)
583 }
584
585 #[instrument]
586 async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> {
587 tracing::debug!("closing pipe");
588 let (r, ()) = self
589 .impl_request_with_fds("FinishPipe", [pipeid.0.get()])
590 .await?;
591 Ok(r)
592 }
593
594 #[instrument]
596 pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
597 tracing::debug!("opening image");
598 let imgid = self.impl_request("OpenImage", [imgref]).await?;
599 Ok(OpenedImage(imgid))
600 }
601
602 #[instrument]
604 pub async fn open_image_ref(&self, imgref: &ImageReference) -> Result<OpenedImage> {
605 self.open_image(&imgref.to_string()).await
606 }
607
608 #[instrument]
611 pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
612 tracing::debug!("opening image");
613 let imgid = self.impl_request("OpenImageOptional", [imgref]).await?;
614 if imgid == 0 {
615 Ok(None)
616 } else {
617 Ok(Some(OpenedImage(imgid)))
618 }
619 }
620
621 #[instrument]
624 pub async fn open_image_optional_ref(
625 &self,
626 imgref: &ImageReference,
627 ) -> Result<Option<OpenedImage>> {
628 self.open_image_optional(&imgref.to_string()).await
629 }
630
631 #[instrument]
632 pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
633 self.impl_request("CloseImage", [img.0]).await
634 }
635
636 async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result<Vec<u8>> {
637 let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
638 let mut fd = tokio::io::BufReader::new(fd);
639 let mut r = Vec::new();
640 let reader = fd.read_to_end(&mut r);
641 let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid));
642 finish?;
643 assert_eq!(nbytes?, r.len());
644 Ok(r)
645 }
646
647 pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
651 let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?;
652 Ok((digest, self.read_finish_pipe(pipefd).await?))
653 }
654
655 pub async fn fetch_manifest(
658 &self,
659 img: &OpenedImage,
660 ) -> Result<(String, oci_spec::image::ImageManifest)> {
661 let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
662 let manifest = serde_json::from_slice(&raw)?;
663 Ok((digest, manifest))
664 }
665
666 pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
669 let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?;
670 self.read_finish_pipe(pipe).await
671 }
672
673 pub async fn fetch_config(
676 &self,
677 img: &OpenedImage,
678 ) -> Result<oci_spec::image::ImageConfiguration> {
679 let raw = self.fetch_config_raw(img).await?;
680 serde_json::from_slice(&raw).map_err(Into::into)
681 }
682
683 #[instrument]
693 pub async fn get_blob(
694 &self,
695 img: &OpenedImage,
696 digest: &Digest,
697 size: u64,
698 ) -> Result<(
699 impl AsyncBufRead + Send + Unpin,
700 impl Future<Output = Result<()>> + Unpin + '_,
701 )> {
702 tracing::debug!("fetching blob");
705 let args: Vec<serde_json::Value> =
706 vec![img.0.into(), digest.to_string().into(), size.into()];
707 let (_bloblen, pipe): (serde_json::Number, FinishPipe) =
712 self.impl_request_with_fds("GetBlob", args).await?;
713 let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
714 let fd = tokio::io::BufReader::new(fd);
715 let finish = Box::pin(self.finish_pipe(pipe.pipeid));
716 Ok((fd, finish))
717 }
718
719 async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> {
720 let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
721 let mut errfd = tokio::io::BufReader::new(fd);
722 let mut buf = Vec::new();
723 errfd
724 .read_to_end(&mut buf)
725 .await
726 .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
727 if buf.is_empty() {
728 return Ok(());
729 }
730 #[derive(Deserialize)]
731 struct RemoteError {
732 code: String,
733 message: String,
734 }
735 let e: RemoteError = serde_json::from_slice(&buf)
736 .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
737 match e.code.as_str() {
738 "EPIPE" => Ok(()),
740 "retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())),
741 _ => Err(GetBlobError::Other(e.message.into_boxed_str())),
742 }
743 }
744
745 #[instrument]
749 pub async fn get_raw_blob(
750 &self,
751 img: &OpenedImage,
752 digest: &Digest,
753 ) -> Result<(
754 Option<u64>,
755 tokio::fs::File,
756 impl Future<Output = std::result::Result<(), GetBlobError>> + Unpin + '_,
757 )> {
758 tracing::debug!("fetching blob");
759 let args: Vec<serde_json::Value> = vec![img.0.into(), digest.to_string().into()];
760 let (bloblen, fds): (i64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?;
761 let bloblen = u64::try_from(bloblen).ok();
763 let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd));
764 let err = Self::read_blob_error(fds.errfd).boxed();
765 Ok((bloblen, fd, err))
766 }
767
768 #[instrument]
770 pub async fn get_descriptor(
771 &self,
772 img: &OpenedImage,
773 descriptor: &Descriptor,
774 ) -> Result<(
775 impl AsyncBufRead + Send + Unpin,
776 impl Future<Output = Result<()>> + Unpin + '_,
777 )> {
778 self.get_blob(img, descriptor.digest(), descriptor.size())
779 .await
780 }
781
782 #[instrument]
784 pub async fn get_layer_info(
785 &self,
786 img: &OpenedImage,
787 ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
788 tracing::debug!("Getting layer info");
789 if layer_info_piped_proto_version().matches(&self.protover) {
790 let ((), pipe) = self
791 .impl_request_with_fds("GetLayerInfoPiped", [img.0])
792 .await?;
793 let buf = self.read_finish_pipe(pipe).await?;
794 return Ok(Some(serde_json::from_slice(&buf)?));
795 }
796 if !layer_info_proto_version().matches(&self.protover) {
797 return Ok(None);
798 }
799 let layers = self.impl_request("GetLayerInfo", [img.0]).await?;
800 Ok(Some(layers))
801 }
802
803 #[instrument]
805 pub async fn finalize(self) -> Result<()> {
806 let _ = &self;
807 let req = Request::new_bare("Shutdown");
808 let sendbuf = serde_json::to_vec(&req)?;
809 let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
811 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
812 drop(sendbuf);
813 tracing::debug!("sent shutdown request");
814 let mut childwait = self.childwait.lock().await;
815 let output = childwait
816 .as_mut()
817 .await
818 .map_err(|e| Error::new_other(e.to_string()))??;
819 if !output.status.success() {
820 let stderr = String::from_utf8_lossy(&output.stderr);
821 return Err(Error::RequestReturned(
822 format!("proxy failed: {}\n{}", output.status, stderr).into(),
823 ));
824 }
825 tracing::debug!("proxy exited successfully");
826 Ok(())
827 }
828}
829
830#[cfg(test)]
831mod tests {
832 use std::io::{BufWriter, Seek, Write};
833 use std::os::fd::{AsRawFd, OwnedFd};
834
835 use super::*;
836 use cap_std_ext::cap_std::fs::Dir;
837 use rustix::fs::{memfd_create, MemfdFlags};
838
839 fn check_skopeo() -> bool {
841 static HAVE_SKOPEO: OnceLock<bool> = OnceLock::new();
842 *HAVE_SKOPEO.get_or_init(|| {
843 Command::new("skopeo")
844 .arg("--help")
845 .stdout(Stdio::null())
846 .stderr(Stdio::null())
847 .status()
848 .is_ok()
849 })
850 }
851
852 fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
853 let d = format!("{:?}", c);
857 for c in contains {
858 assert!(d.contains(c), "{} missing {}", d, c);
859 }
860 for c in not_contains {
861 assert!(!d.contains(c), "{} should not contain {}", d, c);
862 }
863 }
864
865 #[test]
866 fn proxy_configs() {
867 let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();
868
869 let c = Command::try_from(ImageProxyConfig::default()).unwrap();
870 validate(
871 c,
872 &["experimental-image-proxy"],
873 &["--no-creds", "--tls-verify", "--authfile"],
874 );
875
876 let c = Command::try_from(ImageProxyConfig {
877 authfile: Some(PathBuf::from("/path/to/authfile")),
878 ..Default::default()
879 })
880 .unwrap();
881 validate(c, &[r"--authfile", "/path/to/authfile"], &[]);
882
883 let decryption_key_path = "/path/to/decryption_key";
884 let c = Command::try_from(ImageProxyConfig {
885 decryption_keys: Some(vec![decryption_key_path.to_string()]),
886 ..Default::default()
887 })
888 .unwrap();
889 validate(c, &[r"--decryption-key", "/path/to/decryption_key"], &[]);
890
891 let c = Command::try_from(ImageProxyConfig {
892 certificate_directory: Some(PathBuf::from("/path/to/certs")),
893 ..Default::default()
894 })
895 .unwrap();
896 validate(c, &[r"--cert-dir", "/path/to/certs"], &[]);
897
898 let c = Command::try_from(ImageProxyConfig {
899 insecure_skip_tls_verification: Some(true),
900 ..Default::default()
901 })
902 .unwrap();
903 validate(c, &[r"--tls-verify=false"], &[]);
904
905 let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
906 tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
907 tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
908 let c = Command::try_from(ImageProxyConfig {
909 auth_data: Some(tmpf.into_std()),
910 ..Default::default()
911 })
912 .unwrap();
913 validate(c, &["--authfile", "/proc/self/fd/100"], &[]);
914
915 let c = Command::try_from(ImageProxyConfig {
917 user_agent_prefix: Some("bootc/1.0".to_string()),
918 ..Default::default()
919 })
920 .unwrap();
921 if supports_user_agent_prefix() {
922 validate(c, &["--user-agent-prefix", "bootc/1.0"], &[]);
923 } else {
924 validate(c, &[], &["--user-agent-prefix"]);
925 }
926 }
927
928 #[tokio::test]
929 async fn skopeo_not_found() {
930 let mut config = ImageProxyConfig {
931 ..ImageProxyConfig::default()
932 };
933 config.skopeo_cmd = Some(Command::new("no-skopeo"));
934
935 match ImageProxy::new_with_config(config).await {
936 Ok(_) => panic!("Expected an error"),
937 Err(ref e @ Error::SkopeoSpawnError(ref inner)) => {
938 assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
939 assert!(e
941 .to_string()
942 .contains("skopeo spawn error: No such file or directory"));
943 }
944 Err(e) => panic!("Unexpected error {e}"),
945 }
946 }
947
948 #[tokio::test]
949 async fn test_proxy_send_sync() {
950 fn assert_send_sync(_x: impl Send + Sync) {}
951
952 let Ok(proxy) = ImageProxy::new().await else {
953 return;
955 };
956 assert_send_sync(&proxy);
957 assert_send_sync(proxy);
958
959 let opened = OpenedImage(0);
960 assert_send_sync(&opened);
961 assert_send_sync(opened);
962 }
963
964 fn generate_err_fd(v: serde_json::Value) -> Result<OwnedFd> {
965 let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
966 let mut tf = cap_tempfile::TempFile::new_anonymous(&tmp).map(BufWriter::new)?;
967 serde_json::to_writer(&mut tf, &v)?;
968 let mut tf = tf.into_inner().map_err(|e| e.into_error())?;
969 tf.seek(std::io::SeekFrom::Start(0))?;
970 let r = tf.into_std().into();
971 Ok(r)
972 }
973
974 #[tokio::test]
975 async fn test_read_blob_error_retryable() -> Result<()> {
976 let retryable = serde_json::json!({
977 "code": "retryable",
978 "message": "foo",
979 });
980 let retryable = generate_err_fd(retryable)?;
981 let err = ImageProxy::read_blob_error(retryable).boxed();
982 let e = err.await.unwrap_err();
983 match e {
984 GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"),
985 _ => panic!("Unexpected error {e:?}"),
986 }
987 Ok(())
988 }
989
990 #[tokio::test]
991 async fn test_read_blob_error_none() -> Result<()> {
992 let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
993 let tf = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std();
994 let err = ImageProxy::read_blob_error(tf.into()).boxed();
995 err.await.unwrap();
996 Ok(())
997 }
998
999 #[tokio::test]
1000 async fn test_read_blob_error_other() -> Result<()> {
1001 let other = serde_json::json!({
1002 "code": "other",
1003 "message": "bar",
1004 });
1005 let other = generate_err_fd(other)?;
1006 let err = ImageProxy::read_blob_error(other).boxed();
1007 let e = err.await.unwrap_err();
1008 match e {
1009 GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"),
1010 _ => panic!("Unexpected error {e:?}"),
1011 }
1012 Ok(())
1013 }
1014
1015 #[tokio::test]
1016 async fn test_read_blob_error_epipe() -> Result<()> {
1017 let epipe = serde_json::json!({
1018 "code": "EPIPE",
1019 "message": "baz",
1020 });
1021 let epipe = generate_err_fd(epipe)?;
1022 let err = ImageProxy::read_blob_error(epipe).boxed();
1023 err.await.unwrap();
1024 Ok(())
1025 }
1026
1027 fn create_dummy_fd() -> OwnedFd {
1029 memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()
1030 }
1031
1032 #[test]
1033 fn test_new_from_raw_values_finish_pipe() {
1034 let datafd = create_dummy_fd();
1035 let raw_datafd_val = datafd.as_raw_fd();
1037 let fds = vec![datafd];
1038 let v = FinishPipe::from_reply(fds, 1).unwrap();
1039 assert_eq!(v.pipeid.0.get(), 1);
1040 assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
1041 }
1042
1043 #[test]
1044 fn test_new_from_raw_values_dual_fds() {
1045 let datafd = create_dummy_fd();
1046 let errfd = create_dummy_fd();
1047 let raw_datafd_val = datafd.as_raw_fd();
1048 let raw_errfd_val = errfd.as_raw_fd();
1049 let fds = vec![datafd, errfd];
1050 let v = DualFds::from_reply(fds, 0).unwrap();
1051 assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
1052 assert_eq!(v.errfd.as_raw_fd(), raw_errfd_val);
1053 }
1054
1055 #[test]
1056 fn test_new_from_raw_values_error_too_many_fds() {
1057 let fds = vec![create_dummy_fd(), create_dummy_fd(), create_dummy_fd()];
1058 match DualFds::from_reply(fds, 0) {
1059 Ok(v) => unreachable!("{v:?}"),
1060 Err(Error::Other(msg)) => {
1061 assert_eq!(msg.as_ref(), "Expected two fds for DualFds")
1062 }
1063 Err(other) => unreachable!("{other}"),
1064 }
1065 }
1066
1067 #[test]
1068 fn test_new_from_raw_values_error_fd_with_zero_pipeid() {
1069 let fds = vec![create_dummy_fd()];
1070 match FinishPipe::from_reply(fds, 0) {
1071 Ok(v) => unreachable!("{v:?}"),
1072 Err(Error::Other(msg)) => {
1073 assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe")
1074 }
1075 Err(other) => unreachable!("{other}"),
1076 }
1077 }
1078
1079 #[test]
1080 fn test_new_from_raw_values_error_pipeid_with_both_fds() {
1081 let fds = vec![create_dummy_fd(), create_dummy_fd()];
1082 match DualFds::from_reply(fds, 1) {
1083 Ok(v) => unreachable!("{v:?}"),
1084 Err(Error::Other(msg)) => {
1085 assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds")
1086 }
1087 Err(other) => unreachable!("{other}"),
1088 }
1089 }
1090
1091 #[test]
1092 fn test_new_from_raw_values_error_no_fd_with_pipeid() {
1093 let fds: Vec<OwnedFd> = vec![];
1094 match FinishPipe::from_reply(fds, 1) {
1095 Ok(v) => unreachable!("{v:?}"),
1096 Err(Error::Other(msg)) => {
1097 assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe")
1098 }
1099 Err(other) => unreachable!("{other}"),
1100 }
1101 }
1102
1103 #[tokio::test]
1104 #[ignore = "https://github.com/coreos/rpm-ostree/issues/5442"]
1105 async fn test_open_optional() -> Result<()> {
1106 if !check_skopeo() {
1107 return Ok(());
1108 }
1109
1110 let td = tempfile::tempdir()?;
1111 let td = td.path().to_str().unwrap();
1112 let proxy = ImageProxy::new().await?;
1113 let imgpath = format!("oci-archive:{td}/some-nonexistent-image.ociarchive");
1114 let img = proxy.open_image_optional(&imgpath).await.unwrap();
1115 assert!(img.is_none());
1116
1117 Ok(())
1118 }
1119}