use std::{
ops::RangeBounds,
path::{Path, PathBuf},
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{future, stream::BoxStream, StreamExt};
use getset::{Getters, Setters};
use oci_spec::image::{Digest, ImageConfiguration, ImageIndex, ImageManifest, Os, Platform};
use reqwest::Client;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{
fs::{self, OpenOptions},
io::AsyncWriteExt,
};
use crate::{
utils::{
self, OCI_CONFIG_FILENAME, OCI_INDEX_FILENAME, OCI_LAYER_SUBDIR, OCI_MANIFEST_FILENAME,
OCI_REPO_SUBDIR, OCI_SUBDIR,
},
MonocoreError, MonocoreResult,
};
use super::{AuthProvider, OciRegistryPull};
const DOCKER_REGISTRY_URL: &str = "https://registry-1.docker.io";
const DOCKER_AUTH_SERVICE: &str = "registry.docker.io";
const DOCKER_AUTH_REALM: &str = "https://auth.docker.io/token";
const DOCKER_MANIFEST_MIME_TYPE: &str = "application/vnd.docker.distribution.manifest.v2+json";
const DOCKER_MANIFEST_LIST_MIME_TYPE: &str =
"application/vnd.docker.distribution.manifest.list.v2+json";
const DOCKER_IMAGE_BLOB_MIME_TYPE: &str = "application/vnd.docker.image.rootfs.diff.tar.gzip";
const DOCKER_CONFIG_MIME_TYPE: &str = "application/vnd.docker.container.image.v1+json";
const DOCKER_REFERENCE_TYPE_ANNOTATION: &str = "vnd.docker.reference.type";
#[derive(Debug, Getters, Setters)]
#[getset(get = "pub with_prefix", set = "pub with_prefix")]
pub struct DockerRegistry {
client: ClientWithMiddleware,
oci_dir: PathBuf,
}
#[derive(Debug, Serialize, Deserialize, Getters, Setters)]
#[getset(get = "pub with_prefix", set = "pub with_prefix")]
pub struct DockerAuthMaterial {
token: String,
access_token: String,
expires_in: u32,
issued_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum DockerRegistryResponse<T> {
Ok(T),
Error(DockerRegistryResponseError),
}
#[derive(Debug, Serialize, Deserialize, Error)]
#[error("docker registry error: {errors}")]
pub struct DockerRegistryResponseError {
errors: serde_json::Value,
}
impl DockerRegistry {
pub fn new() -> Self {
Self::with_oci_dir(utils::monocore_home_path().join(OCI_SUBDIR))
}
pub fn with_oci_dir(oci_dir: PathBuf) -> Self {
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
let client_builder = ClientBuilder::new(Client::new());
let client = client_builder
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
Self { client, oci_dir }
}
fn get_downloaded_file_size(&self, path: &Path) -> u64 {
if !path.exists() {
return 0;
}
path.metadata().unwrap().len()
}
async fn download_image_blob(
&self,
repository: &str,
digest: &Digest,
download_size: u64,
destination: PathBuf,
) -> MonocoreResult<()> {
if let Some(parent) = destination.parent() {
fs::create_dir_all(parent).await?;
}
let downloaded_size = self.get_downloaded_file_size(&destination);
let mut file = if downloaded_size == 0 {
OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&destination)
.await?
} else if downloaded_size < download_size {
OpenOptions::new().append(true).open(&destination).await?
} else {
tracing::info!(
"file already exists skipping download: {}",
destination.display()
);
return Ok(());
};
let mut stream = self
.fetch_image_blob(repository, digest, downloaded_size..)
.await?;
while let Some(chunk) = stream.next().await {
let bytes = chunk?;
file.write_all(&bytes).await?;
}
let algorithm = digest.algorithm();
let expected_hash = digest.digest();
let actual_hash = hex::encode(utils::get_file_hash(&destination, algorithm).await?);
if actual_hash != expected_hash {
fs::remove_file(destination).await?;
return Err(MonocoreError::ImageLayerDownloadFailed(format!(
"({repository}:{digest}) file hash {actual_hash} does not match expected hash {expected_hash}",
)));
}
Ok(())
}
}
#[async_trait::async_trait]
impl AuthProvider for DockerRegistry {
type AuthMaterial = DockerAuthMaterial;
async fn get_auth_material(
&self,
repository: &str,
service: &str,
scopes: &[&str],
) -> MonocoreResult<Self::AuthMaterial> {
let request = self
.client
.get(DOCKER_AUTH_REALM)
.query(&[
("service", service),
(
"scope",
format!("repository:{}:{}", repository, scopes.join(",")).as_str(),
),
])
.build()?;
let response = self.client.execute(request).await?;
let auth_credentials = response.json::<DockerAuthMaterial>().await?;
Ok(auth_credentials)
}
}
#[async_trait::async_trait]
impl OciRegistryPull for DockerRegistry {
async fn pull_image(&self, repository: &str, tag: Option<&str>) -> MonocoreResult<()> {
let tag = tag.unwrap_or("latest");
let repo_tag = format!(
"{}__{}",
utils::sanitize_name_for_path(repository),
utils::sanitize_name_for_path(tag)
);
let repo_tag_dir = self.oci_dir.join(OCI_REPO_SUBDIR).join(&repo_tag);
fs::create_dir_all(&repo_tag_dir).await?;
let index = self.fetch_index(repository, Some(tag)).await?;
let index_path = repo_tag_dir.join(OCI_INDEX_FILENAME);
fs::write(&index_path, serde_json::to_string_pretty(&index)?).await?;
let platform = Platform::default();
let manifest_desc = index
.manifests()
.iter()
.find(|m| {
m.platform().as_ref().is_some_and(|p| {
matches!(p.os(), Os::Linux) &&
p.architecture() == platform.architecture() &&
!m.annotations().as_ref().is_some_and(|a| a.contains_key(DOCKER_REFERENCE_TYPE_ANNOTATION))
})
})
.or_else(|| {
index.manifests().iter().find(|m| {
m.platform().as_ref().is_some_and(|p| {
p.architecture() == platform.architecture() &&
!m.annotations().as_ref().is_some_and(|a| a.contains_key(DOCKER_REFERENCE_TYPE_ANNOTATION))
})
})
})
.ok_or(MonocoreError::ManifestNotFound)?;
let manifest = self
.fetch_manifest(repository, manifest_desc.digest())
.await?;
let manifest_path = repo_tag_dir.join(OCI_MANIFEST_FILENAME);
fs::write(&manifest_path, serde_json::to_string_pretty(&manifest)?).await?;
let config = self
.fetch_config(repository, manifest.config().digest())
.await?;
let config_path = repo_tag_dir.join(OCI_CONFIG_FILENAME);
fs::write(&config_path, serde_json::to_string_pretty(&config)?).await?;
let layer_futures: Vec<_> = manifest
.layers()
.iter()
.map(|layer_desc| {
let layer_path = self
.oci_dir
.join(OCI_LAYER_SUBDIR)
.join(utils::sanitize_name_for_path(layer_desc.digest().as_ref()));
self.download_image_blob(
repository,
layer_desc.digest(),
layer_desc.size(),
layer_path,
)
})
.collect();
for result in future::join_all(layer_futures).await {
result?;
}
Ok(())
}
async fn fetch_index(&self, repository: &str, tag: Option<&str>) -> MonocoreResult<ImageIndex> {
let token = self
.get_auth_material(repository, DOCKER_AUTH_SERVICE, &["pull"])
.await?
.token;
let tag = tag.unwrap_or("latest");
let request = self
.client
.get(format!(
"{}/v2/{}/manifests/{}",
DOCKER_REGISTRY_URL, repository, tag
))
.bearer_auth(token)
.header("Accept", DOCKER_MANIFEST_LIST_MIME_TYPE)
.build()?;
let response = self.client.execute(request).await?;
let image_index = response
.json::<DockerRegistryResponse<ImageIndex>>()
.await?;
match image_index {
DockerRegistryResponse::Ok(index) => Ok(index),
DockerRegistryResponse::Error(err) => Err(err.into()),
}
}
async fn fetch_manifest(
&self,
repository: &str,
digest: &Digest,
) -> MonocoreResult<ImageManifest> {
let token = self
.get_auth_material(repository, DOCKER_AUTH_SERVICE, &["pull"])
.await?
.token;
let request = self
.client
.get(format!(
"{}/v2/{}/manifests/{}",
DOCKER_REGISTRY_URL, repository, digest
))
.bearer_auth(token)
.header("Accept", DOCKER_MANIFEST_MIME_TYPE)
.build()?;
let response = self.client.execute(request).await?;
let manifest = response
.json::<DockerRegistryResponse<ImageManifest>>()
.await?;
match manifest {
DockerRegistryResponse::Ok(manifest) => Ok(manifest),
DockerRegistryResponse::Error(err) => Err(err.into()),
}
}
async fn fetch_config(
&self,
repository: &str,
digest: &Digest,
) -> MonocoreResult<ImageConfiguration> {
let token = self
.get_auth_material(repository, DOCKER_AUTH_SERVICE, &["pull"])
.await?
.token;
let request = self
.client
.get(format!(
"{}/v2/{}/blobs/{}",
DOCKER_REGISTRY_URL, repository, digest
))
.bearer_auth(token)
.header("Accept", DOCKER_CONFIG_MIME_TYPE)
.build()?;
let response = self.client.execute(request).await?;
let config = response
.json::<DockerRegistryResponse<ImageConfiguration>>()
.await?;
match config {
DockerRegistryResponse::Ok(config) => Ok(config),
DockerRegistryResponse::Error(err) => Err(err.into()),
}
}
async fn fetch_image_blob(
&self,
repository: &str,
digest: &Digest,
range: impl RangeBounds<u64> + Send,
) -> MonocoreResult<BoxStream<'static, MonocoreResult<Bytes>>> {
let (start, end) = utils::convert_bounds(range);
let end = if end == u64::MAX {
"".to_string()
} else {
end.to_string()
};
tracing::info!("fetching blob: {repository} {digest} {start}-{end}");
let token = self
.get_auth_material(repository, DOCKER_AUTH_SERVICE, &["pull"])
.await?
.token;
let request = self
.client
.get(format!(
"{}/v2/{}/blobs/{}",
DOCKER_REGISTRY_URL, repository, digest
))
.bearer_auth(token)
.header("Accept", DOCKER_IMAGE_BLOB_MIME_TYPE)
.header("Range", format!("bytes={start}-{end}"))
.build()?;
let response = self.client.execute(request).await?;
let stream = response
.bytes_stream()
.map(|item| item.map_err(|e| e.into()));
Ok(stream.boxed())
}
}
impl Default for DockerRegistry {
fn default() -> Self {
Self::new()
}
}