use {
crate::{
ContentHash,
Ident,
database::{
GenerationEpoch,
HasPartition,
Partition,
Partitions,
RecordHandle,
partitions::BluegumStores as _,
},
hash::IdentHashState,
record::Record,
},
serde::Serialize,
std::collections::BTreeMap,
};
/// Identifier of a [`Chunk`].
///
/// Wraps the [`ContentHash`] that [`RecordWriter::build`] computes from the
/// writer's task id and the contents of its index.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize,
)]
pub struct ChunkId(pub(crate) ContentHash);
/// Hash set of [`ChunkId`]s that uses [`IdentHashState`] as its hasher state.
pub type ChunkIdHashSet = std::collections::HashSet<ChunkId, IdentHashState>;
/// Hash map keyed by [`ChunkId`] that uses [`IdentHashState`] as its hasher
/// state.
pub type ChunkIdHashMap<V> =
std::collections::HashMap<ChunkId, V, IdentHashState>;
// NOTE(review): the chunk types in this part of the file use `P::SortKey`
// rather than this alias — confirm where it is consumed.
/// Sort key represented as an owned `String`.
pub type SortKey = String;
/// The finished output of a [`RecordWriter`], produced by
/// [`RecordWriter::build`].
///
/// All fields except `id` and `commit_time` are moved over unchanged from the
/// writer that built the chunk.
pub struct Chunk<P: Partitions> {
/// Identifier computed in [`RecordWriter::build`] from the task id and the
/// index contents.
pub(crate) id: ChunkId,
/// Task id the writer was created with.
pub(crate) task_id: Ident,
/// Optional parent task id, as set through
/// [`RecordWriter::with_parent_task_id`].
pub(crate) parent_task_id: Option<Ident>,
/// Initialised to `0` by [`RecordWriter::build`].
// NOTE(review): presumably assigned later when the chunk is committed —
// confirm against the code that writes this field.
pub(crate) commit_time: u64,
/// Number of `store` calls made on the writer (incremented once per call).
pub(crate) record_count: usize,
/// For each partition key, an ordered map from sort key to the content hash
/// indexed under it.
pub(crate) index:
crate::SpannedIdentHashMap<BTreeMap<P::SortKey, ContentHash>>,
/// Partition stores that hold the records and index entries written to
/// this chunk.
pub(crate) storage: P::Stores,
/// `(partition key, sort-key prefix)` pairs registered through
/// [`RecordWriter::clear_prefix`], in registration order.
// NOTE(review): presumably prefixes to clear when the chunk is applied —
// confirm with the consumer of `clear_prefixes()`.
pub(crate) clear_prefixes: Vec<(crate::SpannedIdent, P::SortKey)>,
/// Sources queued through `RecordWriter::set_source`; drained by
/// `take_sources`.
pub(crate) pending_sources: Vec<(crate::SourceKey, crate::Source)>,
/// For each partition key, the `(sort key, span, content hash)` triples
/// staged through [`RecordWriter::index_span`], in insertion order.
pub(crate) staged_intervals:
crate::SpannedIdentHashMap<Vec<(P::SortKey, crate::Span, ContentHash)>>,
}
/// Compact `Debug` output for [`Chunk`].
///
/// Only the scalar header fields are printed; the index, storage and staged
/// data are elided, which the trailing `..` in the output signals.
impl<P: Partitions> std::fmt::Debug for Chunk<P> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure so that the set of printed fields is explicit; every
        // field not listed here is intentionally left out.
        let Self {
            id,
            task_id,
            parent_task_id,
            commit_time,
            record_count,
            ..
        } = self;

        let mut out = f.debug_struct("Chunk");
        out.field("id", id);
        out.field("task_id", task_id);
        out.field("parent_task_id", parent_task_id);
        out.field("commit_time", commit_time);
        out.field("record_count", record_count);
        out.finish_non_exhaustive()
    }
}
impl<P: Partitions> Chunk<P> {
pub fn get_hash(
&self,
partition_key: &crate::SpannedIdent,
sort_key: &P::SortKey,
) -> Option<ContentHash> {
self.index.get(partition_key)?.get(sort_key).copied()
}
pub fn records(
&self,
) -> &crate::SpannedIdentHashMap<BTreeMap<P::SortKey, ContentHash>> {
&self.index
}
pub fn is_empty(&self) -> bool {
self.record_count == 0 && self.index.values().all(BTreeMap::is_empty)
}
pub fn storage(&self) -> &P::Stores {
&self.storage
}
pub fn clear_prefixes(&self) -> &[(crate::SpannedIdent, P::SortKey)] {
&self.clear_prefixes
}
pub(crate) fn task_id(&self) -> Ident {
self.task_id
}
pub(crate) fn index(
&self,
) -> &crate::SpannedIdentHashMap<BTreeMap<P::SortKey, ContentHash>> {
&self.index
}
pub(crate) fn staged_intervals(
&self,
) -> &crate::SpannedIdentHashMap<Vec<(P::SortKey, crate::Span, ContentHash)>>
{
&self.staged_intervals
}
pub(crate) fn has_pending_sources(&self) -> bool {
!self.pending_sources.is_empty()
}
pub(crate) fn take_sources(&mut self) -> Vec<(crate::SourceKey, crate::Source)> {
std::mem::take(&mut self.pending_sources)
}
pub fn bluegum_tree(
&self,
state: &(dyn crate::SourceResolver + 'static),
) -> bluegum::Builder
where
P::Stores: crate::database::partitions::BluegumStores,
{
let mut b = bluegum::Builder::new();
b.name("Chunk")
.field("id", self.id.0)
.field("task_id", self.task_id)
.field("record_count", self.record_count);
if let Some(parent) = &self.parent_task_id {
b.field("parent_task_id", parent);
}
if !self.clear_prefixes.is_empty() {
let prefixes: Vec<String> = self
.clear_prefixes
.iter()
.map(|(pk, prefix)| format!("{}:{}", pk.ident(), crate::database::partitions::PartitionSortKey::resolve(prefix, state)))
.collect();
b.field("clear_prefixes", format!("{:?}", prefixes));
}
let partition_nodes = self.storage.bluegum_partition_nodes(state);
if !partition_nodes.is_empty() {
b.add_nodes_of_builders("partitions", partition_nodes);
}
b
}
pub fn error_diagnostic_source_keys(&self) -> Vec<crate::SourceKey> {
use crate::database::PartitionKey as _;
use crate::record::LaburnumRecordRef as _;
let diagnostics_key = crate::partitions::diagnostics::Diagnostics::KEY;
let mut result = Vec::new();
let Some(diag_entries) = self.index.get(&diagnostics_key) else {
return result;
};
for content_hash in diag_entries.values() {
let Some(record_ref) =
P::get_any(&self.storage, diagnostics_key, *content_hash)
else {
continue;
};
let Some(diag) = record_ref.as_dyn_diagnostic() else {
continue;
};
if diag.severity() != Some(crate::protocol::lsp::DiagnosticSeverity::ERROR)
{
continue;
}
if let Some(source_key) = diag.source_key()
&& !result.contains(&source_key)
{
result.push(source_key);
}
}
result
}
}
/// Accumulates records, index entries, clear prefixes, sources and staged
/// intervals for one task, and turns them into a [`Chunk`] via
/// [`RecordWriter::build`].
pub struct RecordWriter<P: Partitions> {
/// Task id supplied to [`RecordWriter::new`].
task_id: Ident,
/// Optional parent task id; `None` unless set through
/// [`RecordWriter::with_parent_task_id`].
parent_task_id: Option<Ident>,
/// For each partition key, an ordered map from sort key to content hash.
index: crate::SpannedIdentHashMap<BTreeMap<P::SortKey, ContentHash>>,
/// Partition stores created with `P::new_stores()`.
storage: P::Stores,
/// Incremented once per `store` call.
record_count: usize,
/// `(partition key, sort-key prefix)` pairs pushed by
/// [`RecordWriter::clear_prefix`].
clear_prefixes: Vec<(crate::SpannedIdent, P::SortKey)>,
/// `(source key, source)` pairs pushed by `set_source`.
pending_sources: Vec<(crate::SourceKey, crate::Source)>,
/// For each partition key, the `(sort key, span, content hash)` triples
/// pushed by [`RecordWriter::index_span`].
staged_intervals:
crate::SpannedIdentHashMap<Vec<(P::SortKey, crate::Span, ContentHash)>>,
}
impl<P: Partitions> RecordWriter<P> {
    /// Creates an empty writer for `task_id`, with fresh partition stores and
    /// no parent task.
    pub fn new(task_id: Ident) -> Self {
        Self {
            task_id,
            parent_task_id: None,
            index: Default::default(),
            storage: P::new_stores(),
            record_count: 0,
            clear_prefixes: Vec::new(),
            pending_sources: Vec::new(),
            staged_intervals: Default::default(),
        }
    }

