1pub mod transport;
8
9pub use ::oci_spec;
13pub use transport::{
14 ContainersStorageRef, ImageReference, ImageReferenceError, Transport, TransportConversionError,
15};
16
17use ::oci_spec::image::{Descriptor, Digest};
18use cap_std_ext::prelude::CapStdExtCommandExt;
19use cap_std_ext::{cap_std, cap_tempfile};
20use futures_util::{Future, FutureExt};
21use itertools::Itertools;
22use serde::{Deserialize, Serialize};
23use sha2::Digest as _;
24use std::fs::File;
25use std::iter::FusedIterator;
26use std::num::NonZeroU32;
27use std::ops::Range;
28use std::os::fd::OwnedFd;
29use std::os::unix::prelude::CommandExt;
30use std::path::PathBuf;
31use std::pin::Pin;
32use std::process::{Command, Stdio};
33use std::str::FromStr;
34use std::sync::{Arc, Mutex, OnceLock};
35use thiserror::Error;
36use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, ReadBuf};
37use tokio::sync::{oneshot, Mutex as AsyncMutex};
38use tokio::task::JoinError;
39use tracing::instrument;
40
/// Errors produced while talking to the image proxy.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
    /// An I/O error; also the target of the `rustix::io::Errno` conversion.
    #[error("i/o error: {0}")]
    Io(#[from] std::io::Error),
    /// Spawning the skopeo child process failed.
    #[error("skopeo spawn error: {0}")]
    SkopeoSpawnError(#[source] std::io::Error),
    /// JSON serialization or deserialization failed.
    #[error("serialization error: {0}")]
    SerDe(#[from] serde_json::Error),
    /// The proxy answered a request with `success == false`.
    #[error("failed to invoke method {method}: {error}")]
    RequestInitiationFailure { method: Box<str>, error: Box<str> },
    /// Returned by `finalize` when the proxy process exits unsuccessfully.
    #[error("proxy request returned error: {0}")]
    RequestReturned(Box<str>),
    /// An error read from the error channel of a raw blob fetch.
    #[error(transparent)]
    BlobError(#[from] GetBlobError),
    /// A protocol version string could not be parsed.
    #[error("semantic version error: {0}")]
    SemanticVersion(#[from] semver::Error),
    /// The proxy's protocol version does not satisfy the base requirement.
    #[error("proxy too old (requested={requested_version} found={found_version}) error")]
    ProxyTooOld {
        requested_version: Box<str>,
        found_version: Box<str>,
    },
    /// The supplied `ImageProxyConfig` is invalid (e.g. conflicting auth options).
    #[error("configuration error: {0}")]
    Configuration(Box<str>),
    /// Any other failure, described by a message.
    #[error("other error: {0}")]
    Other(Box<str>),
}
78
79impl Error {
80 pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
81 Self::Other(e.into())
82 }
83}
84
/// Errors reported on the error channel of a raw blob fetch.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum GetBlobError {
    /// The proxy reported the error code `"retryable"`.
    #[error("retryable error: {0}")]
    Retryable(Box<str>),
    /// Any other failure, including failing to read or parse the error channel.
    #[error("other error: {0}")]
    Other(Box<str>),
}
96
97impl From<rustix::io::Errno> for Error {
98 fn from(value: rustix::io::Errno) -> Self {
99 Self::Io(value.into())
100 }
101}
102
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
105
/// Which proxy request backs a [`BlobStream`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlobStreamSource {
    /// The stream was opened with the `GetRawBlob` request.
    GetRawBlob,
    /// The stream was opened with the `GetBlob` request.
    GetBlob,
}
113
/// A blob reader paired with the future that completes the transfer.
pub struct BlobStream<'a> {
    /// Request used to open the stream.
    source: BlobStreamSource,
    /// Size the caller expects the blob to have, in bytes.
    expected_size: u64,
    /// Reader yielding the blob contents.
    reader: Box<dyn AsyncRead + Send + Unpin>,
    /// Future resolving to the overall outcome of the fetch.
    driver: futures_util::future::BoxFuture<'a, Result<()>>,
}
121
122impl<'a> BlobStream<'a> {
123 pub fn source(&self) -> BlobStreamSource {
124 self.source
125 }
126
127 pub fn expected_size(&self) -> u64 {
128 self.expected_size
129 }
130
131 pub fn into_parts(
132 self,
133 ) -> (
134 Box<dyn AsyncRead + Send + Unpin>,
135 futures_util::future::BoxFuture<'a, Result<()>>,
136 ) {
137 (self.reader, self.driver)
138 }
139}
140
141impl std::fmt::Debug for BlobStream<'_> {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 f.debug_struct("BlobStream")
144 .field("source", &self.source)
145 .field("expected_size", &self.expected_size)
146 .finish_non_exhaustive()
147 }
148}
149
/// Outcome reported by a `VerifiedBlobReader` over its completion channel.
#[derive(Debug)]
enum VerifiedBlobReadResult {
    /// The reader reached end-of-file; carries the byte count and computed digest.
    Complete { nbytes: u64, digest: Digest },
    /// The reader was dropped before reporting completion.
    Incomplete,
}
155
/// Incremental hasher for one of the digest algorithms supported for blob verification.
#[derive(Debug)]
enum Hasher {
    /// SHA-256 state.
    Sha256(sha2::Sha256),
    /// SHA-384 state.
    Sha384(sha2::Sha384),
    /// SHA-512 state.
    Sha512(sha2::Sha512),
}
162
163impl Hasher {
164 fn new_for_digest(digest: &Digest) -> Result<Self> {
165 use oci_spec::image::DigestAlgorithm;
166 Ok(match digest.algorithm() {
167 DigestAlgorithm::Sha256 => Self::Sha256(sha2::Sha256::new()),
168 DigestAlgorithm::Sha384 => Self::Sha384(sha2::Sha384::new()),
169 DigestAlgorithm::Sha512 => Self::Sha512(sha2::Sha512::new()),
170 DigestAlgorithm::Other(a) => {
171 return Err(Error::Other(
172 format!("Unsupported digest algorithm for blob verification: {a}").into(),
173 ));
174 }
175 _ => {
176 return Err(Error::Other(
177 format!(
178 "Unsupported digest algorithm for blob verification: {}",
179 digest.algorithm().as_ref()
180 )
181 .into(),
182 ));
183 }
184 })
185 }
186
187 fn update(&mut self, chunk: &[u8]) {
188 match self {
189 Self::Sha256(h) => h.update(chunk),
190 Self::Sha384(h) => h.update(chunk),
191 Self::Sha512(h) => h.update(chunk),
192 }
193 }
194
195 fn finalize_digest(self) -> Digest {
196 let (algorithm, hex) = match self {
197 Self::Sha256(h) => ("sha256", hex::encode(h.finalize())),
198 Self::Sha384(h) => ("sha384", hex::encode(h.finalize())),
199 Self::Sha512(h) => ("sha512", hex::encode(h.finalize())),
200 };
201 Digest::from_str(&format!("{algorithm}:{hex}")).expect("valid digest")
202 }
203}
204
/// Reader wrapper that hashes and counts the bytes passing through it and
/// reports the outcome on a oneshot channel.
#[derive(Debug)]
struct VerifiedBlobReader<R> {
    /// The wrapped reader.
    inner: R,
    /// Number of bytes read so far.
    nbytes: u64,
    /// Running hash; taken when end-of-file is reported.
    hasher: Option<Hasher>,
    /// Channel for the final result; taken once a result has been sent.
    completion: Option<oneshot::Sender<VerifiedBlobReadResult>>,
}
214
215impl<R: AsyncRead + Unpin> VerifiedBlobReader<R> {
216 fn new(
217 inner: R,
218 expected: Digest,
219 completion: oneshot::Sender<VerifiedBlobReadResult>,
220 ) -> Result<Self> {
221 let hasher = Hasher::new_for_digest(&expected)?;
222 Ok(Self {
223 inner,
224 nbytes: 0,
225 hasher: Some(hasher),
226 completion: Some(completion),
227 })
228 }
229}
230
231impl<R: AsyncRead + Unpin> AsyncRead for VerifiedBlobReader<R> {
232 fn poll_read(
233 mut self: Pin<&mut Self>,
234 cx: &mut std::task::Context<'_>,
235 buf: &mut ReadBuf<'_>,
236 ) -> std::task::Poll<std::io::Result<()>> {
237 if buf.remaining() == 0 {
238 return std::task::Poll::Ready(Ok(()));
239 }
240 let before = buf.filled().len();
242 match Pin::new(&mut self.inner).poll_read(cx, buf) {
243 v @ std::task::Poll::Ready(Ok(())) => {
244 let after = buf.filled().len();
245 let delta = after.checked_sub(before).unwrap();
246 if delta > 0 {
247 let chunk = &buf.filled()[before..after];
248 let hasher = self.hasher.as_mut().expect("hasher missing before EOF");
249 hasher.update(chunk);
250 self.nbytes += delta as u64;
251 } else {
252 let Some(tx) = self.completion.take() else {
254 return v;
255 };
256 let Some(hasher) = self.hasher.take() else {
257 return v;
258 };
259 let _ = tx.send(VerifiedBlobReadResult::Complete {
260 nbytes: self.nbytes,
261 digest: hasher.finalize_digest(),
262 });
263 }
264 v
265 }
266 o => o,
267 }
268 }
269}
270
271impl<R> Drop for VerifiedBlobReader<R> {
272 fn drop(&mut self) {
273 if let Some(tx) = self.completion.take() {
274 let _ = tx.send(VerifiedBlobReadResult::Incomplete);
275 }
276 }
277}
278
279fn verify_blob_bytes_read(
280 expected: &Digest,
281 expected_size: u64,
282 r: VerifiedBlobReadResult,
283) -> Result<()> {
284 match r {
285 VerifiedBlobReadResult::Incomplete => Ok(()),
286 VerifiedBlobReadResult::Complete { nbytes, digest } => {
287 if nbytes != expected_size {
288 return Err(Error::Other(
289 format!(
290 "Blob size mismatch for {expected}: expected {expected_size} bytes, read {nbytes} bytes"
291 )
292 .into(),
293 ));
294 }
295 if digest != *expected {
296 return Err(Error::Other(
297 format!("Blob digest mismatch for {expected}: computed {digest}").into(),
298 ));
299 }
300 Ok(())
301 }
302 }
303}
304
/// File descriptor numbers from which target fds for the child process are allocated.
pub const RESERVED_FD_RANGE: Range<i32> = 100..200;
309
/// Size in bytes of the buffer used to receive a single reply from the proxy.
const MAX_MSG_SIZE: usize = 32 * 1024;
313
314fn base_proto_version() -> &'static semver::VersionReq {
315 static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
317 BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
318}
319
320fn layer_info_proto_version() -> &'static semver::VersionReq {
321 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
322 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
323}
324
325fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
326 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
327 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
328}
329
330fn raw_blob_proto_version() -> &'static semver::VersionReq {
331 static RAW_BLOB_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
332 RAW_BLOB_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.8").unwrap())
333}
334
/// A request sent to the proxy, serialized as JSON.
#[derive(Serialize)]
struct Request {
    /// Name of the method to invoke.
    method: String,
    /// Positional arguments for the method.
    args: Vec<serde_json::Value>,
}
340
341impl Request {
342 fn new<T, I>(method: &str, args: T) -> Self
343 where
344 T: IntoIterator<Item = I>,
345 I: Into<serde_json::Value>,
346 {
347 let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
348 Self {
349 method: method.to_string(),
350 args,
351 }
352 }
353
354 fn new_bare(method: &str) -> Self {
355 Self {
356 method: method.to_string(),
357 args: vec![],
358 }
359 }
360}
361
/// A reply received from the proxy, deserialized from JSON.
#[derive(Deserialize)]
struct Reply {
    /// Whether the request succeeded; `false` becomes `RequestInitiationFailure`.
    success: bool,
    /// Error text, reported to the caller when `success` is `false`.
    error: String,
    /// Pipe identifier handed to `FromReplyFds::from_reply`; several
    /// implementations require it to be 0 when no pipe is expected.
    pipeid: u32,
    /// Method-specific return value, deserialized into the caller's type.
    value: serde_json::Value,
}
369
/// Boxed future resolving to the child process's collected output, wrapped
/// in the join result of the task that waits for it.
type ChildFuture = Pin<
    Box<
        dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
            + Send,
    >,
