use crate::{
index::{
storage::{insert_front, Cursor as CursorImpl, ImmutableCursor, IndexEntry, Record},
Cursor as CursorTrait, Ordered, Unordered,
},
translator::Translator,
};
use commonware_runtime::{
telemetry::metrics::{Counter, Gauge, MetricsExt as _},
Metrics,
};
use std::{
collections::{
btree_map::{
Entry as BTreeEntry, OccupiedEntry as BTreeOccupiedEntry,
VacantEntry as BTreeVacantEntry,
},
BTreeMap,
},
ops::Bound::{Excluded, Unbounded},
};
/// Lets the shared cursor implementation operate on an occupied `BTreeMap` entry whose value is
/// the head [`Record`] of a chain.
impl<K: Ord + Send + Sync, V: Send + Sync> IndexEntry<V> for BTreeOccupiedEntry<'_, K, Record<V>> {
    /// Returns a mutable reference to the record stored in this entry.
    fn get_mut(&mut self) -> &mut Record<V> {
        // Spelled as a type-qualified path: the callee is the entry's own inherent `get_mut`,
        // not this trait method.
        BTreeOccupiedEntry::get_mut(self)
    }

    /// Consumes the entry, removing its key and record from the underlying map.
    fn remove(self) {
        // The removed (key, record) pair is not needed by the caller; discard it right away.
        drop(self.remove_entry());
    }
}
/// Mutable cursor type used by this index: the shared cursor implementation specialized to an
/// occupied `BTreeMap` entry holding a [`Record`].
pub type Cursor<'a, K, V> = CursorImpl<'a, V, BTreeOccupiedEntry<'a, K, Record<V>>>;
/// An index that stores values under translated keys in a [`BTreeMap`], so translated keys can
/// be visited in sorted order.
///
/// Each map value is the head [`Record`] of a chain linked through `Record::next`; all values
/// whose keys translate to the same map key live in the same chain.
pub struct Index<T: Translator, V: Send + Sync> {
    /// Transforms caller-provided byte keys into the map's key type.
    translator: T,
    /// Translated key -> head record of the chain of values stored under that key.
    map: BTreeMap<T::Key, Record<V>>,
    /// Gauge registered as "keys": number of translated keys in the index.
    keys: Gauge,
    /// Gauge registered as "items": number of items in the index.
    items: Gauge,
    /// Counter registered as "pruned": number of items pruned.
    pruned: Counter,
}
impl<T: Translator, V: Send + Sync> Index<T, V> {
    /// Stores `v` as the only record under a translated key that was not present before.
    ///
    /// Increments both the `keys` and `items` gauges, then fills the vacant slot with a record
    /// that has no successor.
    fn create(keys: &Gauge, items: &Gauge, vacant: BTreeVacantEntry<'_, T::Key, Record<V>>, v: V) {
        let head = Record {
            value: v,
            next: None,
        };
        keys.inc();
        items.inc();
        vacant.insert(head);
    }

    /// Creates an empty index that uses `translator` to map keys and registers its metrics
    /// (`keys`, `items`, `pruned`) on `ctx`.
    pub fn new(ctx: impl Metrics, translator: T) -> Self {
        let keys = ctx.gauge("keys", "Number of translated keys in the index");
        let items = ctx.gauge("items", "Number of items in the index");
        let pruned = ctx.counter("pruned", "Number of items pruned");
        Self {
            translator,
            map: BTreeMap::new(),
            keys,
            items,
            pruned,
        }
    }

