debian_packaging/repository/
copier.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5/*! Debian repository copying. */
6
7use {
8    crate::{
9        error::{DebianError, Result},
10        io::ContentDigest,
11        repository::{
12            reader_from_str, writer_from_str, CopyPhase, PublishEvent, ReleaseReader,
13            RepositoryRootReader, RepositoryWriteOperation, RepositoryWriter,
14        },
15    },
16    futures::StreamExt,
17    serde::{Deserialize, Serialize},
18};
19
/// Well-known files at the root of distribution/release directories.
///
/// Each name is joined onto the distribution path when release files are copied.
/// Not every repository publishes all of them, so a missing file is tolerated
/// during that copy.
const RELEASE_FILES: &[&str; 4] = &["ChangeLog", "InRelease", "Release", "Release.gpg"];
22
/// A configuration for initializing a [RepositoryCopier].
///
/// Consumed by [RepositoryCopier::copy_from_config()]. Unknown fields are rejected
/// when deserializing.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct RepositoryCopierConfig {
    /// The URL or path of the source repository to copy from.
    pub source_url: String,

    /// The URL or path of the destination repository to copy to.
    pub destination_url: String,

    /// Names of distributions to copy.
    ///
    /// Each name is resolved to the `dists/{name}` path. Defaults to an empty list.
    #[serde(default)]
    pub distributions: Vec<String>,

    /// Repository root relative paths of distributions to copy.
    ///
    /// These are copied after everything listed in `distributions`. Defaults to an
    /// empty list.
    #[serde(default)]
    pub distribution_paths: Vec<String>,

    /// Filter of components to copy.
    ///
    /// If not defined, all components will be copied.
    pub only_components: Option<Vec<String>>,

    /// Whether to copy binary packages.
    ///
    /// If not defined, the [RepositoryCopier] default (copy) applies.
    pub binary_packages_copy: Option<bool>,

    /// Filter of architectures of binary packages to copy.
    ///
    /// If not defined, all architectures will be copied if binary packages are
    /// being copied.
    pub binary_packages_only_architectures: Option<Vec<String>>,

    /// Whether to copy installer binary packages.
    ///
    /// If not defined, the [RepositoryCopier] default (copy) applies.
    pub installer_binary_packages_copy: Option<bool>,

    /// Filter of architectures of installer binary packages to copy.
    ///
    /// If not defined, all architectures will be copied if installer binary packages
    /// are being copied.
    pub installer_binary_packages_only_architectures: Option<Vec<String>>,

    /// Whether to copy source packages.
    ///
    /// If not defined, the [RepositoryCopier] default (copy) applies.
    pub sources_copy: Option<bool>,
}
67
/// A single pending file copy from the source reader to the destination writer.
struct GenericCopy {
    /// Path handed to the writer as the location to read from in the source repository.
    source_path: String,
    /// Path handed to the writer as the location to write to in the destination.
    ///
    /// Every producer in this file sets it equal to `source_path`.
    dest_path: String,
    /// Expected `(size, digest)` of the content, forwarded to the writer.
    ///
    /// `None` when nothing is known up front; such copies do not contribute to the
    /// total byte count announced before a copy sequence starts.
    expected_content: Option<(u64, ContentDigest)>,
}
73
/// Entity for copying Debian repository content.
///
/// Instances of this type can be used to copy select Debian repository content
/// between a reader and a writer.
///
/// The file layout and content is preserved, so existing PGP signatures can be preserved.
/// However, the copier does have the ability to selectively filter which files are copied.
/// So the destination repository may reference content that doesn't exist in that location.
///
/// Because repositories do not have a standardized mechanism for discovering dists/releases
/// within, this type must be told which distributions to copy. Copying is performed 1
/// distribution at a time.
///
/// By default, instances copy all copyable content. Installer files are currently not
/// supported. Incomplete copies for all other files is considered a bug and should be
/// reported.
///
/// Various `set_*` methods exist to control the copying behavior.
///
/// The type is `Clone` and `Debug` so a configured copier can be duplicated and
/// inspected in logs or test failures.
#[derive(Clone, Debug)]
pub struct RepositoryCopier {
    /// Filter of components that should be copied.
    ///
    /// `None` means every component is copied.
    only_components: Option<Vec<String>>,

