use std::collections::{HashMap, VecDeque};
use crate::{Config, model::FileInfo};
use super::remote::ProcessedRemoteFileInfo;
pub(super) type Batch = VecDeque<ProcessedRemoteFileInfo>;
/// Outcome of comparing the remote listing with the local file set.
#[derive(Debug)]
pub(super) struct Compared {
    /// Files that must be fetched, grouped into batches in ascending offset order.
    pub needs_download_batches: VecDeque<Batch>,
    /// Local files that have no counterpart in the remote listing.
    pub needs_deletion: VecDeque<FileInfo>,
    /// Sum over all batches of the span from the first file's `start_offset` to the
    /// last file's `end_offset_inclusive` (inclusive), gaps between files included.
    pub needs_download_bytes: u64,
    /// Sum of `uncompressed_size` over every file scheduled for download.
    pub needs_persistent_bytes: u64,
}
pub(super) fn build_compared(remote: Vec<ProcessedRemoteFileInfo>, local: Vec<FileInfo>, config: &Config) -> Compared {
let mut compare_map: HashMap<String, (Option<FileInfo>, Option<ProcessedRemoteFileInfo>)> = HashMap::new();
for l in local {
let _ = compare_map.entry(l.local_unix_path.clone()).or_insert((Some(l), None));
}
for r in remote {
let e = compare_map.entry(r.file_name.clone()).or_insert((None, None));
e.1 = Some(r);
}
let mut needs_download = Vec::new();
let mut needs_deletion = VecDeque::new();
let mut clean_bytes_total = 0;
for value in compare_map.into_values() {
match (value.0, value.1) {
(None, Some(remote)) => {
needs_download.push(remote);
},
(Some(local), None) => {
needs_deletion.push_back(local);
},
(Some(local), Some(remote)) => {
if local.crc32 == remote.crc32 {
clean_bytes_total += remote.compressed_size as u64;
} else {
needs_download.push(remote);
}
},
(None, None) => unreachable!(),
}
}
needs_download.sort_by_key(|e| e.start_offset);
let mut current_batch = VecDeque::new();
let mut needs_download_batches = VecDeque::new();
for rfi in needs_download.into_iter() {
let start = rfi.start_offset;
let end = rfi.end_offset_inclusive;
let file = &rfi.file_name;
let fits_in_current_batch = current_batch.back().is_none_or(|last: &ProcessedRemoteFileInfo| {
(rfi.start_offset as u64)
<= last
.end_offset_inclusive
.saturating_add(1)
.saturating_add(config.max_junk_bytes_before_next_batch)
&& last.end_offset_inclusive < (rfi.start_offset as u64)
});
tracing::trace!(?start, ?end, ?fits_in_current_batch, ?file, "evaluating");
if !fits_in_current_batch {
needs_download_batches.push_back(current_batch);
current_batch = VecDeque::with_capacity(1);
}
current_batch.push_back(rfi);
}
if !current_batch.is_empty() {
needs_download_batches.push_back(current_batch);
}
let needs_download_bytes = needs_download_batches
.iter()
.map(|batch| {
let Some(first) = batch.front() else { return 0 };
let Some(last) = batch.back() else { return 0 };
(last.end_offset_inclusive.saturating_add(1)) - first.start_offset as u64
})
.sum();
let needs_persistent_bytes = needs_download_batches
.iter()
.map(|batch| batch.iter().map(|f| f.uncompressed_size as u64).sum::<u64>())
.sum();
tracing::debug!(?clean_bytes_total, ?needs_persistent_bytes, "compare bytes size");
Compared {
needs_download_batches,
needs_deletion,
needs_download_bytes,
needs_persistent_bytes,
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{model::FileInfo, proto::sync::remote::SupportedCompressionMethod};

    /// No input on either side yields an empty plan.
    #[test]
    fn test_empty_inputs() {
        let remote = Vec::<ProcessedRemoteFileInfo>::new();
        let local = Vec::<FileInfo>::new();
        let compared = build_compared(remote, local, &Config::default());
        assert!(compared.needs_download_batches.is_empty());
        assert!(compared.needs_deletion.is_empty());
        assert_eq!(0, compared.needs_download_bytes);
        assert_eq!(0, compared.needs_persistent_bytes);
    }

    /// A local file whose crc32 matches the remote entry is left alone.
    #[test]
    fn test_no_changes() {
        let remote = vec![ProcessedRemoteFileInfo {
            file_name: "file1.txt".to_string(),
            crc32: 1234,
            compressed_size: 10,
            uncompressed_size: 15,
            compression_method: SupportedCompressionMethod::Deflated,
            start_offset: 100,
            end_offset_inclusive: 109,
        }];
        let local = vec![FileInfo {
            local_unix_path: "file1.txt".to_string(),
            crc32: 1234,
        }];
        let compared = build_compared(remote, local, &Config::default());
        assert!(compared.needs_download_batches.is_empty());
        assert!(compared.needs_deletion.is_empty());
        assert_eq!(0, compared.needs_download_bytes);
        assert_eq!(0, compared.needs_persistent_bytes);
    }

    /// A crc32 mismatch schedules the remote file for download, not deletion.
    #[test]
    fn test_different_crc32() {
        let remote = vec![ProcessedRemoteFileInfo {
            file_name: "file1.txt".to_string(),
            crc32: 5678,
            compressed_size: 10,
            uncompressed_size: 15,
            compression_method: SupportedCompressionMethod::Deflated,
            start_offset: 100,
            end_offset_inclusive: 109,
        }];
        let local = vec![FileInfo {
            local_unix_path: "file1.txt".to_string(),
            crc32: 1234,
        }];
        let compared = build_compared(remote, local, &Config::default());
        assert_eq!(1, compared.needs_download_batches.len());
        assert_eq!(10, compared.needs_download_bytes);
        assert_eq!(15, compared.needs_persistent_bytes);
        assert!(compared.needs_deletion.is_empty());
    }

    /// A remote-only file is downloaded.
    #[test]
    fn test_missing_local_file() {
        let remote = vec![ProcessedRemoteFileInfo {
            file_name: "file2.txt".to_string(),
            crc32: 9012,
            compressed_size: 20,
            uncompressed_size: 30,
            compression_method: SupportedCompressionMethod::Deflated,
            start_offset: 100,
            end_offset_inclusive: 119,
        }];
        let local = Vec::<FileInfo>::new();
        let compared = build_compared(remote, local, &Config::default());
        assert_eq!(1, compared.needs_download_batches.len());
        assert_eq!(20, compared.needs_download_bytes);
        assert_eq!(30, compared.needs_persistent_bytes);
        assert!(compared.needs_deletion.is_empty());
    }

    /// A local-only file is scheduled for deletion.
    #[test]
    fn test_local_deleted() {
        let remote = Vec::<ProcessedRemoteFileInfo>::new();
        let local = vec![FileInfo {
            local_unix_path: "file3.txt".to_string(),
            crc32: 9012,
        }];
        let compared = build_compared(remote, local, &Config::default());
        assert!(compared.needs_download_batches.is_empty());
        assert_eq!(1, compared.needs_deletion.len());
        assert_eq!("file3.txt", compared.needs_deletion[0].local_unix_path);
        assert_eq!(0, compared.needs_download_bytes);
    }

    /// Mixed case: fileA is unchanged (skipped), fileB is new (downloaded), and
    /// fileB_deleted.txt exists only locally (deleted).
    #[test]
    fn test_multiple_files_same_batch() {
        let remote = vec![
            ProcessedRemoteFileInfo {
                file_name: "fileA.txt".to_string(),
                crc32: 111,
                compressed_size: 5,
                uncompressed_size: 7,
                compression_method: SupportedCompressionMethod::Deflated,
                start_offset: 0,
                end_offset_inclusive: 4,
            },
            ProcessedRemoteFileInfo {
                file_name: "fileB.txt".to_string(),
                crc32: 222,
                compressed_size: 10,
                uncompressed_size: 14,
                compression_method: SupportedCompressionMethod::Deflated,
                start_offset: 1000,
                end_offset_inclusive: 1009,
            },
        ];
        let local = vec![
            FileInfo {
                local_unix_path: "fileA.txt".to_string(),
                crc32: 111,
            },
            FileInfo {
                local_unix_path: "fileB_deleted.txt".to_string(),
                crc32: 999,
            },
        ];
        let compared = build_compared(remote, local, &Config::default());
        assert_eq!(1, compared.needs_download_batches.len());
        assert_eq!(1, compared.needs_download_batches[0].len());
        assert_eq!("fileB.txt", compared.needs_download_batches[0][0].file_name);
        assert_eq!(1, compared.needs_deletion.len());
        assert_eq!("fileB_deleted.txt", compared.needs_deletion[0].local_unix_path);
        assert_eq!(10, compared.needs_download_bytes);
        assert_eq!(14, compared.needs_persistent_bytes);
    }

    /// Two files separated by a 1-byte gap are merged only when the junk allowance
    /// covers that gap. A merged batch also counts the gap byte.
    #[test]
    fn test_batch_aggregation() {
        let remote = vec![
            ProcessedRemoteFileInfo {
                file_name: "fileA.txt".to_string(),
                crc32: 111,
                compressed_size: 51,
                uncompressed_size: 100,
                compression_method: SupportedCompressionMethod::Deflated,
                start_offset: 50,
                end_offset_inclusive: 100,
            },
            ProcessedRemoteFileInfo {
                file_name: "fileB.txt".to_string(),
                crc32: 222,
                compressed_size: 101,
                uncompressed_size: 123,
                compression_method: SupportedCompressionMethod::Deflated,
                start_offset: 102,
                end_offset_inclusive: 202,
            },
        ];
        let compared = build_compared(remote.clone(), vec![], &Config {
            max_junk_bytes_before_next_batch: 0,
            ..Default::default()
        });
        assert_eq!(2, compared.needs_download_batches.len());
        // 51 + 101: the gap byte at offset 101 is not fetched.
        assert_eq!(152, compared.needs_download_bytes);
        assert_eq!(223, compared.needs_persistent_bytes);
        let compared = build_compared(remote.clone(), vec![], &Config {
            max_junk_bytes_before_next_batch: 1,
            ..Default::default()
        });
        assert_eq!(1, compared.needs_download_batches.len());
        // 50..=202: the gap byte is fetched as part of the single span.
        assert_eq!(153, compared.needs_download_bytes);
        assert_eq!(223, compared.needs_persistent_bytes);
        let compared = build_compared(remote.clone(), vec![], &Config {
            max_junk_bytes_before_next_batch: 2,
            ..Default::default()
        });
        assert_eq!(1, compared.needs_download_batches.len());
        let compared = build_compared(remote, vec![], &Config {
            max_junk_bytes_before_next_batch: u64::MAX,
            ..Default::default()
        });
        assert_eq!(1, compared.needs_download_batches.len());
        assert_eq!(153, compared.needs_download_bytes);
    }

    /// Overlapping byte ranges never share a batch, whatever the junk allowance.
    #[test]
    fn test_overlapping_ranges_split_batches() {
        let remote = vec![
            ProcessedRemoteFileInfo {
                file_name: "fileA.txt".to_string(),
                crc32: 111,
                compressed_size: 10,
                uncompressed_size: 20,
                compression_method: SupportedCompressionMethod::Deflated,
                start_offset: 0,
                end_offset_inclusive: 9,
            },
            ProcessedRemoteFileInfo {
                file_name: "fileB.txt".to_string(),
                crc32: 222,
                compressed_size: 10,
                uncompressed_size: 20,
                compression_method: SupportedCompressionMethod::Deflated,
                start_offset: 5,
                end_offset_inclusive: 14,
            },
        ];
        let compared = build_compared(remote, vec![], &Config {
            max_junk_bytes_before_next_batch: u64::MAX,
            ..Default::default()
        });
        assert_eq!(2, compared.needs_download_batches.len());
        assert_eq!(20, compared.needs_download_bytes);
        assert_eq!(40, compared.needs_persistent_bytes);
    }

    /// Files inside a batch are ordered by start offset, regardless of input order.
    #[test]
    fn test_batch_sorted_by_offset() {
        let remote = vec![
            ProcessedRemoteFileInfo {
                file_name: "fileB.txt".to_string(),
                crc32: 222,
                compressed_size: 101,
                uncompressed_size: 123,
                compression_method: SupportedCompressionMethod::Deflated,
                start_offset: 102,
                end_offset_inclusive: 202,
            },
            ProcessedRemoteFileInfo {
                file_name: "fileA.txt".to_string(),
                crc32: 111,
                compressed_size: 51,
                uncompressed_size: 100,
                compression_method: SupportedCompressionMethod::Deflated,
                start_offset: 50,
                end_offset_inclusive: 100,
            },
        ];
        let compared = build_compared(remote, vec![], &Config {
            max_junk_bytes_before_next_batch: 1,
            ..Default::default()
        });
        assert_eq!(1, compared.needs_download_batches.len());
        let batch = &compared.needs_download_batches[0];
        assert_eq!("fileA.txt", batch[0].file_name);
        assert_eq!("fileB.txt", batch[1].file_name);
    }
}