1use crate::{archive::CompressionFormat, repository::DeletionProgressCallback, varint};
2use blake2::{Blake2b, Digest, digest::consts::U32};
3use dashmap::DashMap;
4use flate2::{
5    read::{DeflateDecoder, GzDecoder},
6    write::{DeflateEncoder, GzEncoder},
7};
8use std::{
9    collections::VecDeque,
10    fs::File,
11    io::{Cursor, Read, Seek, SeekFrom, Write},
12    path::PathBuf,
13    sync::{Arc, Mutex, RwLock, atomic::AtomicU64},
14};
15
16mod hasher;
17pub mod lock;
18pub mod reader;
19pub mod storage;
20
21pub type ChunkHash = [u8; 32];
22
23pub struct ChunkIndex {
24    pub directory: PathBuf,
25    pub storage: Arc<dyn storage::ChunkStorage>,
26
27    pub lock: Arc<lock::RwLock>,
28
29    next_id: Arc<AtomicU64>,
30    deleted_chunks: Arc<Mutex<VecDeque<u64>>>,
31    chunks: Arc<DashMap<u64, (ChunkHash, u64), hasher::RandomizingHasherBuilder>>,
32    chunk_hashes: Arc<DashMap<ChunkHash, u64, hasher::RandomizingHasherBuilder>>,
33
34    chunk_size: usize,
35    max_chunk_count: usize,
36}
37
38impl Clone for ChunkIndex {
39    fn clone(&self) -> Self {
40        ChunkIndex {
41            directory: self.directory.clone(),
42            storage: Arc::clone(&self.storage),
43
44            lock: Arc::clone(&self.lock),
45
46            next_id: Arc::clone(&self.next_id),
47            deleted_chunks: Arc::clone(&self.deleted_chunks),
48            chunks: Arc::clone(&self.chunks),
49            chunk_hashes: Arc::clone(&self.chunk_hashes),
50
51            chunk_size: self.chunk_size,
52            max_chunk_count: self.max_chunk_count,
53        }
54    }
55}
56
57impl ChunkIndex {
58    pub fn new(
59        directory: PathBuf,
60        chunk_size: usize,
61        max_chunk_count: usize,
62        storage: Arc<dyn storage::ChunkStorage>,
63    ) -> Self {
64        let lock = lock::RwLock::new(directory.join("index.lock").to_str().unwrap()).unwrap();
65
66        Self {
67            directory,
68            storage,
69
70            lock: Arc::new(lock),
71
72            next_id: Arc::new(AtomicU64::new(1)),
73            deleted_chunks: Arc::new(Mutex::new(VecDeque::new())),
74            chunks: Arc::new(DashMap::with_capacity_and_hasher_and_shard_amount(
75                10_000,
76                hasher::RandomizingHasherBuilder,
77                1024,
78            )),
79            chunk_hashes: Arc::new(DashMap::with_capacity_and_hasher_and_shard_amount(
80                10_000,
81                hasher::RandomizingHasherBuilder,
82                1024,
83            )),
84
85            chunk_size,
86            max_chunk_count,
87        }
88    }
89
90    pub fn open(
91        directory: PathBuf,
92        storage: Arc<dyn storage::ChunkStorage>,
93    ) -> std::io::Result<Self> {
94        let file = File::open(directory.join("index"))?;
95        let mut decoder = DeflateDecoder::new(file);
96
97        let mut buffer = [0; 32];
98        decoder.read_exact(&mut buffer)?;
99
100        let deleted_chunks = u64::from_le_bytes(buffer[0..8].try_into().unwrap()) as usize;
101        let chunk_size = u32::from_le_bytes(buffer[8..12].try_into().unwrap()) as usize;
102        let max_chunk_count = u32::from_le_bytes(buffer[12..16].try_into().unwrap()) as usize;
103        let chunk_count = u64::from_le_bytes(buffer[16..24].try_into().unwrap()) as usize;
104        let next_id = u64::from_le_bytes(buffer[24..32].try_into().unwrap());
105
106        let mut result_deleted_chunks = VecDeque::with_capacity(deleted_chunks);
107        let result_chunks = DashMap::with_capacity_and_hasher_and_shard_amount(
108            chunk_count,
109            hasher::RandomizingHasherBuilder,
110            1024,
111        );
112        let result_chunk_hashes = DashMap::with_capacity_and_hasher_and_shard_amount(
113            chunk_count,
114            hasher::RandomizingHasherBuilder,
115            1024,
116        );
117
118        for _ in 0..deleted_chunks {
119            let id = varint::decode_u64(&mut decoder);
120            result_deleted_chunks.push_back(id);
121        }
122
123        loop {
124            let mut buffer = [0; 32];
125            if decoder.read_exact(&mut buffer).is_err() {
126                break;
127            }
128
129            let id = varint::decode_u64(&mut decoder);
130            let count = varint::decode_u64(&mut decoder);
131
132            result_chunks.insert(id, (buffer, count));
133            result_chunk_hashes.insert(buffer, id);
134        }
135
136        let lock = lock::RwLock::new(directory.join("index.lock").to_str().unwrap())?;
137
138        Ok(Self {
139            directory,
140            storage,
141
142            lock: Arc::new(lock),
143
144            next_id: Arc::new(AtomicU64::new(next_id)),
145            deleted_chunks: Arc::new(Mutex::new(result_deleted_chunks)),
146            chunks: Arc::new(result_chunks),
147            chunk_hashes: Arc::new(result_chunk_hashes),
148
149            chunk_size,
150            max_chunk_count,
151        })
152    }
153
154    pub fn save(&self) -> std::io::Result<()> {
155        let file = File::create(self.directory.join("index"))?;
156        let mut encoder = DeflateEncoder::new(file, flate2::Compression::default());
157
158        let deleted_chunks = self.deleted_chunks.lock().unwrap();
159
160        encoder.write_all(&(deleted_chunks.len() as u64).to_le_bytes())?;
161        encoder.write_all(&(self.chunk_size as u32).to_le_bytes())?;
162        encoder.write_all(&(self.max_chunk_count as u32).to_le_bytes())?;
163        encoder.write_all(&(self.chunks.len() as u64).to_le_bytes())?;
164        encoder.write_all(
165            &self
166                .next_id
167                .load(std::sync::atomic::Ordering::Relaxed)
168                .to_le_bytes(),
169        )?;
170
171        for id in deleted_chunks.iter() {
172            encoder.write_all(&varint::encode_u64(*id))?;
173        }
174
175        for entry in self.chunks.iter() {
176            let (id, (chunk, count)) = entry.pair();
177
178            encoder.write_all(chunk)?;
179            encoder.write_all(&varint::encode_u64(*id))?;
180            encoder.write_all(&varint::encode_u64(*count))?;
181        }
182
183        encoder.finish()?;
184
185        Ok(())
186    }
187
188    #[inline]
189    pub fn references(&self, chunk: &ChunkHash) -> u64 {
190        if let Some(id) = self.chunk_hashes.get(chunk) {
191            let id = *id.value();
192
193            if let Some(entry) = self.chunks.get(&id) {
194                let (_, count) = entry.value();
195                return *count;
196            }
197        }
198
199        0
200    }
201
202    pub fn clean(&self, progress: DeletionProgressCallback) -> std::io::Result<()> {
203        let chunks_to_delete: Vec<_> = self
204            .chunks
205            .iter()
206            .filter_map(|entry| {
207                let (id, (chunk, count)) = (entry.key(), entry.value());
208                if *count == 0 {
209                    Some((*id, *chunk))
210                } else {
211                    None
212                }
213            })
214            .collect();
215
216        for (id, chunk) in chunks_to_delete {
217            if let Some(f) = progress.clone() {
218                f(id, true);
219            }
220
221            self.storage.delete_chunk_content(&chunk)?;
222
223            self.chunk_hashes.remove(&chunk);
224            self.chunks.remove(&id);
225
226            self.deleted_chunks.lock().unwrap().push_back(id);
227        }
228
229        Ok(())
230    }
231
232    #[inline]
233    pub fn dereference_chunk_id(&self, chunk_id: u64, clean: bool) -> Option<bool> {
234        let mut entry = self.chunks.get_mut(&chunk_id)?;
235        let (chunk, count) = entry.value_mut();
236        let chunk = *chunk;
237
238        *count -= 1;
239
240        if *count == 0 && clean {
241            drop(entry);
242
243            self.chunks.remove(&chunk_id);
244            self.chunk_hashes.remove(&chunk);
245
246            self.storage.delete_chunk_content(&chunk).ok()?;
247            self.deleted_chunks.lock().unwrap().push_back(chunk_id);
248
249            return Some(true);
250        }
251
252        Some(false)
253    }
254
255    #[inline]
256    pub fn read_chunk_id_content(&self, chunk_id: u64) -> std::io::Result<Box<dyn Read + Send>> {
257        let entry = self.chunks.get(&chunk_id).ok_or_else(|| {
258            std::io::Error::new(
259                std::io::ErrorKind::NotFound,
260                format!("Chunk ID {chunk_id} not found"),
261            )
262        })?;
263
264        let (chunk, _) = entry.value();
265        let chunk = *chunk;
266        drop(entry);
267
268        let mut reader = self.storage.read_chunk_content(&chunk)?;
269
270        let mut compression_bytes = [0; 1];
271        reader.read_exact(&mut compression_bytes)?;
272        let compression = CompressionFormat::decode(compression_bytes[0]);
273
274        match compression {
275            CompressionFormat::None => Ok(reader),
276            CompressionFormat::Gzip => Ok(Box::new(GzDecoder::new(reader))),
277            CompressionFormat::Deflate => Ok(Box::new(DeflateDecoder::new(reader))),
278
279            #[cfg(feature = "brotli")]
280            CompressionFormat::Brotli => Ok(Box::new(brotli::Decompressor::new(reader, 4096))),
281            #[cfg(not(feature = "brotli"))]
282            CompressionFormat::Brotli => Err(std::io::Error::new(
283                std::io::ErrorKind::Unsupported,
284                "Brotli support is not enabled. Please enable the 'brotli' feature.",
285            )),
286        }
287    }
288
289    #[inline]
290    pub fn get_chunk_id(&self, chunk: &ChunkHash) -> Option<u64> {
291        self.chunk_hashes.get(chunk).map(|v| *v)
292    }
293
294    #[inline]
295    fn next_id(&self) -> u64 {
296        if let Some(id) = self.deleted_chunks.lock().unwrap().pop_front() {
297            return id;
298        }
299
300        self.next_id
301            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
302    }
303
304    fn add_chunk(
305        &self,
306        chunk: &ChunkHash,
307        data: &[u8],
308        compression: CompressionFormat,
309    ) -> std::io::Result<u64> {
310        let id = self.chunk_hashes.get(chunk).map(|v| *v);
311        let id = match id {
312            Some(id) => id,
313            None => {
314                let id = self.next_id();
315                self.chunk_hashes.insert(*chunk, id);
316
317                id
318            }
319        };
320
321        let has_references = if let Some(entry) = self.chunks.get(&id) {
322            let (_, count) = entry.value();
323            *count > 0
324        } else {
325            false
326        };
327
328        if has_references {
329            return Ok(id);
330        }
331
332        let mut final_data = vec![compression.encode()];
333
334        match compression {
335            CompressionFormat::None => final_data.extend_from_slice(data),
336            CompressionFormat::Gzip => {
337                let mut encoder = GzEncoder::new(&mut final_data, flate2::Compression::default());
338                encoder.write_all(data)?;
339                encoder.flush()?;
340            }
341            CompressionFormat::Deflate => {
342                let mut encoder =
343                    DeflateEncoder::new(&mut final_data, flate2::Compression::default());
344                encoder.write_all(data)?;
345                encoder.flush()?;
346            }
347            #[cfg(feature = "brotli")]
348            CompressionFormat::Brotli => {
349                let mut encoder = brotli::CompressorWriter::new(&mut final_data, 4096, 11, 22);
350                encoder.write_all(data)?;
351                encoder.flush()?;
352            }
353            #[cfg(not(feature = "brotli"))]
354            CompressionFormat::Brotli => {
355                return Err(std::io::Error::new(
356                    std::io::ErrorKind::Unsupported,
357                    "Brotli support is not enabled. Please enable the 'brotli' feature.",
358                ));
359            }
360        }
361
362        self.storage
363            .write_chunk_content(chunk, Box::new(Cursor::new(final_data)))?;
364
365        Ok(id)
366    }
367
368    pub fn chunk_file(
369        &self,
370        path: &PathBuf,
371        compression: CompressionFormat,
372        scope: Option<&rayon::Scope<'_>>,
373    ) -> std::io::Result<Vec<u64>> {
374        let file = File::open(path)?;
375        let len = file.metadata()?.len() as usize;
376
377        let mut chunk_count = len / self.chunk_size;
378        let mut chunk_size = self.chunk_size;
379        let mut chunk_threshold = 50;
380        if self.max_chunk_count > 0 {
381            while chunk_count > self.max_chunk_count {
382                chunk_count /= 2;
383                chunk_size *= 2;
384            }
385
386            chunk_threshold = self.max_chunk_count / 2;
387        }
388
389        if chunk_count > chunk_threshold && scope.is_some() {
390            let threads = rayon::current_num_threads();
391
392            if let Some(scope) = scope {
393                let path = path.clone();
394                let self_clone = self.clone();
395
396                let (sender, receiver) = std::sync::mpsc::channel();
397
398                scope.spawn(move |_| {
399                    match self_clone.chunk_file_parallel(
400                        &path,
401                        compression,
402                        chunk_size,
403                        chunk_count,
404                        threads,
405                    ) {
406                        Ok(chunk_ids) => {
407                            let _ = sender.send(Ok(chunk_ids));
408                        }
409                        Err(e) => {
410                            let _ = sender.send(Err(e));
411                        }
412                    }
413                });
414
415                match receiver.recv() {
416                    Ok(result) => result,
417                    Err(_) => Err(std::io::Error::other(
418                        "Failed to receive result from parallel chunking task",
419                    )),
420                }
421            } else {
422                self.chunk_file_parallel(path, compression, chunk_size, chunk_count, threads)
423            }
424        } else {
425            let mut file = File::open(path)?;
426            let mut chunks = Vec::with_capacity(chunk_count);
427            let mut chunk_ids = Vec::with_capacity(chunk_count);
428            let mut buffer = vec![0; chunk_size];
429            let mut hasher = Blake2b::<U32>::new();
430
431            loop {
432                let bytes_read = file.read(&mut buffer)?;
433                if bytes_read == 0 {
434                    break;
435                }
436
437                hasher.update(&buffer[..bytes_read]);
438                let hash = hasher.finalize_reset();
439                let mut hash_array = [0; 32];
440                hash_array.copy_from_slice(&hash);
441
442                chunk_ids.push(self.add_chunk(&hash_array, &buffer[..bytes_read], compression)?);
443                chunks.push(hash_array);
444            }
445
446            for (i, chunk_id) in chunk_ids.iter().enumerate() {
447                let mut entry = self
448                    .chunks
449                    .entry(*chunk_id)
450                    .or_insert_with(|| (chunks[i], 0));
451
452                entry.1 += 1;
453            }
454
455            Ok(chunk_ids)
456        }
457    }
458
459    fn chunk_file_parallel(
460        &self,
461        path: &PathBuf,
462        compression: CompressionFormat,
463        chunk_size: usize,
464        chunk_count: usize,
465        threads: usize,
466    ) -> std::io::Result<Vec<u64>> {
467        let file_size = std::fs::metadata(path)?.len() as usize;
468
469        let mut chunk_boundaries = VecDeque::with_capacity(chunk_count);
470        for i in 0..chunk_count {
471            let start = i * chunk_size;
472            let end = if i == chunk_count - 1 {
473                file_size
474            } else {
475                (i + 1) * chunk_size
476            };
477
478            if start < file_size {
479                chunk_boundaries.push_back((i, start, end.min(file_size)));
480            }
481        }
482
483        let expected_chunks = chunk_boundaries.len();
484
485        let pool_size = threads.min(expected_chunks);
486        let path = path.clone();
487
488        let chunk_queue = Arc::new(Mutex::new(chunk_boundaries));
489        let results = Arc::new(Mutex::new(Vec::with_capacity(expected_chunks)));
490        let error = Arc::new(RwLock::new(None));
491
492        let mut handles = Vec::with_capacity(pool_size);
493        for _ in 0..pool_size {
494            let chunk_queue = Arc::clone(&chunk_queue);
495            let results = Arc::clone(&results);
496            let error = Arc::clone(&error);
497            let path = path.clone();
498            let self_clone = self.clone();
499
500            let handle = std::thread::spawn(move || {
501                loop {
502                    let (idx, start, end) =
503                        if let Some(chunk) = chunk_queue.lock().unwrap().pop_front() {
504                            chunk
505                        } else {
506                            break;
507                        };
508
509                    if error.read().unwrap().is_some() {
510                        continue;
511                    }
512
513                    let result = (|| {
514                        let mut file = File::open(&path)?;
515                        file.seek(SeekFrom::Start(start as u64))?;
516
517                        let chunk_size = end - start;
518                        let mut buffer = vec![0; chunk_size];
519                        let bytes_read = file.read(&mut buffer[0..chunk_size])?;
520
521                        if bytes_read == 0 && start < file_size {
522                            return Err(std::io::Error::new(
523                                std::io::ErrorKind::UnexpectedEof,
524                                format!(
525                                    "Read 0 bytes at position {start} (expected up to {chunk_size})"
526                                ),
527                            ));
528                        }
529
530                        buffer.truncate(bytes_read);
531
532                        let mut hasher = Blake2b::<U32>::new();
533                        hasher.update(&buffer);
534                        let hash = hasher.finalize();
535
536                        let mut hash_array = [0; 32];
537                        hash_array.copy_from_slice(&hash);
538
539                        let chunk_id = self_clone.add_chunk(&hash_array, &buffer, compression)?;
540
541                        Ok((idx, chunk_id, hash_array))
542                    })();
543
544                    match result {
545                        Ok(data) => {
546                            results.lock().unwrap().push(data);
547                        }
548                        Err(e) => {
549                            *error.write().unwrap() = Some(e);
550                        }
551                    }
552                }
553            });
554
555            handles.push(handle);
556        }
557
558        for (i, handle) in handles.into_iter().enumerate() {
559            if let Err(e) = handle.join() {
560                return Err(std::io::Error::other(format!(
561                    "Worker thread {i} panicked: {e:?}"
562                )));
563            }
564        }
565
566        if let Some(err) = error.write().unwrap().take() {
567            return Err(err);
568        }
569
570        let mut results_lock = results.lock().unwrap();
571        if results_lock.len() != expected_chunks {
572            return Err(std::io::Error::other(format!(
573                "Missing chunks: got {} out of {}",
574                results_lock.len(),
575                expected_chunks
576            )));
577        }
578
579        results_lock.sort_by_key(|(idx, _, _)| *idx);
580
581        let mut chunk_ids = Vec::with_capacity(results_lock.len());
582        let mut chunks = Vec::with_capacity(results_lock.len());
583
584        for (_, chunk_id, hash) in results_lock.iter() {
585            chunk_ids.push(*chunk_id);
586            chunks.push(*hash);
587        }
588        drop(results_lock);
589
590        for (i, chunk_id) in chunk_ids.iter().enumerate() {
591            let mut entry = self
592                .chunks
593                .entry(*chunk_id)
594                .or_insert_with(|| (chunks[i], 0));
595
596            entry.1 += 1;
597        }
598
599        Ok(chunk_ids)
600    }
601}