use std::fmt::Formatter;
use {
crate::{
binary_package_control::BinaryPackageControlFile,
binary_package_list::BinaryPackageList,
control::ControlParagraphAsyncReader,
deb::reader::BinaryPackageReader,
debian_source_control::{DebianSourceControlFile, DebianSourceControlFileFetch},
debian_source_package_list::DebianSourcePackageList,
error::{DebianError, Result},
io::{drain_reader, Compression, ContentDigest, DataResolver},
repository::{
contents::{ContentsFile, ContentsFileAsyncReader},
release::{
ChecksumType, ClassifiedReleaseFileEntry, ContentsFileEntry, PackagesFileEntry,
ReleaseFile, SourcesFileEntry,
},
},
},
async_trait::async_trait,
futures::{AsyncRead, AsyncReadExt, StreamExt, TryStreamExt},
std::{borrow::Cow, collections::HashMap, ops::Deref, pin::Pin, str::FromStr},
};
// Child modules providing concrete repository backends and supporting types.
pub mod builder;
/// `Contents` index file support (provides `ContentsFile` / `ContentsFileAsyncReader` used below).
pub mod contents;
pub mod copier;
/// Filesystem-backed repository reader/writer (used by `reader_from_str` / `writer_from_str`).
pub mod filesystem;
/// HTTP(S)-backed repository reader (only with the `http` crate feature).
#[cfg(feature = "http")]
pub mod http;
pub mod proxy_writer;
/// `[In]Release` file data structures (`ReleaseFile`, `ChecksumType`, index entry types).
pub mod release;
/// S3-backed repository writer (only with the `s3` crate feature).
#[cfg(feature = "s3")]
pub mod s3;
/// No-op "sink" writer, selected via `null://` URLs in `writer_from_str`.
pub mod sink_writer;
/// Describes how to fetch a binary package (`.deb`) from a repository.
#[derive(Clone, Debug)]
pub struct BinaryPackageFetch<'a> {
    /// The control paragraph describing this binary package.
    pub control_file: BinaryPackageControlFile<'a>,
    /// Repository-relative path of the package file (taken from the `Filename` field).
    pub path: String,
    /// Expected size of the file in bytes (taken from the `Size` field).
    pub size: u64,
    /// Expected content digest, used for integrity verification during retrieval.
    pub digest: ContentDigest,
}
/// Describes how to fetch a file belonging to a source package.
pub struct SourcePackageFetch<'a> {
    /// The source control file (`.dsc`-style paragraph) this file belongs to.
    pub control_file: DebianSourceControlFile<'a>,
    // Fetch instructions (path/size/digest); exposed read-only through the
    // `Deref` impl below.
    fetch: DebianSourceControlFileFetch,
}
impl<'a> Deref for SourcePackageFetch<'a> {
    type Target = DebianSourceControlFileFetch;

    // Allows callers to access the inner fetch record's fields (e.g. `path`,
    // `size`, `digest`) directly on a `SourcePackageFetch`.
    fn deref(&self) -> &Self::Target {
        &self.fetch
    }
}
/// Transport-agnostic interface for reading from the root of a Debian repository.
#[async_trait]
pub trait RepositoryRootReader: DataResolver + Sync {
    /// Obtain the URL to which this reader is bound.
    fn url(&self) -> Result<url::Url>;

    /// Obtain a [`ReleaseReader`] for the given distribution name.
    ///
    /// Convenience wrapper that resolves the conventional `dists/{distribution}`
    /// path (with leading/trailing `/` trimmed from `distribution`).
    async fn release_reader(&self, distribution: &str) -> Result<Box<dyn ReleaseReader>> {
        self.release_reader_with_distribution_path(&format!(
            "dists/{}",
            distribution.trim_matches('/')
        ))
        .await
    }

    /// Obtain a [`ReleaseReader`] rooted at an explicit repository-relative path.
    async fn release_reader_with_distribution_path(
        &self,
        path: &str,
    ) -> Result<Box<dyn ReleaseReader>>;

    /// Fetch `path` fully into memory and parse it as a PGP-armored release file.
    async fn fetch_inrelease(&self, path: &str) -> Result<ReleaseFile<'static>> {
        let mut reader = self.get_path(path).await?;

