use crate::client::RegistryClient;
use crate::layer::Layer;
use crate::models::{
DockerConfig, ErrorResponse, MediaType, Platform, RepositoryList, TagList, Token,
};
use crate::uri::RegistryUri;
use crate::{Result, error};
#[cfg(feature = "aws")]
use aws_config::BehaviorVersion;
use base64::Engine;
use bytes::Bytes;
use cfg_if::cfg_if;
use futures::stream::{Stream, TryStreamExt};
use home::home_dir;
use keyring::Entry;
use reqwest::Response;
use serde::Serialize;
use serde::de::DeserializeOwned;
use sha2::{Digest, Sha256};
use snafu::{OptionExt, ResultExt, ensure};
use url::Url;
/// Credential config files, relative to the user's home directory, that
/// `Registry::new` reads (in this order) and parses as `DockerConfig` when no
/// token has been obtained by other means.
const COMMON_AUTH_FILES: &[&str] = &[".finch/config.json", ".docker/config.json"];
/// Handle to a single container registry: its address plus the client used
/// to issue requests with whatever credentials `Registry::new` resolved.
#[derive(Clone, Debug)]
pub struct Registry {
/// Address of the registry this handle talks to.
uri: RegistryUri,
/// Client used for every registry request; constructed from the resolved token.
pub(crate) client: RegistryClient,
/// Set by `Registry::new` when the host was treated as AWS ECR; consulted
/// when repository names are built.
#[cfg(feature = "aws")]
is_ecr: bool,
}
// Manually asserts that `Registry` may be moved to and shared between threads.
// NOTE(review): no safety justification is visible in this file. If every field
// is already `Send + Sync` these impls are redundant and can be dropped; if a
// field is not, sharing a `Registry` across threads may be unsound. Confirm
// against `RegistryUri` and `RegistryClient` before relying on this.
unsafe impl Send for Registry {}
unsafe impl Sync for Registry {}
impl Registry {
pub async fn new(uri: &RegistryUri) -> Result<Self> {
let mut token = None;
#[cfg(feature = "aws")]
let mut is_ecr = false;
cfg_if! {
if #[cfg(feature = "aws")] {
if uri.base().starts_with("public.ecr.aws") {
debug!(target: "registry", "using public ecr");
let sdk_config = aws_config::defaults(BehaviorVersion::latest()).region("us-east-1").load().await;
let client = aws_sdk_ecrpublic::Client::new(&sdk_config);
let ecr_response = client.get_authorization_token().send()
.await
.map_err(|e| { error!("public ecr: {:?}", e); error::Error::Authorization { reason: e.to_string() } })?;
trace!(target: "registry", "public ecr authorization response: {:?}", ecr_response);
is_ecr = true;
token = ecr_response.authorization_data()
.and_then(|x| x.authorization_token.clone()
.map(Token::Bearer));
} else if uri.base().contains("ecr") {
debug!(target: "registry", "using private ecr");
let sdk_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let ecr_client = aws_sdk_ecr::Client::new(&sdk_config);
is_ecr = true;
let ecr_response = ecr_client.get_authorization_token()
.send()
.await
.map_err(|e| error::Error::Authorization { reason: e.to_string() })?;
trace!(target: "registry", "private ecr authorization response: {:?}", ecr_response);
token = ecr_response.authorization_data()
.first()
.and_then(|x| {
x.authorization_token().map(|y| {
let decoded = base64::engine::general_purpose::STANDARD.decode(y).unwrap();
Token::Basic { username: "AWS".to_string(), password: String::from_utf8_lossy(decoded.as_slice()).strip_prefix("AWS:").unwrap().to_string() }
})
});
}
}
}
if token.is_none() {
for file in COMMON_AUTH_FILES {
if let Some(path) = home_dir() {
let path = path.join(file);
if path.exists() {
let auth = tokio::fs::read_to_string(path)
.await
.context(error::FileSnafu)?;
let config: DockerConfig =
serde_json::from_str(&auth).context(error::ConfigDeserializeSnafu)?;
if let Some(entry) = config.auths.get(uri.base()) {
if entry.auth.is_none() && entry.identitytoken.is_none() {
if let Ok(entry) =
Entry::new("docker-credential-helpers", uri.base())
{
if let Ok(password) = entry.get_password() {
let decoded = base64::engine::general_purpose::STANDARD
.decode(password)
.unwrap();
let decoded = String::from_utf8_lossy(decoded.as_slice());
if decoded.contains(':') {
let (username, password) =
decoded.split_once(':').unwrap();
token = Some(Token::Basic {
username: username.to_string(),
password: password.to_string(),
});
} else {
token = Some(Token::Bearer(decoded.to_string()));
}
} else {
token = None;
}
}
} else {
token = Token::parse(entry.clone());
}
}
}
}
}
}
Ok(Self {
client: RegistryClient::new(token),
uri: uri.clone(),
#[cfg(feature = "aws")]
is_ecr,
})
}
pub fn set_secure(&mut self, flag: bool) {
self.uri.set_secure(flag);
}
pub fn uri(&self) -> &RegistryUri {
&self.uri
}
/// Converts a copy of the registry address into a [`Url`].
///
/// # Errors
///
/// Propagates whatever error the `RegistryUri` to `Url` conversion reports.
pub fn url(&self) -> crate::Result<Url> {
    let address = self.uri.clone();
    TryInto::<Url>::try_into(address)
}
fn repository_name(&self, repository: &str) -> String {
cfg_if! {
if #[cfg(feature = "aws")] {
if self.is_ecr {
if let Some(precursor) = self.uri().base().split_once('/').map(|x| x.1) {
format!("{}/{}", precursor, repository)
} else {
repository.to_string()
}
} else {
repository.to_string()
}
} else {
repository.to_string()
}
}
}
pub async fn catalog(&self) -> crate::Result<Vec<String>> {
let response = self.client.clone().catalog(self.url()?).await?;
trace!(target: "registry", "catalog: {:?}", response);
ensure!(
response.status().is_success(),
error::ListReposSnafu {
reason: response
.json::<ErrorResponse>()
.await
.context(error::ErrorDeserializeSnafu)?
}
);
let list: RepositoryList = Self::body(response).await?;
Ok(list.repositories)
}
/// Issues a `head_blob` request and reports whether the registry answered
/// with a success status for `digest` in `repository`.
///
/// # Errors
///
/// Fails only when the URL cannot be built or the request itself errors;
/// a non-success status is returned as `Ok(false)`.
pub(crate) async fn check_blob(&self, repository: &str, digest: &str) -> Result<bool> {
    let name = self.repository_name(repository);
    let endpoint = self.url()?;
    let reply = self
        .client
        .clone()
        .head_blob(endpoint, name, digest.into())
        .await?;
    trace!(target: "registry", "head_blob: {:?}", reply);
    let found = reply.status().is_success();
    Ok(found)
}
/// Starts downloading the blob `digest` from `repository`.
///
/// Returns the body as a byte stream (transport errors mapped to
/// `std::io::Error`) together with the size announced in the
/// `Content-Length` header.
///
/// # Errors
///
/// Fails when the request errors, when the registry answers with a
/// non-success status, or when `Content-Length` is missing, not valid header
/// text, or not a number.
pub(crate) async fn fetch_blob(
    &self,
    repository: &str,
    digest: &str,
) -> Result<(
    impl Stream<Item = std::result::Result<Bytes, std::io::Error>> + use<>,
    u64,
)> {
    let repository = self.repository_name(repository);
    let response = self
        .client
        .clone()
        .get_blob(self.url()?, repository, digest.into())
        .await?;
    trace!(target: "registry", "get_blob: {:?}", response);
    ensure!(
        response.status().is_success(),
        error::FetchBlobSnafu {
            reason: response
                .json::<ErrorResponse>()
                .await
                .context(error::ErrorDeserializeSnafu)?
        }
    );
    // Read the header through a borrow; the borrow ends with this statement,
    // so the response can still be consumed below without cloning the map.
    let size: u64 = response
        .headers()
        .get("Content-Length")
        .context(error::ContentLengthMissingSnafu)?
        .to_str()
        .context(error::ImproperHeaderSnafu)?
        .parse()
        .context(error::ContentLengthNotNumberSnafu)?;
    Ok((response.bytes_stream().map_err(std::io::Error::other), size))
}
pub(crate) async fn delete_blob(&self, repository: &str, digest: &str) -> Result<()> {
let repository = self.repository_name(repository);
let response = self
.client
.del_blob(self.url()?, repository, digest.into())
.await?;
trace!(target: "registry", "del_blob: {:?}", response);
ensure!(
response.status().is_success(),
error::DeleteBlobSnafu {
digest,
reason: response
.json::<ErrorResponse>()
.await
.context(error::ErrorDeserializeSnafu)?
}
);
Ok(())
}
/// Issues a `head_manifest` request and reports whether the registry answered
/// with a success status for `reference` in `repository`.
///
/// # Errors
///
/// Fails only when the URL cannot be built or the request itself errors;
/// a non-success status is returned as `Ok(false)`.
pub(crate) async fn check_manifest(&self, repository: &str, reference: &str) -> Result<bool> {
    let name = self.repository_name(repository);
    let endpoint = self.url()?;
    let reply = self
        .client
        .head_manifest(endpoint, name, reference.into())
        .await?;
    trace!(target: "registry", "head_manifest: {:?}", reply);
    let found = reply.status().is_success();
    Ok(found)
}
pub(crate) async fn fetch_manifest<T>(&self, repository: &str, reference: &str) -> Result<T>
where
T: DeserializeOwned,
{
let repository = self.repository_name(repository);
let response = self
.client
.get_manifest(self.url()?, repository, reference.into())
.await?;
trace!(target: "registry", "get_manifest: {:?}", response);
ensure!(
response.status().is_success(),
error::FetchManifestSnafu {
reason: response
.json::<ErrorResponse>()
.await
.context(error::ErrorDeserializeSnafu)?
}
);
Self::body(response).await
}
/// Serializes `manifest` to JSON, uploads it under `reference` in
/// `repository`, and returns a [`Layer`] describing what was pushed.
///
/// The returned layer carries the `sha256:` digest of the serialized bytes,
/// their length, a copy of `media_type`, and the optional `platform`.
///
/// # Errors
///
/// Fails when the manifest cannot be serialized, when the request errors, or
/// when the registry answers with a non-success status (the registry URL and
/// the decoded `ErrorResponse` are attached).
pub(crate) async fn push_manifest<T>(
    &self,
    media_type: &MediaType,
    repository: &str,
    reference: &str,
    manifest: &T,
    platform: Option<Platform>,
) -> Result<Layer>
where
    T: Serialize,
{
    let repository = self.repository_name(repository);
    let bytes = serde_json::to_vec(manifest).context(error::SerializeSnafu)?;
    // Size and digest must be taken before the buffer is handed to the client.
    let size = bytes.len();
    let hash = Sha256::digest(bytes.as_slice());
    let digest = format!("sha256:{}", base16::encode_lower(hash.as_slice()));
    let response = self
        .client
        .put_manifest(
            self.url()?,
            repository,
            reference.into(),
            Bytes::from_owner(bytes),
        )
        .await?;
    trace!(target: "registry", "put_manifest: {:?}", response);
    ensure!(
        response.status().is_success(),
        error::PushImageSnafu {
            // `url()` already returns an owned value; no extra clone needed.
            uri: self.url()?,
            reason: response
                .json::<ErrorResponse>()
                .await
                .context(error::ErrorDeserializeSnafu)?
        }
    );
    // `digest` is not used afterwards, so it is moved rather than cloned.
    Ok(Layer::builder()
        .digest(digest)
        .media_type(media_type.clone())
        .size(size)
        .maybe_platform(platform)
        .build())
}
/// Lists the tags of `repository`, sorted in ascending order.
///
/// # Errors
///
/// Fails when the request errors, when the registry answers with a
/// non-success status (the decoded `ErrorResponse` is attached), or when the
/// body cannot be deserialized into a `TagList`.
pub(crate) async fn get_tags(&self, repository: &str) -> Result<Vec<String>> {
    let repository_name = self.repository_name(repository);
    let response = self
        .client
        .get_tags(&self.url()?, repository_name.as_str())
        .await?;
    trace!(target: "registry", "get_tags: {:?}", response);
    ensure!(
        response.status().is_success(),
        error::ListTagsSnafu {
            reason: response
                .json::<ErrorResponse>()
                .await
                .context(error::ErrorDeserializeSnafu)?
        }
    );
    let taglist: TagList = Self::body(response).await?;
    // The list is owned here, so take the tags by move instead of cloning.
    let mut tags = taglist.tags;
    // Equal strings are indistinguishable, so an unstable sort gives the
    // same result without the stable sort's temporary allocation.
    tags.sort_unstable();
    Ok(tags)
}
pub(crate) async fn delete_tag(&self, repository: &str, tag: &str) -> Result<()> {
let repository = self.repository_name(repository);
let response = self
.client
.del_manifest(self.url()?, repository, tag.into())
.await?;
trace!(target: "registry", "del_tag: {:?}", response);
ensure!(
response.status().is_success(),
error::DeleteTagSnafu {
tag: tag.to_string(),
reason: response
.json::<ErrorResponse>()
.await
.context(error::ErrorDeserializeSnafu)?
}
);
Ok(())
}
/// Reads `response` as JSON, traces the payload pretty-printed, and then
/// deserializes it into `T`.
///
/// # Errors
///
/// Fails when the body is not valid JSON or when the JSON does not match `T`.
pub(crate) async fn body<T>(response: Response) -> crate::Result<T>
where
    T: DeserializeOwned,
{
    let value: serde_json::Value = response
        .json()
        .await
        .context(error::ResponseDeserializeSnafu)?;
    // `{:#}` on a `serde_json::Value` emits the same pretty-printed JSON as
    // `to_string_pretty`, without the intermediate `String` or an `unwrap`.
    trace!(target: "registry", "RESPONSE BODY: {:#}", value);
    serde_json::from_value(value).context(error::BodyDeserializeSnafu)
}
}