use crate::container::store::LayerProgress;
use super::*;
use containers_image_proxy::{ImageProxy, OpenedImage};
use fn_error_context::context;
use futures_util::{Future, FutureExt};
use oci_spec::image as oci_image;
use std::sync::{Arc, Mutex};
use tokio::{
io::{AsyncBufRead, AsyncRead},
sync::watch::{Receiver, Sender},
};
use tracing::instrument;
type Progress = tokio::sync::watch::Sender<u64>;
#[pin_project::pin_project]
#[derive(Debug)]
pub(crate) struct ProgressReader<T> {
#[pin]
pub(crate) reader: T,
#[pin]
pub(crate) progress: Arc<Mutex<Progress>>,
}
impl<T: AsyncRead> ProgressReader<T> {
pub(crate) fn new(reader: T) -> (Self, Receiver<u64>) {
let (progress, r) = tokio::sync::watch::channel(1);
let progress = Arc::new(Mutex::new(progress));
(ProgressReader { reader, progress }, r)
}
}
impl<T: AsyncRead> AsyncRead for ProgressReader<T> {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let this = self.project();
let len = buf.filled().len();
match this.reader.poll_read(cx, buf) {
v @ std::task::Poll::Ready(Ok(_)) => {
let progress = this.progress.lock().unwrap();
let state = {
let mut state = *progress.borrow();
let newlen = buf.filled().len();
debug_assert!(newlen >= len);
let read = (newlen - len) as u64;
state += read;
state
};
let _ = progress.send(state);
v
}
o => o,
}
}
}
async fn fetch_manifest_impl(
proxy: &mut ImageProxy,
imgref: &OstreeImageReference,
) -> Result<(oci_spec::image::ImageManifest, String)> {
let oi = &proxy.open_image(&imgref.imgref.to_string()).await?;
let (digest, manifest) = proxy.fetch_manifest(oi).await?;
proxy.close_image(oi).await?;
Ok((manifest, digest))
}
#[context("Fetching manifest")]
pub async fn fetch_manifest(
imgref: &OstreeImageReference,
) -> Result<(oci_spec::image::ImageManifest, String)> {
let mut proxy = ImageProxy::new().await?;
fetch_manifest_impl(&mut proxy, imgref).await
}
#[context("Fetching manifest and config")]
pub async fn fetch_manifest_and_config(
imgref: &OstreeImageReference,
) -> Result<(
oci_spec::image::ImageManifest,
String,
oci_spec::image::ImageConfiguration,
)> {
let proxy = ImageProxy::new().await?;
let oi = &proxy.open_image(&imgref.imgref.to_string()).await?;
let (digest, manifest) = proxy.fetch_manifest(oi).await?;
let config = proxy.fetch_config(oi).await?;
Ok((manifest, digest, config))
}
#[derive(Debug)]
pub struct Import {
pub ostree_commit: String,
pub image_digest: String,
pub deprecated_warning: Option<String>,
}
pub(crate) async fn join_fetch<T: std::fmt::Debug>(
worker: impl Future<Output = Result<T>>,
driver: impl Future<Output = Result<()>>,
) -> Result<T> {
let (worker, driver) = tokio::join!(worker, driver);
match (worker, driver) {
(Ok(t), Ok(())) => Ok(t),
(Err(worker), Err(driver)) => {
let text = driver.root_cause().to_string();
if text.ends_with("broken pipe") {
Err(worker)
} else {
Err(worker.context(format!("proxy failure: {} and client error", text)))
}
}
(Ok(_), Err(driver)) => Err(driver),
(Err(worker), Ok(())) => Err(worker),
}
}
#[context("Importing {}", imgref)]
#[instrument(skip(repo))]
pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -> Result<Import> {
let importer = super::store::ImageImporter::new(repo, imgref, Default::default()).await?;
importer.unencapsulate().await
}
fn new_async_decompressor<'a>(
media_type: &oci_image::MediaType,
src: impl AsyncBufRead + Send + Unpin + 'a,
) -> Result<Box<dyn AsyncBufRead + Send + Unpin + 'a>> {
match media_type {
oci_image::MediaType::ImageLayerGzip => Ok(Box::new(tokio::io::BufReader::new(
async_compression::tokio::bufread::GzipDecoder::new(src),
))),
oci_image::MediaType::ImageLayer => Ok(Box::new(src)),
o => Err(anyhow::anyhow!("Unhandled layer type: {}", o)),
}
}
pub(crate) async fn fetch_layer_decompress<'a>(
proxy: &'a mut ImageProxy,
img: &OpenedImage,
manifest: &oci_image::ImageManifest,
layer: &'a oci_image::Descriptor,
progress: Option<&'a Sender<Option<store::LayerProgress>>>,
layer_info: Option<&Vec<containers_image_proxy::ConvertedLayerInfo>>,
transport_src: Transport,
) -> Result<(
Box<dyn AsyncBufRead + Send + Unpin>,
impl Future<Output = Result<()>> + 'a,
)> {
use futures_util::future::Either;
tracing::debug!("fetching {}", layer.digest());
let layer_index = manifest.layers().iter().position(|x| x == layer).unwrap();
let (blob, driver, size);
let media_type: &oci_image::MediaType;
match transport_src {
Transport::ContainerStorage => {
let layer_info = layer_info
.ok_or_else(|| anyhow!("skopeo too old to pull from containers-storage"))?;
let n_layers = layer_info.len();
let layer_blob = layer_info.get(layer_index).ok_or_else(|| {
anyhow!("blobid position {layer_index} exceeds diffid count {n_layers}")
})?;
size = layer_blob.size;
media_type = &layer_blob.media_type;
(blob, driver) = proxy
.get_blob(img, layer_blob.digest.as_str(), size as u64)
.await?;
}
_ => {
size = layer.size();
media_type = layer.media_type();
(blob, driver) = proxy
.get_blob(img, layer.digest().as_str(), size as u64)
.await?;
}
};
if let Some(progress) = progress {
let (readprogress, mut readwatch) = ProgressReader::new(blob);
let readprogress = tokio::io::BufReader::new(readprogress);
let readproxy = async move {
while let Ok(()) = readwatch.changed().await {
let fetched = readwatch.borrow_and_update();
let status = LayerProgress {
layer_index,
fetched: *fetched,
total: size as u64,
};
progress.send_replace(Some(status));
}
};
let reader = new_async_decompressor(media_type, readprogress)?;
let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
Ok((reader, Either::Left(driver)))
} else {
let blob = new_async_decompressor(media_type, blob)?;
Ok((blob, Either::Right(driver)))
}
}