1use {
8 crate::{
9 error::{DebianError, Result},
10 repository::release::ChecksumType,
11 },
12 async_compression::futures::bufread::{
13 BzDecoder, BzEncoder, GzipDecoder, GzipEncoder, LzmaDecoder, LzmaEncoder, XzDecoder,
14 XzEncoder,
15 },
16 async_trait::async_trait,
17 futures::{AsyncBufRead, AsyncRead, AsyncWrite},
18 pgp::crypto::hash::Hasher,
19 pgp_cleartext::CleartextHasher,
20 pin_project::pin_project,
21 std::{
22 collections::HashMap,
23 fmt::Formatter,
24 pin::Pin,
25 task::{Context, Poll},
26 },
27};
28
/// A content digest: raw hash bytes tagged with the algorithm that produced them.
///
/// The variant identifies the algorithm and the payload holds the raw
/// (not hex-encoded) digest bytes.
///
/// `Ord` and `Hash` are derived in addition to equality and partial ordering so
/// that digests can be used as keys in hashed and ordered collections. Ordering
/// compares the variant first (in declaration order), then the digest bytes.
#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ContentDigest {
    /// An MD5 digest.
    Md5(Vec<u8>),
    /// A SHA-1 digest.
    Sha1(Vec<u8>),
    /// A SHA-256 digest.
    Sha256(Vec<u8>),
}
39
40impl std::fmt::Debug for ContentDigest {
41 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42 match self {
43 Self::Md5(data) => write!(f, "Md5({})", hex::encode(data)),
44 Self::Sha1(data) => write!(f, "Sha1({})", hex::encode(data)),
45 Self::Sha256(data) => write!(f, "Sha256({})", hex::encode(data)),
46 }
47 }
48}
49
50impl ContentDigest {
51 pub fn md5_hex(digest: &str) -> Result<Self> {
53 Self::from_hex_digest(ChecksumType::Md5, digest)
54 }
55
56 pub fn sha1_hex(digest: &str) -> Result<Self> {
58 Self::from_hex_digest(ChecksumType::Sha1, digest)
59 }
60
61 pub fn sha256_hex(digest: &str) -> Result<Self> {
63 Self::from_hex_digest(ChecksumType::Sha256, digest)
64 }
65
66 pub fn from_hex_digest(checksum: ChecksumType, digest: &str) -> Result<Self> {
68 let digest = hex::decode(digest)
69 .map_err(|e| DebianError::ContentDigestBadHex(digest.to_string(), e))?;
70
71 Ok(match checksum {
72 ChecksumType::Md5 => Self::Md5(digest),
73 ChecksumType::Sha1 => Self::Sha1(digest),
74 ChecksumType::Sha256 => Self::Sha256(digest),
75 })
76 }
77
78 pub fn new_hasher(&self) -> Box<dyn Hasher + Send> {
80 Box::new(match self {
81 Self::Md5(_) => CleartextHasher::md5(),
82 Self::Sha1(_) => CleartextHasher::sha1(),
83 Self::Sha256(_) => CleartextHasher::sha256(),
84 })
85 }
86
87 pub fn digest_bytes(&self) -> &[u8] {
89 match self {
90 Self::Md5(x) => x,
91 Self::Sha1(x) => x,
92 Self::Sha256(x) => x,
93 }
94 }
95
96 pub fn digest_hex(&self) -> String {
98 hex::encode(self.digest_bytes())
99 }
100
101 pub fn checksum_type(&self) -> ChecksumType {
103 match self {
104 Self::Md5(_) => ChecksumType::Md5,
105 Self::Sha1(_) => ChecksumType::Sha1,
106 Self::Sha256(_) => ChecksumType::Sha256,
107 }
108 }
109
110 pub fn release_field_name(&self) -> &'static str {
114 self.checksum_type().field_name()
115 }
116}
117
/// Compression formats understood by the readers in this module.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum Compression {
    /// No compression.
    None,

    /// XZ compression.
    Xz,

    /// Gzip compression.
    Gzip,

    /// Bzip2 compression.
    Bzip2,

    /// LZMA compression.
    Lzma,
}

