1use cap_std_ext::prelude::CapStdExtCommandExt;
8use cap_std_ext::{cap_std, cap_tempfile};
9use futures_util::{Future, FutureExt};
10use itertools::Itertools;
11use oci_spec::image::{Descriptor, Digest};
12use serde::{Deserialize, Serialize};
13use std::fs::File;
14use std::iter::FusedIterator;
15use std::num::NonZeroU32;
16use std::ops::Range;
17use std::os::fd::OwnedFd;
18use std::os::unix::prelude::CommandExt;
19use std::path::PathBuf;
20use std::pin::Pin;
21use std::process::{Command, Stdio};
22use std::sync::{Arc, Mutex, OnceLock};
23use thiserror::Error;
24use tokio::io::{AsyncBufRead, AsyncReadExt};
25use tokio::sync::Mutex as AsyncMutex;
26use tokio::task::JoinError;
27use tracing::instrument;
28
29#[derive(Error, Debug)]
31#[non_exhaustive]
32pub enum Error {
33 #[error("i/o error: {0}")]
34 Io(#[from] std::io::Error),
36 #[error("skopeo spawn error: {0}")]
37 SkopeoSpawnError(#[source] std::io::Error),
39 #[error("serialization error: {0}")]
40 SerDe(#[from] serde_json::Error),
42 #[error("failed to invoke method {method}: {error}")]
44 RequestInitiationFailure { method: Box<str>, error: Box<str> },
45 #[error("proxy request returned error: {0}")]
47 RequestReturned(Box<str>),
48 #[error("semantic version error: {0}")]
49 SemanticVersion(#[from] semver::Error),
50 #[error("proxy too old (requested={requested_version} found={found_version}) error")]
51 ProxyTooOld {
53 requested_version: Box<str>,
54 found_version: Box<str>,
55 },
56 #[error("configuration error: {0}")]
57 Configuration(Box<str>),
59 #[error("other error: {0}")]
60 Other(Box<str>),
62}
63
64impl Error {
65 pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
66 Self::Other(e.into())
67 }
68}
69
70#[derive(Error, Debug)]
72#[non_exhaustive]
73pub enum GetBlobError {
74 #[error("retryable error: {0}")]
76 Retryable(Box<str>),
77 #[error("other error: {0}")]
78 Other(Box<str>),
80}
81
82impl From<rustix::io::Errno> for Error {
83 fn from(value: rustix::io::Errno) -> Self {
84 Self::Io(value.into())
85 }
86}
87
88pub type Result<T> = std::result::Result<T, Error>;
90
91pub use oci_spec;
93
94pub const RESERVED_FD_RANGE: Range<i32> = 100..200;
98
99const MAX_MSG_SIZE: usize = 32 * 1024;
102
103fn base_proto_version() -> &'static semver::VersionReq {
104 static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
106 BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
107}
108
109fn layer_info_proto_version() -> &'static semver::VersionReq {
110 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
111 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
112}
113
114fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
115 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
116 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
117}
118
119#[derive(Serialize)]
120struct Request {
121 method: String,
122 args: Vec<serde_json::Value>,
123}
124
125impl Request {
126 fn new<T, I>(method: &str, args: T) -> Self
127 where
128 T: IntoIterator<Item = I>,
129 I: Into<serde_json::Value>,
130 {
131 let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
132 Self {
133 method: method.to_string(),
134 args,
135 }
136 }
137
138 fn new_bare(method: &str) -> Self {
139 Self {
140 method: method.to_string(),
141 args: vec![],
142 }
143 }
144}
145
146#[derive(Deserialize)]
147struct Reply {
148 success: bool,
149 error: String,
150 pipeid: u32,
151 value: serde_json::Value,
152}
153
154type ChildFuture = Pin<
155 Box<
156 dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
157 + Send,
158 >,
159>;
160
161pub struct ImageProxy {
163 sockfd: Arc<Mutex<OwnedFd>>,
164 childwait: Arc<AsyncMutex<ChildFuture>>,
165 protover: semver::Version,
166}
167
168impl std::fmt::Debug for ImageProxy {
169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170 f.debug_struct("ImageProxy").finish()
171 }
172}
173
174#[derive(Debug, PartialEq, Eq)]
176pub struct OpenedImage(u32);
177
178#[derive(Debug, PartialEq, Eq)]
179struct PipeId(NonZeroU32);
180
181impl PipeId {
182 fn try_new(pipeid: u32) -> Option<Self> {
183 Some(Self(NonZeroU32::new(pipeid)?))
184 }
185}
186
187#[derive(Debug, Default)]
189pub struct ImageProxyConfig {
190 pub authfile: Option<PathBuf>,
193
194 pub auth_data: Option<File>,
196
197 pub auth_anonymous: bool,
201
202 pub certificate_directory: Option<PathBuf>,
205
206 pub decryption_keys: Option<Vec<String>>,
209
210 pub insecure_skip_tls_verification: Option<bool>,
212
213 pub user_agent_prefix: Option<String>,
217
218 pub debug: bool,
222
223 pub skopeo_cmd: Option<Command>,
244}
245
246fn supports_user_agent_prefix() -> bool {
248 static SUPPORTS_USER_AGENT: OnceLock<bool> = OnceLock::new();
249 *SUPPORTS_USER_AGENT.get_or_init(|| {
250 Command::new("skopeo")
251 .arg("--help")
252 .stdout(Stdio::piped())
253 .stderr(Stdio::null())
254 .output()
255 .ok()
256 .and_then(|output| {
257 String::from_utf8(output.stdout)
258 .ok()
259 .map(|help| help.contains("--user-agent-prefix"))
260 })
261 .unwrap_or(false)
262 })
263}
264
265impl TryFrom<ImageProxyConfig> for Command {
266 type Error = Error;
267
268 fn try_from(config: ImageProxyConfig) -> Result<Self> {
269 let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
270 let mut allocated_fds = RESERVED_FD_RANGE.clone();
271 let mut alloc_fd = || {
272 allocated_fds.next().ok_or_else(|| {
273 Error::Other("Ran out of reserved file descriptors for child".into())
274 })
275 };
276
277 let mut c = config.skopeo_cmd.unwrap_or_else(|| {
279 let mut c = std::process::Command::new("skopeo");
280 unsafe {
281 c.pre_exec(|| {
282 Ok(rustix::process::set_parent_process_death_signal(Some(
283 rustix::process::Signal::TERM,
284 ))?)
285 });
286 }
287 c
288 });
289 c.arg("experimental-image-proxy");
290 if debug {
291 c.arg("--debug");
292 }
293 let auth_option_count = [
294 config.authfile.is_some(),
295 config.auth_data.is_some(),
296 config.auth_anonymous,
297 ]
298 .into_iter()
299 .filter(|&x| x)
300 .count();
301 if auth_option_count > 1 {
302 return Err(Error::Configuration(
304 "Conflicting authentication options".into(),
305 ));
306 }
307 if let Some(authfile) = config.authfile {
308 c.arg("--authfile");
309 c.arg(authfile);
310 } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
311 let target_fd = alloc_fd()?;
315 let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
316 let mut tempfile =
317 cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
318 std::io::copy(&mut auth_data, &mut tempfile)?;
319 let tempfile = tempfile
320 .into_inner()
321 .map_err(|e| e.into_error())?
322 .into_std();
323 let fd = std::sync::Arc::new(tempfile.into());
324 c.take_fd_n(fd, target_fd);
325 c.arg("--authfile");
326 c.arg(format!("/proc/self/fd/{target_fd}"));
327 } else if config.auth_anonymous {
328 c.arg("--no-creds");
329 }
330
331 if let Some(certificate_directory) = config.certificate_directory {
332 c.arg("--cert-dir");
333 c.arg(certificate_directory);
334 }
335
336 if let Some(decryption_keys) = config.decryption_keys {
337 for decryption_key in &decryption_keys {
338 c.arg("--decryption-key");
339 c.arg(decryption_key);
340 }
341 }
342
343 if config.insecure_skip_tls_verification.unwrap_or_default() {
344 c.arg("--tls-verify=false");
345 }
346
347 if let Some(user_agent_prefix) = config.user_agent_prefix {
349 if supports_user_agent_prefix() {
350 c.arg("--user-agent-prefix");
351 c.arg(user_agent_prefix);
352 }
353 }
354
355 c.stdout(Stdio::null());
356 if !debug {
357 c.stderr(Stdio::piped());
358 }
359 Ok(c)
360 }
361}
362
363#[derive(Debug, serde::Deserialize)]
365pub struct ConvertedLayerInfo {
366 pub digest: Digest,
369
370 pub size: u64,
372
373 pub media_type: oci_spec::image::MediaType,
375}
376
377#[derive(Debug)]
379struct FinishPipe {
380 pipeid: PipeId,
381 datafd: OwnedFd,
382}
383
384#[derive(Debug)]
386struct DualFds {
387 datafd: OwnedFd,
388 errfd: OwnedFd,
389}
390
391trait FromReplyFds: Send + 'static
393where
394 Self: Sized,
395{
396 fn from_reply(
397 iterable: impl IntoIterator<IntoIter: FusedIterator, Item = OwnedFd>,
398 pipeid: u32,
399 ) -> Result<Self>;
400}
401
402impl FromReplyFds for () {
404 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
405 if fds.into_iter().next().is_some() {
406 return Err(Error::Other("expected no fds".into()));
407 }
408 if pipeid != 0 {
409 return Err(Error::Other("unexpected pipeid".into()));
410 }
411 Ok(())
412 }
413}
414
415impl FromReplyFds for FinishPipe {
417 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
418 let Some(pipeid) = PipeId::try_new(pipeid) else {
419 return Err(Error::Other("Expected pipeid for FinishPipe".into()));
420 };
421 let datafd = fds
422 .into_iter()
423 .exactly_one()
424 .map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?;
425 Ok(Self { pipeid, datafd })
426 }
427}
428
429impl FromReplyFds for DualFds {
431 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
432 if pipeid != 0 {
433 return Err(Error::Other("Unexpected pipeid with DualFds".into()));
434 }
435 let [datafd, errfd] = fds
436 .into_iter()
437 .collect_array()
438 .ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?;
439 Ok(Self { datafd, errfd })
440 }
441}
442
443impl ImageProxy {
444 pub async fn new() -> Result<Self> {
446 Self::new_with_config(Default::default()).await
447 }
448
449 #[instrument]
451 pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
452 let mut c = Command::try_from(config)?;
453 let (mysock, theirsock) = rustix::net::socketpair(
454 rustix::net::AddressFamily::UNIX,
455 rustix::net::SocketType::SEQPACKET,
456 rustix::net::SocketFlags::CLOEXEC,
457 None,
458 )?;
459 c.stdin(Stdio::from(theirsock));
460 let child = match c.spawn() {
461 Ok(c) => c,
462 Err(error) => return Err(Error::SkopeoSpawnError(error)),
463 };
464 tracing::debug!("Spawned skopeo pid={:?}", child.id());
465 let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
470 let sockfd = Arc::new(Mutex::new(mysock));
471
472 let mut r = Self {
473 sockfd,
474 childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
475 protover: semver::Version::new(0, 0, 0),
476 };
477
478 let protover: String = r.impl_request("Initialize", [(); 0]).await?;
480 tracing::debug!("Remote protocol version: {protover}");
481 let protover = semver::Version::parse(protover.as_str())?;
482 let supported = base_proto_version();
484 if !supported.matches(&protover) {
485 return Err(Error::ProxyTooOld {
486 requested_version: protover.to_string().into(),
487 found_version: supported.to_string().into(),
488 });
489 }
490 r.protover = protover;
491
492 Ok(r)
493 }
494
495 async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static, F: FromReplyFds>(
497 sockfd: Arc<Mutex<OwnedFd>>,
498 req: Request,
499 ) -> Result<(T, F)> {
500 tracing::trace!("sending request {}", req.method.as_str());
501 let r = tokio::task::spawn_blocking(move || {
503 let sockfd = sockfd.lock().unwrap();
504 let sendbuf = serde_json::to_vec(&req)?;
505 let sockfd = &*sockfd;
506 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
507 drop(sendbuf);
508 let mut buf = [0u8; MAX_MSG_SIZE];
509 let mut cmsg_space: Vec<std::mem::MaybeUninit<u8>> =
510 vec![std::mem::MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(1))];
511 let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(cmsg_space.as_mut_slice());
512 let iov = std::io::IoSliceMut::new(buf.as_mut());
513 let mut iov = [iov];
514 let nread = rustix::net::recvmsg(
515 sockfd,
516 &mut iov,
517 &mut cmsg_buffer,
518 rustix::net::RecvFlags::CMSG_CLOEXEC,
519 )?
520 .bytes;
521 let fdret = cmsg_buffer
522 .drain()
523 .filter_map(|m| match m {
524 rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
525 _ => None,
526 })
527 .flatten();
528 let buf = &buf[..nread];
529 let reply: Reply = serde_json::from_slice(buf)?;
530 if !reply.success {
531 return Err(Error::RequestInitiationFailure {
532 method: req.method.clone().into(),
533 error: reply.error.into(),
534 });
535 }
536 let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?;
537 Ok((serde_json::from_value(reply.value)?, fds))
538 })
539 .await
540 .map_err(|e| Error::Other(e.to_string().into()))??;
541 tracing::trace!("completed request");
542 Ok(r)
543 }
544
545 #[instrument(skip(args))]
548 async fn impl_request_with_fds<
549 T: serde::de::DeserializeOwned + Send + 'static,
550 F: FromReplyFds,
551 >(
552 &self,
553 method: &str,
554 args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
555 ) -> Result<(T, F)> {
556 let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
557 let mut childwait = self.childwait.lock().await;
558 tokio::select! {
559 r = req => { r }
560 r = childwait.as_mut() => {
561 let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
562 let stderr = String::from_utf8_lossy(&r.stderr);
563 Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
564 }
565 }
566 }
567
568 async fn impl_request<T: serde::de::DeserializeOwned + Send + 'static>(
570 &self,
571 method: &str,
572 args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
573 ) -> Result<T> {
574 let (r, ()) = self.impl_request_with_fds(method, args).await?;
575 Ok(r)
576 }
577
578 #[instrument]
579 async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> {
580 tracing::debug!("closing pipe");
581 let (r, ()) = self
582 .impl_request_with_fds("FinishPipe", [pipeid.0.get()])
583 .await?;
584 Ok(r)
585 }
586
587 #[instrument]
588 pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
589 tracing::debug!("opening image");
590 let imgid = self.impl_request("OpenImage", [imgref]).await?;
591 Ok(OpenedImage(imgid))
592 }
593
594 #[instrument]
595 pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
596 tracing::debug!("opening image");
597 let imgid = self.impl_request("OpenImageOptional", [imgref]).await?;
598 if imgid == 0 {
599 Ok(None)
600 } else {
601 Ok(Some(OpenedImage(imgid)))
602 }
603 }
604
605 #[instrument]
606 pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
607 self.impl_request("CloseImage", [img.0]).await
608 }
609
610 async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result<Vec<u8>> {
611 let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
612 let mut fd = tokio::io::BufReader::new(fd);
613 let mut r = Vec::new();
614 let reader = fd.read_to_end(&mut r);
615 let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid));
616 finish?;
617 assert_eq!(nbytes?, r.len());
618 Ok(r)
619 }
620
621 pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
625 let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?;
626 Ok((digest, self.read_finish_pipe(pipefd).await?))
627 }
628
629 pub async fn fetch_manifest(
632 &self,
633 img: &OpenedImage,
634 ) -> Result<(String, oci_spec::image::ImageManifest)> {
635 let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
636 let manifest = serde_json::from_slice(&raw)?;
637 Ok((digest, manifest))
638 }
639
640 pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
643 let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?;
644 self.read_finish_pipe(pipe).await
645 }
646
647 pub async fn fetch_config(
650 &self,
651 img: &OpenedImage,
652 ) -> Result<oci_spec::image::ImageConfiguration> {
653 let raw = self.fetch_config_raw(img).await?;
654 serde_json::from_slice(&raw).map_err(Into::into)
655 }
656
657 #[instrument]
667 pub async fn get_blob(
668 &self,
669 img: &OpenedImage,
670 digest: &Digest,
671 size: u64,
672 ) -> Result<(
673 impl AsyncBufRead + Send + Unpin,
674 impl Future<Output = Result<()>> + Unpin + '_,
675 )> {
676 tracing::debug!("fetching blob");
679 let args: Vec<serde_json::Value> =
680 vec![img.0.into(), digest.to_string().into(), size.into()];
681 let (_bloblen, pipe): (serde_json::Number, FinishPipe) =
686 self.impl_request_with_fds("GetBlob", args).await?;
687 let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
688 let fd = tokio::io::BufReader::new(fd);
689 let finish = Box::pin(self.finish_pipe(pipe.pipeid));
690 Ok((fd, finish))
691 }
692
693 async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> {
694 let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
695 let mut errfd = tokio::io::BufReader::new(fd);
696 let mut buf = Vec::new();
697 errfd
698 .read_to_end(&mut buf)
699 .await
700 .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
701 if buf.is_empty() {
702 return Ok(());
703 }
704 #[derive(Deserialize)]
705 struct RemoteError {
706 code: String,
707 message: String,
708 }
709 let e: RemoteError = serde_json::from_slice(&buf)
710 .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
711 match e.code.as_str() {
712 "EPIPE" => Ok(()),
714 "retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())),
715 _ => Err(GetBlobError::Other(e.message.into_boxed_str())),
716 }
717 }
718
719 #[instrument]
723 pub async fn get_raw_blob(
724 &self,
725 img: &OpenedImage,
726 digest: &Digest,
727 ) -> Result<(
728 Option<u64>,
729 tokio::fs::File,
730 impl Future<Output = std::result::Result<(), GetBlobError>> + Unpin + '_,
731 )> {
732 tracing::debug!("fetching blob");
733 let args: Vec<serde_json::Value> = vec![img.0.into(), digest.to_string().into()];
734 let (bloblen, fds): (i64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?;
735 let bloblen = u64::try_from(bloblen).ok();
737 let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd));
738 let err = Self::read_blob_error(fds.errfd).boxed();
739 Ok((bloblen, fd, err))
740 }
741
742 #[instrument]
744 pub async fn get_descriptor(
745 &self,
746 img: &OpenedImage,
747 descriptor: &Descriptor,
748 ) -> Result<(
749 impl AsyncBufRead + Send + Unpin,
750 impl Future<Output = Result<()>> + Unpin + '_,
751 )> {
752 self.get_blob(img, descriptor.digest(), descriptor.size())
753 .await
754 }
755
756 #[instrument]
758 pub async fn get_layer_info(
759 &self,
760 img: &OpenedImage,
761 ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
762 tracing::debug!("Getting layer info");
763 if layer_info_piped_proto_version().matches(&self.protover) {
764 let ((), pipe) = self
765 .impl_request_with_fds("GetLayerInfoPiped", [img.0])
766 .await?;
767 let buf = self.read_finish_pipe(pipe).await?;
768 return Ok(Some(serde_json::from_slice(&buf)?));
769 }
770 if !layer_info_proto_version().matches(&self.protover) {
771 return Ok(None);
772 }
773 let layers = self.impl_request("GetLayerInfo", [img.0]).await?;
774 Ok(Some(layers))
775 }
776
777 #[instrument]
779 pub async fn finalize(self) -> Result<()> {
780 let _ = &self;
781 let req = Request::new_bare("Shutdown");
782 let sendbuf = serde_json::to_vec(&req)?;
783 let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
785 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
786 drop(sendbuf);
787 tracing::debug!("sent shutdown request");
788 let mut childwait = self.childwait.lock().await;
789 let output = childwait
790 .as_mut()
791 .await
792 .map_err(|e| Error::new_other(e.to_string()))??;
793 if !output.status.success() {
794 let stderr = String::from_utf8_lossy(&output.stderr);
795 return Err(Error::RequestReturned(
796 format!("proxy failed: {}\n{}", output.status, stderr).into(),
797 ));
798 }
799 tracing::debug!("proxy exited successfully");
800 Ok(())
801 }
802}
803
804#[cfg(test)]
805mod tests {
806 use std::io::{BufWriter, Seek, Write};
807 use std::os::fd::{AsRawFd, OwnedFd};
808
809 use super::*;
810 use cap_std_ext::cap_std::fs::Dir;
811 use rustix::fs::{memfd_create, MemfdFlags};
812
813 fn check_skopeo() -> bool {
815 static HAVE_SKOPEO: OnceLock<bool> = OnceLock::new();
816 *HAVE_SKOPEO.get_or_init(|| {
817 Command::new("skopeo")
818 .arg("--help")
819 .stdout(Stdio::null())
820 .stderr(Stdio::null())
821 .status()
822 .is_ok()
823 })
824 }
825
826 fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
827 let d = format!("{:?}", c);
831 for c in contains {
832 assert!(d.contains(c), "{} missing {}", d, c);
833 }
834 for c in not_contains {
835 assert!(!d.contains(c), "{} should not contain {}", d, c);
836 }
837 }
838
839 #[test]
840 fn proxy_configs() {
841 let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();
842
843 let c = Command::try_from(ImageProxyConfig::default()).unwrap();
844 validate(
845 c,
846 &["experimental-image-proxy"],
847 &["--no-creds", "--tls-verify", "--authfile"],
848 );
849
850 let c = Command::try_from(ImageProxyConfig {
851 authfile: Some(PathBuf::from("/path/to/authfile")),
852 ..Default::default()
853 })
854 .unwrap();
855 validate(c, &[r"--authfile", "/path/to/authfile"], &[]);
856
857 let decryption_key_path = "/path/to/decryption_key";
858 let c = Command::try_from(ImageProxyConfig {
859 decryption_keys: Some(vec![decryption_key_path.to_string()]),
860 ..Default::default()
861 })
862 .unwrap();
863 validate(c, &[r"--decryption-key", "/path/to/decryption_key"], &[]);
864
865 let c = Command::try_from(ImageProxyConfig {
866 certificate_directory: Some(PathBuf::from("/path/to/certs")),
867 ..Default::default()
868 })
869 .unwrap();
870 validate(c, &[r"--cert-dir", "/path/to/certs"], &[]);
871
872 let c = Command::try_from(ImageProxyConfig {
873 insecure_skip_tls_verification: Some(true),
874 ..Default::default()
875 })
876 .unwrap();
877 validate(c, &[r"--tls-verify=false"], &[]);
878
879 let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
880 tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
881 tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
882 let c = Command::try_from(ImageProxyConfig {
883 auth_data: Some(tmpf.into_std()),
884 ..Default::default()
885 })
886 .unwrap();
887 validate(c, &["--authfile", "/proc/self/fd/100"], &[]);
888
889 let c = Command::try_from(ImageProxyConfig {
891 user_agent_prefix: Some("bootc/1.0".to_string()),
892 ..Default::default()
893 })
894 .unwrap();
895 if supports_user_agent_prefix() {
896 validate(c, &["--user-agent-prefix", "bootc/1.0"], &[]);
897 } else {
898 validate(c, &[], &["--user-agent-prefix"]);
899 }
900 }
901
902 #[tokio::test]
903 async fn skopeo_not_found() {
904 let mut config = ImageProxyConfig {
905 ..ImageProxyConfig::default()
906 };
907 config.skopeo_cmd = Some(Command::new("no-skopeo"));
908
909 match ImageProxy::new_with_config(config).await {
910 Ok(_) => panic!("Expected an error"),
911 Err(ref e @ Error::SkopeoSpawnError(ref inner)) => {
912 assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
913 assert!(e
915 .to_string()
916 .contains("skopeo spawn error: No such file or directory"));
917 }
918 Err(e) => panic!("Unexpected error {e}"),
919 }
920 }
921
922 #[tokio::test]
923 async fn test_proxy_send_sync() {
924 fn assert_send_sync(_x: impl Send + Sync) {}
925
926 let Ok(proxy) = ImageProxy::new().await else {
927 return;
929 };
930 assert_send_sync(&proxy);
931 assert_send_sync(proxy);
932
933 let opened = OpenedImage(0);
934 assert_send_sync(&opened);
935 assert_send_sync(opened);
936 }
937
938 fn generate_err_fd(v: serde_json::Value) -> Result<OwnedFd> {
939 let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
940 let mut tf = cap_tempfile::TempFile::new_anonymous(&tmp).map(BufWriter::new)?;
941 serde_json::to_writer(&mut tf, &v)?;
942 let mut tf = tf.into_inner().map_err(|e| e.into_error())?;
943 tf.seek(std::io::SeekFrom::Start(0))?;
944 let r = tf.into_std().into();
945 Ok(r)
946 }
947
948 #[tokio::test]
949 async fn test_read_blob_error_retryable() -> Result<()> {
950 let retryable = serde_json::json!({
951 "code": "retryable",
952 "message": "foo",
953 });
954 let retryable = generate_err_fd(retryable)?;
955 let err = ImageProxy::read_blob_error(retryable).boxed();
956 let e = err.await.unwrap_err();
957 match e {
958 GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"),
959 _ => panic!("Unexpected error {e:?}"),
960 }
961 Ok(())
962 }
963
964 #[tokio::test]
965 async fn test_read_blob_error_none() -> Result<()> {
966 let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
967 let tf = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std();
968 let err = ImageProxy::read_blob_error(tf.into()).boxed();
969 err.await.unwrap();
970 Ok(())
971 }
972
973 #[tokio::test]
974 async fn test_read_blob_error_other() -> Result<()> {
975 let other = serde_json::json!({
976 "code": "other",
977 "message": "bar",
978 });
979 let other = generate_err_fd(other)?;
980 let err = ImageProxy::read_blob_error(other).boxed();
981 let e = err.await.unwrap_err();
982 match e {
983 GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"),
984 _ => panic!("Unexpected error {e:?}"),
985 }
986 Ok(())
987 }
988
989 #[tokio::test]
990 async fn test_read_blob_error_epipe() -> Result<()> {
991 let epipe = serde_json::json!({
992 "code": "EPIPE",
993 "message": "baz",
994 });
995 let epipe = generate_err_fd(epipe)?;
996 let err = ImageProxy::read_blob_error(epipe).boxed();
997 err.await.unwrap();
998 Ok(())
999 }
1000
1001 fn create_dummy_fd() -> OwnedFd {
1003 memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()
1004 }
1005
1006 #[test]
1007 fn test_new_from_raw_values_finish_pipe() {
1008 let datafd = create_dummy_fd();
1009 let raw_datafd_val = datafd.as_raw_fd();
1011 let fds = vec![datafd];
1012 let v = FinishPipe::from_reply(fds, 1).unwrap();
1013 assert_eq!(v.pipeid.0.get(), 1);
1014 assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
1015 }
1016
1017 #[test]
1018 fn test_new_from_raw_values_dual_fds() {
1019 let datafd = create_dummy_fd();
1020 let errfd = create_dummy_fd();
1021 let raw_datafd_val = datafd.as_raw_fd();
1022 let raw_errfd_val = errfd.as_raw_fd();
1023 let fds = vec![datafd, errfd];
1024 let v = DualFds::from_reply(fds, 0).unwrap();
1025 assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val);
1026 assert_eq!(v.errfd.as_raw_fd(), raw_errfd_val);
1027 }
1028
1029 #[test]
1030 fn test_new_from_raw_values_error_too_many_fds() {
1031 let fds = vec![create_dummy_fd(), create_dummy_fd(), create_dummy_fd()];
1032 match DualFds::from_reply(fds, 0) {
1033 Ok(v) => unreachable!("{v:?}"),
1034 Err(Error::Other(msg)) => {
1035 assert_eq!(msg.as_ref(), "Expected two fds for DualFds")
1036 }
1037 Err(other) => unreachable!("{other}"),
1038 }
1039 }
1040
1041 #[test]
1042 fn test_new_from_raw_values_error_fd_with_zero_pipeid() {
1043 let fds = vec![create_dummy_fd()];
1044 match FinishPipe::from_reply(fds, 0) {
1045 Ok(v) => unreachable!("{v:?}"),
1046 Err(Error::Other(msg)) => {
1047 assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe")
1048 }
1049 Err(other) => unreachable!("{other}"),
1050 }
1051 }
1052
1053 #[test]
1054 fn test_new_from_raw_values_error_pipeid_with_both_fds() {
1055 let fds = vec![create_dummy_fd(), create_dummy_fd()];
1056 match DualFds::from_reply(fds, 1) {
1057 Ok(v) => unreachable!("{v:?}"),
1058 Err(Error::Other(msg)) => {
1059 assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds")
1060 }
1061 Err(other) => unreachable!("{other}"),
1062 }
1063 }
1064
1065 #[test]
1066 fn test_new_from_raw_values_error_no_fd_with_pipeid() {
1067 let fds: Vec<OwnedFd> = vec![];
1068 match FinishPipe::from_reply(fds, 1) {
1069 Ok(v) => unreachable!("{v:?}"),
1070 Err(Error::Other(msg)) => {
1071 assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe")
1072 }
1073 Err(other) => unreachable!("{other}"),
1074 }
1075 }
1076
1077 #[tokio::test]
1078 #[ignore = "https://github.com/coreos/rpm-ostree/issues/5442"]
1079 async fn test_open_optional() -> Result<()> {
1080 if !check_skopeo() {
1081 return Ok(());
1082 }
1083
1084 let td = tempfile::tempdir()?;
1085 let td = td.path().to_str().unwrap();
1086 let proxy = ImageProxy::new().await?;
1087 let imgpath = format!("oci-archive:{td}/some-nonexistent-image.ociarchive");
1088 let img = proxy.open_image_optional(&imgpath).await.unwrap();
1089 assert!(img.is_none());
1090
1091 Ok(())
1092 }
1093}