git_pack/multi_index/
write.rs1use std::{
2 convert::TryInto,
3 path::PathBuf,
4 sync::atomic::{AtomicBool, Ordering},
5 time::{Instant, SystemTime},
6};
7
8use git_features::progress::Progress;
9
10use crate::multi_index;
11
12mod error {
13 #[derive(Debug, thiserror::Error)]
15 #[allow(missing_docs)]
16 pub enum Error {
17 #[error(transparent)]
18 Io(#[from] std::io::Error),
19 #[error("Interrupted")]
20 Interrupted,
21 #[error(transparent)]
22 OpenIndex(#[from] crate::index::init::Error),
23 }
24}
25pub use error::Error;
26
27pub(crate) struct Entry {
29 pub(crate) id: git_hash::ObjectId,
30 pub(crate) pack_index: u32,
31 pub(crate) pack_offset: crate::data::Offset,
32 index_mtime: SystemTime,
34}
35
36pub struct Options {
38 pub object_hash: git_hash::Kind,
40}
41
42pub struct Outcome<P> {
44 pub multi_index_checksum: git_hash::ObjectId,
46 pub progress: P,
48}
49
/// Identifies the child progress instances created while writing a multi-index file.
///
/// Each variant converts into a fixed four-byte `git_features::progress::Id`.
#[derive(Debug, Clone, Copy)]
pub enum ProgressId {
    /// Counts the input indices whose entries have been collected.
    FromPathsCollectingEntries,
    /// Counts the bytes written to the output.
    BytesWritten,
}
60
61impl From<ProgressId> for git_features::progress::Id {
62 fn from(v: ProgressId) -> Self {
63 match v {
64 ProgressId::FromPathsCollectingEntries => *b"MPCE",
65 ProgressId::BytesWritten => *b"MPBW",
66 }
67 }
68}
69
70impl multi_index::File {
71 pub(crate) const SIGNATURE: &'static [u8] = b"MIDX";
72 pub(crate) const HEADER_LEN: usize = 4 +
73 1 +
74 1 +
75 1 +
76 1 +
77 4 ;
78
79 pub fn write_from_index_paths<P>(
83 mut index_paths: Vec<PathBuf>,
84 out: impl std::io::Write,
85 mut progress: P,
86 should_interrupt: &AtomicBool,
87 Options { object_hash }: Options,
88 ) -> Result<Outcome<P>, Error>
89 where
90 P: Progress,
91 {
92 let out = git_features::hash::Write::new(out, object_hash);
93 let (index_paths_sorted, index_filenames_sorted) = {
94 index_paths.sort();
95 let file_names = index_paths
96 .iter()
97 .map(|p| PathBuf::from(p.file_name().expect("file name present")))
98 .collect::<Vec<_>>();
99 (index_paths, file_names)
100 };
101
102 let entries = {
103 let mut entries = Vec::new();
104 let start = Instant::now();
105 let mut progress =
106 progress.add_child_with_id("Collecting entries", ProgressId::FromPathsCollectingEntries.into());
107 progress.init(Some(index_paths_sorted.len()), git_features::progress::count("indices"));
108
109 for (index_id, index) in index_paths_sorted.iter().enumerate() {
111 let mtime = index
112 .metadata()
113 .and_then(|m| m.modified())
114 .unwrap_or(SystemTime::UNIX_EPOCH);
115 let index = crate::index::File::at(index, object_hash)?;
116
117 entries.reserve(index.num_objects() as usize);
118 entries.extend(index.iter().map(|e| Entry {
119 id: e.oid,
120 pack_index: index_id as u32,
121 pack_offset: e.pack_offset,
122 index_mtime: mtime,
123 }));
124 progress.inc();
125 if should_interrupt.load(Ordering::Relaxed) {
126 return Err(Error::Interrupted);
127 }
128 }
129 progress.show_throughput(start);
130
131 let start = Instant::now();
132 progress.set_name("Deduplicate");
133 progress.init(Some(entries.len()), git_features::progress::count("entries"));
134 entries.sort_by(|l, r| {
135 l.id.cmp(&r.id)
136 .then_with(|| l.index_mtime.cmp(&r.index_mtime).reverse())
137 .then_with(|| l.pack_index.cmp(&r.pack_index))
138 });
139 entries.dedup_by_key(|e| e.id);
140 progress.inc_by(entries.len());
141 progress.show_throughput(start);
142 if should_interrupt.load(Ordering::Relaxed) {
143 return Err(Error::Interrupted);
144 }
145 entries
146 };
147
148 let mut cf = git_chunk::file::Index::for_writing();
149 cf.plan_chunk(
150 multi_index::chunk::index_names::ID,
151 multi_index::chunk::index_names::storage_size(&index_filenames_sorted),
152 );
153 cf.plan_chunk(multi_index::chunk::fanout::ID, multi_index::chunk::fanout::SIZE as u64);
154 cf.plan_chunk(
155 multi_index::chunk::lookup::ID,
156 multi_index::chunk::lookup::storage_size(entries.len(), object_hash),
157 );
158 cf.plan_chunk(
159 multi_index::chunk::offsets::ID,
160 multi_index::chunk::offsets::storage_size(entries.len()),
161 );
162
163 let num_large_offsets = multi_index::chunk::large_offsets::num_large_offsets(&entries);
164 if let Some(num_large_offsets) = num_large_offsets {
165 cf.plan_chunk(
166 multi_index::chunk::large_offsets::ID,
167 multi_index::chunk::large_offsets::storage_size(num_large_offsets),
168 );
169 }
170
171 let mut write_progress = progress.add_child_with_id("Writing multi-index", ProgressId::BytesWritten.into());
172 let write_start = Instant::now();
173 write_progress.init(
174 Some(cf.planned_storage_size() as usize + Self::HEADER_LEN),
175 git_features::progress::bytes(),
176 );
177 let mut out = git_features::progress::Write {
178 inner: out,
179 progress: write_progress,
180 };
181
182 let bytes_written = Self::write_header(
183 &mut out,
184 cf.num_chunks().try_into().expect("BUG: wrote more than 256 chunks"),
185 index_paths_sorted.len() as u32,
186 object_hash,
187 )?;
188
189 {
190 progress.set_name("Writing chunks");
191 progress.init(Some(cf.num_chunks()), git_features::progress::count("chunks"));
192
193 let mut chunk_write = cf.into_write(&mut out, bytes_written)?;
194 while let Some(chunk_to_write) = chunk_write.next_chunk() {
195 match chunk_to_write {
196 multi_index::chunk::index_names::ID => {
197 multi_index::chunk::index_names::write(&index_filenames_sorted, &mut chunk_write)?
198 }
199 multi_index::chunk::fanout::ID => multi_index::chunk::fanout::write(&entries, &mut chunk_write)?,
200 multi_index::chunk::lookup::ID => multi_index::chunk::lookup::write(&entries, &mut chunk_write)?,
201 multi_index::chunk::offsets::ID => {
202 multi_index::chunk::offsets::write(&entries, num_large_offsets.is_some(), &mut chunk_write)?
203 }
204 multi_index::chunk::large_offsets::ID => multi_index::chunk::large_offsets::write(
205 &entries,
206 num_large_offsets.expect("available if planned"),
207 &mut chunk_write,
208 )?,
209 unknown => unreachable!("BUG: forgot to implement chunk {:?}", std::str::from_utf8(&unknown)),
210 }
211 progress.inc();
212 if should_interrupt.load(Ordering::Relaxed) {
213 return Err(Error::Interrupted);
214 }
215 }
216 }
217
218 let multi_index_checksum: git_hash::ObjectId = out.inner.hash.digest().into();
220 out.inner.inner.write_all(multi_index_checksum.as_slice())?;
221 out.progress.show_throughput(write_start);
222
223 Ok(Outcome {
224 multi_index_checksum,
225 progress,
226 })
227 }
228
229 fn write_header(
230 mut out: impl std::io::Write,
231 num_chunks: u8,
232 num_indices: u32,
233 object_hash: git_hash::Kind,
234 ) -> std::io::Result<usize> {
235 out.write_all(Self::SIGNATURE)?;
236 out.write_all(&[crate::multi_index::Version::V1 as u8])?;
237 out.write_all(&[object_hash as u8])?;
238 out.write_all(&[num_chunks])?;
239 out.write_all(&[0])?; out.write_all(&num_indices.to_be_bytes())?;
241
242 Ok(Self::HEADER_LEN)
243 }
244}