use super::{store, Action, RawAction};
use crate::{
fields::{
depth::Incremental, Collection, Intent, Key, Load, LocalField, SparseField, Store,
Strategy, Value,
},
index::{FieldWriter, Transaction},
object::{self, serializer::SizedPointer, ObjectError},
};
use scc::HashMap;
use std::{borrow::Borrow, hash::Hash, sync::Arc};
/// Concurrent map that keeps uncommitted changes apart from committed state.
///
/// Lookups consult `current` first and fall back to `base`. Both maps sit
/// behind an `Arc`, so cloning a `VersionedMap` yields another handle to the
/// same underlying data.
pub struct VersionedMap<K, V>
where
K: Key + 'static,
V: Value + 'static,
{
// Changes made since the last `commit()`/`rollback()`; a `None` action
// marks the key as removed.
current: Arc<HashMap<K, Action<V>>>,
// Entries that were committed, or inserted through `Collection::insert`.
base: Arc<HashMap<K, Action<V>>>,
}
impl<K, V> Clone for VersionedMap<K, V>
where
K: Key + 'static,
V: Value + 'static,
{
fn clone(&self) -> Self {
VersionedMap {
base: self.base.clone(),
current: self.current.clone(),
}
}
}
impl<K, V> Default for VersionedMap<K, V>
where
K: Key + 'static,
V: Value + 'static,
{
fn default() -> Self {
VersionedMap {
base: Arc::default(),
current: Arc::default(),
}
}
}
impl<K, V> VersionedMap<K, V>
where
    K: Key + Clone,
    V: Value,
{
    /// Sets `key` to `value` unless the key already has a live value.
    ///
    /// Existing values are never overwritten. Returns the value associated
    /// with `key` after the call: the pre-existing one, or the new one.
    #[inline(always)]
    pub fn insert(&self, key: K, value: impl Into<Arc<V>>) -> Arc<V> {
        self.insert_with(key, move || value)
    }

    /// Like [`insert`](Self::insert), but the value is produced lazily.
    ///
    /// `new` is only invoked when `key` has no live value. The new value is
    /// recorded as a pending change in `current`.
    #[inline(always)]
    pub fn insert_with<T: Into<Arc<V>>, F: FnOnce() -> T>(&self, key: K, new: F) -> Arc<V> {
        if let Some(existing) = self.get(&key) {
            return existing;
        }

        self.current
            .entry(key)
            .or_default()
            .get_mut()
            .get_or_insert_with(|| new().into())
            .clone()
    }

    /// Replaces the live value of `key` with the result of `update`.
    ///
    /// `update` receives the value currently visible for `key`. The
    /// replacement is recorded as a pending change and returned. When `key`
    /// has no live value, `update` is not called and `None` is returned.
    #[inline(always)]
    pub fn update_with<T: Into<Arc<V>>>(
        &self,
        key: K,
        update: impl FnOnce(Arc<V>) -> T,
    ) -> Action<V> {
        let existing = self.get(&key)?;

        let mut entry = self.current.entry(key).or_default();
        let slot = entry.get_mut();
        *slot = Some(update(existing).into());
        slot.clone()
    }

    /// Returns the live value for `key`, if any.
    ///
    /// Pending changes shadow committed state: when `current` has an entry
    /// for `key` (including a removal marker), `base` is not consulted.
    #[inline(always)]
    pub fn get<Q>(&self, key: &Q) -> Option<Arc<V>>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.current
            .read(key, |_, v| v.clone())
            .or_else(|| self.base.read(key, |_, v| v.clone()))
            .flatten()
    }

    /// Marks `key` as removed if it currently has a live value.
    ///
    /// The removal is a pending change; it does nothing for unknown or
    /// already removed keys.
    #[inline(always)]
    pub fn remove(&self, key: K) {
        if self.contains(&key) {
            self.current.entry(key).or_default().insert(None);
        }
    }

    /// Returns `true` if `key` has a live value.
    ///
    /// Uses the same precedence as [`get`](Self::get): an entry in `current`
    /// decides, otherwise `base` does.
    #[inline(always)]
    pub fn contains(&self, key: &K) -> bool {
        self.current
            .read(key, |_, v| v.is_some())
            .or_else(|| self.base.read(key, |_, v| v.is_some()))
            .unwrap_or(false)
    }

    /// Calls `callback` once for every key that has a live value.
    ///
    /// Entries of `base` that are not shadowed by `current` are visited
    /// first, followed by the live entries of `current`.
    #[inline(always)]
    pub fn for_each(&self, mut callback: impl FnMut(&K, &V)) {
        let mut current = self.base.first_entry();
        while let Some(entry) = current {
            if let Some(value) = entry.get() {
                // Any entry in `current` (value or removal marker) takes
                // precedence and is handled by the second pass.
                if !self.current.contains(entry.key()) {
                    (callback)(entry.key(), Arc::as_ref(value));
                }
            }
            current = entry.next();
        }

        current = self.current.first_entry();
        while let Some(entry) = current {
            if let Some(value) = entry.get() {
                (callback)(entry.key(), Arc::as_ref(value));
            }
            current = entry.next();
        }
    }

    /// Marks every live entry for which `callback` returns `false` as removed.
    ///
    /// The callback sees the value that [`get`](Self::get) would return.
    /// Keys that are already removed by a pending change are skipped.
    /// Removals are recorded as pending changes in `current`.
    #[inline(always)]
    pub fn retain(&self, mut callback: impl FnMut(&K, &V) -> bool) {
        // First pass: keys that exist in the committed state.
        let mut current = self.base.first_entry();
        while let Some(entry) = current {
            let Some(committed) = entry.get() else {
                current = entry.next();
                continue;
            };

            let key = entry.key();
            let keep = match self.current.read(key, |_, pending| pending.clone()) {
                // Already removed by a pending change: the entry is not
                // live, so the callback must not observe the stale value.
                Some(None) => true,
                // A pending update shadows the committed value.
                Some(Some(pending)) => (callback)(key, Arc::as_ref(&pending)),
                None => (callback)(key, Arc::as_ref(committed)),
            };

            if !keep {
                *self.current.entry(key.clone()).or_default().get_mut() = None;
            }
            current = entry.next();
        }

        // Second pass: pending entries whose key is not in `base` at all;
        // the ones that are were handled above.
        current = self.current.first_entry();
        while let Some(mut entry) = current {
            if self.base.contains(entry.key()) || entry.get().is_none() {
                current = entry.next();
                continue;
            }

            if !(callback)(
                entry.key(),
                Arc::as_ref(entry.get().as_ref().expect("checked above")),
            ) {
                *entry.get_mut() = None;
            }
            current = entry.next();
        }
    }

    /// Moves all pending changes into the committed state.
    ///
    /// A pending removal deletes the key from `base` when it is present
    /// there. In every other case the pending action is written into `base`,
    /// which includes a removal marker for a key `base` did not hold.
    /// `current` is empty afterwards.
    pub fn commit(&self) {
        self.current.retain(|k, v| {
            if self.base.remove_if(k, |_v_base| v.is_none()).is_none() {
                *self.base.entry(k.clone()).or_default().get_mut() = v.clone();
            }
            // Returning `false` drops the entry from `current`.
            false
        });
    }

    /// Returns the number of keys that have a live value.
    ///
    /// This counts exactly the entries [`for_each`](Self::for_each) visits.
    /// Removal markers, and keys present in both `base` and `current`, can
    /// therefore neither underflow nor inflate the result. The price is a
    /// full walk over both maps.
    #[inline(always)]
    pub fn len(&self) -> usize {
        let mut live = 0;
        self.for_each(|_, _| live += 1);
        live
    }

    /// Returns the number of stored entries in `base` and `current` combined.
    ///
    /// Unlike [`len`](Self::len) this includes removal markers and counts a
    /// key twice when it is present in both maps.
    #[inline(always)]
    pub fn size(&self) -> usize {
        self.base.len() + self.current.len()
    }

    /// Returns the combined capacity of the two underlying maps.
    #[inline(always)]
    pub fn capacity(&self) -> usize {
        self.base.capacity() + self.current.capacity()
    }

    /// Drops both the committed state and all pending changes.
    #[inline(always)]
    pub fn clear(&self) {
        self.base.clear();
        self.current.clear();
    }

    /// Discards all pending changes, leaving the committed state untouched.
    #[inline(always)]
    pub fn rollback(&self) {
        self.current.clear();
    }

    /// Returns `true` if no key has a live value.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl<K, V> Collection for VersionedMap<K, V>
