msy 0.4.6

Modern musl rsync alternative - Fast, parallel file synchronization
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
use super::{Adler32, BlockChecksum};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, Read};
use std::path::Path;

/// A single delta operation
///
/// The generators in this module push ops in the order they walk the source
/// file, so a delta is a sequence of these describing the source front to back.
///
/// NOTE(review): this type derives serde traits; with a serde format that encodes
/// variant indices, reordering the variants below would change the wire format.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum DeltaOp {
	/// Copy block from existing file at given offset
	///
	/// `offset` and `size` are taken verbatim from the matched destination
	/// `BlockChecksum`, i.e. they name a byte range of the destination file.
	Copy { offset: u64, size: usize },
	/// Insert literal data
	///
	/// Source bytes that did not match any destination block, carried as-is.
	Data(Vec<u8>),
}

/// Delta instructions for reconstructing a file
///
/// Returned by `generate_delta` and `generate_delta_streaming`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Delta {
	/// Copy/literal operations, in source-file order.
	pub ops: Vec<DeltaOp>,
	/// Length of the source file in bytes. The streaming generator takes it from the
	/// file's metadata; the legacy generator uses the number of bytes it read.
	// NOTE(review): `dead_code` is allowed presumably because no non-test code reads this yet — confirm
	#[allow(dead_code)]
	pub source_size: u64,
	/// The block size the delta was generated with (the value passed by the caller).
	#[allow(dead_code)]
	pub block_size: usize,
}

impl Delta {
	/// Fraction of the reconstructed bytes that must be shipped as literal data.
	///
	/// Computed as `literal / (literal + copied)`: `0.0` means every byte is copied
	/// from the existing file, `1.0` means every byte is literal. A delta that
	/// describes no bytes at all reports `1.0`.
	#[allow(dead_code)]
	pub fn compression_ratio(&self) -> f64 {
		// Single pass over the ops, tallying literal and copied byte counts together.
		let (literal, copied) = self.ops.iter().fold((0usize, 0usize), |(lit, cp), op| match op {
			DeltaOp::Data(bytes) => (lit + bytes.len(), cp),
			DeltaOp::Copy { size, .. } => (lit, cp + size),
		});

		match literal + copied {
			0 => 1.0,
			total => literal as f64 / total as f64,
		}
	}
}

