use {
crate::{
Ident,
database::query::SortKeyCondition,
hash::{ContentHashSet, IdentHashState},
},
dashmap::DashMap,
serde::{Deserialize, Serialize},
std::{
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
task::Waker,
},
};
/// Outcome of [`Database::commit_chunk`]: which keys changed, which content
/// hashes are new, and the refcount decrements handed back to the caller.
pub struct CommitResult<P: Partitions> {
  /// Per-partition content hashes that the storage merge reported as new.
  pub new_hashes: Vec<ContentHashRef>,
  /// Keys reported as inserted by the commit, grouped by partition key.
  pub inserted_keys: crate::SpannedIdentHashMap<Vec<RecordKey<P>>>,
  /// Keys reported as removed by the commit's clear-prefixes, grouped by
  /// partition key.
  pub deleted_keys: crate::SpannedIdentHashMap<Vec<RecordKey<P>>>,
  /// Refcount decrements for hashes that lost an index reference during the
  /// commit; they are returned here rather than applied inline.
  pub deferred_decrements: Vec<reaper::DeferredDecrement>,
}
impl<P: Partitions> CommitResult<P> {
  /// Yields every partition key that has inserted or deleted keys, each
  /// exactly once: partitions with insertions first, then partitions that
  /// only have deletions.
  pub fn affected_partition_keys(
    &self,
  ) -> impl Iterator<Item = crate::SpannedIdent> + '_ {
    let inserted = &self.inserted_keys;
    // Map keys are already unique, so the only possible duplicates are
    // partitions present in both maps; skip those on the deleted side.
    let deleted_only = self
      .deleted_keys
      .keys()
      .filter(move |&pk| inserted.get(pk).is_none());
    inserted.keys().chain(deleted_only).copied()
  }

  /// Iterates over all inserted keys across every partition.
  pub fn all_inserted_keys(&self) -> impl Iterator<Item = &RecordKey<P>> + '_ {
    self.inserted_keys.values().flatten()
  }

  /// Iterates over all deleted keys across every partition.
  pub fn all_deleted_keys(&self) -> impl Iterator<Item = &RecordKey<P>> + '_ {
    self.deleted_keys.values().flatten()
  }

  /// Returns `true` if the commit inserted or deleted at least one key.
  pub fn has_changes(&self) -> bool {
    !(self.inserted_keys.is_empty() && self.deleted_keys.is_empty())
  }
}
/// A content hash tagged with the partition it belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ContentHashRef {
  /// Partition key the hash is associated with.
  pub partition: crate::SpannedIdent,
  /// The content hash itself.
  pub hash: crate::ContentHash,
}
impl ContentHashRef {
  /// Pairs `hash` with its `partition`.
  pub fn new(partition: crate::SpannedIdent, hash: crate::ContentHash) -> Self {
    Self { hash, partition }
  }
}
/// Epoch number assigned to a commit; (de)serialized as a bare `u64`.
#[derive(
  Debug,
  Default,
  Clone,
  Copy,
  Hash,
  PartialEq,
  Eq,
  PartialOrd,
  Ord,
  Serialize,
  Deserialize,
)]
#[serde(transparent)]
pub struct GenerationEpoch(u64);
impl GenerationEpoch {
  /// Wraps a raw epoch number.
  pub const fn new(value: u64) -> Self {
    GenerationEpoch(value)
  }
  /// Returns the raw epoch number.
  pub const fn get(self) -> u64 {
    let GenerationEpoch(raw) = self;
    raw
  }
}
pub mod chunk;
pub mod gc;
pub mod handle;
pub mod hlist;
pub mod partitions;
pub mod query;
pub mod query_results;
pub mod reaper;
pub mod record_key;
pub mod stats;
pub mod storage;
pub use handle::RecordHandle;
pub(crate) use partitions::InternalPartitionWriteContext;
pub use partitions::{
CollectIndexHashes, DynPartition, DynPartitionRecord, HasPartition,
HasPartitionAt, MergeFrom, MergeResult, NewStores, Partition, PartitionKey,
PartitionReader, PartitionSortKey, PartitionStore, PartitionStoreInner,
PartitionWriteContextRef, PartitionWriter, RecordRef, RefcountOps,
};
pub use record_key::RecordKey;
pub use storage::{ContentAddressedStorage, Partitions};
/// Shared registry of parked waiters, keyed by partition ident. Each waiter is
/// the sort-key condition it cares about plus the waker to notify.
pub(crate) type PartitionWakers<P> = Arc<
  DashMap<
    Ident,
    Vec<(SortKeyCondition<<P as Partitions>::SortKey>, Waker)>,
    IdentHashState,
  >,
