scratch_io/
wharf.rs

/// Bsdiff-specific patch messages (the `Control` operations)
///
/// <https://github.com/itchio/wharf/blob/189a01902d172b3297051fab12d5d4db2c620e1d/bsdiff/bsdiff.proto>
///
/// More information about bsdiff wharf patches:
/// <https://web.archive.org/web/20211123032456/https://twitter.com/fasterthanlime/status/790617515009437701>
pub mod bsdiff;
/// Core wharf messages: patch/signature headers, sync ops, block hashes
///
/// <https://github.com/itchio/wharf/blob/189a01902d172b3297051fab12d5d4db2c620e1d/pwr/pwr.proto>
pub mod pwr;
/// Container messages describing files, directories and symlinks
///
/// <https://github.com/itchio/lake/blob/cc4284ec2b2a9ebc4735d7560ed8216de6ffac6f/tlc/tlc.proto>
pub mod tlc;
10
11use md5::{Digest, Md5};
12use std::io::{BufRead, Read};
13
/// Magic number at the start of every wharf patch file (stored little-endian on disk)
///
/// <https://github.com/itchio/wharf/blob/189a01902d172b3297051fab12d5d4db2c620e1d/pwr/constants.go#L14>
const PATCH_MAGIC: u32 = 0x0FEF5F00;
/// Magic number at the start of every wharf signature file
const SIGNATURE_MAGIC: u32 = PATCH_MAGIC + 1;

/// Unix permission mask mirrored from wharf's constants (not used by this module yet)
///
/// <https://github.com/itchio/wharf/blob/189a01902d172b3297051fab12d5d4db2c620e1d/pwr/constants.go#L30>
const _MODE_MASK: u32 = 0o644;

/// Size in bytes of each hashed block in a signature file (64 KiB)
///
/// <https://github.com/itchio/wharf/blob/189a01902d172b3297051fab12d5d4db2c620e1d/pwr/constants.go#L33>
const BLOCK_SIZE: usize = 64 * 1024;

/// Maximum byte length of a Protobuf variable-width integer
///
/// <https://protobuf.dev/programming-guides/encoding/#varints>
const PROTOBUF_VARINT_MAX_LENGTH: usize = 10;

