Skip to main content

jj_lib/default_index/
store.rs

1// Copyright 2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![expect(missing_docs)]
16
17use std::collections::HashMap;
18use std::fs;
19use std::future;
20use std::io;
21use std::io::Write as _;
22use std::path::Path;
23use std::path::PathBuf;
24use std::pin::pin;
25use std::slice;
26use std::sync::Arc;
27
28use async_trait::async_trait;
29use futures::StreamExt as _;
30use futures::TryStreamExt as _;
31use futures::stream;
32use itertools::Itertools as _;
33use prost::Message as _;
34use tempfile::NamedTempFile;
35use thiserror::Error;
36
37use super::changed_path::ChangedPathIndexSegmentId;
38use super::changed_path::CompositeChangedPathIndex;
39use super::changed_path::collect_changed_paths;
40use super::composite::AsCompositeIndex as _;
41use super::composite::CommitIndexSegmentId;
42use super::entry::GlobalCommitPosition;
43use super::mutable::DefaultMutableIndex;
44use super::readonly::DefaultReadonlyIndex;
45use super::readonly::FieldLengths;
46use super::readonly::ReadonlyCommitIndexSegment;
47use super::readonly::ReadonlyIndexLoadError;
48use crate::backend::BackendError;
49use crate::backend::BackendInitError;
50use crate::backend::CommitId;
51use crate::commit::CommitByCommitterTimestamp;
52use crate::dag_walk_async;
53use crate::file_util;
54use crate::file_util::IoResultExt as _;
55use crate::file_util::PathError;
56use crate::file_util::persist_temp_file;
57use crate::index::IndexStore;
58use crate::index::IndexStoreError;
59use crate::index::IndexStoreResult;
60use crate::index::MutableIndex;
61use crate::index::ReadonlyIndex;
62use crate::object_id::ObjectId as _;
63use crate::op_store::OpStoreError;
64use crate::op_store::OperationId;
65use crate::op_walk;
66use crate::operation::Operation;
67use crate::store::Store;
68
/// Length of a segment file name: a BLAKE2b-512 hash written as a hex string.
const SEGMENT_FILE_NAME_LENGTH: usize = {
    // A 512-bit hash is 64 bytes long, and each byte takes two hex digits.
    const HASH_LENGTH_IN_BYTES: usize = 64;
    const HEX_DIGITS_PER_BYTE: usize = 2;
    HASH_LENGTH_IN_BYTES * HEX_DIGITS_PER_BYTE
};
71
/// Error that may occur during `DefaultIndexStore` initialization.
///
/// Wraps the [`PathError`] of the file-system operation that failed. It is
/// returned by both `DefaultIndexStore::init()` and
/// `DefaultIndexStore::reinit()`.
#[derive(Debug, Error)]
#[error("Failed to initialize index store")]
pub struct DefaultIndexStoreInitError(#[from] pub PathError);
76
77impl From<DefaultIndexStoreInitError> for BackendInitError {
78    fn from(err: DefaultIndexStoreInitError) -> Self {
79        Self(err.into())
80    }
81}
82
/// Error that may occur while loading, building, or saving the index files
/// managed by `DefaultIndexStore`.
#[derive(Debug, Error)]
pub enum DefaultIndexStoreError {
    /// The link file from an operation to its index segments couldn't be
    /// written.
    #[error("Failed to associate index files with an operation {op_id}")]
    AssociateIndex {
        op_id: OperationId,
        source: PathError,
    },
    /// The operation link file couldn't be read or decoded.
    #[error("Failed to load associated index file names")]
    LoadAssociation(#[source] PathError),
    /// A commit or changed-path segment couldn't be loaded.
    #[error(transparent)]
    LoadIndex(ReadonlyIndexLoadError),
    /// A base directory or segment file couldn't be written.
    #[error("Failed to write index file")]
    SaveIndex(#[source] PathError),
    /// The backend failed while commits were being indexed.
    #[error("Failed to index commits at operation {op_id}")]
    IndexCommits {
        op_id: OperationId,
        source: BackendError,
    },
    /// The operation store failed while operations were being walked.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
104
/// Index store that keeps operation links and index segment files in
/// sub-directories of a single directory.
#[derive(Debug)]
pub struct DefaultIndexStore {
    // Root directory of the store; the "op_links", "segments", and
    // "changed_paths" sub-directories are derived from it.
    dir: PathBuf,
}
109
110impl DefaultIndexStore {
111    pub fn name() -> &'static str {
112        "default"
113    }
114
115    pub fn init(dir: &Path) -> Result<Self, DefaultIndexStoreInitError> {
116        let store = Self {
117            dir: dir.to_owned(),
118        };
119        store.ensure_base_dirs()?;
120        Ok(store)
121    }
122
123    pub fn load(dir: &Path) -> Self {
124        Self {
125            dir: dir.to_owned(),
126        }
127    }
128
129    pub fn reinit(&self) -> Result<(), DefaultIndexStoreInitError> {
130        // Create base directories in case the store was initialized by old jj.
131        self.ensure_base_dirs()?;
132        // Remove all operation links to trigger rebuilding.
133        file_util::remove_dir_contents(&self.op_links_dir())?;
134        let legacy_operations_dir = self.dir.join("operations"); // jj < 0.33
135        if legacy_operations_dir.exists() {
136            file_util::remove_dir_contents(&legacy_operations_dir)?;
137            fs::remove_dir(&legacy_operations_dir).context(&legacy_operations_dir)?;
138        }
139        // Remove index segments to save disk space. If raced, new segment file
140        // will be created by the other process.
141        file_util::remove_dir_contents(&self.commit_segments_dir())?;
142        file_util::remove_dir_contents(&self.changed_path_segments_dir())?;
143        // jj <= 0.14 created segment files in the top directory
144        for entry in self.dir.read_dir().context(&self.dir)? {
145            let entry = entry.context(&self.dir)?;
146            let path = entry.path();
147            if path.file_name().unwrap().len() != SEGMENT_FILE_NAME_LENGTH {
148                // Skip "type" file, "operations" directory, etc.
149                continue;
150            }
151            fs::remove_file(&path).context(&path)?;
152        }
153        Ok(())
154    }
155
156    fn ensure_base_dirs(&self) -> Result<(), PathError> {
157        for dir in [
158            self.op_links_dir(),
159            self.commit_segments_dir(),
160            self.changed_path_segments_dir(),
161        ] {
162            file_util::create_or_reuse_dir(&dir).context(&dir)?;
163        }
164        Ok(())
165    }
166
167    /// Directory for mapping from operations to segments. (jj >= 0.33)
168    fn op_links_dir(&self) -> PathBuf {
169        self.dir.join("op_links")
170    }
171
172    /// Directory for commit segment files.
173    fn commit_segments_dir(&self) -> PathBuf {
174        self.dir.join("segments")
175    }
176
177    /// Directory for changed-path segment files.
178    fn changed_path_segments_dir(&self) -> PathBuf {
179        self.dir.join("changed_paths")
180    }
181
182    fn load_index_at_operation(
183        &self,
184        op_id: &OperationId,
185        lengths: FieldLengths,
186    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
187        let op_link_file = self.op_links_dir().join(op_id.hex());
188        let data = fs::read(&op_link_file)
189            .context(&op_link_file)
190            .map_err(DefaultIndexStoreError::LoadAssociation)?;
191        let proto = crate::protos::default_index::SegmentControl::decode(&*data)
192            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
193            .context(&op_link_file)
194            .map_err(DefaultIndexStoreError::LoadAssociation)?;
195        let commit_segment_id = CommitIndexSegmentId::new(proto.commit_segment_id);
196        let changed_path_start_commit_pos = proto
197            .changed_path_start_commit_pos
198            .map(GlobalCommitPosition);
199        let changed_path_segment_ids = proto
200            .changed_path_segment_ids
201            .into_iter()
202            .map(ChangedPathIndexSegmentId::new)
203            .collect_vec();
204
205        let commits = ReadonlyCommitIndexSegment::load(
206            &self.commit_segments_dir(),
207            commit_segment_id,
208            lengths,
209        )
210        .map_err(DefaultIndexStoreError::LoadIndex)?;
211        // TODO: lazy load or mmap?
212        let changed_paths = if let Some(start_commit_pos) = changed_path_start_commit_pos {
213            CompositeChangedPathIndex::load(
214                &self.changed_path_segments_dir(),
215                start_commit_pos,
216                &changed_path_segment_ids,
217            )
218            .map_err(DefaultIndexStoreError::LoadIndex)?
219        } else {
220            CompositeChangedPathIndex::null()
221        };
222        Ok(DefaultReadonlyIndex::from_segment(commits, changed_paths))
223    }
224
225    /// Rebuilds index for the given `operation`.
226    ///
227    /// The index to be built will be calculated from one of the ancestor
228    /// operations if exists. Use `reinit()` to rebuild index from scratch.
229    #[tracing::instrument(skip(self, store))]
230    pub async fn build_index_at_operation(
231        &self,
232        operation: &Operation,
233        store: &Arc<Store>,
234    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
235        tracing::info!("scanning operations to index");
236        let op_links_dir = self.op_links_dir();
237        let field_lengths = FieldLengths {
238            commit_id: store.commit_id_length(),
239            change_id: store.change_id_length(),
240        };
241        // Pick the latest existing ancestor operation as the parent segment.
242        let mut unindexed_ops = Vec::new();
243        let mut parent_op = None;
244        let mut ancestors = pin!(op_walk::walk_ancestors(slice::from_ref(operation)));
245        while let Some(op) = ancestors.next().await {
246            let op = op?;
247            if op_links_dir.join(op.id().hex()).is_file() {
248                parent_op = Some(op);
249                break;
250            } else {
251                unindexed_ops.push(op);
252            }
253        }
254        let ops_to_visit = if let Some(op) = &parent_op {
255            // There may be concurrent ops, so revisit from the head. The parent
256            // op is usually shallow if existed.
257            op_walk::walk_ancestors_range(slice::from_ref(operation), slice::from_ref(op))
258                .try_collect()
259                .await?
260        } else {
261            unindexed_ops
262        };
263        tracing::info!(
264            ops_count = ops_to_visit.len(),
265            "collecting head commits to index"
266        );
267        let mut historical_heads: HashMap<CommitId, OperationId> = HashMap::new();
268        for op in &ops_to_visit {
269            for commit_id in itertools::chain(
270                op.all_referenced_commit_ids(),
271                op.view().await?.all_referenced_commit_ids(),
272            ) {
273                if !historical_heads.contains_key(commit_id) {
274                    historical_heads.insert(commit_id.clone(), op.id().clone());
275                }
276            }
277        }
278        let mut mutable_index;
279        let maybe_parent_index;
280        match &parent_op {
281            None => {
282                mutable_index = DefaultMutableIndex::full(field_lengths);
283                maybe_parent_index = None;
284            }
285            Some(op) => {
286                let parent_index = self.load_index_at_operation(op.id(), field_lengths)?;
287                mutable_index = parent_index.start_modification();
288                maybe_parent_index = Some(parent_index);
289            }
290        }
291
292        tracing::info!(
293            ?maybe_parent_index,
294            heads_count = historical_heads.len(),
295            "indexing commits reachable from historical heads"
296        );
297        // Build a list of ancestors of heads where parents come after the
298        // commit itself.
299        let parent_index_has_id = |id: &CommitId| {
300            maybe_parent_index
301                .as_ref()
302                .is_some_and(|index| index.has_id_impl(id))
303        };
304        let get_commit_with_op = async |commit_id: &CommitId, op_id: &OperationId| {
305            let op_id = op_id.clone();
306            match store.get_commit_async(commit_id).await {
307                // Propagate head's op_id to report possible source of an error.
308                // The op_id doesn't have to be included in the sort key, but
309                // that wouldn't matter since the commit should be unique.
310                Ok(commit) => Ok((CommitByCommitterTimestamp(commit), op_id)),
311                Err(source) => Err(DefaultIndexStoreError::IndexCommits { op_id, source }),
312            }
313        };
314        let commits = dag_walk_async::topo_order_reverse_ord(
315            stream::iter(&historical_heads)
316                .filter(|&(commit_id, _)| future::ready(!parent_index_has_id(commit_id)))
317                .map(|(commit_id, op_id)| get_commit_with_op(commit_id, op_id))
318                .buffered(store.concurrency())
319                .collect::<Vec<_>>()
320                .await,
321            |(CommitByCommitterTimestamp(commit), _)| commit.id().clone(),
322            async |(CommitByCommitterTimestamp(commit), op_id)| {
323                stream::iter(commit.parent_ids())
324                    .filter(|&id| future::ready(!parent_index_has_id(id)))
325                    .map(|commit_id| get_commit_with_op(commit_id, op_id))
326                    .buffered(store.concurrency())
327                    .collect::<Vec<_>>()
328                    .await
329            },
330            |_| panic!("graph has cycle"),
331        )
332        .await?;
333        for (CommitByCommitterTimestamp(commit), op_id) in commits.iter().rev() {
334            mutable_index.add_commit(commit).await.map_err(|source| {
335                DefaultIndexStoreError::IndexCommits {
336                    op_id: op_id.clone(),
337                    source,
338                }
339            })?;
340        }
341
342        let index = self.save_mutable_index(mutable_index, operation.id())?;
343        tracing::info!(?index, commits_count = commits.len(), "saved new index");
344
345        Ok(index)
346    }
347
348    /// Builds changed-path index for the specified operation.
349    ///
350    /// At most `max_commits` number of commits will be scanned from the latest
351    /// unindexed commit.
352    #[tracing::instrument(skip(self, store, progress_callback))]
353    pub async fn build_changed_path_index_at_operation(
354        &self,
355        op_id: &OperationId,
356        store: &Arc<Store>,
357        max_commits: u32,
358        mut progress_callback: impl FnMut(&DefaultChangedPathIndexProgress),
359    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
360        // Create directories in case the store was initialized by jj < 0.33.
361        self.ensure_base_dirs()
362            .map_err(DefaultIndexStoreError::SaveIndex)?;
363        let field_lengths = FieldLengths {
364            commit_id: store.commit_id_length(),
365            change_id: store.change_id_length(),
366        };
367        let index = self.load_index_at_operation(op_id, field_lengths)?;
368        let old_changed_paths = index.changed_paths();
369
370        // Distribute max_commits to contiguous pre/post ranges:
371        //   ..|pre|old_changed_paths|post|
372        //   (where pre.len() + post.len() <= max_commits)
373        let pre_start;
374        let pre_end;
375        let post_start;
376        let post_end;
377        if let Some(GlobalCommitPosition(pos)) = old_changed_paths.start_commit_pos() {
378            post_start = pos + old_changed_paths.num_commits();
379            assert!(post_start <= index.num_commits());
380            post_end = u32::saturating_add(post_start, max_commits).min(index.num_commits());
381            pre_start = u32::saturating_sub(pos, max_commits - (post_end - post_start));
382            pre_end = pos;
383        } else {
384            pre_start = u32::saturating_sub(index.num_commits(), max_commits);
385            pre_end = index.num_commits();
386            post_start = pre_end;
387            post_end = pre_end;
388        }
389
390        let mut progress = DefaultChangedPathIndexProgress {
391            current: 0,
392            total: (pre_end - pre_start) + (post_end - post_start),
393        };
394        let mut emit_progress = || {
395            progress_callback(&progress);
396            progress.current += 1;
397        };
398
399        let to_index_err = |source| DefaultIndexStoreError::IndexCommits {
400            op_id: op_id.clone(),
401            source,
402        };
403        let index_commit = async |changed_paths: &mut CompositeChangedPathIndex,
404                                  pos: GlobalCommitPosition| {
405            assert_eq!(changed_paths.next_mutable_commit_pos(), Some(pos));
406            let commit_id = index.as_composite().commits().entry_by_pos(pos).commit_id();
407            let commit = store.get_commit_async(&commit_id).await?;
408            let paths = collect_changed_paths(&index, &commit).await?;
409            changed_paths.add_changed_paths(paths);
410            Ok(())
411        };
412
413        // Index pre range
414        let mut new_changed_paths =
415            CompositeChangedPathIndex::empty(GlobalCommitPosition(pre_start));
416        new_changed_paths.make_mutable();
417        tracing::info!(?pre_start, ?pre_end, "indexing changed paths in commits");
418        for pos in (pre_start..pre_end).map(GlobalCommitPosition) {
419            emit_progress();
420            index_commit(&mut new_changed_paths, pos)
421                .await
422                .map_err(to_index_err)?;
423        }
424        new_changed_paths
425            .save_in(&self.changed_path_segments_dir())
426            .map_err(DefaultIndexStoreError::SaveIndex)?;
427
428        // Copy previously-indexed segments
429        new_changed_paths.append_segments(old_changed_paths);
430
431        // Index post range, which is usually empty
432        new_changed_paths.make_mutable();
433        tracing::info!(?post_start, ?post_end, "indexing changed paths in commits");
434        for pos in (post_start..post_end).map(GlobalCommitPosition) {
435            emit_progress();
436            index_commit(&mut new_changed_paths, pos)
437                .await
438                .map_err(to_index_err)?;
439        }
440        new_changed_paths.maybe_squash_with_ancestors();
441        new_changed_paths
442            .save_in(&self.changed_path_segments_dir())
443            .map_err(DefaultIndexStoreError::SaveIndex)?;
444
445        // Update the operation link to point to the new segments
446        let commits = index.readonly_commits().clone();
447        let index = DefaultReadonlyIndex::from_segment(commits, new_changed_paths);
448        self.associate_index_with_operation(&index, op_id)
449            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
450                op_id: op_id.to_owned(),
451                source,
452            })?;
453        emit_progress();
454        Ok(index)
455    }
456
457    fn save_mutable_index(
458        &self,
459        index: DefaultMutableIndex,
460        op_id: &OperationId,
461    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
462        // Create directories in case the store was initialized by jj < 0.33.
463        self.ensure_base_dirs()
464            .map_err(DefaultIndexStoreError::SaveIndex)?;
465        let (commits, mut changed_paths) = index.into_segment();
466        let commits = commits
467            .maybe_squash_with_ancestors()
468            .save_in(&self.commit_segments_dir())
469            .map_err(DefaultIndexStoreError::SaveIndex)?;
470        changed_paths.maybe_squash_with_ancestors();
471        changed_paths
472            .save_in(&self.changed_path_segments_dir())
473            .map_err(DefaultIndexStoreError::SaveIndex)?;
474        let index = DefaultReadonlyIndex::from_segment(commits, changed_paths);
475        self.associate_index_with_operation(&index, op_id)
476            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
477                op_id: op_id.to_owned(),
478                source,
479            })?;
480        Ok(index)
481    }
482
483    /// Records a link from the given operation to the this index version.
484    fn associate_index_with_operation(
485        &self,
486        index: &DefaultReadonlyIndex,
487        op_id: &OperationId,
488    ) -> Result<(), PathError> {
489        let proto = crate::protos::default_index::SegmentControl {
490            commit_segment_id: index.readonly_commits().id().to_bytes(),
491            changed_path_start_commit_pos: index
492                .changed_paths()
493                .start_commit_pos()
494                .map(|GlobalCommitPosition(start)| start),
495            changed_path_segment_ids: index
496                .changed_paths()
497                .readonly_segments()
498                .iter()
499                .map(|segment| segment.id().to_bytes())
500                .collect(),
501        };
502        let dir = self.op_links_dir();
503        let mut temp_file = NamedTempFile::new_in(&dir).context(&dir)?;
504        let file = temp_file.as_file_mut();
505        file.write_all(&proto.encode_to_vec())
506            .context(temp_file.path())?;
507        let path = dir.join(op_id.hex());
508        persist_temp_file(temp_file, &path).context(&path)?;
509        Ok(())
510    }
511}
512
513#[async_trait(?Send)]
514impl IndexStore for DefaultIndexStore {
515    fn name(&self) -> &str {
516        Self::name()
517    }
518
519    async fn get_index_at_op(
520        &self,
521        op: &Operation,
522        store: &Arc<Store>,
523    ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
524        let field_lengths = FieldLengths {
525            commit_id: store.commit_id_length(),
526            change_id: store.change_id_length(),
527        };
528        let index = match self.load_index_at_operation(op.id(), field_lengths) {
529            Err(DefaultIndexStoreError::LoadAssociation(PathError { source: error, .. }))
530                if error.kind() == io::ErrorKind::NotFound =>
531            {
532                self.build_index_at_operation(op, store).await
533            }
534            Err(DefaultIndexStoreError::LoadIndex(err)) if err.is_corrupt_or_not_found() => {
535                // If the index was corrupt (maybe it was written in a different format),
536                // we just reindex.
537                match &err {
538                    ReadonlyIndexLoadError::UnexpectedVersion {
539                        kind,
540                        found_version,
541                        expected_version,
542                    } => {
543                        eprintln!(
544                            "Found {kind} index format version {found_version}, expected version \
545                             {expected_version}. Reindexing..."
546                        );
547                    }
548                    ReadonlyIndexLoadError::Other { error, .. } => {
549                        eprintln!("{err} (maybe the format has changed): {error}. Reindexing...");
550                    }
551                }
552                self.reinit()
553                    .map_err(|err| IndexStoreError::Read(err.into()))?;
554                self.build_index_at_operation(op, store).await
555            }
556            result => result,
557        }
558        .map_err(|err| IndexStoreError::Read(err.into()))?;
559        Ok(Box::new(index))
560    }
561
562    fn write_index(
563        &self,
564        index: Box<dyn MutableIndex>,
565        op: &Operation,
566    ) -> IndexStoreResult<Box<dyn ReadonlyIndex>> {
567        let index: Box<DefaultMutableIndex> = index
568            .downcast()
569            .expect("index to merge in must be a DefaultMutableIndex");
570        let index = self
571            .save_mutable_index(*index, op.id())
572            .map_err(|err| IndexStoreError::Write(err.into()))?;
573        Ok(Box::new(index))
574    }
575}
576
/// Progress of [`DefaultIndexStore::build_changed_path_index_at_operation()`].
///
/// A reference to this value is passed to the progress callback of that
/// function.
#[derive(Clone, Debug)]
pub struct DefaultChangedPathIndexProgress {
    /// Number of commits that had been counted when the callback was invoked.
    /// It starts at 0 and is incremented after each callback invocation.
    pub current: u32,
    /// Number of commits selected to be indexed in this run, which is the
    /// combined length of the pre and post ranges.
    pub total: u32,
}