1use crate::{archive::CompressionFormat, repository::DeletionProgressCallback, varint};
2use blake2::{Blake2b, Digest, digest::consts::U32};
3use dashmap::DashMap;
4use flate2::{
5 read::{DeflateDecoder, GzDecoder},
6 write::{DeflateEncoder, GzEncoder},
7};
8use std::{
9 collections::VecDeque,
10 fs::File,
11 io::{Cursor, Read, Seek, SeekFrom, Write},
12 path::PathBuf,
13 sync::{Arc, Mutex, RwLock, atomic::AtomicU64},
14};
15
16mod hasher;
17pub mod lock;
18pub mod reader;
19pub mod storage;
20
/// 32-byte BLAKE2b-256 digest identifying a chunk by its content.
pub type ChunkHash = [u8; 32];
22
/// In-memory index of content-addressed chunks.
///
/// Maps numeric chunk IDs to `(hash, reference count)` pairs and hashes back
/// to IDs, with the actual chunk bytes kept in a pluggable [`storage::ChunkStorage`]
/// backend. Persisted via `save`/`open` as a deflate-compressed `index` file
/// inside `directory`.
pub struct ChunkIndex {
    /// Directory holding the serialized `index` file and its `index.lock`.
    pub directory: PathBuf,
    /// Backend used to read, write, and delete chunk contents.
    pub storage: Arc<dyn storage::ChunkStorage>,

    /// File-based read/write lock guarding the on-disk index.
    pub lock: Arc<lock::RwLock>,

    /// Next chunk ID to hand out when no deleted ID can be recycled.
    next_id: Arc<AtomicU64>,
    /// IDs freed by deletions, recycled FIFO by `next_id()`.
    deleted_chunks: Arc<Mutex<VecDeque<u64>>>,
    /// Chunk ID -> (content hash, reference count).
    chunks: Arc<DashMap<u64, (ChunkHash, u64), hasher::RandomizingHasherBuilder>>,
    /// Content hash -> chunk ID (reverse lookup of `chunks`).
    chunk_hashes: Arc<DashMap<ChunkHash, u64, hasher::RandomizingHasherBuilder>>,

