tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::super::super::core_impl::PersistedSealedBudgetContext;
use super::super::super::MemoryDeltaBytes;
use super::super::super::*;

#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct BudgetEnforcementContext<'a> {
    budget_bytes: &'a AtomicU64,
    used_bytes: &'a AtomicU64,
    persisted_sealed: PersistedSealedBudgetContext<'a>,
}

impl<'a> BudgetEnforcementContext<'a> {
    fn tracked_bytes(counter: &AtomicU64) -> usize {
        counter.load(Ordering::Acquire).min(usize::MAX as u64) as usize
    }

    fn sealed_chunk_is_present_in_persisted_chunks(
        persisted_chunks: Option<&[PersistedChunkRef]>,
        key: SealedChunkKey,
        chunk: &Chunk,
    ) -> bool {
        persisted_chunks.is_some_and(|persisted_chunks| {
            persisted_chunks.iter().any(|chunk_ref| {
                chunk_ref.min_ts == key.min_ts
                    && chunk_ref.max_ts == key.max_ts
                    && chunk_ref.point_count == key.point_count
                    && chunk_ref.lane == chunk.header.lane
                    && chunk_ref.ts_codec == chunk.header.ts_codec
                    && chunk_ref.value_codec == chunk.header.value_codec
            })
        })
    }

    pub(in crate::engine::storage_engine) fn budget_value(self) -> usize {
        Self::tracked_bytes(self.budget_bytes)
    }

    pub(in crate::engine::storage_engine) fn used_value(self) -> usize {
        Self::tracked_bytes(self.used_bytes)
    }

    pub(in crate::engine::storage_engine) fn can_reclaim_persisted_memory(self) -> bool {
        self.persisted_sealed.has_persisted_lanes
    }

    pub(in crate::engine::storage_engine) fn lock_backpressure(
        self,
    ) -> parking_lot::MutexGuard<'a, ()> {
        self.persisted_sealed.backpressure_lock.lock()
    }

    fn find_oldest_evictable_sealed_chunk(self) -> Option<(usize, SeriesId, SealedChunkKey)> {
        let persisted = self.persisted_sealed.persisted_chunk_watermarks.read();
        let persisted_index = self.persisted_sealed.persisted_index.read();
        let mut oldest = None;

        for (shard_idx, shard) in self.persisted_sealed.sealed_chunks.iter().enumerate() {
            let sealed = shard.read();
            for (series_id, chunks) in sealed.iter() {
                let persisted_sequence = persisted.get(series_id).copied().unwrap_or(0);
                let persisted_chunks = persisted_index
                    .chunk_refs
                    .get(series_id)
                    .map(|chunks| chunks.as_slice());
                for (key, chunk) in chunks {
                    if key.sequence > persisted_sequence
                        || !Self::sealed_chunk_is_present_in_persisted_chunks(
                            persisted_chunks,
                            *key,
                            chunk,
                        )
                    {
                        continue;
                    }
                    if oldest
                        .map(|(_, _, current): (usize, SeriesId, SealedChunkKey)| {
                            key.sequence < current.sequence
                        })
                        .unwrap_or(true)
                    {
                        oldest = Some((shard_idx, *series_id, *key));
                    }
                }
            }
        }

        oldest
    }

    fn evict_oldest_persisted_sealed_chunk(self) -> bool {
        let Some((shard_idx, series_id, key)) = self.find_oldest_evictable_sealed_chunk() else {
            return false;
        };

        let mut sealed = self.persisted_sealed.sealed_chunks[shard_idx].write();
        let Some(chunks) = sealed.get_mut(&series_id) else {
            return false;
        };
        let removed_chunk = chunks.remove(&key);
        if chunks.is_empty() {
            sealed.remove(&series_id);
        }
        let removed = removed_chunk.is_some();
        if self.persisted_sealed.memory.accounting_enabled {
            if let Some(chunk) = removed_chunk.as_ref() {
                self.persisted_sealed.memory.account_memory_delta(
                    shard_idx,
                    MemoryDeltaBytes::from_totals(0, ChunkStorage::chunk_memory_usage_bytes(chunk)),
                );
            }
        }
        if removed {
            self.persisted_sealed
                .flush_metrics
                .evicted_sealed_chunks_total
                .fetch_add(1, Ordering::Relaxed);
        }
        removed
    }

    pub(in crate::engine::storage_engine) fn evict_persisted_sealed_chunks_to_budget_locked(
        self,
        budget: usize,
    ) {
        let mut used = self.used_value();
        while used > budget {
            if !self.evict_oldest_persisted_sealed_chunk() {
                break;
            }
            used = self.used_value();
        }
    }

    pub(in crate::engine::storage_engine) fn evict_persisted_sealed_chunks(self) -> usize {
        let persisted = self.persisted_sealed.persisted_chunk_watermarks.read();
        let persisted_index = self.persisted_sealed.persisted_index.read();
        let mut evicted = 0usize;

        for (shard_idx, shard) in self.persisted_sealed.sealed_chunks.iter().enumerate() {
            let mut sealed = shard.write();
            let mut removed_bytes = 0usize;
            sealed.retain(|series_id, chunks| {
                let persisted_sequence = persisted.get(series_id).copied().unwrap_or(0);
                let persisted_chunks = persisted_index
                    .chunk_refs
                    .get(series_id)
                    .map(|chunks| chunks.as_slice());

                chunks.retain(|key, chunk| {
                    let remove = key.sequence <= persisted_sequence
                        && Self::sealed_chunk_is_present_in_persisted_chunks(
                            persisted_chunks,
                            *key,
                            chunk,
                        );
                    if remove {
                        evicted = evicted.saturating_add(1);
                        if self.persisted_sealed.memory.accounting_enabled {
                            removed_bytes = removed_bytes
                                .saturating_add(ChunkStorage::chunk_memory_usage_bytes(chunk));
                        }
                    }
                    !remove
                });

                !chunks.is_empty()
            });
            if self.persisted_sealed.memory.accounting_enabled {
                self.persisted_sealed.memory.account_memory_delta(
                    shard_idx,
                    MemoryDeltaBytes::from_totals(0, removed_bytes),
                );
            }
        }

        evicted
    }
}

impl ChunkStorage {
    pub(in crate::engine::storage_engine) fn budget_enforcement_context(
        &self,
    ) -> BudgetEnforcementContext<'_> {
        BudgetEnforcementContext {
            budget_bytes: &self.memory.budget_bytes,
            used_bytes: &self.memory.used_bytes,
            persisted_sealed: self.persisted_sealed_budget_context(),
        }
    }
}