1#![allow(missing_docs)]
16
17use std::any::Any;
18use std::collections::HashSet;
19use std::fs;
20use std::io;
21use std::io::Write as _;
22use std::path::Path;
23use std::path::PathBuf;
24use std::sync::Arc;
25
26use itertools::Itertools as _;
27use tempfile::NamedTempFile;
28use thiserror::Error;
29
30use super::mutable::DefaultMutableIndex;
31use super::readonly::DefaultReadonlyIndex;
32use super::readonly::ReadonlyIndexLoadError;
33use super::readonly::ReadonlyIndexSegment;
34use crate::backend::BackendError;
35use crate::backend::BackendInitError;
36use crate::backend::CommitId;
37use crate::commit::CommitByCommitterTimestamp;
38use crate::dag_walk;
39use crate::file_util;
40use crate::file_util::persist_content_addressed_temp_file;
41use crate::file_util::IoResultExt as _;
42use crate::file_util::PathError;
43use crate::index::Index as _;
44use crate::index::IndexReadError;
45use crate::index::IndexStore;
46use crate::index::IndexWriteError;
47use crate::index::MutableIndex;
48use crate::index::ReadonlyIndex;
49use crate::object_id::ObjectId as _;
50use crate::op_store::OpStoreError;
51use crate::op_store::OperationId;
52use crate::operation::Operation;
53use crate::store::Store;
54
/// Length, in bytes, of an index segment file name.
///
/// `reinit` uses this to recognize stray segment files in the store root by
/// name length alone. The value is presumably the hex encoding (two
/// characters per byte) of a 64-byte identifier — TODO confirm against the
/// segment writer.
const SEGMENT_FILE_NAME_LENGTH: usize = 2 * 64;
57
58#[derive(Debug, Error)]
60#[error("Failed to initialize index store")]
61pub struct DefaultIndexStoreInitError(#[from] pub PathError);
62
63impl From<DefaultIndexStoreInitError> for BackendInitError {
64 fn from(err: DefaultIndexStoreInitError) -> Self {
65 BackendInitError(err.into())
66 }
67}
68
69#[derive(Debug, Error)]
70pub enum DefaultIndexStoreError {
71 #[error("Failed to associate commit index file with an operation {op_id}")]
72 AssociateIndex {
73 op_id: OperationId,
74 source: io::Error,
75 },
76 #[error("Failed to load associated commit index file name")]
77 LoadAssociation(#[source] io::Error),
78 #[error(transparent)]
79 LoadIndex(ReadonlyIndexLoadError),
80 #[error("Failed to write commit index file")]
81 SaveIndex(#[source] io::Error),
82 #[error("Failed to index commits at operation {op_id}")]
83 IndexCommits {
84 op_id: OperationId,
85 source: BackendError,
86 },
87 #[error(transparent)]
88 OpStore(#[from] OpStoreError),
89}
90
/// Index store that keeps commit index segments as files on disk.
///
/// Under the root directory, `operations/` holds one file per indexed
/// operation (containing the name of its segment) and `segments/` holds the
/// segment files themselves.
#[derive(Debug)]
pub struct DefaultIndexStore {
    /// Root directory of the store.
    dir: PathBuf,
}
95
96impl DefaultIndexStore {
97 pub fn name() -> &'static str {
98 "default"
99 }
100
101 pub fn init(dir: &Path) -> Result<Self, DefaultIndexStoreInitError> {
102 let store = DefaultIndexStore {
103 dir: dir.to_owned(),
104 };
105 store.ensure_base_dirs()?;
106 Ok(store)
107 }
108
109 pub fn load(dir: &Path) -> DefaultIndexStore {
110 DefaultIndexStore {
111 dir: dir.to_owned(),
112 }
113 }
114
115 pub fn reinit(&self) -> Result<(), DefaultIndexStoreInitError> {
116 self.ensure_base_dirs()?;
118 file_util::remove_dir_contents(&self.operations_dir())?;
120 file_util::remove_dir_contents(&self.segments_dir())?;
123 for entry in self.dir.read_dir().context(&self.dir)? {
125 let entry = entry.context(&self.dir)?;
126 let path = entry.path();
127 if path.file_name().unwrap().len() != SEGMENT_FILE_NAME_LENGTH {
128 continue;
130 }
131 fs::remove_file(&path).context(&path)?;
132 }
133 Ok(())
134 }
135
136 fn ensure_base_dirs(&self) -> Result<(), PathError> {
137 for dir in [self.operations_dir(), self.segments_dir()] {
138 file_util::create_or_reuse_dir(&dir).context(&dir)?;
139 }
140 Ok(())
141 }
142
143 fn operations_dir(&self) -> PathBuf {
144 self.dir.join("operations")
145 }
146
147 fn segments_dir(&self) -> PathBuf {
148 self.dir.join("segments")
149 }
150
151 fn load_index_segments_at_operation(
152 &self,
153 op_id: &OperationId,
154 commit_id_length: usize,
155 change_id_length: usize,
156 ) -> Result<Arc<ReadonlyIndexSegment>, DefaultIndexStoreError> {
157 let op_id_file = self.operations_dir().join(op_id.hex());
158 let index_file_id_hex =
159 fs::read_to_string(op_id_file).map_err(DefaultIndexStoreError::LoadAssociation)?;
160 ReadonlyIndexSegment::load(
161 &self.segments_dir(),
162 index_file_id_hex,
163 commit_id_length,
164 change_id_length,
165 )
166 .map_err(DefaultIndexStoreError::LoadIndex)
167 }
168
169 pub fn build_index_at_operation(
174 &self,
175 operation: &Operation,
176 store: &Arc<Store>,
177 ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
178 let index_segment = self.build_index_segments_at_operation(operation, store)?;
179 Ok(DefaultReadonlyIndex::from_segment(index_segment))
180 }
181
182 #[tracing::instrument(skip(self, store))]
183 fn build_index_segments_at_operation(
184 &self,
185 operation: &Operation,
186 store: &Arc<Store>,
187 ) -> Result<Arc<ReadonlyIndexSegment>, DefaultIndexStoreError> {
188 let view = operation.view()?;
189 let operations_dir = self.operations_dir();
190 let commit_id_length = store.commit_id_length();
191 let change_id_length = store.change_id_length();
192 let mut visited_heads: HashSet<CommitId> =
193 view.all_referenced_commit_ids().cloned().collect();
194 let mut historical_heads: Vec<(CommitId, OperationId)> = visited_heads
195 .iter()
196 .map(|commit_id| (commit_id.clone(), operation.id().clone()))
197 .collect();
198 let mut parent_op_id: Option<OperationId> = None;
199 for op in dag_walk::dfs_ok(
200 [Ok(operation.clone())],
201 |op: &Operation| op.id().clone(),
202 |op: &Operation| op.parents().collect_vec(),
203 ) {
204 let op = op?;
205 if parent_op_id.is_none() && operations_dir.join(op.id().hex()).is_file() {
210 parent_op_id = Some(op.id().clone());
211 }
212 for commit_id in op.view()?.all_referenced_commit_ids() {
214 if visited_heads.insert(commit_id.clone()) {
215 historical_heads.push((commit_id.clone(), op.id().clone()));
216 }
217 }
218 }
219 let maybe_parent_file;
220 let mut mutable_index;
221 match parent_op_id {
222 None => {
223 maybe_parent_file = None;
224 mutable_index = DefaultMutableIndex::full(commit_id_length, change_id_length);
225 }
226 Some(parent_op_id) => {
227 let parent_file = self.load_index_segments_at_operation(
228 &parent_op_id,
229 commit_id_length,
230 change_id_length,
231 )?;
232 maybe_parent_file = Some(parent_file.clone());
233 mutable_index = DefaultMutableIndex::incremental(parent_file);
234 }
235 }
236
237 tracing::info!(
238 ?maybe_parent_file,
239 heads_count = historical_heads.len(),
240 "indexing commits reachable from historical heads"
241 );
242 let parent_file_has_id = |id: &CommitId| {
245 maybe_parent_file
246 .as_ref()
247 .is_some_and(|segment| segment.as_composite().has_id(id))
248 };
249 let get_commit_with_op = |commit_id: &CommitId, op_id: &OperationId| {
250 let op_id = op_id.clone();
251 match store.get_commit(commit_id) {
252 Ok(commit) => Ok((CommitByCommitterTimestamp(commit), op_id)),
256 Err(source) => Err(DefaultIndexStoreError::IndexCommits { op_id, source }),
257 }
258 };
259 let commits = dag_walk::topo_order_reverse_ord_ok(
260 historical_heads
261 .iter()
262 .filter(|&(commit_id, _)| !parent_file_has_id(commit_id))
263 .map(|(commit_id, op_id)| get_commit_with_op(commit_id, op_id)),
264 |(CommitByCommitterTimestamp(commit), _)| commit.id().clone(),
265 |(CommitByCommitterTimestamp(commit), op_id)| {
266 itertools::chain(commit.parent_ids(), commit.predecessor_ids())
267 .filter(|&id| !parent_file_has_id(id))
268 .map(|commit_id| get_commit_with_op(commit_id, op_id))
269 .collect_vec()
270 },
271 )?;
272 for (CommitByCommitterTimestamp(commit), _) in commits.iter().rev() {
273 mutable_index.add_commit(commit);
274 }
275
276 let index_file = self.save_mutable_index(mutable_index, operation.id())?;
277 tracing::info!(
278 ?index_file,
279 commits_count = commits.len(),
280 "saved new index file"
281 );
282
283 Ok(index_file)
284 }
285
286 fn save_mutable_index(
287 &self,
288 mutable_index: DefaultMutableIndex,
289 op_id: &OperationId,
290 ) -> Result<Arc<ReadonlyIndexSegment>, DefaultIndexStoreError> {
291 let index_segment = mutable_index
292 .squash_and_save_in(&self.segments_dir())
293 .map_err(DefaultIndexStoreError::SaveIndex)?;
294 self.associate_file_with_operation(&index_segment, op_id)
295 .map_err(|source| DefaultIndexStoreError::AssociateIndex {
296 op_id: op_id.to_owned(),
297 source,
298 })?;
299 Ok(index_segment)
300 }
301
302 fn associate_file_with_operation(
304 &self,
305 index: &ReadonlyIndexSegment,
306 op_id: &OperationId,
307 ) -> io::Result<()> {
308 let dir = self.operations_dir();
309 let mut temp_file = NamedTempFile::new_in(&dir)?;
310 let file = temp_file.as_file_mut();
311 file.write_all(index.name().as_bytes())?;
312 persist_content_addressed_temp_file(temp_file, dir.join(op_id.hex()))?;
313 Ok(())
314 }
315}
316
317impl IndexStore for DefaultIndexStore {
318 fn as_any(&self) -> &dyn Any {
319 self
320 }
321
322 fn name(&self) -> &str {
323 Self::name()
324 }
325
326 fn get_index_at_op(
327 &self,
328 op: &Operation,
329 store: &Arc<Store>,
330 ) -> Result<Box<dyn ReadonlyIndex>, IndexReadError> {
331 let index_segment = match self.load_index_segments_at_operation(
332 op.id(),
333 store.commit_id_length(),
334 store.change_id_length(),
335 ) {
336 Err(DefaultIndexStoreError::LoadAssociation(err))
337 if err.kind() == io::ErrorKind::NotFound =>
338 {
339 self.build_index_segments_at_operation(op, store)
340 }
341 Err(DefaultIndexStoreError::LoadIndex(err)) if err.is_corrupt_or_not_found() => {
342 match &err {
345 ReadonlyIndexLoadError::UnexpectedVersion {
346 found_version,
347 expected_version,
348 } => {
349 eprintln!(
350 "Found index format version {found_version}, expected version \
351 {expected_version}. Reindexing..."
352 );
353 }
354 ReadonlyIndexLoadError::Other { name: _, error } => {
355 eprintln!("{err} (maybe the format has changed): {error}. Reindexing...");
356 }
357 }
358 self.reinit().map_err(|err| IndexReadError(err.into()))?;
359 self.build_index_segments_at_operation(op, store)
360 }
361 result => result,
362 }
363 .map_err(|err| IndexReadError(err.into()))?;
364 Ok(Box::new(DefaultReadonlyIndex::from_segment(index_segment)))
365 }
366
367 fn write_index(
368 &self,
369 index: Box<dyn MutableIndex>,
370 op: &Operation,
371 ) -> Result<Box<dyn ReadonlyIndex>, IndexWriteError> {
372 let index = index
373 .into_any()
374 .downcast::<DefaultMutableIndex>()
375 .expect("index to merge in must be a DefaultMutableIndex");
376 let index_segment = self
377 .save_mutable_index(*index, op.id())
378 .map_err(|err| IndexWriteError(err.into()))?;
379 Ok(Box::new(DefaultReadonlyIndex::from_segment(index_segment)))
380 }
381}