1use {
8 crate::{
9 error::{DebianError, Result},
10 io::ContentDigest,
11 repository::{
12 reader_from_str, writer_from_str, CopyPhase, PublishEvent, ReleaseReader,
13 RepositoryRootReader, RepositoryWriteOperation, RepositoryWriter,
14 },
15 },
16 futures::StreamExt,
17 serde::{Deserialize, Serialize},
18};
19
/// File names, relative to a distribution's directory, that the
/// release-files copy phase transfers.
///
/// These are copied without a recorded size or digest, and a file that does
/// not exist at the source is reported to the progress callback instead of
/// failing the copy.
const RELEASE_FILES: &[&str; 4] = &[
    "ChangeLog",
    "InRelease",
    "Release",
    "Release.gpg",
];
22
23#[derive(Clone, Debug, Deserialize, Serialize)]
25#[serde(deny_unknown_fields)]
26pub struct RepositoryCopierConfig {
27 pub source_url: String,
29
30 pub destination_url: String,
32
33 #[serde(default)]
35 pub distributions: Vec<String>,
36
37 #[serde(default)]
39 pub distribution_paths: Vec<String>,
40
41 pub only_components: Option<Vec<String>>,
45
46 pub binary_packages_copy: Option<bool>,
48
49 pub binary_packages_only_architectures: Option<Vec<String>>,
54
55 pub installer_binary_packages_copy: Option<bool>,
57
58 pub installer_binary_packages_only_architectures: Option<Vec<String>>,
63
64 pub sources_copy: Option<bool>,
66}
67
68struct GenericCopy {
69 source_path: String,
70 dest_path: String,
71 expected_content: Option<(u64, ContentDigest)>,
72}
73
/// Settings controlling which parts of a repository distribution are copied.
///
/// All fields are plain flags and optional name lists, so the type is cheap
/// to clone and can be printed for diagnostics.
#[derive(Clone, Debug)]
pub struct RepositoryCopier {
    /// When `Some`, only entries belonging to one of these components are
    /// copied for binary packages, installer binary packages and sources.
    only_components: Option<Vec<String>>,

    /// Whether binary packages are copied.
    binary_packages_copy: bool,
    /// When `Some`, only binary packages for these architectures are copied.
    binary_packages_only_arches: Option<Vec<String>>,

    /// Whether installer binary packages are copied.
    installer_binary_packages_copy: bool,
    /// When `Some`, only installer binary packages for these architectures
    /// are copied.
    installer_binary_packages_only_arches: Option<Vec<String>>,

    /// Whether sources are copied.
    sources_copy: bool,

