1use cap_std_ext::prelude::CapStdExtCommandExt;
8use cap_std_ext::{cap_std, cap_tempfile};
9use futures_util::Future;
10use oci_spec::image::{Descriptor, Digest};
11use serde::{Deserialize, Serialize};
12use std::fs::File;
13use std::ops::Range;
14use std::os::fd::OwnedFd;
15use std::os::unix::prelude::CommandExt;
16use std::path::PathBuf;
17use std::pin::Pin;
18use std::process::{Command, Stdio};
19use std::sync::{Arc, Mutex, OnceLock};
20use thiserror::Error;
21use tokio::io::{AsyncBufRead, AsyncReadExt};
22use tokio::sync::Mutex as AsyncMutex;
23use tokio::task::JoinError;
24use tracing::instrument;
25
26#[derive(Error, Debug)]
28#[non_exhaustive]
29pub enum Error {
30 #[error("i/o error")]
31 Io(#[from] std::io::Error),
33 #[error("serialization error")]
34 SerDe(#[from] serde_json::Error),
36 #[error("failed to invoke method {method}: {error}")]
38 RequestInitiationFailure { method: Box<str>, error: Box<str> },
39 #[error("proxy request returned error")]
41 RequestReturned(Box<str>),
42 #[error("semantic version error")]
43 SemanticVersion(#[from] semver::Error),
44 #[error("proxy too old (requested={requested_version} found={found_version}) error")]
45 ProxyTooOld {
47 requested_version: Box<str>,
48 found_version: Box<str>,
49 },
50 #[error("configuration error")]
51 Configuration(Box<str>),
53 #[error("error")]
54 Other(Box<str>),
56}
57
58impl Error {
59 pub(crate) fn new_other(e: impl Into<Box<str>>) -> Self {
60 Self::Other(e.into())
61 }
62}
63
64impl From<rustix::io::Errno> for Error {
65 fn from(value: rustix::io::Errno) -> Self {
66 Self::Io(value.into())
67 }
68}
69
/// Result type used throughout this crate, with [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;

/// Re-export of the `oci_spec` crate, whose types appear in this crate's public API.
pub use oci_spec;

/// File descriptor numbers reserved for fds passed into the proxy child process.
///
/// The fd carrying `ImageProxyConfig::auth_data` is allocated from this range.
/// NOTE(review): presumably callers passing their own fds through a custom
/// `skopeo_cmd` should avoid this range — confirm.
pub const RESERVED_FD_RANGE: Range<i32> = 100..200;

/// Size of the buffer used to receive a single reply message from the proxy socket.
// NOTE(review): presumably mirrors the proxy's maximum message size — confirm.
const MAX_MSG_SIZE: usize = 32 * 1024;
84
85fn base_proto_version() -> &'static semver::VersionReq {
86 static BASE_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
88 BASE_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.3").unwrap())
89}
90
91fn layer_info_proto_version() -> &'static semver::VersionReq {
92 static LAYER_INFO_PROTO_VERSION: OnceLock<semver::VersionReq> = OnceLock::new();
93 LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.5").unwrap())
94}
95
/// A request sent to the proxy, serialized as JSON.
///
/// Field names are part of the wire format and must not be renamed.
#[derive(Serialize)]
struct Request {
    /// Name of the proxy method to invoke (e.g. `"OpenImage"`).
    method: String,
    /// Positional arguments for the method.
    args: Vec<serde_json::Value>,
}
101
102impl Request {
103 fn new<T, I>(method: &str, args: T) -> Self
104 where
105 T: IntoIterator<Item = I>,
106 I: Into<serde_json::Value>,
107 {
108 let args: Vec<_> = args.into_iter().map(|v| v.into()).collect();
109 Self {
110 method: method.to_string(),
111 args,
112 }
113 }
114
115 fn new_bare(method: &str) -> Self {
116 Self {
117 method: method.to_string(),
118 args: vec![],
119 }
120 }
121}
122
/// A reply received from the proxy, deserialized from JSON.
///
/// Field names are part of the wire format and must not be renamed.
#[derive(Deserialize)]
struct Reply {
    /// Whether the request succeeded; when `false`, `error` describes why.
    success: bool,
    /// Error message from the proxy; consulted only when `success` is `false`.
    error: String,
    /// Non-zero exactly when the reply is accompanied by a file descriptor
    /// (enforced in `ImageProxy::impl_request_raw`).
    pipeid: u32,
    /// Method-specific return value, deserialized by the caller into the expected type.
    value: serde_json::Value,
}
130
131type ChildFuture = Pin<
132 Box<dyn Future<Output = std::result::Result<std::io::Result<std::process::Output>, JoinError>>>,
133>;
134
/// A handle to a running `skopeo experimental-image-proxy` process.
pub struct ImageProxy {
    /// Our end of the socket pair whose other end is the proxy's stdin. It is shared
    /// with the blocking tasks that perform each send/receive exchange.
    sockfd: Arc<Mutex<OwnedFd>>,
    /// Resolves when the proxy process exits; raced against each request so that a
    /// proxy crash surfaces as an error.
    childwait: Arc<AsyncMutex<ChildFuture>>,
    /// Protocol version reported by the proxy during `Initialize`.
    protover: semver::Version,
}
141
142impl std::fmt::Debug for ImageProxy {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 f.debug_struct("ImageProxy").finish()
145 }
146}
147
/// Opaque handle to an image opened through the proxy.
///
/// Wraps the numeric id returned by `OpenImage`/`OpenImageOptional`. `OpenImageOptional`
/// uses id `0` to signal that the image was not found, so a handle never holds `0`.
#[derive(Debug, PartialEq, Eq)]
pub struct OpenedImage(u32);
151
/// Configuration used to build the proxy command line.
///
/// At most one of `authfile`, `auth_data` and `auth_anonymous` may be set; converting a
/// config with more than one of them into a `Command` fails with `Error::Configuration`.
#[derive(Debug, Default)]
pub struct ImageProxyConfig {
    /// Path to an authentication file, passed as `--authfile`.
    pub authfile: Option<PathBuf>,

