// debian_packaging/io.rs

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

/*! I/O helpers. */

7use {
8    crate::{
9        error::{DebianError, Result},
10        repository::release::ChecksumType,
11    },
12    async_compression::futures::bufread::{
13        BzDecoder, BzEncoder, GzipDecoder, GzipEncoder, LzmaDecoder, LzmaEncoder, XzDecoder,
14        XzEncoder,
15    },
16    async_trait::async_trait,
17    futures::{AsyncBufRead, AsyncRead, AsyncWrite},
18    pgp::crypto::hash::Hasher,
19    pgp_cleartext::CleartextHasher,
20    pin_project::pin_project,
21    std::{
22        collections::HashMap,
23        fmt::Formatter,
24        pin::Pin,
25        task::{Context, Poll},
26    },
27};
28
/// Represents a content digest.
///
/// Each variant holds the raw digest bytes for one checksum algorithm.
/// Instances are typically obtained by parsing hex strings via
/// [ContentDigest::from_hex_digest] or by finishing a hasher.
#[derive(Clone, Eq, PartialEq, PartialOrd)]
pub enum ContentDigest {
    /// An MD5 digest.
    Md5(Vec<u8>),
    /// A SHA-1 digest.
    Sha1(Vec<u8>),
    /// A SHA-256 digest.
    Sha256(Vec<u8>),
}
39
40impl std::fmt::Debug for ContentDigest {
41    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42        match self {
43            Self::Md5(data) => write!(f, "Md5({})", hex::encode(data)),
44            Self::Sha1(data) => write!(f, "Sha1({})", hex::encode(data)),
45            Self::Sha256(data) => write!(f, "Sha256({})", hex::encode(data)),
46        }
47    }
48}
49
50impl ContentDigest {
51    /// Create a new MD5 instance by parsing a hex digest.
52    pub fn md5_hex(digest: &str) -> Result<Self> {
53        Self::from_hex_digest(ChecksumType::Md5, digest)
54    }
55
56    /// Create a new SHA-1 instance by parsing a hex digest.
57    pub fn sha1_hex(digest: &str) -> Result<Self> {
58        Self::from_hex_digest(ChecksumType::Sha1, digest)
59    }
60
61    /// Create a new SHA-256 instance by parsing a hex digest.
62    pub fn sha256_hex(digest: &str) -> Result<Self> {
63        Self::from_hex_digest(ChecksumType::Sha256, digest)
64    }
65
66    /// Obtain an instance by parsing a hex string as a [ChecksumType].
67    pub fn from_hex_digest(checksum: ChecksumType, digest: &str) -> Result<Self> {
68        let digest = hex::decode(digest)
69            .map_err(|e| DebianError::ContentDigestBadHex(digest.to_string(), e))?;
70
71        Ok(match checksum {
72            ChecksumType::Md5 => Self::Md5(digest),
73            ChecksumType::Sha1 => Self::Sha1(digest),
74            ChecksumType::Sha256 => Self::Sha256(digest),
75        })
76    }
77
78    /// Create a new hasher matching for the type of this digest.
79    pub fn new_hasher(&self) -> Box<dyn Hasher + Send> {
80        Box::new(match self {
81            Self::Md5(_) => CleartextHasher::md5(),
82            Self::Sha1(_) => CleartextHasher::sha1(),
83            Self::Sha256(_) => CleartextHasher::sha256(),
84        })
85    }
86
87    /// Obtain the digest bytes for this content digest.
88    pub fn digest_bytes(&self) -> &[u8] {
89        match self {
90            Self::Md5(x) => x,
91            Self::Sha1(x) => x,
92            Self::Sha256(x) => x,
93        }
94    }
95
96    /// Obtain the hex encoded content digest.
97    pub fn digest_hex(&self) -> String {
98        hex::encode(self.digest_bytes())
99    }
100
101    /// Obtain the [ChecksumType] for this digest.
102    pub fn checksum_type(&self) -> ChecksumType {
103        match self {
104            Self::Md5(_) => ChecksumType::Md5,
105            Self::Sha1(_) => ChecksumType::Sha1,
106            Self::Sha256(_) => ChecksumType::Sha256,
107        }
108    }
109
110    /// Obtain the name of the field in `[In]Release` files that holds this digest type.
111    ///
112    /// This also corresponds to the directory name for `by-hash` paths.
113    pub fn release_field_name(&self) -> &'static str {
114        self.checksum_type().field_name()
115    }
116}
117
/// Compression format used by Debian primitives.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum Compression {
    /// No compression (no extension).
    None,

    /// XZ compression (.xz extension).
    Xz,

    /// Gzip compression (.gz extension).
    Gzip,

    /// Bzip2 compression (.bz2 extension).
    Bzip2,

    /// LZMA compression (.lzma extension).
    Lzma,
}

