use std::{
    collections::{BTreeMap, VecDeque},
    ffi::OsStr,
    ops::Deref,
    path::{Path, PathBuf},
    sync::{
        atomic::{AtomicU16, AtomicUsize, Ordering},
        Arc,
    },
    time::SystemTime,
};

use crate::store::{handle, types, RefreshMode};

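/// A snapshot of the store's state: the loaded pack indices, the loose object databases, and a marker
/// identifying the slot-map state this snapshot was created from.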
pub(crate) struct Snapshot {
    pub(crate) indices: Vec<handle::IndexLookup>,
    pub(crate) loose_dbs: Arc<Vec<crate::loose::Store>>,
    pub(crate) marker: types::SlotIndexMarker,
}

mod error {
    use std::path::PathBuf;

    use git_pack::multi_index::PackIndex;

    #[derive(thiserror::Error, Debug)]
    #[allow(missing_docs)]
    pub enum Error {
        #[error("The objects directory at '{0}' is not an accessible directory")]
        Inaccessible(PathBuf),
        #[error(transparent)]
        Io(#[from] std::io::Error),
        #[error(transparent)]
        Alternate(#[from] crate::alternate::Error),
        #[error("The slotmap turned out to be too small with {} entries, would need {} more", .current, .needed)]
        InsufficientSlots { current: usize, needed: usize },
        #[error("Would have overflowed the maximum possible number of generations ({})", super::Generation::MAX)]
        GenerationOverflow,
        #[error("Cannot numerically handle more than {limit} packs in a single multi-pack index, got {actual} in file {index_path:?}")]
        TooManyPacksInMultiIndex {
            actual: PackIndex,
            limit: PackIndex,
            index_path: PathBuf,
        },
    }
}

pub use error::Error;

use crate::store::types::{Generation, IndexAndPacks, MutableIndexAndPack, PackId, SlotMapIndex};

impl super::Store {
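    /// Load all indices by repeatedly calling `load_one_index()` with `RefreshMode::Never` until no new
    /// snapshot is produced, then return the final snapshot.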
    pub(crate) fn load_all_indices(&self) -> Result<Snapshot, Error> {
        let mut snapshot = self.collect_snapshot();
        while let Some(new_snapshot) = self.load_one_index(RefreshMode::Never, snapshot.marker)? {
            snapshot = new_snapshot
        }
        Ok(snapshot)
    }

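    /// Load a single additional index if possible and return an updated `Snapshot`, or `None` if the state
    /// identified by `marker` is still current and `refresh_mode` does not allow refreshing from disk.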
    pub(crate) fn load_one_index(
        &self,
        refresh_mode: RefreshMode,
        marker: types::SlotIndexMarker,
    ) -> Result<Option<Snapshot>, Error> {
        let index = self.index.load();
        if !index.is_initialized() {
            return self.consolidate_with_disk_state(true /* needs_init */, false /* load_new_index */);
        }

        if marker.generation != index.generation || marker.state_id != index.state_id() {
            // A more recent state is already available, provide it.
            Ok(Some(self.collect_snapshot()))
        } else {
            // Nothing changed in the meantime, try to load another index…
            if self.load_next_index(index) {
                Ok(Some(self.collect_snapshot()))
            } else {
                // …and if that didn't yield anything new, consider refreshing our disk state.
                match refresh_mode {
                    RefreshMode::Never => Ok(None),
                    RefreshMode::AfterAllIndicesLoaded => {
                        self.consolidate_with_disk_state(false /* needs_init */, true /* load_new_index */)
                    }
                }
            }
        }
    }

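    /// Try to load one more index from the given slot map, returning `true` if the observable state changed,
    /// which usually means another index was loaded, and `false` if there was nothing left to do.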
    fn load_next_index(&self, mut index: arc_swap::Guard<Arc<SlotMapIndex>>) -> bool {
        'retry_with_changed_index: loop {
            let previous_state_id = index.state_id();
            'retry_with_next_slot_index: loop {
                match index
                    .next_index_to_load
                    .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                        (current != index.slot_indices.len()).then_some(current + 1)
                    }) {
                    Ok(slot_map_index) => {
                        // This slot index is ours to load; record the ongoing operation and work under the slot's write lock.
                        let _ongoing_operation = IncOnNewAndDecOnDrop::new(&index.num_indices_currently_being_loaded);
                        let slot = &self.files[index.slot_indices[slot_map_index]];
                        let _lock = slot.write.lock();
                        if slot.generation.load(Ordering::SeqCst) > index.generation {
                            // A disk consolidation changed this slot behind our back; try the next one.
                            continue 'retry_with_next_slot_index;
                        }
                        let mut bundle = slot.files.load_full();
                        let bundle_mut = Arc::make_mut(&mut bundle);
                        if let Some(files) = bundle_mut.as_mut() {
                            // Count this attempt even if the load fails, so waiting observers see progress.
                            let _loaded_count = IncOnDrop(&index.loaded_indices);
                            match files.load_index(self.object_hash) {
                                Ok(_) => {
                                    slot.files.store(bundle);
                                    break 'retry_with_next_slot_index;
                                }
                                Err(_) => {
                                    slot.files.store(bundle);
                                    continue 'retry_with_next_slot_index;
                                }
                            }
                        }
                    }
                    Err(_nothing_more_to_load) => {
                        // Other threads may still be loading indices; wait for them before deciding what changed.
                        let num_load_operations = index.num_indices_currently_being_loaded.deref();
                        while num_load_operations.load(Ordering::Relaxed) != 0 {
                            std::thread::yield_now()
                        }
                        break 'retry_with_next_slot_index;
                    }
                }
            }
            if previous_state_id == index.state_id() {
                let potentially_new_index = self.index.load();
                if Arc::as_ptr(&potentially_new_index) == Arc::as_ptr(&index) {
                    // There is no new index to retry with, so nothing more can be done here.
                    return false;
                } else {
                    // The index changed; retry the whole operation with the new one.
                    index = potentially_new_index;
                    continue 'retry_with_changed_index;
                }
            } else {
                // Something changed, most likely an index was loaded.
                return true;
            }
        }
    }

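    /// Bring the in-memory slot map in line with what is on disk in the objects directory and its alternates,
    /// reusing slots whose files are unchanged, and return a new `Snapshot`, or `None` if nothing changed at all.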
    pub(crate) fn consolidate_with_disk_state(
        &self,
        needs_init: bool,
        load_new_index: bool,
    ) -> Result<Option<Snapshot>, Error> {
        let index = self.index.load();
        let previous_index_state = Arc::as_ptr(&index) as usize;

        // Obtain the lock for the entire duration of the refresh.
        let write = self.write.lock();
        let objects_directory = &self.path;

        // Re-load the index: another thread may have refreshed it while we waited for the lock.
        let index = self.index.load();
        if previous_index_state != Arc::as_ptr(&index) as usize {
            // Someone else did the work while we waited; provide their result.
            return Ok(Some(self.collect_snapshot()));
        }

        let was_uninitialized = !index.is_initialized();

        if !was_uninitialized && needs_init {
            // The index was initialized in the meantime, so there is nothing left to do for initialization.
            return Ok(Some(self.collect_snapshot()));
        }
        self.num_disk_state_consolidation.fetch_add(1, Ordering::Relaxed);

        let db_paths: Vec<_> = std::iter::once(objects_directory.to_owned())
            .chain(crate::alternate::resolve(objects_directory, &self.current_dir)?)
            .collect();

        // Reuse the existing loose object databases if the set of paths is unchanged.
        let loose_dbs = if was_uninitialized
            || db_paths.len() != index.loose_dbs.len()
            || db_paths
                .iter()
                .zip(index.loose_dbs.iter().map(|ldb| &ldb.path))
                .any(|(lhs, rhs)| lhs != rhs)
        {
            Arc::new(
                db_paths
                    .iter()
                    .map(|path| crate::loose::Store::at(path, self.object_hash))
                    .collect::<Vec<_>>(),
            )
        } else {
            Arc::clone(&index.loose_dbs)
        };

        let indices_by_modification_time = Self::collect_indices_and_mtime_sorted_by_size(
            db_paths,
            index.slot_indices.len().into(),
            self.use_multi_pack_index.then_some(self.object_hash),
        )?;
        let mut idx_by_index_path: BTreeMap<_, _> = index
            .slot_indices
            .iter()
            .filter_map(|&idx| {
                let f = &self.files[idx];
                Option::as_ref(&f.files.load()).map(|f| (f.index_path().to_owned(), idx))
            })
            .collect();

        let mut new_slot_map_indices = Vec::new();
        let mut index_paths_to_add = was_uninitialized
            .then(|| VecDeque::with_capacity(indices_by_modification_time.len()))
            .unwrap_or_default();

        // Determine which slots can be kept as-is and which indices are new and still need a slot.
        let mut num_loaded_indices = 0;
        for (index_info, mtime) in indices_by_modification_time.into_iter().map(|(a, b, _)| (a, b)) {
            match idx_by_index_path.remove(index_info.path()) {
                Some(slot_idx) => {
                    let slot = &self.files[slot_idx];
                    let files_guard = slot.files.load();
                    let files =
                        Option::as_ref(&files_guard).expect("slot is set or we wouldn't know it points to this file");
                    if index_info.is_multi_index() && files.mtime() != mtime {
                        // A changed multi-pack index is placed into a new slot.
                        index_paths_to_add.push_back((index_info, mtime, Some(slot_idx)));
                        if files.index_is_loaded() {
                            num_loaded_indices += 1;
                        }
                    } else {
                        if Self::assure_slot_matches_index(&write, slot, index_info, mtime, index.generation) {
                            num_loaded_indices += 1;
                        }
                        new_slot_map_indices.push(slot_idx);
                    }
                }
                None => index_paths_to_add.push_back((index_info, mtime, None)),
            }
        }
        let needs_stable_indices = self.maintain_stable_indices(&write);

        let mut next_possibly_free_index = index
            .slot_indices
            .iter()
            .max()
            .map(|idx| (idx + 1) % self.files.len())
            .unwrap_or(0);
        let mut num_indices_checked = 0;
        let mut needs_generation_change = false;
        let mut slot_indices_to_remove: Vec<_> = idx_by_index_path.into_values().collect();
        while let Some((mut index_info, mtime, move_from_slot_idx)) = index_paths_to_add.pop_front() {
            'increment_slot_index: loop {
                if num_indices_checked == self.files.len() {
                    return Err(Error::InsufficientSlots {
                        current: self.files.len(),
                        needed: index_paths_to_add.len() + 1, /* +1 for the index currently being placed */
                    });
                }
                let slot_index = next_possibly_free_index;
                let slot = &self.files[slot_index];
                next_possibly_free_index = (next_possibly_free_index + 1) % self.files.len();
                num_indices_checked += 1;
                match move_from_slot_idx {
                    Some(move_from_slot_idx) => {
                        debug_assert!(index_info.is_multi_index(), "only set for multi-pack indices");
                        if slot_index == move_from_slot_idx {
                            // Don't replace the very slot we are moving from.
                            continue 'increment_slot_index;
                        }
                        match Self::try_set_index_slot(
                            &write,
                            slot,
                            index_info,
                            mtime,
                            index.generation,
                            needs_stable_indices,
                        ) {
                            Ok(dest_was_empty) => {
                                slot_indices_to_remove.push(move_from_slot_idx);
                                new_slot_map_indices.push(slot_index);
                                if !dest_was_empty {
                                    needs_generation_change = true;
                                }
                                break 'increment_slot_index;
                            }
                            Err(unused_index_info) => index_info = unused_index_info,
                        }
                    }
                    None => {
                        match Self::try_set_index_slot(
                            &write,
                            slot,
                            index_info,
                            mtime,
                            index.generation,
                            needs_stable_indices,
                        ) {
                            Ok(dest_was_empty) => {
                                new_slot_map_indices.push(slot_index);
                                if !dest_was_empty {
                                    needs_generation_change = true;
                                }
                                break 'increment_slot_index;
                            }
                            Err(unused_index_info) => index_info = unused_index_info,
                        }
                    }
                }
            }
        }
        assert_eq!(
            index_paths_to_add.len(),
            0,
            "By this time we have assigned all new files to slots"
        );

        let generation = if needs_generation_change {
            index.generation.checked_add(1).ok_or(Error::GenerationOverflow)?
        } else {
            index.generation
        };
        let index_unchanged = index.slot_indices == new_slot_map_indices;
        if generation != index.generation {
            assert!(
                !index_unchanged,
                "if the generation changed, the slot index must have changed for sure"
            );
        }
        if !index_unchanged || loose_dbs != index.loose_dbs {
            let new_index = Arc::new(SlotMapIndex {
                slot_indices: new_slot_map_indices,
                loose_dbs,
                generation,
                // Carry over the shared load-progress counters only if the slot indices are unchanged.
                next_index_to_load: index_unchanged
                    .then(|| Arc::clone(&index.next_index_to_load))
                    .unwrap_or_default(),
                loaded_indices: index_unchanged
                    .then(|| Arc::clone(&index.loaded_indices))
                    .unwrap_or_else(|| Arc::new(num_loaded_indices.into())),
                num_indices_currently_being_loaded: Default::default(),
            });
            self.index.store(new_index);
        }

        // Handle removed slots: trash their contents when indices must stay stable, otherwise drop them.
        for slot in slot_indices_to_remove.into_iter().map(|idx| &self.files[idx]) {
            let _lock = slot.write.lock();
            let mut files = slot.files.load_full();
            let files_mut = Arc::make_mut(&mut files);
            if needs_stable_indices {
                if let Some(files) = files_mut.as_mut() {
                    files.trash();
                }
            } else {
                *files_mut = None;
            };
            slot.files.store(files);
            if !needs_stable_indices {
                // Update the slot's generation only if its content was actually dropped (not just trashed).
                slot.generation.store(generation, Ordering::SeqCst);
            }
        }

        let new_index = self.index.load();
        Ok(if index.state_id() == new_index.state_id() {
            // Nothing actually changed.
            None
        } else {
            if load_new_index {
                self.load_next_index(new_index);
            }
            Some(self.collect_snapshot())
        })
    }

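    /// Collect the `pack/*.idx` files (and at most one matching multi-pack index per database) of all `db_paths`,
    /// together with their modification time and file size, sorted by file size in descending order.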
    pub(crate) fn collect_indices_and_mtime_sorted_by_size(
        db_paths: Vec<PathBuf>,
        initial_capacity: Option<usize>,
        multi_pack_index_object_hash: Option<git_hash::Kind>,
    ) -> Result<Vec<(Either, SystemTime, u64)>, Error> {
        let mut indices_by_modification_time = Vec::with_capacity(initial_capacity.unwrap_or_default());
        for db_path in db_paths {
            let packs = db_path.join("pack");
            let entries = match std::fs::read_dir(packs) {
                Ok(e) => e,
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue,
                Err(err) => return Err(err.into()),
            };
            let indices = entries
                .filter_map(Result::ok)
                .filter_map(|e| e.metadata().map(|md| (e.path(), md)).ok())
                .filter(|(_, md)| md.file_type().is_file())
                .filter(|(p, _)| {
                    let ext = p.extension();
                    (ext == Some(OsStr::new("idx")) && p.with_extension("pack").is_file())
                        || (multi_pack_index_object_hash.is_some() && ext.is_none() && is_multipack_index(p))
                })
                .map(|(p, md)| md.modified().map_err(Error::from).map(|mtime| (p, mtime, md.len())))
                .collect::<Result<Vec<_>, _>>()?;

            let multi_index_info = multi_pack_index_object_hash
                .and_then(|hash| {
                    indices.iter().find_map(|(p, a, b)| {
                        is_multipack_index(p)
                            .then(|| {
                                // Only use a multi-pack index whose object hash matches ours.
                                git_pack::multi_index::File::at(p)
                                    .ok()
                                    .filter(|midx| midx.object_hash() == hash)
                                    .map(|midx| (midx, *a, *b))
                            })
                            .flatten()
                            .map(|t| {
                                if t.0.num_indices() > PackId::max_packs_in_multi_index() {
                                    Err(Error::TooManyPacksInMultiIndex {
                                        index_path: p.to_owned(),
                                        actual: t.0.num_indices(),
                                        limit: PackId::max_packs_in_multi_index(),
                                    })
                                } else {
                                    Ok(t)
                                }
                            })
                    })
                })
                .transpose()?;
            if let Some((multi_index, mtime, flen)) = multi_index_info {
                let index_names_in_multi_index: Vec<_> =
                    multi_index.index_names().iter().map(|p| p.as_path()).collect();
                let mut indices_not_in_multi_index: Vec<(Either, _, _)> = indices
                    .into_iter()
                    .filter_map(|(path, a, b)| {
                        (path != multi_index.path()
                            && !index_names_in_multi_index
                                .contains(&Path::new(path.file_name().expect("file name present"))))
                        .then_some((Either::IndexPath(path), a, b))
                    })
                    .collect();
                indices_not_in_multi_index.insert(0, (Either::MultiIndexFile(Arc::new(multi_index)), mtime, flen));
                indices_by_modification_time.extend(indices_not_in_multi_index);
            } else {
                indices_by_modification_time.extend(
                    indices
                        .into_iter()
                        .filter_map(|(p, a, b)| (!is_multipack_index(&p)).then_some((Either::IndexPath(p), a, b))),
                )
            }
        }
        // Despite the variable's name, this sorts by file size in descending order so the largest indices come first.
        indices_by_modification_time.sort_by(|l, r| l.2.cmp(&r.2).reverse());
        Ok(indices_by_modification_time)
    }

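    /// Try to place `index_info` into `dest_slot`, returning `Ok(dest_slot_was_empty)` on success or
    /// `Err(index_info)` if the slot cannot be used, e.g. because it already refers to the same file or
    /// because its content must remain stable.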
    #[allow(clippy::too_many_arguments)]
    fn try_set_index_slot(
        lock: &parking_lot::MutexGuard<'_, ()>,
        dest_slot: &MutableIndexAndPack,
        index_info: Either,
        mtime: SystemTime,
        current_generation: Generation,
        needs_stable_indices: bool,
    ) -> Result<bool, Either> {
        let (dest_slot_was_empty, generation) = match &**dest_slot.files.load() {
            Some(bundle) => {
                if bundle.index_path() == index_info.path() || (bundle.is_disposable() && needs_stable_indices) {
                    // The slot already points at this file, or its content must be kept stable; don't use it.
                    return Err(index_info);
                }
                // Replacing an occupied slot requires a generation bump so readers of the previous state notice.
                (false, current_generation + 1)
            }
            None => (true, current_generation),
        };
        Self::set_slot_to_index(lock, dest_slot, index_info, mtime, generation);
        Ok(dest_slot_was_empty)
    }

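    /// Unconditionally store `index_info` in `slot`, storing `generation` in the slot as well.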
    fn set_slot_to_index(
        _lock: &parking_lot::MutexGuard<'_, ()>,
        slot: &MutableIndexAndPack,
        index_info: Either,
        mtime: SystemTime,
        generation: Generation,
    ) {
        let _lock = slot.write.lock();
        let mut files = slot.files.load_full();
        let files_mut = Arc::make_mut(&mut files);
        // Set the generation before storing the new value.
        slot.generation.store(generation, Ordering::SeqCst);
        *files_mut = Some(index_info.into_index_and_packs(mtime));
        slot.files.store(files);
    }

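    /// Assure that `slot` still refers to `index_info`, undoing a pending disposal if necessary, and return
    /// `true` if the slot's index is currently loaded.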
    fn assure_slot_matches_index(
        _lock: &parking_lot::MutexGuard<'_, ()>,
        slot: &MutableIndexAndPack,
        index_info: Either,
        mtime: SystemTime,
        current_generation: Generation,
    ) -> bool {
        match Option::as_ref(&slot.files.load()) {
            Some(bundle) => {
                assert_eq!(
                    bundle.index_path(),
                    index_info.path(),
                    "Parallel writers cannot change the file the slot points to."
                );
                if bundle.is_disposable() {
                    // The slot was scheduled for disposal; put it back into use with the current generation.
                    let _lock = slot.write.lock();
                    let mut files = slot.files.load_full();
                    let files_mut = Arc::make_mut(&mut files)
                        .as_mut()
                        .expect("BUG: cannot change from something to nothing, would be race");
                    files_mut.put_back();
                    debug_assert_eq!(
                        files_mut.mtime(),
                        mtime,
                        "BUG: we can only put back files that didn't obviously change"
                    );
                    slot.generation.store(current_generation, Ordering::SeqCst);
                    slot.files.store(files);
                }
                bundle.index_is_loaded()
            }
            None => {
                unreachable!("BUG: a slot can never be deleted if we have it recorded in the index WHILE changing said index. There shouldn't be a race")
            }
        }
    }

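    /// Return `true` if the indices currently stored in slots must be kept stable, which is the case while
    /// at least one handle requires stable indices.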
    fn maintain_stable_indices(&self, _guard: &parking_lot::MutexGuard<'_, ()>) -> bool {
        self.num_handles_stable.load(Ordering::SeqCst) > 0
    }

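    /// Collect all currently loaded indices, along with the loose object databases and the state marker,
    /// into a `Snapshot`.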
    pub(crate) fn collect_snapshot(&self) -> Snapshot {
        let index = self.index.load();
        let indices = if index.is_initialized() {
            index
                .slot_indices
                .iter()
                .map(|idx| (*idx, &self.files[*idx]))
                .filter_map(|(id, file)| {
                    let lookup = match (**file.files.load()).as_ref()? {
                        types::IndexAndPacks::Index(bundle) => handle::SingleOrMultiIndex::Single {
                            index: bundle.index.loaded()?.clone(),
                            data: bundle.data.loaded().cloned(),
                        },
                        types::IndexAndPacks::MultiIndex(multi) => handle::SingleOrMultiIndex::Multi {
                            index: multi.multi_index.loaded()?.clone(),
                            data: multi.data.iter().map(|f| f.loaded().cloned()).collect(),
                        },
                    };
                    handle::IndexLookup { file: lookup, id }.into()
                })
                .collect()
        } else {
            Vec::new()
        };

        Snapshot {
            indices,
            loose_dbs: Arc::clone(&index.loose_dbs),
            marker: index.marker(),
        }
    }
}

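/// Return `true` if `path` refers to a file named `multi-pack-index`.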
fn is_multipack_index(path: &Path) -> bool {
    path.file_name() == Some(OsStr::new("multi-pack-index"))
}

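/// Increments the given counter on creation and decrements it again on drop, to track operations in flight.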
struct IncOnNewAndDecOnDrop<'a>(&'a AtomicU16);
impl<'a> IncOnNewAndDecOnDrop<'a> {
    pub fn new(v: &'a AtomicU16) -> Self {
        v.fetch_add(1, Ordering::SeqCst);
        Self(v)
    }
}
impl<'a> Drop for IncOnNewAndDecOnDrop<'a> {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
}

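/// Increments the given counter when dropped.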
struct IncOnDrop<'a>(&'a AtomicUsize);
impl<'a> Drop for IncOnDrop<'a> {
    fn drop(&mut self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

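/// A pack index on disk, either as a path to a `.idx` file that is not yet opened, or as an already opened
/// multi-pack index file.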
pub(crate) enum Either {
    IndexPath(PathBuf),
    MultiIndexFile(Arc<git_pack::multi_index::File>),
}

impl Either {
    fn path(&self) -> &Path {
        match self {
            Either::IndexPath(p) => p,
            Either::MultiIndexFile(f) => f.path(),
        }
    }

    fn into_index_and_packs(self, mtime: SystemTime) -> IndexAndPacks {
        match self {
            Either::IndexPath(path) => IndexAndPacks::new_single(path, mtime),
            Either::MultiIndexFile(file) => IndexAndPacks::new_multi_from_open_file(file, mtime),
        }
    }

    fn is_multi_index(&self) -> bool {
        matches!(self, Either::MultiIndexFile(_))
    }
}

impl Eq for Either {}

impl PartialEq<Self> for Either {
    fn eq(&self, other: &Self) -> bool {
        self.path().eq(other.path())
    }
}

impl PartialOrd<Self> for Either {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.path().partial_cmp(other.path())
    }
}

impl Ord for Either {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.path().cmp(other.path())
    }
}