// gix-pack/src/multi_index/write.rs
use std::{
    path::PathBuf,
    sync::atomic::{AtomicBool, Ordering},
    time::{Instant, SystemTime},
};

use gix_features::progress::{Count, DynNestedProgress, Progress};

use crate::multi_index;

mod error {
    /// The error returned by `multi_index::File::write_from_index_paths()`.
    #[derive(Debug, thiserror::Error)]
    #[allow(missing_docs)]
    pub enum Error {
        /// A hashing or I/O failure while writing the multi-index.
        #[error(transparent)]
        Io(#[from] gix_hash::io::Error),
        /// The operation was stopped via the `should_interrupt` flag.
        #[error("Interrupted")]
        Interrupted,
        /// One of the input `.idx` files could not be opened.
        #[error(transparent)]
        OpenIndex(#[from] crate::index::init::Error),
    }
}
pub use error::Error;

/// An object entry collected from one of the input pack indices, to be written into the multi-index.
pub(crate) struct Entry {
    /// The id of the object.
    pub(crate) id: gix_hash::ObjectId,
    /// The index of the pack (into the sorted list of index paths) that contains this object.
    pub(crate) pack_index: u32,
    /// The offset of the object within its pack data file.
    pub(crate) pack_offset: crate::data::Offset,
    /// Modification time of the `.idx` file this entry came from; during deduplication,
    /// entries from more recently modified indices win (see the sort in `write_from_index_paths()`).
    index_mtime: SystemTime,
}

/// Options for `multi_index::File::write_from_index_paths()`.
pub struct Options {
    /// The kind of hash to use for object ids and the multi-index checksum.
    pub object_hash: gix_hash::Kind,
}

/// The result of `multi_index::File::write_from_index_paths()`.
pub struct Outcome {
    /// The checksum of all bytes written to the multi-index, also appended as its trailer.
    pub multi_index_checksum: gix_hash::ObjectId,
}

/// The progress ids used by `multi_index::File::write_from_index_paths()`.
///
/// Use these to selectively extract progress information of interest.
#[derive(Debug, Copy, Clone)]
pub enum ProgressId {
    /// Counts each input index whose entries were collected (unit: indices).
    FromPathsCollectingEntries,
    /// The amount of bytes written to the multi-index so far (unit: bytes).
    BytesWritten,
}

58impl From<ProgressId> for gix_features::progress::Id {
59 fn from(v: ProgressId) -> Self {
60 match v {
61 ProgressId::FromPathsCollectingEntries => *b"MPCE",
62 ProgressId::BytesWritten => *b"MPBW",
63 }
64 }
65}
66
impl multi_index::File {
    /// The magic bytes identifying a multi-pack-index file.
    pub(crate) const SIGNATURE: &'static [u8] = b"MIDX";
    /// The fixed size of the header, matching exactly what `write_header()` emits.
    pub(crate) const HEADER_LEN: usize = 4 /* signature */ +
        1 /* version */ +
        1 /* object id kind */ +
        1 /* num chunks */ +
        1 /* zero byte, see `write_header()` */ +
        4 /* num indices, big-endian u32 */;