    /// Whether to copy non-installer binary packages.
    binary_packages_copy: bool,
    /// Filter of architectures of binary packages to copy.
    ///
    /// `None` means every architecture is copied.
    binary_packages_only_arches: Option<Vec<String>>,

    /// Whether to copy installer binary packages.
    installer_binary_packages_copy: bool,
    /// Filter of architectures of installer binary packages to copy.
    ///
    /// `None` means every architecture is copied.
    installer_binary_packages_only_arches: Option<Vec<String>>,

    /// Whether to copy source packages.
    sources_copy: bool,

    /// Whether to copy installers files.
    installers_copy: bool,
    /// Filter of architectures of installers to copy.
    // Nothing reads this yet because installer copying is not implemented; the
    // lint is silenced until it is.
    #[allow(unused)]
    installers_only_arches: Option<Vec<String>>,
}
115
116impl Default for RepositoryCopier {
117    fn default() -> Self {
118        Self {
119            only_components: None,
120            binary_packages_copy: true,
121            binary_packages_only_arches: None,
122            installer_binary_packages_copy: true,
123            installer_binary_packages_only_arches: None,
124            sources_copy: true,
125            // TODO enable once implemented
126            installers_copy: false,
127            installers_only_arches: None,
128        }
129    }
130}
131
132impl RepositoryCopier {
133    /// Set an explicit list of components whose files to copy.
134    pub fn set_only_components(&mut self, components: impl Iterator<Item = String>) {
135        self.only_components = Some(components.collect());
136    }
137
138    /// Set whether to copy non-installer binary packages.
139    pub fn set_binary_packages_copy(&mut self, value: bool) {
140        self.binary_packages_copy = value;
141    }
142
143    /// Set a filter for architectures of non-installer binary packages to copy.
144    ///
145    /// Binary packages for architectures not in this set will be ignored.
146    pub fn set_binary_packages_only_arches(&mut self, value: impl Iterator<Item = String>) {
147        self.binary_packages_only_arches = Some(value.collect::<Vec<_>>());
148    }
149
150    /// Set whether to copy installer binary packages.
151    pub fn set_installer_binary_packages_copy(&mut self, value: bool) {
152        self.installer_binary_packages_copy = value;
153    }
154
155    /// Set a filter for architectures of installer binary packages to copy.
156    ///
157    /// Binary packages for architectures not in this set will be ignored.
158    pub fn set_installer_binary_packages_only_arches(
159        &mut self,
160        value: impl Iterator<Item = String>,
161    ) {
162        self.installer_binary_packages_only_arches = Some(value.collect::<Vec<_>>());
163    }
164
165    /// Set whether to copy sources package files.
166    pub fn set_sources_copy(&mut self, value: bool) {
167        self.sources_copy = value;
168    }
169
170    /// Perform a copy operation as defined by a [RepositoryCopierConfig].
171    pub async fn copy_from_config(
172        config: RepositoryCopierConfig,
173        max_copy_operations: usize,
174        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
175    ) -> Result<()> {
176        let root_reader = reader_from_str(config.source_url)?;
177        let writer = writer_from_str(config.destination_url).await?;
178
179        let mut copier = Self::default();
180
181        if let Some(v) = config.only_components {
182            copier.set_only_components(v.into_iter());
183        }
184        if let Some(v) = config.binary_packages_copy {
185            copier.set_binary_packages_copy(v);
186        }
187        if let Some(v) = config.binary_packages_only_architectures {
188            copier.set_binary_packages_only_arches(v.into_iter());
189        }
190        if let Some(v) = config.installer_binary_packages_copy {
191            copier.set_installer_binary_packages_copy(v);
192        }
193        if let Some(v) = config.installer_binary_packages_only_architectures {
194            copier.set_installer_binary_packages_only_arches(v.into_iter());
195        }
196        if let Some(v) = config.sources_copy {
197            copier.set_sources_copy(v);
198        }
199
200        for dist in config.distributions {
201            copier
202                .copy_distribution(
203                    root_reader.as_ref(),
204                    writer.as_ref(),
205                    &dist,
206                    max_copy_operations,
207                    progress_cb,
208                )
209                .await?;
210        }
211        for path in config.distribution_paths {
212            copier
213                .copy_distribution_path(
214                    root_reader.as_ref(),
215                    writer.as_ref(),
216                    &path,
217                    max_copy_operations,
218                    progress_cb,
219                )
220                .await?;
221        }
222
223        Ok(())
224    }
225
226    /// Copy content for a given distribution given a distribution name.
227    ///
228    /// This is a proxy for [Self::copy_distribution_path()] which simply passes
229    /// `dists/{distribution}` as the path value. This is the standard layout for Debian
230    /// repositories.
231    pub async fn copy_distribution(
232        &self,
233        root_reader: &dyn RepositoryRootReader,
234        writer: &dyn RepositoryWriter,
235        distribution: &str,
236        max_copy_operations: usize,
237        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
238    ) -> Result<()> {
239        self.copy_distribution_path(
240            root_reader,
241            writer,
242            &format!("dists/{}", distribution),
243            max_copy_operations,
244            progress_cb,
245        )
246        .await
247    }
248
249    /// Copy content for a given distribution at a path relative to the repository root.
250    ///
251    /// The given `distribution_path` is usually prefixed with `dists/`. e.g. `dists/bullseye`.
252    /// But it can be something else for non-standard repository layouts.
253    pub async fn copy_distribution_path(
254        &self,
255        root_reader: &dyn RepositoryRootReader,
256        writer: &dyn RepositoryWriter,
257        distribution_path: &str,
258        max_copy_operations: usize,
259        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
260    ) -> Result<()> {
261        let release = root_reader
262            .release_reader_with_distribution_path(distribution_path)
263            .await?;
264
265        // We copy all the pool artifacts first because otherwise a client could fetch an indices
266        // file referring to a pool file that isn't available yet.
267
268        if self.binary_packages_copy {
269            if let Some(cb) = progress_cb {
270                cb(PublishEvent::CopyPhaseBegin(CopyPhase::BinaryPackages));
271            }
272            self.copy_binary_packages(
273                root_reader,
274                writer,
275                release.as_ref(),
276                false,
277                max_copy_operations,
278                progress_cb,
279            )
280            .await?;
281            if let Some(cb) = progress_cb {
282                cb(PublishEvent::CopyPhaseEnd(CopyPhase::BinaryPackages));
283            }
284        }
285
286        if self.installer_binary_packages_copy {
287            if let Some(cb) = progress_cb {
288                cb(PublishEvent::CopyPhaseBegin(
289                    CopyPhase::InstallerBinaryPackages,
290                ));
291            }
292            self.copy_binary_packages(
293                root_reader,
294                writer,
295                release.as_ref(),
296                true,
297                max_copy_operations,
298                progress_cb,
299            )
300            .await?;
301            if let Some(cb) = progress_cb {
302                cb(PublishEvent::CopyPhaseEnd(
303                    CopyPhase::InstallerBinaryPackages,
304                ));
305            }
306        }
307
308        if self.sources_copy {
309            if let Some(cb) = progress_cb {
310                cb(PublishEvent::CopyPhaseBegin(CopyPhase::Sources));
311            }
312            self.copy_source_packages(
313                root_reader,
314                writer,
315                release.as_ref(),
316                max_copy_operations,
317                progress_cb,
318            )
319            .await?;
320            if let Some(cb) = progress_cb {
321                cb(PublishEvent::CopyPhaseEnd(CopyPhase::Sources));
322            }
323        }
324
325        if self.installers_copy {
326            if let Some(cb) = progress_cb {
327                cb(PublishEvent::CopyPhaseBegin(CopyPhase::Installers));
328            }
329            self.copy_installers(
330                root_reader,
331                writer,
332                release.as_ref(),
333                max_copy_operations,
334                progress_cb,
335            )
336            .await?;
337            if let Some(cb) = progress_cb {
338                cb(PublishEvent::CopyPhaseEnd(CopyPhase::Installers));
339            }
340        }
341
342        // All the pool artifacts are in place. Publish the indices files.
343
344        if let Some(cb) = progress_cb {
345            cb(PublishEvent::CopyPhaseBegin(CopyPhase::ReleaseIndices));
346        }
347        self.copy_release_indices(
348            root_reader,
349            writer,
350            release.as_ref(),
351            max_copy_operations,
352            progress_cb,
353        )
354        .await?;
355        if let Some(cb) = progress_cb {
356            cb(PublishEvent::CopyPhaseEnd(CopyPhase::ReleaseIndices));
357        }
358
359        // And finally publish the Release files.
360        if let Some(cb) = progress_cb {
361            cb(PublishEvent::CopyPhaseBegin(CopyPhase::ReleaseFiles));
362        }
363        self.copy_release_files(
364            root_reader,
365            writer,
366            distribution_path,
367            max_copy_operations,
368            progress_cb,
369        )
370        .await?;
371        if let Some(cb) = progress_cb {
372            cb(PublishEvent::CopyPhaseEnd(CopyPhase::ReleaseFiles));
373        }
374
375        Ok(())
376    }
377
378    async fn copy_binary_packages(
379        &self,
380        root_reader: &dyn RepositoryRootReader,
381        writer: &dyn RepositoryWriter,
382        release: &dyn ReleaseReader,
383        installer_packages: bool,
384        max_copy_operations: usize,
385        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
386    ) -> Result<()> {
387        let only_arches = if installer_packages {
388            self.installer_binary_packages_only_arches.clone()
389        } else {
390            self.binary_packages_only_arches.clone()
391        };
392        let only_components = self.only_components.clone();
393
394        let copies = release
395            .resolve_package_fetches(
396                Box::new(move |entry| {
397                    let component_allowed = if let Some(only_components) = &only_components {
398                        only_components.contains(&entry.component.to_string())
399                    } else {
400                        true
401                    };
402
403                    let arch_allowed = if let Some(only_arches) = &only_arches {
404                        only_arches.contains(&entry.architecture.to_string())
405                    } else {
406                        true
407                    };
408
409                    component_allowed && arch_allowed && entry.is_installer == installer_packages
410                }),
411                Box::new(move |_| true),
412                max_copy_operations,
413            )
414            .await?
415            .into_iter()
416            .map(|bpf| GenericCopy {
417                source_path: bpf.path.clone(),
418                dest_path: bpf.path,
419                expected_content: Some((bpf.size, bpf.digest)),
420            })
421            .collect::<Vec<_>>();
422
423        perform_copies(
424            root_reader,
425            writer,
426            copies,
427            max_copy_operations,
428            false,
429            progress_cb,
430        )
431        .await?;
432
433        Ok(())
434    }
435
436    async fn copy_source_packages(
437        &self,
438        root_reader: &dyn RepositoryRootReader,
439        writer: &dyn RepositoryWriter,
440        release: &dyn ReleaseReader,
441        max_copy_operations: usize,
442        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
443    ) -> Result<()> {
444        let only_components = self.only_components.clone();
445
446        let copies = release
447            .resolve_source_fetches(
448                Box::new(move |entry| {
449                    if let Some(only_components) = &only_components {
450                        only_components.contains(&entry.component.to_string())
451                    } else {
452                        true
453                    }
454                }),
455                Box::new(move |_| true),
456                max_copy_operations,
457            )
458            .await?
459            .into_iter()
460            .map(|spf| GenericCopy {
461                source_path: spf.path.clone(),
462                dest_path: spf.path.clone(),
463                expected_content: Some((spf.size, spf.digest.clone())),
464            })
465            .collect::<Vec<_>>();
466
467        perform_copies(
468            root_reader,
469            writer,
470            copies,
471            max_copy_operations,
472            false,
473            progress_cb,
474        )
475        .await?;
476
477        Ok(())
478    }
479
    /// Copy installer files referenced by a release.
    ///
    /// Not implemented: every argument is ignored.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`. The only call site in this file is guarded by the
    /// `installers_copy` flag, which [Default] initializes to `false`.
    async fn copy_installers(
        &self,
        _root_reader: &dyn RepositoryRootReader,
        _writer: &dyn RepositoryWriter,
        _release: &dyn ReleaseReader,
        _max_copy_operations: usize,
        _progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
    ) -> Result<()> {
        // Not yet supported since this requires teaching content validating fetching about
        // optional sizes.
        todo!();
    }
492
493    async fn copy_release_indices(
494        &self,
495        root_reader: &dyn RepositoryRootReader,
496        writer: &dyn RepositoryWriter,
497        release: &dyn ReleaseReader,
498        max_copy_operations: usize,
499        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
500    ) -> Result<()> {
501        let by_hash = release.release_file().acquire_by_hash().unwrap_or(false);
502
503        let copies = release
504            .classified_indices_entries()?
505            .into_iter()
506            .filter(|_| {
507                // For now we always copy all the indices files. There is certainly room to filter the
508                // files that are copied. We can do that when we feel like writing the code...
509                true
510            })
511            .map(move |entry| {
512                let path = if by_hash {
513                    entry.by_hash_path()
514                } else {
515                    entry.path.to_string()
516                };
517
518                let path = format!("{}/{}", release.root_relative_path(), path);
519
520                GenericCopy {
521                    source_path: path.clone(),
522                    dest_path: path,
523                    expected_content: Some((entry.size, entry.digest.clone())),
524                }
525            })
526            .collect::<Vec<_>>();
527
528        // Some indices files don't actually exist! For example, the release file will publish
529        // the checksums of the uncompressed file variant but the uncompressed file won't
530        // actually be present in the repository! These errors are OK to ignore. But we still
531        // report on them.
532        perform_copies(
533            root_reader,
534            writer,
535            copies,
536            max_copy_operations,
537            true,
538            progress_cb,
539        )
540        .await?;
541
542        Ok(())
543    }
544
545    async fn copy_release_files(
546        &self,
547        root_reader: &dyn RepositoryRootReader,
548        writer: &dyn RepositoryWriter,
549        distribution_path: &str,
550        max_copy_operations: usize,
551        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
552    ) -> Result<()> {
553        let copies = RELEASE_FILES
554            .iter()
555            .map(|path| {
556                let path = format!("{}/{}", distribution_path, path);
557
558                GenericCopy {
559                    source_path: path.clone(),
560                    dest_path: path,
561                    expected_content: None,
562                }
563            })
564            .collect::<Vec<_>>();
565
566        // Not all the well-known files exist. So ignore missing file errors.
567        // TODO we probably want a hard error if `Release` or `InRelease` fail.
568        perform_copies(
569            root_reader,
570            writer,
571            copies,
572            max_copy_operations,
573            true,
574            progress_cb,
575        )
576        .await?;
577
578        Ok(())
579    }
580}
581
582/// Perform a sequence of copy operations between a reader and writer.
583async fn perform_copies(
584    root_reader: &dyn RepositoryRootReader,
585    writer: &dyn RepositoryWriter,
586    copies: Vec<GenericCopy>,
587    max_copy_operations: usize,
588    allow_not_found: bool,
589    progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
590) -> Result<()> {
591    let mut total_size = 0;
592
593    let fs = copies
594        .into_iter()
595        .map(|op| {
596            if let Some((size, _)) = op.expected_content {
597                total_size += size;
598            }
599
600            writer.copy_from(
601                root_reader,
602                op.source_path.into(),
603                op.expected_content,
604                op.dest_path.into(),
605                progress_cb,
606            )
607        })
608        .collect::<Vec<_>>();
609
610    if let Some(cb) = progress_cb {
611        cb(PublishEvent::WriteSequenceBeginWithTotalBytes(total_size));
612    }
613
614    let mut buffered = futures::stream::iter(fs).buffer_unordered(max_copy_operations);
615
616    while let Some(res) = buffered.next().await {
617        match res {
618            Ok(write) => {
619                if let Some(cb) = progress_cb {
620                    cb(PublishEvent::WriteSequenceProgressBytes(
621                        write.bytes_written(),
622                    ));
623
624                    match write {
625                        RepositoryWriteOperation::PathWritten(write) => {
626                            cb(PublishEvent::PathCopied(
627                                write.path.to_string(),
628                                write.bytes_written,
629                            ));
630                        }
631                        RepositoryWriteOperation::Noop(path, _) => {
632                            cb(PublishEvent::PathCopyNoop(path.to_string()));
633                        }
634                    }
635                }
636            }
637            Err(DebianError::RepositoryIoPath(path, err))
638                if allow_not_found && matches!(err.kind(), std::io::ErrorKind::NotFound) =>
639            {
640                if let Some(cb) = progress_cb {
641                    cb(PublishEvent::CopyIndicesPathNotFound(path));
642                }
643            }
644            Err(e) => return Err(e),
645        }
646    }
647
648    if let Some(cb) = progress_cb {
649        cb(PublishEvent::WriteSequenceFinished);
650    }
651
652    Ok(())
653}
654
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::repository::{
            proxy_writer::{ProxyVerifyBehavior, ProxyWriter},
            sink_writer::SinkWriter,
        },
    };
    #[cfg(feature = "http")]
    use crate::repository::http::HttpRepositoryClient;

    /// Repository URL used as the copy source by the test below.
    const DEBIAN_URL: &str = "http://snapshot.debian.org/archive/debian/20211120T085721Z";

    /// Copies the `bullseye` distribution with every package phase disabled, so only
    /// the release indices and release files phases run.
    ///
    /// Only compiled with the `http` feature. NOTE(review): this presumably needs
    /// network access to reach `DEBIAN_URL` — confirm before running it offline.
    #[tokio::test]
    #[cfg(feature = "http")]
    async fn bullseye_copy() -> Result<()> {
        let root =
            Box::new(HttpRepositoryClient::new(DEBIAN_URL)?) as Box<dyn RepositoryRootReader>;
        // NOTE(review): presumably the sink discards written content and this verify
        // behavior reports paths as already present — confirm against those types.
        let mut writer = ProxyWriter::new(SinkWriter::default());
        writer.set_verify_behavior(ProxyVerifyBehavior::AlwaysExistsIntegrityVerified);
        let writer: Box<dyn RepositoryWriter> = Box::new(writer);

        // Turn off every pool artifact phase; installers are already off by default.
        let mut copier = RepositoryCopier::default();
        copier.set_binary_packages_copy(false);
        copier.set_installer_binary_packages_copy(false);
        copier.set_sources_copy(false);

        // Progress callback that ignores every event.
        let cb = Box::new(|_| {});

        copier
            .copy_distribution(root.as_ref(), writer.as_ref(), "bullseye", 8, &Some(cb))
            .await?;

        Ok(())
    }
}