pub mod transport;
pub use ::oci_spec;
pub use transport::{
ContainersStorageRef, ImageReference, ImageReferenceError, Transport, TransportConversionError,
};
use ::oci_spec::image::{Descriptor, Digest};
use cap_std_ext::prelude::CapStdExtCommandExt;
use cap_std_ext::{cap_std, cap_tempfile};
use futures_util::{Future, FutureExt};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::iter::FusedIterator;
use std::num::NonZeroU32;
use std::ops::Range;
use std::os::fd::OwnedFd;
use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::pin::Pin;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex, OnceLock};
use thiserror::Error;
use tokio::io::{AsyncBufRead, AsyncReadExt};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinError;
use tracing::instrument;
/// Errors produced while spawning or communicating with the skopeo image proxy.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
/// An I/O operation failed; `rustix` errno values are also converted into this variant.
#[error("i/o error: {0}")]
Io(#[from] std::io::Error),
/// Spawning the skopeo child process failed.
#[error("skopeo spawn error: {0}")]
SkopeoSpawnError(#[source] std::io::Error),
/// JSON serialization or deserialization failed.
#[error("serialization error: {0}")]
SerDe(#[from] serde_json::Error),
/// The proxy answered the request for `method` with `success == false`;
/// `error` carries the message from the reply.
#[error("failed to invoke method {method}: {error}")]
RequestInitiationFailure { method: Box<str>, error: Box<str> },
/// The proxy reported a failure; produced by `finalize` when the child exits
/// with a non-success status.
#[error("proxy request returned error: {0}")]
RequestReturned(Box<str>),
/// A version string could not be parsed as a semantic version.
#[error("semantic version error: {0}")]
SemanticVersion(#[from] semver::Error),
/// The protocol version reported by the proxy does not satisfy the base requirement.
#[error("proxy too old (requested={requested_version} found={found_version}) error")]
ProxyTooOld {
requested_version: Box<str>,
found_version: Box<str>,
},
/// The supplied `ImageProxyConfig` is invalid (e.g. conflicting authentication options).
#[error("configuration error: {0}")]
Configuration(Box<str>),
/// Any other failure, described by a free-form message.
#[error("other error: {0}")]
Other(Box<str>),
}
impl Error {
    /// Wraps any value convertible into a boxed string in [`Error::Other`].
    pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
        let message: Box<str> = e.into();
        Self::Other(message)
    }
}
/// Errors reported on the error channel of a raw blob fetch.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum GetBlobError {
/// The proxy classified the failure with the code `"retryable"`.
#[error("retryable error: {0}")]
Retryable(Box<str>),
/// Any other failure, including failures to read or parse the error channel itself.
#[error("other error: {0}")]
Other(Box<str>),
}
/// Lets `?` convert raw `rustix` errno values into [`Error::Io`].
impl From<rustix::io::Errno> for Error {
    fn from(value: rustix::io::Errno) -> Self {
        let io_error = std::io::Error::from(value);
        Self::Io(io_error)
    }
}
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// File descriptor numbers set aside for descriptors passed to the spawned child process.
pub const RESERVED_FD_RANGE: Range<i32> = 100..200;
/// Size in bytes of the buffer used to receive one reply message from the proxy.
const MAX_MSG_SIZE: usize = 32 * 1024;
/// Returns the lazily-initialized version requirement (`"0.2.3"`) that the
/// proxy's protocol version must satisfy for this client to work at all.
fn base_proto_version() -> &'static semver::VersionReq {
    static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
    // The literal is a valid requirement, so parsing cannot fail.
    let parse = || semver::VersionReq::parse("0.2.3").unwrap();
    BASE_PROTO_VERSION.get_or_init(parse)
}
/// Returns the lazily-initialized version requirement (`"0.2.5"`) checked
/// before issuing the `GetLayerInfo` request.
fn layer_info_proto_version() -> &'static semver::VersionReq {
    static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
    // The literal is a valid requirement, so parsing cannot fail.
    let parse = || semver::VersionReq::parse("0.2.5").unwrap();
    LAYER_INFO_PROTO_VERSION.get_or_init(parse)
}
/// Returns the lazily-initialized version requirement (`"0.2.7"`) checked
/// before issuing the `GetLayerInfoPiped` request.
fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
    static LAYER_INFO_PIPED_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
    // The literal is a valid requirement, so parsing cannot fail.
    let parse = || semver::VersionReq::parse("0.2.7").unwrap();
    LAYER_INFO_PIPED_PROTO_VERSION.get_or_init(parse)
}
/// A request message, serialized as JSON and sent to the proxy over the socket.
#[derive(Serialize)]
struct Request {
/// Name of the proxy method to invoke.
method: String,
/// Positional arguments for the method, as JSON values.
args: Vec<serde_json::Value>,
}
impl Request {
    /// Builds a request for `method`, converting every argument into a JSON value.
    fn new<T, I>(method: &str, args: T) -> Self
    where
        T: IntoIterator<Item = I>,
        I: Into<serde_json::Value>,
    {
        Self {
            method: method.to_owned(),
            args: args.into_iter().map(Into::into).collect(),
        }
    }

