use std::collections::{BTreeMap, HashMap};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use parking_lot::RwLock;
use roaring::RoaringTreemap;
use crate::Label;
mod dictionary;
mod identity;
mod persistence;
mod postings;
#[cfg(test)]
mod tests;
mod value_family;
pub type SeriesId = u64;
pub type DictionaryId = u32;
const REGISTRY_INDEX_MAGIC: [u8; 4] = *b"RIDX";
const REGISTRY_INDEX_VERSION: u16 = 2;
const REGISTRY_SECTION_VALUE_FAMILY: u64 = 0b0000_0001;
pub const REGISTRY_INCREMENTAL_FILE_NAME: &str = "series_index.delta.bin";
pub const REGISTRY_INCREMENTAL_DIR_NAME: &str = "series_index.delta.d";
const REGISTRY_INCREMENTAL_SEGMENT_PREFIX: &str = "delta-";
const REGISTRY_INCREMENTAL_SEGMENT_SUFFIX: &str = ".bin";
static REGISTRY_INCREMENTAL_SEGMENT_COUNTER: AtomicU64 = AtomicU64::new(1);
const SERIES_REGISTRY_SHARD_COUNT: usize = 64;
#[derive(Debug)]
pub struct LoadedSeriesRegistry {
pub registry: SeriesRegistry,
pub delta_series_count: usize,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SeriesValueFamily {
F64,
I64,
U64,
Bool,
Blob,
Histogram,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct LabelPairId {
pub name_id: DictionaryId,
pub value_id: DictionaryId,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SeriesKeyIds {
metric_id: DictionaryId,
label_pairs: Vec<LabelPairId>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SeriesDefinition {
pub series_id: SeriesId,
pub metric_id: DictionaryId,
pub label_pairs: Vec<LabelPairId>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SeriesKey {
pub metric: String,
pub labels: Vec<Label>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SeriesResolution {
pub series_id: SeriesId,
pub metric_id: DictionaryId,
pub label_pairs: Vec<LabelPairId>,
pub created: bool,
}
#[derive(Debug, Default)]
struct StringDictionary {
by_value: HashMap<String, DictionaryId>,
by_id: Vec<String>,
estimated_heap_bytes: usize,
}
#[derive(Debug, Default)]
struct SeriesRegistryShard {
by_key: HashMap<SeriesKeyIds, SeriesId>,
by_id: HashMap<SeriesId, SeriesDefinition>,
value_families: HashMap<SeriesId, SeriesValueFamily>,
estimated_series_bytes: usize,
}
#[derive(Debug, Default)]
struct SeriesIdShardIndex {
series_id_to_registry_shard: HashMap<SeriesId, usize>,
}
#[derive(Debug, Default)]
struct AllSeriesPostingsShard {
series_ids: RoaringTreemap,
estimated_postings_bytes: usize,
}
#[derive(Debug, Clone)]
struct MissingLabelPostingsCacheEntry {
bitmap: RoaringTreemap,
postings_generation: u64,
}
#[derive(Debug, Default, Clone)]
struct LabelNamePostingsState {
present: RoaringTreemap,
missing_cache: Option<MissingLabelPostingsCacheEntry>,
bucket_count: usize,
}
#[derive(Debug, Default)]
struct MetricPostingsShard {
metric_postings: HashMap<DictionaryId, RoaringTreemap>,
estimated_postings_bytes: usize,
}
#[derive(Debug, Default)]
struct LabelPostingsShard {
label_name_states: HashMap<DictionaryId, LabelNamePostingsState>,
postings: BTreeMap<LabelPairId, RoaringTreemap>,
estimated_postings_bytes: usize,
}
#[derive(Debug)]
pub struct SeriesRegistry {
next_series_id: AtomicU64,
pending_series_reservations: AtomicUsize,
estimated_total_bytes: AtomicUsize,
postings_generation: AtomicU64,
metric_dict: RwLock<StringDictionary>,
label_name_dict: RwLock<StringDictionary>,
label_value_dict: RwLock<StringDictionary>,
series_shards: [RwLock<SeriesRegistryShard>; SERIES_REGISTRY_SHARD_COUNT],
series_id_shards: [RwLock<SeriesIdShardIndex>; SERIES_REGISTRY_SHARD_COUNT],
all_series_shards: [RwLock<AllSeriesPostingsShard>; SERIES_REGISTRY_SHARD_COUNT],
metric_postings_shards: [RwLock<MetricPostingsShard>; SERIES_REGISTRY_SHARD_COUNT],
label_postings_shards: [RwLock<LabelPostingsShard>; SERIES_REGISTRY_SHARD_COUNT],
series_count: AtomicUsize,
}
impl Default for SeriesRegistry {
fn default() -> Self {
Self::new()
}
}
impl SeriesRegistry {
pub fn new() -> Self {
Self {
next_series_id: AtomicU64::new(1),
pending_series_reservations: AtomicUsize::new(0),
estimated_total_bytes: AtomicUsize::new(0),
postings_generation: AtomicU64::new(0),
metric_dict: RwLock::new(StringDictionary::default()),
label_name_dict: RwLock::new(StringDictionary::default()),
label_value_dict: RwLock::new(StringDictionary::default()),
series_shards: std::array::from_fn(|_| RwLock::new(SeriesRegistryShard::default())),
series_id_shards: std::array::from_fn(|_| RwLock::new(SeriesIdShardIndex::default())),
all_series_shards: std::array::from_fn(|_| {
RwLock::new(AllSeriesPostingsShard::default())
}),
metric_postings_shards: std::array::from_fn(|_| {
RwLock::new(MetricPostingsShard::default())
}),
label_postings_shards: std::array::from_fn(|_| {
RwLock::new(LabelPostingsShard::default())
}),
series_count: AtomicUsize::new(0),
}
}
fn series_label_pairs_memory_bytes(label_pairs: &[LabelPairId]) -> usize {
label_pairs
.len()
.saturating_mul(std::mem::size_of::<LabelPairId>())
.saturating_mul(2)
}
pub(crate) fn value_family_entry_bytes() -> usize {
std::mem::size_of::<SeriesId>() + std::mem::size_of::<SeriesValueFamily>()
}
fn label_name_postings_count_entry_bytes() -> usize {
std::mem::size_of::<DictionaryId>() + std::mem::size_of::<usize>()
}
fn bitmap_memory_usage_bytes(bitmap: &RoaringTreemap) -> usize {
if bitmap.is_empty() {
0
} else {
bitmap.serialized_size()
}
}
fn recompute_series_bytes(&self) -> usize {
self.series_shards.iter().fold(0usize, |acc, shard| {
acc.saturating_add(shard.read().estimated_series_bytes)
})
}
fn recompute_postings_bytes(&self) -> usize {
let all_series_bytes = self.all_series_shards.iter().fold(0usize, |acc, shard| {
acc.saturating_add(shard.read().estimated_postings_bytes)
});
let metric_bytes = self
.metric_postings_shards
.iter()
.fold(0usize, |acc, shard| {
acc.saturating_add(shard.read().estimated_postings_bytes)
});
let label_bytes = self
.label_postings_shards
.iter()
.fold(0usize, |acc, shard| {
acc.saturating_add(shard.read().estimated_postings_bytes)
});
all_series_bytes
.saturating_add(metric_bytes)
.saturating_add(label_bytes)
}
fn key_shard_idx_for_key(key: &SeriesKeyIds) -> usize {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
(hasher.finish() as usize) % SERIES_REGISTRY_SHARD_COUNT
}
fn metric_postings_shard_idx(metric_id: DictionaryId) -> usize {
(metric_id as usize) % SERIES_REGISTRY_SHARD_COUNT
}
fn label_postings_shard_idx(name_id: DictionaryId) -> usize {
(name_id as usize) % SERIES_REGISTRY_SHARD_COUNT
}
fn all_series_shard_idx(series_id: SeriesId) -> usize {
(series_id as usize) % SERIES_REGISTRY_SHARD_COUNT
}
fn series_id_index_shard_idx(series_id: SeriesId) -> usize {
(series_id as usize) % SERIES_REGISTRY_SHARD_COUNT
}
fn next_series_id_value(&self) -> SeriesId {
self.next_series_id.load(Ordering::Acquire).max(1)
}
fn load_series_registry_shard_idx(&self, series_id: SeriesId) -> Option<usize> {
self.series_id_shards[Self::series_id_index_shard_idx(series_id)]
.read()
.series_id_to_registry_shard
.get(&series_id)
.copied()
}
pub fn memory_usage_bytes(&self) -> usize {
self.estimated_total_bytes.load(Ordering::Acquire)
}
fn recompute_memory_usage_bytes(&self) -> usize {
self.metric_dict
.read()
.memory_usage_bytes()
.saturating_add(self.label_name_dict.read().memory_usage_bytes())
.saturating_add(self.label_value_dict.read().memory_usage_bytes())
.saturating_add(self.recompute_series_bytes())
.saturating_add(self.recompute_postings_bytes())
}
fn add_estimated_memory_bytes(&self, bytes: usize) {
if bytes == 0 {
return;
}
let mut current = self.estimated_total_bytes.load(Ordering::Acquire);
loop {
let next = current.saturating_add(bytes);
match self.estimated_total_bytes.compare_exchange(
current,
next,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return,
Err(observed) => current = observed,
}
}
}
fn bump_postings_generation(&self) {
self.postings_generation.fetch_add(1, Ordering::AcqRel);
}
fn sub_estimated_memory_bytes(&self, bytes: usize) {
if bytes == 0 {
return;
}
let mut current = self.estimated_total_bytes.load(Ordering::Acquire);
loop {
let next = current.saturating_sub(bytes);
match self.estimated_total_bytes.compare_exchange(
current,
next,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return,
Err(observed) => current = observed,
}
}
}
fn refresh_estimated_memory_usage_bytes(&self) -> usize {
let total = self.recompute_memory_usage_bytes();
self.estimated_total_bytes.store(total, Ordering::Release);
total
}
}