    /// Authentication data to use instead of a file path. Its contents are copied into an
    /// anonymous temporary file under `/tmp`. That file is handed to the child on an fd from
    /// `RESERVED_FD_RANGE` and referenced as `--authfile /proc/self/fd/N`.
    pub auth_data: Option<File>,

    /// Pass `--no-creds` so the proxy does not use any credentials.
    pub auth_anonymous: bool,

    /// Directory of TLS certificates, passed as `--cert-dir`.
    pub certificate_directory: Option<PathBuf>,

    /// Decryption keys; each entry is passed as a separate `--decryption-key` argument.
    pub decryption_keys: Option<Vec<String>>,

    /// When `Some(true)`, passes `--tls-verify=false`. `None` and `Some(false)` add nothing.
    pub insecure_skip_tls_verification: Option<bool>,

    /// Base command to run instead of the default `skopeo`. `experimental-image-proxy`
    /// and the options above are appended to it, and its stdout/stderr/stdin are
    /// overridden. Only the default command gets a SIGTERM parent-death signal.
    pub skopeo_cmd: Option<Command>,
}
200
201impl TryFrom<ImageProxyConfig> for Command {
202 type Error = Error;
203
204 fn try_from(config: ImageProxyConfig) -> Result<Self> {
205 let mut allocated_fds = RESERVED_FD_RANGE.clone();
206 let mut alloc_fd = || {
207 allocated_fds.next().ok_or_else(|| {
208 Error::Other("Ran out of reserved file descriptors for child".into())
209 })
210 };
211
212 let mut c = config.skopeo_cmd.unwrap_or_else(|| {
214 let mut c = std::process::Command::new("skopeo");
215 unsafe {
216 c.pre_exec(|| {
217 rustix::process::set_parent_process_death_signal(Some(
218 rustix::process::Signal::Term,
219 ))
220 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
221 });
222 }
223 c
224 });
225 c.arg("experimental-image-proxy");
226 let auth_option_count = [
227 config.authfile.is_some(),
228 config.auth_data.is_some(),
229 config.auth_anonymous,
230 ]
231 .into_iter()
232 .filter(|&x| x)
233 .count();
234 if auth_option_count > 1 {
235 return Err(Error::Configuration(
237 "Conflicting authentication options".into(),
238 ));
239 }
240 if let Some(authfile) = config.authfile {
241 c.arg("--authfile");
242 c.arg(authfile);
243 } else if let Some(mut auth_data) = config.auth_data.map(std::io::BufReader::new) {
244 let target_fd = alloc_fd()?;
248 let tmpd = &cap_std::fs::Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?;
249 let mut tempfile =
250 cap_tempfile::TempFile::new_anonymous(tmpd).map(std::io::BufWriter::new)?;
251 std::io::copy(&mut auth_data, &mut tempfile)?;
252 let tempfile = tempfile
253 .into_inner()
254 .map_err(|e| e.into_error())?
255 .into_std();
256 let fd = std::sync::Arc::new(tempfile.into());
257 c.take_fd_n(fd, target_fd);
258 c.arg("--authfile");
259 c.arg(format!("/proc/self/fd/{target_fd}"));
260 } else if config.auth_anonymous {
261 c.arg("--no-creds");
262 }
263
264 if let Some(certificate_directory) = config.certificate_directory {
265 c.arg("--cert-dir");
266 c.arg(certificate_directory);
267 }
268
269 if let Some(decryption_keys) = config.decryption_keys {
270 for decryption_key in &decryption_keys {
271 c.arg("--decryption-key");
272 c.arg(decryption_key);
273 }
274 }
275
276 if config.insecure_skip_tls_verification.unwrap_or_default() {
277 c.arg("--tls-verify=false");
278 }
279 c.stdout(Stdio::null()).stderr(Stdio::piped());
280 Ok(c)
281 }
282}
283
/// Per-layer information returned by the proxy's `GetLayerInfo` method.
#[derive(Debug, serde::Deserialize)]
pub struct ConvertedLayerInfo {
    /// Digest of the layer.
    // NOTE(review): presumably the digest of the layer as the proxy would serve it
    // (possibly after conversion) — confirm against the proxy documentation.
    pub digest: Digest,