    /// Builder-style setter that replaces the parent task id and returns the
    /// writer.
    pub fn with_parent_task_id(self, parent_task_id: Option<Ident>) -> Self {
        Self {
            parent_task_id,
            ..self
        }
    }

    /// Records `content_hash` under `sort_key` in the index map of partition
    /// `Part`, replacing any hash previously indexed under the same key.
    fn index_content_hash<Part>(&mut self, sort_key: Part::SortKey, content_hash: ContentHash)
    where
        Part: Partition + crate::database::partitions::SortKeyOf<P>,
    {
        let wrapped =
            <Part as crate::database::partitions::SortKeyOf<P>>::wrap_sort_key(sort_key);
        let partition_entries = self.index.entry(Part::KEY).or_default();
        partition_entries.insert(wrapped, content_hash);
    }

    /// Inserts `record` into the store of partition `Part`, keyed by the
    /// record's content hash, and returns the resulting handle.
    ///
    /// The record is not added to the sort-key index; use `insert` for that.
    ///
    /// NOTE(review): the record counter is bumped on every call and the
    /// second value returned by the store is ignored — confirm that counting
    /// repeated stores of the same record is intended.
    pub(crate) fn store<Part>(&mut self, record: Part::Record) -> RecordHandle<Part>
    where
        Part: Partition,
        P::Stores: HasPartition<Part>,
    {
        let hash = record.content_hash();
        let (handle, _second) = <P::Stores as HasPartition<Part>>::store(&self.storage).insert(
            hash,
            record,
            GenerationEpoch::default(),
        );
        self.record_count += 1;
        handle
    }

