microsandbox_core/oci/implementations/
docker.rs

1use std::{
2    ops::RangeBounds,
3    path::{Path, PathBuf},
4};
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use chrono::{DateTime, Utc};
9use futures::{future, stream::BoxStream, StreamExt};
10use getset::{Getters, Setters};
11use microsandbox_utils::{env, EXTRACTED_LAYER_SUFFIX, LAYERS_SUBDIR};
12use oci_spec::image::{Digest, ImageConfiguration, ImageIndex, ImageManifest, Os, Platform};
13use reqwest::Client;
14use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
15use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
16use serde::{Deserialize, Serialize};
17use sqlx::{Pool, Sqlite};
18use thiserror::Error;
19use tokio::{
20    fs::{self, OpenOptions},
21    io::AsyncWriteExt,
22};
23
24use crate::{
25    management::db,
26    oci::{OciRegistryPull, ReferenceSelector},
27    utils, MicrosandboxError, MicrosandboxResult,
28};
29
30#[cfg(feature = "cli")]
31use indicatif::{ProgressBar, ProgressStyle};
32#[cfg(feature = "cli")]
33use microsandbox_utils::term::{self, MULTI_PROGRESS};
34
35//--------------------------------------------------------------------------------------------------
36// Constants
37//--------------------------------------------------------------------------------------------------
38
/// The domain name of the Docker registry, used as the prefix of the image references
/// recorded for pulled images (e.g. `docker.io/<repository>:<tag>`).
pub const DOCKER_REFERENCE_REGISTRY_DOMAIN: &str = "docker.io";

/// Base URL for the Docker Registry v2 API. Manifest and blob request URLs are built as
/// `<base>/v2/<repository>/manifests/<reference>` and `<base>/v2/<repository>/blobs/<digest>`.
const DOCKER_REGISTRY_URL: &str = "https://registry-1.docker.io";

/// The service name sent as the `service` query parameter when requesting a token,
/// as specified by Docker's token-based authentication scheme.
const DOCKER_AUTH_SERVICE: &str = "registry.docker.io";

/// Endpoint from which bearer tokens are requested, as described in the Docker Registry
/// authentication workflow.
const DOCKER_AUTH_REALM: &str = "https://auth.docker.io/token";

/// The MIME type for Docker Registry v2 manifests, sent in the `Accept` header when
/// fetching a single image manifest.
const DOCKER_MANIFEST_MIME_TYPE: &str = "application/vnd.docker.distribution.manifest.v2+json";

/// The MIME type for Docker Registry v2 manifest lists, sent in the `Accept` header when
/// fetching an image index.
const DOCKER_MANIFEST_LIST_MIME_TYPE: &str =
    "application/vnd.docker.distribution.manifest.list.v2+json";

/// The MIME type for Docker Registry v2 image layer blobs (gzip-compressed tar archives),
/// sent in the `Accept` header when fetching a layer blob.
const DOCKER_IMAGE_BLOB_MIME_TYPE: &str = "application/vnd.docker.image.rootfs.diff.tar.gzip";

/// The MIME type for Docker Registry v2 configuration blobs, sent in the `Accept` header
/// when fetching an image configuration.
const DOCKER_CONFIG_MIME_TYPE: &str = "application/vnd.docker.container.image.v1+json";

/// The annotation key used to identify attestation manifests in the Docker Registry.
/// Index entries carrying this annotation are skipped when selecting a platform manifest.
const DOCKER_REFERENCE_TYPE_ANNOTATION: &str = "vnd.docker.reference.type";

#[cfg(feature = "cli")]
/// Spinner message shown while the image index, manifest and configuration are fetched.
const FETCH_IMAGE_DETAILS_MSG: &str = "Fetch image details";

#[cfg(feature = "cli")]
/// Spinner message shown while image layers are downloaded.
const DOWNLOAD_LAYER_MSG: &str = "Download layers";
74
75//--------------------------------------------------------------------------------------------------
76// Types
77//--------------------------------------------------------------------------------------------------
78
/// DockerRegistry is a client for interacting with Docker's Registry HTTP API v2.
/// It handles authentication, image manifest retrieval, and blob fetching.
///
/// Public getters and setters for every field are generated by the `getset` derives.
///
/// [See OCI distribution specification for more details on the manifest schema][OCI Distribution Spec]
///
/// [See Docker Registry API for more details on the API][Docker Registry API]
///
/// [OCI Distribution Spec]: https://distribution.github.io/distribution/spec/manifest-v2-2/#image-manifest-version-2-schema-2
/// [Docker Registry API]: https://distribution.github.io/distribution/spec/api/#introduction
#[derive(Debug, Getters, Setters)]
#[getset(get = "pub with_prefix", set = "pub with_prefix")]
pub struct DockerRegistry {
    /// The HTTP client used to make requests to the Docker registry.
    /// It is built with a retry middleware for transient failures.
    client: ClientWithMiddleware,