impl Compression {
    /// Filename extension for files compressed in this format.
    ///
    /// Includes the leading dot; the uncompressed variant yields `""`.
    pub fn extension(&self) -> &'static str {
        match self {
            Self::Bzip2 => ".bz2",
            Self::Gzip => ".gz",
            Self::Lzma => ".lzma",
            Self::None => "",
            Self::Xz => ".xz",
        }
    }

    /// The default retrieval preference order for clients.
    pub fn default_preferred_order() -> impl Iterator<Item = Compression> {
        // Strongest/most common formats first; uncompressed is the fallback.
        let order = vec![Self::Xz, Self::Lzma, Self::Gzip, Self::Bzip2, Self::None];

        order.into_iter()
    }
}
154
155/// Wrap a reader with transparent decompression.
156pub async fn read_decompressed(
157    stream: Pin<Box<dyn AsyncBufRead + Send>>,
158    compression: Compression,
159) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
160    Ok(match compression {
161        Compression::None => Box::pin(stream),
162        Compression::Gzip => Box::pin(GzipDecoder::new(stream)),
163        Compression::Xz => Box::pin(XzDecoder::new(stream)),
164        Compression::Bzip2 => Box::pin(BzDecoder::new(stream)),
165        Compression::Lzma => Box::pin(LzmaDecoder::new(stream)),
166    })
167}
168
169/// Wrap a reader with transparent compression.
170pub fn read_compressed<'a>(
171    stream: impl AsyncBufRead + Send + 'a,
172    compression: Compression,
173) -> Pin<Box<dyn AsyncRead + Send + 'a>> {
174    match compression {
175        Compression::None => Box::pin(stream),
176        Compression::Gzip => Box::pin(GzipEncoder::new(stream)),
177        Compression::Xz => Box::pin(XzEncoder::new(stream)),
178        Compression::Bzip2 => Box::pin(BzEncoder::new(stream)),
179        Compression::Lzma => Box::pin(LzmaEncoder::new(stream)),
180    }
181}
182
183/// Drain content from a reader to a black hole.
184pub async fn drain_reader(reader: impl AsyncRead) -> std::io::Result<u64> {
185    let mut sink = futures::io::sink();
186    futures::io::copy(reader, &mut sink).await
187}
188
/// An adapter for [AsyncRead] streams that validates source size and digest.
///
/// Validation only occurs once the expected source size bytes have been read.
///
/// If the reader consumes less than the expected number of bytes, no validation
/// occurs and incorrect data could have been read. Therefore it is **strongly recommended**
/// for readers to drain this reader. e.g. by using [drain_reader()].
#[pin_project]
pub struct ContentValidatingReader<R> {
    // Incremental hasher fed by every read; set to `None` once the expected
    // size is reached and the digest has been checked.
    hasher: Option<Box<dyn pgp::crypto::hash::Hasher + Send>>,
    // Total number of bytes the source is expected to yield.
    expected_size: u64,
    // Digest the fully-read content must match.
    expected_digest: ContentDigest,
    // The wrapped reader being validated.
    #[pin]
    source: R,
    // Running count of bytes read from `source` so far.
    bytes_read: u64,
}
205
206impl<R> ContentValidatingReader<R> {
207    /// Create a new instance bound to a source and having expected size and content digest.
208    pub fn new(source: R, expected_size: u64, expected_digest: ContentDigest) -> Self {
209        Self {
210            hasher: Some(expected_digest.new_hasher()),
211            expected_size,
212            expected_digest,
213            source,
214            bytes_read: 0,
215        }
216    }
217}
218
219impl<R> AsyncRead for ContentValidatingReader<R>
220where
221    R: AsyncRead + Unpin,
222{
223    fn poll_read(
224        self: Pin<&mut Self>,
225        cx: &mut Context<'_>,
226        buf: &mut [u8],
227    ) -> Poll<std::io::Result<usize>> {
228        let mut this = self.project();
229
230        match this.source.as_mut().poll_read(cx, buf) {
231            Poll::Ready(Ok(size)) => {
232                if size > 0 {
233                    if let Some(hasher) = this.hasher.as_mut() {
234                        hasher.update(&buf[0..size]);
235                    } else {
236                        panic!("hasher destroyed prematurely");
237                    }
238
239                    *this.bytes_read += size as u64;
240                }
241
242                match this.bytes_read.cmp(&this.expected_size) {
243                    std::cmp::Ordering::Equal => {
244                        if let Some(hasher) = this.hasher.take() {
245                            let got_digest = hasher.finish();
246
247                            if got_digest != this.expected_digest.digest_bytes() {
248                                return Poll::Ready(Err(std::io::Error::new(
249                                    std::io::ErrorKind::Other,
250                                    format!(
251                                        "digest mismatch of retrieved content: expected {}, got {}",
252                                        this.expected_digest.digest_hex(),
253                                        hex::encode(got_digest)
254                                    ),
255                                )));
256                            }
257                        }
258                    }
259                    std::cmp::Ordering::Greater => {
260                        return Poll::Ready(Err(std::io::Error::new(
261                            std::io::ErrorKind::Other,
262                            format!(
263                                "extra bytes read: expected {}; got {}",
264                                this.expected_size, this.bytes_read
265                            ),
266                        )));
267                    }
268                    std::cmp::Ordering::Less => {}
269                }
270
271                Poll::Ready(Ok(size))
272            }
273            res => res,
274        }
275    }
276}
277
/// Holds multiple flavors of content digests.
#[derive(Clone, Debug)]
pub struct MultiContentDigest {
    /// The MD5 digest of the content.
    pub md5: ContentDigest,
    /// The SHA-1 digest of the content.
    pub sha1: ContentDigest,
    /// The SHA-256 digest of the content.
    pub sha256: ContentDigest,
}
285
286impl MultiContentDigest {
287    /// Whether this digest matches another one.
288    pub fn matches_digest(&self, other: &ContentDigest) -> bool {
289        match other {
290            ContentDigest::Md5(_) => &self.md5 == other,
291            ContentDigest::Sha1(_) => &self.sha1 == other,
292            ContentDigest::Sha256(_) => &self.sha256 == other,
293        }
294    }
295
296    /// Obtain the [ContentDigest] for a given [ChecksumType].
297    pub fn digest_from_checksum(&self, checksum: ChecksumType) -> &ContentDigest {
298        match checksum {
299            ChecksumType::Md5 => &self.md5,
300            ChecksumType::Sha1 => &self.sha1,
301            ChecksumType::Sha256 => &self.sha256,
302        }
303    }
304
305    /// Obtain an iterator of [ContentDigest] in this instance.
306    pub fn iter_digests(&self) -> impl Iterator<Item = &ContentDigest> + '_ {
307        [&self.md5, &self.sha1, &self.sha256].into_iter()
308    }
309}
310
/// A content digester that simultaneously computes multiple digest types.
///
/// All content fed via [MultiDigester::update] is passed to MD5, SHA-1,
/// and SHA-256 hashers.
pub struct MultiDigester {
    // In-progress MD5 hasher.
    md5: Box<dyn Hasher + Send>,
    // In-progress SHA-1 hasher.
    sha1: Box<dyn Hasher + Send>,
    // In-progress SHA-256 hasher.
    sha256: Box<dyn Hasher + Send>,
}
317
318impl Default for MultiDigester {
319    fn default() -> Self {
320        Self {
321            md5: Box::new(CleartextHasher::md5()),
322            sha1: Box::new(CleartextHasher::sha1()),
323            sha256: Box::new(CleartextHasher::sha256()),
324        }
325    }
326}
327
328impl MultiDigester {
329    /// Write content into the digesters.
330    pub fn update(&mut self, data: &[u8]) {
331        self.md5.update(data);
332        self.sha1.update(data);
333        self.sha256.update(data);
334    }
335
336    /// Finish digesting content.
337    ///
338    /// Consumes the instance and returns a [MultiContentDigest] holding all the digests.
339    pub fn finish(self) -> MultiContentDigest {
340        MultiContentDigest {
341            md5: ContentDigest::Md5(self.md5.finish()),
342            sha1: ContentDigest::Sha1(self.sha1.finish()),
343            sha256: ContentDigest::Sha256(self.sha256.finish()),
344        }
345    }
346}
347
/// An [AsyncRead] stream adapter that computes multiple [ContentDigest] as data is read.
#[pin_project]
pub struct DigestingReader<R> {
    // Digests every byte read from `source`.
    digester: MultiDigester,
    // The wrapped reader.
    #[pin]
    source: R,
}
355
356impl<R> DigestingReader<R> {
357    /// Construct a new instance from a source reader.
358    pub fn new(source: R) -> Self {
359        Self {
360            digester: MultiDigester::default(),
361            source,
362        }
363    }
364
365    /// Finish the stream.
366    ///
367    /// Returns the source reader and a resolved [MultiContentDigest].
368    pub fn finish(self) -> (R, MultiContentDigest) {
369        (self.source, self.digester.finish())
370    }
371}
372
373impl<R> AsyncRead for DigestingReader<R>
374where
375    R: AsyncRead + Unpin,
376{
377    fn poll_read(
378        self: Pin<&mut Self>,
379        cx: &mut Context<'_>,
380        buf: &mut [u8],
381    ) -> Poll<std::io::Result<usize>> {
382        let mut this = self.project();
383
384        match this.source.as_mut().poll_read(cx, buf) {
385            Poll::Ready(Ok(size)) => {
386                if size > 0 {
387                    this.digester.update(&buf[0..size]);
388                }
389
390                Poll::Ready(Ok(size))
391            }
392            res => res,
393        }
394    }
395}
396
/// An [AsyncWrite] stream adapter that computes multiple [ContentDigest] as data is written.
#[pin_project]
pub struct DigestingWriter<W> {
    // Digests every byte accepted by `dest`.
    digester: MultiDigester,
    // The wrapped writer.
    #[pin]
    dest: W,
}
404
405impl<W> DigestingWriter<W> {
406    /// Construct a new instance from a destination writer.
407    pub fn new(dest: W) -> Self {
408        Self {
409            digester: MultiDigester::default(),
410            dest,
411        }
412    }
413
414    /// Finish the stream.
415    ///
416    /// Returns the destination writer and a resolved [MultiContentDigest].
417    pub fn finish(self) -> (W, MultiContentDigest) {
418        (self.dest, self.digester.finish())
419    }
420}
421
422impl<W> AsyncWrite for DigestingWriter<W>
423where
424    W: AsyncWrite + Unpin,
425{
426    fn poll_write(
427        self: Pin<&mut Self>,
428        cx: &mut Context<'_>,
429        buf: &[u8],
430    ) -> Poll<std::io::Result<usize>> {
431        let mut this = self.project();
432
433        match this.dest.as_mut().poll_write(cx, buf) {
434            Poll::Ready(Ok(size)) => {
435                if size > 0 {
436                    this.digester.update(&buf[0..size]);
437                }
438
439                Poll::Ready(Ok(size))
440            }
441            res => res,
442        }
443    }
444
445    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
446        self.project().dest.as_mut().poll_flush(cx)
447    }
448
449    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
450        self.project().dest.as_mut().poll_close(cx)
451    }
452}
453
454/// Generic mechanism for obtaining content at a given path.
455///
456/// This trait is used to define a generic mechanism for resolving content given
457/// a lookup key/path.
458///
459/// Implementations only need to implement `get_path()`. The other members have
460/// default implementations that should do the correct thing by default.
461#[async_trait]
462pub trait DataResolver: Sync {
463    /// Get the content of a relative path as an async reader.
464    ///
465    /// This obtains a reader for path data and returns the raw data without any
466    /// decoding applied.
467    async fn get_path(&self, path: &str) -> Result<Pin<Box<dyn AsyncRead + Send>>>;
468
469    /// Obtain a reader that performs content integrity checking.
470    ///
471    /// Because content digests can only be computed once all content is read, the reader
472    /// emits data as it is streaming but only compares the cryptographic digest once all
473    /// data has been read. If there is a content digest mismatch, an error will be raised
474    /// once the final byte is read.
475    ///
476    /// Validation only occurs if the stream is read to completion. Failure to read the
477    /// entire stream could result in reading of unexpected content.
478    async fn get_path_with_digest_verification(
479        &self,
480        path: &str,
481        expected_size: u64,
482        expected_digest: ContentDigest,
483    ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
484        Ok(Box::pin(ContentValidatingReader::new(
485            self.get_path(path).await?,
486            expected_size,
487            expected_digest,
488        )))
489    }
490
491    /// Get the content of a relative path with decompression transparently applied.
492    async fn get_path_decoded(
493        &self,
494        path: &str,
495        compression: Compression,
496    ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
497        read_decompressed(
498            Box::pin(futures::io::BufReader::new(self.get_path(path).await?)),
499            compression,
500        )
501        .await
502    }
503
504    /// Like [Self::get_path_decoded()] but also perform content integrity verification.
505    ///
506    /// The digest is matched against the original fetched content, before decompression.
507    async fn get_path_decoded_with_digest_verification(
508        &self,
509        path: &str,
510        compression: Compression,
511        expected_size: u64,
512        expected_digest: ContentDigest,
513    ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
514        let reader = self
515            .get_path_with_digest_verification(path, expected_size, expected_digest)
516            .await?;
517
518        read_decompressed(Box::pin(futures::io::BufReader::new(reader)), compression).await
519    }
520}
521
/// A [DataResolver] that maintains a path translation table and transparently redirects lookups.
pub struct PathMappingDataResolver<R> {
    // The resolver lookups are forwarded to.
    source: R,
    // Maps requested paths to the paths actually fetched from `source`.
    path_map: HashMap<String, String>,
}
527
528impl<R: DataResolver + Send> PathMappingDataResolver<R> {
529    /// Construct a new instance that forwards to a source [DataResolver].
530    pub fn new(source: R) -> Self {
531        Self {
532            source,
533            path_map: HashMap::default(),
534        }
535    }
536
537    /// Register a mapping of 1 path to another.
538    ///
539    /// Future looks up `from_path` will resolve to `to_path`.
540    pub fn add_path_map(&mut self, from_path: impl ToString, to_path: impl ToString) {
541        self.path_map
542            .insert(from_path.to_string(), to_path.to_string());
543    }
544}
545
546#[async_trait]
547impl<R: DataResolver + Send> DataResolver for PathMappingDataResolver<R> {
548    async fn get_path(&self, path: &str) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
549        self.source
550            .get_path(self.path_map.get(path).map(|s| s.as_str()).unwrap_or(path))
551            .await
552    }
553}