1use cap_std_ext::prelude::CapStdExtCommandExt;
8use cap_std_ext::{cap_std, cap_tempfile};
9use futures_util::Future;
10use oci_spec::image::{Descriptor, Digest};
11use serde::{Deserialize, Serialize};
12use std::fs::File;
13use std::ops::Range;
14use std::os::fd::OwnedFd;
15use std::os::unix::prelude::CommandExt;
16use std::path::PathBuf;
17use std::pin::Pin;
18use std::process::{Command, Stdio};
19use std::sync::{Arc, Mutex, OnceLock};
20use thiserror::Error;
21use tokio::io::{AsyncBufRead, AsyncReadExt};
22use tokio::sync::Mutex as AsyncMutex;
23use tokio::task::JoinError;
24use tracing::instrument;
25
26#[derive(Error, Debug)]
28#[non_exhaustive]
29pub enum Error {
30 #[error("i/o error")]
31 Io(#[from] std::io::Error),
33 #[error("skopeo spawn error: {}", .0)]
34 SkopeoSpawnError(#[source] std::io::Error),
36 #[error("serialization error")]
37 SerDe(#[from] serde_json::Error),
39 #[error("failed to invoke method {method}: {error}")]
41 RequestInitiationFailure { method: Box<str>, error: Box<str> },
42 #[error("proxy request returned error")]
44 RequestReturned(Box<str>),
45 #[error("semantic version error")]
46 SemanticVersion(#[from] semver::Error),
47 #[error("proxy too old (requested={requested_version} found={found_version}) error")]
48 ProxyTooOld {
50 requested_version: Box<str>,
51 found_version: Box<str>,
52 },
53 #[error("configuration error")]
54 Configuration(Box<str>),
56 #[error("error")]
57 Other(Box<str>),
59}
60
61impl Error {
62 pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
63 Self::Other(e.into())
64 }
65}
66
67impl From<rustix::io::Errno> for Error {
68 fn from(value: rustix::io::Errno) -> Self {
69 Self::Io(value.into())
70 }
71}
72
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

/// Re-export of the `oci_spec` crate, whose types appear in this module's public API.
pub use oci_spec;

/// File descriptor numbers handed out, in ascending order, for descriptors
/// passed to the child process when building its command line.
pub const RESERVED_FD_RANGE: Range<i32> = 100..200;

/// Size in bytes of the buffer used to receive a single reply from the proxy.
const MAX_MSG_SIZE: usize = 32 * 1024;
87
88fn base_proto_version() -> &'static semver::VersionReq {
89 static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
91 BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
92}
93
94fn layer_info_proto_version() -> &'static semver::VersionReq {
95 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
96 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
97}
98
99fn layer_info_piped_proto_version() -> &'static semver::VersionReq {
100 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
101 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap())
102}
103
104#[derive(Serialize)]
105struct Request {
106 method: String,
107 args: Vec<serde_json::Value>,
108}
109
110impl Request {
111 fn new<T, I>(method: &str, args: T) -> Self
112 where
113 T: IntoIterator<Item = I>,
114 I: Into<serde_json::Value>,
115 {
116 let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
117 Self {
118 method: method.to_string(),
119 args,
120 }
121 }
122
123 fn new_bare(method: &str) -> Self {
124 Self {
125 method: method.to_string(),
126 args: vec![],
127 }
128 }
129}
130
/// The JSON reply received from the proxy for a single request.
#[derive(Deserialize)]
struct Reply {
    /// Whether the request succeeded; when false, `error` is reported to the caller.
    success: bool,
    /// Error text, surfaced when `success` is false.
    error: String,
    /// Identifier of the pipe accompanying the reply; 0 means no file
    /// descriptor is expected alongside the message.
    pipeid: u32,
    /// Method-specific payload, deserialized into the type the caller asked for.
    value: serde_json::Value,
}
138
/// Boxed, pinned future that resolves to the collected output of the child
/// process, wrapped in the join result of the task that waits for it.
type ChildFuture = Pin<
    Box<
        dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>
            + Send,
    >,