/// Generate delta operations with streaming (memory-efficient)
///
/// This implements the rsync algorithm while reading the source incrementally:
/// 1. Build hash table of destination block checksums
/// 2. Read source in chunks (256KB at a time), always keeping at least one full
///    block of look-ahead in the window until end of file
/// 3. Slide window through data using rolling hash
/// 4. Generate Copy ops for matches, Data ops for literals
///
/// The read side is bounded: the window holds roughly `block_size + 256KB` bytes
/// and pending literals are flushed into a `Data` op every 1MB. The returned
/// `Delta` still owns every literal byte, so a source with few matches yields a
/// delta about as large as the source itself.
///
/// # Errors
///
/// Returns `io::ErrorKind::InvalidInput` if `block_size` is zero, and propagates
/// any error from opening, stat-ing, or reading `source_path`.
pub fn generate_delta_streaming(source_path: &Path, dest_checksums: &[BlockChecksum], block_size: usize) -> io::Result<Delta> {
	const CHUNK_SIZE: usize = 256 * 1024; // 256KB reads
	const MAX_LITERAL_SIZE: usize = 1024 * 1024; // Flush literals at 1MB to bound the pending buffer

	/// Reads from `reader` into `window` until it holds at least `needed` bytes or the
	/// reader is exhausted. Returns `true` once end of file has been reached.
	fn fill_window<R: Read>(reader: &mut R, window: &mut Vec<u8>, chunk_buf: &mut [u8], needed: usize) -> io::Result<bool> {
		while window.len() < needed {
			match reader.read(chunk_buf) {
				Ok(0) => return Ok(true),
				Ok(n) => window.extend_from_slice(&chunk_buf[..n]),
				// A read interrupted by a signal is retried rather than reported
				Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
				Err(e) => return Err(e),
			}
		}
		Ok(false)
	}

	/// Returns the first destination block whose weak checksum is `weak` and whose
	/// size and strong (xxh3) hash both match `data`.
	fn find_match<'a>(checksum_map: &HashMap<u32, Vec<&'a BlockChecksum>>, weak: u32, data: &[u8]) -> Option<&'a BlockChecksum> {
		let candidates = checksum_map.get(&weak)?;
		let mut hasher = xxhash_rust::xxh3::Xxh3::new();
		hasher.update(data);
		let strong = hasher.digest();
		candidates.iter().copied().find(|checksum| checksum.size == data.len() && checksum.strong == strong)
	}

	// A zero block size would never advance past a match (or never refill the window)
	if block_size == 0 {
		return Err(io::Error::new(io::ErrorKind::InvalidInput, "block_size must be greater than zero"));
	}

	// Build hash map for O(1) lookup by weak checksum
	let mut checksum_map: HashMap<u32, Vec<&BlockChecksum>> = HashMap::new();
	for checksum in dest_checksums {
		checksum_map.entry(checksum.weak).or_default().push(checksum);
	}

	// Lengths of the destination's short (tail) blocks, sorted for binary search. A source
	// tail shorter than a block can only match one of these, so checking the length first
	// avoids re-hashing the whole tail for every unmatched byte near end of file.
	let mut tail_sizes: Vec<usize> = dest_checksums.iter().map(|checksum| checksum.size).filter(|&size| size < block_size).collect();
	tail_sizes.sort_unstable();
	tail_sizes.dedup();

	let mut source_file = File::open(source_path)?;
	let source_size = source_file.metadata()?.len();

	if source_size == 0 {
		return Ok(Delta { ops: vec![], source_size: 0, block_size });
	}

	let mut ops = Vec::new();
	let mut literal_buffer = Vec::new();

	// Sliding window buffer: one block of look-ahead plus one chunk of read-ahead
	let mut window = Vec::with_capacity(block_size + CHUNK_SIZE);
	let mut chunk_buf = vec![0u8; CHUNK_SIZE];
	let mut eof = fill_window(&mut source_file, &mut window, &mut chunk_buf, block_size)?;
	let mut window_pos = 0; // Position within window

	let mut rolling = Adler32::new(block_size);
	// True while `rolling` holds the checksum of window[window_pos..window_pos + block_size]
	let mut rolling_valid = false;

	loop {
		// Keep a full block of look-ahead until EOF. Refilling whenever it runs short (not only
		// after a whole block has been consumed) is what keeps block sizes larger than
		// CHUNK_SIZE, and short reads, from truncating the scan.
		if !eof && window.len() - window_pos < block_size {
			window.drain(..window_pos);
			window_pos = 0;
			eof = fill_window(&mut source_file, &mut window, &mut chunk_buf, block_size)?;
			rolling_valid = false;
		}
		if window_pos >= window.len() {
			break;
		}

		let remaining = window.len() - window_pos;
		let matched = if remaining >= block_size {
			let block = &window[window_pos..window_pos + block_size];
			if !rolling_valid {
				rolling.update_block(block);
				rolling_valid = true;
			}
			find_match(&checksum_map, rolling.digest(), block)
		} else if tail_sizes.binary_search(&remaining).is_ok() {
			// Fewer than block_size bytes left only happens at EOF: try the final partial block
			let tail = &window[window_pos..];
			find_match(&checksum_map, Adler32::hash(tail), tail)
		} else {
			None
		};

		if let Some(checksum) = matched {
			// Match found! Flush literals and add Copy
			if !literal_buffer.is_empty() {
				ops.push(DeltaOp::Data(std::mem::take(&mut literal_buffer)));
			}
			ops.push(DeltaOp::Copy { offset: checksum.offset, size: checksum.size });
			// find_match guarantees checksum.size equals the length that was matched
			window_pos += checksum.size;
			rolling_valid = false;
		} else {
			// No match - add byte to literal buffer
			let byte = window[window_pos];
			literal_buffer.push(byte);

			// Periodic flush to bound memory usage for files with few matches
			if literal_buffer.len() >= MAX_LITERAL_SIZE {
				ops.push(DeltaOp::Data(std::mem::take(&mut literal_buffer)));
			}

			// Slide the rolling hash one byte forward if the next full block is buffered
			if rolling_valid && window_pos + block_size < window.len() {
				rolling.roll(byte, window[window_pos + block_size]);
			} else {
				rolling_valid = false;
			}
			window_pos += 1;
		}
	}

	// Flush remaining literals
	if !literal_buffer.is_empty() {
		ops.push(DeltaOp::Data(literal_buffer));
	}

	Ok(Delta { ops, source_size, block_size })
}

