1#![expect(missing_docs)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::fs;
20use std::io;
21use std::io::Write as _;
22use std::path::Path;
23use std::path::PathBuf;
24use std::slice;
25use std::sync::Arc;
26
27use async_trait::async_trait;
28use itertools::Itertools as _;
29use prost::Message as _;
30use tempfile::NamedTempFile;
31use thiserror::Error;
32
33use super::changed_path::ChangedPathIndexSegmentId;
34use super::changed_path::CompositeChangedPathIndex;
35use super::changed_path::collect_changed_paths;
36use super::composite::AsCompositeIndex as _;
37use super::composite::CommitIndexSegmentId;
38use super::entry::GlobalCommitPosition;
39use super::mutable::DefaultMutableIndex;
40use super::readonly::DefaultReadonlyIndex;
41use super::readonly::FieldLengths;
42use super::readonly::ReadonlyCommitIndexSegment;
43use super::readonly::ReadonlyIndexLoadError;
44use crate::backend::BackendError;
45use crate::backend::BackendInitError;
46use crate::backend::CommitId;
47use crate::commit::CommitByCommitterTimestamp;
48use crate::dag_walk;
49use crate::file_util;
50use crate::file_util::IoResultExt as _;
51use crate::file_util::PathError;
52use crate::file_util::persist_temp_file;
53use crate::index::IndexStore;
54use crate::index::IndexStoreError;
55use crate::index::IndexStoreResult;
56use crate::index::MutableIndex;
57use crate::index::ReadonlyIndex;
58use crate::object_id::ObjectId as _;
59use crate::op_store::OpStoreError;
60use crate::op_store::OperationId;
61use crate::op_walk;
62use crate::operation::Operation;
63use crate::store::Store;
64
/// Length of a segment file name, as compared against `OsStr::len()`.
///
/// Two characters per byte of a 64-byte identifier — presumably a hash
/// rendered as lowercase hex; TODO confirm against the segment writer.
const SEGMENT_FILE_NAME_LENGTH: usize = 2 * 64;
67
/// Error returned when the index store's base directories can't be created
/// (`init()`) or cleared (`reinit()`); wraps the underlying path error.
#[derive(Debug, Error)]
#[error("Failed to initialize index store")]
pub struct DefaultIndexStoreInitError(#[from] pub PathError);
72
73impl From<DefaultIndexStoreInitError> for BackendInitError {
74 fn from(err: DefaultIndexStoreInitError) -> Self {
75 Self(err.into())
76 }
77}
78
/// Errors raised while loading, building, or saving a default index.
#[derive(Debug, Error)]
pub enum DefaultIndexStoreError {
    /// Writing the link file that ties an index to operation `op_id` failed.
    #[error("Failed to associate index files with an operation {op_id}")]
    AssociateIndex {
        op_id: OperationId,
        source: PathError,
    },
    /// An operation's link file could not be read or decoded. A missing file
    /// shows up here with `io::ErrorKind::NotFound`.
    #[error("Failed to load associated index file names")]
    LoadAssociation(#[source] PathError),
    /// A segment referenced by a link file could not be loaded.
    #[error(transparent)]
    LoadIndex(ReadonlyIndexLoadError),
    /// Creating the store directories or writing segment files failed.
    #[error("Failed to write index file")]
    SaveIndex(#[source] PathError),
    /// A backend call failed while indexing commits for operation `op_id`.
    #[error("Failed to index commits at operation {op_id}")]
    IndexCommits {
        op_id: OperationId,
        source: BackendError,
    },
    /// Walking operations or loading an operation's view failed.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
100
/// Index store that keeps index segments and operation-to-index link files
/// on disk under a single root directory.
#[derive(Debug)]
pub struct DefaultIndexStore {
    // Root directory. The `op_links`, `segments` and `changed_paths`
    // subdirectories live beneath it.
    dir: PathBuf,
}
105
impl DefaultIndexStore {
    /// Returns the name that identifies this index store implementation.
    pub fn name() -> &'static str {
        "default"
    }

    /// Creates a store rooted at `dir` and makes sure its base subdirectories
    /// (`op_links`, `segments`, `changed_paths`) exist.
    ///
    /// # Errors
    ///
    /// Fails if one of the base directories can't be created.
    pub fn init(dir: &Path) -> Result<Self, DefaultIndexStoreInitError> {
        let store = Self {
            dir: dir.to_owned(),
        };
        store.ensure_base_dirs()?;
        Ok(store)
    }

    /// Opens the store rooted at `dir`. This does not touch the filesystem.
    pub fn load(dir: &Path) -> Self {
        Self {
            dir: dir.to_owned(),
        }
    }

    /// Discards all op links and segment files so that indexes are rebuilt
    /// from scratch the next time they are requested.
    ///
    /// # Errors
    ///
    /// Stops at the first filesystem error. Deletions already performed are
    /// not undone.
    pub fn reinit(&self) -> Result<(), DefaultIndexStoreInitError> {
        // The directories must exist before they can be emptied.
        self.ensure_base_dirs()?;
        // With no op links left, loading any operation's index hits NotFound,
        // which `get_index_at_op` answers by rebuilding.
        file_util::remove_dir_contents(&self.op_links_dir())?;
        // NOTE(review): presumably op links from an older on-disk layout;
        // the whole directory is removed.
        let legacy_operations_dir = self.dir.join("operations");
        if legacy_operations_dir.exists() {
            file_util::remove_dir_contents(&legacy_operations_dir)?;
            fs::remove_dir(&legacy_operations_dir).context(&legacy_operations_dir)?;
        }
        // Segment files are no longer referenced by any link, so delete them.
        file_util::remove_dir_contents(&self.commit_segments_dir())?;
        file_util::remove_dir_contents(&self.changed_path_segments_dir())?;
        // Also delete segment-like files stored directly in the root directory.
        // Only entries whose name length equals a segment file name are
        // removed. Shorter names, such as the subdirectories above, are kept.
        for entry in self.dir.read_dir().context(&self.dir)? {
            let entry = entry.context(&self.dir)?;
            let path = entry.path();
            if path.file_name().unwrap().len() != SEGMENT_FILE_NAME_LENGTH {
                continue;
            }
            fs::remove_file(&path).context(&path)?;
        }
        Ok(())
    }

    /// Creates the op-link and segment subdirectories if they are missing.
    fn ensure_base_dirs(&self) -> Result<(), PathError> {
        for dir in [
            self.op_links_dir(),
            self.commit_segments_dir(),
            self.changed_path_segments_dir(),
        ] {
            file_util::create_or_reuse_dir(&dir).context(&dir)?;
        }
        Ok(())
    }

    /// Directory holding one link file per indexed operation, named by the
    /// operation id in hex.
    fn op_links_dir(&self) -> PathBuf {
        self.dir.join("op_links")
    }

    /// Directory holding commit index segment files.
    fn commit_segments_dir(&self) -> PathBuf {
        self.dir.join("segments")
    }

    /// Directory holding changed-path index segment files.
    fn changed_path_segments_dir(&self) -> PathBuf {
        self.dir.join("changed_paths")
    }

    /// Loads the index linked to `op_id`.
    ///
    /// The link file `op_links/<op_id hex>` is a protobuf `SegmentControl`.
    /// It names the commit segment and, optionally, the changed-path segments
    /// together with the commit position where they start.
    ///
    /// # Errors
    ///
    /// Returns `LoadAssociation` if the link file can't be read or decoded.
    /// Returns `LoadIndex` if a referenced segment can't be loaded.
    fn load_index_at_operation(
        &self,
        op_id: &OperationId,
        lengths: FieldLengths,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        let op_link_file = self.op_links_dir().join(op_id.hex());
        let data = fs::read(&op_link_file)
            .context(&op_link_file)
            .map_err(DefaultIndexStoreError::LoadAssociation)?;
        // A decode failure is reported as an `InvalidData` I/O error on the link
        // file. That keeps it distinguishable from a missing link (`NotFound`).
        let proto = crate::protos::default_index::SegmentControl::decode(&*data)
            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
            .context(&op_link_file)
            .map_err(DefaultIndexStoreError::LoadAssociation)?;
        let commit_segment_id = CommitIndexSegmentId::new(proto.commit_segment_id);
        let changed_path_start_commit_pos = proto
            .changed_path_start_commit_pos
            .map(GlobalCommitPosition);
        let changed_path_segment_ids = proto
            .changed_path_segment_ids
            .into_iter()
            .map(ChangedPathIndexSegmentId::new)
            .collect_vec();

        let commits = ReadonlyCommitIndexSegment::load(
            &self.commit_segments_dir(),
            commit_segment_id,
            lengths,
        )
        .map_err(DefaultIndexStoreError::LoadIndex)?;
        // Without a start position the index carries no changed-path data.
        let changed_paths = if let Some(start_commit_pos) = changed_path_start_commit_pos {
            CompositeChangedPathIndex::load(
                &self.changed_path_segments_dir(),
                start_commit_pos,
                &changed_path_segment_ids,
            )
            .map_err(DefaultIndexStoreError::LoadIndex)?
        } else {
            CompositeChangedPathIndex::null()
        };
        Ok(DefaultReadonlyIndex::from_segment(commits, changed_paths))
    }

    /// Builds, saves and links an index for `operation`.
    ///
    /// Walks back from `operation` to the nearest ancestor operation that
    /// already has a link file, then extends that operation's index. If no
    /// such ancestor exists, it starts from `DefaultMutableIndex::full`.
    /// Commits referenced by the unindexed operations or their views are
    /// added, plus every parent not yet in the parent index. When some
    /// unindexed operation doesn't store commit predecessors, the immediate
    /// predecessors of the newly indexed commits are added too.
    ///
    /// # Errors
    ///
    /// - `OpStore` if walking operations or loading a view fails.
    /// - `IndexCommits` if a commit can't be read or added.
    /// - `LoadAssociation`/`LoadIndex` if the parent index can't be loaded.
    /// - `SaveIndex`/`AssociateIndex` if saving the result fails.
    ///
    /// # Panics
    ///
    /// The cycle handler passed to the commit walk panics with
    /// "graph has cycle".
    #[tracing::instrument(skip(self, store))]
    pub async fn build_index_at_operation(
        &self,
        operation: &Operation,
        store: &Arc<Store>,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        tracing::info!("scanning operations to index");
        let op_links_dir = self.op_links_dir();
        let field_lengths = FieldLengths {
            commit_id: store.commit_id_length(),
            change_id: store.change_id_length(),
        };
        // Walk back until the first operation that already has a link file.
        // Everything visited before it is unindexed.
        let mut unindexed_ops = Vec::new();
        let mut parent_op = None;
        for op in op_walk::walk_ancestors(slice::from_ref(operation)) {
            let op = op?;
            if op_links_dir.join(op.id().hex()).is_file() {
                parent_op = Some(op);
                break;
            } else {
                unindexed_ops.push(op);
            }
        }
        // NOTE(review): the walk above stops at the first indexed op it meets.
        // Presumably other unindexed branches of the operation graph (e.g.
        // concurrent ops) could be missed, so the whole range between
        // `operation` and the indexed op is walked again.
        let ops_to_visit = if let Some(op) = &parent_op {
            op_walk::walk_ancestors_range(slice::from_ref(operation), slice::from_ref(op))
                .try_collect()?
        } else {
            unindexed_ops
        };
        tracing::info!(
            ops_count = ops_to_visit.len(),
            "collecting head commits to index"
        );
        // Every commit referenced by an unindexed operation or its view, mapped
        // to the first operation (in walk order) that referenced it. The op id
        // is used to attribute indexing failures.
        let mut historical_heads: HashMap<CommitId, OperationId> = HashMap::new();
        for op in &ops_to_visit {
            for commit_id in itertools::chain(
                op.all_referenced_commit_ids(),
                op.view().await?.all_referenced_commit_ids(),
            ) {
                if !historical_heads.contains_key(commit_id) {
                    historical_heads.insert(commit_id.clone(), op.id().clone());
                }
            }
        }
        // Extend the indexed parent operation's index if there is one.
        // Otherwise start a new full index.
        let mut mutable_index;
        let maybe_parent_index;
        match &parent_op {
            None => {
                mutable_index = DefaultMutableIndex::full(field_lengths);
                maybe_parent_index = None;
            }
            Some(op) => {
                let parent_index = self.load_index_at_operation(op.id(), field_lengths)?;
                mutable_index = parent_index.start_modification();
                maybe_parent_index = Some(parent_index);
            }
        }

        tracing::info!(
            ?maybe_parent_index,
            heads_count = historical_heads.len(),
            "indexing commits reachable from historical heads"
        );
        // Commits already present in the parent index are skipped by both walks
        // below.
        let parent_index_has_id = |id: &CommitId| {
            maybe_parent_index
                .as_ref()
                .is_some_and(|index| index.has_id_impl(id))
        };
        // Loads a commit and tags it (or the resulting error) with the
        // operation id it is attributed to.
        let get_commit_with_op = |commit_id: &CommitId, op_id: &OperationId| {
            let op_id = op_id.clone();
            match store.get_commit(commit_id) {
                Ok(commit) => Ok((CommitByCommitterTimestamp(commit), op_id)),
                Err(source) => Err(DefaultIndexStoreError::IndexCommits { op_id, source }),
            }
        };
        // If any unindexed operation doesn't store commit predecessors, collect
        // the heads and all their ancestors missing from the parent index. Their
        // immediate predecessors are then followed in the walk below. Commits
        // that fail to load are recorded but not expanded here; presumably the
        // error resurfaces when the walk below loads the same commit.
        let commits_to_keep_immediate_predecessors = if ops_to_visit
            .iter()
            .any(|op| !op.stores_commit_predecessors())
        {
            let mut ancestors = HashSet::new();
            let mut work = historical_heads.keys().cloned().collect_vec();
            while let Some(commit_id) = work.pop() {
                if ancestors.contains(&commit_id) || parent_index_has_id(&commit_id) {
                    continue;
                }
                if let Ok(commit) = store.get_commit(&commit_id) {
                    work.extend(commit.parent_ids().iter().cloned());
                }
                ancestors.insert(commit_id);
            }
            ancestors
        } else {
            HashSet::new()
        };
        // Gather all commits to add, starting from the heads that aren't indexed
        // yet. Each walk follows parents, plus predecessors for the commits
        // selected above. Neighbors inherit the op id of the commit they were
        // reached from.
        let commits = dag_walk::topo_order_reverse_ord_ok(
            historical_heads
                .iter()
                .filter(|&(commit_id, _)| !parent_index_has_id(commit_id))
                .map(|(commit_id, op_id)| get_commit_with_op(commit_id, op_id)),
            |(CommitByCommitterTimestamp(commit), _)| commit.id().clone(),
            |(CommitByCommitterTimestamp(commit), op_id)| {
                let keep_predecessors =
                    commits_to_keep_immediate_predecessors.contains(commit.id());
                itertools::chain(
                    commit.parent_ids(),
                    keep_predecessors
                        .then_some(&commit.store_commit().predecessors)
                        .into_iter()
                        .flatten(),
                )
                .filter(|&id| !parent_index_has_id(id))
                .map(|commit_id| get_commit_with_op(commit_id, op_id))
                .collect_vec()
            },
            |_| panic!("graph has cycle"),
        )?;
        // Add in the reverse of the walk's order. NOTE(review): per the
        // function's name, this puts parents before their descendants; confirm
        // against `topo_order_reverse_ord_ok`.
        for (CommitByCommitterTimestamp(commit), op_id) in commits.iter().rev() {
            mutable_index.add_commit(commit).await.map_err(|source| {
                DefaultIndexStoreError::IndexCommits {
                    op_id: op_id.clone(),
                    source,
                }
            })?;
        }

        let index = self.save_mutable_index(mutable_index, operation.id())?;
        tracing::info!(?index, commits_count = commits.len(), "saved new index");

        Ok(index)
    }

    /// Extends the changed-path index of the index linked to `op_id` by at
    /// most `max_commits` commits. The result is saved and re-linked to `op_id`.
    ///
    /// The budget goes first to commits right after the range that already
    /// has changed-path data (the "post" range). Any remainder goes to commits
    /// right before it (the "pre" range). Without existing changed-path data,
    /// the `max_commits` highest commit positions are indexed. The pre range
    /// is processed before the post range.
    ///
    /// `progress_callback` is called before each commit is indexed, and once
    /// more (with `current == total`) after the new index has been linked.
    ///
    /// # Errors
    ///
    /// - Errors from loading the current index.
    /// - `IndexCommits` if a commit or its changed paths can't be read.
    /// - `SaveIndex`/`AssociateIndex` if saving fails.
    ///
    /// # Panics
    ///
    /// Panics through internal assertions if the existing changed-path range
    /// extends past the commit index, or if commits are not indexed at
    /// consecutive positions.
    #[tracing::instrument(skip(self, store, progress_callback))]
    pub async fn build_changed_path_index_at_operation(
        &self,
        op_id: &OperationId,
        store: &Arc<Store>,
        max_commits: u32,
        mut progress_callback: impl FnMut(&DefaultChangedPathIndexProgress),
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        // Segments are written below, so the directories must exist.
        self.ensure_base_dirs()
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        let field_lengths = FieldLengths {
            commit_id: store.commit_id_length(),
            change_id: store.change_id_length(),
        };
        let index = self.load_index_at_operation(op_id, field_lengths)?;
        let old_changed_paths = index.changed_paths();

        // Half-open commit-position ranges around the existing changed-path
        // data: [pre_start, pre_end) ends where it starts, and
        // [post_start, post_end) begins where it ends.
        let pre_start;
        let pre_end;
        let post_start;
        let post_end;
        if let Some(GlobalCommitPosition(pos)) = old_changed_paths.start_commit_pos() {
            post_start = pos + old_changed_paths.num_commits();
            assert!(post_start <= index.num_commits());
            post_end = u32::saturating_add(post_start, max_commits).min(index.num_commits());
            // `post_end - post_start <= max_commits`, so the budget subtraction
            // cannot underflow.
            pre_start = u32::saturating_sub(pos, max_commits - (post_end - post_start));
            pre_end = pos;
        } else {
            // No changed-path data yet: take the last `max_commits` positions.
            pre_start = u32::saturating_sub(index.num_commits(), max_commits);
            pre_end = index.num_commits();
            post_start = pre_end;
            post_end = pre_end;
        }

        // `current` is the number of commits processed when the callback runs.
        let mut progress = DefaultChangedPathIndexProgress {
            current: 0,
            total: (pre_end - pre_start) + (post_end - post_start),
        };
        let mut emit_progress = || {
            progress_callback(&progress);
            progress.current += 1;
        };

        let to_index_err = |source| DefaultIndexStoreError::IndexCommits {
            op_id: op_id.clone(),
            source,
        };
        // Computes the changed paths of the commit at `pos` and appends them.
        // `pos` must be the next position the mutable part expects.
        let index_commit = async |changed_paths: &mut CompositeChangedPathIndex,
                                  pos: GlobalCommitPosition| {
            assert_eq!(changed_paths.next_mutable_commit_pos(), Some(pos));
            let commit_id = index.as_composite().commits().entry_by_pos(pos).commit_id();
            let commit = store.get_commit_async(&commit_id).await?;
            let paths = collect_changed_paths(&index, &commit).await?;
            changed_paths.add_changed_paths(paths);
            Ok(())
        };

        // The new index starts at `pre_start`. The pre-range commits are indexed
        // and saved before the old segments are appended after them.
        let mut new_changed_paths =
            CompositeChangedPathIndex::empty(GlobalCommitPosition(pre_start));
        new_changed_paths.make_mutable();
        tracing::info!(?pre_start, ?pre_end, "indexing changed paths in commits");
        for pos in (pre_start..pre_end).map(GlobalCommitPosition) {
            emit_progress();
            index_commit(&mut new_changed_paths, pos)
                .await
                .map_err(to_index_err)?;
        }
        new_changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;

        // When changed-path data existed, `pre_end == pos`, so the old segments
        // line up directly after the new ones.
        new_changed_paths.append_segments(old_changed_paths);

        // Post-range commits go into a fresh mutable part on top.
        new_changed_paths.make_mutable();
        tracing::info!(?post_start, ?post_end, "indexing changed paths in commits");
        for pos in (post_start..post_end).map(GlobalCommitPosition) {
            emit_progress();
            index_commit(&mut new_changed_paths, pos)
                .await
                .map_err(to_index_err)?;
        }
        new_changed_paths.maybe_squash_with_ancestors();
        new_changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;

        // The commit segments are reused as-is. Only the changed-path part is new.
        let commits = index.readonly_commits().clone();
        let index = DefaultReadonlyIndex::from_segment(commits, new_changed_paths);
        self.associate_index_with_operation(&index, op_id)
            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
                op_id: op_id.to_owned(),
                source,
            })?;
        // Final report, with `current == total`.
        emit_progress();
        Ok(index)
    }

    /// Writes `index` to disk and links the saved index to `op_id`.
    ///
    /// Both the commit part and the changed-path part may be squashed with
    /// ancestor segments (`maybe_squash_with_ancestors`) before being saved.
    fn save_mutable_index(
        &self,
        index: DefaultMutableIndex,
        op_id: &OperationId,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        // Make sure the target directories exist before writing into them.
        self.ensure_base_dirs()
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        let (commits, mut changed_paths) = index.into_segment();
        let commits = commits
            .maybe_squash_with_ancestors()
            .save_in(&self.commit_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        changed_paths.maybe_squash_with_ancestors();
        changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        let index = DefaultReadonlyIndex::from_segment(commits, changed_paths);
        self.associate_index_with_operation(&index, op_id)
            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
                op_id: op_id.to_owned(),
                source,
            })?;
        Ok(index)
    }

    /// Records which segments make up `index` by writing a `SegmentControl`
    /// protobuf to `op_links/<op_id hex>`.
    ///
    /// The data is written to a temporary file in `op_links` first and then
    /// persisted under its final name. Presumably this ensures readers never
    /// see a partially written link; TODO confirm `persist_temp_file`
    /// semantics.
    fn associate_index_with_operation(
        &self,
        index: &DefaultReadonlyIndex,
        op_id: &OperationId,
    ) -> Result<(), PathError> {
        let proto = crate::protos::default_index::SegmentControl {
            commit_segment_id: index.readonly_commits().id().to_bytes(),
            changed_path_start_commit_pos: index
                .changed_paths()
                .start_commit_pos()
                .map(|GlobalCommitPosition(start)| start),
            changed_path_segment_ids: index
                .changed_paths()
                .readonly_segments()
                .iter()
                .map(|segment| segment.id().to_bytes())
                .collect(),
        };
        let dir = self.op_links_dir();
        let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
        let file = temp_file.as_file_mut();
        file.write_all(&proto.encode_to_vec())
            .context(temp_file.path())?;
        let path = dir.join(op_id.hex());
        persist_temp_file(temp_file, &path).context(&path)?;
        Ok(())
    }
}
532
533#[async_trait]
534impl IndexStore for DefaultIndexStore {
535 fn name(&self) -> &str {
536 Self::name()
537 }
538
539 async fn get_index_at_op(
540 &self,
541 op: &Operation,
542 store: &Arc<Store>,
543 ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
544 let field_lengths = FieldLengths {
545 commit_id: store.commit_id_length(),
546 change_id: store.change_id_length(),
547 };
548 let index = match self.load_index_at_operation(op.id(), field_lengths) {
549 Err(DefaultIndexStoreError::LoadAssociation(PathError { source: error, .. }))
550 if error.kind() == io::ErrorKind::NotFound =>
551 {
552 self.build_index_at_operation(op, store).await
553 }
554 Err(DefaultIndexStoreError::LoadIndex(err)) if err.is_corrupt_or_not_found() => {
555 match &err {
558 ReadonlyIndexLoadError::UnexpectedVersion {
559 kind,
560 found_version,
561 expected_version,
562 } => {
563 eprintln!(
564 "Found {kind} index format version {found_version}, expected version \
565 {expected_version}. Reindexing..."
566 );
567 }
568 ReadonlyIndexLoadError::Other { error, .. } => {
569 eprintln!("{err} (maybe the format has changed): {error}. Reindexing...");
570 }
571 }
572 self.reinit()
573 .map_err(|err| IndexStoreError::Read(err.into()))?;
574 self.build_index_at_operation(op, store).await
575 }
576 result => result,
577 }
578 .map_err(|err| IndexStoreError::Read(err.into()))?;
579 Ok(Box::new(index))
580 }
581
582 fn write_index(
583 &self,
584 index: Box<dyn MutableIndex>,
585 op: &Operation,
586 ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
587 let index: Box<DefaultMutableIndex> = index
588 .downcast()
589 .expect("index to merge in must be a DefaultMutableIndex");
590 let index = self
591 .save_mutable_index(*index, op.id())
592 .map_err(|err| IndexStoreError::Write(err.into()))?;
593 Ok(Box::new(index))
594 }
595}
596
/// Progress snapshot passed to the callback of
/// `DefaultIndexStore::build_changed_path_index_at_operation()`.
#[derive(Clone, Debug)]
pub struct DefaultChangedPathIndexProgress {
    /// Number of commits already processed when the callback runs.
    pub current: u32,
    /// Total number of commits to be indexed in this run.
    pub total: u32,
}