    /// Size of the layer in bytes.
    // NOTE(review): presumably the size of the blob identified by `digest` — confirm.
    pub size: u64,

    /// Media type of the layer.
    pub media_type: oci_spec::image::MediaType,
}
297
298impl ImageProxy {
299 pub async fn new() -> Result<Self> {
301 Self::new_with_config(Default::default()).await
302 }
303
304 #[instrument]
306 pub async fn new_with_config(config: ImageProxyConfig) -> Result<Self> {
307 let mut c = Command::try_from(config)?;
308 let (mysock, theirsock) = rustix::net::socketpair(
309 rustix::net::AddressFamily::UNIX,
310 rustix::net::SocketType::SEQPACKET,
311 rustix::net::SocketFlags::CLOEXEC,
312 None,
313 )?;
314 c.stdin(Stdio::from(theirsock));
315 let child = c.spawn()?;
316 tracing::debug!("Spawned skopeo pid={:?}", child.id());
317 let childwait = tokio::task::spawn_blocking(move || child.wait_with_output());
322 let sockfd = Arc::new(Mutex::new(mysock));
323
324 let mut r = Self {
325 sockfd,
326 childwait: Arc::new(AsyncMutex::new(Box::pin(childwait))),
327 protover: semver::Version::new(0, 0, 0),
328 };
329
330 let protover = r.impl_request::<String, _, ()>("Initialize", []).await?.0;
332 tracing::debug!("Remote protocol version: {protover}");
333 let protover = semver::Version::parse(protover.as_str())?;
334 let supported = base_proto_version();
336 if !supported.matches(&protover) {
337 return Err(Error::ProxyTooOld {
338 requested_version: protover.to_string().into(),
339 found_version: supported.to_string().into(),
340 });
341 }
342 r.protover = protover;
343
344 Ok(r)
345 }
346
347 async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static>(
348 sockfd: Arc<Mutex<OwnedFd>>,
349 req: Request,
350 ) -> Result<(T, Option<(OwnedFd, u32)>)> {
351 tracing::trace!("sending request {}", req.method.as_str());
352 let r = tokio::task::spawn_blocking(move || {
354 let sockfd = sockfd.lock().unwrap();
355 let sendbuf = serde_json::to_vec(&req)?;
356 let sockfd = &*sockfd;
357 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
358 drop(sendbuf);
359 let mut buf = [0u8; MAX_MSG_SIZE];
360 let mut cmsg_space = vec![0; rustix::cmsg_space!(ScmRights(1))];
361 let mut cmsg_buffer = rustix::net::RecvAncillaryBuffer::new(&mut cmsg_space);
362 let iov = std::io::IoSliceMut::new(buf.as_mut());
363 let mut iov = [iov];
364 let nread = rustix::net::recvmsg(
365 sockfd,
366 &mut iov,
367 &mut cmsg_buffer,
368 rustix::net::RecvFlags::CMSG_CLOEXEC,
369 )?
370 .bytes;
371 let fdret = cmsg_buffer
372 .drain()
373 .filter_map(|m| match m {
374 rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
375 _ => None,
376 })
377 .flatten()
378 .next();
379 let buf = &buf[..nread];
380 let reply: Reply = serde_json::from_slice(buf)?;
381 if !reply.success {
382 return Err(Error::RequestInitiationFailure {
383 method: req.method.clone().into(),
384 error: reply.error.into(),
385 });
386 }
387 let fdret = match (fdret, reply.pipeid) {
388 (Some(fd), n) => {
389 if n == 0 {
390 return Err(Error::Other("got fd but no pipeid".into()));
391 }
392 Some((fd, n))
393 }
394 (None, n) => {
395 if n != 0 {
396 return Err(Error::Other(format!("got no fd with pipeid {}", n).into()));
397 }
398 None
399 }
400 };
401 let reply = serde_json::from_value(reply.value)?;
402 Ok((reply, fdret))
403 })
404 .await
405 .map_err(|e| Error::Other(e.to_string().into()))??;
406 tracing::trace!("completed request");
407 Ok(r)
408 }
409
410 #[instrument(skip(args))]
411 async fn impl_request<R: serde::de::DeserializeOwned + Send + 'static, T, I>(
412 &self,
413 method: &str,
414 args: T,
415 ) -> Result<(R, Option<(OwnedFd, u32)>)>
416 where
417 T: IntoIterator<Item = I>,
418 I: Into<serde_json::Value>,
419 {
420 let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
421 let mut childwait = self.childwait.lock().await;
422 tokio::select! {
423 r = req => {
424 r.map_err(|e| Error::RequestInitiationFailure {
425 method: method.to_string().into(),
426 error: e.to_string().into()
427 })
428 }
429 r = childwait.as_mut() => {
430 let r = r.map_err(|e| Error::Other(e.to_string().into()))??;
431 let stderr = String::from_utf8_lossy(&r.stderr);
432 Err(Error::Other(format!("skopeo proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr).into()))
433 }
434 }
435 }
436
437 #[instrument]
438 async fn finish_pipe(&self, pipeid: u32) -> Result<()> {
439 tracing::debug!("closing pipe");
440 let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?;
441 if fd.is_some() {
442 return Err(Error::Other("Unexpected fd in finish_pipe reply".into()));
443 }
444 Ok(r)
445 }
446
447 #[instrument]
448 pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
449 tracing::debug!("opening image");
450 let (imgid, _) = self
451 .impl_request::<u32, _, _>("OpenImage", [imgref])
452 .await?;
453 Ok(OpenedImage(imgid))
454 }
455
456 #[instrument]
457 pub async fn open_image_optional(&self, imgref: &str) -> Result<Option<OpenedImage>> {
458 tracing::debug!("opening image");
459 let (imgid, _) = self
460 .impl_request::<u32, _, _>("OpenImageOptional", [imgref])
461 .await?;
462 if imgid == 0 {
463 Ok(None)
464 } else {
465 Ok(Some(OpenedImage(imgid)))
466 }
467 }
468
469 #[instrument]
470 pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
471 tracing::debug!("closing image");
472 let (r, _) = self.impl_request("CloseImage", [img.0]).await?;
473 Ok(r)
474 }
475
476 async fn read_all_fd(&self, fd: Option<(OwnedFd, u32)>) -> Result<Vec<u8>> {
477 let (fd, pipeid) = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?;
478 let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
479 let mut fd = tokio::io::BufReader::new(fd);
480 let mut r = Vec::new();
481 let reader = fd.read_to_end(&mut r);
482 let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipeid));
483 finish?;
484 assert_eq!(nbytes?, r.len());
485 Ok(r)
486 }
487
488 pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
492 let (digest, fd) = self.impl_request("GetManifest", [img.0]).await?;
493 Ok((digest, self.read_all_fd(fd).await?))
494 }
495
496 pub async fn fetch_manifest(
499 &self,
500 img: &OpenedImage,
501 ) -> Result<(String, oci_spec::image::ImageManifest)> {
502 let (digest, raw) = self.fetch_manifest_raw_oci(img).await?;
503 let manifest = serde_json::from_slice(&raw)?;
504 Ok((digest, manifest))
505 }
506
507 pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result<Vec<u8>> {
510 let (_, fd) = self
511 .impl_request::<(), _, _>("GetFullConfig", [img.0])
512 .await?;
513 self.read_all_fd(fd).await
514 }
515
516 pub async fn fetch_config(
519 &self,
520 img: &OpenedImage,
521 ) -> Result<oci_spec::image::ImageConfiguration> {
522 let raw = self.fetch_config_raw(img).await?;
523 serde_json::from_slice(&raw).map_err(Into::into)
524 }
525
526 #[instrument]
531 pub async fn get_blob(
532 &self,
533 img: &OpenedImage,
534 digest: &Digest,
535 size: u64,
536 ) -> Result<(
537 impl AsyncBufRead + Send + Unpin,
538 impl Future<Output = Result<()>> + Unpin + '_,
539 )> {
540 tracing::debug!("fetching blob");
543 let args: Vec<serde_json::Value> =
544 vec![img.0.into(), digest.to_string().into(), size.into()];
545 let (_bloblen, fd) = self.impl_request::<i64, _, _>("GetBlob", args).await?;
546 let (fd, pipeid) = fd.ok_or_else(|| Error::new_other("Missing fd from reply"))?;
547 let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
548 let fd = tokio::io::BufReader::new(fd);
549 let finish = Box::pin(self.finish_pipe(pipeid));
550 Ok((fd, finish))
551 }
552
553 #[instrument]
555 pub async fn get_descriptor(
556 &self,
557 img: &OpenedImage,
558 descriptor: &Descriptor,
559 ) -> Result<(
560 impl AsyncBufRead + Send + Unpin,
561 impl Future<Output = Result<()>> + Unpin + '_,
562 )> {
563 self.get_blob(img, descriptor.digest(), descriptor.size())
564 .await
565 }
566
567 #[instrument]
569 pub async fn get_layer_info(
570 &self,
571 img: &OpenedImage,
572 ) -> Result<Option<Vec<ConvertedLayerInfo>>> {
573 tracing::debug!("Getting layer info");
574 if !layer_info_proto_version().matches(&self.protover) {
575 return Ok(None);
576 }
577 let reply = self.impl_request("GetLayerInfo", [img.0]).await?;
578 let layers: Vec<ConvertedLayerInfo> = reply.0;
579 Ok(Some(layers))
580 }
581
582 #[instrument]
584 pub async fn finalize(self) -> Result<()> {
585 let _ = &self;
586 let req = Request::new_bare("Shutdown");
587 let sendbuf = serde_json::to_vec(&req)?;
588 let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
590 rustix::net::send(sockfd, &sendbuf, rustix::net::SendFlags::empty())?;
591 drop(sendbuf);
592 tracing::debug!("sent shutdown request");
593 let mut childwait = self.childwait.lock().await;
594 let output = childwait
595 .as_mut()
596 .await
597 .map_err(|e| Error::new_other(e.to_string()))??;
598 if !output.status.success() {
599 let stderr = String::from_utf8_lossy(&output.stderr);
600 return Err(Error::RequestReturned(
601 format!("proxy failed: {}\n{}", output.status, stderr).into(),
602 ));
603 }
604 tracing::debug!("proxy exited successfully");
605 Ok(())
606 }
607}
608
#[cfg(test)]
mod tests {
    use std::io::{Seek, Write};