/// Generate delta operations by comparing source file against destination checksums
/// (legacy non-streaming version - loads entire file into memory)
///
/// This implements the rsync algorithm:
/// 1. Build hash table of destination block checksums
/// 2. Slide window through source file using rolling hash
/// 3. When weak hash matches, verify with strong hash
/// 4. Generate Copy ops for matches, Data ops for literals
///
/// Note: This loads entire source file into memory. For large files, use
/// `generate_delta_streaming` instead.
#[allow(dead_code)]
pub fn generate_delta(source_path: &Path, dest_checksums: &[BlockChecksum], block_size: usize) -> io::Result<Delta> {
	// Build hash map for O(1) lookup of weak checksums
	let mut checksum_map: HashMap<u32, Vec<&BlockChecksum>> = HashMap::new();
	for checksum in dest_checksums {
		checksum_map.entry(checksum.weak).or_default().push(checksum);
	}

	// Read source file
	let mut source_file = File::open(source_path)?;
	let mut source_data = Vec::new();
	source_file.read_to_end(&mut source_data)?;
	let source_size = source_data.len() as u64;

	if source_data.is_empty() {
		return Ok(Delta { ops: vec![], source_size: 0, block_size });
	}

	let mut ops = Vec::new();
	let mut literal_buffer = Vec::new();
	let mut pos = 0;

	// Initialize rolling hash with first block
	let mut rolling = Adler32::new(block_size);
	if source_data.len() >= block_size {
		rolling.update_block(&source_data[0..block_size]);
	}

	while pos < source_data.len() {
		let mut found_match = false;
		let remaining = source_data.len() - pos;

		// Try to match full blocks first
		if remaining >= block_size {
			let weak = rolling.digest();

			// Check if weak hash matches any destination blocks
			if let Some(candidates) = checksum_map.get(&weak) {
				let block = &source_data[pos..pos + block_size];

				// Verify with strong hash (xxHash3)
				let mut hasher = xxhash_rust::xxh3::Xxh3::new();
				hasher.update(block);
				let strong = hasher.digest();

				// Find exact match
				for checksum in candidates {
					if checksum.strong == strong {
						// Found a match!
						// First, flush any accumulated literal data
						if !literal_buffer.is_empty() {
							ops.push(DeltaOp::Data(literal_buffer.clone()));
							literal_buffer.clear();
						}

						// Add copy operation
						ops.push(DeltaOp::Copy { offset: checksum.offset, size: checksum.size });

						pos += block_size;
						found_match = true;

						// Update rolling hash for next position (if there is one)
						if pos + block_size <= source_data.len() {
							rolling.update_block(&source_data[pos..pos + block_size]);
						}
						break;
					}
				}
			}
		} else {
			// Partial block at end - try to match against partial blocks in dest
			let partial_block = &source_data[pos..];
			let weak = Adler32::hash(partial_block);

			if let Some(candidates) = checksum_map.get(&weak) {
				let mut hasher = xxhash_rust::xxh3::Xxh3::new();
				hasher.update(partial_block);
				let strong = hasher.digest();

				for checksum in candidates {
					if checksum.size == partial_block.len() && checksum.strong == strong {
						// Found matching partial block!
						if !literal_buffer.is_empty() {
							ops.push(DeltaOp::Data(literal_buffer.clone()));
							literal_buffer.clear();
						}

						ops.push(DeltaOp::Copy { offset: checksum.offset, size: checksum.size });

						pos += partial_block.len();
						found_match = true;
						break;
					}
				}
			}
		}

		if !found_match {
			// No match found - add byte to literal buffer
			literal_buffer.push(source_data[pos]);
			pos += 1;

			// Update rolling hash
			if pos > 0 && pos + block_size - 1 < source_data.len() {
				let old_byte = source_data[pos - 1];
				let new_byte = source_data[pos + block_size - 1];
				rolling.roll(old_byte, new_byte);
			}
		}
	}

	// Flush remaining literal data
	if !literal_buffer.is_empty() {
		ops.push(DeltaOp::Data(literal_buffer));
	}

	Ok(Delta { ops, source_size, block_size })
}

#[cfg(test)]
mod tests {
	use super::*;
	use crate::delta::compute_checksums;
	use std::io::Write;
	use tempfile::NamedTempFile;

	/// Rebuild the bytes a delta describes, reading `Copy` ranges out of `dest`.
	fn reconstruct(dest: &[u8], delta: &Delta) -> Vec<u8> {
		let mut out = Vec::new();
		for op in &delta.ops {
			match op {
				DeltaOp::Copy { offset, size } => {
					let start = *offset as usize;
					out.extend_from_slice(&dest[start..start + *size]);
				}
				DeltaOp::Data(bytes) => out.extend_from_slice(bytes),
			}
		}
		out
	}

