use std::{
collections::btree_map::BTreeMap,
io::{Read, Write},
ops::Range,
path::{Path, PathBuf},
sync::Arc,
};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use exocore_core::simple_store::{json_disk_store::JsonDiskStore, SimpleStore};
use exocore_protos::generated::data_chain_capnp::block_header;
use extindex::{Builder, Encodable, Reader};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use super::{DirectoryChainStoreConfig, DirectoryError};
use crate::{
block::{Block, BlockOffset},
chain::Error,
operation::OperationId,
};
/// Index of operation IDs to the offset of the block that contains them.
///
/// Recently indexed operations live in an in-memory `BTreeMap`; once the
/// in-memory count exceeds `config.operation_index_max_memory_items`, they
/// are flushed to an immutable on-disk `extindex` file and the list of files
/// is persisted to a JSON metadata file.
pub struct OperationIndex {
    config: DirectoryChainStoreConfig,
    // Directory in which index files and the metadata file are stored.
    directory: PathBuf,
    // JSON store persisting the list of on-disk index files.
    metadata_store: JsonDiskStore<Metadata>,
    // Block offset from which operations are currently indexed in memory
    // (everything before it is covered by `stored_indices`).
    memory_offset_from: BlockOffset,
    // In-memory index of operations of blocks starting at `memory_offset_from`.
    memory_index: BTreeMap<OperationId, BlockOffset>,
    // Offset at which the next block to index is expected to start.
    next_expected_offset: BlockOffset,
    // On-disk index files, in ascending order of the block offsets they cover.
    stored_indices: Vec<StoredIndex>,
}
impl OperationIndex {
    /// Creates a new, empty operation index in the given directory and
    /// persists an initial (empty) metadata file so that the directory can
    /// later be re-opened with `open`.
    pub fn create(
        config: DirectoryChainStoreConfig,
        directory_path: &Path,
    ) -> Result<OperationIndex, Error> {
        let metadata_path = Metadata::file_path(directory_path);
        let metadata_store = JsonDiskStore::<Metadata>::new(&metadata_path).map_err(|err| {
            Error::new_io(
                err,
                format!(
                    "Error creating operation index metadata file {:?}",
                    metadata_path
                ),
            )
        })?;

        let operation_index = OperationIndex {
            config,
            directory: directory_path.to_path_buf(),
            metadata_store,
            memory_offset_from: 0,
            memory_index: BTreeMap::new(),
            next_expected_offset: 0,
            stored_indices: vec![],
        };

        // Write the (empty) metadata right away so `open` succeeds even if
        // nothing gets indexed before the process stops.
        operation_index.write_metadata()?;

        Ok(operation_index)
    }

    /// Opens an existing operation index from the given directory: reads the
    /// metadata file and opens a reader on each referenced index file.
    ///
    /// Fails with `UnexpectedState` if the metadata file doesn't exist.
    pub fn open(
        config: DirectoryChainStoreConfig,
        directory_path: &Path,
    ) -> Result<OperationIndex, Error> {
        let metadata_path = Metadata::file_path(directory_path);
        let metadata_store = JsonDiskStore::<Metadata>::new(&metadata_path).map_err(|err| {
            Error::new_io(
                err,
                format!(
                    "Error creating operation index metadata file {:?}",
                    metadata_path
                ),
            )
        })?;

        let metadata = metadata_store
            .read()
            .map_err(|err| {
                Error::new_io(
                    err,
                    format!(
                        "Error reading operation index metadata file {:?}",
                        metadata_path
                    ),
                )
            })?
            .ok_or_else(|| {
                Error::UnexpectedState(anyhow!("Operation index metadata file didn't exist"))
            })?;

        let mut stored_indices = Vec::new();
        for index_file_metadata in metadata.files.iter() {
            let index_file_path = directory_path.join(&index_file_metadata.file_name);
            let index_reader = Reader::open(index_file_path)
                .map_err(|err| DirectoryError::OperationIndexRead(Arc::new(err)))?;

            stored_indices.push(StoredIndex {
                range: index_file_metadata.offset_from..index_file_metadata.offset_to,
                index_reader,
            });
        }

        // Resume indexing right after the last stored index; the in-memory
        // index starts empty since it was never persisted.
        let next_expected_offset = stored_indices.last().map_or(0, |index| index.range.end);
        let memory_offset_from = next_expected_offset;
        let memory_index = BTreeMap::new();

        Ok(OperationIndex {
            config,
            directory: directory_path.to_path_buf(),
            metadata_store,
            memory_offset_from,
            memory_index,
            next_expected_offset,
            stored_indices,
        })
    }