    /// Builds a request for `method` that carries no arguments.
    fn new_bare(method: &str) -> Self {
        Self {
            method: method.to_owned(),
            args: Vec::new(),
        }
    }
}
/// A reply message received from the proxy, deserialized from JSON.
#[derive(Deserialize)]
struct Reply {
/// Whether the request succeeded; when false the request is reported as a failure.
success: bool,
/// Error message, reported to the caller when `success` is false.
error: String,
/// Pipe identifier accompanying the reply; the fd converters treat 0 as "no pipe".
pipeid: u32,
/// Method-specific return value, deserialized into the caller's expected type.
value: serde_json::Value,
}
/// Boxed, pinned future yielding the result of waiting for the child process:
/// the outer `Result` is the task join outcome, the inner one the I/O result
/// carrying the collected process output.
type ChildFuture = Pin<
Box<
dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
+ Send,
>,
>;
/// Client handle for a spawned skopeo `experimental-image-proxy` process.
pub struct ImageProxy {
/// Our end of the socket pair used to exchange requests and replies with the child.
sockfd: Arc<Mutex<OwnedFd>>,
/// Future that resolves with the child's output once the process has exited.
childwait: Arc<AsyncMutex<ChildFuture>>,
/// Protocol version the proxy reported in response to `Initialize`.
protover: semver::Version,
}
/// Opaque `Debug` output: only the type name is printed, no fields.
impl std::fmt::Debug for ImageProxy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("ImageProxy");
        builder.finish()
    }
}
/// Handle to an image opened through the proxy; wraps the numeric id returned
/// by the `OpenImage` / `OpenImageOptional` requests.
#[derive(Debug, PartialEq, Eq)]
pub struct OpenedImage(u32);
/// Identifier of a proxy pipe; guaranteed to be non-zero.
#[derive(Debug, PartialEq, Eq)]
struct PipeId(NonZeroU32);