	#[test]
	fn test_delta_identical_files() {
		// Create two identical files
		let mut source = NamedTempFile::new().unwrap();
		let mut dest = NamedTempFile::new().unwrap();
		let data = b"Hello, World! This is a test.";
		source.write_all(data).unwrap();
		dest.write_all(data).unwrap();
		source.flush().unwrap();
		dest.flush().unwrap();

		// Compute checksums and delta
		let checksums = compute_checksums(dest.path(), 8).unwrap();
		let delta = generate_delta(source.path(), &checksums, 8).unwrap();

		// Should have only Copy operations
		assert!(delta.ops.iter().all(|op| matches!(op, DeltaOp::Copy { .. })));

		// Compression ratio should be 0 (no literal data)
		assert_eq!(delta.compression_ratio(), 0.0);

		// Replaying the ops against dest must reproduce the source
		assert_eq!(reconstruct(data, &delta), data.to_vec());
	}

	#[test]
	fn test_delta_completely_different() {
		// Create two completely different files
		let mut source = NamedTempFile::new().unwrap();
		let mut dest = NamedTempFile::new().unwrap();
		source.write_all(b"AAAAAAAA").unwrap();
		dest.write_all(b"BBBBBBBB").unwrap();
		source.flush().unwrap();
		dest.flush().unwrap();

		// Compute checksums and delta
		let checksums = compute_checksums(dest.path(), 4).unwrap();
		let delta = generate_delta(source.path(), &checksums, 4).unwrap();

		// Should have only Data operations
		assert!(delta.ops.iter().all(|op| matches!(op, DeltaOp::Data(_))));

		// Compression ratio should be 1.0 (all literal data)
		assert_eq!(delta.compression_ratio(), 1.0);

		assert_eq!(reconstruct(b"BBBBBBBB", &delta), b"AAAAAAAA".to_vec());
	}

	#[test]
	fn test_delta_partial_match() {
		// Source: "AAAABBBBCCCC" (12 bytes)
		// Dest:   "AAAADDDDCCCC" (12 bytes, middle block differs)
		let source_data = b"AAAABBBBCCCC";
		let dest_data = b"AAAADDDDCCCC";
		let mut source = NamedTempFile::new().unwrap();
		let mut dest = NamedTempFile::new().unwrap();
		source.write_all(source_data).unwrap();
		dest.write_all(dest_data).unwrap();
		source.flush().unwrap();
		dest.flush().unwrap();

		// Compute checksums and delta
		let block_size = 4;
		let checksums = compute_checksums(dest.path(), block_size).unwrap();
		let delta = generate_delta(source.path(), &checksums, block_size).unwrap();

		// Should have mix of Copy and Data
		let has_copy = delta.ops.iter().any(|op| matches!(op, DeltaOp::Copy { .. }));
		let has_data = delta.ops.iter().any(|op| matches!(op, DeltaOp::Data(_)));
		assert!(has_copy && has_data);

		// Compression ratio should be strictly between 0 and 1
		let ratio = delta.compression_ratio();
		assert!(ratio > 0.0 && ratio < 1.0);

		assert_eq!(reconstruct(dest_data, &delta), source_data.to_vec());
	}

	#[test]
	fn test_delta_empty_source() {
		let source = NamedTempFile::new().unwrap();
		let mut dest = NamedTempFile::new().unwrap();
		dest.write_all(b"some data").unwrap();
		dest.flush().unwrap();

		let checksums = compute_checksums(dest.path(), 4).unwrap();
		let delta = generate_delta(source.path(), &checksums, 4).unwrap();

		assert_eq!(delta.ops.len(), 0);
		assert_eq!(delta.source_size, 0);
	}

	#[test]
	fn test_delta_empty_dest() {
		let mut source = NamedTempFile::new().unwrap();
		source.write_all(b"some data").unwrap();
		source.flush().unwrap();

		let delta = generate_delta(source.path(), &[], 4).unwrap();

		// Should be a single literal run
		assert_eq!(delta.ops.len(), 1);
		assert!(matches!(delta.ops[0], DeltaOp::Data(_)));
		assert_eq!(delta.compression_ratio(), 1.0);
		assert_eq!(reconstruct(&[], &delta), b"some data".to_vec());
	}