    /// Whether installers are copied.
    installers_copy: bool,
    /// Architecture filter reserved for installer copying.
    // Nothing reads this field yet, hence the lint suppression.
    #[allow(unused)]
    installers_only_arches: Option<Vec<String>>,
}
115
116impl Default for RepositoryCopier {
117 fn default() -> Self {
118 Self {
119 only_components: None,
120 binary_packages_copy: true,
121 binary_packages_only_arches: None,
122 installer_binary_packages_copy: true,
123 installer_binary_packages_only_arches: None,
124 sources_copy: true,
125 installers_copy: false,
127 installers_only_arches: None,
128 }
129 }
130}
131
132impl RepositoryCopier {
133 pub fn set_only_components(&mut self, components: impl Iterator<Item = String>) {
135 self.only_components = Some(components.collect());
136 }
137
138 pub fn set_binary_packages_copy(&mut self, value: bool) {
140 self.binary_packages_copy = value;
141 }
142
143 pub fn set_binary_packages_only_arches(&mut self, value: impl Iterator<Item = String>) {
147 self.binary_packages_only_arches = Some(value.collect::<Vec<_>>());
148 }
149
150 pub fn set_installer_binary_packages_copy(&mut self, value: bool) {
152 self.installer_binary_packages_copy = value;
153 }
154
155 pub fn set_installer_binary_packages_only_arches(
159 &mut self,
160 value: impl Iterator<Item = String>,
161 ) {
162 self.installer_binary_packages_only_arches = Some(value.collect::<Vec<_>>());
163 }
164
165 pub fn set_sources_copy(&mut self, value: bool) {
167 self.sources_copy = value;
168 }
169
170 pub async fn copy_from_config(
172 config: RepositoryCopierConfig,
173 max_copy_operations: usize,
174 progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
175 ) -> Result<()> {
176 let root_reader = reader_from_str(config.source_url)?;
177 let writer = writer_from_str(config.destination_url).await?;
178
179 let mut copier = Self::default();
180
181 if let Some(v) = config.only_components {
182 copier.set_only_components(v.into_iter());
183 }
184 if let Some(v) = config.binary_packages_copy {
185 copier.set_binary_packages_copy(v);
186 }
187 if let Some(v) = config.binary_packages_only_architectures {
188 copier.set_binary_packages_only_arches(v.into_iter());
189 }
190 if let Some(v) = config.installer_binary_packages_copy {
191 copier.set_installer_binary_packages_copy(v);
192 }
193 if let Some(v) = config.installer_binary_packages_only_architectures {
194 copier.set_installer_binary_packages_only_arches(v.into_iter());
195 }
196 if let Some(v) = config.sources_copy {
197 copier.set_sources_copy(v);
198 }
199
200 for dist in config.distributions {
201 copier
202 .copy_distribution(
203 root_reader.as_ref(),
204 writer.as_ref(),
205 &dist,
206 max_copy_operations,
207 progress_cb,
208 )
209 .await?;
210 }
211 for path in config.distribution_paths {
212 copier
213 .copy_distribution_path(
214 root_reader.as_ref(),
215 writer.as_ref(),
216 &path,
217 max_copy_operations,
218 progress_cb,
219 )
220 .await?;
221 }
222
223 Ok(())
224 }
225
226 pub async fn copy_distribution(
232 &self,
233 root_reader: &dyn RepositoryRootReader,
234 writer: &dyn RepositoryWriter,
235 distribution: &str,
236 max_copy_operations: usize,
237 progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
238 ) -> Result<()> {
239 self.copy_distribution_path(
240 root_reader,
241 writer,
242 &format!("dists/{}", distribution),
243 max_copy_operations,
244 progress_cb,
245 )
246 .await
247 }
248
249 pub async fn copy_distribution_path(
254 &self,
255 root_reader: &dyn RepositoryRootReader,
256 writer: &dyn RepositoryWriter,
257 distribution_path: &str,
258 max_copy_operations: usize,
259 progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
260 ) -> Result<()> {
261 let release = root_reader
262 .release_reader_with_distribution_path(distribution_path)
263 .await?;
264
265 if self.binary_packages_copy {
269 if let Some(cb) = progress_cb {
270 cb(PublishEvent::CopyPhaseBegin(CopyPhase::BinaryPackages));
271 }
272 self.copy_binary_packages(
273 root_reader,
274 writer,
275 release.as_ref(),
276 false,
277 max_copy_operations,
278 progress_cb,
279 )
280 .await?;
281 if let Some(cb) = progress_cb {
282 cb(PublishEvent::CopyPhaseEnd(CopyPhase::BinaryPackages));
283 }
284 }
285
286 if self.installer_binary_packages_copy {
287 if let Some(cb) = progress_cb {
288 cb(PublishEvent::CopyPhaseBegin(
289 CopyPhase::InstallerBinaryPackages,
290 ));
291 }
292 self.copy_binary_packages(
293 root_reader,
294 writer,
295 release.as_ref(),
296 true,
297 max_copy_operations,
298 progress_cb,
299 )
300 .await?;
301 if let Some(cb) = progress_cb {
302 cb(PublishEvent::CopyPhaseEnd(
303 CopyPhase::InstallerBinaryPackages,
304 ));
305 }
306 }
307
308 if self.sources_copy {
309 if let Some(cb) = progress_cb {
310 cb(PublishEvent::CopyPhaseBegin(CopyPhase::Sources));
311 }
312 self.copy_source_packages(
313 root_reader,
314 writer,
315 release.as_ref(),
316 max_copy_operations,
317 progress_cb,
318 )
319 .await?;
320 if let Some(cb) = progress_cb {
321 cb(PublishEvent::CopyPhaseEnd(CopyPhase::Sources));
322 }
323 }
324
325 if self.installers_copy {
326 if let Some(cb) = progress_cb {
327 cb(PublishEvent::CopyPhaseBegin(CopyPhase::Installers));
328 }
329 self.copy_installers(
330 root_reader,
331 writer,
332 release.as_ref(),
333 max_copy_operations,
334 progress_cb,
335 )
336 .await?;
337 if let Some(cb) = progress_cb {
338 cb(PublishEvent::CopyPhaseEnd(CopyPhase::Installers));
339 }
340 }
341
342 if let Some(cb) = progress_cb {
345 cb(PublishEvent::CopyPhaseBegin(CopyPhase::ReleaseIndices));
346 }
347 self.copy_release_indices(
348 root_reader,
349 writer,
350 release.as_ref(),
351 max_copy_operations,
352 progress_cb,
353 )
354 .await?;
355 if let Some(cb) = progress_cb {
356 cb(PublishEvent::CopyPhaseEnd(CopyPhase::ReleaseIndices));
357 }
358
359 if let Some(cb) = progress_cb {
361 cb(PublishEvent::CopyPhaseBegin(CopyPhase::ReleaseFiles));
362 }
363 self.copy_release_files(
364 root_reader,
365 writer,
366 distribution_path,
367 max_copy_operations,
368 progress_cb,
369 )
370 .await?;
371 if let Some(cb) = progress_cb {
372 cb(PublishEvent::CopyPhaseEnd(CopyPhase::ReleaseFiles));
373 }
374
375 Ok(())
376 }
377
378 async fn copy_binary_packages(
379 &self,
380 root_reader: &dyn RepositoryRootReader,
381 writer: &dyn RepositoryWriter,
382 release: &dyn ReleaseReader,
383 installer_packages: bool,
384 max_copy_operations: usize,
385 progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
386 ) -> Result<()> {
387 let only_arches = if installer_packages {
388 self.installer_binary_packages_only_arches.clone()
389 } else {
390 self.binary_packages_only_arches.clone()
391 };
392 let only_components = self.only_components.clone();
393
394 let copies = release
395 .resolve_package_fetches(
396 Box::new(move |entry| {
397 let component_allowed = if let Some(only_components) = &only_components {
398 only_components.contains(&entry.component.to_string())
399 } else {
400 true
401 };
402
403 let arch_allowed = if let Some(only_arches) = &only_arches {
404 only_arches.contains(&entry.architecture.to_string())
405 } else {
406 true
407 };
408
409 component_allowed && arch_allowed && entry.is_installer == installer_packages
410 }),
411 Box::new(move |_| true),
412 max_copy_operations,
413 )
414 .await?
415 .into_iter()
416 .map(|bpf| GenericCopy {
417 source_path: bpf.path.clone(),
418 dest_path: bpf.path,
419 expected_content: Some((bpf.size, bpf.digest)),
420 })
421 .collect::<Vec<_>>();
422
423 perform_copies(
424 root_reader,
425 writer,
426 copies,
427 max_copy_operations,
428 false,
429 progress_cb,
430 )
431 .await?;
432
433 Ok(())
434 }
435
436 async fn copy_source_packages(
437 &self,
438 root_reader: &dyn RepositoryRootReader,
439 writer: &dyn RepositoryWriter,
440 release: &dyn ReleaseReader,
441 max_copy_operations: usize,
442 progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
443 ) -> Result<()> {
444 let only_components = self.only_components.clone();
445
446 let copies = release
447 .resolve_source_fetches(
448 Box::new(move |entry| {
449 if let Some(only_components) = &only_components {
450 only_components.contains(&entry.component.to_string())
451 } else {
452 true
453 }
454 }),
455 Box::new(move |_| true),
456 max_copy_operations,
457 )
458 .await?
459 .into_iter()
460 .map(|spf| GenericCopy {
461 source_path: spf.path.clone(),
462 dest_path: spf.path.clone(),
463 expected_content: Some((spf.size, spf.digest.clone())),
464 })
465 .collect::<Vec<_>>();
466
467 perform_copies(
468 root_reader,
469 writer,
470 copies,
471 max_copy_operations,
472 false,
473 progress_cb,
474 )
475 .await?;
476
477 Ok(())
478 }
479
480 async fn copy_installers(
481 &self,
482 _root_reader: &dyn RepositoryRootReader,
483 _writer: &dyn RepositoryWriter,
484 _release: &dyn ReleaseReader,
485 _max_copy_operations: usize,
486 _progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
487 ) -> Result<()> {
488 todo!();
491 }
492
493 async fn copy_release_indices(
494 &self,
495 root_reader: &dyn RepositoryRootReader,
496 writer: &dyn RepositoryWriter,
497 release: &dyn ReleaseReader,
498 max_copy_operations: usize,
499 progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
500 ) -> Result<()> {
501 let by_hash = release.release_file().acquire_by_hash().unwrap_or(false);
502
503 let copies = release
504 .classified_indices_entries()?
505 .into_iter()
506 .filter(|_| {
507 true
510 })
511 .map(move |entry| {
512 let path = if by_hash {
513 entry.by_hash_path()
514 } else {
515 entry.path.to_string()
516 };
517
518 let path = format!("{}/{}", release.root_relative_path(), path);
519
520 GenericCopy {
521 source_path: path.clone(),
522 dest_path: path,
523 expected_content: Some((entry.size, entry.digest.clone())),
524 }
525 })
526 .collect::<Vec<_>>();
527
528 perform_copies(
533 root_reader,
534 writer,
535 copies,
536 max_copy_operations,
537 true,
538 progress_cb,
539 )
540 .await?;
541
542 Ok(())
543 }
544
545 async fn copy_release_files(
546 &self,
547 root_reader: &dyn RepositoryRootReader,
548 writer: &dyn RepositoryWriter,
549 distribution_path: &str,
550 max_copy_operations: usize,
551 progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
552 ) -> Result<()> {
553 let copies = RELEASE_FILES
554 .iter()
555 .map(|path| {
556 let path = format!("{}/{}", distribution_path, path);
557
558 GenericCopy {
559 source_path: path.clone(),
560 dest_path: path,
561 expected_content: None,
562 }
563 })
564 .collect::<Vec<_>>();
565
566 perform_copies(
569 root_reader,
570 writer,
571 copies,
572 max_copy_operations,
573 true,
574 progress_cb,
575 )
576 .await?;
577
578 Ok(())
579 }
580}
581
582async fn perform_copies(
584 root_reader: &dyn RepositoryRootReader,
585 writer: &dyn RepositoryWriter,
586 copies: Vec<GenericCopy>,
587 max_copy_operations: usize,
588 allow_not_found: bool,
589 progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
590) -> Result<()> {
591 let mut total_size = 0;
592
593 let fs = copies
594 .into_iter()
595 .map(|op| {
596 if let Some((size, _)) = op.expected_content {
597 total_size += size;
598 }
599
600 writer.copy_from(
601 root_reader,
602 op.source_path.into(),
603 op.expected_content,
604 op.dest_path.into(),
605 progress_cb,
606 )
607 })
608 .collect::<Vec<_>>();
609
610 if let Some(cb) = progress_cb {
611 cb(PublishEvent::WriteSequenceBeginWithTotalBytes(total_size));
612 }
613
614 let mut buffered = futures::stream::iter(fs).buffer_unordered(max_copy_operations);
615
616 while let Some(res) = buffered.next().await {
617 match res {
618 Ok(write) => {
619 if let Some(cb) = progress_cb {
620 cb(PublishEvent::WriteSequenceProgressBytes(
621 write.bytes_written(),
622 ));
623
624 match write {
625 RepositoryWriteOperation::PathWritten(write) => {
626 cb(PublishEvent::PathCopied(
627 write.path.to_string(),
628 write.bytes_written,
629 ));
630 }
631 RepositoryWriteOperation::Noop(path, _) => {
632 cb(PublishEvent::PathCopyNoop(path.to_string()));
633 }
634 }
635 }
636 }
637 Err(DebianError::RepositoryIoPath(path, err))
638 if allow_not_found && matches!(err.kind(), std::io::ErrorKind::NotFound) =>
639 {
640 if let Some(cb) = progress_cb {
641 cb(PublishEvent::CopyIndicesPathNotFound(path));
642 }
643 }
644 Err(e) => return Err(e),
645 }
646 }
647
648 if let Some(cb) = progress_cb {
649 cb(PublishEvent::WriteSequenceFinished);
650 }
651
652 Ok(())
653}
654
#[cfg(test)]
mod test {
    // Everything below is used only by the `http`-gated test, so it is gated
    // on the same feature to avoid unused-item warnings when `http` is off.
    #[cfg(feature = "http")]
    use {
        super::*,
        crate::repository::{
            http::HttpRepositoryClient,
            proxy_writer::{ProxyVerifyBehavior, ProxyWriter},
            sink_writer::SinkWriter,
        },
    };