>;
376
/// Handle to a running image proxy child process.
pub struct ImageProxy {
    /// Our end of the socket pair used to exchange requests and replies.
    sockfd: Arc<Mutex<OwnedFd>>,
    /// Future that resolves when the child process exits.
    childwait: Arc<AsyncMutex<ChildFuture>>,
    /// Protocol version reported by the proxy in reply to `Initialize`.
    protover: semver::Version,
}
383
384impl std::fmt::Debug for ImageProxy {
385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 f.debug_struct("ImageProxy").finish()
387 }
388}
389
/// Identifier of an image opened through the proxy, as returned by `OpenImage`.
#[derive(Debug, PartialEq, Eq)]
pub struct OpenedImage(u32);
393
/// Non-zero identifier of a pipe opened by the proxy.
#[derive(Debug, PartialEq, Eq)]
struct PipeId(NonZeroU32);

impl PipeId {
    /// Wrap `pipeid`, returning `None` when it is zero.
    fn try_new(pipeid: u32) -> Option<Self> {
        NonZeroU32::new(pipeid).map(Self)
    }
}
402
/// Options controlling how the proxy child process is launched.
///
/// At most one of `authfile`, `auth_data` and `auth_anonymous` may be set;
/// converting the config to a `Command` fails otherwise.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct ImageProxyConfig {
    /// Path passed to the proxy as `--authfile`.
    pub authfile: Option<PathBuf>,

    /// Open file whose contents are copied to an anonymous temporary file and
    /// passed to the proxy as `--authfile` via an inherited file descriptor.
    pub auth_data: Option<File>,

    /// When set, pass `--no-creds`.
    pub auth_anonymous: bool,

    /// Directory passed to the proxy as `--cert-dir`.
    pub certificate_directory: Option<PathBuf>,

    /// Each entry is passed to the proxy as a separate `--decryption-key`.
    pub decryption_keys: Option<Vec<String>>,

    /// When `Some(true)`, pass `--tls-verify=false`.
    pub insecure_skip_tls_verification: Option<bool>,

    /// When `Some(true)`, pass `--insecure-policy`.
    pub insecure_policy: Option<bool>,

    /// Passed as `--user-agent-prefix`, but only when `skopeo --help`
    /// mentions that option; otherwise it is ignored.
    pub user_agent_prefix: Option<String>,

    /// When set (or when the `CONTAINERS_IMAGE_PROXY_DEBUG` environment
    /// variable is present), pass `--debug` and leave the child's stderr
    /// unpiped.
    pub debug: bool,

    /// Base command to extend with proxy arguments; when `None`, a plain
    /// `skopeo` command is used.
    pub skopeo_cmd: Option<Command>,
}
465
/// Whether `skopeo --help` mentions `--user-agent-prefix`.
///
/// The probe runs at most once per process; the answer is cached. Any failure
/// to run skopeo or to decode its output as UTF-8 counts as "not supported".
fn supports_user_agent_prefix() -> bool {
    static SUPPORTS_USER_AGENT: OnceLock<bool> = OnceLock::new();
    *SUPPORTS_USER_AGENT.get_or_init(|| {
        let probe = Command::new("skopeo")
            .arg("--help")
            .stdout(Stdio::piped())
            .stderr(Stdio::null())
            .output();
        let Ok(output) = probe else {
            return false;
        };
        match String::from_utf8(output.stdout) {
            Ok(help) => help.contains("--user-agent-prefix"),
            Err(_) => false,
        }
    })
}
484
/// Translate an [`ImageProxyConfig`] into the `Command` that launches the proxy.
///
/// Fails with [`Error::Configuration`] when more than one authentication
/// option is set, and with an I/O error if staging `auth_data` fails.
impl TryFrom<ImageProxyConfig> for Command {
    type Error = Error;

