use anyhow::{ensure, format_err};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, convert::TryFrom, io::Error, mem, num::NonZeroU64, vec};
use crate::{
access::{AccessError, AccessErrorKind},
validation::check_index_valid_full_name,
views::{IndexAddress, RawAccess, RawAccessMut, ResolvedAddress, View},
BinaryKey, BinaryValue,
};
const INDEXES_POOL_NAME: &str = "__INDEXES_POOL__";
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
#[repr(u32)]
pub enum IndexType {
Map = 1,
List = 2,
Entry = 3,
KeySet = 5,
SparseList = 6,
Tombstone = 254,
#[doc(hidden)]
Unknown = 255,
}
impl TryFrom<u32> for IndexType {
type Error = &'static str;
fn try_from(value: u32) -> Result<Self, Self::Error> {
let value = match value {
1 => Self::Map,
2 => Self::List,
3 => Self::Entry,
5 => Self::KeySet,
6 => Self::SparseList,
254 => Self::Tombstone,
255 => Self::Unknown,
_ => return Err("Unknown index type"),
};
Ok(value)
}
}
const INDEX_STATE_TAG: u32 = 0;
pub trait BinaryAttribute: Sized {
fn size(&self) -> usize;
fn write(&self, buffer: &mut Vec<u8>);
fn read(buffer: &[u8]) -> Result<Self, Error>;
}
impl BinaryAttribute for () {
fn size(&self) -> usize {
0
}
fn write(&self, _buffer: &mut Vec<u8>) {}
fn read(_buffer: &[u8]) -> Result<Self, Error> {
Ok(())
}
}
#[allow(clippy::use_self)] impl BinaryAttribute for u64 {
fn size(&self) -> usize {
mem::size_of_val(self)
}
fn write(&self, buffer: &mut Vec<u8>) {
buffer.write_u64::<LittleEndian>(*self).unwrap();
}
fn read(mut buffer: &[u8]) -> Result<Self, Error> {
buffer.read_u64::<LittleEndian>()
}
}
impl BinaryAttribute for Vec<u8> {
fn size(&self) -> usize {
self.len()
}
fn write(&self, buffer: &mut Vec<u8>) {
buffer.extend_from_slice(self);
}
fn read(buffer: &[u8]) -> Result<Self, Error> {
Ok(buffer.to_vec())
}
}
impl Default for IndexType {
fn default() -> Self {
Self::Unknown
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct IndexMetadata<V = Vec<u8>> {
identifier: NonZeroU64,
index_type: IndexType,
state: Option<V>,
}
impl<V> BinaryValue for IndexMetadata<V>
where
V: BinaryAttribute,
{
fn to_bytes(&self) -> Vec<u8> {
let mut capacity = mem::size_of_val(&self.identifier) + mem::size_of_val(&self.index_type);
if let Some(ref state) = self.state {
capacity += mem::size_of_val(&INDEX_STATE_TAG) + mem::size_of::<u32>() + state.size();
}
let mut buf = Vec::with_capacity(capacity);
buf.write_u64::<LittleEndian>(self.identifier.get())
.unwrap();
buf.write_u32::<LittleEndian>(self.index_type as u32)
.unwrap();
if let Some(ref state) = self.state {
buf.write_u32::<LittleEndian>(INDEX_STATE_TAG).unwrap();
buf.write_u32::<LittleEndian>(state.size() as u32).unwrap();
state.write(&mut buf);
}
buf
}
fn from_bytes(bytes: Cow<'_, [u8]>) -> anyhow::Result<Self> {
let mut bytes = bytes.as_ref();
let identifier = NonZeroU64::new(bytes.read_u64::<LittleEndian>()?)
.ok_or_else(|| format_err!("IndexMetadata identifier is 0"))?;
let index_type = bytes.read_u32::<LittleEndian>()?;
let index_type = IndexType::try_from(index_type)
.map_err(|_| format_err!("Unknown index type: {}", index_type))?;
if bytes.is_empty() {
return Ok(Self {
identifier,
index_type,
state: None,
});
}
let state_tag = bytes.read_u32::<LittleEndian>()?;
let state_len = bytes.read_u32::<LittleEndian>()? as usize;
ensure!(
state_tag == INDEX_STATE_TAG,
"Attribute with unknown tag: {}",
state_tag
);
ensure!(bytes.len() >= state_len, "Index state is too short");
let state_bytes = &bytes[0..state_len];
Ok(Self {
identifier,
index_type,
state: Some(V::read(state_bytes)?),
})
}
}
impl<V> IndexMetadata<V> {
pub fn index_type(&self) -> IndexType {
self.index_type
}
#[allow(clippy::doc_markdown)] pub fn identifier(&self) -> NonZeroU64 {
self.identifier
}
}
#[allow(clippy::use_self)] impl IndexMetadata {
fn convert<V: BinaryAttribute>(self) -> IndexMetadata<V> {
let index_type = self.index_type;
IndexMetadata {
identifier: self.identifier,
index_type,
state: self.state.map(|state| {
V::read(&state).unwrap_or_else(|e| {
panic!(
"Error while reading state for index with type {:?}: {}. \
This can be caused by database corruption",
index_type, e
);
})
}),
}
}
}
#[derive(Debug)]
pub struct IndexState<T, V> {
metadata: IndexMetadata<V>,
index_access: Option<T>,
index_full_name: Vec<u8>,
}
impl<T, V> IndexState<T, V>
where
T: RawAccess,
V: BinaryAttribute + Copy,
{
pub fn get(&self) -> Option<V> {
self.metadata.state
}
}
impl<T, V> IndexState<T, V>
where
T: RawAccessMut,
V: BinaryAttribute,
{
fn update_metadata_view(&self) {
if let Some(access) = self.index_access.clone() {
View::new(access, ResolvedAddress::system(INDEXES_POOL_NAME))
.put(&self.index_full_name, self.metadata.to_bytes());
}
}
pub fn set(&mut self, state: V) {
self.metadata.state = Some(state);
self.update_metadata_view();
}
pub fn unset(&mut self) {
self.metadata.state = None;
self.update_metadata_view();
}
}
pub struct IndexesPool<T: RawAccess>(View<T>);
impl<T: RawAccess> IndexesPool<T> {
pub(crate) fn new(index_access: T) -> Self {
let view = View::new(index_access, ResolvedAddress::system(INDEXES_POOL_NAME));
Self(view)
}
pub(super) fn len(&self) -> u64 {
self.0.get(&()).unwrap_or_default()
}
fn index_metadata(&self, index_name: &[u8]) -> Option<IndexMetadata> {
self.0.get(index_name)
}
fn set_len(&mut self, len: u64) {
self.0.put_or_forget(&(), len);
}
fn create_index_metadata<V>(
&mut self,
index_name: &[u8],
index_type: IndexType,
) -> (IndexMetadata<V>, bool)
where
V: BinaryAttribute,
{
let len = self.len();
let metadata = IndexMetadata {
identifier: NonZeroU64::new(len + 1).unwrap(),
index_type,
state: None,
};
let is_phantom = !self.0.put_or_forget(index_name, metadata.to_bytes());
self.set_len(len + 1);
(metadata, is_phantom)
}
}
impl<T: RawAccessMut> IndexesPool<T> {
pub(crate) fn flush_migration(&mut self, prefix: &str) -> Vec<ResolvedAddress> {
let prefix = IndexAddress::qualify_migration_namespace(prefix);
let min_name_len = prefix.len() - 1;
let moved_indexes: Vec<_> = self.0.iter::<_, Vec<u8>, IndexMetadata>(&prefix).collect();
let mut removed_addrs = Vec::new();
for (key, metadata) in moved_indexes {
let migrated_key = IndexAddress::migrate_qualified_name(&key);
debug_assert!({
let migrated_prefix = IndexAddress::migrate_qualified_name(&prefix);
migrated_key.starts_with(migrated_prefix)
});
if let Some(old_metadata) = self.0.get::<_, IndexMetadata>(migrated_key) {
let (name, _is_in_group) =
IndexAddress::parse_fully_qualified_name(migrated_key, min_name_len);
let resolved = ResolvedAddress::new(name, Some(old_metadata.identifier));
removed_addrs.push(resolved);
}
if metadata.index_type == IndexType::Tombstone {
self.0.remove(migrated_key);
} else {
self.0.put(migrated_key, metadata);
}
self.0.remove(&key);
}
removed_addrs
}
pub(crate) fn rollback_migration(&mut self, prefix: &str) -> Vec<ResolvedAddress> {
let prefix = IndexAddress::qualify_migration_namespace(prefix);
self.remove_by_prefix(&prefix, |key| {
IndexAddress::parse_fully_qualified_name(key, prefix.len()).0
})
}
pub(crate) fn remove_indexes(&mut self, prefix: &IndexAddress) -> Vec<ResolvedAddress> {
let name = prefix.name();
let prefix = prefix.fully_qualified_name();
self.remove_by_prefix(&prefix, |_| name.to_owned())
}
fn remove_by_prefix(
&mut self,
prefix: &[u8],
extract_name: impl Fn(&[u8]) -> String,
) -> Vec<ResolvedAddress> {
let (removed_names, removed_addrs): (Vec<_>, Vec<_>) = self
.0
.iter::<_, Vec<u8>, IndexMetadata>(prefix)
.map(|(key, metadata)| {
let resolved = ResolvedAddress::new(extract_name(&key), Some(metadata.identifier));
(key, resolved)
})
.unzip();
for full_name in &removed_names {
self.0.remove(full_name);
}
removed_addrs
}
}
#[derive(Debug)]
pub struct GroupKeys<T: RawAccess, K: BinaryKey + ?Sized> {
access: T,
key_prefix: Vec<u8>,
next_key: Option<Vec<u8>>,
buffered_keys: vec::IntoIter<K::Owned>,
buffer_size: usize,
}
impl<T, K> GroupKeys<T, K>
where
T: RawAccess,
K: BinaryKey + ?Sized,
{
pub fn new(access: T, addr: &IndexAddress) -> Self {
const DEFAULT_BUFFER_SIZE: usize = 1_000;
Self::with_custom_buffer(access, addr, DEFAULT_BUFFER_SIZE)
}
fn with_custom_buffer(access: T, addr: &IndexAddress, buffer_size: usize) -> Self {
assert!(buffer_size > 0);
let key_prefix = addr.qualified_prefix();
let mut this = Self {
access,
key_prefix: key_prefix.clone(),
next_key: None,
buffered_keys: Vec::new().into_iter(),
buffer_size,
};
this.buffer_keys(&key_prefix);
this
}
fn buffer_keys(&mut self, start_key: &[u8]) {
let indexes_pool = IndexesPool::new(self.access.clone());
let mut buffer = Vec::with_capacity(self.buffer_size);
let mut iter = indexes_pool.0.iter_bytes(start_key);
while let Some((key, _)) = iter.next() {
if !key.starts_with(&self.key_prefix) {
break;
} else if buffer.len() == self.buffer_size {
self.next_key = Some(key.to_owned());
break;
}
buffer.push(K::read(&key[self.key_prefix.len()..]));
}
debug_assert!(buffer.len() <= self.buffer_size);
self.buffered_keys = buffer.into_iter();
}
}
impl<T, K> Iterator for GroupKeys<T, K>
where
T: RawAccess,
K: BinaryKey + ?Sized,
{
type Item = K::Owned;
fn next(&mut self) -> Option<Self::Item> {
self.buffered_keys.next().or_else(|| {
self.next_key.take().and_then(|next_key| {
self.buffer_keys(&next_key);
self.buffered_keys.next()
})
})
}
}
#[derive(Debug)]
pub struct ViewWithMetadata<T: RawAccess> {
view: View<T>,
metadata: IndexMetadata,
index_full_name: Vec<u8>,
is_phantom: bool,
}
impl<T> ViewWithMetadata<T>
where
T: RawAccess,
{
pub(crate) fn get_or_create(
index_access: T,
index_address: &IndexAddress,
index_type: IndexType,
) -> Result<Self, AccessError> {
check_index_valid_full_name(&index_address.name).map_err(|kind| AccessError {
addr: index_address.clone(),
kind,
})?;
Self::get_or_create_unchecked(index_access, index_address, index_type)
}
pub(crate) fn get_metadata(
index_access: T,
index_address: &IndexAddress,
) -> Result<Option<IndexMetadata>, AccessError> {
check_index_valid_full_name(index_address.name()).map_err(|kind| AccessError {
addr: index_address.clone(),
kind,
})?;
Ok(Self::get_metadata_unchecked(index_access, index_address))
}
pub(crate) fn get_metadata_unchecked(
index_access: T,
index_address: &IndexAddress,
) -> Option<IndexMetadata> {
let index_full_name = index_address.fully_qualified_name();
let pool = IndexesPool::new(index_access);
pool.index_metadata(&index_full_name)
}
pub(crate) fn get_or_create_unchecked(
index_access: T,
index_address: &IndexAddress,
index_type: IndexType,
) -> Result<Self, AccessError> {
if index_type == IndexType::Tombstone && !index_address.in_migration {
return Err(AccessError {
kind: AccessErrorKind::InvalidTombstone,
addr: index_address.clone(),
});
}
let index_name = index_address.name().to_owned();
let index_full_name = index_address.fully_qualified_name();
let mut pool = IndexesPool::new(index_access.clone());
let (metadata, is_phantom) = pool.index_metadata(&index_full_name).map_or_else(
|| pool.create_index_metadata(&index_full_name, index_type),
|metadata| (metadata, false),
);
let real_index_type = metadata.index_type;
let addr = ResolvedAddress::new(index_name, Some(metadata.identifier));
let view = if is_phantom {
View::new_phantom()
} else {
View::new(index_access, addr)
};
let this = Self {
view,
metadata,
index_full_name,
is_phantom,
};
if real_index_type == index_type {
Ok(this)
} else {
Err(AccessError {
addr: index_address.clone(),
kind: AccessErrorKind::WrongIndexType {
expected: index_type,
actual: real_index_type,
},
})
}
}
pub fn index_type(&self) -> IndexType {
self.metadata.index_type
}
pub fn is_phantom(&self) -> bool {
self.is_phantom
}
pub(crate) fn into_parts<V>(self) -> (View<T>, IndexState<T, V>)
where
V: BinaryAttribute,
{
let state = IndexState {
metadata: self.metadata.convert(),
index_access: self.view.access().cloned(),
index_full_name: self.index_full_name,
};
(self.view, state)
}
}
impl<T: RawAccess> From<ViewWithMetadata<T>> for View<T> {
fn from(view_with_metadata: ViewWithMetadata<T>) -> Self {
view_with_metadata.view
}
}
#[cfg(test)]
mod tests {
use super::{
vec, BinaryKey, BinaryValue, GroupKeys, IndexAddress, IndexMetadata, IndexType,
IndexesPool, NonZeroU64,
};
use crate::{access::CopyAccessExt, Database, TemporaryDB};
use std::collections::{BTreeSet, HashMap};
use rand::{seq::SliceRandom, thread_rng, Rng};
#[test]
fn test_index_metadata_binary_value() {
let metadata = IndexMetadata {
identifier: NonZeroU64::new(12).unwrap(),
index_type: IndexType::List,
state: Some(16_u64),
};
let bytes = metadata.to_bytes();
assert_eq!(IndexMetadata::from_bytes(bytes.into()).unwrap(), metadata);
let metadata = IndexMetadata {
identifier: NonZeroU64::new(12).unwrap(),
index_type: IndexType::List,
state: None::<u64>,
};
let bytes = metadata.to_bytes();
assert_eq!(IndexMetadata::from_bytes(bytes.into()).unwrap(), metadata);
}
#[test]
#[should_panic(expected = "Attribute with unknown tag")]
fn test_index_metadata_unknown_tag() {
let metadata = IndexMetadata {
identifier: NonZeroU64::new(12).unwrap(),
index_type: IndexType::List,
state: Some(16_u64),
};
let mut bytes = metadata.to_bytes();
bytes[13] = 1; assert_eq!(IndexMetadata::from_bytes(bytes.into()).unwrap(), metadata);
}
#[test]
fn index_type_does_not_create_indexes() {
let db = TemporaryDB::new();
let fork = db.fork();
let index_count = IndexesPool::new(&fork).len();
assert!(fork.index_type("foo").is_none());
let pool = IndexesPool::new(&fork);
assert!(pool.index_metadata(b"foo").is_none());
assert_eq!(pool.len(), index_count);
}
#[test]
fn group_keys_edge_cases() {
let db = TemporaryDB::new();
let fork = db.fork();
fork.get_entry("before").set(0_u32);
fork.get_entry("unrelated").set(0_u32);
let addr: IndexAddress = "test".into();
let mut keys: GroupKeys<_, u8> = GroupKeys::with_custom_buffer(&fork, &addr, 2);
assert_eq!(keys.key_prefix, b"test\0");
assert_eq!(keys.next_key, None);
assert!(keys.next().is_none());
fork.get_entry(("test", &0_u8)).set(0_u32);
let keys: GroupKeys<_, u8> = GroupKeys::with_custom_buffer(&fork, &addr, 2);
assert_eq!(keys.next_key, None);
assert_eq!(keys.collect::<Vec<_>>(), vec![0]);
fork.get_entry(("test", &1_u8)).set(0_u32);
let keys: GroupKeys<_, u8> = GroupKeys::with_custom_buffer(&fork, &addr, 2);
assert_eq!(keys.next_key, None);
assert_eq!(keys.collect::<Vec<_>>(), vec![0, 1]);
fork.get_entry(("test", &2_u8)).set(0_u32);
let keys: GroupKeys<_, u8> = GroupKeys::with_custom_buffer(&fork, &addr, 2);
assert_eq!(keys.next_key, Some(b"test\0\x02".to_vec()));
assert_eq!(keys.collect::<Vec<_>>(), vec![0, 1, 2]);
fork.get_entry(("test", &3_u8)).set(0_u32);
let keys: GroupKeys<_, u8> = GroupKeys::with_custom_buffer(&fork, &addr, 2);
assert_eq!(keys.next_key, Some(b"test\0\x02".to_vec()));
assert_eq!(keys.collect::<Vec<_>>(), vec![0, 1, 2, 3]);
fork.get_entry(("test", &4_u8)).set(0_u32);
let keys: GroupKeys<_, u8> = GroupKeys::with_custom_buffer(&fork, &addr, 2);
assert_eq!(keys.next_key, Some(b"test\0\x02".to_vec()));
assert_eq!(keys.collect::<Vec<_>>(), vec![0, 1, 2, 3, 4]);
}
#[test]
fn keys_within_a_group_with_prefix() {
let db = TemporaryDB::new();
let fork = db.fork();
let addr: IndexAddress = ("test", &0_u32).into();
let mut keys: GroupKeys<_, u32> = GroupKeys::with_custom_buffer(&fork, &addr, 2);
assert_eq!(keys.key_prefix, b"test\0\0\0\0\0");
assert_eq!(keys.next_key, None);
assert!(keys.next().is_none());
fork.get_entry(("test", &concat_keys!(&0_u32, &5_u32)))
.set("!".to_owned());
let keys: GroupKeys<_, u32> = GroupKeys::with_custom_buffer(&fork, &addr, 2);
assert_eq!(keys.next_key, None);
assert_eq!(keys.collect::<Vec<_>>(), vec![5]);
}
#[test]
fn group_keys_mini_fuzz() {
const GROUPS: &[&str] = &["bar", "foo", "test"];
let db = TemporaryDB::new();
let fork = db.fork();
fork.get_entry("ba").set(0_u8);
fork.get_entry(("ba", "r")).set(0_u8);
fork.get_entry("bar_").set(0_u8);
fork.get_entry("fo").set(0_u8);
fork.get_entry(("fo", "oo")).set(0_u8);
fork.get_entry("foo1").set(0_u8);
fork.get_entry("test").set(0_u8);
fork.get_entry(("te", "st")).set(0_u8);
fork.get_entry("test_test").set(0_u8);
let mut rng = thread_rng();
let mut groups: HashMap<&'static str, BTreeSet<_>> = HashMap::new();
for _ in 0..1_000 {
let group = *GROUPS.choose(&mut rng).unwrap();
let prefix: u32 = rng.gen();
groups.entry(group).or_default().insert(prefix);
fork.get_entry((group, &prefix)).set(0_u8);
}
for &group in GROUPS {
let actual_keys: Vec<_> =
GroupKeys::<_, u32>::with_custom_buffer(&fork, &group.into(), 10).collect();
let expected_keys: Vec<_> = groups
.remove(&group)
.unwrap_or_default()
.into_iter()
.collect();
assert_eq!(actual_keys, expected_keys);
}
}
}
#[cfg(test)]
mod prop_tests {
use super::{GroupKeys, IndexAddress, RawAccess};
use crate::{access::CopyAccessExt, Database, TemporaryDB};
use proptest::{
collection::vec,
num, prop_assert_eq, prop_oneof, proptest, sample,
strategy::{self, Strategy},
test_runner::TestCaseResult,
};
use std::collections::{BTreeSet, HashMap};
const ACTIONS_MAX_LEN: usize = 30;
const DEFAULT_BUFFER_SIZE: usize = 1_000;
const SMALL_BUFFER_SIZE: usize = 3;
type GroupKey = (&'static str, Option<u32>);
fn group_address(group: GroupKey) -> IndexAddress {
group
.1
.map_or_else(|| group.0.into(), |prefix| (group.0, &prefix).into())
}
const GROUPS: &[GroupKey] = &[
("fo", None),
("foo", Some(0)),
("foo", Some(1)),
("foo", Some(256)),
("foo", Some(u32::max_value())),
("foo_", None),
("foo1", Some(0)),
];
fn check_groups<T: RawAccess + Copy>(
access: T,
expected_groups: &HashMap<GroupKey, BTreeSet<u32>>,
buffer_size: usize,
) -> TestCaseResult {
for &group in GROUPS {
let group_addr = group_address(group);
let keys: GroupKeys<_, u32> =
GroupKeys::with_custom_buffer(access, &group_addr, buffer_size);
let keys: Vec<_> = keys.collect();
let expected_keys = expected_groups
.get(&group)
.map(|set| set.iter().copied().collect::<Vec<_>>())
.unwrap_or_default();
prop_assert_eq!(keys, expected_keys);
}
Ok(())
}
#[derive(Debug, Clone)]
enum Action {
CreateEntry { group: GroupKey, id_in_group: u32 },
FlushFork,
MergeFork,
}
fn generate_action(keys: impl Strategy<Value = u32>) -> impl Strategy<Value = Action> {
prop_oneof![
4 => (sample::select(GROUPS), keys)
.prop_map(|(group, id_in_group)| Action::CreateEntry {
group,
id_in_group,
}),
1 => strategy::Just(Action::FlushFork),
1 => strategy::Just(Action::MergeFork),
]
}
fn apply_actions(db: &TemporaryDB, buffer_size: usize, actions: Vec<Action>) -> TestCaseResult {
let mut fork = db.fork();
let mut groups: HashMap<GroupKey, BTreeSet<_>> = HashMap::new();
for action in actions {
match action {
Action::CreateEntry { group, id_in_group } => {
let addr = group_address(group).append_key(&id_in_group);
fork.get_entry(addr).set(1_u32);
groups.entry(group).or_default().insert(id_in_group);
}
Action::FlushFork => {
fork.flush();
}
Action::MergeFork => {
let patch = fork.into_patch();
check_groups(&patch, &groups, buffer_size)?;
db.merge(patch).unwrap();
check_groups(&db.snapshot(), &groups, buffer_size)?;
fork = db.fork();
}
}
check_groups(&fork, &groups, buffer_size)?;
}
Ok(())
}
#[test]
fn normal_buffer_and_small_keys() {
let actions_generator = vec(generate_action(0_u32..4), 1..ACTIONS_MAX_LEN);
let db = TemporaryDB::new();
proptest!(|(actions in actions_generator)| {
apply_actions(&db, DEFAULT_BUFFER_SIZE, actions)?;
db.clear().unwrap();
});
}
#[test]
fn small_buffer_and_small_keys() {
let actions_generator = vec(generate_action(0_u32..4), 1..ACTIONS_MAX_LEN);
let db = TemporaryDB::new();
proptest!(|(actions in actions_generator)| {
apply_actions(&db, SMALL_BUFFER_SIZE, actions)?;
db.clear().unwrap();
});
}
#[test]
fn small_buffer_and_any_keys() {
let actions_generator = vec(generate_action(num::u32::ANY), 1..ACTIONS_MAX_LEN);
let db = TemporaryDB::new();
proptest!(|(actions in actions_generator)| {
apply_actions(&db, SMALL_BUFFER_SIZE, actions)?;
db.clear().unwrap();
});
}
}