use {
crate::{
Ident,
database::query::SortKeyCondition,
hash::{ContentHashSet, IdentHashMap, IdentHashState, ident::HashSetExt},
},
dashmap::DashMap,
serde::{Deserialize, Serialize},
std::{
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
task::Waker,
},
};
/// Summary of what a single `Database::commit_chunk` call changed.
pub struct CommitResult {
    /// Content hashes that the partition-store merge reported as newly added.
    pub new_hashes: Vec<ContentHashRef>,
    /// Keys written by the chunk whose content hash appears in `new_hashes`,
    /// grouped by partition key.
    pub inserted_keys: IdentHashMap<Vec<RecordKey>>,
    /// Keys removed by the chunk's cleared prefixes that are not also listed in
    /// `inserted_keys`, grouped by partition key.
    pub deleted_keys: IdentHashMap<Vec<RecordKey>>,
    /// Refcount decrements for hashes that were removed from or replaced in the
    /// index; the commit only records them (presumably the reaper applies them
    /// later — confirm against `reaper`).
    pub deferred_decrements: Vec<reaper::DeferredDecrement>,
}
impl CommitResult {
    /// Yields each partition key that had at least one key inserted or deleted,
    /// without repeats; partitions with insertions are yielded first.
    pub fn affected_partition_keys(&self) -> impl Iterator<Item = Ident> + '_ {
        let mut already_yielded = crate::IdentHashSet::new();
        self.inserted_keys
            .keys()
            .chain(self.deleted_keys.keys())
            .copied()
            .filter(move |partition_key| already_yielded.insert(*partition_key))
    }

    /// Iterates over every inserted record key, across all partitions.
    pub fn all_inserted_keys(&self) -> impl Iterator<Item = &RecordKey> + '_ {
        self.inserted_keys.values().flatten()
    }

    /// Iterates over every deleted record key, across all partitions.
    pub fn all_deleted_keys(&self) -> impl Iterator<Item = &RecordKey> + '_ {
        self.deleted_keys.values().flatten()
    }

    /// Reports whether the commit inserted or deleted any key at all.
    pub fn has_changes(&self) -> bool {
        !(self.inserted_keys.is_empty() && self.deleted_keys.is_empty())
    }
}
/// A content hash qualified by the partition it is associated with.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ContentHashRef {
    /// Partition key the hash belongs to.
    pub partition: Ident,
    /// The content hash itself.
    pub hash: crate::ContentHash,
}
impl ContentHashRef {
    /// Pairs a content `hash` with the `partition` it belongs to.
    ///
    /// This is a `const fn` (like `GenerationEpoch::new`), so references can also be
    /// built in constant contexts. Both fields are `Copy`, so this places no
    /// extra requirements on callers.
    pub const fn new(partition: Ident, hash: crate::ContentHash) -> Self {
        Self { partition, hash }
    }
}
/// A commit epoch number, as handed out by `Database`'s commit counter.
///
/// Serializes as a bare `u64` (`#[serde(transparent)]`); comparisons and
/// ordering follow the raw value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize)]
#[serde(transparent)]
pub struct GenerationEpoch(u64);
impl GenerationEpoch {
pub const fn new(value: u64) -> Self {
Self(value)
}
pub const fn get(self) -> u64 {
self.0
}
}
pub mod chunk;
pub mod gc;
pub mod handle;
pub mod hlist;
pub mod partitions;
pub mod query;
pub mod query_results;
pub mod reaper;
pub mod record_key;
pub mod stats;
pub mod storage;
pub use handle::RecordHandle;
pub use record_key::RecordKey;
pub use partitions::{
CollectIndexHashes, DynPartition, DynPartitionRecord, HasPartition, HasPartitionAt,
MergeFrom, MergeResult, NewStores, Partition, PartitionKey, PartitionReader, PartitionStore,
PartitionStoreInner, PartitionWriteContextRef, PartitionWriter, RecordRef, RefcountOps,
};
pub(crate) use partitions::InternalPartitionWriteContext;
pub use storage::{ContentAddressedStorage, Partitions};
/// Shared registry of pending partition waiters: for each partition key, the
/// `(condition, waker)` pairs registered through
/// `Database::register_partition_waker` that have not been woken yet.
pub(crate) type PartitionWakers =
    Arc<DashMap<Ident, Vec<(SortKeyCondition, Waker)>, IdentHashState>>;