    /// Returns a cursor over the values stored under the smallest translated key that is
    /// strictly greater than the translation of `key`, or `None` if there is no such key.
    ///
    /// Unlike [`Ordered::next_translated_key`], this never wraps around to the first key.
    pub(super) fn next_translated_key_no_cycle<'a>(
        &'a self,
        key: &[u8],
    ) -> Option<ImmutableCursor<'a, V>> {
        let translated = self.translator.transform(key);
        let mut successors = self.map.range((Excluded(translated), Unbounded));
        let (_, head) = successors.next()?;
        Some(ImmutableCursor::new(head))
    }

    /// Returns a cursor over the values stored under the largest translated key that is
    /// strictly less than the translation of `key`, or `None` if there is no such key.
    ///
    /// Unlike [`Ordered::prev_translated_key`], this never wraps around to the last key.
    pub(super) fn prev_translated_key_no_cycle<'a>(
        &'a self,
        key: &[u8],
    ) -> Option<ImmutableCursor<'a, V>> {
        let translated = self.translator.transform(key);
        let mut predecessors = self.map.range(..translated);
        let (_, head) = predecessors.next_back()?;
        Some(ImmutableCursor::new(head))
    }
}
impl<T: Translator, V: Send + Sync> Ordered for Index<T, V> {
    type Iterator<'a>
        = ImmutableCursor<'a, V>
    where
        Self: 'a;

    /// Returns the values under the translated key that precedes the translation of `key`.
    ///
    /// The flag is `false` when a strictly smaller translated key exists. When none does, the
    /// lookup wraps around to the last translated key and the flag is `true`. Returns `None`
    /// only when the index is empty.
    fn prev_translated_key<'a>(&'a self, key: &[u8]) -> Option<(Self::Iterator<'a>, bool)>
    where
        Self::Value: 'a,
    {
        self.prev_translated_key_no_cycle(key)
            .map(|cursor| (cursor, false))
            .or_else(|| self.last_translated_key().map(|cursor| (cursor, true)))
    }

    /// Returns the values under the translated key that follows the translation of `key`.
    ///
    /// The flag is `false` when a strictly greater translated key exists. When none does, the
    /// lookup wraps around to the first translated key and the flag is `true`. Returns `None`
    /// only when the index is empty.
    fn next_translated_key<'a>(&'a self, key: &[u8]) -> Option<(Self::Iterator<'a>, bool)>
    where
        Self::Value: 'a,
    {
        self.next_translated_key_no_cycle(key)
            .map(|cursor| (cursor, false))
            .or_else(|| self.first_translated_key().map(|cursor| (cursor, true)))
    }

    /// Returns the values under the smallest translated key, or `None` if the index is empty.
    fn first_translated_key<'a>(&'a self) -> Option<Self::Iterator<'a>>
    where
        Self::Value: 'a,
    {
        let (_, head) = self.map.first_key_value()?;
        Some(ImmutableCursor::new(head))
    }

    /// Returns the values under the largest translated key, or `None` if the index is empty.
    fn last_translated_key<'a>(&'a self) -> Option<Self::Iterator<'a>>
    where
        Self::Value: 'a,
    {
        let (_, head) = self.map.last_key_value()?;
        Some(ImmutableCursor::new(head))
    }
}
impl<T: Translator, V: Send + Sync> super::Factory<T> for Index<T, V> {
fn new(ctx: impl commonware_runtime::Metrics, translator: T) -> Self {
Self::new(ctx, translator)
}
}
impl<T: Translator, V: Send + Sync> Unordered for Index<T, V> {
    type Value = V;
    type Cursor<'a>
        = Cursor<'a, T::Key, V>
    where
        Self: 'a;