    fn try_from(config: ImageProxyConfig) -> Result<Self> {
        // Debugging can be enabled by the config or by the environment.
        let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
        // Hand out target fd numbers for the child from the reserved range.
        let mut allocated_fds = RESERVED_FD_RANGE.clone();
        let mut alloc_fd = || {
            allocated_fds.next().ok_or_else(|| {
                Error::Other("Ran out of reserved file descriptors for child".into())
            })
        };

        // Use the caller's command if given; otherwise default to `skopeo`
        // with a parent-death signal installed in the child.
        let mut c = config.skopeo_cmd.unwrap_or_else(|| {
            let mut c = std::process::Command::new("skopeo");
            // NOTE(review): the pre-exec hook only requests SIGTERM on parent
            // death; presumably that is safe to run between fork and exec —
            // confirm against the `pre_exec` safety requirements.
            unsafe {
                c.pre_exec(|| {
                    Ok(rustix::process::set_parent_process_death_signal(Some(
                        rustix::process::Signal::TERM,
                    ))?)
                });
            }
            c
        });
        c.arg("experimental-image-proxy");
        if debug {
            c.arg("--debug");
        }
        // The three authentication options are mutually exclusive.
        let auth_option_count = [
            config.authfile.is_some(),
            config.auth_data.is_some(),
            config.auth_anonymous,
        ]
        .into_iter()
        .filter(|&x| x)
        .count();
        if auth_option_count > 1 {
            return Err(Error::Configuration(
                "Conflicting authentication options".into(),
            ));
        }
        if let Some(authfile) = config.authfile {
            c.arg("--authfile");
            c.arg(authfile);
        } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
            // Copy the auth data into an anonymous temporary file and pass it
            // to the child as a numbered fd, referenced through /proc.
            let target_fd = alloc_fd()?;
            let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
            let mut tempfile =
                cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
            std::io::copy(&mut auth_data, &mut tempfile)?;
            // `into_inner` flushes the buffered writer; surface a flush failure.
            let tempfile = tempfile
                .into_inner()
                .map_err(|e| e.into_error())?
                .into_std();
            let fd = std::sync::Arc::new(tempfile.into());
            c.take_fd_n(fd, target_fd);
            c.arg("--authfile");
            c.arg(format!("/proc/self/fd/{target_fd}"));
        } else if config.auth_anonymous {
            c.arg("--no-creds");
        }

        if let Some(certificate_directory) = config.certificate_directory {
            c.arg("--cert-dir");
            c.arg(certificate_directory);
        }

        if let Some(decryption_keys) = config.decryption_keys {
            for decryption_key in &decryption_keys {
                c.arg("--decryption-key");
                c.arg(decryption_key);
            }
        }

        if config.insecure_skip_tls_verification.unwrap_or_default() {
            c.arg("--tls-verify=false");
        }

        if config.insecure_policy.unwrap_or_default() {
            c.arg("--insecure-policy");
        }

        if let Some(user_agent_prefix) = config.user_agent_prefix {
            // Silently skipped when the installed skopeo lacks the option.
            if supports_user_agent_prefix() {
                c.arg("--user-agent-prefix");
                c.arg(user_agent_prefix);
            }
        }