    /// Indexes an already-built `entry` under `sort_key` for partition `Part`.
    ///
    /// The entry's content hash is recorded in the writer's index first, and
    /// the entry itself is then handed to the partition store's index.
    pub(crate) fn index_entry<Part>(&mut self, sort_key: Part::SortKey, entry: Part::IndexEntry)
    where
        Part: Partition + crate::database::partitions::SortKeyOf<P>,
        Part::IndexEntry: crate::database::partitions::IndexedEntry,
        P::Stores: HasPartition<Part>,
    {
        use crate::database::partitions::IndexedEntry as _;

        let hash = entry.content_hash();
        // The sort key is needed twice: once wrapped for the writer's index,
        // once in its native form for the partition store.
        self.index_content_hash::<Part>(sort_key.clone(), hash);
        <P::Stores as HasPartition<Part>>::store(&self.storage).index_insert(sort_key, entry);
    }

    /// Stores `record` in partition `Part` and indexes it under `sort_key`.
    ///
    /// Combines `store` with an index update: the record's content hash goes
    /// into the writer's index, and an index entry derived from the new
    /// handle goes into the partition store's index. Returns the handle.
    pub(crate) fn insert<Part>(
        &mut self,
        sort_key: Part::SortKey,
        record: Part::Record,
    ) -> RecordHandle<Part>
    where
        Part: Partition + crate::database::partitions::SortKeyOf<P>,
        P::Stores: HasPartition<Part>,
    {
        // The hash must be taken before `record` is moved into the store.
        let hash = record.content_hash();
        let handle = self.store::<Part>(record);
        self.index_content_hash::<Part>(sort_key.clone(), hash);

        let derived_entry = <Part as Partition>::index_entry_from_handle(handle);
        <P::Stores as HasPartition<Part>>::store(&self.storage)
            .index_insert(sort_key, derived_entry);
        handle
    }

