use {
crate::{
error::{DebianError, Result},
io::ContentDigest,
repository::{
reader_from_str, writer_from_str, CopyPhase, PublishEvent, ReleaseReader,
RepositoryRootReader, RepositoryWriteOperation, RepositoryWriter,
},
},
futures::StreamExt,
serde::{Deserialize, Serialize},
};
/// Distribution-level release files, relative to a distribution path.
///
/// These are copied in the final phase, after every index they describe, and
/// missing ones are tolerated rather than treated as errors.
const RELEASE_FILES: [&str; 4] = ["ChangeLog", "InRelease", "Release", "Release.gpg"];
/// Serializable description of a repository copy, consumed by
/// `RepositoryCopier::copy_from_config`.
///
/// Unknown fields are rejected when deserializing. Each optional setting that
/// is `None` leaves the corresponding `RepositoryCopier::default()` value in place.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct RepositoryCopierConfig {
    /// Location of the repository to read from (passed to `reader_from_str`).
    pub source_url: String,
    /// Location of the repository to write to (passed to `writer_from_str`).
    pub destination_url: String,
    /// Distribution names to copy; each is read from `dists/<name>`.
    #[serde(default)]
    pub distributions: Vec<String>,
    /// Distribution paths (relative to the repository root) to copy verbatim.
    #[serde(default)]
    pub distribution_paths: Vec<String>,
    /// Restrict all package copies to these components.
    pub only_components: Option<Vec<String>>,
    /// Whether to copy non-installer binary packages.
    pub binary_packages_copy: Option<bool>,
    /// Restrict non-installer binary packages to these architectures.
    pub binary_packages_only_architectures: Option<Vec<String>>,
    /// Whether to copy installer binary packages.
    pub installer_binary_packages_copy: Option<bool>,
    /// Restrict installer binary packages to these architectures.
    pub installer_binary_packages_only_architectures: Option<Vec<String>>,
    /// Whether to copy source packages.
    pub sources_copy: Option<bool>,
}
/// A single file copy to perform from the source repository to the destination.
struct GenericCopy {
    /// Path of the file in the source repository.
    source_path: String,
    /// Path to write the file to in the destination repository.
    dest_path: String,
    /// Expected `(size, digest)` of the content, when known. `None` is used for
    /// files whose content is not pinned by an index (e.g. the release files).
    expected_content: Option<(u64, ContentDigest)>,
}
/// Copies the contents of Debian repository distributions from a
/// [`RepositoryRootReader`] into a [`RepositoryWriter`].
///
/// Boolean flags select which kinds of content are copied. The `*_only_*`
/// filters, when `Some`, restrict copies to the listed components or
/// architectures; `None` means "no restriction".
pub struct RepositoryCopier {
    // Which kinds of content are copied.
    /// Copy non-installer binary packages.
    binary_packages_copy: bool,
    /// Copy installer binary packages.
    installer_binary_packages_copy: bool,
    /// Copy source packages.
    sources_copy: bool,
    /// Copy installers (the copy itself is not implemented yet).
    installers_copy: bool,

    // Filters narrowing what is copied.
    /// Component filter shared by binary, installer binary, and source packages.
    only_components: Option<Vec<String>>,
    /// Architecture filter for non-installer binary packages.
    binary_packages_only_arches: Option<Vec<String>>,
    /// Architecture filter for installer binary packages.
    installer_binary_packages_only_arches: Option<Vec<String>>,
    /// Architecture filter for installers. Nothing reads it yet because
    /// installer copying is unimplemented, hence the lint suppression.
    #[allow(unused)]
    installers_only_arches: Option<Vec<String>>,
}
/// The default copier copies binary packages, installer binary packages, and
/// source packages for every component and architecture. Installer copying
/// stays disabled.
impl Default for RepositoryCopier {
    fn default() -> Self {
        Self {
            // Package kinds copied out of the box.
            binary_packages_copy: true,
            installer_binary_packages_copy: true,
            sources_copy: true,
            // Installer copying is not implemented, so it must stay off.
            installers_copy: false,
            // No component or architecture filtering.
            only_components: None,
            binary_packages_only_arches: None,
            installer_binary_packages_only_arches: None,
            installers_only_arches: None,
        }
    }
}
impl RepositoryCopier {
    /// Restrict package copies to the given components.
    ///
    /// The filter applies to binary, installer binary, and source packages.
    /// Accepts any iterable of component names (an iterator or a collection
    /// such as `Vec<String>`).
    pub fn set_only_components(&mut self, components: impl IntoIterator<Item = String>) {
        self.only_components = Some(components.into_iter().collect());
    }