	// Tests for streaming version
	#[test]
	fn test_streaming_identical_files() {
		let mut source = NamedTempFile::new().unwrap();
		let mut dest = NamedTempFile::new().unwrap();
		let data = b"Hello, World! This is a test.";
		source.write_all(data).unwrap();
		dest.write_all(data).unwrap();
		source.flush().unwrap();
		dest.flush().unwrap();

		let checksums = compute_checksums(dest.path(), 8).unwrap();
		let delta = generate_delta_streaming(source.path(), &checksums, 8).unwrap();

		// Should have only Copy operations
		assert!(delta.ops.iter().all(|op| matches!(op, DeltaOp::Copy { .. })));
		assert_eq!(delta.compression_ratio(), 0.0);
		assert_eq!(reconstruct(data, &delta), data.to_vec());
	}

	#[test]
	fn test_streaming_large_file() {
		// File larger than the streaming reader's 256KB chunk, with a short tail block
		let mut source = NamedTempFile::new().unwrap();
		let mut dest = NamedTempFile::new().unwrap();

		// 256KB + 1000 bytes: one full chunk, then a partial final block
		let data = vec![0xAB; 256 * 1024 + 1000];
		source.write_all(&data).unwrap();
		dest.write_all(&data).unwrap();
		source.flush().unwrap();
		dest.flush().unwrap();

		let checksums = compute_checksums(dest.path(), 4096).unwrap();
		let delta = generate_delta_streaming(source.path(), &checksums, 4096).unwrap();

		// Should be all Copy operations
		assert!(delta.ops.iter().all(|op| matches!(op, DeltaOp::Copy { .. })));
		assert_eq!(delta.source_size, data.len() as u64);
		assert_eq!(reconstruct(&data, &delta), data);
	}

	#[test]
	fn test_streaming_vs_nonstreaming_identical() {
		// Verify streaming produces same result as non-streaming
		let mut source = NamedTempFile::new().unwrap();
		let mut dest = NamedTempFile::new().unwrap();

		// Create test data with mix of matches and mismatches
		let source_data = b"AAAABBBBCCCCDDDDEEEEFFFFGGGGHHHHIIIIJJJJ";
		let dest_data = b"AAAABBBBXXXXDDDDEEEEYYYYGGGGHHHHZZZZJJJJ";
		source.write_all(source_data).unwrap();
		dest.write_all(dest_data).unwrap();
		source.flush().unwrap();
		dest.flush().unwrap();

		let block_size = 4;
		let checksums = compute_checksums(dest.path(), block_size).unwrap();

		let delta1 = generate_delta(source.path(), &checksums, block_size).unwrap();
		let delta2 = generate_delta_streaming(source.path(), &checksums, block_size).unwrap();

		// Both should produce same operations
		assert_eq!(delta1.ops.len(), delta2.ops.len());
		assert_eq!(delta1.source_size, delta2.source_size);
		assert_eq!(delta1.ops, delta2.ops);
		assert_eq!(reconstruct(dest_data, &delta2), source_data.to_vec());
	}

	#[test]
	fn test_streaming_window_refill() {
		// Test that window refilling works correctly: the file spans two 256KB read
		// chunks, so the window must be drained and refilled mid-file
		let mut source = NamedTempFile::new().unwrap();
		let mut dest = NamedTempFile::new().unwrap();

		// Create 512KB file (2 * 256KB chunks)
		let mut data = Vec::with_capacity(512 * 1024);
		for i in 0..512 {
			data.extend_from_slice(&[(i % 256) as u8; 1024]);
		}
		source.write_all(&data).unwrap();
		dest.write_all(&data).unwrap();
		source.flush().unwrap();
		dest.flush().unwrap();

		let block_size = 8192;
		let checksums = compute_checksums(dest.path(), block_size).unwrap();
		let delta = generate_delta_streaming(source.path(), &checksums, block_size).unwrap();

		// Should be all Copy operations
		assert!(delta.ops.iter().all(|op| matches!(op, DeltaOp::Copy { .. })));
		assert_eq!(delta.source_size, 512 * 1024);
		assert_eq!(reconstruct(&data, &delta), data);
	}

	#[test]
	fn test_streaming_empty_file() {
		let source = NamedTempFile::new().unwrap();
		let mut dest = NamedTempFile::new().unwrap();
		dest.write_all(b"some data").unwrap();
		dest.flush().unwrap();

		let checksums = compute_checksums(dest.path(), 4).unwrap();
		let delta = generate_delta_streaming(source.path(), &checksums, 4).unwrap();

		assert_eq!(delta.ops.len(), 0);
		assert_eq!(delta.source_size, 0);
	}
}