        let mut data = vec![];
        reader.read_to_end(&mut data).await?;

        Ok(ReleaseFile::from_armored_reader(std::io::Cursor::new(
            data,
        ))?)
    }

    /// Obtain a digest-verified reader for a binary package's content.
    async fn fetch_binary_package_generic<'fetch>(
        &self,
        fetch: BinaryPackageFetch<'fetch>,
    ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
        self.get_path_with_digest_verification(&fetch.path, fetch.size, fetch.digest)
            .await
    }

    /// Fetch a binary package and wrap it in a [`BinaryPackageReader`].
    ///
    /// Note: the entire `.deb` is buffered into memory first.
    async fn fetch_binary_package_deb_reader<'fetch>(
        &self,
        fetch: BinaryPackageFetch<'fetch>,
    ) -> Result<BinaryPackageReader<std::io::Cursor<Vec<u8>>>> {
        let mut reader = self.fetch_binary_package_generic(fetch).await?;
        // TODO implement an async reader?
        let mut buf = vec![];
        reader.read_to_end(&mut buf).await?;

        Ok(BinaryPackageReader::new(std::io::Cursor::new(buf))?)
    }

    /// Obtain a digest-verified reader for a source package file.
    ///
    /// `path`/`size`/`digest` are read through `SourcePackageFetch`'s `Deref`
    /// into the inner [`DebianSourceControlFileFetch`].
    async fn fetch_source_package_generic<'fetch>(
        &self,
        fetch: SourcePackageFetch<'fetch>,
    ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
        self.get_path_with_digest_verification(&fetch.path, fetch.size, fetch.digest.clone())
            .await
    }
}
/// Transport-agnostic interface for reading indices described by an `[In]Release` file.
#[async_trait]
pub trait ReleaseReader: DataResolver + Sync {
    /// Obtain the base URL to which this reader is bound.
    fn url(&self) -> Result<url::Url>;

    /// Obtain the path of this distribution relative to the repository root.
    fn root_relative_path(&self) -> &str;

    /// Obtain the parsed `[In]Release` file this reader operates against.
    fn release_file(&self) -> &ReleaseFile<'_>;

    /// Determine which checksum flavor to use for verifying fetched content.
    ///
    /// Picks the strongest digest advertised by the release file, preferring
    /// SHA-256, then SHA-1, then MD5. Errors if none of these is advertised.
    fn retrieve_checksum(&self) -> Result<ChecksumType> {
        let release = self.release_file();

        let checksum = [ChecksumType::Sha256, ChecksumType::Sha1, ChecksumType::Md5]
            .iter()
            .find(|variant| release.field(variant.field_name()).is_some())
            .ok_or(DebianError::RepositoryReadReleaseNoKnownChecksum)?;

        Ok(*checksum)
    }

    /// Obtain the preferred compression format for fetched index files.
    fn preferred_compression(&self) -> Compression;

    /// Set the preferred compression format for fetched index files.
    fn set_preferred_compression(&mut self, compression: Compression);

