Skip to main content

jj_lib/default_index/
store.rs

1// Copyright 2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![expect(missing_docs)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::fs;
20use std::io;
21use std::io::Write as _;
22use std::path::Path;
23use std::path::PathBuf;
24use std::pin::pin;
25use std::slice;
26use std::sync::Arc;
27
28use async_trait::async_trait;
29use futures::StreamExt as _;
30use futures::TryStreamExt as _;
31use itertools::Itertools as _;
32use prost::Message as _;
33use tempfile::NamedTempFile;
34use thiserror::Error;
35
36use super::changed_path::ChangedPathIndexSegmentId;
37use super::changed_path::CompositeChangedPathIndex;
38use super::changed_path::collect_changed_paths;
39use super::composite::AsCompositeIndex as _;
40use super::composite::CommitIndexSegmentId;
41use super::entry::GlobalCommitPosition;
42use super::mutable::DefaultMutableIndex;
43use super::readonly::DefaultReadonlyIndex;
44use super::readonly::FieldLengths;
45use super::readonly::ReadonlyCommitIndexSegment;
46use super::readonly::ReadonlyIndexLoadError;
47use crate::backend::BackendError;
48use crate::backend::BackendInitError;
49use crate::backend::CommitId;
50use crate::commit::CommitByCommitterTimestamp;
51use crate::dag_walk_async;
52use crate::file_util;
53use crate::file_util::IoResultExt as _;
54use crate::file_util::PathError;
55use crate::file_util::persist_temp_file;
56use crate::index::IndexStore;
57use crate::index::IndexStoreError;
58use crate::index::IndexStoreResult;
59use crate::index::MutableIndex;
60use crate::index::ReadonlyIndex;
61use crate::object_id::ObjectId as _;
62use crate::op_store::OpStoreError;
63use crate::op_store::OperationId;
64use crate::op_walk;
65use crate::operation::Operation;
66use crate::store::Store;
67
/// Length of a segment file name: a BLAKE2b-512 digest rendered as a hex
/// string. `reinit()` uses it to recognize legacy segment files by name length.
const SEGMENT_FILE_NAME_LENGTH: usize = {
    const DIGEST_BYTES: usize = 64;
    const HEX_CHARS_PER_BYTE: usize = 2;
    DIGEST_BYTES * HEX_CHARS_PER_BYTE
};
70
/// Error that may occur during `DefaultIndexStore` initialization.
///
/// Wraps the [`PathError`] raised while preparing or cleaning up the store's
/// directories (see `DefaultIndexStore::init()` and
/// `DefaultIndexStore::reinit()`).
#[derive(Debug, Error)]
#[error("Failed to initialize index store")]
pub struct DefaultIndexStoreInitError(#[from] pub PathError);
75
76impl From<DefaultIndexStoreInitError> for BackendInitError {
77    fn from(err: DefaultIndexStoreInitError) -> Self {
78        Self(err.into())
79    }
80}
81
/// Error that may occur while loading, building, or saving the default index.
#[derive(Debug, Error)]
pub enum DefaultIndexStoreError {
    /// The link from operation `op_id` to its index segments could not be
    /// written.
    #[error("Failed to associate index files with an operation {op_id}")]
    AssociateIndex {
        op_id: OperationId,
        source: PathError,
    },
    /// The operation link file could not be read or decoded.
    #[error("Failed to load associated index file names")]
    LoadAssociation(#[source] PathError),
    /// A commit or changed-path segment file could not be loaded. The message
    /// of the underlying error is reported as is.
    #[error(transparent)]
    LoadIndex(ReadonlyIndexLoadError),
    /// A segment file (or one of the base directories) could not be written.
    #[error("Failed to write index file")]
    SaveIndex(#[source] PathError),
    /// A backend error occurred while indexing commits. `op_id` names the
    /// operation being indexed, to help locate the source of the error.
    #[error("Failed to index commits at operation {op_id}")]
    IndexCommits {
        op_id: OperationId,
        source: BackendError,
    },
    /// An operation store error, e.g. while walking ancestor operations.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
103
/// Index store that keeps commit and changed-path index segments as files
/// under a single directory, together with per-operation link files that
/// record which segments belong to which operation.
#[derive(Debug)]
pub struct DefaultIndexStore {
    // Root directory of the store; all sub-directories are derived from it.
    dir: PathBuf,
}
108
109impl DefaultIndexStore {
110    pub fn name() -> &'static str {
111        "default"
112    }
113
114    pub fn init(dir: &Path) -> Result<Self, DefaultIndexStoreInitError> {
115        let store = Self {
116            dir: dir.to_owned(),
117        };
118        store.ensure_base_dirs()?;
119        Ok(store)
120    }
121
122    pub fn load(dir: &Path) -> Self {
123        Self {
124            dir: dir.to_owned(),
125        }
126    }
127
128    pub fn reinit(&self) -> Result<(), DefaultIndexStoreInitError> {
129        // Create base directories in case the store was initialized by old jj.
130        self.ensure_base_dirs()?;
131        // Remove all operation links to trigger rebuilding.
132        file_util::remove_dir_contents(&self.op_links_dir())?;
133        let legacy_operations_dir = self.dir.join("operations"); // jj < 0.33
134        if legacy_operations_dir.exists() {
135            file_util::remove_dir_contents(&legacy_operations_dir)?;
136            fs::remove_dir(&legacy_operations_dir).context(&legacy_operations_dir)?;
137        }
138        // Remove index segments to save disk space. If raced, new segment file
139        // will be created by the other process.
140        file_util::remove_dir_contents(&self.commit_segments_dir())?;
141        file_util::remove_dir_contents(&self.changed_path_segments_dir())?;
142        // jj <= 0.14 created segment files in the top directory
143        for entry in self.dir.read_dir().context(&self.dir)? {
144            let entry = entry.context(&self.dir)?;
145            let path = entry.path();
146            if path.file_name().unwrap().len() != SEGMENT_FILE_NAME_LENGTH {
147                // Skip "type" file, "operations" directory, etc.
148                continue;
149            }
150            fs::remove_file(&path).context(&path)?;
151        }
152        Ok(())
153    }
154
155    fn ensure_base_dirs(&self) -> Result<(), PathError> {
156        for dir in [
157            self.op_links_dir(),
158            self.commit_segments_dir(),
159            self.changed_path_segments_dir(),
160        ] {
161            file_util::create_or_reuse_dir(&dir).context(&dir)?;
162        }
163        Ok(())
164    }
165
166    /// Directory for mapping from operations to segments. (jj >= 0.33)
167    fn op_links_dir(&self) -> PathBuf {
168        self.dir.join("op_links")
169    }
170
171    /// Directory for commit segment files.
172    fn commit_segments_dir(&self) -> PathBuf {
173        self.dir.join("segments")
174    }
175
176    /// Directory for changed-path segment files.
177    fn changed_path_segments_dir(&self) -> PathBuf {
178        self.dir.join("changed_paths")
179    }
180
181    fn load_index_at_operation(
182        &self,
183        op_id: &OperationId,
184        lengths: FieldLengths,
185    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
186        let op_link_file = self.op_links_dir().join(op_id.hex());
187        let data = fs::read(&op_link_file)
188            .context(&op_link_file)
189            .map_err(DefaultIndexStoreError::LoadAssociation)?;
190        let proto = crate::protos::default_index::SegmentControl::decode(&*data)
191            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
192            .context(&op_link_file)
193            .map_err(DefaultIndexStoreError::LoadAssociation)?;
194        let commit_segment_id = CommitIndexSegmentId::new(proto.commit_segment_id);
195        let changed_path_start_commit_pos = proto
196            .changed_path_start_commit_pos
197            .map(GlobalCommitPosition);
198        let changed_path_segment_ids = proto
199            .changed_path_segment_ids
200            .into_iter()
201            .map(ChangedPathIndexSegmentId::new)
202            .collect_vec();
203
204        let commits = ReadonlyCommitIndexSegment::load(
205            &self.commit_segments_dir(),
206            commit_segment_id,
207            lengths,
208        )
209        .map_err(DefaultIndexStoreError::LoadIndex)?;
210        // TODO: lazy load or mmap?
211        let changed_paths = if let Some(start_commit_pos) = changed_path_start_commit_pos {
212            CompositeChangedPathIndex::load(
213                &self.changed_path_segments_dir(),
214                start_commit_pos,
215                &changed_path_segment_ids,
216            )
217            .map_err(DefaultIndexStoreError::LoadIndex)?
218        } else {
219            CompositeChangedPathIndex::null()
220        };
221        Ok(DefaultReadonlyIndex::from_segment(commits, changed_paths))
222    }
223
224    /// Rebuilds index for the given `operation`.
225    ///
226    /// The index to be built will be calculated from one of the ancestor
227    /// operations if exists. Use `reinit()` to rebuild index from scratch.
228    #[tracing::instrument(skip(self, store))]
229    pub async fn build_index_at_operation(
230        &self,
231        operation: &Operation,
232        store: &Arc<Store>,
233    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
234        tracing::info!("scanning operations to index");
235        let op_links_dir = self.op_links_dir();
236        let field_lengths = FieldLengths {
237            commit_id: store.commit_id_length(),
238            change_id: store.change_id_length(),
239        };
240        // Pick the latest existing ancestor operation as the parent segment.
241        let mut unindexed_ops = Vec::new();
242        let mut parent_op = None;
243        let mut ancestors = pin!(op_walk::walk_ancestors(slice::from_ref(operation)));
244        while let Some(op) = ancestors.next().await {
245            let op = op?;
246            if op_links_dir.join(op.id().hex()).is_file() {
247                parent_op = Some(op);
248                break;
249            } else {
250                unindexed_ops.push(op);
251            }
252        }
253        let ops_to_visit = if let Some(op) = &parent_op {
254            // There may be concurrent ops, so revisit from the head. The parent
255            // op is usually shallow if existed.
256            op_walk::walk_ancestors_range(slice::from_ref(operation), slice::from_ref(op))
257                .try_collect()
258                .await?
259        } else {
260            unindexed_ops
261        };
262        tracing::info!(
263            ops_count = ops_to_visit.len(),
264            "collecting head commits to index"
265        );
266        let mut historical_heads: HashMap<CommitId, OperationId> = HashMap::new();
267        for op in &ops_to_visit {
268            for commit_id in itertools::chain(
269                op.all_referenced_commit_ids(),
270                op.view().await?.all_referenced_commit_ids(),
271            ) {
272                if !historical_heads.contains_key(commit_id) {
273                    historical_heads.insert(commit_id.clone(), op.id().clone());
274                }
275            }
276        }
277        let mut mutable_index;
278        let maybe_parent_index;
279        match &parent_op {
280            None => {
281                mutable_index = DefaultMutableIndex::full(field_lengths);
282                maybe_parent_index = None;
283            }
284            Some(op) => {
285                let parent_index = self.load_index_at_operation(op.id(), field_lengths)?;
286                mutable_index = parent_index.start_modification();
287                maybe_parent_index = Some(parent_index);
288            }
289        }
290
291        tracing::info!(
292            ?maybe_parent_index,
293            heads_count = historical_heads.len(),
294            "indexing commits reachable from historical heads"
295        );
296        // Build a list of ancestors of heads where parents come after the
297        // commit itself.
298        let parent_index_has_id = |id: &CommitId| {
299            maybe_parent_index
300                .as_ref()
301                .is_some_and(|index| index.has_id_impl(id))
302        };
303        let get_commit_with_op = |commit_id: &CommitId, op_id: &OperationId| {
304            let op_id = op_id.clone();
305            match store.get_commit(commit_id) {
306                // Propagate head's op_id to report possible source of an error.
307                // The op_id doesn't have to be included in the sort key, but
308                // that wouldn't matter since the commit should be unique.
309                Ok(commit) => Ok((CommitByCommitterTimestamp(commit), op_id)),
310                Err(source) => Err(DefaultIndexStoreError::IndexCommits { op_id, source }),
311            }
312        };
313        // Retain immediate predecessors if legacy operation exists. Some
314        // commands (e.g. squash into grandparent) may leave transitive
315        // predecessors, which aren't visible to any views.
316        // TODO: delete this workaround with commit.predecessors.
317        let commits_to_keep_immediate_predecessors = if ops_to_visit
318            .iter()
319            .any(|op| !op.stores_commit_predecessors())
320        {
321            let mut ancestors = HashSet::new();
322            let mut work = historical_heads.keys().cloned().collect_vec();
323            while let Some(commit_id) = work.pop() {
324                if ancestors.contains(&commit_id) || parent_index_has_id(&commit_id) {
325                    continue;
326                }
327                if let Ok(commit) = store.get_commit(&commit_id) {
328                    work.extend(commit.parent_ids().iter().cloned());
329                }
330                ancestors.insert(commit_id);
331            }
332            ancestors
333        } else {
334            HashSet::new()
335        };
336        let commits = dag_walk_async::topo_order_reverse_ord(
337            historical_heads
338                .iter()
339                .filter(|&(commit_id, _)| !parent_index_has_id(commit_id))
340                .map(|(commit_id, op_id)| get_commit_with_op(commit_id, op_id)),
341            |(CommitByCommitterTimestamp(commit), _)| commit.id().clone(),
342            async |(CommitByCommitterTimestamp(commit), op_id)| {
343                let keep_predecessors =
344                    commits_to_keep_immediate_predecessors.contains(commit.id());
345                itertools::chain(
346                    commit.parent_ids(),
347                    keep_predecessors
348                        .then_some(&commit.store_commit().predecessors)
349                        .into_iter()
350                        .flatten(),
351                )
352                .filter(|&id| !parent_index_has_id(id))
353                .map(|commit_id| get_commit_with_op(commit_id, op_id))
354                .collect_vec()
355            },
356            |_| panic!("graph has cycle"),
357        )
358        .await?;
359        for (CommitByCommitterTimestamp(commit), op_id) in commits.iter().rev() {
360            mutable_index.add_commit(commit).await.map_err(|source| {
361                DefaultIndexStoreError::IndexCommits {
362                    op_id: op_id.clone(),
363                    source,
364                }
365            })?;
366        }
367
368        let index = self.save_mutable_index(mutable_index, operation.id())?;
369        tracing::info!(?index, commits_count = commits.len(), "saved new index");
370
371        Ok(index)
372    }
373
374    /// Builds changed-path index for the specified operation.
375    ///
376    /// At most `max_commits` number of commits will be scanned from the latest
377    /// unindexed commit.
378    #[tracing::instrument(skip(self, store, progress_callback))]
379    pub async fn build_changed_path_index_at_operation(
380        &self,
381        op_id: &OperationId,
382        store: &Arc<Store>,
383        max_commits: u32,
384        mut progress_callback: impl FnMut(&DefaultChangedPathIndexProgress),
385    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
386        // Create directories in case the store was initialized by jj < 0.33.
387        self.ensure_base_dirs()
388            .map_err(DefaultIndexStoreError::SaveIndex)?;
389        let field_lengths = FieldLengths {
390            commit_id: store.commit_id_length(),
391            change_id: store.change_id_length(),
392        };
393        let index = self.load_index_at_operation(op_id, field_lengths)?;
394        let old_changed_paths = index.changed_paths();
395
396        // Distribute max_commits to contiguous pre/post ranges:
397        //   ..|pre|old_changed_paths|post|
398        //   (where pre.len() + post.len() <= max_commits)
399        let pre_start;
400        let pre_end;
401        let post_start;
402        let post_end;
403        if let Some(GlobalCommitPosition(pos)) = old_changed_paths.start_commit_pos() {
404            post_start = pos + old_changed_paths.num_commits();
405            assert!(post_start <= index.num_commits());
406            post_end = u32::saturating_add(post_start, max_commits).min(index.num_commits());
407            pre_start = u32::saturating_sub(pos, max_commits - (post_end - post_start));
408            pre_end = pos;
409        } else {
410            pre_start = u32::saturating_sub(index.num_commits(), max_commits);
411            pre_end = index.num_commits();
412            post_start = pre_end;
413            post_end = pre_end;
414        }
415
416        let mut progress = DefaultChangedPathIndexProgress {
417            current: 0,
418            total: (pre_end - pre_start) + (post_end - post_start),
419        };
420        let mut emit_progress = || {
421            progress_callback(&progress);
422            progress.current += 1;
423        };
424
425        let to_index_err = |source| DefaultIndexStoreError::IndexCommits {
426            op_id: op_id.clone(),
427            source,
428        };
429        let index_commit = async |changed_paths: &mut CompositeChangedPathIndex,
430                                  pos: GlobalCommitPosition| {
431            assert_eq!(changed_paths.next_mutable_commit_pos(), Some(pos));
432            let commit_id = index.as_composite().commits().entry_by_pos(pos).commit_id();
433            let commit = store.get_commit_async(&commit_id).await?;
434            let paths = collect_changed_paths(&index, &commit).await?;
435            changed_paths.add_changed_paths(paths);
436            Ok(())
437        };
438
439        // Index pre range
440        let mut new_changed_paths =
441            CompositeChangedPathIndex::empty(GlobalCommitPosition(pre_start));
442        new_changed_paths.make_mutable();
443        tracing::info!(?pre_start, ?pre_end, "indexing changed paths in commits");
444        for pos in (pre_start..pre_end).map(GlobalCommitPosition) {
445            emit_progress();
446            index_commit(&mut new_changed_paths, pos)
447                .await
448                .map_err(to_index_err)?;
449        }
450        new_changed_paths
451            .save_in(&self.changed_path_segments_dir())
452            .map_err(DefaultIndexStoreError::SaveIndex)?;
453
454        // Copy previously-indexed segments
455        new_changed_paths.append_segments(old_changed_paths);
456
457        // Index post range, which is usually empty
458        new_changed_paths.make_mutable();
459        tracing::info!(?post_start, ?post_end, "indexing changed paths in commits");
460        for pos in (post_start..post_end).map(GlobalCommitPosition) {
461            emit_progress();
462            index_commit(&mut new_changed_paths, pos)
463                .await
464                .map_err(to_index_err)?;
465        }
466        new_changed_paths.maybe_squash_with_ancestors();
467        new_changed_paths
468            .save_in(&self.changed_path_segments_dir())
469            .map_err(DefaultIndexStoreError::SaveIndex)?;
470
471        // Update the operation link to point to the new segments
472        let commits = index.readonly_commits().clone();
473        let index = DefaultReadonlyIndex::from_segment(commits, new_changed_paths);
474        self.associate_index_with_operation(&index, op_id)
475            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
476                op_id: op_id.to_owned(),
477                source,
478            })?;
479        emit_progress();
480        Ok(index)
481    }
482
483    fn save_mutable_index(
484        &self,
485        index: DefaultMutableIndex,
486        op_id: &OperationId,
487    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
488        // Create directories in case the store was initialized by jj < 0.33.
489        self.ensure_base_dirs()
490            .map_err(DefaultIndexStoreError::SaveIndex)?;
491        let (commits, mut changed_paths) = index.into_segment();
492        let commits = commits
493            .maybe_squash_with_ancestors()
494            .save_in(&self.commit_segments_dir())
495            .map_err(DefaultIndexStoreError::SaveIndex)?;
496        changed_paths.maybe_squash_with_ancestors();
497        changed_paths
498            .save_in(&self.changed_path_segments_dir())
499            .map_err(DefaultIndexStoreError::SaveIndex)?;
500        let index = DefaultReadonlyIndex::from_segment(commits, changed_paths);
501        self.associate_index_with_operation(&index, op_id)
502            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
503                op_id: op_id.to_owned(),
504                source,
505            })?;
506        Ok(index)
507    }
508
509    /// Records a link from the given operation to the this index version.
510    fn associate_index_with_operation(
511        &self,
512        index: &DefaultReadonlyIndex,
513        op_id: &OperationId,
514    ) -> Result<(), PathError> {
515        let proto = crate::protos::default_index::SegmentControl {
516            commit_segment_id: index.readonly_commits().id().to_bytes(),
517            changed_path_start_commit_pos: index
518                .changed_paths()
519                .start_commit_pos()
520                .map(|GlobalCommitPosition(start)| start),
521            changed_path_segment_ids: index
522                .changed_paths()
523                .readonly_segments()
524                .iter()
525                .map(|segment| segment.id().to_bytes())
526                .collect(),
527        };
528        let dir = self.op_links_dir();
529        let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
530        let file = temp_file.as_file_mut();
531        file.write_all(&proto.encode_to_vec())
532            .context(temp_file.path())?;
533        let path = dir.join(op_id.hex());
534        persist_temp_file(temp_file, &path).context(&path)?;
535        Ok(())
536    }
537}
538
539#[async_trait(?Send)]
540impl IndexStore for DefaultIndexStore {
541    fn name(&self) -> &str {
542        Self::name()
543    }
544
545    async fn get_index_at_op(
546        &self,
547        op: &Operation,
548        store: &Arc<Store>,
549    ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
550        let field_lengths = FieldLengths {
551            commit_id: store.commit_id_length(),
552            change_id: store.change_id_length(),
553        };
554        let index = match self.load_index_at_operation(op.id(), field_lengths) {
555            Err(DefaultIndexStoreError::LoadAssociation(PathError { source: error, .. }))
556                if error.kind() == io::ErrorKind::NotFound =>
557            {
558                self.build_index_at_operation(op, store).await
559            }
560            Err(DefaultIndexStoreError::LoadIndex(err)) if err.is_corrupt_or_not_found() => {
561                // If the index was corrupt (maybe it was written in a different format),
562                // we just reindex.
563                match &err {
564                    ReadonlyIndexLoadError::UnexpectedVersion {
565                        kind,
566                        found_version,
567                        expected_version,
568                    } => {
569                        eprintln!(
570                            "Found {kind} index format version {found_version}, expected version \
571                             {expected_version}. Reindexing..."
572                        );
573                    }
574                    ReadonlyIndexLoadError::Other { error, .. } => {
575                        eprintln!("{err} (maybe the format has changed): {error}. Reindexing...");
576                    }
577                }
578                self.reinit()
579                    .map_err(|err| IndexStoreError::Read(err.into()))?;
580                self.build_index_at_operation(op, store).await
581            }
582            result => result,
583        }
584        .map_err(|err| IndexStoreError::Read(err.into()))?;
585        Ok(Box::new(index))
586    }
587
588    fn write_index(
589        &self,
590        index: Box<dyn MutableIndex>,
591        op: &Operation,
592    ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
593        let index: Box<DefaultMutableIndex> = index
594            .downcast()
595            .expect("index to merge in must be a DefaultMutableIndex");
596        let index = self
597            .save_mutable_index(*index, op.id())
598            .map_err(|err| IndexStoreError::Write(err.into()))?;
599        Ok(Box::new(index))
600    }
601}
602
/// Progress of [`DefaultIndexStore::build_changed_path_index_at_operation()`].
#[derive(Clone, Debug)]
pub struct DefaultChangedPathIndexProgress {
    /// Number of progress reports made before this one. It starts at 0 and
    /// equals `total` in the final report.
    pub current: u32,
    /// Number of commits selected to be scanned in this run.
    pub total: u32,
}