// jj_lib/default_index/store.rs

// Copyright 2023 The Jujutsu Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15#![expect(missing_docs)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::fs;
20use std::io;
21use std::io::Write as _;
22use std::path::Path;
23use std::path::PathBuf;
24use std::slice;
25use std::sync::Arc;
26
27use itertools::Itertools as _;
28use pollster::FutureExt as _;
29use prost::Message as _;
30use tempfile::NamedTempFile;
31use thiserror::Error;
32
33use super::changed_path::ChangedPathIndexSegmentId;
34use super::changed_path::CompositeChangedPathIndex;
35use super::changed_path::collect_changed_paths;
36use super::composite::AsCompositeIndex as _;
37use super::composite::CommitIndexSegmentId;
38use super::entry::GlobalCommitPosition;
39use super::mutable::DefaultMutableIndex;
40use super::readonly::DefaultReadonlyIndex;
41use super::readonly::FieldLengths;
42use super::readonly::ReadonlyCommitIndexSegment;
43use super::readonly::ReadonlyIndexLoadError;
44use crate::backend::BackendError;
45use crate::backend::BackendInitError;
46use crate::backend::CommitId;
47use crate::commit::CommitByCommitterTimestamp;
48use crate::dag_walk;
49use crate::file_util;
50use crate::file_util::IoResultExt as _;
51use crate::file_util::PathError;
52use crate::file_util::persist_temp_file;
53use crate::index::Index as _;
54use crate::index::IndexReadError;
55use crate::index::IndexStore;
56use crate::index::IndexWriteError;
57use crate::index::MutableIndex;
58use crate::index::ReadonlyIndex;
59use crate::object_id::ObjectId as _;
60use crate::op_store::OpStoreError;
61use crate::op_store::OperationId;
62use crate::op_walk;
63use crate::operation::Operation;
64use crate::store::Store;
65
/// Length of a segment file name: a BLAKE2b-512 hash (64 bytes) spelled out
/// in hex, which takes two characters per byte.
const SEGMENT_FILE_NAME_LENGTH: usize = 2 * 64;
68
/// Error that may occur during `DefaultIndexStore` initialization.
///
/// Wraps the file-system error raised while creating or clearing the store's
/// directories (see `DefaultIndexStore::init` and `DefaultIndexStore::reinit`).
#[derive(Debug, Error)]
#[error("Failed to initialize index store")]
pub struct DefaultIndexStoreInitError(#[from] pub PathError);
73
74impl From<DefaultIndexStoreInitError> for BackendInitError {
75    fn from(err: DefaultIndexStoreInitError) -> Self {
76        Self(err.into())
77    }
78}
79
/// Error that may occur while loading, building, or saving an index in
/// `DefaultIndexStore`.
#[derive(Debug, Error)]
pub enum DefaultIndexStoreError {
    /// Failed to write the files linking operation `op_id` to its index
    /// segments.
    #[error("Failed to associate index files with an operation {op_id}")]
    AssociateIndex {
        op_id: OperationId,
        source: PathError,
    },
    /// Failed to read or decode the file linking an operation to its index
    /// segments. This includes the case where no link file exists
    /// (`io::ErrorKind::NotFound`).
    #[error("Failed to load associated index file names")]
    LoadAssociation(#[source] PathError),
    /// Failed to load a commit or changed-path index segment.
    #[error(transparent)]
    LoadIndex(ReadonlyIndexLoadError),
    /// Failed to write an index segment file or create the store's
    /// directories.
    #[error("Failed to write index file")]
    SaveIndex(#[source] PathError),
    /// A backend error occurred while reading or indexing commits reachable
    /// from operation `op_id`.
    #[error("Failed to index commits at operation {op_id}")]
    IndexCommits {
        op_id: OperationId,
        source: BackendError,
    },
    /// Error from the operation store, e.g. while walking operations or
    /// loading their views.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
101
/// Index store that keeps index segment files and operation-to-index links
/// in subdirectories of a single root directory.
#[derive(Debug)]
pub struct DefaultIndexStore {
    // Root directory of the store. The subdirectory layout is defined by the
    // `*_dir()` helper methods.
    dir: PathBuf,
}
106
107impl DefaultIndexStore {
108    pub fn name() -> &'static str {
109        "default"
110    }
111
112    pub fn init(dir: &Path) -> Result<Self, DefaultIndexStoreInitError> {
113        let store = Self {
114            dir: dir.to_owned(),
115        };
116        store.ensure_base_dirs()?;
117        Ok(store)
118    }
119
120    pub fn load(dir: &Path) -> Self {
121        Self {
122            dir: dir.to_owned(),
123        }
124    }
125
126    pub fn reinit(&self) -> Result<(), DefaultIndexStoreInitError> {
127        // Create base directories in case the store was initialized by old jj.
128        self.ensure_base_dirs()?;
129        // Remove all operation links to trigger rebuilding.
130        file_util::remove_dir_contents(&self.op_links_dir())?;
131        file_util::remove_dir_contents(&self.legacy_operations_dir())?;
132        // Remove index segments to save disk space. If raced, new segment file
133        // will be created by the other process.
134        file_util::remove_dir_contents(&self.commit_segments_dir())?;
135        file_util::remove_dir_contents(&self.changed_path_segments_dir())?;
136        // jj <= 0.14 created segment files in the top directory
137        for entry in self.dir.read_dir().context(&self.dir)? {
138            let entry = entry.context(&self.dir)?;
139            let path = entry.path();
140            if path.file_name().unwrap().len() != SEGMENT_FILE_NAME_LENGTH {
141                // Skip "type" file, "operations" directory, etc.
142                continue;
143            }
144            fs::remove_file(&path).context(&path)?;
145        }
146        Ok(())
147    }
148
149    fn ensure_base_dirs(&self) -> Result<(), PathError> {
150        for dir in [
151            self.op_links_dir(),
152            self.legacy_operations_dir(),
153            self.commit_segments_dir(),
154            self.changed_path_segments_dir(),
155        ] {
156            file_util::create_or_reuse_dir(&dir).context(&dir)?;
157        }
158        Ok(())
159    }
160
161    /// Directory for mapping from operations to segments. (jj >= 0.33)
162    fn op_links_dir(&self) -> PathBuf {
163        self.dir.join("op_links")
164    }
165
166    /// Directory for mapping from operations to commit segments. (jj < 0.33)
167    fn legacy_operations_dir(&self) -> PathBuf {
168        self.dir.join("operations")
169    }
170
171    /// Directory for commit segment files.
172    fn commit_segments_dir(&self) -> PathBuf {
173        self.dir.join("segments")
174    }
175
176    /// Directory for changed-path segment files.
177    fn changed_path_segments_dir(&self) -> PathBuf {
178        self.dir.join("changed_paths")
179    }
180
181    fn load_index_at_operation(
182        &self,
183        op_id: &OperationId,
184        lengths: FieldLengths,
185    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
186        let commit_segment_id;
187        let changed_path_start_commit_pos;
188        let changed_path_segment_ids;
189        let op_link_file = self.op_links_dir().join(op_id.hex());
190        match fs::read(&op_link_file).context(&op_link_file) {
191            Ok(data) => {
192                let proto = crate::protos::default_index::SegmentControl::decode(&*data)
193                    .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
194                    .context(&op_link_file)
195                    .map_err(DefaultIndexStoreError::LoadAssociation)?;
196                commit_segment_id = CommitIndexSegmentId::new(proto.commit_segment_id);
197                changed_path_start_commit_pos = proto
198                    .changed_path_start_commit_pos
199                    .map(GlobalCommitPosition);
200                changed_path_segment_ids = proto
201                    .changed_path_segment_ids
202                    .into_iter()
203                    .map(ChangedPathIndexSegmentId::new)
204                    .collect_vec();
205            }
206            // TODO: drop support for legacy operation link file in jj 0.39 or so
207            Err(PathError { source: error, .. }) if error.kind() == io::ErrorKind::NotFound => {
208                let op_id_file = self.legacy_operations_dir().join(op_id.hex());
209                let data = fs::read(&op_id_file)
210                    .context(&op_id_file)
211                    .map_err(DefaultIndexStoreError::LoadAssociation)?;
212                commit_segment_id = CommitIndexSegmentId::try_from_hex(&data)
213                    .ok_or_else(|| {
214                        io::Error::new(io::ErrorKind::InvalidData, "file name is not valid hex")
215                    })
216                    .context(&op_id_file)
217                    .map_err(DefaultIndexStoreError::LoadAssociation)?;
218                changed_path_start_commit_pos = None;
219                changed_path_segment_ids = vec![];
220            }
221            Err(err) => return Err(DefaultIndexStoreError::LoadAssociation(err)),
222        };
223
224        let commits = ReadonlyCommitIndexSegment::load(
225            &self.commit_segments_dir(),
226            commit_segment_id,
227            lengths,
228        )
229        .map_err(DefaultIndexStoreError::LoadIndex)?;
230        // TODO: lazy load or mmap?
231        let changed_paths = if let Some(start_commit_pos) = changed_path_start_commit_pos {
232            CompositeChangedPathIndex::load(
233                &self.changed_path_segments_dir(),
234                start_commit_pos,
235                &changed_path_segment_ids,
236            )
237            .map_err(DefaultIndexStoreError::LoadIndex)?
238        } else {
239            CompositeChangedPathIndex::null()
240        };
241        Ok(DefaultReadonlyIndex::from_segment(commits, changed_paths))
242    }
243
244    /// Rebuilds index for the given `operation`.
245    ///
246    /// The index to be built will be calculated from one of the ancestor
247    /// operations if exists. Use `reinit()` to rebuild index from scratch.
248    #[tracing::instrument(skip(self, store))]
249    pub async fn build_index_at_operation(
250        &self,
251        operation: &Operation,
252        store: &Arc<Store>,
253    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
254        tracing::info!("scanning operations to index");
255        let op_links_dir = self.op_links_dir();
256        let legacy_operations_dir = self.legacy_operations_dir();
257        let field_lengths = FieldLengths {
258            commit_id: store.commit_id_length(),
259            change_id: store.change_id_length(),
260        };
261        // Pick the latest existing ancestor operation as the parent segment.
262        let mut unindexed_ops = Vec::new();
263        let mut parent_op = None;
264        for op in op_walk::walk_ancestors(slice::from_ref(operation)) {
265            let op = op?;
266            if op_links_dir.join(op.id().hex()).is_file()
267                || legacy_operations_dir.join(op.id().hex()).is_file()
268            {
269                parent_op = Some(op);
270                break;
271            } else {
272                unindexed_ops.push(op);
273            }
274        }
275        let ops_to_visit = if let Some(op) = &parent_op {
276            // There may be concurrent ops, so revisit from the head. The parent
277            // op is usually shallow if existed.
278            op_walk::walk_ancestors_range(slice::from_ref(operation), slice::from_ref(op))
279                .try_collect()?
280        } else {
281            unindexed_ops
282        };
283        tracing::info!(
284            ops_count = ops_to_visit.len(),
285            "collecting head commits to index"
286        );
287        let mut historical_heads: HashMap<CommitId, OperationId> = HashMap::new();
288        for op in &ops_to_visit {
289            for commit_id in itertools::chain(
290                op.all_referenced_commit_ids(),
291                op.view()?.all_referenced_commit_ids(),
292            ) {
293                if !historical_heads.contains_key(commit_id) {
294                    historical_heads.insert(commit_id.clone(), op.id().clone());
295                }
296            }
297        }
298        let mut mutable_index;
299        let maybe_parent_index;
300        match &parent_op {
301            None => {
302                mutable_index = DefaultMutableIndex::full(field_lengths);
303                maybe_parent_index = None;
304            }
305            Some(op) => {
306                let parent_index = self.load_index_at_operation(op.id(), field_lengths)?;
307                mutable_index = parent_index.start_modification();
308                maybe_parent_index = Some(parent_index);
309            }
310        }
311
312        tracing::info!(
313            ?maybe_parent_index,
314            heads_count = historical_heads.len(),
315            "indexing commits reachable from historical heads"
316        );
317        // Build a list of ancestors of heads where parents come after the
318        // commit itself.
319        let parent_index_has_id = |id: &CommitId| {
320            maybe_parent_index
321                .as_ref()
322                .is_some_and(|index| index.has_id(id))
323        };
324        let get_commit_with_op = |commit_id: &CommitId, op_id: &OperationId| {
325            let op_id = op_id.clone();
326            match store.get_commit(commit_id) {
327                // Propagate head's op_id to report possible source of an error.
328                // The op_id doesn't have to be included in the sort key, but
329                // that wouldn't matter since the commit should be unique.
330                Ok(commit) => Ok((CommitByCommitterTimestamp(commit), op_id)),
331                Err(source) => Err(DefaultIndexStoreError::IndexCommits { op_id, source }),
332            }
333        };
334        // Retain immediate predecessors if legacy operation exists. Some
335        // commands (e.g. squash into grandparent) may leave transitive
336        // predecessors, which aren't visible to any views.
337        // TODO: delete this workaround with commit.predecessors.
338        let commits_to_keep_immediate_predecessors = if ops_to_visit
339            .iter()
340            .any(|op| !op.stores_commit_predecessors())
341        {
342            let mut ancestors = HashSet::new();
343            let mut work = historical_heads.keys().cloned().collect_vec();
344            while let Some(commit_id) = work.pop() {
345                if ancestors.contains(&commit_id) || parent_index_has_id(&commit_id) {
346                    continue;
347                }
348                if let Ok(commit) = store.get_commit(&commit_id) {
349                    work.extend(commit.parent_ids().iter().cloned());
350                }
351                ancestors.insert(commit_id);
352            }
353            ancestors
354        } else {
355            HashSet::new()
356        };
357        let commits = dag_walk::topo_order_reverse_ord_ok(
358            historical_heads
359                .iter()
360                .filter(|&(commit_id, _)| !parent_index_has_id(commit_id))
361                .map(|(commit_id, op_id)| get_commit_with_op(commit_id, op_id)),
362            |(CommitByCommitterTimestamp(commit), _)| commit.id().clone(),
363            |(CommitByCommitterTimestamp(commit), op_id)| {
364                let keep_predecessors =
365                    commits_to_keep_immediate_predecessors.contains(commit.id());
366                itertools::chain(
367                    commit.parent_ids(),
368                    keep_predecessors
369                        .then_some(&commit.store_commit().predecessors)
370                        .into_iter()
371                        .flatten(),
372                )
373                .filter(|&id| !parent_index_has_id(id))
374                .map(|commit_id| get_commit_with_op(commit_id, op_id))
375                .collect_vec()
376            },
377            |_| panic!("graph has cycle"),
378        )?;
379        for (CommitByCommitterTimestamp(commit), op_id) in commits.iter().rev() {
380            mutable_index.add_commit(commit).await.map_err(|source| {
381                DefaultIndexStoreError::IndexCommits {
382                    op_id: op_id.clone(),
383                    source,
384                }
385            })?;
386        }
387
388        let index = self.save_mutable_index(mutable_index, operation.id())?;
389        tracing::info!(?index, commits_count = commits.len(), "saved new index");
390
391        Ok(index)
392    }
393
394    /// Builds changed-path index for the specified operation.
395    ///
396    /// At most `max_commits` number of commits will be scanned from the latest
397    /// unindexed commit.
398    #[tracing::instrument(skip(self, store))]
399    pub async fn build_changed_path_index_at_operation(
400        &self,
401        op_id: &OperationId,
402        store: &Arc<Store>,
403        max_commits: u32,
404        // TODO: add progress callback?
405    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
406        // Create directories in case the store was initialized by jj < 0.33.
407        self.ensure_base_dirs()
408            .map_err(DefaultIndexStoreError::SaveIndex)?;
409        let field_lengths = FieldLengths {
410            commit_id: store.commit_id_length(),
411            change_id: store.change_id_length(),
412        };
413        let index = self.load_index_at_operation(op_id, field_lengths)?;
414        let old_changed_paths = index.changed_paths();
415
416        // Distribute max_commits to contiguous pre/post ranges:
417        //   ..|pre|old_changed_paths|post|
418        //   (where pre.len() + post.len() <= max_commits)
419        let pre_start;
420        let pre_end;
421        let post_start;
422        let post_end;
423        if let Some(GlobalCommitPosition(pos)) = old_changed_paths.start_commit_pos() {
424            post_start = pos + old_changed_paths.num_commits();
425            assert!(post_start <= index.num_commits());
426            post_end = u32::saturating_add(post_start, max_commits).min(index.num_commits());
427            pre_start = u32::saturating_sub(pos, max_commits - (post_end - post_start));
428            pre_end = pos;
429        } else {
430            pre_start = u32::saturating_sub(index.num_commits(), max_commits);
431            pre_end = index.num_commits();
432            post_start = pre_end;
433            post_end = pre_end;
434        }
435
436        let to_index_err = |source| DefaultIndexStoreError::IndexCommits {
437            op_id: op_id.clone(),
438            source,
439        };
440        let index_commit = async |changed_paths: &mut CompositeChangedPathIndex,
441                                  pos: GlobalCommitPosition| {
442            assert_eq!(changed_paths.next_mutable_commit_pos(), Some(pos));
443            let commit_id = index.as_composite().commits().entry_by_pos(pos).commit_id();
444            let commit = store.get_commit_async(&commit_id).await?;
445            let paths = collect_changed_paths(&index, &commit).await?;
446            changed_paths.add_changed_paths(paths);
447            Ok(())
448        };
449
450        // Index pre range
451        let mut new_changed_paths =
452            CompositeChangedPathIndex::empty(GlobalCommitPosition(pre_start));
453        new_changed_paths.make_mutable();
454        tracing::info!(?pre_start, ?pre_end, "indexing changed paths in commits");
455        for pos in (pre_start..pre_end).map(GlobalCommitPosition) {
456            index_commit(&mut new_changed_paths, pos)
457                .await
458                .map_err(to_index_err)?;
459        }
460        new_changed_paths
461            .save_in(&self.changed_path_segments_dir())
462            .map_err(DefaultIndexStoreError::SaveIndex)?;
463
464        // Copy previously-indexed segments
465        new_changed_paths.append_segments(old_changed_paths);
466
467        // Index post range, which is usually empty
468        new_changed_paths.make_mutable();
469        tracing::info!(?post_start, ?post_end, "indexing changed paths in commits");
470        for pos in (post_start..post_end).map(GlobalCommitPosition) {
471            index_commit(&mut new_changed_paths, pos)
472                .await
473                .map_err(to_index_err)?;
474        }
475        new_changed_paths.maybe_squash_with_ancestors();
476        new_changed_paths
477            .save_in(&self.changed_path_segments_dir())
478            .map_err(DefaultIndexStoreError::SaveIndex)?;
479
480        // Update the operation link to point to the new segments
481        let commits = index.readonly_commits().clone();
482        let index = DefaultReadonlyIndex::from_segment(commits, new_changed_paths);
483        self.associate_index_with_operation(&index, op_id)
484            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
485                op_id: op_id.to_owned(),
486                source,
487            })?;
488        Ok(index)
489    }
490
491    fn save_mutable_index(
492        &self,
493        index: DefaultMutableIndex,
494        op_id: &OperationId,
495    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
496        // Create directories in case the store was initialized by jj < 0.33.
497        self.ensure_base_dirs()
498            .map_err(DefaultIndexStoreError::SaveIndex)?;
499        let (commits, mut changed_paths) = index.into_segment();
500        let commits = commits
501            .maybe_squash_with_ancestors()
502            .save_in(&self.commit_segments_dir())
503            .map_err(DefaultIndexStoreError::SaveIndex)?;
504        changed_paths.maybe_squash_with_ancestors();
505        changed_paths
506            .save_in(&self.changed_path_segments_dir())
507            .map_err(DefaultIndexStoreError::SaveIndex)?;
508        let index = DefaultReadonlyIndex::from_segment(commits, changed_paths);
509        self.associate_index_with_operation(&index, op_id)
510            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
511                op_id: op_id.to_owned(),
512                source,
513            })?;
514        Ok(index)
515    }
516
517    /// Records a link from the given operation to the this index version.
518    fn associate_index_with_operation(
519        &self,
520        index: &DefaultReadonlyIndex,
521        op_id: &OperationId,
522    ) -> Result<(), PathError> {
523        let proto = crate::protos::default_index::SegmentControl {
524            commit_segment_id: index.readonly_commits().id().to_bytes(),
525            changed_path_start_commit_pos: index
526                .changed_paths()
527                .start_commit_pos()
528                .map(|GlobalCommitPosition(start)| start),
529            changed_path_segment_ids: index
530                .changed_paths()
531                .readonly_segments()
532                .iter()
533                .map(|segment| segment.id().to_bytes())
534                .collect(),
535        };
536        let dir = self.op_links_dir();
537        let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
538        let file = temp_file.as_file_mut();
539        file.write_all(&proto.encode_to_vec())
540            .context(temp_file.path())?;
541        let path = dir.join(op_id.hex());
542        persist_temp_file(temp_file, &path).context(&path)?;
543
544        // TODO: drop support for legacy operation link file in jj 0.39 or so
545        let dir = self.legacy_operations_dir();
546        let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
547        let file = temp_file.as_file_mut();
548        file.write_all(index.readonly_commits().id().hex().as_bytes())
549            .context(temp_file.path())?;
550        let path = dir.join(op_id.hex());
551        persist_temp_file(temp_file, &path).context(&path)?;
552        Ok(())
553    }
554}
555
556impl IndexStore for DefaultIndexStore {
557    fn name(&self) -> &str {
558        Self::name()
559    }
560
561    fn get_index_at_op(
562        &self,
563        op: &Operation,
564        store: &Arc<Store>,
565    ) -> Result<Box<dyn ReadonlyIndex>, IndexReadError> {
566        let field_lengths = FieldLengths {
567            commit_id: store.commit_id_length(),
568            change_id: store.change_id_length(),
569        };
570        let index = match self.load_index_at_operation(op.id(), field_lengths) {
571            Err(DefaultIndexStoreError::LoadAssociation(PathError { source: error, .. }))
572                if error.kind() == io::ErrorKind::NotFound =>
573            {
574                self.build_index_at_operation(op, store).block_on()
575            }
576            Err(DefaultIndexStoreError::LoadIndex(err)) if err.is_corrupt_or_not_found() => {
577                // If the index was corrupt (maybe it was written in a different format),
578                // we just reindex.
579                match &err {
580                    ReadonlyIndexLoadError::UnexpectedVersion {
581                        kind,
582                        found_version,
583                        expected_version,
584                    } => {
585                        eprintln!(
586                            "Found {kind} index format version {found_version}, expected version \
587                             {expected_version}. Reindexing..."
588                        );
589                    }
590                    ReadonlyIndexLoadError::Other { error, .. } => {
591                        eprintln!("{err} (maybe the format has changed): {error}. Reindexing...");
592                    }
593                }
594                self.reinit().map_err(|err| IndexReadError(err.into()))?;
595                self.build_index_at_operation(op, store).block_on()
596            }
597            result => result,
598        }
599        .map_err(|err| IndexReadError(err.into()))?;
600        Ok(Box::new(index))
601    }
602
603    fn write_index(
604        &self,
605        index: Box<dyn MutableIndex>,
606        op: &Operation,
607    ) -> Result<Box<dyn ReadonlyIndex>, IndexWriteError> {
608        let index: Box<DefaultMutableIndex> = index
609            .downcast()
610            .expect("index to merge in must be a DefaultMutableIndex");
611        let index = self
612            .save_mutable_index(*index, op.id())
613            .map_err(|err| IndexWriteError(err.into()))?;
614        Ok(Box::new(index))
615    }
616}