/// Handle to a content-addressed database.
///
/// All state lives behind `Arc`s, so cloning a handle is cheap and every clone
/// observes the same storage, epoch counter and waiter registry.
pub struct Database<P: Partitions> {
    /// Content-addressed storage holding the partition stores and indexes.
    pub(crate) cas: Arc<ContentAddressedStorage<P>>,
    /// Commit counter; `commit_chunk` advances it by one per call.
    pub(crate) current_epoch: Arc<AtomicU64>,
    /// Tasks waiting for matching key changes in a partition.
    pub(crate) partition_wakers: PartitionWakers,
}
impl<P: Partitions> Clone for Database<P> {
fn clone(&self) -> Self {
Self {
cas: Arc::clone(&self.cas),
current_epoch: Arc::clone(&self.current_epoch),
partition_wakers: Arc::clone(&self.partition_wakers),
}
}
}
impl<P: Partitions> Database<P> {
    /// Creates an empty database: fresh storage, the epoch counter at 0 and no
    /// registered partition waiters.
    pub fn new() -> Self {
        Self {
            cas: Arc::new(ContentAddressedStorage::new()),
            current_epoch: Arc::new(AtomicU64::new(0)),
            partition_wakers: Arc::new(DashMap::with_hasher(IdentHashState)),
        }
    }

    /// Returns the current value of the epoch counter.
    ///
    /// `commit_chunk` advances the counter by one per call and stamps the commit
    /// with the value from *before* the increment, so right after a commit this
    /// returns one more than that commit's epoch.
    pub fn get_current_epoch(&self) -> GenerationEpoch {
        GenerationEpoch::new(self.current_epoch.load(Ordering::SeqCst))
    }

    /// Borrows the underlying content-addressed storage.
    pub(crate) fn get_store(&self) -> &ContentAddressedStorage<P> {
        &self.cas
    }

    /// Returns the content hash indexed under `(partition_key, sort_key)`, if any.
    ///
    /// Thin delegation to `ContentAddressedStorage::index_get`.
    pub(crate) fn index_get(
        &self,
        partition_key: Ident,
        sort_key: &str,
    ) -> Option<crate::ContentHash> {
        self.cas.index_get(partition_key, sort_key)
    }

    /// Returns the index entries of `partition_key` selected by `prefix`
    /// (presumably every sort key starting with `prefix`), as
    /// `(partition key, sort key, content hash)` tuples.
    ///
    /// Thin delegation to `ContentAddressedStorage::index_range`.
    pub(crate) fn index_range(
        &self,
        partition_key: Ident,
        prefix: &str,
    ) -> Vec<(Ident, String, crate::ContentHash)> {
        self.cas.index_range(partition_key, prefix)
    }

    /// Range query below `value` on the index of `partition_key`; `inclusive`
    /// controls whether an entry equal to `value` is included.
    ///
    /// Thin delegation to `ContentAddressedStorage::index_less_than`.
    pub(crate) fn index_less_than(
        &self,
        partition_key: Ident,
        value: &str,
        inclusive: bool,
    ) -> Vec<(Ident, String, crate::ContentHash)> {
        self.cas.index_less_than(partition_key, value, inclusive)
    }

    /// Range query above `value` on the index of `partition_key`; `inclusive`
    /// controls whether an entry equal to `value` is included.
    ///
    /// Thin delegation to `ContentAddressedStorage::index_greater_than`.
    pub(crate) fn index_greater_than(
        &self,
        partition_key: Ident,
        value: &str,
        inclusive: bool,
    ) -> Vec<(Ident, String, crate::ContentHash)> {
        self.cas.index_greater_than(partition_key, value, inclusive)
    }

    /// Range query between `from` and `to` on the index of `partition_key`.
    /// Whether each bound is inclusive is decided by
    /// `ContentAddressedStorage::index_between`, to which this delegates.
    pub(crate) fn index_between(
        &self,
        partition_key: Ident,
        from: &str,
        to: &str,
    ) -> Vec<(Ident, String, crate::ContentHash)> {
        self.cas.index_between(partition_key, from, to)
    }

