// debian_packaging/repository/mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5/*! Debian repositories.
6
7A Debian repository is a collection of files holding packages and other
8support primitives. See <https://wiki.debian.org/DebianRepository/Format>
9for the canonical definition of a Debian repository.
10
11Here's the concise version.
12
13A Debian repository is a collection of files rooted at a given path or URL.
14In Apt sources files, this is the value after lines beginning with `apt`. e.g.
15`deb http://us.archive.ubuntu.com/ubuntu/ impish main` defines the repository
16rooted at `http://us.archive.ubuntu.com/ubuntu/`.
17
18Under the root are `dists/<distribution>/` directories. Each of these
19directories (`<distribution>` can have `/` within them) has a `Release`
20and/or `InRelease` file. These files serve as the main *index* for a given
21*distribution*. (`InRelease` is the same as `Release` except it has a PGP
22cleartext signature and may be encoded slightly differently as a result.)
23
24Each `[In]Release` file defines metadata for a *distribution* as well as
25provides a manifest of additional files further defining the content of the
26*distribution*. These additional files define available binary packages (what
27you typically install), sources packages, lists of which packages provide which
28filenames, etc.
29
30In addition to *distributions*, a repository also contains a *pool* where
31non-distribution blob data is stored. This typically exists under the
32`pool/` path relative to the repository root.
33
34Since repositories are logically a virtual filesystem and can be backed
35by any key-value blob store, we've abstracted repository I/O through a
36series of traits.
37
38[DataResolver] is a generic trait for providing path/key based I/O.
39
40[RepositoryRootReader] describes an interface for reading from the root of
41a repository. It is used to obtain and parse `[In]Release` files and to read
42from the *pool*.
43
44[ReleaseReader] describes an interface for reading from a *distribution*
45and a parsed `[In]Release` file describing the distribution.
46
47[RepositoryWriter] describes an interface for writing to a repository.
48
49Concrete implementations of repositories exist in submodules. [http]
50provides [http::HttpRepositoryClient], which implements [RepositoryRootReader]
51and serves as the primary HTTP-based client. [filesystem] provides
52[filesystem::FilesystemRepositoryReader] and [filesystem::FilesystemRepositoryWriter]
53for reading and writing repositories using a local filesystem. [s3] provides
54[s3::S3Writer].
55
A couple of special [RepositoryWriter] implementations exist. [sink_writer::SinkWriter] provides a writer
57that will send its content to a black hole. It can be used for testing writing without
58actually performing writes. [proxy_writer::ProxyWriter] proxies an inner writer and
59can override behavior on certain I/O operations.
60
61Modules like [contents] and [release] define primitives encountered in
62repositories, such as `[In]Release` files.
63
64The [builder] module contains functionality for creating/publishing
65repositories.
66*/
67
68use std::fmt::Formatter;
69use {
70    crate::{
71        binary_package_control::BinaryPackageControlFile,
72        binary_package_list::BinaryPackageList,
73        control::ControlParagraphAsyncReader,
74        deb::reader::BinaryPackageReader,
75        debian_source_control::{DebianSourceControlFile, DebianSourceControlFileFetch},
76        debian_source_package_list::DebianSourcePackageList,
77        error::{DebianError, Result},
78        io::{drain_reader, Compression, ContentDigest, DataResolver},
79        repository::{
80            contents::{ContentsFile, ContentsFileAsyncReader},
81            release::{
82                ChecksumType, ClassifiedReleaseFileEntry, ContentsFileEntry, PackagesFileEntry,
83                ReleaseFile, SourcesFileEntry,
84            },
85        },
86    },
87    async_trait::async_trait,
88    futures::{AsyncRead, AsyncReadExt, StreamExt, TryStreamExt},
89    std::{borrow::Cow, collections::HashMap, ops::Deref, pin::Pin, str::FromStr},
90};
91
92pub mod builder;
93pub mod contents;
94pub mod copier;
95pub mod filesystem;
96#[cfg(feature = "http")]
97pub mod http;
98pub mod proxy_writer;
99pub mod release;
100#[cfg(feature = "s3")]
101pub mod s3;
102pub mod sink_writer;
103
104/// Describes how to fetch a binary package from a repository.
105#[derive(Clone, Debug)]
106pub struct BinaryPackageFetch<'a> {
107    /// The binary package control paragraph from which this entry came.
108    pub control_file: BinaryPackageControlFile<'a>,
109    /// The relative path of this binary package.
110    ///
111    /// Corresponds to the `Filename` field.
112    pub path: String,
113    /// The expected size of the retrieved file.
114    pub size: u64,
115    /// The expected content digest of the retrieved file.
116    pub digest: ContentDigest,
117}
118
119/// Describes how to fetch a source package from a repository.
120pub struct SourcePackageFetch<'a> {
121    /// The control file from which this these fetches were derived.
122    pub control_file: DebianSourceControlFile<'a>,
123    /// Fetch instruction for a file in this package.
124    fetch: DebianSourceControlFileFetch,
125}
126
127impl<'a> Deref for SourcePackageFetch<'a> {
128    type Target = DebianSourceControlFileFetch;
129
130    fn deref(&self) -> &Self::Target {
131        &self.fetch
132    }
133}
134
135/// Debian repository reader bound to the root of the repository.
136///
137/// This trait facilitates access to *pool* as well as to multiple
138/// *releases* within the repository.
139#[async_trait]
140pub trait RepositoryRootReader: DataResolver + Sync {
141    /// Obtain the URL to which this reader is bound.  
142    fn url(&self) -> Result<url::Url>;
143
144    /// Obtain a [ReleaseReader] for a given distribution.
145    ///
146    /// This assumes either an `InRelease` or `Release` file is located in `dists/{distribution}/`.
147    /// This is the case for most repositories.
148    async fn release_reader(&self, distribution: &str) -> Result<Box<dyn ReleaseReader>> {
149        self.release_reader_with_distribution_path(&format!(
150            "dists/{}",
151            distribution.trim_matches('/')
152        ))
153        .await
154    }
155
156    /// Obtain a [ReleaseReader] given a distribution path.
157    ///
158    /// Typically distributions exist at `dists/<distribution>/`. However, this may not
159    /// always be the case. This method allows explicitly passing in the relative path
160    /// holding the `InRelease` file.
161    async fn release_reader_with_distribution_path(
162        &self,
163        path: &str,
164    ) -> Result<Box<dyn ReleaseReader>>;
165
166    /// Fetch and parse an `InRelease` file at the relative path specified.
167    ///
168    /// `path` is typically a value like `dists/<distribution>/InRelease`. e.g.
169    /// `dists/bullseye/InRelease`.
170    ///
171    /// The default implementation of this trait should be sufficient for most types.
172    async fn fetch_inrelease(&self, path: &str) -> Result<ReleaseFile<'static>> {
173        let mut reader = self.get_path(path).await?;
174
175        let mut data = vec![];
176        reader.read_to_end(&mut data).await?;
177
178        Ok(ReleaseFile::from_armored_reader(std::io::Cursor::new(
179            data,
180        ))?)
181    }
182
183    /// Fetch and parse an `Release` file at the relative path specified.
184    ///
185    /// `path` is typically a value like `dists/<distribution>/Release`. e.g.
186    /// `dists/bullseye/Release`.
187    ///
188    /// The default implementation of this trait should be sufficient for most types.
189    async fn fetch_release(&self, path: &str) -> Result<ReleaseFile<'static>> {
190        let mut reader = self.get_path(path).await?;
191
192        let mut data = vec![];
193        reader.read_to_end(&mut data).await?;
194
195        Ok(ReleaseFile::from_reader(std::io::Cursor::new(data))?)
196    }
197    /// Fetch and parse either an `InRelease` or `Release` file at the relative path specified.
198    ///
199    /// First attempt to use the more modern `InRelease` file, fall back to `Release`
200    ///
201    /// The default implementation of this trait should be sufficient for most types.
202    async fn fetch_inrelease_or_release(
203        &self,
204        inrelease_path: &str,
205        release_path: &str,
206    ) -> Result<ReleaseFile<'static>> {
207        match self.fetch_inrelease(inrelease_path).await {
208            Ok(release) => Ok(release),
209            Err(DebianError::RepositoryIoPath(_, e))
210                if e.kind() == std::io::ErrorKind::NotFound =>
211            {
212                self.fetch_release(release_path).await
213            }
214            Err(e) => Err(e),
215        }
216    }
217
218    /// Fetch a binary package given a [BinaryPackageFetch] instruction.
219    ///
220    /// Returns a generic [AsyncRead] to obtain the raw file content.
221    async fn fetch_binary_package_generic<'fetch>(
222        &self,
223        fetch: BinaryPackageFetch<'fetch>,
224    ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
225        self.get_path_with_digest_verification(&fetch.path, fetch.size, fetch.digest)
226            .await
227    }
228
229    /// Fetch a binary package given a [BinaryPackageFetch] instruction.
230    ///
231    /// Returns a [BinaryPackageReader] capable of parsing the package.
232    ///
233    /// Due to limitations in [BinaryPackageReader], the entire package content is buffered
234    /// in memory and isn't read lazily.
235    async fn fetch_binary_package_deb_reader<'fetch>(
236        &self,
237        fetch: BinaryPackageFetch<'fetch>,
238    ) -> Result<BinaryPackageReader<std::io::Cursor<Vec<u8>>>> {
239        let mut reader = self.fetch_binary_package_generic(fetch).await?;
240        // TODO implement an async reader.
241        let mut buf = vec![];
242        reader.read_to_end(&mut buf).await?;
243
244        Ok(BinaryPackageReader::new(std::io::Cursor::new(buf))?)
245    }
246
247    /// Fetch a source package file given a [SourcePackageFetch] instruction.
248    ///
249    /// Returns a generic [AsyncRead] to obtain the raw file content.
250    async fn fetch_source_package_generic<'fetch>(
251        &self,
252        fetch: SourcePackageFetch<'fetch>,
253    ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
254        self.get_path_with_digest_verification(&fetch.path, fetch.size, fetch.digest.clone())
255            .await
256    }
257}
258
259/// Provides a transport-agnostic mechanism for reading from a parsed `[In]Release` file.
260#[async_trait]
261pub trait ReleaseReader: DataResolver + Sync {
262    /// Obtain the base URL to which this instance is bound.
263    fn url(&self) -> Result<url::Url>;
264
265    /// Obtain the path relative to the repository root this instance is bound to.
266    ///
267    /// e.g. `dists/bullseye`.
268    ///
269    /// Implementations must not return a string with a leading or trailing `/`.
270    fn root_relative_path(&self) -> &str;
271
272    /// Obtain the parsed `[In]Release` file from which this reader is derived.
273    fn release_file(&self) -> &ReleaseFile<'_>;
274
275    /// Obtain the checksum flavor of content to retrieve.
276    ///
277    /// By default, this will prefer the strongest known checksum advertised in the
278    /// release file.
279    fn retrieve_checksum(&self) -> Result<ChecksumType> {
280        let release = self.release_file();
281
282        let checksum = &[ChecksumType::Sha256, ChecksumType::Sha1, ChecksumType::Md5]
283            .iter()
284            .find(|variant| release.field(variant.field_name()).is_some())
285            .ok_or(DebianError::RepositoryReadReleaseNoKnownChecksum)?;
286
287        Ok(**checksum)
288    }
289
290    /// Obtain the preferred compression format to retrieve index files in.
291    fn preferred_compression(&self) -> Compression;
292
293    /// Set the preferred compression format for retrieved index files.
294    ///
295    /// Index files are often published in multiple compression formats, including no
296    /// compression. This function can be used to instruct the reader which compression
297    /// format to prefer.
298    fn set_preferred_compression(&mut self, compression: Compression);
299
300    /// Obtain [ClassifiedReleaseFileEntry] within the parsed `Release` file.
301    fn classified_indices_entries(&self) -> Result<Vec<ClassifiedReleaseFileEntry<'_>>> {
302        self.release_file()
303            .iter_classified_index_files(self.retrieve_checksum()?)
304            .ok_or(DebianError::ReleaseNoIndicesFiles)?
305            .collect::<Result<Vec<_>>>()
306    }
307
308    /// Obtain parsed `Packages` file entries within this Release file.
309    ///
310    /// Only entries for the checksum as defined by [Self::retrieve_checksum()] are returned.
311    ///
312    /// There may be multiple entries for a given logical `Packages` file corresponding
313    /// to different compression formats. Use [Self::packages_entry()] to resolve the entry
314    /// for the `Packages` file for the preferred configuration.
315    fn packages_indices_entries(&self) -> Result<Vec<PackagesFileEntry<'_>>> {
316        Ok(
317            if let Some(entries) = self
318                .release_file()
319                .iter_packages_indices(self.retrieve_checksum()?)
320            {
321                entries.collect::<Result<Vec<_>>>()?
322            } else {
323                vec![]
324            },
325        )
326    }
327
328    /// Like [Self::packages_indices_entries()] except it deduplicates entries.
329    ///
330    /// If there are multiple entries for a `Packages` file with varying compression, the most
331    /// preferred compression format is returned.
332    fn packages_indices_entries_preferred_compression(&self) -> Result<Vec<PackagesFileEntry<'_>>> {
333        let mut entries = HashMap::new();
334
335        for entry in self.packages_indices_entries()? {
336            entries
337                .entry((
338                    entry.component.clone(),
339                    entry.architecture.clone(),
340                    entry.is_installer,
341                ))
342                .or_insert_with(Vec::new)
343                .push(entry);
344        }
345
346        entries
347            .into_values()
348            .map(|candidates| {
349                if let Some(entry) = candidates
350                    .iter()
351                    .find(|entry| entry.compression == self.preferred_compression())
352                {
353                    Ok(entry.clone())
354                } else {
355                    for compression in Compression::default_preferred_order() {
356                        if let Some(entry) = candidates
357                            .iter()
358                            .find(|entry| entry.compression == compression)
359                        {
360                            return Ok(entry.clone());
361                        }
362                    }
363
364                    Err(DebianError::RepositoryReadPackagesIndicesEntryNotFound)
365                }
366            })
367            .collect::<Result<Vec<_>>>()
368    }
369
370    /// Resolve indices for `Contents` files.
371    ///
372    /// Only entries for the checksum as defined by [Self::retrieve_checksum()] are returned.
373    ///
374    /// Multiple entries for the same logical file with varying compression formats may be
375    /// returned.
376    fn contents_indices_entries(&self) -> Result<Vec<ContentsFileEntry<'_>>> {
377        Ok(
378            if let Some(entries) = self
379                .release_file()
380                .iter_contents_indices(self.retrieve_checksum()?)
381            {
382                entries.collect::<Result<Vec<_>>>()?
383            } else {
384                vec![]
385            },
386        )
387    }
388
389    /// Resolve indices for `Sources` file.
390    ///
391    /// Only entries for the checksum as defined by [Self::retrieve_checksum()] are returned.
392    ///
393    /// Multiple entries for the same logical file with varying compression formats may be
394    /// returned.
395    fn sources_indices_entries(&self) -> Result<Vec<SourcesFileEntry<'_>>> {
396        Ok(
397            if let Some(entries) = self
398                .release_file()
399                .iter_sources_indices(self.retrieve_checksum()?)
400            {
401                entries.collect::<Result<Vec<_>>>()?
402            } else {
403                vec![]
404            },
405        )
406    }
407
408    /// Like [Self::sources_indices_entries] except it deduplicates entries.
409    ///
410    /// If there are multiple entries for a `Sources` file with varying compression, the most
411    /// preferred compression format is returned.
412    fn sources_indices_entries_preferred_compression(&self) -> Result<Vec<SourcesFileEntry<'_>>> {
413        let mut entries = HashMap::new();
414
415        for entry in self.sources_indices_entries()? {
416            entries
417                .entry(entry.component.clone())
418                .or_insert_with(Vec::new)
419                .push(entry);
420        }
421
422        entries
423            .into_values()
424            .map(|candidates| {
425                if let Some(entry) = candidates
426                    .iter()
427                    .find(|entry| entry.compression == self.preferred_compression())
428                {
429                    Ok(entry.clone())
430                } else {
431                    for compression in Compression::default_preferred_order() {
432                        if let Some(entry) = candidates
433                            .iter()
434                            .find(|entry| entry.compression == compression)
435                        {
436                            return Ok(entry.clone());
437                        }
438                    }
439
440                    Err(DebianError::RepositoryReadPackagesIndicesEntryNotFound)
441                }
442            })
443            .collect::<Result<Vec<_>>>()
444    }
445
446    /// Resolve a reference to a `Packages` file to fetch given search criteria.
447    ///
448    /// This will find all entries defining the desired `Packages` file. It will filter
449    /// through the [ChecksumType] as defined by [Self::retrieve_checksum()] and will prioritize
450    /// the compression format according to [Self::preferred_compression()].
451    fn packages_entry(
452        &self,
453        component: &str,
454        architecture: &str,
455        is_installer: bool,
456    ) -> Result<PackagesFileEntry<'_>> {
457        self.packages_indices_entries_preferred_compression()?
458            .into_iter()
459            .find(|entry| {
460                entry.component == component
461                    && entry.architecture == architecture
462                    && entry.is_installer == is_installer
463            })
464            .ok_or(DebianError::RepositoryReadPackagesIndicesEntryNotFound)
465    }
466
467    /// Fetch and parse a `Packages` file described by a [PackagesFileEntry].
468    async fn resolve_packages_from_entry<'entry, 'slf: 'entry>(
469        &'slf self,
470        entry: &'entry PackagesFileEntry<'slf>,
471    ) -> Result<BinaryPackageList<'static>> {
472        let release = self.release_file();
473
474        let path = if release.acquire_by_hash().unwrap_or_default() {
475            entry.by_hash_path()
476        } else {
477            entry.path.to_string()
478        };
479
480        let mut reader = ControlParagraphAsyncReader::new(futures::io::BufReader::new(
481            self.get_path_decoded_with_digest_verification(
482                &path,
483                entry.compression,
484                entry.size,
485                entry.digest.clone(),
486            )
487            .await?,
488        ));
489
490        let mut res = BinaryPackageList::default();
491
492        while let Some(paragraph) = reader.read_paragraph().await? {
493            res.push(BinaryPackageControlFile::from(paragraph));
494        }
495
496        Ok(res)
497    }
498
499    /// Resolve packages given parameters to resolve a `Packages` file.
500    async fn resolve_packages(
501        &self,
502        component: &str,
503        arch: &str,
504        is_installer: bool,
505    ) -> Result<BinaryPackageList<'static>> {
506        let entry = self.packages_entry(component, arch, is_installer)?;
507
508        self.resolve_packages_from_entry(&entry).await
509    }
510
511    /// Retrieve fetch instructions for binary packages.
512    ///
513    /// The caller can specify a filter function to choose which packages to retrieve.
514    /// Filtering works in 2 stages.
515    ///
516    /// First, `packages_file_filter` is called with each [PackagesFileEntry] defining
517    /// a `Packages*` file. If the filter returns true, this list of packages will be
518    /// retrieved and expanded.
519    ///
520    /// Second, `binary_package_filter` is called for each binary package entry seen
521    /// in parsed `Packages*` files. If the function returns true, this binary package
522    /// will be retrieved.
523    ///
524    /// The emitted values can be fed into [RepositoryRootReader::fetch_binary_package_generic()]
525    /// and [RepositoryRootReader::fetch_binary_package_deb_reader()] to fetch the binary package
526    /// content.
527    async fn resolve_package_fetches(
528        &self,
529        packages_file_filter: Box<dyn (Fn(PackagesFileEntry) -> bool) + Send>,
530        binary_package_filter: Box<dyn (Fn(BinaryPackageControlFile) -> bool) + Send>,
531        threads: usize,
532    ) -> Result<Vec<BinaryPackageFetch<'_>>> {
533        let packages_entries = self.packages_indices_entries_preferred_compression()?;
534
535        let fs = packages_entries
536            .iter()
537            .filter(|entry| packages_file_filter((*entry).clone()))
538            .map(|entry| self.resolve_packages_from_entry(entry))
539            .collect::<Vec<_>>();
540
541        let mut packages_fs = futures::stream::iter(fs).buffer_unordered(threads);
542
543        let mut fetches = vec![];
544
545        while let Some(pl) = packages_fs.try_next().await? {
546            for cf in pl.into_iter() {
547                // Needed by IDE for type hinting for some reason.
548                let cf: BinaryPackageControlFile = cf;
549
550                if binary_package_filter(cf.clone()) {
551                    let path = cf.required_field_str("Filename")?.to_string();
552
553                    let size = cf.field_u64("Size").ok_or_else(|| {
554                        DebianError::ControlRequiredFieldMissing("Size".to_string())
555                    })??;
556
557                    let digest = ChecksumType::preferred_order()
558                        .find_map(|checksum| {
559                            cf.field_str(checksum.field_name()).map(|hex_digest| {
560                                ContentDigest::from_hex_digest(checksum, hex_digest)
561                            })
562                        })
563                        .ok_or(DebianError::RepositoryReadCouldNotDeterminePackageDigest)??;
564
565                    fetches.push(BinaryPackageFetch {
566                        control_file: cf,
567                        path,
568                        size,
569                        digest,
570                    });
571                }
572            }
573        }
574
575        Ok(fetches)
576    }
577
578    /// Resolve the [SourcesFileEntry] for a given component.
579    ///
580    /// This returns the entry variant that is preferred given digest and compression
581    /// settings. If no entry is found, [DebianError::RepositoryReadSourcesIndicesEntryNotFound]
582    /// is returned.
583    fn sources_entry(&self, component: &str) -> Result<SourcesFileEntry<'_>> {
584        self.sources_indices_entries_preferred_compression()?
585            .into_iter()
586            .find(|entry| entry.component == component)
587            .ok_or(DebianError::RepositoryReadSourcesIndicesEntryNotFound)
588    }
589
590    /// Fetch a `Sources` file and parse source package entries inside.
591    ///
592    /// The file to fetch is specified from a [SourcesFileEntry] describing it.
593    async fn resolve_sources_from_entry<'entry, 'slf: 'entry>(
594        &'slf self,
595        entry: &'entry SourcesFileEntry<'slf>,
596    ) -> Result<DebianSourcePackageList<'static>> {
597        let release = self.release_file();
598
599        let path = if release.acquire_by_hash().unwrap_or_default() {
600            entry.by_hash_path()
601        } else {
602            entry.path.to_string()
603        };
604
605        let mut reader = ControlParagraphAsyncReader::new(futures::io::BufReader::new(
606            self.get_path_decoded_with_digest_verification(
607                &path,
608                entry.compression,
609                entry.size,
610                entry.digest.clone(),
611            )
612            .await?,
613        ));
614
615        let mut res = DebianSourcePackageList::default();
616
617        while let Some(paragraph) = reader.read_paragraph().await? {
618            res.push(paragraph.into());
619        }
620
621        Ok(res)
622    }
623
624    /// Fetch a `Sources` file for the given component and parse source package entries inside.
625    ///
626    /// This will call [Self::sources_entry] to resolve the [SourcesFileEntry] for the given
627    /// `component` then will call [Self::resolve_sources_from_entry] to fetch and parse it.
628    async fn resolve_sources(&self, component: &str) -> Result<DebianSourcePackageList<'static>> {
629        let entry = self.sources_entry(component)?;
630
631        self.resolve_sources_from_entry(&entry).await
632    }
633
634    /// Resolves [SourcePackageFetch] for describing files to fetch for source packages.
635    ///
636    /// The caller specifies filter functions to choose which source packages' files to
637    /// retrieve. Filtering works in 2 stages.
638    ///
639    /// First, `sources_filter_filter` is called with each [SourcesFileEntry] defining a
640    /// `Sources` file. If the filter returns true, this list of packages will be retrieved
641    /// and expanded.
642    ///
643    /// Second, `source_package_filter` is called for each source package entry seen in
644    /// parsed `Sources` files. If the function returns true, the instructions for fetching
645    /// the files comprising this source package will returned.
646    ///
647    /// The returned [SourcePackageFetch] can be fed into
648    /// [RepositoryRootReader::fetch_source_package_generic()] to retrieve the file content.
649    async fn resolve_source_fetches(
650        &self,
651        sources_file_filter: Box<dyn (Fn(SourcesFileEntry) -> bool) + Send>,
652        source_package_filter: Box<dyn (Fn(DebianSourceControlFile) -> bool) + Send>,
653        threads: usize,
654    ) -> Result<Vec<SourcePackageFetch<'_>>> {
655        let sources_entries = self.sources_indices_entries_preferred_compression()?;
656
657        let fs = sources_entries
658            .iter()
659            .filter(|entry| sources_file_filter((*entry).clone()))
660            .map(|entry| self.resolve_sources_from_entry(entry))
661            .collect::<Vec<_>>();
662
663        let mut sources_fs = futures::stream::iter(fs).buffer_unordered(threads);
664
665        let mut fetches = vec![];
666
667        while let Some(pl) = sources_fs.try_next().await? {
668            for cf in pl.into_iter() {
669                if source_package_filter(cf.clone_no_signatures()) {
670                    for fetch in cf.file_fetches(self.retrieve_checksum()?)? {
671                        let fetch = fetch?;
672
673                        fetches.push(SourcePackageFetch {
674                            control_file: cf.clone_no_signatures(),
675                            fetch,
676                        });
677                    }
678                }
679            }
680        }
681
682        Ok(fetches)
683    }
684
685    /// Resolve a reference to a `Contents` file to fetch given search criteria.
686    ///
687    /// This will attempt to find the entry for a `Contents` file given search criteria.
688    fn contents_entry(
689        &self,
690        component: Option<&str>,
691        architecture: &str,
692        is_installer: bool,
693    ) -> Result<ContentsFileEntry> {
694        let component = component.map(Cow::from);
695
696        let entries = self
697            .contents_indices_entries()?
698            .into_iter()
699            .filter(|entry| {
700                entry.component == component
701                    && entry.architecture == architecture
702                    && entry.is_installer == is_installer
703            })
704            .collect::<Vec<_>>();
705
706        if let Some(entry) = entries
707            .iter()
708            .find(|entry| entry.compression == self.preferred_compression())
709        {
710            Ok(entry.clone())
711        } else {
712            for compression in Compression::default_preferred_order() {
713                if let Some(entry) = entries
714                    .iter()
715                    .find(|entry| entry.compression == compression)
716                {
717                    return Ok(entry.clone());
718                }
719            }
720
721            Err(DebianError::RepositoryReadContentsIndicesEntryNotFound)
722        }
723    }
724
725    async fn resolve_contents(
726        &self,
727        component: Option<&str>,
728        architecture: &str,
729        is_installer: bool,
730    ) -> Result<ContentsFile> {
731        let release = self.release_file();
732        let entry = self.contents_entry(component, architecture, is_installer)?;
733
734        let path = if release.acquire_by_hash().unwrap_or_default() {
735            entry.by_hash_path()
736        } else {
737            entry.path.to_string()
738        };
739
740        let reader = self
741            .get_path_decoded_with_digest_verification(
742                &path,
743                entry.compression,
744                entry.size,
745                entry.digest.clone(),
746            )
747            .await?;
748
749        let mut reader = ContentsFileAsyncReader::new(futures::io::BufReader::new(reader));
750        reader.read_all().await?;
751
752        let (contents, reader) = reader.consume();
753
754        drain_reader(reader)
755            .await
756            .map_err(|e| DebianError::RepositoryIoPath(path, e))?;
757
758        Ok(contents)
759    }
760}
761
762/// Describes a repository path verification state.
763#[derive(Clone, Copy, Debug)]
764pub enum RepositoryPathVerificationState {
765    /// The path exists but its integrity was not verified.
766    ExistsNoIntegrityCheck,
767    /// The path exists and its integrity was verified.
768    ExistsIntegrityVerified,
769    /// The path exists and its integrity didn't match expectations.
770    ExistsIntegrityMismatch,
771    /// The path is missing.
772    Missing,
773}
774
775/// Represents the result of a repository path verification check.
776#[derive(Clone, Debug)]
777pub struct RepositoryPathVerification<'a> {
778    /// The path that was tested.
779    pub path: &'a str,
780    /// The state of the path.
781    pub state: RepositoryPathVerificationState,
782}
783
784impl<'a> std::fmt::Display for RepositoryPathVerification<'a> {
785    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
786        match self.state {
787            RepositoryPathVerificationState::ExistsNoIntegrityCheck => {
788                write!(f, "{} exists (no integrity check performed)", self.path)
789            }
790            RepositoryPathVerificationState::ExistsIntegrityVerified => {
791                write!(f, "{} exists (integrity verified)", self.path)
792            }
793            RepositoryPathVerificationState::ExistsIntegrityMismatch => {
794                write!(f, "{} exists (integrity mismatch!)", self.path)
795            }
796            RepositoryPathVerificationState::Missing => {
797                write!(f, "{} missing", self.path)
798            }
799        }
800    }
801}
802
/// A phase during a repository copy operation.
///
/// Derives `PartialEq`/`Eq`/`Hash` so phases can be compared and used as
/// map/set keys by consumers tracking progress.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum CopyPhase {
    /// Copying binary packages.
    BinaryPackages,
    /// Copying installer binary packages.
    InstallerBinaryPackages,
    /// Copying source packages.
    Sources,
    /// Copying installer files.
    Installers,
    /// Copying release indices files.
    ReleaseIndices,
    /// Copying `[In]Release` files.
    ReleaseFiles,
}
813
814impl std::fmt::Display for CopyPhase {
815    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
816        write!(
817            f,
818            "{}",
819            match self {
820                Self::BinaryPackages => "binary packages",
821                Self::InstallerBinaryPackages => "installer binary packages",
822                Self::Sources => "sources",
823                Self::Installers => "installers",
824                Self::ReleaseIndices => "release indices",
825                Self::ReleaseFiles => "release files",
826            }
827        )
828    }
829}
830
831/// Represents a repository publishing event.
832///
833/// Instances are sent to callbacks during repository writing to inform of activity.
834pub enum PublishEvent {
835    ResolvedPoolArtifacts(usize),
836
837    /// A pool artifact with the given path is current and was not updated.
838    PoolArtifactCurrent(String),
839
840    /// A pool artifact with the given path is missing and will be created.
841    PoolArtifactMissing(String),
842
843    /// Total number of pool artifacts to publish.
844    PoolArtifactsToPublish(usize),
845
846    /// A pool artifact with the given path and size was created.
847    PoolArtifactCreated(String, u64),
848
849    /// The path to an index file to write.
850    IndexFileToWrite(String),
851
852    /// An index file that was written.
853    IndexFileWritten(String, u64),
854
855    /// A path is being verified.
856    VerifyingDestinationPath(String),
857
858    /// A phase in a copy operation has begin.
859    CopyPhaseBegin(CopyPhase),
860
861    /// A phase in a copy operation has finished.
862    CopyPhaseEnd(CopyPhase),
863
864    /// Copying a path from a source to a destination.
865    CopyingPath(String, String),
866
867    /// Copying an indices file but the source wasn't found.
868    CopyIndicesPathNotFound(String),
869
870    /// A path was copied.
871    PathCopied(String, u64),
872
873    /// A path copy was a no-op.
874    PathCopyNoop(String),
875
876    /// Begin a write sequence where we will write N total bytes.
877    WriteSequenceBeginWithTotalBytes(u64),
878
879    /// Report that N bytes have been written as part of a write operation.
880    WriteSequenceProgressBytes(u64),
881
882    /// Report the conclusion of a logical write sequence.
883    WriteSequenceFinished,
884}
885
886impl std::fmt::Display for PublishEvent {
887    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
888        match self {
889            Self::ResolvedPoolArtifacts(count) => {
890                write!(f, "resolved {} needed pool artifacts", count)
891            }
892            Self::PoolArtifactCurrent(path) => {
893                write!(f, "pool path {} is present", path)
894            }
895            Self::PoolArtifactMissing(path) => {
896                write!(f, "pool path {} will be written", path)
897            }
898            Self::PoolArtifactsToPublish(count) => {
899                write!(f, "{} pool artifacts will be written", count)
900            }
901            Self::PoolArtifactCreated(path, size) => {
902                write!(f, "wrote {} bytes to {}", size, path)
903            }
904            Self::IndexFileToWrite(path) => {
905                write!(f, "index file {} will be written", path)
906            }
907            Self::IndexFileWritten(path, size) => {
908                write!(f, "wrote {} bytes to {}", size, path)
909            }
910            Self::VerifyingDestinationPath(path) => {
911                write!(f, "verifying destination path {}", path)
912            }
913            Self::CopyPhaseBegin(phase) => {
914                write!(f, "beginning copying of {}", phase)
915            }
916            Self::CopyPhaseEnd(phase) => {
917                write!(f, "finished copying of {}", phase)
918            }
919            Self::CopyingPath(source, dest) => {
920                write!(f, "copying {} to {}", source, dest)
921            }
922            Self::CopyIndicesPathNotFound(path) => {
923                write!(
924                    f,
925                    "copying indices file {} failed because it wasn't found",
926                    path
927                )
928            }
929            Self::PathCopied(path, size) => {
930                write!(f, "copied {} bytes to {}", size, path)
931            }
932            Self::PathCopyNoop(path) => {
933                write!(f, "copy of {} was a no-op", path)
934            }
935            Self::WriteSequenceBeginWithTotalBytes(_)
936            | Self::WriteSequenceProgressBytes(_)
937            | Self::WriteSequenceFinished => Ok(()),
938        }
939    }
940}
941
942impl PublishEvent {
943    /// Whether this even contains a meaningful log message.
944    pub fn is_loggable(&self) -> bool {
945        !self.is_progress()
946    }
947
948    /// Whether this is a progress update.
949    pub fn is_progress(&self) -> bool {
950        matches!(
951            self,
952            Self::WriteSequenceBeginWithTotalBytes(_)
953                | Self::WriteSequenceProgressBytes(_)
954                | Self::WriteSequenceFinished
955        )
956    }
957}
958
/// Describes a single path written to a repository and how much was written.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepositoryWrite<'a> {
    /// The path that was written.
    pub path: Cow<'a, str>,
    /// The number of bytes written.
    pub bytes_written: u64,
}
966
/// Describes the result of a repository write operation.
pub enum RepositoryWriteOperation<'a> {
    /// A path was written.
    PathWritten(RepositoryWrite<'a>),
    /// The operation didn't do anything meaningful.
    ///
    /// Holds the destination path and the size of the content already present
    /// (0 when the expected size was unknown).
    Noop(Cow<'a, str>, u64),
}
974
975impl<'a> RepositoryWriteOperation<'a> {
976    pub fn bytes_written(&self) -> u64 {
977        match self {
978            Self::PathWritten(write) => write.bytes_written,
979            Self::Noop(_, size) => *size,
980        }
981    }
982}
983
/// An interface for writing to a repository.
///
/// From the perspective of this trait, writing to a repository is a matter of
/// providing I/O for testing for path/key existence/integrity and storing new
/// data under a path/key. Additional logic about what to write where is
/// implemented elsewhere.
#[async_trait]
pub trait RepositoryWriter: Sync {
    /// Verify the existence of a path with optional content integrity checking.
    ///
    /// If the size and digest are [Some] implementations *may* perform additional
    /// content integrity verification. Or they may not. They should not lie about
    /// whether integrity verification was performed in the returned value, however.
    async fn verify_path<'path>(
        &self,
        path: &'path str,
        expected_content: Option<(u64, ContentDigest)>,
    ) -> Result<RepositoryPathVerification<'path>>;

    /// Write data to a given path.
    ///
    /// The data to write is provided by an [AsyncRead] reader.
    async fn write_path<'path, 'reader>(
        &self,
        path: Cow<'path, str>,
        reader: Pin<Box<dyn AsyncRead + Send + 'reader>>,
    ) -> Result<RepositoryWrite<'path>>;

    /// Copy a path from a reader to this writer.
    ///
    /// The source reader is a [RepositoryRootReader] and the path is relative to the repository
    /// root.
    ///
    /// The default implementation verifies the integrity of the destination and will no-op if
    /// the desired content is already present.
    ///
    /// Implementations of this trait may have a custom implementation that changes semantics.
    /// For example, a writer could operate in a dry-run mode where it doesn't actually attempt
    /// any I/O. Custom implementations should call `progress_cb` with events, as appropriate.
    async fn copy_from<'path>(
        &self,
        reader: &dyn RepositoryRootReader,
        source_path: Cow<'path, str>,
        expected_content: Option<(u64, ContentDigest)>,
        dest_path: Cow<'path, str>,
        progress_cb: &Option<Box<dyn Fn(PublishEvent) + Sync>>,
    ) -> Result<RepositoryWriteOperation<'path>> {
        // Announce the verification step before performing any I/O.
        if let Some(cb) = progress_cb {
            cb(PublishEvent::VerifyingDestinationPath(
                dest_path.to_string(),
            ));
        }

        // `expected_content` is cloned because it is consumed again below when
        // fetching the source with digest verification.
        let verification = self
            .verify_path(dest_path.as_ref(), expected_content.clone())
            .await?;

        if matches!(
            verification.state,
            RepositoryPathVerificationState::ExistsIntegrityVerified
        ) {
            // Destination already holds the expected content. Report a no-op
            // carrying the expected size when known and 0 otherwise.
            return Ok(RepositoryWriteOperation::Noop(
                dest_path,
                if let Some((size, _)) = expected_content {
                    size
                } else {
                    0
                },
            ));
        }

        if let Some(cb) = progress_cb {
            cb(PublishEvent::CopyingPath(
                source_path.to_string(),
                dest_path.to_string(),
            ));
        }

        // Fetch the source, verifying its digest when the expected content is known.
        let reader = if let Some((size, digest)) = expected_content {
            reader
                .get_path_with_digest_verification(source_path.as_ref(), size, digest)
                .await?
        } else {
            reader.get_path(source_path.as_ref()).await?
        };

        let write = self.write_path(dest_path, reader).await?;

        Ok(RepositoryWriteOperation::PathWritten(write))
    }
}
1075
1076/// Construct a [RepositoryRootReader] from a string/URL.
1077///
1078/// If the string contains `://` it will be parsed as a URL. `file://`, `http://`,
1079/// and `https://` are recognized.
1080///
1081/// Otherwise the string will be interpreted as a filesystem path. No test for whether
1082/// the repository exists is performed.
1083pub fn reader_from_str(s: impl ToString) -> Result<Box<dyn RepositoryRootReader>> {
1084    let s = s.to_string();
1085
1086    if s.contains("://") {
1087        let url = url::Url::parse(&s)?;
1088
1089        match url.scheme() {
1090            "file" => Ok(Box::new(filesystem::FilesystemRepositoryReader::new(
1091                url.to_file_path()
1092                    .expect("path conversion should always work for file://"),
1093            ))),
1094            #[cfg(feature = "http")]
1095            "http" | "https" => Ok(Box::new(http::HttpRepositoryClient::new(url)?)),
1096            _ => Err(DebianError::RepositoryReaderUnrecognizedUrl(s)),
1097        }
1098    } else {
1099        // Assume a filesystem path.
1100        Ok(Box::new(filesystem::FilesystemRepositoryReader::new(s)))
1101    }
1102}
1103
1104/// Construct a [RepositoryWriter] from a string/URL.
1105///
1106/// If the string contains `://` it will be parsed as a URL. `file://`, `null://`, and `s3://` are
1107/// recognized.
1108///
1109/// Otherwise the string will be interpreted as a filesystem path. No test for
1110/// whether the repository exists is performed.
1111pub async fn writer_from_str(s: impl ToString) -> Result<Box<dyn RepositoryWriter>> {
1112    let s = s.to_string();
1113
1114    if s.contains("://") {
1115        let url = url::Url::parse(&s)?;
1116
1117        match url.scheme() {
1118            "file" => Ok(Box::new(filesystem::FilesystemRepositoryWriter::new(
1119                url.to_file_path()
1120                    .expect("path conversion should always work for file://"),
1121            ))),
1122            "null" => {
1123                let mut writer = sink_writer::SinkWriter::default();
1124
1125                let behavior = match url.host_str() {
1126                    Some(s) => sink_writer::SinkWriterVerifyBehavior::from_str(s)?,
1127                    None => sink_writer::SinkWriterVerifyBehavior::Missing,
1128                };
1129
1130                writer.set_verify_behavior(behavior);
1131
1132                Ok(Box::new(writer))
1133            }
1134            #[cfg(feature = "s3")]
1135            "s3" => {
1136                let path = url.path();
1137
1138                if let Some((bucket, prefix)) = path.trim_matches('/').split_once('/') {
1139                    let region = s3::get_bucket_region(bucket).await?;
1140
1141                    Ok(Box::new(s3::S3Writer::new(region, bucket, Some(prefix))))
1142                } else {
1143                    let region = s3::get_bucket_region(path).await?;
1144
1145                    Ok(Box::new(s3::S3Writer::new(region, path, None)))
1146                }
1147            }
1148            _ => Err(DebianError::RepositoryWriterUnrecognizedUrl(s)),
1149        }
1150    } else {
1151        Ok(Box::new(filesystem::FilesystemRepositoryWriter::new(s)))
1152    }
1153}