use {
crate::{
ContentHash,
Ident,
database::{
GenerationEpoch,
HasPartition,
Partition,
Partitions,
RecordHandle,
partitions::BluegumStores as _,
},
hash::{
IdentHashMap,
IdentHashState,
},
record::Record,
},
serde::Serialize,
std::collections::BTreeMap,
};
/// Content-derived identifier of a [`Chunk`].
///
/// Wraps the [`ContentHash`] computed by [`RecordWriter::build`] from the
/// writer's task id and sort-key index.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct ChunkId(pub(crate) ContentHash);

/// Hash set of [`ChunkId`]s using the crate's [`IdentHashState`] hasher.
pub type ChunkIdHashSet = std::collections::HashSet<ChunkId, IdentHashState>;

/// Hash map keyed by [`ChunkId`] using the crate's [`IdentHashState`] hasher.
pub type ChunkIdHashMap<V> = std::collections::HashMap<ChunkId, V, IdentHashState>;

/// Key ordering records within one partition of a chunk's index.
pub type SortKey = String;
/// A batch of records written by a single task, produced by
/// [`RecordWriter::build`].
///
/// Record data lives in the per-partition `storage`; `index` maps each
/// partition key to its sort keys and the content hash stored under each one.
pub struct Chunk<P: Partitions> {
    /// Content-derived id computed by [`RecordWriter::build`].
    pub(crate) id: ChunkId,
    /// Task that wrote this chunk.
    pub(crate) task_id: Ident,
    /// Parent task, if one was set with [`RecordWriter::with_parent_task_id`].
    pub(crate) parent_task_id: Option<Ident>,
    /// Initialized to 0 by [`RecordWriter::build`].
    /// NOTE(review): presumably assigned when the chunk is committed — confirm.
    pub(crate) commit_time: u64,
    /// Number of `store` calls made on the writer (repeated records included).
    pub(crate) record_count: usize,
    /// Partition key -> (sort key -> content hash).
    pub(crate) index: IdentHashMap<BTreeMap<SortKey, ContentHash>>,
    /// Per-partition record stores holding the record data.
    pub(crate) storage: P::Stores,
    /// `(partition key, prefix)` pairs registered with [`RecordWriter::clear_prefix`].
    pub(crate) clear_prefixes: Vec<(Ident, String)>,
    /// Sources queued on the writer via `set_source`; drained by `take_sources`.
    pub(crate) pending_sources: Vec<(crate::SourceKey, crate::Source)>,
    /// Per-partition `(span, content hash)` pairs staged with [`RecordWriter::index_span`].
    pub(crate) staged_intervals: IdentHashMap<Vec<(crate::Span, ContentHash)>>,
}
impl<P: Partitions> std::fmt::Debug for Chunk<P> {
    /// Shows the chunk's identifying metadata and its sort-key index; storage,
    /// clear prefixes, pending sources and staged intervals are elided (`..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            id,
            task_id,
            parent_task_id,
            commit_time,
            record_count,
            index,
            ..
        } = self;
        let mut out = f.debug_struct("Chunk");
        out.field("id", id);
        out.field("task_id", task_id);
        out.field("parent_task_id", parent_task_id);
        out.field("commit_time", commit_time);
        out.field("record_count", record_count);
        out.field("index", index);
        out.finish_non_exhaustive()
    }
}
impl<P: Partitions> Chunk<P> {
pub fn get_hash(
&self,
partition_key: &Ident,
sort_key: &str,
) -> Option<ContentHash> {
self.index.get(partition_key)?.get(sort_key).copied()
}
pub fn records(&self) -> &IdentHashMap<BTreeMap<SortKey, ContentHash>> {
&self.index
}
pub fn is_empty(&self) -> bool {
self.record_count == 0
}
pub fn storage(&self) -> &P::Stores {
&self.storage
}
pub fn clear_prefixes(&self) -> &[(Ident, String)] {
&self.clear_prefixes
}
pub(crate) fn task_id(&self) -> Ident {
self.task_id
}
pub(crate) fn index(&self) -> &IdentHashMap<BTreeMap<SortKey, ContentHash>> {
&self.index
}
pub(crate) fn staged_intervals(&self) -> &IdentHashMap<Vec<(crate::Span, ContentHash)>> {
&self.staged_intervals
}
pub(crate) fn has_pending_sources(&self) -> bool {
!self.pending_sources.is_empty()
}
pub(crate) fn take_sources(&mut self) -> Vec<(crate::SourceKey, crate::Source)> {
std::mem::take(&mut self.pending_sources)
}
pub fn bluegum_tree(
&self,
state: &(dyn crate::SpanResolver + 'static),
) -> bluegum::Builder
where
P::Stores: crate::database::partitions::BluegumStores,
{
let mut b = bluegum::Builder::new();
b.name("Chunk")
.field("id", self.id.0)
.field("task_id", self.task_id)
.field("record_count", self.record_count);
if let Some(parent) = &self.parent_task_id {
b.field("parent_task_id", parent);
}
if !self.clear_prefixes.is_empty() {
let prefixes: Vec<String> = self
.clear_prefixes
.iter()
.map(|(pk, prefix)| format!("{}:{}", pk, prefix))
.collect();
b.field("clear_prefixes", format!("{:?}", prefixes));
}
let partition_nodes = self.storage.bluegum_partition_nodes(state);
if !partition_nodes.is_empty() {
b.add_nodes_of_builders("partitions", partition_nodes);
}
b
}
pub fn error_diagnostic_source_keys(&self) -> Vec<crate::SourceKey> {
use crate::database::PartitionKey as _;
use crate::record::LaburnumRecordRef as _;
let diagnostics_key = crate::partitions::diagnostics::Diagnostics::KEY;
let mut result = Vec::new();
let Some(diag_entries) = self.index.get(&diagnostics_key) else {
return result;
};
for content_hash in diag_entries.values() {
let Some(record_ref) =
P::get_any(&self.storage, diagnostics_key, *content_hash)
else {
continue;
};
let Some(diag) = record_ref.as_dyn_diagnostic() else {
continue;
};
if diag.severity() != Some(crate::protocol::lsp::DiagnosticSeverity::ERROR)
{
continue;
}
if let Some(source_key) = diag.source_key()
&& !result.contains(&source_key)
{
result.push(source_key);
}
}
result
}
}
/// Accumulates records, index entries and side data for one task, then turns
/// them into a [`Chunk`] via [`RecordWriter::build`].
pub struct RecordWriter<P: Partitions> {
    /// Task whose output is being written.
    task_id: Ident,
    /// Optional parent task, set with [`RecordWriter::with_parent_task_id`].
    parent_task_id: Option<Ident>,
    /// Partition key -> (sort key -> content hash); a later entry under the
    /// same sort key replaces the earlier hash.
    index: IdentHashMap<BTreeMap<SortKey, ContentHash>>,
    /// Per-partition record stores, created with `P::new_stores()`.
    storage: P::Stores,
    /// Incremented once per `store` call (also reached through `insert`).
    record_count: usize,
    /// `(partition key, prefix)` pairs registered with [`RecordWriter::clear_prefix`].
    clear_prefixes: Vec<(Ident, String)>,
    /// Sources queued with `set_source`, carried into the built chunk.
    pending_sources: Vec<(crate::SourceKey, crate::Source)>,
    /// Per-partition `(span, content hash)` pairs staged with [`RecordWriter::index_span`].
    staged_intervals: IdentHashMap<Vec<(crate::Span, ContentHash)>>,
}
impl<P: Partitions> RecordWriter<P> {
pub fn new(task_id: Ident) -> Self {
Self {
task_id,
parent_task_id: None,
index: IdentHashMap::default(),
storage: P::new_stores(),
record_count: 0,
clear_prefixes: Vec::new(),
pending_sources: Vec::new(),
staged_intervals: IdentHashMap::default(),
}
}
pub fn with_parent_task_id(mut self, parent_task_id: Option<Ident>) -> Self {
self.parent_task_id = parent_task_id;
self
}
pub(crate) fn store<Part>(&mut self, record: Part::Record) -> RecordHandle<Part>
where
Part: Partition,
P::Stores: HasPartition<Part>,
{
let content_hash = record.content_hash();
let (handle, _is_new_to_chunk) = <P::Stores as HasPartition<Part>>::store(&self.storage)
.insert(content_hash, record, GenerationEpoch::default());
self.record_count += 1;
handle
}
pub(crate) fn index_entry<Part, SK>(&mut self, sort_key: SK, entry: Part::IndexEntry)
where
Part: Partition,
Part::IndexEntry: crate::database::partitions::IndexedEntry,
SK: std::fmt::Display,
P::Stores: HasPartition<Part>,
{
use crate::database::partitions::IndexedEntry as _;
let sort_key = sort_key.to_string();
let content_hash = entry.content_hash();
self
.index
.entry(Part::KEY)
.or_default()
.insert(sort_key.clone(), content_hash);
<P::Stores as HasPartition<Part>>::store(&self.storage)
.index_insert(sort_key, entry);
}
pub(crate) fn insert<Part, SK>(&mut self, sort_key: SK, record: Part::Record) -> RecordHandle<Part>
where
Part: Partition,
SK: std::fmt::Display,
P::Stores: HasPartition<Part>,
{
let content_hash = record.content_hash();
let handle = self.store::<Part>(record);
let sort_key = sort_key.to_string();
self
.index
.entry(Part::KEY)
.or_default()
.insert(sort_key.clone(), content_hash);
let entry = <Part as Partition>::index_entry_from_handle(handle);
<P::Stores as HasPartition<Part>>::store(&self.storage)
.index_insert(sort_key, entry);
handle
}
pub fn len(&self) -> usize {
self.record_count
}
pub fn is_empty(&self) -> bool {
self.record_count == 0
}
pub fn clear_prefix(&mut self, partition_key: Ident, prefix: impl std::fmt::Display) {
self.clear_prefixes.push((partition_key, prefix.to_string()));
}
pub fn clear_prefixes(&self) -> &[(Ident, String)] {
&self.clear_prefixes
}
pub fn index_span<Part: Partition + crate::database::PartitionKey + 'static>(
&mut self,
span: crate::Span,
content_hash: ContentHash,
) {
self.staged_intervals.entry(Part::KEY).or_default().push((span, content_hash));
}
pub(crate) fn set_source(&mut self, key: crate::SourceKey, source: crate::Source) {
self.pending_sources.push((key, source));
}
pub fn build(self) -> Chunk<P> {
let mut hasher = ContentHash::hasher();
hasher.update(&self.task_id.0.to_le_bytes());
for (partition_key, sort_keys) in &self.index {
hasher.update(&partition_key.0.to_le_bytes());
for (sort_key, hash) in sort_keys {
hasher.update(sort_key.as_bytes());
hasher.update(&hash.0.to_le_bytes());
}
}
let id = ChunkId(hasher.finalize());
Chunk {
id,
task_id: self.task_id,
parent_task_id: self.parent_task_id,
commit_time: 0,
record_count: self.record_count,
index: self.index,
storage: self.storage,
clear_prefixes: self.clear_prefixes,
pending_sources: self.pending_sources,
staged_intervals: self.staged_intervals,
}
}
}