where
    K: Key,
    V: Value,
{
    type Depth = Incremental;
    type Key = K;
    type Serialized = (K, Action<V>);
    type Item = (K, Action<V>);

    /// Returns the key half of a serialized record.
    #[inline(always)]
    fn key(from: &Self::Serialized) -> &Self::Key {
        let (key, _) = from;
        key
    }

    /// Serialized records already have the in-memory shape, so the record is
    /// passed through unchanged and the reader is not used.
    #[inline(always)]
    fn load(from: Self::Serialized, _object: &mut dyn crate::object::Reader) -> Self::Item {
        from
    }

    /// Places a loaded record into the committed state.
    ///
    /// The outcome reported by the map's `insert` is deliberately ignored.
    #[inline(always)]
    fn insert(&mut self, record: Self::Item) {
        let (key, action) = record;
        let _ = self.base.insert(key, action);
    }
}
impl<K, V> Store for VersionedMap<K, V>
where
    K: Key + Clone,
    V: Value,
{
    /// Writes every pending change to `transaction`, then commits them.
    ///
    /// Each record is a `(key, action)` pair, so removal markers are written
    /// too. The object writer is not used.
    #[inline(always)]
    fn store(&mut self, mut transaction: &mut dyn Transaction, _object: &mut dyn object::Writer) {
        let mut cursor = self.current.first_entry();
        while let Some(pending) = cursor {
            let record = (pending.key(), pending.get());
            transaction.write_next(record);
            cursor = pending.next();
        }

        self.commit();
    }
}
impl<K, V> Collection for SparseField<VersionedMap<K, V>>
where
    K: Key,
    V: Value,
{
    type Depth = Incremental;
    type Key = K;
    type Serialized = (K, RawAction<SizedPointer>);
    type Item = (K, Action<V>);

    /// Returns the key half of a serialized record.
    #[inline(always)]
    fn key(from: &Self::Serialized) -> &Self::Key {
        let (key, _) = from;
        key
    }

    /// Resolves a serialized record into an in-memory one.
    ///
    /// A record without a pointer is returned as a `None` action. Otherwise
    /// the pointed-to value is read through `object` and deserialized.
    ///
    /// # Panics
    ///
    /// Panics if reading or deserializing the value fails.
    #[inline(always)]
    fn load(from: Self::Serialized, object: &mut dyn object::Reader) -> Self::Item {
        let (key, pointer) = from;
        let Some(pointer) = pointer else {
            return (key, None);
        };

        let value: V = object::serializer::read(
            object,
            |bytes| {
                crate::deserialize_from_slice(bytes).map_err(|err| ObjectError::Deserialize {
                    source: Box::new(err),
                })
            },
            pointer,
        )
        .unwrap();

        (key, store(value))
    }

    /// Places a loaded record into the committed state of the wrapped map.
    ///
    /// The outcome reported by the map's `insert` is deliberately ignored.
    #[inline(always)]
    fn insert(&mut self, record: Self::Item) {
        let (key, action) = record;
        let _ = self.field.base.insert(key, action);
    }
}
impl<K, V> Store for SparseField<VersionedMap<K, V>>
where
    K: Key + Clone,
    V: Value,
{
    /// Writes every pending change of the wrapped map, then commits them.
    ///
    /// Values are serialized through `writer`; the transaction only receives
    /// `(key, pointer)` records. The pointer is `None` for removal markers.
    ///
    /// # Panics
    ///
    /// Panics if serializing or writing a value fails.
    #[inline(always)]
    fn store(&mut self, mut transaction: &mut dyn Transaction, writer: &mut dyn object::Writer) {
        let mut cursor = self.field.current.first_entry();
        while let Some(pending) = cursor {
            let pointer = pending.get().as_ref().map(|value| {
                object::serializer::write(
                    writer,
                    |x| {
                        crate::serialize_to_vec(&x).map_err(|err| ObjectError::Serialize {
                            source: Box::new(err),
                        })
                    },
                    value,
                )
                .unwrap()
            });

            transaction.write_next((pending.key(), pointer));
            cursor = pending.next();
        }

        self.field.commit();
    }
}
impl<K, V> crate::Index for VersionedMap<K, V>
where
    K: Key + Clone,
    V: Value,
{
    /// Exposes the whole map as a single field named `"root"` for storing.
    fn store_all(&self) -> anyhow::Result<Vec<Intent<Box<dyn Store>>>> {
        let root: Intent<Box<dyn Store>> =
            Intent::new("root", Box::new(LocalField::for_field(self)));
        Ok(vec![root])
    }

    /// Exposes the whole map as a single field named `"root"` for loading.
    fn load_all(&self) -> anyhow::Result<Vec<Intent<Box<dyn Load>>>> {
        let root: Intent<Box<dyn Load>> =
            Intent::new("root", Box::new(LocalField::for_field(self)));
        Ok(vec![root])
    }
}
// Unit tests for `VersionedMap`: in-memory semantics and a store/load round trip.
#[cfg(test)]
mod test {
use super::VersionedMap;
use crate::{
crypto::UsernamePassword,
fields::{LocalField, SparseField, Strategy},
index::test::store_then_load,
Infinitree,
};
// A tree whose index is a bare `VersionedMap` can be committed, reopened
// and loaded back from the same storage.
#[test]
fn bare_index_can_be_restored() {
// A closure, because each tree constructor consumes its own key.
let key = || {
UsernamePassword::with_credentials("bare_index_map".to_string(), "password".to_string())
.unwrap()
};
let storage = crate::backends::test::InMemoryBackend::shared();
{
let tree =
Infinitree::<VersionedMap<usize, usize>>::empty(storage.clone(), key()).unwrap();
tree.index().insert(1000, 1000);
tree.commit(None).unwrap();
// Wipes the in-memory map between the two commits.
tree.index().clear();
for i in 0..100 {
tree.index().insert(i, i + 1);
}
tree.commit(None).unwrap();
}
let tree = Infinitree::<VersionedMap<usize, usize>>::open(storage, key()).unwrap();
tree.load_all().unwrap();
for i in 0..100 {
assert_eq!(i + 1, *tree.index().get(&i).unwrap());
}
// 101 = the 100 entries above plus, presumably, key 1000 from the first
// commit, which `clear()` removed from memory only.
assert_eq!(tree.index().len(), 101);
}
// A second `insert` for the same key returns the first value unchanged.
#[test]
fn duplicate_insert_is_noop() {
let m = VersionedMap::<usize, String>::default();
assert_eq!(m.insert(1, "first".to_owned()), "first".to_owned().into());
assert_eq!(m.insert(1, "second".to_owned()), "first".to_owned().into());
}
// `update_with` on a missing key returns `None`.
#[test]
fn updating_empty_is_noop() {
let m = VersionedMap::<usize, String>::default();
assert_eq!(m.update_with(1, |_| "first".to_owned()), None);
}
// Insert, update and remove are all visible before any commit, and
// removing one key leaves the other untouched.
#[test]
fn store_then_confirm_then_remove() {
let m = VersionedMap::<usize, String>::default();
let first = "first".to_owned();
let updated = "updated".to_owned();
let second = "second".to_owned();
assert_eq!(m.insert_with(1, || first.clone()), first.clone().into());
assert_eq!(m.insert_with(2, || second.clone()), second.clone().into());
assert_eq!(m.get(&1), Some(first.into()));
assert!(m.contains(&1));
assert!(m.contains(&2));
assert_eq!(
m.update_with(1, |_| updated.clone()),
Some(updated.clone().into())
);
assert_eq!(m.get(&1), Some(updated.into()));
m.remove(1);
assert_eq!(m.get(&1), None);
assert_eq!(m.get(&2), Some(second.into()));
}
// Checks `len`, `size` and `is_empty` across insert, commit, remove and clear.
#[test]
fn commit_then_confirm_then_lengths() {
let value = "first".to_owned();
let m = VersionedMap::<usize, String>::default();
assert!(m.is_empty());
let _ = m.insert(1, value.clone());
assert_eq!(m.len(), 1);
assert_eq!(m.size(), 1);
assert!(!m.is_empty());
m.commit();
assert_eq!(m.get(&1), Some(value.into()));
assert!(m.contains(&1));
m.remove(1);
assert_eq!(m.get(&1), None);
assert!(!m.contains(&1));
assert_eq!(m.len(), 0);
// The committed entry and the pending removal marker are both stored.
assert_eq!(m.size(), 2);
assert!(m.is_empty());
m.clear();
assert_eq!(m.len(), 0);
assert_eq!(m.size(), 0);
assert!(m.is_empty());
}
type TestMap = VersionedMap<usize, String>;
// Fixture for the macro-generated tests below: inserts two entries.
fn init_map(store: &TestMap) {
store.insert(1, "one".to_owned());
store.insert(2, "two".to_owned());
}
// NOTE(review): presumably these expand to store/load round-trip tests that
// compare `len()` for each field strategy, which would explain the otherwise
// unused `Strategy` and `store_then_load` imports — confirm against the macro.
crate::len_check_test!(TestMap, LocalField, init_map, |m: TestMap| m.len());
crate::len_check_test!(TestMap, SparseField, init_map, |m: TestMap| m.len());
}