    /// Collects the hashes referenced by the indexes, as reported by
    /// `ContentAddressedStorage::collect_index_hashes`.
    pub(crate) fn collect_index_hashes(&self) -> Vec<ContentHashRef> {
        self.cas.collect_index_hashes()
    }

    /// Queries the span index of `partition_key` for `uri` at `byte_offset`
    /// (presumably returning the hash whose span covers that offset).
    ///
    /// Thin delegation to `ContentAddressedStorage::span_index_query`.
    pub(crate) fn span_index_query(
        &self,
        partition_key: Ident,
        uri: &crate::Uri,
        byte_offset: u64,
    ) -> Option<crate::ContentHash> {
        self.cas.span_index_query(partition_key, uri, byte_offset)
    }

    /// Registers `waker` to be woken by the next commit that inserts or deletes
    /// a key in `partition_key` whose sort key satisfies `condition`.
    ///
    /// A waker is dropped from the registry once woken, so a caller that wants
    /// to keep waiting must register again.
    pub(crate) fn register_partition_waker(
        &self,
        partition_key: Ident,
        condition: SortKeyCondition,
        waker: Waker,
    ) {
        self
            .partition_wakers
            .entry(partition_key)
            .or_default()
            .push((condition, waker));
    }

    /// Wakes every waiter whose condition matches a key inserted or deleted by
    /// `result`; waiters that do not match are put back for later commits.
    fn wake_partition_waiters(&self, result: &CommitResult) {
        for pk in result.affected_partition_keys() {
            // Take the whole list out of the map (rather than mutating it in
            // place) so no map guard is held while wakers run.
            let Some((_, waiters)) = self.partition_wakers.remove(&pk) else {
                continue;
            };
            // The touched keys depend only on the partition, not on the waiter,
            // so resolve them once per partition.
            let inserted = result.inserted_keys.get(&pk);
            let deleted = result.deleted_keys.get(&pk);
            let mut unmatched = Vec::new();
            for (condition, waker) in waiters {
                let any_match = inserted
                    .into_iter()
                    .chain(deleted)
                    .flatten()
                    .any(|rk| condition.matches(rk.sort_key()));
                if any_match {
                    waker.wake();
                } else {
                    unmatched.push((condition, waker));
                }
            }
            if !unmatched.is_empty() {
                // `entry` merges with any waiter registered concurrently while
                // the list was taken out.
                self.partition_wakers.entry(pk).or_default().extend(unmatched);
            }
        }
    }