    /// Write a new multi-index to `out` from the pack index files at `index_paths`.
    ///
    /// * `index_paths` - paths to `.idx` files; they are sorted, and their file names are
    ///   stored in the index-names chunk.
    /// * `out` - destination writer; everything written is hashed so the checksum can be
    ///   appended as trailer.
    /// * `progress` - receives nested progress for entry collection, deduplication and writing.
    /// * `should_interrupt` - polled between units of work; when `true`, [`Error::Interrupted`]
    ///   is returned.
    ///
    /// Returns the [`Outcome`] with the multi-index checksum on success.
    pub fn write_from_index_paths(
        mut index_paths: Vec<PathBuf>,
        out: &mut dyn std::io::Write,
        progress: &mut dyn DynNestedProgress,
        should_interrupt: &AtomicBool,
        Options { object_hash }: Options,
    ) -> Result<Outcome, Error> {
        // Hash everything we write so the trailer checksum can be produced at the end.
        let out = gix_hash::io::Write::new(out, object_hash);
        let (index_paths_sorted, index_filenames_sorted) = {
            index_paths.sort();
            let file_names = index_paths
                .iter()
                .map(|p| PathBuf::from(p.file_name().expect("file name present")))
                .collect::<Vec<_>>();
            (index_paths, file_names)
        };

        // Phase 1: collect one Entry per object from every input index, then deduplicate.
        let entries = {
            let mut entries = Vec::new();
            let start = Instant::now();
            let mut progress = progress.add_child_with_id(
                "Collecting entries".into(),
                ProgressId::FromPathsCollectingEntries.into(),
            );
            progress.init(Some(index_paths_sorted.len()), gix_features::progress::count("indices"));

            for (index_id, index) in index_paths_sorted.iter().enumerate() {
                // Missing metadata degrades gracefully to the epoch, i.e. lowest priority
                // during deduplication below.
                let mtime = index
                    .metadata()
                    .and_then(|m| m.modified())
                    .unwrap_or(SystemTime::UNIX_EPOCH);
                let index = crate::index::File::at(index, object_hash)?;

                entries.reserve(index.num_objects() as usize);
                entries.extend(index.iter().map(|e| Entry {
                    id: e.oid,
                    pack_index: index_id as u32,
                    pack_offset: e.pack_offset,
                    index_mtime: mtime,
                }));
                progress.inc();
                if should_interrupt.load(Ordering::Relaxed) {
                    return Err(Error::Interrupted);
                }
            }
            progress.show_throughput(start);

            let start = Instant::now();
            progress.set_name("Deduplicate".into());
            progress.init(Some(entries.len()), gix_features::progress::count("entries"));
            // Sort by id; among duplicates, the entry from the most recently modified index
            // comes first (reversed mtime), with pack index as final tie-breaker.
            entries.sort_by(|l, r| {
                l.id.cmp(&r.id)
                    .then_with(|| l.index_mtime.cmp(&r.index_mtime).reverse())
                    .then_with(|| l.pack_index.cmp(&r.pack_index))
            });
            // dedup keeps the first of each run of equal ids, i.e. the winner chosen above.
            entries.dedup_by_key(|e| e.id);
            progress.inc_by(entries.len());
            progress.show_throughput(start);
            if should_interrupt.load(Ordering::Relaxed) {
                return Err(Error::Interrupted);
            }
            entries
        };

        // Phase 2: plan the chunk table of contents so offsets are known before writing.
        let mut cf = gix_chunk::file::Index::for_writing();
        cf.plan_chunk(
            multi_index::chunk::index_names::ID,
            multi_index::chunk::index_names::storage_size(&index_filenames_sorted),
        );
        cf.plan_chunk(multi_index::chunk::fanout::ID, multi_index::chunk::fanout::SIZE as u64);
        cf.plan_chunk(
            multi_index::chunk::lookup::ID,
            multi_index::chunk::lookup::storage_size(entries.len(), object_hash),
        );
        cf.plan_chunk(
            multi_index::chunk::offsets::ID,
            multi_index::chunk::offsets::storage_size(entries.len()),
        );

        // The large-offsets chunk is only planned (and later written) if any offset needs it.
        let num_large_offsets = multi_index::chunk::large_offsets::num_large_offsets(&entries);
        if let Some(num_large_offsets) = num_large_offsets {
            cf.plan_chunk(
                multi_index::chunk::large_offsets::ID,
                multi_index::chunk::large_offsets::storage_size(num_large_offsets),
            );
        }

        let mut write_progress =
            progress.add_child_with_id("Writing multi-index".into(), ProgressId::BytesWritten.into());
        let write_start = Instant::now();
        write_progress.init(
            Some(cf.planned_storage_size() as usize + Self::HEADER_LEN),
            gix_features::progress::bytes(),
        );
        // Wrap the hashing writer so byte-level progress is reported as we write.
        let mut out = gix_features::progress::Write {
            inner: out,
            progress: write_progress,
        };

        let bytes_written = Self::write_header(
            &mut out,
            cf.num_chunks().try_into().expect("BUG: wrote more than 256 chunks"),
            index_paths_sorted.len() as u32,
            object_hash,
        )
        .map_err(gix_hash::io::Error::from)?;

        // Phase 3: write each planned chunk, dispatched by chunk id in the planner's order.
        {
            progress.set_name("Writing chunks".into());
            progress.init(Some(cf.num_chunks()), gix_features::progress::count("chunks"));

            let mut chunk_write = cf
                .into_write(&mut out, bytes_written)
                .map_err(gix_hash::io::Error::from)?;
            while let Some(chunk_to_write) = chunk_write.next_chunk() {
                match chunk_to_write {
                    multi_index::chunk::index_names::ID => {
                        multi_index::chunk::index_names::write(&index_filenames_sorted, &mut chunk_write)
                    }
                    multi_index::chunk::fanout::ID => multi_index::chunk::fanout::write(&entries, &mut chunk_write),
                    multi_index::chunk::lookup::ID => multi_index::chunk::lookup::write(&entries, &mut chunk_write),
                    multi_index::chunk::offsets::ID => {
                        multi_index::chunk::offsets::write(&entries, num_large_offsets.is_some(), &mut chunk_write)
                    }
                    multi_index::chunk::large_offsets::ID => multi_index::chunk::large_offsets::write(
                        &entries,
                        num_large_offsets.expect("available if planned"),
                        &mut chunk_write,
                    ),
                    // Only chunks planned above can be yielded; anything else is a programming error.
                    unknown => unreachable!("BUG: forgot to implement chunk {:?}", std::str::from_utf8(&unknown)),
                }
                .map_err(gix_hash::io::Error::from)?;
                progress.inc();
                if should_interrupt.load(Ordering::Relaxed) {
                    return Err(Error::Interrupted);
                }
            }
        }

        // Finalize the hash over everything written so far, then append it as trailer
        // directly to the underlying writer (bypassing the hasher, which is consumed).
        let multi_index_checksum = out.inner.hash.try_finalize().map_err(gix_hash::io::Error::from)?;
        out.inner
            .inner
            .write_all(multi_index_checksum.as_slice())
            .map_err(gix_hash::io::Error::from)?;
        out.progress.show_throughput(write_start);

        Ok(Outcome { multi_index_checksum })
    }

    /// Write the fixed-size multi-index header to `out` and return [`Self::HEADER_LEN`],
    /// the exact number of bytes written.
    fn write_header(
        out: &mut dyn std::io::Write,
        num_chunks: u8,
        num_indices: u32,
        object_hash: gix_hash::Kind,
    ) -> std::io::Result<usize> {
        out.write_all(Self::SIGNATURE)?;
        out.write_all(&[crate::multi_index::Version::V1 as u8])?;
        out.write_all(&[object_hash as u8])?;
        out.write_all(&[num_chunks])?;
        // A single zero byte — presumably the base-multi-index count of the format; always 0 here.
        out.write_all(&[0])?;
        out.write_all(&num_indices.to_be_bytes())?;

        Ok(Self::HEADER_LEN)
    }
}