    /// Enable or disable copying of non-installer binary packages.
    pub fn set_binary_packages_copy(&mut self, value: bool) {
        self.binary_packages_copy = value;
    }

    /// Restrict non-installer binary package copies to the given architectures.
    pub fn set_binary_packages_only_arches(&mut self, value: impl IntoIterator<Item = String>) {
        self.binary_packages_only_arches = Some(value.into_iter().collect());
    }

    /// Enable or disable copying of installer binary packages (package index
    /// entries whose `is_installer` flag is set).
    pub fn set_installer_binary_packages_copy(&mut self, value: bool) {
        self.installer_binary_packages_copy = value;
    }

    /// Restrict installer binary package copies to the given architectures.
    pub fn set_installer_binary_packages_only_arches(
        &mut self,
        value: impl IntoIterator<Item = String>,
    ) {
        self.installer_binary_packages_only_arches = Some(value.into_iter().collect());
    }

    /// Enable or disable copying of source packages.
    pub fn set_sources_copy(&mut self, value: bool) {
        self.sources_copy = value;
    }

    /// Perform the copy described by `config`.
    ///
    /// Opens the source and destination repositories from the configured URLs
    /// and applies every `Some` setting on top of [`RepositoryCopier::default`].
    /// It then copies each entry of `distributions` (as `dists/<name>`),
    /// followed by each entry of `distribution_paths`, sequentially.
    ///
    /// # Errors
    ///
    /// Returns the first error from opening either repository or from any
    /// distribution copy; later distributions are not attempted.
    pub async fn copy_from_config(
        config: RepositoryCopierConfig,
        max_copy_operations: usize,
        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
    ) -> Result<()> {
        let root_reader = reader_from_str(config.source_url)?;
        let writer = writer_from_str(config.destination_url).await?;

        let mut copier = Self::default();
        if let Some(v) = config.only_components {
            copier.set_only_components(v);
        }
        if let Some(v) = config.binary_packages_copy {
            copier.set_binary_packages_copy(v);
        }
        if let Some(v) = config.binary_packages_only_architectures {
            copier.set_binary_packages_only_arches(v);
        }
        if let Some(v) = config.installer_binary_packages_copy {
            copier.set_installer_binary_packages_copy(v);
        }
        if let Some(v) = config.installer_binary_packages_only_architectures {
            copier.set_installer_binary_packages_only_arches(v);
        }
        if let Some(v) = config.sources_copy {
            copier.set_sources_copy(v);
        }

        for dist in config.distributions {
            copier
                .copy_distribution(
                    root_reader.as_ref(),
                    writer.as_ref(),
                    &dist,
                    max_copy_operations,
                    progress_cb,
                )
                .await?;
        }
        for path in config.distribution_paths {
            copier
                .copy_distribution_path(
                    root_reader.as_ref(),
                    writer.as_ref(),
                    &path,
                    max_copy_operations,
                    progress_cb,
                )
                .await?;
        }

        Ok(())
    }

