tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::super::super::core_impl::{
    ChunkContext, LifecyclePublicationContext, MaterializedSeriesReadContext,
    MetadataShardPublicationContext, PersistedSealedBudgetContext, RegistryBookkeepingContext,
    RegistryMemoryContext, VisibilityCacheReadContext,
};
use super::super::super::*;

/// Bundle of read contexts used to measure the memory held by series
/// visibility metadata.
#[derive(Clone, Copy)]
struct VisibilityMemoryMeasurementContext<'a> {
    /// Gives access to the lock-protected materialized series collection.
    materialized_series: MaterializedSeriesReadContext<'a>,
    /// Gives access to the lock-protected visibility summaries and caches.
    visibility_cache: VisibilityCacheReadContext<'a>,
}

impl VisibilityMemoryMeasurementContext<'_> {
    /// Returns the combined byte estimate for the materialized series
    /// collection and the series visibility state (summaries, visible cache
    /// and bounded visible cache), saturating at `usize::MAX`.
    fn metadata_visibility_memory_usage_bytes(self) -> usize {
        let Self {
            materialized_series: series_ctx,
            visibility_cache: cache_ctx,
        } = self;

        // All four read guards are acquired up front, in this order, and stay
        // held until the function returns so both estimates see one snapshot.
        let series_guard = series_ctx.materialized_series.read();
        let summaries_guard = cache_ctx.series_visibility_summaries.read();
        let visible_guard = cache_ctx.series_visible_max_timestamps.read();
        let bounded_guard = cache_ctx.series_visible_bounded_max_timestamps.read();

        let series_bytes = ChunkStorage::btree_set_series_id_memory_usage_bytes(&series_guard);
        let visibility_bytes = ChunkStorage::series_visibility_state_memory_usage_bytes(
            &summaries_guard,
            &visible_guard,
            &bounded_guard,
        );

        series_bytes.saturating_add(visibility_bytes)
    }
}

/// Borrowed handle on the tombstone map, used only for memory measurement.
#[derive(Clone, Copy)]
struct TombstoneMemoryMeasurementContext<'a> {
    /// Lock-protected tombstone map whose footprint is measured.
    tombstones: &'a RwLock<crate::engine::tombstone::TombstoneMap>,
}

impl TombstoneMemoryMeasurementContext<'_> {
    /// Returns the byte estimate that `ChunkStorage` computes for the
    /// tombstone map, taken under a read lock held for the duration of the call.
    fn tombstone_memory_usage_bytes(self) -> usize {
        ChunkStorage::tombstone_map_memory_usage_bytes(&self.tombstones.read())
    }
}

impl ChunkContext<'_> {
    /// Measures the memory used by one shard: the reconciled size of every
    /// active builder state plus the size of every sealed chunk.
    ///
    /// All additions saturate at `usize::MAX`. Indexing panics if `shard_idx`
    /// is out of range for the shard arrays.
    pub(super) fn reconciled_shard_memory_usage_bytes(self, shard_idx: usize) -> usize {
        // The active guard stays alive while the sealed map is read, so both
        // read locks are held together until the function returns.
        let active_guard = self.active_builders[shard_idx].read();
        let mut active_bytes = 0usize;
        for state in active_guard.values() {
            let state_bytes = ChunkStorage::active_state_memory_usage_bytes_reconciled(state);
            active_bytes = active_bytes.saturating_add(state_bytes);
        }

        let sealed_guard = self.sealed_chunks[shard_idx].read();
        let mut sealed_bytes = 0usize;
        for series_chunks in sealed_guard.values() {
            // Sum one series' chunks first, then fold that into the shard total.
            let mut series_bytes = 0usize;
            for chunk in series_chunks.values() {
                series_bytes =
                    series_bytes.saturating_add(ChunkStorage::chunk_memory_usage_bytes(chunk));
            }
            sealed_bytes = sealed_bytes.saturating_add(series_bytes);
        }

        active_bytes.saturating_add(sealed_bytes)
    }
}

impl RegistryMemoryContext<'_> {
    /// Returns the registry's own `memory_usage_bytes()` figure, read under
    /// the registry read lock.
    pub(super) fn measured_registry_memory_usage_bytes(self) -> usize {
        let registry_guard = self.registry.read();
        registry_guard.memory_usage_bytes()
    }
}

impl RegistryBookkeepingContext<'_> {
    /// Returns the byte estimate for the pending series id collection, read
    /// under its read lock.
    pub(super) fn pending_series_memory_usage_bytes(self) -> usize {
        ChunkStorage::btree_set_series_id_memory_usage_bytes(&self.pending_series_ids.read())
    }
}

impl MetadataShardPublicationContext<'_> {
    /// Returns the byte estimate for the metadata shard index, or `0` when no
    /// index is present.
    pub(super) fn metadata_shard_index_memory_usage_bytes(self) -> usize {
        match self.metadata_shard_index {
            Some(index) => ChunkStorage::metadata_shard_index_memory_usage_bytes(index),
            None => 0,
        }
    }
}

