use crate::algorithm::BlockChecksum;
use crate::checksum::{get_checksum1, strong_checksum, ChecksumType};
use anyhow::Result;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::Path;
const CHUNK_SIZE: usize = 16 * 1024 * 1024;
pub struct StreamingDelta {
block_size: usize,
checksum_type: ChecksumType,
}
impl StreamingDelta {
pub fn new(block_size: usize) -> Self {
Self {
block_size,
checksum_type: ChecksumType::default(),
}
}
pub fn generate_checksums_streaming(&self, path: &Path) -> Result<Vec<BlockChecksum>> {
let file = File::open(path)?;
let file_size = file.metadata()?.len();
let mut reader = BufReader::new(file);
let mut checksums = Vec::new();
let mut offset = 0u64;
let mut buffer = vec![0u8; self.block_size];
let mut last_progress = std::time::Instant::now();
while offset < file_size {
let bytes_to_read = std::cmp::min(self.block_size, (file_size - offset) as usize);
reader.read_exact(&mut buffer[..bytes_to_read])?;
if file_size > 100 * 1024 * 1024
&& last_progress.elapsed() >= std::time::Duration::from_secs(2)
{
eprint!(
"\r Checksum generation: {:.1}% ({}/{} MB)",
(offset as f64 / file_size as f64) * 100.0,
offset / (1024 * 1024),
file_size / (1024 * 1024)
);
last_progress = std::time::Instant::now();
}
let rolling_checksum = get_checksum1(&buffer[..bytes_to_read]);
let strong_checksum = strong_checksum(&buffer[..bytes_to_read], self.checksum_type)?;
let mut strong_hash = [0u8; 32];
let copy_len = std::cmp::min(strong_checksum.len(), 32);
strong_hash[..copy_len].copy_from_slice(&strong_checksum[..copy_len]);
checksums.push(BlockChecksum {
offset,
rolling_checksum,
strong_checksum: strong_hash,
length: bytes_to_read,
});
offset += bytes_to_read as u64;
}
if file_size > 100 * 1024 * 1024 {
eprint!("\r \r");
}
Ok(checksums)
}
pub fn apply_delta_streaming(
&self,
source_path: &Path,
dest_path: &Path,
temp_path: &Path,
checksums: &[BlockChecksum],
) -> Result<u64> {
let mut hash_table = HashMap::new();
for (i, checksum) in checksums.iter().enumerate() {
hash_table
.entry(checksum.rolling_checksum)
.or_insert_with(Vec::new)
.push(i);
}
let source_file = File::open(source_path)?;
let source_size = source_file.metadata()?.len();
let mut source_reader = BufReader::new(source_file);
let dest_file = File::open(dest_path)?;
let mut dest_reader = BufReader::new(dest_file);
let temp_file = File::create(temp_path)?;
let mut writer = BufWriter::new(temp_file);
let mut transferred_bytes = 0u64;
let mut source_offset = 0u64;
let mut lookahead_buffer = vec![0u8; CHUNK_SIZE];
let mut last_progress_report = std::time::Instant::now();
let report_interval = std::time::Duration::from_secs(5);
while source_offset < source_size {
let bytes_to_read = std::cmp::min(CHUNK_SIZE, (source_size - source_offset) as usize);
source_reader.read_exact(&mut lookahead_buffer[..bytes_to_read])?;
let (matches, non_match_len) =
self.find_all_matches(&lookahead_buffer[..bytes_to_read], &hash_table, checksums)?;
if !matches.is_empty() {
let mut last_match_end = 0;
for (start, end, block_indices) in matches {
if start > last_match_end {
writer.write_all(&lookahead_buffer[last_match_end..start])?;
transferred_bytes += (start - last_match_end) as u64;
}
for &block_index in &block_indices {
let block_checksum = &checksums[block_index];
dest_reader.seek(SeekFrom::Start(block_checksum.offset))?;
let mut block_buffer = vec![0u8; block_checksum.length];
dest_reader.read_exact(&mut block_buffer)?;
writer.write_all(&block_buffer)?;
}
last_match_end = end;
}
if last_match_end < bytes_to_read {
writer.write_all(&lookahead_buffer[last_match_end..bytes_to_read])?;
transferred_bytes += (bytes_to_read - last_match_end) as u64;
}
source_offset += bytes_to_read as u64;
if last_progress_report.elapsed() >= report_interval {
eprintln!(
" Delta progress: {:.1}% ({}/{} MB processed, {} MB transferred)",
(source_offset as f64 / source_size as f64) * 100.0,
source_offset / (1024 * 1024),
source_size / (1024 * 1024),
transferred_bytes / (1024 * 1024)
);
last_progress_report = std::time::Instant::now();
}
} else if non_match_len > 0 {
writer.write_all(&lookahead_buffer[..non_match_len])?;
transferred_bytes += non_match_len as u64;
source_offset += non_match_len as u64;
} else {
break;
}
}
writer.flush()?;
Ok(transferred_bytes)
}
fn find_all_matches(
&self,
buffer: &[u8],
hash_table: &HashMap<u32, Vec<usize>>,
checksums: &[BlockChecksum],
) -> Result<(Vec<(usize, usize, Vec<usize>)>, usize)> {
let mut matches = Vec::new();
let mut current_offset = 0;
while current_offset + self.block_size <= buffer.len() {
let window = &buffer[current_offset..current_offset + self.block_size];
let rolling_checksum = get_checksum1(window);
if let Some(indices) = hash_table.get(&rolling_checksum) {
let strong_hash = strong_checksum(window, self.checksum_type)?;
let mut found_match_in_block = false;
for &block_index in indices {
let block_checksum = &checksums[block_index];
if strong_hash[..32] == block_checksum.strong_checksum {
let mut consecutive_blocks = vec![block_index];
let mut next_offset = current_offset + self.block_size;
while next_offset + self.block_size <= buffer.len() {
let next_window = &buffer[next_offset..next_offset + self.block_size];
let next_rolling_checksum = get_checksum1(next_window);
if let Some(next_indices) = hash_table.get(&next_rolling_checksum) {
let next_strong_hash =
strong_checksum(next_window, self.checksum_type)?;
let mut found_consecutive = false;
for &next_block_index in next_indices {
let next_block_checksum = &checksums[next_block_index];
if next_strong_hash[..32] == next_block_checksum.strong_checksum
{
consecutive_blocks.push(next_block_index);
next_offset += self.block_size;
found_consecutive = true;
break;
}
}
if !found_consecutive {
break;
}
} else {
break;
}
}
let match_start = current_offset;
let match_end = next_offset;
matches.push((match_start, match_end, consecutive_blocks));
current_offset = match_end;
found_match_in_block = true;
break;
}
}
if !found_match_in_block {
current_offset += std::cmp::min(self.block_size / 4, 1024).max(1);
}
} else {
current_offset += std::cmp::min(self.block_size / 4, 1024).max(1);
}
}
let non_match_len = if matches.is_empty() { buffer.len() } else { 0 };
Ok((matches, non_match_len))
}
}