use anyhow::{anyhow, Context, Result};
use cap_std_ext::prelude::CapStdExtCommandExt;
use cap_std_ext::{cap_std, cap_tempfile};
use futures_util::Future;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::ops::Range;
use std::os::fd::OwnedFd;
use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::pin::Pin;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex, OnceLock};
use tokio::io::{AsyncBufRead, AsyncReadExt};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinError;
use tracing::instrument;
pub use oci_spec;
pub const OCI_TYPE_LAYER_GZIP: &str = "application/vnd.oci.image.layer.v1.tar+gzip";
pub const OCI_TYPE_LAYER_TAR: &str = "application/vnd.oci.image.layer.v1.tar";
pub const RESERVED_FD_RANGE: Range<i32> = 100..200;
const MAX_MSG_SIZE: usize = 32 * 1024;
fn base_proto_version() -> &'static semver::VersionReq {
static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
}
fn layer_info_proto_version() -> &'static semver::VersionReq {
static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
}
#[derive(Serialize)]
struct Request {
method: String,
args: Vec<serde_json::Value>,
}
impl Request {
fn new<T, I>(method: &str, args: T) -> Self
where
T: IntoIterator<Item = I>,
I: Into<serde_json::Value>,
{
let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
Self {
method: method.to_string(),
args,
}
}
fn new_bare(method: &str) -> Self {
Self {
method: method.to_string(),
args: vec![],
}
}
}
#[derive(Deserialize)]
struct Reply {
success: bool,
error: String,
pipeid: u32,
value: serde_json::Value,
}
type ChildFuture = Pin<
Box<dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>>,
>;
pub struct ImageProxy {
sockfd: Arc<Mutex<OwnedFd>>,
childwait: Arc<AsyncMutex<ChildFuture>>,
protover: semver::Version,
}
impl std::fmt::Debug for ImageProxy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ImageProxy").finish()
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct OpenedImage(u32);
#[derive(Debug, Default)]
pub struct ImageProxyConfig {
pub authfile: Option<PathBuf>,
pub auth_data: Option<File>,
pub auth_anonymous: bool,
pub certificate_directory: Option<PathBuf>,
pub decryption_keys: Option<Vec<String>>,
pub insecure_skip_tls_verification: Option<bool>,
pub skopeo_cmd: Option<Command>,
}
impl TryFrom<ImageProxyConfig> for Command {
type Error = anyhow::Error;
fn try_from(config: ImageProxyConfig) -> Result<Self, Self::Error> {
let mut allocated_fds = RESERVED_FD_RANGE.clone();
let mut alloc_fd = || {
allocated_fds
.next()
.ok_or_else(|| anyhow::anyhow!("Ran out of reserved file descriptors for child"))
};
let mut c = config.skopeo_cmd.unwrap_or_else(|| {
let mut c = std::process::Command::new("skopeo");
unsafe {
c.pre_exec(|| {
rustix::process::set_parent_process_death_signal(Some(
rustix::process::Signal::Term,
))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
});
}
c
});
c.arg("experimental-image-proxy");
let auth_option_count = [
config.authfile.is_some(),
config.auth_data.is_some(),
config.auth_anonymous,
]
.into_iter()
.filter(|&x| x)
.count();
if auth_option_count > 1 {
anyhow::bail!("Conflicting authentication options");
}
if let Some(authfile) = config.authfile {
c.arg("--authfile");
c.arg(authfile);
} else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
let target_fd = alloc_fd()?;
let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
let mut tempfile = cap_tempfile::TempFile::new_anonymous(tmpd)
.context("Creating temporary file for auth data")
.map(std::io::BufWriter::new)?;
std::io::copy(&mut auth_data, &mut tempfile)?;
let tempfile = tempfile.into_inner()?.into_std();
let fd = std::sync::Arc::new(tempfile.into());
c.take_fd_n(fd, target_fd);
c.arg("--authfile");
c.arg(format!("/proc/self/fd/{target_fd}"));
} else if config.auth_anonymous {
c.arg("--no-creds");
}
if let Some(certificate_directory) = config.certificate_directory {
c.arg("--cert-dir");
c.arg(certificate_directory);
}
if let Some(decryption_keys) = config.decryption_keys {
for decryption_key in &decryption_keys {
c.arg("--decryption-key");
c.arg(decryption_key);
}
}
if config.insecure_skip_tls_verification.unwrap_or_default() {
c.arg("--tls-verify=false");
}
c.stdout(Stdio::null()).stderr(Stdio::piped());
Ok(c)
}
}
#[derive(Debug, serde::Deserialize)]
pub struct ConvertedLayerInfo {
pub digest: String,
pub size: i64,
pub media_type: oci_spec::image::MediaType,
}
impl ImageProxy {
pub async fn new() -> Result<Self> {
Self::new_with_config(Default::default()).await
}
#[instrument]
pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
let mut c = Command::try_from(config)?;
let (mysock, theirsock) = rustix::net::socketpair(
rustix::net::AddressFamily::UNIX,
rustix::net::SocketType::SEQPACKET,
rustix::net::SocketFlags::CLOEXEC,
None,
)?;
c.stdin(Stdio::from(theirsock));
let child = c.spawn().context("Failed to spawn skopeo")?;
tracing::debug!("Spawned skopeo pid={:?}", child.id());
let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
let sockfd = Arc::new(Mutex::new(mysock));
let mut r = Self {
sockfd,
childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
protover: semver::Version::new(0, 0, 0),
};
let protover = r.impl_request::<String, _, ()>("Initialize", []).await?.0;
tracing::debug!("Remote protocol version: {protover}");
let protover = semver::Version::parse(protover.as_str())?;
let supported = base_proto_version();
if !supported.matches(&protover) {
return Err(anyhow!(
"Unsupported protocol version {} (compatible: {})",
protover,
supported
));
}
r.protover = protover;
Ok(r)
}
async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static>(
sockfd: Arc<Mutex<OwnedFd>>,
req: Request,
) -> Result<(T, Option<(OwnedFd, u32)>)> {
tracing::trace!("sending request {}", req.method.as_str());
let r = tokio::task::spawn_blocking(move || {
let sockfd = sockfd.lock().unwrap();
let sendbuf = serde_json::to_vec(&req)?;
let sockfd = &*sockfd;
rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
drop(sendbuf);
let mut buf = [0u8; MAX_MSG_SIZE];
let mut cmsg_space = vec![0; rustix::cmsg_space!(ScmRights(1))];
let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(&mut cmsg_space);
let iov = std::io::IoSliceMut::new(buf.as_mut());
let mut iov = [iov];
let nread = rustix::net::recvmsg(
sockfd,
&mut iov,
&mut cmsg_buffer,
rustix::net::RecvFlags::CMSG_CLOEXEC,
)?
.bytes;
let fdret = cmsg_buffer
.drain()
.filter_map(|m| match m {
rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
_ => None,
})
.flatten()
.next();
let buf = &buf[..nread];
let reply: Reply = serde_json::from_slice(buf).context("Deserializing reply")?;
if !reply.success {
return Err(anyhow!("remote error: {}", reply.error));
}
let fdret = match (fdret, reply.pipeid) {
(Some(fd), n) => {
if n == 0 {
return Err(anyhow!("got fd but no pipeid"));
}
Some((fd, n))
}
(None, n) => {
if n != 0 {
return Err(anyhow!("got no fd with pipeid {}", n));
}
None
}
};
let reply = serde_json::from_value(reply.value).context("Deserializing value")?;
Ok((reply, fdret))
})
.await??;
tracing::trace!("completed request");
Ok(r)
}
#[instrument(skip(args))]
async fn impl_request<R: serde::de::DeserializeOwned + Send + 'static, T, I>(
&self,
method: &str,
args: T,
) -> Result<(R, Option<(OwnedFd, u32)>)>
where
T: IntoIterator<Item = I>,
I: Into<serde_json::Value>,
{
let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
let mut childwait = self.childwait.lock().await;
tokio::select! {
r = req => {
Ok(r.with_context(|| format!("Failed to invoke skopeo proxy method {method}"))?)
}
r = childwait.as_mut() => {
let r = r??;
let stderr = String::from_utf8_lossy(&r.stderr);
Err(anyhow::anyhow!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr))
}
}
}
#[instrument]
async fn finish_pipe(&self, pipeid: u32) -> Result<()> {
tracing::debug!("closing pipe");
let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?;
if fd.is_some() {
return Err(anyhow!("Unexpected fd in finish_pipe reply"));
}
Ok(r)
}
#[instrument]
pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
tracing::debug!("opening image");
let (imgid, _) = self
.impl_request::<u32, _, _>("OpenImage", [imgref])
.await?;
Ok(OpenedImage(imgid))
}
#[instrument]
#[cfg(feature = "proxy_v0_2_4")]
pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
tracing::debug!("opening image");
let (imgid, _) = self
.impl_request::<u32, _, _>("OpenImageOptional", [imgref])
.await?;
if imgid == 0 {
Ok(None)
} else {
Ok(Some(OpenedImage(imgid)))
}
}
#[instrument]
pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
tracing::debug!("closing image");
let (r, _) = self.impl_request("CloseImage", [img.0]).await?;
Ok(r)
}
async fn read_all_fd(&self, fd: Option<(OwnedFd, u32)>) -> Result<Vec<u8>> {
let (fd, pipeid) = fd.ok_or_else(|| anyhow!("Missing fd from reply"))?;
let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
let mut fd = tokio::io::BufReader::new(fd);
let mut r = Vec::new();
let reader = fd.read_to_end(&mut r);
let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipeid));
finish?;
assert_eq!(nbytes?, r.len());
Ok(r)
}
pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
let (digest, fd) = self.impl_request("GetManifest", [img.0]).await?;
Ok((digest, self.read_all_fd(fd).await?))
}
pub async fn fetch_manifest(
&self,
img: &OpenedImage,
) -> Result<(String, oci_spec::image::ImageManifest)> {
let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
let manifest =
serde_json::from_slice(&raw).context("Deserializing manifest from skopeo")?;
Ok((digest, manifest))
}
pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
let (_, fd) = self
.impl_request::<(), _, _>("GetFullConfig", [img.0])
.await?;
self.read_all_fd(fd).await
}
pub async fn fetch_config(
&self,
img: &OpenedImage,
) -> Result<oci_spec::image::ImageConfiguration> {
let raw = self.fetch_config_raw(img).await?;
serde_json::from_slice(&raw).context("Deserializing config from skopeo")
}
#[instrument]
pub async fn get_blob(
&self,
img: &OpenedImage,
digest: &str,
size: u64,
) -> Result<(
impl AsyncBufRead + Send + Unpin,
impl Future<Output = Result<()>> + Unpin + '_,
)> {
tracing::debug!("fetching blob");
let args: Vec<serde_json::Value> =
vec![img.0.into(), digest.to_string().into(), size.into()];
let (_bloblen, fd) = self.impl_request::<i64, _, _>("GetBlob", args).await?;
let (fd, pipeid) = fd.ok_or_else(|| anyhow!("Missing fd from reply"))?;
let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
let fd = tokio::io::BufReader::new(fd);
let finish = Box::pin(self.finish_pipe(pipeid));
Ok((fd, finish))
}
#[instrument]
pub async fn get_layer_info(
&self,
img: &OpenedImage,
) -> Result<Option<Vec<ConvertedLayerInfo>>> {
tracing::debug!("Getting layer info");
if !layer_info_proto_version().matches(&self.protover) {
return Ok(None);
}
let reply = self.impl_request("GetLayerInfo", [img.0]).await?;
let layers: Vec<ConvertedLayerInfo> = reply.0;
Ok(Some(layers))
}
#[instrument]
pub async fn finalize(self) -> Result<()> {
let _ = &self;
let req = Request::new_bare("Shutdown");
let sendbuf = serde_json::to_vec(&req)?;
let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
drop(sendbuf);
tracing::debug!("sent shutdown request");
let mut childwait = self.childwait.lock().await;
let output = childwait.as_mut().await??;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("proxy failed: {}\n{}", output.status, stderr)
}
tracing::debug!("proxy exited successfully");
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::io::{Seek, Write};
use super::*;
fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
let d = format!("{:?}", c);
for c in contains {
assert!(d.contains(c), "{} missing {}", d, c);
}
for c in not_contains {
assert!(!d.contains(c), "{} should not contain {}", d, c);
}
}
#[test]
fn proxy_configs() {
let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();
let c = Command::try_from(ImageProxyConfig::default()).unwrap();
validate(
c,
&["experimental-image-proxy"],
&["--no-creds", "--tls-verify", "--authfile"],
);
let c = Command::try_from(ImageProxyConfig {
authfile: Some(PathBuf::from("/path/to/authfile")),
..Default::default()
})
.unwrap();
validate(c, &[r"--authfile", "/path/to/authfile"], &[]);
let decryption_key_path = "/path/to/decryption_key";
let c = Command::try_from(ImageProxyConfig {
decryption_keys: Some(vec![decryption_key_path.to_string()]),
..Default::default()
})
.unwrap();
validate(c, &[r"--decryption-key", "/path/to/decryption_key"], &[]);
let c = Command::try_from(ImageProxyConfig {
certificate_directory: Some(PathBuf::from("/path/to/certs")),
..Default::default()
})
.unwrap();
validate(c, &[r"--cert-dir", "/path/to/certs"], &[]);
let c = Command::try_from(ImageProxyConfig {
insecure_skip_tls_verification: Some(true),
..Default::default()
})
.unwrap();
validate(c, &[r"--tls-verify=false"], &[]);
let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
let c = Command::try_from(ImageProxyConfig {
auth_data: Some(tmpf.into_std()),
..Default::default()
})
.unwrap();
validate(c, &["--authfile", "/proc/self/fd/100"], &[]);
}
}