    /// Iterates over every value stored under the translation of `key`.
    ///
    /// The iterator is empty when the translated key is not present.
    fn get<'a>(&'a self, key: &[u8]) -> impl Iterator<Item = &'a V> + 'a
    where
        V: 'a,
    {
        let translated = self.translator.transform(key);
        self.map
            .get(&translated)
            .into_iter()
            .flat_map(|head| ImmutableCursor::new(head))
    }

    /// Returns a mutable cursor over the values stored under the translation of `key`, or
    /// `None` when the translated key is not present.
    fn get_mut<'a>(&'a mut self, key: &[u8]) -> Option<Self::Cursor<'a>> {
        let translated = self.translator.transform(key);
        let BTreeEntry::Occupied(entry) = self.map.entry(translated) else {
            return None;
        };
        Some(Cursor::<'_, T::Key, V>::new(
            entry,
            &self.keys,
            &self.items,
            &self.pruned,
        ))
    }

    /// Returns a mutable cursor when the translated key already exists (leaving `value`
    /// unused); otherwise stores `value` under the new translated key and returns `None`.
    fn get_mut_or_insert<'a>(&'a mut self, key: &[u8], value: V) -> Option<Self::Cursor<'a>> {
        let translated = self.translator.transform(key);
        match self.map.entry(translated) {
            BTreeEntry::Vacant(slot) => {
                Self::create(&self.keys, &self.items, slot, value);
                None
            }
            BTreeEntry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
                entry,
                &self.keys,
                &self.items,
                &self.pruned,
            )),
        }
    }

    /// Adds `value` under the translation of `key`.
    ///
    /// A new translated key gets a fresh single-record chain; an existing one has the value
    /// added via `insert_front` and the `items` gauge incremented.
    fn insert(&mut self, key: &[u8], value: V) {
        let translated = self.translator.transform(key);
        match self.map.entry(translated) {
            BTreeEntry::Vacant(slot) => Self::create(&self.keys, &self.items, slot, value),
            BTreeEntry::Occupied(entry) => {
                insert_front(entry.into_mut(), value);
                self.items.inc();
            }
        }
    }

    /// Adds `value` under the translation of `key`, keeping only values accepted by
    /// `should_retain`.
    ///
    /// For an existing translated key, the stored values are filtered through `should_retain`
    /// first, and `value` is then inserted only if it is itself accepted. For a new translated
    /// key, `value` is stored only if it is accepted.
    fn insert_and_retain(&mut self, key: &[u8], value: V, should_retain: impl Fn(&V) -> bool) {
        let translated = self.translator.transform(key);
        match self.map.entry(translated) {
            BTreeEntry::Vacant(slot) => {
                if !should_retain(&value) {
                    return;
                }
                Self::create(&self.keys, &self.items, slot, value);
            }
            BTreeEntry::Occupied(entry) => {
                let mut cursor =
                    Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
                // Filter the existing values before consulting the predicate for the new one.
                cursor.retain(&should_retain);
                if should_retain(&value) {
                    cursor.insert(value);
                }
            }
        }
    }

    /// Removes every value stored under the translation of `key`.
    ///
    /// Decrements `keys` once, and for each removed value decrements `items` and increments
    /// `pruned`. Does nothing when the translated key is not present.
    fn remove(&mut self, key: &[u8]) {
        let translated = self.translator.transform(key);
        let Some(head) = self.map.remove(&translated) else {
            return;
        };
        self.keys.dec();

        // Walk the chain one record at a time, accounting for each value as it is visited.
        let mut current = head;
        loop {
            self.items.dec();
            self.pruned.inc();
            match current.next.take() {
                Some(successor) => current = *successor,
                None => break,
            }
        }
    }

    /// Number of translated keys currently held in the map (test-only).
    #[cfg(test)]
    fn keys(&self) -> usize {
        self.map.len()
    }

    /// Current value of the `items` gauge (test-only).
    #[cfg(test)]
    fn items(&self) -> usize {
        self.items.get() as usize
    }

    /// Current value of the `pruned` counter (test-only).
    #[cfg(test)]
    fn pruned(&self) -> usize {
        self.pruned.get() as usize
    }
}
impl<T: Translator, V: Send + Sync> Drop for Index<T, V> {
    /// Unlinks every chain one record at a time before the map itself is dropped.
    ///
    /// Each record has its `next` link detached before the record goes out of scope, so no
    /// record is dropped while still owning a successor (presumably to keep long chains from
    /// being torn down through deeply nested drops).
    fn drop(&mut self) {
        for head in self.map.values_mut() {
            let mut pending = head.next.take();
            while let Some(mut node) = pending {
                pending = node.next.take();
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::translator::OneCap;
    use commonware_formatting::hex;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Runner};

    /// Every ordered lookup on an index without entries yields nothing.
    #[test_traced]
    fn test_ordered_empty_index() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let index = Index::<_, u64>::new(context, OneCap);

            assert!(index.first_translated_key().is_none());
            assert!(index.last_translated_key().is_none());
            assert!(index.prev_translated_key(b"key").is_none());
            assert!(index.next_translated_key(b"key").is_none());
        });
    }

    /// Exercises first/last/next/prev lookups, including wrap-around at both ends and a
    /// translated key that holds two colliding values.
    #[test_traced]
    fn test_ordered_index_ordering() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut index = Index::<_, u64>::new(context, OneCap);
            assert_eq!(index.keys(), 0);

            // `mid` and `mid_collision` share the leading byte 0x1c and are expected to land
            // under the same translated key.
            let low = &hex!("0x0b02AA");
            let mid = &hex!("0x1c04CC");
            let mid_collision = &hex!("0x1c0311");
            let high = &hex!("0x2d06EE");

            index.insert(low, 1);
            index.insert(mid, 21);
            index.insert(mid_collision, 22);
            index.insert(high, 3);
            assert_eq!(index.keys(), 3);

            // First translated key.
            let mut cursor = index.first_translated_key().unwrap();
            assert_eq!(cursor.next(), Some(&1));
            assert_eq!(cursor.next(), None);

            // A key below every stored key moves forward to the first one.
            let (mut cursor, wrapped) = index.next_translated_key(&[0x00]).unwrap();
            assert!(!wrapped);
            assert_eq!(cursor.next(), Some(&1));
            assert_eq!(cursor.next(), None);

            // A key translating to the first stored key moves forward to the colliding pair;
            // the most recently inserted value comes out first.
            let (mut cursor, wrapped) = index.next_translated_key(&hex!("0x0b0102")).unwrap();
            assert!(!wrapped);
            assert_eq!(cursor.next(), Some(&22));
            assert_eq!(cursor.next(), Some(&21));
            assert_eq!(cursor.next(), None);

            // A key between the first and second stored keys also reaches the colliding pair.
            let (mut cursor, wrapped) = index.next_translated_key(&hex!("0x1b010203")).unwrap();
            assert!(!wrapped);
            assert_eq!(cursor.next(), Some(&22));
            assert_eq!(cursor.next(), Some(&21));
            assert_eq!(cursor.next(), None);

            // A key between the second and third stored keys reaches the third.
            let (mut cursor, wrapped) = index.next_translated_key(&hex!("0x2a01020304")).unwrap();
            assert!(!wrapped);
            assert_eq!(cursor.next(), Some(&3));
            assert_eq!(cursor.next(), None);

            // Moving forward from the last stored key wraps to the first.
            let (mut cursor, wrapped) = index.next_translated_key(high).unwrap();
            assert!(wrapped);
            assert_eq!(cursor.next(), Some(&1));
            assert_eq!(cursor.next(), None);

            // Moving forward from beyond the last stored key wraps to the first as well.
            let (mut cursor, wrapped) = index.next_translated_key(&hex!("0x2eFF")).unwrap();
            assert!(wrapped);
            assert_eq!(cursor.next(), Some(&1));
            assert_eq!(cursor.next(), None);

            // Moving backward from the first stored key wraps to the last.
            let (mut cursor, wrapped) = index.prev_translated_key(low).unwrap();
            assert!(wrapped);
            assert_eq!(cursor.next(), Some(&3));
            assert_eq!(cursor.next(), None);

            // A key just above the first stored key moves backward to it.
            let (mut cursor, wrapped) = index.prev_translated_key(&hex!("0x0c0102")).unwrap();
            assert!(!wrapped);
            assert_eq!(cursor.next(), Some(&1));
            assert_eq!(cursor.next(), None);

            // A key just above the colliding pair moves backward to it.
            let (mut cursor, wrapped) = index.prev_translated_key(&hex!("0x1d0102")).unwrap();
            assert!(!wrapped);
            assert_eq!(cursor.next(), Some(&22));
            assert_eq!(cursor.next(), Some(&21));
            assert_eq!(cursor.next(), None);

            // A key above every stored key moves backward to the last one.
            let (mut cursor, wrapped) = index.prev_translated_key(&hex!("0xCC0102")).unwrap();
            assert!(!wrapped);
            assert_eq!(cursor.next(), Some(&3));
            assert_eq!(cursor.next(), None);

            // Last translated key.
            let mut cursor = index.last_translated_key().unwrap();
            assert_eq!(cursor.next(), Some(&3));
            assert_eq!(cursor.next(), None);
        });
    }
}