impl PersistedSealedBudgetContext<'_> {
    /// Returns the byte estimate for the persisted chunk watermark map
    /// (`SeriesId` keys, `u64` values), read under its read lock.
    pub(super) fn persisted_chunk_watermarks_memory_usage_bytes(self) -> usize {
        let watermarks_guard = self.persisted_chunk_watermarks.read();
        ChunkStorage::hash_map_memory_usage_bytes::<SeriesId, u64>(&watermarks_guard)
    }
}

impl LifecyclePublicationContext<'_> {
    /// Returns the "included" memory figure that `ChunkStorage` reports for
    /// the persisted index, read under the index read lock.
    pub(super) fn measured_persisted_index_memory_usage_bytes(self) -> usize {
        ChunkStorage::persisted_index_included_memory_usage_bytes(&self.persisted_index.read())
    }

    /// Returns the persisted mmap byte figure that `ChunkStorage` reports for
    /// the persisted index, read under the index read lock.
    ///
    /// This takes its own read lock, separate from the one used by
    /// `measured_persisted_index_memory_usage_bytes`.
    pub(super) fn measured_persisted_mmap_memory_usage_bytes(self) -> usize {
        ChunkStorage::persisted_index_persisted_mmap_bytes(&self.persisted_index.read())
    }
}

/// Borrowed view over the storage's memory counters together with the
/// sub-contexts needed to re-measure each accounted component.
///
/// Built by `ChunkStorage::memory_accounting_context`; the counters are
/// overwritten by `refresh_memory_usage`.
#[derive(Clone, Copy)]
pub(super) struct MemoryAccountingContext<'a> {
    /// Budget counter; read by `budget_value`.
    budget_bytes: &'a AtomicU64,
    /// Overall usage counter (per-shard usage plus shared usage); read by
    /// `used_value`.
    used_bytes: &'a AtomicU64,
    /// One usage counter per in-memory shard, each holding that shard's
    /// active-plus-sealed measurement.
    used_bytes_by_shard: &'a [AtomicU64; IN_MEMORY_SHARD_COUNT],
    /// Counter for the measured registry usage.
    registry_used_bytes: &'a AtomicU64,
    /// Counter for metadata usage (pending series, visibility state,
    /// persisted chunk watermarks and metadata shard index).
    metadata_used_bytes: &'a AtomicU64,
    /// Counter for the persisted index usage.
    persisted_index_used_bytes: &'a AtomicU64,
    /// Counter for the persisted mmap usage.
    persisted_mmap_used_bytes: &'a AtomicU64,
    /// Counter for the tombstone map usage.
    tombstone_used_bytes: &'a AtomicU64,
    /// Counter for the sum of all non-shard components (registry, metadata,
    /// persisted index, persisted mmap and tombstones).
    shared_used_bytes: &'a AtomicU64,
    /// Source of per-shard active/sealed chunk measurements.
    chunks: ChunkContext<'a>,
    /// Source of the registry measurement.
    registry_memory: RegistryMemoryContext<'a>,
    /// Source of the pending-series measurement.
    registry_bookkeeping: RegistryBookkeepingContext<'a>,
    /// Source of the visibility-metadata measurement.
    visibility: VisibilityMemoryMeasurementContext<'a>,
    /// Source of the metadata shard index measurement.
    metadata_shards: MetadataShardPublicationContext<'a>,
    /// Source of the persisted chunk watermark measurement.
    persisted_sealed: PersistedSealedBudgetContext<'a>,
    /// Source of the persisted index and persisted mmap measurements.
    lifecycle: LifecyclePublicationContext<'a>,
    /// Source of the tombstone measurement.
    tombstones: TombstoneMemoryMeasurementContext<'a>,
}

