jj_lib/default_index/store.rs

1// Copyright 2023 The Jujutsu Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(missing_docs)]
16
17use std::any::Any;
18use std::collections::HashSet;
19use std::fs;
20use std::io;
21use std::io::Write as _;
22use std::path::Path;
23use std::path::PathBuf;
24use std::sync::Arc;
25
26use itertools::Itertools as _;
27use tempfile::NamedTempFile;
28use thiserror::Error;
29
30use super::mutable::DefaultMutableIndex;
31use super::readonly::DefaultReadonlyIndex;
32use super::readonly::ReadonlyIndexLoadError;
33use super::readonly::ReadonlyIndexSegment;
34use crate::backend::BackendError;
35use crate::backend::BackendInitError;
36use crate::backend::CommitId;
37use crate::commit::CommitByCommitterTimestamp;
38use crate::dag_walk;
39use crate::file_util;
40use crate::file_util::persist_content_addressed_temp_file;
41use crate::file_util::IoResultExt as _;
42use crate::file_util::PathError;
43use crate::index::Index as _;
44use crate::index::IndexReadError;
45use crate::index::IndexStore;
46use crate::index::IndexWriteError;
47use crate::index::MutableIndex;
48use crate::index::ReadonlyIndex;
49use crate::object_id::ObjectId as _;
50use crate::op_store::OpStoreError;
51use crate::op_store::OperationId;
52use crate::operation::Operation;
53use crate::store::Store;
54
/// Length of an index segment file name: a BLAKE2b-512 hash (64 bytes)
/// rendered as a hex string, i.e. 128 characters.
const SEGMENT_FILE_NAME_LENGTH: usize = 64 * 2;
57
/// Error that may occur during `DefaultIndexStore` initialization.
///
/// Wraps the filesystem [`PathError`] produced while creating the store's
/// base directories.
#[derive(Debug, Error)]
#[error("Failed to initialize index store")]
pub struct DefaultIndexStoreInitError(#[from] pub PathError);
62
63impl From<DefaultIndexStoreInitError> for BackendInitError {
64    fn from(err: DefaultIndexStoreInitError) -> Self {
65        BackendInitError(err.into())
66    }
67}
68
/// Error that may occur while loading, building, or saving the default
/// commit index.
#[derive(Debug, Error)]
pub enum DefaultIndexStoreError {
    /// Failed to write the link file that associates an operation with its
    /// index segment file.
    #[error("Failed to associate commit index file with an operation {op_id}")]
    AssociateIndex {
        op_id: OperationId,
        source: io::Error,
    },
    /// Failed to read the operation-to-index link file.
    #[error("Failed to load associated commit index file name")]
    LoadAssociation(#[source] io::Error),
    /// Failed to load an index segment file.
    #[error(transparent)]
    LoadIndex(ReadonlyIndexLoadError),
    /// Failed to persist a newly built index segment file.
    #[error("Failed to write commit index file")]
    SaveIndex(#[source] io::Error),
    /// Failed to read commits from the backend while indexing.
    #[error("Failed to index commits at operation {op_id}")]
    IndexCommits {
        op_id: OperationId,
        source: BackendError,
    },
    /// Error propagated from the operation store while reading operations
    /// or views.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
90
/// Index store that keeps commit index files in a local directory.
#[derive(Debug)]
pub struct DefaultIndexStore {
    // Store root; holds the "operations" and "segments" subdirectories.
    dir: PathBuf,
}
95
impl DefaultIndexStore {
    /// Returns the name identifying this index store implementation.
    pub fn name() -> &'static str {
        "default"
    }

    /// Creates a new store rooted at `dir` and sets up its base
    /// directories.
    pub fn init(dir: &Path) -> Result<Self, DefaultIndexStoreInitError> {
        let store = DefaultIndexStore {
            dir: dir.to_owned(),
        };
        store.ensure_base_dirs()?;
        Ok(store)
    }

    /// Opens an existing store rooted at `dir` without touching the file
    /// system.
    pub fn load(dir: &Path) -> DefaultIndexStore {
        DefaultIndexStore {
            dir: dir.to_owned(),
        }
    }

    /// Deletes all on-disk index data so that the index will be rebuilt
    /// from scratch on the next read.
    pub fn reinit(&self) -> Result<(), DefaultIndexStoreInitError> {
        // Create base directories in case the store was initialized by old jj.
        self.ensure_base_dirs()?;
        // Remove all operation links to trigger rebuilding.
        file_util::remove_dir_contents(&self.operations_dir())?;
        // Remove index segments to save disk space. If raced, new segment file
        // will be created by the other process.
        file_util::remove_dir_contents(&self.segments_dir())?;
        // jj <= 0.14 created segment files in the top directory
        for entry in self.dir.read_dir().context(&self.dir)? {
            let entry = entry.context(&self.dir)?;
            let path = entry.path();
            // Segment files are recognized purely by their hex-hash name
            // length.
            if path.file_name().unwrap().len() != SEGMENT_FILE_NAME_LENGTH {
                // Skip "type" file, "operations" directory, etc.
                continue;
            }
            fs::remove_file(&path).context(&path)?;
        }
        Ok(())
    }

    /// Creates the "operations" and "segments" directories if they don't
    /// already exist.
    fn ensure_base_dirs(&self) -> Result<(), PathError> {
        for dir in [self.operations_dir(), self.segments_dir()] {
            file_util::create_or_reuse_dir(&dir).context(&dir)?;
        }
        Ok(())
    }

    /// Directory containing per-operation links to index segment files.
    fn operations_dir(&self) -> PathBuf {
        self.dir.join("operations")
    }

    /// Directory containing the index segment files themselves.
    fn segments_dir(&self) -> PathBuf {
        self.dir.join("segments")
    }

    /// Loads the index segment previously associated with operation
    /// `op_id`.
    ///
    /// Fails with `LoadAssociation` if no link file exists for the
    /// operation, or `LoadIndex` if the segment file can't be loaded.
    fn load_index_segments_at_operation(
        &self,
        op_id: &OperationId,
        commit_id_length: usize,
        change_id_length: usize,
    ) -> Result<Arc<ReadonlyIndexSegment>, DefaultIndexStoreError> {
        // The link file's content is the hex name of the segment file.
        let op_id_file = self.operations_dir().join(op_id.hex());
        let index_file_id_hex =
            fs::read_to_string(op_id_file).map_err(DefaultIndexStoreError::LoadAssociation)?;
        ReadonlyIndexSegment::load(
            &self.segments_dir(),
            index_file_id_hex,
            commit_id_length,
            change_id_length,
        )
        .map_err(DefaultIndexStoreError::LoadIndex)
    }

    /// Rebuilds index for the given `operation`.
    ///
    /// The index to be built will be calculated from one of the ancestor
    /// operations if exists. Use `reinit()` to rebuild index from scratch.
    pub fn build_index_at_operation(
        &self,
        operation: &Operation,
        store: &Arc<Store>,
    ) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
        let index_segment = self.build_index_segments_at_operation(operation, store)?;
        Ok(DefaultReadonlyIndex::from_segment(index_segment))
    }

    /// Builds and persists an index segment covering commits referenced by
    /// `operation` and all of its ancestor operations, building
    /// incrementally on top of the closest already-indexed ancestor
    /// operation when one exists.
    #[tracing::instrument(skip(self, store))]
    fn build_index_segments_at_operation(
        &self,
        operation: &Operation,
        store: &Arc<Store>,
    ) -> Result<Arc<ReadonlyIndexSegment>, DefaultIndexStoreError> {
        let view = operation.view()?;
        let operations_dir = self.operations_dir();
        let commit_id_length = store.commit_id_length();
        let change_id_length = store.change_id_length();
        // Collect commit ids referenced by this operation's view, then walk
        // ancestor operations and record each newly seen referenced commit
        // together with the operation that introduced it (for error
        // reporting later).
        let mut visited_heads: HashSet<CommitId> =
            view.all_referenced_commit_ids().cloned().collect();
        let mut historical_heads: Vec<(CommitId, OperationId)> = visited_heads
            .iter()
            .map(|commit_id| (commit_id.clone(), operation.id().clone()))
            .collect();
        let mut parent_op_id: Option<OperationId> = None;
        for op in dag_walk::dfs_ok(
            [Ok(operation.clone())],
            |op: &Operation| op.id().clone(),
            |op: &Operation| op.parents().collect_vec(),
        ) {
            let op = op?;
            // Pick the latest existing ancestor operation as the parent
            // segment. Perhaps, breadth-first search is more appropriate here,
            // but that wouldn't matter in practice as the operation log is
            // mostly linear.
            if parent_op_id.is_none() && operations_dir.join(op.id().hex()).is_file() {
                parent_op_id = Some(op.id().clone());
            }
            // TODO: no need to walk ancestors of the parent_op_id operation
            for commit_id in op.view()?.all_referenced_commit_ids() {
                if visited_heads.insert(commit_id.clone()) {
                    historical_heads.push((commit_id.clone(), op.id().clone()));
                }
            }
        }
        // Start from a full (empty) index or incrementally from the parent
        // operation's segment.
        let maybe_parent_file;
        let mut mutable_index;
        match parent_op_id {
            None => {
                maybe_parent_file = None;
                mutable_index = DefaultMutableIndex::full(commit_id_length, change_id_length);
            }
            Some(parent_op_id) => {
                let parent_file = self.load_index_segments_at_operation(
                    &parent_op_id,
                    commit_id_length,
                    change_id_length,
                )?;
                maybe_parent_file = Some(parent_file.clone());
                mutable_index = DefaultMutableIndex::incremental(parent_file);
            }
        }

        tracing::info!(
            ?maybe_parent_file,
            heads_count = historical_heads.len(),
            "indexing commits reachable from historical heads"
        );
        // Build a list of ancestors of heads where parents and predecessors come after
        // the commit itself.
        let parent_file_has_id = |id: &CommitId| {
            maybe_parent_file
                .as_ref()
                .is_some_and(|segment| segment.as_composite().has_id(id))
        };
        let get_commit_with_op = |commit_id: &CommitId, op_id: &OperationId| {
            let op_id = op_id.clone();
            match store.get_commit(commit_id) {
                // Propagate head's op_id to report possible source of an error.
                // The op_id doesn't have to be included in the sort key, but
                // that wouldn't matter since the commit should be unique.
                Ok(commit) => Ok((CommitByCommitterTimestamp(commit), op_id)),
                Err(source) => Err(DefaultIndexStoreError::IndexCommits { op_id, source }),
            }
        };
        let commits = dag_walk::topo_order_reverse_ord_ok(
            historical_heads
                .iter()
                // Commits already present in the parent segment are skipped.
                .filter(|&(commit_id, _)| !parent_file_has_id(commit_id))
                .map(|(commit_id, op_id)| get_commit_with_op(commit_id, op_id)),
            |(CommitByCommitterTimestamp(commit), _)| commit.id().clone(),
            |(CommitByCommitterTimestamp(commit), op_id)| {
                itertools::chain(commit.parent_ids(), commit.predecessor_ids())
                    .filter(|&id| !parent_file_has_id(id))
                    .map(|commit_id| get_commit_with_op(commit_id, op_id))
                    .collect_vec()
            },
        )?;
        // The list has each commit before its parents, so iterate in
        // reverse to add parents first.
        for (CommitByCommitterTimestamp(commit), _) in commits.iter().rev() {
            mutable_index.add_commit(commit);
        }

        let index_file = self.save_mutable_index(mutable_index, operation.id())?;
        tracing::info!(
            ?index_file,
            commits_count = commits.len(),
            "saved new index file"
        );

        Ok(index_file)
    }

    /// Persists `mutable_index` as a segment file and links it to `op_id`.
    fn save_mutable_index(
        &self,
        mutable_index: DefaultMutableIndex,
        op_id: &OperationId,
    ) -> Result<Arc<ReadonlyIndexSegment>, DefaultIndexStoreError> {
        let index_segment = mutable_index
            .squash_and_save_in(&self.segments_dir())
            .map_err(DefaultIndexStoreError::SaveIndex)?;
        self.associate_file_with_operation(&index_segment, op_id)
            .map_err(|source| DefaultIndexStoreError::AssociateIndex {
                op_id: op_id.to_owned(),
                source,
            })?;
        Ok(index_segment)
    }

    /// Records a link from the given operation to the this index version.
    fn associate_file_with_operation(
        &self,
        index: &ReadonlyIndexSegment,
        op_id: &OperationId,
    ) -> io::Result<()> {
        // Write the segment name to a temp file first, then persist it under
        // the operation-id file name.
        let dir = self.operations_dir();
        let mut temp_file = NamedTempFile::new_in(&dir)?;
        let file = temp_file.as_file_mut();
        file.write_all(index.name().as_bytes())?;
        persist_content_addressed_temp_file(temp_file, dir.join(op_id.hex()))?;
        Ok(())
    }
}
316
impl IndexStore for DefaultIndexStore {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        Self::name()
    }

    /// Returns the index associated with `op`, building (or rebuilding) it
    /// when no association exists or the stored index file is unusable.
    fn get_index_at_op(
        &self,
        op: &Operation,
        store: &Arc<Store>,
    ) -> Result<Box<dyn ReadonlyIndex>, IndexReadError> {
        let index_segment = match self.load_index_segments_at_operation(
            op.id(),
            store.commit_id_length(),
            store.change_id_length(),
        ) {
            // No index file has been associated with this operation yet:
            // build one (incrementally from an ancestor if possible).
            Err(DefaultIndexStoreError::LoadAssociation(err))
                if err.kind() == io::ErrorKind::NotFound =>
            {
                self.build_index_segments_at_operation(op, store)
            }
            Err(DefaultIndexStoreError::LoadIndex(err)) if err.is_corrupt_or_not_found() => {
                // If the index was corrupt (maybe it was written in a different format),
                // we just reindex.
                match &err {
                    ReadonlyIndexLoadError::UnexpectedVersion {
                        found_version,
                        expected_version,
                    } => {
                        eprintln!(
                            "Found index format version {found_version}, expected version \
                             {expected_version}. Reindexing..."
                        );
                    }
                    ReadonlyIndexLoadError::Other { name: _, error } => {
                        eprintln!("{err} (maybe the format has changed): {error}. Reindexing...");
                    }
                }
                // Wipe all index data, then rebuild from scratch.
                self.reinit().map_err(|err| IndexReadError(err.into()))?;
                self.build_index_segments_at_operation(op, store)
            }
            // Any other outcome (success or error) passes through unchanged.
            result => result,
        }
        .map_err(|err| IndexReadError(err.into()))?;
        Ok(Box::new(DefaultReadonlyIndex::from_segment(index_segment)))
    }

    /// Saves `index` and associates the resulting segment file with
    /// operation `op`.
    ///
    /// Panics if `index` is not a `DefaultMutableIndex` — callers of this
    /// store must supply indexes it created.
    fn write_index(
        &self,
        index: Box<dyn MutableIndex>,
        op: &Operation,
    ) -> Result<Box<dyn ReadonlyIndex>, IndexWriteError> {
        let index = index
            .into_any()
            .downcast::<DefaultMutableIndex>()
            .expect("index to merge in must be a DefaultMutableIndex");
        let index_segment = self
            .save_mutable_index(*index, op.id())
            .map_err(|err| IndexWriteError(err.into()))?;
        Ok(Box::new(DefaultReadonlyIndex::from_segment(index_segment)))
    }
}
381}