    /// Offset at which the next block to index is expected to start.
    pub fn next_expected_block_offset(&self) -> BlockOffset {
        self.next_expected_offset
    }

    /// Indexes all blocks of the iterator, skipping blocks that were already
    /// flushed to disk (i.e. with an offset before the in-memory range).
    pub fn index_blocks<I: Iterator<Item = Result<B, Error>>, B: Block>(
        &mut self,
        iterator: I,
    ) -> Result<(), Error> {
        for block in iterator {
            let block = block?;
            if block.offset() >= self.memory_offset_from {
                self.index_block(&block)?;
            }
        }

        Ok(())
    }

    /// Indexes the operations of a single block, which must directly follow
    /// the last indexed block (otherwise an `Integrity` error is returned).
    ///
    /// The block's proposal operation id is indexed along with the ids of the
    /// block's operations. May trigger a flush of the in-memory index to disk.
    pub fn index_block<B: Block>(&mut self, block: &B) -> Result<(), Error> {
        if self.next_expected_offset != block.offset() {
            return Err(Error::Integrity(anyhow!(
                "Tried to index operations from a block with unexpected offset: block={} != expected={}",
                block.offset(),
                self.next_expected_offset
            )));
        }

        let block_header_reader: block_header::Reader = block
            .header()
            .get_reader()
            .map_err(|err| Error::Block(err.into()))?;

        // The operation that proposed the block is indexed too.
        let block_propose_op_id = block_header_reader.get_proposed_operation_id();
        self.put_operation_block(block_propose_op_id, block.offset());

        for operation in block.operations_iter()? {
            let operation_reader = operation.get_reader()?;
            self.put_operation_block(operation_reader.get_operation_id(), block.offset());
        }

        self.next_expected_offset = block.next_offset();

        self.maybe_flush_to_disk()?;

        Ok(())
    }

    /// Returns the offset of the block containing the operation with the
    /// given id, or `None` if the operation isn't indexed.
    ///
    /// The in-memory index is checked first, then each stored index file.
    pub fn get_operation_block(
        &self,
        operation_id: OperationId,
    ) -> Result<Option<BlockOffset>, Error> {
        if let Some(block_offset) = self.memory_index.get(&operation_id) {
            return Ok(Some(*block_offset));
        }

        let needle = StoredIndexKey { operation_id };
        for index in self.stored_indices.iter() {
            let opt_entry = index
                .index_reader
                .find(&needle)
                .map_err(|err| DirectoryError::OperationIndexRead(Arc::new(err)))?;

            if let Some(entry) = opt_entry {
                return Ok(Some(entry.value().offset));
            }
        }

        Ok(None)
    }

    /// Truncates the index from the given block offset: operations of blocks
    /// at or after `from_offset` are dropped, conservatively deleting whole
    /// stored index files when the offset falls inside one (those blocks will
    /// get re-indexed since `next_expected_offset` moves back accordingly).
    pub fn truncate_from_offset(&mut self, from_offset: BlockOffset) -> Result<(), Error> {
        if from_offset >= self.memory_offset_from {
            // The offset falls within the in-memory portion: dropping it is
            // enough, stored indices only cover blocks before
            // `memory_offset_from`.
            self.memory_index.clear();
            self.next_expected_offset = self.memory_offset_from;
        } else {
            // The offset falls within stored indices: delete every index file
            // whose range may cover `from_offset` or any block after it.
            let previous_indices = std::mem::take(&mut self.stored_indices);
            for index in previous_indices {
                if index.range.end >= from_offset {
                    let index_path = StoredIndex::file_path(&self.directory, &index.range);
                    // Best-effort deletion: a leftover file is harmless since
                    // the metadata written below no longer references it.
                    let _ = std::fs::remove_file(index_path);
                } else {
                    self.stored_indices.push(index);
                }
            }

            // Resume indexing right after the last remaining stored index.
            self.next_expected_offset = self
                .stored_indices
                .last()
                .map_or(0, |index| index.range.end);
            self.memory_offset_from = self.next_expected_offset;

            // The in-memory index only covered blocks at or after the old
            // `memory_offset_from` (all greater than `from_offset`), which
            // are all truncated: drop it so lookups can't return stale
            // offsets.
            self.memory_index.clear();
        }

        self.write_metadata()?;

        Ok(())
    }