    use super::*;

    /// Assert that the `Debug` rendering of `cmd` contains every string in `contains` and
    /// none of the strings in `not_contains`.
    fn validate(cmd: Command, contains: &[&str], not_contains: &[&str]) {
        let d = format!("{:?}", cmd);
        for c in contains {
            assert!(d.contains(c), "{} missing {}", d, c);
        }
        for c in not_contains {
            assert!(!d.contains(c), "{} should not contain {}", d, c);
        }
    }

    #[test]
    fn proxy_configs() {
        let tmpd = &cap_tempfile::tempdir(cap_std::ambient_authority()).unwrap();

        let c = Command::try_from(ImageProxyConfig::default()).unwrap();
        validate(
            c,
            &["experimental-image-proxy"],
            &["--no-creds", "--tls-verify", "--authfile"],
        );

        let c = Command::try_from(ImageProxyConfig {
            authfile: Some(PathBuf::from("/path/to/authfile")),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--authfile", "/path/to/authfile"], &[]);

        let decryption_key_path = "/path/to/decryption_key";
        let c = Command::try_from(ImageProxyConfig {
            decryption_keys: Some(vec![decryption_key_path.to_string()]),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--decryption-key", "/path/to/decryption_key"], &[]);

        let c = Command::try_from(ImageProxyConfig {
            certificate_directory: Some(PathBuf::from("/path/to/certs")),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--cert-dir", "/path/to/certs"], &[]);

        let c = Command::try_from(ImageProxyConfig {
            insecure_skip_tls_verification: Some(true),
            ..Default::default()
        })
        .unwrap();
        validate(c, &[r"--tls-verify=false"], &[]);

        let mut tmpf = cap_tempfile::TempFile::new_anonymous(tmpd).unwrap();
        tmpf.write_all(r#"{ "auths": {} "#.as_bytes()).unwrap();
        tmpf.seek(std::io::SeekFrom::Start(0)).unwrap();
        let c = Command::try_from(ImageProxyConfig {
            auth_data: Some(tmpf.into_std()),
            ..Default::default()
        })
        .unwrap();
        validate(c, &["--authfile", "/proc/self/fd/100"], &[]);
    }

    #[test]
    fn anonymous_auth() {
        let c = Command::try_from(ImageProxyConfig {
            auth_anonymous: true,
            ..Default::default()
        })
        .unwrap();
        validate(c, &["--no-creds"], &["--authfile"]);
    }

    #[test]
    fn conflicting_auth_options() {
        let err = Command::try_from(ImageProxyConfig {
            authfile: Some(PathBuf::from("/path/to/authfile")),
            auth_anonymous: true,
            ..Default::default()
        })
        .unwrap_err();
        assert!(
            matches!(err, Error::Configuration(_)),
            "unexpected error: {err:?}"
        );
    }
}
678}