docker_registry/v2/
blobs.rs1use std::pin::Pin;
2
3use bytes::Bytes;
4use futures::{
5 stream::Stream,
6 task::{Context, Poll},
7};
8use log::{error, trace};
9use pin_project::pin_project;
10use reqwest::{self, Method, StatusCode};
11
12use crate::{
13 errors::{Error, Result},
14 v2::*,
15};
16
17impl Client {
18 pub async fn has_blob(&self, name: &str, digest: &str) -> Result<bool> {
20 let url = {
21 let ep = format!("{}/v2/{}/blobs/{}", self.base_url, name, digest);
22 reqwest::Url::parse(&ep)?
23 };
24
25 let res = self.build_reqwest(Method::HEAD, url.clone()).send().await?;
26
27 trace!("Blob HEAD status: {:?}", res.status());
28
29 match res.status() {
30 StatusCode::OK => Ok(true),
31 _ => Ok(false),
32 }
33 }
34
35 pub async fn get_blob_response(&self, name: &str, digest: &str) -> Result<BlobResponse> {
36 let ep = format!("{}/v2/{}/blobs/{}", self.base_url, name, digest);
37 let url = reqwest::Url::parse(&ep)?;
38
39 let resp = self.build_reqwest(Method::GET, url.clone()).send().await?;
40
41 let status = resp.status();
42 trace!("GET {} status: {}", resp.url(), status);
43
44 match resp.error_for_status_ref() {
45 Ok(_) => {
46 if let Some(len) = resp.content_length() {
47 trace!("Receiving a blob with {len} bytes");
48 } else {
49 trace!("Receiving a blob");
50 }
51 Ok(BlobResponse::new(resp, ContentDigest::try_new(digest)?))
52 }
53 Err(_) if status.is_client_error() => Err(ApiErrors::from(resp).await),
54 Err(_) if status.is_server_error() => Err(Error::Server { status }),
55 Err(_) => {
56 error!("Received unexpected HTTP status '{status}'");
57 Err(Error::UnexpectedHttpStatus(status))
58 }
59 }
60 }
61
62 pub async fn get_blob(&self, name: &str, digest: &str) -> Result<Vec<u8>> {
64 self.get_blob_response(name, digest).await?.bytes().await
65 }
66
67 pub async fn get_blob_stream(&self, name: &str, digest: &str) -> Result<impl Stream<Item = Result<Vec<u8>>>> {
69 Ok(self.get_blob_response(name, digest).await?.stream())
70 }
71}
72
73#[derive(Debug)]
74pub struct BlobResponse {
75 resp: reqwest::Response,
76 digest: ContentDigest,
77}
78
79impl BlobResponse {
80 fn new(resp: reqwest::Response, digest: ContentDigest) -> Self {
81 Self { resp, digest }
82 }
83
84 pub fn size(&self) -> Option<u64> {
87 self.resp.content_length()
88 }
89
90 pub async fn bytes(self) -> Result<Vec<u8>> {
92 let blob = self.resp.bytes().await?.to_vec();
93
94 let mut digest = self.digest;
95 digest.update(&blob);
96 digest.verify()?;
97
98 Ok(blob)
99 }
100
101 pub fn stream(self) -> impl Stream<Item = Result<Vec<u8>>> {
103 BlobStream::new(self.resp.bytes_stream(), self.digest)
104 }
105}
106
107#[pin_project]
108struct BlobStream<S>
109where
110 S: Stream<Item = reqwest::Result<Bytes>>,
111{
112 #[pin]
113 stream: S,
114 #[pin]
115 digest: Option<ContentDigest>,
116}
117
118impl<S> BlobStream<S>
119where
120 S: Stream<Item = reqwest::Result<Bytes>> + Unpin,
121{
122 fn new(stream: S, digest: ContentDigest) -> Self {
123 Self {
124 stream,
125 digest: Some(digest),
126 }
127 }
128}
129
130impl<S> Stream for BlobStream<S>
131where
132 S: Stream<Item = reqwest::Result<Bytes>> + Unpin,
133{
134 type Item = Result<Vec<u8>>;
135
136 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137 let mut this = self.project();
138 match this.stream.poll_next(cx) {
139 Poll::Ready(Some(chunk_res)) => {
140 let mut digest = match this.digest.as_pin_mut() {
141 Some(digest) => digest,
142 None => return Poll::Ready(None),
143 };
144 let chunk = chunk_res?;
145 digest.update(&chunk);
146 Poll::Ready(Some(Ok(chunk.to_vec())))
147 }
148 Poll::Ready(None) => match this.digest.take() {
149 Some(digest) => match digest.verify() {
150 Ok(()) => Poll::Ready(None),
151 Err(err) => Poll::Ready(Some(Err(err.into()))),
152 },
153 None => Poll::Ready(None),
154 },
155 Poll::Pending => Poll::Pending,
156 }
157 }
158}