    /// Copy the named distribution, located at `dists/<distribution>`.
    ///
    /// Equivalent to [`RepositoryCopier::copy_distribution_path`] with that path.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying path copy.
    pub async fn copy_distribution(
        &self,
        root_reader: &dyn RepositoryRootReader,
        writer: &dyn RepositoryWriter,
        distribution: &str,
        max_copy_operations: usize,
        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
    ) -> Result<()> {
        self.copy_distribution_path(
            root_reader,
            writer,
            &format!("dists/{}", distribution),
            max_copy_operations,
            progress_cb,
        )
        .await
    }

    /// Copy the distribution rooted at `distribution_path` (relative to the
    /// repository root).
    ///
    /// Phases run in this order, each bracketed by `CopyPhaseBegin` /
    /// `CopyPhaseEnd` progress events:
    ///
    /// 1. binary packages (if enabled)
    /// 2. installer binary packages (if enabled)
    /// 3. source packages (if enabled)
    /// 4. installers (if enabled)
    /// 5. release indices (always)
    /// 6. release files such as `InRelease` / `Release` (always)
    ///
    /// Release files go last, after everything they reference, presumably so
    /// the destination never advertises content it does not have yet.
    ///
    /// # Errors
    ///
    /// Returns the first error from opening the release or from any phase.
    /// The failing phase's `CopyPhaseEnd` event is not emitted.
    pub async fn copy_distribution_path(
        &self,
        root_reader: &dyn RepositoryRootReader,
        writer: &dyn RepositoryWriter,
        distribution_path: &str,
        max_copy_operations: usize,
        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
    ) -> Result<()> {
        let release = root_reader
            .release_reader_with_distribution_path(distribution_path)
            .await?;

        if self.binary_packages_copy {
            Self::emit_progress(
                progress_cb,
                PublishEvent::CopyPhaseBegin(CopyPhase::BinaryPackages),
            );
            self.copy_binary_packages(
                root_reader,
                writer,
                release.as_ref(),
                false,
                max_copy_operations,
                progress_cb,
            )
            .await?;
            Self::emit_progress(
                progress_cb,
                PublishEvent::CopyPhaseEnd(CopyPhase::BinaryPackages),
            );
        }

        if self.installer_binary_packages_copy {
            Self::emit_progress(
                progress_cb,
                PublishEvent::CopyPhaseBegin(CopyPhase::InstallerBinaryPackages),
            );
            self.copy_binary_packages(
                root_reader,
                writer,
                release.as_ref(),
                true,
                max_copy_operations,
                progress_cb,
            )
            .await?;
            Self::emit_progress(
                progress_cb,
                PublishEvent::CopyPhaseEnd(CopyPhase::InstallerBinaryPackages),
            );
        }

        if self.sources_copy {
            Self::emit_progress(progress_cb, PublishEvent::CopyPhaseBegin(CopyPhase::Sources));
            self.copy_source_packages(
                root_reader,
                writer,
                release.as_ref(),
                max_copy_operations,
                progress_cb,
            )
            .await?;
            Self::emit_progress(progress_cb, PublishEvent::CopyPhaseEnd(CopyPhase::Sources));
        }

        if self.installers_copy {
            Self::emit_progress(
                progress_cb,
                PublishEvent::CopyPhaseBegin(CopyPhase::Installers),
            );
            self.copy_installers(
                root_reader,
                writer,
                release.as_ref(),
                max_copy_operations,
                progress_cb,
            )
            .await?;
            Self::emit_progress(progress_cb, PublishEvent::CopyPhaseEnd(CopyPhase::Installers));
        }

        Self::emit_progress(
            progress_cb,
            PublishEvent::CopyPhaseBegin(CopyPhase::ReleaseIndices),
        );
        self.copy_release_indices(
            root_reader,
            writer,
            release.as_ref(),
            max_copy_operations,
            progress_cb,
        )
        .await?;
        Self::emit_progress(
            progress_cb,
            PublishEvent::CopyPhaseEnd(CopyPhase::ReleaseIndices),
        );

        Self::emit_progress(
            progress_cb,
            PublishEvent::CopyPhaseBegin(CopyPhase::ReleaseFiles),
        );
        self.copy_release_files(
            root_reader,
            writer,
            distribution_path,
            max_copy_operations,
            progress_cb,
        )
        .await?;
        Self::emit_progress(
            progress_cb,
            PublishEvent::CopyPhaseEnd(CopyPhase::ReleaseFiles),
        );

        Ok(())
    }

