1pub mod transport;
8
9pub use ::oci_spec;
13pub use transport::{
14 ContainersStorageRef, ImageReference, ImageReferenceError, Transport, TransportConversionError,
15};
16
17use ::oci_spec::image::{Descriptor, Digest};
18use cap_std_ext::prelude::CapStdExtCommandExt;
19use cap_std_ext::{cap_std, cap_tempfile};
20use futures_util::{Future, FutureExt};
21use itertools::Itertools;
22use serde::{Deserialize, Serialize};
23use std::fs::File;
24use std::iter::FusedIterator;
25use std::num::NonZeroU32;
26use std::ops::Range;
27use std::os::fd::OwnedFd;
28use std::os::unix::prelude::CommandExt;
29use std::path::PathBuf;
30use std::pin::Pin;
31use std::process::{Command, Stdio};
32use std::sync::{Arc, Mutex, OnceLock};
33use thiserror::Error;
34use tokio::io::{AsyncBufRead, AsyncReadExt};
35use tokio::sync::Mutex as AsyncMutex;
36use tokio::task::JoinError;
37use tracing::instrument;
38
39#[derive(Error, Debug)]
41#[non_exhaustive]
42pub enum Error {
43 #[error("i/o error: {0}")]
44 Io(#[from] std::io::Error),
46 #[error("skopeo spawn error: {0}")]
47 SkopeoSpawnError(#[source] std::io::Error),
49 #[error("serialization error: {0}")]
50 SerDe(#[from] serde_json::Error),
52 #[error("failed to invoke method {method}: {error}")]
54 RequestInitiationFailure { method: Box<str>, error: Box<str> },
55 #[error("proxy request returned error: {0}")]
57 RequestReturned(Box<str>),
58 #[error("semantic version error: {0}")]
59 SemanticVersion(#[from] semver::Error),
60 #[error("proxy too old (requested={requested_version} found={found_version}) error")]
61 ProxyTooOld {
63 requested_version: Box<str>,
64 found_version: Box<str>,
65 },
66 #[error("configuration error: {0}")]
67 Configuration(Box<str>),
69 #[error("other error: {0}")]
70 Other(Box<str>),
72}
73
74impl Error {
75 pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
76 Self::Other(e.into())
77 }
78}
79
80#[derive(Error, Debug)]
82#[non_exhaustive]
83pub enum GetBlobError {
84 #[error("retryable error: {0}")]
86 Retryable(Box<str>),
87 #[error("other error: {0}")]
88 Other(Box<str>),
90}
91
92impl From<rustix::io::Errno> for Error {
93 fn from(value: rustix::io::Errno) -> Self {
94 Self::Io(value.into())
95 }
96}
97
98pub type Result<T> = std::result::Result<T, Error>;
100
101pub const RESERVED_FD_RANGE: Range<i32> = 100..200;
105
106const MAX_MSG_SIZE: usize = 32 * 1024;
109
110fn base_proto_version() -> &'static semver::VersionReq {
111 static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
113 BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
114}
115
116fn layer_info_proto_version() -> &'static semver::VersionReq {
117 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
118 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
119}
120
121fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
122 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
123 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
124}
125
126#[derive(Serialize)]
127struct Request {
128 method: String,
129 args: Vec<serde_json::Value>,
130}
131
132impl Request {
133 fn new<T, I>(method: &str, args: T) -> Self
134 where
135 T: IntoIterator<Item = I>,
136 I: Into<serde_json::Value>,
137 {
138 let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
139 Self {
140 method: method.to_string(),
141 args,
142 }
143 }
144
145 fn new_bare(method: &str) -> Self {
146 Self {
147 method: method.to_string(),
148 args: vec![],
149 }
150 }
151}
152
153#[derive(Deserialize)]
154struct Reply {
155 success: bool,
156 error: String,
157 pipeid: u32,
158 value: serde_json::Value,
159}
160
161type ChildFuture = Pin<
162 Box<
163 dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
164 + Send,
165 >,
166>;
167
168pub struct ImageProxy {
170 sockfd: Arc<Mutex<OwnedFd>>,
171 childwait: Arc<AsyncMutex<ChildFuture>>,
172 protover: semver::Version,
173}
174
175impl std::fmt::Debug for ImageProxy {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 f.debug_struct("ImageProxy").finish()
178 }
179}
180
181#[derive(Debug, PartialEq, Eq)]
183pub struct OpenedImage(u32);
184
185#[derive(Debug, PartialEq, Eq)]
186struct PipeId(NonZeroU32);
187
188impl PipeId {
189 fn try_new(pipeid: u32) -> Option<Self> {
190 Some(Self(NonZeroU32::new(pipeid)?))
191 }
192}
193
194#[derive(Debug, Default)]
196#[non_exhaustive]
197pub struct ImageProxyConfig {
198 pub authfile: Option<PathBuf>,
201
202 pub auth_data: Option<File>,
204
205 pub auth_anonymous: bool,
209
210 pub certificate_directory: Option<PathBuf>,
213
214 pub decryption_keys: Option<Vec<String>>,
217
218 pub insecure_skip_tls_verification: Option<bool>,
220
221 pub insecure_policy: Option<bool>,
223
224 pub user_agent_prefix: Option<String>,
228
229 pub debug: bool,
233
234 pub skopeo_cmd: Option<Command>,
255}
256
257fn supports_user_agent_prefix() -> bool {
259 static SUPPORTS_USER_AGENT: OnceLock<bool> = OnceLock::new();
260 *SUPPORTS_USER_AGENT.get_or_init(|| {
261 Command::new("skopeo")
262 .arg("--help")
263 .stdout(Stdio::piped())
264 .stderr(Stdio::null())
265 .output()
266 .ok()
267 .and_then(|output| {
268 String::from_utf8(output.stdout)
269 .ok()
270 .map(|help| help.contains("--user-agent-prefix"))
271 })
272 .unwrap_or(false)
273 })
274}
275
276impl TryFrom<ImageProxyConfig> for Command {
277 type Error = Error;
278
279 fn try_from(config: ImageProxyConfig) -> Result<Self> {
280 let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
281 let mut allocated_fds = RESERVED_FD_RANGE.clone();
282 let mut alloc_fd = || {
283 allocated_fds.next().ok_or_else(|| {
284 Error::Other("Ran out of reserved file descriptors for child".into())
285 })
286 };
287
288 let mut c = config.skopeo_cmd.unwrap_or_else(|| {
290 let mut c = std::process::Command::new("skopeo");
291 unsafe {
292 c.pre_exec(|| {
293 Ok(rustix::process::set_parent_process_death_signal(Some(
294 rustix::process::Signal::TERM,
295 ))?)
296 });
297 }
298 c
299 });
300 c.arg("experimental-image-proxy");
301 if debug {
302 c.arg("--debug");
303 }
304 let auth_option_count = [
305 config.authfile.is_some(),
306 config.auth_data.is_some(),
307 config.auth_anonymous,
308 ]
309 .into_iter()
310 .filter(|&x| x)
311 .count();
312 if auth_option_count > 1 {
313 return Err(Error::Configuration(
315 "Conflicting authentication options".into(),
316 ));
317 }
318 if let Some(authfile) = config.authfile {
319 c.arg("--authfile");
320 c.arg(authfile);
321 } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
322 let target_fd = alloc_fd()?;
326 let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
327 let mut tempfile =
328 cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
329 std::io::copy(&mut auth_data, &mut tempfile)?;
330 let tempfile = tempfile
331 .into_inner()
332 .map_err(|e| e.into_error())?
333 .into_std();
334 let fd = std::sync::Arc::new(tempfile.into());
335 c.take_fd_n(fd, target_fd);
336 c.arg("--authfile");
337 c.arg(format!("/proc/self/fd/{target_fd}"));
338 } else if config.auth_anonymous {
339 c.arg("--no-creds");
340 }
341
342 if let Some(certificate_directory) = config.certificate_directory {
343 c.arg("--cert-dir");
344 c.arg(certificate_directory);
345 }
346
347 if let Some(decryption_keys) = config.decryption_keys {
348 for decryption_key in &decryption_keys {
349 c.arg("--decryption-key");
350 c.arg(decryption_key);
351 }
352 }
353
354 if config.insecure_skip_tls_verification.unwrap_or_default() {
355 c.arg("--tls-verify=false");
356 }
357
358 if config.insecure_policy.unwrap_or_default() {
359 c.arg("--insecure-policy");
360 }
361
362 if let Some(user_agent_prefix) = config.user_agent_prefix {
364 if supports_user_agent_prefix() {
365 c.arg("--user-agent-prefix");
366 c.arg(user_agent_prefix);
367 }
368 }
369
370 c.stdout(Stdio::null());
371 if !debug {
372 c.stderr(Stdio::piped());
373 }
374 Ok(c)
375 }
376}
377
378#[derive(Debug, serde::Deserialize)]
380pub struct ConvertedLayerInfo {
381 pub digest: Digest,
384
385 pub size: u64,
387
388 pub media_type: oci_spec::image::MediaType,
390}
391
392#[derive(Debug)]
394struct FinishPipe {
395 pipeid: PipeId,
396 datafd: OwnedFd,
397}
398
399#[derive(Debug)]
401struct DualFds {
402 datafd: OwnedFd,
403 errfd: OwnedFd,
404}
405
406trait FromReplyFds: Send + 'static
408where
409 Self: Sized,
410{
411 fn from_reply(
412 iterable: impl IntoIterator<IntoIter: FusedIterator, Item = OwnedFd>,
413 pipeid: u32,
414 ) -> Result<Self>;
415}
416
417impl FromReplyFds for () {
419 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
420 if fds.into_iter().next().is_some() {
421 return Err(Error::Other("expected no fds".into()));
422 }
423 if pipeid != 0 {
424 return Err(Error::Other("unexpected pipeid".into()));
425 }
426 Ok(())
427 }
428}
429
430impl FromReplyFds for FinishPipe {
432 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
433 let Some(pipeid) = PipeId::try_new(pipeid) else {
434 return Err(Error::Other("Expected pipeid for FinishPipe".into()));
435 };
436 let datafd = fds
437 .into_iter()
438 .exactly_one()
439 .map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?;
440 Ok(Self { pipeid, datafd })
441 }
442}
443
444impl FromReplyFds for DualFds {
446 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
447 if pipeid != 0 {
448 return Err(Error::Other("Unexpected pipeid with DualFds".into()));
449 }
450 let [datafd, errfd] = fds
451 .into_iter()
452 .collect_array()
453 .ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?;
454 Ok(Self { datafd, errfd })
455 }
456}
457
458impl ImageProxy {
459 pub async fn new() -> Result<Self> {
461 Self::new_with_config(Default::default()).await
462 }
463
464 #[instrument]
466 pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
467 let mut c = Command::try_from(config)?;
468 let (mysock, theirsock) = rustix::net::socketpair(
469 rustix::net::AddressFamily::UNIX,
470 rustix::net::SocketType::SEQPACKET,
471 rustix::net::SocketFlags::CLOEXEC,
472 None,
473 )?;
474 c.stdin(Stdio::from(theirsock));
475 let child = match c.spawn() {
476 Ok(c) => c,
477 Err(error) => return Err(Error::SkopeoSpawnError(error)),
478 };
479 tracing::debug!("Spawned skopeo pid={:?}", child.id());
480 let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
485 let sockfd = Arc::new(Mutex::new(mysock));
486
487 let mut r = Self {
488 sockfd,
489 childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
490 protover: semver::Version::new(0, 0, 0),
491 };
492
493 let protover: String = r.impl_request("Initialize", [(); 0]).await?;
495 tracing::debug!("Remote protocol version: {protover}");
496 let protover = semver::Version::parse(protover.as_str())?;
497 let supported = base_proto_version();
499 if !supported.matches(&protover) {
500 return Err(Error::ProxyTooOld {
501 requested_version: protover.to_string().into(),
502 found_version: supported.to_string().into(),
503 });
504 }
505 r.protover = protover;
506
507 Ok(r)
508 }
509
510 async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static, F: FromReplyFds>(
512 sockfd: Arc<Mutex<OwnedFd>>,
513 req: Request,
514 ) -> Result<(T, F)> {
515 tracing::trace!("sending request {}", req.method.as_str());
516 let r = tokio::task::spawn_blocking(move || {
518 let sockfd = sockfd.lock().unwrap();
519 let sendbuf = serde_json::to_vec(&req)?;
520 let sockfd = &*sockfd;
521 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
522 drop(sendbuf);
523 let mut buf = [0u8; MAX_MSG_SIZE];
524 let mut cmsg_space: Vec<std::mem::MaybeUninit<u8>> =
525 vec![std::mem::MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(1))];
526 let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(cmsg_space.as_mut_slice());
527 let iov = std::io::IoSliceMut::new(buf.as_mut());
528 let mut iov = [iov];
529 let nread = rustix::net::recvmsg(
530 sockfd,
531 &mut iov,
532 &mut cmsg_buffer,
533 rustix::net::RecvFlags::CMSG_CLOEXEC,
534 )?
535 .bytes;
536 let fdret = cmsg_buffer
537 .drain()
538 .filter_map(|m| match m {
539 rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
540 _ => None,
541 })
542 .flatten();
543 let buf = &buf[..nread];
544 let reply: Reply = serde_json::from_slice(buf)?;
545 if !reply.success {
546 return Err(Error::RequestInitiationFailure {
547 method: req.method.clone().into(),
548 error: reply.error.into(),
549 });
550 }
551 let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?;
552 Ok((serde_json::from_value(reply.value)?, fds))
553 })
554 .await
555 .map_err(|e| Error::Other(e.to_string().into()))??;
556 tracing::trace!("completed request");
557 Ok(r)
558 }
559
560 #[instrument(skip(args))]
563 async fn impl_request_with_fds<
564 T: serde::de::DeserializeOwned + Send + 'static,
565 F: FromReplyFds,
566 >(
567 &self,
568 method: &str,
569 args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
570 ) -> Result<(T, F)> {
571 let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
572 let mut childwait = self.childwait.lock().await;
573 tokio::select! {
574 r = req => { r }
575 r = childwait.as_mut() => {
576 let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
577 let stderr = String::from_utf8_lossy(&r.stderr);
578 Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
579 }
580 }
581 }
582
583 async fn impl_request<T: serde::de::DeserializeOwned + Send + 'static>(
585 &self,
586 method: &str,
587 args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
588 ) -> Result<T> {
589 let (r, ()) = self.impl_request_with_fds(method, args).await?;
590 Ok(r)
591 }
592
593 #[instrument]
594 async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> {
595 tracing::debug!("closing pipe");
596 let (r, ()) = self
597 .impl_request_with_fds("FinishPipe", [pipeid.0.get()])
598 .await?;
599 Ok(r)
600 }
601
602 #[instrument]
604 pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
605 tracing::debug!("opening image");
606 let imgid = self.impl_request("OpenImage", [imgref]).await?;
607 Ok(OpenedImage(imgid))
608 }
609
610 #[instrument]
612 pub async fn open_image_ref(&self, imgref: &ImageReference) -> Result<OpenedImage> {
613 self.open_image(&imgref.to_string()).await
614 }
615
616 #[instrument]
619 pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
620 tracing::debug!("opening image");
621 let imgid = self.impl_request("OpenImageOptional", [imgref]).await?;
622 if imgid == 0 {
623 Ok(None)
624 } else {
625 Ok(Some(OpenedImage(imgid)))
626 }
627 }
628
629 #[instrument]
632 pub async fn open_image_optional_ref(
633 &self,
634 imgref: &ImageReference,
635 ) -> Result<Option<OpenedImage>> {
636 self.open_image_optional(&imgref.to_string()).await
637 }
638
639 #[instrument]
640 pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
641 self.impl_request("CloseImage", [img.0]).await
642 }
643
644 async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result<Vec<u8>> {
645 let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
646 let mut fd = tokio::io::BufReader::new(fd);
647 let mut r = Vec::new();
648 let reader = fd.read_to_end(&mut r);
649 let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid));
650 finish?;
651 assert_eq!(nbytes?, r.len());
652 Ok(r)
653 }
654
655 pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
659 let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?;
660 Ok((digest, self.read_finish_pipe(pipefd).await?))
661 }
662
663 pub async fn fetch_manifest(
666 &self,
667 img: &OpenedImage,
668 ) -> Result<(String, oci_spec::image::ImageManifest)> {
669 let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
670 let manifest = serde_json::from_slice(&raw)?;
671 Ok((digest, manifest))
672 }
673
674 pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
677 let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?;
678 self.read_finish_pipe(pipe).await
679 }
680
681 pub async fn fetch_config(
684 &self,
685 img: &OpenedImage,
686 ) -> Result<oci_spec::image::ImageConfiguration> {
687 let raw = self.fetch_config_raw(img).await?;
688 serde_json::from_slice(&raw).map_err(Into::into)
689 }
690
691 #[instrument]
701 pub async fn get_blob(
702 &self,
703 img: &OpenedImage,
704 digest: &Digest,
705 size: u64,
706 ) -> Result<(
707 impl AsyncBufRead + Send + Unpin,
708 impl Future<Output = Result<()>> + Unpin + '_,
709 )> {
710 tracing::debug!("fetching blob");
713 let args: Vec<serde_json::Value> =
714 vec![img.0.into(), digest.to_string().into(), size.into()];
715 let (_bloblen, pipe): (serde_json::Number, FinishPipe) =
720 self.impl_request_with_fds("GetBlob", args).await?;
721 let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
722 let fd = tokio::io::BufReader::new(fd);
723 let finish = Box::pin(self.finish_pipe(pipe.pipeid));
724 Ok((fd, finish))
725 }
726
727 async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> {
728 let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
729 let mut errfd = tokio::io::BufReader::new(fd);
730 let mut buf = Vec::new();
731 errfd
732 .read_to_end(&mut buf)
733 .await
734 .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
735 if buf.is_empty() {
736 return Ok(());
737 }
738 #[derive(Deserialize)]
739 struct RemoteError {
740 code: String,
741 message: String,
742 }
743 let e: RemoteError = serde_json::from_slice(&buf)
744 .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
745 match e.code.as_str() {
746 "EPIPE" => Ok(()),
748 "retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())),
749 _ => Err(GetBlobError::Other(e.message.into_boxed_str())),
750 }
751 }
752
753 #[instrument]
757 pub async fn get_raw_blob(
758 &self,
759 img: &OpenedImage,
760 digest: &Digest,
761 ) -> Result<(
762 Option<u64>,
763 tokio::fs::File,
764 impl Future<Output = std::result::Result<(), GetBlobError>> + Unpin + '_,
765 )> {
766 tracing::debug!("fetching blob");
767 let args: Vec<serde_json::Value> = vec![img.0.into(), digest.to_string().into()];
768 let (bloblen, fds): (i64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?;
769 let bloblen = u64::try_from(bloblen).ok();
771 let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd));
772 let err = Self::read_blob_error(fds.errfd).boxed();
773 Ok((bloblen, fd, err))
774 }
775
776 #[instrument]
778 pub async fn get_descriptor(
779 &self,
780 img: &OpenedImage,
781 descriptor: &Descriptor,
782 ) -> Result<(
783 impl AsyncBufRead + Send + Unpin,
784 impl Future<Output = Result<()>> + Unpin + '_,
785 )> {
786 self.get_blob(img, descriptor.digest(), descriptor.size())
787 .await
788 }
789
790 #[instrument]
792 pub async fn get_layer_info(
793 &self,
794 img: &OpenedImage,
795 ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
796 tracing::debug!("Getting layer info");
797 if layer_info_piped_proto_version().matches(&self.protover) {
798 let ((), pipe) = self
799 .impl_request_with_fds("GetLayerInfoPiped", [img.0])
800 .await?;
801 let buf = self.read_finish_pipe(pipe).await?;
802 return Ok(Some(serde_json::from_slice(&buf)?));
803 }
804 if !layer_info_proto_version().matches(&self.protover) {
805 return Ok(None);
806 }
807 let layers = self.impl_request("GetLayerInfo", [img.0]).await?;
808 Ok(Some(layers))
809 }
810
811 #[instrument]
813 pub async fn finalize(self) -> Result<()> {
814 let _ = &self;
815 let req = Request::new_bare("Shutdown");
816 let sendbuf = serde_json::to_vec(&req)?;
817 let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
819 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
820 drop(sendbuf);
821 tracing::debug!("sent shutdown request");
822 let mut childwait = self.childwait.lock().await;
823 let output = childwait
824 .as_mut()
825 .await
826 .map_err(|e| Error::new_other(e.to_string()))??;
827 if !output.status.success() {
828 let stderr = String::from_utf8_lossy(&output.stderr);
829 return Err(Error::RequestReturned(
830 format!("proxy failed: {}\n{}", output.status, stderr).into(),
831 ));
832 }
833 tracing::debug!("proxy exited successfully");
834 Ok(())
835 }
836}
837
838#[cfg(test)]
839mod tests {
840 use std::io::{BufWriter, Seek, Write};
841 use std::os::fd::{AsRawFd, OwnedFd};
842
843 use super::*;
844 use cap_std_ext::cap_std::fs::Dir;
845 use rustix::fs::{memfd_create, MemfdFlags};
846
847 fn check_skopeo() -> bool {
849 static HAVE_SKOPEO: OnceLock<bool> = OnceLock::new();
850 *HAVE_SKOPEO.get_or_init(|| {
851 Command::new("skopeo")
852 .arg("--help")
853 .stdout(Stdio::null())
854 .stderr(Stdio::null())
855 .status()
856 .is_ok()
857 })
858 }
859
860 fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
861 let d = format!("{:?}", c);
865 for c in contains {
866 assert!(d.contains(c), "{} missing {}", d, c);
867 }
868 for c in not_contains {
869 assert!(!d.contains(c), "{} should not contain {}", d, c);
870 }
871 }
872
873 #[test]
874 fn proxy_configs() {
875 let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();
876
877 let c = Command::try_from(ImageProxyConfig::default()).unwrap();
878 validate(
879 c,
880 &["experimental-image-proxy"],
881 &["--no-creds", "--tls-verify", "--authfile"],
882 );
883
884 let c = Command::try_from(ImageProxyConfig {
885 authfile: Some(PathBuf::from("/path/to/authfile")),
886 ..Default::default()
887 })
888 .unwrap();
889 validate(c, &[r"--authfile", "/path/to/authfile"], &[]);
890
891 let decryption_key_path = "/path/to/decryption_key";
892 let c = Command::try_from(ImageProxyConfig {
893 decryption_keys: Some(vec![decryption_key_path.to_string()]),
894 ..Default::default()
895 })
896 .unwrap();
897 validate(c, &[r"--decryption-key", "/path/to/decryption_key"], &[]);
898
899 let c = Command::try_from(ImageProxyConfig {
900 certificate_directory: Some(PathBuf::from("/path/to/certs")),
901 ..Default::default()
902 })
903 .unwrap();
904 validate(c, &[r"--cert-dir", "/path/to/certs"], &[]);
905
906 let c = Command::try_from(ImageProxyConfig {
907 insecure_skip_tls_verification: Some(true),
908 ..Default::default()
909 })
910 .unwrap();
911 validate(c, &[r"--tls-verify=false"], &[]);
912
913 let c = Command::try_from(ImageProxyConfig {
914 insecure_policy: Some(true),
915 ..Default::default()
916 })
917 .unwrap();
918 validate(c, &[r"--insecure-policy"], &[]);
919
920 let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
921 tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
922 tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
923 let c = Command::try_from(ImageProxyConfig {
924 auth_data: Some(tmpf.into_std()),
925 ..Default::default()
926 })
927 .unwrap();
928 validate(c, &["--authfile", "/proc/self/fd/100"], &[]);
929
930 let c = Command::try_from(ImageProxyConfig {
932 user_agent_prefix: Some("bootc/1.0".to_string()),
933 ..Default::default()
934 })
935 .unwrap();
936 if supports_user_agent_prefix() {
937 validate(c, &["--user-agent-prefix", "bootc/1.0"], &[]);
938 } else {
939 validate(c, &[], &["--user-agent-prefix"]);
940 }
941 }
942
943 #[tokio::test]
944 async fn skopeo_not_found() {
945 let mut config = ImageProxyConfig {
946 ..ImageProxyConfig::default()
947 };
948 config.skopeo_cmd = Some(Command::new("no-skopeo"));
949
950 match ImageProxy::new_with_config(config).await {
951 Ok(_) => panic!("Expected an error"),
952 Err(ref e @ Error::SkopeoSpawnError(ref inner)) => {
953 assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
954 assert!(e
956 .to_string()
957 .contains("skopeo spawn error: No such file or directory"));
958 }
959 Err(e) => panic!("Unexpected error {e}"),
960 }
961 }
962
963 #[tokio::test]
964 async fn test_proxy_send_sync() {
965 fn assert_send_sync(_x: impl Send + Sync) {}
966
967 let Ok(proxy) = ImageProxy::new().await else {
968 return;
970 };
971 assert_send_sync(&proxy);
972 assert_send_sync(proxy);
973
974 let opened = OpenedImage(0);
975 assert_send_sync(&opened);
976 assert_send_sync(opened);
977 }
978
979 fn generate_err_fd(v: serde_json::Value) -> Result<OwnedFd> {
980 let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
981 let mut tf = cap_tempfile::TempFile::new_anonymous(&tmp).map(BufWriter::new)?;
982 serde_json::to_writer(&mut tf, &v)?;
983 let mut tf = tf.into_inner().map_err(|e| e.into_error())?;
984 tf.seek(std::io::SeekFrom::Start(0))?;
985 let r = tf.into_std().into();
986 Ok(r)
987 }
988
989 #[tokio::test]
990 async fn test_read_blob_error_retryable() -> Result<()> {
991 let retryable = serde_json::json!({
992 "code": "retryable",
993 "message": "foo",
994 });
995 let retryable = generate_err_fd(retryable)?;
996 let err = ImageProxy::read_blob_error(retryable).boxed();
997 let e = err.await.unwrap_err();
998 match e {
999 GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"),
1000 _ => panic!("Unexpected error {e:?}"),
1001 }
1002 Ok(())
1003 }
1004
1005 #[tokio::test]
1006 async fn test_read_blob_error_none() -> Result<()> {
1007 let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
1008 let tf = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std();
1009 let err = ImageProxy::read_blob_error(tf.into()).boxed();
1010 err.await.unwrap();
1011 Ok(())
1012 }
1013
1014 #[tokio::test]
1015 async fn test_read_blob_error_other() -> Result<()> {
1016 let other = serde_json::json!({
1017 "code": "other",
1018 "message": "bar",
1019 });
1020 let other = generate_err_fd(other)?;
1021 let err = ImageProxy::read_blob_error(other).boxed();
1022 let e = err.await.unwrap_err();
1023 match e {
1024 GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"),
1025 _ => panic!("Unexpected error {e:?}"),
1026 }
1027 Ok(())
1028 }
1029
1030 #[tokio::test]
1031 async fn test_read_blob_error_epipe() -> Result<()> {
1032 let epipe = serde_json::json!({
1033 "code": "EPIPE",
1034 "message": "baz",
1035 });
1036 let epipe = generate_err_fd(epipe)?;
1037 let err = ImageProxy::read_blob_error(epipe).boxed();
1038 err.await.unwrap();
1039 Ok(())
1040 }
1041
1042 fn create_dummy_fd() -> OwnedFd {
1044 memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()
1045 }
1046
1047 #[test]
1048 fn test_new_from_raw_values_finish_pipe() {
1049 let datafd = create_dummy_fd();
1050 let raw_datafd_val = datafd.as_raw_fd();
1052 let fds = vec![datafd];
1053 let v = FinishPipe::from_reply(fds, 1).unwrap();
1054 assert_eq!(v.pipeid.0.get(), 1);
1055 assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
1056 }
1057
1058 #[test]
1059 fn test_new_from_raw_values_dual_fds() {
1060 let datafd = create_dummy_fd();
1061 let errfd = create_dummy_fd();
1062 let raw_datafd_val = datafd.as_raw_fd();
1063 let raw_errfd_val = errfd.as_raw_fd();
1064 let fds = vec![datafd, errfd];
1065 let v = DualFds::from_reply(fds, 0).unwrap();
1066 assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
1067 assert_eq!(v.errfd.as_raw_fd(), raw_errfd_val);
1068 }
1069
1070 #[test]
1071 fn test_new_from_raw_values_error_too_many_fds() {
1072 let fds = vec![create_dummy_fd(), create_dummy_fd(), create_dummy_fd()];
1073 match DualFds::from_reply(fds, 0) {
1074 Ok(v) => unreachable!("{v:?}"),
1075 Err(Error::Other(msg)) => {
1076 assert_eq!(msg.as_ref(), "Expected two fds for DualFds")
1077 }
1078 Err(other) => unreachable!("{other}"),
1079 }
1080 }
1081
1082 #[test]
1083 fn test_new_from_raw_values_error_fd_with_zero_pipeid() {
1084 let fds = vec![create_dummy_fd()];
1085 match FinishPipe::from_reply(fds, 0) {
1086 Ok(v) => unreachable!("{v:?}"),
1087 Err(Error::Other(msg)) => {
1088 assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe")
1089 }
1090 Err(other) => unreachable!("{other}"),
1091 }
1092 }
1093
1094 #[test]
1095 fn test_new_from_raw_values_error_pipeid_with_both_fds() {
1096 let fds = vec![create_dummy_fd(), create_dummy_fd()];
1097 match DualFds::from_reply(fds, 1) {
1098 Ok(v) => unreachable!("{v:?}"),
1099 Err(Error::Other(msg)) => {
1100 assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds")
1101 }
1102 Err(other) => unreachable!("{other}"),
1103 }
1104 }
1105
1106 #[test]
1107 fn test_new_from_raw_values_error_no_fd_with_pipeid() {
1108 let fds: Vec<OwnedFd> = vec![];
1109 match FinishPipe::from_reply(fds, 1) {
1110 Ok(v) => unreachable!("{v:?}"),
1111 Err(Error::Other(msg)) => {
1112 assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe")
1113 }
1114 Err(other) => unreachable!("{other}"),
1115 }
1116 }
1117
1118 #[tokio::test]
1119 #[ignore = "https://github.com/coreos/rpm-ostree/issues/5442"]
1120 async fn test_open_optional() -> Result<()> {
1121 if !check_skopeo() {
1122 return Ok(());
1123 }
1124
1125 let td = tempfile::tempdir()?;
1126 let td = td.path().to_str().unwrap();
1127 let proxy = ImageProxy::new().await?;
1128 let imgpath = format!("oci-archive:{td}/some-nonexistent-image.ociarchive");
1129 let img = proxy.open_image_optional(&imgpath).await.unwrap();
1130 assert!(img.is_none());
1131
1132 Ok(())
1133 }
1134}