pub struct File { /* private fields */ }
A representation of a pack index file.
Implementations
impl File
Instantiation
pub fn at(path: impl AsRef<Path>, object_hash: Kind) -> Result<File, Error>
Open the pack index file at the given path.

The object_hash is a way to read (and write) the same file format with different hashes, as the hash kind isn’t stored within the file format itself.
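A minimal sketch of calling it directly, assuming this is the git-pack crate (with git-hash supplying the hash kind) and a hypothetical *.idx path:

use std::path::Path;

fn open_index(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
    // `Kind::Sha1` is the hash kind of current repositories; `path` is whichever
    // *.idx file you want to inspect.
    let index = git_pack::index::File::at(path, git_hash::Kind::Sha1)?;
    println!("{}: {} objects", path.display(), index.num_objects());
    Ok(())
}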
Examples found in repository:
fn at_inner(path: &Path, object_hash: git_hash::Kind) -> Result<Self, Error> {
let ext = path
.extension()
.and_then(|e| e.to_str())
.ok_or_else(|| Error::InvalidPath(path.to_owned()))?;
Ok(match ext {
"idx" => Self {
index: crate::index::File::at(path, object_hash)?,
pack: crate::data::File::at(path.with_extension("pack"), object_hash)?,
},
"pack" => Self {
pack: crate::data::File::at(path, object_hash)?,
index: crate::index::File::at(path.with_extension("idx"), object_hash)?,
},
_ => return Err(Error::InvalidPath(path.to_owned())),
})
}
More examples
pub fn write_from_index_paths<P>(
mut index_paths: Vec<PathBuf>,
out: impl std::io::Write,
mut progress: P,
should_interrupt: &AtomicBool,
Options { object_hash }: Options,
) -> Result<Outcome<P>, Error>
where
P: Progress,
{
let out = git_features::hash::Write::new(out, object_hash);
let (index_paths_sorted, index_filenames_sorted) = {
index_paths.sort();
let file_names = index_paths
.iter()
.map(|p| PathBuf::from(p.file_name().expect("file name present")))
.collect::<Vec<_>>();
(index_paths, file_names)
};
let entries = {
let mut entries = Vec::new();
let start = Instant::now();
let mut progress = progress.add_child_with_id("Collecting entries", *b"MPCE"); /* Multiindex from Paths Collecting Entries */
progress.init(Some(index_paths_sorted.len()), git_features::progress::count("indices"));
// This could be parallelized… but it's probably not worth it unless you have 500mio objects.
for (index_id, index) in index_paths_sorted.iter().enumerate() {
let mtime = index
.metadata()
.and_then(|m| m.modified())
.unwrap_or(SystemTime::UNIX_EPOCH);
let index = crate::index::File::at(index, object_hash)?;
entries.reserve(index.num_objects() as usize);
entries.extend(index.iter().map(|e| Entry {
id: e.oid,
pack_index: index_id as u32,
pack_offset: e.pack_offset,
index_mtime: mtime,
}));
progress.inc();
if should_interrupt.load(Ordering::Relaxed) {
return Err(Error::Interrupted);
}
}
progress.show_throughput(start);
let start = Instant::now();
progress.set_name("Deduplicate");
progress.init(Some(entries.len()), git_features::progress::count("entries"));
entries.sort_by(|l, r| {
l.id.cmp(&r.id)
.then_with(|| l.index_mtime.cmp(&r.index_mtime).reverse())
.then_with(|| l.pack_index.cmp(&r.pack_index))
});
entries.dedup_by_key(|e| e.id);
progress.inc_by(entries.len());
progress.show_throughput(start);
if should_interrupt.load(Ordering::Relaxed) {
return Err(Error::Interrupted);
}
entries
};
let mut cf = git_chunk::file::Index::for_writing();
cf.plan_chunk(
multi_index::chunk::index_names::ID,
multi_index::chunk::index_names::storage_size(&index_filenames_sorted),
);
cf.plan_chunk(multi_index::chunk::fanout::ID, multi_index::chunk::fanout::SIZE as u64);
cf.plan_chunk(
multi_index::chunk::lookup::ID,
multi_index::chunk::lookup::storage_size(entries.len(), object_hash),
);
cf.plan_chunk(
multi_index::chunk::offsets::ID,
multi_index::chunk::offsets::storage_size(entries.len()),
);
let num_large_offsets = multi_index::chunk::large_offsets::num_large_offsets(&entries);
if let Some(num_large_offsets) = num_large_offsets {
cf.plan_chunk(
multi_index::chunk::large_offsets::ID,
multi_index::chunk::large_offsets::storage_size(num_large_offsets),
);
}
let mut write_progress = progress.add_child_with_id("Writing multi-index", *b"MPBW"); /* Multiindex Bytes Written */
let write_start = Instant::now();
write_progress.init(
Some(cf.planned_storage_size() as usize + Self::HEADER_LEN),
git_features::progress::bytes(),
);
let mut out = git_features::progress::Write {
inner: out,
progress: write_progress,
};
let bytes_written = Self::write_header(
&mut out,
cf.num_chunks().try_into().expect("BUG: wrote more than 256 chunks"),
index_paths_sorted.len() as u32,
object_hash,
)?;
{
progress.set_name("Writing chunks");
progress.init(Some(cf.num_chunks()), git_features::progress::count("chunks"));
let mut chunk_write = cf.into_write(&mut out, bytes_written)?;
while let Some(chunk_to_write) = chunk_write.next_chunk() {
match chunk_to_write {
multi_index::chunk::index_names::ID => {
multi_index::chunk::index_names::write(&index_filenames_sorted, &mut chunk_write)?
}
multi_index::chunk::fanout::ID => multi_index::chunk::fanout::write(&entries, &mut chunk_write)?,
multi_index::chunk::lookup::ID => multi_index::chunk::lookup::write(&entries, &mut chunk_write)?,
multi_index::chunk::offsets::ID => {
multi_index::chunk::offsets::write(&entries, num_large_offsets.is_some(), &mut chunk_write)?
}
multi_index::chunk::large_offsets::ID => multi_index::chunk::large_offsets::write(
&entries,
num_large_offsets.expect("available if planned"),
&mut chunk_write,
)?,
unknown => unreachable!("BUG: forgot to implement chunk {:?}", std::str::from_utf8(&unknown)),
}
progress.inc();
if should_interrupt.load(Ordering::Relaxed) {
return Err(Error::Interrupted);
}
}
}
// write trailing checksum
let multi_index_checksum: git_hash::ObjectId = out.inner.hash.digest().into();
out.inner.inner.write_all(multi_index_checksum.as_slice())?;
out.progress.show_throughput(write_start);
Ok(Outcome {
multi_index_checksum,
progress,
})
}
fn verify_integrity_inner<C, P, F>(
&self,
mut progress: P,
should_interrupt: &AtomicBool,
deep_check: bool,
options: index::verify::integrity::Options<F>,
) -> Result<integrity::Outcome<P>, index::traverse::Error<integrity::Error>>
where
P: Progress,
C: crate::cache::DecodeEntry,
F: Fn() -> C + Send + Clone,
{
let parent = self.path.parent().expect("must be in a directory");
let actual_index_checksum = self
.verify_checksum(
progress.add_child_with_id(format!("{}: checksum", self.path.display()), *b"MVCK"), /* Multiindex Verify ChecKsum */
should_interrupt,
)
.map_err(integrity::Error::from)
.map_err(index::traverse::Error::Processor)?;
if let Some(first_invalid) = crate::verify::fan(&self.fan) {
return Err(index::traverse::Error::Processor(integrity::Error::Fan {
index: first_invalid,
}));
}
if self.num_objects == 0 {
return Err(index::traverse::Error::Processor(integrity::Error::Empty));
}
let mut pack_traverse_statistics = Vec::new();
let operation_start = Instant::now();
let mut total_objects_checked = 0;
let mut pack_ids_and_offsets = Vec::with_capacity(self.num_objects as usize);
{
let order_start = Instant::now();
let mut progress = progress.add_child_with_id("checking oid order", *b"MVOR"); /* Multiindex Verify Oid oRder */
progress.init(
Some(self.num_objects as usize),
git_features::progress::count("objects"),
);
for entry_index in 0..(self.num_objects - 1) {
let lhs = self.oid_at_index(entry_index);
let rhs = self.oid_at_index(entry_index + 1);
if rhs.cmp(lhs) != Ordering::Greater {
return Err(index::traverse::Error::Processor(integrity::Error::OutOfOrder {
index: entry_index,
}));
}
let (pack_id, _) = self.pack_id_and_pack_offset_at_index(entry_index);
pack_ids_and_offsets.push((pack_id, entry_index));
progress.inc();
}
{
let entry_index = self.num_objects - 1;
let (pack_id, _) = self.pack_id_and_pack_offset_at_index(entry_index);
pack_ids_and_offsets.push((pack_id, entry_index));
}
// sort by pack-id to allow handling all indices matching a pack while its open.
pack_ids_and_offsets.sort_by(|l, r| l.0.cmp(&r.0));
progress.show_throughput(order_start);
};
progress.init(
Some(self.num_indices as usize),
git_features::progress::count("indices"),
);
let mut pack_ids_slice = pack_ids_and_offsets.as_slice();
for (pack_id, index_file_name) in self.index_names.iter().enumerate() {
progress.set_name(index_file_name.display().to_string());
progress.inc();
let mut bundle = None;
let index;
let index_path = parent.join(index_file_name);
let index = if deep_check {
bundle = crate::Bundle::at(index_path, self.object_hash)
.map_err(integrity::Error::from)
.map_err(index::traverse::Error::Processor)?
.into();
bundle.as_ref().map(|b| &b.index).expect("just set")
} else {
index = Some(
index::File::at(index_path, self.object_hash)
.map_err(|err| integrity::Error::BundleInit(crate::bundle::init::Error::Index(err)))
.map_err(index::traverse::Error::Processor)?,
);
index.as_ref().expect("just set")
};
let slice_end = pack_ids_slice.partition_point(|e| e.0 == pack_id as crate::data::Id);
let multi_index_entries_to_check = &pack_ids_slice[..slice_end];
{
let offset_start = Instant::now();
let mut offsets_progress = progress.add_child_with_id("verify object offsets", *b"MVOF"); /* Multiindex Verify Object Offsets */
offsets_progress.init(
Some(pack_ids_and_offsets.len()),
git_features::progress::count("objects"),
);
pack_ids_slice = &pack_ids_slice[slice_end..];
for entry_id in multi_index_entries_to_check.iter().map(|e| e.1) {
let oid = self.oid_at_index(entry_id);
let (_, expected_pack_offset) = self.pack_id_and_pack_offset_at_index(entry_id);
let entry_in_bundle_index = index.lookup(oid).ok_or_else(|| {
index::traverse::Error::Processor(integrity::Error::OidNotFound { id: oid.to_owned() })
})?;
let actual_pack_offset = index.pack_offset_at_index(entry_in_bundle_index);
if actual_pack_offset != expected_pack_offset {
return Err(index::traverse::Error::Processor(
integrity::Error::PackOffsetMismatch {
id: oid.to_owned(),
expected_pack_offset,
actual_pack_offset,
},
));
}
offsets_progress.inc();
}
if should_interrupt.load(std::sync::atomic::Ordering::Relaxed) {
return Err(index::traverse::Error::Processor(integrity::Error::Interrupted));
}
offsets_progress.show_throughput(offset_start);
}
total_objects_checked += multi_index_entries_to_check.len();
if let Some(bundle) = bundle {
progress.set_name(format!("Validating {}", index_file_name.display()));
let crate::bundle::verify::integrity::Outcome {
actual_index_checksum: _,
pack_traverse_outcome,
progress: returned_progress,
} = bundle
.verify_integrity(progress, should_interrupt, options.clone())
.map_err(|err| {
use index::traverse::Error::*;
match err {
Processor(err) => Processor(integrity::Error::IndexIntegrity(err)),
VerifyChecksum(err) => VerifyChecksum(err),
Tree(err) => Tree(err),
TreeTraversal(err) => TreeTraversal(err),
PackDecode { id, offset, source } => PackDecode { id, offset, source },
PackMismatch { expected, actual } => PackMismatch { expected, actual },
PackObjectMismatch {
expected,
actual,
offset,
kind,
} => PackObjectMismatch {
expected,
actual,
offset,
kind,
},
Crc32Mismatch {
expected,
actual,
offset,
kind,
} => Crc32Mismatch {
expected,
actual,
offset,
kind,
},
Interrupted => Interrupted,
}
})?;
progress = returned_progress;
pack_traverse_statistics.push(pack_traverse_outcome);
}
}
assert_eq!(
self.num_objects as usize, total_objects_checked,
"BUG: our slicing should allow to visit all objects"
);
progress.set_name("Validating multi-pack");
progress.show_throughput(operation_start);
Ok(integrity::Outcome {
actual_index_checksum,
pack_traverse_statistics,
progress,
})
}
impl File
Iteration and access
pub fn oid_at_index(&self, index: EntryIndex) -> &oid
Returns the object hash at the given index in our list of (sorted) sha1 hashes. The index ranges from 0 to self.num_objects().

Panics

If index is out of bounds.
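A small sketch of walking the valid index range, assuming an already opened index::File as shown above:

fn print_first_oids(index: &git_pack::index::File) {
    // Entries are sorted by object id, so this prints the smallest ids first.
    for entry_index in 0..index.num_objects().min(5) {
        println!("{}", index.oid_at_index(entry_index));
    }
}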
Examples found in repository:
pub fn lookup(&self, id: impl AsRef<git_hash::oid>) -> Option<EntryIndex> {
lookup(id, &self.fan, |idx| self.oid_at_index(idx))
}
/// Given a `prefix`, find an object that matches it uniquely within this index and return `Some(Ok(entry_index))`.
/// If there is more than one object matching the object `Some(Err(())` is returned.
///
/// Finally, if no object matches the index, the return value is `None`.
///
/// Pass `candidates` to obtain the set of entry-indices matching `prefix`, with the same return value as
/// one would have received if it remained `None`. It will be empty if no object matched the `prefix`.
///
// NOTE: pretty much the same things as in `index::File::lookup`, change things there
// as well.
pub fn lookup_prefix(
&self,
prefix: git_hash::Prefix,
candidates: Option<&mut Range<EntryIndex>>,
) -> Option<PrefixLookupResult> {
lookup_prefix(
prefix,
candidates,
&self.fan,
|idx| self.oid_at_index(idx),
self.num_objects,
)
}
pub fn pack_offset_at_index(&self, index: EntryIndex) -> Offset
Returns the offset into our pack data file at which to start reading the object at index.

Panics

If index is out of bounds.
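For instance, pairing each object id with its entry offset is a matter of combining the two accessors (a sketch, reusing the opened index from above):

fn print_offsets(index: &git_pack::index::File) {
    // Pair each object id with the offset where its entry starts in the pack data file.
    for entry_index in 0..index.num_objects() {
        let oid = index.oid_at_index(entry_index);
        let offset = index.pack_offset_at_index(entry_index);
        println!("{oid} -> {offset}");
    }
}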
Examples found in repository:
pub fn get_object_by_index<'a>(
&self,
idx: u32,
out: &'a mut Vec<u8>,
cache: &mut impl crate::cache::DecodeEntry,
) -> Result<(git_object::Data<'a>, crate::data::entry::Location), crate::data::decode::Error> {
let ofs = self.index.pack_offset_at_index(idx);
let pack_entry = self.pack.entry(ofs);
let header_size = pack_entry.header_size();
self.pack
.decode_entry(
pack_entry,
out,
|id, _out| {
self.index.lookup(id).map(|idx| {
crate::data::decode::entry::ResolvedBase::InPack(
self.pack.entry(self.index.pack_offset_at_index(idx)),
)
})
},
cache,
)
.map(move |r| {
(
git_object::Data {
kind: r.kind,
data: out.as_slice(),
},
crate::data::entry::Location {
pack_id: self.pack.id,
pack_offset: ofs,
entry_size: r.compressed_size + header_size,
},
)
})
}
More examples
fn decode_and_process_entry<C, P, E>(
&self,
check: SafetyCheck,
pack: &crate::data::File,
cache: &mut C,
buf: &mut Vec<u8>,
progress: &mut P,
index_entry: &crate::index::Entry,
processor: &mut impl FnMut(git_object::Kind, &[u8], &index::Entry, &mut P) -> Result<(), E>,
) -> Result<crate::data::decode::entry::Outcome, Error<E>>
where
C: crate::cache::DecodeEntry,
P: Progress,
E: std::error::Error + Send + Sync + 'static,
{
let pack_entry = pack.entry(index_entry.pack_offset);
let pack_entry_data_offset = pack_entry.data_offset;
let entry_stats = pack
.decode_entry(
pack_entry,
buf,
|id, _| {
self.lookup(id).map(|index| {
crate::data::decode::entry::ResolvedBase::InPack(pack.entry(self.pack_offset_at_index(index)))
})
},
cache,
)
.map_err(|e| Error::PackDecode {
source: e,
id: index_entry.oid,
offset: index_entry.pack_offset,
})?;
let object_kind = entry_stats.kind;
let header_size = (pack_entry_data_offset - index_entry.pack_offset) as usize;
let entry_len = header_size + entry_stats.compressed_size;
process_entry(
check,
object_kind,
buf,
progress,
index_entry,
|| pack.entry_crc32(index_entry.pack_offset, entry_len),
processor,
)?;
Ok(entry_stats)
}
pub fn traverse_with_index<P, Processor, E>(
&self,
pack: &crate::data::File,
new_processor: impl Fn() -> Processor + Send + Clone,
mut progress: P,
should_interrupt: &AtomicBool,
Options { check, thread_limit }: Options,
) -> Result<Outcome<P>, Error<E>>
where
P: Progress,
Processor: FnMut(
git_object::Kind,
&[u8],
&index::Entry,
&mut <P::SubProgress as Progress>::SubProgress,
) -> Result<(), E>,
E: std::error::Error + Send + Sync + 'static,
{
let (verify_result, traversal_result) = parallel::join(
{
let pack_progress = progress.add_child_with_id(
format!(
"Hash of pack '{}'",
pack.path().file_name().expect("pack has filename").to_string_lossy()
),
*b"PTHP", /* Pack Traverse Hash Pack bytes */
);
let index_progress = progress.add_child_with_id(
format!(
"Hash of index '{}'",
self.path.file_name().expect("index has filename").to_string_lossy()
),
*b"PTHI", /* Pack Traverse Hash Index bytes */
);
move || {
let res = self.possibly_verify(pack, check, pack_progress, index_progress, should_interrupt);
if res.is_err() {
should_interrupt.store(true, Ordering::SeqCst);
}
res
}
},
|| -> Result<_, Error<_>> {
let sorted_entries = index_entries_sorted_by_offset_ascending(
self,
progress.add_child_with_id("collecting sorted index", *b"PTCE"),
); /* Pack Traverse Collect sorted Entries */
let tree = crate::cache::delta::Tree::from_offsets_in_pack(
pack.path(),
sorted_entries.into_iter().map(Entry::from),
|e| e.index_entry.pack_offset,
|id| self.lookup(id).map(|idx| self.pack_offset_at_index(idx)),
progress.add_child_with_id("indexing", *b"PTDI"), /* Pack Traverse Delta Index creation */
should_interrupt,
self.object_hash,
)?;
let mut outcome = digest_statistics(tree.traverse(
|slice, out| pack.entry_slice(slice).map(|entry| out.copy_from_slice(entry)),
pack.pack_end() as u64,
new_processor,
|data,
progress,
traverse::Context {
entry: pack_entry,
entry_end,
decompressed: bytes,
state: ref mut processor,
level,
}| {
let object_kind = pack_entry.header.as_kind().expect("non-delta object");
data.level = level;
data.decompressed_size = pack_entry.decompressed_size;
data.object_kind = object_kind;
data.compressed_size = entry_end - pack_entry.data_offset;
data.object_size = bytes.len() as u64;
let result = crate::index::traverse::process_entry(
check,
object_kind,
bytes,
progress,
&data.index_entry,
|| {
// TODO: Fix this - we overwrite the header of 'data' which also changes the computed entry size,
// causing index and pack to seemingly mismatch. This is surprising, and should be done differently.
// debug_assert_eq!(&data.index_entry.pack_offset, &pack_entry.pack_offset());
git_features::hash::crc32(
pack.entry_slice(data.index_entry.pack_offset..entry_end)
.expect("slice pointing into the pack (by now data is verified)"),
)
},
processor,
);
match result {
Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => {
progress.info(format!("Ignoring decode error: {}", err));
Ok(())
}
res => res,
}
},
crate::cache::delta::traverse::Options {
object_progress: progress.add_child_with_id("Resolving", *b"PTRO"), /* Pack Traverse Resolve Objects */
size_progress: progress.add_child_with_id("Decoding", *b"PTDB"), /* Pack Traverse Decode Bytes */
thread_limit,
should_interrupt,
object_hash: self.object_hash,
},
)?);
outcome.pack_size = pack.data_len() as u64;
Ok(outcome)
},
);
Ok(Outcome {
actual_index_checksum: verify_result?,
statistics: traversal_result?,
progress,
})
}
pub fn crc32_at_index(&self, index: EntryIndex) -> Option<u32>
Returns the CRC32 of the object at the given index.

Note: These are always present for index version 2 or higher.

Panics

If index is out of bounds.
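A sketch of reading the values while accounting for V1 indices, which store no CRC32s:

fn print_crc32s(index: &git_pack::index::File) {
    for entry_index in 0..index.num_objects() {
        // V1 indices yield `None` here.
        match index.crc32_at_index(entry_index) {
            Some(crc) => println!("{}: {:08x}", index.oid_at_index(entry_index), crc),
            None => println!("{}: no CRC32 (V1 index)", index.oid_at_index(entry_index)),
        }
    }
}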
pub fn lookup(&self, id: impl AsRef<oid>) -> Option<EntryIndex>
Returns the index of the given hash for use with the oid_at_index(), pack_offset_at_index() or crc32_at_index().
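A short sketch of the typical combination, looking up an id and then resolving its pack offset:

fn pack_offset_of(
    index: &git_pack::index::File,
    id: &git_hash::oid,
) -> Option<git_pack::data::Offset> {
    // `lookup()` binary-searches the sorted id list; the resulting entry index
    // feeds directly into the `*_at_index()` accessors.
    index
        .lookup(id)
        .map(|entry_index| index.pack_offset_at_index(entry_index))
}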
Examples found in repository:
pub fn find<'a>(
&self,
id: impl AsRef<git_hash::oid>,
out: &'a mut Vec<u8>,
cache: &mut impl crate::cache::DecodeEntry,
) -> Result<Option<(git_object::Data<'a>, crate::data::entry::Location)>, crate::data::decode::Error> {
let idx = match self.index.lookup(id) {
Some(idx) => idx,
None => return Ok(None),
};
self.get_object_by_index(idx, out, cache).map(Some)
}
/// Special-use function to get an object given an index previously returned from
/// internal_find_pack_index.
///
/// # Panics
///
/// If `index` is out of bounds.
pub fn get_object_by_index<'a>(
&self,
idx: u32,
out: &'a mut Vec<u8>,
cache: &mut impl crate::cache::DecodeEntry,
) -> Result<(git_object::Data<'a>, crate::data::entry::Location), crate::data::decode::Error> {
let ofs = self.index.pack_offset_at_index(idx);
let pack_entry = self.pack.entry(ofs);
let header_size = pack_entry.header_size();
self.pack
.decode_entry(
pack_entry,
out,
|id, _out| {
self.index.lookup(id).map(|idx| {
crate::data::decode::entry::ResolvedBase::InPack(
self.pack.entry(self.index.pack_offset_at_index(idx)),
)
})
},
cache,
)
.map(move |r| {
(
git_object::Data {
kind: r.kind,
data: out.as_slice(),
},
crate::data::entry::Location {
pack_id: self.pack.id,
pack_offset: ofs,
entry_size: r.compressed_size + header_size,
},
)
})
}
pub fn lookup_prefix(
    &self,
    prefix: Prefix,
    candidates: Option<&mut Range<EntryIndex>>
) -> Option<PrefixLookupResult>
Given a prefix, find an object that matches it uniquely within this index and return Some(Ok(entry_index)). If there is more than one object matching the prefix, Some(Err(())) is returned.

Finally, if no object matches the prefix, the return value is None.

Pass candidates to obtain the set of entry-indices matching prefix, with the same return value as one would have received if it remained None. It will be empty if no object matched the prefix.
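A sketch of handling the three possible outcomes; constructing the git_hash::Prefix itself is assumed to happen elsewhere:

fn describe_prefix_lookup(index: &git_pack::index::File, prefix: git_hash::Prefix) {
    match index.lookup_prefix(prefix, None) {
        Some(Ok(entry_index)) => println!("unique match: {}", index.oid_at_index(entry_index)),
        Some(Err(())) => println!("prefix is ambiguous within this index"),
        None => println!("no object matches this prefix"),
    }
}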
pub fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = Entry> + 'a>
An iterator over all Entries of this index file.
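For example, a small sketch that scans all entries once to find the largest pack offset:

fn largest_pack_offset(index: &git_pack::index::File) -> Option<git_pack::data::Offset> {
    // Each Entry carries the object id, its pack offset and, for V2 indices, a CRC32.
    index.iter().map(|entry| entry.pack_offset).max()
}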
Examples found in repository:
pub(crate) fn index_entries_sorted_by_offset_ascending(
idx: &crate::index::File,
mut progress: impl Progress,
) -> Vec<crate::index::Entry> {
progress.init(Some(idx.num_objects as usize), progress::count("entries"));
let start = Instant::now();
let mut v = Vec::with_capacity(idx.num_objects as usize);
for entry in idx.iter() {
v.push(entry);
progress.inc();
}
v.sort_by_key(|e| e.pack_offset);
progress.show_throughput(start);
v
}
More examples
pub fn sorted_offsets(&self) -> Vec<data::Offset> {
let mut ofs: Vec<_> = match self.version {
index::Version::V1 => self.iter().map(|e| e.pack_offset).collect(),
index::Version::V2 => {
let offset32_start = &self.data[self.offset_pack_offset_v2()..];
let pack_offset_64_start = self.offset_pack_offset64_v2();
offset32_start
.chunks(N32_SIZE)
.take(self.num_objects as usize)
.map(|offset| self.pack_offset_from_offset_v2(offset, pack_offset_64_start))
.collect()
}
};
ofs.sort_unstable();
ofs
}
pub fn sorted_offsets(&self) -> Vec<Offset>
Return a vector of ascending offsets into our respective pack data file.
Useful to control an iteration over all pack entries in a cache-friendly way.
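A sketch of such a cache-friendly pass over the entries, in ascending pack order:

fn dump_offsets_in_pack_order(index: &git_pack::index::File) {
    // Ascending offsets allow reading the pack data file front to back without seeking.
    for offset in index.sorted_offsets() {
        println!("entry at pack offset {offset}");
    }
}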
impl File
Traversal with index
pub fn traverse_with_index<P, Processor, E>(
    &self,
    pack: &File,
    new_processor: impl Fn() -> Processor + Send + Clone,
    progress: P,
    should_interrupt: &AtomicBool,
    _: Options
) -> Result<Outcome<P>, Error<E>>
where
    P: Progress,
    Processor: FnMut(Kind, &[u8], &Entry, &mut <P::SubProgress as Progress>::SubProgress) -> Result<(), E>,
    E: Error + Send + Sync + 'static,
Iterate through all decoded objects in the given pack and handle them with a Processor, using an index to reduce waste at the cost of memory.

For more details, see the documentation on the traverse() method.
Examples found in repository:
pub fn traverse<P, C, Processor, E, F>(
&self,
pack: &crate::data::File,
progress: P,
should_interrupt: &AtomicBool,
new_processor: impl Fn() -> Processor + Send + Clone,
Options {
traversal,
thread_limit,
check,
make_pack_lookup_cache,
}: Options<F>,
) -> Result<Outcome<P>, Error<E>>
where
P: Progress,
C: crate::cache::DecodeEntry,
E: std::error::Error + Send + Sync + 'static,
Processor: FnMut(
git_object::Kind,
&[u8],
&index::Entry,
&mut <P::SubProgress as Progress>::SubProgress,
) -> Result<(), E>,
F: Fn() -> C + Send + Clone,
{
match traversal {
Algorithm::Lookup => self.traverse_with_lookup(
new_processor,
pack,
progress,
should_interrupt,
with_lookup::Options {
thread_limit,
check,
make_pack_lookup_cache,
},
),
Algorithm::DeltaTreeLookup => self.traverse_with_index(
pack,
new_processor,
progress,
should_interrupt,
crate::index::traverse::with_index::Options { check, thread_limit },
),
}
}
impl File
Verify and validate the content of the index file
pub fn traverse_with_lookup<P, C, Processor, E, F>(
    &self,
    new_processor: impl Fn() -> Processor + Send + Clone,
    pack: &File,
    progress: P,
    should_interrupt: &AtomicBool,
    _: Options<F>
) -> Result<Outcome<P>, Error<E>>
where
    P: Progress,
    C: DecodeEntry,
    E: Error + Send + Sync + 'static,
    Processor: FnMut(Kind, &[u8], &Entry, &mut <P::SubProgress as Progress>::SubProgress) -> Result<(), E>,
    F: Fn() -> C + Send + Clone,
Iterate through all decoded objects in the given pack and handle them with a Processor, using a cache to reduce the amount of waste while decoding objects.

For more details, see the documentation on the traverse() method.
impl File
Traversal of pack data files using an index file
pub fn traverse<P, C, Processor, E, F>(
    &self,
    pack: &File,
    progress: P,
    should_interrupt: &AtomicBool,
    new_processor: impl Fn() -> Processor + Send + Clone,
    _: Options<F>
) -> Result<Outcome<P>, Error<E>>
where
    P: Progress,
    C: DecodeEntry,
    E: Error + Send + Sync + 'static,
    Processor: FnMut(Kind, &[u8], &Entry, &mut <P::SubProgress as Progress>::SubProgress) -> Result<(), E>,
    F: Fn() -> C + Send + Clone,
Iterate through all decoded objects in the given pack and handle them with a Processor.

The return value is (pack-checksum, Outcome, progress), thus the pack traversal will always verify the whole pack's checksum to assure it was correct. In case of bit-rot, the operation will abort early without verifying all objects, using the interrupt mechanism.

Algorithms

Using the Options::traversal field one can choose between two algorithms providing different tradeoffs. Both invoke new_processor() to create functions receiving decoded objects, their object kind, index entry and a progress instance to provide progress information.

- Algorithm::DeltaTreeLookup builds an index to avoid any unnecessary computation while resolving objects, avoiding the need for a cache entirely, rendering new_cache() unused. One could also call traverse_with_index() directly.
- Algorithm::Lookup uses a cache created by new_cache() to avoid having to re-compute all bases of a delta-chain while decoding objects. One could also call traverse_with_lookup() directly.

Use thread_limit to further control parallelism and check to define how much the passed objects shall be verified beforehand.
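The sketch below runs a traversal that merely accepts every decoded object, using the Lookup algorithm with no cache and discarded progress. The module paths for Algorithm, SafetyCheck, cache::Never and progress::Discard are assumptions layered on the signatures shown on this page, not verbatim from it:

use std::sync::atomic::AtomicBool;

fn traverse_all(
    index: &git_pack::index::File,
    pack: &git_pack::data::File,
) -> Result<git_hash::ObjectId, git_pack::index::traverse::Error<std::io::Error>> {
    let should_interrupt = AtomicBool::new(false);
    let outcome = index.traverse(
        pack,
        git_features::progress::Discard,
        &should_interrupt,
        // One processor per thread; this one accepts every decoded object unchanged.
        || {
            |_kind: git_object::Kind,
             _data: &[u8],
             _entry: &git_pack::index::Entry,
             _progress: &mut git_features::progress::Discard|
             -> Result<(), std::io::Error> { Ok(()) }
        },
        git_pack::index::traverse::Options {
            traversal: git_pack::index::traverse::Algorithm::Lookup,
            thread_limit: None,
            check: git_pack::index::traverse::SafetyCheck::All,
            make_pack_lookup_cache: || git_pack::cache::Never,
        },
    )?;
    Ok(outcome.actual_index_checksum)
}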
Examples found in repository:
pub fn verify_integrity<P, C, F>(
&self,
pack: Option<PackContext<'_, F>>,
mut progress: P,
should_interrupt: &AtomicBool,
) -> Result<integrity::Outcome<P>, index::traverse::Error<index::verify::integrity::Error>>
where
P: Progress,
C: crate::cache::DecodeEntry,
F: Fn() -> C + Send + Clone,
{
if let Some(first_invalid) = crate::verify::fan(&self.fan) {
return Err(index::traverse::Error::Processor(integrity::Error::Fan {
index: first_invalid,
}));
}
match pack {
Some(PackContext {
data: pack,
options:
integrity::Options {
verify_mode,
traversal,
thread_limit,
make_pack_lookup_cache,
},
}) => self
.traverse(
pack,
progress,
should_interrupt,
|| {
let mut encode_buf = Vec::with_capacity(2048);
move |kind, data, index_entry, progress| {
Self::verify_entry(verify_mode, &mut encode_buf, kind, data, index_entry, progress)
}
},
index::traverse::Options {
traversal,
thread_limit,
check: index::traverse::SafetyCheck::All,
make_pack_lookup_cache,
},
)
.map(|o| integrity::Outcome {
actual_index_checksum: o.actual_index_checksum,
pack_traverse_statistics: Some(o.statistics),
progress: o.progress,
}),
None => self
.verify_checksum(
progress.add_child_with_id(
"Sha1 of index",
*b"PTHI", /* Pack Traverse Hash Index bytes (semantically the same as in branch above) */
),
should_interrupt,
)
.map_err(Into::into)
.map(|id| integrity::Outcome {
actual_index_checksum: id,
pack_traverse_statistics: None,
progress,
}),
}
}
impl File
Verify and validate the content of the index file
pub fn index_checksum(&self) -> ObjectId
Returns the trailing hash stored at the end of this index file.
It’s a hash over all bytes of the index.
Examples found in repository:
pub fn verify_checksum(
&self,
progress: impl Progress,
should_interrupt: &AtomicBool,
) -> Result<git_hash::ObjectId, checksum::Error> {
crate::verify::checksum_on_disk_or_mmap(
self.path(),
&self.data,
self.index_checksum(),
self.object_hash,
progress,
should_interrupt,
)
}
More examples
fn possibly_verify<E>(
&self,
pack: &crate::data::File,
check: SafetyCheck,
pack_progress: impl Progress,
index_progress: impl Progress,
should_interrupt: &AtomicBool,
) -> Result<git_hash::ObjectId, Error<E>>
where
E: std::error::Error + Send + Sync + 'static,
{
Ok(if check.file_checksum() {
if self.pack_checksum() != pack.checksum() {
return Err(Error::PackMismatch {
actual: pack.checksum(),
expected: self.pack_checksum(),
});
}
let (pack_res, id) = parallel::join(
move || pack.verify_checksum(pack_progress, should_interrupt),
move || self.verify_checksum(index_progress, should_interrupt),
);
pack_res?;
id?
} else {
self.index_checksum()
})
}
sourcepub fn pack_checksum(&self) -> ObjectId
pub fn pack_checksum(&self) -> ObjectId
Returns the hash of the pack data file that this index file corresponds to.
It should match crate::data::File::checksum()
of the corresponding pack data file.
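A minimal sketch of that relationship, assuming index and pack were opened from a matching .idx/.pack pair:

// Sketch only: an index stores the checksum of the pack it was built for,
// so both values must agree when the files belong together.
assert_eq!(index.pack_checksum(), pack.checksum(), "index belongs to this pack");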
Examples found in repository?
fn possibly_verify<E>(
&self,
pack: &crate::data::File,
check: SafetyCheck,
pack_progress: impl Progress,
index_progress: impl Progress,
should_interrupt: &AtomicBool,
) -> Result<git_hash::ObjectId, Error<E>>
where
E: std::error::Error + Send + Sync + 'static,
{
Ok(if check.file_checksum() {
if self.pack_checksum() != pack.checksum() {
return Err(Error::PackMismatch {
actual: pack.checksum(),
expected: self.pack_checksum(),
});
}
let (pack_res, id) = parallel::join(
move || pack.verify_checksum(pack_progress, should_interrupt),
move || self.verify_checksum(index_progress, should_interrupt),
);
pack_res?;
id?
} else {
self.index_checksum()
})
}
sourcepub fn verify_checksum(
&self,
progress: impl Progress,
should_interrupt: &AtomicBool
) -> Result<ObjectId, Error>
pub fn verify_checksum(
&self,
progress: impl Progress,
should_interrupt: &AtomicBool
) -> Result<ObjectId, Error>
Validate that our index_checksum()
matches the actual contents
of this index file, and return it if it does.
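A minimal sketch of a call, assuming index is an opened index::File, a surrounding function that can propagate the error with ?, and git_features::progress::Discard as a no-op progress implementation:

use std::sync::atomic::AtomicBool;

// Sketch only: hash the on-disk bytes and compare against the stored trailer.
let should_interrupt = AtomicBool::new(false);
let actual = index.verify_checksum(git_features::progress::Discard, &should_interrupt)?;
assert_eq!(actual, index.index_checksum());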
Examples found in repository?
fn possibly_verify<E>(
&self,
pack: &crate::data::File,
check: SafetyCheck,
pack_progress: impl Progress,
index_progress: impl Progress,
should_interrupt: &AtomicBool,
) -> Result<git_hash::ObjectId, Error<E>>
where
E: std::error::Error + Send + Sync + 'static,
{
Ok(if check.file_checksum() {
if self.pack_checksum() != pack.checksum() {
return Err(Error::PackMismatch {
actual: pack.checksum(),
expected: self.pack_checksum(),
});
}
let (pack_res, id) = parallel::join(
move || pack.verify_checksum(pack_progress, should_interrupt),
move || self.verify_checksum(index_progress, should_interrupt),
);
pack_res?;
id?
} else {
self.index_checksum()
})
}
More examples
pub fn verify_integrity<P, C, F>(
&self,
pack: Option<PackContext<'_, F>>,
mut progress: P,
should_interrupt: &AtomicBool,
) -> Result<integrity::Outcome<P>, index::traverse::Error<index::verify::integrity::Error>>
where
P: Progress,
C: crate::cache::DecodeEntry,
F: Fn() -> C + Send + Clone,
{
if let Some(first_invalid) = crate::verify::fan(&self.fan) {
return Err(index::traverse::Error::Processor(integrity::Error::Fan {
index: first_invalid,
}));
}
match pack {
Some(PackContext {
data: pack,
options:
integrity::Options {
verify_mode,
traversal,
thread_limit,
make_pack_lookup_cache,
},
}) => self
.traverse(
pack,
progress,
should_interrupt,
|| {
let mut encode_buf = Vec::with_capacity(2048);
move |kind, data, index_entry, progress| {
Self::verify_entry(verify_mode, &mut encode_buf, kind, data, index_entry, progress)
}
},
index::traverse::Options {
traversal,
thread_limit,
check: index::traverse::SafetyCheck::All,
make_pack_lookup_cache,
},
)
.map(|o| integrity::Outcome {
actual_index_checksum: o.actual_index_checksum,
pack_traverse_statistics: Some(o.statistics),
progress: o.progress,
}),
None => self
.verify_checksum(
progress.add_child_with_id(
"Sha1 of index",
*b"PTHI", /* Pack Traverse Hash Index bytes (semantically the same as in branch above) */
),
should_interrupt,
)
.map_err(Into::into)
.map(|id| integrity::Outcome {
actual_index_checksum: id,
pack_traverse_statistics: None,
progress,
}),
}
}
sourcepub fn verify_integrity<P, C, F>(
&self,
pack: Option<PackContext<'_, F>>,
progress: P,
should_interrupt: &AtomicBool
) -> Result<Outcome<P>, Error<Error>>where
P: Progress,
C: DecodeEntry,
F: Fn() -> C + Send + Clone,
pub fn verify_integrity<P, C, F>(
&self,
pack: Option<PackContext<'_, F>>,
progress: P,
should_interrupt: &AtomicBool
) -> Result<Outcome<P>, Error<Error>>where
P: Progress,
C: DecodeEntry,
F: Fn() -> C + Send + Clone,
The most thorough validation of integrity of both index file and the corresponding pack data file, if provided. Returns the checksum of the index file, the traversal outcome and the given progress if the integrity check is successful.
If pack is provided, it is expected (and validated) to be the pack belonging to this index.
It will be used to validate the internal integrity of the pack before checking that each object's integrity
is indeed as advertised via its SHA1 as stored in this index, as well as the CRC32 hash.
The last member of the Option is a function returning an implementation of crate::cache::DecodeEntry
to be used if
the index::traverse::Algorithm
is Lookup
.
To set this to None
, use None::<(_, _, _, fn() -> crate::cache::Never)>
.
The thread_limit optionally specifies the number of threads to be used for the pack traversal.
make_cache is only used in case a pack is specified; use existing implementations in the crate::cache module.
Tradeoffs
The given progress
is inevitably consumed if there is an error, which is a tradeoff chosen to easily allow using ?
in the
error case.
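A minimal sketch of the index-plus-pack variant, mirroring the repository example below; index, pack and pre-built integrity options (index::verify::integrity::Options) are assumed to exist, as is a surrounding function that can propagate the error with ?:

use std::sync::atomic::AtomicBool;

// Sketch only: run the full check of the index against its pack.
let should_interrupt = AtomicBool::new(false);
let outcome = index.verify_integrity(
    Some(index::verify::PackContext { data: &pack, options }),
    git_features::progress::Discard,
    &should_interrupt,
)?;
println!("verified index {}", outcome.actual_index_checksum);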
Examples found in repository?
pub fn verify_integrity<C, P, F>(
&self,
progress: P,
should_interrupt: &AtomicBool,
options: crate::index::verify::integrity::Options<F>,
) -> Result<integrity::Outcome<P>, crate::index::traverse::Error<crate::index::verify::integrity::Error>>
where
P: Progress,
C: crate::cache::DecodeEntry,
F: Fn() -> C + Send + Clone,
{
self.index
.verify_integrity(
Some(crate::index::verify::PackContext {
data: &self.pack,
options,
}),
progress,
should_interrupt,
)
.map(|o| integrity::Outcome {
actual_index_checksum: o.actual_index_checksum,
pack_traverse_outcome: o.pack_traverse_statistics.expect("pack is set"),
progress: o.progress,
})
}
source§impl File
impl File
Various ways of writing an index file from pack entries
sourcepub fn write_data_iter_to_stream<F, F2>(
version: Version,
make_resolver: F,
entries: impl Iterator<Item = Result<Entry, Error>>,
thread_limit: Option<usize>,
root_progress: impl Progress,
out: impl Write,
should_interrupt: &AtomicBool,
object_hash: Kind,
pack_version: Version
) -> Result<Outcome, Error>where
F: FnOnce() -> Result<F2>,
F2: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Clone,
pub fn write_data_iter_to_stream<F, F2>(
version: Version,
make_resolver: F,
entries: impl Iterator<Item = Result<Entry, Error>>,
thread_limit: Option<usize>,
root_progress: impl Progress,
out: impl Write,
should_interrupt: &AtomicBool,
object_hash: Kind,
pack_version: Version
) -> Result<Outcome, Error>where
F: FnOnce() -> Result<F2>,
F2: for<'r> Fn(EntryRange, &'r mut Vec<u8>) -> Option<()> + Send + Clone,
Write information about entries
as obtained from a pack data file into a pack index file via the out
stream.
The resolver produced by make_resolver
must resolve pack entries from the same pack data file that produced the
entries
iterator.
- version is the version of pack index to produce, use crate::index::Version::default() if in doubt.
- thread_limit is used for a parallel tree traversal for obtaining object hashes with optimal performance.
- root_progress is the top-level progress to stay informed about the progress of this potentially long-running computation.
- object_hash defines what kind of object hash we write into the index file.
- pack_version is the version of the underlying pack for which entries are read. It's used in case none of these objects are provided to compute a pack-hash.
Remarks
- neither in-pack nor out-of-pack Ref Deltas are supported here; these must have been resolved beforehand.
- make_resolver() will only be called after the iterator stopped returning elements. It produces a function that provides all bytes belonging to a pack entry, writing them to the given mutable output Vec. It should return None if the entry cannot be resolved from the pack that produced the entries iterator, causing the write operation to fail; a sketch of the overall call shape follows below.
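The overall shape of a call, heavily abridged and for illustration only; pack_entries_iter and resolve_entry are assumed to be supplied by the caller (the repository example below shows a real resolver backed by the pack data file), and a surrounding function propagates the error with ?:

use std::sync::atomic::AtomicBool;

// Sketch only: build an index for the streamed entries and discard the index bytes.
let should_interrupt = AtomicBool::new(false);
let outcome = crate::index::File::write_data_iter_to_stream(
    crate::index::Version::default(),
    move || Ok(resolve_entry),       // assumed: yields Fn(EntryRange, &mut Vec<u8>) -> Option<()>
    pack_entries_iter,               // assumed: Iterator<Item = Result<Entry, Error>>
    None,                            // thread_limit: let the implementation decide
    git_features::progress::Discard,
    std::io::sink(),                 // `out`: no index file is kept in this sketch
    &should_interrupt,
    git_hash::Kind::Sha1,
    crate::data::Version::default(),
)?;
println!("pack hash: {}", outcome.data_hash);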
Examples found in repository?
fn inner_write(
directory: Option<impl AsRef<Path>>,
mut progress: impl Progress,
Options {
thread_limit,
iteration_mode: _,
index_version: index_kind,
object_hash,
}: Options,
data_file: SharedTempFile,
pack_entries_iter: impl Iterator<Item = Result<data::input::Entry, data::input::Error>>,
should_interrupt: &AtomicBool,
pack_version: data::Version,
) -> Result<WriteOutcome, Error> {
let indexing_progress = progress.add_child_with_id("create index file", *b"BWCI"); /* Bundle Write Create Index */
Ok(match directory {
Some(directory) => {
let directory = directory.as_ref();
let mut index_file = git_tempfile::new(directory, ContainingDirectory::Exists, AutoRemove::Tempfile)?;
let outcome = crate::index::File::write_data_iter_to_stream(
index_kind,
{
let data_file = Arc::clone(&data_file);
move || new_pack_file_resolver(data_file)
},
pack_entries_iter,
thread_limit,
indexing_progress,
&mut index_file,
should_interrupt,
object_hash,
pack_version,
)?;
let data_path = directory.join(format!("pack-{}.pack", outcome.data_hash.to_hex()));
let index_path = data_path.with_extension("idx");
let keep_path = data_path.with_extension("keep");
std::fs::write(&keep_path, b"")?;
Arc::try_unwrap(data_file)
.expect("only one handle left after pack was consumed")
.into_inner()
.into_inner()
.map_err(|err| Error::from(err.into_error()))?
.persist(&data_path)?;
index_file
.persist(&index_path)
.map_err(|err| {
progress.info(format!(
"pack file at {} is retained despite failing to move the index file into place. You can use plumbing to make it usable.",
data_path.display()
));
err
})?;
WriteOutcome {
outcome,
data_path: Some(data_path),
index_path: Some(index_path),
keep_path: Some(keep_path),
}
}
None => WriteOutcome {
outcome: crate::index::File::write_data_iter_to_stream(
index_kind,
move || new_pack_file_resolver(data_file),
pack_entries_iter,
thread_limit,
indexing_progress,
io::sink(),
should_interrupt,
object_hash,
pack_version,
)?,
data_path: None,
index_path: None,
keep_path: None,
},
})
}
source§impl File
impl File
Basic file information
sourcepub fn path(&self) -> &Path
pub fn path(&self) -> &Path
The path of the opened index file
Examples found in repository?
pub fn verify_checksum(
&self,
progress: impl Progress,
should_interrupt: &AtomicBool,
) -> Result<git_hash::ObjectId, checksum::Error> {
crate::verify::checksum_on_disk_or_mmap(
self.path(),
&self.data,
self.index_checksum(),
self.object_hash,
progress,
should_interrupt,
)
}
sourcepub fn num_objects(&self) -> EntryIndex
pub fn num_objects(&self) -> EntryIndex
The number of objects stored in the pack and index, as one past the highest entry index.
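A minimal sketch relating it to iteration, assuming index is an opened index::File (iter() and the entry fields also appear in the repository example below):

// Sketch only: num_objects() is one past the highest entry index,
// i.e. the number of entries yielded by iteration.
assert_eq!(index.iter().count(), index.num_objects() as usize);
for entry in index.iter().take(3) {
    println!("{} at pack offset {}", entry.oid, entry.pack_offset);
}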
Examples found in repository?
pub fn traverse_with_lookup<P, C, Processor, E, F>(
&self,
new_processor: impl Fn() -> Processor + Send + Clone,
pack: &crate::data::File,
mut progress: P,
should_interrupt: &AtomicBool,
Options {
thread_limit,
check,
make_pack_lookup_cache,
}: Options<F>,
) -> Result<Outcome<P>, Error<E>>
where
P: Progress,
C: crate::cache::DecodeEntry,
E: std::error::Error + Send + Sync + 'static,
Processor: FnMut(
git_object::Kind,
&[u8],
&index::Entry,
&mut <P::SubProgress as Progress>::SubProgress,
) -> Result<(), E>,
F: Fn() -> C + Send + Clone,
{
let (verify_result, traversal_result) = parallel::join(
{
let pack_progress = progress.add_child_with_id(
format!(
"Hash of pack '{}'",
pack.path().file_name().expect("pack has filename").to_string_lossy()
),
*b"PTHP", /* Pack Traverse Hash Pack bytes */
);
let index_progress = progress.add_child_with_id(
format!(
"Hash of index '{}'",
self.path.file_name().expect("index has filename").to_string_lossy()
),
*b"PTHI", /* Pack Traverse Hash Index bytes */
);
move || {
let res = self.possibly_verify(pack, check, pack_progress, index_progress, should_interrupt);
if res.is_err() {
should_interrupt.store(true, Ordering::SeqCst);
}
res
}
},
|| {
let index_entries = util::index_entries_sorted_by_offset_ascending(
self,
progress.add_child_with_id("collecting sorted index", *b"PTCE"),
); /* Pack Traverse Collect sorted Entries */
let (chunk_size, thread_limit, available_cores) =
parallel::optimize_chunk_size_and_thread_limit(1000, Some(index_entries.len()), thread_limit, None);
let there_are_enough_entries_to_process = || index_entries.len() > chunk_size * available_cores;
let input_chunks = index_entries.chunks(chunk_size.max(chunk_size));
let reduce_progress = OwnShared::new(Mutable::new({
let mut p = progress.add_child_with_id("Traversing", *b"PTRO"); /* Pack Traverse Resolve Objects */
p.init(Some(self.num_objects() as usize), progress::count("objects"));
p
}));
let state_per_thread = {
let reduce_progress = reduce_progress.clone();
move |index| {
(
make_pack_lookup_cache(),
new_processor(),
Vec::with_capacity(2048), // decode buffer
lock(&reduce_progress)
.add_child_with_id(format!("thread {}", index), git_features::progress::UNKNOWN), // per thread progress
)
}
};
in_parallel_if(
there_are_enough_entries_to_process,
input_chunks,
thread_limit,
state_per_thread,
|entries: &[index::Entry],
(cache, ref mut processor, buf, progress)|
-> Result<Vec<data::decode::entry::Outcome>, Error<_>> {
progress.init(
Some(entries.len()),
Some(unit::dynamic(unit::Human::new(
unit::human::Formatter::new(),
"objects",
))),
);
let mut stats = Vec::with_capacity(entries.len());
progress.set(0);
for index_entry in entries.iter() {
let result = self.decode_and_process_entry(
check,
pack,
cache,
buf,
progress,
index_entry,
processor,
);
progress.inc();
let stat = match result {
Err(err @ Error::PackDecode { .. }) if !check.fatal_decode_error() => {
progress.info(format!("Ignoring decode error: {}", err));
continue;
}
res => res,
}?;
stats.push(stat);
}
Ok(stats)
},
Reducer::from_progress(reduce_progress, pack.data_len(), check, should_interrupt),
)
},
);
Ok(Outcome {
actual_index_checksum: verify_result?,
statistics: traversal_result?,
progress,
})
}
More examples
pub fn write_from_index_paths<P>(
mut index_paths: Vec<PathBuf>,
out: impl std::io::Write,
mut progress: P,
should_interrupt: &AtomicBool,
Options { object_hash }: Options,
) -> Result<Outcome<P>, Error>
where
P: Progress,
{
let out = git_features::hash::Write::new(out, object_hash);
let (index_paths_sorted, index_filenames_sorted) = {
index_paths.sort();
let file_names = index_paths
.iter()
.map(|p| PathBuf::from(p.file_name().expect("file name present")))
.collect::<Vec<_>>();
(index_paths, file_names)
};
let entries = {
let mut entries = Vec::new();
let start = Instant::now();
let mut progress = progress.add_child_with_id("Collecting entries", *b"MPCE"); /* Multiindex from Paths Collecting Entries */
progress.init(Some(index_paths_sorted.len()), git_features::progress::count("indices"));
// This could be parallelized… but it's probably not worth it unless you have 500mio objects.
for (index_id, index) in index_paths_sorted.iter().enumerate() {
let mtime = index
.metadata()
.and_then(|m| m.modified())
.unwrap_or(SystemTime::UNIX_EPOCH);
let index = crate::index::File::at(index, object_hash)?;
entries.reserve(index.num_objects() as usize);
entries.extend(index.iter().map(|e| Entry {
id: e.oid,
pack_index: index_id as u32,
pack_offset: e.pack_offset,
index_mtime: mtime,
}));
progress.inc();
if should_interrupt.load(Ordering::Relaxed) {
return Err(Error::Interrupted);
}
}
progress.show_throughput(start);
let start = Instant::now();
progress.set_name("Deduplicate");
progress.init(Some(entries.len()), git_features::progress::count("entries"));
entries.sort_by(|l, r| {
l.id.cmp(&r.id)
.then_with(|| l.index_mtime.cmp(&r.index_mtime).reverse())
.then_with(|| l.pack_index.cmp(&r.pack_index))
});
entries.dedup_by_key(|e| e.id);
progress.inc_by(entries.len());
progress.show_throughput(start);
if should_interrupt.load(Ordering::Relaxed) {
return Err(Error::Interrupted);
}
entries
};
let mut cf = git_chunk::file::Index::for_writing();
cf.plan_chunk(
multi_index::chunk::index_names::ID,
multi_index::chunk::index_names::storage_size(&index_filenames_sorted),
);
cf.plan_chunk(multi_index::chunk::fanout::ID, multi_index::chunk::fanout::SIZE as u64);
cf.plan_chunk(
multi_index::chunk::lookup::ID,
multi_index::chunk::lookup::storage_size(entries.len(), object_hash),
);
cf.plan_chunk(
multi_index::chunk::offsets::ID,
multi_index::chunk::offsets::storage_size(entries.len()),
);
let num_large_offsets = multi_index::chunk::large_offsets::num_large_offsets(&entries);
if let Some(num_large_offsets) = num_large_offsets {
cf.plan_chunk(
multi_index::chunk::large_offsets::ID,
multi_index::chunk::large_offsets::storage_size(num_large_offsets),
);
}
let mut write_progress = progress.add_child_with_id("Writing multi-index", *b"MPBW"); /* Multiindex Bytes Written */
let write_start = Instant::now();
write_progress.init(
Some(cf.planned_storage_size() as usize + Self::HEADER_LEN),
git_features::progress::bytes(),
);
let mut out = git_features::progress::Write {
inner: out,
progress: write_progress,
};
let bytes_written = Self::write_header(
&mut out,
cf.num_chunks().try_into().expect("BUG: wrote more than 256 chunks"),
index_paths_sorted.len() as u32,
object_hash,
)?;
{
progress.set_name("Writing chunks");
progress.init(Some(cf.num_chunks()), git_features::progress::count("chunks"));
let mut chunk_write = cf.into_write(&mut out, bytes_written)?;
while let Some(chunk_to_write) = chunk_write.next_chunk() {
match chunk_to_write {
multi_index::chunk::index_names::ID => {
multi_index::chunk::index_names::write(&index_filenames_sorted, &mut chunk_write)?
}
multi_index::chunk::fanout::ID => multi_index::chunk::fanout::write(&entries, &mut chunk_write)?,
multi_index::chunk::lookup::ID => multi_index::chunk::lookup::write(&entries, &mut chunk_write)?,
multi_index::chunk::offsets::ID => {
multi_index::chunk::offsets::write(&entries, num_large_offsets.is_some(), &mut chunk_write)?
}
multi_index::chunk::large_offsets::ID => multi_index::chunk::large_offsets::write(
&entries,
num_large_offsets.expect("available if planned"),
&mut chunk_write,
)?,
unknown => unreachable!("BUG: forgot to implement chunk {:?}", std::str::from_utf8(&unknown)),
}
progress.inc();
if should_interrupt.load(Ordering::Relaxed) {
return Err(Error::Interrupted);
}
}
}
// write trailing checksum
let multi_index_checksum: git_hash::ObjectId = out.inner.hash.digest().into();
out.inner.inner.write_all(multi_index_checksum.as_slice())?;
out.progress.show_throughput(write_start);
Ok(Outcome {
multi_index_checksum,
progress,
})
}
sourcepub fn object_hash(&self) -> Kind
pub fn object_hash(&self) -> Kind
The kind of hash we assume