    /// Invoke the progress callback with `event`, if a callback was supplied.
    fn emit_progress(progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>, event: PublishEvent) {
        if let Some(cb) = progress_cb {
            cb(event);
        }
    }

    /// Copy the binary package files referenced by `release`'s package indices.
    ///
    /// When `installer_packages` is true, only entries flagged `is_installer`
    /// are considered, filtered by the installer architecture list. Otherwise
    /// only non-installer entries are considered, filtered by the binary
    /// architecture list. The component filter applies in both cases.
    ///
    /// # Errors
    ///
    /// Returns any error from resolving the fetch list or from copying. Missing
    /// package files are errors, not skipped.
    async fn copy_binary_packages(
        &self,
        root_reader: &dyn RepositoryRootReader,
        writer: &dyn RepositoryWriter,
        release: &dyn ReleaseReader,
        installer_packages: bool,
        max_copy_operations: usize,
        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
    ) -> Result<()> {
        let only_arches = if installer_packages {
            self.installer_binary_packages_only_arches.clone()
        } else {
            self.binary_packages_only_arches.clone()
        };
        let only_components = self.only_components.clone();

        let copies = release
            .resolve_package_fetches(
                Box::new(move |entry| {
                    let component_allowed = if let Some(only_components) = &only_components {
                        only_components.contains(&entry.component.to_string())
                    } else {
                        true
                    };
                    let arch_allowed = if let Some(only_arches) = &only_arches {
                        only_arches.contains(&entry.architecture.to_string())
                    } else {
                        true
                    };
                    component_allowed && arch_allowed && entry.is_installer == installer_packages
                }),
                // The second resolver filter accepts every candidate.
                Box::new(move |_| true),
                max_copy_operations,
            )
            .await?
            .into_iter()
            .map(|bpf| GenericCopy {
                source_path: bpf.path.clone(),
                dest_path: bpf.path,
                expected_content: Some((bpf.size, bpf.digest)),
            })
            .collect::<Vec<_>>();

        perform_copies(
            root_reader,
            writer,
            copies,
            max_copy_operations,
            false,
            progress_cb,
        )
        .await?;

        Ok(())
    }

    /// Copy the source package files referenced by `release`'s source indices,
    /// honoring the component filter.
    ///
    /// # Errors
    ///
    /// Returns any error from resolving the fetch list or from copying. Missing
    /// source files are errors, not skipped.
    async fn copy_source_packages(
        &self,
        root_reader: &dyn RepositoryRootReader,
        writer: &dyn RepositoryWriter,
        release: &dyn ReleaseReader,
        max_copy_operations: usize,
        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
    ) -> Result<()> {
        let only_components = self.only_components.clone();

        let copies = release
            .resolve_source_fetches(
                Box::new(move |entry| {
                    if let Some(only_components) = &only_components {
                        only_components.contains(&entry.component.to_string())
                    } else {
                        true
                    }
                }),
                // The second resolver filter accepts every candidate.
                Box::new(move |_| true),
                max_copy_operations,
            )
            .await?
            .into_iter()
            // Fields are cloned rather than moved: the fetch entry may expose
            // them through a borrow (e.g. `Deref`), so moving is not assumed.
            .map(|spf| GenericCopy {
                source_path: spf.path.clone(),
                dest_path: spf.path.clone(),
                expected_content: Some((spf.size, spf.digest.clone())),
            })
            .collect::<Vec<_>>();

        perform_copies(
            root_reader,
            writer,
            copies,
            max_copy_operations,
            false,
            progress_cb,
        )
        .await?;

        Ok(())
    }

