// monocore/oci/distribution/docker.rs

use std::{
    collections::HashSet,
    ops::RangeBounds,
    path::{Path, PathBuf},
};

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{future, stream::BoxStream, StreamExt};
use getset::{Getters, Setters};
use oci_spec::image::{Digest, ImageConfiguration, ImageIndex, ImageManifest, Os, Platform};
use reqwest::{Client, StatusCode};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{
    fs::{self, OpenOptions},
    io::AsyncWriteExt,
};

use crate::{
    utils::{
        self, OCI_CONFIG_FILENAME, OCI_INDEX_FILENAME, OCI_LAYER_SUBDIR, OCI_MANIFEST_FILENAME,
        OCI_REPO_SUBDIR, OCI_SUBDIR,
    },
    MonocoreError, MonocoreResult,
};

use super::{AuthProvider, OciRegistryPull};

31//--------------------------------------------------------------------------------------------------
32// Constants
33//--------------------------------------------------------------------------------------------------
34
/// Docker Hub's Registry HTTP API v2 host; manifest and blob URLs are built as
/// `{DOCKER_REGISTRY_URL}/v2/<repository>/...`.
const DOCKER_REGISTRY_URL: &str = "https://registry-1.docker.io";

/// Value of the `service` query parameter sent to the token endpoint.
const DOCKER_AUTH_SERVICE: &str = "registry.docker.io";

/// Docker Hub token endpoint that hands out the bearer tokens used for registry requests.
const DOCKER_AUTH_REALM: &str = "https://auth.docker.io/token";

/// `Accept` media type for a single-platform Docker image manifest (schema 2).
const DOCKER_MANIFEST_MIME_TYPE: &str = "application/vnd.docker.distribution.manifest.v2+json";

/// `Accept` media type for a Docker manifest list (the multi-platform image index).
const DOCKER_MANIFEST_LIST_MIME_TYPE: &str =
    "application/vnd.docker.distribution.manifest.list.v2+json";

/// `Accept` media type sent when downloading layer blobs (gzip-compressed tar archives).
const DOCKER_IMAGE_BLOB_MIME_TYPE: &str = "application/vnd.docker.image.rootfs.diff.tar.gzip";

/// `Accept` media type sent when downloading the image configuration blob.
const DOCKER_CONFIG_MIME_TYPE: &str = "application/vnd.docker.container.image.v1+json";

/// Annotation key carried by attestation entries of an image index; entries with this key are
/// never picked as the platform manifest.
const DOCKER_REFERENCE_TYPE_ANNOTATION: &str = "vnd.docker.reference.type";