        c.stdout(Stdio::null());
        // Outside debug mode, capture stderr so it can be included in errors.
        if !debug {
            c.stderr(Stdio::piped());
        }
        Ok(c)
    }
}
586
/// Per-layer information as deserialized from the proxy's layer info replies.
#[derive(Debug, serde::Deserialize)]
pub struct ConvertedLayerInfo {
    /// Digest of the layer.
    pub digest: Digest,

    /// Size of the layer.
    pub size: u64,

    /// Media type of the layer.
    pub media_type: oci_spec::image::MediaType,
}
600
/// Reply payload consisting of one data fd plus the pipe id that must later
/// be passed to the `FinishPipe` request.
#[derive(Debug)]
struct FinishPipe {
    /// Identifier to send with `FinishPipe`.
    pipeid: PipeId,
    /// File descriptor from which the data is read.
    datafd: OwnedFd,
}
607
/// Reply payload consisting of exactly two fds and no pipe id.
#[derive(Debug)]
struct DualFds {
    /// First received fd: carries the data.
    datafd: OwnedFd,
    /// Second received fd: carries an error report, if any.
    errfd: OwnedFd,
}
614
/// Conversion from the file descriptors and pipe id attached to a reply into
/// the typed value a request expects.
trait FromReplyFds: Send + 'static
where
    Self: Sized,
{
    /// Build `Self` from the received fds and the reply's `pipeid`, failing
    /// when their number or value does not match what `Self` requires.
    fn from_reply(
        iterable: impl IntoIterator<IntoIter: FusedIterator, Item = OwnedFd>,
        pipeid: u32,
    ) -> Result<Self>;
}
625
626impl FromReplyFds for () {
628 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
629 if fds.into_iter().next().is_some() {
630 return Err(Error::Other("expected no fds".into()));
631 }
632 if pipeid != 0 {
633 return Err(Error::Other("unexpected pipeid".into()));
634 }
635 Ok(())
636 }
637}
638
639impl FromReplyFds for FinishPipe {
641 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
642 let Some(pipeid) = PipeId::try_new(pipeid) else {
643 return Err(Error::Other("Expected pipeid for FinishPipe".into()));
644 };
645 let datafd = fds
646 .into_iter()
647 .exactly_one()
648 .map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?;
649 Ok(Self { pipeid, datafd })
650 }
651}
652
653impl FromReplyFds for DualFds {
655 fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
656 if pipeid != 0 {
657 return Err(Error::Other("Unexpected pipeid with DualFds".into()));
658 }
659 let [datafd, errfd] = fds
660 .into_iter()
661 .collect_array()
662 .ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?;
663 Ok(Self { datafd, errfd })
664 }
665}
666
667impl ImageProxy {
668 pub async fn new() -> Result<Self> {
670 Self::new_with_config(Default::default()).await
671 }
672
673 #[instrument]
675 pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
676 let mut c = Command::try_from(config)?;
677 let (mysock, theirsock) = rustix::net::socketpair(
678 rustix::net::AddressFamily::UNIX,
679 rustix::net::SocketType::SEQPACKET,
680 rustix::net::SocketFlags::CLOEXEC,
681 None,
682 )?;
683 c.stdin(Stdio::from(theirsock));
684 let child = match c.spawn() {
685 Ok(c) => c,
686 Err(error) => return Err(Error::SkopeoSpawnError(error)),
687 };
688 tracing::debug!("Spawned skopeo pid={:?}", child.id());
689 let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
694 let sockfd = Arc::new(Mutex::new(mysock));
695
696 let mut r = Self {
697 sockfd,
698 childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
699 protover: semver::Version::new(0, 0, 0),
700 };
701
702 let protover: String = r.impl_request("Initialize", [(); 0]).await?;
704 tracing::debug!("Remote protocol version: {protover}");
705 let protover = semver::Version::parse(protover.as_str())?;
706 let supported = base_proto_version();
708 if !supported.matches(&protover) {
709 return Err(Error::ProxyTooOld {
710 requested_version: supported.to_string().into(),
711 found_version: protover.to_string().into(),
712 });
713 }
714 r.protover = protover;
715
716 Ok(r)
717 }
718
719 pub fn protocol_version(&self) -> &semver::Version {
720 &self.protover
721 }
722
723 pub fn supports_get_raw_blob(&self) -> bool {
724 raw_blob_proto_version().matches(&self.protover)
725 }
726
727 async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static, F: FromReplyFds>(
729 sockfd: Arc<Mutex<OwnedFd>>,
730 req: Request,
731 ) -> Result<(T, F)> {
732 tracing::trace!("sending request {}", req.method.as_str());
733 let r = tokio::task::spawn_blocking(move || {
735 let sockfd = sockfd.lock().unwrap();
736 let sendbuf = serde_json::to_vec(&req)?;
737 let sockfd = &*sockfd;
738 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
739 drop(sendbuf);
740 let mut buf = [0u8; MAX_MSG_SIZE];
741 let mut cmsg_space: Vec<std::mem::MaybeUninit<u8>> =
742 vec![std::mem::MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(1))];
743 let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(cmsg_space.as_mut_slice());
744 let iov = std::io::IoSliceMut::new(buf.as_mut());
745 let mut iov = [iov];
746 let nread = rustix::net::recvmsg(
747 sockfd,
748 &mut iov,
749 &mut cmsg_buffer,
750 rustix::net::RecvFlags::CMSG_CLOEXEC,
751 )?
752 .bytes;
753 let fdret = cmsg_buffer
754 .drain()
755 .filter_map(|m| match m {
756 rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
757 _ => None,
758 })
759 .flatten();
760 let buf = &buf[..nread];
761 let reply: Reply = serde_json::from_slice(buf)?;
762 if !reply.success {
763 return Err(Error::RequestInitiationFailure {
764 method: req.method.clone().into(),
765 error: reply.error.into(),
766 });
767 }
768 let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?;
769 Ok((serde_json::from_value(reply.value)?, fds))
770 })
771 .await
772 .map_err(|e| Error::Other(e.to_string().into()))??;
773 tracing::trace!("completed request");
774 Ok(r)
775 }
776
777 #[instrument(skip(args))]
780 async fn impl_request_with_fds<
781 T: serde::de::DeserializeOwned + Send + 'static,
782 F: FromReplyFds,
783 >(
784 &self,
785 method: &str,
786 args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
787 ) -> Result<(T, F)> {
788 let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
789 let mut childwait = self.childwait.lock().await;
790 tokio::select! {
791 r = req => { r }
792 r = childwait.as_mut() => {
793 let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
794 let stderr = String::from_utf8_lossy(&r.stderr);
795 Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
796 }
797 }
798 }
799
800 async fn impl_request<T: serde::de::DeserializeOwned + Send + 'static>(
802 &self,
803 method: &str,
804 args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
805 ) -> Result<T> {
806 let (r, ()) = self.impl_request_with_fds(method, args).await?;
807 Ok(r)
808 }
809
810 #[instrument]
811 async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> {
812 tracing::debug!("closing pipe");
813 let (r, ()) = self
814 .impl_request_with_fds("FinishPipe", [pipeid.0.get()])
815 .await?;
816 Ok(r)
817 }
818
819 #[instrument]
821 pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
822 tracing::debug!("opening image");
823 let imgid = self.impl_request("OpenImage", [imgref]).await?;
824 Ok(OpenedImage(imgid))
825 }
826
827 #[instrument]
829 pub async fn open_image_ref(&self, imgref: &ImageReference) -> Result<OpenedImage> {
830 self.open_image(&imgref.to_string()).await
831 }
832
833 #[instrument]
836 pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
837 tracing::debug!("opening image");
838 let imgid = self.impl_request("OpenImageOptional", [imgref]).await?;
839 if imgid == 0 {
840 Ok(None)
841 } else {
842 Ok(Some(OpenedImage(imgid)))
843 }
844 }
845
846 #[instrument]
849 pub async fn open_image_optional_ref(
850 &self,
851 imgref: &ImageReference,
852 ) -> Result<Option<OpenedImage>> {
853 self.open_image_optional(&imgref.to_string()).await
854 }
855
856 #[instrument]
857 pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
858 self.impl_request("CloseImage", [img.0]).await
859 }
860
861 async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result<Vec<u8>> {
862 let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
863 let mut fd = tokio::io::BufReader::new(fd);
864 let mut r = Vec::new();
865 let reader = fd.read_to_end(&mut r);
866 let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid));
867 finish?;
868 assert_eq!(nbytes?, r.len());
869 Ok(r)
870 }
871
872 pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
876 let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?;
877 Ok((digest, self.read_finish_pipe(pipefd).await?))
878 }
879
880 pub async fn fetch_manifest(
883 &self,
884 img: &OpenedImage,
885 ) -> Result<(String, oci_spec::image::ImageManifest)> {
886 let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
887 let manifest = serde_json::from_slice(&raw)?;
888 Ok((digest, manifest))
889 }
890
891 pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
894 let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?;
895 self.read_finish_pipe(pipe).await
896 }
897
898 pub async fn fetch_config(
901 &self,
902 img: &OpenedImage,
903 ) -> Result<oci_spec::image::ImageConfiguration> {
904 let raw = self.fetch_config_raw(img).await?;
905 serde_json::from_slice(&raw).map_err(Into::into)
906 }
907
908 #[instrument]
918 pub async fn get_blob(
919 &self,
920 img: &OpenedImage,
921 digest: &Digest,
922 size: u64,
923 ) -> Result<(
924 impl AsyncBufRead + Send + Unpin,
925 impl Future<Output = Result<()>> + Unpin + '_,
926 )> {
927 tracing::debug!("fetching blob");
930 let args: Vec<serde_json::Value> =
931 vec![img.0.into(), digest.to_string().into(), size.into()];
932 let (_bloblen, pipe): (serde_json::Number, FinishPipe) =
937 self.impl_request_with_fds("GetBlob", args).await?;
938 let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
939 let fd = tokio::io::BufReader::new(fd);
940 let finish = Box::pin(self.finish_pipe(pipe.pipeid));
941 Ok((fd, finish))
942 }
943
944 async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> {
945 let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
946 let mut errfd = tokio::io::BufReader::new(fd);
947 let mut buf = Vec::new();
948 errfd
949 .read_to_end(&mut buf)
950 .await
951 .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
952 if buf.is_empty() {
953 return Ok(());
954 }
955 #[derive(Deserialize)]
956 struct RemoteError {
957 code: String,
958 message: String,
959 }
960 let e: RemoteError = serde_json::from_slice(&buf)
961 .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
962 match e.code.as_str() {
963 "EPIPE" => Ok(()),
965 "retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())),
966 _ => Err(GetBlobError::Other(e.message.into_boxed_str())),
967 }
968 }
969
970 #[instrument]
974 pub async fn get_raw_blob(
975 &self,
976 img: &OpenedImage,
977 digest: &Digest,
978 ) -> Result<(
979 Option<u64>,
980 tokio::fs::File,
981 impl Future<Output = std::result::Result<(), GetBlobError>> + Unpin + '_,
982 )> {
983 tracing::debug!("fetching blob");
984 let args: Vec<serde_json::Value> = vec![img.0.into(), digest.to_string().into()];
985 let (bloblen, fds): (i64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?;
986 let bloblen = u64::try_from(bloblen).ok();
988 let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd));
989 let err = Self::read_blob_error(fds.errfd).boxed();
990 Ok((bloblen, fd, err))
991 }
992
993 #[instrument]
998 pub async fn get_blob_stream<'a>(
999 &'a self,
1000 img: &OpenedImage,
1001 digest: &Digest,
1002 expected_size: u64,
1003 ) -> Result<BlobStream<'a>> {
1004 if !self.supports_get_raw_blob() {
1005 let (reader, driver) = self.get_blob(img, digest, expected_size).await?;
1006 let driver = driver.boxed();
1007 return Ok(BlobStream {
1008 source: BlobStreamSource::GetBlob,
1009 expected_size,
1010 reader: Box::new(reader),
1011 driver,
1012 });
1013 }
1014
1015 let (reported_size, fd, err) = self.get_raw_blob(img, digest).await?;
1016 if let Some(sz) = reported_size {
1017 if sz != expected_size {
1018 return Err(Error::Other(
1019 format!(
1020 "Blob size mismatch for {digest}: expected {expected_size} bytes, got {sz} bytes"
1021 )
1022 .into(),
1023 ));
1024 }
1025 }
1026
1027 let expected = digest.clone();
1028 let (tx, rx) = oneshot::channel();
1029 let verified = VerifiedBlobReader::new(fd, expected.clone(), tx)?;
1030 let driver = async move {
1031 err.await.map_err(Error::from)?;
1032 match rx.await {
1033 Ok(r) => verify_blob_bytes_read(&expected, expected_size, r),
1034 Err(e) => Err(Error::Other(
1035 format!("Blob stream verification channel error: {e}").into(),
1036 )),
1037 }
1038 }
1039 .boxed();
1040 Ok(BlobStream {
1041 source: BlobStreamSource::GetRawBlob,
1042 expected_size,
1043 reader: Box::new(verified),
1044 driver,
1045 })
1046 }
1047
1048 #[instrument]
1050 pub async fn get_descriptor(
1051 &self,
1052 img: &OpenedImage,
1053 descriptor: &Descriptor,
1054 ) -> Result<(
1055 impl AsyncBufRead + Send + Unpin,
1056 impl Future<Output = Result<()>> + Unpin + '_,
1057 )> {
1058 self.get_blob(img, descriptor.digest(), descriptor.size())
1059 .await
1060 }
1061
1062 #[instrument]
1064 pub async fn get_layer_info(
1065 &self,
1066 img: &OpenedImage,
1067 ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
1068 tracing::debug!("Getting layer info");
1069 if layer_info_piped_proto_version().matches(&self.protover) {
1070 let ((), pipe) = self
1071 .impl_request_with_fds("GetLayerInfoPiped", [img.0])
1072 .await?;
1073 let buf = self.read_finish_pipe(pipe).await?;
1074 return Ok(Some(serde_json::from_slice(&buf)?));
1075 }
1076 if !layer_info_proto_version().matches(&self.protover) {
1077 return Ok(None);
1078 }
1079 let layers = self.impl_request("GetLayerInfo", [img.0]).await?;
1080 Ok(Some(layers))
1081 }
1082
1083 #[instrument]
1085 pub async fn finalize(self) -> Result<()> {
1086 let _ = &self;
1087 let req = Request::new_bare("Shutdown");
1088 let sendbuf = serde_json::to_vec(&req)?;
1089 let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
1091 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
1092 drop(sendbuf);
1093 tracing::debug!("sent shutdown request");
1094 let mut childwait = self.childwait.lock().await;
1095 let output = childwait
1096 .as_mut()
1097 .await
1098 .map_err(|e| Error::new_other(e.to_string()))??;
1099 if !output.status.success() {
1100 let stderr = String::from_utf8_lossy(&output.stderr);
1101 return Err(Error::RequestReturned(
1102 format!("proxy failed: {}\n{}", output.status, stderr).into(),
1103 ));
1104 }
1105 tracing::debug!("proxy exited successfully");
1106 Ok(())
1107 }
1108}
1109
1110#[cfg(test)]
1111mod tests {
1112 use std::io::{BufWriter, Seek, Write};
1113 use std::os::fd::{AsRawFd, OwnedFd};
1114
1115 use super::*;
1116 use cap_std_ext::cap_std::fs::Dir;
1117 use rustix::fs::{memfd_create, MemfdFlags};
1118
1119 fn check_skopeo() -> bool {
1121 static HAVE_SKOPEO: OnceLock<bool> = OnceLock::new();
1122 *HAVE_SKOPEO.get_or_init(|| {
1123 Command::new("skopeo")
1124 .arg("--help")
1125 .stdout(Stdio::null())
1126 .stderr(Stdio::null())
1127 .status()
1128 .is_ok()
1129 })
1130 }
1131
1132 fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
1133 let d = format!("{:?}", c);
1137 for c in contains {
1138 assert!(d.contains(c), "{} missing {}", d, c);
1139 }
1140 for c in not_contains {
1141 assert!(!d.contains(c), "{} should not contain {}", d, c);
1142 }
1143 }
1144
    /// Check the command line built from a range of `ImageProxyConfig` values.
    #[test]
    fn proxy_configs() {
        let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();

        // Default config: only the subcommand, no auth or TLS options.
        let c = Command::try_from(ImageProxyConfig::default()).unwrap();
        validate(
            c,
            &["experimental-image-proxy"],
            &["--no-creds", "--tls-verify", "--authfile"],
        );

        // Auth file given by path.
        let c = Command::try_from(ImageProxyConfig {
            authfile: Some(PathBuf::from("/path/to/authfile")),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--authfile", "/path/to/authfile"], &[]);

        // Decryption keys.
        let decryption_key_path = "/path/to/decryption_key";
        let c = Command::try_from(ImageProxyConfig {
            decryption_keys: Some(vec![decryption_key_path.to_string()]),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--decryption-key", "/path/to/decryption_key"], &[]);

        // Certificate directory.
        let c = Command::try_from(ImageProxyConfig {
            certificate_directory: Some(PathBuf::from("/path/to/certs")),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--cert-dir", "/path/to/certs"], &[]);

        // TLS verification disabled.
        let c = Command::try_from(ImageProxyConfig {
            insecure_skip_tls_verification: Some(true),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--tls-verify=false"], &[]);

        // Insecure policy.
        let c = Command::try_from(ImageProxyConfig {
            insecure_policy: Some(true),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--insecure-policy"], &[]);

        // Auth data passed as an open file; it is expected to be handed to
        // the child as the first fd of the reserved range (100).
        let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
        tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
        tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
        let c = Command::try_from(ImageProxyConfig {
            auth_data: Some(tmpf.into_std()),
            ..Default::default()
        })
        .unwrap();
        validate(c, &["--authfile", "/proc/self/fd/100"], &[]);

        // User agent prefix: present only when the installed skopeo supports it.
        let c = Command::try_from(ImageProxyConfig {
            user_agent_prefix: Some("bootc/1.0".to_string()),
            ..Default::default()
        })
        .unwrap();
        if supports_user_agent_prefix() {
            validate(c, &["--user-agent-prefix", "bootc/1.0"], &[]);
        } else {
            validate(c, &[], &["--user-agent-prefix"]);
        }
    }
1214
1215 #[tokio::test]
1216 async fn skopeo_not_found() {
1217 let mut config = ImageProxyConfig {
1218 ..ImageProxyConfig::default()
1219 };
1220 config.skopeo_cmd = Some(Command::new("no-skopeo"));
1221
1222 match ImageProxy::new_with_config(config).await {
1223 Ok(_) => panic!("Expected an error"),
1224 Err(ref e @ Error::SkopeoSpawnError(ref inner)) => {
1225 assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
1226 assert!(e
1228 .to_string()
1229 .contains("skopeo spawn error: No such file or directory"));
1230 }
1231 Err(e) => panic!("Unexpected error {e}"),
1232 }
1233 }
1234
1235 #[tokio::test]
1236 async fn test_proxy_send_sync() {
1237 fn assert_send_sync(_x: impl Send + Sync) {}
1238
1239 let Ok(proxy) = ImageProxy::new().await else {
1240 return;
1242 };
1243 assert_send_sync(&proxy);
1244 assert_send_sync(proxy);
1245
1246 let opened = OpenedImage(0);
1247 assert_send_sync(&opened);
1248 assert_send_sync(opened);
1249 }
1250
1251 fn generate_err_fd(v: serde_json::Value) -> Result<OwnedFd> {
1252 let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
1253 let mut tf = cap_tempfile::TempFile::new_anonymous(&tmp).map(BufWriter::new)?;
1254 serde_json::to_writer(&mut tf, &v)?;
1255 let mut tf = tf.into_inner().map_err(|e| e.into_error())?;
1256 tf.seek(std::io::SeekFrom::Start(0))?;
1257 let r = tf.into_std().into();
1258 Ok(r)
1259 }
1260
1261 #[tokio::test]
1262 async fn test_read_blob_error_retryable() -> Result<()> {
1263 let retryable = serde_json::json!({
1264 "code": "retryable",
1265 "message": "foo",
1266 });
1267 let retryable = generate_err_fd(retryable)?;
1268 let err = ImageProxy::read_blob_error(retryable).boxed();
1269 let e = err.await.unwrap_err();
1270 match e {
1271 GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"),
1272 _ => panic!("Unexpected error {e:?}"),
1273 }
1274 Ok(())
1275 }
1276
1277 #[tokio::test]
1278 async fn test_read_blob_error_none() -> Result<()> {
1279 let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
1280 let tf = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std();
1281 let err = ImageProxy::read_blob_error(tf.into()).boxed();
1282 err.await.unwrap();
1283 Ok(())
1284 }
1285
1286 #[tokio::test]
1287 async fn test_read_blob_error_other() -> Result<()> {
1288 let other = serde_json::json!({
1289 "code": "other",
1290 "message": "bar",
1291 });
1292 let other = generate_err_fd(other)?;
1293 let err = ImageProxy::read_blob_error(other).boxed();
1294 let e = err.await.unwrap_err();
1295 match e {
1296 GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"),
1297 _ => panic!("Unexpected error {e:?}"),
1298 }
1299 Ok(())
1300 }
1301
1302 #[tokio::test]
1303 async fn test_read_blob_error_epipe() -> Result<()> {
1304 let epipe = serde_json::json!({
1305 "code": "EPIPE",
1306 "message": "baz",
1307 });
1308 let epipe = generate_err_fd(epipe)?;
1309 let err = ImageProxy::read_blob_error(epipe).boxed();
1310 err.await.unwrap();
1311 Ok(())
1312 }
1313
1314 #[tokio::test]
1315 async fn test_verified_blob_reader_ok() -> Result<()> {
1316 use std::str::FromStr;
1317 use tokio::io::AsyncReadExt;
1318
1319 let data = b"hello world";
1320 let mut tmp = tempfile::NamedTempFile::new()?;
1321 tmp.as_file_mut().write_all(data)?;
1322 tmp.as_file_mut().sync_all()?;
1323
1324 let digest = {
1325 let mut h = sha2::Sha256::new();
1326 h.update(data);
1327 Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap()
1328 };
1329
1330 let fd = tokio::fs::File::open(tmp.path()).await?;
1331 let (tx, rx) = oneshot::channel();
1332 let mut reader = VerifiedBlobReader::new(fd, digest.clone(), tx)?;
1333
1334 let mut out = Vec::new();
1335 reader.read_to_end(&mut out).await?;
1336 assert_eq!(&out, data);
1337
1338 let result = rx.await.map_err(|e| Error::Other(e.to_string().into()))?;
1339 verify_blob_bytes_read(&digest, data.len() as u64, result)?;
1340 Ok(())
1341 }
1342
1343 #[tokio::test]
1344 async fn test_verified_blob_reader_digest_mismatch() -> Result<()> {
1345 use std::str::FromStr;
1346 use tokio::io::AsyncReadExt;
1347
1348 let data = b"hello world";
1349 let mut tmp = tempfile::NamedTempFile::new()?;
1350 tmp.as_file_mut().write_all(data)?;
1351 tmp.as_file_mut().sync_all()?;
1352
1353 let digest = {
1354 let mut h = sha2::Sha256::new();
1355 h.update(b"not the content");
1356 Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap()
1357 };
1358
1359 let fd = tokio::fs::File::open(tmp.path()).await?;
1360 let (tx, rx) = oneshot::channel();
1361 let mut reader = VerifiedBlobReader::new(fd, digest.clone(), tx)?;
1362
1363 let mut out = Vec::new();
1364 reader.read_to_end(&mut out).await?;
1365 assert_eq!(&out, data);
1366
1367 let result = rx.await.map_err(|e| Error::Other(e.to_string().into()))?;
1368 assert!(verify_blob_bytes_read(&digest, data.len() as u64, result).is_err());
1369 Ok(())
1370 }
1371
1372 #[tokio::test]
1373 async fn test_get_blob_stream_oci_dir() -> Result<()> {
1374 use ocidir::{oci_spec as ocidir_spec, OciDir};
1375
1376 if !check_skopeo() {
1377 return Ok(());
1378 }
1379
1380 let td = tempfile::tempdir()?;
1381 fn to_other<E: std::fmt::Display>(e: E) -> Error {
1382 Error::Other(e.to_string().into())
1383 }
1384 let layer_bytes = b"layer bytes";
1385 let dir = ocidir::cap_std::fs::Dir::open_ambient_dir(
1386 td.path(),
1387 ocidir::cap_std::ambient_authority(),
1388 )
1389 .map_err(to_other)?;
1390 let oci_dir = OciDir::ensure(dir).map_err(to_other)?;
1391 let mut layerw = oci_dir.create_gzip_layer(None).map_err(to_other)?;
1392 layerw.write_all(layer_bytes)?;
1393 let layer = layerw.complete().map_err(to_other)?;
1394 let layer_desc = layer.descriptor().build().unwrap();
1395 let layer_digest = Digest::from_str(layer_desc.digest().as_ref()).map_err(to_other)?;
1396 let layer_size = layer_desc.size();
1397
1398 let mut manifest = oci_dir
1399 .new_empty_manifest()
1400 .map_err(to_other)?
1401 .build()
1402 .map_err(to_other)?;
1403 let mut config = ocidir_spec::image::ImageConfigurationBuilder::default()
1404 .architecture("amd64")
1405 .os("linux")
1406 .build()
1407 .unwrap();
1408 oci_dir.push_layer(&mut manifest, &mut config, layer, "layer", None);
1409 let config_desc = oci_dir.write_config(config).map_err(to_other)?;
1410 manifest.set_config(config_desc);
1411 oci_dir
1412 .insert_manifest(
1413 manifest,
1414 Some("test"),
1415 ocidir_spec::image::Platform::default(),
1416 )
1417 .map_err(to_other)?;
1418
1419 let proxy = ImageProxy::new().await?;
1420 let imgref = format!("oci:{}:test", td.path().display());
1421 let img = proxy.open_image(&imgref).await?;
1422
1423 let expected_source = match std::env::var("EXPECT_BLOB_STREAM_SOURCE").ok().as_deref() {
1424 Some("GetRawBlob") => BlobStreamSource::GetRawBlob,
1425 Some("GetBlob") => BlobStreamSource::GetBlob,
1426 Some(v) => {
1427 return Err(Error::Other(
1428 format!(
1429 "Invalid EXPECT_BLOB_STREAM_SOURCE={v}; expected GetRawBlob or GetBlob"
1430 )
1431 .into(),
1432 ));
1433 }
1434 None => {
1435 if proxy.supports_get_raw_blob() {
1436 BlobStreamSource::GetRawBlob
1437 } else {
1438 BlobStreamSource::GetBlob
1439 }
1440 }
1441 };
1442
1443 let stream = proxy
1444 .get_blob_stream(&img, &layer_digest, layer_size)
1445 .await?;
1446 assert_eq!(stream.source(), expected_source);
1447 let (mut reader, driver) = stream.into_parts();
1448
1449 let mut sink = tokio::io::sink();
1450 let read = async move {
1451 let n = tokio::io::copy(&mut *reader, &mut sink).await?;
1452 Result::Ok(n)
1453 };
1454 let (n, driver) = tokio::join!(read, driver);
1455 assert_eq!(n?, layer_size);
1456 driver?;
1457
1458 proxy.close_image(&img).await?;
1459 proxy.finalize().await?;
1460 Ok(())
1461 }
1462
1463 fn create_dummy_fd() -> OwnedFd {
1465 memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()
1466 }
1467
/// A reply with exactly one fd and pipe id 1 must parse into a `FinishPipe`
/// that reports pipe id 1 and owns the very same descriptor.
#[test]
fn test_new_from_raw_values_finish_pipe() {
    let fd = create_dummy_fd();
    // Remember the raw number before ownership moves into the reply vector.
    let expected_raw = fd.as_raw_fd();

    let parsed = FinishPipe::from_reply(vec![fd], 1).unwrap();

    assert_eq!(parsed.pipeid.0.get(), 1);
    assert_eq!(parsed.datafd.as_raw_fd(), expected_raw);
}
1478
/// A reply with two fds and pipe id 0 must parse into a `DualFds` whose
/// `datafd` is the first descriptor and whose `errfd` is the second.
#[test]
fn test_new_from_raw_values_dual_fds() {
    let data = create_dummy_fd();
    let err = create_dummy_fd();
    // Remember the raw numbers before ownership moves into the reply vector.
    let (expected_data_raw, expected_err_raw) = (data.as_raw_fd(), err.as_raw_fd());

    let parsed = DualFds::from_reply(vec![data, err], 0).unwrap();

    assert_eq!(parsed.datafd.as_raw_fd(), expected_data_raw);
    assert_eq!(parsed.errfd.as_raw_fd(), expected_err_raw);
}
1490
/// Three fds are one too many for `DualFds`: parsing must fail with
/// `Error::Other("Expected two fds for DualFds")`.
#[test]
fn test_new_from_raw_values_error_too_many_fds() {
    let fds = std::iter::repeat_with(create_dummy_fd)
        .take(3)
        .collect::<Vec<_>>();

    let outcome = DualFds::from_reply(fds, 0);
    match outcome {
        Err(Error::Other(msg)) => assert_eq!(msg.as_ref(), "Expected two fds for DualFds"),
        Err(other) => unreachable!("{other}"),
        Ok(v) => unreachable!("{v:?}"),
    }
}
1502
/// A single fd accompanied by pipe id 0 is invalid for `FinishPipe`: parsing
/// must fail with `Error::Other("Expected pipeid for FinishPipe")`.
#[test]
fn test_new_from_raw_values_error_fd_with_zero_pipeid() {
    let single = vec![create_dummy_fd()];

    let outcome = FinishPipe::from_reply(single, 0);
    match outcome {
        Err(Error::Other(msg)) => assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe"),
        Err(other) => unreachable!("{other}"),
        Ok(v) => unreachable!("{v:?}"),
    }
}
1514
/// Two fds together with a non-zero pipe id is invalid for `DualFds`: parsing
/// must fail with `Error::Other("Unexpected pipeid with DualFds")`.
#[test]
fn test_new_from_raw_values_error_pipeid_with_both_fds() {
    let pair = vec![create_dummy_fd(), create_dummy_fd()];

    let outcome = DualFds::from_reply(pair, 1);
    match outcome {
        Err(Error::Other(msg)) => assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds"),
        Err(other) => unreachable!("{other}"),
        Ok(v) => unreachable!("{v:?}"),
    }
}
1526
/// A pipe id without any fd is invalid for `FinishPipe`: parsing must fail
/// with `Error::Other("Expected exactly one fd for FinishPipe")`.
#[test]
fn test_new_from_raw_values_error_no_fd_with_pipeid() {
    let no_fds: Vec<OwnedFd> = Vec::new();

    let outcome = FinishPipe::from_reply(no_fds, 1);
    match outcome {
        Err(Error::Other(msg)) => {
            assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe")
        }
        Err(other) => unreachable!("{other}"),
        Ok(v) => unreachable!("{v:?}"),
    }
}
1538
1539 #[tokio::test]
1540 #[ignore = "https://github.com/coreos/rpm-ostree/issues/5442"]
1541 async fn test_open_optional() -> Result<()> {
1542 if !check_skopeo() {
1543 return Ok(());
1544 }
1545
1546 let td = tempfile::tempdir()?;
1547 let td = td.path().to_str().unwrap();
1548 let proxy = ImageProxy::new().await?;
1549 let imgpath = format!("oci-archive:{td}/some-nonexistent-image.ociarchive");
1550 let img = proxy.open_image_optional(&imgpath).await.unwrap();
1551 assert!(img.is_none());
1552
1553 Ok(())
1554 }
1555}