    /// Returns the number of `store` calls made so far.
    pub fn len(&self) -> usize {
        self.record_count
    }

    /// Returns `true` when nothing has been stored and every per-partition
    /// index map is empty.
    pub fn is_empty(&self) -> bool {
        if self.record_count != 0 {
            return false;
        }
        self.index.values().all(|entries| entries.is_empty())
    }

    /// Registers `prefix` as a clear prefix for partition `Part`.
    pub fn clear_prefix<Part>(&mut self, prefix: Part::SortKey)
    where
        Part: Partition + crate::database::partitions::SortKeyOf<P>,
    {
        let wrapped = <Part as crate::database::partitions::SortKeyOf<P>>::wrap_sort_key(prefix);
        let pair = (Part::KEY, wrapped);
        self.clear_prefixes.push(pair);
    }

    /// Returns the clear prefixes registered so far, in registration order.
    pub fn clear_prefixes(&self) -> &[(crate::SpannedIdent, P::SortKey)] {
        self.clear_prefixes.as_slice()
    }

    /// Stages a `(sort key, span, content hash)` triple for partition `Part`.
    ///
    /// Triples are appended to the partition's list in call order.
    pub fn index_span<Part: Partition + crate::database::partitions::SortKeyOf<P> + 'static>(
        &mut self,
        sort_key: Part::SortKey,
        span: crate::Span,
        content_hash: ContentHash,
    ) {
        let wrapped =
            <Part as crate::database::partitions::SortKeyOf<P>>::wrap_sort_key(sort_key);
        let staged = self.staged_intervals.entry(Part::KEY).or_default();
        staged.push((wrapped, span, content_hash));
    }

    /// Queues `source` under `key`; the queue is carried over to the built
    /// chunk.
    pub(crate) fn set_source(&mut self, key: crate::SourceKey, source: crate::Source) {
        let queued = (key, source);
        self.pending_sources.push(queued);
    }

    /// Consumes the writer and produces the finished [`Chunk`].
    ///
    /// The chunk id is a hash over the task id followed by, for each
    /// partition in the index, the partition ident and every
    /// `(sort key, content hash)` pair of that partition in sort-key order.
    /// `commit_time` starts at `0`; every other field is moved over as is.
    ///
    /// NOTE(review): partitions are hashed in the outer map's iteration
    /// order. If `SpannedIdentHashMap` does not iterate in a stable order for
    /// equal contents, the resulting id depends on that order — confirm.
    pub fn build(self) -> Chunk<P> {
        let Self {
            task_id,
            parent_task_id,
            index,
            storage,
            record_count,
            clear_prefixes,
            pending_sources,
            staged_intervals,
        } = self;

        let mut hasher = ContentHash::hasher();
        hasher.update(&task_id.to_bytes());
        for (partition_key, entries) in index.iter() {
            hasher.update(&partition_key.ident().to_bytes());
            for (sort_key, entry_hash) in entries.iter() {
                <P::SortKey as std::hash::Hash>::hash(sort_key, &mut hasher);
                hasher.update(&entry_hash.0.to_le_bytes());
            }
        }

        Chunk {
            id: ChunkId(hasher.finalize()),
            task_id,
            parent_task_id,
            commit_time: 0,
            record_count,
            index,
            storage,
            clear_prefixes,
            pending_sources,
            staged_intervals,
        }
    }
}