impl PipeId {
    /// Wraps `pipeid`, returning `None` when it is zero.
    fn try_new(pipeid: u32) -> Option<Self> {
        NonZeroU32::new(pipeid).map(Self)
    }
}
/// Options controlling how the skopeo proxy process is launched.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct ImageProxyConfig {
/// Path passed to the proxy via `--authfile`. Conflicts with `auth_data` and `auth_anonymous`.
pub authfile: Option<PathBuf>,
/// Auth file contents; copied into an anonymous temporary file that is handed
/// to the child as a file descriptor. Conflicts with `authfile` and `auth_anonymous`.
pub auth_data: Option<File>,
/// When true, pass `--no-creds`. Conflicts with `authfile` and `auth_data`.
pub auth_anonymous: bool,
/// Directory passed via `--cert-dir`.
pub certificate_directory: Option<PathBuf>,
/// Each entry is passed as a separate `--decryption-key` argument.
pub decryption_keys: Option<Vec<String>>,
/// When `Some(true)`, pass `--tls-verify=false`.
pub insecure_skip_tls_verification: Option<bool>,
/// When `Some(true)`, pass `--insecure-policy`.
pub insecure_policy: Option<bool>,
/// Passed via `--user-agent-prefix`, but only when the installed skopeo advertises that flag.
pub user_agent_prefix: Option<String>,
/// When true, pass `--debug` and leave the child's stderr unpiped.
pub debug: bool,
/// Command to use as the base instead of a default `skopeo` invocation.
pub skopeo_cmd: Option<Command>,
}
/// Reports whether the `skopeo` found on `PATH` mentions `--user-agent-prefix`
/// in its `--help` output. The probe runs once; its answer is cached for the
/// lifetime of the process. Any failure to run skopeo or decode its output
/// counts as "not supported".
fn supports_user_agent_prefix() -> bool {
    static SUPPORTS_USER_AGENT: OnceLock<bool> = OnceLock::new();
    *SUPPORTS_USER_AGENT.get_or_init(|| {
        let probe = Command::new("skopeo")
            .arg("--help")
            .stdout(Stdio::piped())
            .stderr(Stdio::null())
            .output();
        let Ok(output) = probe else {
            return false;
        };
        match String::from_utf8(output.stdout) {
            Ok(help) => help.contains("--user-agent-prefix"),
            Err(_) => false,
        }
    })
}
/// Translates an [`ImageProxyConfig`] into the `Command` used to launch
/// `skopeo experimental-image-proxy`.
///
/// Fails with `Error::Configuration` when more than one authentication option
/// is set, and with I/O errors if staging `auth_data` in a temporary file fails.
impl TryFrom<ImageProxyConfig> for Command {
type Error = Error;
fn try_from(config: ImageProxyConfig) -> Result<Self> {
// Debug mode can be enabled through the config or by the mere presence of the env var.
let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
// Hand out target fd numbers for the child from the reserved range, in order.
let mut allocated_fds = RESERVED_FD_RANGE.clone();
let mut alloc_fd = || {
allocated_fds.next().ok_or_else(|| {
Error::Other("Ran out of reserved file descriptors for child".into())
})
};
// Use the caller-provided command if present; otherwise default to `skopeo`.
let mut c = config.skopeo_cmd.unwrap_or_else(|| {
let mut c = std::process::Command::new("skopeo");
// Only the default command gets the pre-exec hook, which asks for TERM to be
// delivered to the child when its parent dies.
// NOTE(review): no SAFETY justification is recorded for this `pre_exec` use — confirm
// the closure is async-signal-safe.
unsafe {
c.pre_exec(|| {
Ok(rustix::process::set_parent_process_death_signal(Some(
rustix::process::Signal::TERM,
))?)
});
}
c
});
c.arg("experimental-image-proxy");
if debug {
c.arg("--debug");
}
// The three authentication options are mutually exclusive.
let auth_option_count = [
config.authfile.is_some(),
config.auth_data.is_some(),
config.auth_anonymous,
]
.into_iter()
.filter(|&x| x)
.count();
if auth_option_count > 1 {
return Err(Error::Configuration(
"Conflicting authentication options".into(),
));
}
if let Some(authfile) = config.authfile {
c.arg("--authfile");
c.arg(authfile);
} else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
// Copy the auth data into an anonymous temp file under /tmp, pass that file to
// the child at a reserved fd number, and point --authfile at it via /proc/self/fd.
let target_fd = alloc_fd()?;
let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
let mut tempfile =
cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
std::io::copy(&mut auth_data, &mut tempfile)?;
// `into_inner` flushes the buffered writer; surface a flush failure as an I/O error.
let tempfile = tempfile
.into_inner()
.map_err(|e| e.into_error())?
.into_std();
let fd = std::sync::Arc::new(tempfile.into());
c.take_fd_n(fd, target_fd);
c.arg("--authfile");
c.arg(format!("/proc/self/fd/{target_fd}"));
} else if config.auth_anonymous {
c.arg("--no-creds");
}
if let Some(certificate_directory) = config.certificate_directory {
c.arg("--cert-dir");
c.arg(certificate_directory);
}
if let Some(decryption_keys) = config.decryption_keys {
for decryption_key in &decryption_keys {
c.arg("--decryption-key");
c.arg(decryption_key);
}
}
if config.insecure_skip_tls_verification.unwrap_or_default() {
c.arg("--tls-verify=false");
}
if config.insecure_policy.unwrap_or_default() {
c.arg("--insecure-policy");
}
if let Some(user_agent_prefix) = config.user_agent_prefix {
// Silently skip the flag when the installed skopeo does not advertise it.
if supports_user_agent_prefix() {
c.arg("--user-agent-prefix");
c.arg(user_agent_prefix);
}
}
// The child's stdout is discarded.
c.stdout(Stdio::null());
// Outside debug mode, capture stderr so it can be included in error reports.
if !debug {
c.stderr(Stdio::piped());
}
Ok(c)
}
}
/// Description of one image layer as returned by the proxy's layer-info requests.
#[derive(Debug, serde::Deserialize)]
pub struct ConvertedLayerInfo {
/// Digest of the layer.
pub digest: Digest,
/// Size of the layer as reported by the proxy.
pub size: u64,
/// Media type of the layer.
pub media_type: oci_spec::image::MediaType,
}
/// Reply payload consisting of one data descriptor plus the pipe id that must
/// later be passed to the `FinishPipe` request.
#[derive(Debug)]
struct FinishPipe {
/// Non-zero pipe identifier from the reply.
pipeid: PipeId,
/// Descriptor from which the payload is read.
datafd: OwnedFd,
}
/// Reply payload consisting of two descriptors and no pipe id.
#[derive(Debug)]
struct DualFds {
/// Descriptor from which the payload is read.
datafd: OwnedFd,
/// Descriptor from which an error report (if any) is read.
errfd: OwnedFd,
}
/// Conversion from the file descriptors and pipe id accompanying a reply into
/// a typed value; implementations reject combinations they do not expect.
trait FromReplyFds: Send + 'static
where
Self: Sized,
{
/// Builds `Self` from the received descriptors and the reply's `pipeid`,
/// returning an error when their number or the pipe id is unexpected.
fn from_reply(
iterable: impl IntoIterator<IntoIter: FusedIterator, Item = OwnedFd>,
pipeid: u32,
) -> Result<Self>;
}
impl FromReplyFds for () {
fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
if fds.into_iter().next().is_some() {
return Err(Error::Other("expected no fds".into()));
}
if pipeid != 0 {
return Err(Error::Other("unexpected pipeid".into()));
}
Ok(())
}
}
impl FromReplyFds for FinishPipe {
fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
let Some(pipeid) = PipeId::try_new(pipeid) else {
return Err(Error::Other("Expected pipeid for FinishPipe".into()));
};
let datafd = fds
.into_iter()
.exactly_one()
.map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?;
Ok(Self { pipeid, datafd })
}
}
impl FromReplyFds for DualFds {
fn from_reply(fds: impl IntoIterator<Item = OwnedFd>, pipeid: u32) -> Result<Self> {
if pipeid != 0 {
return Err(Error::Other("Unexpected pipeid with DualFds".into()));
}
let [datafd, errfd] = fds
.into_iter()
.collect_array()
.ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?;
Ok(Self { datafd, errfd })
}
}
impl ImageProxy {
pub async fn new() -> Result<Self> {
Self::new_with_config(Default::default()).await
}
#[instrument]
pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
let mut c = Command::try_from(config)?;
let (mysock, theirsock) = rustix::net::socketpair(
rustix::net::AddressFamily::UNIX,
rustix::net::SocketType::SEQPACKET,
rustix::net::SocketFlags::CLOEXEC,
None,
)?;
c.stdin(Stdio::from(theirsock));
let child = match c.spawn() {
Ok(c) => c,
Err(error) => return Err(Error::SkopeoSpawnError(error)),
};
tracing::debug!("Spawned skopeo pid={:?}", child.id());
let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
let sockfd = Arc::new(Mutex::new(mysock));
let mut r = Self {
sockfd,
childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
protover: semver::Version::new(0, 0, 0),
};
let protover: String = r.impl_request("Initialize", [(); 0]).await?;
tracing::debug!("Remote protocol version: {protover}");
let protover = semver::Version::parse(protover.as_str())?;
let supported = base_proto_version();
if !supported.matches(&protover) {
return Err(Error::ProxyTooOld {
requested_version: protover.to_string().into(),
found_version: supported.to_string().into(),
});
}
r.protover = protover;
Ok(r)
}
/// Sends `req` over the socket and waits for the matching reply, all on the
/// blocking thread pool while holding the socket lock.
///
/// On success returns the reply value deserialized as `T` together with the
/// accompanying descriptors converted to `F`.
///
/// # Errors
///
/// * `Error::SerDe` if the request cannot be serialized or the reply cannot be parsed.
/// * `Error::Io` if sending or receiving fails.
/// * `Error::RequestInitiationFailure` if the reply has `success == false`.
/// * `Error::Other` if the descriptors/pipe id do not match what `F` expects,
///   or if the blocking task fails to join.
async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static, F: FromReplyFds>(
    sockfd: Arc<Mutex<OwnedFd>>,
    req: Request,
) -> Result<(T, F)> {
    tracing::trace!("sending request {}", req.method.as_str());
    let r = tokio::task::spawn_blocking(move || {
        // The lock is held for the whole round trip so replies cannot be mixed up
        // between concurrent requests.
        let sockfd = sockfd.lock().unwrap();
        let sendbuf = serde_json::to_vec(&req)?;
        let sockfd = &*sockfd;
        rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
        drop(sendbuf);

        let mut buf = [0u8; MAX_MSG_SIZE];
        // A reply may carry up to two descriptors (`DualFds`), so reserve room for
        // two. Sizing for one only fits two where alignment padding happens to
        // leave room.
        let mut cmsg_space: Vec<std::mem::MaybeUninit<u8>> =
            vec![std::mem::MaybeUninit::uninit(); rustix::cmsg_space!(ScmRights(2))];
        let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(cmsg_space.as_mut_slice());
        let iov = std::io::IoSliceMut::new(buf.as_mut());
        let mut iov = [iov];
        let nread = rustix::net::recvmsg(
            sockfd,
            &mut iov,
            &mut cmsg_buffer,
            rustix::net::RecvFlags::CMSG_CLOEXEC,
        )?
        .bytes;
        // Keep only the passed descriptors; ignore any other ancillary messages.
        let fdret = cmsg_buffer
            .drain()
            .filter_map(|m| match m {
                rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
                _ => None,
            })
            .flatten();
        let buf = &buf[..nread];
        let reply: Reply = serde_json::from_slice(buf)?;
        if !reply.success {
            // `req` is not needed afterwards, so the method name is moved, not cloned.
            return Err(Error::RequestInitiationFailure {
                method: req.method.into(),
                error: reply.error.into(),
            });
        }
        let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?;
        Ok((serde_json::from_value(reply.value)?, fds))
    })
    .await
    .map_err(|e| Error::Other(e.to_string().into()))??;
    tracing::trace!("completed request");
    Ok(r)
}
/// Issues `method` with `args` and returns both the deserialized reply value
/// and the descriptors that came with it.
///
/// The request races against the child process exiting: if the child
/// terminates first, an `Error::Other` describing its exit status and captured
/// stderr is returned instead.
#[instrument(skip(args))]
async fn impl_request_with_fds<
    T: serde::de::DeserializeOwned + Send + 'static,
    F: FromReplyFds,
