1#![expect(missing_docs)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::fs;
20use std::io;
21use std::io::Write as _;
22use std::path::Path;
23use std::path::PathBuf;
24use std::slice;
25use std::sync::Arc;
26
27use itertools::Itertools as _;
28use pollster::FutureExt as _;
29use prost::Message as _;
30use tempfile::NamedTempFile;
31use thiserror::Error;
32
33use super::changed_path::ChangedPathIndexSegmentId;
34use super::changed_path::CompositeChangedPathIndex;
35use super::changed_path::collect_changed_paths;
36use super::composite::AsCompositeIndex as _;
37use super::composite::CommitIndexSegmentId;
38use super::entry::GlobalCommitPosition;
39use super::mutable::DefaultMutableIndex;
40use super::readonly::DefaultReadonlyIndex;
41use super::readonly::FieldLengths;
42use super::readonly::ReadonlyCommitIndexSegment;
43use super::readonly::ReadonlyIndexLoadError;
44use crate::backend::BackendError;
45use crate::backend::BackendInitError;
46use crate::backend::CommitId;
47use crate::commit::CommitByCommitterTimestamp;
48use crate::dag_walk;
49use crate::file_util;
50use crate::file_util::IoResultExt as _;
51use crate::file_util::PathError;
52use crate::file_util::persist_temp_file;
53use crate::index::IndexStore;
54use crate::index::IndexStoreError;
55use crate::index::IndexStoreResult;
56use crate::index::MutableIndex;
57use crate::index::ReadonlyIndex;
58use crate::object_id::ObjectId as _;
59use crate::op_store::OpStoreError;
60use crate::op_store::OperationId;
61use crate::op_walk;
62use crate::operation::Operation;
63use crate::store::Store;
64
/// Length, in characters, of an index segment file name.
///
/// Presumably the hex encoding (two characters per byte) of a 64-byte segment
/// id. `DefaultIndexStore::reinit()` uses this length to recognize segment
/// files stored directly in the store's top-level directory.
const SEGMENT_FILE_NAME_LENGTH: usize = 2 * 64;
67
/// Error returned when the index store's directory layout cannot be created
/// (`DefaultIndexStore::init`) or wiped (`DefaultIndexStore::reinit`).
///
/// Wraps the underlying filesystem error together with the offending path.
#[derive(Debug, Error)]
#[error("Failed to initialize index store")]
pub struct DefaultIndexStoreInitError(#[from] pub PathError);
72
73impl From<DefaultIndexStoreInitError> for BackendInitError {
74 fn from(err: DefaultIndexStoreInitError) -> Self {
75 Self(err.into())
76 }
77}
78
/// Errors raised while loading, building, or saving the default index.
#[derive(Debug, Error)]
pub enum DefaultIndexStoreError {
    /// Writing the file(s) that link an operation to its index segments
    /// failed.
    #[error("Failed to associate index files with an operation {op_id}")]
    AssociateIndex {
        op_id: OperationId,
        source: PathError,
    },
    /// The file recording which index segments belong to an operation could
    /// not be read, or its contents could not be decoded. A `NotFound` source
    /// means no index has been associated with the operation yet.
    #[error("Failed to load associated index file names")]
    LoadAssociation(#[source] PathError),
    /// A commit or changed-path index segment could not be loaded.
    #[error(transparent)]
    LoadIndex(ReadonlyIndexLoadError),
    /// An index segment could not be written, or the store's directories
    /// could not be created before writing.
    #[error("Failed to write index file")]
    SaveIndex(#[source] PathError),
    /// A backend call failed while indexing commits (or their changed paths)
    /// on behalf of `op_id`.
    #[error("Failed to index commits at operation {op_id}")]
    IndexCommits {
        op_id: OperationId,
        source: BackendError,
    },
    /// Reading operation data from the operation store failed.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
100
/// Index store that keeps index segments and per-operation association files
/// in subdirectories of a local directory.
#[derive(Debug)]
pub struct DefaultIndexStore {
    /// Root directory of the store. The `*_dir()` helpers define the
    /// subdirectory layout beneath it.
    dir: PathBuf,
}
105
impl DefaultIndexStore {
    /// Returns the name identifying this index store implementation.
    pub fn name() -> &'static str {
        "default"
    }

    /// Creates a store rooted at `dir`, creating its subdirectories if they do
    /// not already exist.
    ///
    /// # Errors
    ///
    /// Returns an error if any subdirectory cannot be created.
    pub fn init(dir: &Path) -> Result<Self, DefaultIndexStoreInitError> {
        let store = Self {
            dir: dir.to_owned(),
        };
        store.ensure_base_dirs()?;
        Ok(store)
    }

    /// Opens the store rooted at `dir` without touching the filesystem.
    pub fn load(dir: &Path) -> Self {
        Self {
            dir: dir.to_owned(),
        }
    }

    /// Discards all index data so that indexes get rebuilt on next access.
    ///
    /// Empties the op-link, legacy-operations, commit-segment, and
    /// changed-path-segment directories. It also deletes top-level files whose
    /// name length equals `SEGMENT_FILE_NAME_LENGTH`.
    ///
    /// # Errors
    ///
    /// Returns an error if a directory cannot be created, listed, or emptied,
    /// or if a file cannot be removed.
    pub fn reinit(&self) -> Result<(), DefaultIndexStoreInitError> {
        // Make sure every subdirectory exists before emptying it.
        self.ensure_base_dirs()?;
        // Without any association files, every later load hits `NotFound`,
        // which makes `get_index_at_op()` rebuild the index.
        file_util::remove_dir_contents(&self.op_links_dir())?;
        file_util::remove_dir_contents(&self.legacy_operations_dir())?;
        // The segments are unreachable once the associations are gone.
        file_util::remove_dir_contents(&self.commit_segments_dir())?;
        file_util::remove_dir_contents(&self.changed_path_segments_dir())?;
        // Segment-named files may also live directly under `dir` (presumably
        // from an older layout). `DirEntry::path()` always ends in the entry's
        // file name, so the `unwrap()` cannot fail.
        for entry in self.dir.read_dir().context(&self.dir)? {
            let entry = entry.context(&self.dir)?;
            let path = entry.path();
            if path.file_name().unwrap().len() != SEGMENT_FILE_NAME_LENGTH {
                // Skip the subdirectories above and any other
                // non-segment entry.
                continue;
            }
            fs::remove_file(&path).context(&path)?;
        }
        Ok(())
    }

    /// Creates each of the store's subdirectories unless it already exists.
    fn ensure_base_dirs(&self) -> Result<(), PathError> {
        for dir in [
            self.op_links_dir(),
            self.legacy_operations_dir(),
            self.commit_segments_dir(),
            self.changed_path_segments_dir(),
        ] {
            file_util::create_or_reuse_dir(&dir).context(&dir)?;
        }
        Ok(())
    }

    /// Directory of op link files. Each is named by an operation id in hex
    /// and contains an encoded `SegmentControl` message.
    fn op_links_dir(&self) -> PathBuf {
        self.dir.join("op_links")
    }

    /// Directory of legacy association files. Each is named by an operation
    /// id in hex and contains the hex id of its commit segment.
    fn legacy_operations_dir(&self) -> PathBuf {
        self.dir.join("operations")
    }

    /// Directory containing commit index segment files.
    fn commit_segments_dir(&self) -> PathBuf {
        self.dir.join("segments")
    }

    /// Directory containing changed-path index segment files.
    fn changed_path_segments_dir(&self) -> PathBuf {
        self.dir.join("changed_paths")
    }

    /// Loads the index associated with `op_id`.
    ///
    /// The association is read from the op link file, which holds an encoded
    /// `SegmentControl` message. If that file does not exist, the legacy
    /// association file is read instead. The legacy file names only the
    /// commit segment, so the returned index then has a null changed-path
    /// index. A null changed-path index is also used when the op link records
    /// no changed-path start position.
    ///
    /// # Errors
    ///
    /// - `LoadAssociation` if no association file can be read or its contents
    ///   are invalid. If both files are missing, the source is the `NotFound`
    ///   error for the legacy file.
    /// - `LoadIndex` if a referenced segment cannot be loaded.
    fn load_index_at_operation(
        &self,
        op_id: &OperationId,
        lengths: FieldLengths,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        let commit_segment_id;
        let changed_path_start_commit_pos;
        let changed_path_segment_ids;
        let op_link_file = self.op_links_dir().join(op_id.hex());
        match fs::read(&op_link_file).context(&op_link_file) {
            Ok(data) => {
                // Report decode failures as `InvalidData` I/O errors so they
                // fit in a `PathError`.
                let proto = crate::protos::default_index::SegmentControl::decode(&*data)
                    .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
                    .context(&op_link_file)
                    .map_err(DefaultIndexStoreError::LoadAssociation)?;
                commit_segment_id = CommitIndexSegmentId::new(proto.commit_segment_id);
                changed_path_start_commit_pos = proto
                    .changed_path_start_commit_pos
                    .map(GlobalCommitPosition);
                changed_path_segment_ids = proto
                    .changed_path_segment_ids
                    .into_iter()
                    .map(ChangedPathIndexSegmentId::new)
                    .collect_vec();
            }
            Err(PathError { source: error, .. }) if error.kind() == io::ErrorKind::NotFound => {
                // Fall back to the legacy association file, which stores only
                // the commit segment id as hex text.
                let op_id_file = self.legacy_operations_dir().join(op_id.hex());
                let data = fs::read(&op_id_file)
                    .context(&op_id_file)
                    .map_err(DefaultIndexStoreError::LoadAssociation)?;
                commit_segment_id = CommitIndexSegmentId::try_from_hex(&data)
                    .ok_or_else(|| {
                        io::Error::new(io::ErrorKind::InvalidData, "file name is not valid hex")
                    })
                    .context(&op_id_file)
                    .map_err(DefaultIndexStoreError::LoadAssociation)?;
                changed_path_start_commit_pos = None;
                changed_path_segment_ids = vec![];
            }
            Err(err) => return Err(DefaultIndexStoreError::LoadAssociation(err)),
        };

        let commits = ReadonlyCommitIndexSegment::load(
            &self.commit_segments_dir(),
            commit_segment_id,
            lengths,
        )
        .map_err(DefaultIndexStoreError::LoadIndex)?;
        let changed_paths = if let Some(start_commit_pos) = changed_path_start_commit_pos {
            CompositeChangedPathIndex::load(
                &self.changed_path_segments_dir(),
                start_commit_pos,
                &changed_path_segment_ids,
            )
            .map_err(DefaultIndexStoreError::LoadIndex)?
        } else {
            // No changed-path index has been built for this operation.
            CompositeChangedPathIndex::null()
        };
        Ok(DefaultReadonlyIndex::from_segment(commits, changed_paths))
    }

    /// Builds, saves, and returns the index for `operation`.
    ///
    /// Walks back from `operation` to the nearest ancestor operation that
    /// already has an association file, and starts from that ancestor's
    /// index. If no such ancestor exists, it starts from
    /// `DefaultMutableIndex::full(...)`.
    ///
    /// Every commit referenced by the intervening operations (or their views)
    /// that is not yet indexed is added, together with its not-yet-indexed
    /// ancestors. The new index is then saved and associated with `operation`.
    ///
    /// # Errors
    ///
    /// - `OpStore` if operations or views cannot be read.
    /// - `LoadAssociation` or `LoadIndex` if the parent index cannot be loaded.
    /// - `IndexCommits` if a commit cannot be read or added.
    /// - `SaveIndex` or `AssociateIndex` if the new index cannot be written.
    ///
    /// # Panics
    ///
    /// Panics if the commit graph contains a cycle.
    #[tracing::instrument(skip(self, store))]
    pub async fn build_index_at_operation(
        &self,
        operation: &Operation,
        store: &Arc<Store>,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        tracing::info!("scanning operations to index");
        let op_links_dir = self.op_links_dir();
        let legacy_operations_dir = self.legacy_operations_dir();
        let field_lengths = FieldLengths {
            commit_id: store.commit_id_length(),
            change_id: store.change_id_length(),
        };
        let mut unindexed_ops = Vec::new();
        // First ancestor (in walk order) that already has an association file.
        let mut parent_op = None;
        for op in op_walk::walk_ancestors(slice::from_ref(operation)) {
            let op = op?;
            if op_links_dir.join(op.id().hex()).is_file()
                || legacy_operations_dir.join(op.id().hex()).is_file()
            {
                parent_op = Some(op);
                break;
            } else {
                unindexed_ops.push(op);
            }
        }
        // With an indexed parent, visit every operation reachable from
        // `operation` but not from the parent. Presumably the walk above may
        // stop before reaching every unindexed op when the history has merges.
        let ops_to_visit = if let Some(op) = &parent_op {
            op_walk::walk_ancestors_range(slice::from_ref(operation), slice::from_ref(op))
                .try_collect()?
        } else {
            unindexed_ops
        };
        tracing::info!(
            ops_count = ops_to_visit.len(),
            "collecting head commits to index"
        );
        // Map each referenced commit to the first visited operation that
        // references it. That op id is attached to any indexing error.
        let mut historical_heads: HashMap<CommitId, OperationId> = HashMap::new();
        for op in &ops_to_visit {
            for commit_id in itertools::chain(
                op.all_referenced_commit_ids(),
                op.view()?.all_referenced_commit_ids(),
            ) {
                if !historical_heads.contains_key(commit_id) {
                    historical_heads.insert(commit_id.clone(), op.id().clone());
                }
            }
        }
        let mut mutable_index;
        let maybe_parent_index;
        match &parent_op {
            None => {
                mutable_index = DefaultMutableIndex::full(field_lengths);
                maybe_parent_index = None;
            }
            Some(op) => {
                let parent_index = self.load_index_at_operation(op.id(), field_lengths)?;
                mutable_index = parent_index.start_modification();
                maybe_parent_index = Some(parent_index);
            }
        }

        tracing::info!(
            ?maybe_parent_index,
            heads_count = historical_heads.len(),
            "indexing commits reachable from historical heads"
        );
        // Commits already in the parent index need no further traversal.
        let parent_index_has_id = |id: &CommitId| {
            maybe_parent_index
                .as_ref()
                .is_some_and(|index| index.has_id_impl(id))
        };
        // Fetches a commit, tagging it (or the error) with the operation it
        // was reached from.
        let get_commit_with_op = |commit_id: &CommitId, op_id: &OperationId| {
            let op_id = op_id.clone();
            match store.get_commit(commit_id) {
                Ok(commit) => Ok((CommitByCommitterTimestamp(commit), op_id)),
                Err(source) => Err(DefaultIndexStoreError::IndexCommits { op_id, source }),
            }
        };
        // If any visited operation does not store commit predecessors, collect
        // the not-yet-indexed ancestors of the heads (via parent edges only).
        // The immediate predecessors of those commits are traversed below as
        // well, presumably so that commits reachable only as predecessors still
        // get indexed.
        let commits_to_keep_immediate_predecessors = if ops_to_visit
            .iter()
            .any(|op| !op.stores_commit_predecessors())
        {
            let mut ancestors = HashSet::new();
            let mut work = historical_heads.keys().cloned().collect_vec();
            while let Some(commit_id) = work.pop() {
                if ancestors.contains(&commit_id) || parent_index_has_id(&commit_id) {
                    continue;
                }
                // Read errors are ignored here. Presumably they surface when
                // the same commit is fetched for indexing below.
                if let Ok(commit) = store.get_commit(&commit_id) {
                    work.extend(commit.parent_ids().iter().cloned());
                }
                ancestors.insert(commit_id);
            }
            ancestors
        } else {
            HashSet::new()
        };
        let commits = dag_walk::topo_order_reverse_ord_ok(
            historical_heads
                .iter()
                .filter(|&(commit_id, _)| !parent_index_has_id(commit_id))
                .map(|(commit_id, op_id)| get_commit_with_op(commit_id, op_id)),
            |(CommitByCommitterTimestamp(commit), _)| commit.id().clone(),
            |(CommitByCommitterTimestamp(commit), op_id)| {
                let keep_predecessors =
                    commits_to_keep_immediate_predecessors.contains(commit.id());
                itertools::chain(
                    commit.parent_ids(),
                    keep_predecessors
                        .then_some(&commit.store_commit().predecessors)
                        .into_iter()
                        .flatten(),
                )
                .filter(|&id| !parent_index_has_id(id))
                .map(|commit_id| get_commit_with_op(commit_id, op_id))
                .collect_vec()
            },
            |_| panic!("graph has cycle"),
        )?;
        // Add commits in the reverse of the order returned above, presumably
        // so that each commit's parents are added before the commit itself.
        for (CommitByCommitterTimestamp(commit), op_id) in commits.iter().rev() {
            mutable_index.add_commit(commit).await.map_err(|source| {
                DefaultIndexStoreError::IndexCommits {
                    op_id: op_id.clone(),
                    source,
                }
            })?;
        }

        let index = self.save_mutable_index(mutable_index, operation.id())?;
        tracing::info!(?index, commits_count = commits.len(), "saved new index");

        Ok(index)
    }

    /// Extends the changed-path index of the index at `op_id` by at most
    /// `max_commits` commits. Saves the result and re-associates it with
    /// `op_id`.
    ///
    /// Suppose the existing changed-path index covers positions
    /// `[pos, pos + n)`. Commits after that range are indexed first, up to
    /// `max_commits`. Any remaining budget covers commits immediately before
    /// `pos`. If there is no changed-path index yet, the last `max_commits`
    /// positions of the commit index are indexed.
    ///
    /// # Errors
    ///
    /// - `LoadAssociation` or `LoadIndex` if the index cannot be loaded.
    /// - `IndexCommits` if a commit or its changed paths cannot be read.
    /// - `SaveIndex` or `AssociateIndex` if the result cannot be written.
    ///
    /// # Panics
    ///
    /// Panics if the existing changed-path index extends past the end of the
    /// commit index, or if commits would be added at non-consecutive positions.
    #[tracing::instrument(skip(self, store))]
    pub async fn build_changed_path_index_at_operation(
        &self,
        op_id: &OperationId,
        store: &Arc<Store>,
        max_commits: u32,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        self.ensure_base_dirs()
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        let field_lengths = FieldLengths {
            commit_id: store.commit_id_length(),
            change_id: store.change_id_length(),
        };
        let index = self.load_index_at_operation(op_id, field_lengths)?;
        let old_changed_paths = index.changed_paths();

        // Half-open position ranges to index: `pre` lies just before the
        // existing changed-path index and `post` just after it. Together they
        // cover at most `max_commits` positions.
        let pre_start;
        let pre_end;
        let post_start;
        let post_end;
        if let Some(GlobalCommitPosition(pos)) = old_changed_paths.start_commit_pos() {
            post_start = pos + old_changed_paths.num_commits();
            assert!(post_start <= index.num_commits());
            post_end = u32::saturating_add(post_start, max_commits).min(index.num_commits());
            // Spend whatever budget `post` left unused on older positions.
            pre_start = u32::saturating_sub(pos, max_commits - (post_end - post_start));
            pre_end = pos;
        } else {
            pre_start = u32::saturating_sub(index.num_commits(), max_commits);
            pre_end = index.num_commits();
            post_start = pre_end;
            post_end = pre_end;
        }

        let to_index_err = |source| DefaultIndexStoreError::IndexCommits {
            op_id: op_id.clone(),
            source,
        };
        // Indexes the changed paths of the commit at `pos`. `pos` must be the
        // next position expected by `changed_paths`.
        let index_commit = async |changed_paths: &mut CompositeChangedPathIndex,
                                  pos: GlobalCommitPosition| {
            assert_eq!(changed_paths.next_mutable_commit_pos(), Some(pos));
            let commit_id = index.as_composite().commits().entry_by_pos(pos).commit_id();
            let commit = store.get_commit_async(&commit_id).await?;
            let paths = collect_changed_paths(&index, &commit).await?;
            changed_paths.add_changed_paths(paths);
            Ok(())
        };

        // Build the `pre` range as a new bottom segment starting at
        // `pre_start`, and save it.
        let mut new_changed_paths =
            CompositeChangedPathIndex::empty(GlobalCommitPosition(pre_start));
        new_changed_paths.make_mutable();
        tracing::info!(?pre_start, ?pre_end, "indexing changed paths in commits");
        for pos in (pre_start..pre_end).map(GlobalCommitPosition) {
            index_commit(&mut new_changed_paths, pos)
                .await
                .map_err(to_index_err)?;
        }
        new_changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;

        // Stack the previously existing segments on top. They start at
        // `pre_end`.
        new_changed_paths.append_segments(old_changed_paths);

        // Index the `post` range in a fresh mutable segment on top.
        new_changed_paths.make_mutable();
        tracing::info!(?post_start, ?post_end, "indexing changed paths in commits");
        for pos in (post_start..post_end).map(GlobalCommitPosition) {
            index_commit(&mut new_changed_paths, pos)
                .await
                .map_err(to_index_err)?;
        }
        new_changed_paths.maybe_squash_with_ancestors();
        new_changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;

        // The commit index is reused as is; only the changed-path index is
        // replaced.
        let commits = index.readonly_commits().clone();
        let index = DefaultReadonlyIndex::from_segment(commits, new_changed_paths);
        self.associate_index_with_operation(&index, op_id)
            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
                op_id: op_id.to_owned(),
                source,
            })?;
        Ok(index)
    }

    /// Saves the commit and changed-path segments of `index` and associates
    /// the resulting read-only index with `op_id`.
    ///
    /// Segments may be squashed with their ancestors before saving.
    fn save_mutable_index(
        &self,
        index: DefaultMutableIndex,
        op_id: &OperationId,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        self.ensure_base_dirs()
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        let (commits, mut changed_paths) = index.into_segment();
        let commits = commits
            .maybe_squash_with_ancestors()
            .save_in(&self.commit_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        changed_paths.maybe_squash_with_ancestors();
        changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        let index = DefaultReadonlyIndex::from_segment(commits, changed_paths);
        self.associate_index_with_operation(&index, op_id)
            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
                op_id: op_id.to_owned(),
                source,
            })?;
        Ok(index)
    }

    /// Records which segments make up `index` for operation `op_id`.
    ///
    /// Writes two files:
    /// - `op_links/<op_id hex>`, holding an encoded `SegmentControl`
    ///   (commit segment id, changed-path start position, and changed-path
    ///   segment ids).
    /// - `operations/<op_id hex>`, holding the commit segment id in hex.
    ///
    /// Each file is first written to a temporary file in its target directory
    /// and then moved into place with `persist_temp_file`.
    fn associate_index_with_operation(
        &self,
        index: &DefaultReadonlyIndex,
        op_id: &OperationId,
    ) -> Result<(), PathError> {
        let proto = crate::protos::default_index::SegmentControl {
            commit_segment_id: index.readonly_commits().id().to_bytes(),
            changed_path_start_commit_pos: index
                .changed_paths()
                .start_commit_pos()
                .map(|GlobalCommitPosition(start)| start),
            changed_path_segment_ids: index
                .changed_paths()
                .readonly_segments()
                .iter()
                .map(|segment| segment.id().to_bytes())
                .collect(),
        };
        let dir = self.op_links_dir();
        let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
        let file = temp_file.as_file_mut();
        file.write_all(&proto.encode_to_vec())
            .context(temp_file.path())?;
        let path = dir.join(op_id.hex());
        persist_temp_file(temp_file, &path).context(&path)?;

        // Also write the legacy association file. NOTE(review): presumably kept
        // for compatibility with readers that only understand the legacy
        // layout; confirm before removing.
        let dir = self.legacy_operations_dir();
        let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
        let file = temp_file.as_file_mut();
        file.write_all(index.readonly_commits().id().hex().as_bytes())
            .context(temp_file.path())?;
        let path = dir.join(op_id.hex());
        persist_temp_file(temp_file, &path).context(&path)?;
        Ok(())
    }
}
554
555impl IndexStore for DefaultIndexStore {
556 fn name(&self) -> &str {
557 Self::name()
558 }
559
560 fn get_index_at_op(
561 &self,
562 op: &Operation,
563 store: &Arc<Store>,
564 ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
565 let field_lengths = FieldLengths {
566 commit_id: store.commit_id_length(),
567 change_id: store.change_id_length(),
568 };
569 let index = match self.load_index_at_operation(op.id(), field_lengths) {
570 Err(DefaultIndexStoreError::LoadAssociation(PathError { source: error, .. }))
571 if error.kind() == io::ErrorKind::NotFound =>
572 {
573 self.build_index_at_operation(op, store).block_on()
574 }
575 Err(DefaultIndexStoreError::LoadIndex(err)) if err.is_corrupt_or_not_found() => {
576 match &err {
579 ReadonlyIndexLoadError::UnexpectedVersion {
580 kind,
581 found_version,
582 expected_version,
583 } => {
584 eprintln!(
585 "Found {kind} index format version {found_version}, expected version \
586 {expected_version}. Reindexing..."
587 );
588 }
589 ReadonlyIndexLoadError::Other { error, .. } => {
590 eprintln!("{err} (maybe the format has changed): {error}. Reindexing...");
591 }
592 }
593 self.reinit()
594 .map_err(|err| IndexStoreError::Read(err.into()))?;
595 self.build_index_at_operation(op, store).block_on()
596 }
597 result => result,
598 }
599 .map_err(|err| IndexStoreError::Read(err.into()))?;
600 Ok(Box::new(index))
601 }
602
603 fn write_index(
604 &self,
605 index: Box<dyn MutableIndex>,
606 op: &Operation,
607 ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
608 let index: Box<DefaultMutableIndex> = index
609 .downcast()
610 .expect("index to merge in must be a DefaultMutableIndex");
611 let index = self
612 .save_mutable_index(*index, op.id())
613 .map_err(|err| IndexStoreError::Write(err.into()))?;
614 Ok(Box::new(index))
615 }
616}