    /// Copy installer artifacts.
    ///
    /// # Panics
    ///
    /// Not implemented: panics via `todo!()` if reached. Nothing in this impl
    /// enables `installers_copy`, so `copy_distribution_path` does not call it.
    async fn copy_installers(
        &self,
        _root_reader: &dyn RepositoryRootReader,
        _writer: &dyn RepositoryWriter,
        _release: &dyn ReleaseReader,
        _max_copy_operations: usize,
        _progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
    ) -> Result<()> {
        todo!();
    }

    /// Copy every index file listed in `release`.
    ///
    /// When the release file advertises `Acquire-By-Hash`, each index is
    /// copied via its by-hash path instead of its canonical path. Paths are
    /// prefixed with the release's root-relative path. Index files missing
    /// from the source are reported through the progress callback and skipped
    /// rather than failing the copy.
    ///
    /// # Errors
    ///
    /// Returns any error from listing the indices or from copying, other than
    /// "not found".
    async fn copy_release_indices(
        &self,
        root_reader: &dyn RepositoryRootReader,
        writer: &dyn RepositoryWriter,
        release: &dyn ReleaseReader,
        max_copy_operations: usize,
        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
    ) -> Result<()> {
        let by_hash = release.release_file().acquire_by_hash().unwrap_or(false);
        // Loop-invariant: every index lives under the same release directory.
        let root_relative_path = release.root_relative_path();

        let copies = release
            .classified_indices_entries()?
            .into_iter()
            .map(|entry| {
                let path = if by_hash {
                    entry.by_hash_path()
                } else {
                    entry.path.to_string()
                };
                let path = format!("{}/{}", root_relative_path, path);

                GenericCopy {
                    source_path: path.clone(),
                    dest_path: path,
                    expected_content: Some((entry.size, entry.digest.clone())),
                }
            })
            .collect::<Vec<_>>();

        perform_copies(
            root_reader,
            writer,
            copies,
            max_copy_operations,
            true,
            progress_cb,
        )
        .await?;

        Ok(())
    }