    /// The directory where image layers are downloaded. Each layer blob is stored in a
    /// file named after its full digest string.
    layer_download_dir: PathBuf,

    /// The database where image configurations, indexes, and manifests are stored.
    oci_db: Pool<Sqlite>,
}
100
101//--------------------------------------------------------------------------------------------------
102// Types: Models
103//--------------------------------------------------------------------------------------------------
104
/// Stores authentication credentials obtained from the Docker registry, including tokens and expiration details.
///
/// This is deserialized directly from the JSON body returned by the token endpoint, so
/// all four fields must be present in the response for deserialization to succeed.
#[derive(Debug, Serialize, Deserialize, Getters, Setters)]
#[getset(get = "pub with_prefix", set = "pub with_prefix")]
pub struct DockerAuthMaterial {
    /// The token used to authenticate requests to the Docker registry.
    /// This is the value sent as the bearer token by the fetch methods in this file.
    token: String,

    /// The access token used to authenticate requests to the Docker registry.
    access_token: String,

    /// The lifetime of the token.
    // NOTE(review): presumably expressed in seconds — confirm against the token endpoint docs.
    expires_in: u32,

    /// The time the access token was issued.
    issued_at: DateTime<Utc>,
}
121
/// Represents a response from the Docker registry, which could either be successful (`Ok`) or an error (`Error`).
///
/// The enum is `untagged`, so serde tries the variants in declaration order: the body is
/// first parsed as `T`, and only if that fails is it parsed as a registry error payload.
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum DockerRegistryResponse<T> {
    /// Represents a successful response from the Docker registry.
    Ok(T),

    /// Represents an error response from the Docker registry.
    Error(DockerRegistryResponseError),
}
132
/// Represents an error response from the Docker registry, including detailed error messages.
///
/// The payload is kept as raw JSON and rendered verbatim in the error's display message.
#[derive(Debug, Serialize, Deserialize, Error)]
#[error("docker registry error: {errors}")]
pub struct DockerRegistryResponseError {
    /// The errors returned by the Docker registry, taken from the `errors` key of the
    /// response body and left unparsed.
    errors: serde_json::Value,
}
140
141//--------------------------------------------------------------------------------------------------
142// Methods
143//--------------------------------------------------------------------------------------------------
144
145impl DockerRegistry {
146    /// Creates a new Docker Registry client with the specified image download path and OCI database path.
147    ///
148    /// ## Arguments
149    ///
150    /// * `layer_download_dir` - The directory where downloaded image layers will be stored
151    /// * `oci_db_path` - The path to the SQLite database that stores OCI-related metadata
152    pub async fn new(
153        layer_download_dir: impl Into<PathBuf>,
154        oci_db_path: impl AsRef<Path>,
155    ) -> MicrosandboxResult<Self> {
156        let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
157        let client_builder = ClientBuilder::new(Client::new());
158        let client = client_builder
159            .with(RetryTransientMiddleware::new_with_policy(retry_policy))
160            .build();
161
162        Ok(Self {
163            client,
164            layer_download_dir: layer_download_dir.into(),
165            oci_db: db::get_or_create_pool(oci_db_path.as_ref(), &db::OCI_DB_MIGRATOR).await?,
166        })
167    }
168
169    /// Gets the size of a downloaded file if it exists.
170    fn get_downloaded_file_size(&self, digest: &Digest) -> u64 {
171        let download_path = self.layer_download_dir.join(digest.to_string());
172        // If the file does not exist, return 0 indicating no bytes have been downloaded
173        if !download_path.exists() {
174            return 0;
175        }
176
177        download_path.metadata().unwrap().len()
178    }
179
180    /// Gets the necessary authentication credentials for the given repository and tag.
181    ///
182    /// Currently, Docker tokens expire after 300 seconds, so we need to re-authenticate
183    /// after that period or just fetch new tokens on each request.
184    async fn get_access_credentials(
185        &self,
186        repository: &str,
187        service: &str,
188        scopes: &[&str],
189    ) -> MicrosandboxResult<DockerAuthMaterial> {
190        let request = self
191            .client
192            .get(DOCKER_AUTH_REALM)
193            .query(&[
194                ("service", service),
195                (
196                    "scope",
197                    format!("repository:{}:{}", repository, scopes.join(",")).as_str(),
198                ),
199            ])
200            .build()?;
201
202        let response = self.client.execute(request).await?;
203        let auth_credentials = response.json::<DockerAuthMaterial>().await?;
204
205        Ok(auth_credentials)
206    }
207
208    /// Downloads a blob from the registry, supports download resumption if the file already partially exists.
209    ///
210    /// Returns a tuple (MicrosandboxResult<()>, bool) where the boolean indicates whether a download
211    /// actually occurred (true) or was skipped because the file already exists (false).
212    pub async fn download_image_blob(
213        &self,
214        repository: &str,
215        digest: &Digest,
216        download_size: u64,
217    ) -> MicrosandboxResult<bool> {
218        #[cfg(feature = "cli")]
219        let progress_bar = {
220            let pb = MULTI_PROGRESS.add(ProgressBar::new(download_size));
221            let style = ProgressStyle::with_template(
222                "{prefix:.bold.dim} {bar:40.green/green.dim} {bytes:.bold} / {total_bytes:.dim}",
223            )
224            .unwrap()
225            .progress_chars("=+-");
226
227            pb.set_style(style);
228            // first 8 chars of sha part
229            let digest_short = digest.digest().get(..8).unwrap_or("");
230            pb.set_prefix(format!("{}", digest_short));
231            pb.clone()
232        };
233
234        #[cfg(feature = "cli")]
235        {
236            // If we already have some bytes downloaded, reflect that on the progress bar.
237            let downloaded_so_far = self.get_downloaded_file_size(digest);
238            progress_bar.set_position(downloaded_so_far);
239        }
240
241        let download_path = self.layer_download_dir.join(digest.to_string());
242
243        // First, check if the extracted layer directory already exists and is not empty
244        // Get the microsandbox home path and layers directory
245        let microsandbox_home_path = env::get_microsandbox_home_path();
246        let layers_dir = microsandbox_home_path.join(LAYERS_SUBDIR);
247        let extracted_layer_path =
248            layers_dir.join(format!("{}.{}", digest.to_string(), EXTRACTED_LAYER_SUFFIX));
249
250        // Check if extracted directory exists and has content
251        if extracted_layer_path.exists() {
252            match fs::read_dir(&extracted_layer_path).await {
253                Ok(mut read_dir) => {
254                    if let Ok(Some(_)) = read_dir.next_entry().await {
255                        // Extracted layer exists and contains at least one file
256                        tracing::info!(
257                            "extracted layer already exists: {}, skipping download",
258                            extracted_layer_path.display()
259                        );
260                        return Ok(false); // Return false to indicate no download occurred
261                    }
262                }
263                Err(e) => {
264                    tracing::warn!("error checking extracted layer directory: {}", e);
265                    // Continue with download if we can't read the directory
266                }
267            }
268        }
269
270        // Ensure the destination directory exists
271        if let Some(parent) = download_path.parent() {
272            fs::create_dir_all(parent).await?;
273        }
274
275        // Get the size of the already downloaded file if it exists
276        let downloaded_size = self.get_downloaded_file_size(digest);
277
278        // Open the file for writing, create if it doesn't exist
279        let mut file = if downloaded_size == 0 {
280            tracing::info!("layer {} does not exist, downloading", digest);
281            OpenOptions::new()
282                .create(true)
283                .truncate(true)
284                .write(true)
285                .open(&download_path)
286                .await?
287        } else if downloaded_size < download_size {
288            tracing::info!("layer {} exists, but is incomplete, downloading", digest);
289            OpenOptions::new().append(true).open(&download_path).await?
290        } else {
291            tracing::info!(
292                "file already exists skipping download: {}",
293                download_path.display()
294            );
295            return Ok(false); // Return false to indicate no download occurred
296        };
297
298        let mut stream = self
299            .fetch_image_blob(repository, digest, downloaded_size..)
300            .await?;
301
302        // Write the stream to the file
303        while let Some(chunk) = stream.next().await {
304            let bytes = chunk?;
305            file.write_all(&bytes).await?;
306            #[cfg(feature = "cli")]
307            progress_bar.inc(bytes.len() as u64);
308        }
309
310        #[cfg(feature = "cli")]
311        progress_bar.finish_and_clear();
312
313        // Verify the hash of the downloaded file
314        let algorithm = digest.algorithm();
315        let expected_hash = digest.digest();
316        let actual_hash = hex::encode(utils::get_file_hash(&download_path, algorithm).await?);
317
318        // Delete the already downloaded file if the hash does not match
319        if actual_hash != expected_hash {
320            fs::remove_file(&download_path).await?;
321            return Err(MicrosandboxError::ImageLayerDownloadFailed(format!(
322                    "({repository}:{digest}) file hash {actual_hash} does not match expected hash {expected_hash}",
323                )));
324        }
325
326        Ok(true) // Return true to indicate a download occurred
327    }
328}
329
330//--------------------------------------------------------------------------------------------------
331// Trait Implementations
332//--------------------------------------------------------------------------------------------------
333
334#[async_trait]
335impl OciRegistryPull for DockerRegistry {
336    async fn pull_image(
337        &self,
338        repository: &str,
339        selector: ReferenceSelector,
340    ) -> MicrosandboxResult<()> {
341        // Calculate total size and save image record
342        #[cfg(feature = "cli")]
343        let fetch_details_sp =
344            term::create_spinner(FETCH_IMAGE_DETAILS_MSG.to_string(), None, None);
345
346        let index = self.fetch_index(repository, selector.clone()).await?;
347
348        let total_size: i64 = index.manifests().iter().map(|m| m.size() as i64).sum();
349
350        // Construct reference based on selector type
351        let reference = match &selector {
352            ReferenceSelector::Tag { tag, digest } => {
353                let digest_part = digest
354                    .as_ref()
355                    .map(|d| format!("@{}:{}", d.algorithm(), d.digest()))
356                    .unwrap_or_default();
357                format!("{DOCKER_REFERENCE_REGISTRY_DOMAIN}/{repository}:{tag}{digest_part}")
358            }
359            ReferenceSelector::Digest(digest) => {
360                let digest_part = format!("@{}:{}", digest.algorithm(), digest.digest());
361                format!("{DOCKER_REFERENCE_REGISTRY_DOMAIN}/{repository}{digest_part}")
362            }
363        };
364
365        let image_id = db::save_or_update_image(&self.oci_db, &reference, total_size).await?;
366
367        // Save index
368        let platform = Platform::default();
369        let index_id = db::save_index(&self.oci_db, image_id, &index, Some(&platform)).await?;
370
371        // Select the right manifest for the platform or choose first if not specified
372        let manifest_desc = index
373            .manifests()
374            .iter()
375            .find(|m| {
376                m.platform().as_ref().is_some_and(|p| {
377                    // First priority: match both Linux OS and architecture
378                    matches!(p.os(), Os::Linux) &&
379                    p.architecture() == platform.architecture() &&
380                    // Skip attestation manifests
381                    !m.annotations().as_ref().is_some_and(|a| a.contains_key(DOCKER_REFERENCE_TYPE_ANNOTATION))
382                })
383            })
384            .or_else(|| {
385                // Second priority: match architecture only, if no Linux match found
386                index.manifests().iter().find(|m| {
387                    m.platform().as_ref().is_some_and(|p| {
388                        p.architecture() == platform.architecture() &&
389                        !m.annotations().as_ref().is_some_and(|a| a.contains_key(DOCKER_REFERENCE_TYPE_ANNOTATION))
390                    })
391                })
392            })
393            .ok_or(MicrosandboxError::ManifestNotFound)?;
394
395        let manifest = self
396            .fetch_manifest(repository, manifest_desc.digest())
397            .await?;
398
399        let manifest_id =
400            db::save_manifest(&self.oci_db, image_id, Some(index_id), &manifest).await?;
401
402        let config = self
403            .fetch_config(repository, manifest.config().digest())
404            .await?;
405
406        db::save_config(&self.oci_db, manifest_id, &config).await?;
407
408        #[cfg(feature = "cli")]
409        fetch_details_sp.finish();
410
411        let layers = manifest.layers();
412
413        #[cfg(feature = "cli")]
414        let download_layers_sp = term::create_spinner(
415            DOWNLOAD_LAYER_MSG.to_string(),
416            None,
417            Some(layers.len() as u64),
418        );
419
420        // Download layers concurrently and save to database
421        let layer_futures: Vec<_> = layers
422            .iter()
423            .zip(config.rootfs().diff_ids())
424            .map(|(layer_desc, diff_id)| async {
425                // Download the layer if it doesn't exist
426                // Check if the layer was actually downloaded
427                let layer_downloaded = self
428                    .download_image_blob(repository, layer_desc.digest(), layer_desc.size())
429                    .await?;
430
431                #[cfg(feature = "cli")]
432                download_layers_sp.inc(1);
433
434                // Get or create layer record in database
435                let layer_id = if layer_downloaded {
436                    tracing::info!(
437                        "Layer {} was downloaded, saving to database",
438                        layer_desc.digest()
439                    );
440
441                    // Save new layer metadata to database
442                    db::save_or_update_layer(
443                        &self.oci_db,
444                        &layer_desc.media_type().to_string(),
445                        &layer_desc.digest().to_string(),
446                        layer_desc.size() as i64,
447                        diff_id,
448                    )
449                    .await?
450                } else {
451                    tracing::info!(
452                        "Layer {} already exists, finding in database or creating record",
453                        layer_desc.digest()
454                    );
455
456                    // Try to find existing layer in database by digest
457                    let layers =
458                        db::get_layers_by_digest(&self.oci_db, &[layer_desc.digest().to_string()])
459                            .await?;
460
461                    if let Some(layer) = layers.first() {
462                        // Layer exists in database, use its ID
463                        layer.id
464                    } else {
465                        // Layer exists on disk but not in database, create record
466                        db::save_or_update_layer(
467                            &self.oci_db,
468                            &layer_desc.media_type().to_string(),
469                            &layer_desc.digest().to_string(),
470                            layer_desc.size() as i64,
471                            diff_id,
472                        )
473                        .await?
474                    }
475                };
476
477                // Always link the layer to the manifest
478                db::save_manifest_layer(&self.oci_db, manifest_id, layer_id).await?;
479
480                Ok::<_, MicrosandboxError>(())
481            })
482            .collect();
483
484        // Wait for all layers to download and save
485        for result in future::join_all(layer_futures).await {
486            result?;
487        }
488
489        #[cfg(feature = "cli")]
490        download_layers_sp.finish();
491
492        Ok(())
493    }
494
495    async fn fetch_index(
496        &self,
497        repository: &str,
498        selector: ReferenceSelector,
499    ) -> MicrosandboxResult<ImageIndex> {
500        let token = self
501            .get_access_credentials(repository, DOCKER_AUTH_SERVICE, &["pull"])
502            .await?
503            .token;
504
505        // Construct URL based on selector type
506        let reference = match &selector {
507            ReferenceSelector::Tag { tag, digest } => {
508                let digest_part = digest
509                    .as_ref()
510                    .map(|d| format!("@{}:{}", d.algorithm(), d.digest()))
511                    .unwrap_or_default();
512                format!("{tag}{digest_part}")
513            }
514            ReferenceSelector::Digest(digest) => {
515                format!("@{}:{}", digest.algorithm(), digest.digest())
516            }
517        };
518
519        let request = self
520            .client
521            .get(format!(
522                "{}/v2/{}/manifests/{}",
523                DOCKER_REGISTRY_URL, repository, reference
524            ))
525            .bearer_auth(token)
526            .header("Accept", DOCKER_MANIFEST_LIST_MIME_TYPE)
527            .build()?;
528
529        let response = self.client.execute(request).await?;
530        let image_index = response
531            .json::<DockerRegistryResponse<ImageIndex>>()
532            .await?;
533
534        match image_index {
535            DockerRegistryResponse::Ok(index) => Ok(index),
536            DockerRegistryResponse::Error(err) => Err(err.into()),
537        }
538    }
539
540    async fn fetch_manifest(
541        &self,
542        repository: &str,
543        digest: &Digest,
544    ) -> MicrosandboxResult<ImageManifest> {
545        let token = self
546            .get_access_credentials(repository, DOCKER_AUTH_SERVICE, &["pull"])
547            .await?
548            .token;
549
550        let request = self
551            .client
552            .get(format!(
553                "{}/v2/{}/manifests/{}",
554                DOCKER_REGISTRY_URL, repository, digest
555            ))
556            .bearer_auth(token)
557            .header("Accept", DOCKER_MANIFEST_MIME_TYPE)
558            .build()?;
559
560        let response = self.client.execute(request).await?;
561        let manifest = response
562            .json::<DockerRegistryResponse<ImageManifest>>()
563            .await?;
564
565        match manifest {
566            DockerRegistryResponse::Ok(manifest) => Ok(manifest),
567            DockerRegistryResponse::Error(err) => Err(err.into()),
568        }
569    }
570
571    async fn fetch_config(
572        &self,
573        repository: &str,
574        digest: &Digest,
575    ) -> MicrosandboxResult<ImageConfiguration> {
576        let token = self
577            .get_access_credentials(repository, DOCKER_AUTH_SERVICE, &["pull"])
578            .await?
579            .token;
580
581        let request = self
582            .client
583            .get(format!(
584                "{}/v2/{}/blobs/{}",
585                DOCKER_REGISTRY_URL, repository, digest
586            ))
587            .bearer_auth(token)
588            .header("Accept", DOCKER_CONFIG_MIME_TYPE)
589            .build()?;
590
591        let response = self.client.execute(request).await?;
592        let config = response
593            .json::<DockerRegistryResponse<ImageConfiguration>>()
594            .await?;
595
596        match config {
597            DockerRegistryResponse::Ok(config) => Ok(config),
598            DockerRegistryResponse::Error(err) => Err(err.into()),
599        }
600    }
601
602    async fn fetch_image_blob(
603        &self,
604        repository: &str,
605        digest: &Digest,
606        range: impl RangeBounds<u64> + Send,
607    ) -> MicrosandboxResult<BoxStream<'static, MicrosandboxResult<Bytes>>> {
608        let (start, end) = utils::convert_bounds(range);
609        let end = if end == u64::MAX {
610            "".to_string()
611        } else {
612            end.to_string()
613        };
614
615        tracing::info!("fetching blob: {digest} {start}-{end}");
616
617        let token = self
618            .get_access_credentials(repository, DOCKER_AUTH_SERVICE, &["pull"])
619            .await?
620            .token;
621
622        let request = self
623            .client
624            .get(format!(
625                "{}/v2/{}/blobs/{}",
626                DOCKER_REGISTRY_URL, repository, digest
627            ))
628            .bearer_auth(token)
629            .header("Accept", DOCKER_IMAGE_BLOB_MIME_TYPE)
630            .header("Range", format!("bytes={start}-{end}"))
631            .build()?;
632
633        let response = self.client.execute(request).await?;
634        let stream = response
635            .bytes_stream()
636            .map(|item| item.map_err(|e| e.into()));
637
638        Ok(stream.boxed())
639    }
640}
641
642//--------------------------------------------------------------------------------------------------
643// Tests
644//--------------------------------------------------------------------------------------------------
645
646#[cfg(test)]
647mod tests {
648    use super::*;
649    use chrono::DateTime;
650    use oci_spec::image::{DigestAlgorithm, Os};
651    use sqlx::Row;
652    use tokio::test;
653
654    #[test]
655    #[ignore = "makes network requests to Docker registry to pull an image"]
656    async fn test_docker_pull_image() -> anyhow::Result<()> {
657        let (client, temp_download_dir, _temp_db_dir) = helper::setup_test_client().await;
658        let repository = "library/alpine";
659        let tag = "latest";
660        let result = client
661            .pull_image(repository, ReferenceSelector::tag(tag))
662            .await;
663        assert!(result.is_ok());
664
665        // Verify image record in database
666        let image = sqlx::query("SELECT * FROM images WHERE reference = ?")
667            .bind(format!(
668                "{DOCKER_REFERENCE_REGISTRY_DOMAIN}/{repository}:{tag}"
669            ))
670            .fetch_one(&client.oci_db)
671            .await?;
672        assert!(image.get::<i64, _>("size_bytes") > 0);
673
674        // Verify index record
675        let index_id = image.get::<i64, _>("id");
676        let index = sqlx::query("SELECT * FROM indexes WHERE image_id = ?")
677            .bind(index_id)
678            .fetch_one(&client.oci_db)
679            .await?;
680        assert_eq!(index.get::<i64, _>("schema_version"), 2);
681
682        // Verify manifest record
683        let manifest = sqlx::query("SELECT * FROM manifests WHERE image_id = ?")
684            .bind(index_id)
685            .fetch_one(&client.oci_db)
686            .await?;
687        assert_eq!(manifest.get::<i64, _>("schema_version"), 2);
688
689        // Verify config record
690        let manifest_id = manifest.get::<i64, _>("id");
691        let config = sqlx::query("SELECT * FROM configs WHERE manifest_id = ?")
692            .bind(manifest_id)
693            .fetch_one(&client.oci_db)
694            .await?;
695        assert!(matches!(config.get::<String, _>("os"), s if s == Os::Linux.to_string()));
696
697        // Verify layers were downloaded and match records
698        let layers = sqlx::query("SELECT * FROM layers WHERE manifest_id = ?")
699            .bind(manifest_id)
700            .fetch_all(&client.oci_db)
701            .await?;
702        assert!(!layers.is_empty());
703
704        for layer in layers {
705            let digest = layer.get::<String, _>("digest");
706            let size = layer.get::<i64, _>("size_bytes");
707            let layer_path = temp_download_dir.path().join(&digest);
708
709            // Verify layer file exists and has correct size
710            assert!(layer_path.exists(), "Layer file {} not found", digest);
711            assert_eq!(
712                fs::metadata(&layer_path).await?.len() as i64,
713                size,
714                "Layer {} size mismatch",
715                digest
716            );
717
718            // Verify layer hash
719            let parts: Vec<&str> = digest.split(':').collect();
720            let algorithm = &DigestAlgorithm::try_from(parts[0])?;
721            let expected_hash = parts[1];
722            let actual_hash = hex::encode(utils::get_file_hash(&layer_path, algorithm).await?);
723            assert_eq!(actual_hash, expected_hash, "Layer {} hash mismatch", digest);
724        }
725
726        Ok(())
727    }
728
729    #[test]
730    #[ignore = "makes network requests to Docker registry to fetch image index"]
731    async fn test_docker_fetch_index() -> anyhow::Result<()> {
732        let (client, _temp_download_dir, _temp_db_dir) = helper::setup_test_client().await;
733        let repository = "library/alpine";
734        let tag = "latest";
735
736        let result = client
737            .fetch_index(repository, ReferenceSelector::tag(tag))
738            .await;
739        assert!(result.is_ok());
740
741        let index = result.unwrap();
742        assert!(!index.manifests().is_empty());
743
744        // Verify manifest entries have required fields
745        for manifest in index.manifests() {
746            assert!(manifest.size() > 0);
747            assert!(manifest.digest().to_string().starts_with("sha256:"));
748            assert!(manifest.media_type().to_string().contains("manifest"));
749
750            // Verify platform info for non-attestation manifests
751            if !manifest
752                .annotations()
753                .as_ref()
754                .is_some_and(|a| a.contains_key(DOCKER_REFERENCE_TYPE_ANNOTATION))
755            {
756                let platform = manifest.platform().as_ref().expect("Platform info missing");
757                assert!(matches!(platform.os(), Os::Linux));
758            }
759        }
760
761        Ok(())
762    }
763
764    #[test]
765    #[ignore = "makes network requests to Docker registry to fetch image manifest"]
766    async fn test_docker_fetch_manifest() -> anyhow::Result<()> {
767        let (client, _temp_download_dir, _temp_db_dir) = helper::setup_test_client().await;
768        let repository = "library/alpine";
769
770        // First get the manifest digest from the index
771        let index = client
772            .fetch_index(repository, ReferenceSelector::tag("latest"))
773            .await?;
774
775        let manifest_desc = index.manifests().first().unwrap();
776        let result = client
777            .fetch_manifest(repository, manifest_desc.digest())
778            .await;
779
780        assert!(result.is_ok());
781        let manifest = result.unwrap();
782
783        // Verify manifest has required fields
784        assert_eq!(manifest.schema_version(), 2);
785        assert!(manifest.config().size() > 0);
786        assert!(manifest
787            .config()
788            .digest()
789            .to_string()
790            .starts_with("sha256:"));
791        assert!(manifest
792            .config()
793            .media_type()
794            .to_string()
795            .contains("config"));
796
797        // Verify layers
798        assert!(!manifest.layers().is_empty());
799        for layer in manifest.layers() {
800            assert!(layer.size() > 0);
801            assert!(layer.digest().to_string().starts_with("sha256:"));
802            assert!(layer.media_type().to_string().contains("layer"));
803        }
804
805        Ok(())
806    }
807
808    #[test]
809    #[ignore = "makes network requests to Docker registry to fetch image config"]
810    async fn test_docker_fetch_config() -> anyhow::Result<()> {
811        let (client, _temp_download_dir, _temp_db_dir) = helper::setup_test_client().await;
812        let repository = "library/alpine";
813
814        // Get the config digest from manifest
815        let index = client
816            .fetch_index(repository, ReferenceSelector::tag("latest"))
817            .await?;
818
819        let manifest = client
820            .fetch_manifest(repository, index.manifests().first().unwrap().digest())
821            .await?;
822
823        let result = client
824            .fetch_config(repository, manifest.config().digest())
825            .await;
826        assert!(result.is_ok());
827
828        let config = result.unwrap();
829
830        // Verify required OCI spec fields
831        assert_eq!(*config.os(), Os::Linux);
832        assert!(config.rootfs().typ() == "layers");
833        assert!(!config.rootfs().diff_ids().is_empty());
834
835        // Verify optional but common fields
836        if let Some(created) = config.created() {
837            let created_time = DateTime::parse_from_rfc3339(created).unwrap();
838            assert!(created_time.timestamp_millis() > 0);
839        }
840        if let Some(config_fields) = config.config() {
841            if let Some(env) = config_fields.env() {
842                assert!(!env.is_empty());
843            }
844            if let Some(cmd) = config_fields.cmd() {
845                assert!(!cmd.is_empty());
846            }
847        }
848
849        Ok(())
850    }
851
852    #[test]
853    #[ignore = "makes network requests to Docker registry to fetch image blob"]
854    async fn test_docker_fetch_image_blob() -> anyhow::Result<()> {
855        let (client, temp_download_dir, _temp_db_dir) = helper::setup_test_client().await;
856        let repository = "library/alpine";
857
858        // Get a layer digest from manifest
859        let index = client
860            .fetch_index(repository, ReferenceSelector::tag("latest"))
861            .await?;
862
863        let manifest = client
864            .fetch_manifest(repository, index.manifests().first().unwrap().digest())
865            .await?;
866
867        let layer = manifest.layers().first().unwrap();
868        let mut stream = client
869            .fetch_image_blob(repository, layer.digest(), 0..)
870            .await?;
871
872        // Download the blob to a temporary file
873        let temp_file = temp_download_dir.path().join("test_blob");
874        let mut file = fs::File::create(&temp_file).await?;
875        let mut total_size = 0;
876
877        while let Some(chunk) = stream.next().await {
878            let bytes = chunk?;
879            total_size += bytes.len();
880            file.write_all(&bytes).await?;
881        }
882
883        // Verify size matches
884        assert!(total_size > 0);
885        assert_eq!(total_size as u64, layer.size());
886
887        // Verify hash matches
888        let algorithm = layer.digest().algorithm();
889        let expected_hash = layer.digest().digest();
890        let actual_hash = hex::encode(utils::get_file_hash(&temp_file, algorithm).await?);
891        assert_eq!(actual_hash, expected_hash);
892
893        Ok(())
894    }
895
896    #[test]
897    #[ignore = "makes network requests to Docker registry to get authentication credentials"]
898    async fn test_docker_get_access_credentials() -> anyhow::Result<()> {
899        let (client, _temp_download_dir, _temp_db_dir) = helper::setup_test_client().await;
900
901        let result = client
902            .get_access_credentials("library/alpine", DOCKER_AUTH_SERVICE, &["pull"])
903            .await;
904
905        assert!(result.is_ok());
906        let credentials = result.unwrap();
907
908        // Verify credential fields
909        assert!(!credentials.token.is_empty());
910        assert!(!credentials.access_token.is_empty());
911        assert!(credentials.expires_in > 0);
912
913        Ok(())
914    }
915}
916
#[cfg(test)]
mod helper {
    use tempfile::TempDir;

    use super::*;

    /// Builds a [`DockerRegistry`] for tests, backed by two freshly created temporary
    /// directories.
    ///
    /// The first directory is passed to the client as its download location; the second
    /// holds the `test.db` database file. Both [`TempDir`] handles are returned alongside
    /// the client so that callers can keep them in scope while the client is in use.
    ///
    /// # Panics
    ///
    /// Panics if either temporary directory cannot be created or if
    /// `DockerRegistry::new` returns an error.
    pub(super) async fn setup_test_client() -> (DockerRegistry, TempDir, TempDir) {
        let download_dir = TempDir::new().unwrap();
        let db_dir = TempDir::new().unwrap();

        let registry = DockerRegistry::new(
            download_dir.path().to_path_buf(),
            db_dir.path().join("test.db"),
        )
        .await
        .unwrap();

        (registry, download_dir, db_dir)
    }
}