// monocore/oci/distribution/docker.rs

use std::{
    ops::RangeBounds,
    path::{Path, PathBuf},
};

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{future, stream::BoxStream, StreamExt};
use getset::{Getters, Setters};
use oci_spec::image::{Digest, ImageConfiguration, ImageIndex, ImageManifest, Os, Platform};
use reqwest::Client;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{
    fs::{self, OpenOptions},
    io::AsyncWriteExt,
};

use crate::{
    utils::{
        self, OCI_CONFIG_FILENAME, OCI_INDEX_FILENAME, OCI_LAYER_SUBDIR, OCI_MANIFEST_FILENAME,
        OCI_REPO_SUBDIR, OCI_SUBDIR,
    },
    MonocoreError, MonocoreResult,
};

use super::{AuthProvider, OciRegistryPull};

/// Base URL of the registry API that manifests, configs and blobs are fetched from.
const DOCKER_REGISTRY_URL: &str = "https://registry-1.docker.io";

/// Value passed as the `service` argument when requesting an auth token.
const DOCKER_AUTH_SERVICE: &str = "registry.docker.io";

/// Token endpoint queried by `get_auth_material`.
const DOCKER_AUTH_REALM: &str = "https://auth.docker.io/token";

/// `Accept` media type sent when fetching a single image manifest.
const DOCKER_MANIFEST_MIME_TYPE: &str = "application/vnd.docker.distribution.manifest.v2+json";

/// `Accept` media type sent when fetching the manifest list (image index) of a tag.
const DOCKER_MANIFEST_LIST_MIME_TYPE: &str =
    "application/vnd.docker.distribution.manifest.list.v2+json";

/// `Accept` media type sent when fetching a layer blob.
const DOCKER_IMAGE_BLOB_MIME_TYPE: &str = "application/vnd.docker.image.rootfs.diff.tar.gzip";

/// `Accept` media type sent when fetching an image configuration blob.
const DOCKER_CONFIG_MIME_TYPE: &str = "application/vnd.docker.container.image.v1+json";

/// Annotation key; index entries carrying it are skipped when selecting the
/// manifest to pull.
const DOCKER_REFERENCE_TYPE_ANNOTATION: &str = "vnd.docker.reference.type";