    /// Obtain all classified index file entries advertised by the release file.
    fn classified_indices_entries(&self) -> Result<Vec<ClassifiedReleaseFileEntry<'_>>> {
        self.release_file()
            .iter_classified_index_files(self.retrieve_checksum()?)
            .ok_or(DebianError::ReleaseNoIndicesFiles)?
            .collect::<Result<Vec<_>>>()
    }

    /// Obtain all `Packages` index entries, across every compression format.
    ///
    /// Returns an empty vector when the release file advertises no such indices.
    fn packages_indices_entries(&self) -> Result<Vec<PackagesFileEntry<'_>>> {
        Ok(
            if let Some(entries) = self
                .release_file()
                .iter_packages_indices(self.retrieve_checksum()?)
            {
                entries.collect::<Result<Vec<_>>>()?
            } else {
                vec![]
            },
        )
    }

    /// Like [`Self::packages_indices_entries`], but yields a single entry per
    /// (component, architecture, installer) group.
    ///
    /// The preferred compression is chosen when available; otherwise the first
    /// match in [`Compression::default_preferred_order`] is used.
    fn packages_indices_entries_preferred_compression(&self) -> Result<Vec<PackagesFileEntry<'_>>> {
        let mut entries = HashMap::new();

        // Group candidate entries by their logical identity.
        for entry in self.packages_indices_entries()? {
            entries
                .entry((
                    entry.component.clone(),
                    entry.architecture.clone(),
                    entry.is_installer,
                ))
                .or_insert_with(Vec::new)
                .push(entry);
        }

        entries
            .into_values()
            .map(|candidates| {
                if let Some(entry) = candidates
                    .iter()
                    .find(|entry| entry.compression == self.preferred_compression())
                {
                    Ok(entry.clone())
                } else {
                    for compression in Compression::default_preferred_order() {
                        if let Some(entry) = candidates
                            .iter()
                            .find(|entry| entry.compression == compression)
                        {
                            return Ok(entry.clone());
                        }
                    }

                    Err(DebianError::RepositoryReadPackagesIndicesEntryNotFound)
                }
            })
            .collect::<Result<Vec<_>>>()
    }

    /// Obtain all `Contents` index entries, across every compression format.
    fn contents_indices_entries(&self) -> Result<Vec<ContentsFileEntry<'_>>> {
        Ok(
            if let Some(entries) = self
                .release_file()
                .iter_contents_indices(self.retrieve_checksum()?)
            {
                entries.collect::<Result<Vec<_>>>()?
            } else {
                vec![]
            },
        )
    }

    /// Obtain all `Sources` index entries, across every compression format.
    fn sources_indices_entries(&self) -> Result<Vec<SourcesFileEntry<'_>>> {
        Ok(
            if let Some(entries) = self
                .release_file()
                .iter_sources_indices(self.retrieve_checksum()?)
            {
                entries.collect::<Result<Vec<_>>>()?
            } else {
                vec![]
            },
        )
    }

    /// Like [`Self::sources_indices_entries`], but yields a single entry per
    /// component, choosing the preferred compression when available.
    fn sources_indices_entries_preferred_compression(&self) -> Result<Vec<SourcesFileEntry<'_>>> {
        let mut entries = HashMap::new();

        for entry in self.sources_indices_entries()? {
            entries
                .entry(entry.component.clone())
                .or_insert_with(Vec::new)
                .push(entry);
        }

        entries
            .into_values()
            .map(|candidates| {
                if let Some(entry) = candidates
                    .iter()
                    .find(|entry| entry.compression == self.preferred_compression())
                {
                    Ok(entry.clone())
                } else {
                    for compression in Compression::default_preferred_order() {
                        if let Some(entry) = candidates
                            .iter()
                            .find(|entry| entry.compression == compression)
                        {
                            return Ok(entry.clone());
                        }
                    }

                    // Use the sources-specific error variant (previously the
                    // packages variant was returned here, which misreported
                    // the failure; see `sources_entry`).
                    Err(DebianError::RepositoryReadSourcesIndicesEntryNotFound)
                }
            })
            .collect::<Result<Vec<_>>>()
    }

    /// Find the `Packages` index entry for a component/architecture/installer flavor.
    fn packages_entry(
        &self,
        component: &str,
        architecture: &str,
        is_installer: bool,
    ) -> Result<PackagesFileEntry<'_>> {
        self.packages_indices_entries_preferred_compression()?
            .into_iter()
            .find(|entry| {
                entry.component == component
                    && entry.architecture == architecture
                    && entry.is_installer == is_installer
            })
            .ok_or(DebianError::RepositoryReadPackagesIndicesEntryNotFound)
    }

    /// Fetch and fully parse the `Packages` file described by `entry`.
    ///
    /// Honors `Acquire-By-Hash` by reading from the by-hash path when the
    /// release file advertises it. Content is decoded and digest-verified
    /// while reading.
    async fn resolve_packages_from_entry<'entry, 'slf: 'entry>(
        &'slf self,
        entry: &'entry PackagesFileEntry<'slf>,
    ) -> Result<BinaryPackageList<'static>> {
        let release = self.release_file();

        let path = if release.acquire_by_hash().unwrap_or_default() {
            entry.by_hash_path()
        } else {
            entry.path.to_string()
        };

        let mut reader = ControlParagraphAsyncReader::new(futures::io::BufReader::new(
            self.get_path_decoded_with_digest_verification(
                &path,
                entry.compression,
                entry.size,
                entry.digest.clone(),
            )
            .await?,
        ));

        let mut res = BinaryPackageList::default();

        while let Some(paragraph) = reader.read_paragraph().await? {
            res.push(BinaryPackageControlFile::from(paragraph));
        }

        Ok(res)
    }

    /// Fetch and parse the `Packages` file for a component/architecture/installer flavor.
    async fn resolve_packages(
        &self,
        component: &str,
        arch: &str,
        is_installer: bool,
    ) -> Result<BinaryPackageList<'static>> {
        let entry = self.packages_entry(component, arch, is_installer)?;

        self.resolve_packages_from_entry(&entry).await
    }

    /// Resolve [`BinaryPackageFetch`] records for packages in this repository.
    ///
    /// `packages_file_filter` selects which `Packages` indices to read and
    /// `binary_package_filter` selects which package entries to keep. Indices
    /// are fetched concurrently, at most `threads` at a time.
    async fn resolve_package_fetches(
        &self,
        packages_file_filter: Box<dyn (Fn(PackagesFileEntry) -> bool) + Send>,
        binary_package_filter: Box<dyn (Fn(BinaryPackageControlFile) -> bool) + Send>,
        threads: usize,
    ) -> Result<Vec<BinaryPackageFetch<'_>>> {
        let packages_entries = self.packages_indices_entries_preferred_compression()?;

        let fs = packages_entries
            .iter()
            .filter(|entry| packages_file_filter((*entry).clone()))
            .map(|entry| self.resolve_packages_from_entry(entry))
            .collect::<Vec<_>>();

        let mut packages_fs = futures::stream::iter(fs).buffer_unordered(threads);

        let mut fetches = vec![];

        while let Some(pl) = packages_fs.try_next().await? {
            for cf in pl.into_iter() {
                let cf: BinaryPackageControlFile = cf;

                if binary_package_filter(cf.clone()) {
                    let path = cf.required_field_str("Filename")?.to_string();
                    let size = cf.field_u64("Size").ok_or_else(|| {
                        DebianError::ControlRequiredFieldMissing("Size".to_string())
                    })??;
                    // Use the strongest digest advertised on the package paragraph.
                    let digest = ChecksumType::preferred_order()
                        .find_map(|checksum| {
                            cf.field_str(checksum.field_name()).map(|hex_digest| {
                                ContentDigest::from_hex_digest(checksum, hex_digest)
                            })
                        })
                        .ok_or(DebianError::RepositoryReadCouldNotDeterminePackageDigest)??;

                    fetches.push(BinaryPackageFetch {
                        control_file: cf,
                        path,
                        size,
                        digest,
                    });
                }
            }
        }

        Ok(fetches)
    }

    /// Find the `Sources` index entry for the given component.
    fn sources_entry(&self, component: &str) -> Result<SourcesFileEntry<'_>> {
        self.sources_indices_entries_preferred_compression()?
            .into_iter()
            .find(|entry| entry.component == component)
            .ok_or(DebianError::RepositoryReadSourcesIndicesEntryNotFound)
    }

    /// Fetch and fully parse the `Sources` file described by `entry`.
    ///
    /// Honors `Acquire-By-Hash`; content is decoded and digest-verified while
    /// reading.
    async fn resolve_sources_from_entry<'entry, 'slf: 'entry>(
        &'slf self,
        entry: &'entry SourcesFileEntry<'slf>,
    ) -> Result<DebianSourcePackageList<'static>> {
        let release = self.release_file();

        let path = if release.acquire_by_hash().unwrap_or_default() {
            entry.by_hash_path()
        } else {
            entry.path.to_string()
        };

        let mut reader = ControlParagraphAsyncReader::new(futures::io::BufReader::new(
            self.get_path_decoded_with_digest_verification(
                &path,
                entry.compression,
                entry.size,
                entry.digest.clone(),
            )
            .await?,
        ));

        let mut res = DebianSourcePackageList::default();

        while let Some(paragraph) = reader.read_paragraph().await? {
            res.push(paragraph.into());
        }

        Ok(res)
    }

    /// Fetch and parse the `Sources` file for the given component.
    async fn resolve_sources(&self, component: &str) -> Result<DebianSourcePackageList<'static>> {
        let entry = self.sources_entry(component)?;

        self.resolve_sources_from_entry(&entry).await
    }

    /// Resolve [`SourcePackageFetch`] records for source packages in this repository.
    ///
    /// `sources_file_filter` selects which `Sources` indices to read and
    /// `source_package_filter` selects which source packages to keep; every
    /// file of a kept package yields one fetch record. Indices are fetched
    /// concurrently, at most `threads` at a time.
    async fn resolve_source_fetches(
        &self,
        sources_file_filter: Box<dyn (Fn(SourcesFileEntry) -> bool) + Send>,
        source_package_filter: Box<dyn (Fn(DebianSourceControlFile) -> bool) + Send>,
        threads: usize,
    ) -> Result<Vec<SourcePackageFetch<'_>>> {
        let sources_entries = self.sources_indices_entries_preferred_compression()?;

        let fs = sources_entries
            .iter()
            .filter(|entry| sources_file_filter((*entry).clone()))
            .map(|entry| self.resolve_sources_from_entry(entry))
            .collect::<Vec<_>>();

        let mut sources_fs = futures::stream::iter(fs).buffer_unordered(threads);

        let mut fetches = vec![];

        while let Some(pl) = sources_fs.try_next().await? {
            for cf in pl.into_iter() {
                if source_package_filter(cf.clone_no_signatures()) {
                    for fetch in cf.file_fetches(self.retrieve_checksum()?)? {
                        let fetch = fetch?;

                        fetches.push(SourcePackageFetch {
                            control_file: cf.clone_no_signatures(),
                            fetch,
                        });
                    }
                }
            }
        }

        Ok(fetches)
    }

    /// Find the `Contents` index entry for a component/architecture/installer flavor.
    ///
    /// Prefers the configured compression, falling back to the default
    /// preference order.
    fn contents_entry(
        &self,
        component: &str,
        architecture: &str,
        is_installer: bool,
    ) -> Result<ContentsFileEntry> {
        let entries = self
            .contents_indices_entries()?
            .into_iter()
            .filter(|entry| {
                entry.component == component
                    && entry.architecture == architecture
                    && entry.is_installer == is_installer
            })
            .collect::<Vec<_>>();

        if let Some(entry) = entries
            .iter()
            .find(|entry| entry.compression == self.preferred_compression())
        {
            Ok(entry.clone())
        } else {
            for compression in Compression::default_preferred_order() {
                if let Some(entry) = entries
                    .iter()
                    .find(|entry| entry.compression == compression)
                {
                    return Ok(entry.clone());
                }
            }

            Err(DebianError::RepositoryReadContentsIndicesEntryNotFound)
        }
    }

    /// Fetch, decode, and fully parse the `Contents` file for the given flavor.
    ///
    /// Honors `Acquire-By-Hash`. The underlying reader is drained after
    /// parsing so digest verification of the full stream completes.
    async fn resolve_contents(
        &self,
        component: &str,
        architecture: &str,
        is_installer: bool,
    ) -> Result<ContentsFile> {
        let release = self.release_file();
        let entry = self.contents_entry(component, architecture, is_installer)?;

        let path = if release.acquire_by_hash().unwrap_or_default() {
            entry.by_hash_path()
        } else {
            entry.path.to_string()
        };

        let reader = self
            .get_path_decoded_with_digest_verification(
                &path,
                entry.compression,
                entry.size,
                entry.digest.clone(),
            )
            .await?;

        let mut reader = ContentsFileAsyncReader::new(futures::io::BufReader::new(reader));
        reader.read_all().await?;

        let (contents, reader) = reader.consume();

        drain_reader(reader)
            .await
            .map_err(|e| DebianError::RepositoryIoPath(path, e))?;

        Ok(contents)
    }
}
/// Outcome of verifying a path at a repository destination.
#[derive(Clone, Copy, Debug)]
pub enum RepositoryPathVerificationState {
    /// The path exists, but its content was not integrity checked.
    ExistsNoIntegrityCheck,
    /// The path exists and its content matched the expected size/digest.
    ExistsIntegrityVerified,
    /// The path exists but its content did not match expectations.
    ExistsIntegrityMismatch,
    /// The path does not exist.
    Missing,
}
/// The result of verifying a single repository path.
#[derive(Clone, Debug)]
pub struct RepositoryPathVerification<'a> {
    /// The path that was checked.
    pub path: &'a str,
    /// The verification outcome for this path.
    pub state: RepositoryPathVerificationState,
}
impl<'a> std::fmt::Display for RepositoryPathVerification<'a> {
    /// Render as `{path} {status}`, where the status describes the
    /// verification outcome.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let status = match self.state {
            RepositoryPathVerificationState::ExistsNoIntegrityCheck => {
                "exists (no integrity check performed)"
            }
            RepositoryPathVerificationState::ExistsIntegrityVerified => {
                "exists (integrity verified)"
            }
            RepositoryPathVerificationState::ExistsIntegrityMismatch => {
                "exists (integrity mismatch!)"
            }
            RepositoryPathVerificationState::Missing => "missing",
        };

        write!(f, "{} {}", self.path, status)
    }
}
/// Phases of a repository copy operation.
#[derive(Clone, Copy, Debug)]
pub enum CopyPhase {
    /// Copying binary package (`.deb`) files.
    BinaryPackages,
    /// Copying installer binary package files.
    InstallerBinaryPackages,
    /// Copying source package files.
    Sources,
    /// Copying installer files.
    Installers,
    /// Copying release index files.
    ReleaseIndices,
    /// Copying the release files themselves.
    ReleaseFiles,
}
impl std::fmt::Display for CopyPhase {
    /// Render a short human-readable name for this phase.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::BinaryPackages => "binary packages",
            Self::InstallerBinaryPackages => "installer binary packages",
            Self::Sources => "sources",
            Self::Installers => "installers",
            Self::ReleaseIndices => "release indices",
            Self::ReleaseFiles => "release files",
        };

        f.write_str(name)
    }
}
/// Events emitted while publishing/copying repository content.
pub enum PublishEvent {
    /// The number of pool artifacts needed has been resolved.
    ResolvedPoolArtifacts(usize),
    /// A pool path is already present at the destination.
    PoolArtifactCurrent(String),
    /// A pool path is missing and will be written.
    PoolArtifactMissing(String),
    /// Total number of pool artifacts that will be written.
    PoolArtifactsToPublish(usize),
    /// A pool artifact was written (path, byte count).
    PoolArtifactCreated(String, u64),
    /// An index file will be written.
    IndexFileToWrite(String),
    /// An index file was written (path, byte count).
    IndexFileWritten(String, u64),
    /// A destination path is being verified.
    VerifyingDestinationPath(String),
    /// A copy phase is starting.
    CopyPhaseBegin(CopyPhase),
    /// A copy phase finished.
    CopyPhaseEnd(CopyPhase),
    /// A path is being copied (source, destination).
    CopyingPath(String, String),
    /// An indices file copy failed because the source wasn't found.
    CopyIndicesPathNotFound(String),
    /// A path was copied (path, byte count).
    PathCopied(String, u64),
    /// A copy was skipped because the destination was already current.
    PathCopyNoop(String),
    // The following three are progress-reporting events; `Display` renders
    // them as empty and `is_progress()` returns true for them.
    /// A write sequence began; total bytes expected.
    WriteSequenceBeginWithTotalBytes(u64),
    /// Progress within a write sequence, in bytes.
    WriteSequenceProgressBytes(u64),
    /// A write sequence completed.
    WriteSequenceFinished,
}
impl std::fmt::Display for PublishEvent {
    /// Render a human-readable description of the event.
    ///
    /// Progress events render as nothing (they carry no loggable message).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ResolvedPoolArtifacts(count) => {
                format!("resolved {} needed pool artifacts", count)
            }
            Self::PoolArtifactCurrent(path) => format!("pool path {} is present", path),
            Self::PoolArtifactMissing(path) => format!("pool path {} will be written", path),
            Self::PoolArtifactsToPublish(count) => {
                format!("{} pool artifacts will be written", count)
            }
            Self::PoolArtifactCreated(path, size) => format!("wrote {} bytes to {}", size, path),
            Self::IndexFileToWrite(path) => format!("index file {} will be written", path),
            Self::IndexFileWritten(path, size) => format!("wrote {} bytes to {}", size, path),
            Self::VerifyingDestinationPath(path) => {
                format!("verifying destination path {}", path)
            }
            Self::CopyPhaseBegin(phase) => format!("beginning copying of {}", phase),
            Self::CopyPhaseEnd(phase) => format!("finished copying of {}", phase),
            Self::CopyingPath(source, dest) => format!("copying {} to {}", source, dest),
            Self::CopyIndicesPathNotFound(path) => {
                format!("copying indices file {} failed because it wasn't found", path)
            }
            Self::PathCopied(path, size) => format!("copied {} bytes to {}", size, path),
            Self::PathCopyNoop(path) => format!("copy of {} was a no-op", path),
            // Progress events have no textual representation.
            Self::WriteSequenceBeginWithTotalBytes(_)
            | Self::WriteSequenceProgressBytes(_)
            | Self::WriteSequenceFinished => return Ok(()),
        };

        f.write_str(&message)
    }
}
impl PublishEvent {
    /// Whether this event carries a message worth logging.
    ///
    /// This is simply the complement of [`Self::is_progress`].
    pub fn is_loggable(&self) -> bool {
        !self.is_progress()
    }

