1use std::{
2 ops::RangeBounds,
3 path::{Path, PathBuf},
4};
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use chrono::{DateTime, Utc};
9use futures::{future, stream::BoxStream, StreamExt};
10use getset::{Getters, Setters};
11use microsandbox_utils::{env, EXTRACTED_LAYER_SUFFIX, LAYERS_SUBDIR};
12use oci_spec::image::{Digest, ImageConfiguration, ImageIndex, ImageManifest, Os, Platform};
13use reqwest::Client;
14use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
15use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
16use serde::{Deserialize, Serialize};
17use sqlx::{Pool, Sqlite};
18use thiserror::Error;
19use tokio::{
20 fs::{self, OpenOptions},
21 io::AsyncWriteExt,
22};
23
24use crate::{
25 management::db,
26 oci::{OciRegistryPull, ReferenceSelector},
27 utils, MicrosandboxError, MicrosandboxResult,
28};
29
30#[cfg(feature = "cli")]
31use indicatif::{ProgressBar, ProgressStyle};
32#[cfg(feature = "cli")]
33use microsandbox_utils::term::{self, MULTI_PROGRESS};
34
35pub const DOCKER_REFERENCE_REGISTRY_DOMAIN: &str = "docker.io";
41
42const DOCKER_REGISTRY_URL: &str = "https://registry-1.docker.io";
44
45const DOCKER_AUTH_SERVICE: &str = "registry.docker.io";
47
48const DOCKER_AUTH_REALM: &str = "https://auth.docker.io/token";
50
51const DOCKER_MANIFEST_MIME_TYPE: &str = "application/vnd.docker.distribution.manifest.v2+json";
53
54const DOCKER_MANIFEST_LIST_MIME_TYPE: &str =
56 "application/vnd.docker.distribution.manifest.list.v2+json";
57
58const DOCKER_IMAGE_BLOB_MIME_TYPE: &str = "application/vnd.docker.image.rootfs.diff.tar.gzip";
60
61const DOCKER_CONFIG_MIME_TYPE: &str = "application/vnd.docker.container.image.v1+json";
63
64const DOCKER_REFERENCE_TYPE_ANNOTATION: &str = "vnd.docker.reference.type";
66
67#[cfg(feature = "cli")]
68const FETCH_IMAGE_DETAILS_MSG: &str = "Fetch image details";
70
71#[cfg(feature = "cli")]
72const DOWNLOAD_LAYER_MSG: &str = "Download layers";
74
75#[derive(Debug, Getters, Setters)]
89#[getset(get = "pub with_prefix", set = "pub with_prefix")]
90pub struct DockerRegistry {
91 client: ClientWithMiddleware,
93
94 layer_download_dir: PathBuf,
96
97 oci_db: Pool<Sqlite>,
99}
100
101#[derive(Debug, Serialize, Deserialize, Getters, Setters)]
107#[getset(get = "pub with_prefix", set = "pub with_prefix")]
108pub struct DockerAuthMaterial {
109 token: String,
111
112 access_token: String,
114
115 expires_in: u32,
117
118 issued_at: DateTime<Utc>,
120}
121
122#[derive(Debug, Serialize, Deserialize)]
124#[serde(untagged)]
125pub enum DockerRegistryResponse<T> {
126 Ok(T),
128
129 Error(DockerRegistryResponseError),
131}
132
133#[derive(Debug, Serialize, Deserialize, Error)]
135#[error("docker registry error: {errors}")]
136pub struct DockerRegistryResponseError {
137 errors: serde_json::Value,
139}
140
141impl DockerRegistry {
146 pub async fn new(
153 layer_download_dir: impl Into<PathBuf>,
154 oci_db_path: impl AsRef<Path>,
155 ) -> MicrosandboxResult<Self> {
156 let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
157 let client_builder = ClientBuilder::new(Client::new());
158 let client = client_builder
159 .with(RetryTransientMiddleware::new_with_policy(retry_policy))
160 .build();
161
162 Ok(Self {
163 client,
164 layer_download_dir: layer_download_dir.into(),
165 oci_db: db::get_or_create_pool(oci_db_path.as_ref(), &db::OCI_DB_MIGRATOR).await?,
166 })
167 }
168
169 fn get_downloaded_file_size(&self, digest: &Digest) -> u64 {
171 let download_path = self.layer_download_dir.join(digest.to_string());
172 if !download_path.exists() {
174 return 0;
175 }
176
177 download_path.metadata().unwrap().len()
178 }
179
180 async fn get_access_credentials(
185 &self,
186 repository: &str,
187 service: &str,
188 scopes: &[&str],
189 ) -> MicrosandboxResult<DockerAuthMaterial> {
190 let request = self
191 .client
192 .get(DOCKER_AUTH_REALM)
193 .query(&[
194 ("service", service),
195 (
196 "scope",
197 format!("repository:{}:{}", repository, scopes.join(",")).as_str(),
198 ),
199 ])
200 .build()?;
201
202 let response = self.client.execute(request).await?;
203 let auth_credentials = response.json::<DockerAuthMaterial>().await?;
204
205 Ok(auth_credentials)
206 }
207
208 pub async fn download_image_blob(
213 &self,
214 repository: &str,
215 digest: &Digest,
216 download_size: u64,
217 ) -> MicrosandboxResult<bool> {
218 #[cfg(feature = "cli")]
219 let progress_bar = {
220 let pb = MULTI_PROGRESS.add(ProgressBar::new(download_size));
221 let style = ProgressStyle::with_template(
222 "{prefix:.bold.dim} {bar:40.green/green.dim} {bytes:.bold} / {total_bytes:.dim}",
223 )
224 .unwrap()
225 .progress_chars("=+-");
226
227 pb.set_style(style);
228 let digest_short = digest.digest().get(..8).unwrap_or("");
230 pb.set_prefix(format!("{}", digest_short));
231 pb.clone()
232 };
233
234 #[cfg(feature = "cli")]
235 {
236 let downloaded_so_far = self.get_downloaded_file_size(digest);
238 progress_bar.set_position(downloaded_so_far);
239 }
240
241 let download_path = self.layer_download_dir.join(digest.to_string());
242
243 let microsandbox_home_path = env::get_microsandbox_home_path();
246 let layers_dir = microsandbox_home_path.join(LAYERS_SUBDIR);
247 let extracted_layer_path =
248 layers_dir.join(format!("{}.{}", digest.to_string(), EXTRACTED_LAYER_SUFFIX));
249
250 if extracted_layer_path.exists() {
252 match fs::read_dir(&extracted_layer_path).await {
253 Ok(mut read_dir) => {
254 if let Ok(Some(_)) = read_dir.next_entry().await {
255 tracing::info!(
257 "extracted layer already exists: {}, skipping download",
258 extracted_layer_path.display()
259 );
260 return Ok(false); }
262 }
263 Err(e) => {
264 tracing::warn!("error checking extracted layer directory: {}", e);
265 }
267 }
268 }
269
270 if let Some(parent) = download_path.parent() {
272 fs::create_dir_all(parent).await?;
273 }
274
275 let downloaded_size = self.get_downloaded_file_size(digest);
277
278 let mut file = if downloaded_size == 0 {
280 tracing::info!("layer {} does not exist, downloading", digest);
281 OpenOptions::new()
282 .create(true)
283 .truncate(true)
284 .write(true)
285 .open(&download_path)
286 .await?
287 } else if downloaded_size < download_size {
288 tracing::info!("layer {} exists, but is incomplete, downloading", digest);
289 OpenOptions::new().append(true).open(&download_path).await?
290 } else {
291 tracing::info!(
292 "file already exists skipping download: {}",
293 download_path.display()
294 );
295 return Ok(false); };
297
298 let mut stream = self
299 .fetch_image_blob(repository, digest, downloaded_size..)
300 .await?;
301
302 while let Some(chunk) = stream.next().await {
304 let bytes = chunk?;
305 file.write_all(&bytes).await?;
306 #[cfg(feature = "cli")]
307 progress_bar.inc(bytes.len() as u64);
308 }
309
310 #[cfg(feature = "cli")]
311 progress_bar.finish_and_clear();
312
313 let algorithm = digest.algorithm();
315 let expected_hash = digest.digest();
316 let actual_hash = hex::encode(utils::get_file_hash(&download_path, algorithm).await?);
317
318 if actual_hash != expected_hash {
320 fs::remove_file(&download_path).await?;
321 return Err(MicrosandboxError::ImageLayerDownloadFailed(format!(
322 "({repository}:{digest}) file hash {actual_hash} does not match expected hash {expected_hash}",
323 )));
324 }
325
326 Ok(true) }
328}
329
330#[async_trait]
335impl OciRegistryPull for DockerRegistry {
336 async fn pull_image(
337 &self,
338 repository: &str,
339 selector: ReferenceSelector,
340 ) -> MicrosandboxResult<()> {
341 #[cfg(feature = "cli")]
343 let fetch_details_sp =
344 term::create_spinner(FETCH_IMAGE_DETAILS_MSG.to_string(), None, None);
345
346 let index = self.fetch_index(repository, selector.clone()).await?;
347
348 let total_size: i64 = index.manifests().iter().map(|m| m.size() as i64).sum();
349
350 let reference = match &selector {
352 ReferenceSelector::Tag { tag, digest } => {
353 let digest_part = digest
354 .as_ref()
355 .map(|d| format!("@{}:{}", d.algorithm(), d.digest()))
356 .unwrap_or_default();
357 format!("{DOCKER_REFERENCE_REGISTRY_DOMAIN}/{repository}:{tag}{digest_part}")
358 }
359 ReferenceSelector::Digest(digest) => {
360 let digest_part = format!("@{}:{}", digest.algorithm(), digest.digest());
361 format!("{DOCKER_REFERENCE_REGISTRY_DOMAIN}/{repository}{digest_part}")
362 }
363 };
364
365 let image_id = db::save_or_update_image(&self.oci_db, &reference, total_size).await?;
366
367 let platform = Platform::default();
369 let index_id = db::save_index(&self.oci_db, image_id, &index, Some(&platform)).await?;
370
371 let manifest_desc = index
373 .manifests()
374 .iter()
375 .find(|m| {
376 m.platform().as_ref().is_some_and(|p| {
377 matches!(p.os(), Os::Linux) &&
379 p.architecture() == platform.architecture() &&
380 !m.annotations().as_ref().is_some_and(|a| a.contains_key(DOCKER_REFERENCE_TYPE_ANNOTATION))
382 })
383 })
384 .or_else(|| {
385 index.manifests().iter().find(|m| {
387 m.platform().as_ref().is_some_and(|p| {
388 p.architecture() == platform.architecture() &&
389 !m.annotations().as_ref().is_some_and(|a| a.contains_key(DOCKER_REFERENCE_TYPE_ANNOTATION))
390 })
391 })
392 })
393 .ok_or(MicrosandboxError::ManifestNotFound)?;
394
395 let manifest = self
396 .fetch_manifest(repository, manifest_desc.digest())
397 .await?;
398
399 let manifest_id =
400 db::save_manifest(&self.oci_db, image_id, Some(index_id), &manifest).await?;
401
402 let config = self
403 .fetch_config(repository, manifest.config().digest())
404 .await?;
405
406 db::save_config(&self.oci_db, manifest_id, &config).await?;
407
408 #[cfg(feature = "cli")]
409 fetch_details_sp.finish();
410
411 let layers = manifest.layers();
412
413 #[cfg(feature = "cli")]
414 let download_layers_sp = term::create_spinner(
415 DOWNLOAD_LAYER_MSG.to_string(),
416 None,
417 Some(layers.len() as u64),
418 );
419
420 let layer_futures: Vec<_> = layers
422 .iter()
423 .zip(config.rootfs().diff_ids())
424 .map(|(layer_desc, diff_id)| async {
425 let layer_downloaded = self
428 .download_image_blob(repository, layer_desc.digest(), layer_desc.size())
429 .await?;
430
431 #[cfg(feature = "cli")]
432 download_layers_sp.inc(1);
433
434 let layer_id = if layer_downloaded {
436 tracing::info!(
437 "Layer {} was downloaded, saving to database",
438 layer_desc.digest()
439 );
440
441 db::save_or_update_layer(
443 &self.oci_db,
444 &layer_desc.media_type().to_string(),
445 &layer_desc.digest().to_string(),
446 layer_desc.size() as i64,
447 diff_id,
448 )
449 .await?
450 } else {
451 tracing::info!(
452 "Layer {} already exists, finding in database or creating record",
453 layer_desc.digest()
454 );
455
456 let layers =
458 db::get_layers_by_digest(&self.oci_db, &[layer_desc.digest().to_string()])
459 .await?;
460
461 if let Some(layer) = layers.first() {
462 layer.id
464 } else {
465 db::save_or_update_layer(
467 &self.oci_db,
468 &layer_desc.media_type().to_string(),
469 &layer_desc.digest().to_string(),
470 layer_desc.size() as i64,
471 diff_id,
472 )
473 .await?
474 }
475 };
476
477 db::save_manifest_layer(&self.oci_db, manifest_id, layer_id).await?;
479
480 Ok::<_, MicrosandboxError>(())
481 })
482 .collect();
483
484 for result in future::join_all(layer_futures).await {
486 result?;
487 }
488
489 #[cfg(feature = "cli")]
490 download_layers_sp.finish();
491
492 Ok(())
493 }
494
495 async fn fetch_index(
496 &self,
497 repository: &str,
498 selector: ReferenceSelector,
499 ) -> MicrosandboxResult<ImageIndex> {
500 let token = self
501 .get_access_credentials(repository, DOCKER_AUTH_SERVICE, &["pull"])
502 .await?
503 .token;
504
505 let reference = match &selector {
507 ReferenceSelector::Tag { tag, digest } => {
508 let digest_part = digest
509 .as_ref()
510 .map(|d| format!("@{}:{}", d.algorithm(), d.digest()))
511 .unwrap_or_default();
512 format!("{tag}{digest_part}")
513 }
514 ReferenceSelector::Digest(digest) => {
515 format!("@{}:{}", digest.algorithm(), digest.digest())
516 }
517 };
518
519 let request = self
520 .client
521 .get(format!(
522 "{}/v2/{}/manifests/{}",
523 DOCKER_REGISTRY_URL, repository, reference
524 ))
525 .bearer_auth(token)
526 .header("Accept", DOCKER_MANIFEST_LIST_MIME_TYPE)
527 .build()?;
528
529 let response = self.client.execute(request).await?;
530 let image_index = response
531 .json::<DockerRegistryResponse<ImageIndex>>()
532 .await?;
533
534 match image_index {
535 DockerRegistryResponse::Ok(index) => Ok(index),
536 DockerRegistryResponse::Error(err) => Err(err.into()),
537 }
538 }
539
540 async fn fetch_manifest(
541 &self,
542 repository: &str,
543 digest: &Digest,
544 ) -> MicrosandboxResult<ImageManifest> {
545 let token = self
546 .get_access_credentials(repository, DOCKER_AUTH_SERVICE, &["pull"])
547 .await?
548 .token;
549
550 let request = self
551 .client
552 .get(format!(
553 "{}/v2/{}/manifests/{}",
554 DOCKER_REGISTRY_URL, repository, digest
555 ))
556 .bearer_auth(token)
557 .header("Accept", DOCKER_MANIFEST_MIME_TYPE)
558 .build()?;
559
560 let response = self.client.execute(request).await?;
561 let manifest = response
562 .json::<DockerRegistryResponse<ImageManifest>>()
563 .await?;
564
565 match manifest {
566 DockerRegistryResponse::Ok(manifest) => Ok(manifest),
567 DockerRegistryResponse::Error(err) => Err(err.into()),
568 }
569 }
570
571 async fn fetch_config(
572 &self,
573 repository: &str,
574 digest: &Digest,
575 ) -> MicrosandboxResult<ImageConfiguration> {
576 let token = self
577 .get_access_credentials(repository, DOCKER_AUTH_SERVICE, &["pull"])
578 .await?
579 .token;
580
581 let request = self
582 .client
583 .get(format!(
584 "{}/v2/{}/blobs/{}",
585 DOCKER_REGISTRY_URL, repository, digest
586 ))
587 .bearer_auth(token)
588 .header("Accept", DOCKER_CONFIG_MIME_TYPE)
589 .build()?;
590
591 let response = self.client.execute(request).await?;
592 let config = response
593 .json::<DockerRegistryResponse<ImageConfiguration>>()
594 .await?;
595
596 match config {
597 DockerRegistryResponse::Ok(config) => Ok(config),
598 DockerRegistryResponse::Error(err) => Err(err.into()),
599 }
600 }
601
602 async fn fetch_image_blob(
603 &self,
604 repository: &str,
605 digest: &Digest,
606 range: impl RangeBounds<u64> + Send,
607 ) -> MicrosandboxResult<BoxStream<'static, MicrosandboxResult<Bytes>>> {
608 let (start, end) = utils::convert_bounds(range);
609 let end = if end == u64::MAX {
610 "".to_string()
611 } else {
612 end.to_string()
613 };
614
615 tracing::info!("fetching blob: {digest} {start}-{end}");
616
617 let token = self
618 .get_access_credentials(repository, DOCKER_AUTH_SERVICE, &["pull"])
619 .await?
620 .token;
621
622 let request = self
623 .client
624 .get(format!(
625 "{}/v2/{}/blobs/{}",
626 DOCKER_REGISTRY_URL, repository, digest
627 ))
628 .bearer_auth(token)
629 .header("Accept", DOCKER_IMAGE_BLOB_MIME_TYPE)
630 .header("Range", format!("bytes={start}-{end}"))
631 .build()?;
632
633 let response = self.client.execute(request).await?;
634 let stream = response
635 .bytes_stream()
636 .map(|item| item.map_err(|e| e.into()));
637
638 Ok(stream.boxed())
639 }
640}
641
642#[cfg(test)]
647mod tests {
648 use super::*;
649 use chrono::DateTime;
650 use oci_spec::image::{DigestAlgorithm, Os};
651 use sqlx::Row;
652 use tokio::test;
653
654 #[test]
655 #[ignore = "makes network requests to Docker registry to pull an image"]
656 async fn test_docker_pull_image() -> anyhow::Result<()> {
657 let (client, temp_download_dir, _temp_db_dir) = helper::setup_test_client().await;
658 let repository = "library/alpine";
659 let tag = "latest";
660 let result = client
661 .pull_image(repository, ReferenceSelector::tag(tag))
662 .await;
663 assert!(result.is_ok());
664
665 let image = sqlx::query("SELECT * FROM images WHERE reference = ?")
667 .bind(format!(
668 "{DOCKER_REFERENCE_REGISTRY_DOMAIN}/{repository}:{tag}"
669 ))
670 .fetch_one(&client.oci_db)
671 .await?;
672 assert!(image.get::<i64, _>("size_bytes") > 0);
673
674 let index_id = image.get::<i64, _>("id");
676 let index = sqlx::query("SELECT * FROM indexes WHERE image_id = ?")
677 .bind(index_id)
678 .fetch_one(&client.oci_db)
679 .await?;
680 assert_eq!(index.get::<i64, _>("schema_version"), 2);
681
682 let manifest = sqlx::query("SELECT * FROM manifests WHERE image_id = ?")
684 .bind(index_id)
685 .fetch_one(&client.oci_db)
686 .await?;
687 assert_eq!(manifest.get::<i64, _>("schema_version"), 2);
688
689 let manifest_id = manifest.get::<i64, _>("id");
691 let config = sqlx::query("SELECT * FROM configs WHERE manifest_id = ?")
692 .bind(manifest_id)
693 .fetch_one(&client.oci_db)
694 .await?;
695 assert!(matches!(config.get::<String, _>("os"), s if s == Os::Linux.to_string()));
696
697 let layers = sqlx::query("SELECT * FROM layers WHERE manifest_id = ?")
699 .bind(manifest_id)
700 .fetch_all(&client.oci_db)
701 .await?;
702 assert!(!layers.is_empty());
703
704 for layer in layers {
705 let digest = layer.get::<String, _>("digest");
706 let size = layer.get::<i64, _>("size_bytes");
707 let layer_path = temp_download_dir.path().join(&digest);
708
709 assert!(layer_path.exists(), "Layer file {} not found", digest);
711 assert_eq!(
712 fs::metadata(&layer_path).await?.len() as i64,
713 size,
714 "Layer {} size mismatch",
715 digest
716 );
717
718 let parts: Vec<&str> = digest.split(':').collect();
720 let algorithm = &DigestAlgorithm::try_from(parts[0])?;
721 let expected_hash = parts[1];
722 let actual_hash = hex::encode(utils::get_file_hash(&layer_path, algorithm).await?);
723 assert_eq!(actual_hash, expected_hash, "Layer {} hash mismatch", digest);
724 }
725
726 Ok(())
727 }
728
729 #[test]
730 #[ignore = "makes network requests to Docker registry to fetch image index"]
731 async fn test_docker_fetch_index() -> anyhow::Result<()> {
732 let (client, _temp_download_dir, _temp_db_dir) = helper::setup_test_client().await;
733 let repository = "library/alpine";
734 let tag = "latest";
735
736 let result = client
737 .fetch_index(repository, ReferenceSelector::tag(tag))
738 .await;
739 assert!(result.is_ok());
740
741 let index = result.unwrap();
742 assert!(!index.manifests().is_empty());
743
744 for manifest in index.manifests() {
746 assert!(manifest.size() > 0);
747 assert!(manifest.digest().to_string().starts_with("sha256:"));
748 assert!(manifest.media_type().to_string().contains("manifest"));
749
750 if !manifest
752 .annotations()
753 .as_ref()
754 .is_some_and(|a| a.contains_key(DOCKER_REFERENCE_TYPE_ANNOTATION))
755 {
756 let platform = manifest.platform().as_ref().expect("Platform info missing");
757 assert!(matches!(platform.os(), Os::Linux));
758 }
759 }
760
761 Ok(())
762 }
763
764 #[test]
765 #[ignore = "makes network requests to Docker registry to fetch image manifest"]
766 async fn test_docker_fetch_manifest() -> anyhow::Result<()> {
767 let (client, _temp_download_dir, _temp_db_dir) = helper::setup_test_client().await;
768 let repository = "library/alpine";
769
770 let index = client
772 .fetch_index(repository, ReferenceSelector::tag("latest"))
773 .await?;
774
775 let manifest_desc = index.manifests().first().unwrap();
776 let result = client
777 .fetch_manifest(repository, manifest_desc.digest())
778 .await;
779
780 assert!(result.is_ok());
781 let manifest = result.unwrap();
782
783 assert_eq!(manifest.schema_version(), 2);
785 assert!(manifest.config().size() > 0);
786 assert!(manifest
787 .config()
788 .digest()
789 .to_string()
790 .starts_with("sha256:"));
791 assert!(manifest
792 .config()
793 .media_type()
794 .to_string()
795 .contains("config"));
796
797 assert!(!manifest.layers().is_empty());
799 for layer in manifest.layers() {
800 assert!(layer.size() > 0);
801 assert!(layer.digest().to_string().starts_with("sha256:"));
802 assert!(layer.media_type().to_string().contains("layer"));
803 }
804
805 Ok(())
806 }
807
808 #[test]
809 #[ignore = "makes network requests to Docker registry to fetch image config"]
810 async fn test_docker_fetch_config() -> anyhow::Result<()> {
811 let (client, _temp_download_dir, _temp_db_dir) = helper::setup_test_client().await;
812 let repository = "library/alpine";
813
814 let index = client
816 .fetch_index(repository, ReferenceSelector::tag("latest"))
817 .await?;
818
819 let manifest = client
820 .fetch_manifest(repository, index.manifests().first().unwrap().digest())
821 .await?;
822
823 let result = client
824 .fetch_config(repository, manifest.config().digest())
825 .await;
826 assert!(result.is_ok());
827
828 let config = result.unwrap();
829
830 assert_eq!(*config.os(), Os::Linux);
832 assert!(config.rootfs().typ() == "layers");
833 assert!(!config.rootfs().diff_ids().is_empty());
834
835 if let Some(created) = config.created() {
837 let created_time = DateTime::parse_from_rfc3339(created).unwrap();
838 assert!(created_time.timestamp_millis() > 0);
839 }
840 if let Some(config_fields) = config.config() {
841 if let Some(env) = config_fields.env() {
842 assert!(!env.is_empty());
843 }
844 if let Some(cmd) = config_fields.cmd() {
845 assert!(!cmd.is_empty());
846 }
847 }
848
849 Ok(())
850 }
851
852 #[test]
853 #[ignore = "makes network requests to Docker registry to fetch image blob"]
854 async fn test_docker_fetch_image_blob() -> anyhow::Result<()> {
855 let (client, temp_download_dir, _temp_db_dir) = helper::setup_test_client().await;
856 let repository = "library/alpine";
857
858 let index = client
860 .fetch_index(repository, ReferenceSelector::tag("latest"))
861 .await?;
862
863 let manifest = client
864 .fetch_manifest(repository, index.manifests().first().unwrap().digest())
865 .await?;
866
867 let layer = manifest.layers().first().unwrap();
868 let mut stream = client
869 .fetch_image_blob(repository, layer.digest(), 0..)
870 .await?;
871
872 let temp_file = temp_download_dir.path().join("test_blob");
874 let mut file = fs::File::create(&temp_file).await?;
875 let mut total_size = 0;
876
877 while let Some(chunk) = stream.next().await {
878 let bytes = chunk?;
879 total_size += bytes.len();
880 file.write_all(&bytes).await?;
881 }
882
883 assert!(total_size > 0);
885 assert_eq!(total_size as u64, layer.size());
886
887 let algorithm = layer.digest().algorithm();
889 let expected_hash = layer.digest().digest();
890 let actual_hash = hex::encode(utils::get_file_hash(&temp_file, algorithm).await?);
891 assert_eq!(actual_hash, expected_hash);
892
893 Ok(())
894 }
895
896 #[test]
897 #[ignore = "makes network requests to Docker registry to get authentication credentials"]
898 async fn test_docker_get_access_credentials() -> anyhow::Result<()> {
899 let (client, _temp_download_dir, _temp_db_dir) = helper::setup_test_client().await;
900
901 let result = client
902 .get_access_credentials("library/alpine", DOCKER_AUTH_SERVICE, &["pull"])
903 .await;
904
905 assert!(result.is_ok());
906 let credentials = result.unwrap();
907
908 assert!(!credentials.token.is_empty());
910 assert!(!credentials.access_token.is_empty());
911 assert!(credentials.expires_in > 0);
912
913 Ok(())
914 }
915}
916
917#[cfg(test)]
918mod helper {
919 use tempfile::TempDir;
920
921 use super::*;
922
923 pub(super) async fn setup_test_client() -> (DockerRegistry, TempDir, TempDir) {
925 let temp_download_dir = TempDir::new().unwrap();
926 let temp_db_dir = TempDir::new().unwrap();
927 let db_path = temp_db_dir.path().join("test.db");
928
929 let client = DockerRegistry::new(temp_download_dir.path().to_path_buf(), db_path)
930 .await
931 .unwrap();
932
933 (client, temp_download_dir, temp_db_dir)
934 }
935}