    /// Base chunk size in bytes used by `chunk_file`.
    chunk_size: usize,
    /// Cap on chunks per file (0 = unlimited); `chunk_file` doubles the
    /// chunk size until the count fits under this.
    max_chunk_count: usize,
}
37
38impl Clone for ChunkIndex {
39 fn clone(&self) -> Self {
40 ChunkIndex {
41 directory: self.directory.clone(),
42 storage: Arc::clone(&self.storage),
43
44 lock: Arc::clone(&self.lock),
45
46 next_id: Arc::clone(&self.next_id),
47 deleted_chunks: Arc::clone(&self.deleted_chunks),
48 chunks: Arc::clone(&self.chunks),
49 chunk_hashes: Arc::clone(&self.chunk_hashes),
50
51 chunk_size: self.chunk_size,
52 max_chunk_count: self.max_chunk_count,
53 }
54 }
55}
56
57impl ChunkIndex {
58 pub fn new(
59 directory: PathBuf,
60 chunk_size: usize,
61 max_chunk_count: usize,
62 storage: Arc<dyn storage::ChunkStorage>,
63 ) -> Self {
64 let lock = lock::RwLock::new(directory.join("index.lock").to_str().unwrap()).unwrap();
65
66 Self {
67 directory,
68 storage,
69
70 lock: Arc::new(lock),
71
72 next_id: Arc::new(AtomicU64::new(1)),
73 deleted_chunks: Arc::new(Mutex::new(VecDeque::new())),
74 chunks: Arc::new(DashMap::with_capacity_and_hasher_and_shard_amount(
75 10_000,
76 hasher::RandomizingHasherBuilder,
77 1024,
78 )),
79 chunk_hashes: Arc::new(DashMap::with_capacity_and_hasher_and_shard_amount(
80 10_000,
81 hasher::RandomizingHasherBuilder,
82 1024,
83 )),
84
85 chunk_size,
86 max_chunk_count,
87 }
88 }
89
    /// Loads an index previously written by [`save`](Self::save) from
    /// `directory/index`.
    ///
    /// Layout (deflate-compressed): a 32-byte header — deleted-chunk count
    /// (u64 LE), chunk size (u32 LE), max chunk count (u32 LE), chunk count
    /// (u64 LE), next ID (u64 LE) — followed by the deleted IDs as varints,
    /// then repeated records of 32-byte hash + varint ID + varint refcount
    /// until end of stream.
    ///
    /// # Errors
    /// Any I/O error opening or reading the index file, or creating the
    /// lock file.
    pub fn open(
        directory: PathBuf,
        storage: Arc<dyn storage::ChunkStorage>,
    ) -> std::io::Result<Self> {
        let file = File::open(directory.join("index"))?;
        let mut decoder = DeflateDecoder::new(file);

        // Fixed 32-byte header.
        let mut buffer = [0; 32];
        decoder.read_exact(&mut buffer)?;

        let deleted_chunks = u64::from_le_bytes(buffer[0..8].try_into().unwrap()) as usize;
        let chunk_size = u32::from_le_bytes(buffer[8..12].try_into().unwrap()) as usize;
        let max_chunk_count = u32::from_le_bytes(buffer[12..16].try_into().unwrap()) as usize;
        let chunk_count = u64::from_le_bytes(buffer[16..24].try_into().unwrap()) as usize;
        let next_id = u64::from_le_bytes(buffer[24..32].try_into().unwrap());

        // Pre-size containers from the header counts.
        let mut result_deleted_chunks = VecDeque::with_capacity(deleted_chunks);
        let result_chunks = DashMap::with_capacity_and_hasher_and_shard_amount(
            chunk_count,
            hasher::RandomizingHasherBuilder,
            1024,
        );
        let result_chunk_hashes = DashMap::with_capacity_and_hasher_and_shard_amount(
            chunk_count,
            hasher::RandomizingHasherBuilder,
            1024,
        );

        for _ in 0..deleted_chunks {
            let id = varint::decode_u64(&mut decoder);
            result_deleted_chunks.push_back(id);
        }

        // Chunk records are read until the stream ends. A failed 32-byte
        // read is treated as EOF, so a truncated trailing record is
        // silently dropped — NOTE(review): consider distinguishing clean
        // EOF from corruption.
        loop {
            let mut buffer = [0; 32];
            if decoder.read_exact(&mut buffer).is_err() {
                break;
            }

            let id = varint::decode_u64(&mut decoder);
            let count = varint::decode_u64(&mut decoder);

            result_chunks.insert(id, (buffer, count));
            result_chunk_hashes.insert(buffer, id);
        }

        let lock = lock::RwLock::new(directory.join("index.lock").to_str().unwrap())?;

        Ok(Self {
            directory,
            storage,

            lock: Arc::new(lock),

            next_id: Arc::new(AtomicU64::new(next_id)),
            deleted_chunks: Arc::new(Mutex::new(result_deleted_chunks)),
            chunks: Arc::new(result_chunks),
            chunk_hashes: Arc::new(result_chunk_hashes),

            chunk_size,
            max_chunk_count,
        })
    }
153
    /// Serializes the index to `directory/index`, deflate-compressed.
    ///
    /// Writes the 32-byte header (deleted-chunk count, chunk size, max
    /// chunk count, chunk count, next ID), then each deleted ID as a
    /// varint, then one record per chunk: 32-byte hash, varint ID, varint
    /// reference count. [`open`](Self::open) reads back this exact layout.
    ///
    /// # Errors
    /// Any I/O error creating or writing the file.
    pub fn save(&self) -> std::io::Result<()> {
        let file = File::create(self.directory.join("index"))?;
        let mut encoder = DeflateEncoder::new(file, flate2::Compression::default());

        // Hold the deleted-chunks lock for the whole save so the count in
        // the header matches the list of IDs written below.
        let deleted_chunks = self.deleted_chunks.lock().unwrap();

        encoder.write_all(&(deleted_chunks.len() as u64).to_le_bytes())?;
        encoder.write_all(&(self.chunk_size as u32).to_le_bytes())?;
        encoder.write_all(&(self.max_chunk_count as u32).to_le_bytes())?;
        encoder.write_all(&(self.chunks.len() as u64).to_le_bytes())?;
        encoder.write_all(
            &self
                .next_id
                .load(std::sync::atomic::Ordering::Relaxed)
                .to_le_bytes(),
        )?;

        for id in deleted_chunks.iter() {
            encoder.write_all(&varint::encode_u64(*id))?;
        }

        // One record per chunk, in arbitrary (shard) order; `open` does
        // not rely on ordering.
        for entry in self.chunks.iter() {
            let (id, (chunk, count)) = entry.pair();

            encoder.write_all(chunk)?;
            encoder.write_all(&varint::encode_u64(*id))?;
            encoder.write_all(&varint::encode_u64(*count))?;
        }

        // finish() writes the deflate trailer and surfaces write errors.
        encoder.finish()?;

        Ok(())
    }
187
188 #[inline]
189 pub fn references(&self, chunk: &ChunkHash) -> u64 {
190 if let Some(id) = self.chunk_hashes.get(chunk) {
191 let id = *id.value();
192
193 if let Some(entry) = self.chunks.get(&id) {
194 let (_, count) = entry.value();
195 return *count;
196 }
197 }
198
199 0
200 }
201
    /// Deletes every chunk whose reference count is zero: removes its
    /// stored content, drops it from both maps, and recycles its ID.
    ///
    /// `progress`, when present, is invoked once per deleted chunk with
    /// `(chunk_id, true)`.
    ///
    /// NOTE(review): the zero-refcount set is snapshotted up front, so a
    /// chunk re-referenced between the scan and its deletion would still
    /// be removed — confirm callers serialize `clean` against writers
    /// (e.g. via `self.lock`).
    ///
    /// # Errors
    /// Stops at and returns the first storage deletion error; chunks
    /// already deleted in this call are not rolled back.
    pub fn clean(&self, progress: DeletionProgressCallback) -> std::io::Result<()> {
        // Snapshot first: entries cannot be removed while iterating the map.
        let chunks_to_delete: Vec<_> = self
            .chunks
            .iter()
            .filter_map(|entry| {
                let (id, (chunk, count)) = (entry.key(), entry.value());
                if *count == 0 {
                    Some((*id, *chunk))
                } else {
                    None
                }
            })
            .collect();

        for (id, chunk) in chunks_to_delete {
            // Report progress before the (possibly failing) deletion.
            if let Some(f) = progress.clone() {
                f(id, true);
            }

            self.storage.delete_chunk_content(&chunk)?;

            self.chunk_hashes.remove(&chunk);
            self.chunks.remove(&id);

            // Make the freed ID available for reuse by `next_id()`.
            self.deleted_chunks.lock().unwrap().push_back(id);
        }

        Ok(())
    }
231
232 #[inline]
233 pub fn dereference_chunk_id(&self, chunk_id: u64, clean: bool) -> Option<bool> {
234 let mut entry = self.chunks.get_mut(&chunk_id)?;
235 let (chunk, count) = entry.value_mut();
236 let chunk = *chunk;
237
238 *count -= 1;
239
240 if *count == 0 && clean {
241 drop(entry);
242
243 self.chunks.remove(&chunk_id);
244 self.chunk_hashes.remove(&chunk);
245
246 self.storage.delete_chunk_content(&chunk).ok()?;
247 self.deleted_chunks.lock().unwrap().push_back(chunk_id);
248
249 return Some(true);
250 }
251
252 Some(false)
253 }
254
255 #[inline]
256 pub fn read_chunk_id_content(&self, chunk_id: u64) -> std::io::Result<Box<dyn Read + Send>> {
257 let entry = self.chunks.get(&chunk_id).ok_or_else(|| {
258 std::io::Error::new(
259 std::io::ErrorKind::NotFound,
260 format!("Chunk ID {chunk_id} not found"),
261 )
262 })?;
263
264 let (chunk, _) = entry.value();
265 let chunk = *chunk;
266 drop(entry);
267
268 let mut reader = self.storage.read_chunk_content(&chunk)?;
269
270 let mut compression_bytes = [0; 1];
271 reader.read_exact(&mut compression_bytes)?;
272 let compression = CompressionFormat::decode(compression_bytes[0]);
273
274 match compression {
275 CompressionFormat::None => Ok(reader),
276 CompressionFormat::Gzip => Ok(Box::new(GzDecoder::new(reader))),
277 CompressionFormat::Deflate => Ok(Box::new(DeflateDecoder::new(reader))),
278
279 #[cfg(feature = "brotli")]
280 CompressionFormat::Brotli => Ok(Box::new(brotli::Decompressor::new(reader, 4096))),
281 #[cfg(not(feature = "brotli"))]
282 CompressionFormat::Brotli => Err(std::io::Error::new(
283 std::io::ErrorKind::Unsupported,
284 "Brotli support is not enabled. Please enable the 'brotli' feature.",
285 )),
286 }
287 }
288
289 #[inline]
290 pub fn get_chunk_id(&self, chunk: &ChunkHash) -> Option<u64> {
291 self.chunk_hashes.get(chunk).map(|v| *v)
292 }
293
294 #[inline]
295 fn next_id(&self) -> u64 {
296 if let Some(id) = self.deleted_chunks.lock().unwrap().pop_front() {
297 return id;
298 }
299
300 self.next_id
301 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
302 }
303
304 fn add_chunk(
305 &self,
306 chunk: &ChunkHash,
307 data: &[u8],
308 compression: CompressionFormat,
309 ) -> std::io::Result<u64> {
310 let id = self.chunk_hashes.get(chunk).map(|v| *v);
311 let id = match id {
312 Some(id) => id,
313 None => {
314 let id = self.next_id();
315 self.chunk_hashes.insert(*chunk, id);
316
317 id
318 }
319 };
320
321 let has_references = if let Some(entry) = self.chunks.get(&id) {
322 let (_, count) = entry.value();
323 *count > 0
324 } else {
325 false
326 };
327
328 if has_references {
329 return Ok(id);
330 }
331
332 let mut final_data = vec![compression.encode()];
333
334 match compression {
335 CompressionFormat::None => final_data.extend_from_slice(data),
336 CompressionFormat::Gzip => {
337 let mut encoder = GzEncoder::new(&mut final_data, flate2::Compression::default());
338 encoder.write_all(data)?;
339 encoder.flush()?;
340 }
341 CompressionFormat::Deflate => {
342 let mut encoder =
343 DeflateEncoder::new(&mut final_data, flate2::Compression::default());
344 encoder.write_all(data)?;
345 encoder.flush()?;
346 }
347 #[cfg(feature = "brotli")]
348 CompressionFormat::Brotli => {
349 let mut encoder = brotli::CompressorWriter::new(&mut final_data, 4096, 11, 22);
350 encoder.write_all(data)?;
351 encoder.flush()?;
352 }
353 #[cfg(not(feature = "brotli"))]
354 CompressionFormat::Brotli => {
355 return Err(std::io::Error::new(
356 std::io::ErrorKind::Unsupported,
357 "Brotli support is not enabled. Please enable the 'brotli' feature.",
358 ));
359 }
360 }
361
362 self.storage
363 .write_chunk_content(chunk, Box::new(Cursor::new(final_data)))?;
364
365 Ok(id)
366 }
367
    /// Splits the file at `path` into chunks, stores each chunk's content,
    /// and returns the chunk IDs in file order, bumping each chunk's
    /// reference count once per occurrence.
    ///
    /// The chunk size starts at `self.chunk_size` and doubles until the
    /// chunk count fits under `self.max_chunk_count` (when that cap is
    /// non-zero). When the file has more chunks than the threshold and a
    /// rayon `scope` is supplied, chunking runs in parallel on that scope;
    /// otherwise the file is streamed sequentially.
    ///
    /// # Errors
    /// Any I/O error opening/reading the file or storing chunk content.
    pub fn chunk_file(
        &self,
        path: &PathBuf,
        compression: CompressionFormat,
        scope: Option<&rayon::Scope<'_>>,
    ) -> std::io::Result<Vec<u64>> {
        let file = File::open(path)?;
        let len = file.metadata()?.len() as usize;

        // Integer division: a trailing partial chunk is not counted here.
        // The sequential loop below still reads it, and the parallel path
        // folds the remainder into the last chunk's range.
        let mut chunk_count = len / self.chunk_size;
        let mut chunk_size = self.chunk_size;
        let mut chunk_threshold = 50;
        if self.max_chunk_count > 0 {
            // Halve the count / double the size until under the cap.
            while chunk_count > self.max_chunk_count {
                chunk_count /= 2;
                chunk_size *= 2;
            }

            chunk_threshold = self.max_chunk_count / 2;
        }

        if chunk_count > chunk_threshold && scope.is_some() {
            let threads = rayon::current_num_threads();

            if let Some(scope) = scope {
                let path = path.clone();
                let self_clone = self.clone();

                // Run the parallel chunker on the rayon scope and block on
                // a channel until it reports back.
                let (sender, receiver) = std::sync::mpsc::channel();

                scope.spawn(move |_| {
                    match self_clone.chunk_file_parallel(
                        &path,
                        compression,
                        chunk_size,
                        chunk_count,
                        threads,
                    ) {
                        Ok(chunk_ids) => {
                            let _ = sender.send(Ok(chunk_ids));
                        }
                        Err(e) => {
                            let _ = sender.send(Err(e));
                        }
                    }
                });

                match receiver.recv() {
                    Ok(result) => result,
                    // recv() fails only if the task dropped the sender
                    // without sending (e.g. it panicked).
                    Err(_) => Err(std::io::Error::other(
                        "Failed to receive result from parallel chunking task",
                    )),
                }
            } else {
                // Unreachable in practice (`scope.is_some()` was checked
                // above); kept as a direct fallback.
                self.chunk_file_parallel(path, compression, chunk_size, chunk_count, threads)
            }
        } else {
            // Sequential path: stream the file, hash each chunk with
            // BLAKE2b-256, and register it.
            let mut file = File::open(path)?;
            let mut chunks = Vec::with_capacity(chunk_count);
            let mut chunk_ids = Vec::with_capacity(chunk_count);
            let mut buffer = vec![0; chunk_size];
            let mut hasher = Blake2b::<U32>::new();

            loop {
                // NOTE(review): a short read yields a smaller chunk rather
                // than retrying to fill the buffer — confirm boundaries
                // need not be exact multiples of chunk_size.
                let bytes_read = file.read(&mut buffer)?;
                if bytes_read == 0 {
                    break;
                }

                hasher.update(&buffer[..bytes_read]);
                let hash = hasher.finalize_reset();
                let mut hash_array = [0; 32];
                hash_array.copy_from_slice(&hash);

                chunk_ids.push(self.add_chunk(&hash_array, &buffer[..bytes_read], compression)?);
                chunks.push(hash_array);
            }

            // Reference counting happens after all chunks were stored, so
            // a mid-file error leaves counts untouched.
            for (i, chunk_id) in chunk_ids.iter().enumerate() {
                let mut entry = self
                    .chunks
                    .entry(*chunk_id)
                    .or_insert_with(|| (chunks[i], 0));

                entry.1 += 1;
            }

            Ok(chunk_ids)
        }
    }
458
    /// Parallel implementation behind [`chunk_file`](Self::chunk_file):
    /// splits `path` into fixed `chunk_size` byte ranges (the last range
    /// absorbs the remainder), hashes and stores each range on a small
    /// worker-thread pool, then bumps reference counts in file order.
    ///
    /// `threads` caps the pool size (further capped by the number of
    /// ranges). Each worker opens its own file handle and seeks to its
    /// range, so readers are never shared.
    ///
    /// # Errors
    /// A worker error (after one is recorded, remaining workers drain the
    /// queue without processing), a worker panic, or a mismatch between
    /// produced and expected chunk counts.
    fn chunk_file_parallel(
        &self,
        path: &PathBuf,
        compression: CompressionFormat,
        chunk_size: usize,
        chunk_count: usize,
        threads: usize,
    ) -> std::io::Result<Vec<u64>> {
        let file_size = std::fs::metadata(path)?.len() as usize;

        // Precompute (index, start, end) ranges; the final range runs to
        // the end of the file so no trailing bytes are lost.
        let mut chunk_boundaries = VecDeque::with_capacity(chunk_count);
        for i in 0..chunk_count {
            let start = i * chunk_size;
            let end = if i == chunk_count - 1 {
                file_size
            } else {
                (i + 1) * chunk_size
            };

            if start < file_size {
                chunk_boundaries.push_back((i, start, end.min(file_size)));
            }
        }

        let expected_chunks = chunk_boundaries.len();

        let pool_size = threads.min(expected_chunks);
        let path = path.clone();

        // Shared work queue, result sink, and error slot.
        let chunk_queue = Arc::new(Mutex::new(chunk_boundaries));
        let results = Arc::new(Mutex::new(Vec::with_capacity(expected_chunks)));
        let error = Arc::new(RwLock::new(None));

        let mut handles = Vec::with_capacity(pool_size);
        for _ in 0..pool_size {
            let chunk_queue = Arc::clone(&chunk_queue);
            let results = Arc::clone(&results);
            let error = Arc::clone(&error);
            let path = path.clone();
            let self_clone = self.clone();

            let handle = std::thread::spawn(move || {
                loop {
                    // Pull the next range; an empty queue ends the worker.
                    let (idx, start, end) =
                        if let Some(chunk) = chunk_queue.lock().unwrap().pop_front() {
                            chunk
                        } else {
                            break;
                        };

                    // Once any worker has failed, keep draining the queue
                    // without doing work so all threads exit promptly.
                    if error.read().unwrap().is_some() {
                        continue;
                    }

                    let result = (|| {
                        let mut file = File::open(&path)?;
                        file.seek(SeekFrom::Start(start as u64))?;

                        let chunk_size = end - start;
                        let mut buffer = vec![0; chunk_size];
                        // NOTE(review): a single `read` may return fewer
                        // bytes than the range length; the buffer is
                        // truncated to what was read rather than retried.
                        let bytes_read = file.read(&mut buffer[0..chunk_size])?;

                        if bytes_read == 0 && start < file_size {
                            return Err(std::io::Error::new(
                                std::io::ErrorKind::UnexpectedEof,
                                format!(
                                    "Read 0 bytes at position {start} (expected up to {chunk_size})"
                                ),
                            ));
                        }

                        buffer.truncate(bytes_read);

                        // Hash the chunk (BLAKE2b-256) and store it.
                        let mut hasher = Blake2b::<U32>::new();
                        hasher.update(&buffer);
                        let hash = hasher.finalize();

                        let mut hash_array = [0; 32];
                        hash_array.copy_from_slice(&hash);

                        let chunk_id = self_clone.add_chunk(&hash_array, &buffer, compression)?;

                        Ok((idx, chunk_id, hash_array))
                    })();

                    match result {
                        Ok(data) => {
                            results.lock().unwrap().push(data);
                        }
                        Err(e) => {
                            // Later failures overwrite earlier ones; only
                            // one error survives to be returned.
                            *error.write().unwrap() = Some(e);
                        }
                    }
                }
            });

            handles.push(handle);
        }

        for (i, handle) in handles.into_iter().enumerate() {
            if let Err(e) = handle.join() {
                return Err(std::io::Error::other(format!(
                    "Worker thread {i} panicked: {e:?}"
                )));
            }
        }

        if let Some(err) = error.write().unwrap().take() {
            return Err(err);
        }

        let mut results_lock = results.lock().unwrap();
        if results_lock.len() != expected_chunks {
            return Err(std::io::Error::other(format!(
                "Missing chunks: got {} out of {}",
                results_lock.len(),
                expected_chunks
            )));
        }

        // Workers finish out of order; restore file order before counting.
        results_lock.sort_by_key(|(idx, _, _)| *idx);

        let mut chunk_ids = Vec::with_capacity(results_lock.len());
        let mut chunks = Vec::with_capacity(results_lock.len());

        for (_, chunk_id, hash) in results_lock.iter() {
            chunk_ids.push(*chunk_id);
            chunks.push(*hash);
        }
        drop(results_lock);

        // Bump reference counts once per occurrence, inserting a fresh
        // zero-count entry if the ID is no longer present in the map.
        for (i, chunk_id) in chunk_ids.iter().enumerate() {
            let mut entry = self
                .chunks
                .entry(*chunk_id)
                .or_insert_with(|| (chunks[i], 0));

            entry.1 += 1;
        }

        Ok(chunk_ids)
    }
601}