    /// Whether this event only reports write-sequence progress.
    pub fn is_progress(&self) -> bool {
        match self {
            Self::WriteSequenceBeginWithTotalBytes(_)
            | Self::WriteSequenceProgressBytes(_)
            | Self::WriteSequenceFinished => true,
            _ => false,
        }
    }
}
/// Describes a single completed write to a repository.
#[derive(Clone, Debug)]
pub struct RepositoryWrite<'a> {
    /// The path that was written.
    pub path: Cow<'a, str>,
    /// Number of bytes written to that path.
    pub bytes_written: u64,
}
/// The outcome of an attempted repository write/copy.
pub enum RepositoryWriteOperation<'a> {
    /// A write was performed.
    PathWritten(RepositoryWrite<'a>),
    /// No write was necessary (path, byte size accounted for).
    Noop(Cow<'a, str>, u64),
}
impl<'a> RepositoryWriteOperation<'a> {
    /// Number of bytes this operation accounts for (0-sized no-ops report 0).
    pub fn bytes_written(&self) -> u64 {
        match self {
            Self::Noop(_, size) => *size,
            Self::PathWritten(write_record) => write_record.bytes_written,
        }
    }
}
/// Transport-agnostic interface for writing content to a Debian repository.
#[async_trait]
pub trait RepositoryWriter: Sync {
    /// Verify whether `path` exists at the destination and, when
    /// `expected_content` is given, whether its size/digest match.
    async fn verify_path<'path>(
        &self,
        path: &'path str,
        expected_content: Option<(u64, ContentDigest)>,
    ) -> Result<RepositoryPathVerification<'path>>;

    /// Write the contents of `reader` to `path` at the destination.
    async fn write_path<'path, 'reader>(
        &self,
        path: Cow<'path, str>,
        reader: Pin<Box<dyn AsyncRead + Send + 'reader>>,
    ) -> Result<RepositoryWrite<'path>>;

    /// Copy `source_path` from a source repository to `dest_path` on this writer.
    ///
    /// When `expected_content` is provided and the destination already holds
    /// verified matching content, no copy occurs and
    /// [`RepositoryWriteOperation::Noop`] is returned. `progress_cb`, if set,
    /// receives [`PublishEvent`]s describing the operation.
    async fn copy_from<'path>(
        &self,
        reader: &dyn RepositoryRootReader,
        source_path: Cow<'path, str>,
        expected_content: Option<(u64, ContentDigest)>,
        dest_path: Cow<'path, str>,
        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
    ) -> Result<RepositoryWriteOperation<'path>> {
        if let Some(cb) = progress_cb {
            cb(PublishEvent::VerifyingDestinationPath(
                dest_path.to_string(),
            ));
        }

        let verification = self
            .verify_path(dest_path.as_ref(), expected_content.clone())
            .await?;

        // Destination already holds verified content: skip the copy entirely.
        if matches!(
            verification.state,
            RepositoryPathVerificationState::ExistsIntegrityVerified
        ) {
            return Ok(RepositoryWriteOperation::Noop(
                dest_path,
                // Without expected content there is no known size; report 0.
                if let Some((size, _)) = expected_content {
                    size
                } else {
                    0
                },
            ));
        }

        if let Some(cb) = progress_cb {
            cb(PublishEvent::CopyingPath(
                source_path.to_string(),
                dest_path.to_string(),
            ));
        }

        // Verify the source content in transit when a size/digest is known.
        let reader = if let Some((size, digest)) = expected_content {
            reader
                .get_path_with_digest_verification(source_path.as_ref(), size, digest)
                .await?
        } else {
            reader.get_path(source_path.as_ref()).await?
        };

        let write = self.write_path(dest_path, reader).await?;

        Ok(RepositoryWriteOperation::PathWritten(write))
    }
}
/// Construct a [`RepositoryRootReader`] from a string source.
///
/// Accepts `file://` URLs, `http://`/`https://` URLs (when the `http` crate
/// feature is enabled), or — for any string without `://` — a local
/// filesystem path.
pub fn reader_from_str(s: impl ToString) -> Result<Box<dyn RepositoryRootReader>> {
    let location = s.to_string();

    // Anything that doesn't look like a URL is treated as a filesystem path.
    if !location.contains("://") {
        return Ok(Box::new(filesystem::FilesystemRepositoryReader::new(
            location,
        )));
    }

    let url = url::Url::parse(&location)?;

    match url.scheme() {
        "file" => Ok(Box::new(filesystem::FilesystemRepositoryReader::new(
            url.to_file_path()
                .expect("path conversion should always work for file://"),
        ))),
        #[cfg(feature = "http")]
        "http" | "https" => Ok(Box::new(http::HttpRepositoryClient::new(url)?)),
        _ => Err(DebianError::RepositoryReaderUnrecognizedUrl(location)),
    }
}
/// Construct a [`RepositoryWriter`] from a string source.
///
/// Accepts `file://` URLs, `null://` URLs (a sink writer whose verification
/// behavior is taken from the host component), `s3://` URLs (when the `s3`
/// crate feature is enabled), or — for any string without `://` — a local
/// filesystem path.
pub async fn writer_from_str(s: impl ToString) -> Result<Box<dyn RepositoryWriter>> {
    let location = s.to_string();

    // Anything that doesn't look like a URL is treated as a filesystem path.
    if !location.contains("://") {
        return Ok(Box::new(filesystem::FilesystemRepositoryWriter::new(
            location,
        )));
    }

    let url = url::Url::parse(&location)?;

    match url.scheme() {
        "file" => Ok(Box::new(filesystem::FilesystemRepositoryWriter::new(
            url.to_file_path()
                .expect("path conversion should always work for file://"),
        ))),
        "null" => {
            // The host component (if any) selects the verification behavior.
            let behavior = match url.host_str() {
                Some(host) => sink_writer::SinkWriterVerifyBehavior::from_str(host)?,
                None => sink_writer::SinkWriterVerifyBehavior::Missing,
            };

            let mut writer = sink_writer::SinkWriter::default();
            writer.set_verify_behavior(behavior);

            Ok(Box::new(writer))
        }
        #[cfg(feature = "s3")]
        "s3" => {
            let path = url.path();

            if let Some((bucket, prefix)) = path.trim_matches('/').split_once('/') {
                let region = s3::get_bucket_region(bucket).await?;
                Ok(Box::new(s3::S3Writer::new(region, bucket, Some(prefix))))
            } else {
                // NOTE(review): the untrimmed URL path (which may carry a
                // leading '/') is used as the bucket name here — preserved
                // as-is; confirm intended against the s3 module.
                let region = s3::get_bucket_region(path).await?;
                Ok(Box::new(s3::S3Writer::new(region, path, None)))
            }
        }
        _ => Err(DebianError::RepositoryWriterUnrecognizedUrl(location)),
    }
}