>(
    &self,
    method: &str,
    args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
) -> Result<(T, F)> {
    let request = Request::new(method, args);
    let pending = Self::impl_request_raw(Arc::clone(&self.sockfd), request);
    let mut child_guard = self.childwait.lock().await;
    tokio::select! {
        reply = pending => reply,
        exited = child_guard.as_mut() => {
            let output = exited.map_err(|e| Error::Other(e.to_string().into()))??;
            let stderr = String::from_utf8_lossy(&output.stderr);
            let message = format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, output.status, stderr);
            Err(Error::Other(message.into()))
        }
    }
}
/// Issues `method` with `args` for replies that carry no descriptors and
/// returns the deserialized reply value.
async fn impl_request<T: serde::de::DeserializeOwned + Send + 'static>(
    &self,
    method: &str,
    args: impl IntoIterator<Item = impl Into<serde_json::Value>>,
) -> Result<T> {
    let (value, ()): (T, ()) = self.impl_request_with_fds(method, args).await?;
    Ok(value)
}
/// Sends the `FinishPipe` request for `pipeid`.
#[instrument]
async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> {
    tracing::debug!("closing pipe");
    let raw_id = pipeid.0.get();
    let ((), ()): ((), ()) = self.impl_request_with_fds("FinishPipe", [raw_id]).await?;
    Ok(())
}
/// Opens the image named by `imgref` via the `OpenImage` request.
#[instrument]
pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
    tracing::debug!("opening image");
    let image_id: u32 = self.impl_request("OpenImage", [imgref]).await?;
    Ok(OpenedImage(image_id))
}
/// Like `open_image`, but takes a structured [`ImageReference`], which is
/// rendered to its string form first.
#[instrument]
pub async fn open_image_ref(&self, imgref: &ImageReference) -> Result<OpenedImage> {
    let rendered = imgref.to_string();
    self.open_image(&rendered).await
}
/// Opens the image named by `imgref` via the `OpenImageOptional` request.
/// An id of 0 in the reply is mapped to `Ok(None)`.
#[instrument]
pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
    tracing::debug!("opening image");
    let image_id: u32 = self.impl_request("OpenImageOptional", [imgref]).await?;
    Ok(match image_id {
        0 => None,
        id => Some(OpenedImage(id)),
    })
}
/// Like `open_image_optional`, but takes a structured [`ImageReference`],
/// which is rendered to its string form first.
#[instrument]
pub async fn open_image_optional_ref(
    &self,
    imgref: &ImageReference,
) -> Result<Option<OpenedImage>> {
    let rendered = imgref.to_string();
    self.open_image_optional(&rendered).await
}
/// Releases `img` on the proxy side via the `CloseImage` request.
#[instrument]
pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
    let image_id = img.0;
    let closed: Result<()> = self.impl_request("CloseImage", [image_id]).await;
    closed
}
async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result<Vec<u8>> {
let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
let mut fd = tokio::io::BufReader::new(fd);
let mut r = Vec::new();
let reader = fd.read_to_end(&mut r);
let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid));
finish?;
assert_eq!(nbytes?, r.len());
Ok(r)
}
pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?;
Ok((digest, self.read_finish_pipe(pipefd).await?))
}
pub async fn fetch_manifest(
&self,
img: &OpenedImage,
) -> Result<(String, oci_spec::image::ImageManifest)> {
let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
let manifest = serde_json::from_slice(&raw)?;
Ok((digest, manifest))
}
pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?;
self.read_finish_pipe(pipe).await
}
/// Fetches the configuration of `img` and parses it.
pub async fn fetch_config(
    &self,
    img: &OpenedImage,
) -> Result<oci_spec::image::ImageConfiguration> {
    let raw = self.fetch_config_raw(img).await?;
    let config = serde_json::from_slice(&raw)?;
    Ok(config)
}
#[instrument]
pub async fn get_blob(
&self,
img: &OpenedImage,
digest: &Digest,
size: u64,
) -> Result<(
impl AsyncBufRead + Send + Unpin,
impl Future<Output = Result<()>> + Unpin + '_,
)> {
tracing::debug!("fetching blob");
let args: Vec<serde_json::Value> =
vec![img.0.into(), digest.to_string().into(), size.into()];
let (_bloblen, pipe): (serde_json::Number, FinishPipe) =
self.impl_request_with_fds("GetBlob", args).await?;
let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd));
let fd = tokio::io::BufReader::new(fd);
let finish = Box::pin(self.finish_pipe(pipe.pipeid));
Ok((fd, finish))
}
/// Reads the error descriptor of a raw blob fetch to the end and interprets it.
///
/// * No data: success.
/// * Otherwise the data is parsed as a JSON object with `code` and `message`:
///   code `"EPIPE"` is treated as success, `"retryable"` yields
///   `GetBlobError::Retryable`, anything else `GetBlobError::Other`.
/// * Read or parse failures yield `GetBlobError::Other`.
async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> {
    #[derive(Deserialize)]
    struct RemoteError {
        code: String,
        message: String,
    }

    let file = tokio::fs::File::from_std(std::fs::File::from(fd));
    let mut reader = tokio::io::BufReader::new(file);
    let mut payload = Vec::new();
    if let Err(e) = reader.read_to_end(&mut payload).await {
        return Err(GetBlobError::Other(e.to_string().into_boxed_str()));
    }
    if payload.is_empty() {
        return Ok(());
    }
    let RemoteError { code, message } = match serde_json::from_slice(&payload) {
        Ok(parsed) => parsed,
        Err(e) => return Err(GetBlobError::Other(e.to_string().into_boxed_str())),
    };
    if code == "EPIPE" {
        Ok(())
    } else if code == "retryable" {
        Err(GetBlobError::Retryable(message.into_boxed_str()))
    } else {
        Err(GetBlobError::Other(message.into_boxed_str()))
    }
}
#[instrument]
pub async fn get_raw_blob(
&self,
img: &OpenedImage,
digest: &Digest,
) -> Result<(
Option<u64>,
tokio::fs::File,
impl Future<Output = std::result::Result<(), GetBlobError>> + Unpin + '_,
)> {
tracing::debug!("fetching blob");
let args: Vec<serde_json::Value> = vec![img.0.into(), digest.to_string().into()];
let (bloblen, fds): (i64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?;
let bloblen = u64::try_from(bloblen).ok();
let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd));
let err = Self::read_blob_error(fds.errfd).boxed();
Ok((bloblen, fd, err))
}
/// Convenience wrapper around `get_blob` that takes the digest and size from
/// an OCI `descriptor`.
#[instrument]
pub async fn get_descriptor(
    &self,
    img: &OpenedImage,
    descriptor: &Descriptor,
) -> Result<(
    impl AsyncBufRead + Send + Unpin,
    impl Future<Output = Result<()>> + Unpin + '_,
)> {
    let digest = descriptor.digest();
    let size = descriptor.size();
    self.get_blob(img, digest, size).await
}
/// Returns layer information for `img`, choosing the request by protocol version.
///
/// `GetLayerInfoPiped` is used when the proxy's version satisfies the piped
/// requirement, otherwise `GetLayerInfo` when it satisfies the plain
/// requirement; if neither applies, `Ok(None)` is returned.
#[instrument]
pub async fn get_layer_info(
    &self,
    img: &OpenedImage,
) -> Result<Option<Vec<ConvertedLayerInfo>>> {
    tracing::debug!("Getting layer info");
    let protover = &self.protover;
    if layer_info_piped_proto_version().matches(protover) {
        let ((), pipe) = self
            .impl_request_with_fds("GetLayerInfoPiped", [img.0])
            .await?;
        let raw = self.read_finish_pipe(pipe).await?;
        let layers = serde_json::from_slice(&raw)?;
        return Ok(Some(layers));
    }
    if layer_info_proto_version().matches(protover) {
        let layers = self.impl_request("GetLayerInfo", [img.0]).await?;
        Ok(Some(layers))
    } else {
        Ok(None)
    }
}
#[instrument]
pub async fn finalize(self) -> Result<()> {
let _ = &self;
let req = Request::new_bare("Shutdown");
let sendbuf = serde_json::to_vec(&req)?;
let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
drop(sendbuf);
tracing::debug!("sent shutdown request");
let mut childwait = self.childwait.lock().await;
let output = childwait
.as_mut()
.await
.map_err(|e| Error::new_other(e.to_string()))??;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(Error::RequestReturned(
format!("proxy failed: {}\n{}", output.status, stderr).into(),
));
}
tracing::debug!("proxy exited successfully");
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::io::{BufWriter, Seek, Write};
use std::os::fd::{AsRawFd, OwnedFd};
use super::*;
use cap_std_ext::cap_std::fs::Dir;
use rustix::fs::{memfd_create, MemfdFlags};
/// Reports whether `skopeo --help` can be launched; the answer is computed
/// once and cached.
fn check_skopeo() -> bool {
    static HAVE_SKOPEO: OnceLock<bool> = OnceLock::new();
    *HAVE_SKOPEO.get_or_init(|| {
        let mut probe = Command::new("skopeo");
        probe
            .arg("--help")
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        probe.status().is_ok()
    })
}
/// Asserts that the `Debug` rendering of `c` contains every string in
/// `contains` and none of the strings in `not_contains`.
fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
    let rendered = format!("{:?}", c);
    contains.iter().for_each(|needle| {
        assert!(rendered.contains(needle), "{} missing {}", rendered, needle);
    });
    not_contains.iter().for_each(|needle| {
        assert!(
            !rendered.contains(needle),
            "{} should not contain {}",
            rendered,
            needle
        );
    });
}
/// Checks that each configuration option is translated into the expected
/// skopeo command-line arguments.
#[test]
fn proxy_configs() {
    let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();
    // Converts a config into a command, failing the test if conversion fails.
    let build = |config: ImageProxyConfig| Command::try_from(config).unwrap();

    // Defaults: just the subcommand, none of the optional flags.
    validate(
        build(ImageProxyConfig::default()),
        &["experimental-image-proxy"],
        &["--no-creds", "--tls-verify", "--authfile"],
    );

    let with_authfile = build(ImageProxyConfig {
        authfile: Some(PathBuf::from("/path/to/authfile")),
        ..Default::default()
    });
    validate(with_authfile, &[r"--authfile", "/path/to/authfile"], &[]);

    let decryption_key_path = "/path/to/decryption_key";
    let with_key = build(ImageProxyConfig {
        decryption_keys: Some(vec![decryption_key_path.to_string()]),
        ..Default::default()
    });
    validate(
        with_key,
        &[r"--decryption-key", "/path/to/decryption_key"],
        &[],
    );

    let with_certs = build(ImageProxyConfig {
        certificate_directory: Some(PathBuf::from("/path/to/certs")),
        ..Default::default()
    });
    validate(with_certs, &[r"--cert-dir", "/path/to/certs"], &[]);

    let no_tls = build(ImageProxyConfig {
        insecure_skip_tls_verification: Some(true),
        ..Default::default()
    });
    validate(no_tls, &[r"--tls-verify=false"], &[]);

    let insecure_policy = build(ImageProxyConfig {
        insecure_policy: Some(true),
        ..Default::default()
    });
    validate(insecure_policy, &[r"--insecure-policy"], &[]);

    // Auth data is handed over through the first reserved descriptor number.
    let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
    tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
    tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
    let with_auth_data = build(ImageProxyConfig {
        auth_data: Some(tmpf.into_std()),
        ..Default::default()
    });
    validate(with_auth_data, &["--authfile", "/proc/self/fd/100"], &[]);

    // The user-agent flag is only emitted when the local skopeo supports it.
    let with_user_agent = build(ImageProxyConfig {
        user_agent_prefix: Some("bootc/1.0".to_string()),
        ..Default::default()
    });
    if supports_user_agent_prefix() {
        validate(with_user_agent, &["--user-agent-prefix", "bootc/1.0"], &[]);
    } else {
        validate(with_user_agent, &[], &["--user-agent-prefix"]);
    }
}
#[tokio::test]
async fn skopeo_not_found() {
let mut config = ImageProxyConfig {
..ImageProxyConfig::default()
};
config.skopeo_cmd = Some(Command::new("no-skopeo"));
match ImageProxy::new_with_config(config).await {
Ok(_) => panic!("Expected an error"),
Err(ref e @ Error::SkopeoSpawnError(ref inner)) => {
assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
assert!(e
.to_string()
.contains("skopeo spawn error: No such file or directory"));
}
Err(e) => panic!("Unexpected error {e}"),
}
}
/// Compile-time check that the proxy and image handles are `Send + Sync`,
/// both by reference and by value. Returns early if no proxy can be started.
#[tokio::test]
async fn test_proxy_send_sync() {
    fn assert_send_sync(_x: impl Send + Sync) {}

    let proxy = match ImageProxy::new().await {
        Ok(proxy) => proxy,
        Err(_) => return,
    };
    assert_send_sync(&proxy);
    assert_send_sync(proxy);

    let image = OpenedImage(0);
    assert_send_sync(&image);
    assert_send_sync(image);
}
/// Writes `v` as JSON into an anonymous temporary file under `/tmp`, rewinds
/// it, and returns it as an owned descriptor.
fn generate_err_fd(v: serde_json::Value) -> Result<OwnedFd> {
    let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
    let mut writer = BufWriter::new(cap_tempfile::TempFile::new_anonymous(&tmp)?);
    serde_json::to_writer(&mut writer, &v)?;
    let mut file = writer.into_inner().map_err(|e| e.into_error())?;
    file.seek(std::io::SeekFrom::Start(0))?;
    Ok(file.into_std().into())
}
/// A `"retryable"` code must map to `GetBlobError::Retryable` carrying the message.
#[tokio::test]
async fn test_read_blob_error_retryable() -> Result<()> {
    let errfd = generate_err_fd(serde_json::json!({
        "code": "retryable",
        "message": "foo",
    }))?;
    let outcome = ImageProxy::read_blob_error(errfd).boxed().await;
    let e = outcome.unwrap_err();
    match e {
        GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"),
        _ => panic!("Unexpected error {e:?}"),
    }
    Ok(())
}
/// An empty error descriptor means the fetch succeeded.
#[tokio::test]
async fn test_read_blob_error_none() -> Result<()> {
    let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
    let empty = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std();
    let outcome = ImageProxy::read_blob_error(empty.into()).boxed().await;
    outcome.unwrap();
    Ok(())
}
/// An unrecognized code must map to `GetBlobError::Other` carrying the message.
#[tokio::test]
async fn test_read_blob_error_other() -> Result<()> {
    let errfd = generate_err_fd(serde_json::json!({
        "code": "other",
        "message": "bar",
    }))?;
    let outcome = ImageProxy::read_blob_error(errfd).boxed().await;
    let e = outcome.unwrap_err();
    match e {
        GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"),
        _ => panic!("Unexpected error {e:?}"),
    }
    Ok(())
}
/// An `"EPIPE"` code is not treated as a failure.
#[tokio::test]
async fn test_read_blob_error_epipe() -> Result<()> {
    let errfd = generate_err_fd(serde_json::json!({
        "code": "EPIPE",
        "message": "baz",
    }))?;
    let outcome = ImageProxy::read_blob_error(errfd).boxed().await;
    outcome.unwrap();
    Ok(())
}
/// Creates a throwaway memfd to stand in for a descriptor received from the proxy.
fn create_dummy_fd() -> OwnedFd {
    let created = memfd_create(c"test-fd", MemfdFlags::CLOEXEC);
    created.unwrap()
}
/// One descriptor plus a non-zero pipe id converts into a `FinishPipe`.
#[test]
fn test_new_from_raw_values_finish_pipe() {
    let datafd = create_dummy_fd();
    let expected_raw = datafd.as_raw_fd();
    let converted = FinishPipe::from_reply(vec![datafd], 1).unwrap();
    assert_eq!(converted.pipeid.0.get(), 1);
    assert_eq!(converted.datafd.as_raw_fd(), expected_raw);
}
/// Two descriptors and a zero pipe id convert into `DualFds`, preserving order.
#[test]
fn test_new_from_raw_values_dual_fds() {
    let datafd = create_dummy_fd();
    let errfd = create_dummy_fd();
    let expected_data = datafd.as_raw_fd();
    let expected_err = errfd.as_raw_fd();
    let converted = DualFds::from_reply(vec![datafd, errfd], 0).unwrap();
    assert_eq!(converted.datafd.as_raw_fd(), expected_data);
    assert_eq!(converted.errfd.as_raw_fd(), expected_err);
}
/// Three descriptors are rejected by `DualFds`.
#[test]
fn test_new_from_raw_values_error_too_many_fds() {
    let fds: Vec<OwnedFd> = (0..3).map(|_| create_dummy_fd()).collect();
    let outcome = DualFds::from_reply(fds, 0);
    match outcome {
        Err(Error::Other(msg)) => {
            assert_eq!(msg.as_ref(), "Expected two fds for DualFds")
        }
        Err(other) => unreachable!("{other}"),
        Ok(v) => unreachable!("{v:?}"),
    }
}
/// A zero pipe id is rejected by `FinishPipe` even when a descriptor is present.
#[test]
fn test_new_from_raw_values_error_fd_with_zero_pipeid() {
    let outcome = FinishPipe::from_reply(vec![create_dummy_fd()], 0);
    match outcome {
        Err(Error::Other(msg)) => {
            assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe")
        }
        Err(other) => unreachable!("{other}"),
        Ok(v) => unreachable!("{v:?}"),
    }
}
/// A non-zero pipe id is rejected by `DualFds`.
#[test]
fn test_new_from_raw_values_error_pipeid_with_both_fds() {
    let outcome = DualFds::from_reply(vec![create_dummy_fd(), create_dummy_fd()], 1);
    match outcome {
        Err(Error::Other(msg)) => {
            assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds")
        }
        Err(other) => unreachable!("{other}"),
        Ok(v) => unreachable!("{v:?}"),
    }
}
/// A pipe id without any descriptor is rejected by `FinishPipe`.
#[test]
fn test_new_from_raw_values_error_no_fd_with_pipeid() {
    let no_fds: Vec<OwnedFd> = Vec::new();
    let outcome = FinishPipe::from_reply(no_fds, 1);
    match outcome {
        Err(Error::Other(msg)) => {
            assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe")
        }
        Err(other) => unreachable!("{other}"),
        Ok(v) => unreachable!("{v:?}"),
    }
}
/// Opening a nonexistent image with `open_image_optional` yields `None`.
/// Skipped (returns `Ok`) when skopeo is unavailable.
#[tokio::test]
#[ignore = "https://github.com/coreos/rpm-ostree/issues/5442"]
async fn test_open_optional() -> Result<()> {
    if !check_skopeo() {
        return Ok(());
    }
    let scratch = tempfile::tempdir()?;
    let td = scratch.path().to_str().unwrap();
    let proxy = ImageProxy::new().await?;
    let imgpath = format!("oci-archive:{td}/some-nonexistent-image.ociarchive");
    let opened = proxy.open_image_optional(&imgpath).await.unwrap();
    assert!(opened.is_none());
    Ok(())
}
}