use {
super::Partition,
crate::{
ContentHash, Uri,
database::{
GenerationEpoch,
handle::RecordHandle,
hlist::{HCons, HNil, Here, There},
},
hash::{
ContentHashMap, ContentHashMapExt,
},
},
parking_lot::{RwLock, RwLockReadGuard},
std::{collections::{BTreeMap, HashMap}, marker::PhantomData},
};
use {
crate::record::CollectReferences,
std::{
borrow::Borrow,
sync::atomic::{AtomicUsize, Ordering},
},
};
/// State of one partition's store, kept behind the single lock owned by `PartitionStore`.
pub struct PartitionStoreInner<P: Partition> {
/// Records keyed by content hash, each paired with the epoch it was stored with.
pub(crate) records: ContentHashMap<(P::Record, GenerationEpoch)>,
/// Index entries keyed by sort key; the `BTreeMap` keeps them in ascending key order.
pub(crate) index: BTreeMap<String, P::IndexEntry>,
/// Reference counters keyed by content hash.
pub(crate) refcounts: ContentHashMap<AtomicUsize>,
/// One span index per URI.
pub(crate) span_indexes: HashMap<Uri, super::span_index::SpanIndex>,
}
impl<P: Partition> PartitionStoreInner<P> {
    /// Creates an inner store with no records, index entries, refcounts or span indexes.
    pub fn new() -> Self {
        let records = ContentHashMap::new();
        let index = BTreeMap::new();
        let refcounts = ContentHashMap::new();
        let span_indexes = HashMap::new();
        Self {
            records,
            index,
            refcounts,
            span_indexes,
        }
    }
}
/// `Default` is the empty store; it simply forwards to [`PartitionStoreInner::new`].
impl<P: Partition> Default for PartitionStoreInner<P> {
fn default() -> Self {
Self::new()
}
}
/// Read-locked view of one record inside a `PartitionStore`.
///
/// The value owns the store's read guard, so the read lock stays held until the
/// `RecordRef` is dropped.
pub struct RecordRef<'a, P: Partition> {
/// Read guard over the store's inner state; keeps the borrowed record alive.
guard: RwLockReadGuard<'a, PartitionStoreInner<P>>,
/// Content hash of the record this view points at.
hash: ContentHash,
}
impl<P: Partition> RecordRef<'_, P> {
    /// Borrows the record, or returns `None` when the hash has no entry in the store.
    pub fn record(&self) -> Option<&P::Record> {
        self.value().map(|pair| &pair.0)
    }

    /// Returns the epoch stored next to the record, or `None` when the hash has no entry.
    pub fn epoch(&self) -> Option<GenerationEpoch> {
        self.value().map(|pair| pair.1)
    }

    /// Borrows the raw `(record, epoch)` pair stored under this view's hash.
    pub fn value(&self) -> Option<&(P::Record, GenerationEpoch)> {
        let table = &self.guard.records;
        table.get(&self.hash)
    }

    /// Returns an owned copy of the record, or `None` when the hash has no entry.
    pub fn clone_record(&self) -> Option<P::Record>
    where
        P::Record: Clone,
    {
        let record = self.record()?;
        Some(record.clone())
    }
}
/// Store for a single partition `P`: all state lives in one `RwLock`, so every
/// method takes `&self` and locks internally.
pub struct PartitionStore<P: Partition> {
/// The lock-protected records, index, refcounts and span indexes.
inner: RwLock<PartitionStoreInner<P>>,
/// Zero-sized marker tying the store to its partition type.
_marker: PhantomData<P>,
}
impl<P: Partition> PartitionStore<P> {
/// Creates an empty store.
pub fn new() -> Self {
    let inner = RwLock::new(PartitionStoreInner::new());
    Self {
        inner,
        _marker: PhantomData,
    }
}
/// Looks up `hash` and, when present, returns a [`RecordRef`] that keeps the read
/// lock held. When absent the lock is released before returning `None`.
pub fn get(&self, hash: &ContentHash) -> Option<RecordRef<'_, P>> {
    let guard = self.inner.read();
    let present = guard.records.contains_key(hash);
    // The guard is moved into the view only on a hit; on a miss the unused
    // closure (and the guard it captured) is dropped right away.
    present.then(|| RecordRef { guard, hash: *hash })
}
/// Same as [`PartitionStore::get`], keyed by the content hash carried in `handle`.
pub fn get_by_handle(
    &self,
    handle: &RecordHandle<P>,
) -> Option<RecordRef<'_, P>> {
    let hash = handle.content_hash();
    self.get(&hash)
}
/// Stores `record` under `hash` unless an entry already exists.
///
/// Returns a handle for `hash` plus `true` when the record was newly inserted.
/// When the hash is already present the existing record and its epoch are left
/// untouched and the supplied `record` is dropped.
pub fn insert(
    &self,
    hash: ContentHash,
    record: P::Record,
    epoch: GenerationEpoch,
) -> (RecordHandle<P>, bool) {
    use std::collections::hash_map::Entry;
    let mut guard = self.inner.write();
    let is_new = if let Entry::Vacant(slot) = guard.records.entry(hash) {
        slot.insert((record, epoch));
        true
    } else {
        false
    };
    (RecordHandle::new(hash), is_new)
}
/// Number of records currently stored.
pub fn len(&self) -> usize {
    let guard = self.inner.read();
    guard.records.len()
}
/// Returns `true` when no records are stored (the index is not consulted).
pub fn is_empty(&self) -> bool {
    let guard = self.inner.read();
    guard.records.is_empty()
}
/// Keeps only the records for which `predicate(hash, epoch)` returns `true`,
/// then shrinks the record table's allocation to fit what is left.
///
/// The predicate runs while the write lock is held.
pub fn retain<F>(&self, mut predicate: F)
where
    F: FnMut(ContentHash, GenerationEpoch) -> bool,
{
    let mut guard = self.inner.write();
    let records = &mut guard.records;
    records.retain(|hash, (_, epoch)| predicate(*hash, *epoch));
    records.shrink_to_fit();
}
/// Snapshots every `(hash, epoch)` pair currently in the record table.
///
/// The result is a copy taken under the read lock; it does not track later changes.
pub fn iter_hashes(&self) -> Vec<(ContentHash, GenerationEpoch)> {
    let guard = self.inner.read();
    let mut pairs = Vec::with_capacity(guard.records.len());
    for (hash, (_, epoch)) in guard.records.iter() {
        pairs.push((*hash, *epoch));
    }
    pairs
}
/// Returns a clone of the index entry stored under exactly `sort_key`, if any.
pub fn index_get(&self, sort_key: &str) -> Option<P::IndexEntry> {
    self.inner.read().index.get(sort_key).cloned()
}
/// Returns clones of every index entry whose sort key starts with `prefix`,
/// in ascending key order.
///
/// The scan starts at `prefix` using a borrowed `str` bound, so no temporary
/// `String` is allocated for the lookup. It stops at the first key that no
/// longer has the prefix, which is valid because keys sharing a prefix are
/// contiguous in the sorted map.
pub fn index_range(&self, prefix: &str) -> Vec<(String, P::IndexEntry)> {
    use std::ops::Bound;
    let inner = self.inner.read();
    inner
        .index
        .range::<str, _>((Bound::Included(prefix), Bound::<&str>::Unbounded))
        .take_while(|(sk, _)| sk.starts_with(prefix))
        .map(|(sk, entry)| (sk.clone(), entry.clone()))
        .collect()
}
/// Returns clones of every index entry whose sort key sorts before `value`
/// (or before-or-equal when `inclusive` is `true`), in ascending key order.
///
/// The upper bound borrows `value` directly instead of allocating a `String`.
pub fn index_less_than(&self, value: &str, inclusive: bool) -> Vec<(String, P::IndexEntry)> {
    use std::ops::Bound;
    let end = if inclusive {
        Bound::Included(value)
    } else {
        Bound::Excluded(value)
    };
    let inner = self.inner.read();
    inner
        .index
        .range::<str, _>((Bound::<&str>::Unbounded, end))
        .map(|(sk, entry)| (sk.clone(), entry.clone()))
        .collect()
}
/// Returns clones of every index entry whose sort key sorts after `value`
/// (or after-or-equal when `inclusive` is `true`), in ascending key order.
///
/// The lower bound borrows `value` directly instead of allocating a `String`.
pub fn index_greater_than(&self, value: &str, inclusive: bool) -> Vec<(String, P::IndexEntry)> {
    use std::ops::Bound;
    let start = if inclusive {
        Bound::Included(value)
    } else {
        Bound::Excluded(value)
    };
    let inner = self.inner.read();
    inner
        .index
        .range::<str, _>((start, Bound::<&str>::Unbounded))
        .map(|(sk, entry)| (sk.clone(), entry.clone()))
        .collect()
}
/// Returns clones of every index entry whose sort key lies in the inclusive
/// range `from..=to`, in ascending key order.
///
/// An inverted range (`from > to`) yields an empty vector. `BTreeMap::range`
/// panics when the start bound is greater than the end bound, so that case is
/// filtered out before the map is touched.
pub fn index_between(&self, from: &str, to: &str) -> Vec<(String, P::IndexEntry)> {
    use std::ops::Bound;
    if from > to {
        return Vec::new();
    }
    let inner = self.inner.read();
    inner
        .index
        .range::<str, _>((Bound::Included(from), Bound::Included(to)))
        .map(|(sk, entry)| (sk.clone(), entry.clone()))
        .collect()
}
/// Inserts or replaces the index entry for `sort_key`, returning the entry that
/// was previously stored under that key, if any.
pub fn index_insert(&self, sort_key: String, entry: P::IndexEntry) -> Option<P::IndexEntry> {
    let mut guard = self.inner.write();
    let previous = guard.index.insert(sort_key, entry);
    previous
}
/// Removes every index entry whose sort key starts with `prefix` and returns
/// the removed entries in ascending key order.
///
/// Matching keys are collected first because the map cannot be mutated while
/// it is being iterated. The scan start borrows `prefix` directly, so the only
/// allocations are the collected keys and the result vector.
pub fn index_remove_prefix(&self, prefix: &str) -> Vec<P::IndexEntry> {
    use std::ops::Bound;
    let mut inner = self.inner.write();
    let keys_to_remove: Vec<String> = inner
        .index
        .range::<str, _>((Bound::Included(prefix), Bound::<&str>::Unbounded))
        .take_while(|(sk, _)| sk.starts_with(prefix))
        .map(|(sk, _)| sk.clone())
        .collect();
    let mut removed = Vec::with_capacity(keys_to_remove.len());
    for key in &keys_to_remove {
        if let Some(entry) = inner.index.remove(key) {
            removed.push(entry);
        }
    }
    removed
}
/// Returns clones of all index entries, ordered by their sort key.
pub fn index_values(&self) -> Vec<P::IndexEntry> {
    let guard = self.inner.read();
    let mut entries = Vec::with_capacity(guard.index.len());
    for entry in guard.index.values() {
        entries.push(entry.clone());
    }
    entries
}
/// Collects the primary hash of every index entry that reports one, in
/// sort-key order. Entries whose `primary_hash()` is `None` are skipped.
pub fn index_hashes(&self) -> Vec<ContentHash> {
    use super::IndexEntry;
    let guard = self.inner.read();
    let mut hashes = Vec::new();
    for entry in guard.index.values() {
        if let Some(hash) = entry.primary_hash() {
            hashes.push(hash);
        }
    }
    hashes
}
/// Number of entries in the sort-key index.
pub fn index_len(&self) -> usize {
    let guard = self.inner.read();
    guard.index.len()
}
/// Returns `true` when the sort-key index holds no entries.
pub fn index_is_empty(&self) -> bool {
    let guard = self.inner.read();
    guard.index.is_empty()
}
/// Snapshots the whole index as owned `(sort_key, entry)` pairs in ascending key order.
pub fn index_entries(&self) -> Vec<(String, P::IndexEntry)> {
    let guard = self.inner.read();
    let mut snapshot = Vec::with_capacity(guard.index.len());
    for (sort_key, entry) in guard.index.iter() {
        snapshot.push((sort_key.clone(), entry.clone()));
    }
    snapshot
}
/// Bulk-loads records and index entries under a single write lock.
///
/// Records whose hash is already present are skipped (the existing record and
/// epoch win); new ones are stored with `epoch`. Index entries always overwrite
/// whatever was stored under the same sort key.
pub fn drain_from<I>(
    &self,
    records: I,
    index_entries: Vec<(String, P::IndexEntry)>,
    epoch: GenerationEpoch,
) where
    I: IntoIterator<Item = (ContentHash, P::Record)>,
{
    let mut guard = self.inner.write();
    for (hash, record) in records {
        guard.records.entry(hash).or_insert_with(|| (record, epoch));
    }
    // `BTreeMap::extend` inserts pair by pair, so later duplicates replace earlier ones.
    guard.index.extend(index_entries);
}
pub fn increment_refcount(&self, hash: ContentHash) {
let mut inner = self.inner.write();
inner
.refcounts
.entry(hash)
.or_insert_with(|| AtomicUsize::new(0))
.fetch_add(1, Ordering::SeqCst);
}
/// Subtracts one from the reference counter for `hash` and returns the new count.
///
/// A hash without a counter yields `0`. When the counter drops from one to zero
/// its entry is removed from the refcount table.
pub fn decrement_refcount(&self, hash: ContentHash) -> usize {
    let mut guard = self.inner.write();
    let Some(counter) = guard.refcounts.get(&hash) else {
        return 0;
    };
    let previous = counter.fetch_sub(1, Ordering::SeqCst);
    if previous == 1 {
        // Last reference released: forget the counter entirely.
        guard.refcounts.remove(&hash);
        0
    } else {
        previous - 1
    }
}
/// Returns the current reference count for `hash`, or `0` when it has no counter.
pub fn get_refcount(&self, hash: ContentHash) -> usize {
    let guard = self.inner.read();
    guard
        .refcounts
        .get(&hash)
        .map_or(0, |counter| counter.load(Ordering::SeqCst))
}
/// Removes the record stored under `hash` and returns it, discarding its epoch.
///
/// Only the record table is touched; any refcount or index entry for the same
/// hash is left as it is.
pub fn remove_record(&self, hash: ContentHash) -> Option<P::Record>
where
    P::Record: Clone,
{
    let mut guard = self.inner.write();
    let (record, _epoch) = guard.records.remove(&hash)?;
    Some(record)
}
/// Queries the span index registered for `uri` at `byte_offset`.
///
/// Returns `None` when no span index exists for `uri` or when the index itself
/// has no answer for that offset.
pub fn span_index_query(&self, uri: &Uri, byte_offset: u64) -> Option<ContentHash> {
    let guard = self.inner.read();
    guard
        .span_indexes
        .get(uri)
        .and_then(|span_index| span_index.query(byte_offset))
}
/// Installs `index` as the span index for `uri`, dropping any previous one.
pub fn span_index_replace(&self, uri: Uri, index: super::span_index::SpanIndex) {
    self.inner.write().span_indexes.insert(uri, index);
}
/// Drops the span index registered for `uri`; does nothing when there is none.
pub fn span_index_remove(&self, uri: &Uri) {
    self.inner.write().span_indexes.remove(uri);
}
}
/// `Default` is the empty store; it simply forwards to [`PartitionStore::new`].
impl<P: Partition> Default for PartitionStore<P> {
fn default() -> Self {
Self::new()
}
}
// SAFETY: these manual impls replace the auto-trait derivation, which would
// otherwise also depend on `P` itself (held only through `PhantomData<P>`),
// on `P::IndexEntry` and on the span-index types stored behind the lock.
//
// NOTE(review): only `P::Record` is constrained here. The `&self` API moves
// records out (`remove_record`) and clones/moves index entries out
// (`index_get`, `index_remove_prefix`), so `Sync` presumably also needs
// `P::Record: Send` and `P::IndexEntry: Send + Sync`, and `Send` presumably
// needs `P::IndexEntry: Send`. Confirm that the `Partition` trait already
// guarantees these bounds; if it does not, these impls are unsound.
unsafe impl<P: Partition> Send for PartitionStore<P> where P::Record: Send {}
unsafe impl<P: Partition> Sync for PartitionStore<P> where P::Record: Sync {}
/// Type-level lookup of the `PartitionStore<P>` inside a heterogeneous list.
///
/// `Idx` is a `Here` / `There<..>` path that selects which list element is meant.
pub trait HasPartitionAt<P: Partition, Idx> {
/// Borrows the store for partition `P` found at position `Idx`.
fn store_at(&self) -> &PartitionStore<P>;
}
/// Base case: the wanted store is the head of the list (`Idx = Here`).
impl<P: Partition, Tail> HasPartitionAt<P, Here>
for HCons<PartitionStore<P>, Tail>
{
fn store_at(&self) -> &PartitionStore<P> {
&self.head
}
}
/// Recursive case: the wanted store lives somewhere in the tail (`Idx = There<..>`),
/// so the lookup skips the head and continues with the remaining index.
impl<P: Partition, Head, Tail, Idx> HasPartitionAt<P, There<Idx>>
for HCons<Head, Tail>
where
Tail: HasPartitionAt<P, Idx>,
{
fn store_at(&self) -> &PartitionStore<P> {
self.tail.store_at()
}
}
/// Index-free access to the store of partition `P`.
///
/// No implementation is visible in this part of the file.
pub trait HasPartition<P: Partition> {
/// Borrows the store for partition `P`.
fn store(&self) -> &PartitionStore<P>;
}
/// Direct access to the sort-key index map of partition `Part`.
///
/// No implementation is visible in this part of the file.
pub trait HasIndexPartition<Part: super::Partition> {
/// Borrows the sort-key → index-entry map.
fn index_map(&self) -> &std::collections::BTreeMap<String, Part::IndexEntry>;
/// Mutably borrows the sort-key → index-entry map.
fn index_map_mut(&mut self) -> &mut std::collections::BTreeMap<String, Part::IndexEntry>;
}
/// Construction of a whole store list in one call.
pub trait NewStores {
/// Builds the list with every contained store freshly created.
fn new_stores() -> Self;
}
/// Base case: the empty list has nothing to construct.
impl NewStores for HNil {
fn new_stores() -> Self {
HNil
}
}
/// Recursive case: an empty `PartitionStore<P>` followed by the freshly built tail.
impl<P: Partition, T: NewStores> NewStores for HCons<PartitionStore<P>, T> {
    fn new_stores() -> Self {
        let head = PartitionStore::new();
        let tail = T::new_stores();
        HCons::new(head, tail)
    }
}
/// Outcome of merging one store list into another.
pub struct MergeResult {
/// One reference per record that was newly inserted into the destination.
pub new_hashes: Vec<crate::database::ContentHashRef>,
}
impl MergeResult {
pub fn new() -> Self {
Self {
new_hashes: Vec::new(),
}
}
pub fn extend(&mut self, other: MergeResult) {
self.new_hashes.extend(other.new_hashes);
}
}
/// `Default` is the empty result; it simply forwards to [`MergeResult::new`].
impl Default for MergeResult {
fn default() -> Self {
Self::new()
}
}
/// Copying the contents of one store list into another of the same shape.
pub trait MergeFrom {
/// Merges `source` into `self`, storing newly inserted records with `epoch`,
/// and reports which records were new.
fn merge_from(&self, source: &Self, epoch: GenerationEpoch) -> MergeResult;
}
/// Base case: merging empty lists does nothing and reports nothing.
impl MergeFrom for HNil {
fn merge_from(&self, _source: &Self, _epoch: GenerationEpoch) -> MergeResult {
MergeResult::new()
}
}
/// Recursive case: merges the head store, then the tail.
impl<P: Partition, T: MergeFrom> MergeFrom for HCons<PartitionStore<P>, T>
where
    P::Record: Clone,
{
    /// Copies every record and index entry of `source.head` into `self.head`,
    /// then recurses into the tails.
    ///
    /// Records already present in the destination are left untouched; records
    /// that were newly inserted are stored with `epoch` and reported in the
    /// returned [`MergeResult`]. Index entries always overwrite the
    /// destination's entry for the same sort key.
    ///
    /// No source lock is held while a destination lock is taken, so merging a
    /// store list into itself, or two lists into each other from different
    /// threads, cannot deadlock on these locks.
    fn merge_from(&self, source: &Self, epoch: GenerationEpoch) -> MergeResult {
        let mut result = MergeResult::new();
        for (hash, _) in source.head.iter_hashes() {
            // Clone the record out and let the `RecordRef` (and with it the
            // source read guard) drop before the destination write lock is taken.
            // A hash that vanished from the source since the snapshot is skipped.
            let Some(record) = source
                .head
                .get(&hash)
                .and_then(|found| found.clone_record())
            else {
                continue;
            };
            let (_handle, is_new) = self.head.insert(hash, record, epoch);
            if is_new {
                result
                    .new_hashes
                    .push(crate::database::ContentHashRef::new(P::KEY, hash));
            }
        }
        for (sort_key, entry) in source.head.index_entries() {
            self.head.index_insert(sort_key, entry);
        }
        result.extend(self.tail.merge_from(&source.tail, epoch));
        result
    }
}
/// Gathering the hashes referenced by the indexes of every store in a list.
pub trait CollectIndexHashes {
/// Appends one `ContentHashRef` per index-referenced hash to `result`.
fn collect_index_hashes(&self, result: &mut Vec<crate::database::ContentHashRef>);
}
/// Base case: the empty list contributes no hashes.
impl CollectIndexHashes for HNil {
fn collect_index_hashes(&self, _result: &mut Vec<crate::database::ContentHashRef>) {}
}
/// Recursive case: contributes the head store's index hashes, then the tail's.
impl<P: Partition, T: CollectIndexHashes> CollectIndexHashes
    for HCons<PartitionStore<P>, T>
{
    /// Appends a `ContentHashRef` tagged with `P::KEY` for every primary hash in
    /// the head store's index, then lets the tail append its own.
    fn collect_index_hashes(&self, result: &mut Vec<crate::database::ContentHashRef>) {
        let head_refs = self
            .head
            .index_hashes()
            .into_iter()
            .map(|hash| crate::database::ContentHashRef::new(P::KEY, hash));
        result.extend(head_refs);
        self.tail.collect_index_hashes(result);
    }
}
/// Refcount and removal operations addressed by a runtime partition key rather
/// than by partition type.
pub trait RefcountOps {
/// Adds one to the refcount of `hash` in the store whose key equals `partition`.
fn increment_refcount(&self, partition: crate::Ident, hash: ContentHash);
/// Subtracts one from the refcount of `hash` in the matching store and returns
/// the resulting count.
fn decrement_refcount(
&self,
partition: crate::Ident,
hash: ContentHash,
) -> usize;
/// Removes the record for `hash` from the matching store; `true` when a
/// record was actually removed.
fn remove_record(&self, partition: crate::Ident, hash: ContentHash) -> bool;
/// Reads the current refcount of `hash` in the matching store.
fn get_refcount(&self, partition: crate::Ident, hash: ContentHash) -> usize;
}
/// Base case, reached when no store in the list matched `partition`: every
/// operation is a no-op that reports zero / `false`.
impl RefcountOps for HNil {
fn increment_refcount(&self, _partition: crate::Ident, _hash: ContentHash) {}
fn decrement_refcount(
&self,
_partition: crate::Ident,
_hash: ContentHash,
) -> usize {
0
}
fn remove_record(
&self,
_partition: crate::Ident,
_hash: ContentHash,
) -> bool {
false
}
fn get_refcount(
&self,
_partition: crate::Ident,
_hash: ContentHash,
) -> usize {
0
}
}
/// Recursive case: each operation is served by the head store when `partition`
/// equals `P::KEY`, and forwarded to the tail otherwise.
impl<P: Partition, T: RefcountOps> RefcountOps for HCons<PartitionStore<P>, T>
where
    P::Record: Clone,
{
    /// Adds one to the refcount of `hash` in the store keyed by `partition`.
    fn increment_refcount(&self, partition: crate::Ident, hash: ContentHash) {
        let targets_head = partition == P::KEY;
        if !targets_head {
            return self.tail.increment_refcount(partition, hash);
        }
        self.head.increment_refcount(hash);
    }

    /// Subtracts one from the refcount of `hash` in the store keyed by
    /// `partition` and returns the resulting count.
    fn decrement_refcount(
        &self,
        partition: crate::Ident,
        hash: ContentHash,
    ) -> usize {
        let targets_head = partition == P::KEY;
        if !targets_head {
            return self.tail.decrement_refcount(partition, hash);
        }
        self.head.decrement_refcount(hash)
    }

    /// Removes the record for `hash` from the store keyed by `partition`;
    /// `true` when a record was actually removed.
    fn remove_record(&self, partition: crate::Ident, hash: ContentHash) -> bool {
        let targets_head = partition == P::KEY;
        if !targets_head {
            return self.tail.remove_record(partition, hash);
        }
        self.head.remove_record(hash).is_some()
    }

    /// Reads the refcount of `hash` in the store keyed by `partition`.
    fn get_refcount(&self, partition: crate::Ident, hash: ContentHash) -> usize {
        let targets_head = partition == P::KEY;
        if !targets_head {
            return self.tail.get_refcount(partition, hash);
        }
        self.head.get_refcount(hash)
    }
}
/// Listing the references held by one stored record, addressed by a runtime
/// partition key.
pub trait CollectCascadingRefs<P: crate::database::storage::Partitions + ?Sized> {
/// Returns the `(partition key, content hash)` pairs referenced by the record
/// stored under `hash` in the store keyed by `partition`.
fn collect_cascading_refs(
&self,
partition: crate::Ident,
hash: ContentHash,
) -> Vec<(crate::Ident, ContentHash)>;
}
/// Base case, reached when no store in the list matched `partition`: there are
/// no references to report.
impl<P: crate::database::storage::Partitions> CollectCascadingRefs<P> for HNil {
fn collect_cascading_refs(
&self,
_partition: crate::Ident,
_hash: ContentHash,
) -> Vec<(crate::Ident, ContentHash)> {
Vec::new()
}
}
/// Recursive case: answers from the head store when `partition` equals
/// `Part::KEY`, otherwise forwards to the tail.
impl<Part, T, P> CollectCascadingRefs<P> for HCons<PartitionStore<Part>, T>
where
    Part: Partition,
    Part::Record: Clone + crate::record::CollectReferences<P>,
    T: CollectCascadingRefs<P>,
    P: crate::database::storage::Partitions,
    P::Stores: HasPartition<Part>,
{
    /// Returns the `(partition key, content hash)` pairs that the record stored
    /// under `hash` reports through `collect_references`.
    ///
    /// Yields an empty vector when the matching store has no record for `hash`.
    fn collect_cascading_refs(
        &self,
        partition: crate::Ident,
        hash: ContentHash,
    ) -> Vec<(crate::Ident, ContentHash)> {
        let targets_head = partition == Part::KEY;
        if !targets_head {
            return self.tail.collect_cascading_refs(partition, hash);
        }
        let Some(record_ref) = self.head.get(&hash) else {
            return Vec::new();
        };
        let Some(record) = record_ref.record() else {
            return Vec::new();
        };
        let mut collector = CascadingRefCollector::<P>::new();
        record.collect_references(&mut collector);
        collector.into_refs()
    }
}
/// Accumulator that records every reference handed to it as a
/// `(partition key, content hash)` pair.
pub struct CascadingRefCollector<P: crate::database::storage::Partitions> {
/// References collected so far, in the order they were added.
refs: Vec<(crate::Ident, ContentHash)>,
/// Zero-sized marker tying the collector to the partition set `P`.
_marker: std::marker::PhantomData<P>,
}
impl<P: crate::database::storage::Partitions> CascadingRefCollector<P> {
    /// Creates a collector holding no references.
    pub fn new() -> Self {
        let refs = Vec::new();
        Self {
            refs,
            _marker: std::marker::PhantomData,
        }
    }

