1use crate::container::store::LayerProgress;
35
36use super::*;
37use containers_image_proxy::{ImageProxy, OpenedImage};
38use fn_error_context::context;
39use futures_util::{Future, FutureExt};
40use oci_spec::image::{self as oci_image, Digest};
41use std::io::Read;
42use std::sync::{Arc, Mutex};
43use tokio::{
44 io::{AsyncBufRead, AsyncRead},
45 sync::watch::{Receiver, Sender},
46};
47use tracing::instrument;
48
/// Docker media type string for a tar layer.
/// `decompressor` hands layers of this type back unchanged (no decompression is applied).
const DOCKER_TYPE_LAYER_TAR: &str = "application/vnd.docker.image.rootfs.diff.tar";
53
/// Sender half of a `tokio` watch channel carrying a `u64`;
/// `ProgressReader` uses it to publish its running count of bytes read.
type Progress = tokio::sync::watch::Sender<u64>;
55
/// Wraps a reader and publishes a running count of the bytes read through it
/// on a watch channel.
#[pin_project::pin_project]
#[derive(Debug)]
pub(crate) struct ProgressReader<T> {
    /// The wrapped reader that reads are delegated to.
    #[pin]
    pub(crate) reader: T,
    /// Sender side of the watch channel on which the running byte count is published.
    #[pin]
    pub(crate) progress: Arc<Mutex<Progress>>,
}
65
66impl<T: AsyncRead> ProgressReader<T> {
67 pub(crate) fn new(reader: T) -> (Self, Receiver<u64>) {
68 let (progress, r) = tokio::sync::watch::channel(1);
69 let progress = Arc::new(Mutex::new(progress));
70 (ProgressReader { reader, progress }, r)
71 }
72}
73
74impl<T: AsyncRead> AsyncRead for ProgressReader<T> {
75 fn poll_read(
76 self: std::pin::Pin<&mut Self>,
77 cx: &mut std::task::Context<'_>,
78 buf: &mut tokio::io::ReadBuf<'_>,
79 ) -> std::task::Poll<std::io::Result<()>> {
80 let this = self.project();
81 let len = buf.filled().len();
82 match this.reader.poll_read(cx, buf) {
83 v @ std::task::Poll::Ready(Ok(_)) => {
84 let progress = this.progress.lock().unwrap();
85 let state = {
86 let mut state = *progress.borrow();
87 let newlen = buf.filled().len();
88 debug_assert!(newlen >= len);
89 let read = (newlen - len) as u64;
90 state += read;
91 state
92 };
93 let _ = progress.send(state);
95 v
96 }
97 o => o,
98 }
99 }
100}
101
102async fn fetch_manifest_impl(
103 proxy: &mut ImageProxy,
104 imgref: &OstreeImageReference,
105) -> Result<(oci_image::ImageManifest, oci_image::Digest)> {
106 let oi = &proxy.open_image(&imgref.imgref.to_string()).await?;
107 let (digest, manifest) = proxy.fetch_manifest(oi).await?;
108 proxy.close_image(oi).await?;
109 Ok((manifest, oci_image::Digest::from_str(digest.as_str())?))
110}
111
112#[context("Fetching manifest")]
114pub async fn fetch_manifest(
115 imgref: &OstreeImageReference,
116) -> Result<(oci_image::ImageManifest, oci_image::Digest)> {
117 let mut proxy = ImageProxy::new().await?;
118 fetch_manifest_impl(&mut proxy, imgref).await
119}
120
121#[context("Fetching manifest and config")]
123pub async fn fetch_manifest_and_config(
124 imgref: &OstreeImageReference,
125) -> Result<(
126 oci_image::ImageManifest,
127 oci_image::Digest,
128 oci_image::ImageConfiguration,
129)> {
130 let proxy = ImageProxy::new().await?;
131 let oi = &proxy.open_image(&imgref.imgref.to_string()).await?;
132 let (digest, manifest) = proxy.fetch_manifest(oi).await?;
133 let digest = oci_image::Digest::from_str(&digest)?;
134 let config = proxy.fetch_config(oi).await?;
135 Ok((manifest, digest, config))
136}
137
/// Result of an image import, as returned by `unencapsulate`.
#[derive(Debug)]
pub struct Import {
    /// An ostree commit identifier; presumably the commit produced by the import (TODO confirm).
    pub ostree_commit: String,
    /// Digest of the container image; presumably the manifest digest that was fetched (TODO confirm).
    pub image_digest: Digest,

