1#![expect(missing_docs)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::fs;
20use std::io;
21use std::io::Write as _;
22use std::path::Path;
23use std::path::PathBuf;
24use std::pin::pin;
25use std::slice;
26use std::sync::Arc;
27
28use async_trait::async_trait;
29use futures::StreamExt as _;
30use futures::TryStreamExt as _;
31use itertools::Itertools as _;
32use prost::Message as _;
33use tempfile::NamedTempFile;
34use thiserror::Error;
35
36use super::changed_path::ChangedPathIndexSegmentId;
37use super::changed_path::CompositeChangedPathIndex;
38use super::changed_path::collect_changed_paths;
39use super::composite::AsCompositeIndex as _;
40use super::composite::CommitIndexSegmentId;
41use super::entry::GlobalCommitPosition;
42use super::mutable::DefaultMutableIndex;
43use super::readonly::DefaultReadonlyIndex;
44use super::readonly::FieldLengths;
45use super::readonly::ReadonlyCommitIndexSegment;
46use super::readonly::ReadonlyIndexLoadError;
47use crate::backend::BackendError;
48use crate::backend::BackendInitError;
49use crate::backend::CommitId;
50use crate::commit::CommitByCommitterTimestamp;
51use crate::dag_walk_async;
52use crate::file_util;
53use crate::file_util::IoResultExt as _;
54use crate::file_util::PathError;
55use crate::file_util::persist_temp_file;
56use crate::index::IndexStore;
57use crate::index::IndexStoreError;
58use crate::index::IndexStoreResult;
59use crate::index::MutableIndex;
60use crate::index::ReadonlyIndex;
61use crate::object_id::ObjectId as _;
62use crate::op_store::OpStoreError;
63use crate::op_store::OperationId;
64use crate::op_walk;
65use crate::operation::Operation;
66use crate::store::Store;
67
/// File-name length, in characters, used by `DefaultIndexStore::reinit()` to
/// pick which entries directly under the index directory get deleted.
// NOTE(review): presumably the length of a 64-byte hash rendered as hex
// (2 characters per byte) — confirm against the segment file naming scheme.
const SEGMENT_FILE_NAME_LENGTH: usize = 64 * 2;
70
/// Error returned when the on-disk layout of the index store could not be
/// created or reset. Wraps the failing path operation.
#[derive(Debug, Error)]
#[error("Failed to initialize index store")]
pub struct DefaultIndexStoreInitError(#[from] pub PathError);
75
76impl From<DefaultIndexStoreInitError> for BackendInitError {
77 fn from(err: DefaultIndexStoreInitError) -> Self {
78 Self(err.into())
79 }
80}
81
/// Errors produced while loading, building, or saving the default index.
#[derive(Debug, Error)]
pub enum DefaultIndexStoreError {
    /// Writing the operation link file that ties saved index segments to
    /// `op_id` failed.
    #[error("Failed to associate index files with an operation {op_id}")]
    AssociateIndex {
        op_id: OperationId,
        source: PathError,
    },
    /// Reading or decoding the operation link file failed.
    #[error("Failed to load associated index file names")]
    LoadAssociation(#[source] PathError),
    /// Loading a commit or changed-path index segment failed.
    #[error(transparent)]
    LoadIndex(ReadonlyIndexLoadError),
    /// Creating the base directories or writing an index segment failed.
    #[error("Failed to write index file")]
    SaveIndex(#[source] PathError),
    /// A backend error occurred while reading or indexing commits; `op_id`
    /// names the operation the failing commit was attributed to.
    #[error("Failed to index commits at operation {op_id}")]
    IndexCommits {
        op_id: OperationId,
        source: BackendError,
    },
    /// An operation store error, e.g. while walking operation ancestors or
    /// reading a view.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
103
/// Index store that keeps commit segments, changed-path segments, and
/// per-operation link files in sub-directories of a single directory.
#[derive(Debug)]
pub struct DefaultIndexStore {
    /// Root directory of the store; the `op_links`, `segments`, and
    /// `changed_paths` sub-directories live directly under it.
    dir: PathBuf,
}
108
109impl DefaultIndexStore {
    /// Returns the identifier of this index store implementation
    /// (`"default"`).
    pub fn name() -> &'static str {
        "default"
    }
113
114 pub fn init(dir: &Path) -> Result<Self, DefaultIndexStoreInitError> {
115 let store = Self {
116 dir: dir.to_owned(),
117 };
118 store.ensure_base_dirs()?;
119 Ok(store)
120 }
121
122 pub fn load(dir: &Path) -> Self {
123 Self {
124 dir: dir.to_owned(),
125 }
126 }
127
128 pub fn reinit(&self) -> Result<(), DefaultIndexStoreInitError> {
129 self.ensure_base_dirs()?;
131 file_util::remove_dir_contents(&self.op_links_dir())?;
133 let legacy_operations_dir = self.dir.join("operations"); if legacy_operations_dir.exists() {
135 file_util::remove_dir_contents(&legacy_operations_dir)?;
136 fs::remove_dir(&legacy_operations_dir).context(&legacy_operations_dir)?;
137 }
138 file_util::remove_dir_contents(&self.commit_segments_dir())?;
141 file_util::remove_dir_contents(&self.changed_path_segments_dir())?;
142 for entry in self.dir.read_dir().context(&self.dir)? {
144 let entry = entry.context(&self.dir)?;
145 let path = entry.path();
146 if path.file_name().unwrap().len() != SEGMENT_FILE_NAME_LENGTH {
147 continue;
149 }
150 fs::remove_file(&path).context(&path)?;
151 }
152 Ok(())
153 }
154
155 fn ensure_base_dirs(&self) -> Result<(), PathError> {
156 for dir in [
157 self.op_links_dir(),
158 self.commit_segments_dir(),
159 self.changed_path_segments_dir(),
160 ] {
161 file_util::create_or_reuse_dir(&dir).context(&dir)?;
162 }
163 Ok(())
164 }
165
    /// Directory holding one link file per operation, named by the
    /// operation id in hex, that records which segments make up its index.
    fn op_links_dir(&self) -> PathBuf {
        self.dir.join("op_links")
    }
170
    /// Directory where commit index segments are saved and loaded from.
    fn commit_segments_dir(&self) -> PathBuf {
        self.dir.join("segments")
    }
175
    /// Directory where changed-path index segments are saved and loaded
    /// from.
    fn changed_path_segments_dir(&self) -> PathBuf {
        self.dir.join("changed_paths")
    }
180
181 fn load_index_at_operation(
182 &self,
183 op_id: &OperationId,
184 lengths: FieldLengths,
185 ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
186 let op_link_file = self.op_links_dir().join(op_id.hex());
187 let data = fs::read(&op_link_file)
188 .context(&op_link_file)
189 .map_err(DefaultIndexStoreError::LoadAssociation)?;
190 let proto = crate::protos::default_index::SegmentControl::decode(&*data)
191 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
192 .context(&op_link_file)
193 .map_err(DefaultIndexStoreError::LoadAssociation)?;
194 let commit_segment_id = CommitIndexSegmentId::new(proto.commit_segment_id);
195 let changed_path_start_commit_pos = proto
196 .changed_path_start_commit_pos
197 .map(GlobalCommitPosition);
198 let changed_path_segment_ids = proto
199 .changed_path_segment_ids
200 .into_iter()
201 .map(ChangedPathIndexSegmentId::new)
202 .collect_vec();
203
204 let commits = ReadonlyCommitIndexSegment::load(
205 &self.commit_segments_dir(),
206 commit_segment_id,
207 lengths,
208 )
209 .map_err(DefaultIndexStoreError::LoadIndex)?;
210 let changed_paths = if let Some(start_commit_pos) = changed_path_start_commit_pos {
212 CompositeChangedPathIndex::load(
213 &self.changed_path_segments_dir(),
214 start_commit_pos,
215 &changed_path_segment_ids,
216 )
217 .map_err(DefaultIndexStoreError::LoadIndex)?
218 } else {
219 CompositeChangedPathIndex::null()
220 };
221 Ok(DefaultReadonlyIndex::from_segment(commits, changed_paths))
222 }
223
    /// Builds and saves the commit index for `operation`.
    ///
    /// Ancestor operations are scanned until one with an existing operation
    /// link file is found; its index, if any, is used as the starting point.
    /// Every commit referenced by the operations still to be indexed, plus
    /// the ancestors of those commits that the starting index lacks, is added
    /// to a mutable index. The result is saved and linked to `operation`.
    ///
    /// # Errors
    ///
    /// * `OpStore` if walking operations or reading a view fails.
    /// * `LoadAssociation` / `LoadIndex` if the ancestor's index cannot be
    ///   loaded.
    /// * `IndexCommits` if a commit cannot be read or added to the index.
    /// * `SaveIndex` / `AssociateIndex` if saving the result fails.
    ///
    /// # Panics
    ///
    /// Panics with "graph has cycle" if the topological sort reports a cycle.
    #[tracing::instrument(skip(self, store))]
    pub async fn build_index_at_operation(
        &self,
        operation: &Operation,
        store: &Arc<Store>,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        tracing::info!("scanning operations to index");
        let op_links_dir = self.op_links_dir();
        let field_lengths = FieldLengths {
            commit_id: store.commit_id_length(),
            change_id: store.change_id_length(),
        };
        // Walk the ancestors of `operation` until one that already has a link
        // file is found. Operations visited before that point are collected
        // as unindexed.
        let mut unindexed_ops = Vec::new();
        let mut parent_op = None;
        let mut ancestors = pin!(op_walk::walk_ancestors(slice::from_ref(operation)));
        while let Some(op) = ancestors.next().await {
            let op = op?;
            if op_links_dir.join(op.id().hex()).is_file() {
                parent_op = Some(op);
                break;
            } else {
                unindexed_ops.push(op);
            }
        }
        // When an indexed ancestor exists, the operations to visit are
        // re-collected with a range walk rather than taken from
        // `unindexed_ops`.
        // NOTE(review): presumably because the scan above stops at the first
        // indexed op and may miss unindexed ops on other branches — confirm.
        let ops_to_visit = if let Some(op) = &parent_op {
            op_walk::walk_ancestors_range(slice::from_ref(operation), slice::from_ref(op))
                .try_collect()
                .await?
        } else {
            unindexed_ops
        };
        tracing::info!(
            ops_count = ops_to_visit.len(),
            "collecting head commits to index"
        );
        // Map each referenced commit id to the first operation (in visit
        // order) that referenced it. The operation id is used later for
        // error reporting.
        let mut historical_heads: HashMap<CommitId, OperationId> = HashMap::new();
        for op in &ops_to_visit {
            for commit_id in itertools::chain(
                op.all_referenced_commit_ids(),
                op.view().await?.all_referenced_commit_ids(),
            ) {
                if !historical_heads.contains_key(commit_id) {
                    historical_heads.insert(commit_id.clone(), op.id().clone());
                }
            }
        }
        // Start from an empty index, or from the indexed ancestor's index.
        let mut mutable_index;
        let maybe_parent_index;
        match &parent_op {
            None => {
                mutable_index = DefaultMutableIndex::full(field_lengths);
                maybe_parent_index = None;
            }
            Some(op) => {
                let parent_index = self.load_index_at_operation(op.id(), field_lengths)?;
                mutable_index = parent_index.start_modification();
                maybe_parent_index = Some(parent_index);
            }
        }

        tracing::info!(
            ?maybe_parent_index,
            heads_count = historical_heads.len(),
            "indexing commits reachable from historical heads"
        );
        // True if the commit is already present in the ancestor's index.
        let parent_index_has_id = |id: &CommitId| {
            maybe_parent_index
                .as_ref()
                .is_some_and(|index| index.has_id_impl(id))
        };
        // Reads a commit and pairs it with `op_id`, so that a read failure
        // can be reported against that operation.
        let get_commit_with_op = |commit_id: &CommitId, op_id: &OperationId| {
            let op_id = op_id.clone();
            match store.get_commit(commit_id) {
                Ok(commit) => Ok((CommitByCommitterTimestamp(commit), op_id)),
                Err(source) => Err(DefaultIndexStoreError::IndexCommits { op_id, source }),
            }
        };
        // If any visited operation does not store commit predecessors,
        // collect the referenced commits and their ancestors not covered by
        // the ancestor's index; predecessors of these commits are followed
        // in the walk below. Commits that fail to load are kept in the set
        // but not traversed further.
        let commits_to_keep_immediate_predecessors = if ops_to_visit
            .iter()
            .any(|op| !op.stores_commit_predecessors())
        {
            let mut ancestors = HashSet::new();
            let mut work = historical_heads.keys().cloned().collect_vec();
            while let Some(commit_id) = work.pop() {
                if ancestors.contains(&commit_id) || parent_index_has_id(&commit_id) {
                    continue;
                }
                if let Ok(commit) = store.get_commit(&commit_id) {
                    work.extend(commit.parent_ids().iter().cloned());
                }
                ancestors.insert(commit_id);
            }
            ancestors
        } else {
            HashSet::new()
        };
        // Topologically sort all commits that are reachable from the
        // referenced commits and missing from the ancestor's index.
        let commits = dag_walk_async::topo_order_reverse_ord(
            historical_heads
                .iter()
                .filter(|&(commit_id, _)| !parent_index_has_id(commit_id))
                .map(|(commit_id, op_id)| get_commit_with_op(commit_id, op_id)),
            |(CommitByCommitterTimestamp(commit), _)| commit.id().clone(),
            async |(CommitByCommitterTimestamp(commit), op_id)| {
                let keep_predecessors =
                    commits_to_keep_immediate_predecessors.contains(commit.id());
                // Neighbors are the parents, plus the predecessors when the
                // commit is in the keep set.
                itertools::chain(
                    commit.parent_ids(),
                    keep_predecessors
                        .then_some(&commit.store_commit().predecessors)
                        .into_iter()
                        .flatten(),
                )
                .filter(|&id| !parent_index_has_id(id))
                .map(|commit_id| get_commit_with_op(commit_id, op_id))
                .collect_vec()
            },
            |_| panic!("graph has cycle"),
        )
        .await?;
        // Add the commits in the reverse of the order returned above.
        for (CommitByCommitterTimestamp(commit), op_id) in commits.iter().rev() {
            mutable_index.add_commit(commit).await.map_err(|source| {
                DefaultIndexStoreError::IndexCommits {
                    op_id: op_id.clone(),
                    source,
                }
            })?;
        }

        let index = self.save_mutable_index(mutable_index, operation.id())?;
        tracing::info!(?index, commits_count = commits.len(), "saved new index");

        Ok(index)
    }
373
    /// Extends the changed-path index of the index at `op_id` by at most
    /// `max_commits` commits and re-links the operation to the result.
    ///
    /// Two commit position ranges are indexed around the existing
    /// changed-path index: a "post" range directly after it and a "pre" range
    /// directly before it. The post range is sized first (up to `max_commits`
    /// or the end of the commit index), and the pre range receives what is
    /// left of the budget. When there is no existing changed-path index, the
    /// pre range covers the last `max_commits` commit positions and the post
    /// range is empty.
    ///
    /// `progress_callback` is invoked before each commit is indexed and once
    /// more after the operation link has been written.
    ///
    /// # Errors
    ///
    /// * `SaveIndex` if base directories or segments cannot be written.
    /// * `LoadAssociation` / `LoadIndex` if the current index cannot be
    ///   loaded.
    /// * `IndexCommits` if reading a commit or collecting its changed paths
    ///   fails.
    /// * `AssociateIndex` if the operation link cannot be written.
    ///
    /// # Panics
    ///
    /// Panics if the existing changed-path index extends past the end of the
    /// commit index, or if a commit is indexed at a position other than the
    /// next mutable commit position.
    #[tracing::instrument(skip(self, store, progress_callback))]
    pub async fn build_changed_path_index_at_operation(
        &self,
        op_id: &OperationId,
        store: &Arc<Store>,
        max_commits: u32,
        mut progress_callback: impl FnMut(&DefaultChangedPathIndexProgress),
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        self.ensure_base_dirs()
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        let field_lengths = FieldLengths {
            commit_id: store.commit_id_length(),
            change_id: store.change_id_length(),
        };
        let index = self.load_index_at_operation(op_id, field_lengths)?;
        let old_changed_paths = index.changed_paths();

        // Split the `max_commits` budget into
        //   |pre|old_changed_paths|post|
        // so that the lengths of pre and post add up to at most
        // `max_commits`.
        let pre_start;
        let pre_end;
        let post_start;
        let post_end;
        if let Some(GlobalCommitPosition(pos)) = old_changed_paths.start_commit_pos() {
            post_start = pos + old_changed_paths.num_commits();
            assert!(post_start <= index.num_commits());
            post_end = u32::saturating_add(post_start, max_commits).min(index.num_commits());
            // Whatever the post range did not use goes to the pre range.
            pre_start = u32::saturating_sub(pos, max_commits - (post_end - post_start));
            pre_end = pos;
        } else {
            pre_start = u32::saturating_sub(index.num_commits(), max_commits);
            pre_end = index.num_commits();
            post_start = pre_end;
            post_end = pre_end;
        }

        let mut progress = DefaultChangedPathIndexProgress {
            current: 0,
            total: (pre_end - pre_start) + (post_end - post_start),
        };
        // Reports the current state first, then advances the counter.
        let mut emit_progress = || {
            progress_callback(&progress);
            progress.current += 1;
        };

        let to_index_err = |source| DefaultIndexStoreError::IndexCommits {
            op_id: op_id.clone(),
            source,
        };
        // Collects the changed paths of the commit at `pos` and appends them
        // to `changed_paths`, which must be expecting exactly that position.
        let index_commit = async |changed_paths: &mut CompositeChangedPathIndex,
                                  pos: GlobalCommitPosition| {
            assert_eq!(changed_paths.next_mutable_commit_pos(), Some(pos));
            let commit_id = index.as_composite().commits().entry_by_pos(pos).commit_id();
            let commit = store.get_commit_async(&commit_id).await?;
            let paths = collect_changed_paths(&index, &commit).await?;
            changed_paths.add_changed_paths(paths);
            Ok(())
        };

        // Index the pre range into a fresh changed-path index and save it.
        let mut new_changed_paths =
            CompositeChangedPathIndex::empty(GlobalCommitPosition(pre_start));
        new_changed_paths.make_mutable();
        tracing::info!(?pre_start, ?pre_end, "indexing changed paths in commits");
        for pos in (pre_start..pre_end).map(GlobalCommitPosition) {
            emit_progress();
            index_commit(&mut new_changed_paths, pos)
                .await
                .map_err(to_index_err)?;
        }
        new_changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;

        // Carry over the segments of the existing changed-path index.
        new_changed_paths.append_segments(old_changed_paths);

        // Index the post range on top, then squash and save.
        new_changed_paths.make_mutable();
        tracing::info!(?post_start, ?post_end, "indexing changed paths in commits");
        for pos in (post_start..post_end).map(GlobalCommitPosition) {
            emit_progress();
            index_commit(&mut new_changed_paths, pos)
                .await
                .map_err(to_index_err)?;
        }
        new_changed_paths.maybe_squash_with_ancestors();
        new_changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;

        // Pair the unchanged commit segment with the new changed-path index
        // and rewrite the operation link to point at it.
        let commits = index.readonly_commits().clone();
        let index = DefaultReadonlyIndex::from_segment(commits, new_changed_paths);
        self.associate_index_with_operation(&index, op_id)
            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
                op_id: op_id.to_owned(),
                source,
            })?;
        // Final notification; at this point `current` equals `total`.
        emit_progress();
        Ok(index)
    }
482
483 fn save_mutable_index(
484 &self,
485 index: DefaultMutableIndex,
486 op_id: &OperationId,
487 ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
488 self.ensure_base_dirs()
490 .map_err(DefaultIndexStoreError::SaveIndex)?;
491 let (commits, mut changed_paths) = index.into_segment();
492 let commits = commits
493 .maybe_squash_with_ancestors()
494 .save_in(&self.commit_segments_dir())
495 .map_err(DefaultIndexStoreError::SaveIndex)?;
496 changed_paths.maybe_squash_with_ancestors();
497 changed_paths
498 .save_in(&self.changed_path_segments_dir())
499 .map_err(DefaultIndexStoreError::SaveIndex)?;
500 let index = DefaultReadonlyIndex::from_segment(commits, changed_paths);
501 self.associate_index_with_operation(&index, op_id)
502 .map_err(|source| DefaultIndexStoreError::AssociateIndex {
503 op_id: op_id.to_owned(),
504 source,
505 })?;
506 Ok(index)
507 }
508
509 fn associate_index_with_operation(
511 &self,
512 index: &DefaultReadonlyIndex,
513 op_id: &OperationId,
514 ) -> Result<(), PathError> {
515 let proto = crate::protos::default_index::SegmentControl {
516 commit_segment_id: index.readonly_commits().id().to_bytes(),
517 changed_path_start_commit_pos: index
518 .changed_paths()
519 .start_commit_pos()
520 .map(|GlobalCommitPosition(start)| start),
521 changed_path_segment_ids: index
522 .changed_paths()
523 .readonly_segments()
524 .iter()
525 .map(|segment| segment.id().to_bytes())
526 .collect(),
527 };
528 let dir = self.op_links_dir();
529 let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
530 let file = temp_file.as_file_mut();
531 file.write_all(&proto.encode_to_vec())
532 .context(temp_file.path())?;
533 let path = dir.join(op_id.hex());
534 persist_temp_file(temp_file, &path).context(&path)?;
535 Ok(())
536 }
537}
538
539#[async_trait(?Send)]
540impl IndexStore for DefaultIndexStore {
    /// Returns the store's identifier by delegating to
    /// `DefaultIndexStore::name()`.
    fn name(&self) -> &str {
        Self::name()
    }
544
    /// Returns the index for `op`, building it when it is missing or
    /// unreadable.
    ///
    /// * If the operation link file does not exist, the index is built.
    /// * If a segment fails to load and the error reports
    ///   `is_corrupt_or_not_found()`, a message is printed to stderr, the
    ///   store is reset with `reinit()`, and the index is rebuilt.
    /// * Any other load result is passed through unchanged.
    ///
    /// All failures are reported as `IndexStoreError::Read`.
    async fn get_index_at_op(
        &self,
        op: &Operation,
        store: &Arc<Store>,
    ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
        let field_lengths = FieldLengths {
            commit_id: store.commit_id_length(),
            change_id: store.change_id_length(),
        };
        let index = match self.load_index_at_operation(op.id(), field_lengths) {
            // No link file for this operation yet: build the index.
            Err(DefaultIndexStoreError::LoadAssociation(PathError { source: error, .. }))
                if error.kind() == io::ErrorKind::NotFound =>
            {
                self.build_index_at_operation(op, store).await
            }
            // A segment is corrupt or missing: report it, reset the store,
            // and rebuild.
            Err(DefaultIndexStoreError::LoadIndex(err)) if err.is_corrupt_or_not_found() => {
                match &err {
                    ReadonlyIndexLoadError::UnexpectedVersion {
                        kind,
                        found_version,
                        expected_version,
                    } => {
                        eprintln!(
                            "Found {kind} index format version {found_version}, expected version \
                             {expected_version}. Reindexing..."
                        );
                    }
                    ReadonlyIndexLoadError::Other { error, .. } => {
                        eprintln!("{err} (maybe the format has changed): {error}. Reindexing...");
                    }
                }
                self.reinit()
                    .map_err(|err| IndexStoreError::Read(err.into()))?;
                self.build_index_at_operation(op, store).await
            }
            // Success, or an error that rebuilding is not attempted for.
            result => result,
        }
        .map_err(|err| IndexStoreError::Read(err.into()))?;
        Ok(Box::new(index))
    }
587
588 fn write_index(
589 &self,
590 index: Box<dyn MutableIndex>,
591 op: &Operation,
592 ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
593 let index: Box<DefaultMutableIndex> = index
594 .downcast()
595 .expect("index to merge in must be a DefaultMutableIndex");
596 let index = self
597 .save_mutable_index(*index, op.id())
598 .map_err(|err| IndexStoreError::Write(err.into()))?;
599 Ok(Box::new(index))
600 }
601}
602
/// Progress report handed to the callback of
/// `DefaultIndexStore::build_changed_path_index_at_operation()`.
#[derive(Clone, Debug)]
pub struct DefaultChangedPathIndexProgress {
    /// Number of commits already indexed when the callback is invoked;
    /// starts at 0 and equals `total` on the final invocation.
    pub current: u32,
    /// Number of commits selected for indexing in this run.
    pub total: u32,
}