#![allow(dead_code)]
use std::sync::Arc;
use crossbeam_skiplist::SkipMap;
use parking_lot::RwLock;
use smallvec::SmallVec;
use super::{SequenceNumber, VersionPointer};
/// Per-key list of version pointers plus a cached copy of the newest one.
///
/// Both fields sit behind their own lock, so the chain can be mutated through
/// a shared reference.
#[derive(Default)]
struct VersionChain {
    /// Every pointer recorded for the key. `append` keeps this ordered by
    /// ascending `sequence`.
    versions: RwLock<SmallVec<[VersionPointer; 4]>>,
    /// Clone of the final element of `versions`, refreshed by `append` and
    /// consulted by `latest_visible` before it scans the full list.
    last: RwLock<Option<VersionPointer>>,
}
impl VersionChain {
    /// Records `pointer` in the chain, keeping `versions` ordered by ascending
    /// `sequence`.
    ///
    /// If an entry with the same sequence already exists — at the tail or
    /// anywhere else in the chain — it is replaced, so the chain never holds
    /// two entries for one sequence. Otherwise the pointer is inserted at its
    /// sorted position (which is the end for the common in-order case).
    ///
    /// The `last` cache is refreshed while the `versions` write lock is still
    /// held, so concurrent writers update the cache in the same order in which
    /// they mutated the list.
    fn append(&self, pointer: VersionPointer) {
        let mut versions = self.versions.write();

        // First index whose sequence is >= the incoming one; `versions` is
        // sorted, so this is either the entry to overwrite or the insert slot.
        let slot = versions.partition_point(|existing| existing.sequence < pointer.sequence);
        let replaces_existing = versions
            .get(slot)
            .map_or(false, |existing| existing.sequence == pointer.sequence);

        if replaces_existing {
            versions[slot] = pointer;
        } else {
            versions.insert(slot, pointer);
        }

        *self.last.write() = versions.last().cloned();
    }

    /// Returns the newest pointer for which `is_visible_at(snapshot)` holds,
    /// or `None` when no recorded version is visible.
    ///
    /// The cached newest pointer is tried first; only when it is absent or not
    /// visible is the full list scanned from newest to oldest.
    fn latest_visible(&self, snapshot: SequenceNumber) -> Option<VersionPointer> {
        // Keep this a separate statement: the `last` read guard must be
        // released before `versions` is locked, because `append` acquires the
        // two locks in the opposite order (`versions`, then `last`).
        let cached_hit = self
            .last
            .read()
            .as_ref()
            .filter(|newest| newest.is_visible_at(snapshot))
            .cloned();
        if cached_hit.is_some() {
            return cached_hit;
        }

        let versions = self.versions.read();
        versions
            .iter()
            .rev()
            .find(|candidate| candidate.is_visible_at(snapshot))
            .cloned()
    }

    /// Returns clones of all pointers visible at `snapshot`, in the order they
    /// are stored (ascending sequence).
    fn iter_visible(&self, snapshot: SequenceNumber) -> Vec<VersionPointer> {
        let versions = self.versions.read();
        versions
            .iter()
            .filter(|candidate| candidate.is_visible_at(snapshot))
            .cloned()
            .collect()
    }
}
/// Ordered map from key bytes to that key's [`VersionChain`].
///
/// The map lives behind an `Arc`, so cloning a `HotIndex` yields another
/// handle to the same underlying map rather than a copy of its contents.
#[derive(Clone, Default)]
pub struct HotIndex {
    /// Shared skip map; each value is the version chain for its key.
    inner: Arc<SkipMap<Arc<[u8]>, Arc<VersionChain>>>,
}
impl HotIndex {
    /// Creates an empty index.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(SkipMap::new()),
        }
    }

    /// Returns the number of keys reported by the underlying skip map.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Appends `pointer` to the version chain stored under `key`, creating the
    /// chain if the key is not present yet.
    ///
    /// The miss path uses `get_or_insert` rather than a plain `insert`: if
    /// another thread adds the same key between our lookup and our insertion,
    /// we append to the chain it created instead of replacing that chain and
    /// discarding the version it already holds.
    pub fn upsert_arc(&self, key: Arc<[u8]>, pointer: VersionPointer) {
        let entry = match self.inner.get(&*key) {
            // Fast path: the key already exists, no allocation needed.
            Some(existing) => existing,
            None => self
                .inner
                .get_or_insert(key, Arc::new(VersionChain::default())),
        };
        entry.value().append(pointer);
    }

    /// Same as [`HotIndex::upsert_arc`], but copies the key bytes into a fresh
    /// `Arc<[u8]>` first.
    pub fn upsert(&self, key: impl AsRef<[u8]>, pointer: VersionPointer) {
        // Build the Arc straight from the slice; no intermediate Vec.
        let owned: Arc<[u8]> = Arc::from(key.as_ref());
        self.upsert_arc(owned, pointer);
    }

    /// Applies [`HotIndex::upsert_arc`] to every `(key, pointer)` pair, in
    /// iteration order.
    pub fn upsert_batch<I>(&self, entries: I)
    where
        I: IntoIterator<Item = (Arc<[u8]>, VersionPointer)>,
    {
        for (key, pointer) in entries {
            self.upsert_arc(key, pointer);
        }
    }

    /// Returns the newest version of `key` visible at `snapshot`, or `None`
    /// when the key is absent or none of its versions is visible.
    pub fn get_latest(&self, key: &[u8], snapshot: SequenceNumber) -> Option<VersionPointer> {
        let entry = self.inner.get(key)?;
        entry.value().latest_visible(snapshot)
    }

    /// Returns `(key, newest visible version)` for every key that starts with
    /// `prefix`, in ascending key order. Keys with no version visible at
    /// `snapshot` are omitted.
    ///
    /// Keys are ordered lexicographically by their bytes, so all keys sharing
    /// a prefix form one contiguous run starting at the first key `>= prefix`.
    /// The scan seeks to that position and stops at the first key that no
    /// longer matches, instead of visiting every entry in the map.
    pub fn scan_prefix(
        &self,
        prefix: &[u8],
        snapshot: SequenceNumber,
    ) -> Vec<(Vec<u8>, VersionPointer)> {
        let bounds: (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>) = (
            std::ops::Bound::Included(prefix),
            std::ops::Bound::Unbounded,
        );
        self.inner
            .range::<[u8], _>(bounds)
            .take_while(|entry| entry.key().starts_with(prefix))
            .filter_map(|entry| {
                entry
                    .value()
                    .latest_visible(snapshot)
                    .map(|pointer| (entry.key().to_vec(), pointer))
            })
            .collect()
    }

    /// Returns, for every key in the index, the key bytes together with all of
    /// its versions visible at `snapshot`.
    ///
    /// Keys with no visible version are still listed, paired with an empty
    /// vector.
    pub fn snapshot_iter(&self, snapshot: SequenceNumber) -> Vec<(Vec<u8>, Vec<VersionPointer>)> {
        self.inner
            .iter()
            .map(|entry| {
                let visible = entry.value().iter_visible(snapshot);
                (entry.key().to_vec(), visible)
            })
            .collect()
    }
}