>;
/// Handle to a content-addressed database. Every field is behind an `Arc`,
/// so clones share the same storage, epoch counter and waker registry.
pub struct Database<P: Partitions> {
  /// Content-addressed storage holding records and indexes.
  pub(crate) cas: Arc<ContentAddressedStorage<P>>,
  /// Epoch counter, advanced once per commit.
  pub(crate) current_epoch: Arc<AtomicU64>,
  /// Waiters parked on partition changes.
  pub(crate) partition_wakers: PartitionWakers<P>,
}
// Written by hand so that cloning does not require `P: Clone`.
impl<P: Partitions> Clone for Database<P> {
  /// Returns another handle onto the same shared state; only the `Arc`
  /// reference counts change, no data is copied.
  fn clone(&self) -> Self {
    let Self {
      cas,
      current_epoch,
      partition_wakers,
    } = self;
    Self {
      cas: Arc::clone(cas),
      current_epoch: Arc::clone(current_epoch),
      partition_wakers: Arc::clone(partition_wakers),
    }
  }
}
impl<P: Partitions> Database<P> {
pub fn new() -> Self {
Self {
cas: Arc::new(ContentAddressedStorage::new()),
current_epoch: Arc::new(AtomicU64::new(0)),
partition_wakers: Arc::new(DashMap::with_hasher(IdentHashState)),
}
}
pub fn get_current_epoch(&self) -> GenerationEpoch {
GenerationEpoch::new(self.current_epoch.load(Ordering::SeqCst))
}
pub(crate) fn get_store(&self) -> &ContentAddressedStorage<P> {
&self.cas
}
pub(crate) fn index_get(
&self,
partition_key: crate::SpannedIdent,
sort_key: &P::SortKey,
) -> Option<crate::ContentHash> {
self.cas.index_get(partition_key, sort_key)
}
pub(crate) fn index_all(
&self,
partition_key: crate::SpannedIdent,
) -> Vec<(Ident, P::SortKey, crate::ContentHash)> {
self.cas.index_all(partition_key)
}
pub(crate) fn index_range(
&self,
partition_key: crate::SpannedIdent,
prefix: &P::SortKey,
) -> Vec<(Ident, P::SortKey, crate::ContentHash)> {
self.cas.index_range(partition_key, prefix)
}
pub(crate) fn index_less_than(
&self,
partition_key: crate::SpannedIdent,
value: &P::SortKey,
inclusive: bool,
) -> Vec<(Ident, P::SortKey, crate::ContentHash)> {
self.cas.index_less_than(partition_key, value, inclusive)
}
pub(crate) fn index_greater_than(
&self,
partition_key: crate::SpannedIdent,
value: &P::SortKey,
inclusive: bool,
) -> Vec<(Ident, P::SortKey, crate::ContentHash)> {
self.cas.index_greater_than(partition_key, value, inclusive)
}
pub(crate) fn index_between(
&self,
partition_key: crate::SpannedIdent,
from: &P::SortKey,
to: &P::SortKey,
) -> Vec<(Ident, P::SortKey, crate::ContentHash)> {
self.cas.index_between(partition_key, from, to)
}
pub(crate) fn collect_index_hashes(&self) -> Vec<ContentHashRef> {
self.cas.collect_index_hashes()
}
pub(crate) fn span_index_query(
&self,
partition_key: crate::SpannedIdent,
uri: &crate::Uri,
byte_offset: u64,
) -> Option<crate::ContentHash> {
self.cas.span_index_query(partition_key, uri, byte_offset)
}
pub(crate) fn register_partition_waker(
&self,
partition_key: crate::SpannedIdent,
condition: SortKeyCondition<P::SortKey>,
waker: Waker,
) {
self
.partition_wakers
.entry(partition_key.ident())
.or_default()
.push((condition, waker));
}
fn wake_partition_waiters(&self, result: &CommitResult<P>) {
for pk in result.affected_partition_keys() {
if let Some((_, wakers)) = self.partition_wakers.remove(&pk.ident()) {
let mut unmatched = Vec::new();
for (condition, waker) in wakers {
let inserted_keys = result.inserted_keys.get(&pk);
let deleted_keys = result.deleted_keys.get(&pk);
let any_match = inserted_keys
.into_iter()
.flat_map(|keys| keys.iter())
.chain(deleted_keys.into_iter().flat_map(|keys| keys.iter()))
.any(|rk| condition.matches(rk.sort_key()));
if any_match {
waker.wake();
} else {
unmatched.push((condition, waker));
}
}
if !unmatched.is_empty() {
self
.partition_wakers
.entry(pk.ident())
.or_default()
.extend(unmatched);
}
}
}
}
pub fn commit_chunk(
&self,
chunk: chunk::Chunk<P>,
source_cache: &crate::source::cache::reporter::SourceCacheReader,
) -> CommitResult<P> {
let epoch =
GenerationEpoch::new(self.current_epoch.fetch_add(1, Ordering::SeqCst));
let mut deferred_decrements = Vec::new();
let mut cleared_keys: Vec<RecordKey<P>> = Vec::new();
let mut removed_span_keys: crate::SpannedIdentHashMap<Vec<P::SortKey>> =
crate::SpannedIdentHashMap::default();
for (partition_key, prefix) in chunk.clear_prefixes().iter() {
debug_assert!(
crate::database::partitions::PartitionSortKey::debug_is_valid(prefix),
"task produced a clear-prefix keyed by an invalid sort key",
);
for (_pk, sort_key, old_hash) in
self.cas.index_range(*partition_key, prefix)
{
removed_span_keys
.entry(*partition_key)
.or_default()
.push(sort_key.clone());
cleared_keys.push(RecordKey::new(*partition_key, sort_key));
deferred_decrements.push(reaper::DeferredDecrement {
partition: *partition_key,
hash: old_hash,
from_epoch: epoch,
});
}
self.cas.index_remove_prefix(*partition_key, prefix);
}
let mut inserted_key_hash_pairs: Vec<(RecordKey<P>, crate::ContentHash)> =
Vec::new();
for (partition_key, sort_keys) in chunk.index().iter() {
for (sort_key, content_hash) in sort_keys.iter() {
debug_assert!(
crate::database::partitions::PartitionSortKey::debug_is_valid(
sort_key
),
"task produced an index entry keyed by an invalid sort key",
);
if let Some(old_hash) = self.cas.index_get(*partition_key, sort_key)
&& old_hash != *content_hash
{
deferred_decrements.push(reaper::DeferredDecrement {
partition: *partition_key,
hash: old_hash,
from_epoch: epoch,
});
}
self.cas.increment_refcount(*partition_key, *content_hash);
inserted_key_hash_pairs.push((
RecordKey::new(*partition_key, sort_key.clone()),
*content_hash,
));
}
}
self.apply_span_indexes(
chunk.staged_intervals(),
&removed_span_keys,
source_cache,
);
let merge_result =
P::merge_stores(self.cas.stores(), chunk.storage(), epoch);
let new_hashes = merge_result.new_hashes;
let inserted_flat: Vec<RecordKey<P>> = {
let new_hash_set: ContentHashSet =
new_hashes.iter().map(|r| r.hash).collect();
inserted_key_hash_pairs
.into_iter()
.filter(|(_key, hash)| new_hash_set.contains(hash))
.map(|(key, _hash)| key)
.collect()
};
let mut inserted_keys: crate::SpannedIdentHashMap<Vec<RecordKey<P>>> =
crate::SpannedIdentHashMap::default();
for key in &inserted_flat {
inserted_keys
.entry(key.partition_key_spanned())
.or_default()
.push(key.clone());
}
let mut deleted_keys: crate::SpannedIdentHashMap<Vec<RecordKey<P>>> =
crate::SpannedIdentHashMap::default();
for key in cleared_keys {
if !inserted_flat.contains(&key) {
deleted_keys
.entry(key.partition_key_spanned())
.or_default()
.push(key);
}
}
let result = CommitResult {
new_hashes,
inserted_keys,
deleted_keys,
deferred_decrements,
};
self.wake_partition_waiters(&result);
result
}
fn apply_span_indexes(
&self,
staged_intervals: &crate::SpannedIdentHashMap<
Vec<(P::SortKey, crate::Span, crate::ContentHash)>,
>,
removed_span_keys: &crate::SpannedIdentHashMap<Vec<P::SortKey>>,
source_cache: &crate::source::cache::reporter::SourceCacheReader,
) {
let mut partition_keys: std::collections::HashSet<crate::SpannedIdent> =
std::collections::HashSet::default();
partition_keys.extend(staged_intervals.keys().copied());
partition_keys.extend(removed_span_keys.keys().copied());
for partition_key in partition_keys {
let mut upserts: Vec<(
P::SortKey,
crate::Uri,
u64,
u64,
crate::ContentHash,
)> = Vec::new();
if let Some(intervals) = staged_intervals.get(&partition_key) {
for (sort_key, span, content_hash) in intervals {
debug_assert!(
crate::database::partitions::PartitionSortKey::debug_is_valid(
sort_key
),
"task produced a span-indexed entry keyed by an invalid sort key",
);
let source_key = span.source_key();
let Some(uri) = source_cache.get_uri(source_key.file_id()) else {
continue;
};
let Some(source_view) = source_cache.get_source(source_key) else {
continue;
};
let Some(span_data) = span.data(source_view.span_cache()) else {
continue;
};
upserts.push((
sort_key.clone(),
uri,
span_data.start as u64,
span_data.len as u64,
*content_hash,
));
}
}
let empty: Vec<P::SortKey> = Vec::new();
let removed = removed_span_keys.get(&partition_key).unwrap_or(&empty);
self.cas.span_index_apply(partition_key, removed, upserts);
}
}
}
impl<P: Partitions> Default for Database<P> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
pub mod tests;