use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
};
use itertools::Itertools;
use parking_lot::RwLock;
use crate::engine::block::{manager::BlockId, serde::Sequence};
#[derive(Debug, Clone)]
pub enum Index {
Address(EntryAddress),
Tombstone(Sequence),
}
impl Index {
fn sequence(&self) -> Sequence {
match self {
Index::Address(addr) => addr.sequence,
Index::Tombstone(seq) => *seq,
}
}
}
#[derive(Debug, Clone)]
pub struct HashedEntryAddress {
pub hash: u64,
pub address: EntryAddress,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntryAddress {
pub block: BlockId,
pub offset: u32,
pub len: u32,
pub sequence: Sequence,
}
type IndexerShard = HashMap<u64, Index>;
#[derive(Debug, Clone)]
pub struct Indexer {
shards: Arc<Vec<RwLock<IndexerShard>>>,
}
impl Indexer {
pub fn new(shards: usize) -> Self {
let shards = (0..shards).map(|_| RwLock::new(HashMap::new())).collect_vec();
Self {
shards: Arc::new(shards),
}
}
#[cfg_attr(
feature = "tracing",
fastrace::trace(name = "foyer::storage::block::indexer::insert_tombstone")
)]
pub fn insert_tombstone(&self, hash: u64, sequence: Sequence) -> Option<EntryAddress> {
let shard = self.shard(hash);
let mut shard = self.shards[shard].write();
self.insert_inner(&mut shard, hash, Index::Tombstone(sequence))
}
#[cfg_attr(
feature = "tracing",
fastrace::trace(name = "foyer::storage::block::indexer::insert_batch")
)]
pub fn insert_batch(&self, batch: Vec<HashedEntryAddress>) -> Vec<HashedEntryAddress> {
let shards: HashMap<usize, Vec<HashedEntryAddress>> =
batch.into_iter().into_group_map_by(|haddr| self.shard(haddr.hash));
let mut olds = vec![];
for (s, batch) in shards {
let mut shard = self.shards[s].write();
for haddr in batch {
if let Some(old) = self.insert_inner(&mut shard, haddr.hash, Index::Address(haddr.address)) {
olds.push(HashedEntryAddress {
hash: haddr.hash,
address: old,
});
}
}
}
olds
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::storage::block::indexer::get"))]
pub fn get(&self, hash: u64) -> Option<EntryAddress> {
let shard = self.shard(hash);
match self.shards[shard].read().get(&hash) {
Some(index) => match index {
Index::Address(addr) => Some(addr.clone()),
Index::Tombstone(_) => None,
},
None => None,
}
}
#[cfg_attr(
feature = "tracing",
fastrace::trace(name = "foyer::storage::block::indexer::remove")
)]
pub fn remove(&self, hash: u64) -> Option<EntryAddress> {
let shard = self.shard(hash);
match self.shards[shard].write().entry(hash) {
Entry::Occupied(o) => match o.get() {
Index::Address(_) => self.extract_address(o.remove()),
Index::Tombstone(_) => None,
},
Entry::Vacant(_) => None,
}
}
#[cfg_attr(
feature = "tracing",
fastrace::trace(name = "foyer::storage::block::indexer::remove_batch")
)]
pub fn remove_batch<I>(&self, batch: I) -> Vec<EntryAddress>
where
I: IntoIterator<Item = (u64, Sequence)>,
{
let shards = batch.into_iter().into_group_map_by(|(hash, _)| self.shard(*hash));
let mut olds = vec![];
for (s, hashes) in shards {
let mut shard = self.shards[s].write();
for (hash, sequence) in hashes {
match shard.entry(hash) {
Entry::Occupied(o) => {
if sequence >= o.get().sequence() {
if let Some(addr) = self.extract_address(o.remove()) {
olds.push(addr);
}
}
}
Entry::Vacant(_) => {}
}
}
}
olds
}
#[cfg_attr(feature = "tracing", fastrace::trace(name = "foyer::storage::block::indexer::clear"))]
pub fn clear(&self) {
self.shards.iter().for_each(|shard| shard.write().clear());
}
#[inline(always)]
fn shard(&self, hash: u64) -> usize {
hash as usize % self.shards.len()
}
fn insert_inner(&self, shard: &mut IndexerShard, hash: u64, index: Index) -> Option<EntryAddress> {
match shard.entry(hash) {
Entry::Occupied(mut o) => {
if index.sequence() >= o.get().sequence() {
self.extract_address(o.insert(index))
} else {
self.extract_address(index)
}
}
Entry::Vacant(v) => {
v.insert(index);
None
}
}
}
fn extract_address(&self, index: Index) -> Option<EntryAddress> {
match index {
Index::Address(addr) => Some(addr),
Index::Tombstone(_) => None,
}
}
}