60//--------------------------------------------------------------------------------------------------
61// Types
62//--------------------------------------------------------------------------------------------------
63
64/// DockerRegistry is a client for interacting with Docker's Registry HTTP API v2.
65/// It handles authentication, image manifest retrieval, and blob fetching.
66///
67/// [See OCI distribution specification for more details on the manifest schema][OCI Distribution Spec]
68///
69/// [See Docker Registry API for more details on the API][Docker Registry API]
70///
71/// [OCI Distribution Spec]: https://distribution.github.io/distribution/spec/manifest-v2-2/#image-manifest-version-2-schema-2
72/// [Docker Registry API]: https://distribution.github.io/distribution/spec/api/#introduction
73#[derive(Debug, Getters, Setters)]
74#[getset(get = "pub with_prefix", set = "pub with_prefix")]
75pub struct DockerRegistry {
76    /// The HTTP client used to make requests to the Docker registry.
77    client: ClientWithMiddleware,
78
79    /// The path to the OCI directory where artifacts like repositories metadata and layers are stored.
80    oci_dir: PathBuf,
81}
82
83/// Stores authentication credentials obtained from the Docker registry, including tokens and expiration details.
84#[derive(Debug, Serialize, Deserialize, Getters, Setters)]
85#[getset(get = "pub with_prefix", set = "pub with_prefix")]
86pub struct DockerAuthMaterial {
87    /// The token used to authenticate requests to the Docker registry.
88    token: String,
89
90    /// The access token used to authenticate requests to the Docker registry.
91    access_token: String,
92
93    /// The expiration time of the access token.
94    expires_in: u32,
95
96    /// The time the access token was issued.
97    issued_at: DateTime<Utc>,
98}
99
100/// Represents a response from the Docker registry, which could either be successful (`Ok`) or an error (`Error`).
101#[derive(Debug, Serialize, Deserialize)]
102#[serde(untagged)]
103pub enum DockerRegistryResponse<T> {
104    /// Represents a successful response from the Docker registry.
105    Ok(T),
106
107    /// Represents an error response from the Docker registry.
108    Error(DockerRegistryResponseError),
109}
110
111/// Represents an error response from the Docker registry, including detailed error messages.
112#[derive(Debug, Serialize, Deserialize, Error)]
113#[error("docker registry error: {errors}")]
114pub struct DockerRegistryResponseError {
115    /// The errors returned by the Docker registry.
116    errors: serde_json::Value,
117}
118
119//--------------------------------------------------------------------------------------------------
120// Methods
121//--------------------------------------------------------------------------------------------------
122
123/// Creates a new instance of `DockerRegistry` with an HTTP client configured for retrying transient errors.
124/// This client is used to interact with the Docker Registry HTTP API.
125impl DockerRegistry {
126    /// Creates a new DockerRegistry instance with the default artifacts directory (MONOCORE_HOME).
127    pub fn new() -> Self {
128        Self::with_oci_dir(utils::monocore_home_path().join(OCI_SUBDIR))
129    }
130
131    /// Creates a new DockerRegistry instance with a custom base path.
132    ///
133    /// This is useful for testing or when you need to store OCI artifacts
134    /// in a different location than the default MONOCORE_HOME.
135    ///
136    /// ## Arguments
137    /// * `oci_dir` - The base path where OCI artifacts will be stored. The OCI directory
138    ///               structure will be created under this path.
139    pub fn with_oci_dir(oci_dir: PathBuf) -> Self {
140        let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
141        let client_builder = ClientBuilder::new(Client::new());
142        let client = client_builder
143            .with(RetryTransientMiddleware::new_with_policy(retry_policy))
144            .build();
145
146        Self { client, oci_dir }
147    }
148
149    /// Gets the size of a downloaded file if it exists.
150    fn get_downloaded_file_size(&self, path: &Path) -> u64 {
151        // If the file does not exist, return 0 indicating no bytes have been downloaded
152        if !path.exists() {
153            return 0;
154        }
155
156        path.metadata().unwrap().len()
157    }
158
159    /// Downloads a blob from the registry, supports download resumption if the file already partially exists.
160    async fn download_image_blob(
161        &self,
162        repository: &str,
163        digest: &Digest,
164        download_size: u64,
165        destination: PathBuf,
166    ) -> MonocoreResult<()> {
167        // Ensure the destination directory exists
168        if let Some(parent) = destination.parent() {
169            fs::create_dir_all(parent).await?;
170        }
171
172        // Get the size of the already downloaded file if it exists
173        let downloaded_size = self.get_downloaded_file_size(&destination);
174
175        // Open the file for writing, create if it doesn't exist
176        let mut file = if downloaded_size == 0 {
177            OpenOptions::new()
178                .create(true)
179                .truncate(true)
180                .write(true)
181                .open(&destination)
182                .await?
183        } else if downloaded_size < download_size {
184            OpenOptions::new().append(true).open(&destination).await?
185        } else {
186            tracing::info!(
187                "file already exists skipping download: {}",
188                destination.display()
189            );
190            return Ok(());
191        };
192
193        let mut stream = self
194            .fetch_image_blob(repository, digest, downloaded_size..)
195            .await?;
196
197        // Write the stream to the file
198        while let Some(chunk) = stream.next().await {
199            let bytes = chunk?;
200            file.write_all(&bytes).await?;
201        }
202
203        // Verify the hash of the downloaded file
204        let algorithm = digest.algorithm();
205        let expected_hash = digest.digest();
206        let actual_hash = hex::encode(utils::get_file_hash(&destination, algorithm).await?);
207
208        // Delete the already downloaded file if the hash does not match
209        if actual_hash != expected_hash {
210            fs::remove_file(destination).await?;
211            return Err(MonocoreError::ImageLayerDownloadFailed(format!(
212                "({repository}:{digest}) file hash {actual_hash} does not match expected hash {expected_hash}",
213            )));
214        }
215
216        Ok(())
217    }
218}
219
//--------------------------------------------------------------------------------------------------
// Trait Implementations
//--------------------------------------------------------------------------------------------------

