docker_registry/v2/
blobs.rs

1use std::pin::Pin;
2
3use bytes::Bytes;
4use futures::{
5  stream::Stream,
6  task::{Context, Poll},
7};
8use log::{error, trace};
9use pin_project::pin_project;
10use reqwest::{self, Method, StatusCode};
11
12use crate::{
13  errors::{Error, Result},
14  v2::*,
15};
16
17impl Client {
18  /// Check if a blob exists.
19  pub async fn has_blob(&self, name: &str, digest: &str) -> Result<bool> {
20    let url = {
21      let ep = format!("{}/v2/{}/blobs/{}", self.base_url, name, digest);
22      reqwest::Url::parse(&ep)?
23    };
24
25    let res = self.build_reqwest(Method::HEAD, url.clone()).send().await?;
26
27    trace!("Blob HEAD status: {:?}", res.status());
28
29    match res.status() {
30      StatusCode::OK => Ok(true),
31      _ => Ok(false),
32    }
33  }
34
35  pub async fn get_blob_response(&self, name: &str, digest: &str) -> Result<BlobResponse> {
36    let ep = format!("{}/v2/{}/blobs/{}", self.base_url, name, digest);
37    let url = reqwest::Url::parse(&ep)?;
38
39    let resp = self.build_reqwest(Method::GET, url.clone()).send().await?;
40
41    let status = resp.status();
42    trace!("GET {} status: {}", resp.url(), status);
43
44    match resp.error_for_status_ref() {
45      Ok(_) => {
46        if let Some(len) = resp.content_length() {
47          trace!("Receiving a blob with {len} bytes");
48        } else {
49          trace!("Receiving a blob");
50        }
51        Ok(BlobResponse::new(resp, ContentDigest::try_new(digest)?))
52      }
53      Err(_) if status.is_client_error() => Err(ApiErrors::from(resp).await),
54      Err(_) if status.is_server_error() => Err(Error::Server { status }),
55      Err(_) => {
56        error!("Received unexpected HTTP status '{status}'");
57        Err(Error::UnexpectedHttpStatus(status))
58      }
59    }
60  }
61
62  /// Retrieve blob.
63  pub async fn get_blob(&self, name: &str, digest: &str) -> Result<Vec<u8>> {
64    self.get_blob_response(name, digest).await?.bytes().await
65  }
66
67  /// Retrieve blob stream.
68  pub async fn get_blob_stream(&self, name: &str, digest: &str) -> Result<impl Stream<Item = Result<Vec<u8>>>> {
69    Ok(self.get_blob_response(name, digest).await?.stream())
70  }
71}
72
73#[derive(Debug)]
74pub struct BlobResponse {
75  resp: reqwest::Response,
76  digest: ContentDigest,
77}
78
79impl BlobResponse {
80  fn new(resp: reqwest::Response, digest: ContentDigest) -> Self {
81    Self { resp, digest }
82  }
83
84  /// Get size of the blob.
85  /// This method can be useful to render progress bar when downloading a blob.
86  pub fn size(&self) -> Option<u64> {
87    self.resp.content_length()
88  }
89
90  /// Retrieve content of the blob.
91  pub async fn bytes(self) -> Result<Vec<u8>> {
92    let blob = self.resp.bytes().await?.to_vec();
93
94    let mut digest = self.digest;
95    digest.update(&blob);
96    digest.verify()?;
97
98    Ok(blob)
99  }
100
101  /// Get bytes stream of the blob.
102  pub fn stream(self) -> impl Stream<Item = Result<Vec<u8>>> {
103    BlobStream::new(self.resp.bytes_stream(), self.digest)
104  }
105}
106
107#[pin_project]
108struct BlobStream<S>
109where
110  S: Stream<Item = reqwest::Result<Bytes>>,
111{
112  #[pin]
113  stream: S,
114  #[pin]
115  digest: Option<ContentDigest>,
116}
117
118impl<S> BlobStream<S>
119where
120  S: Stream<Item = reqwest::Result<Bytes>> + Unpin,
121{
122  fn new(stream: S, digest: ContentDigest) -> Self {
123    Self {
124      stream,
125      digest: Some(digest),
126    }
127  }
128}
129
130impl<S> Stream for BlobStream<S>
131where
132  S: Stream<Item = reqwest::Result<Bytes>> + Unpin,
133{
134  type Item = Result<Vec<u8>>;
135
136  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137    let mut this = self.project();
138    match this.stream.poll_next(cx) {
139      Poll::Ready(Some(chunk_res)) => {
140        let mut digest = match this.digest.as_pin_mut() {
141          Some(digest) => digest,
142          None => return Poll::Ready(None),
143        };
144        let chunk = chunk_res?;
145        digest.update(&chunk);
146        Poll::Ready(Some(Ok(chunk.to_vec())))
147      }
148      Poll::Ready(None) => match this.digest.take() {
149        Some(digest) => match digest.verify() {
150          Ok(()) => Poll::Ready(None),
151          Err(err) => Poll::Ready(Some(Err(err.into()))),
152        },
153        None => Poll::Ready(None),
154      },
155      Poll::Pending => Poll::Pending,
156    }
157  }
158}