>;
145
/// Handle to a running skopeo image proxy child process.
pub struct ImageProxy {
    /// Our end of the socket pair connected to the child's stdin.
    sockfd: Arc<Mutex<OwnedFd>>,
    /// Future resolving to the child's exit status and captured output.
    childwait: Arc<AsyncMutex<ChildFuture>>,
    /// Protocol version reported by the proxy in reply to `Initialize`.
    protover: semver::Version,
}
152
153impl std::fmt::Debug for ImageProxy {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 f.debug_struct("ImageProxy").finish()
156 }
157}
158
/// Handle for an image opened through the proxy; wraps the numeric identifier
/// returned by the `OpenImage` / `OpenImageOptional` requests.
#[derive(Debug, PartialEq, Eq)]
pub struct OpenedImage(u32);
162
/// Options controlling how the skopeo proxy child process is launched.
///
/// At most one of `authfile`, `auth_data` and `auth_anonymous` may be set;
/// converting a config with several of them into a `Command` fails.
#[derive(Debug, Default)]
pub struct ImageProxyConfig {
    /// Path passed to the proxy via `--authfile`.
    pub authfile: Option<PathBuf>,

    /// Authentication data read from this file; it is copied into an anonymous
    /// temporary file that the child receives as an inherited descriptor and
    /// is pointed at via `--authfile`.
    pub auth_data: Option<File>,

    /// When true, `--no-creds` is passed to the proxy.
    pub auth_anonymous: bool,

    /// Directory passed to the proxy via `--cert-dir`.
    pub certificate_directory: Option<PathBuf>,

    /// Each entry is passed to the proxy as a `--decryption-key` argument.
    pub decryption_keys: Option<Vec<String>>,

    /// When `Some(true)`, `--tls-verify=false` is passed to the proxy.
    pub insecure_skip_tls_verification: Option<bool>,

    /// When true, `--debug` is passed and the child's stderr is not captured.
    /// Setting the `CONTAINERS_IMAGE_PROXY_DEBUG` environment variable has the
    /// same effect.
    pub debug: bool,

    /// Command used in place of the default `skopeo` invocation; the proxy
    /// arguments are appended to it.
    pub skopeo_cmd: Option<Command>,
}
216
217impl TryFrom<ImageProxyConfig> for Command {
218 type Error = Error;
219
220 fn try_from(config: ImageProxyConfig) -> Result<Self> {
221 let debug = config.debug || std::env::var_os("CONTAINERS_IMAGE_PROXY_DEBUG").is_some();
222 let mut allocated_fds = RESERVED_FD_RANGE.clone();
223 let mut alloc_fd = || {
224 allocated_fds.next().ok_or_else(|| {
225 Error::Other("Ran out of reserved file descriptors for child".into())
226 })
227 };
228
229 let mut c = config.skopeo_cmd.unwrap_or_else(|| {
231 let mut c = std::process::Command::new("skopeo");
232 unsafe {
233 c.pre_exec(|| {
234 rustix::process::set_parent_process_death_signal(Some(
235 rustix::process::Signal::Term,
236 ))
237 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
238 });
239 }
240 c
241 });
242 c.arg("experimental-image-proxy");
243 if debug {
244 c.arg("--debug");
245 }
246 let auth_option_count = [
247 config.authfile.is_some(),
248 config.auth_data.is_some(),
249 config.auth_anonymous,
250 ]
251 .into_iter()
252 .filter(|&x| x)
253 .count();
254 if auth_option_count > 1 {
255 return Err(Error::Configuration(
257 "Conflicting authentication options".into(),
258 ));
259 }
260 if let Some(authfile) = config.authfile {
261 c.arg("--authfile");
262 c.arg(authfile);
263 } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
264 let target_fd = alloc_fd()?;
268 let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
269 let mut tempfile =
270 cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
271 std::io::copy(&mut auth_data, &mut tempfile)?;
272 let tempfile = tempfile
273 .into_inner()
274 .map_err(|e| e.into_error())?
275 .into_std();
276 let fd = std::sync::Arc::new(tempfile.into());
277 c.take_fd_n(fd, target_fd);
278 c.arg("--authfile");
279 c.arg(format!("/proc/self/fd/{target_fd}"));
280 } else if config.auth_anonymous {
281 c.arg("--no-creds");
282 }
283
284 if let Some(certificate_directory) = config.certificate_directory {
285 c.arg("--cert-dir");
286 c.arg(certificate_directory);
287 }
288
289 if let Some(decryption_keys) = config.decryption_keys {
290 for decryption_key in &decryption_keys {
291 c.arg("--decryption-key");
292 c.arg(decryption_key);
293 }
294 }
295
296 if config.insecure_skip_tls_verification.unwrap_or_default() {
297 c.arg("--tls-verify=false");
298 }
299 c.stdout(Stdio::null());
300 if !debug {
301 c.stderr(Stdio::piped());
302 }
303 Ok(c)
304 }
305}
306
/// One entry of the layer list returned by `ImageProxy::get_layer_info`,
/// deserialized from the proxy's reply.
#[derive(Debug, serde::Deserialize)]
pub struct ConvertedLayerInfo {
    /// Digest reported for the layer.
    pub digest: Digest,