    /// Consumes the collector and returns the references in insertion order.
    pub fn into_refs(self) -> Vec<(crate::Ident, ContentHash)> {
        let Self { refs, .. } = self;
        refs
    }
}
/// `Default` is the empty collector; it simply forwards to
/// [`CascadingRefCollector::new`].
impl<P: crate::database::storage::Partitions> Default
for CascadingRefCollector<P>
{
fn default() -> Self {
Self::new()
}
}
/// Lets a record's `collect_references` feed its handles into the collector.
impl<P: crate::database::storage::Partitions> crate::record::References<P>
    for CascadingRefCollector<P>
{
    /// Records `handle` as the pair of its partition key and content hash.
    fn add<Part: Partition>(
        &mut self,
        handle: crate::database::handle::RecordHandle<Part>,
    ) where
        P::Stores: HasPartition<Part>,
    {
        let target = (Part::KEY, handle.content_hash());
        self.refs.push(target);
    }
}
/// Variant of [`MergeFrom`] that also visits the references of every newly
/// inserted record with a refcount incrementer.
pub trait MergeFromWithRefcount<P: crate::database::storage::Partitions>:
MergeFrom
{
/// Merges the records of `source` into `self`, storing new ones with `epoch`,
/// and reports which records were new.
fn merge_from_with_refcount(
&self,
source: &Self,
epoch: GenerationEpoch,
) -> MergeResult
where
P::Stores: RefcountOps;
}
/// Base case: merging empty lists does nothing and reports nothing.
impl<P: crate::database::storage::Partitions> MergeFromWithRefcount<P>
for HNil
{
fn merge_from_with_refcount(
&self,
_source: &Self,
_epoch: GenerationEpoch,
) -> MergeResult
where
P::Stores: RefcountOps,
{
MergeResult::new()
}
}
/// Recursive case: merges the head store's records, then the tail.
impl<Part, T, P> MergeFromWithRefcount<P> for HCons<PartitionStore<Part>, T>
where
    Part: Partition,
    Part::Record: Clone + crate::record::CollectReferences<P>,
    T: MergeFromWithRefcount<P>,
    P: crate::database::storage::Partitions,
    P::Stores: HasPartition<Part> + RefcountOps,
    Self: std::borrow::Borrow<P::Stores>,
{
    /// Copies every record of `source.head` into `self.head` and, for each record
    /// that was newly inserted, runs a `RefCountIncrementer` over the record's
    /// references (presumably bumping the refcount of every referenced record in
    /// the destination stores; confirm against `reaper::RefCountIncrementer`).
    ///
    /// Records already present in the destination are skipped entirely.
    ///
    /// NOTE(review): unlike `merge_from`, this variant does not copy the source's
    /// index entries; confirm that this is intentional.
    fn merge_from_with_refcount(
        &self,
        source: &Self,
        epoch: GenerationEpoch,
    ) -> MergeResult
    where
        P::Stores: RefcountOps,
    {
        let mut merged = MergeResult::new();
        for (hash, _) in source.head.iter_hashes() {
            // The source view stays alive for the whole iteration because the
            // borrowed record is still needed after the insert.
            let Some(record_ref) = source.head.get(&hash) else {
                continue;
            };
            let Some(record) = record_ref.record() else {
                continue;
            };
            let (_handle, is_new) = self.head.insert(hash, record.clone(), epoch);
            if !is_new {
                continue;
            }
            merged
                .new_hashes
                .push(crate::database::ContentHashRef::new(Part::KEY, hash));
            let stores: &P::Stores = self.borrow();
            let mut incrementer =
                crate::database::reaper::RefCountIncrementer::<P>::new(stores);
            record.collect_references(&mut incrementer);
        }
        merged.extend(self.tail.merge_from_with_refcount(&source.tail, epoch));
        merged
    }
}
/// Rendering of a store list as `bluegum` builder nodes.
pub trait BluegumStores {
/// Returns one `bluegum::Builder` per partition store in the list; `state` is
/// passed through to each entry's and record's `node_with_state`.
fn bluegum_partition_nodes(
&self,
state: &(dyn crate::SpanResolver + 'static),
) -> Vec<bluegum::Builder>;
}
/// Base case: the empty list produces no nodes.
impl BluegumStores for HNil {
fn bluegum_partition_nodes(
&self,
_state: &(dyn crate::SpanResolver + 'static),
) -> Vec<bluegum::Builder> {
Vec::new()
}
}
/// Recursive case: one node for the head store, followed by the tail's nodes.
impl<P: Partition, T: BluegumStores> BluegumStores for HCons<PartitionStore<P>, T>
where
    P::Record: Clone,
{
    /// Builds a `Partition(<name or key>)` node for the head store and appends
    /// the nodes produced by the tail.
    ///
    /// The node gets an `index_entries` child list when the index is non-empty
    /// and a `records` child list when the record table is non-empty.
    fn bluegum_partition_nodes(
        &self,
        state: &(dyn crate::SpanResolver + 'static),
    ) -> Vec<bluegum::Builder> {
        let mut partition_builder = bluegum::Builder::new();
        // Prefer the human-readable partition name; fall back to the key itself.
        let label = match P::KEY_NAME {
            Some(name) => format!("Partition({})", name),
            None => format!("Partition({})", P::KEY),
        };
        partition_builder.name(&label);

        let entries = self.head.index_entries();
        if !entries.is_empty() {
            let entry_builders: Vec<_> = entries
                .iter()
                .map(|(sort_key, entry)| {
                    let mut eb = bluegum::Builder::new();
                    eb.name("IndexEntry")
                        .field("sort_key", sort_key);
                    <P::IndexEntry as bluegum::BluegumWithState<dyn crate::SpanResolver>>::node_with_state(entry, &mut eb, state);
                    eb
                })
                .collect();
            partition_builder.add_nodes_of_builders("index_entries", entry_builders);
        }

        // The read guard stays alive until the function returns, as the record
        // builders borrow from it while they are being filled in.
        let inner = self.head.inner.read();
        if !inner.records.is_empty() {
            let record_builders: Vec<_> = inner
                .records
                .iter()
                .map(|(hash, (record, _epoch))| {
                    let mut rb = bluegum::Builder::new();
                    rb.field("hash", hash);
                    <P::Record as bluegum::BluegumWithState<dyn crate::SpanResolver>>::node_with_state(record, &mut rb, state);
                    rb
                })
                .collect();
            partition_builder.add_nodes_of_builders("records", record_builders);
        }

        let mut nodes = vec![partition_builder];
        nodes.extend(self.tail.bluegum_partition_nodes(state));
        nodes
    }
}