/// Maximum number of old-file descriptors kept open in `apply_patch`'s LRU cache
const MAX_OPEN_FILES_PATCH: std::num::NonZeroUsize = std::num::NonZeroUsize::new(16).unwrap();
28
/// Represents a decoded wharf signature file
///
/// <https://docs.itch.zone/wharf/master/file-formats/signatures.html>
///
/// Contains the header, the container describing the files/dirs/symlinks,
/// and an iterator over the signature block hashes. The iterator reads
/// from the underlying stream on the fly as items are requested.
#[derive(Debug, Clone, PartialEq)]
pub struct Signature<R> {
  /// Signature header; carries the compression settings of the rest of the stream
  pub header: pwr::SignatureHeader,
  /// Container the signature was computed against
  pub container_new: tlc::Container,
  /// Lazy decoder of the per-block strong hashes that follow the container
  pub block_hash_iter: BlockHashIter<R>,
}
42
/// Represents a decoded wharf patch file
///
/// <https://docs.itch.zone/wharf/master/file-formats/patches.html>
///
/// Contains the header, the old and new containers describing file system
/// state before and after the patch, and an iterator over the patch operations.
/// The iterator reads from the underlying stream on the fly as items are requested.
#[derive(Debug, Clone, PartialEq)]
pub struct Patch<R> {
  /// Patch header; carries the compression settings of the rest of the stream
  pub header: pwr::PatchHeader,
  /// Container describing the build *before* the patch is applied
  pub container_old: tlc::Container,
  /// Container describing the build *after* the patch is applied
  pub container_new: tlc::Container,
  /// Lazy decoder of the per-file sync headers and their operations
  pub sync_op_iter: SyncEntryIter<R>,
}
57
/// Iterator over independent, sequential length-delimited [`pwr::BlockHash`] Protobuf messages in a [`std::io::BufRead`] stream
///
/// Each message is of the same type, independent and follows directly after the previous one in the stream.
/// The messages are read and decoded one by one, without loading the entire stream into memory.
///
/// The iterator finishes when reaching EOF
#[derive(Debug, Clone, PartialEq)]
pub struct BlockHashIter<R> {
  // Stream positioned at the next length-delimited BlockHash message
  reader: R,
}
68
69impl<R> Iterator for BlockHashIter<R>
70where
71  R: BufRead,
72{
73  type Item = Result<pwr::BlockHash, String>;
74
75  fn next(&mut self) -> Option<Self::Item> {
76    match self.reader.fill_buf() {
77      // If it couldn't read from the stream, return an error
78      Err(e) => Some(Err(format!("Couldn't read from reader into buffer!\n{e}"))),
79
80      // If there isn't any data remaining, return None
81      Ok([]) => None,
82
83      // If there is data remaining, return the decoded BlockHash Protobuf message
84      Ok(_) => Some(decode_protobuf::<pwr::BlockHash>(&mut self.reader)),
85    }
86  }
87}
88
/// Iterator over the Rsync [`pwr::SyncOp`] operations of a single file in a patch
///
/// Stops (returns `None`) when the `HEY_YOU_DID_IT` end-of-series marker is decoded
#[derive(Debug, PartialEq)]
pub struct RsyncOpIter<'a, R> {
  // Borrowed patch stream, positioned at the next SyncOp message
  reader: &'a mut R,
}
93
94impl<'a, R> Iterator for RsyncOpIter<'a, R>
95where
96  R: BufRead,
97{
98  type Item = Result<pwr::SyncOp, String>;
99
100  fn next(&mut self) -> Option<Self::Item> {
101    match decode_protobuf::<pwr::SyncOp>(&mut self.reader) {
102      Err(e) => Some(Err(format!(
103        "Couldn't decode Rsync SyncOp message from reader!\n{e}"
104      ))),
105
106      Ok(sync_op) => {
107        if let pwr::sync_op::Type::HeyYouDidIt = sync_op.r#type() {
108          None
109        } else {
110          Some(Ok(sync_op))
111        }
112      }
113    }
114  }
115}
116
/// Iterator over the [`bsdiff::Control`] operations of a single file in a patch
///
/// Stops (returns `None`) when a Control message with `eof` set is decoded,
/// after also consuming the trailing Rsync `HEY_YOU_DID_IT` marker
#[derive(Debug, PartialEq)]
pub struct BsdiffOpIter<'a, R> {
  // Borrowed patch stream, positioned at the next Control message
  reader: &'a mut R,
}
121
122impl<'a, R> Iterator for BsdiffOpIter<'a, R>
123where
124  R: BufRead,
125{
126  type Item = Result<bsdiff::Control, String>;
127
128  fn next(&mut self) -> Option<Self::Item> {
129    match decode_protobuf::<bsdiff::Control>(&mut self.reader) {
130      Err(e) => Some(Err(format!(
131        "Couldn't decode Bsdiff Control message from reader!\n{e}"
132      ))),
133
134      Ok(control_op) => {
135        if control_op.eof {
136          // Wharf adds a Rsync HeyYouDidIt message after the Bsdiff EOF
137          match decode_protobuf::<pwr::SyncOp>(&mut self.reader) {
138            Err(e) => Some(Err(format!(
139              "Couldn't decode Rsync SyncOp message from reader!\n{e}"
140            ))),
141
142            Ok(sync_op) => {
143              if let pwr::sync_op::Type::HeyYouDidIt = sync_op.r#type() {
144                None
145              } else {
146                Some(Err(
147                  "Expected a Rsync HeyYouDidIt sync operation, but did not found it!".to_string(),
148                ))
149              }
150            }
151          }
152        } else {
153          Some(Ok(control_op))
154        }
155      }
156    }
157  }
158}
159
/// Header describing the next file entry in a patch, plus an iterator over its operations
///
/// Produced by [`SyncEntryIter::next_header`]. The op iterator borrows the
/// underlying patch stream, so the next header can only be read after the
/// iterator has been dropped.
#[derive(Debug, PartialEq)]
pub enum SyncHeader<'a, R> {
  /// Rsync-style series of [`pwr::SyncOp`] operations
  Rsync {
    // Index of the file this series applies to — presumably into the new
    // container's file list; verify against the wharf patch format docs
    file_index: i64,
    op_iter: RsyncOpIter<'a, R>,
  },
  /// Bsdiff-style series of [`bsdiff::Control`] operations
  Bsdiff {
    // Index of the file this series applies to — presumably into the new
    // container's file list; verify against the wharf patch format docs
    file_index: i64,
    // Taken from pwr::BsdiffHeader; presumably indexes the old container
    // file the diff reads from — TODO confirm
    target_index: i64,
    op_iter: BsdiffOpIter<'a, R>,
  },
}
172
/// Streaming decoder for the per-file entries ([`SyncHeader`] + operations) of a patch
#[derive(Debug, Clone, PartialEq)]
pub struct SyncEntryIter<R> {
  // Decompressed patch stream, positioned at the next SyncHeader message
  reader: R,
}
177
178impl<'a, R> SyncEntryIter<R>
179where
180  R: BufRead,
181{
182  pub fn next_header(&'a mut self) -> Option<Result<SyncHeader<'a, R>, String>> {
183    match self.reader.fill_buf() {
184      // If it couldn't read from the stream, return an error
185      Err(e) => Some(Err(format!("Couldn't read from reader into buffer!\n{e}"))),
186
187      // If there isn't any data remaining, return None
188      Ok([]) => None,
189
190      // If there is data remaining, return the decoded header
191      Ok(_) => {
192        // Decode the SyncHeader
193        let header = match decode_protobuf::<pwr::SyncHeader>(&mut self.reader) {
194          Err(e) => return Some(Err(e)),
195          Ok(sync_header) => sync_header,
196        };
197
198        // Decode the BsdiffHeader (if the header type is Bsdiff)
199        let bsdiff_header = match header.r#type() {
200          pwr::sync_header::Type::Rsync => None,
201          pwr::sync_header::Type::Bsdiff => {
202            match decode_protobuf::<pwr::BsdiffHeader>(&mut self.reader) {
203              Err(e) => return Some(Err(e)),
204              Ok(bsdiff_header) => Some(bsdiff_header),
205            }
206          }
207        };
208
209        // Pack the gathered data into a SyncHeader struct and return it
210        Some(Ok(match bsdiff_header {
211          None => SyncHeader::Rsync {
212            file_index: header.file_index,
213            op_iter: RsyncOpIter {
214              reader: &mut self.reader,
215            },
216          },
217          Some(bsdiff) => SyncHeader::Bsdiff {
218            file_index: header.file_index,
219            target_index: bsdiff.target_index,
220            op_iter: BsdiffOpIter {
221              reader: &mut self.reader,
222            },
223          },
224        }))
225      }
226    }
227  }
228}
229
230/// Read a Protobuf length delimiter encoded as a variable-width integer and consume its bytes
231///
232/// <https://protobuf.dev/programming-guides/encoding/#length-types>
233///
234/// <https://protobuf.dev/programming-guides/encoding/#varints>
235///
236/// # Errors
237///
238/// If the read operation from the buffer fails, an unexpected EOF is encountered, or the length delimiter is invalid
239fn read_length_delimiter(reader: &mut impl Read) -> Result<usize, String> {
240  // A Protobuf varint must be 10 bytes or less
241  let mut varint = [0u8; PROTOBUF_VARINT_MAX_LENGTH];
242
243  for current_byte in &mut varint {
244    // Read one byte
245    let mut byte = [0u8; 1];
246    reader
247      .read_exact(&mut byte)
248      .map_err(|e| format!("Couldn't read from reader into buffer!\n{e}"))?;
249
250    // Save the byte in the array
251    *current_byte = byte[0];
252
253    // The most significant bit indicates whether there are more bytes in the varint
254    if (byte[0] & 0x80) == 0 {
255      break;
256    }
257  }
258
259  // Decode the varint
260  prost::decode_length_delimiter(varint.as_slice())
261    .map_err(|e| format!("Couldn't decode the signature header length delimiter!\n{e}"))
262}
263
264/// Decode a length-delimited Protobuf message
265///
266/// Advance the reader to the end of the message
267///
268/// # Returns
269///
270/// The deserialized Protobuf message
271///
272/// # Errors
273///
274/// If the reader could not be read, or if the Protobuf message is invalid
275fn decode_protobuf<T: prost::Message + Default>(reader: &mut impl Read) -> Result<T, String> {
276  let length = read_length_delimiter(reader)?;
277
278  let mut bytes = vec![0u8; length];
279  reader
280    .read_exact(&mut bytes)
281    .map_err(|e| format!("Couldn't read from reader into buffer!\n{e}"))?;
282
283  T::decode(bytes.as_slice()).map_err(|e| format!("Couldn't decode Protobuf message!\n{e}"))
284}
285
/// Verify that the next four bytes of the reader match the expected magic number
///
/// The magic number is stored little-endian on disk.
///
/// # Errors
///
/// If the bytes couldn't be read from the reader or the magic bytes don't match
fn check_magic_bytes(reader: &mut impl Read, expected_magic: u32) -> Result<(), String> {
  // Read exactly as many bytes as a u32 occupies
  // (`[0u8; _]` inferred array lengths are nightly-only, so spell the length out)
  let mut magic_bytes = [0u8; std::mem::size_of::<u32>()];
  reader
    .read_exact(&mut magic_bytes)
    .map_err(|e| format!("Couldn't read magic bytes!\n{e}"))?;

  // Compare the magic numbers
  let actual_magic = u32::from_le_bytes(magic_bytes);
  if actual_magic == expected_magic {
    Ok(())
  } else {
    Err("The magic bytes don't match! The binary file is corrupted!".to_string())
  }
}
306
307/// Decompress a stream using the specified decompression algorithm
308///
309/// # Returns
310///
311/// The decompressed buffered stream
312///
313/// # Errors
314///
315///
316fn decompress_stream(
317  reader: &mut impl BufRead,
318  algorithm: pwr::CompressionAlgorithm,
319) -> Result<Box<dyn std::io::BufRead + '_>, String> {
320  match algorithm {
321    pwr::CompressionAlgorithm::None => Ok(Box::new(reader)),
322
323    pwr::CompressionAlgorithm::Brotli => {
324      #[cfg(feature = "brotli")]
325      {
326        Ok(Box::new(std::io::BufReader::new(
327          // Set the buffer size to zero to allow Brotli to select the correct size
328          brotli::Decompressor::new(reader, 0),
329        )))
330      }
331
332      #[cfg(not(feature = "brotli"))]
333      {
334        Err(
335          "This binary was built without Brotli support. Recompile with `--features brotli` to be able to decompress the stream",
336        )
337      }
338    }
339
340    pwr::CompressionAlgorithm::Gzip => {
341      #[cfg(feature = "gzip")]
342      {
343        Ok(Box::new(std::io::BufReader::new(
344          flate2::bufread::GzDecoder::new(reader),
345        )))
346      }
347
348      #[cfg(not(feature = "gzip"))]
349      {
350        Err(
351          "This binary was built without gzip support. Recompile with `--features gzip` to be able to decompress the stream",
352        )
353      }
354    }
355    pwr::CompressionAlgorithm::Zstd => {
356      #[cfg(feature = "zstd")]
357      {
358        Ok(Box::new(std::io::BufReader::new(
359          zstd::Decoder::with_buffer(reader)
360            .map_err(|e| format!("Couldn't create zstd decoder!\n{e}"))?,
361        )))
362      }
363
364      #[cfg(not(feature = "zstd"))]
365      {
366        Err(
367          "This binary was built without Zstd support. Recompile with `--features zstd` to be able to decompress the stream",
368        )
369      }
370    }
371  }
372}
373
374/// <https://docs.itch.zone/wharf/master/file-formats/signatures.html>
375///
376/// The signature structure is:
377///
378/// - [`SIGNATURE_MAGIC`]
379/// - [`pwr::SignatureHeader`]
380/// - decompressed stream follows:
381///   - [`tlc::Container`]    (target container)
382///   - repeated sequence:
383///     - [`pwr::BlockHash`]
384pub fn read_signature(reader: &mut impl BufRead) -> Result<Signature<impl BufRead>, String> {
385  // Check the magic bytes
386  check_magic_bytes(reader, SIGNATURE_MAGIC)?;
387
388  // Decode the signature header
389  let header = decode_protobuf::<pwr::SignatureHeader>(reader)?;
390
391  // Decompress the remaining stream
392  let compression_algorithm = header
393    .compression
394    .ok_or("Missing compressing field in Signature Header!")?
395    .algorithm();
396
397  let mut decompressed = decompress_stream(reader, compression_algorithm)?;
398
399  // Decode the container
400  let container_new = decode_protobuf::<tlc::Container>(&mut decompressed)?;
401
402  // Decode the hashes
403  let block_hash_iter = BlockHashIter {
404    reader: decompressed,
405  };
406
407  Ok(Signature {
408    header,
409    container_new,
410    block_hash_iter,
411  })
412}
413
/// Verify the on-disk files in `build_folder` against a wharf [`Signature`]
///
/// For every file listed in the signature's container, checks that the
/// on-disk size matches the container entry and that the MD5 hash of each
/// [`BLOCK_SIZE`] block matches the corresponding strong hash pulled from
/// the signature's block-hash iterator. Only the strong (MD5) hash is
/// compared; the weak rolling hash is not checked here.
///
/// # Errors
///
/// If a file can't be opened or read, its size differs from the container
/// entry, the signature stream ends early, or any block hash doesn't match
pub fn verify_files(
  build_folder: &std::path::Path,
  signature: &mut Signature<impl BufRead>,
) -> Result<(), String> {
  // This buffer will hold the current block that is being hashed
  let mut buffer = vec![0u8; BLOCK_SIZE];

  // Create a MD5 hasher
  let mut hasher = Md5::new();

  // Loop over all the files in the signature container
  for container_file in &signature.container_new.files {
    let file_path = build_folder.join(&container_file.path);
    let file = std::fs::File::open(&file_path).map_err(|e| {
      format!(
        "Couldn't open file: \"{}\"\n{e}",
        file_path.to_string_lossy()
      )
    })?;

    // Check if the file length matches
    let metadata = file.metadata().map_err(|e| {
      format!(
        "Couldn't get file metadata: \"{}\"\n{e}",
        file_path.to_string_lossy()
      )
    })?;

    if metadata.len() as i64 != container_file.size {
      return Err(format!(
        "The signature and the in-disk size of \"{}\" don't match!",
        file_path.to_string_lossy()
      ));
    }

    // For each block in the file, compare its hash with the one provided in the signature
    let mut file_bufreader = std::io::BufReader::new(file);
    let mut block_index: usize = 0;

    // NOTE(review): for a zero-length file this loop still runs once with an
    // empty block and consumes one hash from the signature — confirm wharf
    // emits a block hash for empty files
    loop {
      let block_start: usize = block_index * BLOCK_SIZE;
      // The final block of a file may be shorter than BLOCK_SIZE
      let block_end: usize = std::cmp::min(block_start + BLOCK_SIZE, container_file.size as usize);

      // Read the current block
      let buf = &mut buffer[..block_end - block_start];
      file_bufreader.read_exact(buf).map_err(|e| {
        format!(
          "Couldn't read file data into buffer: \"{}\"\n{e}",
          file_path.to_string_lossy()
        )
      })?;

      // Hash the current block (finalize_reset leaves the hasher reusable for the next block)
      hasher.update(buf);
      let hash = hasher.finalize_reset();

      // Get the expected hash from the signature
      let signature_hash = signature.block_hash_iter.next().ok_or_else(|| {
        "Expected a block hash message in the signature, but EOF was encountered!".to_string()
      })??;

      // Compare the hashes
      if *signature_hash.strong_hash != *hash {
        return Err(format!(
          "Hash mismatch!
  Signature: {:X?}
  In-disk: {:X?}",
          signature_hash.strong_hash, hash,
        ));
      }

      // If the file has been fully read, proceed to the next one
      if block_end == container_file.size as usize {
        break;
      }

      block_index += 1;
    }
  }

  Ok(())
}
496
497/// <https://docs.itch.zone/wharf/master/file-formats/patches.html>
498///
499/// The patch structure is:
500///
501/// - [`PATCH_MAGIC`]
502/// - [`pwr::PatchHeader`]
503/// - decompressed stream follows:
504///   - [`tlc::Container`]    (target container)
505///   - [`tlc::Container`]    (source container)
506///   - repeated sequence:
507///     - [`pwr::SyncHeader`]
508///     - Optional [`pwr::BsdiffHeader`] if the previous header type is [`pwr::sync_header::Type::Bsdiff`]
509///       - repeated sequence:
510///       - [`pwr::SyncOp`]
511///     - [`pwr::SyncOp`] (Type = HEY_YOU_DID_IT)  // end of file’s series
512pub fn read_patch(reader: &mut impl BufRead) -> Result<Patch<impl BufRead>, String> {
513  // Check the magic bytes
514  check_magic_bytes(reader, PATCH_MAGIC)?;
515
516  // Decode the patch header
517  let header = decode_protobuf::<pwr::PatchHeader>(reader)?;
518
519  // Decompress the remaining stream
520  let compression_algorithm = header
521    .compression
522    .ok_or("Missing compressing field in Patch Header!")?
523    .algorithm();
524
525  let mut decompressed = decompress_stream(reader, compression_algorithm)?;
526
527  // Decode the containers
528  let container_old = decode_protobuf::<tlc::Container>(&mut decompressed)?;
529  let container_new = decode_protobuf::<tlc::Container>(&mut decompressed)?;
530
531  // Decode the sync operations
532  let sync_op_iter = SyncEntryIter {
533    reader: decompressed,
534  };
535
536  Ok(Patch {
537    header,
538    container_old,
539    container_new,
540    sync_op_iter,
541  })
542}
543
/// Apply a wharf patch to a build — WORK IN PROGRESS
///
/// Currently this only creates the directories of the new container and
/// drains the per-file sync headers; the Rsync/Bsdiff operations themselves
/// are not applied yet (the match arms below are empty and the old-file
/// cache is unused).
///
/// # Errors
///
/// If a directory can't be created or a sync header can't be decoded
pub fn apply_patch(
  _old_build_folder: &std::path::Path,
  new_build_folder: &std::path::Path,
  patch: &mut Patch<impl BufRead>,
) -> Result<(), String> {
  // Iterate over the folders in the new container and create them
  for folder in &patch.container_new.dirs {
    std::fs::create_dir_all(new_build_folder.join(&folder.path)).map_err(|e| {
      format!(
        "Couldn't create folder: \"{}\"\n{e}",
        new_build_folder.join(&folder.path).to_string_lossy()
      )
    })?;
  }

  // Create a cache of open file descriptors for the old files
  // The key is the file_index of the old file provided by the patch
  // The value is the open file descriptor
  let _old_files_cache: lru::LruCache<u64, std::fs::File> =
    lru::LruCache::new(MAX_OPEN_FILES_PATCH);

  while let Some(header) = patch.sync_op_iter.next_header() {
    let header = header.map_err(|e| format!("Couldn't get next patch sync operation!\n{e}"))?;

    // NOTE(review): the op iterators are dropped without being consumed, so a
    // subsequent next_header() call would try to decode an operation message
    // as a header — this loop only becomes correct once the arms below drain
    // the operations
    match header {
      SyncHeader::Rsync {
        file_index: _,
        op_iter: _,
      } => {}

      SyncHeader::Bsdiff {
        file_index: _,
        target_index: _,
        op_iter: _,
      } => {}
    }
  }

  Ok(())
}