jj_lib/default_index/
store.rs

1// Copyright 2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![expect(missing_docs)]
16
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::fs;
20use std::io;
21use std::io::Write as _;
22use std::path::Path;
23use std::path::PathBuf;
24use std::slice;
25use std::sync::Arc;
26
27use itertools::Itertools as _;
28use pollster::FutureExt as _;
29use prost::Message as _;
30use tempfile::NamedTempFile;
31use thiserror::Error;
32
33use super::changed_path::ChangedPathIndexSegmentId;
34use super::changed_path::CompositeChangedPathIndex;
35use super::changed_path::collect_changed_paths;
36use super::composite::AsCompositeIndex as _;
37use super::composite::CommitIndexSegmentId;
38use super::entry::GlobalCommitPosition;
39use super::mutable::DefaultMutableIndex;
40use super::readonly::DefaultReadonlyIndex;
41use super::readonly::FieldLengths;
42use super::readonly::ReadonlyCommitIndexSegment;
43use super::readonly::ReadonlyIndexLoadError;
44use crate::backend::BackendError;
45use crate::backend::BackendInitError;
46use crate::backend::CommitId;
47use crate::commit::CommitByCommitterTimestamp;
48use crate::dag_walk;
49use crate::file_util;
50use crate::file_util::IoResultExt as _;
51use crate::file_util::PathError;
52use crate::file_util::persist_temp_file;
53use crate::index::IndexStore;
54use crate::index::IndexStoreError;
55use crate::index::IndexStoreResult;
56use crate::index::MutableIndex;
57use crate::index::ReadonlyIndex;
58use crate::object_id::ObjectId as _;
59use crate::op_store::OpStoreError;
60use crate::op_store::OperationId;
61use crate::op_walk;
62use crate::operation::Operation;
63use crate::store::Store;
64
// Length of a segment file name: a BLAKE2b-512 hash (64 bytes) written as a
// hex string, i.e. two characters per byte.
const SEGMENT_FILE_NAME_LENGTH: usize = 64 * 2;
67
/// Error that may occur during `DefaultIndexStore` initialization.
///
/// Wraps the [`PathError`] of the filesystem operation that failed. It is also
/// returned by [`DefaultIndexStore::reinit()`].
#[derive(Debug, Error)]
#[error("Failed to initialize index store")]
pub struct DefaultIndexStoreInitError(#[from] pub PathError);
72
73impl From<DefaultIndexStoreInitError> for BackendInitError {
74    fn from(err: DefaultIndexStoreInitError) -> Self {
75        Self(err.into())
76    }
77}
78
/// Error that may occur while loading, building, or saving the default index.
#[derive(Debug, Error)]
pub enum DefaultIndexStoreError {
    /// Writing the link file(s) that associate index segments with the
    /// operation `op_id` failed.
    #[error("Failed to associate index files with an operation {op_id}")]
    AssociateIndex {
        op_id: OperationId,
        source: PathError,
    },
    /// Reading or decoding an operation link file failed.
    #[error("Failed to load associated index file names")]
    LoadAssociation(#[source] PathError),
    /// Loading an index segment failed. The inner error is reported as is.
    #[error(transparent)]
    LoadIndex(ReadonlyIndexLoadError),
    /// Writing an index segment file, or creating the directories it lives
    /// in, failed.
    #[error("Failed to write index file")]
    SaveIndex(#[source] PathError),
    /// A commit could not be read from the backend or added to the index
    /// while indexing at the operation `op_id`.
    #[error("Failed to index commits at operation {op_id}")]
    IndexCommits {
        op_id: OperationId,
        source: BackendError,
    },
    /// Error from the operation store, reported as is.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
100
/// Index store that keeps operation link files and index segment files in
/// subdirectories of `dir`.
#[derive(Debug)]
pub struct DefaultIndexStore {
    // Top-level directory of the store.
    dir: PathBuf,
}
105
106impl DefaultIndexStore {
107    pub fn name() -> &'static str {
108        "default"
109    }
110
111    pub fn init(dir: &Path) -> Result<Self, DefaultIndexStoreInitError> {
112        let store = Self {
113            dir: dir.to_owned(),
114        };
115        store.ensure_base_dirs()?;
116        Ok(store)
117    }
118
119    pub fn load(dir: &Path) -> Self {
120        Self {
121            dir: dir.to_owned(),
122        }
123    }
124
125    pub fn reinit(&self) -> Result<(), DefaultIndexStoreInitError> {
126        // Create base directories in case the store was initialized by old jj.
127        self.ensure_base_dirs()?;
128        // Remove all operation links to trigger rebuilding.
129        file_util::remove_dir_contents(&self.op_links_dir())?;
130        file_util::remove_dir_contents(&self.legacy_operations_dir())?;
131        // Remove index segments to save disk space. If raced, new segment file
132        // will be created by the other process.
133        file_util::remove_dir_contents(&self.commit_segments_dir())?;
134        file_util::remove_dir_contents(&self.changed_path_segments_dir())?;
135        // jj <= 0.14 created segment files in the top directory
136        for entry in self.dir.read_dir().context(&self.dir)? {
137            let entry = entry.context(&self.dir)?;
138            let path = entry.path();
139            if path.file_name().unwrap().len() != SEGMENT_FILE_NAME_LENGTH {
140                // Skip "type" file, "operations" directory, etc.
141                continue;
142            }
143            fs::remove_file(&path).context(&path)?;
144        }
145        Ok(())
146    }
147
148    fn ensure_base_dirs(&self) -> Result<(), PathError> {
149        for dir in [
150            self.op_links_dir(),
151            self.legacy_operations_dir(),
152            self.commit_segments_dir(),
153            self.changed_path_segments_dir(),
154        ] {
155            file_util::create_or_reuse_dir(&dir).context(&dir)?;
156        }
157        Ok(())
158    }
159
160    /// Directory for mapping from operations to segments. (jj >= 0.33)
161    fn op_links_dir(&self) -> PathBuf {
162        self.dir.join("op_links")
163    }
164
165    /// Directory for mapping from operations to commit segments. (jj < 0.33)
166    fn legacy_operations_dir(&self) -> PathBuf {
167        self.dir.join("operations")
168    }
169
170    /// Directory for commit segment files.
171    fn commit_segments_dir(&self) -> PathBuf {
172        self.dir.join("segments")
173    }
174
175    /// Directory for changed-path segment files.
176    fn changed_path_segments_dir(&self) -> PathBuf {
177        self.dir.join("changed_paths")
178    }
179
180    fn load_index_at_operation(
181        &self,
182        op_id: &OperationId,
183        lengths: FieldLengths,
184    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
185        let commit_segment_id;
186        let changed_path_start_commit_pos;
187        let changed_path_segment_ids;
188        let op_link_file = self.op_links_dir().join(op_id.hex());
189        match fs::read(&op_link_file).context(&op_link_file) {
190            Ok(data) => {
191                let proto = crate::protos::default_index::SegmentControl::decode(&*data)
192                    .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
193                    .context(&op_link_file)
194                    .map_err(DefaultIndexStoreError::LoadAssociation)?;
195                commit_segment_id = CommitIndexSegmentId::new(proto.commit_segment_id);
196                changed_path_start_commit_pos = proto
197                    .changed_path_start_commit_pos
198                    .map(GlobalCommitPosition);
199                changed_path_segment_ids = proto
200                    .changed_path_segment_ids
201                    .into_iter()
202                    .map(ChangedPathIndexSegmentId::new)
203                    .collect_vec();
204            }
205            // TODO: drop support for legacy operation link file in jj 0.39 or so
206            Err(PathError { source: error, .. }) if error.kind() == io::ErrorKind::NotFound => {
207                let op_id_file = self.legacy_operations_dir().join(op_id.hex());
208                let data = fs::read(&op_id_file)
209                    .context(&op_id_file)
210                    .map_err(DefaultIndexStoreError::LoadAssociation)?;
211                commit_segment_id = CommitIndexSegmentId::try_from_hex(&data)
212                    .ok_or_else(|| {
213                        io::Error::new(io::ErrorKind::InvalidData, "file name is not valid hex")
214                    })
215                    .context(&op_id_file)
216                    .map_err(DefaultIndexStoreError::LoadAssociation)?;
217                changed_path_start_commit_pos = None;
218                changed_path_segment_ids = vec![];
219            }
220            Err(err) => return Err(DefaultIndexStoreError::LoadAssociation(err)),
221        };
222
223        let commits = ReadonlyCommitIndexSegment::load(
224            &self.commit_segments_dir(),
225            commit_segment_id,
226            lengths,
227        )
228        .map_err(DefaultIndexStoreError::LoadIndex)?;
229        // TODO: lazy load or mmap?
230        let changed_paths = if let Some(start_commit_pos) = changed_path_start_commit_pos {
231            CompositeChangedPathIndex::load(
232                &self.changed_path_segments_dir(),
233                start_commit_pos,
234                &changed_path_segment_ids,
235            )
236            .map_err(DefaultIndexStoreError::LoadIndex)?
237        } else {
238            CompositeChangedPathIndex::null()
239        };
240        Ok(DefaultReadonlyIndex::from_segment(commits, changed_paths))
241    }
242
    /// Rebuilds index for the given `operation`.
    ///
    /// The index to be built will be calculated from one of the ancestor
    /// operations if exists. Use `reinit()` to rebuild index from scratch.
    ///
    /// The new index is saved and associated with `operation` before it is
    /// returned.
    ///
    /// # Errors
    ///
    /// Errors from walking the operations or reading their views are
    /// propagated. `IndexCommits` is returned if a commit to be indexed cannot
    /// be read or added. Loading the parent index and saving the new index
    /// may fail as well.
    ///
    /// # Panics
    ///
    /// Panics if the commit graph contains a cycle.
    #[tracing::instrument(skip(self, store))]
    pub async fn build_index_at_operation(
        &self,
        operation: &Operation,
        store: &Arc<Store>,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        tracing::info!("scanning operations to index");
        let op_links_dir = self.op_links_dir();
        let legacy_operations_dir = self.legacy_operations_dir();
        let field_lengths = FieldLengths {
            commit_id: store.commit_id_length(),
            change_id: store.change_id_length(),
        };
        // Pick the latest existing ancestor operation as the parent segment.
        // An operation counts as indexed if a link file for it exists in
        // either the current or the legacy directory.
        let mut unindexed_ops = Vec::new();
        let mut parent_op = None;
        for op in op_walk::walk_ancestors(slice::from_ref(operation)) {
            let op = op?;
            if op_links_dir.join(op.id().hex()).is_file()
                || legacy_operations_dir.join(op.id().hex()).is_file()
            {
                parent_op = Some(op);
                break;
            } else {
                unindexed_ops.push(op);
            }
        }
        let ops_to_visit = if let Some(op) = &parent_op {
            // There may be concurrent ops, so revisit from the head. The parent
            // op is usually shallow if existed.
            op_walk::walk_ancestors_range(slice::from_ref(operation), slice::from_ref(op))
                .try_collect()?
        } else {
            unindexed_ops
        };
        tracing::info!(
            ops_count = ops_to_visit.len(),
            "collecting head commits to index"
        );
        // Map each referenced commit id to the first visited operation that
        // references it. The operation id is only used for error reporting.
        let mut historical_heads: HashMap<CommitId, OperationId> = HashMap::new();
        for op in &ops_to_visit {
            for commit_id in itertools::chain(
                op.all_referenced_commit_ids(),
                op.view()?.all_referenced_commit_ids(),
            ) {
                if !historical_heads.contains_key(commit_id) {
                    historical_heads.insert(commit_id.clone(), op.id().clone());
                }
            }
        }
        // Without an indexed ancestor, start from `DefaultMutableIndex::full()`.
        // Otherwise, start modification on top of the parent operation's index.
        let mut mutable_index;
        let maybe_parent_index;
        match &parent_op {
            None => {
                mutable_index = DefaultMutableIndex::full(field_lengths);
                maybe_parent_index = None;
            }
            Some(op) => {
                let parent_index = self.load_index_at_operation(op.id(), field_lengths)?;
                mutable_index = parent_index.start_modification();
                maybe_parent_index = Some(parent_index);
            }
        }

        tracing::info!(
            ?maybe_parent_index,
            heads_count = historical_heads.len(),
            "indexing commits reachable from historical heads"
        );
        // Build a list of ancestors of heads where parents come after the
        // commit itself.
        // Commits already in the parent index don't have to be visited. This
        // is always false if there's no parent index.
        let parent_index_has_id = |id: &CommitId| {
            maybe_parent_index
                .as_ref()
                .is_some_and(|index| index.has_id_impl(id))
        };
        let get_commit_with_op = |commit_id: &CommitId, op_id: &OperationId| {
            let op_id = op_id.clone();
            match store.get_commit(commit_id) {
                // Propagate head's op_id to report possible source of an error.
                // The op_id doesn't have to be included in the sort key, but
                // that wouldn't matter since the commit should be unique.
                Ok(commit) => Ok((CommitByCommitterTimestamp(commit), op_id)),
                Err(source) => Err(DefaultIndexStoreError::IndexCommits { op_id, source }),
            }
        };
        // Retain immediate predecessors if legacy operation exists. Some
        // commands (e.g. squash into grandparent) may leave transitive
        // predecessors, which aren't visible to any views.
        // TODO: delete this workaround with commit.predecessors.
        let commits_to_keep_immediate_predecessors = if ops_to_visit
            .iter()
            .any(|op| !op.stores_commit_predecessors())
        {
            // Collect the not-yet-indexed ancestors of the heads by following
            // parents only.
            let mut ancestors = HashSet::new();
            let mut work = historical_heads.keys().cloned().collect_vec();
            while let Some(commit_id) = work.pop() {
                if ancestors.contains(&commit_id) || parent_index_has_id(&commit_id) {
                    continue;
                }
                // A commit that fails to load is still recorded, but its
                // parents aren't followed.
                if let Ok(commit) = store.get_commit(&commit_id) {
                    work.extend(commit.parent_ids().iter().cloned());
                }
                ancestors.insert(commit_id);
            }
            ancestors
        } else {
            HashSet::new()
        };
        let commits = dag_walk::topo_order_reverse_ord_ok(
            historical_heads
                .iter()
                .filter(|&(commit_id, _)| !parent_index_has_id(commit_id))
                .map(|(commit_id, op_id)| get_commit_with_op(commit_id, op_id)),
            |(CommitByCommitterTimestamp(commit), _)| commit.id().clone(),
            |(CommitByCommitterTimestamp(commit), op_id)| {
                // Neighbors are the parents, plus the immediate predecessors
                // for the commits collected above.
                let keep_predecessors =
                    commits_to_keep_immediate_predecessors.contains(commit.id());
                itertools::chain(
                    commit.parent_ids(),
                    keep_predecessors
                        .then_some(&commit.store_commit().predecessors)
                        .into_iter()
                        .flatten(),
                )
                .filter(|&id| !parent_index_has_id(id))
                .map(|commit_id| get_commit_with_op(commit_id, op_id))
                .collect_vec()
            },
            |_| panic!("graph has cycle"),
        )?;
        // Parents come after the commit itself in `commits`, so iterate in
        // reverse to add parents first.
        for (CommitByCommitterTimestamp(commit), op_id) in commits.iter().rev() {
            mutable_index.add_commit(commit).await.map_err(|source| {
                DefaultIndexStoreError::IndexCommits {
                    op_id: op_id.clone(),
                    source,
                }
            })?;
        }

        let index = self.save_mutable_index(mutable_index, operation.id())?;
        tracing::info!(?index, commits_count = commits.len(), "saved new index");

        Ok(index)
    }
392
    /// Builds changed-path index for the specified operation.
    ///
    /// At most `max_commits` number of commits will be scanned from the latest
    /// unindexed commit.
    ///
    /// The commit index of the operation is loaded, the changed-path segments
    /// are extended and saved, and the operation link is rewritten to point
    /// to the new segments. The resulting index is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the index for `op_id` cannot be loaded, if a commit
    /// cannot be read or its changed paths cannot be collected, or if the
    /// segments or the operation link cannot be written.
    ///
    /// # Panics
    ///
    /// Panics if the existing changed-path index extends beyond the commits
    /// in the index, or if a commit is indexed out of order.
    #[tracing::instrument(skip(self, store))]
    pub async fn build_changed_path_index_at_operation(
        &self,
        op_id: &OperationId,
        store: &Arc<Store>,
        max_commits: u32,
        // TODO: add progress callback?
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        // Create directories in case the store was initialized by jj < 0.33.
        self.ensure_base_dirs()
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        let field_lengths = FieldLengths {
            commit_id: store.commit_id_length(),
            change_id: store.change_id_length(),
        };
        let index = self.load_index_at_operation(op_id, field_lengths)?;
        let old_changed_paths = index.changed_paths();

        // Distribute max_commits to contiguous pre/post ranges:
        //   ..|pre|old_changed_paths|post|
        //   (where pre.len() + post.len() <= max_commits)
        let pre_start;
        let pre_end;
        let post_start;
        let post_end;
        if let Some(GlobalCommitPosition(pos)) = old_changed_paths.start_commit_pos() {
            // The post range starts right after the already-indexed commits
            // and is filled first. Whatever remains of max_commits goes to
            // the pre range, which ends where the old index starts.
            post_start = pos + old_changed_paths.num_commits();
            assert!(post_start <= index.num_commits());
            post_end = u32::saturating_add(post_start, max_commits).min(index.num_commits());
            pre_start = u32::saturating_sub(pos, max_commits - (post_end - post_start));
            pre_end = pos;
        } else {
            // No start position yet: the pre range covers the last
            // max_commits commits, and the post range is empty.
            pre_start = u32::saturating_sub(index.num_commits(), max_commits);
            pre_end = index.num_commits();
            post_start = pre_end;
            post_end = pre_end;
        }

        let to_index_err = |source| DefaultIndexStoreError::IndexCommits {
            op_id: op_id.clone(),
            source,
        };
        // Collects the changed paths of the commit at `pos` and adds them to
        // `changed_paths`. `pos` must be the next mutable position.
        let index_commit = async |changed_paths: &mut CompositeChangedPathIndex,
                                  pos: GlobalCommitPosition| {
            assert_eq!(changed_paths.next_mutable_commit_pos(), Some(pos));
            let commit_id = index.as_composite().commits().entry_by_pos(pos).commit_id();
            let commit = store.get_commit_async(&commit_id).await?;
            let paths = collect_changed_paths(&index, &commit).await?;
            changed_paths.add_changed_paths(paths);
            Ok(())
        };

        // Index pre range
        let mut new_changed_paths =
            CompositeChangedPathIndex::empty(GlobalCommitPosition(pre_start));
        new_changed_paths.make_mutable();
        tracing::info!(?pre_start, ?pre_end, "indexing changed paths in commits");
        for pos in (pre_start..pre_end).map(GlobalCommitPosition) {
            index_commit(&mut new_changed_paths, pos)
                .await
                .map_err(to_index_err)?;
        }
        // Save the pre range before the old segments are appended.
        new_changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;

        // Copy previously-indexed segments
        new_changed_paths.append_segments(old_changed_paths);

        // Index post range, which is usually empty
        new_changed_paths.make_mutable();
        tracing::info!(?post_start, ?post_end, "indexing changed paths in commits");
        for pos in (post_start..post_end).map(GlobalCommitPosition) {
            index_commit(&mut new_changed_paths, pos)
                .await
                .map_err(to_index_err)?;
        }
        new_changed_paths.maybe_squash_with_ancestors();
        new_changed_paths
            .save_in(&self.changed_path_segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;

        // Update the operation link to point to the new segments
        // The commit segments are reused as is. Only the changed-path part of
        // the index is replaced.
        let commits = index.readonly_commits().clone();
        let index = DefaultReadonlyIndex::from_segment(commits, new_changed_paths);
        self.associate_index_with_operation(&index, op_id)
            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
                op_id: op_id.to_owned(),
                source,
            })?;
        Ok(index)
    }
489
490    fn save_mutable_index(
491        &self,
492        index: DefaultMutableIndex,
493        op_id: &OperationId,
494    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
495        // Create directories in case the store was initialized by jj < 0.33.
496        self.ensure_base_dirs()
497            .map_err(DefaultIndexStoreError::SaveIndex)?;
498        let (commits, mut changed_paths) = index.into_segment();
499        let commits = commits
500            .maybe_squash_with_ancestors()
501            .save_in(&self.commit_segments_dir())
502            .map_err(DefaultIndexStoreError::SaveIndex)?;
503        changed_paths.maybe_squash_with_ancestors();
504        changed_paths
505            .save_in(&self.changed_path_segments_dir())
506            .map_err(DefaultIndexStoreError::SaveIndex)?;
507        let index = DefaultReadonlyIndex::from_segment(commits, changed_paths);
508        self.associate_index_with_operation(&index, op_id)
509            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
510                op_id: op_id.to_owned(),
511                source,
512            })?;
513        Ok(index)
514    }
515
516    /// Records a link from the given operation to the this index version.
517    fn associate_index_with_operation(
518        &self,
519        index: &DefaultReadonlyIndex,
520        op_id: &OperationId,
521    ) -> Result<(), PathError> {
522        let proto = crate::protos::default_index::SegmentControl {
523            commit_segment_id: index.readonly_commits().id().to_bytes(),
524            changed_path_start_commit_pos: index
525                .changed_paths()
526                .start_commit_pos()
527                .map(|GlobalCommitPosition(start)| start),
528            changed_path_segment_ids: index
529                .changed_paths()
530                .readonly_segments()
531                .iter()
532                .map(|segment| segment.id().to_bytes())
533                .collect(),
534        };
535        let dir = self.op_links_dir();
536        let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
537        let file = temp_file.as_file_mut();
538        file.write_all(&proto.encode_to_vec())
539            .context(temp_file.path())?;
540        let path = dir.join(op_id.hex());
541        persist_temp_file(temp_file, &path).context(&path)?;
542
543        // TODO: drop support for legacy operation link file in jj 0.39 or so
544        let dir = self.legacy_operations_dir();
545        let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
546        let file = temp_file.as_file_mut();
547        file.write_all(index.readonly_commits().id().hex().as_bytes())
548            .context(temp_file.path())?;
549        let path = dir.join(op_id.hex());
550        persist_temp_file(temp_file, &path).context(&path)?;
551        Ok(())
552    }
553}
554
555impl IndexStore for DefaultIndexStore {
556    fn name(&self) -> &str {
557        Self::name()
558    }
559
560    fn get_index_at_op(
561        &self,
562        op: &Operation,
563        store: &Arc<Store>,
564    ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
565        let field_lengths = FieldLengths {
566            commit_id: store.commit_id_length(),
567            change_id: store.change_id_length(),
568        };
569        let index = match self.load_index_at_operation(op.id(), field_lengths) {
570            Err(DefaultIndexStoreError::LoadAssociation(PathError { source: error, .. }))
571                if error.kind() == io::ErrorKind::NotFound =>
572            {
573                self.build_index_at_operation(op, store).block_on()
574            }
575            Err(DefaultIndexStoreError::LoadIndex(err)) if err.is_corrupt_or_not_found() => {
576                // If the index was corrupt (maybe it was written in a different format),
577                // we just reindex.
578                match &err {
579                    ReadonlyIndexLoadError::UnexpectedVersion {
580                        kind,
581                        found_version,
582                        expected_version,
583                    } => {
584                        eprintln!(
585                            "Found {kind} index format version {found_version}, expected version \
586                             {expected_version}. Reindexing..."
587                        );
588                    }
589                    ReadonlyIndexLoadError::Other { error, .. } => {
590                        eprintln!("{err} (maybe the format has changed): {error}. Reindexing...");
591                    }
592                }
593                self.reinit()
594                    .map_err(|err| IndexStoreError::Read(err.into()))?;
595                self.build_index_at_operation(op, store).block_on()
596            }
597            result => result,
598        }
599        .map_err(|err| IndexStoreError::Read(err.into()))?;
600        Ok(Box::new(index))
601    }
602
603    fn write_index(
604        &self,
605        index: Box<dyn MutableIndex>,
606        op: &Operation,
607    ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
608        let index: Box<DefaultMutableIndex> = index
609            .downcast()
610            .expect("index to merge in must be a DefaultMutableIndex");
611        let index = self
612            .save_mutable_index(*index, op.id())
613            .map_err(|err| IndexStoreError::Write(err.into()))?;
614        Ok(Box::new(index))
615    }
616}