    /// Adds a single operation → block offset mapping to the in-memory index.
    fn put_operation_block(&mut self, operation_id: OperationId, block_offset: BlockOffset) {
        self.memory_index.insert(operation_id, block_offset);
    }

    /// Flushes the in-memory index to a new on-disk `extindex` file if it
    /// exceeds the configured maximum number of items, then updates the
    /// metadata file.
    fn maybe_flush_to_disk(&mut self) -> Result<(), Error> {
        if self.memory_index.len() > self.config.operation_index_max_memory_items {
            debug!(
                "Storing in-memory index of operations to disk ({} items)",
                self.memory_index.len()
            );

            let from_offset = self.memory_offset_from;
            let to_offset = self.next_expected_offset;
            let range = from_offset..to_offset;

            let index_file = StoredIndex::file_path(&self.directory, &range);
            let ops_count = self.memory_index.len() as u64;
            // BTreeMap iteration is sorted by key, as `build_from_sorted`
            // requires.
            let ops_iter = self.memory_index.iter().map(|(operation_id, offset)| {
                let key = StoredIndexKey {
                    operation_id: *operation_id,
                };
                let value = StoredIndexValue { offset: *offset };
                extindex::Entry::new(key, value)
            });

            let index_builder =
                Builder::<StoredIndexKey, StoredIndexValue>::new(index_file.clone());
            index_builder
                .build_from_sorted(ops_iter, ops_count)
                .map_err(|err| DirectoryError::OperationIndexBuild(Arc::new(err)))?;

            let index_reader = Reader::open(index_file)
                .map_err(|err| DirectoryError::OperationIndexRead(Arc::new(err)))?;

            let stored_index = StoredIndex {
                range,
                index_reader,
            };
            self.stored_indices.push(stored_index);

            // Persist the new file list before dropping the in-memory items
            // it now covers.
            self.write_metadata()?;

            self.memory_offset_from = self.next_expected_offset;
            self.memory_index.clear();
        }

        Ok(())
    }

    /// Writes the metadata file describing the current stored index files.
    fn write_metadata(&self) -> Result<(), Error> {
        let files = self
            .stored_indices
            .iter()
            .map(|index| {
                let file_name = StoredIndex::file_name(&index.range);
                MetadataIndexFile {
                    offset_from: index.range.start,
                    offset_to: index.range.end,
                    file_name,
                }
            })
            .collect_vec();

        let metadata = Metadata { files };
        self.metadata_store
            .write(&metadata)
            .map_err(|err| Error::new_io(err, "Error storing into operation index metadata file"))
    }
}
/// An immutable on-disk `extindex` file covering the operations of blocks
/// within a given offset range.
struct StoredIndex {
    // Range of block offsets covered (start inclusive, end exclusive — the
    // end is the `next_offset` of the last indexed block).
    range: Range<BlockOffset>,
    // Reader over the index file for this range.
    index_reader: Reader<StoredIndexKey, StoredIndexValue>,
}
impl StoredIndex {
    /// Absolute path of the index file covering the given offset range.
    fn file_path(directory: &Path, range: &Range<BlockOffset>) -> PathBuf {
        let name = Self::file_name(range);
        directory.join(name)
    }