impl MemoryAccountingContext<'_> {
    /// Loads `counter` (acquire) and converts it to `usize`, clamping to
    /// `usize::MAX` on targets where `usize` is narrower than `u64`.
    fn tracked_bytes(counter: &AtomicU64) -> usize {
        let ceiling = usize::MAX as u64;
        let raw = counter.load(Ordering::Acquire);
        raw.min(ceiling) as usize
    }

    /// Stores `bytes` into `counter` (release) after a saturating conversion
    /// to `u64`.
    fn store_tracked_bytes(counter: &AtomicU64, bytes: usize) {
        let clamped = saturating_u64_from_usize(bytes);
        counter.store(clamped, Ordering::Release);
    }

    /// Current value of the budget counter.
    pub(super) fn budget_value(self) -> usize {
        Self::tracked_bytes(self.budget_bytes)
    }

    /// Current value of the overall usage counter.
    pub(super) fn used_value(self) -> usize {
        Self::tracked_bytes(self.used_bytes)
    }

    /// Current value of an arbitrary component counter.
    pub(super) fn component_value(component: &AtomicU64) -> usize {
        Self::tracked_bytes(component)
    }

    /// Saturating sum of every per-shard usage counter as currently stored
    /// (no re-measurement is performed).
    pub(super) fn active_and_sealed_used_value(self) -> usize {
        self.used_bytes_by_shard
            .iter()
            .map(Self::component_value)
            .fold(0usize, usize::saturating_add)
    }

    /// Re-measures one shard, writes the result to that shard's counter and
    /// returns it.
    fn sync_shard_memory_usage(self, shard_idx: usize) -> usize {
        let measured = self.chunks.reconciled_shard_memory_usage_bytes(shard_idx);
        let shard_counter = &self.used_bytes_by_shard[shard_idx];
        Self::store_tracked_bytes(shard_counter, measured);
        measured
    }

    /// Saturating sum of the metadata components: pending series ids,
    /// visibility state, persisted chunk watermarks and metadata shard index.
    fn metadata_memory_usage_bytes(self) -> usize {
        // Measured one after another, in this order; each measurement takes
        // and releases its own locks.
        let pending_bytes = self.registry_bookkeeping.pending_series_memory_usage_bytes();
        let visibility_bytes = self.visibility.metadata_visibility_memory_usage_bytes();
        let watermark_bytes = self
            .persisted_sealed
            .persisted_chunk_watermarks_memory_usage_bytes();
        let shard_index_bytes = self.metadata_shards.metadata_shard_index_memory_usage_bytes();

        pending_bytes
            .saturating_add(visibility_bytes)
            .saturating_add(watermark_bytes)
            .saturating_add(shard_index_bytes)
    }

    /// Re-measures every accounted component, overwrites all usage counters
    /// with the fresh values and returns the new overall total.
    pub(super) fn refresh_memory_usage(self) -> usize {
        // Per-shard counters are refreshed as a side effect of the fold.
        let shard_total = (0..IN_MEMORY_SHARD_COUNT).fold(0usize, |running, shard_idx| {
            running.saturating_add(self.sync_shard_memory_usage(shard_idx))
        });

        let registry_bytes = self.registry_memory.measured_registry_memory_usage_bytes();
        let metadata_bytes = self.metadata_memory_usage_bytes();
        let index_bytes = self.lifecycle.measured_persisted_index_memory_usage_bytes();
        let mmap_bytes = self.lifecycle.measured_persisted_mmap_memory_usage_bytes();
        let tombstone_bytes = self.tombstones.tombstone_memory_usage_bytes();

        let shared_total = registry_bytes
            .saturating_add(metadata_bytes)
            .saturating_add(index_bytes)
            .saturating_add(mmap_bytes)
            .saturating_add(tombstone_bytes);
        let grand_total = shard_total.saturating_add(shared_total);

        // Publish component counters first, then the shared total, and the
        // overall total last.
        for (counter, bytes) in [
            (self.registry_used_bytes, registry_bytes),
            (self.metadata_used_bytes, metadata_bytes),
            (self.persisted_index_used_bytes, index_bytes),
            (self.persisted_mmap_used_bytes, mmap_bytes),
            (self.tombstone_used_bytes, tombstone_bytes),
            (self.shared_used_bytes, shared_total),
            (self.used_bytes, grand_total),
        ] {
            Self::store_tracked_bytes(counter, bytes);
        }

        grand_total
    }
}

impl ChunkStorage {
    /// Assembles a `MemoryAccountingContext` that borrows this storage's
    /// memory counters and the sub-contexts used to measure each component.
    pub(super) fn memory_accounting_context(&self) -> MemoryAccountingContext<'_> {
        let memory = &self.memory;

        // Sub-contexts are built in the same order as the struct's fields.
        let chunks = self.chunk_context();
        let registry_memory = self.registry_memory_context();
        let registry_bookkeeping = self.registry_bookkeeping_context();
        let visibility = VisibilityMemoryMeasurementContext {
            materialized_series: self.materialized_series_read_context(),
            visibility_cache: self.visibility_cache_read_context(),
        };
        let metadata_shards = self.metadata_shard_publication_context();
        let persisted_sealed = self.persisted_sealed_budget_context();
        let lifecycle = self.lifecycle_publication_context();
        let tombstones = TombstoneMemoryMeasurementContext {
            tombstones: &self.visibility.tombstones,
        };

        MemoryAccountingContext {
            budget_bytes: &memory.budget_bytes,
            used_bytes: &memory.used_bytes,
            used_bytes_by_shard: &memory.used_bytes_by_shard,
            registry_used_bytes: &memory.registry_used_bytes,
            metadata_used_bytes: &memory.metadata_used_bytes,
            persisted_index_used_bytes: &memory.persisted_index_used_bytes,
            persisted_mmap_used_bytes: &memory.persisted_mmap_used_bytes,
            tombstone_used_bytes: &memory.tombstone_used_bytes,
            shared_used_bytes: &memory.shared_used_bytes,
            chunks,
            registry_memory,
            registry_bookkeeping,
            visibility,
            metadata_shards,
            persisted_sealed,
            lifecycle,
            tombstones,
        }
    }
}