1#![expect(missing_docs)]
16
17use std::collections::HashMap;
18use std::fs;
19use std::future;
20use std::io;
21use std::io::Write as _;
22use std::path::Path;
23use std::path::PathBuf;
24use std::pin::pin;
25use std::slice;
26use std::sync::Arc;
27
28use async_trait::async_trait;
29use futures::StreamExt as _;
30use futures::TryStreamExt as _;
31use futures::stream;
32use itertools::Itertools as _;
33use prost::Message as _;
34use tempfile::NamedTempFile;
35use thiserror::Error;
36
37use super::changed_path::ChangedPathIndexSegmentId;
38use super::changed_path::CompositeChangedPathIndex;
39use super::changed_path::collect_changed_paths;
40use super::composite::AsCompositeIndex as _;
41use super::composite::CommitIndexSegmentId;
42use super::entry::GlobalCommitPosition;
43use super::mutable::DefaultMutableIndex;
44use super::readonly::DefaultReadonlyIndex;
45use super::readonly::FieldLengths;
46use super::readonly::ReadonlyCommitIndexSegment;
47use super::readonly::ReadonlyIndexLoadError;
48use crate::backend::BackendError;
49use crate::backend::BackendInitError;
50use crate::backend::CommitId;
51use crate::commit::CommitByCommitterTimestamp;
52use crate::dag_walk_async;
53use crate::file_util;
54use crate::file_util::IoResultExt as _;
55use crate::file_util::PathError;
56use crate::file_util::persist_temp_file;
57use crate::index::IndexStore;
58use crate::index::IndexStoreError;
59use crate::index::IndexStoreResult;
60use crate::index::MutableIndex;
61use crate::index::ReadonlyIndex;
62use crate::object_id::ObjectId as _;
63use crate::op_store::OpStoreError;
64use crate::op_store::OperationId;
65use crate::op_walk;
66use crate::operation::Operation;
67use crate::store::Store;
68
/// File-name length used by `DefaultIndexStore::reinit()` to recognize files
/// placed directly in the store directory that should be deleted.
// NOTE(review): presumably the hex length of a 64-byte segment hash — confirm.
const SEGMENT_FILE_NAME_LENGTH: usize = 64 * 2;
71
/// Error returned when the index store directories cannot be initialized or
/// reinitialized. Wraps the failing path together with the I/O error.
#[derive(Debug, Error)]
#[error("Failed to initialize index store")]
pub struct DefaultIndexStoreInitError(#[from] pub PathError);
76
77impl From<DefaultIndexStoreInitError> for BackendInitError {
78 fn from(err: DefaultIndexStoreInitError) -> Self {
79 Self(err.into())
80 }
81}
82
/// Errors produced while loading, building, or saving the default index.
#[derive(Debug, Error)]
pub enum DefaultIndexStoreError {
    /// Writing the file that links operation `op_id` to its index segments
    /// failed.
    #[error("Failed to associate index files with an operation {op_id}")]
    AssociateIndex {
        op_id: OperationId,
        source: PathError,
    },
    /// The file linking an operation to its index segments could not be read
    /// or decoded.
    #[error("Failed to load associated index file names")]
    LoadAssociation(#[source] PathError),
    /// An index segment could not be loaded; displays as the inner error.
    #[error(transparent)]
    LoadIndex(ReadonlyIndexLoadError),
    /// Creating the base directories or writing a segment file failed.
    #[error("Failed to write index file")]
    SaveIndex(#[source] PathError),
    /// The backend reported an error while commits were being indexed for
    /// operation `op_id`.
    #[error("Failed to index commits at operation {op_id}")]
    IndexCommits {
        op_id: OperationId,
        source: BackendError,
    },
    /// An operation store error; displays as the inner error.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
104
/// Index store backed by files under a single directory.
///
/// The directory contains the `op_links`, `segments`, and `changed_paths`
/// subdirectories (see the corresponding `*_dir()` helpers).
#[derive(Debug)]
pub struct DefaultIndexStore {
    // Root directory of the store; all other paths are derived from it.
    dir: PathBuf,
}
109
110impl DefaultIndexStore {
/// Returns the name identifying this index store implementation.
pub fn name() -> &'static str {
    const STORE_NAME: &str = "default";
    STORE_NAME
}
114
115 pub fn init(dir: &Path) -> Result<Self, DefaultIndexStoreInitError> {
116 let store = Self {
117 dir: dir.to_owned(),
118 };
119 store.ensure_base_dirs()?;
120 Ok(store)
121 }
122
123 pub fn load(dir: &Path) -> Self {
124 Self {
125 dir: dir.to_owned(),
126 }
127 }
128
129 pub fn reinit(&self) -> Result<(), DefaultIndexStoreInitError> {
130 self.ensure_base_dirs()?;
132 file_util::remove_dir_contents(&self.op_links_dir())?;
134 let legacy_operations_dir = self.dir.join("operations"); if legacy_operations_dir.exists() {
136 file_util::remove_dir_contents(&legacy_operations_dir)?;
137 fs::remove_dir(&legacy_operations_dir).context(&legacy_operations_dir)?;
138 }
139 file_util::remove_dir_contents(&self.commit_segments_dir())?;
142 file_util::remove_dir_contents(&self.changed_path_segments_dir())?;
143 for entry in self.dir.read_dir().context(&self.dir)? {
145 let entry = entry.context(&self.dir)?;
146 let path = entry.path();
147 if path.file_name().unwrap().len() != SEGMENT_FILE_NAME_LENGTH {
148 continue;
150 }
151 fs::remove_file(&path).context(&path)?;
152 }
153 Ok(())
154 }
155
156 fn ensure_base_dirs(&self) -> Result<(), PathError> {
157 for dir in [
158 self.op_links_dir(),
159 self.commit_segments_dir(),
160 self.changed_path_segments_dir(),
161 ] {
162 file_util::create_or_reuse_dir(&dir).context(&dir)?;
163 }
164 Ok(())
165 }
166
167 fn op_links_dir(&self) -> PathBuf {
169 self.dir.join("op_links")
170 }
171
172 fn commit_segments_dir(&self) -> PathBuf {
174 self.dir.join("segments")
175 }
176
177 fn changed_path_segments_dir(&self) -> PathBuf {
179 self.dir.join("changed_paths")
180 }
181
182 fn load_index_at_operation(
183 &self,
184 op_id: &OperationId,
185 lengths: FieldLengths,
186 ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
187 let op_link_file = self.op_links_dir().join(op_id.hex());
188 let data = fs::read(&op_link_file)
189 .context(&op_link_file)
190 .map_err(DefaultIndexStoreError::LoadAssociation)?;
191 let proto = crate::protos::default_index::SegmentControl::decode(&*data)
192 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
193 .context(&op_link_file)
194 .map_err(DefaultIndexStoreError::LoadAssociation)?;
195 let commit_segment_id = CommitIndexSegmentId::new(proto.commit_segment_id);
196 let changed_path_start_commit_pos = proto
197 .changed_path_start_commit_pos
198 .map(GlobalCommitPosition);
199 let changed_path_segment_ids = proto
200 .changed_path_segment_ids
201 .into_iter()
202 .map(ChangedPathIndexSegmentId::new)
203 .collect_vec();
204
205 let commits = ReadonlyCommitIndexSegment::load(
206 &self.commit_segments_dir(),
207 commit_segment_id,
208 lengths,
209 )
210 .map_err(DefaultIndexStoreError::LoadIndex)?;
211 let changed_paths = if let Some(start_commit_pos) = changed_path_start_commit_pos {
213 CompositeChangedPathIndex::load(
214 &self.changed_path_segments_dir(),
215 start_commit_pos,
216 &changed_path_segment_ids,
217 )
218 .map_err(DefaultIndexStoreError::LoadIndex)?
219 } else {
220 CompositeChangedPathIndex::null()
221 };
222 Ok(DefaultReadonlyIndex::from_segment(commits, changed_paths))
223 }
224
/// Builds the commit index for `operation`, saves it, and associates it with
/// the operation.
///
/// Ancestor operations are scanned for one that already has a link file. If
/// one is found, its index is loaded and extended; otherwise a full index is
/// built. Only commits not already present in that parent index are fetched
/// from `store` and added.
///
/// # Errors
///
/// Returns `OpStore` if walking operations or reading views fails,
/// `IndexCommits` if a commit cannot be fetched or added, and the errors of
/// `load_index_at_operation` / `save_mutable_index` otherwise.
///
/// # Panics
///
/// Panics with "graph has cycle" if the commit graph walk reports a cycle.
#[tracing::instrument(skip(self, store))]
pub async fn build_index_at_operation(
    &self,
    operation: &Operation,
    store: &Arc<Store>,
) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
    tracing::info!("scanning operations to index");
    let op_links_dir = self.op_links_dir();
    let field_lengths = FieldLengths {
        commit_id: store.commit_id_length(),
        change_id: store.change_id_length(),
    };
    // Walk ancestors of `operation` until one that has a link file is found.
    // Operations visited before it have no associated index yet.
    let mut unindexed_ops = Vec::new();
    let mut parent_op = None;
    let mut ancestors = pin!(op_walk::walk_ancestors(slice::from_ref(operation)));
    while let Some(op) = ancestors.next().await {
        let op = op?;
        if op_links_dir.join(op.id().hex()).is_file() {
            parent_op = Some(op);
            break;
        } else {
            unindexed_ops.push(op);
        }
    }
    // With an indexed ancestor, the operations to visit are re-collected with
    // `walk_ancestors_range` rather than taken from the scan above.
    // NOTE(review): presumably this picks up operations on concurrent
    // branches that the early `break` did not reach — confirm in `op_walk`.
    let ops_to_visit = if let Some(op) = &parent_op {
        op_walk::walk_ancestors_range(slice::from_ref(operation), slice::from_ref(op))
            .try_collect()
            .await?
    } else {
        unindexed_ops
    };
    tracing::info!(
        ops_count = ops_to_visit.len(),
        "collecting head commits to index"
    );
    // Maps each referenced commit id to the first visited operation that
    // referenced it. The operation id is only used for error reporting.
    let mut historical_heads: HashMap<CommitId, OperationId> = HashMap::new();
    for op in &ops_to_visit {
        for commit_id in itertools::chain(
            op.all_referenced_commit_ids(),
            op.view().await?.all_referenced_commit_ids(),
        ) {
            // Checking first avoids cloning ids that are already recorded.
            if !historical_heads.contains_key(commit_id) {
                historical_heads.insert(commit_id.clone(), op.id().clone());
            }
        }
    }
    // Start either from an empty index or from the indexed ancestor's index.
    let mut mutable_index;
    let maybe_parent_index;
    match &parent_op {
        None => {
            mutable_index = DefaultMutableIndex::full(field_lengths);
            maybe_parent_index = None;
        }
        Some(op) => {
            let parent_index = self.load_index_at_operation(op.id(), field_lengths)?;
            mutable_index = parent_index.start_modification();
            maybe_parent_index = Some(parent_index);
        }
    }

    tracing::info!(
        ?maybe_parent_index,
        heads_count = historical_heads.len(),
        "indexing commits reachable from historical heads"
    );
    // Always false when there is no parent index.
    let parent_index_has_id = |id: &CommitId| {
        maybe_parent_index
            .as_ref()
            .is_some_and(|index| index.has_id_impl(id))
    };
    // Fetches a commit and pairs it with the operation id to blame on
    // failure.
    let get_commit_with_op = async |commit_id: &CommitId, op_id: &OperationId| {
        let op_id = op_id.clone();
        match store.get_commit_async(commit_id).await {
            Ok(commit) => Ok((CommitByCommitterTimestamp(commit), op_id)),
            Err(source) => Err(DefaultIndexStoreError::IndexCommits { op_id, source }),
        }
    };
    // Collect every commit reachable from the historical heads that is not in
    // the parent index. Start commits and parents are fetched with up to
    // `store.concurrency()` requests in flight; parents inherit the operation
    // id of the commit they were reached from.
    let commits = dag_walk_async::topo_order_reverse_ord(
        stream::iter(&historical_heads)
            .filter(|&(commit_id, _)| future::ready(!parent_index_has_id(commit_id)))
            .map(|(commit_id, op_id)| get_commit_with_op(commit_id, op_id))
            .buffered(store.concurrency())
            .collect::<Vec<_>>()
            .await,
        |(CommitByCommitterTimestamp(commit), _)| commit.id().clone(),
        async |(CommitByCommitterTimestamp(commit), op_id)| {
            stream::iter(commit.parent_ids())
                .filter(|&id| future::ready(!parent_index_has_id(id)))
                .map(|commit_id| get_commit_with_op(commit_id, op_id))
                .buffered(store.concurrency())
                .collect::<Vec<_>>()
                .await
        },
        |_| panic!("graph has cycle"),
    )
    .await?;
    // NOTE(review): the walk result is iterated in reverse, presumably so
    // that parents are added before their children — confirm against
    // `topo_order_reverse_ord`.
    for (CommitByCommitterTimestamp(commit), op_id) in commits.iter().rev() {
        mutable_index.add_commit(commit).await.map_err(|source| {
            DefaultIndexStoreError::IndexCommits {
                op_id: op_id.clone(),
                source,
            }
        })?;
    }

    let index = self.save_mutable_index(mutable_index, operation.id())?;
    tracing::info!(?index, commits_count = commits.len(), "saved new index");

    Ok(index)
}
347
/// Extends the changed-path index of the index associated with `op_id` by at
/// most `max_commits` commits, saves it, and re-associates the result with
/// the operation.
///
/// `progress_callback` is invoked before each commit is indexed and once more
/// after the new index has been associated. `current` starts at 0 and is
/// incremented after every invocation.
///
/// # Errors
///
/// Returns `SaveIndex` if directories or segments cannot be written,
/// `IndexCommits` if a commit cannot be fetched or its changed paths
/// collected, `AssociateIndex` if the link file cannot be written, and the
/// errors of `load_index_at_operation` otherwise.
///
/// # Panics
///
/// Panics if the existing changed-path index claims to cover positions
/// beyond the commit index, or if the next mutable position of the
/// changed-path index does not match the commit being indexed.
#[tracing::instrument(skip(self, store, progress_callback))]
pub async fn build_changed_path_index_at_operation(
    &self,
    op_id: &OperationId,
    store: &Arc<Store>,
    max_commits: u32,
    mut progress_callback: impl FnMut(&DefaultChangedPathIndexProgress),
) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
    self.ensure_base_dirs()
        .map_err(DefaultIndexStoreError::SaveIndex)?;
    let field_lengths = FieldLengths {
        commit_id: store.commit_id_length(),
        change_id: store.change_id_length(),
    };
    let index = self.load_index_at_operation(op_id, field_lengths)?;
    let old_changed_paths = index.changed_paths();

    // Two half-open position ranges are indexed around the already covered
    // range: `pre_start..pre_end` before it and `post_start..post_end` after.
    let pre_start;
    let pre_end;
    let post_start;
    let post_end;
    if let Some(GlobalCommitPosition(pos)) = old_changed_paths.start_commit_pos() {
        // Positions after the covered range are served first, capped by
        // `max_commits` and the size of the commit index.
        post_start = pos + old_changed_paths.num_commits();
        assert!(post_start <= index.num_commits());
        post_end = u32::saturating_add(post_start, max_commits).min(index.num_commits());
        // Whatever budget is left extends the range backwards, stopping at
        // position 0.
        pre_start = u32::saturating_sub(pos, max_commits - (post_end - post_start));
        pre_end = pos;
    } else {
        // Nothing is covered yet: index the last `max_commits` positions.
        pre_start = u32::saturating_sub(index.num_commits(), max_commits);
        pre_end = index.num_commits();
        post_start = pre_end;
        post_end = pre_end;
    }

    let mut progress = DefaultChangedPathIndexProgress {
        current: 0,
        total: (pre_end - pre_start) + (post_end - post_start),
    };
    // Reports the current state first, then advances the counter.
    let mut emit_progress = || {
        progress_callback(&progress);
        progress.current += 1;
    };

    let to_index_err = |source| DefaultIndexStoreError::IndexCommits {
        op_id: op_id.clone(),
        source,
    };
    // Collects the changed paths of the commit at `pos` and appends them to
    // `changed_paths`, which must be expecting exactly that position next.
    let index_commit = async |changed_paths: &mut CompositeChangedPathIndex,
                              pos: GlobalCommitPosition| {
        assert_eq!(changed_paths.next_mutable_commit_pos(), Some(pos));
        let commit_id = index.as_composite().commits().entry_by_pos(pos).commit_id();
        let commit = store.get_commit_async(&commit_id).await?;
        let paths = collect_changed_paths(&index, &commit).await?;
        changed_paths.add_changed_paths(paths);
        Ok(())
    };

    // Index the leading range into a fresh changed-path index and save it.
    let mut new_changed_paths =
        CompositeChangedPathIndex::empty(GlobalCommitPosition(pre_start));
    new_changed_paths.make_mutable();
    tracing::info!(?pre_start, ?pre_end, "indexing changed paths in commits");
    for pos in (pre_start..pre_end).map(GlobalCommitPosition) {
        emit_progress();
        index_commit(&mut new_changed_paths, pos)
            .await
            .map_err(to_index_err)?;
    }
    new_changed_paths
        .save_in(&self.changed_path_segments_dir())
        .map_err(DefaultIndexStoreError::SaveIndex)?;

    // Attach the previously existing segments after the leading range.
    new_changed_paths.append_segments(old_changed_paths);

    // Index the trailing range on top, then squash and save.
    new_changed_paths.make_mutable();
    tracing::info!(?post_start, ?post_end, "indexing changed paths in commits");
    for pos in (post_start..post_end).map(GlobalCommitPosition) {
        emit_progress();
        index_commit(&mut new_changed_paths, pos)
            .await
            .map_err(to_index_err)?;
    }
    new_changed_paths.maybe_squash_with_ancestors();
    new_changed_paths
        .save_in(&self.changed_path_segments_dir())
        .map_err(DefaultIndexStoreError::SaveIndex)?;

    // The commit segment is reused as is; only the changed-path part changes.
    let commits = index.readonly_commits().clone();
    let index = DefaultReadonlyIndex::from_segment(commits, new_changed_paths);
    self.associate_index_with_operation(&index, op_id)
        .map_err(|source| DefaultIndexStoreError::AssociateIndex {
            op_id: op_id.to_owned(),
            source,
        })?;
    // Final report: at this point `current` equals `total`.
    emit_progress();
    Ok(index)
}
456
457 fn save_mutable_index(
458 &self,
459 index: DefaultMutableIndex,
460 op_id: &OperationId,
461 ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
462 self.ensure_base_dirs()
464 .map_err(DefaultIndexStoreError::SaveIndex)?;
465 let (commits, mut changed_paths) = index.into_segment();
466 let commits = commits
467 .maybe_squash_with_ancestors()
468 .save_in(&self.commit_segments_dir())
469 .map_err(DefaultIndexStoreError::SaveIndex)?;
470 changed_paths.maybe_squash_with_ancestors();
471 changed_paths
472 .save_in(&self.changed_path_segments_dir())
473 .map_err(DefaultIndexStoreError::SaveIndex)?;
474 let index = DefaultReadonlyIndex::from_segment(commits, changed_paths);
475 self.associate_index_with_operation(&index, op_id)
476 .map_err(|source| DefaultIndexStoreError::AssociateIndex {
477 op_id: op_id.to_owned(),
478 source,
479 })?;
480 Ok(index)
481 }
482
483 fn associate_index_with_operation(
485 &self,
486 index: &DefaultReadonlyIndex,
487 op_id: &OperationId,
488 ) -> Result<(), PathError> {
489 let proto = crate::protos::default_index::SegmentControl {
490 commit_segment_id: index.readonly_commits().id().to_bytes(),
491 changed_path_start_commit_pos: index
492 .changed_paths()
493 .start_commit_pos()
494 .map(|GlobalCommitPosition(start)| start),
495 changed_path_segment_ids: index
496 .changed_paths()
497 .readonly_segments()
498 .iter()
499 .map(|segment| segment.id().to_bytes())
500 .collect(),
501 };
502 let dir = self.op_links_dir();
503 let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
504 let file = temp_file.as_file_mut();
505 file.write_all(&proto.encode_to_vec())
506 .context(temp_file.path())?;
507 let path = dir.join(op_id.hex());
508 persist_temp_file(temp_file, &path).context(&path)?;
509 Ok(())
510 }
511}
512
513#[async_trait(?Send)]
514impl IndexStore for DefaultIndexStore {
515 fn name(&self) -> &str {
516 Self::name()
517 }
518
519 async fn get_index_at_op(
520 &self,
521 op: &Operation,
522 store: &Arc<Store>,
523 ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
524 let field_lengths = FieldLengths {
525 commit_id: store.commit_id_length(),
526 change_id: store.change_id_length(),
527 };
528 let index = match self.load_index_at_operation(op.id(), field_lengths) {
529 Err(DefaultIndexStoreError::LoadAssociation(PathError { source: error, .. }))
530 if error.kind() == io::ErrorKind::NotFound =>
531 {
532 self.build_index_at_operation(op, store).await
533 }
534 Err(DefaultIndexStoreError::LoadIndex(err)) if err.is_corrupt_or_not_found() => {
535 match &err {
538 ReadonlyIndexLoadError::UnexpectedVersion {
539 kind,
540 found_version,
541 expected_version,
542 } => {
543 eprintln!(
544 "Found {kind} index format version {found_version}, expected version \
545 {expected_version}. Reindexing..."
546 );
547 }
548 ReadonlyIndexLoadError::Other { error, .. } => {
549 eprintln!("{err} (maybe the format has changed): {error}. Reindexing...");
550 }
551 }
552 self.reinit()
553 .map_err(|err| IndexStoreError::Read(err.into()))?;
554 self.build_index_at_operation(op, store).await
555 }
556 result => result,
557 }
558 .map_err(|err| IndexStoreError::Read(err.into()))?;
559 Ok(Box::new(index))
560 }
561
562 fn write_index(
563 &self,
564 index: Box<dyn MutableIndex>,
565 op: &Operation,
566 ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
567 let index: Box<DefaultMutableIndex> = index
568 .downcast()
569 .expect("index to merge in must be a DefaultMutableIndex");
570 let index = self
571 .save_mutable_index(*index, op.id())
572 .map_err(|err| IndexStoreError::Write(err.into()))?;
573 Ok(Box::new(index))
574 }
575}
576
/// Progress snapshot passed to the callback of
/// `DefaultIndexStore::build_changed_path_index_at_operation()`.
#[derive(Clone, Debug)]
pub struct DefaultChangedPathIndexProgress {
    /// Number of times the callback had been invoked before this call.
    pub current: u32,
    /// Number of commits that will be indexed in this run.
    pub total: u32,
}