60#[derive(Debug, Getters, Setters)]
74#[getset(get = "pub with_prefix", set = "pub with_prefix")]
75pub struct DockerRegistry {
76 client: ClientWithMiddleware,
78
79 oci_dir: PathBuf,
81}
82
83#[derive(Debug, Serialize, Deserialize, Getters, Setters)]
85#[getset(get = "pub with_prefix", set = "pub with_prefix")]
86pub struct DockerAuthMaterial {
87 token: String,
89
90 access_token: String,
92
93 expires_in: u32,
95
96 issued_at: DateTime<Utc>,
98}
99
100#[derive(Debug, Serialize, Deserialize)]
102#[serde(untagged)]
103pub enum DockerRegistryResponse<T> {
104 Ok(T),
106
107 Error(DockerRegistryResponseError),
109}
110
111#[derive(Debug, Serialize, Deserialize, Error)]
113#[error("docker registry error: {errors}")]
114pub struct DockerRegistryResponseError {
115 errors: serde_json::Value,
117}
118
119impl DockerRegistry {
126 pub fn new() -> Self {
128 Self::with_oci_dir(utils::monocore_home_path().join(OCI_SUBDIR))
129 }
130
131 pub fn with_oci_dir(oci_dir: PathBuf) -> Self {
140 let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
141 let client_builder = ClientBuilder::new(Client::new());
142 let client = client_builder
143 .with(RetryTransientMiddleware::new_with_policy(retry_policy))
144 .build();
145
146 Self { client, oci_dir }
147 }
148
149 fn get_downloaded_file_size(&self, path: &Path) -> u64 {
151 if !path.exists() {
153 return 0;
154 }
155
156 path.metadata().unwrap().len()
157 }
158
159 async fn download_image_blob(
161 &self,
162 repository: &str,
163 digest: &Digest,
164 download_size: u64,
165 destination: PathBuf,
166 ) -> MonocoreResult<()> {
167 if let Some(parent) = destination.parent() {
169 fs::create_dir_all(parent).await?;
170 }
171
172 let downloaded_size = self.get_downloaded_file_size(&destination);
174
175 let mut file = if downloaded_size == 0 {
177 OpenOptions::new()
178 .create(true)
179 .truncate(true)
180 .write(true)
181 .open(&destination)
182 .await?
183 } else if downloaded_size < download_size {
184 OpenOptions::new().append(true).open(&destination).await?
185 } else {
186 tracing::info!(
187 "file already exists skipping download: {}",
188 destination.display()
189 );
190 return Ok(());
191 };
192
193 let mut stream = self
194 .fetch_image_blob(repository, digest, downloaded_size..)
195 .await?;
196
197 while let Some(chunk) = stream.next().await {
199 let bytes = chunk?;
200 file.write_all(&bytes).await?;
201 }
202
203 let algorithm = digest.algorithm();
205 let expected_hash = digest.digest();
206 let actual_hash = hex::encode(utils::get_file_hash(&destination, algorithm).await?);
207
208 if actual_hash != expected_hash {
210 fs::remove_file(destination).await?;
211 return Err(MonocoreError::ImageLayerDownloadFailed(format!(
212 "({repository}:{digest}) file hash {actual_hash} does not match expected hash {expected_hash}",
213 )));
214 }
215
216 Ok(())
217 }
218}
219
220#[async_trait::async_trait]
225impl AuthProvider for DockerRegistry {
226 type AuthMaterial = DockerAuthMaterial;
227
228 async fn get_auth_material(
233 &self,
234 repository: &str,
235 service: &str,
236 scopes: &[&str],
237 ) -> MonocoreResult<Self::AuthMaterial> {
238 let request = self
239 .client
240 .get(DOCKER_AUTH_REALM)
241 .query(&[
242 ("service", service),
243 (
244 "scope",
245 format!("repository:{}:{}", repository, scopes.join(",")).as_str(),
246 ),
247 ])
248 .build()?;
249
250 let response = self.client.execute(request).await?;
251 let auth_credentials = response.json::<DockerAuthMaterial>().await?;
252
253 Ok(auth_credentials)
254 }
255}
256
257#[async_trait::async_trait]
258impl OciRegistryPull for DockerRegistry {
259 async fn pull_image(&self, repository: &str, tag: Option<&str>) -> MonocoreResult<()> {
260 let tag = tag.unwrap_or("latest");
261 let repo_tag = format!(
262 "{}__{}",
263 utils::sanitize_name_for_path(repository),
264 utils::sanitize_name_for_path(tag)
265 );
266
267 let repo_tag_dir = self.oci_dir.join(OCI_REPO_SUBDIR).join(&repo_tag);
269
270 fs::create_dir_all(&repo_tag_dir).await?;
271
272 let index = self.fetch_index(repository, Some(tag)).await?;
274 let index_path = repo_tag_dir.join(OCI_INDEX_FILENAME);
275 fs::write(&index_path, serde_json::to_string_pretty(&index)?).await?;
276
277 let platform = Platform::default();
279 let manifest_desc = index
280 .manifests()
281 .iter()
282 .find(|m| {
283 m.platform().as_ref().is_some_and(|p| {
284 matches!(p.os(), Os::Linux) &&
286 p.architecture() == platform.architecture() &&
287 !m.annotations().as_ref().is_some_and(|a| a.contains_key(DOCKER_REFERENCE_TYPE_ANNOTATION))
289 })
290 })
291 .or_else(|| {
292 index.manifests().iter().find(|m| {
294 m.platform().as_ref().is_some_and(|p| {
295 p.architecture() == platform.architecture() &&
296 !m.annotations().as_ref().is_some_and(|a| a.contains_key(DOCKER_REFERENCE_TYPE_ANNOTATION))
297 })
298 })
299 })
300 .ok_or(MonocoreError::ManifestNotFound)?;
301
302 let manifest = self
304 .fetch_manifest(repository, manifest_desc.digest())
305 .await?;
306 let manifest_path = repo_tag_dir.join(OCI_MANIFEST_FILENAME);
307 fs::write(&manifest_path, serde_json::to_string_pretty(&manifest)?).await?;
308
309 let config = self
311 .fetch_config(repository, manifest.config().digest())
312 .await?;
313 let config_path = repo_tag_dir.join(OCI_CONFIG_FILENAME);
314 fs::write(&config_path, serde_json::to_string_pretty(&config)?).await?;
315
316 let layer_futures: Vec<_> = manifest
318 .layers()
319 .iter()
320 .map(|layer_desc| {
321 let layer_path = self
322 .oci_dir
323 .join(OCI_LAYER_SUBDIR)
324 .join(utils::sanitize_name_for_path(layer_desc.digest().as_ref()));
325
326 self.download_image_blob(
327 repository,
328 layer_desc.digest(),
329 layer_desc.size(),
330 layer_path,
331 )
332 })
333 .collect();
334
335 for result in future::join_all(layer_futures).await {
337 result?;
338 }
339
340 Ok(())
341 }
342
343 async fn fetch_index(&self, repository: &str, tag: Option<&str>) -> MonocoreResult<ImageIndex> {
344 let token = self
345 .get_auth_material(repository, DOCKER_AUTH_SERVICE, &["pull"])
346 .await?
347 .token;
348
349 let tag = tag.unwrap_or("latest");
350
351 let request = self
352 .client
353 .get(format!(
354 "{}/v2/{}/manifests/{}",
355 DOCKER_REGISTRY_URL, repository, tag
356 ))
357 .bearer_auth(token)
358 .header("Accept", DOCKER_MANIFEST_LIST_MIME_TYPE)
359 .build()?;
360
361 let response = self.client.execute(request).await?;
362 let image_index = response
363 .json::<DockerRegistryResponse<ImageIndex>>()
364 .await?;
365
366 match image_index {
367 DockerRegistryResponse::Ok(index) => Ok(index),
368 DockerRegistryResponse::Error(err) => Err(err.into()),
369 }
370 }
371
372 async fn fetch_manifest(
373 &self,
374 repository: &str,
375 digest: &Digest,
376 ) -> MonocoreResult<ImageManifest> {
377 let token = self
378 .get_auth_material(repository, DOCKER_AUTH_SERVICE, &["pull"])
379 .await?
380 .token;
381
382 let request = self
383 .client
384 .get(format!(
385 "{}/v2/{}/manifests/{}",
386 DOCKER_REGISTRY_URL, repository, digest
387 ))
388 .bearer_auth(token)
389 .header("Accept", DOCKER_MANIFEST_MIME_TYPE)
390 .build()?;
391
392 let response = self.client.execute(request).await?;
393 let manifest = response
394 .json::<DockerRegistryResponse<ImageManifest>>()
395 .await?;
396
397 match manifest {
398 DockerRegistryResponse::Ok(manifest) => Ok(manifest),
399 DockerRegistryResponse::Error(err) => Err(err.into()),
400 }
401 }
402
403 async fn fetch_config(
404 &self,
405 repository: &str,
406 digest: &Digest,
407 ) -> MonocoreResult<ImageConfiguration> {
408 let token = self
409 .get_auth_material(repository, DOCKER_AUTH_SERVICE, &["pull"])
410 .await?
411 .token;
412
413 let request = self
414 .client
415 .get(format!(
416 "{}/v2/{}/blobs/{}",
417 DOCKER_REGISTRY_URL, repository, digest
418 ))
419 .bearer_auth(token)
420 .header("Accept", DOCKER_CONFIG_MIME_TYPE)
421 .build()?;
422
423 let response = self.client.execute(request).await?;
424 let config = response
425 .json::<DockerRegistryResponse<ImageConfiguration>>()
426 .await?;
427
428 match config {
429 DockerRegistryResponse::Ok(config) => Ok(config),
430 DockerRegistryResponse::Error(err) => Err(err.into()),
431 }
432 }
433
434 async fn fetch_image_blob(
435 &self,
436 repository: &str,
437 digest: &Digest,
438 range: impl RangeBounds<u64> + Send,
439 ) -> MonocoreResult<BoxStream<'static, MonocoreResult<Bytes>>> {
440 let (start, end) = utils::convert_bounds(range);
441 let end = if end == u64::MAX {
442 "".to_string()
443 } else {
444 end.to_string()
445 };
446
447 tracing::info!("fetching blob: {repository} {digest} {start}-{end}");
448
449 let token = self
450 .get_auth_material(repository, DOCKER_AUTH_SERVICE, &["pull"])
451 .await?
452 .token;
453
454 let request = self
455 .client
456 .get(format!(
457 "{}/v2/{}/blobs/{}",
458 DOCKER_REGISTRY_URL, repository, digest
459 ))
460 .bearer_auth(token)
461 .header("Accept", DOCKER_IMAGE_BLOB_MIME_TYPE)
462 .header("Range", format!("bytes={start}-{end}"))
463 .build()?;
464
465 let response = self.client.execute(request).await?;
466 let stream = response
467 .bytes_stream()
468 .map(|item| item.map_err(|e| e.into()));
469
470 Ok(stream.boxed())
471 }
472}
473
474impl Default for DockerRegistry {
475 fn default() -> Self {
476 Self::new()
477 }
478}