1#![expect(missing_docs)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::fs;
20use std::io;
21use std::io::Write as _;
22use std::path::Path;
23use std::path::PathBuf;
24use std::slice;
25use std::sync::Arc;
26
27use itertools::Itertools as _;
28use pollster::FutureExt as _;
29use prost::Message as _;
30use tempfile::NamedTempFile;
31use thiserror::Error;
32
33use super::changed_path::ChangedPathIndexSegmentId;
34use super::changed_path::CompositeChangedPathIndex;
35use super::changed_path::collect_changed_paths;
36use super::composite::AsCompositeIndex as _;
37use super::composite::CommitIndexSegmentId;
38use super::entry::GlobalCommitPosition;
39use super::mutable::DefaultMutableIndex;
40use super::readonly::DefaultReadonlyIndex;
41use super::readonly::FieldLengths;
42use super::readonly::ReadonlyCommitIndexSegment;
43use super::readonly::ReadonlyIndexLoadError;
44use crate::backend::BackendError;
45use crate::backend::BackendInitError;
46use crate::backend::CommitId;
47use crate::commit::CommitByCommitterTimestamp;
48use crate::dag_walk;
49use crate::file_util;
50use crate::file_util::IoResultExt as _;
51use crate::file_util::PathError;
52use crate::file_util::persist_temp_file;
53use crate::index::Index as _;
54use crate::index::IndexReadError;
55use crate::index::IndexStore;
56use crate::index::IndexWriteError;
57use crate::index::MutableIndex;
58use crate::index::ReadonlyIndex;
59use crate::object_id::ObjectId as _;
60use crate::op_store::OpStoreError;
61use crate::op_store::OperationId;
62use crate::op_walk;
63use crate::operation::Operation;
64use crate::store::Store;
65
/// Number of bytes in the name of a segment file: 64 bytes rendered as hex,
/// i.e. two characters per byte (presumably the segment id hash — confirm
/// against the segment id type).
const SEGMENT_FILE_NAME_LENGTH: usize = 2 * 64;
68
69#[derive(Debug, Error)]
71#[error("Failed to initialize index store")]
72pub struct DefaultIndexStoreInitError(#[from] pub PathError);
73
74impl From<DefaultIndexStoreInitError> for BackendInitError {
75 fn from(err: DefaultIndexStoreInitError) -> Self {
76 Self(err.into())
77 }
78}
79
80#[derive(Debug, Error)]
81pub enum DefaultIndexStoreError {
82 #[error("Failed to associate index files with an operation {op_id}")]
83 AssociateIndex {
84 op_id: OperationId,
85 source: PathError,
86 },
87 #[error("Failed to load associated index file names")]
88 LoadAssociation(#[source] PathError),
89 #[error(transparent)]
90 LoadIndex(ReadonlyIndexLoadError),
91 #[error("Failed to write index file")]
92 SaveIndex(#[source] PathError),
93 #[error("Failed to index commits at operation {op_id}")]
94 IndexCommits {
95 op_id: OperationId,
96 source: BackendError,
97 },
98 #[error(transparent)]
99 OpStore(#[from] OpStoreError),
100}
101
/// Index store that keeps index segments and the operation-to-segment
/// associations as files below a single root directory.
#[derive(Debug)]
pub struct DefaultIndexStore {
    /// Root directory; every store subdirectory is joined onto this path.
    dir: PathBuf,
}
106
impl DefaultIndexStore {
    /// Returns the name identifying this index store implementation.
    pub fn name() -> &'static str {
        "default"
    }

    /// Creates a store rooted at `dir`, creating the base subdirectories
    /// (`op_links`, `operations`, `segments`, `changed_paths`) if needed.
    ///
    /// # Errors
    ///
    /// Returns an error if any base directory cannot be created.
    pub fn init(dir: &Path) -> Result<Self, DefaultIndexStoreInitError> {
        let store = Self {
            dir: dir.to_owned(),
        };
        store.ensure_base_dirs()?;
        Ok(store)
    }

    /// Opens a store rooted at `dir` without touching the filesystem.
    pub fn load(dir: &Path) -> Self {
        Self {
            dir: dir.to_owned(),
        }
    }

    /// Discards all existing index data so that indexes get rebuilt.
    ///
    /// Ensures the base directories exist. Empties the operation-link,
    /// legacy-operation, commit-segment and changed-path-segment directories.
    /// Then deletes top-level files whose names are exactly
    /// `SEGMENT_FILE_NAME_LENGTH` bytes long.
    ///
    /// # Errors
    ///
    /// Returns an error if a directory cannot be created, read or emptied, or
    /// if a file cannot be removed.
    pub fn reinit(&self) -> Result<(), DefaultIndexStoreInitError> {
        // Create the base directories in case some of them are missing.
        self.ensure_base_dirs()?;
        // Remove all operation links. Without them no operation is considered
        // indexed, so the next lookup rebuilds the index.
        file_util::remove_dir_contents(&self.op_links_dir())?;
        file_util::remove_dir_contents(&self.legacy_operations_dir())?;
        // Remove the segments too. The links that referenced them are gone.
        file_util::remove_dir_contents(&self.commit_segments_dir())?;
        file_util::remove_dir_contents(&self.changed_path_segments_dir())?;
        // NOTE(review): presumably older layouts stored segment files directly
        // in the top directory. They are recognized by their name length.
        for entry in self.dir.read_dir().context(&self.dir)? {
            let entry = entry.context(&self.dir)?;
            // `DirEntry::path` always ends with the entry's file name, so the
            // `unwrap()` below cannot fail.
            let path = entry.path();
            if path.file_name().unwrap().len() != SEGMENT_FILE_NAME_LENGTH {
                // Not a segment file (e.g. one of the base subdirectories).
                continue;
            }
            fs::remove_file(&path).context(&path)?;
        }
        Ok(())
    }

    /// Creates the base subdirectories if they don't exist yet.
    fn ensure_base_dirs(&self) -> Result<(), PathError> {
        for dir in [
            self.op_links_dir(),
            self.legacy_operations_dir(),
            self.commit_segments_dir(),
            self.changed_path_segments_dir(),
        ] {
            file_util::create_or_reuse_dir(&dir).context(&dir)?;
        }
        Ok(())
    }

    /// Directory of per-operation files holding a `SegmentControl` protobuf.
    fn op_links_dir(&self) -> PathBuf {
        self.dir.join("op_links")
    }

    /// Legacy directory of per-operation files holding only the hex commit
    /// segment id.
    fn legacy_operations_dir(&self) -> PathBuf {
        self.dir.join("operations")
    }

    /// Directory containing the commit index segment files.
    fn commit_segments_dir(&self) -> PathBuf {
        self.dir.join("segments")
    }

    /// Directory containing the changed-path index segment files.
    fn changed_path_segments_dir(&self) -> PathBuf {
        self.dir.join("changed_paths")
    }

    /// Loads the index previously associated with `op_id`.
    ///
    /// The association is read from `op_links/<op_id>` (a `SegmentControl`
    /// protobuf). If that file doesn't exist, the legacy
    /// `operations/<op_id>` file (hex commit segment id) is used instead. In
    /// that case no changed-path index is loaded.
    ///
    /// # Errors
    ///
    /// Returns [`DefaultIndexStoreError::LoadAssociation`] if the association
    /// file cannot be read or decoded. A missing legacy file surfaces here as
    /// an `io::ErrorKind::NotFound` source. Returns
    /// [`DefaultIndexStoreError::LoadIndex`] if a segment file cannot be
    /// loaded.
    fn load_index_at_operation(
        &self,
        op_id: &OperationId,
        lengths: FieldLengths,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        // Assigned exactly once by one of the match arms below.
        let commit_segment_id;
        let changed_path_start_commit_pos;
        let changed_path_segment_ids;
        let op_link_file = self.op_links_dir().join(op_id.hex());
        match fs::read(&op_link_file).context(&op_link_file) {
            Ok(data) => {
                // A decode failure is reported as an `InvalidData` I/O error on
                // the link file.
                let proto = crate::protos::default_index::SegmentControl::decode(&*data)
                    .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
                    .context(&op_link_file)
                    .map_err(DefaultIndexStoreError::LoadAssociation)?;
                commit_segment_id = CommitIndexSegmentId::new(proto.commit_segment_id);
                changed_path_start_commit_pos = proto
                    .changed_path_start_commit_pos
                    .map(GlobalCommitPosition);
                changed_path_segment_ids = proto
                    .changed_path_segment_ids
                    .into_iter()
                    .map(ChangedPathIndexSegmentId::new)
                    .collect_vec();
            }
            Err(PathError { source: error, .. }) if error.kind() == io::ErrorKind::NotFound => {
                // Fall back to the legacy association file. It records only the
                // commit segment id, written as hex by
                // `associate_index_with_operation`.
                let op_id_file = self.legacy_operations_dir().join(op_id.hex());
                let data = fs::read(&op_id_file)
                    .context(&op_id_file)
                    .map_err(DefaultIndexStoreError::LoadAssociation)?;
                commit_segment_id = CommitIndexSegmentId::try_from_hex(&data)
                    .ok_or_else(|| {
                        io::Error::new(io::ErrorKind::InvalidData, "file name is not valid hex")
                    })
                    .context(&op_id_file)
                    .map_err(DefaultIndexStoreError::LoadAssociation)?;
                changed_path_start_commit_pos = None;
                changed_path_segment_ids = vec![];
            }
            Err(err) => return Err(DefaultIndexStoreError::LoadAssociation(err)),
        };

        let commits = ReadonlyCommitIndexSegment::load(
            &self.commit_segments_dir(),
            commit_segment_id,
            lengths,
        )
        .map_err(DefaultIndexStoreError::LoadIndex)?;
        let changed_paths = if let Some(start_commit_pos) = changed_path_start_commit_pos {
            CompositeChangedPathIndex::load(
                &self.changed_path_segments_dir(),
                start_commit_pos,
                &changed_path_segment_ids,
            )
            .map_err(DefaultIndexStoreError::LoadIndex)?
        } else {
            // No changed-path index is recorded for this operation.
            CompositeChangedPathIndex::null()
        };
        Ok(DefaultReadonlyIndex::from_segment(commits, changed_paths))
    }

    /// Builds and saves the index for `operation`.
    ///
    /// It starts from the first already-indexed ancestor operation found while
    /// walking the operation log, or from an empty index if there is none.
    /// Every commit referenced by the visited operations (and their views)
    /// that is not in the parent index is added. Their ancestors that are
    /// also missing from the parent index are added too. The result is saved
    /// and associated with `operation`.
    ///
    /// # Errors
    ///
    /// Returns an error in these cases:
    /// - walking the operation log or reading a view fails;
    /// - a commit cannot be read (reported with the operation that referenced
    ///   its head);
    /// - loading or saving index files fails.
    ///
    /// # Panics
    ///
    /// Panics if the commit graph contains a cycle.
    #[tracing::instrument(skip(self, store))]
    pub async fn build_index_at_operation(
        &self,
        operation: &Operation,
        store: &Arc<Store>,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        tracing::info!("scanning operations to index");
        let op_links_dir = self.op_links_dir();
        let legacy_operations_dir = self.legacy_operations_dir();
        let field_lengths = FieldLengths {
            commit_id: store.commit_id_length(),
            change_id: store.change_id_length(),
        };
        // The parent is the first ancestor operation, in walk order, that
        // already has an association file.
        let mut unindexed_ops = Vec::new();
        let mut parent_op = None;
        for op in op_walk::walk_ancestors(slice::from_ref(operation)) {
            let op = op?;
            if op_links_dir.join(op.id().hex()).is_file()
                || legacy_operations_dir.join(op.id().hex()).is_file()
            {
                parent_op = Some(op);
                break;
            } else {
                unindexed_ops.push(op);
            }
        }
        let ops_to_visit = if let Some(op) = &parent_op {
            // Re-walk every operation between `operation` and the parent.
            // NOTE(review): presumably because the scan above can stop before
            // reaching unindexed operations on other (concurrent) branches.
            op_walk::walk_ancestors_range(slice::from_ref(operation), slice::from_ref(op))
                .try_collect()?
        } else {
            unindexed_ops
        };
        tracing::info!(
            ops_count = ops_to_visit.len(),
            "collecting head commits to index"
        );
        // Map each referenced commit to the first visited operation that
        // references it. The operation id is only used for error reporting.
        let mut historical_heads: HashMap<CommitId, OperationId> = HashMap::new();
        for op in &ops_to_visit {
            for commit_id in itertools::chain(
                op.all_referenced_commit_ids(),
                op.view()?.all_referenced_commit_ids(),
            ) {
                if !historical_heads.contains_key(commit_id) {
                    historical_heads.insert(commit_id.clone(), op.id().clone());
                }
            }
        }
        // Extend the parent's index if there is one; otherwise start a new
        // full index.
        let mut mutable_index;
        let maybe_parent_index;
        match &parent_op {
            None => {
                mutable_index = DefaultMutableIndex::full(field_lengths);
                maybe_parent_index = None;
            }
            Some(op) => {
                let parent_index = self.load_index_at_operation(op.id(), field_lengths)?;
                mutable_index = parent_index.start_modification();
                maybe_parent_index = Some(parent_index);
            }
        }

        tracing::info!(
            ?maybe_parent_index,
            heads_count = historical_heads.len(),
            "indexing commits reachable from historical heads"
        );
        // Commits already in the parent index are never visited again.
        let parent_index_has_id = |id: &CommitId| {
            maybe_parent_index
                .as_ref()
                .is_some_and(|index| index.has_id(id))
        };
        let get_commit_with_op = |commit_id: &CommitId, op_id: &OperationId| {
            let op_id = op_id.clone();
            match store.get_commit(commit_id) {
                // Carry the head's operation id along so that a failure can be
                // attributed to the operation that referenced the commit.
                Ok(commit) => Ok((CommitByCommitterTimestamp(commit), op_id)),
                Err(source) => Err(DefaultIndexStoreError::IndexCommits { op_id, source }),
            }
        };
        // Some visited operations may not store commit predecessors. In that
        // case, also index the immediate predecessors of every commit that is
        // reachable via parents from the heads and not in the parent index.
        // Commits that fail to load are still recorded, but their parents are
        // not followed (best effort).
        let commits_to_keep_immediate_predecessors = if ops_to_visit
            .iter()
            .any(|op| !op.stores_commit_predecessors())
        {
            let mut ancestors = HashSet::new();
            let mut work = historical_heads.keys().cloned().collect_vec();
            while let Some(commit_id) = work.pop() {
                if ancestors.contains(&commit_id) || parent_index_has_id(&commit_id) {
                    continue;
                }
                if let Ok(commit) = store.get_commit(&commit_id) {
                    work.extend(commit.parent_ids().iter().cloned());
                }
                ancestors.insert(commit_id);
            }
            ancestors
        } else {
            HashSet::new()
        };
        // Order the new commits topologically. Each commit expands to the
        // parents (plus predecessors, when selected above) that are not yet in
        // the parent index.
        let commits = dag_walk::topo_order_reverse_ord_ok(
            historical_heads
                .iter()
                .filter(|&(commit_id, _)| !parent_index_has_id(commit_id))
                .map(|(commit_id, op_id)| get_commit_with_op(commit_id, op_id)),
            |(CommitByCommitterTimestamp(commit), _)| commit.id().clone(),
            |(CommitByCommitterTimestamp(commit), op_id)| {
                let keep_predecessors =
                    commits_to_keep_immediate_predecessors.contains(commit.id());
                itertools::chain(
                    commit.parent_ids(),
                    keep_predecessors
                        .then_some(&commit.store_commit().predecessors)
                        .into_iter()
                        .flatten(),
                )
                .filter(|&id| !parent_index_has_id(id))
                .map(|commit_id| get_commit_with_op(commit_id, op_id))
                .collect_vec()
            },
            |_| panic!("graph has cycle"),
        )?;
        // `commits` is in reverse topological order, going by the name of
        // `topo_order_reverse_ord_ok`. Iterate it backwards so that
        // (presumably) ancestors are added before their descendants.
        for (CommitByCommitterTimestamp(commit), op_id) in commits.iter().rev() {
            mutable_index.add_commit(commit).await.map_err(|source| {
                DefaultIndexStoreError::IndexCommits {
                    op_id: op_id.clone(),
                    source,
                }
            })?;
        }

        let index = self.save_mutable_index(mutable_index, operation.id())?;
        tracing::info!(?index, commits_count = commits.len(), "saved new index");

        Ok(index)
    }

    /// Extends the changed-path index associated with `op_id` by up to
    /// `max_commits` commits, then saves it and re-associates the result with
    /// `op_id`.
    ///
    /// If a changed-path index already exists, the commits right after its
    /// range are indexed (up to `max_commits`). Any unused budget is spent on
    /// the commits right before its start. Otherwise the last `max_commits`
    /// commits of the commit index are indexed.
    ///
    /// # Errors
    ///
    /// Returns an error if the index cannot be loaded, a commit or its changed
    /// paths cannot be read, or index files cannot be written.
    ///
    /// # Panics
    ///
    /// Panics if the existing changed-path index extends past the end of the
    /// commit index.
    #[tracing::instrument(skip(self, store))]
    pub async fn build_changed_path_index_at_operation(
        &self,
        op_id: &OperationId,
        store: &Arc<Store>,
        max_commits: u32,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        // Make sure the segment directories exist before writing into them.
        self.ensure_base_dirs()
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        let field_lengths = FieldLengths {
            commit_id: store.commit_id_length(),
            change_id: store.change_id_length(),
        };
        let index = self.load_index_at_operation(op_id, field_lengths)?;
        let old_changed_paths = index.changed_paths();

        // Compute two commit-position ranges: `pre_start..pre_end` just before
        // the existing changed-path index, and `post_start..post_end` just
        // after it. Both are indexed in ascending position order.
        let pre_start;
        let pre_end;
        let post_start;
        let post_end;
        if let Some(GlobalCommitPosition(pos)) = old_changed_paths.start_commit_pos() {
            post_start = pos + old_changed_paths.num_commits();
            assert!(post_start <= index.num_commits());
            post_end = u32::saturating_add(post_start, max_commits).min(index.num_commits());
            // Spend whatever budget the post range did not use on older
            // commits.
            pre_start = u32::saturating_sub(pos, max_commits - (post_end - post_start));
            pre_end = pos;
        } else {
            pre_start = u32::saturating_sub(index.num_commits(), max_commits);
            pre_end = index.num_commits();
            post_start = pre_end;
            post_end = pre_end;
        }

        let to_index_err = |source| DefaultIndexStoreError::IndexCommits {
            op_id: op_id.clone(),
            source,
        };
        // Records the changed paths of the commit at `pos`. `pos` must be the
        // next position `changed_paths` expects.
        let index_commit = async |changed_paths: &mut CompositeChangedPathIndex,
                                  pos: GlobalCommitPosition| {
            assert_eq!(changed_paths.next_mutable_commit_pos(), Some(pos));
            let commit_id = index.as_composite().commits().entry_by_pos(pos).commit_id();
            let commit = store.get_commit_async(&commit_id).await?;
            let paths = collect_changed_paths(&index, &commit).await?;
            changed_paths.add_changed_paths(paths);
            Ok(())
        };

        // Start a new composite index at `pre_start` and index the pre range
        // into it.
        let mut new_changed_paths =
            CompositeChangedPathIndex::empty(GlobalCommitPosition(pre_start));
        new_changed_paths.make_mutable();
        tracing::info!(?pre_start, ?pre_end, "indexing changed paths in commits");
        for pos in (pre_start..pre_end).map(GlobalCommitPosition) {
            index_commit(&mut new_changed_paths, pos)
                .await
                .map_err(to_index_err)?;
        }
        new_changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;

        // Append the previously existing segments after the pre range.
        new_changed_paths.append_segments(old_changed_paths);

        // Make the index mutable again and index the post range.
        new_changed_paths.make_mutable();
        tracing::info!(?post_start, ?post_end, "indexing changed paths in commits");
        for pos in (post_start..post_end).map(GlobalCommitPosition) {
            index_commit(&mut new_changed_paths, pos)
                .await
                .map_err(to_index_err)?;
        }
        new_changed_paths.maybe_squash_with_ancestors();
        new_changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;

        // The commit index is reused unchanged. Only the changed-path index
        // is new.
        let commits = index.readonly_commits().clone();
        let index = DefaultReadonlyIndex::from_segment(commits, new_changed_paths);
        self.associate_index_with_operation(&index, op_id)
            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
                op_id: op_id.to_owned(),
                source,
            })?;
        Ok(index)
    }

    /// Saves the segments of `index` and associates the result with `op_id`.
    ///
    /// Segments may be squashed with their ancestors before they are written.
    ///
    /// # Errors
    ///
    /// Returns [`DefaultIndexStoreError::SaveIndex`] if a directory or segment
    /// file cannot be written. Returns
    /// [`DefaultIndexStoreError::AssociateIndex`] if the association files
    /// cannot be written.
    fn save_mutable_index(
        &self,
        index: DefaultMutableIndex,
        op_id: &OperationId,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        self.ensure_base_dirs()
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        let (commits, mut changed_paths) = index.into_segment();
        let commits = commits
            .maybe_squash_with_ancestors()
            .save_in(&self.commit_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        changed_paths.maybe_squash_with_ancestors();
        changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        let index = DefaultReadonlyIndex::from_segment(commits, changed_paths);
        self.associate_index_with_operation(&index, op_id)
            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
                op_id: op_id.to_owned(),
                source,
            })?;
        Ok(index)
    }

    /// Records which segments make up `index` for the operation `op_id`.
    ///
    /// Writes a `SegmentControl` protobuf to `op_links/<op_id>` and the hex
    /// commit segment id to the legacy `operations/<op_id>` file. Each file is
    /// first written to a temporary file in its target directory and then
    /// persisted to its final path.
    fn associate_index_with_operation(
        &self,
        index: &DefaultReadonlyIndex,
        op_id: &OperationId,
    ) -> Result<(), PathError> {
        let proto = crate::protos::default_index::SegmentControl {
            commit_segment_id: index.readonly_commits().id().to_bytes(),
            changed_path_start_commit_pos: index
                .changed_paths()
                .start_commit_pos()
                .map(|GlobalCommitPosition(start)| start),
            changed_path_segment_ids: index
                .changed_paths()
                .readonly_segments()
                .iter()
                .map(|segment| segment.id().to_bytes())
                .collect(),
        };
        let dir = self.op_links_dir();
        let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
        let file = temp_file.as_file_mut();
        file.write_all(&proto.encode_to_vec())
            .context(temp_file.path())?;
        let path = dir.join(op_id.hex());
        persist_temp_file(temp_file, &path).context(&path)?;

        // NOTE(review): the legacy file is presumably still written so that
        // readers that only know the old layout can find the commit segment.
        let dir = self.legacy_operations_dir();
        let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
        let file = temp_file.as_file_mut();
        file.write_all(index.readonly_commits().id().hex().as_bytes())
            .context(temp_file.path())?;
        let path = dir.join(op_id.hex());
        persist_temp_file(temp_file, &path).context(&path)?;
        Ok(())
    }
}
555
556impl IndexStore for DefaultIndexStore {
557 fn name(&self) -> &str {
558 Self::name()
559 }
560
561 fn get_index_at_op(
562 &self,
563 op: &Operation,
564 store: &Arc<Store>,
565 ) -> Result<Box<dyn ReadonlyIndex>, IndexReadError> {
566 let field_lengths = FieldLengths {
567 commit_id: store.commit_id_length(),
568 change_id: store.change_id_length(),
569 };
570 let index = match self.load_index_at_operation(op.id(), field_lengths) {
571 Err(DefaultIndexStoreError::LoadAssociation(PathError { source: error, .. }))
572 if error.kind() == io::ErrorKind::NotFound =>
573 {
574 self.build_index_at_operation(op, store).block_on()
575 }
576 Err(DefaultIndexStoreError::LoadIndex(err)) if err.is_corrupt_or_not_found() => {
577 match &err {
580 ReadonlyIndexLoadError::UnexpectedVersion {
581 kind,
582 found_version,
583 expected_version,
584 } => {
585 eprintln!(
586 "Found {kind} index format version {found_version}, expected version \
587 {expected_version}. Reindexing..."
588 );
589 }
590 ReadonlyIndexLoadError::Other { error, .. } => {
591 eprintln!("{err} (maybe the format has changed): {error}. Reindexing...");
592 }
593 }
594 self.reinit().map_err(|err| IndexReadError(err.into()))?;
595 self.build_index_at_operation(op, store).block_on()
596 }
597 result => result,
598 }
599 .map_err(|err| IndexReadError(err.into()))?;
600 Ok(Box::new(index))
601 }
602
603 fn write_index(
604 &self,
605 index: Box<dyn MutableIndex>,
606 op: &Operation,
607 ) -> Result<Box<dyn ReadonlyIndex>, IndexWriteError> {
608 let index: Box<DefaultMutableIndex> = index
609 .downcast()
610 .expect("index to merge in must be a DefaultMutableIndex");
611 let index = self
612 .save_mutable_index(*index, op.id())
613 .map_err(|err| IndexWriteError(err.into()))?;
614 Ok(Box::new(index))
615 }
616}