    /// Base URL of the repository snapshot read by `bullseye_copy`.
    #[cfg(feature = "http")]
    const DEBIAN_URL: &str = "http://snapshot.debian.org/archive/debian/20211120T085721Z";

    /// Copies the `bullseye` distribution with binary packages, installer
    /// binary packages and sources all disabled, so only the release indices
    /// and release files phases run.
    ///
    /// NOTE(review): this presumably performs real HTTP requests against
    /// `DEBIAN_URL` and so needs network access. Confirm before running in a
    /// sandboxed CI environment.
    #[tokio::test]
    #[cfg(feature = "http")]
    async fn bullseye_copy() -> Result<()> {
        let root =
            Box::new(HttpRepositoryClient::new(DEBIAN_URL)?) as Box<dyn RepositoryRootReader>;
        let mut writer = ProxyWriter::new(SinkWriter::default());
        writer.set_verify_behavior(ProxyVerifyBehavior::AlwaysExistsIntegrityVerified);
        let writer: Box<dyn RepositoryWriter> = Box::new(writer);

        let mut copier = RepositoryCopier::default();
        copier.set_binary_packages_copy(false);
        copier.set_installer_binary_packages_copy(false);
        copier.set_sources_copy(false);

        // Progress events are accepted and discarded.
        let cb = Box::new(|_| {});

        copier
            .copy_distribution(root.as_ref(), writer.as_ref(), "bullseye", 8, &Some(cb))
            .await?;

        Ok(())
    }
}
691}