    /// Size reported for the layer.
    pub size: u64,

    /// Media type reported for the layer.
    pub media_type: oci_spec::image::MediaType,
}
320
321impl ImageProxy {
322 pub async fn new() -> Result<Self> {
324 Self::new_with_config(Default::default()).await
325 }
326
327 #[instrument]
329 pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
330 let mut c = Command::try_from(config)?;
331 let (mysock, theirsock) = rustix::net::socketpair(
332 rustix::net::AddressFamily::UNIX,
333 rustix::net::SocketType::SEQPACKET,
334 rustix::net::SocketFlags::CLOEXEC,
335 None,
336 )?;
337 c.stdin(Stdio::from(theirsock));
338 let child = match c.spawn() {
339 Ok(c) => c,
340 Err(error) => return Err(Error::SkopeoSpawnError(error)),
341 };
342 tracing::debug!("Spawned skopeo pid={:?}", child.id());
343 let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
348 let sockfd = Arc::new(Mutex::new(mysock));
349
350 let mut r = Self {
351 sockfd,
352 childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
353 protover: semver::Version::new(0, 0, 0),
354 };
355
356 let protover = r.impl_request::<String, _, ()>("Initialize", []).await?.0;
358 tracing::debug!("Remote protocol version: {protover}");
359 let protover = semver::Version::parse(protover.as_str())?;
360 let supported = base_proto_version();
362 if !supported.matches(&protover) {
363 return Err(Error::ProxyTooOld {
364 requested_version: protover.to_string().into(),
365 found_version: supported.to_string().into(),
366 });
367 }
368 r.protover = protover;
369
370 Ok(r)
371 }
372
373 async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static>(
374 sockfd: Arc<Mutex<OwnedFd>>,
375 req: Request,
376 ) -> Result<(T, Option<(OwnedFd, u32)>)> {
377 tracing::trace!("sending request {}", req.method.as_str());
378 let r = tokio::task::spawn_blocking(move || {
380 let sockfd = sockfd.lock().unwrap();
381 let sendbuf = serde_json::to_vec(&req)?;
382 let sockfd = &*sockfd;
383 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
384 drop(sendbuf);
385 let mut buf = [0u8; MAX_MSG_SIZE];
386 let mut cmsg_space = vec![0; rustix::cmsg_space!(ScmRights(1))];
387 let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(&mut cmsg_space);
388 let iov = std::io::IoSliceMut::new(buf.as_mut());
389 let mut iov = [iov];
390 let nread = rustix::net::recvmsg(
391 sockfd,
392 &mut iov,
393 &mut cmsg_buffer,
394 rustix::net::RecvFlags::CMSG_CLOEXEC,
395 )?
396 .bytes;
397 let fdret = cmsg_buffer
398 .drain()
399 .filter_map(|m| match m {
400 rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
401 _ => None,
402 })
403 .flatten()
404 .next();
405 let buf = &buf[..nread];
406 let reply: Reply = serde_json::from_slice(buf)?;
407 if !reply.success {
408 return Err(Error::RequestInitiationFailure {
409 method: req.method.clone().into(),
410 error: reply.error.into(),
411 });
412 }
413 let fdret = match (fdret, reply.pipeid) {
414 (Some(fd), n) => {
415 if n == 0 {
416 return Err(Error::Other("got fd but no pipeid".into()));
417 }
418 Some((fd, n))
419 }
420 (None, n) => {
421 if n != 0 {
422 return Err(Error::Other(format!("got no fd with pipeid {}", n).into()));
423 }
424 None
425 }
426 };
427 let reply = serde_json::from_value(reply.value)?;
428 Ok((reply, fdret))
429 })
430 .await
431 .map_err(|e| Error::Other(e.to_string().into()))??;
432 tracing::trace!("completed request");
433 Ok(r)
434 }
435
436 #[instrument(skip(args))]
437 async fn impl_request<R: serde::de::DeserializeOwned + Send + 'static, T, I>(
438 &self,
439 method: &str,
440 args: T,
441 ) -> Result<(R, Option<(OwnedFd, u32)>)>
442 where
443 T: IntoIterator<Item = I>,
444 I: Into<serde_json::Value>,
445 {
446 let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
447 let mut childwait = self.childwait.lock().await;
448 tokio::select! {
449 r = req => {
450 r.map_err(|e| Error::RequestInitiationFailure {
451 method: method.to_string().into(),
452 error: e.to_string().into()
453 })
454 }
455 r = childwait.as_mut() => {
456 let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
457 let stderr = String::from_utf8_lossy(&r.stderr);
458 Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
459 }
460 }
461 }
462
463 #[instrument]
464 async fn finish_pipe(&self, pipeid: u32) -> Result<()> {
465 tracing::debug!("closing pipe");
466 let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?;
467 if fd.is_some() {
468 return Err(Error::Other("Unexpected fd in finish_pipe reply".into()));
469 }
470 Ok(r)
471 }
472
473 #[instrument]
474 pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
475 tracing::debug!("opening image");
476 let (imgid, _) = self
477 .impl_request::<u32, _, _>("OpenImage", [imgref])
478 .await?;
479 Ok(OpenedImage(imgid))
480 }
481
482 #[instrument]
483 pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
484 tracing::debug!("opening image");
485 let (imgid, _) = self
486 .impl_request::<u32, _, _>("OpenImageOptional", [imgref])
487 .await?;
488 if imgid == 0 {
489 Ok(None)
490 } else {
491 Ok(Some(OpenedImage(imgid)))
492 }
493 }
494
495 #[instrument]
496 pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
497 tracing::debug!("closing image");
498 let (r, _) = self.impl_request("CloseImage", [img.0]).await?;
499 Ok(r)
500 }
501
502 async fn read_all_fd(&self, fd: Option<(OwnedFd, u32)>) -> Result<Vec<u8>> {
503 let (fd, pipeid) = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?;
504 let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
505 let mut fd = tokio::io::BufReader::new(fd);
506 let mut r = Vec::new();
507 let reader = fd.read_to_end(&mut r);
508 let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipeid));
509 finish?;
510 assert_eq!(nbytes?, r.len());
511 Ok(r)
512 }
513
514 pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
518 let (digest, fd) = self.impl_request("GetManifest", [img.0]).await?;
519 Ok((digest, self.read_all_fd(fd).await?))
520 }
521
522 pub async fn fetch_manifest(
525 &self,
526 img: &OpenedImage,
527 ) -> Result<(String, oci_spec::image::ImageManifest)> {
528 let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
529 let manifest = serde_json::from_slice(&raw)?;
530 Ok((digest, manifest))
531 }
532
533 pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
536 let (_, fd) = self
537 .impl_request::<(), _, _>("GetFullConfig", [img.0])
538 .await?;
539 self.read_all_fd(fd).await
540 }
541
542 pub async fn fetch_config(
545 &self,
546 img: &OpenedImage,
547 ) -> Result<oci_spec::image::ImageConfiguration> {
548 let raw = self.fetch_config_raw(img).await?;
549 serde_json::from_slice(&raw).map_err(Into::into)
550 }
551
552 #[instrument]
562 pub async fn get_blob(
563 &self,
564 img: &OpenedImage,
565 digest: &Digest,
566 size: u64,
567 ) -> Result<(
568 impl AsyncBufRead + Send + Unpin,
569 impl Future<Output = Result<()>> + Unpin + '_,
570 )> {
571 tracing::debug!("fetching blob");
574 let args: Vec<serde_json::Value> =
575 vec![img.0.into(), digest.to_string().into(), size.into()];
576 let (_bloblen, fd) = self.impl_request::<i64, _, _>("GetBlob", args).await?;
577 let (fd, pipeid) = fd.ok_or_else(|| Error::new_other("Missing fd from reply"))?;
578 let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
579 let fd = tokio::io::BufReader::new(fd);
580 let finish = Box::pin(self.finish_pipe(pipeid));
581 Ok((fd, finish))
582 }
583
584 #[instrument]
586 pub async fn get_descriptor(
587 &self,
588 img: &OpenedImage,
589 descriptor: &Descriptor,
590 ) -> Result<(
591 impl AsyncBufRead + Send + Unpin,
592 impl Future<Output = Result<()>> + Unpin + '_,
593 )> {
594 self.get_blob(img, descriptor.digest(), descriptor.size())
595 .await
596 }
597
598 #[instrument]
600 pub async fn get_layer_info(
601 &self,
602 img: &OpenedImage,
603 ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
604 tracing::debug!("Getting layer info");
605 if layer_info_piped_proto_version().matches(&self.protover) {
606 let (_, fd) = self
607 .impl_request::<(), _, _>("GetLayerInfoPiped", [img.0])
608 .await?;
609 let buf = self.read_all_fd(fd).await?;
610 return Ok(Some(serde_json::from_slice(&buf)?));
611 }
612 if !layer_info_proto_version().matches(&self.protover) {
613 return Ok(None);
614 }
615 let reply = self.impl_request("GetLayerInfo", [img.0]).await?;
616 let layers: Vec<ConvertedLayerInfo> = reply.0;
617 Ok(Some(layers))
618 }
619
620 #[instrument]
622 pub async fn finalize(self) -> Result<()> {
623 let _ = &self;
624 let req = Request::new_bare("Shutdown");
625 let sendbuf = serde_json::to_vec(&req)?;
626 let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
628 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
629 drop(sendbuf);
630 tracing::debug!("sent shutdown request");
631 let mut childwait = self.childwait.lock().await;
632 let output = childwait
633 .as_mut()
634 .await
635 .map_err(|e| Error::new_other(e.to_string()))??;
636 if !output.status.success() {
637 let stderr = String::from_utf8_lossy(&output.stderr);
638 return Err(Error::RequestReturned(
639 format!("proxy failed: {}\n{}", output.status, stderr).into(),
640 ));
641 }
642 tracing::debug!("proxy exited successfully");
643 Ok(())
644 }
645}
646
647#[cfg(test)]
648mod tests {
649 use std::io::{Seek, Write};
650
651 use super::*;
652
653 fn validate(c: Command, contains: &[&str], not_contains: &[&str]) {
654 let d = format!("{:?}", c);
658 for c in contains {
659 assert!(d.contains(c), "{} missing {}", d, c);
660 }
661 for c in not_contains {
662 assert!(!d.contains(c), "{} should not contain {}", d, c);
663 }
664 }
665
    /// Checks the command line produced for each configuration option in turn.
    #[test]
    fn proxy_configs() {
        let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();

        // Default configuration: only the subcommand, no optional flags.
        let c = Command::try_from(ImageProxyConfig::default()).unwrap();
        validate(
            c,
            &["experimental-image-proxy"],
            &["--no-creds", "--tls-verify", "--authfile"],
        );

        // An explicit authfile path is forwarded as-is.
        let c = Command::try_from(ImageProxyConfig {
            authfile: Some(PathBuf::from("/path/to/authfile")),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--authfile", "/path/to/authfile"], &[]);

        // Decryption keys are forwarded via --decryption-key.
        let decryption_key_path = "/path/to/decryption_key";
        let c = Command::try_from(ImageProxyConfig {
            decryption_keys: Some(vec![decryption_key_path.to_string()]),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--decryption-key", "/path/to/decryption_key"], &[]);

        // The certificate directory is forwarded via --cert-dir.
        let c = Command::try_from(ImageProxyConfig {
            certificate_directory: Some(PathBuf::from("/path/to/certs")),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--cert-dir", "/path/to/certs"], &[]);

        // Skipping TLS verification turns into --tls-verify=false.
        let c = Command::try_from(ImageProxyConfig {
            insecure_skip_tls_verification: Some(true),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--tls-verify=false"], &[]);

        // Inline auth data is exposed to the child through the first reserved
        // descriptor number; the content itself is not parsed here.
        let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
        tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
        tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
        let c = Command::try_from(ImageProxyConfig {
            auth_data: Some(tmpf.into_std()),
            ..Default::default()
        })
        .unwrap();
        validate(c, &["--authfile", "/proc/self/fd/100"], &[]);
    }
716
717 #[tokio::test]
718 async fn skopeo_not_found() {
719 let mut config = ImageProxyConfig {
720 ..ImageProxyConfig::default()
721 };
722 config.skopeo_cmd = Some(Command::new("no-skopeo"));
723
724 match ImageProxy::new_with_config(config).await {
725 Ok(_) => panic!("Expected an error"),
726 Err(ref e @ Error::SkopeoSpawnError(ref inner)) => {
727 assert_eq!(inner.kind(), std::io::ErrorKind::NotFound);
728 assert!(e
730 .to_string()
731 .contains("skopeo spawn error: No such file or directory"));
732 }
733 Err(e) => panic!("Unexpected error {e}"),
734 }
735 }
736
737 #[tokio::test]
738 async fn test_proxy_send_sync() {
739 fn assert_send_sync(_x: impl Send + Sync) {}
740
741 let Ok(proxy) = ImageProxy::new().await else {
742 return;
744 };
745 assert_send_sync(&proxy);
746 assert_send_sync(proxy);
747
748 let opened = OpenedImage(0);
749 assert_send_sync(&opened);
750 assert_send_sync(opened);
751 }
752}