    /// Applies `chunk` to the database and reports what changed.
    ///
    /// In order, this:
    /// 1. stamps the commit with the current epoch and advances the counter;
    /// 2. removes every index entry under the chunk's cleared prefixes, queueing
    ///    a deferred decrement for each removed hash;
    /// 3. for each entry of the chunk's index, queues a deferred decrement when
    ///    the key currently maps to a different hash, and increments the
    ///    refcount of the new hash;
    /// 4. rebuilds span indexes for the chunk's staged intervals;
    /// 5. merges the chunk's stores via `P::merge_stores`;
    /// 6. wakes partition waiters whose conditions match the changed keys.
    ///
    /// A key written by the chunk is reported as inserted only when its content
    /// hash is among the merge's `new_hashes`. A cleared key is reported as
    /// deleted unless it is also reported as inserted.
    pub fn commit_chunk(
        &self,
        chunk: chunk::Chunk<P>,
        source_cache: &crate::source::cache::reporter::SourceCacheReader,
    ) -> CommitResult {
        // `fetch_add` returns the previous value: this commit is stamped with the
        // pre-increment epoch, and `get_current_epoch` now reports epoch + 1.
        let epoch = GenerationEpoch::new(self.current_epoch.fetch_add(1, Ordering::SeqCst));
        let mut deferred_decrements = Vec::new();

        let mut cleared_keys: Vec<RecordKey> = Vec::new();
        for (partition_key, prefix) in chunk.clear_prefixes().iter() {
            for (_pk, sort_key, old_hash) in self.cas.index_range(*partition_key, prefix) {
                cleared_keys.push(RecordKey::new(*partition_key, sort_key));
                deferred_decrements.push(reaper::DeferredDecrement {
                    partition: *partition_key,
                    hash: old_hash,
                    from_epoch: epoch,
                });
            }
            self.cas.index_remove_prefix(*partition_key, prefix);
        }

        let mut inserted_key_hash_pairs: Vec<(RecordKey, crate::ContentHash)> = Vec::new();
        for (partition_key, sort_keys) in chunk.index().iter() {
            for (sort_key, content_hash) in sort_keys.iter() {
                if let Some(old_hash) = self.cas.index_get(*partition_key, sort_key)
                    && old_hash != *content_hash
                {
                    deferred_decrements.push(reaper::DeferredDecrement {
                        partition: *partition_key,
                        hash: old_hash,
                        from_epoch: epoch,
                    });
                }
                // NOTE(review): when the key already maps to this same hash, the
                // refcount is incremented with no matching decrement. Confirm that
                // `increment_refcount` / the reaper tolerate re-commits of
                // unchanged entries; otherwise such hashes can never drop to zero.
                self.cas.increment_refcount(*partition_key, *content_hash);
                inserted_key_hash_pairs.push((
                    RecordKey::new(*partition_key, sort_key.clone()),
                    *content_hash,
                ));
            }
        }

        self.build_span_indexes(chunk.staged_intervals(), source_cache);
        let merge_result = P::merge_stores(self.cas.stores(), chunk.storage(), epoch);
        let new_hashes = merge_result.new_hashes;

        // Group the keys whose hash the merge reported as new directly by
        // partition, moving each key instead of cloning it into a second list.
        let new_hash_set: ContentHashSet = new_hashes.iter().map(|r| r.hash).collect();
        let mut inserted_keys: IdentHashMap<Vec<RecordKey>> = IdentHashMap::default();
        for (key, hash) in inserted_key_hash_pairs {
            if new_hash_set.contains(&hash) {
                inserted_keys.entry(key.partition_key()).or_default().push(key);
            }
        }

        // Only the cleared key's own partition can hold an equal inserted key, so
        // search that partition's list rather than every inserted key.
        // NOTE(review): a cleared key rewritten with a hash that is not new (e.g.
        // unchanged content) is not "inserted" and so is reported as deleted even
        // though it is indexed again — confirm this is the intended contract.
        let mut deleted_keys: IdentHashMap<Vec<RecordKey>> = IdentHashMap::default();
        for key in cleared_keys {
            let partition_key = key.partition_key();
            let reinserted = inserted_keys
                .get(&partition_key)
                .is_some_and(|keys| keys.contains(&key));
            if !reinserted {
                deleted_keys.entry(partition_key).or_default().push(key);
            }
        }

        let result = CommitResult {
            new_hashes,
            inserted_keys,
            deleted_keys,
            deferred_decrements,
        };
        self.wake_partition_waiters(&result);
        result
    }

    /// Rebuilds span indexes from `staged_intervals`.
    ///
    /// For each partition, intervals are grouped by the URI of their source file
    /// as `(start, len, hash)` entries, and each group is handed to
    /// `span_index_replace` for that URI. Intervals whose URI, source, or span
    /// data cannot be resolved through `source_cache` are skipped, so a URI with
    /// no resolvable intervals keeps its existing span index.
    fn build_span_indexes(
        &self,
        staged_intervals: &crate::hash::IdentHashMap<Vec<(crate::Span, crate::ContentHash)>>,
        source_cache: &crate::source::cache::reporter::SourceCacheReader,
    ) {
        use std::collections::HashMap;
        for (partition_key, intervals) in staged_intervals.iter() {
            let mut by_uri: HashMap<crate::Uri, Vec<(u64, u64, crate::ContentHash)>> = HashMap::new();
            for (span, content_hash) in intervals {
                let source_key = span.source_key();
                let Some(uri) = source_cache.get_uri(source_key.file_id()) else {
                    continue;
                };
                let Some(source_view) = source_cache.get_source(source_key) else {
                    continue;
                };
                let Some(span_data) = span.data(source_view.span_cache()) else {
                    continue;
                };
                by_uri.entry(uri).or_default().push((
                    span_data.start as u64,
                    span_data.len as u64,
                    *content_hash,
                ));
            }
            for (uri, entries) in by_uri {
                let index = crate::database::partitions::span_index::SpanIndex::build(entries);
                self.cas.span_index_replace(*partition_key, uri, index);
            }
        }
    }
}
impl<P: Partitions> Default for Database<P> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
pub mod tests;