224#[async_trait::async_trait]
225impl AuthProvider for DockerRegistry {
226    type AuthMaterial = DockerAuthMaterial;
227
228    /// Gets the necessary authentication credentials for the given repository and tag.
229    ///
230    /// Currently, Docker tokens expire after 300 seconds, so we need to re-authenticate
231    /// after that period or just fetch new tokens on each request.
232    async fn get_auth_material(
233        &self,
234        repository: &str,
235        service: &str,
236        scopes: &[&str],
237    ) -> MonocoreResult<Self::AuthMaterial> {
238        let request = self
239            .client
240            .get(DOCKER_AUTH_REALM)
241            .query(&[
242                ("service", service),
243                (
244                    "scope",
245                    format!("repository:{}:{}", repository, scopes.join(",")).as_str(),
246                ),
247            ])
248            .build()?;
249
250        let response = self.client.execute(request).await?;
251        let auth_credentials = response.json::<DockerAuthMaterial>().await?;
252
253        Ok(auth_credentials)
254    }
255}
256
257#[async_trait::async_trait]
258impl OciRegistryPull for DockerRegistry {
259    async fn pull_image(&self, repository: &str, tag: Option<&str>) -> MonocoreResult<()> {
260        let tag = tag.unwrap_or("latest");
261        let repo_tag = format!(
262            "{}__{}",
263            utils::sanitize_name_for_path(repository),
264            utils::sanitize_name_for_path(tag)
265        );
266
267        // Create the repository tag directory
268        let repo_tag_dir = self.oci_dir.join(OCI_REPO_SUBDIR).join(&repo_tag);
269
270        fs::create_dir_all(&repo_tag_dir).await?;
271
272        // Fetch and save index
273        let index = self.fetch_index(repository, Some(tag)).await?;
274        let index_path = repo_tag_dir.join(OCI_INDEX_FILENAME);
275        fs::write(&index_path, serde_json::to_string_pretty(&index)?).await?;
276
277        // Select the right manifest for the platform or choose first if not specified
278        let platform = Platform::default();
279        let manifest_desc = index
280            .manifests()
281            .iter()
282            .find(|m| {
283                m.platform().as_ref().is_some_and(|p| {
284                    // First priority: match both Linux OS and architecture
285                    matches!(p.os(), Os::Linux) &&
286                    p.architecture() == platform.architecture() &&
287                    // Skip attestation manifests
288                    !m.annotations().as_ref().is_some_and(|a| a.contains_key(DOCKER_REFERENCE_TYPE_ANNOTATION))
289                })
290            })
291            .or_else(|| {
292                // Second priority: match architecture only, if no Linux match found
293                index.manifests().iter().find(|m| {
294                    m.platform().as_ref().is_some_and(|p| {
295                        p.architecture() == platform.architecture() &&
296                        !m.annotations().as_ref().is_some_and(|a| a.contains_key(DOCKER_REFERENCE_TYPE_ANNOTATION))
297                    })
298                })
299            })
300            .ok_or(MonocoreError::ManifestNotFound)?;
301
302        // Fetch and save manifest
303        let manifest = self
304            .fetch_manifest(repository, manifest_desc.digest())
305            .await?;
306        let manifest_path = repo_tag_dir.join(OCI_MANIFEST_FILENAME);
307        fs::write(&manifest_path, serde_json::to_string_pretty(&manifest)?).await?;
308
309        // Fetch and save config
310        let config = self
311            .fetch_config(repository, manifest.config().digest())
312            .await?;
313        let config_path = repo_tag_dir.join(OCI_CONFIG_FILENAME);
314        fs::write(&config_path, serde_json::to_string_pretty(&config)?).await?;
315
316        // Download layers concurrently
317        let layer_futures: Vec<_> = manifest
318            .layers()
319            .iter()
320            .map(|layer_desc| {
321                let layer_path = self
322                    .oci_dir
323                    .join(OCI_LAYER_SUBDIR)
324                    .join(utils::sanitize_name_for_path(layer_desc.digest().as_ref()));
325
326                self.download_image_blob(
327                    repository,
328                    layer_desc.digest(),
329                    layer_desc.size(),
330                    layer_path,
331                )
332            })
333            .collect();
334
335        // Wait for all layers to download
336        for result in future::join_all(layer_futures).await {
337            result?;
338        }
339
340        Ok(())
341    }
342
343    async fn fetch_index(&self, repository: &str, tag: Option<&str>) -> MonocoreResult<ImageIndex> {
344        let token = self
345            .get_auth_material(repository, DOCKER_AUTH_SERVICE, &["pull"])
346            .await?
347            .token;
348
349        let tag = tag.unwrap_or("latest");
350
351        let request = self
352            .client
353            .get(format!(
354                "{}/v2/{}/manifests/{}",
355                DOCKER_REGISTRY_URL, repository, tag
356            ))
357            .bearer_auth(token)
358            .header("Accept", DOCKER_MANIFEST_LIST_MIME_TYPE)
359            .build()?;
360
361        let response = self.client.execute(request).await?;
362        let image_index = response
363            .json::<DockerRegistryResponse<ImageIndex>>()
364            .await?;
365
366        match image_index {
367            DockerRegistryResponse::Ok(index) => Ok(index),
368            DockerRegistryResponse::Error(err) => Err(err.into()),
369        }
370    }
371
372    async fn fetch_manifest(
373        &self,
374        repository: &str,
375        digest: &Digest,
376    ) -> MonocoreResult<ImageManifest> {
377        let token = self
378            .get_auth_material(repository, DOCKER_AUTH_SERVICE, &["pull"])
379            .await?
380            .token;
381
382        let request = self
383            .client
384            .get(format!(
385                "{}/v2/{}/manifests/{}",
386                DOCKER_REGISTRY_URL, repository, digest
387            ))
388            .bearer_auth(token)
389            .header("Accept", DOCKER_MANIFEST_MIME_TYPE)
390            .build()?;
391
392        let response = self.client.execute(request).await?;
393        let manifest = response
394            .json::<DockerRegistryResponse<ImageManifest>>()
395            .await?;
396
397        match manifest {
398            DockerRegistryResponse::Ok(manifest) => Ok(manifest),
399            DockerRegistryResponse::Error(err) => Err(err.into()),
400        }
401    }
402
403    async fn fetch_config(
404        &self,
405        repository: &str,
406        digest: &Digest,
407    ) -> MonocoreResult<ImageConfiguration> {
408        let token = self
409            .get_auth_material(repository, DOCKER_AUTH_SERVICE, &["pull"])
410            .await?
411            .token;
412
413        let request = self
414            .client
415            .get(format!(
416                "{}/v2/{}/blobs/{}",
417                DOCKER_REGISTRY_URL, repository, digest
418            ))
419            .bearer_auth(token)
420            .header("Accept", DOCKER_CONFIG_MIME_TYPE)
421            .build()?;
422
423        let response = self.client.execute(request).await?;
424        let config = response
425            .json::<DockerRegistryResponse<ImageConfiguration>>()
426            .await?;
427
428        match config {
429            DockerRegistryResponse::Ok(config) => Ok(config),
430            DockerRegistryResponse::Error(err) => Err(err.into()),
431        }
432    }
433
434    async fn fetch_image_blob(
435        &self,
436        repository: &str,
437        digest: &Digest,
438        range: impl RangeBounds<u64> + Send,
439    ) -> MonocoreResult<BoxStream<'static, MonocoreResult<Bytes>>> {
440        let (start, end) = utils::convert_bounds(range);
441        let end = if end == u64::MAX {
442            "".to_string()
443        } else {
444            end.to_string()
445        };
446
447        tracing::info!("fetching blob: {repository} {digest} {start}-{end}");
448
449        let token = self
450            .get_auth_material(repository, DOCKER_AUTH_SERVICE, &["pull"])
451            .await?
452            .token;
453
454        let request = self
455            .client
456            .get(format!(
457                "{}/v2/{}/blobs/{}",
458                DOCKER_REGISTRY_URL, repository, digest
459            ))
460            .bearer_auth(token)
461            .header("Accept", DOCKER_IMAGE_BLOB_MIME_TYPE)
462            .header("Range", format!("bytes={start}-{end}"))
463            .build()?;
464
465        let response = self.client.execute(request).await?;
466        let stream = response
467            .bytes_stream()
468            .map(|item| item.map_err(|e| e.into()));
469
470        Ok(stream.boxed())
471    }
472}
473
474impl Default for DockerRegistry {
475    fn default() -> Self {
476        Self::new()
477    }
478}