    /// File name of an index file, derived from the first block offset it
    /// covers.
    fn file_name(range: &Range<BlockOffset>) -> String {
        format!("opsidx_{}.bin", range.start)
    }
}
/// Metadata persisted as JSON on disk, listing the stored index files so the
/// index can be re-opened.
#[derive(Serialize, Deserialize)]
struct Metadata {
    files: Vec<MetadataIndexFile>,
}
impl Metadata {
    /// Path of the JSON metadata file inside the index directory.
    fn file_path(directory: &Path) -> PathBuf {
        let mut path = directory.to_path_buf();
        path.push("ops_idx.json");
        path
    }
}
/// Descriptor of a single stored index file, as persisted in the metadata.
#[derive(Serialize, Deserialize)]
struct MetadataIndexFile {
    // First block offset covered by the file (inclusive).
    offset_from: BlockOffset,
    // End of the covered block offset range (exclusive).
    offset_to: BlockOffset,
    // File name, relative to the index directory.
    file_name: String,
}
/// Key stored in the on-disk index files: the operation id.
///
/// Ordering derives from the operation id, matching the sorted order in
/// which entries are built from the in-memory `BTreeMap`.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct StoredIndexKey {
    operation_id: OperationId,
}
impl Encodable for StoredIndexKey {
    /// Keys always encode to exactly 8 bytes.
    fn encoded_size(&self) -> Option<usize> {
        Some(8)
    }

    /// Writes the operation id as a little-endian u64.
    fn encode<W: Write>(&self, write: &mut W) -> Result<(), std::io::Error> {
        let bytes = self.operation_id.to_le_bytes();
        write.write_all(&bytes)
    }

    /// Reads back a key written by `encode`.
    fn decode<R: Read>(data: &mut R, _size: usize) -> Result<StoredIndexKey, std::io::Error> {
        let mut bytes = [0u8; 8];
        data.read_exact(&mut bytes)?;
        Ok(StoredIndexKey {
            operation_id: u64::from_le_bytes(bytes),
        })
    }
}
/// Value stored in the on-disk index files: the offset of the block that
/// contains the operation.
struct StoredIndexValue {
    offset: BlockOffset,
}
impl Encodable for StoredIndexValue {
    /// Values always encode to exactly 8 bytes.
    fn encoded_size(&self) -> Option<usize> {
        Some(8)
    }

    /// Writes the block offset as a little-endian u64.
    fn encode<W: Write>(&self, write: &mut W) -> Result<(), std::io::Error> {
        let bytes = self.offset.to_le_bytes();
        write.write_all(&bytes)
    }

    /// Reads back a value written by `encode`.
    fn decode<R: Read>(data: &mut R, _size: usize) -> Result<StoredIndexValue, std::io::Error> {
        let mut bytes = [0u8; 8];
        data.read_exact(&mut bytes)?;
        Ok(StoredIndexValue {
            offset: u64::from_le_bytes(bytes),
        })
    }
}
#[cfg(test)]
mod tests {
    use exocore_core::cell::{FullCell, LocalNode};
    use super::*;
    use crate::chain::directory::tests::create_block;

    // Indexing 1000 generated blocks (2 operations each, see
    // `generate_index_blocks`) with a 100-item memory limit should flush to
    // disk multiple times, and every generated operation should then resolve
    // to its block offset.
    #[test]
    fn create_from_iterator() -> anyhow::Result<()> {
        let local_node = LocalNode::generate();
        let cell = FullCell::generate(local_node)?;
        let dir = tempfile::tempdir()?;
        let config = DirectoryChainStoreConfig {
            operation_index_max_memory_items: 100,
            ..DirectoryChainStoreConfig::default()
        };
        let mut index = OperationIndex::create(config, dir.path())?;
        let generated_ops = generate_index_blocks(&cell, &mut index, 0, 1000)?;
        // 19 flushed index files is the empirical count for this fixture.
        assert_eq!(19, index.stored_indices.len());
        for (op, offset) in &generated_ops {
            assert_eq!(Some(*offset), index.get_operation_block(*op)?);
        }
        // An operation id that was never indexed shouldn't be found.
        assert_eq!(None, index.get_operation_block(435_874_985)?);
        Ok(())
    }