    /// Optional warning text; presumably a deprecation notice to surface to the user when set.
    pub deprecated_warning: Option<String>,
}
149
150pub(crate) async fn join_fetch<T: std::fmt::Debug>(
166 worker: impl Future<Output = Result<T>>,
167 driver: impl Future<Output = Result<()>>,
168) -> Result<T> {
169 let (worker, driver) = tokio::join!(worker, driver);
170 match (worker, driver) {
171 (Ok(t), Ok(())) => Ok(t),
172 (Err(worker), Err(driver)) => {
173 let text = driver.root_cause().to_string();
174 if text.ends_with("broken pipe") {
175 tracing::trace!("Ignoring broken pipe failure from driver");
176 Err(worker)
177 } else {
178 Err(worker.context(format!("proxy failure: {} and client error", text)))
179 }
180 }
181 (Ok(_), Err(driver)) => Err(driver),
182 (Err(worker), Ok(())) => Err(worker),
183 }
184}
185
186#[context("Importing {}", imgref)]
188#[instrument(level = "debug", skip(repo))]
189pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -> Result<Import> {
190 let importer = super::store::ImageImporter::new(repo, imgref, Default::default()).await?;
191 importer.unencapsulate().await
192}
193
194pub(crate) fn decompressor(
196 media_type: &oci_image::MediaType,
197 src: impl Read + Send + 'static,
198) -> Result<Box<dyn Read + Send + 'static>> {
199 let r: Box<dyn std::io::Read + Send + 'static> = match media_type {
200 m @ (oci_image::MediaType::ImageLayerGzip | oci_image::MediaType::ImageLayerZstd) => {
201 if matches!(m, oci_image::MediaType::ImageLayerZstd) {
202 Box::new(zstd::stream::read::Decoder::new(src)?)
203 } else {
204 Box::new(flate2::bufread::GzDecoder::new(std::io::BufReader::new(
205 src,
206 )))
207 }
208 }
209 oci_image::MediaType::ImageLayer => Box::new(src),
210 oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src),
211 o => anyhow::bail!("Unhandled layer type: {}", o),
212 };
213 Ok(r)
214}
215
216pub(crate) async fn fetch_layer<'a>(
218 proxy: &'a ImageProxy,
219 img: &OpenedImage,
220 manifest: &oci_image::ImageManifest,
221 layer: &'a oci_image::Descriptor,
222 progress: Option<&'a Sender<Option<store::LayerProgress>>>,
223 layer_info: Option<&Vec<containers_image_proxy::ConvertedLayerInfo>>,
224 transport_src: Transport,
225) -> Result<(
226 Box<dyn AsyncBufRead + Send + Unpin>,
227 impl Future<Output = Result<()>> + 'a,
228 oci_image::MediaType,
229)> {
230 use futures_util::future::Either;
231 tracing::debug!("fetching {}", layer.digest());
232 let layer_index = manifest.layers().iter().position(|x| x == layer).unwrap();
233 let (blob, driver, size);
234 let media_type: oci_image::MediaType;
235 match transport_src {
236 Transport::ContainerStorage => {
237 let layer_info = layer_info
238 .ok_or_else(|| anyhow!("skopeo too old to pull from containers-storage"))?;
239 let n_layers = layer_info.len();
240 let layer_blob = layer_info.get(layer_index).ok_or_else(|| {
241 anyhow!("blobid position {layer_index} exceeds diffid count {n_layers}")
242 })?;
243 size = layer_blob.size;
244 media_type = layer_blob.media_type.clone();
245 (blob, driver) = proxy.get_blob(img, &layer_blob.digest, size).await?;
246 }
247 _ => {
248 size = layer.size();
249 media_type = layer.media_type().clone();
250 (blob, driver) = proxy.get_blob(img, layer.digest(), size).await?;
251 }
252 };
253
254 let driver = async { driver.await.map_err(Into::into) };
255
256 if let Some(progress) = progress {
257 let (readprogress, mut readwatch) = ProgressReader::new(blob);
258 let readprogress = tokio::io::BufReader::new(readprogress);
259 let readproxy = async move {
260 while let Ok(()) = readwatch.changed().await {
261 let fetched = readwatch.borrow_and_update();
262 let status = LayerProgress {
263 layer_index,
264 fetched: *fetched,
265 total: size,
266 };
267 progress.send_replace(Some(status));
268 }
269 };
270 let reader = Box::new(readprogress);
271 let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
272 Ok((reader, Either::Left(driver), media_type))
273 } else {
274 Ok((Box::new(blob), Either::Right(driver), media_type))
275 }
276}