    /// Copy the distribution-level release files (`RELEASE_FILES`) found
    /// directly under `distribution_path`.
    ///
    /// Their content is not known in advance, so no size/digest is expected.
    /// Files absent from the source are skipped.
    ///
    /// # Errors
    ///
    /// Returns any copy error other than "not found".
    async fn copy_release_files(
        &self,
        root_reader: &dyn RepositoryRootReader,
        writer: &dyn RepositoryWriter,
        distribution_path: &str,
        max_copy_operations: usize,
        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
    ) -> Result<()> {
        let copies = RELEASE_FILES
            .iter()
            .map(|path| {
                let path = format!("{}/{}", distribution_path, path);

                GenericCopy {
                    source_path: path.clone(),
                    dest_path: path,
                    expected_content: None,
                }
            })
            .collect::<Vec<_>>();

        perform_copies(
            root_reader,
            writer,
            copies,
            max_copy_operations,
            true,
            progress_cb,
        )
        .await?;

        Ok(())
    }
}
/// Execute a batch of copies from `root_reader` into `writer`.
///
/// At most `max_copy_operations` copies are in flight at once, and results are
/// consumed in completion order. Progress events, when a callback is given:
///
/// - `WriteSequenceBeginWithTotalBytes`: the sum of all known expected sizes.
/// - For each completed copy, `WriteSequenceProgressBytes` followed by
///   `PathCopied` or `PathCopyNoop`.
/// - For each tolerated missing source, `CopyIndicesPathNotFound`.
/// - `WriteSequenceFinished`, once every copy has completed.
///
/// # Errors
///
/// When `allow_not_found` is true, a `RepositoryIoPath` error whose I/O kind is
/// `NotFound` is reported and skipped. Any other error is returned immediately,
/// and `WriteSequenceFinished` is then not emitted.
async fn perform_copies(
    root_reader: &dyn RepositoryRootReader,
    writer: &dyn RepositoryWriter,
    copies: Vec<GenericCopy>,
    max_copy_operations: usize,
    allow_not_found: bool,
    progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
) -> Result<()> {
    // Only copies with a known expected size contribute to the announced total.
    let total_size: u64 = copies
        .iter()
        .filter_map(|op| op.expected_content.as_ref().map(|(size, _)| *size))
        .sum();

    let pending = copies
        .into_iter()
        .map(|op| {
            writer.copy_from(
                root_reader,
                op.source_path.into(),
                op.expected_content,
                op.dest_path.into(),
                progress_cb,
            )
        })
        .collect::<Vec<_>>();

    if let Some(cb) = progress_cb {
        cb(PublishEvent::WriteSequenceBeginWithTotalBytes(total_size));
    }

    let mut results = futures::stream::iter(pending).buffer_unordered(max_copy_operations);

    while let Some(outcome) = results.next().await {
        let completed = match outcome {
            Ok(completed) => completed,
            Err(DebianError::RepositoryIoPath(path, err))
                if allow_not_found && matches!(err.kind(), std::io::ErrorKind::NotFound) =>
            {
                if let Some(cb) = progress_cb {
                    cb(PublishEvent::CopyIndicesPathNotFound(path));
                }
                continue;
            }
            Err(e) => return Err(e),
        };

        if let Some(cb) = progress_cb {
            cb(PublishEvent::WriteSequenceProgressBytes(
                completed.bytes_written(),
            ));

            let event = match completed {
                RepositoryWriteOperation::PathWritten(written) => {
                    PublishEvent::PathCopied(written.path.to_string(), written.bytes_written)
                }
                RepositoryWriteOperation::Noop(path, _) => {
                    PublishEvent::PathCopyNoop(path.to_string())
                }
            };
            cb(event);
        }
    }

    if let Some(cb) = progress_cb {
        cb(PublishEvent::WriteSequenceFinished);
    }

    Ok(())
}
// Tests for `RepositoryCopier`. The only test talks to snapshot.debian.org over
// HTTP, so it is compiled only with the `http` feature and needs network access.
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::repository::{
            proxy_writer::{ProxyVerifyBehavior, ProxyWriter},
            sink_writer::SinkWriter,
        },
    };
    #[cfg(feature = "http")]
    use crate::repository::http::HttpRepositoryClient;
    // A timestamped snapshot URL, so the test sees a fixed repository state.
    const DEBIAN_URL: &str = "http://snapshot.debian.org/archive/debian/20211120T085721Z";
    // Copies `bullseye` with binary, installer binary, and source package copies
    // disabled. Installer copying is off by default, so only the release-index and
    // release-file phases run.
    #[tokio::test]
    #[cfg(feature = "http")]
    async fn bullseye_copy() -> Result<()> {
        let root =
            Box::new(HttpRepositoryClient::new(DEBIAN_URL)?) as Box<dyn RepositoryRootReader>;
        // NOTE(review): presumably SinkWriter discards written data, and this verify
        // behavior makes the proxy treat every destination path as already present and
        // integrity-verified, so little or nothing is downloaded. Confirm against
        // proxy_writer / sink_writer.
        let mut writer = ProxyWriter::new(SinkWriter::default());
        writer.set_verify_behavior(ProxyVerifyBehavior::AlwaysExistsIntegrityVerified);
        let writer: Box<dyn RepositoryWriter> = Box::new(writer);
        let mut copier = RepositoryCopier::default();
        copier.set_binary_packages_copy(false);
        copier.set_installer_binary_packages_copy(false);
        copier.set_sources_copy(false);
        // No-op progress callback: exercises the callback plumbing without output.
        let cb = Box::new(|_| {});
        copier
            .copy_distribution(root.as_ref(), writer.as_ref(), "bullseye", 8, &Some(cb))
            .await?;
        Ok(())
    }
}