use crate::ledger::map::{BatchOperation, Map, MapRead};
use console::network::prelude::*;
use indexmap::IndexMap;
use core::{borrow::Borrow, hash::Hash};
use indexmap::map;
use parking_lot::{Mutex, RwLock};
use std::{
borrow::Cow,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
#[derive(Clone)]
pub struct MemoryMap<
K: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
V: Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
> {
map: Arc<RwLock<IndexMap<K, V>>>,
batch_in_progress: Arc<AtomicBool>,
atomic_batch: Arc<Mutex<Vec<BatchOperation<K, V>>>>,
}
impl<
K: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
V: Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
> Default for MemoryMap<K, V>
{
fn default() -> Self {
Self { map: Default::default(), batch_in_progress: Default::default(), atomic_batch: Default::default() }
}
}
impl<
K: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
V: Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
> FromIterator<(K, V)> for MemoryMap<K, V>
{
fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
Self {
map: Arc::new(RwLock::new(IndexMap::from_iter(iter))),
batch_in_progress: Default::default(),
atomic_batch: Default::default(),
}
}
}
impl<
'a,
K: 'a + Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
V: 'a + Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
> Map<'a, K, V> for MemoryMap<K, V>
{
fn insert(&self, key: K, value: V) -> Result<()> {
let is_batch = self.batch_in_progress.load(Ordering::SeqCst);
match is_batch {
true => self.atomic_batch.lock().push(BatchOperation::Insert(key, value)),
false => {
self.map.write().insert(key, value);
}
}
Ok(())
}
fn remove(&self, key: &K) -> Result<()> {
let is_batch = self.batch_in_progress.load(Ordering::SeqCst);
match is_batch {
true => self.atomic_batch.lock().push(BatchOperation::Remove(*key)),
false => {
self.map.write().remove(key);
}
}
Ok(())
}
fn start_atomic(&self) {
self.batch_in_progress.store(true, Ordering::SeqCst);
assert!(self.atomic_batch.lock().is_empty());
}
fn is_atomic_in_progress(&self) -> bool {
self.batch_in_progress.load(Ordering::SeqCst)
}
fn abort_atomic(&self) {
self.atomic_batch.lock().clear();
self.batch_in_progress.store(false, Ordering::SeqCst);
}
fn finish_atomic(&self) -> Result<()> {
let operations = core::mem::take(&mut *self.atomic_batch.lock());
if !operations.is_empty() {
let mut locked_map = self.map.write();
for operation in operations {
match operation {
BatchOperation::Insert(key, value) => locked_map.insert(key, value),
BatchOperation::Remove(key) => locked_map.remove(&key),
};
}
}
self.batch_in_progress.store(false, Ordering::SeqCst);
Ok(())
}
}
impl<
'a,
K: 'a + Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
V: 'a + Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
> MapRead<'a, K, V> for MemoryMap<K, V>
{
type Iterator = core::iter::Map<map::IntoIter<K, V>, fn((K, V)) -> (Cow<'a, K>, Cow<'a, V>)>;
type Keys = core::iter::Map<map::IntoKeys<K, V>, fn(K) -> Cow<'a, K>>;
type Values = core::iter::Map<map::IntoValues<K, V>, fn(V) -> Cow<'a, V>>;
fn contains_key<Q>(&self, key: &Q) -> Result<bool>
where
K: Borrow<Q>,
Q: PartialEq + Eq + Hash + Serialize + ?Sized,
{
Ok(self.map.read().contains_key(key))
}
fn get<Q>(&'a self, key: &Q) -> Result<Option<Cow<'a, V>>>
where
K: Borrow<Q>,
Q: PartialEq + Eq + Hash + Serialize + ?Sized,
{
Ok(self.map.read().get(key).cloned().map(Cow::Owned))
}
fn iter(&'a self) -> Self::Iterator {
self.map.read().clone().into_iter().map(|(k, v)| (Cow::Owned(k), Cow::Owned(v)))
}
fn keys(&'a self) -> Self::Keys {
self.map.read().clone().into_keys().map(Cow::Owned)
}
fn values(&'a self) -> Self::Values {
self.map.read().clone().into_values().map(Cow::Owned)
}
}
impl<
K: Copy + Clone + PartialEq + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync,
V: Clone + PartialEq + Eq + Serialize + for<'de> Deserialize<'de> + Send + Sync,
> Deref for MemoryMap<K, V>
{
type Target = Arc<RwLock<IndexMap<K, V>>>;
fn deref(&self) -> &Self::Target {
&self.map
}
}
#[cfg(test)]
mod tests {
use super::*;
use console::{account::Address, network::Testnet3};
type CurrentNetwork = Testnet3;
#[test]
fn test_contains_key() {
let address =
Address::<CurrentNetwork>::from_str("aleo1q6qstg8q8shwqf5m6q5fcenuwsdqsvp4hhsgfnx5chzjm3secyzqt9mxm8")
.unwrap();
let addresses: IndexMap<Address<CurrentNetwork>, ()> = [(address, ())].into_iter().collect();
assert!(addresses.contains_key(&address));
let map: MemoryMap<Address<CurrentNetwork>, ()> = [(address, ())].into_iter().collect();
assert!(map.contains_key(&address).unwrap());
}
#[test]
fn test_atomic_writes_are_batched() {
const NUM_ITEMS: usize = 10;
let map: MemoryMap<usize, String> = Default::default();
assert!(map.iter().next().is_none());
map.start_atomic();
for i in 0..NUM_ITEMS {
map.insert(i, i.to_string()).unwrap();
}
assert!(map.iter().next().is_none());
map.finish_atomic().unwrap();
for i in 0..NUM_ITEMS {
assert_eq!(map.get(&i).unwrap(), Some(Cow::Borrowed(&i.to_string())));
}
map.start_atomic();
for i in 0..NUM_ITEMS {
map.remove(&i).unwrap();
}
assert_eq!(map.iter().count(), NUM_ITEMS);
map.finish_atomic().unwrap();
assert!(map.iter().next().is_none());
}
#[test]
fn test_atomic_writes_can_be_aborted() {
const NUM_ITEMS: usize = 10;
let map: MemoryMap<usize, String> = Default::default();
assert!(map.iter().next().is_none());
map.start_atomic();
for i in 0..NUM_ITEMS {
map.insert(i, i.to_string()).unwrap();
}
assert!(map.iter().next().is_none());
map.abort_atomic();
assert!(map.iter().next().is_none());
map.start_atomic();
for i in 0..NUM_ITEMS {
map.insert(i, i.to_string()).unwrap();
}
map.finish_atomic().unwrap();
assert_eq!(map.iter().count(), NUM_ITEMS);
}
}