#![allow(missing_docs)]
use std::any::Any;
use std::collections::HashSet;
use std::fs;
use std::io;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use itertools::Itertools;
use tempfile::NamedTempFile;
use thiserror::Error;
use super::mutable::DefaultMutableIndex;
use super::readonly::DefaultReadonlyIndex;
use super::readonly::ReadonlyIndexLoadError;
use super::readonly::ReadonlyIndexSegment;
use crate::backend::BackendError;
use crate::backend::BackendInitError;
use crate::backend::CommitId;
use crate::commit::CommitByCommitterTimestamp;
use crate::dag_walk;
use crate::file_util;
use crate::file_util::persist_content_addressed_temp_file;
use crate::file_util::IoResultExt as _;
use crate::file_util::PathError;
use crate::index::Index;
use crate::index::IndexReadError;
use crate::index::IndexStore;
use crate::index::IndexWriteError;
use crate::index::MutableIndex;
use crate::index::ReadonlyIndex;
use crate::object_id::ObjectId;
use crate::op_store::OpStoreError;
use crate::op_store::OperationId;
use crate::operation::Operation;
use crate::store::Store;

const SEGMENT_FILE_NAME_LENGTH: usize = 64 * 2;
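
/// Error that can occur when initializing the on-disk index store.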
#[derive(Debug, Error)]
#[error("Failed to initialize index store")]
pub struct DefaultIndexStoreInitError(#[from] pub PathError);
impl From<DefaultIndexStoreInitError> for BackendInitError {
fn from(err: DefaultIndexStoreInitError) -> Self {
BackendInitError(err.into())
}
}
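
/// Error that can occur when loading, building, or saving the commit index.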
#[derive(Debug, Error)]
pub enum DefaultIndexStoreError {
#[error("Failed to associate commit index file with an operation {op_id}")]
AssociateIndex {
op_id: OperationId,
source: io::Error,
},
#[error("Failed to load associated commit index file name")]
LoadAssociation(#[source] io::Error),
#[error(transparent)]
LoadIndex(ReadonlyIndexLoadError),
#[error("Failed to write commit index file")]
SaveIndex(#[source] io::Error),
#[error("Failed to index commits at operation {op_id}")]
IndexCommits {
op_id: OperationId,
source: BackendError,
},
#[error(transparent)]
OpStore(#[from] OpStoreError),
}
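
/// Commit index store that keeps its files on the local filesystem.
///
/// Index segment files live under `<dir>/segments`, and `<dir>/operations`
/// maps each operation ID to the name of the index file built at that
/// operation.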
#[derive(Debug)]
pub struct DefaultIndexStore {
dir: PathBuf,
}

impl DefaultIndexStore {
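/// Returns the name of this index store implementation.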
pub fn name() -> &'static str {
"default"
}
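
/// Initializes a store at `dir`, creating the `operations` and `segments`
/// subdirectories if they do not already exist.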
pub fn init(dir: &Path) -> Result<Self, DefaultIndexStoreInitError> {
let store = DefaultIndexStore {
dir: dir.to_owned(),
};
store.ensure_base_dirs()?;
Ok(store)
}
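
/// Loads an existing store at `dir` without touching the filesystem.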
pub fn load(dir: &Path) -> DefaultIndexStore {
DefaultIndexStore {
dir: dir.to_owned(),
}
}
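
/// Removes all index files and operation links, forcing the index to be
/// rebuilt from scratch on the next access.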
pub fn reinit(&self) -> Result<(), DefaultIndexStoreInitError> {
self.ensure_base_dirs()?;
file_util::remove_dir_contents(&self.operations_dir())?;
file_util::remove_dir_contents(&self.segments_dir())?;
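// Also remove leftover segment files stored directly under the store
// directory; entries whose names don't have the segment-file length
// (such as the `operations` and `segments` directories) are skipped.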
for entry in self.dir.read_dir().context(&self.dir)? {
let entry = entry.context(&self.dir)?;
let path = entry.path();
if path.file_name().unwrap().len() != SEGMENT_FILE_NAME_LENGTH {
continue;
}
fs::remove_file(&path).context(&path)?;
}
Ok(())
}

fn ensure_base_dirs(&self) -> Result<(), PathError> {
for dir in [self.operations_dir(), self.segments_dir()] {
file_util::create_or_reuse_dir(&dir).context(&dir)?;
}
Ok(())
}

fn operations_dir(&self) -> PathBuf {
self.dir.join("operations")
}

fn segments_dir(&self) -> PathBuf {
self.dir.join("segments")
}
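
/// Loads the index segment associated with `op_id`, reading the segment
/// file name recorded under the `operations` directory.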
fn load_index_segments_at_operation(
&self,
op_id: &OperationId,
commit_id_length: usize,
change_id_length: usize,
) -> Result<Arc<ReadonlyIndexSegment>, DefaultIndexStoreError> {
let op_id_file = self.operations_dir().join(op_id.hex());
let index_file_id_hex =
fs::read_to_string(op_id_file).map_err(DefaultIndexStoreError::LoadAssociation)?;
ReadonlyIndexSegment::load(
&self.segments_dir(),
index_file_id_hex,
commit_id_length,
change_id_length,
)
.map_err(DefaultIndexStoreError::LoadIndex)
}
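
/// Builds the commit index at the given operation and returns it as a
/// readonly index.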
pub fn build_index_at_operation(
&self,
operation: &Operation,
store: &Arc<Store>,
) -> Result<DefaultReadonlyIndex, DefaultIndexStoreError> {
let index_segment = self.build_index_segments_at_operation(operation, store)?;
Ok(DefaultReadonlyIndex::from_segment(index_segment))
}
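
/// Builds index segments for the given operation: walks ancestor operations
/// to find an existing index file to build on, then indexes any commits
/// referenced by historical views that are not yet in that file.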
#[tracing::instrument(skip(self, store))]
fn build_index_segments_at_operation(
&self,
operation: &Operation,
store: &Arc<Store>,
) -> Result<Arc<ReadonlyIndexSegment>, DefaultIndexStoreError> {
let view = operation.view()?;
let operations_dir = self.operations_dir();
let commit_id_length = store.commit_id_length();
let change_id_length = store.change_id_length();
let mut visited_heads: HashSet<CommitId> =
view.all_referenced_commit_ids().cloned().collect();
let mut historical_heads: Vec<(CommitId, OperationId)> = visited_heads
.iter()
.map(|commit_id| (commit_id.clone(), operation.id().clone()))
.collect();
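// Walk the current operation and its ancestors to find one that already
// has an index file to build on, collecting the commits referenced by
// each view as historical heads along the way.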
let mut parent_op_id: Option<OperationId> = None;
for op in dag_walk::dfs_ok(
[Ok(operation.clone())],
|op: &Operation| op.id().clone(),
|op: &Operation| op.parents().collect_vec(),
) {
let op = op?;
if parent_op_id.is_none() && operations_dir.join(op.id().hex()).is_file() {
parent_op_id = Some(op.id().clone());
}
for commit_id in op.view()?.all_referenced_commit_ids() {
if visited_heads.insert(commit_id.clone()) {
historical_heads.push((commit_id.clone(), op.id().clone()));
}
}
}
let maybe_parent_file;
let mut mutable_index;
match parent_op_id {
None => {
maybe_parent_file = None;
mutable_index = DefaultMutableIndex::full(commit_id_length, change_id_length);
}
Some(parent_op_id) => {
let parent_file = self.load_index_segments_at_operation(
&parent_op_id,
commit_id_length,
change_id_length,
)?;
maybe_parent_file = Some(parent_file.clone());
mutable_index = DefaultMutableIndex::incremental(parent_file);
}
}
tracing::info!(
?maybe_parent_file,
heads_count = historical_heads.len(),
"indexing commits reachable from historical heads"
);
let parent_file_has_id = |id: &CommitId| {
maybe_parent_file
.as_ref()
.is_some_and(|segment| segment.as_composite().has_id(id))
};
let get_commit_with_op = |commit_id: &CommitId, op_id: &OperationId| {
let op_id = op_id.clone();
match store.get_commit(commit_id) {
Ok(commit) => Ok((CommitByCommitterTimestamp(commit), op_id)),
Err(source) => Err(DefaultIndexStoreError::IndexCommits { op_id, source }),
}
};
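// List the referenced commits that aren't in the parent index file yet,
// in reverse topological order: parents and predecessors come after the
// commit itself.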
let commits = dag_walk::topo_order_reverse_ord_ok(
historical_heads
.iter()
.filter(|&(commit_id, _)| !parent_file_has_id(commit_id))
.map(|(commit_id, op_id)| get_commit_with_op(commit_id, op_id)),
|(CommitByCommitterTimestamp(commit), _)| commit.id().clone(),
|(CommitByCommitterTimestamp(commit), op_id)| {
itertools::chain(commit.parent_ids(), commit.predecessor_ids())
.filter(|&id| !parent_file_has_id(id))
.map(|commit_id| get_commit_with_op(commit_id, op_id))
.collect_vec()
},
)?;
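// Add the new commits in forward topological order so that parents and
// predecessors are indexed before the commits that depend on them.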
for (CommitByCommitterTimestamp(commit), _) in commits.iter().rev() {
mutable_index.add_commit(commit);
}
let index_file = self.save_mutable_index(mutable_index, operation.id())?;
tracing::info!(
?index_file,
commits_count = commits.len(),
"saved new index file"
);
Ok(index_file)
}
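
/// Saves the mutable index as an index segment and records its association
/// with `op_id`.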
fn save_mutable_index(
&self,
mutable_index: DefaultMutableIndex,
op_id: &OperationId,
) -> Result<Arc<ReadonlyIndexSegment>, DefaultIndexStoreError> {
let index_segment = mutable_index
.squash_and_save_in(&self.segments_dir())
.map_err(DefaultIndexStoreError::SaveIndex)?;
self.associate_file_with_operation(&index_segment, op_id)
.map_err(|source| DefaultIndexStoreError::AssociateIndex {
op_id: op_id.to_owned(),
source,
})?;
Ok(index_segment)
}
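
/// Records a link from the operation to the index segment so that the
/// segment can be looked up again by operation ID.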
fn associate_file_with_operation(
&self,
index: &ReadonlyIndexSegment,
op_id: &OperationId,
) -> io::Result<()> {
let mut temp_file = NamedTempFile::new_in(&self.dir)?;
let file = temp_file.as_file_mut();
file.write_all(index.name().as_bytes())?;
persist_content_addressed_temp_file(temp_file, self.operations_dir().join(op_id.hex()))?;
Ok(())
}
}

impl IndexStore for DefaultIndexStore {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
Self::name()
}

fn get_index_at_op(
&self,
op: &Operation,
store: &Arc<Store>,
) -> Result<Box<dyn ReadonlyIndex>, IndexReadError> {
let index_segment = match self.load_index_segments_at_operation(
op.id(),
store.commit_id_length(),
store.change_id_length(),
) {
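// No index file has been associated with this operation yet, so build
// one now.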
Err(DefaultIndexStoreError::LoadAssociation(err))
if err.kind() == io::ErrorKind::NotFound =>
{
self.build_index_segments_at_operation(op, store)
}
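// The associated index file is missing, corrupt, or in an unsupported
// format: report it, reset the store, and rebuild from scratch.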
Err(DefaultIndexStoreError::LoadIndex(err)) if err.is_corrupt_or_not_found() => {
match &err {
ReadonlyIndexLoadError::UnexpectedVersion {
found_version,
expected_version,
} => {
eprintln!(
"Found index format version {found_version}, expected version \
{expected_version}. Reindexing..."
);
}
ReadonlyIndexLoadError::Other { name: _, error } => {
eprintln!("{err} (maybe the format has changed): {error}. Reindexing...");
}
}
self.reinit().map_err(|err| IndexReadError(err.into()))?;
self.build_index_segments_at_operation(op, store)
}
result => result,
}
.map_err(|err| IndexReadError(err.into()))?;
Ok(Box::new(DefaultReadonlyIndex::from_segment(index_segment)))
}

fn write_index(
&self,
index: Box<dyn MutableIndex>,
op: &Operation,
) -> Result<Box<dyn ReadonlyIndex>, IndexWriteError> {
let index = index
.into_any()
.downcast::<DefaultMutableIndex>()
.expect("index to merge in must be a DefaultMutableIndex");
let index_segment = self
.save_mutable_index(*index, op.id())
.map_err(|err| IndexWriteError(err.into()))?;
Ok(Box::new(DefaultReadonlyIndex::from_segment(index_segment)))
}
}