    // Re-opening an index should restore the stored indices and resume
    // indexing where the previous instance left off; operations that were
    // only in memory (never flushed) are expected to be lost.
    #[test]
    fn open_existing() -> anyhow::Result<()> {
        let local_node = LocalNode::generate();
        let cell = FullCell::generate(local_node)?;
        let dir = tempfile::tempdir()?;
        let config = DirectoryChainStoreConfig {
            operation_index_max_memory_items: 100,
            ..DirectoryChainStoreConfig::default()
        };
        let (memory_offset_from, generated_ops) = {
            let mut index = OperationIndex::create(config, dir.path())?;
            let generated_ops = generate_index_blocks(&cell, &mut index, 0, 1000)?;
            (index.memory_offset_from, generated_ops)
        };
        let mut index = OperationIndex::open(config, dir.path())?;
        assert_eq!(memory_offset_from, index.memory_offset_from);
        assert_eq!(memory_offset_from, index.next_expected_block_offset());
        assert_eq!(19, index.stored_indices.len());
        for (op, offset) in &generated_ops {
            // Only operations flushed to disk survive a re-open.
            if *offset < memory_offset_from {
                assert_eq!(Some(*offset), index.get_operation_block(*op)?);
            }
        }
        // Indexing can resume from the restored next expected offset.
        let new_ops = generate_index_blocks(&cell, &mut index, memory_offset_from, 200)?;
        for (op, offset) in &new_ops {
            assert_eq!(Some(*offset), index.get_operation_block(*op)?);
        }
        assert_eq!(22, index.stored_indices.len());
        Ok(())
    }

    // Truncating from the start of the in-memory range should only drop the
    // in-memory index, leaving all stored index files untouched.
    #[test]
    fn truncate_from_offset_memory() -> anyhow::Result<()> {
        let local_node = LocalNode::generate();
        let cell = FullCell::generate(local_node)?;
        let dir = tempfile::tempdir()?;
        let config = DirectoryChainStoreConfig {
            operation_index_max_memory_items: 100,
            ..DirectoryChainStoreConfig::default()
        };
        let mut index = OperationIndex::create(config, dir.path())?;
        generate_index_blocks(&cell, &mut index, 0, 1000)?;
        let files_count_before = index.stored_indices.len();
        index.truncate_from_offset(index.memory_offset_from)?;
        assert_eq!(index.memory_offset_from, index.next_expected_offset);
        assert_eq!(index.stored_indices.len(), files_count_before);
        Ok(())
    }

    // Truncating from an offset inside the stored indices should delete the
    // index files at or after that offset, and the truncation must persist
    // across a re-open.
    #[test]
    fn truncate_from_offset_disk() -> anyhow::Result<()> {
        let local_node = LocalNode::generate();
        let cell = FullCell::generate(local_node)?;
        let dir = tempfile::tempdir()?;
        let config = DirectoryChainStoreConfig {
            operation_index_max_memory_items: 100,
            ..DirectoryChainStoreConfig::default()
        };
        let next_expected_offset = {
            let mut index = OperationIndex::create(config, dir.path())?;
            let generated_ops = generate_index_blocks(&cell, &mut index, 0, 1000)?;
            let operation_ids = generated_ops.keys().collect_vec();
            let middle_block_offset = generated_ops[operation_ids[operation_ids.len() / 2]];
            let files_count_before = index.stored_indices.len();
            index.truncate_from_offset(middle_block_offset)?;
            assert!(index.next_expected_offset <= middle_block_offset);
            assert_eq!(index.memory_offset_from, index.next_expected_offset);
            assert!(index.stored_indices.len() <= files_count_before / 2);
            index.next_expected_offset
        };
        {
            let index = OperationIndex::open(config, dir.path())?;
            assert_eq!(next_expected_offset, index.next_expected_offset);
        }
        Ok(())
    }

    // Indexes `count` consecutive blocks starting at `from_offset`. Each
    // block yields two operation ids (its offset and offset + 1), both
    // mapped to the block's offset in the returned map.
    fn generate_index_blocks(
        full_cell: &FullCell,
        index: &mut OperationIndex,
        from_offset: BlockOffset,
        count: usize,
    ) -> Result<BTreeMap<OperationId, BlockOffset>, Error> {
        let mut generated_ops = BTreeMap::new();
        let mut next_offset = from_offset;
        let blocks_iter = (0..count).map(|_i| {
            let block = create_block(full_cell, next_offset);
            generated_ops.insert(next_offset, next_offset);
            generated_ops.insert(next_offset + 1, next_offset);
            next_offset = block.next_offset();
            Ok(block)
        });
        index.index_blocks(blocks_iter)?;
        Ok(generated_ops)
    }
}