impl Compression {
    /// Returns the extension string for this format, including the leading dot.
    ///
    /// Uncompressed content has no extension, so [`Compression::None`] yields
    /// the empty string.
    pub fn extension(&self) -> &'static str {
        match *self {
            Compression::None => "",
            Compression::Xz => ".xz",
            Compression::Gzip => ".gz",
            Compression::Bzip2 => ".bz2",
            Compression::Lzma => ".lzma",
        }
    }

    /// Returns every compression format in the default order of preference,
    /// most preferred first.
    pub fn default_preferred_order() -> impl Iterator<Item = Compression> {
        const ORDER: [Compression; 5] = [
            Compression::Xz,
            Compression::Lzma,
            Compression::Gzip,
            Compression::Bzip2,
            Compression::None,
        ];

        IntoIterator::into_iter(ORDER)
    }
}
154
155pub async fn read_decompressed(
157 stream: Pin<Box<dyn AsyncBufRead + Send>>,
158 compression: Compression,
159) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
160 Ok(match compression {
161 Compression::None => Box::pin(stream),
162 Compression::Gzip => Box::pin(GzipDecoder::new(stream)),
163 Compression::Xz => Box::pin(XzDecoder::new(stream)),
164 Compression::Bzip2 => Box::pin(BzDecoder::new(stream)),
165 Compression::Lzma => Box::pin(LzmaDecoder::new(stream)),
166 })
167}
168
169pub fn read_compressed<'a>(
171 stream: impl AsyncBufRead + Send + 'a,
172 compression: Compression,
173) -> Pin<Box<dyn AsyncRead + Send + 'a>> {
174 match compression {
175 Compression::None => Box::pin(stream),
176 Compression::Gzip => Box::pin(GzipEncoder::new(stream)),
177 Compression::Xz => Box::pin(XzEncoder::new(stream)),
178 Compression::Bzip2 => Box::pin(BzEncoder::new(stream)),
179 Compression::Lzma => Box::pin(LzmaEncoder::new(stream)),
180 }
181}
182
183pub async fn drain_reader(reader: impl AsyncRead) -> std::io::Result<u64> {
185 let mut sink = futures::io::sink();
186 futures::io::copy(reader, &mut sink).await
187}
188
189#[pin_project]
197pub struct ContentValidatingReader<R> {
198 hasher: Option<Box<dyn pgp::crypto::hash::Hasher + Send>>,
199 expected_size: u64,
200 expected_digest: ContentDigest,
201 #[pin]
202 source: R,
203 bytes_read: u64,
204}
205
206impl<R> ContentValidatingReader<R> {
207 pub fn new(source: R, expected_size: u64, expected_digest: ContentDigest) -> Self {
209 Self {
210 hasher: Some(expected_digest.new_hasher()),
211 expected_size,
212 expected_digest,
213 source,
214 bytes_read: 0,
215 }
216 }
217}
218
219impl<R> AsyncRead for ContentValidatingReader<R>
220where
221 R: AsyncRead + Unpin,
222{
223 fn poll_read(
224 self: Pin<&mut Self>,
225 cx: &mut Context<'_>,
226 buf: &mut [u8],
227 ) -> Poll<std::io::Result<usize>> {
228 let mut this = self.project();
229
230 match this.source.as_mut().poll_read(cx, buf) {
231 Poll::Ready(Ok(size)) => {
232 if size > 0 {
233 if let Some(hasher) = this.hasher.as_mut() {
234 hasher.update(&buf[0..size]);
235 } else {
236 panic!("hasher destroyed prematurely");
237 }
238
239 *this.bytes_read += size as u64;
240 }
241
242 match this.bytes_read.cmp(&this.expected_size) {
243 std::cmp::Ordering::Equal => {
244 if let Some(hasher) = this.hasher.take() {
245 let got_digest = hasher.finish();
246
247 if got_digest != this.expected_digest.digest_bytes() {
248 return Poll::Ready(Err(std::io::Error::new(
249 std::io::ErrorKind::Other,
250 format!(
251 "digest mismatch of retrieved content: expected {}, got {}",
252 this.expected_digest.digest_hex(),
253 hex::encode(got_digest)
254 ),
255 )));
256 }
257 }
258 }
259 std::cmp::Ordering::Greater => {
260 return Poll::Ready(Err(std::io::Error::new(
261 std::io::ErrorKind::Other,
262 format!(
263 "extra bytes read: expected {}; got {}",
264 this.expected_size, this.bytes_read
265 ),
266 )));
267 }
268 std::cmp::Ordering::Less => {}
269 }
270
271 Poll::Ready(Ok(size))
272 }
273 res => res,
274 }
275 }
276}
277
278#[derive(Clone, Debug)]
280pub struct MultiContentDigest {
281 pub md5: ContentDigest,
282 pub sha1: ContentDigest,
283 pub sha256: ContentDigest,
284}
285
286impl MultiContentDigest {
287 pub fn matches_digest(&self, other: &ContentDigest) -> bool {
289 match other {
290 ContentDigest::Md5(_) => &self.md5 == other,
291 ContentDigest::Sha1(_) => &self.sha1 == other,
292 ContentDigest::Sha256(_) => &self.sha256 == other,
293 }
294 }
295
296 pub fn digest_from_checksum(&self, checksum: ChecksumType) -> &ContentDigest {
298 match checksum {
299 ChecksumType::Md5 => &self.md5,
300 ChecksumType::Sha1 => &self.sha1,
301 ChecksumType::Sha256 => &self.sha256,
302 }
303 }
304
305 pub fn iter_digests(&self) -> impl Iterator<Item = &ContentDigest> + '_ {
307 [&self.md5, &self.sha1, &self.sha256].into_iter()
308 }
309}
310
311pub struct MultiDigester {
313 md5: Box<dyn Hasher + Send>,
314 sha1: Box<dyn Hasher + Send>,
315 sha256: Box<dyn Hasher + Send>,
316}
317
318impl Default for MultiDigester {
319 fn default() -> Self {
320 Self {
321 md5: Box::new(CleartextHasher::md5()),
322 sha1: Box::new(CleartextHasher::sha1()),
323 sha256: Box::new(CleartextHasher::sha256()),
324 }
325 }
326}
327
328impl MultiDigester {
329 pub fn update(&mut self, data: &[u8]) {
331 self.md5.update(data);
332 self.sha1.update(data);
333 self.sha256.update(data);
334 }
335
336 pub fn finish(self) -> MultiContentDigest {
340 MultiContentDigest {
341 md5: ContentDigest::Md5(self.md5.finish()),
342 sha1: ContentDigest::Sha1(self.sha1.finish()),
343 sha256: ContentDigest::Sha256(self.sha256.finish()),
344 }
345 }
346}
347
348#[pin_project]
350pub struct DigestingReader<R> {
351 digester: MultiDigester,
352 #[pin]
353 source: R,
354}
355
356impl<R> DigestingReader<R> {
357 pub fn new(source: R) -> Self {
359 Self {
360 digester: MultiDigester::default(),
361 source,
362 }
363 }
364
365 pub fn finish(self) -> (R, MultiContentDigest) {
369 (self.source, self.digester.finish())
370 }
371}
372
373impl<R> AsyncRead for DigestingReader<R>
374where
375 R: AsyncRead + Unpin,
376{
377 fn poll_read(
378 self: Pin<&mut Self>,
379 cx: &mut Context<'_>,
380 buf: &mut [u8],
381 ) -> Poll<std::io::Result<usize>> {
382 let mut this = self.project();
383
384 match this.source.as_mut().poll_read(cx, buf) {
385 Poll::Ready(Ok(size)) => {
386 if size > 0 {
387 this.digester.update(&buf[0..size]);
388 }
389
390 Poll::Ready(Ok(size))
391 }
392 res => res,
393 }
394 }
395}
396
397#[pin_project]
399pub struct DigestingWriter<W> {
400 digester: MultiDigester,
401 #[pin]
402 dest: W,
403}
404
405impl<W> DigestingWriter<W> {
406 pub fn new(dest: W) -> Self {
408 Self {
409 digester: MultiDigester::default(),
410 dest,
411 }
412 }
413
414 pub fn finish(self) -> (W, MultiContentDigest) {
418 (self.dest, self.digester.finish())
419 }
420}
421
422impl<W> AsyncWrite for DigestingWriter<W>
423where
424 W: AsyncWrite + Unpin,
425{
426 fn poll_write(
427 self: Pin<&mut Self>,
428 cx: &mut Context<'_>,
429 buf: &[u8],
430 ) -> Poll<std::io::Result<usize>> {
431 let mut this = self.project();
432
433 match this.dest.as_mut().poll_write(cx, buf) {
434 Poll::Ready(Ok(size)) => {
435 if size > 0 {
436 this.digester.update(&buf[0..size]);
437 }
438
439 Poll::Ready(Ok(size))
440 }
441 res => res,
442 }
443 }
444
445 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
446 self.project().dest.as_mut().poll_flush(cx)
447 }
448
449 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
450 self.project().dest.as_mut().poll_close(cx)
451 }
452}
453
454#[async_trait]
462pub trait DataResolver: Sync {
463 async fn get_path(&self, path: &str) -> Result<Pin<Box<dyn AsyncRead + Send>>>;
468
469 async fn get_path_with_digest_verification(
479 &self,
480 path: &str,
481 expected_size: u64,
482 expected_digest: ContentDigest,
483 ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
484 Ok(Box::pin(ContentValidatingReader::new(
485 self.get_path(path).await?,
486 expected_size,
487 expected_digest,
488 )))
489 }
490
491 async fn get_path_decoded(
493 &self,
494 path: &str,
495 compression: Compression,
496 ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
497 read_decompressed(
498 Box::pin(futures::io::BufReader::new(self.get_path(path).await?)),
499 compression,
500 )
501 .await
502 }
503
504 async fn get_path_decoded_with_digest_verification(
508 &self,
509 path: &str,
510 compression: Compression,
511 expected_size: u64,
512 expected_digest: ContentDigest,
513 ) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
514 let reader = self
515 .get_path_with_digest_verification(path, expected_size, expected_digest)
516 .await?;
517
518 read_decompressed(Box::pin(futures::io::BufReader::new(reader)), compression).await
519 }
520}
521
522pub struct PathMappingDataResolver<R> {
524 source: R,
525 path_map: HashMap<String, String>,
526}
527
528impl<R: DataResolver + Send> PathMappingDataResolver<R> {
529 pub fn new(source: R) -> Self {
531 Self {
532 source,
533 path_map: HashMap::default(),
534 }
535 }
536
537 pub fn add_path_map(&mut self, from_path: impl ToString, to_path: impl ToString) {
541 self.path_map
542 .insert(from_path.to_string(), to_path.to_string());
543 }
544}
545
546#[async_trait]
547impl<R: DataResolver + Send> DataResolver for PathMappingDataResolver<R> {
548 async fn get_path(&self, path: &str) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
549 self.source
550 .get_path(self.path_map.get(path).map(|s| s.as_str()).unwrap_or(path))
551 .await
552 }
553}